discovery: add new PropagateFeeUpdate method

This commit adds a new method to the AuthenticatedGossiper:
PropagateFeeUpdate. This new method will allow callers to update the
fee schedule advertised for a particular channel, or all currently
active channels. With this method exposed, the AuthenticatedGossiper
will now craft the new channel update messages, sign the new state,
commit the new policy to the underlying graph, and finally add the
message to the next announcement epoch batch.
This commit is contained in:
Olaoluwa Osuntokun 2017-08-21 23:40:02 -07:00
parent 8a51b1a0c6
commit 812c2f1ce6
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

@ -2,6 +2,7 @@ package discovery
import ( import (
"bytes" "bytes"
"fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -10,10 +11,11 @@ import (
"github.com/go-errors/errors" "github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing"
"github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcutil" "github.com/roasbeef/btcd/wire"
) )
// networkMsg couples a routing related wire message with the peer that // networkMsg couples a routing related wire message with the peer that
@ -33,6 +35,17 @@ type syncRequest struct {
node *btcec.PublicKey node *btcec.PublicKey
} }
// feeUpdateRequest is a request that is sent to the server when a caller
// wishes to update the fees for a particular set of tchannels. New UpdateFee
// messages will be crafted to be sent out during the next broadcast epoch and
// the fee updates committed to the lower layer.
type feeUpdateRequest struct {
targetChans []wire.OutPoint
newSchema routing.FeeSchema
errResp chan error
}
// Config defines the configuration for the service. ALL elements within the // Config defines the configuration for the service. ALL elements within the
// configuration MUST be non-nil for the service to carry out its duties. // configuration MUST be non-nil for the service to carry out its duties.
type Config struct { type Config struct {
@ -69,27 +82,18 @@ type Config struct {
// the last trickle tick. // the last trickle tick.
TrickleDelay time.Duration TrickleDelay time.Duration
// DB is a global boltdb instance which is needed to pass it in // DB is a global boltdb instance which is needed to pass it in waiting
// waiting proof storage to make waiting proofs persistent. // proof storage to make waiting proofs persistent.
DB *channeldb.DB DB *channeldb.DB
}
// New creates a new AuthenticatedGossiper instance, initialized with the // AnnSigner is an instance of the MessageSigner interface which will
// passed configuration parameters. // be used to manually sign any outgoing channel updates. The signer
func New(cfg Config) (*AuthenticatedGossiper, error) { // implementation should be backed by the public key of the backing
storage, err := channeldb.NewWaitingProofStore(cfg.DB) // Lightning node.
if err != nil { //
return nil, err // TODO(roasbeef): extract ann crafting + sign from fundingMgr into
} // here?
AnnSigner lnwallet.MessageSigner
return &AuthenticatedGossiper{
cfg: &cfg,
networkMsgs: make(chan *networkMsg),
quit: make(chan struct{}),
syncRequests: make(chan *syncRequest),
prematureAnnouncements: make(map[uint32][]*networkMsg),
waitingProofs: storage,
}, nil
} }
// AuthenticatedGossiper is a subsystem which is responsible for receiving // AuthenticatedGossiper is a subsystem which is responsible for receiving
@ -143,9 +147,36 @@ type AuthenticatedGossiper struct {
// our PoV. // our PoV.
syncRequests chan *syncRequest syncRequests chan *syncRequest
// feeUpdates is a channel that requests to update the fee schedule of
// a set of channels is sent over.
feeUpdates chan *feeUpdateRequest
// bestHeight is the height of the block at the tip of the main chain // bestHeight is the height of the block at the tip of the main chain
// as we know it. // as we know it.
bestHeight uint32 bestHeight uint32
// selfKey is the identity public key of the backing Lighting node.
selfKey *btcec.PublicKey
}
// New creates a new AuthenticatedGossiper instance, initialized with the
// passed configuration parameters.
func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) {
storage, err := channeldb.NewWaitingProofStore(cfg.DB)
if err != nil {
return nil, err
}
return &AuthenticatedGossiper{
selfKey: selfKey,
cfg: &cfg,
networkMsgs: make(chan *networkMsg),
quit: make(chan struct{}),
syncRequests: make(chan *syncRequest),
feeUpdates: make(chan *feeUpdateRequest),
prematureAnnouncements: make(map[uint32][]*networkMsg),
waitingProofs: storage,
}, nil
} }
// SynchronizeNode sends a message to the service indicating it should // SynchronizeNode sends a message to the service indicating it should
@ -162,6 +193,30 @@ func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) {
} }
} }
// PropagateFeeUpdate signals the AuthenticatedGossiper to update the fee
// schema for the specified channels. If no channels are specified, then the
// fee update will be applied to all outgoing channels from the source node.
// Fee updates are done in two stages: first, the AuthenticatedGossiper ensures
// the updated has been committed by dependant sub-systems, then it signs and
// broadcasts new updates to the network.
func (d *AuthenticatedGossiper) PropagateFeeUpdate(newSchema routing.FeeSchema,
chanPoints ...wire.OutPoint) error {
errChan := make(chan error, 1)
feeUpdate := &feeUpdateRequest{
targetChans: chanPoints,
newSchema: newSchema,
errResp: errChan,
}
select {
case d.feeUpdates <- feeUpdate:
return <-errChan
case <-d.quit:
return fmt.Errorf("AuthenticatedGossiper shutting down")
}
}
// Start spawns network messages handler goroutine and registers on new block // Start spawns network messages handler goroutine and registers on new block
// notifications in order to properly handle the premature announcements. // notifications in order to properly handle the premature announcements.
func (d *AuthenticatedGossiper) Start() error { func (d *AuthenticatedGossiper) Start() error {
@ -281,6 +336,28 @@ func (d *AuthenticatedGossiper) networkHandler() {
for { for {
select { select {
// A new fee update has arrived. We'll commit it to the
// sub-systems below us, then craft, sign, and broadcast a new
// ChannelUpdate for the set of affected clients.
case feeUpdate := <-d.feeUpdates:
// First, we'll now create new fully signed updates for
// the affected channels and also update the underlying
// graph with the new state.
newChanUpdates, err := d.processFeeChanUpdate(feeUpdate)
if err != nil {
log.Errorf("Unable to craft fee updates: %v", err)
feeUpdate.errResp <- err
continue
}
// Finally, with the updates committed, we'll now add
// them to the announcement batch to be flushed at the
// start of the next epoch.
announcementBatch = append(announcementBatch,
newChanUpdates...)
feeUpdate.errResp <- nil
case announcement := <-d.networkMsgs: case announcement := <-d.networkMsgs:
// Process the network announcement to determine if // Process the network announcement to determine if
// this is either a new announcement from our PoV or an // this is either a new announcement from our PoV or an
@ -370,21 +447,22 @@ func (d *AuthenticatedGossiper) networkHandler() {
// Iterate over our channels and construct the // Iterate over our channels and construct the
// announcements array. // announcements array.
err := d.cfg.Router.ForAllOutgoingChannels(func(p *channeldb.ChannelEdgePolicy) error { err := d.cfg.Router.ForAllOutgoingChannels(func(_ *channeldb.ChannelEdgeInfo,
p *channeldb.ChannelEdgePolicy) error {
c := &lnwire.ChannelUpdate{ c := &lnwire.ChannelUpdate{
Signature: p.Signature, Signature: p.Signature,
ShortChannelID: lnwire.NewShortChanIDFromInt(p.ChannelID), ShortChannelID: lnwire.NewShortChanIDFromInt(p.ChannelID),
Timestamp: uint32(p.LastUpdate.Unix()), Timestamp: uint32(p.LastUpdate.Unix()),
Flags: p.Flags, Flags: p.Flags,
TimeLockDelta: p.TimeLockDelta, TimeLockDelta: p.TimeLockDelta,
HtlcMinimumMsat: uint64(p.MinHTLC), HtlcMinimumMsat: p.MinHTLC,
BaseFee: uint32(p.FeeBaseMSat), BaseFee: uint32(p.FeeBaseMSat),
FeeRate: uint32(p.FeeProportionalMillionths), FeeRate: uint32(p.FeeProportionalMillionths),
} }
selfChans = append(selfChans, c) selfChans = append(selfChans, c)
return nil return nil
}, })
)
if err != nil { if err != nil {
log.Errorf("unable to retrieve outgoing channels: %v", err) log.Errorf("unable to retrieve outgoing channels: %v", err)
continue continue
@ -425,6 +503,113 @@ func (d *AuthenticatedGossiper) networkHandler() {
} }
} }
// processFeeChanUpdate generates a new set of channel updates with the new fee
// schema applied for each specified channel identified by its channel point.
// In the case that no channel points are specified, then the fee update will
// be applied to all channels. Finally, the backing ChannelGraphSource is
// updated with the latest information reflecting the
// applied fee updates.
//
// TODO(roasbeef): generalize into generic for any channel update
func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest) ([]lnwire.Message, error) {
// First, we'll construct a set of all the channels that need to be
// updated.
chansToUpdate := make(map[wire.OutPoint]struct{})
for _, chanPoint := range feeUpdate.targetChans {
chansToUpdate[chanPoint] = struct{}{}
}
haveChanFilter := len(chansToUpdate) != 0
var chanUpdates []*lnwire.ChannelUpdate
chanEdges := make(map[lnwire.ShortChannelID]*channeldb.ChannelEdgePolicy)
// Next, we'll loop over all the outgoing channels the router knows of.
// If we have a filter then we'll only collected those channels,
// otherwise we'll collect them all.
err := d.cfg.Router.ForAllOutgoingChannels(func(info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error {
// If we have a channel filter, and this channel isn't a part
// of it, then we'll skip it.
if _, ok := chansToUpdate[info.ChannelPoint]; !ok && haveChanFilter {
return nil
}
// Otherwise, add the channel update to our batch to be
// updated, as we'll be re-signing it shortly.
c := &lnwire.ChannelUpdate{
Signature: edge.Signature,
ShortChannelID: lnwire.NewShortChanIDFromInt(edge.ChannelID),
Timestamp: uint32(edge.LastUpdate.Unix()),
Flags: edge.Flags,
TimeLockDelta: edge.TimeLockDelta,
HtlcMinimumMsat: edge.MinHTLC,
BaseFee: uint32(edge.FeeBaseMSat),
FeeRate: uint32(edge.FeeProportionalMillionths),
}
chanUpdates = append(chanUpdates, c)
// We'll also add it to our edge map so we can find it easily
// later to update the state within the database.
chanEdges[c.ShortChannelID] = edge
return nil
})
if err != nil {
return nil, err
}
// With the set of channel updates we need to sign obtained, we'll not
// generate new signatures for each of them using applying the new fee
// schema before signing.
signedAnns := make([]lnwire.Message, len(chanUpdates))
for i, chanUpdate := range chanUpdates {
edge := chanEdges[chanUpdate.ShortChannelID]
now := time.Now()
// First, we'll apply the new few schema update to the channel
// update and also the backing database struct.
chanUpdate.BaseFee = uint32(feeUpdate.newSchema.BaseFee)
chanUpdate.FeeRate = feeUpdate.newSchema.FeeRate
chanUpdate.Timestamp = uint32(now.Unix())
edge.FeeBaseMSat = feeUpdate.newSchema.BaseFee
edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
feeUpdate.newSchema.FeeRate,
)
edge.LastUpdate = now
// With the update applied, we'll generate a new signature over
// a digest of the channel announcement itself.
sig, err := SignAnnouncement(d.cfg.AnnSigner, d.selfKey,
chanUpdate)
if err != nil {
return nil, err
}
// Next, we'll set the new signature in place, and update the
// reference in the backing slice.
edge.Signature = sig
chanUpdate.Signature = sig
signedAnns[i] = chanUpdate
// To ensure that our signature is valid, we'll verify it
// ourself before committing it to the slice returned.
err = d.validateChannelUpdateAnn(d.selfKey, chanUpdate)
if err != nil {
return nil, fmt.Errorf("generated invalid channel update "+
"sig: %v", err)
}
// Finally, we'll update the fee schema for this edge on disk.
edge.Node.PubKey.Curve = nil
if err := d.cfg.Router.UpdateEdge(edge); err != nil {
return nil, err
}
}
return signedAnns, nil
}
// processNetworkAnnouncement processes a new network relate authenticated // processNetworkAnnouncement processes a new network relate authenticated
// channel or node announcement or announcements proofs. If the announcement // channel or node announcement or announcements proofs. If the announcement
// didn't affect the internal state due to either being out of date, invalid, // didn't affect the internal state due to either being out of date, invalid,
@ -432,6 +617,8 @@ func (d *AuthenticatedGossiper) networkHandler() {
// be returned which should be broadcasted to the rest of the network. // be returned which should be broadcasted to the rest of the network.
func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Message { func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Message {
isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool { isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool {
// TODO(roasbeef) make height delta 6
// * or configurable
return chanID.BlockHeight+delta > d.bestHeight return chanID.BlockHeight+delta > d.bestHeight
} }
@ -628,16 +815,15 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
return nil return nil
} }
// TODO(roasbeef): should be msat here
update := &channeldb.ChannelEdgePolicy{ update := &channeldb.ChannelEdgePolicy{
Signature: msg.Signature, Signature: msg.Signature,
ChannelID: shortChanID, ChannelID: shortChanID,
LastUpdate: time.Unix(int64(msg.Timestamp), 0), LastUpdate: time.Unix(int64(msg.Timestamp), 0),
Flags: msg.Flags, Flags: msg.Flags,
TimeLockDelta: msg.TimeLockDelta, TimeLockDelta: msg.TimeLockDelta,
MinHTLC: btcutil.Amount(msg.HtlcMinimumMsat), MinHTLC: msg.HtlcMinimumMsat,
FeeBaseMSat: btcutil.Amount(msg.BaseFee), FeeBaseMSat: lnwire.MilliSatoshi(msg.BaseFee),
FeeProportionalMillionths: btcutil.Amount(msg.FeeRate), FeeProportionalMillionths: lnwire.MilliSatoshi(msg.FeeRate),
} }
if err := d.cfg.Router.UpdateEdge(update); err != nil { if err := d.cfg.Router.UpdateEdge(update); err != nil {