lnd.xprv/autopilot/agent.go

696 lines
22 KiB
Go
Raw Normal View History

2017-08-11 07:14:41 +03:00
package autopilot
import (
"net"
2017-08-11 07:14:41 +03:00
"sync"
"sync/atomic"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcutil"
2018-07-31 10:17:17 +03:00
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/lnwire"
2017-08-11 07:14:41 +03:00
)
2018-01-30 03:07:26 +03:00
// Config couples all the items that an autopilot agent needs to function.
2017-08-11 07:14:41 +03:00
// All items within the struct MUST be populated for the Agent to be able to
// carry out its duties.
type Config struct {
// Self is the identity public key of the Lightning Network node that
// is being driven by the agent. This is used to ensure that we don't
// accidentally attempt to open a channel with ourselves.
Self *btcec.PublicKey
// Heuristic is an attachment heuristic which will govern to whom we
// open channels to, and also what those channels look like in terms of
// desired capacity. The Heuristic will take into account the current
// state of the graph, our set of open channels, and the amount of
// available funds when determining how channels are to be opened.
// Additionally, a heuristic make also factor in extra-graph
// information in order to make more pertinent recommendations.
Heuristic AttachmentHeuristic
// ChanController is an interface that is able to directly manage the
// creation, closing and update of channels within the network.
ChanController ChannelController
// ConnectToPeer attempts to connect to the peer using one of its
// advertised addresses. The boolean returned signals whether the peer
// was already connected.
ConnectToPeer func(*btcec.PublicKey, []net.Addr) (bool, error)
// DisconnectPeer attempts to disconnect the peer with the given public
// key.
DisconnectPeer func(*btcec.PublicKey) error
2017-08-11 07:14:41 +03:00
// WalletBalance is a function closure that should return the current
2018-10-09 19:28:34 +03:00
// available balance of the backing wallet.
2017-08-11 07:14:41 +03:00
WalletBalance func() (btcutil.Amount, error)
// Graph is an abstract channel graph that the Heuristic and the Agent
// will use to make decisions w.r.t channel allocation and placement
// within the graph.
Graph ChannelGraph
// MaxPendingOpens is the maximum number of pending channel
// establishment goroutines that can be lingering. We cap this value in
2018-10-09 19:28:34 +03:00
// order to control the level of parallelism caused by the autopilot
// agent.
MaxPendingOpens uint16
2017-08-11 07:14:41 +03:00
// TODO(roasbeef): add additional signals from fee rates and revenue of
// currently opened channels
}
// channelState is a type that represents the set of active channels of the
2018-10-09 19:28:34 +03:00
// backing LN node that the Agent should be aware of. This type contains a few
2017-08-11 07:14:41 +03:00
// helper utility methods.
type channelState map[lnwire.ShortChannelID]Channel
// Channels returns a slice of all the active channels.
2017-08-11 07:14:41 +03:00
func (c channelState) Channels() []Channel {
chans := make([]Channel, 0, len(c))
for _, channel := range c {
chans = append(chans, channel)
}
return chans
}
// ConnectedNodes returns the set of nodes we currently have a channel with.
// This information is needed as we want to avoid making repeated channels with
// any node.
func (c channelState) ConnectedNodes() map[NodeID]struct{} {
nodes := make(map[NodeID]struct{})
for _, channels := range c {
nodes[channels.Node] = struct{}{}
}
// TODO(roasbeef): add outgoing, nodes, allow incoming and outgoing to
// per node
// * only add node is chan as funding amt set
2017-08-11 07:14:41 +03:00
return nodes
}
// Agent implements a closed-loop control system which seeks to autonomously
// optimize the allocation of satoshis within channels throughput the network's
// channel graph. An agent is configurable by swapping out different
// AttachmentHeuristic strategies. The agent uses external signals such as the
// wallet balance changing, or new channels being opened/closed for the local
// node as an indicator to re-examine its internal state, and the amount of
// available funds in order to make updated decisions w.r.t the channel graph.
// The Agent will automatically open, close, and splice in/out channel as
// necessary for it to step closer to its optimal state.
//
// TODO(roasbeef): prob re-word
type Agent struct {
// Only to be used atomically.
started uint32
stopped uint32
// cfg houses the configuration state of the Ant.
cfg Config
// chanState tracks the current set of open channels.
chanState channelState
// stateUpdates is a channel that any external state updates that may
// affect the heuristics of the agent will be sent over.
stateUpdates chan interface{}
// balanceUpdates is a channel where notifications about updates to the
// wallet's balance will be sent. This channel will be buffered to
// ensure we have at most one pending update of this type to handle at
// a given time.
balanceUpdates chan *balanceUpdate
// nodeUpdates is a channel that changes to the graph node landscape
// will be sent over. This channel will be buffered to ensure we have
// at most one pending update of this type to handle at a given time.
nodeUpdates chan *nodeUpdates
// pendingOpenUpdates is a channel where updates about channel pending
// opening will be sent. This channel will be buffered to ensure we
// have at most one pending update of this type to handle at a given
// time.
pendingOpenUpdates chan *chanPendingOpenUpdate
// chanOpenFailures is a channel where updates about channel open
// failures will be sent. This channel will be buffered to ensure we
// have at most one pending update of this type to handle at a given
// time.
chanOpenFailures chan *chanOpenFailureUpdate
2017-08-11 07:14:41 +03:00
// totalBalance is the total number of satoshis the backing wallet is
// known to control at any given instance. This value will be updated
// when the agent receives external balance update signals.
totalBalance btcutil.Amount
quit chan struct{}
wg sync.WaitGroup
}
// New creates a new instance of the Agent instantiated using the passed
// configuration and initial channel state. The initial channel state slice
// should be populated with the set of Channels that are currently opened by
// the backing Lightning Node.
func New(cfg Config, initialState []Channel) (*Agent, error) {
a := &Agent{
cfg: cfg,
chanState: make(map[lnwire.ShortChannelID]Channel),
quit: make(chan struct{}),
stateUpdates: make(chan interface{}),
balanceUpdates: make(chan *balanceUpdate, 1),
nodeUpdates: make(chan *nodeUpdates, 1),
chanOpenFailures: make(chan *chanOpenFailureUpdate, 1),
pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1),
2017-08-11 07:14:41 +03:00
}
for _, c := range initialState {
a.chanState[c.ChanID] = c
}
return a, nil
}
// Start starts the agent along with any goroutines it needs to perform its
// normal duties.
func (a *Agent) Start() error {
if !atomic.CompareAndSwapUint32(&a.started, 0, 1) {
return nil
}
log.Infof("Autopilot Agent starting")
a.wg.Add(1)
go a.controller()
2017-08-11 07:14:41 +03:00
return nil
}
// Stop signals the Agent to gracefully shutdown. This function will block
// until all goroutines have exited.
func (a *Agent) Stop() error {
if !atomic.CompareAndSwapUint32(&a.stopped, 0, 1) {
return nil
}
log.Infof("Autopilot Agent stopping")
close(a.quit)
a.wg.Wait()
return nil
}
// balanceUpdate is a type of external state update that reflects an
// increase/decrease in the funds currently available to the wallet.
type balanceUpdate struct {
}
// nodeUpdates is a type of external state update that reflects an addition or
// modification in channel graph node membership.
type nodeUpdates struct{}
// chanOpenUpdate is a type of external state update that indicates a new
2017-08-11 07:14:41 +03:00
// channel has been opened, either by the Agent itself (within the main
// controller loop), or by an external user to the system.
type chanOpenUpdate struct {
newChan Channel
}
// chanPendingOpenUpdate is a type of external state update that indicates a new
// channel has been opened, either by the agent itself or an external subsystem,
// but is still pending.
type chanPendingOpenUpdate struct{}
// chanOpenFailureUpdate is a type of external state update that indicates
// a previous channel open failed, and that it might be possible to try again.
type chanOpenFailureUpdate struct{}
2017-08-11 07:14:41 +03:00
// chanCloseUpdate is a type of external state update that indicates that the
// backing Lightning Node has closed a previously open channel.
type chanCloseUpdate struct {
closedChans []lnwire.ShortChannelID
}
// OnBalanceChange is a callback that should be executed each time the balance
// of the backing wallet changes.
func (a *Agent) OnBalanceChange() {
select {
case a.balanceUpdates <- &balanceUpdate{}:
default:
}
2017-08-11 07:14:41 +03:00
}
// OnNodeUpdates is a callback that should be executed each time our channel
// graph has new nodes or their node announcements are updated.
func (a *Agent) OnNodeUpdates() {
select {
case a.nodeUpdates <- &nodeUpdates{}:
default:
}
}
2017-08-11 07:14:41 +03:00
// OnChannelOpen is a callback that should be executed each time a new channel
// is manually opened by the user or any system outside the autopilot agent.
func (a *Agent) OnChannelOpen(c Channel) {
a.wg.Add(1)
2017-08-11 07:14:41 +03:00
go func() {
defer a.wg.Done()
select {
case a.stateUpdates <- &chanOpenUpdate{newChan: c}:
case <-a.quit:
2017-08-11 07:14:41 +03:00
}
}()
}
// OnChannelPendingOpen is a callback that should be executed each time a new
// channel is opened, either by the agent or an external subsystems, but is
// still pending.
func (a *Agent) OnChannelPendingOpen() {
select {
case a.pendingOpenUpdates <- &chanPendingOpenUpdate{}:
default:
}
}
// OnChannelOpenFailure is a callback that should be executed when the
// autopilot has attempted to open a channel, but failed. In this case we can
// retry channel creation with a different node.
func (a *Agent) OnChannelOpenFailure() {
select {
case a.chanOpenFailures <- &chanOpenFailureUpdate{}:
default:
}
}
2017-08-11 07:14:41 +03:00
// OnChannelClose is a callback that should be executed each time a prior
// channel has been closed for any reason. This includes regular
// closes, force closes, and channel breaches.
func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) {
a.wg.Add(1)
2017-08-11 07:14:41 +03:00
go func() {
defer a.wg.Done()
select {
case a.stateUpdates <- &chanCloseUpdate{closedChans: closedChans}:
case <-a.quit:
2017-08-11 07:14:41 +03:00
}
}()
}
// mergeNodeMaps merges the Agent's set of nodes that it already has active
// channels open to, with the other sets of nodes that should be removed from
// consideration during heuristic selection. This ensures that the Agent doesn't
// attempt to open any "duplicate" channels to the same node.
func mergeNodeMaps(c map[NodeID]Channel,
skips ...map[NodeID]struct{}) map[NodeID]struct{} {
numNodes := len(c)
for _, skip := range skips {
numNodes += len(skip)
}
res := make(map[NodeID]struct{}, len(c)+numNodes)
for nodeID := range c {
res[nodeID] = struct{}{}
}
for _, skip := range skips {
for nodeID := range skip {
res[nodeID] = struct{}{}
}
}
return res
}
// mergeChanState merges the Agent's set of active channels, with the set of
// channels awaiting confirmation. This ensures that the agent doesn't go over
// the prescribed channel limit or fund allocation limit.
func mergeChanState(pendingChans map[NodeID]Channel,
activeChans channelState) []Channel {
numChans := len(pendingChans) + len(activeChans)
totalChans := make([]Channel, 0, numChans)
for _, activeChan := range activeChans.Channels() {
totalChans = append(totalChans, activeChan)
}
for _, pendingChan := range pendingChans {
totalChans = append(totalChans, pendingChan)
}
return totalChans
}
2017-08-11 07:14:41 +03:00
// controller implements the closed-loop control system of the Agent. The
// controller will make a decision w.r.t channel placement within the graph
// based on: its current internal state of the set of active channels open,
2017-08-11 07:14:41 +03:00
// and external state changes as a result of decisions it makes w.r.t channel
// allocation, or attributes affecting its control loop being updated by the
// backing Lightning Node.
func (a *Agent) controller() {
2017-08-11 07:14:41 +03:00
defer a.wg.Done()
// We'll start off by assigning our starting balance, and injecting
// that amount as an initial wake up to the main controller goroutine.
a.OnBalanceChange()
2017-08-11 07:14:41 +03:00
// TODO(roasbeef): do we in fact need to maintain order?
// * use sync.Cond if so
// failedNodes lists nodes that we've previously attempted to initiate
// channels with, but didn't succeed.
failedNodes := make(map[NodeID]struct{})
// pendingConns tracks the nodes that we are attempting to make
// connections to. This prevents us from making duplicate connection
// requests to the same node.
pendingConns := make(map[NodeID]struct{})
// pendingOpens tracks the channels that we've requested to be
// initiated, but haven't yet been confirmed as being fully opened.
// This state is required as otherwise, we may go over our allotted
// channel limit, or open multiple channels to the same node.
pendingOpens := make(map[NodeID]Channel)
var pendingMtx sync.Mutex
updateBalance := func() {
newBalance, err := a.cfg.WalletBalance()
if err != nil {
log.Warnf("unable to update wallet balance: %v", err)
return
}
a.totalBalance = newBalance
}
2017-08-11 07:14:41 +03:00
// TODO(roasbeef): add 10-minute wake up timer
for {
select {
// A new external signal has arrived. We'll use this to update
// our internal state, then determine if we should trigger a
// channel state modification (open/close, splice in/out).
case signal := <-a.stateUpdates:
log.Infof("Processing new external signal")
switch update := signal.(type) {
// A new channel has been opened successfully. This was
// either opened by the Agent, or an external system
// that is able to drive the Lightning Node.
case *chanOpenUpdate:
log.Debugf("New channel successfully opened, "+
"updating state with: %v",
spew.Sdump(update.newChan))
newChan := update.newChan
a.chanState[newChan.ChanID] = newChan
pendingMtx.Lock()
delete(pendingOpens, newChan.Node)
pendingMtx.Unlock()
updateBalance()
2017-08-11 07:14:41 +03:00
// A channel has been closed, this may free up an
// available slot, triggering a new channel update.
case *chanCloseUpdate:
log.Debugf("Applying closed channel "+
"updates: %v",
spew.Sdump(update.closedChans))
for _, closedChan := range update.closedChans {
delete(a.chanState, closedChan)
}
updateBalance()
2017-08-11 07:14:41 +03:00
}
// A new channel has been opened by the agent or an external
// subsystem, but is still pending confirmation.
case <-a.pendingOpenUpdates:
updateBalance()
// The balance of the backing wallet has changed, if more funds
// are now available, we may attempt to open up an additional
// channel, or splice in funds to an existing one.
case <-a.balanceUpdates:
log.Debug("Applying external balance state update")
updateBalance()
// The channel we tried to open previously failed for whatever
// reason.
case <-a.chanOpenFailures:
log.Debug("Retrying after previous channel open " +
"failure.")
updateBalance()
// New nodes have been added to the graph or their node
// announcements have been updated. We will consider opening
// channels to these nodes if we haven't stabilized.
case <-a.nodeUpdates:
log.Infof("Node updates received, assessing " +
"need for more channels")
// The agent has been signalled to exit, so we'll bail out
// immediately.
case <-a.quit:
return
}
2017-08-11 07:14:41 +03:00
pendingMtx.Lock()
log.Debugf("Pending channels: %v", spew.Sdump(pendingOpens))
pendingMtx.Unlock()
// With all the updates applied, we'll obtain a set of the
// current active channels (confirmed channels), and also
// factor in our set of unconfirmed channels.
confirmedChans := a.chanState
pendingMtx.Lock()
totalChans := mergeChanState(pendingOpens, confirmedChans)
pendingMtx.Unlock()
// Now that we've updated our internal state, we'll consult our
// channel attachment heuristic to determine if we should open
// up any additional channels or modify existing channels.
availableFunds, numChans, needMore := a.cfg.Heuristic.NeedMoreChans(
totalChans, a.totalBalance,
)
if !needMore {
continue
}
log.Infof("Triggering attachment directive dispatch, "+
"total_funds=%v", a.totalBalance)
2018-10-09 19:28:34 +03:00
// We're to attempt an attachment so we'll obtain the set of
// nodes that we currently have channels with so we avoid
// duplicate edges.
connectedNodes := a.chanState.ConnectedNodes()
pendingMtx.Lock()
nodesToSkip := mergeNodeMaps(pendingOpens,
pendingConns, connectedNodes, failedNodes,
)
pendingMtx.Unlock()
// If we reach this point, then according to our heuristic we
// should modify our channel state to tend towards what it
// determines to the optimal state. So we'll call Select to get
// a fresh batch of attachment directives, passing in the
// amount of funds available for us to use.
chanCandidates, err := a.cfg.Heuristic.Select(
a.cfg.Self, a.cfg.Graph, availableFunds,
numChans, nodesToSkip,
)
if err != nil {
log.Errorf("Unable to select candidates for "+
"attachment: %v", err)
continue
}
2017-08-11 07:14:41 +03:00
if len(chanCandidates) == 0 {
log.Infof("No eligible candidates to connect to")
continue
}
log.Infof("Attempting to execute channel attachment "+
"directives: %v", spew.Sdump(chanCandidates))
// Before proceeding, check to see if we have any slots
// available to open channels. If there are any, we will attempt
// to dispatch the retrieved directives since we can't be
// certain which ones may actually succeed. If too many
// connections succeed, we will they will be ignored and made
// available to future heuristic selections.
pendingMtx.Lock()
if uint16(len(pendingOpens)) >= a.cfg.MaxPendingOpens {
pendingMtx.Unlock()
log.Debugf("Reached cap of %v pending "+
"channel opens, will retry "+
"after success/failure",
a.cfg.MaxPendingOpens)
continue
}
// For each recommended attachment directive, we'll launch a
// new goroutine to attempt to carry out the directive. If any
// of these succeed, then we'll receive a new state update,
// taking us back to the top of our controller loop.
for _, chanCandidate := range chanCandidates {
// Skip candidates which we are already trying
// to establish a connection with.
nodeID := chanCandidate.NodeID
if _, ok := pendingConns[nodeID]; ok {
2017-08-11 07:14:41 +03:00
continue
}
pendingConns[nodeID] = struct{}{}
2017-08-11 07:14:41 +03:00
go func(directive AttachmentDirective) {
// We'll start out by attempting to connect to
// the peer in order to begin the funding
// workflow.
pub := directive.NodeKey
alreadyConnected, err := a.cfg.ConnectToPeer(
pub, directive.Addrs,
)
if err != nil {
log.Warnf("Unable to connect "+
"to %x: %v",
pub.SerializeCompressed(),
err)
// Since we failed to connect to them,
// we'll mark them as failed so that we
// don't attempt to connect to them
// again.
nodeID := NewNodeID(pub)
pendingMtx.Lock()
delete(pendingConns, nodeID)
failedNodes[nodeID] = struct{}{}
pendingMtx.Unlock()
// Finally, we'll trigger the agent to
// select new peers to connect to.
a.OnChannelOpenFailure()
return
}
// The connection was successful, though before
// progressing we must check that we have not
// already met our quota for max pending open
// channels. This can happen if multiple
// directives were spawned but fewer slots were
// available, and other successful attempts
// finished first.
pendingMtx.Lock()
if uint16(len(pendingOpens)) >=
a.cfg.MaxPendingOpens {
// Since we've reached our max number of
// pending opens, we'll disconnect this
// peer and exit. However, if we were
// previously connected to them, then
// we'll make sure to maintain the
// connection alive.
if alreadyConnected {
// Since we succeeded in
// connecting, we won't add this
// peer to the failed nodes map,
// but we will remove it from
// pendingConns so that it can
// be retried in the future.
delete(pendingConns, nodeID)
pendingMtx.Unlock()
return
}
err = a.cfg.DisconnectPeer(
pub,
)
if err != nil {
log.Warnf("Unable to "+
"disconnect peer "+
"%x: %v",
pub.SerializeCompressed(),
err)
}
// Now that we have disconnected, we can
// remove this node from our pending
// conns map, permitting subsequent
// connection attempts.
delete(pendingConns, nodeID)
pendingMtx.Unlock()
return
}
// If we were successful, we'll track this peer
// in our set of pending opens. We do this here
// to ensure we don't stall on selecting new
// peers if the connection attempt happens to
// take too long.
nodeID := directive.NodeID
delete(pendingConns, nodeID)
pendingOpens[nodeID] = Channel{
Capacity: directive.ChanAmt,
Node: nodeID,
}
pendingMtx.Unlock()
// We can then begin the funding workflow with
// this peer.
err = a.cfg.ChanController.OpenChannel(
pub, directive.ChanAmt,
)
if err != nil {
log.Warnf("Unable to open "+
"channel to %x of %v: %v",
pub.SerializeCompressed(),
directive.ChanAmt, err)
// As the attempt failed, we'll clear
// the peer from the set of pending
// opens and mark them as failed so we
// don't attempt to open a channel to
// them again.
pendingMtx.Lock()
delete(pendingOpens, nodeID)
failedNodes[nodeID] = struct{}{}
pendingMtx.Unlock()
// Trigger the agent to re-evaluate
// everything and possibly retry with a
// different node.
a.OnChannelOpenFailure()
// Finally, we should also disconnect
// the peer if we weren't already
// connected to them beforehand by an
// external subsystem.
if alreadyConnected {
return
}
err = a.cfg.DisconnectPeer(pub)
2017-08-11 07:14:41 +03:00
if err != nil {
log.Warnf("Unable to "+
"disconnect peer "+
"%x: %v",
2017-08-11 07:14:41 +03:00
pub.SerializeCompressed(),
err)
}
}
// Since the channel open was successful and is
// currently pending, we'll trigger the
// autopilot agent to query for more peers.
a.OnChannelPendingOpen()
}(chanCandidate)
2017-08-11 07:14:41 +03:00
}
pendingMtx.Unlock()
2017-08-11 07:14:41 +03:00
}
}