discovery: don't prune *our* channels during retransmission tick

This commit modifies the recently modified logic for self-channel
retransmission to exclude pruning *our* channels which haven’t been
updated since the broadcastInterval. Instead, we only re-broadcast
channels of ours that haven’t been updated in 24 hours.
This commit is contained in:
Olaoluwa Osuntokun 2017-10-04 19:30:54 -07:00
parent ebd2dfbfd9
commit 81cd954cfc
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21
2 changed files with 38 additions and 40 deletions

@ -690,7 +690,8 @@ func (c *ChannelGraph) PruneTip() (*chainhash.Hash, uint32, error) {
} }
// DeleteChannelEdge removes an edge from the database as identified by it's // DeleteChannelEdge removes an edge from the database as identified by it's
// funding outpoint. If the edge does not exist within the database, then this // funding outpoint. If the edge does not exist within the database, then
// ErrEdgeNotFound will be returned.
func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error { func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error {
// TODO(roasbeef): possibly delete from node bucket if node has no more // TODO(roasbeef): possibly delete from node bucket if node has no more
// channels // channels

@ -93,10 +93,7 @@ type Config struct {
TrickleDelay time.Duration TrickleDelay time.Duration
// RetransmitDelay is the period of a timer which indicates that we // RetransmitDelay is the period of a timer which indicates that we
// should check if we need to prune or re-broadcast any of our // should check if we need re-broadcast any of our personal channels.
// personal channels. This addresses the case of "zombie" channels and
// channel advertisements that have been dropped, or not properly
// propagated through the network.
RetransmitDelay time.Duration RetransmitDelay time.Duration
// DB is a global boltdb instance which is needed to pass it in waiting // DB is a global boltdb instance which is needed to pass it in waiting
@ -461,27 +458,24 @@ func (d *AuthenticatedGossiper) networkHandler() {
case <-retransmitTimer.C: case <-retransmitTimer.C:
var selfChans []lnwire.Message var selfChans []lnwire.Message
// Iterate over all of our channels and check if any of them fall within // Iterate over all of our channels and check if any of
// the prune interval or re-broadcast interval. // them fall within the prune interval or re-broadcast
err := d.cfg.Router.ForAllOutgoingChannels(func(info *channeldb.ChannelEdgeInfo, // interval.
err := d.cfg.Router.ForAllOutgoingChannels(func(
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error { edge *channeldb.ChannelEdgePolicy) error {
const pruneInterval = time.Hour * 24 * 14 const broadcastInterval = time.Hour * 24
const broadcastInterval = time.Hour * 24 * 13
timeElapsed := time.Since(edge.LastUpdate) timeElapsed := time.Since(edge.LastUpdate)
// Prune the edge if it is has not been updated for the past 2 weeks. // If it's been a full day since we've
// Rebroadcast edge if its last update is close to the 2-week interval. // re-broadcasted the channel, then we'll
if timeElapsed >= pruneInterval { // re-sign it with an updated time stamp.
err := d.cfg.Router.DeleteEdge(info) if timeElapsed >= broadcastInterval {
if err != nil { // Re-sign and update the channel on
log.Errorf("unable to prune stale edge: %v", err) // disk and retrieve our ChannelUpdate
return err // to broadcast.
}
} else if timeElapsed >= broadcastInterval {
// Re-sign and update the channel on disk and retrieve our
// ChannelUpdate to broadcast.
chanUpdate, err := d.updateChannel(info, edge) chanUpdate, err := d.updateChannel(info, edge)
if err != nil { if err != nil {
log.Errorf("unable to update channel: %v", err) log.Errorf("unable to update channel: %v", err)
@ -493,11 +487,13 @@ func (d *AuthenticatedGossiper) networkHandler() {
return nil return nil
}) })
if err != nil { if err != nil {
log.Errorf("error while retrieving outgoing channels: %v", err) log.Errorf("error while retrieving outgoing "+
"channels: %v", err)
continue continue
} }
// If we don't have any channels to re-broadcast, then continue. // If we don't have any channels to re-broadcast, then
// continue.
if len(selfChans) == 0 { if len(selfChans) == 0 {
continue continue
} }
@ -505,6 +501,8 @@ func (d *AuthenticatedGossiper) networkHandler() {
log.Debugf("Retransmitting %v outgoing channels", log.Debugf("Retransmitting %v outgoing channels",
len(selfChans)) len(selfChans))
// TODO(roasbeef): also send the channel ann?
// With all the wire announcements properly crafted, // With all the wire announcements properly crafted,
// we'll broadcast our known outgoing channels to all // we'll broadcast our known outgoing channels to all
// our immediate peers. // our immediate peers.
@ -537,8 +535,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
// schema applied for each specified channel identified by its channel point. // schema applied for each specified channel identified by its channel point.
// In the case that no channel points are specified, then the fee update will // In the case that no channel points are specified, then the fee update will
// be applied to all channels. Finally, the backing ChannelGraphSource is // be applied to all channels. Finally, the backing ChannelGraphSource is
// updated with the latest information reflecting the // updated with the latest information reflecting the applied fee updates.
// applied fee updates.
// //
// TODO(roasbeef): generalize into generic for any channel update // TODO(roasbeef): generalize into generic for any channel update
func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest) ([]lnwire.Message, error) { func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest) ([]lnwire.Message, error) {
@ -551,7 +548,8 @@ func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest
haveChanFilter := len(chansToUpdate) != 0 haveChanFilter := len(chansToUpdate) != 0
var signedAnns []lnwire.Message var chanUpdates []lnwire.Message
// Next, we'll loop over all the outgoing channels the router knows of. // Next, we'll loop over all the outgoing channels the router knows of.
// If we have a filter then we'll only collected those channels, // If we have a filter then we'll only collected those channels,
// otherwise we'll collect them all. // otherwise we'll collect them all.
@ -570,14 +568,14 @@ func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest
feeUpdate.newSchema.FeeRate, feeUpdate.newSchema.FeeRate,
) )
// Re-sign and update the backing ChannelGraphSource, and retrieve our // Re-sign and update the backing ChannelGraphSource, and
// ChannelUpdate to broadcast. // retrieve our ChannelUpdate to broadcast.
chanUpdate, err := d.updateChannel(info, edge) chanUpdate, err := d.updateChannel(info, edge)
if err != nil { if err != nil {
return err return err
} }
signedAnns = append(signedAnns, chanUpdate) chanUpdates = append(chanUpdates, chanUpdate)
return nil return nil
}) })
@ -585,7 +583,7 @@ func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest
return nil, err return nil, err
} }
return signedAnns, nil return chanUpdates, nil
} }
// processNetworkAnnouncement processes a new network relate authenticated // processNetworkAnnouncement processes a new network relate authenticated
@ -1169,10 +1167,9 @@ func (d *AuthenticatedGossiper) synchronizeWithNode(syncReq *syncRequest) error
return d.cfg.SendToPeer(targetNode, announceMessages...) return d.cfg.SendToPeer(targetNode, announceMessages...)
} }
// updateChannel creates a new fully signed update for the channel, // updateChannel creates a new fully signed update for the channel, and updates
// and updates the underlying graph with the new state. // the underlying graph with the new state.
func (d *AuthenticatedGossiper) updateChannel( func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelUpdate, error) { edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelUpdate, error) {
edge.LastUpdate = time.Now() edge.LastUpdate = time.Now()
@ -1188,20 +1185,20 @@ func (d *AuthenticatedGossiper) updateChannel(
FeeRate: uint32(edge.FeeProportionalMillionths), FeeRate: uint32(edge.FeeProportionalMillionths),
} }
// With the update applied, we'll generate a new signature over // With the update applied, we'll generate a new signature over a
// a digest of the channel announcement itself. // digest of the channel announcement itself.
sig, err := SignAnnouncement(d.cfg.AnnSigner, d.selfKey, chanUpdate) sig, err := SignAnnouncement(d.cfg.AnnSigner, d.selfKey, chanUpdate)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Next, we'll set the new signature in place, and update the // Next, we'll set the new signature in place, and update the reference
// reference in the backing slice. // in the backing slice.
edge.Signature = sig edge.Signature = sig
chanUpdate.Signature = sig chanUpdate.Signature = sig
// To ensure that our signature is valid, we'll verify it // To ensure that our signature is valid, we'll verify it ourself
// ourself before committing it to the slice returned. // before committing it to the slice returned.
err = d.validateChannelUpdateAnn(d.selfKey, chanUpdate) err = d.validateChannelUpdateAnn(d.selfKey, chanUpdate)
if err != nil { if err != nil {
return nil, fmt.Errorf("generated invalid channel update "+ return nil, fmt.Errorf("generated invalid channel update "+