diff --git a/discovery/syncer.go b/discovery/syncer.go index a9818ae0..8ec322a5 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -271,6 +271,10 @@ type GossipSyncer struct { // number of queries. rateLimiter *rate.Limiter + // syncedSignal is a channel that, if set, will be closed when the + // GossipSyncer reaches its terminal chansSynced state. + syncedSignal chan struct{} + sync.Mutex quit chan struct{} @@ -470,6 +474,13 @@ func (g *GossipSyncer) channelGraphSyncer() { // This is our final terminal state where we'll only reply to // any further queries by the remote peer. case chansSynced: + g.Lock() + if g.syncedSignal != nil { + close(g.syncedSignal) + g.syncedSignal = nil + } + g.Unlock() + // If we haven't yet sent out our update horizon, and // we want to receive real-time channel updates, we'll // do so now. @@ -1049,6 +1060,24 @@ func (g *GossipSyncer) syncState() syncerState { return syncerState(atomic.LoadUint32(&g.state)) } +// ResetSyncedSignal returns a channel that will be closed in order to serve as +// a signal for when the GossipSyncer has reached its chansSynced state. +func (g *GossipSyncer) ResetSyncedSignal() chan struct{} { + g.Lock() + defer g.Unlock() + + syncedSignal := make(chan struct{}) + + syncState := syncerState(atomic.LoadUint32(&g.state)) + if syncState == chansSynced { + close(syncedSignal) + return syncedSignal + } + + g.syncedSignal = syncedSignal + return g.syncedSignal +} + // ProcessSyncTransition sends a request to the gossip syncer to transition its // sync type to a new one. // diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index 3850e364..ee5719d8 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -2140,3 +2140,53 @@ func TestGossipSyncerHistoricalSync(t *testing.T) { t.Fatalf("expected to send a lnwire.QueryChannelRange message") } } + +// TestGossipSyncerSyncedSignal ensures that we receive a signal when a gossip +// syncer reaches its terminal chansSynced state. +func TestGossipSyncerSyncedSignal(t *testing.T) { + t.Parallel() + + // We'll create a new gossip syncer and manually override its state to + // chansSynced. + _, syncer, _ := newTestSyncer( + lnwire.NewShortChanIDFromInt(10), defaultEncoding, + defaultChunkSize, + ) + syncer.setSyncState(chansSynced) + + // We'll go ahead and request a signal to be notified of when it reaches + // this state. + signalChan := syncer.ResetSyncedSignal() + + // Starting the gossip syncer should cause the signal to be delivered. + syncer.Start() + + select { + case <-signalChan: + case <-time.After(time.Second): + t.Fatal("expected to receive chansSynced signal") + } + + syncer.Stop() + + // We'll try this again, but this time we'll request the signal after + // the syncer is active and has already reached its chansSynced state. + _, syncer, _ = newTestSyncer( + lnwire.NewShortChanIDFromInt(10), defaultEncoding, + defaultChunkSize, + ) + + syncer.setSyncState(chansSynced) + + syncer.Start() + defer syncer.Stop() + + signalChan = syncer.ResetSyncedSignal() + + // The signal should be delivered immediately. + select { + case <-signalChan: + case <-time.After(time.Second): + t.Fatal("expected to receive chansSynced signal") + } +}