From 812c2f1ce6bbdc9b99bce5aa7478910893b428af Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 21 Aug 2017 23:40:02 -0700 Subject: [PATCH] discovery: add new PropagateFeeUpdate method This commit adds a new method to the AuthenticatedGossiper: PropagateFeeUpdate. This new method will allow callers to update the fee schedule advertised for a particular channel, or all currently active channels. With this method exposed, the AuthenticatedGossiper will now craft the new channel update messages, sign the new state, commit the new policy to the underlying graph, and finally add the message to the next announcement epoch batch. --- discovery/service.go | 242 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 214 insertions(+), 28 deletions(-) diff --git a/discovery/service.go b/discovery/service.go index d83d3e9e..d84b0d1f 100644 --- a/discovery/service.go +++ b/discovery/service.go @@ -2,6 +2,7 @@ package discovery import ( "bytes" + "fmt" "sync" "sync/atomic" "time" @@ -10,10 +11,11 @@ import ( "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing" "github.com/roasbeef/btcd/btcec" - "github.com/roasbeef/btcutil" + "github.com/roasbeef/btcd/wire" ) // networkMsg couples a routing related wire message with the peer that @@ -33,6 +35,17 @@ type syncRequest struct { node *btcec.PublicKey } +// feeUpdateRequest is a request that is sent to the server when a caller +// wishes to update the fees for a particular set of tchannels. New UpdateFee +// messages will be crafted to be sent out during the next broadcast epoch and +// the fee updates committed to the lower layer. +type feeUpdateRequest struct { + targetChans []wire.OutPoint + newSchema routing.FeeSchema + + errResp chan error +} + // Config defines the configuration for the service. ALL elements within the // configuration MUST be non-nil for the service to carry out its duties. type Config struct { @@ -69,27 +82,18 @@ type Config struct { // the last trickle tick. TrickleDelay time.Duration - // DB is a global boltdb instance which is needed to pass it in - // waiting proof storage to make waiting proofs persistent. + // DB is a global boltdb instance which is needed to pass it in waiting + // proof storage to make waiting proofs persistent. DB *channeldb.DB -} -// New creates a new AuthenticatedGossiper instance, initialized with the -// passed configuration parameters. -func New(cfg Config) (*AuthenticatedGossiper, error) { - storage, err := channeldb.NewWaitingProofStore(cfg.DB) - if err != nil { - return nil, err - } - - return &AuthenticatedGossiper{ - cfg: &cfg, - networkMsgs: make(chan *networkMsg), - quit: make(chan struct{}), - syncRequests: make(chan *syncRequest), - prematureAnnouncements: make(map[uint32][]*networkMsg), - waitingProofs: storage, - }, nil + // AnnSigner is an instance of the MessageSigner interface which will + // be used to manually sign any outgoing channel updates. The signer + // implementation should be backed by the public key of the backing + // Lightning node. + // + // TODO(roasbeef): extract ann crafting + sign from fundingMgr into + // here? + AnnSigner lnwallet.MessageSigner } // AuthenticatedGossiper is a subsystem which is responsible for receiving @@ -143,9 +147,36 @@ type AuthenticatedGossiper struct { // our PoV. syncRequests chan *syncRequest + // feeUpdates is a channel that requests to update the fee schedule of + // a set of channels is sent over. + feeUpdates chan *feeUpdateRequest + // bestHeight is the height of the block at the tip of the main chain // as we know it. bestHeight uint32 + + // selfKey is the identity public key of the backing Lighting node. + selfKey *btcec.PublicKey +} + +// New creates a new AuthenticatedGossiper instance, initialized with the +// passed configuration parameters. +func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) { + storage, err := channeldb.NewWaitingProofStore(cfg.DB) + if err != nil { + return nil, err + } + + return &AuthenticatedGossiper{ + selfKey: selfKey, + cfg: &cfg, + networkMsgs: make(chan *networkMsg), + quit: make(chan struct{}), + syncRequests: make(chan *syncRequest), + feeUpdates: make(chan *feeUpdateRequest), + prematureAnnouncements: make(map[uint32][]*networkMsg), + waitingProofs: storage, + }, nil } // SynchronizeNode sends a message to the service indicating it should @@ -162,6 +193,30 @@ func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) { } } +// PropagateFeeUpdate signals the AuthenticatedGossiper to update the fee +// schema for the specified channels. If no channels are specified, then the +// fee update will be applied to all outgoing channels from the source node. +// Fee updates are done in two stages: first, the AuthenticatedGossiper ensures +// the updated has been committed by dependant sub-systems, then it signs and +// broadcasts new updates to the network. +func (d *AuthenticatedGossiper) PropagateFeeUpdate(newSchema routing.FeeSchema, + chanPoints ...wire.OutPoint) error { + + errChan := make(chan error, 1) + feeUpdate := &feeUpdateRequest{ + targetChans: chanPoints, + newSchema: newSchema, + errResp: errChan, + } + + select { + case d.feeUpdates <- feeUpdate: + return <-errChan + case <-d.quit: + return fmt.Errorf("AuthenticatedGossiper shutting down") + } +} + // Start spawns network messages handler goroutine and registers on new block // notifications in order to properly handle the premature announcements. func (d *AuthenticatedGossiper) Start() error { @@ -281,6 +336,28 @@ func (d *AuthenticatedGossiper) networkHandler() { for { select { + // A new fee update has arrived. We'll commit it to the + // sub-systems below us, then craft, sign, and broadcast a new + // ChannelUpdate for the set of affected clients. + case feeUpdate := <-d.feeUpdates: + // First, we'll now create new fully signed updates for + // the affected channels and also update the underlying + // graph with the new state. + newChanUpdates, err := d.processFeeChanUpdate(feeUpdate) + if err != nil { + log.Errorf("Unable to craft fee updates: %v", err) + feeUpdate.errResp <- err + continue + } + + // Finally, with the updates committed, we'll now add + // them to the announcement batch to be flushed at the + // start of the next epoch. + announcementBatch = append(announcementBatch, + newChanUpdates...) + + feeUpdate.errResp <- nil + case announcement := <-d.networkMsgs: // Process the network announcement to determine if // this is either a new announcement from our PoV or an @@ -370,21 +447,22 @@ func (d *AuthenticatedGossiper) networkHandler() { // Iterate over our channels and construct the // announcements array. - err := d.cfg.Router.ForAllOutgoingChannels(func(p *channeldb.ChannelEdgePolicy) error { + err := d.cfg.Router.ForAllOutgoingChannels(func(_ *channeldb.ChannelEdgeInfo, + p *channeldb.ChannelEdgePolicy) error { + c := &lnwire.ChannelUpdate{ Signature: p.Signature, ShortChannelID: lnwire.NewShortChanIDFromInt(p.ChannelID), Timestamp: uint32(p.LastUpdate.Unix()), Flags: p.Flags, TimeLockDelta: p.TimeLockDelta, - HtlcMinimumMsat: uint64(p.MinHTLC), + HtlcMinimumMsat: p.MinHTLC, BaseFee: uint32(p.FeeBaseMSat), FeeRate: uint32(p.FeeProportionalMillionths), } selfChans = append(selfChans, c) return nil - }, - ) + }) if err != nil { log.Errorf("unable to retrieve outgoing channels: %v", err) continue @@ -425,6 +503,113 @@ func (d *AuthenticatedGossiper) networkHandler() { } } +// processFeeChanUpdate generates a new set of channel updates with the new fee +// schema applied for each specified channel identified by its channel point. +// In the case that no channel points are specified, then the fee update will +// be applied to all channels. Finally, the backing ChannelGraphSource is +// updated with the latest information reflecting the +// applied fee updates. +// +// TODO(roasbeef): generalize into generic for any channel update +func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest) ([]lnwire.Message, error) { + // First, we'll construct a set of all the channels that need to be + // updated. + chansToUpdate := make(map[wire.OutPoint]struct{}) + for _, chanPoint := range feeUpdate.targetChans { + chansToUpdate[chanPoint] = struct{}{} + } + + haveChanFilter := len(chansToUpdate) != 0 + + var chanUpdates []*lnwire.ChannelUpdate + chanEdges := make(map[lnwire.ShortChannelID]*channeldb.ChannelEdgePolicy) + + // Next, we'll loop over all the outgoing channels the router knows of. + // If we have a filter then we'll only collected those channels, + // otherwise we'll collect them all. + err := d.cfg.Router.ForAllOutgoingChannels(func(info *channeldb.ChannelEdgeInfo, + edge *channeldb.ChannelEdgePolicy) error { + + // If we have a channel filter, and this channel isn't a part + // of it, then we'll skip it. + if _, ok := chansToUpdate[info.ChannelPoint]; !ok && haveChanFilter { + return nil + } + + // Otherwise, add the channel update to our batch to be + // updated, as we'll be re-signing it shortly. + c := &lnwire.ChannelUpdate{ + Signature: edge.Signature, + ShortChannelID: lnwire.NewShortChanIDFromInt(edge.ChannelID), + Timestamp: uint32(edge.LastUpdate.Unix()), + Flags: edge.Flags, + TimeLockDelta: edge.TimeLockDelta, + HtlcMinimumMsat: edge.MinHTLC, + BaseFee: uint32(edge.FeeBaseMSat), + FeeRate: uint32(edge.FeeProportionalMillionths), + } + chanUpdates = append(chanUpdates, c) + + // We'll also add it to our edge map so we can find it easily + // later to update the state within the database. + chanEdges[c.ShortChannelID] = edge + return nil + }) + if err != nil { + return nil, err + } + + // With the set of channel updates we need to sign obtained, we'll not + // generate new signatures for each of them using applying the new fee + // schema before signing. + signedAnns := make([]lnwire.Message, len(chanUpdates)) + for i, chanUpdate := range chanUpdates { + edge := chanEdges[chanUpdate.ShortChannelID] + now := time.Now() + + // First, we'll apply the new few schema update to the channel + // update and also the backing database struct. + chanUpdate.BaseFee = uint32(feeUpdate.newSchema.BaseFee) + chanUpdate.FeeRate = feeUpdate.newSchema.FeeRate + chanUpdate.Timestamp = uint32(now.Unix()) + edge.FeeBaseMSat = feeUpdate.newSchema.BaseFee + edge.FeeProportionalMillionths = lnwire.MilliSatoshi( + feeUpdate.newSchema.FeeRate, + ) + edge.LastUpdate = now + + // With the update applied, we'll generate a new signature over + // a digest of the channel announcement itself. + sig, err := SignAnnouncement(d.cfg.AnnSigner, d.selfKey, + chanUpdate) + if err != nil { + return nil, err + } + + // Next, we'll set the new signature in place, and update the + // reference in the backing slice. + edge.Signature = sig + chanUpdate.Signature = sig + signedAnns[i] = chanUpdate + + // To ensure that our signature is valid, we'll verify it + // ourself before committing it to the slice returned. + err = d.validateChannelUpdateAnn(d.selfKey, chanUpdate) + if err != nil { + return nil, fmt.Errorf("generated invalid channel update "+ + "sig: %v", err) + } + + // Finally, we'll update the fee schema for this edge on disk. + edge.Node.PubKey.Curve = nil + if err := d.cfg.Router.UpdateEdge(edge); err != nil { + return nil, err + } + } + + return signedAnns, nil +} + // processNetworkAnnouncement processes a new network relate authenticated // channel or node announcement or announcements proofs. If the announcement // didn't affect the internal state due to either being out of date, invalid, @@ -432,6 +617,8 @@ func (d *AuthenticatedGossiper) networkHandler() { // be returned which should be broadcasted to the rest of the network. func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Message { isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool { + // TODO(roasbeef) make height delta 6 + // * or configurable return chanID.BlockHeight+delta > d.bestHeight } @@ -628,16 +815,15 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l return nil } - // TODO(roasbeef): should be msat here update := &channeldb.ChannelEdgePolicy{ Signature: msg.Signature, ChannelID: shortChanID, LastUpdate: time.Unix(int64(msg.Timestamp), 0), Flags: msg.Flags, TimeLockDelta: msg.TimeLockDelta, - MinHTLC: btcutil.Amount(msg.HtlcMinimumMsat), - FeeBaseMSat: btcutil.Amount(msg.BaseFee), - FeeProportionalMillionths: btcutil.Amount(msg.FeeRate), + MinHTLC: msg.HtlcMinimumMsat, + FeeBaseMSat: lnwire.MilliSatoshi(msg.BaseFee), + FeeProportionalMillionths: lnwire.MilliSatoshi(msg.FeeRate), } if err := d.cfg.Router.UpdateEdge(update); err != nil {