Merge pull request #2985 from johng/sub-batch
Broadcast gossip announcements in sub batches
This commit is contained in:
commit
f8287b0080
@ -205,6 +205,14 @@ type Config struct {
|
|||||||
// activeSyncer due to the current one not completing its state machine
|
// activeSyncer due to the current one not completing its state machine
|
||||||
// within the timeout.
|
// within the timeout.
|
||||||
ActiveSyncerTimeoutTicker ticker.Ticker
|
ActiveSyncerTimeoutTicker ticker.Ticker
|
||||||
|
|
||||||
|
// MinimumBatchSize is minimum size of a sub batch of announcement
|
||||||
|
// messages.
|
||||||
|
MinimumBatchSize int
|
||||||
|
|
||||||
|
// SubBatchDelay is the delay between sending sub batches of
|
||||||
|
// gossip messages.
|
||||||
|
SubBatchDelay time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// AuthenticatedGossiper is a subsystem which is responsible for receiving
|
// AuthenticatedGossiper is a subsystem which is responsible for receiving
|
||||||
@ -881,6 +889,71 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders {
|
|||||||
return msgs
|
return msgs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// calculateSubBatchSize is a helper function that calculates the size to break
|
||||||
|
// down the batchSize into.
|
||||||
|
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
|
||||||
|
minimumBatchSize, batchSize int) int {
|
||||||
|
if subBatchDelay > totalDelay {
|
||||||
|
return batchSize
|
||||||
|
}
|
||||||
|
|
||||||
|
subBatchSize := (int(batchSize)*int(subBatchDelay) + int(totalDelay) - 1) /
|
||||||
|
int(totalDelay)
|
||||||
|
|
||||||
|
if subBatchSize < minimumBatchSize {
|
||||||
|
return minimumBatchSize
|
||||||
|
}
|
||||||
|
|
||||||
|
return subBatchSize
|
||||||
|
}
|
||||||
|
|
||||||
|
// splitAnnouncementBatches takes an exiting list of announcements and
|
||||||
|
// decomposes it into sub batches controlled by the `subBatchSize`.
|
||||||
|
func splitAnnouncementBatches(subBatchSize int,
|
||||||
|
announcementBatch []msgWithSenders) [][]msgWithSenders {
|
||||||
|
var splitAnnouncementBatch [][]msgWithSenders
|
||||||
|
|
||||||
|
for subBatchSize < len(announcementBatch) {
|
||||||
|
// For slicing with minimal allocation
|
||||||
|
// https://github.com/golang/go/wiki/SliceTricks
|
||||||
|
announcementBatch, splitAnnouncementBatch =
|
||||||
|
announcementBatch[subBatchSize:],
|
||||||
|
append(splitAnnouncementBatch,
|
||||||
|
announcementBatch[0:subBatchSize:subBatchSize])
|
||||||
|
}
|
||||||
|
splitAnnouncementBatch = append(splitAnnouncementBatch, announcementBatch)
|
||||||
|
|
||||||
|
return splitAnnouncementBatch
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendBatch broadcasts a list of announcements to our peers.
|
||||||
|
func (d *AuthenticatedGossiper) sendBatch(announcementBatch []msgWithSenders) {
|
||||||
|
syncerPeers := d.syncMgr.GossipSyncers()
|
||||||
|
|
||||||
|
// We'll first attempt to filter out this new message
|
||||||
|
// for all peers that have active gossip syncers
|
||||||
|
// active.
|
||||||
|
for _, syncer := range syncerPeers {
|
||||||
|
syncer.FilterGossipMsgs(announcementBatch...)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, msgChunk := range announcementBatch {
|
||||||
|
// With the syncers taken care of, we'll merge
|
||||||
|
// the sender map with the set of syncers, so
|
||||||
|
// we don't send out duplicate messages.
|
||||||
|
msgChunk.mergeSyncerMap(syncerPeers)
|
||||||
|
|
||||||
|
err := d.cfg.Broadcast(
|
||||||
|
msgChunk.senders, msgChunk.msg,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Unable to send batch "+
|
||||||
|
"announcements: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// networkHandler is the primary goroutine that drives this service. The roles
|
// networkHandler is the primary goroutine that drives this service. The roles
|
||||||
// of this goroutine includes answering queries related to the state of the
|
// of this goroutine includes answering queries related to the state of the
|
||||||
// network, syncing up newly connected peers, and also periodically
|
// network, syncing up newly connected peers, and also periodically
|
||||||
@ -1072,39 +1145,33 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// For the set of peers that have an active gossip
|
|
||||||
// syncers, we'll collect their pubkeys so we can avoid
|
|
||||||
// sending them the full message blast below.
|
|
||||||
syncerPeers := d.syncMgr.GossipSyncers()
|
|
||||||
|
|
||||||
log.Infof("Broadcasting batch of %v new announcements",
|
|
||||||
len(announcementBatch))
|
|
||||||
|
|
||||||
// We'll first attempt to filter out this new message
|
|
||||||
// for all peers that have active gossip syncers
|
|
||||||
// active.
|
|
||||||
for _, syncer := range syncerPeers {
|
|
||||||
syncer.FilterGossipMsgs(announcementBatch...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Next, If we have new things to announce then
|
// Next, If we have new things to announce then
|
||||||
// broadcast them to all our immediately connected
|
// broadcast them to all our immediately connected
|
||||||
// peers.
|
// peers.
|
||||||
for _, msgChunk := range announcementBatch {
|
subBatchSize := calculateSubBatchSize(
|
||||||
// With the syncers taken care of, we'll merge
|
d.cfg.TrickleDelay, d.cfg.SubBatchDelay, d.cfg.MinimumBatchSize,
|
||||||
// the sender map with the set of syncers, so
|
len(announcementBatch),
|
||||||
// we don't send out duplicate messages.
|
)
|
||||||
msgChunk.mergeSyncerMap(syncerPeers)
|
|
||||||
|
|
||||||
err := d.cfg.Broadcast(
|
splitAnnouncementBatch := splitAnnouncementBatches(
|
||||||
msgChunk.senders, msgChunk.msg,
|
subBatchSize, announcementBatch,
|
||||||
)
|
)
|
||||||
if err != nil {
|
|
||||||
log.Errorf("unable to send batch "+
|
d.wg.Add(1)
|
||||||
"announcements: %v", err)
|
go func() {
|
||||||
continue
|
defer d.wg.Done()
|
||||||
|
log.Infof("Broadcasting %v new announcements in %d sub batches",
|
||||||
|
len(announcementBatch), len(splitAnnouncementBatch))
|
||||||
|
|
||||||
|
for _, announcementBatch := range splitAnnouncementBatch {
|
||||||
|
d.sendBatch(announcementBatch)
|
||||||
|
select {
|
||||||
|
case <-time.After(d.cfg.SubBatchDelay):
|
||||||
|
case <-d.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}()
|
||||||
|
|
||||||
// The retransmission timer has ticked which indicates that we
|
// The retransmission timer has ticked which indicates that we
|
||||||
// should check if we need to prune or re-broadcast any of our
|
// should check if we need to prune or re-broadcast any of our
|
||||||
|
@ -752,6 +752,8 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
|
|||||||
HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
|
HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
|
||||||
NumActiveSyncers: 3,
|
NumActiveSyncers: 3,
|
||||||
AnnSigner: &mockSigner{nodeKeyPriv1},
|
AnnSigner: &mockSigner{nodeKeyPriv1},
|
||||||
|
SubBatchDelay: time.Second * 5,
|
||||||
|
MinimumBatchSize: 10,
|
||||||
}, nodeKeyPub1)
|
}, nodeKeyPub1)
|
||||||
|
|
||||||
if err := gossiper.Start(); err != nil {
|
if err := gossiper.Start(); err != nil {
|
||||||
@ -1493,6 +1495,8 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
|
|||||||
RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval),
|
RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval),
|
||||||
HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
|
HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
|
||||||
NumActiveSyncers: 3,
|
NumActiveSyncers: 3,
|
||||||
|
MinimumBatchSize: 10,
|
||||||
|
SubBatchDelay: time.Second * 5,
|
||||||
}, ctx.gossiper.selfKey)
|
}, ctx.gossiper.selfKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unable to recreate gossiper: %v", err)
|
t.Fatalf("unable to recreate gossiper: %v", err)
|
||||||
@ -3544,3 +3548,98 @@ func assertMessage(t *testing.T, expected, got lnwire.Message) {
|
|||||||
spew.Sdump(got))
|
spew.Sdump(got))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestSplitAnnouncementsCorrectSubBatches checks that we split a given
|
||||||
|
// sizes of announcement list into the correct number of batches.
|
||||||
|
func TestSplitAnnouncementsCorrectSubBatches(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
const subBatchSize = 10
|
||||||
|
|
||||||
|
announcementBatchSizes := []int{2, 5, 20, 45, 80, 100, 1005}
|
||||||
|
expectedNumberMiniBatches := []int{1, 1, 2, 5, 8, 10, 101}
|
||||||
|
|
||||||
|
lengthAnnouncementBatchSizes := len(announcementBatchSizes)
|
||||||
|
lengthExpectedNumberMiniBatches := len(expectedNumberMiniBatches)
|
||||||
|
|
||||||
|
if lengthAnnouncementBatchSizes != lengthExpectedNumberMiniBatches {
|
||||||
|
t.Fatal("Length of announcementBatchSizes and " +
|
||||||
|
"expectedNumberMiniBatches should be equal")
|
||||||
|
}
|
||||||
|
|
||||||
|
for testIndex := range announcementBatchSizes {
|
||||||
|
var batchSize = announcementBatchSizes[testIndex]
|
||||||
|
announcementBatch := make([]msgWithSenders, batchSize)
|
||||||
|
|
||||||
|
splitAnnouncementBatch := splitAnnouncementBatches(
|
||||||
|
subBatchSize, announcementBatch,
|
||||||
|
)
|
||||||
|
|
||||||
|
lengthMiniBatches := len(splitAnnouncementBatch)
|
||||||
|
|
||||||
|
if lengthMiniBatches != expectedNumberMiniBatches[testIndex] {
|
||||||
|
t.Fatalf("Expecting %d mini batches, actual %d",
|
||||||
|
expectedNumberMiniBatches[testIndex], lengthMiniBatches)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertCorrectSubBatchSize(t *testing.T, expectedSubBatchSize,
|
||||||
|
actualSubBatchSize int) {
|
||||||
|
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
if actualSubBatchSize != expectedSubBatchSize {
|
||||||
|
t.Fatalf("Expecting subBatch size of %d, actual %d",
|
||||||
|
expectedSubBatchSize, actualSubBatchSize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestCalculateCorrectSubBatchSize checks that we check the correct
|
||||||
|
// sub batch size for each of the input vectors of batch sizes.
|
||||||
|
func TestCalculateCorrectSubBatchSizes(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
const minimumSubBatchSize = 10
|
||||||
|
const batchDelay = time.Duration(100)
|
||||||
|
const subBatchDelay = time.Duration(10)
|
||||||
|
|
||||||
|
batchSizes := []int{2, 200, 250, 305, 352, 10010, 1000001}
|
||||||
|
expectedSubBatchSize := []int{10, 20, 25, 31, 36, 1001, 100001}
|
||||||
|
|
||||||
|
for testIndex := range batchSizes {
|
||||||
|
batchSize := batchSizes[testIndex]
|
||||||
|
expectedBatchSize := expectedSubBatchSize[testIndex]
|
||||||
|
|
||||||
|
actualSubBatchSize := calculateSubBatchSize(
|
||||||
|
batchDelay, subBatchDelay, minimumSubBatchSize, batchSize,
|
||||||
|
)
|
||||||
|
|
||||||
|
assertCorrectSubBatchSize(t, expectedBatchSize, actualSubBatchSize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestCalculateCorrectSubBatchSizesDifferentDelay checks that we check the
|
||||||
|
// correct sub batch size for each of different delay.
|
||||||
|
func TestCalculateCorrectSubBatchSizesDifferentDelay(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
const batchSize = 100
|
||||||
|
const minimumSubBatchSize = 10
|
||||||
|
|
||||||
|
batchDelays := []time.Duration{100, 50, 20, 25, 5, 0}
|
||||||
|
const subBatchDelay = 10
|
||||||
|
|
||||||
|
expectedSubBatchSize := []int{10, 20, 50, 40, 100, 100}
|
||||||
|
|
||||||
|
for testIndex := range batchDelays {
|
||||||
|
batchDelay := batchDelays[testIndex]
|
||||||
|
expectedBatchSize := expectedSubBatchSize[testIndex]
|
||||||
|
|
||||||
|
actualSubBatchSize := calculateSubBatchSize(
|
||||||
|
batchDelay, subBatchDelay, minimumSubBatchSize, batchSize,
|
||||||
|
)
|
||||||
|
|
||||||
|
assertCorrectSubBatchSize(t, expectedBatchSize, actualSubBatchSize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -694,6 +694,8 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
|
|||||||
RotateTicker: ticker.New(discovery.DefaultSyncerRotationInterval),
|
RotateTicker: ticker.New(discovery.DefaultSyncerRotationInterval),
|
||||||
HistoricalSyncTicker: ticker.New(cfg.HistoricalSyncInterval),
|
HistoricalSyncTicker: ticker.New(cfg.HistoricalSyncInterval),
|
||||||
NumActiveSyncers: cfg.NumGraphSyncPeers,
|
NumActiveSyncers: cfg.NumGraphSyncPeers,
|
||||||
|
MinimumBatchSize: 10,
|
||||||
|
SubBatchDelay: time.Second * 5,
|
||||||
},
|
},
|
||||||
s.identityPriv.PubKey(),
|
s.identityPriv.PubKey(),
|
||||||
)
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user