routing+discovery: extract local channel manager

The policy update logic that resided partly in the gossiper and partly
in the rpc server is extracted into its own object.

This prepares for additional validation logic to be added for policy
updates that would otherwise make the gossiper heavier.

It is also a small first step towards separation of our own channel data
from the rest of the graph.
Joost Jager 2019-09-19 11:02:46 +02:00
parent 4b2eb9cb81
commit c80feeb4b3
7 changed files with 323 additions and 157 deletions
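
Note: at a glance, policy updates now flow rpcserver -> localchans.Manager ->
{gossiper, htlcswitch}. A minimal sketch of the resulting wiring and call
path, based on the server.go and rpcserver.go hunks below (the surrounding
variables are illustrative stand-ins for the lnd server fields, not literal
code from this commit):

    // Wiring, mirroring what newServer does below.
    localChanMgr := &localchans.Manager{
        // Iterate all of our own channels known to the router.
        ForAllOutgoingChannels: chanRouter.ForAllOutgoingChannels,

        // Persist the re-signed updates and gossip them to the network.
        PropagateChanPolicyUpdate: authGossiper.PropagateChanPolicyUpdate,

        // Push the new policies into the active htlcswitch links.
        UpdateForwardingPolicies: htlcSwitch.UpdateForwardingPolicies,
    }

    // The rpc server then delegates policy updates to the manager instead
    // of talking to the gossiper and the switch directly.
    err := localChanMgr.UpdatePolicy(chanPolicy, targetChans...)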

discovery/gossiper.go
@@ -86,14 +86,12 @@ type networkMsg struct {
 }
 
 // chanPolicyUpdateRequest is a request that is sent to the server when a caller
-// wishes to update the channel policy (fees e.g.) for a particular set of
-// channels. New ChannelUpdate messages will be crafted to be sent out during
-// the next broadcast epoch and the fee updates committed to the lower layer.
+// wishes to update a particular set of channels. New ChannelUpdate messages
+// will be crafted to be sent out during the next broadcast epoch and the fee
+// updates committed to the lower layer.
 type chanPolicyUpdateRequest struct {
-    targetChans  []wire.OutPoint
-    newSchema    routing.ChannelPolicy
-    chanPolicies chan updatedChanPolicies
+    edgesToUpdate []EdgeWithInfo
+    errChan       chan error
 }
 
 // Config defines the configuration for the service. ALL elements within the
@@ -361,31 +359,36 @@ type updatedChanPolicies struct {
     err          error
 }
 
-// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to update the
-// channel forwarding policies for the specified channels. If no channels are
-// specified, then the update will be applied to all outgoing channels from the
-// source node. Policy updates are done in two stages: first, the
+// EdgeWithInfo contains the information that is required to update an edge.
+type EdgeWithInfo struct {
+    // Info describes the channel.
+    Info *channeldb.ChannelEdgeInfo
+
+    // Edge describes the policy in one direction of the channel.
+    Edge *channeldb.ChannelEdgePolicy
+}
+
+// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
+// specified edge updates. Updates are done in two stages: first, the
 // AuthenticatedGossiper ensures the update has been committed by dependent
-// sub-systems, then it signs and broadcasts new updates to the network. A
-// mapping between outpoints and updated channel policies is returned, which is
-// used to update the forwarding policies of the underlying links.
+// sub-systems, then it signs and broadcasts new updates to the network.
 func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
-    newSchema routing.ChannelPolicy, chanPoints ...wire.OutPoint) (
-    map[wire.OutPoint]*channeldb.ChannelEdgePolicy, error) {
+    edgesToUpdate []EdgeWithInfo) error {
 
-    chanPolicyChan := make(chan updatedChanPolicies, 1)
+    errChan := make(chan error, 1)
     policyUpdate := &chanPolicyUpdateRequest{
-        targetChans:  chanPoints,
-        newSchema:    newSchema,
-        chanPolicies: chanPolicyChan,
+        edgesToUpdate: edgesToUpdate,
+        errChan:       errChan,
     }
 
     select {
     case d.chanPolicyUpdates <- policyUpdate:
-        updatedPolicies := <-chanPolicyChan
-        return updatedPolicies.chanPolicies, updatedPolicies.err
+        err := <-errChan
+        return err
     case <-d.quit:
-        return nil, fmt.Errorf("AuthenticatedGossiper shutting down")
+        return fmt.Errorf("AuthenticatedGossiper shutting down")
     }
 }
@@ -922,14 +925,10 @@ func (d *AuthenticatedGossiper) networkHandler() {
             // First, we'll now create new fully signed updates for
             // the affected channels and also update the underlying
             // graph with the new state.
-            chanPolicies, newChanUpdates, err := d.processChanPolicyUpdate(
-                policyUpdate,
+            newChanUpdates, err := d.processChanPolicyUpdate(
+                policyUpdate.edgesToUpdate,
             )
-            update := updatedChanPolicies{
-                chanPolicies,
-                err,
-            }
-            policyUpdate.chanPolicies <- update
+            policyUpdate.errChan <- err
             if err != nil {
                 log.Errorf("Unable to craft policy updates: %v",
                     err)
@@ -1317,102 +1316,29 @@ func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
     return nil
 }
 
-// processChanPolicyUpdate generates a new set of channel updates with the new
-// channel policy applied for each specified channel identified by its channel
-// point. In the case that no channel points are specified, then the update
-// will be applied to all channels. Finally, the backing ChannelGraphSource is
-// updated with the latest information reflecting the applied updates.
-//
-// TODO(roasbeef): generalize into generic for any channel update
+// processChanPolicyUpdate generates a new set of channel updates for the
+// provided list of edges and updates the backing ChannelGraphSource.
 func (d *AuthenticatedGossiper) processChanPolicyUpdate(
-    policyUpdate *chanPolicyUpdateRequest) (
-    map[wire.OutPoint]*channeldb.ChannelEdgePolicy, []networkMsg, error) {
+    edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {
 
-    // First, we'll construct a set of all the channels that need to be
-    // updated.
-    chansToUpdate := make(map[wire.OutPoint]struct{})
-    for _, chanPoint := range policyUpdate.targetChans {
-        chansToUpdate[chanPoint] = struct{}{}
-    }
-
-    // Next, we'll create a mapping from outpoint to edge policy that will
-    // be used by each edge's underlying link to update its policy.
-    chanPolicies := make(map[wire.OutPoint]*channeldb.ChannelEdgePolicy)
-
-    haveChanFilter := len(chansToUpdate) != 0
-    if haveChanFilter {
-        log.Infof("Updating routing policies for chan_points=%v",
-            spew.Sdump(chansToUpdate))
-    } else {
-        log.Infof("Updating routing policies for all chans")
-    }
-
-    type edgeWithInfo struct {
-        info *channeldb.ChannelEdgeInfo
-        edge *channeldb.ChannelEdgePolicy
-    }
-    var edgesToUpdate []edgeWithInfo
-
-    // Next, we'll loop over all the outgoing channels the router knows of.
-    // If we have a filter then we'll only collected those channels,
-    // otherwise we'll collect them all.
-    err := d.cfg.Router.ForAllOutgoingChannels(func(
-        info *channeldb.ChannelEdgeInfo,
-        edge *channeldb.ChannelEdgePolicy) error {
-
-        // If we have a channel filter, and this channel isn't a part
-        // of it, then we'll skip it.
-        if _, ok := chansToUpdate[info.ChannelPoint]; !ok && haveChanFilter {
-            return nil
-        }
-
-        // Now that we know we should update this channel, we'll update
-        // its set of policies.
-        edge.FeeBaseMSat = policyUpdate.newSchema.BaseFee
-        edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
-            policyUpdate.newSchema.FeeRate,
-        )
-        edge.TimeLockDelta = uint16(policyUpdate.newSchema.TimeLockDelta)
-
-        // Max htlc is currently always set to the channel capacity.
-        edge.MessageFlags |= lnwire.ChanUpdateOptionMaxHtlc
-        edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)
-
-        edgesToUpdate = append(edgesToUpdate, edgeWithInfo{
-            info: info,
-            edge: edge,
-        })
-
-        return nil
-    })
-    if err != nil {
-        return nil, nil, err
-    }
-
-    // With the set of edges we need to update retrieved, we'll now re-sign
-    // them, and insert them into the database.
     var chanUpdates []networkMsg
     for _, edgeInfo := range edgesToUpdate {
         // Now that we've collected all the channels we need to update,
-        // we'll Re-sign and update the backing ChannelGraphSource, and
+        // we'll re-sign and update the backing ChannelGraphSource, and
         // retrieve our ChannelUpdate to broadcast.
         _, chanUpdate, err := d.updateChannel(
-            edgeInfo.info, edgeInfo.edge,
+            edgeInfo.Info, edgeInfo.Edge,
         )
         if err != nil {
-            return nil, nil, err
+            return nil, err
         }
 
-        // Since the update succeeded, add the edge to our policy
-        // mapping.
-        chanPolicies[edgeInfo.info.ChannelPoint] = edgeInfo.edge
-
         // We'll avoid broadcasting any updates for private channels to
         // avoid directly giving away their existence. Instead, we'll
         // send the update directly to the remote party.
-        if edgeInfo.info.AuthProof == nil {
+        if edgeInfo.Info.AuthProof == nil {
             remotePubKey := remotePubFromChanInfo(
-                edgeInfo.info, chanUpdate.ChannelFlags,
+                edgeInfo.Info, chanUpdate.ChannelFlags,
             )
             err := d.reliableSender.sendMessage(
                 chanUpdate, remotePubKey,
@@ -1435,7 +1361,7 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(
         })
     }
 
-    return chanPolicies, chanUpdates, nil
+    return chanUpdates, nil
 }
 
 // processRejectedEdge examines a rejected edge to see if we can extract any
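
Note: the gossiper's policy surface shrinks to a single edge-oriented entry
point. A caller-side sketch (hedged; info and edge would come from
ForAllOutgoingChannels, as both the updated test below and the new manager
do, and the variable names are illustrative):

    // Mutate a previously loaded edge, then hand it to the gossiper, which
    // commits it and broadcasts the re-signed ChannelUpdate.
    edge.TimeLockDelta = 144 // example value
    err := gossiper.PropagateChanPolicyUpdate([]discovery.EdgeWithInfo{{
        Info: info, // *channeldb.ChannelEdgeInfo describing the channel
        Edge: edge, // *channeldb.ChannelEdgePolicy for one direction
    }})
    if err != nil {
        // Either the commit failed or the gossiper is shutting down.
        log.Errorf("unable to propagate policy update: %v", err)
    }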

discovery/gossiper_test.go
@@ -3602,20 +3602,26 @@ out:
     // Now that all of our channels are loaded, we'll attempt to update the
     // policy of all of them.
     const newTimeLockDelta = 100
-    newPolicy := routing.ChannelPolicy{
-        TimeLockDelta: newTimeLockDelta,
-    }
-    newChanPolicies, err := ctx.gossiper.PropagateChanPolicyUpdate(newPolicy)
+    var edgesToUpdate []EdgeWithInfo
+    err = ctx.router.ForAllOutgoingChannels(func(
+        info *channeldb.ChannelEdgeInfo,
+        edge *channeldb.ChannelEdgePolicy) error {
+
+        edge.TimeLockDelta = uint16(newTimeLockDelta)
+        edgesToUpdate = append(edgesToUpdate, EdgeWithInfo{
+            Info: info,
+            Edge: edge,
+        })
+
+        return nil
+    })
     if err != nil {
-        t.Fatalf("unable to chan policies: %v", err)
+        t.Fatal(err)
     }
 
-    // Ensure that the updated channel policies are as expected.
-    for _, dbPolicy := range newChanPolicies {
-        if dbPolicy.TimeLockDelta != uint16(newPolicy.TimeLockDelta) {
-            t.Fatalf("wrong delta: expected %v, got %v",
-                newPolicy.TimeLockDelta, dbPolicy.TimeLockDelta)
-        }
-    }
+    err = ctx.gossiper.PropagateChanPolicyUpdate(edgesToUpdate)
+    if err != nil {
+        t.Fatalf("unable to update chan policies: %v", err)
+    }
 
     // Two channel updates should now be broadcast, with neither of them

htlcswitch/switch.go
@@ -431,7 +431,7 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, paymentID uint64,
 // forwarding policies for all links have been updated, or the switch shuts
 // down.
 func (s *Switch) UpdateForwardingPolicies(
-    chanPolicies map[wire.OutPoint]*channeldb.ChannelEdgePolicy) {
+    chanPolicies map[wire.OutPoint]ForwardingPolicy) {
 
     log.Tracef("Updating link policies: %v", newLogClosure(func() string {
         return spew.Sdump(chanPolicies)
@@ -440,7 +440,7 @@ func (s *Switch) UpdateForwardingPolicies(
     s.indexMtx.RLock()
 
     // Update each link in chanPolicies.
-    for targetLink := range chanPolicies {
+    for targetLink, policy := range chanPolicies {
         cid := lnwire.NewChanIDFromOutPoint(&targetLink)
 
         link, ok := s.linkIndex[cid]
@@ -450,28 +450,12 @@ func (s *Switch) UpdateForwardingPolicies(
             continue
         }
 
-        newPolicy := dbPolicyToFwdingPolicy(
-            chanPolicies[*link.ChannelPoint()],
-        )
-
-        link.UpdateForwardingPolicy(newPolicy)
+        link.UpdateForwardingPolicy(policy)
     }
 
     s.indexMtx.RUnlock()
 }
 
-// dbPolicyToFwdingPolicy is a helper function that converts a channeldb
-// ChannelEdgePolicy into a ForwardingPolicy struct for the purpose of updating
-// the forwarding policy of a link.
-func dbPolicyToFwdingPolicy(policy *channeldb.ChannelEdgePolicy) ForwardingPolicy {
-    return ForwardingPolicy{
-        BaseFee:       policy.FeeBaseMSat,
-        FeeRate:       policy.FeeProportionalMillionths,
-        TimeLockDelta: uint32(policy.TimeLockDelta),
-        MinHTLC:       policy.MinHTLC,
-        MaxHTLC:       policy.MaxHTLC,
-    }
-}
-
 // forward is used in order to find next channel link and apply htlc update.
 // Also this function is used by channel links itself in order to forward the
 // update after it has been included in the channel.
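
Note: with ForwardingPolicy values passed in directly, the switch no longer
interprets channeldb types; the conversion moves to the caller. The
equivalent field-for-field mapping, as performed by the new manager below
(sketch only):

    // channeldb.ChannelEdgePolicy -> htlcswitch.ForwardingPolicy, replacing
    // the deleted dbPolicyToFwdingPolicy helper.
    policy := htlcswitch.ForwardingPolicy{
        BaseFee:       edge.FeeBaseMSat,
        FeeRate:       edge.FeeProportionalMillionths,
        TimeLockDelta: uint32(edge.TimeLockDelta),
        MinHTLC:       edge.MinHTLC,
        MaxHTLC:       edge.MaxHTLC,
    }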

routing/localchans/manager.go
@@ -0,0 +1,145 @@
+package localchans
+
+import (
+    "fmt"
+    "sync"
+
+    "github.com/btcsuite/btcd/wire"
+    "github.com/btcsuite/btcutil"
+    "github.com/lightningnetwork/lnd/channeldb"
+    "github.com/lightningnetwork/lnd/discovery"
+    "github.com/lightningnetwork/lnd/htlcswitch"
+    "github.com/lightningnetwork/lnd/lnwire"
+    "github.com/lightningnetwork/lnd/routing"
+)
+
+// Manager manages the node's local channels. The only operation that is
+// currently implemented is updating forwarding policies.
+type Manager struct {
+    // UpdateForwardingPolicies is used by the manager to update active
+    // links with a new policy.
+    UpdateForwardingPolicies func(
+        chanPolicies map[wire.OutPoint]htlcswitch.ForwardingPolicy)
+
+    // PropagateChanPolicyUpdate is called to persist a new policy to disk
+    // and broadcast it to the network.
+    PropagateChanPolicyUpdate func(
+        edgesToUpdate []discovery.EdgeWithInfo) error
+
+    // ForAllOutgoingChannels is required to iterate over all our local
+    // channels.
+    ForAllOutgoingChannels func(cb func(*channeldb.ChannelEdgeInfo,
+        *channeldb.ChannelEdgePolicy) error) error
+
+    // policyUpdateLock ensures that the database and the link do not fall
+    // out of sync if there are concurrent fee update calls. Without it,
+    // there is a chance that policy A updates the database, then policy B
+    // updates the database, then policy B updates the link, then policy A
+    // updates the link.
+    policyUpdateLock sync.Mutex
+}
+
+// UpdatePolicy updates the policy for the specified channels on disk and in
+// the active links.
+func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
+    chanPoints ...wire.OutPoint) error {
+
+    r.policyUpdateLock.Lock()
+    defer r.policyUpdateLock.Unlock()
+
+    // First, we'll construct a set of all the channels that need to be
+    // updated.
+    chansToUpdate := make(map[wire.OutPoint]struct{})
+    for _, chanPoint := range chanPoints {
+        chansToUpdate[chanPoint] = struct{}{}
+    }
+
+    haveChanFilter := len(chansToUpdate) != 0
+
+    var edgesToUpdate []discovery.EdgeWithInfo
+    policiesToUpdate := make(map[wire.OutPoint]htlcswitch.ForwardingPolicy)
+
+    // Next, we'll loop over all the outgoing channels the router knows of.
+    // If we have a filter then we'll only collect those channels,
+    // otherwise we'll collect them all.
+    err := r.ForAllOutgoingChannels(func(
+        info *channeldb.ChannelEdgeInfo,
+        edge *channeldb.ChannelEdgePolicy) error {
+
+        // If we have a channel filter, and this channel isn't a part
+        // of it, then we'll skip it.
+        _, ok := chansToUpdate[info.ChannelPoint]
+        if !ok && haveChanFilter {
+            return nil
+        }
+
+        // Apply the new policy to the edge.
+        err := r.updateEdge(info.Capacity, edge, newSchema)
+        if err != nil {
+            return err
+        }
+
+        // Add updated edge to list of edges to send to gossiper.
+        edgesToUpdate = append(edgesToUpdate, discovery.EdgeWithInfo{
+            Info: info,
+            Edge: edge,
+        })
+
+        // Add updated policy to list of policies to send to switch.
+        policiesToUpdate[info.ChannelPoint] = htlcswitch.ForwardingPolicy{
+            BaseFee:       edge.FeeBaseMSat,
+            FeeRate:       edge.FeeProportionalMillionths,
+            TimeLockDelta: uint32(edge.TimeLockDelta),
+            MinHTLC:       edge.MinHTLC,
+            MaxHTLC:       edge.MaxHTLC,
+        }
+
+        return nil
+    })
+    if err != nil {
+        return err
+    }
+
+    // Commit the policy updates to disk and broadcast to the network. We
+    // validated the new policy above, so we expect no validation errors.
+    // If a validation error were to occur because of a bug, the link
+    // policy would be desynchronized. It is currently not possible to
+    // atomically commit multiple edge updates.
+    err = r.PropagateChanPolicyUpdate(edgesToUpdate)
+    if err != nil {
+        return err
+    }
+
+    // Update active links.
+    r.UpdateForwardingPolicies(policiesToUpdate)
+
+    return nil
+}
+
+// updateEdge updates the given edge with the new schema.
+func (r *Manager) updateEdge(capacity btcutil.Amount,
+    edge *channeldb.ChannelEdgePolicy,
+    newSchema routing.ChannelPolicy) error {
+
+    // Update forwarding fee scheme and required time lock delta.
+    edge.FeeBaseMSat = newSchema.BaseFee
+    edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
+        newSchema.FeeRate,
+    )
+    edge.TimeLockDelta = uint16(newSchema.TimeLockDelta)
+
+    // Max htlc is currently always set to the channel capacity.
+    edge.MessageFlags |= lnwire.ChanUpdateOptionMaxHtlc
+    edge.MaxHTLC = lnwire.NewMSatFromSatoshis(capacity)
+
+    // Validate htlc amount constraints.
+    if edge.MinHTLC > edge.MaxHTLC {
+        return fmt.Errorf("min_htlc %v greater than max_htlc %v",
+            edge.MinHTLC, edge.MaxHTLC)
+    }
+
+    // Clear signature to help prevent usage of the previous signature.
+    edge.SetSigBytes(nil)
+
+    return nil
+}

routing/localchans/manager_test.go
@@ -0,0 +1,111 @@
+package localchans
+
+import (
+    "testing"
+
+    "github.com/btcsuite/btcd/chaincfg/chainhash"
+    "github.com/btcsuite/btcd/wire"
+    "github.com/btcsuite/btcutil"
+    "github.com/lightningnetwork/lnd/channeldb"
+    "github.com/lightningnetwork/lnd/discovery"
+    "github.com/lightningnetwork/lnd/htlcswitch"
+    "github.com/lightningnetwork/lnd/lnwire"
+    "github.com/lightningnetwork/lnd/routing"
+)
+
+// TestManager tests that the local channel manager properly propagates fee
+// updates to gossiper and links.
+func TestManager(t *testing.T) {
+    chanPoint := wire.OutPoint{Hash: chainhash.Hash{1}, Index: 2}
+    chanCap := btcutil.Amount(1000)
+
+    newPolicy := routing.ChannelPolicy{
+        FeeSchema: routing.FeeSchema{
+            BaseFee: 100,
+            FeeRate: 200,
+        },
+        TimeLockDelta: 80,
+    }
+
+    updateForwardingPolicies := func(
+        chanPolicies map[wire.OutPoint]htlcswitch.ForwardingPolicy) {
+
+        if len(chanPolicies) != 1 {
+            t.Fatal("unexpected number of policies to apply")
+        }
+
+        policy := chanPolicies[chanPoint]
+        if policy.TimeLockDelta != newPolicy.TimeLockDelta {
+            t.Fatal("unexpected time lock delta")
+        }
+        if policy.BaseFee != newPolicy.BaseFee {
+            t.Fatal("unexpected base fee")
+        }
+        if uint32(policy.FeeRate) != newPolicy.FeeRate {
+            t.Fatal("unexpected fee rate")
+        }
+        if policy.MaxHTLC != lnwire.NewMSatFromSatoshis(chanCap) {
+            t.Fatal("unexpected max htlc")
+        }
+    }
+
+    propagateChanPolicyUpdate := func(
+        edgesToUpdate []discovery.EdgeWithInfo) error {
+
+        if len(edgesToUpdate) != 1 {
+            t.Fatal("unexpected number of edges to update")
+        }
+
+        policy := edgesToUpdate[0].Edge
+        if !policy.MessageFlags.HasMaxHtlc() {
+            t.Fatal("expected max htlc flag")
+        }
+        if policy.TimeLockDelta != uint16(newPolicy.TimeLockDelta) {
+            t.Fatal("unexpected time lock delta")
+        }
+        if policy.FeeBaseMSat != newPolicy.BaseFee {
+            t.Fatal("unexpected base fee")
+        }
+        if uint32(policy.FeeProportionalMillionths) != newPolicy.FeeRate {
+            t.Fatal("unexpected fee rate")
+        }
+        if policy.MaxHTLC != lnwire.NewMSatFromSatoshis(chanCap) {
+            t.Fatal("unexpected max htlc")
+        }
+        return nil
+    }
+
+    forAllOutgoingChannels := func(cb func(*channeldb.ChannelEdgeInfo,
+        *channeldb.ChannelEdgePolicy) error) error {
+
+        return cb(
+            &channeldb.ChannelEdgeInfo{
+                Capacity:     chanCap,
+                ChannelPoint: chanPoint,
+            },
+            &channeldb.ChannelEdgePolicy{},
+        )
+    }
+
+    manager := Manager{
+        UpdateForwardingPolicies:  updateForwardingPolicies,
+        PropagateChanPolicyUpdate: propagateChanPolicyUpdate,
+        ForAllOutgoingChannels:    forAllOutgoingChannels,
+    }
+
+    // Test updating a specific channel.
+    err := manager.UpdatePolicy(newPolicy, chanPoint)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    // Test updating all channels, which comes down to the same as testing
+    // a specific channel because there is only one channel.
+    err = manager.UpdatePolicy(newPolicy)
+    if err != nil {
+        t.Fatal(err)
+    }
+}

rpcserver.go
@@ -4545,12 +4545,6 @@ func (r *rpcServer) FeeReport(ctx context.Context,
 // 0.000001, or 0.0001%.
 const minFeeRate = 1e-6
 
-// policyUpdateLock ensures that the database and the link do not fall out of
-// sync if there are concurrent fee update calls. Without it, there is a chance
-// that policy A updates the database, then policy B updates the database, then
-// policy B updates the link, then policy A updates the link.
-var policyUpdateLock sync.Mutex
-
 // UpdateChannelPolicy allows the caller to update the channel forwarding policy
 // for all channels globally, or a particular channel.
 func (r *rpcServer) UpdateChannelPolicy(ctx context.Context,
@@ -4615,22 +4609,13 @@ func (r *rpcServer) UpdateChannelPolicy(ctx context.Context,
         req.BaseFeeMsat, req.FeeRate, feeRateFixed, req.TimeLockDelta,
         spew.Sdump(targetChans))
 
-    // With the scope resolved, we'll now send this to the
-    // AuthenticatedGossiper so it can propagate the new policy for our
-    // target channel(s).
-    policyUpdateLock.Lock()
-    defer policyUpdateLock.Unlock()
-
-    chanPolicies, err := r.server.authGossiper.PropagateChanPolicyUpdate(
-        chanPolicy, targetChans...,
-    )
+    // With the scope resolved, we'll now send this to the local channel
+    // manager so it can propagate the new policy for our target channel(s).
+    err := r.server.localChanMgr.UpdatePolicy(chanPolicy, targetChans...)
     if err != nil {
         return nil, err
     }
 
-    // Finally, we'll apply the set of channel policies to the target
-    // channels' links.
-    r.server.htlcSwitch.UpdateForwardingPolicies(chanPolicies)
-
     return &lnrpc.PolicyUpdateResponse{}, nil
 }

server.go
@@ -48,6 +48,7 @@ import (
     "github.com/lightningnetwork/lnd/peernotifier"
     "github.com/lightningnetwork/lnd/pool"
     "github.com/lightningnetwork/lnd/routing"
+    "github.com/lightningnetwork/lnd/routing/localchans"
     "github.com/lightningnetwork/lnd/routing/route"
     "github.com/lightningnetwork/lnd/sweep"
    "github.com/lightningnetwork/lnd/ticker"
@@ -203,6 +204,8 @@ type server struct {
     authGossiper *discovery.AuthenticatedGossiper
 
+    localChanMgr *localchans.Manager
+
     utxoNursery *utxoNursery
 
     sweeper *sweep.UtxoSweeper
@@ -735,6 +738,12 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
         s.identityPriv.PubKey(),
     )
 
+    s.localChanMgr = &localchans.Manager{
+        ForAllOutgoingChannels:    s.chanRouter.ForAllOutgoingChannels,
+        PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate,
+        UpdateForwardingPolicies:  s.htlcSwitch.UpdateForwardingPolicies,
+    }
+
     utxnStore, err := newNurseryStore(activeNetParams.GenesisHash, chanDB)
     if err != nil {
         srvrLog.Errorf("unable to create nursery store: %v", err)