diff --git a/discovery/syncer.go b/discovery/syncer.go index 488e271f..1b441786 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -4,6 +4,7 @@ import ( "fmt" "math" "sync" + "sync/atomic" "time" "github.com/lightningnetwork/lnd/lnwire" @@ -13,7 +14,7 @@ import ( // syncerState is an enum that represents the current state of the // gossipSyncer. As the syncer is a state machine, we'll gate our actions // based off of the current state and the next incoming message. -type syncerState uint8 +type syncerState uint32 const ( // syncingChans is the default state of the gossipSyncer. We start in @@ -183,7 +184,9 @@ type gossipSyncer struct { localUpdateHorizon *lnwire.GossipTimestampRange // state is the current state of the gossipSyncer. - state syncerState + // + // NOTE: This variable MUST be used atomically. + state uint32 // gossipMsgs is a channel that all messages from the target peer will // be sent over. @@ -252,9 +255,10 @@ func (g *gossipSyncer) channelGraphSyncer() { // * needed if we want to sync chan state very few blocks? for { - log.Debugf("gossipSyncer(%x): state=%v", g.peerPub[:], g.state) + state := atomic.LoadUint32(&g.state) + log.Debugf("gossipSyncer(%x): state=%v", g.peerPub[:], state) - switch g.state { + switch syncerState(state) { // When we're in this state, we're trying to synchronize our // view of the network with the remote peer. We'll kick off // this sync by asking them for the set of channels they @@ -279,7 +283,7 @@ func (g *gossipSyncer) channelGraphSyncer() { // With the message sent successfully, we'll transition // into the next state where we wait for their reply. - g.state = waitingQueryRangeReply + atomic.StoreUint32(&g.state, uint32(waitingQueryRangeReply)) // In this state, we've sent out our initial channel range // query and are waiting for the final response from the remote @@ -336,13 +340,13 @@ func (g *gossipSyncer) channelGraphSyncer() { // If this wasn't our last query, then we'll need to // transition to our waiting state. if !done { - g.state = waitingQueryChanReply + atomic.StoreUint32(&g.state, uint32(waitingQueryChanReply)) continue } // If we're fully synchronized, then we can transition // to our terminal state. - g.state = chansSynced + atomic.StoreUint32(&g.state, uint32(chansSynced)) // In this state, we've just sent off a new query for channels // that we don't yet know of. We'll remain in this state until @@ -359,7 +363,7 @@ func (g *gossipSyncer) channelGraphSyncer() { // state to send of the remaining query chunks. _, ok := msg.(*lnwire.ReplyShortChanIDsEnd) if ok { - g.state = queryNewChannels + atomic.StoreUint32(&g.state, uint32(queryNewChannels)) continue } @@ -518,14 +522,14 @@ func (g *gossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro log.Infof("gossipSyncer(%x): remote peer has no new chans", g.peerPub[:]) - g.state = chansSynced + atomic.StoreUint32(&g.state, uint32(chansSynced)) return nil } // Otherwise, we'll set the set of channels that we need to query for // the next state, and also transition our state. g.newChansToQuery = newChans - g.state = queryNewChannels + atomic.StoreUint32(&g.state, uint32(queryNewChannels)) log.Infof("gossipSyncer(%x): starting query for %v new chans", g.peerPub[:], len(newChans)) @@ -852,7 +856,6 @@ func (g *gossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) { log.Warnf("no channel updates found for "+ "short_chan_id=%v", msg.ShortChannelID) - msgsToSend = append(msgsToSend, msg) continue } } @@ -904,3 +907,8 @@ func (g *gossipSyncer) ProcessQueryMsg(msg lnwire.Message) { return } } + +// SyncerState returns the current syncerState of the target gossipSyncer. +func (g *gossipSyncer) SyncState() syncerState { + return syncerState(atomic.LoadUint32(&g.state)) +} diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index 9bd1c22b..8aa6415c 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -834,7 +834,7 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) { t.Fatalf("unable to process reply: %v", err) } - if syncer.state != queryNewChannels { + if syncer.SyncState() != queryNewChannels { t.Fatalf("wrong state: expected %v instead got %v", queryNewChannels, syncer.state) } @@ -867,7 +867,7 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) { t.Fatalf("unable to process reply: %v", err) } - if syncer.state != chansSynced { + if syncer.SyncState() != chansSynced { t.Fatalf("wrong state: expected %v instead got %v", chansSynced, syncer.state) }