Merge pull request #3359 from cfromknecht/flag-flip-no-historical-gossip

discovery: flag flip no historical gossip
This commit is contained in:
Olaoluwa Osuntokun 2019-08-02 15:31:33 -07:00 committed by GitHub
commit 6e9e0eaddd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 114 additions and 23 deletions

@ -308,6 +308,8 @@ type config struct {
NumGraphSyncPeers int `long:"numgraphsyncpeers" description:"The number of peers that we should receive new graph updates from. This option can be tuned to save bandwidth for light clients or routing nodes."` NumGraphSyncPeers int `long:"numgraphsyncpeers" description:"The number of peers that we should receive new graph updates from. This option can be tuned to save bandwidth for light clients or routing nodes."`
HistoricalSyncInterval time.Duration `long:"historicalsyncinterval" description:"The polling interval between historical graph sync attempts. Each historical graph sync attempt ensures we reconcile with the remote peer's graph from the genesis block."` HistoricalSyncInterval time.Duration `long:"historicalsyncinterval" description:"The polling interval between historical graph sync attempts. Each historical graph sync attempt ensures we reconcile with the remote peer's graph from the genesis block."`
IgnoreHistoricalGossipFilters bool `long:"ignore-historical-gossip-filters" description:"If true, will not reply with historical data that matches the range specified by a remote peer's gossip_timestamp_filter. Doing so will result in lower memory and bandwidth requirements."`
RejectPush bool `long:"rejectpush" description:"If true, lnd will not accept channel opening requests with non-zero push amounts. This should prevent accidental pushes to merchant nodes."` RejectPush bool `long:"rejectpush" description:"If true, lnd will not accept channel opening requests with non-zero push amounts. This should prevent accidental pushes to merchant nodes."`
StaggerInitialReconnect bool `long:"stagger-initial-reconnect" description:"If true, will apply a randomized staggering between 0s and 30s when reconnecting to persistent peers on startup. The first 10 reconnections will be attempted instantly, regardless of the flag's value"` StaggerInitialReconnect bool `long:"stagger-initial-reconnect" description:"If true, will apply a randomized staggering between 0s and 30s when reconnecting to persistent peers on startup. The first 10 reconnections will be attempted instantly, regardless of the flag's value"`

@ -211,6 +211,12 @@ type Config struct {
// SubBatchDelay is the delay between sending sub batches of // SubBatchDelay is the delay between sending sub batches of
// gossip messages. // gossip messages.
SubBatchDelay time.Duration SubBatchDelay time.Duration
// IgnoreHistoricalFilters will prevent syncers from replying with
// historical data when the remote peer sets a gossip_timestamp_range.
// This prevents ranges with old start times from causing us to dump the
// graph on connect.
IgnoreHistoricalFilters bool
} }
// AuthenticatedGossiper is a subsystem which is responsible for receiving // AuthenticatedGossiper is a subsystem which is responsible for receiving
@ -313,11 +319,12 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
channelMtx: multimutex.NewMutex(), channelMtx: multimutex.NewMutex(),
recentRejects: make(map[uint64]struct{}), recentRejects: make(map[uint64]struct{}),
syncMgr: newSyncManager(&SyncManagerCfg{ syncMgr: newSyncManager(&SyncManagerCfg{
ChainHash: cfg.ChainHash, ChainHash: cfg.ChainHash,
ChanSeries: cfg.ChanSeries, ChanSeries: cfg.ChanSeries,
RotateTicker: cfg.RotateTicker, RotateTicker: cfg.RotateTicker,
HistoricalSyncTicker: cfg.HistoricalSyncTicker, HistoricalSyncTicker: cfg.HistoricalSyncTicker,
NumActiveSyncers: cfg.NumActiveSyncers, NumActiveSyncers: cfg.NumActiveSyncers,
IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
}), }),
} }

@ -82,6 +82,12 @@ type SyncManagerCfg struct {
// SyncManager when it should attempt a historical sync with a gossip // SyncManager when it should attempt a historical sync with a gossip
// sync peer. // sync peer.
HistoricalSyncTicker ticker.Ticker HistoricalSyncTicker ticker.Ticker
// IgnoreHistoricalFilters will prevent syncers from replying with
// historical data when the remote peer sets a gossip_timestamp_range.
// This prevents ranges with old start times from causing us to dump the
// graph on connect.
IgnoreHistoricalFilters bool
} }
// SyncManager is a subsystem of the gossiper that manages the gossip syncers // SyncManager is a subsystem of the gossiper that manages the gossip syncers
@ -400,6 +406,7 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
sendToPeerSync: func(msgs ...lnwire.Message) error { sendToPeerSync: func(msgs ...lnwire.Message) error {
return peer.SendMessageLazy(true, msgs...) return peer.SendMessageLazy(true, msgs...)
}, },
ignoreHistoricalFilters: m.cfg.IgnoreHistoricalFilters,
}) })
// Gossip syncers are initialized by default in a PassiveSync type // Gossip syncers are initialized by default in a PassiveSync type

@ -235,6 +235,12 @@ type gossipSyncerCfg struct {
// replyHandler, meaning we will not reply to queries from our remote // replyHandler, meaning we will not reply to queries from our remote
// peer. // peer.
noReplyQueries bool noReplyQueries bool
// ignoreHistoricalFilters will prevent syncers from replying with
// historical data when the remote peer sets a gossip_timestamp_range.
// This prevents ranges with old start times from causing us to dump the
// graph on connect.
ignoreHistoricalFilters bool
} }
// GossipSyncer is a struct that handles synchronizing the channel graph state // GossipSyncer is a struct that handles synchronizing the channel graph state
@ -951,6 +957,12 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er
g.Unlock() g.Unlock()
// If requested, don't reply with historical gossip data when the remote
// peer sets their gossip timestamp range.
if g.cfg.ignoreHistoricalFilters {
return nil
}
// Now that the remote peer has applied their filter, we'll query the // Now that the remote peer has applied their filter, we'll query the
// database for all the messages that are beyond this filter. // database for all the messages that are beyond this filter.
newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon( newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon(

@ -1,8 +1,10 @@
package discovery package discovery
import ( import (
"errors"
"math" "math"
"reflect" "reflect"
"sync"
"testing" "testing"
"time" "time"
@ -336,6 +338,66 @@ func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) {
} }
} }
// TestGossipSyncerApplyNoHistoricalGossipFilter tests that once a gossip filter
// is applied for the remote peer, then we don't send the peer all known
// messages which are within their desired time horizon.
func TestGossipSyncerApplyNoHistoricalGossipFilter(t *testing.T) {
t.Parallel()
// First, we'll create a GossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
_, syncer, chanSeries := newTestSyncer(
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
defaultChunkSize,
)
syncer.cfg.ignoreHistoricalFilters = true
// We'll apply this gossip horizon for the remote peer.
remoteHorizon := &lnwire.GossipTimestampRange{
FirstTimestamp: unixStamp(25000),
TimestampRange: uint32(1000),
}
// After applying the gossip filter, the chan series should not be
// queried using the updated horizon.
errChan := make(chan error, 1)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
// No query received, success.
case <-time.After(3 * time.Second):
errChan <- nil
// Unexpected query received.
case <-chanSeries.horizonReq:
errChan <- errors.New("chan series should not have been " +
"queried")
}
}()
// We'll now attempt to apply the gossip filter for the remote peer.
syncer.ApplyGossipFilter(remoteHorizon)
// Ensure that the syncer's remote horizon was properly updated.
if !reflect.DeepEqual(syncer.remoteUpdateHorizon, remoteHorizon) {
t.Fatalf("expected remote horizon: %v, got: %v",
remoteHorizon, syncer.remoteUpdateHorizon)
}
// Wait for the query check to finish.
wg.Wait()
// Assert that no query was made as a result of applying the gossip
// filter.
err := <-errChan
if err != nil {
t.Fatalf(err.Error())
}
}
// TestGossipSyncerApplyGossipFilter tests that once a gossip filter is applied // TestGossipSyncerApplyGossipFilter tests that once a gossip filter is applied
// for the remote peer, then we send the peer all known messages which are // for the remote peer, then we send the peer all known messages which are
// within their desired time horizon. // within their desired time horizon.

@ -712,24 +712,25 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
} }
s.authGossiper = discovery.New(discovery.Config{ s.authGossiper = discovery.New(discovery.Config{
Router: s.chanRouter, Router: s.chanRouter,
Notifier: s.cc.chainNotifier, Notifier: s.cc.chainNotifier,
ChainHash: *activeNetParams.GenesisHash, ChainHash: *activeNetParams.GenesisHash,
Broadcast: s.BroadcastMessage, Broadcast: s.BroadcastMessage,
ChanSeries: chanSeries, ChanSeries: chanSeries,
NotifyWhenOnline: s.NotifyWhenOnline, NotifyWhenOnline: s.NotifyWhenOnline,
NotifyWhenOffline: s.NotifyWhenOffline, NotifyWhenOffline: s.NotifyWhenOffline,
ProofMatureDelta: 0, ProofMatureDelta: 0,
TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay),
RetransmitDelay: time.Minute * 30, RetransmitDelay: time.Minute * 30,
WaitingProofStore: waitingProofStore, WaitingProofStore: waitingProofStore,
MessageStore: gossipMessageStore, MessageStore: gossipMessageStore,
AnnSigner: s.nodeSigner, AnnSigner: s.nodeSigner,
RotateTicker: ticker.New(discovery.DefaultSyncerRotationInterval), RotateTicker: ticker.New(discovery.DefaultSyncerRotationInterval),
HistoricalSyncTicker: ticker.New(cfg.HistoricalSyncInterval), HistoricalSyncTicker: ticker.New(cfg.HistoricalSyncInterval),
NumActiveSyncers: cfg.NumGraphSyncPeers, NumActiveSyncers: cfg.NumGraphSyncPeers,
MinimumBatchSize: 10, MinimumBatchSize: 10,
SubBatchDelay: time.Second * 5, SubBatchDelay: time.Second * 5,
IgnoreHistoricalFilters: cfg.IgnoreHistoricalGossipFilters,
}, },
s.identityPriv.PubKey(), s.identityPriv.PubKey(),
) )