diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index 06022518..c1bb72c3 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -386,6 +386,9 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer { sendToPeer: func(msgs ...lnwire.Message) error { return peer.SendMessageLazy(false, msgs...) }, + sendToPeerSync: func(msgs ...lnwire.Message) error { + return peer.SendMessageLazy(true, msgs...) + }, }) // Gossip syncers are initialized by default in a PassiveSync type diff --git a/discovery/syncer.go b/discovery/syncer.go index 346215bf..02f90b79 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -9,6 +9,7 @@ import ( "time" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwire" "golang.org/x/time/rate" ) @@ -206,11 +207,16 @@ type gossipSyncerCfg struct { // the remote node in a single QueryShortChanIDs request. batchSize int32 - // sendToPeer is a function closure that should send the set of - // targeted messages to the peer we've been assigned to sync the graph - // state from. + // sendToPeer sends a variadic number of messages to the remote peer. + // This method should not block while waiting for sends to be written + // to the wire. sendToPeer func(...lnwire.Message) error + // sendToPeerSync sends a variadic number of messages to the remote + // peer, blocking until all messages have been sent successfully or a + // write error is encountered. + sendToPeerSync func(...lnwire.Message) error + // maxUndelayedQueryReplies specifies how many gossip queries we will // respond to immediately before starting to delay responses. maxUndelayedQueryReplies int @@ -565,6 +571,9 @@ func (g *GossipSyncer) replyHandler() { case err == ErrGossipSyncerExiting: return + case err == lnpeer.ErrPeerExiting: + return + case err != nil: log.Errorf("Unable to reply to peer "+ "query: %v", err) @@ -854,7 +863,7 @@ func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro if isFinalChunk { replyChunk.Complete = 1 } - if err := g.cfg.sendToPeer(&replyChunk); err != nil { + if err := g.cfg.sendToPeerSync(&replyChunk); err != nil { return err } @@ -882,7 +891,7 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error "chain=%v, we're on chain=%v", g.cfg.chainHash, query.ChainHash) - return g.cfg.sendToPeer(&lnwire.ReplyShortChanIDsEnd{ + return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{ ChainHash: query.ChainHash, Complete: 0, }) @@ -909,23 +918,22 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error query.ShortChanIDs[0].ToUint64(), err) } - // If we didn't find any messages related to those channel ID's, then - // we'll send over a reply marking the end of our response, and exit - // early. - if len(replyMsgs) == 0 { - return g.cfg.sendToPeer(&lnwire.ReplyShortChanIDsEnd{ - ChainHash: query.ChainHash, - Complete: 1, - }) + // Reply with any messages related to those channel ID's, we'll write + // each one individually and synchronously to throttle the sends and + // perform buffering of responses in the syncer as opposed to the peer. + for _, msg := range replyMsgs { + err := g.cfg.sendToPeerSync(msg) + if err != nil { + return err + } } - // Otherwise, we'll send over our set of messages responding to the - // query, with the ending message appended to it. - replyMsgs = append(replyMsgs, &lnwire.ReplyShortChanIDsEnd{ + // Regardless of whether we had any messages to reply with, send over + // the sentinel message to signal that the stream has terminated. + return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{ ChainHash: query.ChainHash, Complete: 1, }) - return g.cfg.sendToPeer(replyMsgs...) } // ApplyGossipFilter applies a gossiper filter sent by the remote node to the @@ -966,9 +974,19 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er go func() { defer g.wg.Done() - if err := g.cfg.sendToPeer(newUpdatestoSend...); err != nil { - log.Errorf("unable to send messages for peer catch "+ - "up: %v", err) + for _, msg := range newUpdatestoSend { + err := g.cfg.sendToPeerSync(msg) + switch { + case err == ErrGossipSyncerExiting: + return + + case err == lnpeer.ErrPeerExiting: + return + + case err != nil: + log.Errorf("Unable to send message for "+ + "peer catch up: %v", err) + } } }() diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index b9ee24c7..fd6577d1 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -151,6 +151,10 @@ func newTestSyncer(hID lnwire.ShortChannelID, msgChan <- msgs return nil }, + sendToPeerSync: func(msgs ...lnwire.Message) error { + msgChan <- msgs + return nil + }, delayedQueryReplyInterval: 2 * time.Second, } syncer := newGossipSyncer(cfg) @@ -540,30 +544,37 @@ func TestGossipSyncerReplyShortChanIDs(t *testing.T) { t.Fatalf("unable to query for chan IDs: %v", err) } - select { - case <-time.After(time.Second * 15): - t.Fatalf("no msgs received") + for i := 0; i < len(queryReply)+1; i++ { + select { + case <-time.After(time.Second * 15): + t.Fatalf("no msgs received") - // We should get back exactly 4 messages. The first 3 are the same - // messages we sent above, and the query end message. - case msgs := <-msgChan: - if len(msgs) != 4 { - t.Fatalf("wrong messages: expected %v, got %v", - 4, len(msgs)) - } + // We should get back exactly 4 messages. The first 3 are the + // same messages we sent above, and the query end message. + case msgs := <-msgChan: + if len(msgs) != 1 { + t.Fatalf("wrong number of messages: "+ + "expected %v, got %v", 1, len(msgs)) + } - if !reflect.DeepEqual(queryReply, msgs[:3]) { - t.Fatalf("wrong set of messages: expected %v, got %v", - spew.Sdump(queryReply), spew.Sdump(msgs[:3])) - } + isQueryReply := i < len(queryReply) + finalMsg, ok := msgs[0].(*lnwire.ReplyShortChanIDsEnd) - finalMsg, ok := msgs[3].(*lnwire.ReplyShortChanIDsEnd) - if !ok { - t.Fatalf("expected lnwire.ReplyShortChanIDsEnd "+ - "instead got %T", msgs[3]) - } - if finalMsg.Complete != 1 { - t.Fatalf("complete wasn't set") + switch { + case isQueryReply && + !reflect.DeepEqual(queryReply[i], msgs[0]): + + t.Fatalf("wrong message: expected %v, got %v", + spew.Sdump(queryReply[i]), + spew.Sdump(msgs[0])) + + case !isQueryReply && !ok: + t.Fatalf("expected lnwire.ReplyShortChanIDsEnd"+ + " instead got %T", msgs[3]) + + case !isQueryReply && finalMsg.Complete != 1: + t.Fatalf("complete wasn't set") + } } } }