discovery: introduce GossipSyncer sync transitions

In this commit, we introduce the ability for GossipSyncers to
transition their sync type. This allows us to be more flexible with our
gossip syncers, as we can now prevent them from receiving new graph
updates at any time. It's now possible to transition between the
different sync types, as long as the GossipSyncer has reached its
terminal chansSynced sync state. Certain transitions require some
additional wire messages to be sent, like in the case of an ActiveSync
GossipSyncer transitioning to a PassiveSync type.
This commit is contained in:
Wilmer Paulino 2019-03-22 19:55:32 -07:00
parent acc42c1b68
commit ca4fbd598c
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
2 changed files with 309 additions and 40 deletions

@ -87,17 +87,6 @@ const (
chansSynced chansSynced
) )
// Defaults controlling how replies to gossip queries are delayed once a
// number of queries have already been answered immediately.
const (
// DefaultMaxUndelayedQueryReplies specifies how many gossip queries we
// will respond to immediately before starting to delay responses.
DefaultMaxUndelayedQueryReplies = 10
// DefaultDelayedQueryReplyInterval is the length of time we will wait
// before responding to gossip queries after replying to
// maxUndelayedQueryReplies queries.
DefaultDelayedQueryReplyInterval = 5 * time.Second
)
// String returns a human readable string describing the target syncerState. // String returns a human readable string describing the target syncerState.
func (s syncerState) String() string { func (s syncerState) String() string {
switch s { switch s {
@ -121,6 +110,26 @@ func (s syncerState) String() string {
} }
} }
const (
// DefaultMaxUndelayedQueryReplies specifies how many gossip queries we
// will respond to immediately before starting to delay responses.
DefaultMaxUndelayedQueryReplies = 10
// DefaultDelayedQueryReplyInterval is the length of time we will wait
// before responding to gossip queries after replying to
// maxUndelayedQueryReplies queries.
DefaultDelayedQueryReplyInterval = 5 * time.Second
// chanRangeQueryBuffer is the number of blocks back that we'll go when
// asking the remote peer for any channels they know of beyond our
// highest known channel ID.
chanRangeQueryBuffer = 144
// syncTransitionTimeout is the default amount of time we'll wait when
// attempting to perform a sync transition before giving up.
syncTransitionTimeout = 5 * time.Second
)
var ( var (
// encodingTypeToChunkSize maps an encoding type, to the max number of // encodingTypeToChunkSize maps an encoding type, to the max number of
// short chan ID's using the encoding type that we can fit into a // short chan ID's using the encoding type that we can fit into a
@ -131,14 +140,22 @@ var (
// ErrGossipSyncerExiting signals that the syncer has been killed. // ErrGossipSyncerExiting signals that the syncer has been killed.
ErrGossipSyncerExiting = errors.New("gossip syncer exiting") ErrGossipSyncerExiting = errors.New("gossip syncer exiting")
// ErrSyncTransitionTimeout is an error returned when we've timed out
// attempting to perform a sync transition.
ErrSyncTransitionTimeout = errors.New("timed out attempting to " +
"transition sync type")
// zeroTimestamp is the timestamp we'll use when we want to indicate to
// peers that we do not want to receive any new graph updates.
zeroTimestamp time.Time
) )
const ( // syncTransitionReq encapsulates a request for a gossip syncer sync transition.
// chanRangeQueryBuffer is the number of blocks back that we'll go when type syncTransitionReq struct {
// asking the remote peer for their any channels they know of beyond newSyncType SyncerType
// our highest known channel ID. errChan chan error
chanRangeQueryBuffer = 144 }
)
// gossipSyncerCfg is a struct that packages all the information a GossipSyncer // gossipSyncerCfg is a struct that packages all the information a GossipSyncer
// needs to carry out its duties. // needs to carry out its duties.
@ -213,6 +230,12 @@ type GossipSyncer struct {
// determine if we've already sent out our update. // determine if we've already sent out our update.
localUpdateHorizon *lnwire.GossipTimestampRange localUpdateHorizon *lnwire.GossipTimestampRange
// syncTransitionReqs is a channel through which new sync type transition
// requests will be sent. These requests should only be handled
// when the gossip syncer is in a chansSynced state to ensure its state
// machine behaves as expected.
syncTransitionReqs chan *syncTransitionReq
// gossipMsgs is a channel that all messages from the target peer will // gossipMsgs is a channel that all messages from the target peer will
// be sent over. // be sent over.
gossipMsgs chan lnwire.Message gossipMsgs chan lnwire.Message
@ -265,10 +288,11 @@ func newGossipSyncer(cfg gossipSyncerCfg) *GossipSyncer {
) )
return &GossipSyncer{ return &GossipSyncer{
cfg: cfg, cfg: cfg,
rateLimiter: rateLimiter, rateLimiter: rateLimiter,
gossipMsgs: make(chan lnwire.Message, 100), syncTransitionReqs: make(chan *syncTransitionReq),
quit: make(chan struct{}), gossipMsgs: make(chan lnwire.Message, 100),
quit: make(chan struct{}),
} }
} }
@ -298,10 +322,6 @@ func (g *GossipSyncer) Stop() {
func (g *GossipSyncer) channelGraphSyncer() { func (g *GossipSyncer) channelGraphSyncer() {
defer g.wg.Done() defer g.wg.Done()
// TODO(roasbeef): also add ability to force transition back to syncing
// chans
// * needed if we want to sync chan state very few blocks?
for { for {
state := g.syncState() state := g.syncState()
syncType := g.SyncType() syncType := g.SyncType()
@ -437,25 +457,19 @@ func (g *GossipSyncer) channelGraphSyncer() {
// we want to receive real-time channel updates, we'll // we want to receive real-time channel updates, we'll
// do so now. // do so now.
if g.localUpdateHorizon == nil && syncType == ActiveSync { if g.localUpdateHorizon == nil && syncType == ActiveSync {
updateHorizon := time.Now() err := g.sendGossipTimestampRange(
log.Infof("GossipSyncer(%x): applying "+ time.Now(), math.MaxUint32,
"gossipFilter(start=%v)", )
g.cfg.peerPub[:], updateHorizon)
g.localUpdateHorizon = &lnwire.GossipTimestampRange{
ChainHash: g.cfg.chainHash,
FirstTimestamp: uint32(updateHorizon.Unix()),
TimestampRange: math.MaxUint32,
}
err := g.cfg.sendToPeer(g.localUpdateHorizon)
if err != nil { if err != nil {
log.Errorf("unable to send update "+ log.Errorf("Unable to send update "+
"horizon: %v", err) "horizon to %x: %v",
g.cfg.peerPub, err)
} }
} }
// With our horizon set, we'll simply reply to any new // With our horizon set, we'll simply reply to any new
// message and exit if needed. // messages or process any state transitions and exit if
// needed.
select { select {
case msg := <-g.gossipMsgs: case msg := <-g.gossipMsgs:
err := g.replyPeerQueries(msg) err := g.replyPeerQueries(msg)
@ -464,6 +478,9 @@ func (g *GossipSyncer) channelGraphSyncer() {
"query: %v", err) "query: %v", err)
} }
case req := <-g.syncTransitionReqs:
req.errChan <- g.handleSyncTransition(req)
case <-g.quit: case <-g.quit:
return return
} }
@ -471,6 +488,37 @@ func (g *GossipSyncer) channelGraphSyncer() {
} }
} }
// sendGossipTimestampRange constructs a GossipTimestampRange starting at
// firstTimestamp and spanning timestampRange seconds, sends it to the remote
// peer, and records it as the syncer's local update horizon. If the message
// cannot be sent, the error is returned and the stored horizon is left
// untouched.
//
// Passing zeroTimestamp together with a timestampRange of zero clears the
// stored horizon (it is set to nil) once the message has been sent.
//
// NOTE(review): FirstTimestamp is the Unix time converted to uint32. The zero
// time.Time has a negative Unix time, so that conversion wraps around rather
// than encoding as 0. Confirm that peers treat the accompanying zero range as
// "send nothing".
func (g *GossipSyncer) sendGossipTimestampRange(firstTimestamp time.Time,
	timestampRange uint32) error {

	// The end of the range is only computed for logging purposes.
	endTimestamp := firstTimestamp.Add(
		time.Duration(timestampRange) * time.Second,
	)

	log.Infof("GossipSyncer(%x): applying gossipFilter(start=%v, end=%v)",
		g.cfg.peerPub[:], firstTimestamp, endTimestamp)

	localUpdateHorizon := &lnwire.GossipTimestampRange{
		ChainHash:      g.cfg.chainHash,
		FirstTimestamp: uint32(firstTimestamp.Unix()),
		TimestampRange: timestampRange,
	}

	if err := g.cfg.sendToPeer(localUpdateHorizon); err != nil {
		return err
	}

	// Compare the instants with Equal rather than ==, as == also compares
	// the location and monotonic clock reading of the two values.
	if firstTimestamp.Equal(zeroTimestamp) && timestampRange == 0 {
		g.localUpdateHorizon = nil
		return nil
	}

	g.localUpdateHorizon = localUpdateHorizon

	return nil
}
// synchronizeChanIDs is called by the channelGraphSyncer when we need to query // synchronizeChanIDs is called by the channelGraphSyncer when we need to query
// the remote peer for its known set of channel IDs within a particular block // the remote peer for its known set of channel IDs within a particular block
// range. This method will be called continually until the entire range has // range. This method will be called continually until the entire range has
@ -978,6 +1026,85 @@ func (g *GossipSyncer) syncState() syncerState {
return syncerState(atomic.LoadUint32(&g.state)) return syncerState(atomic.LoadUint32(&g.state))
} }
// ProcessSyncTransition asks the gossip syncer to switch its sync type to
// newSyncType and blocks until the syncer reports the outcome of the request.
//
// ErrSyncTransitionTimeout is returned if the request could not be handed to
// the syncer within syncTransitionTimeout, and ErrGossipSyncerExiting is
// returned if the syncer shuts down before the request is handed over or
// answered.
//
// NOTE: This can only be done once the gossip syncer has reached its final
// chansSynced state.
func (g *GossipSyncer) ProcessSyncTransition(newSyncType SyncerType) error {
	// The response channel is buffered so the result can be delivered
	// without a receiver waiting on it.
	result := make(chan error, 1)
	req := &syncTransitionReq{
		newSyncType: newSyncType,
		errChan:     result,
	}

	// First, hand the request over to the syncer, giving up if it takes
	// too long or the syncer is shutting down.
	select {
	case g.syncTransitionReqs <- req:
	case <-g.quit:
		return ErrGossipSyncerExiting
	case <-time.After(syncTransitionTimeout):
		return ErrSyncTransitionTimeout
	}

	// Then, wait for the syncer to report how the transition went.
	select {
	case <-g.quit:
		return ErrGossipSyncerExiting
	case err := <-result:
		return err
	}
}
// handleSyncTransition carries out a requested change of sync type. A request
// for the sync type the syncer already has is a no-op. Otherwise, a new
// gossip timestamp range matching the requested type is sent to the remote
// peer, and only once that succeeds are the syncer's state and sync type
// updated.
//
// NOTE: The gossip syncer might have another sync state as a result of this
// transition.
func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error {
	current, target := g.SyncType(), req.newSyncType

	// Nothing to do if we already have the requested sync type.
	if current == target {
		return nil
	}

	log.Debugf("GossipSyncer(%x): transitioning from %v to %v",
		g.cfg.peerPub, current, target)

	var (
		horizonStart time.Time
		horizonRange uint32
		nextState    syncerState
	)

	switch target {
	// Becoming an active syncer means we want to receive new graph
	// updates from the remote peer again, starting from now.
	case ActiveSync:
		horizonStart, horizonRange = time.Now(), math.MaxUint32
		nextState = syncingChans

	// Becoming a passive syncer means we no longer want any new updates
	// from the remote peer, which we signal with the zero timestamp and an
	// empty range.
	case PassiveSync:
		horizonStart, horizonRange = zeroTimestamp, 0
		nextState = chansSynced

	default:
		return fmt.Errorf("unhandled sync transition %v", target)
	}

	err := g.sendGossipTimestampRange(horizonStart, horizonRange)
	if err != nil {
		return fmt.Errorf("unable to send local update horizon: %v", err)
	}

	// The peer has been notified, so commit the new state and sync type.
	g.setSyncState(nextState)
	g.setSyncType(target)

	return nil
}
// setSyncType sets the gossip syncer's sync type to the given type. // setSyncType sets the gossip syncer's sync type to the given type.
func (g *GossipSyncer) setSyncType(syncType SyncerType) { func (g *GossipSyncer) setSyncType(syncType SyncerType) {
atomic.StoreUint32(&g.syncType, uint32(syncType)) atomic.StoreUint32(&g.syncType, uint32(syncType))

@ -13,7 +13,9 @@ import (
) )
const ( const (
defaultEncoding = lnwire.EncodingSortedPlain defaultEncoding = lnwire.EncodingSortedPlain
latestKnownHeight = 1337
startHeight = latestKnownHeight - chanRangeQueryBuffer
) )
var ( var (
@ -1940,3 +1942,143 @@ func TestGossipSyncerAlreadySynced(t *testing.T) {
} }
} }
} }
// TestGossipSyncerSyncTransitions ensures that the gossip syncer properly
// carries out its duties when accepting a new sync transition request. Each
// case starts a syncer in the chansSynced state with the entry sync type,
// requests a transition to the final sync type, and then checks the messages
// sent to the peer and the resulting syncer state.
func TestGossipSyncerSyncTransitions(t *testing.T) {
	t.Parallel()

	// assertMsgSent fails the test unless exactly one message equal to msg
	// is delivered on msgChan within a second.
	assertMsgSent := func(t *testing.T, msgChan chan []lnwire.Message,
		msg lnwire.Message) {

		t.Helper()

		var msgSent lnwire.Message
		select {
		case msgs := <-msgChan:
			if len(msgs) != 1 {
				t.Fatalf("expected to send a single message at "+
					"a time, got %d", len(msgs))
			}
			msgSent = msgs[0]
		case <-time.After(time.Second):
			t.Fatalf("expected to send %T message", msg)
		}

		if !reflect.DeepEqual(msgSent, msg) {
			t.Fatalf("expected to send message: %v\ngot: %v",
				spew.Sdump(msg), spew.Sdump(msgSent))
		}
	}

	tests := []struct {
		name          string
		entrySyncType SyncerType
		finalSyncType SyncerType
		assert        func(t *testing.T, msgChan chan []lnwire.Message,
			syncer *GossipSyncer)
	}{
		{
			name:          "active to passive",
			entrySyncType: ActiveSync,
			finalSyncType: PassiveSync,
			assert: func(t *testing.T, msgChan chan []lnwire.Message,
				g *GossipSyncer) {

				// When transitioning from active to passive, we
				// should expect to see a new local update
				// horizon sent to the remote peer indicating
				// that it would not like to receive any future
				// updates.
				assertMsgSent(t, msgChan, &lnwire.GossipTimestampRange{
					FirstTimestamp: uint32(zeroTimestamp.Unix()),
					TimestampRange: 0,
				})

				syncState := g.syncState()
				if syncState != chansSynced {
					t.Fatalf("expected syncerState %v, "+
						"got %v", chansSynced,
						syncState)
				}
			},
		},
		{
			name:          "passive to active",
			entrySyncType: PassiveSync,
			finalSyncType: ActiveSync,
			assert: func(t *testing.T, msgChan chan []lnwire.Message,
				g *GossipSyncer) {

				// When transitioning from passive to active,
				// we should expect to see a new local update
				// horizon sent to the remote peer indicating
				// that it would like to receive any future
				// updates.
				//
				// NOTE: The expected timestamp is sampled after
				// the transition, so this comparison assumes
				// both happen within the same second.
				firstTimestamp := uint32(time.Now().Unix())
				assertMsgSent(t, msgChan, &lnwire.GossipTimestampRange{
					FirstTimestamp: firstTimestamp,
					TimestampRange: math.MaxUint32,
				})

				// The local update horizon should be followed
				// by a QueryChannelRange message sent to the
				// remote peer requesting all channels it
				// knows of from the highest height the syncer
				// knows of.
				assertMsgSent(t, msgChan, &lnwire.QueryChannelRange{
					FirstBlockHeight: startHeight,
					NumBlocks:        math.MaxUint32 - startHeight,
				})

				syncState := g.syncState()
				if syncState != waitingQueryRangeReply {
					t.Fatalf("expected syncerState %v, "+
						"got %v", waitingQueryRangeReply,
						syncState)
				}
			},
		},
	}

	for _, test := range tests {
		// Capture the range variable: the parallel subtests below only
		// resume once this loop has finished, at which point a shared
		// loop variable would refer to the last test case.
		test := test

		t.Run(test.name, func(t *testing.T) {
			t.Parallel()

			// We'll start each test by creating our syncer. We'll
			// initialize it with a state of chansSynced, as that's
			// the only time when it can process sync transitions.
			msgChan, syncer, _ := newTestSyncer(
				lnwire.ShortChannelID{
					BlockHeight: latestKnownHeight,
				},
				defaultEncoding, defaultChunkSize,
			)
			syncer.setSyncState(chansSynced)

			// We'll set the initial syncType to what the test
			// demands.
			syncer.setSyncType(test.entrySyncType)

			// We'll then start the syncer in order to process the
			// request.
			syncer.Start()
			defer syncer.Stop()

			err := syncer.ProcessSyncTransition(test.finalSyncType)
			if err != nil {
				t.Fatalf("unable to process sync transition: %v",
					err)
			}

			// The syncer should now have the expected final
			// SyncerType that the test expects.
			syncType := syncer.SyncType()
			if syncType != test.finalSyncType {
				t.Fatalf("expected syncType %v, got %v",
					test.finalSyncType, syncType)
			}

			// Finally, we'll run a set of assertions for each test
			// to ensure the syncer performed its expected duties
			// after processing its sync transition.
			test.assert(t, msgChan, syncer)
		})
	}
}