pilot+lnd: let autopilot.Manager manage pilot-agent

This commit moves the responsibility of managing the autopilot's life
cycle from main to the recently introduced autopilot Manager. lnd now
only sets up the interfaces the Manager needs to create the required
subscriptions when it starts the agent.
Johan T. Halseth 2018-12-13 12:26:29 +01:00
parent 6310ed0f1c
commit cff42e06c8
GPG Key ID: 15BAADA29DA20D26
2 changed files with 50 additions and 149 deletions
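
To make the handoff concrete, here is a condensed restatement of the new flow in lndMain, stitched together from the lnd.go hunks below. Error logging is trimmed and only identifiers that appear in the diff are used; treat it as a reading aid rather than the literal code.

	atplCfg := initAutoPilot(server, cfg.Autopilot)

	// The manager lives for the daemon's whole lifetime...
	atplManager, err := autopilot.NewManager(atplCfg)
	if err != nil {
		return err
	}
	if err := atplManager.Start(); err != nil {
		return err
	}
	defer atplManager.Stop()

	// ...while the agent itself is only started once the server is up,
	// and only if autopilot mode is active.
	if cfg.Autopilot.Active {
		if err := atplManager.StartAgent(); err != nil {
			return err
		}
	}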

lnd.go (28 lines changed)

@@ -38,6 +38,7 @@ import (
 	proxy "github.com/grpc-ecosystem/grpc-gateway/runtime"
 	flags "github.com/jessevdk/go-flags"
+	"github.com/lightningnetwork/lnd/autopilot"
 	"github.com/lightningnetwork/lnd/build"
 	"github.com/lightningnetwork/lnd/channeldb"
 	"github.com/lightningnetwork/lnd/keychain"
@@ -312,6 +313,21 @@ func lndMain() error {
 		return err
 	}
 
+	// Set up an autopilot manager from the current config. This will be
+	// used to manage the underlying autopilot agent, starting and stopping
+	// it at will.
+	atplCfg := initAutoPilot(server, cfg.Autopilot)
+	atplManager, err := autopilot.NewManager(atplCfg)
+	if err != nil {
+		ltndLog.Errorf("unable to create autopilot manager: %v", err)
+		return err
+	}
+	if err := atplManager.Start(); err != nil {
+		ltndLog.Errorf("unable to start autopilot manager: %v", err)
+		return err
+	}
+	defer atplManager.Stop()
+
 	// Initialize, and register our implementation of the gRPC interface
 	// exported by the rpcServer.
 	rpcServer, err := newRPCServer(
@@ -375,20 +391,14 @@ func lndMain() error {
 	defer server.Stop()
 
 	// Now that the server has started, if the autopilot mode is currently
-	// active, then we'll initialize a fresh instance of it and start it.
+	// active, then we'll start the autopilot agent immediately. It will be
+	// stopped together with the autopilot service.
 	if cfg.Autopilot.Active {
-		pilot, err := initAutoPilot(server, cfg.Autopilot)
-		if err != nil {
-			ltndLog.Errorf("unable to create autopilot agent: %v",
-				err)
-			return err
-		}
-		if err := pilot.Start(); err != nil {
+		if err := atplManager.StartAgent(); err != nil {
 			ltndLog.Errorf("unable to start autopilot agent: %v",
 				err)
 			return err
 		}
-		defer pilot.Stop()
 	}
 
 	// Wait for shutdown signal from either a graceful server stop or from
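
Before moving on to pilot.go, it helps to know roughly what initAutoPilot now hands back. The sketch below is inferred purely from the call sites in this diff (Self, PilotCfg, ChannelState, SubscribeTransactions, SubscribeTopology); the actual autopilot.ManagerCfg definition lives in the autopilot package and its field names and types may differ.

	package autopilot

	import (
		"github.com/btcsuite/btcd/btcec"

		"github.com/lightningnetwork/lnd/lnwallet"
		"github.com/lightningnetwork/lnd/routing"
	)

	// ManagerCfg (inferred sketch only): the hooks the Manager needs to
	// create, seed and subscribe an autopilot.Agent on demand.
	type ManagerCfg struct {
		// Self is the backing lnd node's public key, used to filter
		// graph updates down to channels we opened ourselves.
		Self *btcec.PublicKey

		// PilotCfg is the agent configuration used when the Manager
		// creates and starts the underlying agent.
		PilotCfg *Config

		// ChannelState returns the node's currently open channels,
		// used as the agent's initial state.
		ChannelState func() ([]Channel, error)

		// SubscribeTransactions and SubscribeTopology let the Manager
		// set up the wallet and graph subscriptions the agent needs.
		SubscribeTransactions func() (lnwallet.TransactionSubscription, error)
		SubscribeTopology     func() (*routing.TopologyClient, error)
	}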

pilot.go (171 lines changed)

@@ -79,10 +79,11 @@ func (c *chanController) SpliceOut(chanPoint *wire.OutPoint,
 // autopilot.ChannelController interface.
 var _ autopilot.ChannelController = (*chanController)(nil)
 
-// initAutoPilot initializes a new autopilot.Agent instance based on the passed
-// configuration struct. All interfaces needed to drive the pilot will be
-// registered and launched.
-func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error) {
+// initAutoPilot initializes a new autopilot.ManagerCfg to manage an
+// autopilot.Agent instance based on the passed configuration struct. The agent
+// and all interfaces needed to drive it won't be launched before the Manager's
+// StartAgent method is called.
+func initAutoPilot(svr *server, cfg *autoPilotConfig) *autopilot.ManagerCfg {
 	atplLog.Infof("Instantiating autopilot with cfg: %v", spew.Sdump(cfg))
 
 	// Set up the constraints the autopilot heuristics must adhere to.
@@ -176,143 +177,33 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error)
 		DisconnectPeer: svr.DisconnectPeer,
 	}
 
-	// Next, we'll fetch the current state of open channels from the
-	// database to use as initial state for the auto-pilot agent.
-	activeChannels, err := svr.chanDB.FetchAllChannels()
-	if err != nil {
-		return nil, err
-	}
-	initialChanState := make([]autopilot.Channel, len(activeChannels))
-	for i, channel := range activeChannels {
-		initialChanState[i] = autopilot.Channel{
-			ChanID:   channel.ShortChanID(),
-			Capacity: channel.Capacity,
-			Node:     autopilot.NewNodeID(channel.IdentityPub),
-		}
-	}
-
-	// Now that we have all the initial dependencies, we can create the
-	// auto-pilot instance itself.
-	pilot, err := autopilot.New(pilotCfg, initialChanState)
-	if err != nil {
-		return nil, err
-	}
-
-	// Finally, we'll need to subscribe to two things: incoming
-	// transactions that modify the wallet's balance, and also any graph
-	// topology updates.
-	txnSubscription, err := svr.cc.wallet.SubscribeTransactions()
-	if err != nil {
-		return nil, err
-	}
-	graphSubscription, err := svr.chanRouter.SubscribeTopology()
-	if err != nil {
-		return nil, err
-	}
-
-	// We'll launch a goroutine to provide the agent with notifications
-	// whenever the balance of the wallet changes.
-	svr.wg.Add(2)
-	go func() {
-		defer txnSubscription.Cancel()
-		defer svr.wg.Done()
-
-		for {
-			select {
-			case <-txnSubscription.ConfirmedTransactions():
-				pilot.OnBalanceChange()
-			case <-svr.quit:
-				return
-			}
-		}
-	}()
-	go func() {
-		defer svr.wg.Done()
-
-		for {
-			select {
-			// We won't act upon new unconfirmed transaction, as
-			// we'll only use confirmed outputs when funding.
-			// However, we will still drain this request in order
-			// to avoid goroutine leaks, and ensure we promptly
-			// read from the channel if available.
-			case <-txnSubscription.UnconfirmedTransactions():
-			case <-svr.quit:
-				return
-			}
-		}
-	}()
-
-	// We'll also launch a goroutine to provide the agent with
-	// notifications for when the graph topology controlled by the node
-	// changes.
-	svr.wg.Add(1)
-	go func() {
-		defer graphSubscription.Cancel()
-		defer svr.wg.Done()
-
-		for {
-			select {
-			case topChange, ok := <-graphSubscription.TopologyChanges:
-				// If the router is shutting down, then we will
-				// as well.
-				if !ok {
-					return
-				}
-
-				for _, edgeUpdate := range topChange.ChannelEdgeUpdates {
-					// If this isn't an advertisement by
-					// the backing lnd node, then we'll
-					// continue as we only want to add
-					// channels that we've created
-					// ourselves.
-					if !edgeUpdate.AdvertisingNode.IsEqual(self) {
-						continue
-					}
-
-					// If this is indeed a channel we
-					// opened, then we'll convert it to the
-					// autopilot.Channel format, and notify
-					// the pilot of the new channel.
-					chanNode := autopilot.NewNodeID(
-						edgeUpdate.ConnectingNode,
-					)
-					chanID := lnwire.NewShortChanIDFromInt(
-						edgeUpdate.ChanID,
-					)
-					edge := autopilot.Channel{
-						ChanID:   chanID,
-						Capacity: edgeUpdate.Capacity,
-						Node:     chanNode,
-					}
-					pilot.OnChannelOpen(edge)
-				}
-
-				// For each closed channel, we'll obtain
-				// the chanID of the closed channel and send it
-				// to the pilot.
-				for _, chanClose := range topChange.ClosedChannels {
-					chanID := lnwire.NewShortChanIDFromInt(
-						chanClose.ChanID,
-					)
-					pilot.OnChannelClose(chanID)
-				}
-
-				// If new nodes were added to the graph, or node
-				// information has changed, we'll poke autopilot
-				// to see if it can make use of them.
-				if len(topChange.NodeUpdates) > 0 {
-					pilot.OnNodeUpdates()
-				}
-			case <-svr.quit:
-				return
-			}
-		}
-	}()
-
-	return pilot, nil
+	// Create and return the autopilot.ManagerCfg that administrates this
+	// agent-pilot instance.
+	return &autopilot.ManagerCfg{
+		Self:     self,
+		PilotCfg: &pilotCfg,
+		ChannelState: func() ([]autopilot.Channel, error) {
+			// We'll fetch the current state of open
+			// channels from the database to use as initial
+			// state for the auto-pilot agent.
+			activeChannels, err := svr.chanDB.FetchAllChannels()
+			if err != nil {
+				return nil, err
+			}
+
+			chanState := make([]autopilot.Channel,
+				len(activeChannels))
+			for i, channel := range activeChannels {
+				chanState[i] = autopilot.Channel{
+					ChanID:   channel.ShortChanID(),
+					Capacity: channel.Capacity,
+					Node: autopilot.NewNodeID(
+						channel.IdentityPub),
+				}
+			}
+			return chanState, nil
+		},
+		SubscribeTransactions: svr.cc.wallet.SubscribeTransactions,
+		SubscribeTopology:     svr.chanRouter.SubscribeTopology,
+	}
 }
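
For intuition only: the subscriptions that used to be wired up here are presumably now established by the Manager when StartAgent is called. The following sketch shows one way that wiring could look, reusing the callback names from the removed code (OnBalanceChange, OnChannelOpen, OnChannelClose, OnNodeUpdates); the names m, m.cfg, m.pilot and m.quit are hypothetical, and this is not the actual autopilot.Manager implementation.

	// Hypothetical sketch, not lnd's Manager: how StartAgent could use the
	// ManagerCfg callbacks built by initAutoPilot above.
	func (m *Manager) StartAgent() error {
		// Seed the agent with the node's current open channels.
		chanState, err := m.cfg.ChannelState()
		if err != nil {
			return err
		}
		pilot, err := New(*m.cfg.PilotCfg, chanState)
		if err != nil {
			return err
		}
		if err := pilot.Start(); err != nil {
			return err
		}
		m.pilot = pilot

		// Forward confirmed wallet transactions as balance updates,
		// mirroring the goroutine that previously lived in initAutoPilot.
		txnSub, err := m.cfg.SubscribeTransactions()
		if err != nil {
			return err
		}
		go func() {
			defer txnSub.Cancel()
			for {
				select {
				case <-txnSub.ConfirmedTransactions():
					pilot.OnBalanceChange()
				case <-m.quit:
					return
				}
			}
		}()

		// A topology subscription would be forwarded to OnChannelOpen,
		// OnChannelClose and OnNodeUpdates in the same fashion.
		return nil
	}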