diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 1b367c22..cd2d5573 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -205,6 +205,14 @@ type Config struct { // activeSyncer due to the current one not completing its state machine // within the timeout. ActiveSyncerTimeoutTicker ticker.Ticker + + // MinimumBatchSize is minimum size of a sub batch of announcement + // messages. + MinimumBatchSize int + + // SubBatchDelay is the delay between sending sub batches of + // gossip messages. + SubBatchDelay time.Duration } // AuthenticatedGossiper is a subsystem which is responsible for receiving @@ -881,6 +889,71 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders { return msgs } +// calculateSubBatchSize is a helper function that calculates the size to break +// down the batchSize into. +func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration, + minimumBatchSize, batchSize int) int { + if subBatchDelay > totalDelay { + return batchSize + } + + subBatchSize := (int(batchSize)*int(subBatchDelay) + int(totalDelay) - 1) / + int(totalDelay) + + if subBatchSize < minimumBatchSize { + return minimumBatchSize + } + + return subBatchSize +} + +// splitAnnouncementBatches takes an exiting list of announcements and +// decomposes it into sub batches controlled by the `subBatchSize`. +func splitAnnouncementBatches(subBatchSize int, + announcementBatch []msgWithSenders) [][]msgWithSenders { + var splitAnnouncementBatch [][]msgWithSenders + + for subBatchSize < len(announcementBatch) { + // For slicing with minimal allocation + // https://github.com/golang/go/wiki/SliceTricks + announcementBatch, splitAnnouncementBatch = + announcementBatch[subBatchSize:], + append(splitAnnouncementBatch, + announcementBatch[0:subBatchSize:subBatchSize]) + } + splitAnnouncementBatch = append(splitAnnouncementBatch, announcementBatch) + + return splitAnnouncementBatch +} + +// sendBatch broadcasts a list of announcements to our peers. +func (d *AuthenticatedGossiper) sendBatch(announcementBatch []msgWithSenders) { + syncerPeers := d.syncMgr.GossipSyncers() + + // We'll first attempt to filter out this new message + // for all peers that have active gossip syncers + // active. + for _, syncer := range syncerPeers { + syncer.FilterGossipMsgs(announcementBatch...) + } + + for _, msgChunk := range announcementBatch { + // With the syncers taken care of, we'll merge + // the sender map with the set of syncers, so + // we don't send out duplicate messages. + msgChunk.mergeSyncerMap(syncerPeers) + + err := d.cfg.Broadcast( + msgChunk.senders, msgChunk.msg, + ) + if err != nil { + log.Errorf("Unable to send batch "+ + "announcements: %v", err) + continue + } + } +} + // networkHandler is the primary goroutine that drives this service. The roles // of this goroutine includes answering queries related to the state of the // network, syncing up newly connected peers, and also periodically @@ -1072,39 +1145,33 @@ func (d *AuthenticatedGossiper) networkHandler() { continue } - // For the set of peers that have an active gossip - // syncers, we'll collect their pubkeys so we can avoid - // sending them the full message blast below. - syncerPeers := d.syncMgr.GossipSyncers() - - log.Infof("Broadcasting batch of %v new announcements", - len(announcementBatch)) - - // We'll first attempt to filter out this new message - // for all peers that have active gossip syncers - // active. - for _, syncer := range syncerPeers { - syncer.FilterGossipMsgs(announcementBatch...) - } - // Next, If we have new things to announce then // broadcast them to all our immediately connected // peers. - for _, msgChunk := range announcementBatch { - // With the syncers taken care of, we'll merge - // the sender map with the set of syncers, so - // we don't send out duplicate messages. - msgChunk.mergeSyncerMap(syncerPeers) + subBatchSize := calculateSubBatchSize( + d.cfg.TrickleDelay, d.cfg.SubBatchDelay, d.cfg.MinimumBatchSize, + len(announcementBatch), + ) - err := d.cfg.Broadcast( - msgChunk.senders, msgChunk.msg, - ) - if err != nil { - log.Errorf("unable to send batch "+ - "announcements: %v", err) - continue + splitAnnouncementBatch := splitAnnouncementBatches( + subBatchSize, announcementBatch, + ) + + d.wg.Add(1) + go func() { + defer d.wg.Done() + log.Infof("Broadcasting %v new announcements in %d sub batches", + len(announcementBatch), len(splitAnnouncementBatch)) + + for _, announcementBatch := range splitAnnouncementBatch { + d.sendBatch(announcementBatch) + select { + case <-time.After(d.cfg.SubBatchDelay): + case <-d.quit: + return + } } - } + }() // The retransmission timer has ticked which indicates that we // should check if we need to prune or re-broadcast any of our diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index c84ebe88..c8bf9b06 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -752,6 +752,8 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval), NumActiveSyncers: 3, AnnSigner: &mockSigner{nodeKeyPriv1}, + SubBatchDelay: time.Second * 5, + MinimumBatchSize: 10, }, nodeKeyPub1) if err := gossiper.Start(); err != nil { @@ -1493,6 +1495,8 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval), HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval), NumActiveSyncers: 3, + MinimumBatchSize: 10, + SubBatchDelay: time.Second * 5, }, ctx.gossiper.selfKey) if err != nil { t.Fatalf("unable to recreate gossiper: %v", err) @@ -3544,3 +3548,98 @@ func assertMessage(t *testing.T, expected, got lnwire.Message) { spew.Sdump(got)) } } + +// TestSplitAnnouncementsCorrectSubBatches checks that we split a given +// sizes of announcement list into the correct number of batches. +func TestSplitAnnouncementsCorrectSubBatches(t *testing.T) { + t.Parallel() + + const subBatchSize = 10 + + announcementBatchSizes := []int{2, 5, 20, 45, 80, 100, 1005} + expectedNumberMiniBatches := []int{1, 1, 2, 5, 8, 10, 101} + + lengthAnnouncementBatchSizes := len(announcementBatchSizes) + lengthExpectedNumberMiniBatches := len(expectedNumberMiniBatches) + + if lengthAnnouncementBatchSizes != lengthExpectedNumberMiniBatches { + t.Fatal("Length of announcementBatchSizes and " + + "expectedNumberMiniBatches should be equal") + } + + for testIndex := range announcementBatchSizes { + var batchSize = announcementBatchSizes[testIndex] + announcementBatch := make([]msgWithSenders, batchSize) + + splitAnnouncementBatch := splitAnnouncementBatches( + subBatchSize, announcementBatch, + ) + + lengthMiniBatches := len(splitAnnouncementBatch) + + if lengthMiniBatches != expectedNumberMiniBatches[testIndex] { + t.Fatalf("Expecting %d mini batches, actual %d", + expectedNumberMiniBatches[testIndex], lengthMiniBatches) + } + } +} + +func assertCorrectSubBatchSize(t *testing.T, expectedSubBatchSize, + actualSubBatchSize int) { + + t.Helper() + + if actualSubBatchSize != expectedSubBatchSize { + t.Fatalf("Expecting subBatch size of %d, actual %d", + expectedSubBatchSize, actualSubBatchSize) + } +} + +// TestCalculateCorrectSubBatchSize checks that we check the correct +// sub batch size for each of the input vectors of batch sizes. +func TestCalculateCorrectSubBatchSizes(t *testing.T) { + t.Parallel() + + const minimumSubBatchSize = 10 + const batchDelay = time.Duration(100) + const subBatchDelay = time.Duration(10) + + batchSizes := []int{2, 200, 250, 305, 352, 10010, 1000001} + expectedSubBatchSize := []int{10, 20, 25, 31, 36, 1001, 100001} + + for testIndex := range batchSizes { + batchSize := batchSizes[testIndex] + expectedBatchSize := expectedSubBatchSize[testIndex] + + actualSubBatchSize := calculateSubBatchSize( + batchDelay, subBatchDelay, minimumSubBatchSize, batchSize, + ) + + assertCorrectSubBatchSize(t, expectedBatchSize, actualSubBatchSize) + } +} + +// TestCalculateCorrectSubBatchSizesDifferentDelay checks that we check the +// correct sub batch size for each of different delay. +func TestCalculateCorrectSubBatchSizesDifferentDelay(t *testing.T) { + t.Parallel() + + const batchSize = 100 + const minimumSubBatchSize = 10 + + batchDelays := []time.Duration{100, 50, 20, 25, 5, 0} + const subBatchDelay = 10 + + expectedSubBatchSize := []int{10, 20, 50, 40, 100, 100} + + for testIndex := range batchDelays { + batchDelay := batchDelays[testIndex] + expectedBatchSize := expectedSubBatchSize[testIndex] + + actualSubBatchSize := calculateSubBatchSize( + batchDelay, subBatchDelay, minimumSubBatchSize, batchSize, + ) + + assertCorrectSubBatchSize(t, expectedBatchSize, actualSubBatchSize) + } +} diff --git a/server.go b/server.go index 93f99737..37b72772 100644 --- a/server.go +++ b/server.go @@ -694,6 +694,8 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, RotateTicker: ticker.New(discovery.DefaultSyncerRotationInterval), HistoricalSyncTicker: ticker.New(cfg.HistoricalSyncInterval), NumActiveSyncers: cfg.NumGraphSyncPeers, + MinimumBatchSize: 10, + SubBatchDelay: time.Second * 5, }, s.identityPriv.PubKey(), )