discovery+server: use new gossiper's SyncManager subsystem

This commit is contained in:
Wilmer Paulino 2019-03-22 19:56:33 -07:00
parent 80b84eef9c
commit 70be812747
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
5 changed files with 116 additions and 156 deletions

@ -20,6 +20,7 @@ import (
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/multimutex"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/ticker"
)
var (
@ -143,6 +144,28 @@ type Config struct {
// TODO(roasbeef): extract ann crafting + sign from fundingMgr into
// here?
AnnSigner lnwallet.MessageSigner
// NumActiveSyncers is the number of peers for which we should have
// active syncers with. After reaching NumActiveSyncers, any future
// gossip syncers will be passive.
NumActiveSyncers int
// RotateTicker is a ticker responsible for notifying the SyncManager
// when it should rotate its active syncers. A single active syncer with
// a chansSynced state will be exchanged for a passive syncer in order
// to ensure we don't keep syncing with the same peers.
RotateTicker ticker.Ticker
// HistoricalSyncTicker is a ticker responsible for notifying the
// syncManager when it should attempt a historical sync with a gossip
// sync peer.
HistoricalSyncTicker ticker.Ticker
// ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
// syncManager when it should attempt to start the next pending
// activeSyncer due to the current one not completing its state machine
// within the timeout.
ActiveSyncerTimeoutTicker ticker.Ticker
}
// AuthenticatedGossiper is a subsystem which is responsible for receiving
@ -212,13 +235,14 @@ type AuthenticatedGossiper struct {
rejectMtx sync.RWMutex
recentRejects map[uint64]struct{}
// peerSyncers keeps track of all the gossip syncers we're maintain for
// peers that understand this mode of operation. When we go to send out
// new updates, for all peers in the map, we'll send the messages
// directly to their gossiper, rather than broadcasting them. With this
// change, we ensure we filter out all updates properly.
syncerMtx sync.RWMutex
peerSyncers map[routing.Vertex]*GossipSyncer
// syncMgr is a subsystem responsible for managing the gossip syncers
// for peers currently connected. When a new peer is connected, the
// manager will create its accompanying gossip syncer and determine
// whether it should have an activeSync or passiveSync sync type based
// on how many other gossip syncers are currently active. Any activeSync
// gossip syncers are started in a round-robin manner to ensure we're
// not syncing with multiple peers at the same time.
syncMgr *SyncManager
// reliableSender is a subsystem responsible for handling reliable
// message send requests to peers. This should only be used for channels
@ -243,7 +267,14 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
prematureChannelUpdates: make(map[uint64][]*networkMsg),
channelMtx: multimutex.NewMutex(),
recentRejects: make(map[uint64]struct{}),
peerSyncers: make(map[routing.Vertex]*GossipSyncer),
syncMgr: newSyncManager(&SyncManagerCfg{
ChainHash: cfg.ChainHash,
ChanSeries: cfg.ChanSeries,
RotateTicker: cfg.RotateTicker,
HistoricalSyncTicker: cfg.HistoricalSyncTicker,
ActiveSyncerTimeoutTicker: cfg.ActiveSyncerTimeoutTicker,
NumActiveSyncers: cfg.NumActiveSyncers,
}),
}
gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
@ -419,6 +450,8 @@ func (d *AuthenticatedGossiper) Start() error {
return err
}
d.syncMgr.Start()
d.wg.Add(1)
go d.networkHandler()
@ -435,11 +468,7 @@ func (d *AuthenticatedGossiper) Stop() {
d.blockEpochs.Cancel()
d.syncerMtx.RLock()
for _, syncer := range d.peerSyncers {
syncer.Stop()
}
d.syncerMtx.RUnlock()
d.syncMgr.Stop()
close(d.quit)
d.wg.Wait()
@ -471,12 +500,12 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
*lnwire.ReplyChannelRange,
*lnwire.ReplyShortChanIDsEnd:
syncer, err := d.findGossipSyncer(peer.IdentityKey())
if err != nil {
log.Warnf("Unable to find gossip syncer for "+
"peer=%x: %v", peer.PubKey(), err)
syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
if !ok {
log.Warnf("Gossip syncer for peer=%x not found",
peer.PubKey())
errChan <- err
errChan <- ErrGossipSyncerNotFound
return errChan
}
@ -490,22 +519,20 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
// If a peer is updating its current update horizon, then we'll dispatch
// that directly to the proper GossipSyncer.
case *lnwire.GossipTimestampRange:
syncer, err := d.findGossipSyncer(peer.IdentityKey())
if err != nil {
log.Warnf("Unable to find gossip syncer for "+
"peer=%x: %v", peer.PubKey(), err)
syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
if !ok {
log.Warnf("Gossip syncer for peer=%x not found",
peer.PubKey())
errChan <- err
errChan <- ErrGossipSyncerNotFound
return errChan
}
// If we've found the message target, then we'll dispatch the
// message directly to it.
err = syncer.ApplyGossipFilter(m)
if err != nil {
log.Warnf("unable to apply gossip "+
"filter for peer=%x: %v",
peer.PubKey(), err)
if err := syncer.ApplyGossipFilter(m); err != nil {
log.Warnf("Unable to apply gossip filter for peer=%x: "+
"%v", peer.PubKey(), err)
errChan <- err
return errChan
@ -812,28 +839,6 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders {
return msgs
}
// findGossipSyncer is a utility method used by the gossiper to locate the
// gossip syncer for an inbound message so we can properly dispatch the
// incoming message. If a gossip syncer isn't found, then one will be created
// for the target peer.
func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (
*GossipSyncer, error) {
target := routing.NewVertex(pub)
// First, we'll try to find an existing gossiper for this peer.
d.syncerMtx.RLock()
syncer, ok := d.peerSyncers[target]
d.syncerMtx.RUnlock()
// If one exists, then we'll return it directly.
if ok {
return syncer, nil
}
return nil, ErrGossipSyncerNotFound
}
// networkHandler is the primary goroutine that drives this service. The roles
// of this goroutine includes answering queries related to the state of the
// network, syncing up newly connected peers, and also periodically
@ -1028,12 +1033,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
// For the set of peers that have an active gossip
// syncers, we'll collect their pubkeys so we can avoid
// sending them the full message blast below.
d.syncerMtx.RLock()
syncerPeers := make(map[routing.Vertex]*GossipSyncer)
for peerPub, syncer := range d.peerSyncers {
syncerPeers[peerPub] = syncer
}
d.syncerMtx.RUnlock()
syncerPeers := d.syncMgr.GossipSyncers()
log.Infof("Broadcasting batch of %v new announcements",
len(announcementBatch))
@ -1088,66 +1088,16 @@ func (d *AuthenticatedGossiper) networkHandler() {
// InitSyncState is called by outside sub-systems when a connection is
// established to a new peer that understands how to perform channel range
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
// needed to handle new queries. The recvUpdates bool indicates if we should
// continue to receive real-time updates from the remote peer once we've synced
// channel state.
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer,
recvUpdates bool) {
d.syncerMtx.Lock()
defer d.syncerMtx.Unlock()
// If we already have a syncer, then we'll exit early as we don't want
// to override it.
nodeID := routing.Vertex(syncPeer.PubKey())
if _, ok := d.peerSyncers[nodeID]; ok {
return
}
log.Infof("Creating new GossipSyncer for peer=%x", nodeID[:])
encoding := lnwire.EncodingSortedPlain
syncer := newGossipSyncer(gossipSyncerCfg{
chainHash: d.cfg.ChainHash,
peerPub: nodeID,
channelSeries: d.cfg.ChanSeries,
encodingType: encoding,
chunkSize: encodingTypeToChunkSize[encoding],
sendToPeer: func(msgs ...lnwire.Message) error {
return syncPeer.SendMessageLazy(false, msgs...)
},
})
if !recvUpdates {
syncer.syncType = uint32(PassiveSync)
}
d.peerSyncers[nodeID] = syncer
syncer.Start()
// needed to handle new queries.
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
d.syncMgr.InitSyncState(syncPeer)
}
// PruneSyncState is called by outside sub-systems once a peer that we were
// previously connected to has been disconnected. In this case we can stop the
// existing GossipSyncer assigned to the peer and free up resources.
func (d *AuthenticatedGossiper) PruneSyncState(peer *btcec.PublicKey) {
d.syncerMtx.Lock()
defer d.syncerMtx.Unlock()
log.Infof("Removing GossipSyncer for peer=%x",
peer.SerializeCompressed())
vertex := routing.NewVertex(peer)
syncer, ok := d.peerSyncers[vertex]
if !ok {
return
}
syncer.Stop()
delete(d.peerSyncers, vertex)
return
func (d *AuthenticatedGossiper) PruneSyncState(peer routing.Vertex) {
d.syncMgr.PruneSyncState(peer)
}
// isRecentlyRejectedMsg returns true if we recently rejected a message, and
@ -2518,3 +2468,8 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
return chanAnn, chanUpdate, err
}
// SyncManager returns the gossiper's SyncManager instance.
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
return d.syncMgr
}

@ -27,6 +27,7 @@ import (
"github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/ticker"
)
var (
@ -713,12 +714,16 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
c := make(chan struct{})
return c
},
Router: router,
TrickleDelay: trickleDelay,
RetransmitDelay: retransmitDelay,
ProofMatureDelta: proofMatureDelta,
WaitingProofStore: waitingProofStore,
MessageStore: newMockMessageStore(),
Router: router,
TrickleDelay: trickleDelay,
RetransmitDelay: retransmitDelay,
ProofMatureDelta: proofMatureDelta,
WaitingProofStore: waitingProofStore,
MessageStore: newMockMessageStore(),
RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval),
HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
ActiveSyncerTimeoutTicker: ticker.NewForce(DefaultActiveSyncerTimeout),
NumActiveSyncers: 3,
}, nodeKeyPub1)
if err := gossiper.Start(); err != nil {
@ -1447,16 +1452,20 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
// the message to the peer.
ctx.gossiper.Stop()
gossiper := New(Config{
Notifier: ctx.gossiper.cfg.Notifier,
Broadcast: ctx.gossiper.cfg.Broadcast,
NotifyWhenOnline: ctx.gossiper.reliableSender.cfg.NotifyWhenOnline,
NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline,
Router: ctx.gossiper.cfg.Router,
TrickleDelay: trickleDelay,
RetransmitDelay: retransmitDelay,
ProofMatureDelta: proofMatureDelta,
WaitingProofStore: ctx.gossiper.cfg.WaitingProofStore,
MessageStore: ctx.gossiper.cfg.MessageStore,
Notifier: ctx.gossiper.cfg.Notifier,
Broadcast: ctx.gossiper.cfg.Broadcast,
NotifyWhenOnline: ctx.gossiper.reliableSender.cfg.NotifyWhenOnline,
NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline,
Router: ctx.gossiper.cfg.Router,
TrickleDelay: trickleDelay,
RetransmitDelay: retransmitDelay,
ProofMatureDelta: proofMatureDelta,
WaitingProofStore: ctx.gossiper.cfg.WaitingProofStore,
MessageStore: ctx.gossiper.cfg.MessageStore,
RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval),
HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
ActiveSyncerTimeoutTicker: ticker.NewForce(DefaultActiveSyncerTimeout),
NumActiveSyncers: 3,
}, ctx.gossiper.selfKey)
if err != nil {
t.Fatalf("unable to recreate gossiper: %v", err)

@ -205,8 +205,6 @@ type gossipSyncerCfg struct {
// filter out which messages should be sent to a remote peer based on their
// update horizon. If the update horizon isn't specified, then we won't send
// them any channel updates at all.
//
// TODO(roasbeef): modify to only sync from one peer at a time?
type GossipSyncer struct {
started sync.Once
stopped sync.Once

15
peer.go

@ -396,19 +396,16 @@ func (p *peer) initGossipSync() {
srvrLog.Infof("Negotiated chan series queries with %x",
p.pubKeyBytes[:])
// We'll only request channel updates from the remote peer if
// its enabled in the config, or we're already getting updates
// from enough peers.
//
// TODO(roasbeef): craft s.t. we only get updates from a few
// peers
recvUpdates := cfg.NumGraphSyncPeers != 0
// Register the this peer's for gossip syncer with the gossiper.
// This is blocks synchronously to ensure the gossip syncer is
// registered with the gossiper before attempting to read
// messages from the remote peer.
p.server.authGossiper.InitSyncState(p, recvUpdates)
//
// TODO(wilmer): Only sync updates from non-channel peers. This
// requires an improved version of the current network
// bootstrapper to ensure we can find and connect to non-channel
// peers.
p.server.authGossiper.InitSyncState(p)
// If the remote peer has the initial sync feature bit set, then we'll
// being the synchronization protocol to exchange authenticated channel

@ -636,10 +636,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
return nil, fmt.Errorf("can't create router: %v", err)
}
chanSeries := discovery.NewChanSeries(
s.chanDB.ChannelGraph(),
)
chanSeries := discovery.NewChanSeries(s.chanDB.ChannelGraph())
gossipMessageStore, err := discovery.NewMessageStore(s.chanDB)
if err != nil {
return nil, err
@ -650,19 +647,23 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
}
s.authGossiper = discovery.New(discovery.Config{
Router: s.chanRouter,
Notifier: s.cc.chainNotifier,
ChainHash: *activeNetParams.GenesisHash,
Broadcast: s.BroadcastMessage,
ChanSeries: chanSeries,
NotifyWhenOnline: s.NotifyWhenOnline,
NotifyWhenOffline: s.NotifyWhenOffline,
ProofMatureDelta: 0,
TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay),
RetransmitDelay: time.Minute * 30,
WaitingProofStore: waitingProofStore,
MessageStore: gossipMessageStore,
AnnSigner: s.nodeSigner,
Router: s.chanRouter,
Notifier: s.cc.chainNotifier,
ChainHash: *activeNetParams.GenesisHash,
Broadcast: s.BroadcastMessage,
ChanSeries: chanSeries,
NotifyWhenOnline: s.NotifyWhenOnline,
NotifyWhenOffline: s.NotifyWhenOffline,
ProofMatureDelta: 0,
TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay),
RetransmitDelay: time.Minute * 30,
WaitingProofStore: waitingProofStore,
MessageStore: gossipMessageStore,
AnnSigner: s.nodeSigner,
RotateTicker: ticker.New(discovery.DefaultSyncerRotationInterval),
HistoricalSyncTicker: ticker.New(discovery.DefaultHistoricalSyncInterval),
ActiveSyncerTimeoutTicker: ticker.New(discovery.DefaultActiveSyncerTimeout),
NumActiveSyncers: cfg.NumGraphSyncPeers,
},
s.identityPriv.PubKey(),
)
@ -2622,7 +2623,7 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
// We'll also inform the gossiper that this peer is no longer active,
// so we don't need to maintain sync state for it any longer.
s.authGossiper.PruneSyncState(pubKey)
s.authGossiper.PruneSyncState(p.PubKey())
// Tell the switch to remove all links associated with this peer.
// Passing nil as the target link indicates that all links associated