diff --git a/discovery/service.go b/discovery/service.go index d83d3e9e..d84b0d1f 100644 --- a/discovery/service.go +++ b/discovery/service.go @@ -2,6 +2,7 @@ package discovery import ( "bytes" + "fmt" "sync" "sync/atomic" "time" @@ -10,10 +11,11 @@ import ( "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing" "github.com/roasbeef/btcd/btcec" - "github.com/roasbeef/btcutil" + "github.com/roasbeef/btcd/wire" ) // networkMsg couples a routing related wire message with the peer that @@ -33,6 +35,17 @@ type syncRequest struct { node *btcec.PublicKey } +// feeUpdateRequest is a request that is sent to the server when a caller +// wishes to update the fees for a particular set of tchannels. New UpdateFee +// messages will be crafted to be sent out during the next broadcast epoch and +// the fee updates committed to the lower layer. +type feeUpdateRequest struct { + targetChans []wire.OutPoint + newSchema routing.FeeSchema + + errResp chan error +} + // Config defines the configuration for the service. ALL elements within the // configuration MUST be non-nil for the service to carry out its duties. type Config struct { @@ -69,27 +82,18 @@ type Config struct { // the last trickle tick. TrickleDelay time.Duration - // DB is a global boltdb instance which is needed to pass it in - // waiting proof storage to make waiting proofs persistent. + // DB is a global boltdb instance which is needed to pass it in waiting + // proof storage to make waiting proofs persistent. DB *channeldb.DB -} -// New creates a new AuthenticatedGossiper instance, initialized with the -// passed configuration parameters. -func New(cfg Config) (*AuthenticatedGossiper, error) { - storage, err := channeldb.NewWaitingProofStore(cfg.DB) - if err != nil { - return nil, err - } - - return &AuthenticatedGossiper{ - cfg: &cfg, - networkMsgs: make(chan *networkMsg), - quit: make(chan struct{}), - syncRequests: make(chan *syncRequest), - prematureAnnouncements: make(map[uint32][]*networkMsg), - waitingProofs: storage, - }, nil + // AnnSigner is an instance of the MessageSigner interface which will + // be used to manually sign any outgoing channel updates. The signer + // implementation should be backed by the public key of the backing + // Lightning node. + // + // TODO(roasbeef): extract ann crafting + sign from fundingMgr into + // here? + AnnSigner lnwallet.MessageSigner } // AuthenticatedGossiper is a subsystem which is responsible for receiving @@ -143,9 +147,36 @@ type AuthenticatedGossiper struct { // our PoV. syncRequests chan *syncRequest + // feeUpdates is a channel that requests to update the fee schedule of + // a set of channels is sent over. + feeUpdates chan *feeUpdateRequest + // bestHeight is the height of the block at the tip of the main chain // as we know it. bestHeight uint32 + + // selfKey is the identity public key of the backing Lighting node. + selfKey *btcec.PublicKey +} + +// New creates a new AuthenticatedGossiper instance, initialized with the +// passed configuration parameters. +func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) { + storage, err := channeldb.NewWaitingProofStore(cfg.DB) + if err != nil { + return nil, err + } + + return &AuthenticatedGossiper{ + selfKey: selfKey, + cfg: &cfg, + networkMsgs: make(chan *networkMsg), + quit: make(chan struct{}), + syncRequests: make(chan *syncRequest), + feeUpdates: make(chan *feeUpdateRequest), + prematureAnnouncements: make(map[uint32][]*networkMsg), + waitingProofs: storage, + }, nil } // SynchronizeNode sends a message to the service indicating it should @@ -162,6 +193,30 @@ func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) { } } +// PropagateFeeUpdate signals the AuthenticatedGossiper to update the fee +// schema for the specified channels. If no channels are specified, then the +// fee update will be applied to all outgoing channels from the source node. +// Fee updates are done in two stages: first, the AuthenticatedGossiper ensures +// the updated has been committed by dependant sub-systems, then it signs and +// broadcasts new updates to the network. +func (d *AuthenticatedGossiper) PropagateFeeUpdate(newSchema routing.FeeSchema, + chanPoints ...wire.OutPoint) error { + + errChan := make(chan error, 1) + feeUpdate := &feeUpdateRequest{ + targetChans: chanPoints, + newSchema: newSchema, + errResp: errChan, + } + + select { + case d.feeUpdates <- feeUpdate: + return <-errChan + case <-d.quit: + return fmt.Errorf("AuthenticatedGossiper shutting down") + } +} + // Start spawns network messages handler goroutine and registers on new block // notifications in order to properly handle the premature announcements. func (d *AuthenticatedGossiper) Start() error { @@ -281,6 +336,28 @@ func (d *AuthenticatedGossiper) networkHandler() { for { select { + // A new fee update has arrived. We'll commit it to the + // sub-systems below us, then craft, sign, and broadcast a new + // ChannelUpdate for the set of affected clients. + case feeUpdate := <-d.feeUpdates: + // First, we'll now create new fully signed updates for + // the affected channels and also update the underlying + // graph with the new state. + newChanUpdates, err := d.processFeeChanUpdate(feeUpdate) + if err != nil { + log.Errorf("Unable to craft fee updates: %v", err) + feeUpdate.errResp <- err + continue + } + + // Finally, with the updates committed, we'll now add + // them to the announcement batch to be flushed at the + // start of the next epoch. + announcementBatch = append(announcementBatch, + newChanUpdates...) + + feeUpdate.errResp <- nil + case announcement := <-d.networkMsgs: // Process the network announcement to determine if // this is either a new announcement from our PoV or an @@ -370,21 +447,22 @@ func (d *AuthenticatedGossiper) networkHandler() { // Iterate over our channels and construct the // announcements array. - err := d.cfg.Router.ForAllOutgoingChannels(func(p *channeldb.ChannelEdgePolicy) error { + err := d.cfg.Router.ForAllOutgoingChannels(func(_ *channeldb.ChannelEdgeInfo, + p *channeldb.ChannelEdgePolicy) error { + c := &lnwire.ChannelUpdate{ Signature: p.Signature, ShortChannelID: lnwire.NewShortChanIDFromInt(p.ChannelID), Timestamp: uint32(p.LastUpdate.Unix()), Flags: p.Flags, TimeLockDelta: p.TimeLockDelta, - HtlcMinimumMsat: uint64(p.MinHTLC), + HtlcMinimumMsat: p.MinHTLC, BaseFee: uint32(p.FeeBaseMSat), FeeRate: uint32(p.FeeProportionalMillionths), } selfChans = append(selfChans, c) return nil - }, - ) + }) if err != nil { log.Errorf("unable to retrieve outgoing channels: %v", err) continue @@ -425,6 +503,113 @@ func (d *AuthenticatedGossiper) networkHandler() { } } +// processFeeChanUpdate generates a new set of channel updates with the new fee +// schema applied for each specified channel identified by its channel point. +// In the case that no channel points are specified, then the fee update will +// be applied to all channels. Finally, the backing ChannelGraphSource is +// updated with the latest information reflecting the +// applied fee updates. +// +// TODO(roasbeef): generalize into generic for any channel update +func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest) ([]lnwire.Message, error) { + // First, we'll construct a set of all the channels that need to be + // updated. + chansToUpdate := make(map[wire.OutPoint]struct{}) + for _, chanPoint := range feeUpdate.targetChans { + chansToUpdate[chanPoint] = struct{}{} + } + + haveChanFilter := len(chansToUpdate) != 0 + + var chanUpdates []*lnwire.ChannelUpdate + chanEdges := make(map[lnwire.ShortChannelID]*channeldb.ChannelEdgePolicy) + + // Next, we'll loop over all the outgoing channels the router knows of. + // If we have a filter then we'll only collected those channels, + // otherwise we'll collect them all. + err := d.cfg.Router.ForAllOutgoingChannels(func(info *channeldb.ChannelEdgeInfo, + edge *channeldb.ChannelEdgePolicy) error { + + // If we have a channel filter, and this channel isn't a part + // of it, then we'll skip it. + if _, ok := chansToUpdate[info.ChannelPoint]; !ok && haveChanFilter { + return nil + } + + // Otherwise, add the channel update to our batch to be + // updated, as we'll be re-signing it shortly. + c := &lnwire.ChannelUpdate{ + Signature: edge.Signature, + ShortChannelID: lnwire.NewShortChanIDFromInt(edge.ChannelID), + Timestamp: uint32(edge.LastUpdate.Unix()), + Flags: edge.Flags, + TimeLockDelta: edge.TimeLockDelta, + HtlcMinimumMsat: edge.MinHTLC, + BaseFee: uint32(edge.FeeBaseMSat), + FeeRate: uint32(edge.FeeProportionalMillionths), + } + chanUpdates = append(chanUpdates, c) + + // We'll also add it to our edge map so we can find it easily + // later to update the state within the database. + chanEdges[c.ShortChannelID] = edge + return nil + }) + if err != nil { + return nil, err + } + + // With the set of channel updates we need to sign obtained, we'll not + // generate new signatures for each of them using applying the new fee + // schema before signing. + signedAnns := make([]lnwire.Message, len(chanUpdates)) + for i, chanUpdate := range chanUpdates { + edge := chanEdges[chanUpdate.ShortChannelID] + now := time.Now() + + // First, we'll apply the new few schema update to the channel + // update and also the backing database struct. + chanUpdate.BaseFee = uint32(feeUpdate.newSchema.BaseFee) + chanUpdate.FeeRate = feeUpdate.newSchema.FeeRate + chanUpdate.Timestamp = uint32(now.Unix()) + edge.FeeBaseMSat = feeUpdate.newSchema.BaseFee + edge.FeeProportionalMillionths = lnwire.MilliSatoshi( + feeUpdate.newSchema.FeeRate, + ) + edge.LastUpdate = now + + // With the update applied, we'll generate a new signature over + // a digest of the channel announcement itself. + sig, err := SignAnnouncement(d.cfg.AnnSigner, d.selfKey, + chanUpdate) + if err != nil { + return nil, err + } + + // Next, we'll set the new signature in place, and update the + // reference in the backing slice. + edge.Signature = sig + chanUpdate.Signature = sig + signedAnns[i] = chanUpdate + + // To ensure that our signature is valid, we'll verify it + // ourself before committing it to the slice returned. + err = d.validateChannelUpdateAnn(d.selfKey, chanUpdate) + if err != nil { + return nil, fmt.Errorf("generated invalid channel update "+ + "sig: %v", err) + } + + // Finally, we'll update the fee schema for this edge on disk. + edge.Node.PubKey.Curve = nil + if err := d.cfg.Router.UpdateEdge(edge); err != nil { + return nil, err + } + } + + return signedAnns, nil +} + // processNetworkAnnouncement processes a new network relate authenticated // channel or node announcement or announcements proofs. If the announcement // didn't affect the internal state due to either being out of date, invalid, @@ -432,6 +617,8 @@ func (d *AuthenticatedGossiper) networkHandler() { // be returned which should be broadcasted to the rest of the network. func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Message { isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool { + // TODO(roasbeef) make height delta 6 + // * or configurable return chanID.BlockHeight+delta > d.bestHeight } @@ -628,16 +815,15 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l return nil } - // TODO(roasbeef): should be msat here update := &channeldb.ChannelEdgePolicy{ Signature: msg.Signature, ChannelID: shortChanID, LastUpdate: time.Unix(int64(msg.Timestamp), 0), Flags: msg.Flags, TimeLockDelta: msg.TimeLockDelta, - MinHTLC: btcutil.Amount(msg.HtlcMinimumMsat), - FeeBaseMSat: btcutil.Amount(msg.BaseFee), - FeeProportionalMillionths: btcutil.Amount(msg.FeeRate), + MinHTLC: msg.HtlcMinimumMsat, + FeeBaseMSat: lnwire.MilliSatoshi(msg.BaseFee), + FeeProportionalMillionths: lnwire.MilliSatoshi(msg.FeeRate), } if err := d.cfg.Router.UpdateEdge(update); err != nil {