lnd.xprv/autopilot/manager.go
Johan T. Halseth dbeafe8832
autopilot: rename Capacity to Balance, define ChannelInfo()
We rename the field Capacity on local channels to Balance, and define a new
method ChannelInfo on the manager config that lets us query the database
for our latest channel state. Using this, we use the current local balance
instead of the channel capacity when doing allocation calculations.
2020-10-05 12:24:07 +02:00
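To illustrate the new config surface, a host node might wire it up roughly as below. This is a minimal sketch: nodeKey, pilotCfg, fetchChannels, fetchChannel, wallet, and router are hypothetical stand-ins for the node's own components, not lnd APIs.

	cfg := &autopilot.ManagerCfg{
		Self:     nodeKey,
		PilotCfg: pilotCfg,

		// Return all open channels, with Balance set to the current
		// local balance rather than the total channel capacity.
		ChannelState: func() ([]autopilot.LocalChannel, error) {
			return fetchChannels()
		},

		// Look up a single channel by its funding outpoint.
		ChannelInfo: func(op wire.OutPoint) (*autopilot.LocalChannel, error) {
			return fetchChannel(op)
		},

		SubscribeTransactions: wallet.SubscribeTransactions,
		SubscribeTopology:     router.SubscribeTopology,
	}

	m, err := autopilot.NewManager(cfg)
	if err != nil {
		return err
	}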


package autopilot

import (
	"fmt"
	"sync"

	"github.com/btcsuite/btcd/btcec"
	"github.com/btcsuite/btcd/wire"
	"github.com/lightningnetwork/lnd/lnwallet"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/routing"
)

// ManagerCfg houses a set of values and methods that are passed to the
// Manager for it to properly manage its autopilot agent.
type ManagerCfg struct {
	// Self is the public key of the lnd instance. It is used to make
	// sure the autopilot is not opening channels to itself.
	Self *btcec.PublicKey

	// PilotCfg is the config of the autopilot agent managed by the
	// Manager.
	PilotCfg *Config

	// ChannelState is a function closure that returns the current set of
	// channels managed by this node.
	ChannelState func() ([]LocalChannel, error)

	// ChannelInfo is a function closure that returns the channel managed
	// by the node given by the passed channel point.
	ChannelInfo func(wire.OutPoint) (*LocalChannel, error)

	// SubscribeTransactions is used to get a subscription for transactions
	// relevant to this node's wallet.
	SubscribeTransactions func() (lnwallet.TransactionSubscription, error)

	// SubscribeTopology is used to get a subscription for topology changes
	// on the network.
	SubscribeTopology func() (*routing.TopologyClient, error)
}

// Manager is a struct that manages an autopilot agent, making it possible to
// enable and disable it at will, and hand it relevant external information.
// It implements the autopilot gRPC service, which is used to get data about
// the running autopilot, and give it relevant information.
type Manager struct {
	started sync.Once
	stopped sync.Once

	cfg *ManagerCfg

	// pilot is the current autopilot agent. It will be nil if the agent is
	// disabled.
	pilot *Agent

	quit chan struct{}
	wg   sync.WaitGroup
	sync.Mutex
}

// NewManager creates a new instance of the Manager from the passed config.
func NewManager(cfg *ManagerCfg) (*Manager, error) {
	return &Manager{
		cfg:  cfg,
		quit: make(chan struct{}),
	}, nil
}

// Start starts the Manager.
func (m *Manager) Start() error {
	m.started.Do(func() {})
	return nil
}

// Stop stops the Manager. If an autopilot agent is active, it will also be
// stopped.
func (m *Manager) Stop() error {
	m.stopped.Do(func() {
		if err := m.StopAgent(); err != nil {
			log.Errorf("Unable to stop pilot: %v", err)
		}

		close(m.quit)
		m.wg.Wait()
	})

	return nil
}

// IsActive returns whether the autopilot agent is currently active.
func (m *Manager) IsActive() bool {
	m.Lock()
	defer m.Unlock()

	return m.pilot != nil
}

// StartAgent creates and starts an autopilot agent from the Manager's
// config.
func (m *Manager) StartAgent() error {
	m.Lock()
	defer m.Unlock()

	// Already active.
	if m.pilot != nil {
		return nil
	}

	// Next, we'll fetch the current state of open channels from the
	// database to use as initial state for the auto-pilot agent.
	initialChanState, err := m.cfg.ChannelState()
	if err != nil {
		return err
	}

	// Now that we have all the initial dependencies, we can create the
	// auto-pilot instance itself.
	pilot, err := New(*m.cfg.PilotCfg, initialChanState)
	if err != nil {
		return err
	}

	if err := pilot.Start(); err != nil {
		return err
	}

	// Finally, we'll need to subscribe to two things: incoming
	// transactions that modify the wallet's balance, and also any graph
	// topology updates.
	txnSubscription, err := m.cfg.SubscribeTransactions()
	if err != nil {
		pilot.Stop()
		return err
	}

	graphSubscription, err := m.cfg.SubscribeTopology()
	if err != nil {
		txnSubscription.Cancel()
		pilot.Stop()
		return err
	}

	m.pilot = pilot

	// We'll launch a goroutine to provide the agent with notifications
	// whenever the balance of the wallet changes.
	// TODO(halseth): can lead to panic if in process of shutting down.
	m.wg.Add(1)
	go func() {
		defer txnSubscription.Cancel()
		defer m.wg.Done()

		for {
			select {
			case <-txnSubscription.ConfirmedTransactions():
				pilot.OnBalanceChange()

			// We won't act upon new unconfirmed transactions, as
			// we'll only use confirmed outputs when funding.
			// However, we will still drain this request in order
			// to avoid goroutine leaks, and ensure we promptly
			// read from the channel if available.
			case <-txnSubscription.UnconfirmedTransactions():

			case <-pilot.quit:
				return

			case <-m.quit:
				return
			}
		}
	}()

	// We'll also launch a goroutine to provide the agent with
	// notifications for when the graph topology controlled by the node
	// changes.
	m.wg.Add(1)
	go func() {
		defer graphSubscription.Cancel()
		defer m.wg.Done()

		for {
			select {
			case topChange, ok := <-graphSubscription.TopologyChanges:
				// If the router is shutting down, then we will
				// as well.
				if !ok {
					return
				}

				for _, edgeUpdate := range topChange.ChannelEdgeUpdates {
					// If this isn't an advertisement by
					// the backing lnd node, then we'll
					// continue as we only want to add
					// channels that we've created
					// ourselves.
					if !edgeUpdate.AdvertisingNode.IsEqual(m.cfg.Self) {
						continue
					}

					// If this is indeed a channel we
					// opened, then we'll convert it to the
					// autopilot.Channel format, and notify
					// the pilot of the new channel.
					cp := edgeUpdate.ChanPoint
					edge, err := m.cfg.ChannelInfo(cp)
					if err != nil {
						log.Errorf("Unable to fetch "+
							"channel info for %v: "+
							"%v", cp, err)
						continue
					}

					pilot.OnChannelOpen(*edge)
				}

				// For each closed channel, we'll obtain
				// the chanID of the closed channel and send it
				// to the pilot.
				for _, chanClose := range topChange.ClosedChannels {
					chanID := lnwire.NewShortChanIDFromInt(
						chanClose.ChanID,
					)

					pilot.OnChannelClose(chanID)
				}

				// If new nodes were added to the graph, or node
				// information has changed, we'll poke autopilot
				// to see if it can make use of them.
				if len(topChange.NodeUpdates) > 0 {
					pilot.OnNodeUpdates()
				}

			case <-pilot.quit:
				return

			case <-m.quit:
				return
			}
		}
	}()

	log.Debugf("Manager started autopilot agent")

	return nil
}

// StopAgent stops any active autopilot agent.
func (m *Manager) StopAgent() error {
	m.Lock()
	defer m.Unlock()

	// Not active, so we can return early.
	if m.pilot == nil {
		return nil
	}

	if err := m.pilot.Stop(); err != nil {
		return err
	}

	// Make sure to nil the current agent, indicating it is no longer
	// active.
	m.pilot = nil

	log.Debugf("Manager stopped autopilot agent")

	return nil
}

// QueryHeuristics queries the available autopilot heuristics for node scores.
func (m *Manager) QueryHeuristics(nodes []NodeID, localState bool) (
	HeuristicScores, error) {

	m.Lock()
	defer m.Unlock()

	n := make(map[NodeID]struct{})
	for _, node := range nodes {
		n[node] = struct{}{}
	}

	log.Debugf("Querying heuristics for %d nodes", len(n))

	return m.queryHeuristics(n, localState)
}

// HeuristicScores is an alias for a map that maps heuristic names to a map of
// scores for pubkeys.
type HeuristicScores map[string]map[NodeID]float64
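
// As an illustration (heuristic names depend on the registered heuristics,
// and the node ID and values here are made up), a report covering two
// heuristics could look like:
//
//	var nodeA NodeID
//	report := HeuristicScores{
//		"prefattach":    {nodeA: 0.75},
//		"externalscore": {nodeA: 0.50},
//	}
//
// Each inner map holds the score the named heuristic assigned to every
// queried node.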

// queryHeuristics gets node scores from all available simple heuristics, and
// the agent's current active heuristic.
//
// NOTE: Must be called with the manager's lock.
func (m *Manager) queryHeuristics(nodes map[NodeID]struct{}, localState bool) (
	HeuristicScores, error) {

	// If we want to take the local state into account when querying the
	// heuristics, we fetch it. If not, we'll just pass an empty slice to
	// the heuristic.
	var totalChans []LocalChannel
	var err error
	if localState {
		// Fetch the current set of channels.
		totalChans, err = m.cfg.ChannelState()
		if err != nil {
			return nil, err
		}

		// If the agent is active, we can merge the channel state with
		// the channels pending open.
		if m.pilot != nil {
			m.pilot.chanStateMtx.Lock()
			m.pilot.pendingMtx.Lock()
			totalChans = mergeChanState(
				m.pilot.pendingOpens, m.pilot.chanState,
			)
			m.pilot.pendingMtx.Unlock()
			m.pilot.chanStateMtx.Unlock()
		}
	}

	// As the channel size we'll use the maximum size allowed by the
	// constraints.
	chanSize := m.cfg.PilotCfg.Constraints.MaxChanSize()

	// We'll start by getting the scores from each available sub-heuristic,
	// in addition to the current agent heuristic.
	var heuristics []AttachmentHeuristic
	heuristics = append(heuristics, availableHeuristics...)
	heuristics = append(heuristics, m.cfg.PilotCfg.Heuristic)

	report := make(HeuristicScores)
	for _, h := range heuristics {
		name := h.Name()

		// If the agent heuristic is among the simple heuristics it
		// might get queried more than once. As an optimization we'll
		// just skip it the second time.
		if _, ok := report[name]; ok {
			continue
		}

		s, err := h.NodeScores(
			m.cfg.PilotCfg.Graph, totalChans, chanSize, nodes,
		)
		if err != nil {
			return nil, fmt.Errorf("unable to get sub score: %v",
				err)
		}

		log.Debugf("Heuristic \"%v\" scored %d nodes", name, len(s))

		scores := make(map[NodeID]float64)
		for nID, score := range s {
			scores[nID] = score.Score
		}

		report[name] = scores
	}

	return report, nil
}

// SetNodeScores is used to set the scores of the given heuristic, if it is
// active, and ScoreSettable.
func (m *Manager) SetNodeScores(name string, scores map[NodeID]float64) error {
	m.Lock()
	defer m.Unlock()

	// It must be ScoreSettable to be available for external scores.
	s, ok := m.cfg.PilotCfg.Heuristic.(ScoreSettable)
	if !ok {
		return fmt.Errorf("current heuristic doesn't support " +
			"external scoring")
	}

	// Heuristic was found, set its node scores.
	applied, err := s.SetNodeScores(name, scores)
	if err != nil {
		return err
	}

	if !applied {
		return fmt.Errorf("heuristic with name %v not found", name)
	}

	// If the autopilot agent is active, notify about the updated
	// heuristic.
	if m.pilot != nil {
		m.pilot.OnHeuristicUpdate(m.cfg.PilotCfg.Heuristic)
	}

	return nil
}
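
// Usage sketch (hypothetical node IDs and values): an external scoring
// process could push scores to a score-settable heuristic like so:
//
//	scores := map[NodeID]float64{nodeA: 0.9, nodeB: 0.1}
//	if err := m.SetNodeScores("externalscore", scores); err != nil {
//		// Either the configured heuristic is not ScoreSettable, or
//		// no sub-heuristic matched the given name.
//	}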