discovery: introduce GossipSyncer signal delivery of chansSynced state

In this commit, we introduce another feature to the GossipSyncer in
which it can deliver a signal to an external caller once it reaches its
terminal chansSynced state. This is yet to be used, but will serve
useful with a round-robin sync mechanism, where we wait for to finish
syncing with a specific peer before moving on to the next.
This commit is contained in:
Wilmer Paulino 2019-03-22 19:55:47 -07:00
parent 042241dc48
commit e075817e44
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
2 changed files with 79 additions and 0 deletions

@ -271,6 +271,10 @@ type GossipSyncer struct {
// number of queries. // number of queries.
rateLimiter *rate.Limiter rateLimiter *rate.Limiter
// syncedSignal is a channel that, if set, will be closed when the
// GossipSyncer reaches its terminal chansSynced state.
syncedSignal chan struct{}
sync.Mutex sync.Mutex
quit chan struct{} quit chan struct{}
@ -470,6 +474,13 @@ func (g *GossipSyncer) channelGraphSyncer() {
// This is our final terminal state where we'll only reply to // This is our final terminal state where we'll only reply to
// any further queries by the remote peer. // any further queries by the remote peer.
case chansSynced: case chansSynced:
g.Lock()
if g.syncedSignal != nil {
close(g.syncedSignal)
g.syncedSignal = nil
}
g.Unlock()
// If we haven't yet sent out our update horizon, and // If we haven't yet sent out our update horizon, and
// we want to receive real-time channel updates, we'll // we want to receive real-time channel updates, we'll
// do so now. // do so now.
@ -1049,6 +1060,24 @@ func (g *GossipSyncer) syncState() syncerState {
return syncerState(atomic.LoadUint32(&g.state)) return syncerState(atomic.LoadUint32(&g.state))
} }
// ResetSyncedSignal returns a channel that will be closed in order to serve as
// a signal for when the GossipSyncer has reached its chansSynced state.
func (g *GossipSyncer) ResetSyncedSignal() chan struct{} {
g.Lock()
defer g.Unlock()
syncedSignal := make(chan struct{})
syncState := syncerState(atomic.LoadUint32(&g.state))
if syncState == chansSynced {
close(syncedSignal)
return syncedSignal
}
g.syncedSignal = syncedSignal
return g.syncedSignal
}
// ProcessSyncTransition sends a request to the gossip syncer to transition its // ProcessSyncTransition sends a request to the gossip syncer to transition its
// sync type to a new one. // sync type to a new one.
// //

@ -2140,3 +2140,53 @@ func TestGossipSyncerHistoricalSync(t *testing.T) {
t.Fatalf("expected to send a lnwire.QueryChannelRange message") t.Fatalf("expected to send a lnwire.QueryChannelRange message")
} }
} }
// TestGossipSyncerSyncedSignal ensures that we receive a signal when a gossip
// syncer reaches its terminal chansSynced state.
func TestGossipSyncerSyncedSignal(t *testing.T) {
t.Parallel()
// We'll create a new gossip syncer and manually override its state to
// chansSynced.
_, syncer, _ := newTestSyncer(
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
defaultChunkSize,
)
syncer.setSyncState(chansSynced)
// We'll go ahead and request a signal to be notified of when it reaches
// this state.
signalChan := syncer.ResetSyncedSignal()
// Starting the gossip syncer should cause the signal to be delivered.
syncer.Start()
select {
case <-signalChan:
case <-time.After(time.Second):
t.Fatal("expected to receive chansSynced signal")
}
syncer.Stop()
// We'll try this again, but this time we'll request the signal after
// the syncer is active and has already reached its chansSynced state.
_, syncer, _ = newTestSyncer(
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
defaultChunkSize,
)
syncer.setSyncState(chansSynced)
syncer.Start()
defer syncer.Stop()
signalChan = syncer.ResetSyncedSignal()
// The signal should be delivered immediately.
select {
case <-signalChan:
case <-time.After(time.Second):
t.Fatal("expected to receive chansSynced signal")
}
}