diff --git a/lnd.go b/lnd.go index b9f0a59a..91d1832e 100644 --- a/lnd.go +++ b/lnd.go @@ -38,6 +38,7 @@ import ( proxy "github.com/grpc-ecosystem/grpc-gateway/runtime" flags "github.com/jessevdk/go-flags" + "github.com/lightningnetwork/lnd/autopilot" "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/keychain" @@ -312,6 +313,21 @@ func lndMain() error { return err } + // Set up an auotpilot manager from the current config. This will be + // used to manage the underlying autopilot agent, starting and stopping + // it at will. + atplCfg := initAutoPilot(server, cfg.Autopilot) + atplManager, err := autopilot.NewManager(atplCfg) + if err != nil { + ltndLog.Errorf("unable to create autopilot manager: %v", err) + return err + } + if err := atplManager.Start(); err != nil { + ltndLog.Errorf("unable to start autopilot manager: %v", err) + return err + } + defer atplManager.Stop() + // Initialize, and register our implementation of the gRPC interface // exported by the rpcServer. rpcServer, err := newRPCServer( @@ -375,20 +391,14 @@ func lndMain() error { defer server.Stop() // Now that the server has started, if the autopilot mode is currently - // active, then we'll initialize a fresh instance of it and start it. + // active, then we'll start the autopilot agent immediately. It will be + // stopped together with the autopilot service. if cfg.Autopilot.Active { - pilot, err := initAutoPilot(server, cfg.Autopilot) - if err != nil { - ltndLog.Errorf("unable to create autopilot agent: %v", - err) - return err - } - if err := pilot.Start(); err != nil { + if err := atplManager.StartAgent(); err != nil { ltndLog.Errorf("unable to start autopilot agent: %v", err) return err } - defer pilot.Stop() } // Wait for shutdown signal from either a graceful server stop or from diff --git a/pilot.go b/pilot.go index 174452d7..9f352742 100644 --- a/pilot.go +++ b/pilot.go @@ -79,10 +79,11 @@ func (c *chanController) SpliceOut(chanPoint *wire.OutPoint, // autopilot.ChannelController interface. var _ autopilot.ChannelController = (*chanController)(nil) -// initAutoPilot initializes a new autopilot.Agent instance based on the passed -// configuration struct. All interfaces needed to drive the pilot will be -// registered and launched. -func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error) { +// initAutoPilot initializes a new autopilot.ManagerCfg to manage an +// autopilot.Agent instance based on the passed configuration struct. The agent +// and all interfaces needed to drive it won't be launched before the Manager's +// StartAgent method is called. +func initAutoPilot(svr *server, cfg *autoPilotConfig) *autopilot.ManagerCfg { atplLog.Infof("Instantiating autopilot with cfg: %v", spew.Sdump(cfg)) // Set up the constraints the autopilot heuristics must adhere to. @@ -176,143 +177,33 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error) DisconnectPeer: svr.DisconnectPeer, } - // Next, we'll fetch the current state of open channels from the - // database to use as initial state for the auto-pilot agent. - activeChannels, err := svr.chanDB.FetchAllChannels() - if err != nil { - return nil, err - } - initialChanState := make([]autopilot.Channel, len(activeChannels)) - for i, channel := range activeChannels { - initialChanState[i] = autopilot.Channel{ - ChanID: channel.ShortChanID(), - Capacity: channel.Capacity, - Node: autopilot.NewNodeID(channel.IdentityPub), - } - } - - // Now that we have all the initial dependencies, we can create the - // auto-pilot instance itself. - pilot, err := autopilot.New(pilotCfg, initialChanState) - if err != nil { - return nil, err - } - - // Finally, we'll need to subscribe to two things: incoming - // transactions that modify the wallet's balance, and also any graph - // topology updates. - txnSubscription, err := svr.cc.wallet.SubscribeTransactions() - if err != nil { - return nil, err - } - graphSubscription, err := svr.chanRouter.SubscribeTopology() - if err != nil { - return nil, err - } - - // We'll launch a goroutine to provide the agent with notifications - // whenever the balance of the wallet changes. - svr.wg.Add(2) - go func() { - defer txnSubscription.Cancel() - defer svr.wg.Done() - - for { - select { - case <-txnSubscription.ConfirmedTransactions(): - pilot.OnBalanceChange() - case <-svr.quit: - return + // Create and return the autopilot.ManagerCfg that administrates this + // agent-pilot instance. + return &autopilot.ManagerCfg{ + Self: self, + PilotCfg: &pilotCfg, + ChannelState: func() ([]autopilot.Channel, error) { + // We'll fetch the current state of open + // channels from the database to use as initial + // state for the auto-pilot agent. + activeChannels, err := svr.chanDB.FetchAllChannels() + if err != nil { + return nil, err } - } - - }() - go func() { - defer svr.wg.Done() - - for { - select { - // We won't act upon new unconfirmed transaction, as - // we'll only use confirmed outputs when funding. - // However, we will still drain this request in order - // to avoid goroutine leaks, and ensure we promptly - // read from the channel if available. - case <-txnSubscription.UnconfirmedTransactions(): - case <-svr.quit: - return + chanState := make([]autopilot.Channel, + len(activeChannels)) + for i, channel := range activeChannels { + chanState[i] = autopilot.Channel{ + ChanID: channel.ShortChanID(), + Capacity: channel.Capacity, + Node: autopilot.NewNodeID( + channel.IdentityPub), + } } - } - }() - - // We'll also launch a goroutine to provide the agent with - // notifications for when the graph topology controlled by the node - // changes. - svr.wg.Add(1) - go func() { - defer graphSubscription.Cancel() - defer svr.wg.Done() - - for { - select { - case topChange, ok := <-graphSubscription.TopologyChanges: - // If the router is shutting down, then we will - // as well. - if !ok { - return - } - - for _, edgeUpdate := range topChange.ChannelEdgeUpdates { - // If this isn't an advertisement by - // the backing lnd node, then we'll - // continue as we only want to add - // channels that we've created - // ourselves. - if !edgeUpdate.AdvertisingNode.IsEqual(self) { - continue - } - - // If this is indeed a channel we - // opened, then we'll convert it to the - // autopilot.Channel format, and notify - // the pilot of the new channel. - chanNode := autopilot.NewNodeID( - edgeUpdate.ConnectingNode, - ) - chanID := lnwire.NewShortChanIDFromInt( - edgeUpdate.ChanID, - ) - edge := autopilot.Channel{ - ChanID: chanID, - Capacity: edgeUpdate.Capacity, - Node: chanNode, - } - pilot.OnChannelOpen(edge) - } - - // For each closed channel, we'll obtain - // the chanID of the closed channel and send it - // to the pilot. - for _, chanClose := range topChange.ClosedChannels { - chanID := lnwire.NewShortChanIDFromInt( - chanClose.ChanID, - ) - - pilot.OnChannelClose(chanID) - } - - // If new nodes were added to the graph, or nod - // information has changed, we'll poke autopilot - // to see if it can make use of them. - if len(topChange.NodeUpdates) > 0 { - pilot.OnNodeUpdates() - } - - case <-svr.quit: - return - } - } - }() - - return pilot, nil + return chanState, nil + }, + SubscribeTransactions: svr.cc.wallet.SubscribeTransactions, + SubscribeTopology: svr.chanRouter.SubscribeTopology, + } }