diff --git a/discovery/gossiper.go b/discovery/gossiper.go index ebb260ed..c0da5f53 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -346,6 +346,13 @@ func (d *AuthenticatedGossiper) networkHandler() { trickleTimer := time.NewTicker(d.cfg.TrickleDelay) defer trickleTimer.Stop() + // To start, we'll first check to see if there're any stale channels + // that we need to re-transmit. + if err := d.retransmitStaleChannels(); err != nil { + log.Errorf("unable to rebroadcast stale channels: %v", + err) + } + for { select { // A new fee update has arrived. We'll commit it to the @@ -456,58 +463,8 @@ func (d *AuthenticatedGossiper) networkHandler() { // channel advertisements that have been dropped, or not properly // propagated through the network. case <-retransmitTimer.C: - var selfChans []lnwire.Message - - // Iterate over all of our channels and check if any of - // them fall within the prune interval or re-broadcast - // interval. - err := d.cfg.Router.ForAllOutgoingChannels(func( - info *channeldb.ChannelEdgeInfo, - edge *channeldb.ChannelEdgePolicy) error { - - const broadcastInterval = time.Hour * 24 - - timeElapsed := time.Since(edge.LastUpdate) - - // If it's been a full day since we've - // re-broadcasted the channel, then we'll - // re-sign it with an updated time stamp. - if timeElapsed >= broadcastInterval { - // Re-sign and update the channel on - // disk and retrieve our ChannelUpdate - // to broadcast. - chanUpdate, err := d.updateChannel(info, edge) - if err != nil { - log.Errorf("unable to update channel: %v", err) - return err - } - selfChans = append(selfChans, chanUpdate) - } - - return nil - }) - if err != nil { - log.Errorf("error while retrieving outgoing "+ - "channels: %v", err) - continue - } - - // If we don't have any channels to re-broadcast, then - // continue. - if len(selfChans) == 0 { - continue - } - - log.Debugf("Retransmitting %v outgoing channels", - len(selfChans)) - - // TODO(roasbeef): also send the channel ann? - - // With all the wire announcements properly crafted, - // we'll broadcast our known outgoing channels to all - // our immediate peers. - if err := d.cfg.Broadcast(nil, selfChans...); err != nil { - log.Errorf("unable to re-broadcast "+ + if err := d.retransmitStaleChannels(); err != nil { + log.Errorf("unable to rebroadcast stale "+ "channels: %v", err) } @@ -531,6 +488,74 @@ func (d *AuthenticatedGossiper) networkHandler() { } } +// retransmitStaleChannels eaxmines all outgoing channels that the source node +// is known to maintain to check to see if any of them are "stale". A channel +// is stale iff, the last timestamp of it's rebroadcast is older then +// broadcastInterval. +func (d *AuthenticatedGossiper) retransmitStaleChannels() error { + // Iterate over all of our channels and check if any of them fall + // within the prune interval or re-broadcast interval. + type updateTuple struct { + info *channeldb.ChannelEdgeInfo + edge *channeldb.ChannelEdgePolicy + } + var edgesToUpdate []updateTuple + err := d.cfg.Router.ForAllOutgoingChannels(func( + info *channeldb.ChannelEdgeInfo, + edge *channeldb.ChannelEdgePolicy) error { + + const broadcastInterval = time.Hour * 24 + + timeElapsed := time.Since(edge.LastUpdate) + + // If it's been a full day since we've re-broadcasted the + // channel, add the channel to the set of edges we need to + // update. + if timeElapsed >= broadcastInterval { + edgesToUpdate = append(edgesToUpdate, updateTuple{ + info: info, + edge: edge, + }) + } + + return nil + }) + if err != nil { + return fmt.Errorf("error while retrieving outgoing "+ + "channels: %v", err) + } + + var signedUpdates []lnwire.Message + for _, chanToUpdate := range edgesToUpdate { + // Re-sign and update the channel on disk and retrieve our + // ChannelUpdate to broadcast. + chanUpdate, err := d.updateChannel(chanToUpdate.info, + chanToUpdate.edge) + if err != nil { + return fmt.Errorf("unable to update channel: %v", err) + } + signedUpdates = append(signedUpdates, chanUpdate) + } + + // If we don't have any channels to re-broadcast, then we'll exit + // early. + if len(signedUpdates) == 0 { + return nil + } + + log.Infof("Retransmitting %v outgoing channels", len(signedUpdates)) + + // TODO(roasbeef): also send the channel ann? + + // With all the wire announcements properly crafted, we'll broadcast + // our known outgoing channels to all our immediate peers. + if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil { + return fmt.Errorf("unable to re-broadcast channels: %v", err) + } + + return nil +} + // processFeeChanUpdate generates a new set of channel updates with the new fee // schema applied for each specified channel identified by its channel point. // In the case that no channel points are specified, then the fee update will