discovery+server: RetransmitDelay->RetransmitTicker

Also let retransmitStaleChannels take a timestamp, to make it easier to
test.
This commit is contained in:
Johan T. Halseth 2019-09-10 14:37:17 +02:00
parent 74c9551564
commit e201fbe396
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26
3 changed files with 19 additions and 13 deletions

View File

@ -156,9 +156,10 @@ type Config struct {
// the last trickle tick. // the last trickle tick.
TrickleDelay time.Duration TrickleDelay time.Duration
// RetransmitDelay is the period of a timer which indicates that we // RetransmitTicker is a ticker that ticks with a period which
// should check if we need re-broadcast any of our personal channels. // indicates that we should check if we need re-broadcast any of our
RetransmitDelay time.Duration // personal channels.
RetransmitTicker ticker.Ticker
// RebroadcastInterval is the maximum time we wait between sending out // RebroadcastInterval is the maximum time we wait between sending out
// channel updates for our active channels and our own node // channel updates for our active channels and our own node
@ -888,15 +889,15 @@ func (d *AuthenticatedGossiper) networkHandler() {
announcements := deDupedAnnouncements{} announcements := deDupedAnnouncements{}
announcements.Reset() announcements.Reset()
retransmitTimer := time.NewTicker(d.cfg.RetransmitDelay) d.cfg.RetransmitTicker.Resume()
defer retransmitTimer.Stop() defer d.cfg.RetransmitTicker.Stop()
trickleTimer := time.NewTicker(d.cfg.TrickleDelay) trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
defer trickleTimer.Stop() defer trickleTimer.Stop()
// To start, we'll first check to see if there are any stale channels // To start, we'll first check to see if there are any stale channels
// that we need to re-transmit. // that we need to re-transmit.
if err := d.retransmitStaleChannels(); err != nil { if err := d.retransmitStaleChannels(time.Now()); err != nil {
log.Errorf("Unable to rebroadcast stale channels: %v", err) log.Errorf("Unable to rebroadcast stale channels: %v", err)
} }
@ -1111,8 +1112,8 @@ func (d *AuthenticatedGossiper) networkHandler() {
// personal channels. This addresses the case of "zombie" // personal channels. This addresses the case of "zombie"
// channels and channel advertisements that have been dropped, // channels and channel advertisements that have been dropped,
// or not properly propagated through the network. // or not properly propagated through the network.
case <-retransmitTimer.C: case tick := <-d.cfg.RetransmitTicker.Ticks():
if err := d.retransmitStaleChannels(); err != nil { if err := d.retransmitStaleChannels(tick); err != nil {
log.Errorf("unable to rebroadcast stale "+ log.Errorf("unable to rebroadcast stale "+
"channels: %v", err) "channels: %v", err)
} }
@ -1166,7 +1167,7 @@ func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message) bool {
// is known to maintain to check to see if any of them are "stale". A channel // is known to maintain to check to see if any of them are "stale". A channel
// is stale iff, the last timestamp of its rebroadcast is older then // is stale iff, the last timestamp of its rebroadcast is older then
// broadcastInterval. // broadcastInterval.
func (d *AuthenticatedGossiper) retransmitStaleChannels() error { func (d *AuthenticatedGossiper) retransmitStaleChannels(now time.Time) error {
// Iterate over all of our channels and check if any of them fall // Iterate over all of our channels and check if any of them fall
// within the prune interval or re-broadcast interval. // within the prune interval or re-broadcast interval.
type updateTuple struct { type updateTuple struct {
@ -1200,7 +1201,7 @@ func (d *AuthenticatedGossiper) retransmitStaleChannels() error {
return nil return nil
} }
timeElapsed := time.Since(edge.LastUpdate) timeElapsed := now.Sub(edge.LastUpdate)
// If it's been longer than RebroadcastInterval since we've // If it's been longer than RebroadcastInterval since we've
// re-broadcasted the channel, add the channel to the set of // re-broadcasted the channel, add the channel to the set of

View File

@ -58,6 +58,8 @@ var (
trickleDelay = time.Millisecond * 100 trickleDelay = time.Millisecond * 100
retransmitDelay = time.Hour * 1 retransmitDelay = time.Hour * 1
proofMatureDelta uint32 proofMatureDelta uint32
rebroadcastInterval = time.Hour * 1000000
) )
// makeTestDB creates a new instance of the ChannelDB for testing purposes. A // makeTestDB creates a new instance of the ChannelDB for testing purposes. A
@ -741,7 +743,8 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
}, },
Router: router, Router: router,
TrickleDelay: trickleDelay, TrickleDelay: trickleDelay,
RetransmitDelay: retransmitDelay, RetransmitTicker: ticker.NewForce(retransmitDelay),
RebroadcastInterval: rebroadcastInterval,
ProofMatureDelta: proofMatureDelta, ProofMatureDelta: proofMatureDelta,
WaitingProofStore: waitingProofStore, WaitingProofStore: waitingProofStore,
MessageStore: newMockMessageStore(), MessageStore: newMockMessageStore(),
@ -1493,7 +1496,8 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline, NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline,
Router: ctx.gossiper.cfg.Router, Router: ctx.gossiper.cfg.Router,
TrickleDelay: trickleDelay, TrickleDelay: trickleDelay,
RetransmitDelay: retransmitDelay, RetransmitTicker: ticker.NewForce(retransmitDelay),
RebroadcastInterval: rebroadcastInterval,
ProofMatureDelta: proofMatureDelta, ProofMatureDelta: proofMatureDelta,
WaitingProofStore: ctx.gossiper.cfg.WaitingProofStore, WaitingProofStore: ctx.gossiper.cfg.WaitingProofStore,
MessageStore: ctx.gossiper.cfg.MessageStore, MessageStore: ctx.gossiper.cfg.MessageStore,
@ -1657,6 +1661,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
t.Fatal("channel update announcement was broadcast") t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay): case <-time.After(2 * trickleDelay):
} }
select { select {
case msg := <-sentToPeer: case msg := <-sentToPeer:
assertMessage(t, batch.chanUpdAnn1, msg) assertMessage(t, batch.chanUpdAnn1, msg)

View File

@ -719,7 +719,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
NotifyWhenOffline: s.NotifyWhenOffline, NotifyWhenOffline: s.NotifyWhenOffline,
ProofMatureDelta: 0, ProofMatureDelta: 0,
TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay),
RetransmitDelay: time.Minute * 30, RetransmitTicker: ticker.New(time.Minute * 30),
RebroadcastInterval: time.Hour * 24, RebroadcastInterval: time.Hour * 24,
WaitingProofStore: waitingProofStore, WaitingProofStore: waitingProofStore,
MessageStore: gossipMessageStore, MessageStore: gossipMessageStore,