Merge pull request #2039 from halseth/autopilot-rpcserver

Add Autopilot sub-RPC server
This commit is contained in:
Olaoluwa Osuntokun 2018-12-13 17:33:39 -08:00 committed by GitHub
commit 0fafd5e2fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1017 additions and 159 deletions

10
.gitignore vendored

@ -24,10 +24,10 @@ _testmain.go
*.test *.test
*.prof *.prof
lnd /lnd
lnd-debug /lnd-debug
lncli /lncli
lncli-debug /lncli-debug
# Integration test log files # Integration test log files
output*.log output*.log
@ -54,6 +54,4 @@ profile.tmp
.DS_Store .DS_Store
main*
.vscode .vscode

267
autopilot/manager.go Normal file

@ -0,0 +1,267 @@
package autopilot
import (
"sync"
"sync/atomic"
"github.com/btcsuite/btcd/btcec"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
)
// ManagerCfg houses a set of values and methods that is passed to the Manager
// for it to properly manage its autopilot agent.
type ManagerCfg struct {
// Self is the public key of the lnd instance. It is used to making
// sure the autopilot is not opening channels to itself.
Self *btcec.PublicKey
// PilotCfg is the config of the autopilot agent managed by the
// Manager.
PilotCfg *Config
// ChannelState is a function closure that returns the current set of
// channels managed by this node.
ChannelState func() ([]Channel, error)
// SubscribeTransactions is used to get a subscription for transactions
// relevant to this node's wallet.
SubscribeTransactions func() (lnwallet.TransactionSubscription, error)
// SubscribeTopology is used to get a subscription for topology changes
// on the network.
SubscribeTopology func() (*routing.TopologyClient, error)
}
// Manager is struct that manages an autopilot agent, making it possible to
// enable and disable it at will, and hand it relevant external information.
// It implements the autopilot grpc service, which is used to get data about
// the running autopilot, and give it relevant information.
type Manager struct {
started uint32 // To be used atomically.
stopped uint32 // To be used atomically.
cfg *ManagerCfg
// pilot is the current autopilot agent. It will be nil if the agent is
// disabled.
pilot *Agent
quit chan struct{}
wg sync.WaitGroup
sync.Mutex
}
// NewManager creates a new instance of the Manager from the passed config.
func NewManager(cfg *ManagerCfg) (*Manager, error) {
return &Manager{
cfg: cfg,
quit: make(chan struct{}),
}, nil
}
// Start starts the Manager.
func (m *Manager) Start() error {
if !atomic.CompareAndSwapUint32(&m.started, 0, 1) {
return nil
}
return nil
}
// Stop stops the Manager. If an autopilot agent is active, it will also be
// stopped.
func (m *Manager) Stop() error {
if !atomic.CompareAndSwapUint32(&m.stopped, 0, 1) {
return nil
}
if err := m.StopAgent(); err != nil {
log.Errorf("Unable to stop pilot: %v", err)
}
close(m.quit)
m.wg.Wait()
return nil
}
// IsActive returns whether the autopilot agent is currently active.
func (m *Manager) IsActive() bool {
m.Lock()
defer m.Unlock()
return m.pilot != nil
}
// StartAgent creates and starts an autopilot agent from the Manager's
// config.
func (m *Manager) StartAgent() error {
m.Lock()
defer m.Unlock()
// Already active.
if m.pilot != nil {
return nil
}
// Next, we'll fetch the current state of open channels from the
// database to use as initial state for the auto-pilot agent.
initialChanState, err := m.cfg.ChannelState()
if err != nil {
return err
}
// Now that we have all the initial dependencies, we can create the
// auto-pilot instance itself.
m.pilot, err = New(*m.cfg.PilotCfg, initialChanState)
if err != nil {
return err
}
if err := m.pilot.Start(); err != nil {
return err
}
// Finally, we'll need to subscribe to two things: incoming
// transactions that modify the wallet's balance, and also any graph
// topology updates.
txnSubscription, err := m.cfg.SubscribeTransactions()
if err != nil {
defer m.pilot.Stop()
return err
}
graphSubscription, err := m.cfg.SubscribeTopology()
if err != nil {
defer m.pilot.Stop()
defer txnSubscription.Cancel()
return err
}
// We'll launch a goroutine to provide the agent with notifications
// whenever the balance of the wallet changes.
// TODO(halseth): can lead to panic if in process of shutting down.
m.wg.Add(1)
go func() {
defer txnSubscription.Cancel()
defer m.wg.Done()
for {
select {
case <-txnSubscription.ConfirmedTransactions():
m.pilot.OnBalanceChange()
// We won't act upon new unconfirmed transaction, as
// we'll only use confirmed outputs when funding.
// However, we will still drain this request in order
// to avoid goroutine leaks, and ensure we promptly
// read from the channel if available.
case <-txnSubscription.UnconfirmedTransactions():
case <-m.pilot.quit:
return
case <-m.quit:
return
}
}
}()
// We'll also launch a goroutine to provide the agent with
// notifications for when the graph topology controlled by the node
// changes.
m.wg.Add(1)
go func() {
defer graphSubscription.Cancel()
defer m.wg.Done()
for {
select {
case topChange, ok := <-graphSubscription.TopologyChanges:
// If the router is shutting down, then we will
// as well.
if !ok {
return
}
for _, edgeUpdate := range topChange.ChannelEdgeUpdates {
// If this isn't an advertisement by
// the backing lnd node, then we'll
// continue as we only want to add
// channels that we've created
// ourselves.
if !edgeUpdate.AdvertisingNode.IsEqual(m.cfg.Self) {
continue
}
// If this is indeed a channel we
// opened, then we'll convert it to the
// autopilot.Channel format, and notify
// the pilot of the new channel.
chanNode := NewNodeID(
edgeUpdate.ConnectingNode,
)
chanID := lnwire.NewShortChanIDFromInt(
edgeUpdate.ChanID,
)
edge := Channel{
ChanID: chanID,
Capacity: edgeUpdate.Capacity,
Node: chanNode,
}
m.pilot.OnChannelOpen(edge)
}
// For each closed channel, we'll obtain
// the chanID of the closed channel and send it
// to the pilot.
for _, chanClose := range topChange.ClosedChannels {
chanID := lnwire.NewShortChanIDFromInt(
chanClose.ChanID,
)
m.pilot.OnChannelClose(chanID)
}
// If new nodes were added to the graph, or nod
// information has changed, we'll poke autopilot
// to see if it can make use of them.
if len(topChange.NodeUpdates) > 0 {
m.pilot.OnNodeUpdates()
}
case <-m.pilot.quit:
return
case <-m.quit:
return
}
}
}()
log.Debugf("Manager started autopilot agent")
return nil
}
// StopAgent stops any active autopilot agent.
func (m *Manager) StopAgent() error {
m.Lock()
defer m.Unlock()
// Not active, so we can return early.
if m.pilot == nil {
return nil
}
if err := m.pilot.Stop(); err != nil {
return err
}
// Make sure to nil the current agent, indicating it is no longer
// active.
m.pilot = nil
log.Debugf("Manager stopped autopilot agent")
return nil
}

@ -0,0 +1,113 @@
// +build autopilotrpc
package main
import (
"context"
"github.com/lightningnetwork/lnd/lnrpc/autopilotrpc"
"github.com/urfave/cli"
)
func getAutopilotClient(ctx *cli.Context) (autopilotrpc.AutopilotClient, func()) {
conn := getClientConn(ctx, false)
cleanUp := func() {
conn.Close()
}
return autopilotrpc.NewAutopilotClient(conn), cleanUp
}
var getStatusCommand = cli.Command{
Name: "status",
Usage: "Get the active status of autopilot.",
Description: "",
Action: actionDecorator(getStatus),
}
func getStatus(ctx *cli.Context) error {
ctxb := context.Background()
client, cleanUp := getAutopilotClient(ctx)
defer cleanUp()
req := &autopilotrpc.StatusRequest{}
resp, err := client.Status(ctxb, req)
if err != nil {
return err
}
printRespJSON(resp)
return nil
}
var enableCommand = cli.Command{
Name: "enable",
Usage: "Enable the autopilot.",
Description: "",
Action: actionDecorator(enable),
}
var disableCommand = cli.Command{
Name: "disable",
Usage: "Disable the active autopilot.",
Description: "",
Action: actionDecorator(disable),
}
func enable(ctx *cli.Context) error {
ctxb := context.Background()
client, cleanUp := getAutopilotClient(ctx)
defer cleanUp()
// We will enable the autopilot.
req := &autopilotrpc.ModifyStatusRequest{
Enable: true,
}
resp, err := client.ModifyStatus(ctxb, req)
if err != nil {
return err
}
printRespJSON(resp)
return nil
}
func disable(ctx *cli.Context) error {
ctxb := context.Background()
client, cleanUp := getAutopilotClient(ctx)
defer cleanUp()
// We will disable the autopilot.
req := &autopilotrpc.ModifyStatusRequest{
Enable: false,
}
resp, err := client.ModifyStatus(ctxb, req)
if err != nil {
return err
}
printRespJSON(resp)
return nil
}
// autopilotCommands will return the set of commands to enable for autopilotrpc
// builds.
func autopilotCommands() []cli.Command {
return []cli.Command{
{
Name: "autopilot",
Category: "Autopilot",
Usage: "Interact with a running autopilot.",
Description: "",
Subcommands: []cli.Command{
getStatusCommand,
enableCommand,
disableCommand,
},
},
}
}

@ -0,0 +1,10 @@
// +build !autopilotrpc
package main
import "github.com/urfave/cli"
// autopilotCommands will return nil for non-autopilotrpc builds.
func autopilotCommands() []cli.Command {
return nil
}

@ -293,6 +293,9 @@ func main() {
forwardingHistoryCommand, forwardingHistoryCommand,
} }
// Add any extra autopilot commands determined by build flags.
app.Commands = append(app.Commands, autopilotCommands()...)
if err := app.Run(os.Args); err != nil { if err := app.Run(os.Args); err != nil {
fatal(err) fatal(err)
} }

30
lnd.go

@ -38,6 +38,7 @@ import (
proxy "github.com/grpc-ecosystem/grpc-gateway/runtime" proxy "github.com/grpc-ecosystem/grpc-gateway/runtime"
flags "github.com/jessevdk/go-flags" flags "github.com/jessevdk/go-flags"
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/keychain"
@ -312,11 +313,26 @@ func lndMain() error {
return err return err
} }
// Set up an auotpilot manager from the current config. This will be
// used to manage the underlying autopilot agent, starting and stopping
// it at will.
atplCfg := initAutoPilot(server, cfg.Autopilot)
atplManager, err := autopilot.NewManager(atplCfg)
if err != nil {
ltndLog.Errorf("unable to create autopilot manager: %v", err)
return err
}
if err := atplManager.Start(); err != nil {
ltndLog.Errorf("unable to start autopilot manager: %v", err)
return err
}
defer atplManager.Stop()
// Initialize, and register our implementation of the gRPC interface // Initialize, and register our implementation of the gRPC interface
// exported by the rpcServer. // exported by the rpcServer.
rpcServer, err := newRPCServer( rpcServer, err := newRPCServer(
server, macaroonService, cfg.SubRPCServers, serverOpts, server, macaroonService, cfg.SubRPCServers, serverOpts,
proxyOpts, tlsConf, proxyOpts, atplManager, tlsConf,
) )
if err != nil { if err != nil {
srvrLog.Errorf("unable to start RPC server: %v", err) srvrLog.Errorf("unable to start RPC server: %v", err)
@ -375,20 +391,14 @@ func lndMain() error {
defer server.Stop() defer server.Stop()
// Now that the server has started, if the autopilot mode is currently // Now that the server has started, if the autopilot mode is currently
// active, then we'll initialize a fresh instance of it and start it. // active, then we'll start the autopilot agent immediately. It will be
// stopped together with the autopilot service.
if cfg.Autopilot.Active { if cfg.Autopilot.Active {
pilot, err := initAutoPilot(server, cfg.Autopilot) if err := atplManager.StartAgent(); err != nil {
if err != nil {
ltndLog.Errorf("unable to create autopilot agent: %v",
err)
return err
}
if err := pilot.Start(); err != nil {
ltndLog.Errorf("unable to start autopilot agent: %v", ltndLog.Errorf("unable to start autopilot agent: %v",
err) err)
return err return err
} }
defer pilot.Stop()
} }
// Wait for shutdown signal from either a graceful server stop or from // Wait for shutdown signal from either a graceful server stop or from

@ -0,0 +1,226 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: autopilotrpc/autopilot.proto
/*
Package autopilotrpc is a generated protocol buffer package.
It is generated from these files:
autopilotrpc/autopilot.proto
It has these top-level messages:
StatusRequest
StatusResponse
ModifyStatusRequest
ModifyStatusResponse
*/
package autopilotrpc
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type StatusRequest struct {
}
func (m *StatusRequest) Reset() { *m = StatusRequest{} }
func (m *StatusRequest) String() string { return proto.CompactTextString(m) }
func (*StatusRequest) ProtoMessage() {}
func (*StatusRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type StatusResponse struct {
// / Indicates whether the autopilot is active or not.
Active bool `protobuf:"varint,1,opt,name=active" json:"active,omitempty"`
}
func (m *StatusResponse) Reset() { *m = StatusResponse{} }
func (m *StatusResponse) String() string { return proto.CompactTextString(m) }
func (*StatusResponse) ProtoMessage() {}
func (*StatusResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *StatusResponse) GetActive() bool {
if m != nil {
return m.Active
}
return false
}
type ModifyStatusRequest struct {
// / Whether the autopilot agent should be enabled or not.
Enable bool `protobuf:"varint,1,opt,name=enable" json:"enable,omitempty"`
}
func (m *ModifyStatusRequest) Reset() { *m = ModifyStatusRequest{} }
func (m *ModifyStatusRequest) String() string { return proto.CompactTextString(m) }
func (*ModifyStatusRequest) ProtoMessage() {}
func (*ModifyStatusRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *ModifyStatusRequest) GetEnable() bool {
if m != nil {
return m.Enable
}
return false
}
type ModifyStatusResponse struct {
}
func (m *ModifyStatusResponse) Reset() { *m = ModifyStatusResponse{} }
func (m *ModifyStatusResponse) String() string { return proto.CompactTextString(m) }
func (*ModifyStatusResponse) ProtoMessage() {}
func (*ModifyStatusResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func init() {
proto.RegisterType((*StatusRequest)(nil), "autopilotrpc.StatusRequest")
proto.RegisterType((*StatusResponse)(nil), "autopilotrpc.StatusResponse")
proto.RegisterType((*ModifyStatusRequest)(nil), "autopilotrpc.ModifyStatusRequest")
proto.RegisterType((*ModifyStatusResponse)(nil), "autopilotrpc.ModifyStatusResponse")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// Client API for Autopilot service
type AutopilotClient interface {
// *
// Status returns whether the daemon's autopilot agent is active.
Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error)
// *
// ModifyStatus is used to modify the status of the autopilot agent, like
// enabling or disabling it.
ModifyStatus(ctx context.Context, in *ModifyStatusRequest, opts ...grpc.CallOption) (*ModifyStatusResponse, error)
}
type autopilotClient struct {
cc *grpc.ClientConn
}
func NewAutopilotClient(cc *grpc.ClientConn) AutopilotClient {
return &autopilotClient{cc}
}
func (c *autopilotClient) Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) {
out := new(StatusResponse)
err := grpc.Invoke(ctx, "/autopilotrpc.Autopilot/Status", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *autopilotClient) ModifyStatus(ctx context.Context, in *ModifyStatusRequest, opts ...grpc.CallOption) (*ModifyStatusResponse, error) {
out := new(ModifyStatusResponse)
err := grpc.Invoke(ctx, "/autopilotrpc.Autopilot/ModifyStatus", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Autopilot service
type AutopilotServer interface {
// *
// Status returns whether the daemon's autopilot agent is active.
Status(context.Context, *StatusRequest) (*StatusResponse, error)
// *
// ModifyStatus is used to modify the status of the autopilot agent, like
// enabling or disabling it.
ModifyStatus(context.Context, *ModifyStatusRequest) (*ModifyStatusResponse, error)
}
func RegisterAutopilotServer(s *grpc.Server, srv AutopilotServer) {
s.RegisterService(&_Autopilot_serviceDesc, srv)
}
func _Autopilot_Status_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StatusRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(AutopilotServer).Status(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/autopilotrpc.Autopilot/Status",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(AutopilotServer).Status(ctx, req.(*StatusRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Autopilot_ModifyStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ModifyStatusRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(AutopilotServer).ModifyStatus(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/autopilotrpc.Autopilot/ModifyStatus",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(AutopilotServer).ModifyStatus(ctx, req.(*ModifyStatusRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Autopilot_serviceDesc = grpc.ServiceDesc{
ServiceName: "autopilotrpc.Autopilot",
HandlerType: (*AutopilotServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Status",
Handler: _Autopilot_Status_Handler,
},
{
MethodName: "ModifyStatus",
Handler: _Autopilot_ModifyStatus_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "autopilotrpc/autopilot.proto",
}
func init() { proto.RegisterFile("autopilotrpc/autopilot.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 183 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x49, 0x2c, 0x2d, 0xc9,
0x2f, 0xc8, 0xcc, 0xc9, 0x2f, 0x29, 0x2a, 0x48, 0xd6, 0x87, 0x73, 0xf4, 0x0a, 0x8a, 0xf2, 0x4b,
0xf2, 0x85, 0x78, 0x90, 0x65, 0x95, 0xf8, 0xb9, 0x78, 0x83, 0x4b, 0x12, 0x4b, 0x4a, 0x8b, 0x83,
0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x94, 0x34, 0xb8, 0xf8, 0x60, 0x02, 0xc5, 0x05, 0xf9, 0x79,
0xc5, 0xa9, 0x42, 0x62, 0x5c, 0x6c, 0x89, 0xc9, 0x25, 0x99, 0x65, 0xa9, 0x12, 0x8c, 0x0a, 0x8c,
0x1a, 0x1c, 0x41, 0x50, 0x9e, 0x92, 0x2e, 0x97, 0xb0, 0x6f, 0x7e, 0x4a, 0x66, 0x5a, 0x25, 0x8a,
0x01, 0x20, 0xe5, 0xa9, 0x79, 0x89, 0x49, 0x39, 0x70, 0xe5, 0x10, 0x9e, 0x92, 0x18, 0x97, 0x08,
0xaa, 0x72, 0x88, 0xf1, 0x46, 0xcb, 0x19, 0xb9, 0x38, 0x1d, 0x61, 0x4e, 0x12, 0x72, 0xe6, 0x62,
0x83, 0xc8, 0x0b, 0x49, 0xeb, 0x21, 0x3b, 0x54, 0x0f, 0xc5, 0x12, 0x29, 0x19, 0xec, 0x92, 0x50,
0x17, 0x87, 0x72, 0xf1, 0x20, 0x5b, 0x25, 0xa4, 0x88, 0xaa, 0x1a, 0x8b, 0xab, 0xa5, 0x94, 0xf0,
0x29, 0x81, 0x18, 0x9b, 0xc4, 0x06, 0x0e, 0x40, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0xf8,
0x0f, 0x84, 0xb8, 0x60, 0x01, 0x00, 0x00,
}

@ -0,0 +1,34 @@
syntax = "proto3";
package autopilotrpc;
// Autopilot is a service that can be used to get information about the current
// state of the daemon's autopilot agent, and also supply it with information
// that can be used when deciding where to open channels.
service Autopilot {
/**
Status returns whether the daemon's autopilot agent is active.
*/
rpc Status(StatusRequest) returns (StatusResponse);
/**
ModifyStatus is used to modify the status of the autopilot agent, like
enabling or disabling it.
*/
rpc ModifyStatus(ModifyStatusRequest) returns (ModifyStatusResponse);
}
message StatusRequest{
}
message StatusResponse{
/// Indicates whether the autopilot is active or not.
bool active = 1 [json_name = "active"];
}
message ModifyStatusRequest{
/// Whether the autopilot agent should be enabled or not.
bool enable = 1 [json_name = "enable"];
}
message ModifyStatusResponse {}

@ -0,0 +1,156 @@
// +build autopilotrpc
package autopilotrpc
import (
"context"
"os"
"sync/atomic"
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/lnrpc"
"google.golang.org/grpc"
"gopkg.in/macaroon-bakery.v2/bakery"
)
const (
// subServerName is the name of the sub rpc server. We'll use this name
// to register ourselves, and we also require that the main
// SubServerConfigDispatcher instance recognize tt as the name of our
// RPC service.
subServerName = "AutopilotRPC"
)
var (
// macPermissions maps RPC calls to the permissions they require.
macPermissions = map[string][]bakery.Op{
"/autopilotrpc.Autopilot/Status": {{
Entity: "info",
Action: "read",
}},
"/autopilotrpc.Autopilot/ModifyStatus": {{
Entity: "onchain",
Action: "write",
}, {
Entity: "offchain",
Action: "write",
}},
}
)
// Server is a sub-server of the main RPC server: the autopilot RPC. This sub
// RPC server allows external callers to access the status of the autopilot
// currently active within lnd, as well as configuring it at runtime.
type Server struct {
started int32 // To be used atomically.
shutdown int32 // To be used atomically.
cfg *Config
manager *autopilot.Manager
}
// A compile time check to ensure that Server fully implements the
// AutopilotServer gRPC service.
var _ AutopilotServer = (*Server)(nil)
// fileExists reports whether the named file or directory exists.
func fileExists(name string) bool {
if _, err := os.Stat(name); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}
// New returns a new instance of the autopilotrpc Autopilot sub-server. We also
// return the set of permissions for the macaroons that we may create within
// this method. If the macaroons we need aren't found in the filepath, then
// we'll create them on start up. If we're unable to locate, or create the
// macaroons we need, then we'll return with an error.
func New(cfg *Config) (*Server, lnrpc.MacaroonPerms, error) {
// We don't create any new macaroons for this subserver, instead reuse
// existing onchain/offchain permissions.
server := &Server{
cfg: cfg,
manager: cfg.Manager,
}
return server, macPermissions, nil
}
// Start launches any helper goroutines required for the Server to function.
//
// NOTE: This is part of the lnrpc.SubServer interface.
func (s *Server) Start() error {
if atomic.AddInt32(&s.started, 1) != 1 {
return nil
}
return s.manager.Start()
}
// Stop signals any active goroutines for a graceful closure.
//
// NOTE: This is part of the lnrpc.SubServer interface.
func (s *Server) Stop() error {
if atomic.AddInt32(&s.shutdown, 1) != 1 {
return nil
}
return s.manager.Stop()
}
// Name returns a unique string representation of the sub-server. This can be
// used to identify the sub-server and also de-duplicate them.
//
// NOTE: This is part of the lnrpc.SubServer interface.
func (s *Server) Name() string {
return subServerName
}
// RegisterWithRootServer will be called by the root gRPC server to direct a
// sub RPC server to register itself with the main gRPC root server. Until this
// is called, each sub-server won't be able to have
// requests routed towards it.
//
// NOTE: This is part of the lnrpc.SubServer interface.
func (s *Server) RegisterWithRootServer(grpcServer *grpc.Server) error {
// We make sure that we register it with the main gRPC server to ensure
// all our methods are routed properly.
RegisterAutopilotServer(grpcServer, s)
log.Debugf("Autopilot RPC server successfully register with root " +
"gRPC server")
return nil
}
// Status returns the current status of the autopilot agent.
//
// NOTE: Part of the AutopilotServer interface.
func (s *Server) Status(ctx context.Context,
in *StatusRequest) (*StatusResponse, error) {
return &StatusResponse{
Active: s.manager.IsActive(),
}, nil
}
// ModifyStatus activates the current autopilot agent, if active.
//
// NOTE: Part of the AutopilotServer interface.
func (s *Server) ModifyStatus(ctx context.Context,
in *ModifyStatusRequest) (*ModifyStatusResponse, error) {
log.Debugf("Setting agent enabled=%v", in.Enable)
var err error
if in.Enable {
err = s.manager.StartAgent()
} else {
err = s.manager.StopAgent()
}
return &ModifyStatusResponse{}, err
}

@ -0,0 +1,17 @@
// +build autopilotrpc
package autopilotrpc
import (
"github.com/lightningnetwork/lnd/autopilot"
)
// Config is the primary configuration struct for the autopilot RPC server. It
// contains all the items required for the rpc server to carry out its
// duties. The fields with struct tags are meant to be parsed as normal
// configuration options, while if able to be populated, the latter fields MUST
// also be specified.
type Config struct {
// Manager is the running autopilot manager.
Manager *autopilot.Manager
}

@ -0,0 +1,6 @@
// +build !autopilotrpc
package autopilotrpc
// Config is empty for non-autopilotrpc builds.
type Config struct{}

@ -0,0 +1,63 @@
// +build autopilotrpc
package autopilotrpc
import (
"fmt"
"github.com/lightningnetwork/lnd/lnrpc"
)
// createNewSubServer is a helper method that will create the new sub server
// given the main config dispatcher method. If we're unable to find the config
// that is meant for us in the config dispatcher, then we'll exit with an
// error.
func createNewSubServer(configRegistry lnrpc.SubServerConfigDispatcher) (
lnrpc.SubServer, lnrpc.MacaroonPerms, error) {
// We'll attempt to look up the config that we expect, according to our
// subServerName name. If we can't find this, then we'll exit with an
// error, as we're unable to properly initialize ourselves without this
// config.
subServerConf, ok := configRegistry.FetchConfig(subServerName)
if !ok {
return nil, nil, fmt.Errorf("unable to find config for "+
"subserver type %s", subServerName)
}
// Now that we've found an object mapping to our service name, we'll
// ensure that it's the type we need.
config, ok := subServerConf.(*Config)
if !ok {
return nil, nil, fmt.Errorf("wrong type of config for "+
"subserver %s, expected %T got %T", subServerName,
&Config{}, subServerConf)
}
// Before we try to make the new service instance, we'll perform
// some sanity checks on the arguments to ensure taht they're useable.
switch {
case config.Manager == nil:
return nil, nil, fmt.Errorf("Manager must be set to create " +
"Autopilotrpc")
}
return New(config)
}
func init() {
subServer := &lnrpc.SubServerDriver{
SubServerName: subServerName,
New: func(c lnrpc.SubServerConfigDispatcher) (lnrpc.SubServer,
lnrpc.MacaroonPerms, error) {
return createNewSubServer(c)
},
}
// If the build tag is active, then we'll register ourselves as a
// sub-RPC server within the global lnrpc package namespace.
if err := lnrpc.RegisterSubServer(subServer); err != nil {
panic(fmt.Sprintf("failed to register sub server driver "+
"'%s': %v", subServerName, err))
}
}

45
lnrpc/autopilotrpc/log.go Normal file

@ -0,0 +1,45 @@
package autopilotrpc
import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)
// log is a logger that is initialized with no output filters. This means the
// package will not perform any logging by default until the caller requests
// it.
var log btclog.Logger
// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger("ARPC", nil))
}
// DisableLog disables all library log output. Logging output is disabled by
// by default until UseLogger is called.
func DisableLog() {
UseLogger(btclog.Disabled)
}
// UseLogger uses a specified Logger to output package logging info. This
// should be used in preference to SetLogWriter if the caller is also using
// btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}
// logClosure is used to provide a closure over expensive logging operations so
// don't have to be performed when the logging level doesn't warrant it.
type logClosure func() string
// String invokes the underlying function and returns the result.
func (c logClosure) String() string {
return c()
}
// newLogClosure returns a new closure over a function that returns a string
// which itself provides a Stringer interface so that it can be used with the
// logging system.
func newLogClosure(c func() string) logClosure {
return logClosure(c)
}

4
log.go

@ -19,6 +19,7 @@ import (
"github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnrpc/autopilotrpc"
"github.com/lightningnetwork/lnd/lnrpc/signrpc" "github.com/lightningnetwork/lnd/lnrpc/signrpc"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/lnrpc/walletrpc"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
@ -69,6 +70,7 @@ var (
swprLog = build.NewSubLogger("SWPR", backendLog.Logger) swprLog = build.NewSubLogger("SWPR", backendLog.Logger)
sgnrLog = build.NewSubLogger("SGNR", backendLog.Logger) sgnrLog = build.NewSubLogger("SGNR", backendLog.Logger)
wlktLog = build.NewSubLogger("WLKT", backendLog.Logger) wlktLog = build.NewSubLogger("WLKT", backendLog.Logger)
arpcLog = build.NewSubLogger("ARPC", backendLog.Logger)
) )
// Initialize package-global logger variables. // Initialize package-global logger variables.
@ -88,6 +90,7 @@ func init() {
sweep.UseLogger(swprLog) sweep.UseLogger(swprLog)
signrpc.UseLogger(sgnrLog) signrpc.UseLogger(sgnrLog)
walletrpc.UseLogger(wlktLog) walletrpc.UseLogger(wlktLog)
autopilotrpc.UseLogger(arpcLog)
} }
// subsystemLoggers maps each subsystem identifier to its associated logger. // subsystemLoggers maps each subsystem identifier to its associated logger.
@ -113,6 +116,7 @@ var subsystemLoggers = map[string]btclog.Logger{
"SWPR": swprLog, "SWPR": swprLog,
"SGNR": sgnrLog, "SGNR": sgnrLog,
"WLKT": wlktLog, "WLKT": wlktLog,
"ARPC": arpcLog,
} }
// initLogRotator initializes the logging rotator to write logs to logFile and // initLogRotator initializes the logging rotator to write logs to logFile and

171
pilot.go

@ -79,10 +79,11 @@ func (c *chanController) SpliceOut(chanPoint *wire.OutPoint,
// autopilot.ChannelController interface. // autopilot.ChannelController interface.
var _ autopilot.ChannelController = (*chanController)(nil) var _ autopilot.ChannelController = (*chanController)(nil)
// initAutoPilot initializes a new autopilot.Agent instance based on the passed // initAutoPilot initializes a new autopilot.ManagerCfg to manage an
// configuration struct. All interfaces needed to drive the pilot will be // autopilot.Agent instance based on the passed configuration struct. The agent
// registered and launched. // and all interfaces needed to drive it won't be launched before the Manager's
func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error) { // StartAgent method is called.
func initAutoPilot(svr *server, cfg *autoPilotConfig) *autopilot.ManagerCfg {
atplLog.Infof("Instantiating autopilot with cfg: %v", spew.Sdump(cfg)) atplLog.Infof("Instantiating autopilot with cfg: %v", spew.Sdump(cfg))
// Set up the constraints the autopilot heuristics must adhere to. // Set up the constraints the autopilot heuristics must adhere to.
@ -176,143 +177,33 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error)
DisconnectPeer: svr.DisconnectPeer, DisconnectPeer: svr.DisconnectPeer,
} }
// Next, we'll fetch the current state of open channels from the // Create and return the autopilot.ManagerCfg that administrates this
// database to use as initial state for the auto-pilot agent. // agent-pilot instance.
activeChannels, err := svr.chanDB.FetchAllChannels() return &autopilot.ManagerCfg{
if err != nil { Self: self,
return nil, err PilotCfg: &pilotCfg,
} ChannelState: func() ([]autopilot.Channel, error) {
initialChanState := make([]autopilot.Channel, len(activeChannels)) // We'll fetch the current state of open
for i, channel := range activeChannels { // channels from the database to use as initial
initialChanState[i] = autopilot.Channel{ // state for the auto-pilot agent.
ChanID: channel.ShortChanID(), activeChannels, err := svr.chanDB.FetchAllChannels()
Capacity: channel.Capacity, if err != nil {
Node: autopilot.NewNodeID(channel.IdentityPub), return nil, err
}
}
// Now that we have all the initial dependencies, we can create the
// auto-pilot instance itself.
pilot, err := autopilot.New(pilotCfg, initialChanState)
if err != nil {
return nil, err
}
// Finally, we'll need to subscribe to two things: incoming
// transactions that modify the wallet's balance, and also any graph
// topology updates.
txnSubscription, err := svr.cc.wallet.SubscribeTransactions()
if err != nil {
return nil, err
}
graphSubscription, err := svr.chanRouter.SubscribeTopology()
if err != nil {
return nil, err
}
// We'll launch a goroutine to provide the agent with notifications
// whenever the balance of the wallet changes.
svr.wg.Add(2)
go func() {
defer txnSubscription.Cancel()
defer svr.wg.Done()
for {
select {
case <-txnSubscription.ConfirmedTransactions():
pilot.OnBalanceChange()
case <-svr.quit:
return
} }
} chanState := make([]autopilot.Channel,
len(activeChannels))
}() for i, channel := range activeChannels {
go func() { chanState[i] = autopilot.Channel{
defer svr.wg.Done() ChanID: channel.ShortChanID(),
Capacity: channel.Capacity,
for { Node: autopilot.NewNodeID(
select { channel.IdentityPub),
// We won't act upon new unconfirmed transaction, as }
// we'll only use confirmed outputs when funding.
// However, we will still drain this request in order
// to avoid goroutine leaks, and ensure we promptly
// read from the channel if available.
case <-txnSubscription.UnconfirmedTransactions():
case <-svr.quit:
return
} }
}
}() return chanState, nil
},
// We'll also launch a goroutine to provide the agent with SubscribeTransactions: svr.cc.wallet.SubscribeTransactions,
// notifications for when the graph topology controlled by the node SubscribeTopology: svr.chanRouter.SubscribeTopology,
// changes. }
svr.wg.Add(1)
go func() {
defer graphSubscription.Cancel()
defer svr.wg.Done()
for {
select {
case topChange, ok := <-graphSubscription.TopologyChanges:
// If the router is shutting down, then we will
// as well.
if !ok {
return
}
for _, edgeUpdate := range topChange.ChannelEdgeUpdates {
// If this isn't an advertisement by
// the backing lnd node, then we'll
// continue as we only want to add
// channels that we've created
// ourselves.
if !edgeUpdate.AdvertisingNode.IsEqual(self) {
continue
}
// If this is indeed a channel we
// opened, then we'll convert it to the
// autopilot.Channel format, and notify
// the pilot of the new channel.
chanNode := autopilot.NewNodeID(
edgeUpdate.ConnectingNode,
)
chanID := lnwire.NewShortChanIDFromInt(
edgeUpdate.ChanID,
)
edge := autopilot.Channel{
ChanID: chanID,
Capacity: edgeUpdate.Capacity,
Node: chanNode,
}
pilot.OnChannelOpen(edge)
}
// For each closed channel, we'll obtain
// the chanID of the closed channel and send it
// to the pilot.
for _, chanClose := range topChange.ClosedChannels {
chanID := lnwire.NewShortChanIDFromInt(
chanClose.ChanID,
)
pilot.OnChannelClose(chanID)
}
// If new nodes were added to the graph, or nod
// information has changed, we'll poke autopilot
// to see if it can make use of them.
if len(topChange.NodeUpdates) > 0 {
pilot.OnNodeUpdates()
}
case <-svr.quit:
return
}
}
}()
return pilot, nil
} }

@ -27,6 +27,7 @@ import (
"github.com/coreos/bbolt" "github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
proxy "github.com/grpc-ecosystem/grpc-gateway/runtime" proxy "github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch"
@ -392,7 +393,7 @@ var _ lnrpc.LightningServer = (*rpcServer)(nil)
// like requiring TLS, etc. // like requiring TLS, etc.
func newRPCServer(s *server, macService *macaroons.Service, func newRPCServer(s *server, macService *macaroons.Service,
subServerCgs *subRPCServerConfigs, serverOpts []grpc.ServerOption, subServerCgs *subRPCServerConfigs, serverOpts []grpc.ServerOption,
restServerOpts []grpc.DialOption, restServerOpts []grpc.DialOption, atpl *autopilot.Manager,
tlsCfg *tls.Config) (*rpcServer, error) { tlsCfg *tls.Config) (*rpcServer, error) {
var ( var (
@ -404,7 +405,7 @@ func newRPCServer(s *server, macService *macaroons.Service,
// the dependencies they need are properly populated within each sub // the dependencies they need are properly populated within each sub
// server configuration struct. // server configuration struct.
err := subServerCgs.PopulateDependencies( err := subServerCgs.PopulateDependencies(
s.cc, networkDir, macService, s.cc, networkDir, macService, atpl,
) )
if err != nil { if err != nil {
return nil, err return nil, err

@ -4,6 +4,8 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/lnrpc/autopilotrpc"
"github.com/lightningnetwork/lnd/lnrpc/signrpc" "github.com/lightningnetwork/lnd/lnrpc/signrpc"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/lnrpc/walletrpc"
"github.com/lightningnetwork/lnd/macaroons" "github.com/lightningnetwork/lnd/macaroons"
@ -25,6 +27,10 @@ type subRPCServerConfigs struct {
// also requests keys and addresses under control of the backing // also requests keys and addresses under control of the backing
// wallet. // wallet.
WalletKitRPC *walletrpc.Config `group:"walletrpc" namespace:"walletrpc"` WalletKitRPC *walletrpc.Config `group:"walletrpc" namespace:"walletrpc"`
// AutopilotRPC is a sub-RPC server that exposes methods on the running
// autopilot as a gRPC service.
AutopilotRPC *autopilotrpc.Config `group:"autopilotrpc" namespace:"autopilotrpc"`
} }
// PopulateDependencies attempts to iterate through all the sub-server configs // PopulateDependencies attempts to iterate through all the sub-server configs
@ -34,7 +40,8 @@ type subRPCServerConfigs struct {
// NOTE: This MUST be called before any callers are permitted to execute the // NOTE: This MUST be called before any callers are permitted to execute the
// FetchConfig method. // FetchConfig method.
func (s *subRPCServerConfigs) PopulateDependencies(cc *chainControl, func (s *subRPCServerConfigs) PopulateDependencies(cc *chainControl,
networkDir string, macService *macaroons.Service) error { networkDir string, macService *macaroons.Service,
atpl *autopilot.Manager) error {
// First, we'll use reflect to obtain a version of the config struct // First, we'll use reflect to obtain a version of the config struct
// that allows us to programmatically inspect its fields. // that allows us to programmatically inspect its fields.
@ -92,6 +99,13 @@ func (s *subRPCServerConfigs) PopulateDependencies(cc *chainControl,
reflect.ValueOf(cc.keyRing), reflect.ValueOf(cc.keyRing),
) )
case *autopilotrpc.Config:
subCfgValue := extractReflectValue(cfg)
subCfgValue.FieldByName("Manager").Set(
reflect.ValueOf(atpl),
)
default: default:
return fmt.Errorf("unknown field: %v, %T", fieldName, return fmt.Errorf("unknown field: %v, %T", fieldName,
cfg) cfg)