diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 994c447c..b1d8c146 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1108,16 +1108,20 @@ func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, encoding := lnwire.EncodingSortedPlain syncer := newGossipSyncer(gossipSyncerCfg{ - chainHash: d.cfg.ChainHash, - peerPub: nodeID, - syncChanUpdates: recvUpdates, - channelSeries: d.cfg.ChanSeries, - encodingType: encoding, - chunkSize: encodingTypeToChunkSize[encoding], + chainHash: d.cfg.ChainHash, + peerPub: nodeID, + channelSeries: d.cfg.ChanSeries, + encodingType: encoding, + chunkSize: encodingTypeToChunkSize[encoding], sendToPeer: func(msgs ...lnwire.Message) error { return syncPeer.SendMessageLazy(false, msgs...) }, }) + + if !recvUpdates { + syncer.syncType = uint32(PassiveSync) + } + d.peerSyncers[nodeID] = syncer syncer.Start() diff --git a/discovery/syncer.go b/discovery/syncer.go index a8eb63e5..295be2ac 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -13,6 +13,39 @@ import ( "golang.org/x/time/rate" ) +// SyncerType encapsulates the different types of syncing mechanisms for a +// gossip syncer. +type SyncerType uint8 + +const ( + // ActiveSync denotes that a gossip syncer should exercise its default + // behavior. This includes reconciling the set of missing graph updates + // with the remote peer _and_ receiving new updates from them. + ActiveSync SyncerType = iota + + // PassiveSync denotes that a gossip syncer: + // + // 1. Should not attempt to query the remote peer for graph updates. + // 2. Should respond to queries from the remote peer. + // 3. Should not receive new updates from the remote peer. + // + // They are started in a chansSynced state in order to accomplish their + // responsibilities above. + PassiveSync +) + +// String returns a human readable string describing the target SyncerType. +func (t SyncerType) String() string { + switch t { + case ActiveSync: + return "ActiveSync" + case PassiveSync: + return "PassiveSync" + default: + return fmt.Sprintf("unknown sync type %d", t) + } +} + // syncerState is an enum that represents the current state of the GossipSyncer. // As the syncer is a state machine, we'll gate our actions based off of the // current state and the next incoming message. @@ -161,6 +194,17 @@ type GossipSyncer struct { started sync.Once stopped sync.Once + // state is the current state of the GossipSyncer. + // + // NOTE: This variable MUST be used atomically. + state uint32 + + // syncType denotes the SyncerType the gossip syncer is currently + // exercising. + // + // NOTE: This variable MUST be used atomically. + syncType uint32 + // remoteUpdateHorizon is the update horizon of the remote peer. We'll // use this to properly filter out any messages. remoteUpdateHorizon *lnwire.GossipTimestampRange @@ -169,11 +213,6 @@ type GossipSyncer struct { // determine if we've already sent out our update. localUpdateHorizon *lnwire.GossipTimestampRange - // state is the current state of the GossipSyncer. - // - // NOTE: This variable MUST be used atomically. - state uint32 - // gossipMsgs is a channel that all messages from the target peer will // be sent over. gossipMsgs chan lnwire.Message @@ -264,9 +303,11 @@ func (g *GossipSyncer) channelGraphSyncer() { // * needed if we want to sync chan state very few blocks? for { - state := atomic.LoadUint32(&g.state) - log.Debugf("GossipSyncer(%x): state=%v", g.cfg.peerPub[:], - syncerState(state)) + state := g.syncState() + syncType := g.SyncType() + + log.Debugf("GossipSyncer(%x): state=%v, type=%v", + g.cfg.peerPub[:], state, syncType) switch syncerState(state) { // When we're in this state, we're trying to synchronize our @@ -293,7 +334,7 @@ func (g *GossipSyncer) channelGraphSyncer() { // With the message sent successfully, we'll transition // into the next state where we wait for their reply. - atomic.StoreUint32(&g.state, uint32(waitingQueryRangeReply)) + g.setSyncState(waitingQueryRangeReply) // In this state, we've sent out our initial channel range // query and are waiting for the final response from the remote @@ -350,13 +391,13 @@ func (g *GossipSyncer) channelGraphSyncer() { // If this wasn't our last query, then we'll need to // transition to our waiting state. if !done { - atomic.StoreUint32(&g.state, uint32(waitingQueryChanReply)) + g.setSyncState(waitingQueryChanReply) continue } // If we're fully synchronized, then we can transition // to our terminal state. - atomic.StoreUint32(&g.state, uint32(chansSynced)) + g.setSyncState(chansSynced) // In this state, we've just sent off a new query for channels // that we don't yet know of. We'll remain in this state until @@ -373,7 +414,7 @@ func (g *GossipSyncer) channelGraphSyncer() { // state to send of the remaining query chunks. _, ok := msg.(*lnwire.ReplyShortChanIDsEnd) if ok { - atomic.StoreUint32(&g.state, uint32(queryNewChannels)) + g.setSyncState(queryNewChannels) continue } @@ -395,7 +436,7 @@ func (g *GossipSyncer) channelGraphSyncer() { // If we haven't yet sent out our update horizon, and // we want to receive real-time channel updates, we'll // do so now. - if g.localUpdateHorizon == nil && g.cfg.syncChanUpdates { + if g.localUpdateHorizon == nil && syncType == ActiveSync { // TODO(roasbeef): query DB for most recent // update? @@ -524,14 +565,14 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro log.Infof("GossipSyncer(%x): remote peer has no new chans", g.cfg.peerPub[:]) - atomic.StoreUint32(&g.state, uint32(chansSynced)) + g.setSyncState(chansSynced) return nil } // Otherwise, we'll set the set of channels that we need to query for // the next state, and also transition our state. g.newChansToQuery = newChans - atomic.StoreUint32(&g.state, uint32(queryNewChannels)) + g.setSyncState(queryNewChannels) log.Infof("GossipSyncer(%x): starting query for %v new chans", g.cfg.peerPub[:], len(newChans)) @@ -933,7 +974,22 @@ func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struc } } +// setSyncState sets the gossip syncer's state to the given state. +func (g *GossipSyncer) setSyncState(state syncerState) { + atomic.StoreUint32(&g.state, uint32(state)) +} + // syncState returns the current syncerState of the target GossipSyncer. func (g *GossipSyncer) syncState() syncerState { return syncerState(atomic.LoadUint32(&g.state)) } + +// setSyncType sets the gossip syncer's sync type to the given type. +func (g *GossipSyncer) setSyncType(syncType SyncerType) { + atomic.StoreUint32(&g.syncType, uint32(syncType)) +} + +// SyncType returns the current SyncerType of the target GossipSyncer. +func (g *GossipSyncer) SyncType() SyncerType { + return SyncerType(atomic.LoadUint32(&g.syncType)) +} diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index e42e3b4a..3f01c8b0 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -120,10 +120,9 @@ func newTestSyncer(hID lnwire.ShortChannelID, msgChan := make(chan []lnwire.Message, 20) cfg := gossipSyncerCfg{ - syncChanUpdates: true, - channelSeries: newMockChannelGraphTimeSeries(hID), - encodingType: encodingType, - chunkSize: chunkSize, + channelSeries: newMockChannelGraphTimeSeries(hID), + encodingType: encodingType, + chunkSize: chunkSize, sendToPeer: func(msgs ...lnwire.Message) error { msgChan <- msgs return nil