diff --git a/.gitignore b/.gitignore index 0ed921b1..d7f19c41 100644 --- a/.gitignore +++ b/.gitignore @@ -24,10 +24,10 @@ _testmain.go *.test *.prof -lnd -lnd-debug -lncli -lncli-debug +/lnd +/lnd-debug +/lncli +/lncli-debug # Integration test log files output*.log @@ -54,6 +54,4 @@ profile.tmp .DS_Store -main* - .vscode diff --git a/autopilot/manager.go b/autopilot/manager.go new file mode 100644 index 00000000..33182e98 --- /dev/null +++ b/autopilot/manager.go @@ -0,0 +1,267 @@ +package autopilot + +import ( + "sync" + "sync/atomic" + + "github.com/btcsuite/btcd/btcec" + "github.com/lightningnetwork/lnd/lnwallet" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing" +) + +// ManagerCfg houses a set of values and methods that is passed to the Manager +// for it to properly manage its autopilot agent. +type ManagerCfg struct { + // Self is the public key of the lnd instance. It is used to making + // sure the autopilot is not opening channels to itself. + Self *btcec.PublicKey + + // PilotCfg is the config of the autopilot agent managed by the + // Manager. + PilotCfg *Config + + // ChannelState is a function closure that returns the current set of + // channels managed by this node. + ChannelState func() ([]Channel, error) + + // SubscribeTransactions is used to get a subscription for transactions + // relevant to this node's wallet. + SubscribeTransactions func() (lnwallet.TransactionSubscription, error) + + // SubscribeTopology is used to get a subscription for topology changes + // on the network. + SubscribeTopology func() (*routing.TopologyClient, error) +} + +// Manager is struct that manages an autopilot agent, making it possible to +// enable and disable it at will, and hand it relevant external information. +// It implements the autopilot grpc service, which is used to get data about +// the running autopilot, and give it relevant information. +type Manager struct { + started uint32 // To be used atomically. + stopped uint32 // To be used atomically. + + cfg *ManagerCfg + + // pilot is the current autopilot agent. It will be nil if the agent is + // disabled. + pilot *Agent + + quit chan struct{} + wg sync.WaitGroup + sync.Mutex +} + +// NewManager creates a new instance of the Manager from the passed config. +func NewManager(cfg *ManagerCfg) (*Manager, error) { + return &Manager{ + cfg: cfg, + quit: make(chan struct{}), + }, nil +} + +// Start starts the Manager. +func (m *Manager) Start() error { + if !atomic.CompareAndSwapUint32(&m.started, 0, 1) { + return nil + } + + return nil +} + +// Stop stops the Manager. If an autopilot agent is active, it will also be +// stopped. +func (m *Manager) Stop() error { + if !atomic.CompareAndSwapUint32(&m.stopped, 0, 1) { + return nil + } + + if err := m.StopAgent(); err != nil { + log.Errorf("Unable to stop pilot: %v", err) + } + + close(m.quit) + m.wg.Wait() + + return nil +} + +// IsActive returns whether the autopilot agent is currently active. +func (m *Manager) IsActive() bool { + m.Lock() + defer m.Unlock() + + return m.pilot != nil +} + +// StartAgent creates and starts an autopilot agent from the Manager's +// config. +func (m *Manager) StartAgent() error { + m.Lock() + defer m.Unlock() + + // Already active. + if m.pilot != nil { + return nil + } + + // Next, we'll fetch the current state of open channels from the + // database to use as initial state for the auto-pilot agent. + initialChanState, err := m.cfg.ChannelState() + if err != nil { + return err + } + + // Now that we have all the initial dependencies, we can create the + // auto-pilot instance itself. + m.pilot, err = New(*m.cfg.PilotCfg, initialChanState) + if err != nil { + return err + } + + if err := m.pilot.Start(); err != nil { + return err + } + + // Finally, we'll need to subscribe to two things: incoming + // transactions that modify the wallet's balance, and also any graph + // topology updates. + txnSubscription, err := m.cfg.SubscribeTransactions() + if err != nil { + defer m.pilot.Stop() + return err + } + graphSubscription, err := m.cfg.SubscribeTopology() + if err != nil { + defer m.pilot.Stop() + defer txnSubscription.Cancel() + return err + } + + // We'll launch a goroutine to provide the agent with notifications + // whenever the balance of the wallet changes. + // TODO(halseth): can lead to panic if in process of shutting down. + m.wg.Add(1) + go func() { + defer txnSubscription.Cancel() + defer m.wg.Done() + + for { + select { + case <-txnSubscription.ConfirmedTransactions(): + m.pilot.OnBalanceChange() + + // We won't act upon new unconfirmed transaction, as + // we'll only use confirmed outputs when funding. + // However, we will still drain this request in order + // to avoid goroutine leaks, and ensure we promptly + // read from the channel if available. + case <-txnSubscription.UnconfirmedTransactions(): + case <-m.pilot.quit: + return + case <-m.quit: + return + } + } + + }() + + // We'll also launch a goroutine to provide the agent with + // notifications for when the graph topology controlled by the node + // changes. + m.wg.Add(1) + go func() { + defer graphSubscription.Cancel() + defer m.wg.Done() + + for { + select { + case topChange, ok := <-graphSubscription.TopologyChanges: + // If the router is shutting down, then we will + // as well. + if !ok { + return + } + + for _, edgeUpdate := range topChange.ChannelEdgeUpdates { + // If this isn't an advertisement by + // the backing lnd node, then we'll + // continue as we only want to add + // channels that we've created + // ourselves. + if !edgeUpdate.AdvertisingNode.IsEqual(m.cfg.Self) { + continue + } + + // If this is indeed a channel we + // opened, then we'll convert it to the + // autopilot.Channel format, and notify + // the pilot of the new channel. + chanNode := NewNodeID( + edgeUpdate.ConnectingNode, + ) + chanID := lnwire.NewShortChanIDFromInt( + edgeUpdate.ChanID, + ) + edge := Channel{ + ChanID: chanID, + Capacity: edgeUpdate.Capacity, + Node: chanNode, + } + m.pilot.OnChannelOpen(edge) + } + + // For each closed channel, we'll obtain + // the chanID of the closed channel and send it + // to the pilot. + for _, chanClose := range topChange.ClosedChannels { + chanID := lnwire.NewShortChanIDFromInt( + chanClose.ChanID, + ) + + m.pilot.OnChannelClose(chanID) + } + + // If new nodes were added to the graph, or nod + // information has changed, we'll poke autopilot + // to see if it can make use of them. + if len(topChange.NodeUpdates) > 0 { + m.pilot.OnNodeUpdates() + } + + case <-m.pilot.quit: + return + case <-m.quit: + return + } + } + }() + + log.Debugf("Manager started autopilot agent") + + return nil +} + +// StopAgent stops any active autopilot agent. +func (m *Manager) StopAgent() error { + m.Lock() + defer m.Unlock() + + // Not active, so we can return early. + if m.pilot == nil { + return nil + } + + if err := m.pilot.Stop(); err != nil { + return err + } + + // Make sure to nil the current agent, indicating it is no longer + // active. + m.pilot = nil + + log.Debugf("Manager stopped autopilot agent") + + return nil +} diff --git a/cmd/lncli/autopilotrpc_active.go b/cmd/lncli/autopilotrpc_active.go new file mode 100644 index 00000000..542fb60c --- /dev/null +++ b/cmd/lncli/autopilotrpc_active.go @@ -0,0 +1,113 @@ +// +build autopilotrpc + +package main + +import ( + "context" + + "github.com/lightningnetwork/lnd/lnrpc/autopilotrpc" + "github.com/urfave/cli" +) + +func getAutopilotClient(ctx *cli.Context) (autopilotrpc.AutopilotClient, func()) { + conn := getClientConn(ctx, false) + + cleanUp := func() { + conn.Close() + } + + return autopilotrpc.NewAutopilotClient(conn), cleanUp +} + +var getStatusCommand = cli.Command{ + Name: "status", + Usage: "Get the active status of autopilot.", + Description: "", + Action: actionDecorator(getStatus), +} + +func getStatus(ctx *cli.Context) error { + ctxb := context.Background() + client, cleanUp := getAutopilotClient(ctx) + defer cleanUp() + + req := &autopilotrpc.StatusRequest{} + + resp, err := client.Status(ctxb, req) + if err != nil { + return err + } + + printRespJSON(resp) + return nil +} + +var enableCommand = cli.Command{ + Name: "enable", + Usage: "Enable the autopilot.", + Description: "", + Action: actionDecorator(enable), +} + +var disableCommand = cli.Command{ + Name: "disable", + Usage: "Disable the active autopilot.", + Description: "", + Action: actionDecorator(disable), +} + +func enable(ctx *cli.Context) error { + ctxb := context.Background() + client, cleanUp := getAutopilotClient(ctx) + defer cleanUp() + + // We will enable the autopilot. + req := &autopilotrpc.ModifyStatusRequest{ + Enable: true, + } + + resp, err := client.ModifyStatus(ctxb, req) + if err != nil { + return err + } + + printRespJSON(resp) + return nil +} + +func disable(ctx *cli.Context) error { + ctxb := context.Background() + client, cleanUp := getAutopilotClient(ctx) + defer cleanUp() + + // We will disable the autopilot. + req := &autopilotrpc.ModifyStatusRequest{ + Enable: false, + } + + resp, err := client.ModifyStatus(ctxb, req) + if err != nil { + return err + } + + printRespJSON(resp) + return nil +} + +// autopilotCommands will return the set of commands to enable for autopilotrpc +// builds. +func autopilotCommands() []cli.Command { + return []cli.Command{ + { + Name: "autopilot", + Category: "Autopilot", + Usage: "Interact with a running autopilot.", + Description: "", + Subcommands: []cli.Command{ + getStatusCommand, + enableCommand, + disableCommand, + }, + }, + } +} diff --git a/cmd/lncli/autopilotrpc_default.go b/cmd/lncli/autopilotrpc_default.go new file mode 100644 index 00000000..49061254 --- /dev/null +++ b/cmd/lncli/autopilotrpc_default.go @@ -0,0 +1,10 @@ +// +build !autopilotrpc + +package main + +import "github.com/urfave/cli" + +// autopilotCommands will return nil for non-autopilotrpc builds. +func autopilotCommands() []cli.Command { + return nil +} diff --git a/cmd/lncli/main.go b/cmd/lncli/main.go index b9c1c043..d5f994c1 100644 --- a/cmd/lncli/main.go +++ b/cmd/lncli/main.go @@ -293,6 +293,9 @@ func main() { forwardingHistoryCommand, } + // Add any extra autopilot commands determined by build flags. + app.Commands = append(app.Commands, autopilotCommands()...) + if err := app.Run(os.Args); err != nil { fatal(err) } diff --git a/lnd.go b/lnd.go index b9f0a59a..f6219811 100644 --- a/lnd.go +++ b/lnd.go @@ -38,6 +38,7 @@ import ( proxy "github.com/grpc-ecosystem/grpc-gateway/runtime" flags "github.com/jessevdk/go-flags" + "github.com/lightningnetwork/lnd/autopilot" "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/keychain" @@ -312,11 +313,26 @@ func lndMain() error { return err } + // Set up an auotpilot manager from the current config. This will be + // used to manage the underlying autopilot agent, starting and stopping + // it at will. + atplCfg := initAutoPilot(server, cfg.Autopilot) + atplManager, err := autopilot.NewManager(atplCfg) + if err != nil { + ltndLog.Errorf("unable to create autopilot manager: %v", err) + return err + } + if err := atplManager.Start(); err != nil { + ltndLog.Errorf("unable to start autopilot manager: %v", err) + return err + } + defer atplManager.Stop() + // Initialize, and register our implementation of the gRPC interface // exported by the rpcServer. rpcServer, err := newRPCServer( server, macaroonService, cfg.SubRPCServers, serverOpts, - proxyOpts, tlsConf, + proxyOpts, atplManager, tlsConf, ) if err != nil { srvrLog.Errorf("unable to start RPC server: %v", err) @@ -375,20 +391,14 @@ func lndMain() error { defer server.Stop() // Now that the server has started, if the autopilot mode is currently - // active, then we'll initialize a fresh instance of it and start it. + // active, then we'll start the autopilot agent immediately. It will be + // stopped together with the autopilot service. if cfg.Autopilot.Active { - pilot, err := initAutoPilot(server, cfg.Autopilot) - if err != nil { - ltndLog.Errorf("unable to create autopilot agent: %v", - err) - return err - } - if err := pilot.Start(); err != nil { + if err := atplManager.StartAgent(); err != nil { ltndLog.Errorf("unable to start autopilot agent: %v", err) return err } - defer pilot.Stop() } // Wait for shutdown signal from either a graceful server stop or from diff --git a/lnrpc/autopilotrpc/autopilot.pb.go b/lnrpc/autopilotrpc/autopilot.pb.go new file mode 100644 index 00000000..9878b99b --- /dev/null +++ b/lnrpc/autopilotrpc/autopilot.pb.go @@ -0,0 +1,226 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: autopilotrpc/autopilot.proto + +/* +Package autopilotrpc is a generated protocol buffer package. + +It is generated from these files: + autopilotrpc/autopilot.proto + +It has these top-level messages: + StatusRequest + StatusResponse + ModifyStatusRequest + ModifyStatusResponse +*/ +package autopilotrpc + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type StatusRequest struct { +} + +func (m *StatusRequest) Reset() { *m = StatusRequest{} } +func (m *StatusRequest) String() string { return proto.CompactTextString(m) } +func (*StatusRequest) ProtoMessage() {} +func (*StatusRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +type StatusResponse struct { + // / Indicates whether the autopilot is active or not. + Active bool `protobuf:"varint,1,opt,name=active" json:"active,omitempty"` +} + +func (m *StatusResponse) Reset() { *m = StatusResponse{} } +func (m *StatusResponse) String() string { return proto.CompactTextString(m) } +func (*StatusResponse) ProtoMessage() {} +func (*StatusResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *StatusResponse) GetActive() bool { + if m != nil { + return m.Active + } + return false +} + +type ModifyStatusRequest struct { + // / Whether the autopilot agent should be enabled or not. + Enable bool `protobuf:"varint,1,opt,name=enable" json:"enable,omitempty"` +} + +func (m *ModifyStatusRequest) Reset() { *m = ModifyStatusRequest{} } +func (m *ModifyStatusRequest) String() string { return proto.CompactTextString(m) } +func (*ModifyStatusRequest) ProtoMessage() {} +func (*ModifyStatusRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *ModifyStatusRequest) GetEnable() bool { + if m != nil { + return m.Enable + } + return false +} + +type ModifyStatusResponse struct { +} + +func (m *ModifyStatusResponse) Reset() { *m = ModifyStatusResponse{} } +func (m *ModifyStatusResponse) String() string { return proto.CompactTextString(m) } +func (*ModifyStatusResponse) ProtoMessage() {} +func (*ModifyStatusResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func init() { + proto.RegisterType((*StatusRequest)(nil), "autopilotrpc.StatusRequest") + proto.RegisterType((*StatusResponse)(nil), "autopilotrpc.StatusResponse") + proto.RegisterType((*ModifyStatusRequest)(nil), "autopilotrpc.ModifyStatusRequest") + proto.RegisterType((*ModifyStatusResponse)(nil), "autopilotrpc.ModifyStatusResponse") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for Autopilot service + +type AutopilotClient interface { + // * + // Status returns whether the daemon's autopilot agent is active. + Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) + // * + // ModifyStatus is used to modify the status of the autopilot agent, like + // enabling or disabling it. + ModifyStatus(ctx context.Context, in *ModifyStatusRequest, opts ...grpc.CallOption) (*ModifyStatusResponse, error) +} + +type autopilotClient struct { + cc *grpc.ClientConn +} + +func NewAutopilotClient(cc *grpc.ClientConn) AutopilotClient { + return &autopilotClient{cc} +} + +func (c *autopilotClient) Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) { + out := new(StatusResponse) + err := grpc.Invoke(ctx, "/autopilotrpc.Autopilot/Status", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *autopilotClient) ModifyStatus(ctx context.Context, in *ModifyStatusRequest, opts ...grpc.CallOption) (*ModifyStatusResponse, error) { + out := new(ModifyStatusResponse) + err := grpc.Invoke(ctx, "/autopilotrpc.Autopilot/ModifyStatus", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Autopilot service + +type AutopilotServer interface { + // * + // Status returns whether the daemon's autopilot agent is active. + Status(context.Context, *StatusRequest) (*StatusResponse, error) + // * + // ModifyStatus is used to modify the status of the autopilot agent, like + // enabling or disabling it. + ModifyStatus(context.Context, *ModifyStatusRequest) (*ModifyStatusResponse, error) +} + +func RegisterAutopilotServer(s *grpc.Server, srv AutopilotServer) { + s.RegisterService(&_Autopilot_serviceDesc, srv) +} + +func _Autopilot_Status_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StatusRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AutopilotServer).Status(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/autopilotrpc.Autopilot/Status", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AutopilotServer).Status(ctx, req.(*StatusRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Autopilot_ModifyStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ModifyStatusRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AutopilotServer).ModifyStatus(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/autopilotrpc.Autopilot/ModifyStatus", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AutopilotServer).ModifyStatus(ctx, req.(*ModifyStatusRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Autopilot_serviceDesc = grpc.ServiceDesc{ + ServiceName: "autopilotrpc.Autopilot", + HandlerType: (*AutopilotServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Status", + Handler: _Autopilot_Status_Handler, + }, + { + MethodName: "ModifyStatus", + Handler: _Autopilot_ModifyStatus_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "autopilotrpc/autopilot.proto", +} + +func init() { proto.RegisterFile("autopilotrpc/autopilot.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 183 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x49, 0x2c, 0x2d, 0xc9, + 0x2f, 0xc8, 0xcc, 0xc9, 0x2f, 0x29, 0x2a, 0x48, 0xd6, 0x87, 0x73, 0xf4, 0x0a, 0x8a, 0xf2, 0x4b, + 0xf2, 0x85, 0x78, 0x90, 0x65, 0x95, 0xf8, 0xb9, 0x78, 0x83, 0x4b, 0x12, 0x4b, 0x4a, 0x8b, 0x83, + 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x94, 0x34, 0xb8, 0xf8, 0x60, 0x02, 0xc5, 0x05, 0xf9, 0x79, + 0xc5, 0xa9, 0x42, 0x62, 0x5c, 0x6c, 0x89, 0xc9, 0x25, 0x99, 0x65, 0xa9, 0x12, 0x8c, 0x0a, 0x8c, + 0x1a, 0x1c, 0x41, 0x50, 0x9e, 0x92, 0x2e, 0x97, 0xb0, 0x6f, 0x7e, 0x4a, 0x66, 0x5a, 0x25, 0x8a, + 0x01, 0x20, 0xe5, 0xa9, 0x79, 0x89, 0x49, 0x39, 0x70, 0xe5, 0x10, 0x9e, 0x92, 0x18, 0x97, 0x08, + 0xaa, 0x72, 0x88, 0xf1, 0x46, 0xcb, 0x19, 0xb9, 0x38, 0x1d, 0x61, 0x4e, 0x12, 0x72, 0xe6, 0x62, + 0x83, 0xc8, 0x0b, 0x49, 0xeb, 0x21, 0x3b, 0x54, 0x0f, 0xc5, 0x12, 0x29, 0x19, 0xec, 0x92, 0x50, + 0x17, 0x87, 0x72, 0xf1, 0x20, 0x5b, 0x25, 0xa4, 0x88, 0xaa, 0x1a, 0x8b, 0xab, 0xa5, 0x94, 0xf0, + 0x29, 0x81, 0x18, 0x9b, 0xc4, 0x06, 0x0e, 0x40, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0xf8, + 0x0f, 0x84, 0xb8, 0x60, 0x01, 0x00, 0x00, +} diff --git a/lnrpc/autopilotrpc/autopilot.proto b/lnrpc/autopilotrpc/autopilot.proto new file mode 100644 index 00000000..ee39c087 --- /dev/null +++ b/lnrpc/autopilotrpc/autopilot.proto @@ -0,0 +1,34 @@ +syntax = "proto3"; + +package autopilotrpc; + +// Autopilot is a service that can be used to get information about the current +// state of the daemon's autopilot agent, and also supply it with information +// that can be used when deciding where to open channels. +service Autopilot { + /** + Status returns whether the daemon's autopilot agent is active. + */ + rpc Status(StatusRequest) returns (StatusResponse); + + /** + ModifyStatus is used to modify the status of the autopilot agent, like + enabling or disabling it. + */ + rpc ModifyStatus(ModifyStatusRequest) returns (ModifyStatusResponse); +} + +message StatusRequest{ +} + +message StatusResponse{ + /// Indicates whether the autopilot is active or not. + bool active = 1 [json_name = "active"]; +} + +message ModifyStatusRequest{ + /// Whether the autopilot agent should be enabled or not. + bool enable = 1 [json_name = "enable"]; +} + +message ModifyStatusResponse {} diff --git a/lnrpc/autopilotrpc/autopilot_server.go b/lnrpc/autopilotrpc/autopilot_server.go new file mode 100644 index 00000000..9d5b6421 --- /dev/null +++ b/lnrpc/autopilotrpc/autopilot_server.go @@ -0,0 +1,156 @@ +// +build autopilotrpc + +package autopilotrpc + +import ( + "context" + "os" + "sync/atomic" + + "github.com/lightningnetwork/lnd/autopilot" + "github.com/lightningnetwork/lnd/lnrpc" + "google.golang.org/grpc" + "gopkg.in/macaroon-bakery.v2/bakery" +) + +const ( + // subServerName is the name of the sub rpc server. We'll use this name + // to register ourselves, and we also require that the main + // SubServerConfigDispatcher instance recognize tt as the name of our + // RPC service. + subServerName = "AutopilotRPC" +) + +var ( + // macPermissions maps RPC calls to the permissions they require. + macPermissions = map[string][]bakery.Op{ + "/autopilotrpc.Autopilot/Status": {{ + Entity: "info", + Action: "read", + }}, + "/autopilotrpc.Autopilot/ModifyStatus": {{ + Entity: "onchain", + Action: "write", + }, { + Entity: "offchain", + Action: "write", + }}, + } +) + +// Server is a sub-server of the main RPC server: the autopilot RPC. This sub +// RPC server allows external callers to access the status of the autopilot +// currently active within lnd, as well as configuring it at runtime. +type Server struct { + started int32 // To be used atomically. + shutdown int32 // To be used atomically. + + cfg *Config + + manager *autopilot.Manager +} + +// A compile time check to ensure that Server fully implements the +// AutopilotServer gRPC service. +var _ AutopilotServer = (*Server)(nil) + +// fileExists reports whether the named file or directory exists. +func fileExists(name string) bool { + if _, err := os.Stat(name); err != nil { + if os.IsNotExist(err) { + return false + } + } + return true +} + +// New returns a new instance of the autopilotrpc Autopilot sub-server. We also +// return the set of permissions for the macaroons that we may create within +// this method. If the macaroons we need aren't found in the filepath, then +// we'll create them on start up. If we're unable to locate, or create the +// macaroons we need, then we'll return with an error. +func New(cfg *Config) (*Server, lnrpc.MacaroonPerms, error) { + // We don't create any new macaroons for this subserver, instead reuse + // existing onchain/offchain permissions. + server := &Server{ + cfg: cfg, + manager: cfg.Manager, + } + + return server, macPermissions, nil +} + +// Start launches any helper goroutines required for the Server to function. +// +// NOTE: This is part of the lnrpc.SubServer interface. +func (s *Server) Start() error { + if atomic.AddInt32(&s.started, 1) != 1 { + return nil + } + + return s.manager.Start() +} + +// Stop signals any active goroutines for a graceful closure. +// +// NOTE: This is part of the lnrpc.SubServer interface. +func (s *Server) Stop() error { + if atomic.AddInt32(&s.shutdown, 1) != 1 { + return nil + } + + return s.manager.Stop() +} + +// Name returns a unique string representation of the sub-server. This can be +// used to identify the sub-server and also de-duplicate them. +// +// NOTE: This is part of the lnrpc.SubServer interface. +func (s *Server) Name() string { + return subServerName +} + +// RegisterWithRootServer will be called by the root gRPC server to direct a +// sub RPC server to register itself with the main gRPC root server. Until this +// is called, each sub-server won't be able to have +// requests routed towards it. +// +// NOTE: This is part of the lnrpc.SubServer interface. +func (s *Server) RegisterWithRootServer(grpcServer *grpc.Server) error { + // We make sure that we register it with the main gRPC server to ensure + // all our methods are routed properly. + RegisterAutopilotServer(grpcServer, s) + + log.Debugf("Autopilot RPC server successfully register with root " + + "gRPC server") + + return nil +} + +// Status returns the current status of the autopilot agent. +// +// NOTE: Part of the AutopilotServer interface. +func (s *Server) Status(ctx context.Context, + in *StatusRequest) (*StatusResponse, error) { + + return &StatusResponse{ + Active: s.manager.IsActive(), + }, nil +} + +// ModifyStatus activates the current autopilot agent, if active. +// +// NOTE: Part of the AutopilotServer interface. +func (s *Server) ModifyStatus(ctx context.Context, + in *ModifyStatusRequest) (*ModifyStatusResponse, error) { + + log.Debugf("Setting agent enabled=%v", in.Enable) + + var err error + if in.Enable { + err = s.manager.StartAgent() + } else { + err = s.manager.StopAgent() + } + return &ModifyStatusResponse{}, err +} diff --git a/lnrpc/autopilotrpc/config_active.go b/lnrpc/autopilotrpc/config_active.go new file mode 100644 index 00000000..93aca4f2 --- /dev/null +++ b/lnrpc/autopilotrpc/config_active.go @@ -0,0 +1,17 @@ +// +build autopilotrpc + +package autopilotrpc + +import ( + "github.com/lightningnetwork/lnd/autopilot" +) + +// Config is the primary configuration struct for the autopilot RPC server. It +// contains all the items required for the rpc server to carry out its +// duties. The fields with struct tags are meant to be parsed as normal +// configuration options, while if able to be populated, the latter fields MUST +// also be specified. +type Config struct { + // Manager is the running autopilot manager. + Manager *autopilot.Manager +} diff --git a/lnrpc/autopilotrpc/config_default.go b/lnrpc/autopilotrpc/config_default.go new file mode 100644 index 00000000..2d42ab51 --- /dev/null +++ b/lnrpc/autopilotrpc/config_default.go @@ -0,0 +1,6 @@ +// +build !autopilotrpc + +package autopilotrpc + +// Config is empty for non-autopilotrpc builds. +type Config struct{} diff --git a/lnrpc/autopilotrpc/driver.go b/lnrpc/autopilotrpc/driver.go new file mode 100644 index 00000000..f6c62040 --- /dev/null +++ b/lnrpc/autopilotrpc/driver.go @@ -0,0 +1,63 @@ +// +build autopilotrpc + +package autopilotrpc + +import ( + "fmt" + + "github.com/lightningnetwork/lnd/lnrpc" +) + +// createNewSubServer is a helper method that will create the new sub server +// given the main config dispatcher method. If we're unable to find the config +// that is meant for us in the config dispatcher, then we'll exit with an +// error. +func createNewSubServer(configRegistry lnrpc.SubServerConfigDispatcher) ( + lnrpc.SubServer, lnrpc.MacaroonPerms, error) { + + // We'll attempt to look up the config that we expect, according to our + // subServerName name. If we can't find this, then we'll exit with an + // error, as we're unable to properly initialize ourselves without this + // config. + subServerConf, ok := configRegistry.FetchConfig(subServerName) + if !ok { + return nil, nil, fmt.Errorf("unable to find config for "+ + "subserver type %s", subServerName) + } + + // Now that we've found an object mapping to our service name, we'll + // ensure that it's the type we need. + config, ok := subServerConf.(*Config) + if !ok { + return nil, nil, fmt.Errorf("wrong type of config for "+ + "subserver %s, expected %T got %T", subServerName, + &Config{}, subServerConf) + } + + // Before we try to make the new service instance, we'll perform + // some sanity checks on the arguments to ensure taht they're useable. + switch { + case config.Manager == nil: + return nil, nil, fmt.Errorf("Manager must be set to create " + + "Autopilotrpc") + } + + return New(config) +} + +func init() { + subServer := &lnrpc.SubServerDriver{ + SubServerName: subServerName, + New: func(c lnrpc.SubServerConfigDispatcher) (lnrpc.SubServer, + lnrpc.MacaroonPerms, error) { + return createNewSubServer(c) + }, + } + + // If the build tag is active, then we'll register ourselves as a + // sub-RPC server within the global lnrpc package namespace. + if err := lnrpc.RegisterSubServer(subServer); err != nil { + panic(fmt.Sprintf("failed to register sub server driver "+ + "'%s': %v", subServerName, err)) + } +} diff --git a/lnrpc/autopilotrpc/log.go b/lnrpc/autopilotrpc/log.go new file mode 100644 index 00000000..caf38119 --- /dev/null +++ b/lnrpc/autopilotrpc/log.go @@ -0,0 +1,45 @@ +package autopilotrpc + +import ( + "github.com/btcsuite/btclog" + "github.com/lightningnetwork/lnd/build" +) + +// log is a logger that is initialized with no output filters. This means the +// package will not perform any logging by default until the caller requests +// it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + UseLogger(build.NewSubLogger("ARPC", nil)) +} + +// DisableLog disables all library log output. Logging output is disabled by +// by default until UseLogger is called. +func DisableLog() { + UseLogger(btclog.Disabled) +} + +// UseLogger uses a specified Logger to output package logging info. This +// should be used in preference to SetLogWriter if the caller is also using +// btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} + +// logClosure is used to provide a closure over expensive logging operations so +// don't have to be performed when the logging level doesn't warrant it. +type logClosure func() string + +// String invokes the underlying function and returns the result. +func (c logClosure) String() string { + return c() +} + +// newLogClosure returns a new closure over a function that returns a string +// which itself provides a Stringer interface so that it can be used with the +// logging system. +func newLogClosure(c func() string) logClosure { + return logClosure(c) +} diff --git a/log.go b/log.go index aa2e32d5..21f647c0 100644 --- a/log.go +++ b/log.go @@ -19,6 +19,7 @@ import ( "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/htlcswitch" + "github.com/lightningnetwork/lnd/lnrpc/autopilotrpc" "github.com/lightningnetwork/lnd/lnrpc/signrpc" "github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/lnwallet" @@ -69,6 +70,7 @@ var ( swprLog = build.NewSubLogger("SWPR", backendLog.Logger) sgnrLog = build.NewSubLogger("SGNR", backendLog.Logger) wlktLog = build.NewSubLogger("WLKT", backendLog.Logger) + arpcLog = build.NewSubLogger("ARPC", backendLog.Logger) ) // Initialize package-global logger variables. @@ -88,6 +90,7 @@ func init() { sweep.UseLogger(swprLog) signrpc.UseLogger(sgnrLog) walletrpc.UseLogger(wlktLog) + autopilotrpc.UseLogger(arpcLog) } // subsystemLoggers maps each subsystem identifier to its associated logger. @@ -113,6 +116,7 @@ var subsystemLoggers = map[string]btclog.Logger{ "SWPR": swprLog, "SGNR": sgnrLog, "WLKT": wlktLog, + "ARPC": arpcLog, } // initLogRotator initializes the logging rotator to write logs to logFile and diff --git a/pilot.go b/pilot.go index 174452d7..9f352742 100644 --- a/pilot.go +++ b/pilot.go @@ -79,10 +79,11 @@ func (c *chanController) SpliceOut(chanPoint *wire.OutPoint, // autopilot.ChannelController interface. var _ autopilot.ChannelController = (*chanController)(nil) -// initAutoPilot initializes a new autopilot.Agent instance based on the passed -// configuration struct. All interfaces needed to drive the pilot will be -// registered and launched. -func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error) { +// initAutoPilot initializes a new autopilot.ManagerCfg to manage an +// autopilot.Agent instance based on the passed configuration struct. The agent +// and all interfaces needed to drive it won't be launched before the Manager's +// StartAgent method is called. +func initAutoPilot(svr *server, cfg *autoPilotConfig) *autopilot.ManagerCfg { atplLog.Infof("Instantiating autopilot with cfg: %v", spew.Sdump(cfg)) // Set up the constraints the autopilot heuristics must adhere to. @@ -176,143 +177,33 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error) DisconnectPeer: svr.DisconnectPeer, } - // Next, we'll fetch the current state of open channels from the - // database to use as initial state for the auto-pilot agent. - activeChannels, err := svr.chanDB.FetchAllChannels() - if err != nil { - return nil, err - } - initialChanState := make([]autopilot.Channel, len(activeChannels)) - for i, channel := range activeChannels { - initialChanState[i] = autopilot.Channel{ - ChanID: channel.ShortChanID(), - Capacity: channel.Capacity, - Node: autopilot.NewNodeID(channel.IdentityPub), - } - } - - // Now that we have all the initial dependencies, we can create the - // auto-pilot instance itself. - pilot, err := autopilot.New(pilotCfg, initialChanState) - if err != nil { - return nil, err - } - - // Finally, we'll need to subscribe to two things: incoming - // transactions that modify the wallet's balance, and also any graph - // topology updates. - txnSubscription, err := svr.cc.wallet.SubscribeTransactions() - if err != nil { - return nil, err - } - graphSubscription, err := svr.chanRouter.SubscribeTopology() - if err != nil { - return nil, err - } - - // We'll launch a goroutine to provide the agent with notifications - // whenever the balance of the wallet changes. - svr.wg.Add(2) - go func() { - defer txnSubscription.Cancel() - defer svr.wg.Done() - - for { - select { - case <-txnSubscription.ConfirmedTransactions(): - pilot.OnBalanceChange() - case <-svr.quit: - return + // Create and return the autopilot.ManagerCfg that administrates this + // agent-pilot instance. + return &autopilot.ManagerCfg{ + Self: self, + PilotCfg: &pilotCfg, + ChannelState: func() ([]autopilot.Channel, error) { + // We'll fetch the current state of open + // channels from the database to use as initial + // state for the auto-pilot agent. + activeChannels, err := svr.chanDB.FetchAllChannels() + if err != nil { + return nil, err } - } - - }() - go func() { - defer svr.wg.Done() - - for { - select { - // We won't act upon new unconfirmed transaction, as - // we'll only use confirmed outputs when funding. - // However, we will still drain this request in order - // to avoid goroutine leaks, and ensure we promptly - // read from the channel if available. - case <-txnSubscription.UnconfirmedTransactions(): - case <-svr.quit: - return + chanState := make([]autopilot.Channel, + len(activeChannels)) + for i, channel := range activeChannels { + chanState[i] = autopilot.Channel{ + ChanID: channel.ShortChanID(), + Capacity: channel.Capacity, + Node: autopilot.NewNodeID( + channel.IdentityPub), + } } - } - }() - - // We'll also launch a goroutine to provide the agent with - // notifications for when the graph topology controlled by the node - // changes. - svr.wg.Add(1) - go func() { - defer graphSubscription.Cancel() - defer svr.wg.Done() - - for { - select { - case topChange, ok := <-graphSubscription.TopologyChanges: - // If the router is shutting down, then we will - // as well. - if !ok { - return - } - - for _, edgeUpdate := range topChange.ChannelEdgeUpdates { - // If this isn't an advertisement by - // the backing lnd node, then we'll - // continue as we only want to add - // channels that we've created - // ourselves. - if !edgeUpdate.AdvertisingNode.IsEqual(self) { - continue - } - - // If this is indeed a channel we - // opened, then we'll convert it to the - // autopilot.Channel format, and notify - // the pilot of the new channel. - chanNode := autopilot.NewNodeID( - edgeUpdate.ConnectingNode, - ) - chanID := lnwire.NewShortChanIDFromInt( - edgeUpdate.ChanID, - ) - edge := autopilot.Channel{ - ChanID: chanID, - Capacity: edgeUpdate.Capacity, - Node: chanNode, - } - pilot.OnChannelOpen(edge) - } - - // For each closed channel, we'll obtain - // the chanID of the closed channel and send it - // to the pilot. - for _, chanClose := range topChange.ClosedChannels { - chanID := lnwire.NewShortChanIDFromInt( - chanClose.ChanID, - ) - - pilot.OnChannelClose(chanID) - } - - // If new nodes were added to the graph, or nod - // information has changed, we'll poke autopilot - // to see if it can make use of them. - if len(topChange.NodeUpdates) > 0 { - pilot.OnNodeUpdates() - } - - case <-svr.quit: - return - } - } - }() - - return pilot, nil + return chanState, nil + }, + SubscribeTransactions: svr.cc.wallet.SubscribeTransactions, + SubscribeTopology: svr.chanRouter.SubscribeTopology, + } } diff --git a/rpcserver.go b/rpcserver.go index f8ffbbdf..d5397569 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -27,6 +27,7 @@ import ( "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" proxy "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/lightningnetwork/lnd/autopilot" "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/htlcswitch" @@ -392,7 +393,7 @@ var _ lnrpc.LightningServer = (*rpcServer)(nil) // like requiring TLS, etc. func newRPCServer(s *server, macService *macaroons.Service, subServerCgs *subRPCServerConfigs, serverOpts []grpc.ServerOption, - restServerOpts []grpc.DialOption, + restServerOpts []grpc.DialOption, atpl *autopilot.Manager, tlsCfg *tls.Config) (*rpcServer, error) { var ( @@ -404,7 +405,7 @@ func newRPCServer(s *server, macService *macaroons.Service, // the dependencies they need are properly populated within each sub // server configuration struct. err := subServerCgs.PopulateDependencies( - s.cc, networkDir, macService, + s.cc, networkDir, macService, atpl, ) if err != nil { return nil, err diff --git a/subrpcserver_config.go b/subrpcserver_config.go index b63cf95c..a60790ad 100644 --- a/subrpcserver_config.go +++ b/subrpcserver_config.go @@ -4,6 +4,8 @@ import ( "fmt" "reflect" + "github.com/lightningnetwork/lnd/autopilot" + "github.com/lightningnetwork/lnd/lnrpc/autopilotrpc" "github.com/lightningnetwork/lnd/lnrpc/signrpc" "github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/macaroons" @@ -25,6 +27,10 @@ type subRPCServerConfigs struct { // also requests keys and addresses under control of the backing // wallet. WalletKitRPC *walletrpc.Config `group:"walletrpc" namespace:"walletrpc"` + + // AutopilotRPC is a sub-RPC server that exposes methods on the running + // autopilot as a gRPC service. + AutopilotRPC *autopilotrpc.Config `group:"autopilotrpc" namespace:"autopilotrpc"` } // PopulateDependencies attempts to iterate through all the sub-server configs @@ -34,7 +40,8 @@ type subRPCServerConfigs struct { // NOTE: This MUST be called before any callers are permitted to execute the // FetchConfig method. func (s *subRPCServerConfigs) PopulateDependencies(cc *chainControl, - networkDir string, macService *macaroons.Service) error { + networkDir string, macService *macaroons.Service, + atpl *autopilot.Manager) error { // First, we'll use reflect to obtain a version of the config struct // that allows us to programmatically inspect its fields. @@ -92,6 +99,13 @@ func (s *subRPCServerConfigs) PopulateDependencies(cc *chainControl, reflect.ValueOf(cc.keyRing), ) + case *autopilotrpc.Config: + subCfgValue := extractReflectValue(cfg) + + subCfgValue.FieldByName("Manager").Set( + reflect.ValueOf(atpl), + ) + default: return fmt.Errorf("unknown field: %v, %T", fieldName, cfg)