discovery/syncer: delay replies after initial sync to prevent DOS
This commit is contained in:
parent
fb95858afc
commit
557cb6e253
@ -1,6 +1,7 @@
|
|||||||
package discovery
|
package discovery
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
@ -52,6 +53,17 @@ const (
|
|||||||
chansSynced
|
chansSynced
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// DefaultMaxUndelayedQueryReplies specifies how many gossip queries we
|
||||||
|
// will respond to immediately before starting to delay responses.
|
||||||
|
DefaultMaxUndelayedQueryReplies = 5
|
||||||
|
|
||||||
|
// DefaultDelayedQueryReplyInterval is the length of time we will wait
|
||||||
|
// before responding to gossip queries after replying to
|
||||||
|
// maxUndelayedQueryReplies queries.
|
||||||
|
DefaultDelayedQueryReplyInterval = 30 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
// String returns a human readable string describing the target syncerState.
|
// String returns a human readable string describing the target syncerState.
|
||||||
func (s syncerState) String() string {
|
func (s syncerState) String() string {
|
||||||
switch s {
|
switch s {
|
||||||
@ -82,6 +94,9 @@ var (
|
|||||||
encodingTypeToChunkSize = map[lnwire.ShortChanIDEncoding]int32{
|
encodingTypeToChunkSize = map[lnwire.ShortChanIDEncoding]int32{
|
||||||
lnwire.EncodingSortedPlain: 8000,
|
lnwire.EncodingSortedPlain: 8000,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ErrGossipSyncerExiting signals that the syncer has been killed.
|
||||||
|
ErrGossipSyncerExiting = errors.New("gossip syncer exiting")
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -167,6 +182,15 @@ type gossipSyncerCfg struct {
|
|||||||
// targeted messages to the peer we've been assigned to sync the graph
|
// targeted messages to the peer we've been assigned to sync the graph
|
||||||
// state from.
|
// state from.
|
||||||
sendToPeer func(...lnwire.Message) error
|
sendToPeer func(...lnwire.Message) error
|
||||||
|
|
||||||
|
// maxUndelayedQueryReplies specifies how many gossip queries we will
|
||||||
|
// respond to immediately before starting to delay responses.
|
||||||
|
maxUndelayedQueryReplies int
|
||||||
|
|
||||||
|
// delayedQueryReplyInterval is the length of time we will wait before
|
||||||
|
// responding to gossip queries after replying to
|
||||||
|
// maxUndelayedQueryReplies queries.
|
||||||
|
delayedQueryReplyInterval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// gossipSyncer is a struct that handles synchronizing the channel graph state
|
// gossipSyncer is a struct that handles synchronizing the channel graph state
|
||||||
@ -214,6 +238,11 @@ type gossipSyncer struct {
|
|||||||
|
|
||||||
cfg gossipSyncerCfg
|
cfg gossipSyncerCfg
|
||||||
|
|
||||||
|
// replyCount records how many query replies we've responded to. This is
|
||||||
|
// used to determine when to start delaying responses to peers to
|
||||||
|
// prevent DOS vulnerabilities.
|
||||||
|
replyCount int
|
||||||
|
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
@ -223,6 +252,18 @@ type gossipSyncer struct {
|
|||||||
// newGossiperSyncer returns a new instance of the gossipSyncer populated using
|
// newGossiperSyncer returns a new instance of the gossipSyncer populated using
|
||||||
// the passed config.
|
// the passed config.
|
||||||
func newGossiperSyncer(cfg gossipSyncerCfg) *gossipSyncer {
|
func newGossiperSyncer(cfg gossipSyncerCfg) *gossipSyncer {
|
||||||
|
// If no parameter was specified for max undelayed query replies, set it
|
||||||
|
// to the default of 5 queries.
|
||||||
|
if cfg.maxUndelayedQueryReplies <= 0 {
|
||||||
|
cfg.maxUndelayedQueryReplies = DefaultMaxUndelayedQueryReplies
|
||||||
|
}
|
||||||
|
|
||||||
|
// If no parameter was specified for delayed query reply interval, set
|
||||||
|
// to the default of 30 seconds.
|
||||||
|
if cfg.delayedQueryReplyInterval <= 0 {
|
||||||
|
cfg.delayedQueryReplyInterval = DefaultDelayedQueryReplyInterval
|
||||||
|
}
|
||||||
|
|
||||||
return &gossipSyncer{
|
return &gossipSyncer{
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
gossipMsgs: make(chan lnwire.Message, 100),
|
gossipMsgs: make(chan lnwire.Message, 100),
|
||||||
@ -332,7 +373,7 @@ func (g *gossipSyncer) channelGraphSyncer() {
|
|||||||
// Otherwise, it's the remote peer performing a
|
// Otherwise, it's the remote peer performing a
|
||||||
// query, which we'll attempt to reply to.
|
// query, which we'll attempt to reply to.
|
||||||
err := g.replyPeerQueries(msg)
|
err := g.replyPeerQueries(msg)
|
||||||
if err != nil {
|
if err != nil && err != ErrGossipSyncerExiting {
|
||||||
log.Errorf("unable to reply to peer "+
|
log.Errorf("unable to reply to peer "+
|
||||||
"query: %v", err)
|
"query: %v", err)
|
||||||
}
|
}
|
||||||
@ -386,7 +427,7 @@ func (g *gossipSyncer) channelGraphSyncer() {
|
|||||||
// Otherwise, it's the remote peer performing a
|
// Otherwise, it's the remote peer performing a
|
||||||
// query, which we'll attempt to deploy to.
|
// query, which we'll attempt to deploy to.
|
||||||
err := g.replyPeerQueries(msg)
|
err := g.replyPeerQueries(msg)
|
||||||
if err != nil {
|
if err != nil && err != ErrGossipSyncerExiting {
|
||||||
log.Errorf("unable to reply to peer "+
|
log.Errorf("unable to reply to peer "+
|
||||||
"query: %v", err)
|
"query: %v", err)
|
||||||
}
|
}
|
||||||
@ -430,7 +471,7 @@ func (g *gossipSyncer) channelGraphSyncer() {
|
|||||||
select {
|
select {
|
||||||
case msg := <-g.gossipMsgs:
|
case msg := <-g.gossipMsgs:
|
||||||
err := g.replyPeerQueries(msg)
|
err := g.replyPeerQueries(msg)
|
||||||
if err != nil {
|
if err != nil && err != ErrGossipSyncerExiting {
|
||||||
log.Errorf("unable to reply to peer "+
|
log.Errorf("unable to reply to peer "+
|
||||||
"query: %v", err)
|
"query: %v", err)
|
||||||
}
|
}
|
||||||
@ -588,6 +629,24 @@ func (g *gossipSyncer) genChanRangeQuery() (*lnwire.QueryChannelRange, error) {
|
|||||||
// replyPeerQueries is called in response to any query by the remote peer.
|
// replyPeerQueries is called in response to any query by the remote peer.
|
||||||
// We'll examine our state and send back our best response.
|
// We'll examine our state and send back our best response.
|
||||||
func (g *gossipSyncer) replyPeerQueries(msg lnwire.Message) error {
|
func (g *gossipSyncer) replyPeerQueries(msg lnwire.Message) error {
|
||||||
|
// If we've already replied a handful of times, we will start to delay
|
||||||
|
// responses back to the remote peer. This can help prevent DOS attacks
|
||||||
|
// where the remote peer spams us endlessly.
|
||||||
|
switch {
|
||||||
|
case g.replyCount == g.cfg.maxUndelayedQueryReplies:
|
||||||
|
log.Infof("gossipSyncer(%x): entering delayed gossip replies",
|
||||||
|
g.peerPub[:])
|
||||||
|
fallthrough
|
||||||
|
|
||||||
|
case g.replyCount > g.cfg.maxUndelayedQueryReplies:
|
||||||
|
select {
|
||||||
|
case <-time.After(g.cfg.delayedQueryReplyInterval):
|
||||||
|
case <-g.quit:
|
||||||
|
return ErrGossipSyncerExiting
|
||||||
|
}
|
||||||
|
}
|
||||||
|
g.replyCount++
|
||||||
|
|
||||||
switch msg := msg.(type) {
|
switch msg := msg.(type) {
|
||||||
|
|
||||||
// In this state, we'll also handle any incoming channel range queries
|
// In this state, we'll also handle any incoming channel range queries
|
||||||
|
Loading…
Reference in New Issue
Block a user