discovery: add new SyncState method to gossipSyncer

This new method allows outside callers to sample the current state of
the gossipSyncer in a concurrent-safe manner. In order to achieve this,
we now only modify the g.state variable atomically.
This commit is contained in:
Olaoluwa Osuntokun 2018-04-27 16:25:12 -07:00
parent 70dffe7e99
commit 6a6700e629
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21
2 changed files with 21 additions and 13 deletions

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"math" "math"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
@ -13,7 +14,7 @@ import (
// syncerState is an enum that represents the current state of the // syncerState is an enum that represents the current state of the
// gossipSyncer. As the syncer is a state machine, we'll gate our actions // gossipSyncer. As the syncer is a state machine, we'll gate our actions
// based off of the current state and the next incoming message. // based off of the current state and the next incoming message.
type syncerState uint8 type syncerState uint32
const ( const (
// syncingChans is the default state of the gossipSyncer. We start in // syncingChans is the default state of the gossipSyncer. We start in
@ -183,7 +184,9 @@ type gossipSyncer struct {
localUpdateHorizon *lnwire.GossipTimestampRange localUpdateHorizon *lnwire.GossipTimestampRange
// state is the current state of the gossipSyncer. // state is the current state of the gossipSyncer.
state syncerState //
// NOTE: This variable MUST be used atomically.
state uint32
// gossipMsgs is a channel that all messages from the target peer will // gossipMsgs is a channel that all messages from the target peer will
// be sent over. // be sent over.
@ -252,9 +255,10 @@ func (g *gossipSyncer) channelGraphSyncer() {
// * needed if we want to sync chan state very few blocks? // * needed if we want to sync chan state very few blocks?
for { for {
log.Debugf("gossipSyncer(%x): state=%v", g.peerPub[:], g.state) state := atomic.LoadUint32(&g.state)
log.Debugf("gossipSyncer(%x): state=%v", g.peerPub[:], state)
switch g.state { switch syncerState(state) {
// When we're in this state, we're trying to synchronize our // When we're in this state, we're trying to synchronize our
// view of the network with the remote peer. We'll kick off // view of the network with the remote peer. We'll kick off
// this sync by asking them for the set of channels they // this sync by asking them for the set of channels they
@ -279,7 +283,7 @@ func (g *gossipSyncer) channelGraphSyncer() {
// With the message sent successfully, we'll transition // With the message sent successfully, we'll transition
// into the next state where we wait for their reply. // into the next state where we wait for their reply.
g.state = waitingQueryRangeReply atomic.StoreUint32(&g.state, uint32(waitingQueryRangeReply))
// In this state, we've sent out our initial channel range // In this state, we've sent out our initial channel range
// query and are waiting for the final response from the remote // query and are waiting for the final response from the remote
@ -336,13 +340,13 @@ func (g *gossipSyncer) channelGraphSyncer() {
// If this wasn't our last query, then we'll need to // If this wasn't our last query, then we'll need to
// transition to our waiting state. // transition to our waiting state.
if !done { if !done {
g.state = waitingQueryChanReply atomic.StoreUint32(&g.state, uint32(waitingQueryChanReply))
continue continue
} }
// If we're fully synchronized, then we can transition // If we're fully synchronized, then we can transition
// to our terminal state. // to our terminal state.
g.state = chansSynced atomic.StoreUint32(&g.state, uint32(chansSynced))
// In this state, we've just sent off a new query for channels // In this state, we've just sent off a new query for channels
// that we don't yet know of. We'll remain in this state until // that we don't yet know of. We'll remain in this state until
@ -359,7 +363,7 @@ func (g *gossipSyncer) channelGraphSyncer() {
// state to send of the remaining query chunks. // state to send of the remaining query chunks.
_, ok := msg.(*lnwire.ReplyShortChanIDsEnd) _, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
if ok { if ok {
g.state = queryNewChannels atomic.StoreUint32(&g.state, uint32(queryNewChannels))
continue continue
} }
@ -518,14 +522,14 @@ func (g *gossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
log.Infof("gossipSyncer(%x): remote peer has no new chans", log.Infof("gossipSyncer(%x): remote peer has no new chans",
g.peerPub[:]) g.peerPub[:])
g.state = chansSynced atomic.StoreUint32(&g.state, uint32(chansSynced))
return nil return nil
} }
// Otherwise, we'll set the set of channels that we need to query for // Otherwise, we'll set the set of channels that we need to query for
// the next state, and also transition our state. // the next state, and also transition our state.
g.newChansToQuery = newChans g.newChansToQuery = newChans
g.state = queryNewChannels atomic.StoreUint32(&g.state, uint32(queryNewChannels))
log.Infof("gossipSyncer(%x): starting query for %v new chans", log.Infof("gossipSyncer(%x): starting query for %v new chans",
g.peerPub[:], len(newChans)) g.peerPub[:], len(newChans))
@ -852,7 +856,6 @@ func (g *gossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
log.Warnf("no channel updates found for "+ log.Warnf("no channel updates found for "+
"short_chan_id=%v", "short_chan_id=%v",
msg.ShortChannelID) msg.ShortChannelID)
msgsToSend = append(msgsToSend, msg)
continue continue
} }
} }
@ -904,3 +907,8 @@ func (g *gossipSyncer) ProcessQueryMsg(msg lnwire.Message) {
return return
} }
} }
// SyncerState returns the current syncerState of the target gossipSyncer.
func (g *gossipSyncer) SyncState() syncerState {
return syncerState(atomic.LoadUint32(&g.state))
}

@ -834,7 +834,7 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
t.Fatalf("unable to process reply: %v", err) t.Fatalf("unable to process reply: %v", err)
} }
if syncer.state != queryNewChannels { if syncer.SyncState() != queryNewChannels {
t.Fatalf("wrong state: expected %v instead got %v", t.Fatalf("wrong state: expected %v instead got %v",
queryNewChannels, syncer.state) queryNewChannels, syncer.state)
} }
@ -867,7 +867,7 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
t.Fatalf("unable to process reply: %v", err) t.Fatalf("unable to process reply: %v", err)
} }
if syncer.state != chansSynced { if syncer.SyncState() != chansSynced {
t.Fatalf("wrong state: expected %v instead got %v", t.Fatalf("wrong state: expected %v instead got %v",
chansSynced, syncer.state) chansSynced, syncer.state)
} }