From 70be81274725983bc60239df2fc90991b54a73a9 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Fri, 22 Mar 2019 19:56:33 -0700 Subject: [PATCH] discovery+server: use new gossiper's SyncManager subsystem --- discovery/gossiper.go | 177 ++++++++++++++----------------------- discovery/gossiper_test.go | 41 +++++---- discovery/syncer.go | 2 - peer.go | 15 ++-- server.go | 37 ++++---- 5 files changed, 116 insertions(+), 156 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index b1d8c146..7e124c02 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -20,6 +20,7 @@ import ( "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/multimutex" "github.com/lightningnetwork/lnd/routing" + "github.com/lightningnetwork/lnd/ticker" ) var ( @@ -143,6 +144,28 @@ type Config struct { // TODO(roasbeef): extract ann crafting + sign from fundingMgr into // here? AnnSigner lnwallet.MessageSigner + + // NumActiveSyncers is the number of peers for which we should have + // active syncers with. After reaching NumActiveSyncers, any future + // gossip syncers will be passive. + NumActiveSyncers int + + // RotateTicker is a ticker responsible for notifying the SyncManager + // when it should rotate its active syncers. A single active syncer with + // a chansSynced state will be exchanged for a passive syncer in order + // to ensure we don't keep syncing with the same peers. + RotateTicker ticker.Ticker + + // HistoricalSyncTicker is a ticker responsible for notifying the + // syncManager when it should attempt a historical sync with a gossip + // sync peer. + HistoricalSyncTicker ticker.Ticker + + // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the + // syncManager when it should attempt to start the next pending + // activeSyncer due to the current one not completing its state machine + // within the timeout. + ActiveSyncerTimeoutTicker ticker.Ticker } // AuthenticatedGossiper is a subsystem which is responsible for receiving @@ -212,13 +235,14 @@ type AuthenticatedGossiper struct { rejectMtx sync.RWMutex recentRejects map[uint64]struct{} - // peerSyncers keeps track of all the gossip syncers we're maintain for - // peers that understand this mode of operation. When we go to send out - // new updates, for all peers in the map, we'll send the messages - // directly to their gossiper, rather than broadcasting them. With this - // change, we ensure we filter out all updates properly. - syncerMtx sync.RWMutex - peerSyncers map[routing.Vertex]*GossipSyncer + // syncMgr is a subsystem responsible for managing the gossip syncers + // for peers currently connected. When a new peer is connected, the + // manager will create its accompanying gossip syncer and determine + // whether it should have an activeSync or passiveSync sync type based + // on how many other gossip syncers are currently active. Any activeSync + // gossip syncers are started in a round-robin manner to ensure we're + // not syncing with multiple peers at the same time. + syncMgr *SyncManager // reliableSender is a subsystem responsible for handling reliable // message send requests to peers. This should only be used for channels @@ -243,7 +267,14 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper { prematureChannelUpdates: make(map[uint64][]*networkMsg), channelMtx: multimutex.NewMutex(), recentRejects: make(map[uint64]struct{}), - peerSyncers: make(map[routing.Vertex]*GossipSyncer), + syncMgr: newSyncManager(&SyncManagerCfg{ + ChainHash: cfg.ChainHash, + ChanSeries: cfg.ChanSeries, + RotateTicker: cfg.RotateTicker, + HistoricalSyncTicker: cfg.HistoricalSyncTicker, + ActiveSyncerTimeoutTicker: cfg.ActiveSyncerTimeoutTicker, + NumActiveSyncers: cfg.NumActiveSyncers, + }), } gossiper.reliableSender = newReliableSender(&reliableSenderCfg{ @@ -419,6 +450,8 @@ func (d *AuthenticatedGossiper) Start() error { return err } + d.syncMgr.Start() + d.wg.Add(1) go d.networkHandler() @@ -435,11 +468,7 @@ func (d *AuthenticatedGossiper) Stop() { d.blockEpochs.Cancel() - d.syncerMtx.RLock() - for _, syncer := range d.peerSyncers { - syncer.Stop() - } - d.syncerMtx.RUnlock() + d.syncMgr.Stop() close(d.quit) d.wg.Wait() @@ -471,12 +500,12 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd: - syncer, err := d.findGossipSyncer(peer.IdentityKey()) - if err != nil { - log.Warnf("Unable to find gossip syncer for "+ - "peer=%x: %v", peer.PubKey(), err) + syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey()) + if !ok { + log.Warnf("Gossip syncer for peer=%x not found", + peer.PubKey()) - errChan <- err + errChan <- ErrGossipSyncerNotFound return errChan } @@ -490,22 +519,20 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, // If a peer is updating its current update horizon, then we'll dispatch // that directly to the proper GossipSyncer. case *lnwire.GossipTimestampRange: - syncer, err := d.findGossipSyncer(peer.IdentityKey()) - if err != nil { - log.Warnf("Unable to find gossip syncer for "+ - "peer=%x: %v", peer.PubKey(), err) + syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey()) + if !ok { + log.Warnf("Gossip syncer for peer=%x not found", + peer.PubKey()) - errChan <- err + errChan <- ErrGossipSyncerNotFound return errChan } // If we've found the message target, then we'll dispatch the // message directly to it. - err = syncer.ApplyGossipFilter(m) - if err != nil { - log.Warnf("unable to apply gossip "+ - "filter for peer=%x: %v", - peer.PubKey(), err) + if err := syncer.ApplyGossipFilter(m); err != nil { + log.Warnf("Unable to apply gossip filter for peer=%x: "+ + "%v", peer.PubKey(), err) errChan <- err return errChan @@ -812,28 +839,6 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders { return msgs } -// findGossipSyncer is a utility method used by the gossiper to locate the -// gossip syncer for an inbound message so we can properly dispatch the -// incoming message. If a gossip syncer isn't found, then one will be created -// for the target peer. -func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) ( - *GossipSyncer, error) { - - target := routing.NewVertex(pub) - - // First, we'll try to find an existing gossiper for this peer. - d.syncerMtx.RLock() - syncer, ok := d.peerSyncers[target] - d.syncerMtx.RUnlock() - - // If one exists, then we'll return it directly. - if ok { - return syncer, nil - } - - return nil, ErrGossipSyncerNotFound -} - // networkHandler is the primary goroutine that drives this service. The roles // of this goroutine includes answering queries related to the state of the // network, syncing up newly connected peers, and also periodically @@ -1028,12 +1033,7 @@ func (d *AuthenticatedGossiper) networkHandler() { // For the set of peers that have an active gossip // syncers, we'll collect their pubkeys so we can avoid // sending them the full message blast below. - d.syncerMtx.RLock() - syncerPeers := make(map[routing.Vertex]*GossipSyncer) - for peerPub, syncer := range d.peerSyncers { - syncerPeers[peerPub] = syncer - } - d.syncerMtx.RUnlock() + syncerPeers := d.syncMgr.GossipSyncers() log.Infof("Broadcasting batch of %v new announcements", len(announcementBatch)) @@ -1088,66 +1088,16 @@ func (d *AuthenticatedGossiper) networkHandler() { // InitSyncState is called by outside sub-systems when a connection is // established to a new peer that understands how to perform channel range // queries. We'll allocate a new gossip syncer for it, and start any goroutines -// needed to handle new queries. The recvUpdates bool indicates if we should -// continue to receive real-time updates from the remote peer once we've synced -// channel state. -func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, - recvUpdates bool) { - - d.syncerMtx.Lock() - defer d.syncerMtx.Unlock() - - // If we already have a syncer, then we'll exit early as we don't want - // to override it. - nodeID := routing.Vertex(syncPeer.PubKey()) - if _, ok := d.peerSyncers[nodeID]; ok { - return - } - - log.Infof("Creating new GossipSyncer for peer=%x", nodeID[:]) - - encoding := lnwire.EncodingSortedPlain - syncer := newGossipSyncer(gossipSyncerCfg{ - chainHash: d.cfg.ChainHash, - peerPub: nodeID, - channelSeries: d.cfg.ChanSeries, - encodingType: encoding, - chunkSize: encodingTypeToChunkSize[encoding], - sendToPeer: func(msgs ...lnwire.Message) error { - return syncPeer.SendMessageLazy(false, msgs...) - }, - }) - - if !recvUpdates { - syncer.syncType = uint32(PassiveSync) - } - - d.peerSyncers[nodeID] = syncer - - syncer.Start() +// needed to handle new queries. +func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) { + d.syncMgr.InitSyncState(syncPeer) } // PruneSyncState is called by outside sub-systems once a peer that we were // previously connected to has been disconnected. In this case we can stop the // existing GossipSyncer assigned to the peer and free up resources. -func (d *AuthenticatedGossiper) PruneSyncState(peer *btcec.PublicKey) { - d.syncerMtx.Lock() - defer d.syncerMtx.Unlock() - - log.Infof("Removing GossipSyncer for peer=%x", - peer.SerializeCompressed()) - - vertex := routing.NewVertex(peer) - syncer, ok := d.peerSyncers[vertex] - if !ok { - return - } - - syncer.Stop() - - delete(d.peerSyncers, vertex) - - return +func (d *AuthenticatedGossiper) PruneSyncState(peer routing.Vertex) { + d.syncMgr.PruneSyncState(peer) } // isRecentlyRejectedMsg returns true if we recently rejected a message, and @@ -2518,3 +2468,8 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo, return chanAnn, chanUpdate, err } + +// SyncManager returns the gossiper's SyncManager instance. +func (d *AuthenticatedGossiper) SyncManager() *SyncManager { + return d.syncMgr +} diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 2e19f9fe..066d76e2 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -27,6 +27,7 @@ import ( "github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing" + "github.com/lightningnetwork/lnd/ticker" ) var ( @@ -713,12 +714,16 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { c := make(chan struct{}) return c }, - Router: router, - TrickleDelay: trickleDelay, - RetransmitDelay: retransmitDelay, - ProofMatureDelta: proofMatureDelta, - WaitingProofStore: waitingProofStore, - MessageStore: newMockMessageStore(), + Router: router, + TrickleDelay: trickleDelay, + RetransmitDelay: retransmitDelay, + ProofMatureDelta: proofMatureDelta, + WaitingProofStore: waitingProofStore, + MessageStore: newMockMessageStore(), + RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval), + HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval), + ActiveSyncerTimeoutTicker: ticker.NewForce(DefaultActiveSyncerTimeout), + NumActiveSyncers: 3, }, nodeKeyPub1) if err := gossiper.Start(); err != nil { @@ -1447,16 +1452,20 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { // the message to the peer. ctx.gossiper.Stop() gossiper := New(Config{ - Notifier: ctx.gossiper.cfg.Notifier, - Broadcast: ctx.gossiper.cfg.Broadcast, - NotifyWhenOnline: ctx.gossiper.reliableSender.cfg.NotifyWhenOnline, - NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline, - Router: ctx.gossiper.cfg.Router, - TrickleDelay: trickleDelay, - RetransmitDelay: retransmitDelay, - ProofMatureDelta: proofMatureDelta, - WaitingProofStore: ctx.gossiper.cfg.WaitingProofStore, - MessageStore: ctx.gossiper.cfg.MessageStore, + Notifier: ctx.gossiper.cfg.Notifier, + Broadcast: ctx.gossiper.cfg.Broadcast, + NotifyWhenOnline: ctx.gossiper.reliableSender.cfg.NotifyWhenOnline, + NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline, + Router: ctx.gossiper.cfg.Router, + TrickleDelay: trickleDelay, + RetransmitDelay: retransmitDelay, + ProofMatureDelta: proofMatureDelta, + WaitingProofStore: ctx.gossiper.cfg.WaitingProofStore, + MessageStore: ctx.gossiper.cfg.MessageStore, + RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval), + HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval), + ActiveSyncerTimeoutTicker: ticker.NewForce(DefaultActiveSyncerTimeout), + NumActiveSyncers: 3, }, ctx.gossiper.selfKey) if err != nil { t.Fatalf("unable to recreate gossiper: %v", err) diff --git a/discovery/syncer.go b/discovery/syncer.go index 8ec322a5..1157cd4b 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -205,8 +205,6 @@ type gossipSyncerCfg struct { // filter out which messages should be sent to a remote peer based on their // update horizon. If the update horizon isn't specified, then we won't send // them any channel updates at all. -// -// TODO(roasbeef): modify to only sync from one peer at a time? type GossipSyncer struct { started sync.Once stopped sync.Once diff --git a/peer.go b/peer.go index f4f029fa..97bf5541 100644 --- a/peer.go +++ b/peer.go @@ -396,19 +396,16 @@ func (p *peer) initGossipSync() { srvrLog.Infof("Negotiated chan series queries with %x", p.pubKeyBytes[:]) - // We'll only request channel updates from the remote peer if - // its enabled in the config, or we're already getting updates - // from enough peers. - // - // TODO(roasbeef): craft s.t. we only get updates from a few - // peers - recvUpdates := cfg.NumGraphSyncPeers != 0 - // Register the this peer's for gossip syncer with the gossiper. // This is blocks synchronously to ensure the gossip syncer is // registered with the gossiper before attempting to read // messages from the remote peer. - p.server.authGossiper.InitSyncState(p, recvUpdates) + // + // TODO(wilmer): Only sync updates from non-channel peers. This + // requires an improved version of the current network + // bootstrapper to ensure we can find and connect to non-channel + // peers. + p.server.authGossiper.InitSyncState(p) // If the remote peer has the initial sync feature bit set, then we'll // being the synchronization protocol to exchange authenticated channel diff --git a/server.go b/server.go index 56615d81..5a8f6d6a 100644 --- a/server.go +++ b/server.go @@ -636,10 +636,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, return nil, fmt.Errorf("can't create router: %v", err) } - chanSeries := discovery.NewChanSeries( - s.chanDB.ChannelGraph(), - ) - + chanSeries := discovery.NewChanSeries(s.chanDB.ChannelGraph()) gossipMessageStore, err := discovery.NewMessageStore(s.chanDB) if err != nil { return nil, err @@ -650,19 +647,23 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, } s.authGossiper = discovery.New(discovery.Config{ - Router: s.chanRouter, - Notifier: s.cc.chainNotifier, - ChainHash: *activeNetParams.GenesisHash, - Broadcast: s.BroadcastMessage, - ChanSeries: chanSeries, - NotifyWhenOnline: s.NotifyWhenOnline, - NotifyWhenOffline: s.NotifyWhenOffline, - ProofMatureDelta: 0, - TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), - RetransmitDelay: time.Minute * 30, - WaitingProofStore: waitingProofStore, - MessageStore: gossipMessageStore, - AnnSigner: s.nodeSigner, + Router: s.chanRouter, + Notifier: s.cc.chainNotifier, + ChainHash: *activeNetParams.GenesisHash, + Broadcast: s.BroadcastMessage, + ChanSeries: chanSeries, + NotifyWhenOnline: s.NotifyWhenOnline, + NotifyWhenOffline: s.NotifyWhenOffline, + ProofMatureDelta: 0, + TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), + RetransmitDelay: time.Minute * 30, + WaitingProofStore: waitingProofStore, + MessageStore: gossipMessageStore, + AnnSigner: s.nodeSigner, + RotateTicker: ticker.New(discovery.DefaultSyncerRotationInterval), + HistoricalSyncTicker: ticker.New(discovery.DefaultHistoricalSyncInterval), + ActiveSyncerTimeoutTicker: ticker.New(discovery.DefaultActiveSyncerTimeout), + NumActiveSyncers: cfg.NumGraphSyncPeers, }, s.identityPriv.PubKey(), ) @@ -2622,7 +2623,7 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) { // We'll also inform the gossiper that this peer is no longer active, // so we don't need to maintain sync state for it any longer. - s.authGossiper.PruneSyncState(pubKey) + s.authGossiper.PruneSyncState(p.PubKey()) // Tell the switch to remove all links associated with this peer. // Passing nil as the target link indicates that all links associated