From bf4543e2bd48081358a1559d86bf0ea1d1232cf4 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 26 Apr 2019 20:05:10 -0700 Subject: [PATCH] discovery/syncer: make gossip sends synchronous This commit makes all replies in the gossip syncer synchronous, meaning that they will wait for each message to be successfully written to the remote peer before attempting to send the next. This helps throttle messages the remote peer has requested, preventing unintended disconnects when the remote peer is slow to process messages. This change also helps reduce congestion in the peer by forcing the syncer to buffer the messages instead of dumping them into the peer's queue. --- discovery/sync_manager.go | 3 ++ discovery/syncer.go | 58 +++++++++++++++++++++++++-------------- discovery/syncer_test.go | 53 +++++++++++++++++++++-------------- 3 files changed, 73 insertions(+), 41 deletions(-) diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index 06022518..c1bb72c3 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -386,6 +386,9 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer { sendToPeer: func(msgs ...lnwire.Message) error { return peer.SendMessageLazy(false, msgs...) }, + sendToPeerSync: func(msgs ...lnwire.Message) error { + return peer.SendMessageLazy(true, msgs...) + }, }) // Gossip syncers are initialized by default in a PassiveSync type diff --git a/discovery/syncer.go b/discovery/syncer.go index 346215bf..02f90b79 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -9,6 +9,7 @@ import ( "time" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwire" "golang.org/x/time/rate" ) @@ -206,11 +207,16 @@ type gossipSyncerCfg struct { // the remote node in a single QueryShortChanIDs request. 
batchSize int32 - // sendToPeer is a function closure that should send the set of - // targeted messages to the peer we've been assigned to sync the graph - // state from. + // sendToPeer sends a variadic number of messages to the remote peer. + // This method should not block while waiting for sends to be written + // to the wire. sendToPeer func(...lnwire.Message) error + // sendToPeerSync sends a variadic number of messages to the remote + // peer, blocking until all messages have been sent successfully or a + // write error is encountered. + sendToPeerSync func(...lnwire.Message) error + // maxUndelayedQueryReplies specifies how many gossip queries we will // respond to immediately before starting to delay responses. maxUndelayedQueryReplies int @@ -565,6 +571,9 @@ func (g *GossipSyncer) replyHandler() { case err == ErrGossipSyncerExiting: return + case err == lnpeer.ErrPeerExiting: + return + case err != nil: log.Errorf("Unable to reply to peer "+ "query: %v", err) @@ -854,7 +863,7 @@ func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro if isFinalChunk { replyChunk.Complete = 1 } - if err := g.cfg.sendToPeer(&replyChunk); err != nil { + if err := g.cfg.sendToPeerSync(&replyChunk); err != nil { return err } @@ -882,7 +891,7 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error "chain=%v, we're on chain=%v", g.cfg.chainHash, query.ChainHash) - return g.cfg.sendToPeer(&lnwire.ReplyShortChanIDsEnd{ + return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{ ChainHash: query.ChainHash, Complete: 0, }) @@ -909,23 +918,22 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error query.ShortChanIDs[0].ToUint64(), err) } - // If we didn't find any messages related to those channel ID's, then - // we'll send over a reply marking the end of our response, and exit - // early. 
- if len(replyMsgs) == 0 { - return g.cfg.sendToPeer(&lnwire.ReplyShortChanIDsEnd{ - ChainHash: query.ChainHash, - Complete: 1, - }) + // Reply with any messages related to those channel ID's, we'll write + // each one individually and synchronously to throttle the sends and + // perform buffering of responses in the syncer as opposed to the peer. + for _, msg := range replyMsgs { + err := g.cfg.sendToPeerSync(msg) + if err != nil { + return err + } } - // Otherwise, we'll send over our set of messages responding to the - // query, with the ending message appended to it. - replyMsgs = append(replyMsgs, &lnwire.ReplyShortChanIDsEnd{ + // Regardless of whether we had any messages to reply with, send over + // the sentinel message to signal that the stream has terminated. + return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{ ChainHash: query.ChainHash, Complete: 1, }) - return g.cfg.sendToPeer(replyMsgs...) } // ApplyGossipFilter applies a gossiper filter sent by the remote node to the @@ -966,9 +974,19 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er go func() { defer g.wg.Done() - if err := g.cfg.sendToPeer(newUpdatestoSend...); err != nil { - log.Errorf("unable to send messages for peer catch "+ - "up: %v", err) + for _, msg := range newUpdatestoSend { + err := g.cfg.sendToPeerSync(msg) + switch { + case err == ErrGossipSyncerExiting: + return + + case err == lnpeer.ErrPeerExiting: + return + + case err != nil: + log.Errorf("Unable to send message for "+ + "peer catch up: %v", err) + } } }() diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index b9ee24c7..fd6577d1 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -151,6 +151,10 @@ func newTestSyncer(hID lnwire.ShortChannelID, msgChan <- msgs return nil }, + sendToPeerSync: func(msgs ...lnwire.Message) error { + msgChan <- msgs + return nil + }, delayedQueryReplyInterval: 2 * time.Second, } syncer := newGossipSyncer(cfg) @@ -540,30 
+544,37 @@ func TestGossipSyncerReplyShortChanIDs(t *testing.T) { t.Fatalf("unable to query for chan IDs: %v", err) } - select { - case <-time.After(time.Second * 15): - t.Fatalf("no msgs received") + for i := 0; i < len(queryReply)+1; i++ { + select { + case <-time.After(time.Second * 15): + t.Fatalf("no msgs received") - // We should get back exactly 4 messages. The first 3 are the same - // messages we sent above, and the query end message. - case msgs := <-msgChan: - if len(msgs) != 4 { - t.Fatalf("wrong messages: expected %v, got %v", - 4, len(msgs)) - } + // We should get back exactly 4 messages. The first 3 are the + // same messages we sent above, and the query end message. + case msgs := <-msgChan: + if len(msgs) != 1 { + t.Fatalf("wrong number of messages: "+ + "expected %v, got %v", 1, len(msgs)) + } - if !reflect.DeepEqual(queryReply, msgs[:3]) { - t.Fatalf("wrong set of messages: expected %v, got %v", - spew.Sdump(queryReply), spew.Sdump(msgs[:3])) - } + isQueryReply := i < len(queryReply) + finalMsg, ok := msgs[0].(*lnwire.ReplyShortChanIDsEnd) - finalMsg, ok := msgs[3].(*lnwire.ReplyShortChanIDsEnd) - if !ok { - t.Fatalf("expected lnwire.ReplyShortChanIDsEnd "+ - "instead got %T", msgs[3]) - } - if finalMsg.Complete != 1 { - t.Fatalf("complete wasn't set") + switch { + case isQueryReply && + !reflect.DeepEqual(queryReply[i], msgs[0]): + + t.Fatalf("wrong message: expected %v, got %v", + spew.Sdump(queryReply[i]), + spew.Sdump(msgs[0])) + + case !isQueryReply && !ok: + t.Fatalf("expected lnwire.ReplyShortChanIDsEnd"+ + " instead got %T", msgs[0]) + + case !isQueryReply && finalMsg.Complete != 1: + t.Fatalf("complete wasn't set") + } } } }