discovery: allow gossip syncer to perform historical syncs

In this commit, we introduce the ability for gossip syncers to perform
historical syncs. This allows us to reconcile any channels we're missing
that the remote peer has starting from the genesis block of the chain.
This commit serves as a prerequisite to the SyncManager, introduced in a
later commit, where we'll be able to make spot checks by performing
historical syncs with peers to ensure we have as much of the graph as
possible.
This commit is contained in:
Wilmer Paulino 2019-03-29 12:46:11 -07:00
parent ca4fbd598c
commit 042241dc48
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
2 changed files with 123 additions and 12 deletions

@ -236,6 +236,20 @@ type GossipSyncer struct {
// machine behaves as expected.
syncTransitionReqs chan *syncTransitionReq
// historicalSyncReqs is a channel that serves as a signal for the
// gossip syncer to perform a historical sync. Theese can only be done
// once the gossip syncer is in a chansSynced state to ensure its state
// machine behaves as expected.
historicalSyncReqs chan struct{}
// genHistoricalChanRangeQuery when true signals to the gossip syncer
// that it should request the remote peer for all of its known channel
// IDs starting from the genesis block of the chain. This can only
// happen if the gossip syncer receives a request to attempt a
// historical sync. It can be unset if the syncer ever transitions from
// PassiveSync to ActiveSync.
genHistoricalChanRangeQuery bool
// gossipMsgs is a channel that all messages from the target peer will
// be sent over.
gossipMsgs chan lnwire.Message
@ -291,6 +305,7 @@ func newGossipSyncer(cfg gossipSyncerCfg) *GossipSyncer {
cfg: cfg,
rateLimiter: rateLimiter,
syncTransitionReqs: make(chan *syncTransitionReq),
historicalSyncReqs: make(chan struct{}),
gossipMsgs: make(chan lnwire.Message, 100),
quit: make(chan struct{}),
}
@ -338,7 +353,9 @@ func (g *GossipSyncer) channelGraphSyncer() {
case syncingChans:
// If we're in this state, then we'll send the remote
// peer our opening QueryChannelRange message.
queryRangeMsg, err := g.genChanRangeQuery()
queryRangeMsg, err := g.genChanRangeQuery(
g.genHistoricalChanRangeQuery,
)
if err != nil {
log.Errorf("unable to gen chan range "+
"query: %v", err)
@ -481,6 +498,9 @@ func (g *GossipSyncer) channelGraphSyncer() {
case req := <-g.syncTransitionReqs:
req.errChan <- g.handleSyncTransition(req)
case <-g.historicalSyncReqs:
g.handleHistoricalSync()
case <-g.quit:
return
}
@ -624,8 +644,11 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
// genChanRangeQuery generates the initial message we'll send to the remote
// party when we're kicking off the channel graph synchronization upon
// connection.
func (g *GossipSyncer) genChanRangeQuery() (*lnwire.QueryChannelRange, error) {
// connection. The historicalQuery boolean can be used to generate a query from
// the genesis block of the chain.
func (g *GossipSyncer) genChanRangeQuery(
historicalQuery bool) (*lnwire.QueryChannelRange, error) {
// First, we'll query our channel graph time series for its highest
// known channel ID.
newestChan, err := g.cfg.channelSeries.HighestChanID(g.cfg.chainHash)
@ -633,17 +656,17 @@ func (g *GossipSyncer) genChanRangeQuery() (*lnwire.QueryChannelRange, error) {
return nil, err
}
// Once we have the chan ID of the newest, we'll obtain the block
// height of the channel, then subtract our default horizon to ensure
// we don't miss any channels. By default, we go back 1 day from the
// newest channel.
// Once we have the chan ID of the newest, we'll obtain the block height
// of the channel, then subtract our default horizon to ensure we don't
// miss any channels. By default, we go back 1 day from the newest
// channel, unless we're attempting a historical sync, where we'll
// actually start from the genesis block instead.
var startHeight uint32
switch {
case newestChan.BlockHeight <= chanRangeQueryBuffer:
case historicalQuery:
fallthrough
case newestChan.BlockHeight == 0:
case newestChan.BlockHeight <= chanRangeQueryBuffer:
startHeight = 0
default:
startHeight = uint32(newestChan.BlockHeight - chanRangeQueryBuffer)
}
@ -1080,6 +1103,10 @@ func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error {
timestampRange = math.MaxUint32
newState = syncingChans
// We'll set genHistoricalChanRangeQuery to false since in order
// to not perform another historical sync if we previously have.
g.genHistoricalChanRangeQuery = false
// If a PassiveSync transition has been requested, then we should no
// longer receive any new updates from the remote peer. We can do this
// by setting our update horizon to a range in the past ensuring no
@ -1114,3 +1141,29 @@ func (g *GossipSyncer) setSyncType(syncType SyncerType) {
func (g *GossipSyncer) SyncType() SyncerType {
return SyncerType(atomic.LoadUint32(&g.syncType))
}
// historicalSync sends a request to the gossip syncer to perofmr a historical
// sync.
//
// NOTE: This can only be done once the gossip syncer has reached its final
// chansSynced state.
func (g *GossipSyncer) historicalSync() error {
select {
case g.historicalSyncReqs <- struct{}{}:
return nil
case <-time.After(syncTransitionTimeout):
return ErrSyncTransitionTimeout
case <-g.quit:
return ErrGossiperShuttingDown
}
}
// handleHistoricalSync handles a request to the gossip syncer to perform a
// historical sync.
func (g *GossipSyncer) handleHistoricalSync() {
// We'll go back to our initial syncingChans state in order to request
// the remote peer to give us all of the channel IDs they know of
// starting from the genesis block.
g.genHistoricalChanRangeQuery = true
g.setSyncState(syncingChans)
}

@ -730,7 +730,7 @@ func TestGossipSyncerGenChanRangeQuery(t *testing.T) {
// If we now ask the syncer to generate an initial range query, it
// should return a start height that's back chanRangeQueryBuffer
// blocks.
rangeQuery, err := syncer.genChanRangeQuery()
rangeQuery, err := syncer.genChanRangeQuery(false)
if err != nil {
t.Fatalf("unable to resp: %v", err)
}
@ -743,7 +743,22 @@ func TestGossipSyncerGenChanRangeQuery(t *testing.T) {
}
if rangeQuery.NumBlocks != math.MaxUint32-firstHeight {
t.Fatalf("wrong num blocks: expected %v, got %v",
rangeQuery.NumBlocks, math.MaxUint32-firstHeight)
math.MaxUint32-firstHeight, rangeQuery.NumBlocks)
}
// Generating a historical range query should result in a start height
// of 0.
rangeQuery, err = syncer.genChanRangeQuery(true)
if err != nil {
t.Fatalf("unable to resp: %v", err)
}
if rangeQuery.FirstBlockHeight != 0 {
t.Fatalf("incorrect chan range query: expected %v, %v", 0,
rangeQuery.FirstBlockHeight)
}
if rangeQuery.NumBlocks != math.MaxUint32 {
t.Fatalf("wrong num blocks: expected %v, got %v",
math.MaxUint32, rangeQuery.NumBlocks)
}
}
@ -2082,3 +2097,46 @@ func TestGossipSyncerSyncTransitions(t *testing.T) {
})
}
}
// TestGossipSyncerHistoricalSync tests that a gossip syncer can perform a
// historical sync with the remote peer.
func TestGossipSyncerHistoricalSync(t *testing.T) {
t.Parallel()
// We'll create a new gossip syncer and manually override its state to
// chansSynced. This is necessary as the syncer can only process
// historical sync requests in this state.
msgChan, syncer, _ := newTestSyncer(
lnwire.ShortChannelID{BlockHeight: latestKnownHeight},
defaultEncoding, defaultChunkSize,
)
syncer.setSyncType(PassiveSync)
syncer.setSyncState(chansSynced)
syncer.Start()
defer syncer.Stop()
syncer.historicalSync()
// We should expect to see a single lnwire.QueryChannelRange message be
// sent to the remote peer with a FirstBlockHeight of 0.
expectedMsg := &lnwire.QueryChannelRange{
FirstBlockHeight: 0,
NumBlocks: math.MaxUint32,
}
select {
case msgs := <-msgChan:
if len(msgs) != 1 {
t.Fatalf("expected to send a single "+
"lnwire.QueryChannelRange message, got %d",
len(msgs))
}
if !reflect.DeepEqual(msgs[0], expectedMsg) {
t.Fatalf("expected to send message: %v\ngot: %v",
spew.Sdump(expectedMsg), spew.Sdump(msgs[0]))
}
case <-time.After(time.Second):
t.Fatalf("expected to send a lnwire.QueryChannelRange message")
}
}