routing+discovery: implement 2-week network view pruning

This commit is contained in:
Brandon 2017-09-24 18:47:48 -07:00 committed by Olaoluwa Osuntokun
parent 32508344db
commit 3907ae65c2
3 changed files with 112 additions and 87 deletions

@ -92,6 +92,13 @@ type Config struct {
// the last trickle tick. // the last trickle tick.
TrickleDelay time.Duration TrickleDelay time.Duration
// RetransmitDelay is the period of a timer which indicates that we
// should check if we need to prune or re-broadcast any of our
// personal channels. This addresses the case of "zombie" channels and
// channel advertisements that have been dropped, or not properly
// propagated through the network.
RetransmitDelay time.Duration
// DB is a global boltdb instance which is needed to pass it in waiting // DB is a global boltdb instance which is needed to pass it in waiting
// proof storage to make waiting proofs persistent. // proof storage to make waiting proofs persistent.
DB *channeldb.DB DB *channeldb.DB
@ -336,11 +343,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
// * can use mostly empty struct in db as place holder // * can use mostly empty struct in db as place holder
var announcementBatch []lnwire.Message var announcementBatch []lnwire.Message
// TODO(roasbeef): parametrize the above retransmitTimer := time.NewTicker(d.cfg.RetransmitDelay)
retransmitTimer := time.NewTicker(time.Minute * 30)
defer retransmitTimer.Stop() defer retransmitTimer.Stop()
// TODO(roasbeef): parametrize the above
trickleTimer := time.NewTicker(d.cfg.TrickleDelay) trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
defer trickleTimer.Stop() defer trickleTimer.Stop()
@ -449,36 +454,50 @@ func (d *AuthenticatedGossiper) networkHandler() {
announcementBatch = nil announcementBatch = nil
// The retransmission timer has ticked which indicates that we // The retransmission timer has ticked which indicates that we
// should broadcast our personal channels to the network. This // should check if we need to prune or re-broadcast any of our
// addresses the case of channel advertisements whether being // personal channels. This addresses the case of "zombie" channels and
// dropped, or not properly propagated through the network. // channel advertisements that have been dropped, or not properly
// propagated through the network.
case <-retransmitTimer.C: case <-retransmitTimer.C:
var selfChans []lnwire.Message var selfChans []lnwire.Message
// Iterate over our channels and construct the // Iterate over all of our channels and check if any of them fall within
// announcements array. // the prune interval or re-broadcast interval.
err := d.cfg.Router.ForAllOutgoingChannels(func(ei *channeldb.ChannelEdgeInfo, err := d.cfg.Router.ForAllOutgoingChannels(func(info *channeldb.ChannelEdgeInfo,
p *channeldb.ChannelEdgePolicy) error { edge *channeldb.ChannelEdgePolicy) error {
c := &lnwire.ChannelUpdate{ const pruneInterval = time.Hour * 24 * 14
Signature: p.Signature, const broadcastInterval = time.Hour * 24 * 13
ShortChannelID: lnwire.NewShortChanIDFromInt(p.ChannelID),
ChainHash: ei.ChainHash, timeElapsed := time.Since(edge.LastUpdate)
Timestamp: uint32(p.LastUpdate.Unix()),
Flags: p.Flags, // Prune the edge if it is has not been updated for the past 2 weeks.
TimeLockDelta: p.TimeLockDelta, // Rebroadcast edge if its last update is close to the 2-week interval.
HtlcMinimumMsat: p.MinHTLC, if timeElapsed >= pruneInterval {
BaseFee: uint32(p.FeeBaseMSat), err := d.cfg.Router.DeleteEdge(info)
FeeRate: uint32(p.FeeProportionalMillionths), if err != nil {
log.Errorf("unable to prune stale edge: %v", err)
return err
}
} else if timeElapsed >= broadcastInterval {
// Re-sign and update the channel on disk and retrieve our
// ChannelUpdate to broadcast.
chanUpdate, err := d.updateChannel(info, edge)
if err != nil {
log.Errorf("unable to update channel: %v", err)
return err
}
selfChans = append(selfChans, chanUpdate)
} }
selfChans = append(selfChans, c)
return nil return nil
}) })
if err != nil { if err != nil {
log.Errorf("unable to retrieve outgoing channels: %v", err) log.Errorf("error while retrieving outgoing channels: %v", err)
continue continue
} }
// If we don't have any channels to re-broadcast, then continue.
if len(selfChans) == 0 { if len(selfChans) == 0 {
continue continue
} }
@ -487,7 +506,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
len(selfChans)) len(selfChans))
// With all the wire announcements properly crafted, // With all the wire announcements properly crafted,
// we'll broadcast our known outgoing channel to all // we'll broadcast our known outgoing channels to all
// our immediate peers. // our immediate peers.
if err := d.cfg.Broadcast(nil, selfChans...); err != nil { if err := d.cfg.Broadcast(nil, selfChans...); err != nil {
log.Errorf("unable to re-broadcast "+ log.Errorf("unable to re-broadcast "+
@ -532,9 +551,7 @@ func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest
haveChanFilter := len(chansToUpdate) != 0 haveChanFilter := len(chansToUpdate) != 0
var chanUpdates []*lnwire.ChannelUpdate var signedAnns []lnwire.Message
chanEdges := make(map[lnwire.ShortChannelID]*channeldb.ChannelEdgePolicy)
// Next, we'll loop over all the outgoing channels the router knows of. // Next, we'll loop over all the outgoing channels the router knows of.
// If we have a filter then we'll only collected those channels, // If we have a filter then we'll only collected those channels,
// otherwise we'll collect them all. // otherwise we'll collect them all.
@ -547,76 +564,25 @@ func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest
return nil return nil
} }
// Otherwise, add the channel update to our batch to be // Apply the new fee schema to the edge.
// updated, as we'll be re-signing it shortly.
c := &lnwire.ChannelUpdate{
Signature: edge.Signature,
ChainHash: info.ChainHash,
ShortChannelID: lnwire.NewShortChanIDFromInt(edge.ChannelID),
Timestamp: uint32(edge.LastUpdate.Unix()),
Flags: edge.Flags,
TimeLockDelta: edge.TimeLockDelta,
HtlcMinimumMsat: edge.MinHTLC,
BaseFee: uint32(edge.FeeBaseMSat),
FeeRate: uint32(edge.FeeProportionalMillionths),
}
chanUpdates = append(chanUpdates, c)
// We'll also add it to our edge map so we can find it easily
// later to update the state within the database.
chanEdges[c.ShortChannelID] = edge
return nil
})
if err != nil {
return nil, err
}
// With the set of channel updates we need to sign obtained, we'll not
// generate new signatures for each of them using applying the new fee
// schema before signing.
signedAnns := make([]lnwire.Message, len(chanUpdates))
for i, chanUpdate := range chanUpdates {
edge := chanEdges[chanUpdate.ShortChannelID]
now := time.Now()
// First, we'll apply the new few schema update to the channel
// update and also the backing database struct.
chanUpdate.BaseFee = uint32(feeUpdate.newSchema.BaseFee)
chanUpdate.FeeRate = feeUpdate.newSchema.FeeRate
chanUpdate.Timestamp = uint32(now.Unix())
edge.FeeBaseMSat = feeUpdate.newSchema.BaseFee edge.FeeBaseMSat = feeUpdate.newSchema.BaseFee
edge.FeeProportionalMillionths = lnwire.MilliSatoshi( edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
feeUpdate.newSchema.FeeRate, feeUpdate.newSchema.FeeRate,
) )
edge.LastUpdate = now
// With the update applied, we'll generate a new signature over // Re-sign and update the backing ChannelGraphSource, and retrieve our
// a digest of the channel announcement itself. // ChannelUpdate to broadcast.
sig, err := SignAnnouncement(d.cfg.AnnSigner, d.selfKey, chanUpdate, err := d.updateChannel(info, edge)
chanUpdate)
if err != nil { if err != nil {
return nil, err return err
} }
// Next, we'll set the new signature in place, and update the signedAnns = append(signedAnns, chanUpdate)
// reference in the backing slice.
edge.Signature = sig
chanUpdate.Signature = sig
signedAnns[i] = chanUpdate
// To ensure that our signature is valid, we'll verify it return nil
// ourself before committing it to the slice returned. })
err = d.validateChannelUpdateAnn(d.selfKey, chanUpdate) if err != nil {
if err != nil { return nil, err
return nil, fmt.Errorf("generated invalid channel update "+
"sig: %v", err)
}
// Finally, we'll update the fee schema for this edge on disk.
edge.Node.PubKey.Curve = nil
if err := d.cfg.Router.UpdateEdge(edge); err != nil {
return nil, err
}
} }
return signedAnns, nil return signedAnns, nil
@ -1202,3 +1168,51 @@ func (d *AuthenticatedGossiper) synchronizeWithNode(syncReq *syncRequest) error
// single batch to the target peer. // single batch to the target peer.
return d.cfg.SendToPeer(targetNode, announceMessages...) return d.cfg.SendToPeer(targetNode, announceMessages...)
} }
// updateChannel creates a new fully signed update for the channel,
// and updates the underlying graph with the new state.
func (d *AuthenticatedGossiper) updateChannel(
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelUpdate, error) {
edge.LastUpdate = time.Now()
chanUpdate := &lnwire.ChannelUpdate{
Signature: edge.Signature,
ChainHash: info.ChainHash,
ShortChannelID: lnwire.NewShortChanIDFromInt(edge.ChannelID),
Timestamp: uint32(edge.LastUpdate.Unix()),
Flags: edge.Flags,
TimeLockDelta: edge.TimeLockDelta,
HtlcMinimumMsat: edge.MinHTLC,
BaseFee: uint32(edge.FeeBaseMSat),
FeeRate: uint32(edge.FeeProportionalMillionths),
}
// With the update applied, we'll generate a new signature over
// a digest of the channel announcement itself.
sig, err := SignAnnouncement(d.cfg.AnnSigner, d.selfKey, chanUpdate)
if err != nil {
return nil, err
}
// Next, we'll set the new signature in place, and update the
// reference in the backing slice.
edge.Signature = sig
chanUpdate.Signature = sig
// To ensure that our signature is valid, we'll verify it
// ourself before committing it to the slice returned.
err = d.validateChannelUpdateAnn(d.selfKey, chanUpdate)
if err != nil {
return nil, fmt.Errorf("generated invalid channel update "+
"sig: %v", err)
}
// Finally, we'll write the new edge policy to disk.
edge.Node.PubKey.Curve = nil
if err := d.cfg.Router.UpdateEdge(edge); err != nil {
return nil, err
}
return chanUpdate, err
}

@ -39,6 +39,9 @@ type ChannelGraphSource interface {
// edge/channel might be used in construction of payment path. // edge/channel might be used in construction of payment path.
AddEdge(edge *channeldb.ChannelEdgeInfo) error AddEdge(edge *channeldb.ChannelEdgeInfo) error
// DeleteEdge is used to delete an edge from the router database.
DeleteEdge(edge *channeldb.ChannelEdgeInfo) error
// AddProof updates the channel edge info with proof which is needed to // AddProof updates the channel edge info with proof which is needed to
// properly announce the edge to the rest of the network. // properly announce the edge to the rest of the network.
AddProof(chanID lnwire.ShortChannelID, proof *channeldb.ChannelAuthProof) error AddProof(chanID lnwire.ShortChannelID, proof *channeldb.ChannelAuthProof) error
@ -1294,6 +1297,13 @@ func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo) error {
} }
} }
// DeleteEdge is used to delete an edge from the router database.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) DeleteEdge(edge *channeldb.ChannelEdgeInfo) error {
return r.cfg.Graph.DeleteChannelEdge(&edge.ChannelPoint)
}
// UpdateEdge is used to update edge information, without this message edge // UpdateEdge is used to update edge information, without this message edge
// considered as not fully constructed. // considered as not fully constructed.
// //

@ -279,6 +279,7 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
SendToPeer: s.SendToPeer, SendToPeer: s.SendToPeer,
ProofMatureDelta: 0, ProofMatureDelta: 0,
TrickleDelay: time.Millisecond * 300, TrickleDelay: time.Millisecond * 300,
RetransmitDelay: time.Minute * 30,
DB: chanDB, DB: chanDB,
AnnSigner: s.nodeSigner, AnnSigner: s.nodeSigner,
}, },