discovery/syncer: make gossip sends synchronous

This commit makes all replies in the gossip syncer synchronous, meaning
that they will wait for each message to be successfully written to the
remote peer before attempting to send the next. This helps throttle
messages the remote peer has requested, preventing unintended
disconnects when the remote peer is slow to process messages. This
changes also helps out congestion in the peer by forcing the syncer to
buffer the messages instead of dumping them into the peer's queue.
This commit is contained in:
Conner Fromknecht 2019-04-26 20:05:10 -07:00
parent ca358e9673
commit bf4543e2bd
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
3 changed files with 73 additions and 41 deletions

@ -386,6 +386,9 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
sendToPeer: func(msgs ...lnwire.Message) error { sendToPeer: func(msgs ...lnwire.Message) error {
return peer.SendMessageLazy(false, msgs...) return peer.SendMessageLazy(false, msgs...)
}, },
sendToPeerSync: func(msgs ...lnwire.Message) error {
return peer.SendMessageLazy(true, msgs...)
},
}) })
// Gossip syncers are initialized by default in a PassiveSync type // Gossip syncers are initialized by default in a PassiveSync type

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"golang.org/x/time/rate" "golang.org/x/time/rate"
) )
@ -206,11 +207,16 @@ type gossipSyncerCfg struct {
// the remote node in a single QueryShortChanIDs request. // the remote node in a single QueryShortChanIDs request.
batchSize int32 batchSize int32
// sendToPeer is a function closure that should send the set of // sendToPeer sends a variadic number of messages to the remote peer.
// targeted messages to the peer we've been assigned to sync the graph // This method should not block while waiting for sends to be written
// state from. // to the wire.
sendToPeer func(...lnwire.Message) error sendToPeer func(...lnwire.Message) error
// sendToPeerSync sends a variadic number of messages to the remote
// peer, blocking until all messages have been sent successfully or a
// write error is encountered.
sendToPeerSync func(...lnwire.Message) error
// maxUndelayedQueryReplies specifies how many gossip queries we will // maxUndelayedQueryReplies specifies how many gossip queries we will
// respond to immediately before starting to delay responses. // respond to immediately before starting to delay responses.
maxUndelayedQueryReplies int maxUndelayedQueryReplies int
@ -565,6 +571,9 @@ func (g *GossipSyncer) replyHandler() {
case err == ErrGossipSyncerExiting: case err == ErrGossipSyncerExiting:
return return
case err == lnpeer.ErrPeerExiting:
return
case err != nil: case err != nil:
log.Errorf("Unable to reply to peer "+ log.Errorf("Unable to reply to peer "+
"query: %v", err) "query: %v", err)
@ -854,7 +863,7 @@ func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro
if isFinalChunk { if isFinalChunk {
replyChunk.Complete = 1 replyChunk.Complete = 1
} }
if err := g.cfg.sendToPeer(&replyChunk); err != nil { if err := g.cfg.sendToPeerSync(&replyChunk); err != nil {
return err return err
} }
@ -882,7 +891,7 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error
"chain=%v, we're on chain=%v", g.cfg.chainHash, "chain=%v, we're on chain=%v", g.cfg.chainHash,
query.ChainHash) query.ChainHash)
return g.cfg.sendToPeer(&lnwire.ReplyShortChanIDsEnd{ return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
ChainHash: query.ChainHash, ChainHash: query.ChainHash,
Complete: 0, Complete: 0,
}) })
@ -909,23 +918,22 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error
query.ShortChanIDs[0].ToUint64(), err) query.ShortChanIDs[0].ToUint64(), err)
} }
// If we didn't find any messages related to those channel ID's, then // Reply with any messages related to those channel ID's, we'll write
// we'll send over a reply marking the end of our response, and exit // each one individually and synchronously to throttle the sends and
// early. // perform buffering of responses in the syncer as opposed to the peer.
if len(replyMsgs) == 0 { for _, msg := range replyMsgs {
return g.cfg.sendToPeer(&lnwire.ReplyShortChanIDsEnd{ err := g.cfg.sendToPeerSync(msg)
ChainHash: query.ChainHash, if err != nil {
Complete: 1, return err
}) }
} }
// Otherwise, we'll send over our set of messages responding to the // Regardless of whether we had any messages to reply with, send over
// query, with the ending message appended to it. // the sentinel message to signal that the stream has terminated.
replyMsgs = append(replyMsgs, &lnwire.ReplyShortChanIDsEnd{ return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
ChainHash: query.ChainHash, ChainHash: query.ChainHash,
Complete: 1, Complete: 1,
}) })
return g.cfg.sendToPeer(replyMsgs...)
} }
// ApplyGossipFilter applies a gossiper filter sent by the remote node to the // ApplyGossipFilter applies a gossiper filter sent by the remote node to the
@ -966,9 +974,19 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er
go func() { go func() {
defer g.wg.Done() defer g.wg.Done()
if err := g.cfg.sendToPeer(newUpdatestoSend...); err != nil { for _, msg := range newUpdatestoSend {
log.Errorf("unable to send messages for peer catch "+ err := g.cfg.sendToPeerSync(msg)
"up: %v", err) switch {
case err == ErrGossipSyncerExiting:
return
case err == lnpeer.ErrPeerExiting:
return
case err != nil:
log.Errorf("Unable to send message for "+
"peer catch up: %v", err)
}
} }
}() }()

@ -151,6 +151,10 @@ func newTestSyncer(hID lnwire.ShortChannelID,
msgChan <- msgs msgChan <- msgs
return nil return nil
}, },
sendToPeerSync: func(msgs ...lnwire.Message) error {
msgChan <- msgs
return nil
},
delayedQueryReplyInterval: 2 * time.Second, delayedQueryReplyInterval: 2 * time.Second,
} }
syncer := newGossipSyncer(cfg) syncer := newGossipSyncer(cfg)
@ -540,30 +544,37 @@ func TestGossipSyncerReplyShortChanIDs(t *testing.T) {
t.Fatalf("unable to query for chan IDs: %v", err) t.Fatalf("unable to query for chan IDs: %v", err)
} }
select { for i := 0; i < len(queryReply)+1; i++ {
case <-time.After(time.Second * 15): select {
t.Fatalf("no msgs received") case <-time.After(time.Second * 15):
t.Fatalf("no msgs received")
// We should get back exactly 4 messages. The first 3 are the same // We should get back exactly 4 messages. The first 3 are the
// messages we sent above, and the query end message. // same messages we sent above, and the query end message.
case msgs := <-msgChan: case msgs := <-msgChan:
if len(msgs) != 4 { if len(msgs) != 1 {
t.Fatalf("wrong messages: expected %v, got %v", t.Fatalf("wrong number of messages: "+
4, len(msgs)) "expected %v, got %v", 1, len(msgs))
} }
if !reflect.DeepEqual(queryReply, msgs[:3]) { isQueryReply := i < len(queryReply)
t.Fatalf("wrong set of messages: expected %v, got %v", finalMsg, ok := msgs[0].(*lnwire.ReplyShortChanIDsEnd)
spew.Sdump(queryReply), spew.Sdump(msgs[:3]))
}
finalMsg, ok := msgs[3].(*lnwire.ReplyShortChanIDsEnd) switch {
if !ok { case isQueryReply &&
t.Fatalf("expected lnwire.ReplyShortChanIDsEnd "+ !reflect.DeepEqual(queryReply[i], msgs[0]):
"instead got %T", msgs[3])
} t.Fatalf("wrong message: expected %v, got %v",
if finalMsg.Complete != 1 { spew.Sdump(queryReply[i]),
t.Fatalf("complete wasn't set") spew.Sdump(msgs[0]))
case !isQueryReply && !ok:
t.Fatalf("expected lnwire.ReplyShortChanIDsEnd"+
" instead got %T", msgs[3])
case !isQueryReply && finalMsg.Complete != 1:
t.Fatalf("complete wasn't set")
}
} }
} }
} }