discovery: replace GossipSyncer syncChanUpdates flag with SyncerType

In this commit, we introduce a new type: SyncerType. This type denotes
the type of sync a GossipSyncer is currently under. We only introduce
the two possible entry states, ActiveSync and PassiveSync. An ActiveSync
GossipSyncer will exchange channels with the remote peer and receive new
graph updates from them, while a PassiveSync GossipSyncer will not and
will only response to the remote peer's queries.

This commit does not modify the behavior and is only meant to be a
refactor.
This commit is contained in:
Wilmer Paulino 2019-03-22 19:55:01 -07:00
parent 7e92b9a4e2
commit 8d7c0a9899
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
3 changed files with 84 additions and 25 deletions

@ -1108,16 +1108,20 @@ func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer,
encoding := lnwire.EncodingSortedPlain encoding := lnwire.EncodingSortedPlain
syncer := newGossipSyncer(gossipSyncerCfg{ syncer := newGossipSyncer(gossipSyncerCfg{
chainHash: d.cfg.ChainHash, chainHash: d.cfg.ChainHash,
peerPub: nodeID, peerPub: nodeID,
syncChanUpdates: recvUpdates, channelSeries: d.cfg.ChanSeries,
channelSeries: d.cfg.ChanSeries, encodingType: encoding,
encodingType: encoding, chunkSize: encodingTypeToChunkSize[encoding],
chunkSize: encodingTypeToChunkSize[encoding],
sendToPeer: func(msgs ...lnwire.Message) error { sendToPeer: func(msgs ...lnwire.Message) error {
return syncPeer.SendMessageLazy(false, msgs...) return syncPeer.SendMessageLazy(false, msgs...)
}, },
}) })
if !recvUpdates {
syncer.syncType = uint32(PassiveSync)
}
d.peerSyncers[nodeID] = syncer d.peerSyncers[nodeID] = syncer
syncer.Start() syncer.Start()

@ -13,6 +13,39 @@ import (
"golang.org/x/time/rate" "golang.org/x/time/rate"
) )
// SyncerType encapsulates the different types of syncing mechanisms for a
// gossip syncer.
type SyncerType uint8
const (
// ActiveSync denotes that a gossip syncer should exercise its default
// behavior. This includes reconciling the set of missing graph updates
// with the remote peer _and_ receiving new updates from them.
ActiveSync SyncerType = iota
// PassiveSync denotes that a gossip syncer:
//
// 1. Should not attempt to query the remote peer for graph updates.
// 2. Should respond to queries from the remote peer.
// 3. Should not receive new updates from the remote peer.
//
// They are started in a chansSynced state in order to accomplish their
// responsibilities above.
PassiveSync
)
// String returns a human readable string describing the target SyncerType.
func (t SyncerType) String() string {
switch t {
case ActiveSync:
return "ActiveSync"
case PassiveSync:
return "PassiveSync"
default:
return fmt.Sprintf("unknown sync type %d", t)
}
}
// syncerState is an enum that represents the current state of the GossipSyncer. // syncerState is an enum that represents the current state of the GossipSyncer.
// As the syncer is a state machine, we'll gate our actions based off of the // As the syncer is a state machine, we'll gate our actions based off of the
// current state and the next incoming message. // current state and the next incoming message.
@ -161,6 +194,17 @@ type GossipSyncer struct {
started sync.Once started sync.Once
stopped sync.Once stopped sync.Once
// state is the current state of the GossipSyncer.
//
// NOTE: This variable MUST be used atomically.
state uint32
// syncType denotes the SyncerType the gossip syncer is currently
// exercising.
//
// NOTE: This variable MUST be used atomically.
syncType uint32
// remoteUpdateHorizon is the update horizon of the remote peer. We'll // remoteUpdateHorizon is the update horizon of the remote peer. We'll
// use this to properly filter out any messages. // use this to properly filter out any messages.
remoteUpdateHorizon *lnwire.GossipTimestampRange remoteUpdateHorizon *lnwire.GossipTimestampRange
@ -169,11 +213,6 @@ type GossipSyncer struct {
// determine if we've already sent out our update. // determine if we've already sent out our update.
localUpdateHorizon *lnwire.GossipTimestampRange localUpdateHorizon *lnwire.GossipTimestampRange
// state is the current state of the GossipSyncer.
//
// NOTE: This variable MUST be used atomically.
state uint32
// gossipMsgs is a channel that all messages from the target peer will // gossipMsgs is a channel that all messages from the target peer will
// be sent over. // be sent over.
gossipMsgs chan lnwire.Message gossipMsgs chan lnwire.Message
@ -264,9 +303,11 @@ func (g *GossipSyncer) channelGraphSyncer() {
// * needed if we want to sync chan state very few blocks? // * needed if we want to sync chan state very few blocks?
for { for {
state := atomic.LoadUint32(&g.state) state := g.syncState()
log.Debugf("GossipSyncer(%x): state=%v", g.cfg.peerPub[:], syncType := g.SyncType()
syncerState(state))
log.Debugf("GossipSyncer(%x): state=%v, type=%v",
g.cfg.peerPub[:], state, syncType)
switch syncerState(state) { switch syncerState(state) {
// When we're in this state, we're trying to synchronize our // When we're in this state, we're trying to synchronize our
@ -293,7 +334,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
// With the message sent successfully, we'll transition // With the message sent successfully, we'll transition
// into the next state where we wait for their reply. // into the next state where we wait for their reply.
atomic.StoreUint32(&g.state, uint32(waitingQueryRangeReply)) g.setSyncState(waitingQueryRangeReply)
// In this state, we've sent out our initial channel range // In this state, we've sent out our initial channel range
// query and are waiting for the final response from the remote // query and are waiting for the final response from the remote
@ -350,13 +391,13 @@ func (g *GossipSyncer) channelGraphSyncer() {
// If this wasn't our last query, then we'll need to // If this wasn't our last query, then we'll need to
// transition to our waiting state. // transition to our waiting state.
if !done { if !done {
atomic.StoreUint32(&g.state, uint32(waitingQueryChanReply)) g.setSyncState(waitingQueryChanReply)
continue continue
} }
// If we're fully synchronized, then we can transition // If we're fully synchronized, then we can transition
// to our terminal state. // to our terminal state.
atomic.StoreUint32(&g.state, uint32(chansSynced)) g.setSyncState(chansSynced)
// In this state, we've just sent off a new query for channels // In this state, we've just sent off a new query for channels
// that we don't yet know of. We'll remain in this state until // that we don't yet know of. We'll remain in this state until
@ -373,7 +414,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
// state to send of the remaining query chunks. // state to send of the remaining query chunks.
_, ok := msg.(*lnwire.ReplyShortChanIDsEnd) _, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
if ok { if ok {
atomic.StoreUint32(&g.state, uint32(queryNewChannels)) g.setSyncState(queryNewChannels)
continue continue
} }
@ -395,7 +436,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
// If we haven't yet sent out our update horizon, and // If we haven't yet sent out our update horizon, and
// we want to receive real-time channel updates, we'll // we want to receive real-time channel updates, we'll
// do so now. // do so now.
if g.localUpdateHorizon == nil && g.cfg.syncChanUpdates { if g.localUpdateHorizon == nil && syncType == ActiveSync {
// TODO(roasbeef): query DB for most recent // TODO(roasbeef): query DB for most recent
// update? // update?
@ -524,14 +565,14 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
log.Infof("GossipSyncer(%x): remote peer has no new chans", log.Infof("GossipSyncer(%x): remote peer has no new chans",
g.cfg.peerPub[:]) g.cfg.peerPub[:])
atomic.StoreUint32(&g.state, uint32(chansSynced)) g.setSyncState(chansSynced)
return nil return nil
} }
// Otherwise, we'll set the set of channels that we need to query for // Otherwise, we'll set the set of channels that we need to query for
// the next state, and also transition our state. // the next state, and also transition our state.
g.newChansToQuery = newChans g.newChansToQuery = newChans
atomic.StoreUint32(&g.state, uint32(queryNewChannels)) g.setSyncState(queryNewChannels)
log.Infof("GossipSyncer(%x): starting query for %v new chans", log.Infof("GossipSyncer(%x): starting query for %v new chans",
g.cfg.peerPub[:], len(newChans)) g.cfg.peerPub[:], len(newChans))
@ -933,7 +974,22 @@ func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struc
} }
} }
// setSyncState sets the gossip syncer's state to the given state.
func (g *GossipSyncer) setSyncState(state syncerState) {
atomic.StoreUint32(&g.state, uint32(state))
}
// syncState returns the current syncerState of the target GossipSyncer. // syncState returns the current syncerState of the target GossipSyncer.
func (g *GossipSyncer) syncState() syncerState { func (g *GossipSyncer) syncState() syncerState {
return syncerState(atomic.LoadUint32(&g.state)) return syncerState(atomic.LoadUint32(&g.state))
} }
// setSyncType sets the gossip syncer's sync type to the given type.
func (g *GossipSyncer) setSyncType(syncType SyncerType) {
atomic.StoreUint32(&g.syncType, uint32(syncType))
}
// SyncType returns the current SyncerType of the target GossipSyncer.
func (g *GossipSyncer) SyncType() SyncerType {
return SyncerType(atomic.LoadUint32(&g.syncType))
}

@ -120,10 +120,9 @@ func newTestSyncer(hID lnwire.ShortChannelID,
msgChan := make(chan []lnwire.Message, 20) msgChan := make(chan []lnwire.Message, 20)
cfg := gossipSyncerCfg{ cfg := gossipSyncerCfg{
syncChanUpdates: true, channelSeries: newMockChannelGraphTimeSeries(hID),
channelSeries: newMockChannelGraphTimeSeries(hID), encodingType: encodingType,
encodingType: encodingType, chunkSize: chunkSize,
chunkSize: chunkSize,
sendToPeer: func(msgs ...lnwire.Message) error { sendToPeer: func(msgs ...lnwire.Message) error {
msgChan <- msgs msgChan <- msgs
return nil return nil