diff --git a/autopilot/agent.go b/autopilot/agent.go new file mode 100644 index 00000000..32d7daa2 --- /dev/null +++ b/autopilot/agent.go @@ -0,0 +1,353 @@ +package autopilot + +import ( + "sync" + "sync/atomic" + + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/btcec" + "github.com/roasbeef/btcutil" +) + +// Config couples all the items that that an autopilot agent needs to function. +// All items within the struct MUST be populated for the Agent to be able to +// carry out its duties. +type Config struct { + // Self is the identity public key of the Lightning Network node that + // is being driven by the agent. This is used to ensure that we don't + // accidentally attempt to open a channel with ourselves. + Self *btcec.PublicKey + + // Heuristic is an attachment heuristic which will govern to whom we + // open channels to, and also what those channels look like in terms of + // desired capacity. The Heuristic will take into account the current + // state of the graph, our set of open channels, and the amount of + // available funds when determining how channels are to be opened. + // Additionally, a heuristic make also factor in extra-graph + // information in order to make more pertinent recommendations. + Heuristic AttachmentHeuristic + + // ChanController is an interface that is able to directly manage the + // creation, closing and update of channels within the network. + ChanController ChannelController + + // WalletBalance is a function closure that should return the current + // available balance o the backing wallet. + WalletBalance func() (btcutil.Amount, error) + + // Graph is an abstract channel graph that the Heuristic and the Agent + // will use to make decisions w.r.t channel allocation and placement + // within the graph. + Graph ChannelGraph + + // TODO(roasbeef): add additional signals from fee rates and revenue of + // currently opened channels +} + +// channelState is a type that represents the set of active channels of the +// backing LN node that the Agent should be ware of. This type contains a few +// helper utility methods. +type channelState map[lnwire.ShortChannelID]Channel + +// CHannels returns a slice of all the active channels. +func (c channelState) Channels() []Channel { + chans := make([]Channel, 0, len(c)) + for _, channel := range c { + chans = append(chans, channel) + } + return chans +} + +// ConnectedNodes returns the set of nodes we currently have a channel with. +// This information is needed as we want to avoid making repeated channels with +// any node. +func (c channelState) ConnectedNodes() map[NodeID]struct{} { + nodes := make(map[NodeID]struct{}) + for _, channels := range c { + nodes[channels.Node] = struct{}{} + } + return nodes +} + +// Agent implements a closed-loop control system which seeks to autonomously +// optimize the allocation of satoshis within channels throughput the network's +// channel graph. An agent is configurable by swapping out different +// AttachmentHeuristic strategies. The agent uses external signals such as the +// wallet balance changing, or new channels being opened/closed for the local +// node as an indicator to re-examine its internal state, and the amount of +// available funds in order to make updated decisions w.r.t the channel graph. +// The Agent will automatically open, close, and splice in/out channel as +// necessary for it to step closer to its optimal state. +// +// TODO(roasbeef): prob re-word +type Agent struct { + // Only to be used atomically. + started uint32 + stopped uint32 + + // cfg houses the configuration state of the Ant. + cfg Config + + // chanState tracks the current set of open channels. + chanState channelState + + // stateUpdates is a channel that any external state updates that may + // affect the heuristics of the agent will be sent over. + stateUpdates chan interface{} + + // totalBalance is the total number of satoshis the backing wallet is + // known to control at any given instance. This value will be updated + // when the agent receives external balance update signals. + totalBalance btcutil.Amount + + quit chan struct{} + wg sync.WaitGroup +} + +// New creates a new instance of the Agent instantiated using the passed +// configuration and initial channel state. The initial channel state slice +// should be populated with the set of Channels that are currently opened by +// the backing Lightning Node. +func New(cfg Config, initialState []Channel) (*Agent, error) { + a := &Agent{ + cfg: cfg, + chanState: make(map[lnwire.ShortChannelID]Channel), + quit: make(chan struct{}), + stateUpdates: make(chan interface{}), + } + + for _, c := range initialState { + a.chanState[c.ChanID] = c + } + + return a, nil +} + +// Start starts the agent along with any goroutines it needs to perform its +// normal duties. +func (a *Agent) Start() error { + if !atomic.CompareAndSwapUint32(&a.started, 0, 1) { + return nil + } + + log.Infof("Autopilot Agent starting") + + startingBalance, err := a.cfg.WalletBalance() + if err != nil { + return err + } + + a.wg.Add(1) + go a.controller(startingBalance) + + return nil +} + +// Stop signals the Agent to gracefully shutdown. This function will block +// until all goroutines have exited. +func (a *Agent) Stop() error { + if !atomic.CompareAndSwapUint32(&a.stopped, 0, 1) { + return nil + } + + log.Infof("Autopilot Agent stopping") + + close(a.quit) + a.wg.Wait() + + return nil +} + +// balanceUpdate is a type of external state update that reflects an +// increase/decrease in the funds currently available to the wallet. +type balanceUpdate struct { + balanceDelta btcutil.Amount +} + +// chanOpenUpdate is a type of external state update the indicates a new +// channel has been opened, either by the Agent itself (within the main +// controller loop), or by an external user to the system. +type chanOpenUpdate struct { + newChan Channel +} + +// chanCloseUpdate is a type of external state update that indicates that the +// backing Lightning Node has closed a previously open channel. +type chanCloseUpdate struct { + closedChans []lnwire.ShortChannelID +} + +// OnBalanceChange is a callback that should be executed each the balance of +// the backing wallet changes. +func (a *Agent) OnBalanceChange(delta btcutil.Amount) { + go func() { + a.stateUpdates <- &balanceUpdate{ + balanceDelta: delta, + } + }() +} + +// OnChannelOpen is a callback that should be executed each time a new channel +// is manually opened by the user or any system outside the autopilot agent. +func (a *Agent) OnChannelOpen(c Channel) { + go func() { + a.stateUpdates <- &chanOpenUpdate{ + newChan: c, + } + }() +} + +// OnChannelClose is a callback that should be executed each time a prior +// channel has been closed for any reason. This includes regular +// closes, force closes, and channel breaches. +func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) { + go func() { + a.stateUpdates <- &chanCloseUpdate{ + closedChans: closedChans, + } + }() +} + +// controller implements the closed-loop control system of the Agent. The +// controller will make a decision w.r.t channel placement within the graph +// based on: it's current internal state of the set of active channels open, +// and external state changes as a result of decisions it makes w.r.t channel +// allocation, or attributes affecting its control loop being updated by the +// backing Lightning Node. +func (a *Agent) controller(startingBalance btcutil.Amount) { + defer a.wg.Done() + + // TODO(roasbeef): add queries for internal state? + + // We'll start off by assigning our starting balance, and injecting + // that amount as an initial wake up to the main controller goroutine. + a.OnBalanceChange(startingBalance) + + // TODO(roasbeef): do we in fact need to maintain order? + // * use sync.Cond if so + + // TODO(roasbeef): add 10-minute wake up timer + for { + select { + // A new external signal has arrived. We'll use this to update + // our internal state, then determine if we should trigger a + // channel state modification (open/close, splice in/out). + case signal := <-a.stateUpdates: + log.Infof("Processing new external signal") + + switch update := signal.(type) { + // The balance of the backing wallet has changed, if + // more funds are now available, we may attempt to open + // up an additional channel, or splice in funds to an + // existing one. + case *balanceUpdate: + log.Debugf("Applying external balance state "+ + "update of: %v", update.balanceDelta) + + a.totalBalance += update.balanceDelta + + // A new channel has been opened successfully. This was + // either opened by the Agent, or an external system + // that is able to drive the Lightning Node. + case *chanOpenUpdate: + log.Debugf("New channel successfully opened, "+ + "updating state with: %v", + spew.Sdump(update.newChan)) + + newChan := update.newChan + a.chanState[newChan.ChanID] = newChan + + // A channel has been closed, this may free up an + // available slot, triggering a new channel update. + case *chanCloseUpdate: + log.Debugf("Applying closed channel "+ + "updates: %v", + spew.Sdump(update.closedChans)) + + for _, closedChan := range update.closedChans { + delete(a.chanState, closedChan) + } + } + + // With all the updates applied, we'll obtain a set of + // the current active channels. + chans := a.chanState.Channels() + + // Now that we've updated our internal state, we'll + // consult our channel attachment heuristic to + // determine if we should open up any additional + // channels or modify existing channels. + availableFunds, needMore := a.cfg.Heuristic.NeedMoreChans( + chans, a.totalBalance, + ) + if !needMore { + continue + } + + log.Infof("Triggering attachment directive dispatch") + + // We're to attempt an attachment so we'll o obtain the + // set of nodes that we currently have channels with so + // we avoid duplicate edges. + nodesToSkip := a.chanState.ConnectedNodes() + + // If we reach this point, then according to our + // heuristic we should modify our channel state to tend + // towards what it determines to the optimal state. So + // we'll call Select to get a fresh batch of attachment + // directives, passing in the amount of funds available + // for us to use. + chanCandidates, err := a.cfg.Heuristic.Select( + a.cfg.Self, a.cfg.Graph, availableFunds, + nodesToSkip, + ) + if err != nil { + log.Errorf("Unable to select candidates for "+ + "attachment: %v", err) + continue + } + + if len(chanCandidates) == 0 { + log.Infof("No eligible candidates to connect to") + continue + } + + log.Infof("Attempting to execute channel attachment "+ + "directives: %v", spew.Sdump(chanCandidates)) + + // For each recommended attachment directive, we'll + // launch a new goroutine to attempt to carry out the + // directive. If any of these succeed, then we'll + // receive a new state update, taking us back to the + // top of our controller loop. + for _, chanCandidate := range chanCandidates { + go func(directive AttachmentDirective) { + pub := directive.PeerKey + err := a.cfg.ChanController.OpenChannel( + directive.PeerKey, + directive.ChanAmt, + directive.Addrs, + ) + if err != nil { + log.Warnf("Unable to open "+ + "channel to %x of %v: %v", + pub.SerializeCompressed(), + directive.ChanAmt, err) + return + } + + // TODO(roasbeef): on err signal + // failure so attempt to allocate + // again? + }(chanCandidate) + } + + // The agent has been signalled to exit, so we'll bail out + // immediately. + case <-a.quit: + return + } + } + +} diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go new file mode 100644 index 00000000..03a75c22 --- /dev/null +++ b/autopilot/agent_test.go @@ -0,0 +1,554 @@ +package autopilot + +import ( + "bytes" + "net" + "sync" + "testing" + "time" + + "github.com/roasbeef/btcd/btcec" + "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" +) + +type moreChansResp struct { + needMore bool + amt btcutil.Amount +} + +type mockHeuristic struct { + moreChansResps chan moreChansResp + directiveResps chan []AttachmentDirective +} + +func (m *mockHeuristic) NeedMoreChans(chans []Channel, + balance btcutil.Amount) (btcutil.Amount, bool) { + + resp := <-m.moreChansResps + return resp.amt, resp.needMore +} + +func (m *mockHeuristic) Select(self *btcec.PublicKey, graph ChannelGraph, + amtToUse btcutil.Amount, skipChans map[NodeID]struct{}) ([]AttachmentDirective, error) { + + resp := <-m.directiveResps + return resp, nil +} + +var _ AttachmentHeuristic = (*mockHeuristic)(nil) + +type openChanIntent struct { + target *btcec.PublicKey + amt btcutil.Amount + addrs []net.Addr +} + +type mockChanController struct { + openChanSignals chan openChanIntent +} + +func (m *mockChanController) OpenChannel(target *btcec.PublicKey, amt btcutil.Amount, + addrs []net.Addr) error { + + m.openChanSignals <- openChanIntent{ + target: target, + amt: amt, + addrs: addrs, + } + return nil +} + +func (m *mockChanController) CloseChannel(chanPoint *wire.OutPoint) error { + return nil +} +func (m *mockChanController) SpliceIn(chanPoint *wire.OutPoint, + amt btcutil.Amount) (*Channel, error) { + return nil, nil +} +func (m *mockChanController) SpliceOut(chanPoint *wire.OutPoint, + amt btcutil.Amount) (*Channel, error) { + return nil, nil +} + +var _ ChannelController = (*mockChanController)(nil) + +// TestAgentChannelOpenSignal tests that upon receipt of a chanOpenUpdate, then +// agent modifies its local state accordingly, and reconsults the heuristic. +func TestAgentChannelOpenSignal(t *testing.T) { + t.Parallel() + + // First, we'll create all the dependencies that we'll need in order to + // create the autopilot agent. + self, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + heuristic := &mockHeuristic{ + moreChansResps: make(chan moreChansResp), + directiveResps: make(chan []AttachmentDirective), + } + chanController := &mockChanController{ + openChanSignals: make(chan openChanIntent, 10), + } + memGraph, _, _ := newMemChanGraph() + + // With the dependencies we created, we can now create the initial + // agent itself. + testCfg := Config{ + Self: self, + Heuristic: heuristic, + ChanController: chanController, + WalletBalance: func() (btcutil.Amount, error) { + return 0, nil + }, + Graph: memGraph, + } + initialChans := []Channel{} + agent, err := New(testCfg, initialChans) + if err != nil { + t.Fatalf("unable to create agent: %v", err) + } + + // With the autopilot agent and all its dependencies we'll star the + // primary controller goroutine. + if err := agent.Start(); err != nil { + t.Fatalf("unable to start agent: %v", err) + } + defer agent.Stop() + + var wg sync.WaitGroup + + // We'll send an initial "no" response to advance the agent past its + // initial check. + wg.Add(1) + go func() { + select { + case heuristic.moreChansResps <- moreChansResp{false, 0}: + wg.Done() + return + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + }() + + wg.Wait() + + // Next we'll signal a new channel being opened by the backing LN node, + // with a capacity of 1 BTC. + newChan := Channel{ + ChanID: randChanID(), + Capacity: btcutil.SatoshiPerBitcoin, + } + agent.OnChannelOpen(newChan) + + wg = sync.WaitGroup{} + + // The agent should now query the heuristic in order to determine its + // next action as it local state has now been modified. + wg.Add(1) + go func() { + select { + case heuristic.moreChansResps <- moreChansResp{false, 0}: + // At this point, the local state of the agent should + // have also been updated to reflect that the LN node + // now has an additional channel with one BTC. + if _, ok := agent.chanState[newChan.ChanID]; !ok { + t.Fatalf("internal channel state wasn't updated") + } + + // With all of our assertions passed, we'll signal the + // main test goroutine to continue the test. + wg.Done() + return + + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + }() + + // We'll wait here for either the agent to query the heuristic to be + // queried, or for the timeout above to tick. + wg.Wait() + + // There shouldn't be a call to the Select method as we've returned + // "false" for NeedMoreChans above. + select { + + // If this send success, then Select was erroneously called and the + // test should be failed. + case heuristic.directiveResps <- []AttachmentDirective{}: + t.Fatalf("Select was called but shouldn't have been") + + // This is the correct path as Select should've be called. + default: + } +} + +// TestAgentChannelCloseSignal ensures that once the agent receives an outside +// signal of a channel belonging to the backing LN node being closed, then it +// will query the heuristic to make its next decision. +func TestAgentChannelCloseSignal(t *testing.T) { + t.Parallel() + + // First, we'll create all the dependencies that we'll need in order to + // create the autopilot agent. + self, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + heuristic := &mockHeuristic{ + moreChansResps: make(chan moreChansResp), + directiveResps: make(chan []AttachmentDirective), + } + chanController := &mockChanController{ + openChanSignals: make(chan openChanIntent), + } + memGraph, _, _ := newMemChanGraph() + + // With the dependencies we created, we can now create the initial + // agent itself. + testCfg := Config{ + Self: self, + Heuristic: heuristic, + ChanController: chanController, + WalletBalance: func() (btcutil.Amount, error) { + return 0, nil + }, + Graph: memGraph, + } + + // We'll start the agent with two channels already being active. + initialChans := []Channel{ + { + ChanID: randChanID(), + Capacity: btcutil.SatoshiPerBitcoin, + }, + { + ChanID: randChanID(), + Capacity: btcutil.SatoshiPerBitcoin * 2, + }, + } + agent, err := New(testCfg, initialChans) + if err != nil { + t.Fatalf("unable to create agent: %v", err) + } + + // With the autopilot agent and all its dependencies we'll star the + // primary controller goroutine. + if err := agent.Start(); err != nil { + t.Fatalf("unable to start agent: %v", err) + } + defer agent.Stop() + + var wg sync.WaitGroup + + // We'll send an initial "no" response to advance the agent past its + // initial check. + wg.Add(1) + go func() { + select { + case heuristic.moreChansResps <- moreChansResp{false, 0}: + wg.Done() + return + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + }() + + wg.Wait() + + // Next, we'll close both channels which should force the agent to + // re-query the heuristic. + agent.OnChannelClose(initialChans[0].ChanID, initialChans[1].ChanID) + + wg = sync.WaitGroup{} + + // The agent should now query the heuristic in order to determine its + // next action as it local state has now been modified. + wg.Add(1) + go func() { + select { + case heuristic.moreChansResps <- moreChansResp{false, 0}: + // At this point, the local state of the agent should + // have also been updated to reflect that the LN node + // has no existing open channels. + if len(agent.chanState) != 0 { + t.Fatalf("internal channel state wasn't updated") + } + + // With all of our assertions passed, we'll signal the + // main test goroutine to continue the test. + wg.Done() + return + + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + }() + + // We'll wait here for either the agent to query the heuristic to be + // queried, or for the timeout above to tick. + wg.Wait() + + // There shouldn't be a call to the Select method as we've returned + // "false" for NeedMoreChans above. + select { + + // If this send success, then Select was erroneously called and the + // test should be failed. + case heuristic.directiveResps <- []AttachmentDirective{}: + t.Fatalf("Select was called but shouldn't have been") + + // This is the correct path as Select should've be called. + default: + } +} + +// TestAgentBalanceUpdateIncrease ensures that once the agent receives an +// outside signal concerning a balance update, then it will re-query the +// heuristic to determine its next action. +func TestAgentBalanceUpdate(t *testing.T) { + t.Parallel() + + // First, we'll create all the dependencies that we'll need in order to + // create the autopilot agent. + self, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + heuristic := &mockHeuristic{ + moreChansResps: make(chan moreChansResp), + directiveResps: make(chan []AttachmentDirective), + } + chanController := &mockChanController{ + openChanSignals: make(chan openChanIntent), + } + memGraph, _, _ := newMemChanGraph() + + // The wallet will start with 2 BTC available. + const walletBalance = btcutil.SatoshiPerBitcoin * 2 + + // With the dependencies we created, we can now create the initial + // agent itself. + testCfg := Config{ + Self: self, + Heuristic: heuristic, + ChanController: chanController, + WalletBalance: func() (btcutil.Amount, error) { + return walletBalance, nil + }, + Graph: memGraph, + } + initialChans := []Channel{} + agent, err := New(testCfg, initialChans) + if err != nil { + t.Fatalf("unable to create agent: %v", err) + } + + // With the autopilot agent and all its dependencies we'll star the + // primary controller goroutine. + if err := agent.Start(); err != nil { + t.Fatalf("unable to start agent: %v", err) + } + defer agent.Stop() + + var wg sync.WaitGroup + + // We'll send an initial "no" response to advance the agent past its + // initial check. + wg.Add(1) + go func() { + select { + case heuristic.moreChansResps <- moreChansResp{false, 0}: + wg.Done() + return + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + }() + + wg.Wait() + + // Next we'll send a new balance update signal to the agent, adding 5 + // BTC to the amount of available funds. + const balanceDelta = btcutil.SatoshiPerBitcoin * 5 + agent.OnBalanceChange(balanceDelta) + + wg = sync.WaitGroup{} + + // The agent should now query the heuristic in order to determine its + // next action as it local state has now been modified. + wg.Add(1) + go func() { + select { + case heuristic.moreChansResps <- moreChansResp{false, 0}: + // At this point, the local state of the agent should + // have also been updated to reflect that the LN node + // now has an additional 5BTC available. + const expectedAmt = walletBalance + balanceDelta + if agent.totalBalance != expectedAmt { + t.Fatalf("expected %v wallet balance "+ + "instead have %v", agent.totalBalance, + expectedAmt) + } + + // With all of our assertions passed, we'll signal the + // main test goroutine to continue the test. + wg.Done() + return + + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + }() + + // We'll wait here for either the agent to query the heuristic to be + // queried, or for the timeout above to tick. + wg.Wait() + + // There shouldn't be a call to the Select method as we've returned + // "false" for NeedMoreChans above. + select { + + // If this send success, then Select was erroneously called and the + // test should be failed. + case heuristic.directiveResps <- []AttachmentDirective{}: + t.Fatalf("Select was called but shouldn't have been") + + // This is the correct path as Select should've be called. + default: + } +} + +// TestAgentImmediateAttach tests that if an autopilot agent is created, and it +// has enough funds available to create channels, then it does so immediately. +func TestAgentImmediateAttach(t *testing.T) { + t.Parallel() + + // First, we'll create all the dependencies that we'll need in order to + // create the autopilot agent. + self, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + heuristic := &mockHeuristic{ + moreChansResps: make(chan moreChansResp), + directiveResps: make(chan []AttachmentDirective), + } + chanController := &mockChanController{ + openChanSignals: make(chan openChanIntent), + } + memGraph, _, _ := newMemChanGraph() + + // The wallet will start with 10 BTC available. + const walletBalance = btcutil.SatoshiPerBitcoin * 10 + + // With the dependencies we created, we can now create the initial + // agent itself. + testCfg := Config{ + Self: self, + Heuristic: heuristic, + ChanController: chanController, + WalletBalance: func() (btcutil.Amount, error) { + return walletBalance, nil + }, + Graph: memGraph, + } + initialChans := []Channel{} + agent, err := New(testCfg, initialChans) + if err != nil { + t.Fatalf("unable to create agent: %v", err) + } + + // With the autopilot agent and all its dependencies we'll star the + // primary controller goroutine. + if err := agent.Start(); err != nil { + t.Fatalf("unable to start agent: %v", err) + } + defer agent.Stop() + + var wg sync.WaitGroup + + // The very first thing the agent should do is query the NeedMoreChans + // method on the passed heuristic. So we'll provide it with a response + // that will kick off the main loop. + wg.Add(1) + go func() { + select { + + // We'll send over a response indicating that it should + // establish more channels, and give it a budget of 5 BTC to do + // so. + case heuristic.moreChansResps <- moreChansResp{true, 5 * btcutil.SatoshiPerBitcoin}: + wg.Done() + return + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + }() + + // We'll wait here for the agent to query the heuristic. If ti doesn't + // do so within 10 seconds, then the test will fail out. + wg.Wait() + + // At this point, the agent should now be querying the heuristic to + // requests attachment directives. We'll generate 5 mock directives so + // it can progress within its loop. + const numChans = 5 + directives := make([]AttachmentDirective, numChans) + for i := 0; i < numChans; i++ { + directives[i] = AttachmentDirective{ + PeerKey: self, + ChanAmt: btcutil.SatoshiPerBitcoin, + Addrs: []net.Addr{ + &net.TCPAddr{ + IP: bytes.Repeat([]byte("a"), 16), + }, + }, + } + } + + wg = sync.WaitGroup{} + + // With our fake directives created, we'll now send then to the agent + // as a return value for the Select function. + wg.Add(1) + go func() { + select { + case heuristic.directiveResps <- directives: + wg.Done() + return + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + }() + + // We'll wait here for either the agent to query the heuristic to be + // queried, or for the timeout above to tick. + wg.Wait() + + // Finally, we should receive 5 calls to the OpenChannel method with + // the exact same parameters that we specified within the attachment + // directives. + for i := 0; i < numChans; i++ { + select { + case openChan := <-chanController.openChanSignals: + if openChan.amt != btcutil.SatoshiPerBitcoin { + t.Fatalf("invalid chan amt: expected %v, got %v", + btcutil.SatoshiPerBitcoin, openChan.amt) + } + if !openChan.target.IsEqual(self) { + t.Fatalf("unexpected key: expected %x, got %x", + self.SerializeCompressed(), + openChan.target.SerializeCompressed()) + } + if len(openChan.addrs) != 1 { + t.Fatalf("should have single addr, instead have: %v", + len(openChan.addrs)) + } + case <-time.After(time.Second * 10): + t.Fatalf("channel not opened in time") + } + } +} diff --git a/autopilot/graph.go b/autopilot/graph.go new file mode 100644 index 00000000..27dae73d --- /dev/null +++ b/autopilot/graph.go @@ -0,0 +1,425 @@ +package autopilot + +import ( + "bytes" + "math/big" + prand "math/rand" + "net" + "time" + + "github.com/boltdb/bolt" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/btcec" + "github.com/roasbeef/btcutil" +) + +var ( + testSig = &btcec.Signature{ + R: new(big.Int), + S: new(big.Int), + } + _, _ = testSig.R.SetString("63724406601629180062774974542967536251589935445068131219452686511677818569431", 10) + _, _ = testSig.S.SetString("18801056069249825825291287104931333862866033135609736119018462340006816851118", 10) +) + +// databaseChannelGraph wraps a channeldb.ChannelGraph instance with the +// necessary API to properly implement the autopilot.ChannelGraph interface. +// +// TODO(roasbeef): move inmpl to main package? +type databaseChannelGraph struct { + db *channeldb.ChannelGraph +} + +// A compile time assertion to ensure databaseChannelGraph meets the +// autopilot.ChannelGraph interface. +var _ ChannelGraph = (*databaseChannelGraph)(nil) + +// ChannelGraphFromDatabase returns a instance of the autopilot.ChannelGraph +// backed by a live, open channeldb instance. +func ChannelGraphFromDatabase(db *channeldb.ChannelGraph) ChannelGraph { + return &databaseChannelGraph{ + db: db, + } +} + +// type dbNode is a wrapper struct around a database transaction an +// channeldb.LightningNode. The wrapper method implement the autopilot.Node +// interface. +type dbNode struct { + tx *bolt.Tx + + node *channeldb.LightningNode +} + +// A compile time assertion to ensure dbNode meets the autopilot.Node +// interface. +var _ Node = (*dbNode)(nil) + +// PubKey is the identity public key of the node. This will be used to attempt +// to target a node for channel opening by the main autopilot agent. +// +// NOTE: Part of the autopilot.Node interface. +func (d dbNode) PubKey() *btcec.PublicKey { + return d.node.PubKey +} + +// Addrs returns a slice of publicly reachable public TCP addresses that the +// peer is known to be listening on. +// +// NOTE: Part of the autopilot.Node interface. +func (d dbNode) Addrs() []net.Addr { + return d.node.Addresses +} + +// ForEachChannel is a higher-order function that will be used to iterate +// through all edges emanating from/to the target node. For each active +// channel, this function should be called with the populated ChannelEdge that +// describes the active channel. +// +// NOTE: Part of the autopilot.Node interface. +func (d dbNode) ForEachChannel(cb func(ChannelEdge) error) error { + return d.node.ForEachChannel(d.tx, func(tx *bolt.Tx, + ei *channeldb.ChannelEdgeInfo, ep *channeldb.ChannelEdgePolicy) error { + + edge := ChannelEdge{ + Channel: Channel{ + ChanID: lnwire.NewShortChanIDFromInt(ep.ChannelID), + Capacity: ei.Capacity, + FundedAmt: ei.Capacity, + Node: NewNodeID(ep.Node.PubKey), + }, + Peer: dbNode{ + tx: tx, + node: ep.Node, + }, + } + + return cb(edge) + }) +} + +// ForEachNode is a higher-order function that should be called once for each +// connected node within the channel graph. If the passed callback returns an +// error, then execution should be terminated. +// +// NOTE: Part of the autopilot.ChannelGraph interface. +func (d *databaseChannelGraph) ForEachNode(cb func(Node) error) error { + return d.db.ForEachNode(nil, func(tx *bolt.Tx, n *channeldb.LightningNode) error { + + // We'll skip over any node that doesn't have any advertised + // addresses. As we won't be able to reach them to actually + // open any channels. + if len(n.Addresses) == 0 { + log.Tracef("Skipping unreachable node %x", + n.PubKey.SerializeCompressed()) + return nil + } + + node := dbNode{ + tx: tx, + node: n, + } + return cb(node) + }) +} + +// addRandChannel creates a new channel two target nodes. This function is +// meant to aide in the generation of random graphs for use within test cases +// the exercise the autopilot package. +func (d *databaseChannelGraph) addRandChannel(node1, node2 *btcec.PublicKey, + capacity btcutil.Amount) (*ChannelEdge, *ChannelEdge, error) { + + fetchNode := func(pub *btcec.PublicKey) (*channeldb.LightningNode, error) { + if pub != nil { + dbNode, err := d.db.FetchLightningNode(pub) + switch { + case err == channeldb.ErrGraphNodeNotFound: + fallthrough + case err == channeldb.ErrGraphNotFound: + graphNode := &channeldb.LightningNode{ + PubKey: pub, + HaveNodeAnnouncement: true, + Addresses: []net.Addr{ + &net.TCPAddr{ + IP: bytes.Repeat([]byte("a"), 16), + }, + }, + Features: lnwire.NewFeatureVector(nil), + AuthSig: testSig, + } + if err := d.db.AddLightningNode(graphNode); err != nil { + return nil, err + } + case err != nil: + return nil, err + } + + return dbNode, nil + } + + nodeKey, err := randKey() + if err != nil { + return nil, err + } + dbNode := &channeldb.LightningNode{ + PubKey: nodeKey, + HaveNodeAnnouncement: true, + Addresses: []net.Addr{ + &net.TCPAddr{ + IP: bytes.Repeat([]byte("a"), 16), + }, + }, + Features: lnwire.NewFeatureVector(nil), + AuthSig: testSig, + } + if err := d.db.AddLightningNode(dbNode); err != nil { + return nil, err + } + + return dbNode, nil + } + + vertex1, err := fetchNode(node1) + if err != nil { + return nil, nil, err + } + + vertex2, err := fetchNode(node2) + if err != nil { + return nil, nil, err + } + + var lnNode1, lnNode2 *btcec.PublicKey + node1Bytes := vertex1.PubKey.SerializeCompressed() + node2Bytes := vertex2.PubKey.SerializeCompressed() + if bytes.Compare(node1Bytes, node2Bytes) == -1 { + lnNode1 = vertex1.PubKey + lnNode2 = vertex2.PubKey + } else { + lnNode1 = vertex2.PubKey + lnNode2 = vertex1.PubKey + } + + chanID := randChanID() + edge := &channeldb.ChannelEdgeInfo{ + ChannelID: chanID.ToUint64(), + NodeKey1: lnNode1, + NodeKey2: lnNode2, + BitcoinKey1: vertex1.PubKey, + BitcoinKey2: vertex2.PubKey, + Capacity: capacity, + } + if err := d.db.AddChannelEdge(edge); err != nil { + return nil, nil, err + } + edgePolicy := &channeldb.ChannelEdgePolicy{ + Signature: testSig, + ChannelID: chanID.ToUint64(), + LastUpdate: time.Now(), + TimeLockDelta: 10, + MinHTLC: btcutil.Amount(1), + FeeBaseMSat: btcutil.Amount(10), + FeeProportionalMillionths: btcutil.Amount(10000), + Flags: 0, + } + + if err := d.db.UpdateEdgePolicy(edgePolicy); err != nil { + return nil, nil, err + } + edgePolicy = &channeldb.ChannelEdgePolicy{ + Signature: testSig, + ChannelID: chanID.ToUint64(), + LastUpdate: time.Now(), + TimeLockDelta: 10, + MinHTLC: btcutil.Amount(1), + FeeBaseMSat: btcutil.Amount(10), + FeeProportionalMillionths: btcutil.Amount(10000), + Flags: 1, + } + if err := d.db.UpdateEdgePolicy(edgePolicy); err != nil { + return nil, nil, err + } + + return &ChannelEdge{ + Channel: Channel{ + ChanID: chanID, + Capacity: capacity, + }, + Peer: dbNode{ + node: vertex1, + }, + }, + &ChannelEdge{ + Channel: Channel{ + ChanID: chanID, + Capacity: capacity, + }, + Peer: dbNode{ + node: vertex2, + }, + }, + nil +} + +// memChannelGraph is a implementation of the autopilot.ChannelGraph backed by +// an in-memory graph. +type memChannelGraph struct { + graph map[NodeID]memNode +} + +// A compile time assertion to ensure memChannelGraph meets the +// autopilot.ChannelGraph interface. +var _ ChannelGraph = (*memChannelGraph)(nil) + +// newMemChannelGraph creates a new blank in-memory channel graph +// implementation. +func newMemChannelGraph() *memChannelGraph { + return &memChannelGraph{ + graph: make(map[NodeID]memNode), + } +} + +// ForEachNode is a higher-order function that should be called once for each +// connected node within the channel graph. If the passed callback returns an +// error, then execution should be terminated. +// +// NOTE: Part of the autopilot.ChannelGraph interface. +func (m memChannelGraph) ForEachNode(cb func(Node) error) error { + for _, node := range m.graph { + if err := cb(node); err != nil { + return err + } + } + + return nil +} + +// randChanID generates a new random channel ID. +func randChanID() lnwire.ShortChannelID { + return lnwire.NewShortChanIDFromInt(uint64(prand.Int63())) +} + +// randKey returns a random public key. +func randKey() (*btcec.PublicKey, error) { + priv, err := btcec.NewPrivateKey(btcec.S256()) + if err != nil { + return nil, err + } + + return priv.PubKey(), nil +} + +// addRandChannel creates a new channel two target nodes. This function is +// meant to aide in the generation of random graphs for use within test cases +// the exercise the autopilot package. +func (m *memChannelGraph) addRandChannel(node1, node2 *btcec.PublicKey, + capacity btcutil.Amount) (*ChannelEdge, *ChannelEdge, error) { + + var ( + vertex1, vertex2 memNode + ok bool + ) + + if node1 != nil { + vertex1, ok = m.graph[NewNodeID(node1)] + if !ok { + vertex1 = memNode{ + pub: node1, + } + } + } else { + newPub, err := randKey() + if err != nil { + return nil, nil, err + } + vertex1 = memNode{ + pub: newPub, + } + } + + if node2 != nil { + vertex2, ok = m.graph[NewNodeID(node2)] + if !ok { + vertex2 = memNode{ + pub: node2, + } + } + } else { + newPub, err := randKey() + if err != nil { + return nil, nil, err + } + vertex2 = memNode{ + pub: newPub, + } + } + + channel := Channel{ + ChanID: randChanID(), + Capacity: capacity, + } + + edge1 := ChannelEdge{ + Channel: channel, + Peer: vertex2, + } + vertex1.chans = append(vertex1.chans, edge1) + + edge2 := ChannelEdge{ + Channel: channel, + Peer: vertex1, + } + vertex2.chans = append(vertex2.chans, edge2) + + m.graph[NewNodeID(vertex1.pub)] = vertex1 + m.graph[NewNodeID(vertex2.pub)] = vertex2 + + return &edge1, &edge2, nil +} + +// memNode is a purely in-memory implementation of the autopilot.Node +// interface. +type memNode struct { + pub *btcec.PublicKey + + chans []ChannelEdge + + addrs []net.Addr +} + +// A compile time assertion to ensure memNode meets the autopilot.Node +// interface. +var _ Node = (*memNode)(nil) + +// PubKey is the identity public key of the node. This will be used to attempt +// to target a node for channel opening by the main autopilot agent. +// +// NOTE: Part of the autopilot.Node interface. +func (m memNode) PubKey() *btcec.PublicKey { + return m.pub +} + +// Addrs returns a slice of publicly reachable public TCP addresses that the +// peer is known to be listening on. +// +// NOTE: Part of the autopilot.Node interface. +func (m memNode) Addrs() []net.Addr { + return m.addrs +} + +// ForEachChannel is a higher-order function that will be used to iterate +// through all edges emanating from/to the target node. For each active +// channel, this function should be called with the populated ChannelEdge that +// describes the active channel. +// +// NOTE: Part of the autopilot.Node interface. +func (m memNode) ForEachChannel(cb func(ChannelEdge) error) error { + for _, channel := range m.chans { + if err := cb(channel); err != nil { + return err + } + } + + return nil +} diff --git a/autopilot/interface.go b/autopilot/interface.go new file mode 100644 index 00000000..5fb75d95 --- /dev/null +++ b/autopilot/interface.go @@ -0,0 +1,151 @@ +package autopilot + +import ( + "net" + + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/btcec" + "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" +) + +// Node node is an interface which represents n abstract vertex within the +// channel graph. All nodes should have at least a single edge to/from them +// within the graph. +// +// TODO(roasbeef): combine with routing.ChannelGraphSource +type Node interface { + // PubKey is the identity public key of the node. This will be used to + // attempt to target a node for channel opening by the main autopilot + // agent. + PubKey() *btcec.PublicKey + + // Addrs returns a slice of publicly reachable public TCP addresses + // that the peer is known to be listening on. + Addrs() []net.Addr + + // ForEachChannel is a higher-order function that will be used to + // iterate through all edges emanating from/to the target node. For + // each active channel, this function should be called with the + // populated ChannelEdge that describes the active channel. + ForEachChannel(func(ChannelEdge) error) error +} + +// Channel is a simple struct which contains relevant details of a particular +// channel within the channel graph. The fields in this struct may be used a +// signals for various AttachmentHeuristic implementations. +type Channel struct { + // ChanID is the short channel ID for this channel as defined within + // BOLT-0007. + ChanID lnwire.ShortChannelID + + // Capacity is the capacity of the channel expressed in satoshis. + Capacity btcutil.Amount + + // FundedAmt is the amount the local node funded into the target + // channel. + // + // TODO(roasbeef): need this? + FundedAmt btcutil.Amount + + // Node is the peer that this channel has been established with. + Node NodeID + + // TODO(roasbeef): also add other traits? + // * fee, timelock, etc +} + +// ChannelEdge is a struct that holds details concerning a channel, but also +// contains a reference to the Node that this channel connects to as a directed +// edge witihn the graph. The existence of this reference to the connected node +// will allow callers to traverse the graph in an object-oriented manner. +type ChannelEdge struct { + // Channel contains the attributes of this channel. + Channel + + // Peer is the peer that this channel creates an edge to in the channel + // graph. + Peer Node +} + +// ChannelGraph in an interface that represents a traversable channel graph. +// The autopilot agent will use this interface as its source of graph traits in +// order to make decisions concerning which channels should be opened, and to +// whom. +// +// TODO(roasbeef): abstract?? +type ChannelGraph interface { + // ForEachNode is a higher-order function that should be called once + // for each connected node within the channel graph. If the passed + // callback returns an error, then execution should be terminated. + ForEachNode(func(Node) error) error +} + +// AttachmentDirective describes a channel attachment proscribed by an +// AttachmentHeuristic. It details to which node a channel should be created +// to, and also the parameters which should be used in the channel creation. +type AttachmentDirective struct { + // PeerKey is the target node for this attachment directive. It can be + // identified by it's public key, and therefore can be used along with + // a ChannelOpener implementation to execute the directive. + PeerKey *btcec.PublicKey + + // ChanAmt is the size of the channel that should be opened, expressed + // in satoshis. + ChanAmt btcutil.Amount + + // Addrs is a list of addresses that the target peer may be reachable + // at. + Addrs []net.Addr +} + +// AttachmentHeuristic is one of the primary interfaces within this package. +// Implementations of this interface will be used to implement a control system +// which automatically regulates channels of a particular agent, attempting to +// optimize channels opened/closed based on various heuristics. The purpose of +// the interface is to allow an auto-pilot agent to decide if it needs more +// channels, and if so, which exact channels should be opened. +type AttachmentHeuristic interface { + // NeedMoreChans is a predicate that should return true if, given the + // passed parameters, and its internal state, more channels should be + // opened within the channel graph. If the heuristic decides that we do + // indeed need more channels, then the second argument returned will + // represent the amount of additional funds to be used towards creating + // channels. + NeedMoreChans(chans []Channel, balance btcutil.Amount) (btcutil.Amount, bool) + + // Select is a method that given the current state of the channel + // graph, a set of nodes to ignore, and an amount of available funds, + // should return a set of attachment directives which describe which + // additional channels should be opened within the graph to push the + // heuristic back towards its equilibrium state. + Select(self *btcec.PublicKey, graph ChannelGraph, amtToUse btcutil.Amount, + skipNodes map[NodeID]struct{}) ([]AttachmentDirective, error) +} + +// ChannelController is a simple interface that allows an auto-pilot agent to +// open a channel within the graph to a target peer, close targeted channels, +// or add/remove funds from existing channels via a splice in/out mechanisms. +type ChannelController interface { + // OpenChannel opens a channel to a target peer, with a capacity of the + // specified amount. This function should un-block immediately after + // the funding transaction that marks the channel open has been + // broadcast. + OpenChannel(target *btcec.PublicKey, amt btcutil.Amount, + addrs []net.Addr) error + + // CloseChannel attempts to close out the target channel. + // + // TODO(roasbeef): add force option? + CloseChannel(chanPoint *wire.OutPoint) error + + // SpliceIn attempts to add additional funds to the target channel via + // a splice in mechanism. The new channel with an updated capacity + // should be returned. + SpliceIn(chanPoint *wire.OutPoint, amt btcutil.Amount) (*Channel, error) + + // SpliceOut attempts to remove funds from an existing channels using a + // splice out mechanism. The removed funds from the channel should be + // returned to an output under the control of the backing wallet. + SpliceOut(chanPoint *wire.OutPoint, amt btcutil.Amount) (*Channel, error) +} diff --git a/autopilot/log.go b/autopilot/log.go new file mode 100644 index 00000000..70f49947 --- /dev/null +++ b/autopilot/log.go @@ -0,0 +1,26 @@ +package autopilot + +import "github.com/btcsuite/btclog" + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + DisableLog() +} + +// DisableLog disables all library log output. Logging output is disabled +// by default until UseLogger is called. +func DisableLog() { + log = btclog.Disabled +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/autopilot/prefattach.go b/autopilot/prefattach.go new file mode 100644 index 00000000..50f9d227 --- /dev/null +++ b/autopilot/prefattach.go @@ -0,0 +1,271 @@ +package autopilot + +import ( + "fmt" + prand "math/rand" + "time" + + "github.com/roasbeef/btcd/btcec" + "github.com/roasbeef/btcutil" +) + +// ConstrainedPrefAttachment is an implementation of the AttachmentHeuristic +// interface that implement a constrained non-linear preferential attachment +// heuristic. This means that given a threshold to allocate to automatic +// channel establishment, the heuristic will attempt to favor connecting to +// nodes which already have a set amount of links, selected by sampling from a +// power law distribution. The attachment ins non-linear in that it favors +// nodes with a higher in-degree but less so that regular linear preferential +// attachment. As a result, this creates smaller and less clusters than regular +// linear preferential attachment. +// +// TODO(roasbeef): BA, with k=-3 +type ConstrainedPrefAttachment struct { + minChanSize btcutil.Amount + maxChanSize btcutil.Amount + + chanLimit uint16 + + threshold float64 +} + +// NewPrefAttchment creates a new instance of a ConstrainedPrefAttachment +// heuristics given bounds on allowed channel sizes, and an allocation amount +// which is interpreted as a percentage of funds that is to be committed to +// channels at all times. +func NewConstrainedPrefAttachment(minChanSize, maxChanSize btcutil.Amount, + chanLimit uint16, allocation float64) *ConstrainedPrefAttachment { + + prand.Seed(time.Now().Unix()) + + return &ConstrainedPrefAttachment{ + minChanSize: minChanSize, + chanLimit: chanLimit, + maxChanSize: maxChanSize, + threshold: allocation, + } +} + +// A compile time assertion to ensure ConstrainedPrefAttachment meets the +// AttachmentHeuristic interface. +var _ AttachmentHeuristic = (*ConstrainedPrefAttachment)(nil) + +// NeedMoreChans is a predicate that should return true if, given the passed +// parameters, and its internal state, more channels should be opened within +// the channel graph. If the heuristic decides that we do indeed need more +// channels, then the second argument returned will represent the amount of +// additional funds to be used towards creating channels. +// +// NOTE: This is a part of the AttachmentHeuristic interface. +func (p *ConstrainedPrefAttachment) NeedMoreChans(channels []Channel, + funds btcutil.Amount) (btcutil.Amount, bool) { + + // If we're already over our maximum allowed number of channels, then + // we'll instruct the controller not to create any more channels. + if len(channels) >= int(p.chanLimit) { + return 0, false + } + + // First, we'll tally up the total amount of funds that are currently + // present within the set of active channels. + var totalChanAllocation btcutil.Amount + for _, channel := range channels { + totalChanAllocation += channel.Capacity + } + + // With this value known, we'll now compute the total amount of fund + // allocated across regular utxo's and channel utxo's. + totalFunds := funds + totalChanAllocation + + // Once the total amount has been computed, we then calculate the + // fraction of funds currently allocated to channels. + fundsFraction := float64(totalChanAllocation) / float64(totalFunds) + + // If this fraction is below our threshold, then we'll return true, to + // indicate the controller should call Select to obtain a candidate set + // of channels to attempt to open. + needMore := fundsFraction < p.threshold + if !needMore { + return 0, false + } + + // Now that we know we need more funds, we'll compute the amount of + // additional funds we should allocate towards channels. + targetAllocation := btcutil.Amount(float64(totalFunds) * p.threshold) + fundsAvailable := targetAllocation - totalChanAllocation + return fundsAvailable, true +} + +// nodeID is a simple type that holds a EC public key serialized in compressed +// format. +type NodeID [33]byte + +// NewNodeID creates a new nodeID from a passed public key. +func NewNodeID(pub *btcec.PublicKey) NodeID { + var n NodeID + copy(n[:], pub.SerializeCompressed()) + return n +} + +// Select returns a candidate set of attachment directives that should be +// executed based on the current internal state, the state of the channel +// graph, the set of nodes we should exclude, and the amount of funds +// available. The heuristic employed by this method is one that attempts to +// promote a scale-free network globally, via local attachment preferences for +// new nodes joining the network with an amount of available funds to be +// allocated to channels. Specifically, we consider the degree of each node +// (and the flow in/out of the node available via its open channels) and +// utilize the Barabási–Albert model to drive our recommended attachment +// heuristics. If implemented globally for each new participant, this results +// in a channel graph that is scale-free and follows a power law distribution +// with k=-3. +// +// NOTE: This is a part of the AttachmentHeuristic interface. +func (p *ConstrainedPrefAttachment) Select(self *btcec.PublicKey, g ChannelGraph, + fundsAvailable btcutil.Amount, skipNodes map[NodeID]struct{}) ([]AttachmentDirective, error) { + + // TODO(roasbeef): rename? + + var directives []AttachmentDirective + + if fundsAvailable < p.minChanSize { + return directives, nil + } + + // We'll continue our attachment loop until we've exhausted the current + // amount of available funds. + visited := make(map[NodeID]struct{}) + for i := uint16(0); i < p.chanLimit; i++ { + // selectionSlice will be used to randomly select a node + // according to a power law distribution. For each connected + // edge, we'll add an instance of the node to this slice. Thus, + // for a given node, the probability that we'll attach to it + // is: k_i / sum(k_j), where k_i is the degree of the target + // node, and k_j is the degree of all other nodes i != j. This + // implements the classic Barabási–Albert model for + // preferential attachment. + var selectionSlice []Node + + // For each node, and each channel that the node has, we'll add + // an instance of that node to the selection slice above. + // This'll slice where the frequency of each node is equivalent + // to the number of channels that connect to it. + // + // TODO(roasbeef): add noise to make adversarially resistant? + if err := g.ForEachNode(func(node Node) error { + nID := NewNodeID(node.PubKey()) + + // Once a node has already been attached to, we'll + // ensure that it isn't factored into any further + // decisions within this round. + if _, ok := visited[nID]; ok { + return nil + } + + // If we come across ourselves, them we'll continue in + // order to avoid attempting to make a channel with + // ourselves. + if node.PubKey().IsEqual(self) { + return nil + } + + // Additionally, if this node is in the backlist, then + // we'll skip it. + if _, ok := skipNodes[nID]; ok { + return nil + } + + // For initial bootstrap purposes, if a node doesn't + // have any channels, then we'll ensure that it has at + // least one item in the selection slice. + // + // TODO(roasbeef): make conditional? + selectionSlice = append(selectionSlice, node) + + // For each active channel the node has, we'll add an + // additional channel to the selection slice to + // increase their weight. + if err := node.ForEachChannel(func(channel ChannelEdge) error { + selectionSlice = append(selectionSlice, node) + return nil + }); err != nil { + return err + } + + return nil + }); err != nil { + return nil, err + } + + // If no nodes at all were accumulated, then we'll exit early + // as there are no eligible candidates. + if len(selectionSlice) == 0 { + break + } + + // Given our selection slice, we'll now generate a random index + // into this slice. The node we select will be recommended by + // us to create a channel to. + selectedIndex := prand.Int31n(int32(len(selectionSlice))) + selectedNode := selectionSlice[selectedIndex] + + // TODO(roasbeef): cap on num channels to same participant? + + // With the node selected, we'll add this (node, amount) tuple + // to out set of recommended directives. + pub := selectedNode.PubKey() + directives = append(directives, AttachmentDirective{ + // TODO(roasbeef): need curve? + PeerKey: &btcec.PublicKey{ + X: pub.X, + Y: pub.Y, + }, + Addrs: selectedNode.Addrs(), + }) + + // With the node selected, we'll add it to the set of visited + // nodes to avoid attaching to it again. + visited[NewNodeID(selectedNode.PubKey())] = struct{}{} + } + + numSelectedNodes := int64(len(directives)) + switch { + // If we have enough available funds to distribute the maximum channel + // size for each of the selected peers to attach to, then we'll + // allocate the maximum amount to each peer. + case int64(fundsAvailable) >= numSelectedNodes*int64(p.maxChanSize): + for i := 0; i < int(numSelectedNodes); i++ { + directives[i].ChanAmt = p.maxChanSize + } + + return directives, nil + + // Otherwise, we'll greedily allocate our funds to the channels + // successively until we run out of available funds, or can't create a + // channel above the min channel size. + case int64(fundsAvailable) < numSelectedNodes*int64(p.maxChanSize): + i := 0 + for fundsAvailable > p.minChanSize { + // We'll attempt to allocate the max channel size + // initially. If we don't have enough funds to do this, + // then we'll allocate the remainder of the funds + // available to the channel. + delta := p.maxChanSize + if fundsAvailable-delta < 0 { + delta = fundsAvailable + } + + directives[i].ChanAmt = delta + + fundsAvailable -= delta + i++ + } + + // We'll slice the initial set of directives to properly + // reflect the amount of funds we were able to allocate. + return directives[:i:i], nil + + default: + return nil, fmt.Errorf("err") + } +} diff --git a/autopilot/prefattach_test.go b/autopilot/prefattach_test.go new file mode 100644 index 00000000..611b3bc6 --- /dev/null +++ b/autopilot/prefattach_test.go @@ -0,0 +1,635 @@ +package autopilot + +import ( + "io/ioutil" + "os" + "testing" + "time" + + prand "math/rand" + + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/btcec" + "github.com/roasbeef/btcutil" +) + +func TestConstrainedPrefAttachmentNeedMoreChan(t *testing.T) { + t.Parallel() + + prand.Seed(time.Now().Unix()) + + const ( + minChanSize = 0 + maxChanSize = btcutil.Amount(btcutil.SatoshiPerBitcoin) + + chanLimit = 3 + + threshold = 0.5 + ) + + randChanID := func() lnwire.ShortChannelID { + return lnwire.NewShortChanIDFromInt(uint64(prand.Int63())) + } + + testCases := []struct { + channels []Channel + walletAmt btcutil.Amount + + needMore bool + amtAvailable btcutil.Amount + }{ + // Many available funds, but already have too many active open + // channels. + { + []Channel{ + { + ChanID: randChanID(), + Capacity: btcutil.Amount(prand.Int31()), + }, + { + ChanID: randChanID(), + Capacity: btcutil.Amount(prand.Int31()), + }, + { + ChanID: randChanID(), + Capacity: btcutil.Amount(prand.Int31()), + }, + }, + btcutil.Amount(btcutil.SatoshiPerBitcoin * 10), + false, + 0, + }, + + // Ratio of funds in channels and total funds meets the + // threshold. + { + []Channel{ + { + ChanID: randChanID(), + Capacity: btcutil.Amount(btcutil.SatoshiPerBitcoin), + }, + { + ChanID: randChanID(), + Capacity: btcutil.Amount(btcutil.SatoshiPerBitcoin), + }, + }, + btcutil.Amount(btcutil.SatoshiPerBitcoin * 2), + false, + 0, + }, + + // Ratio of funds in channels and total funds is below the + // threshold. We have 10 BTC allocated amongst channels and + // funds, atm. We're targeting 50%, so 5 BTC should be + // allocated. Only 1 BTC is atm, so 4 BTC should be + // recommended. + { + []Channel{ + { + ChanID: randChanID(), + Capacity: btcutil.Amount(btcutil.SatoshiPerBitcoin), + }, + }, + btcutil.Amount(btcutil.SatoshiPerBitcoin * 9), + true, + btcutil.Amount(btcutil.SatoshiPerBitcoin * 4), + }, + + // Ratio of funds in channels and total funds is below the + // threshold. We have 14 BTC total amongst the wallet's + // balance, and our currently opened channels. Since we're + // targeting a 50% allocation, we should commit 7 BTC. The + // current channels commit 4 BTC, so we should expected 3 bTC + // to be committed. + { + []Channel{ + { + ChanID: randChanID(), + Capacity: btcutil.Amount(btcutil.SatoshiPerBitcoin), + }, + { + ChanID: randChanID(), + Capacity: btcutil.Amount(btcutil.SatoshiPerBitcoin * 3), + }, + }, + btcutil.Amount(btcutil.SatoshiPerBitcoin * 10), + true, + btcutil.Amount(btcutil.SatoshiPerBitcoin * 3), + }, + + // Ratio of funds in channels and total funds is above the + // threshold. + { + []Channel{ + { + ChanID: randChanID(), + Capacity: btcutil.Amount(btcutil.SatoshiPerBitcoin), + }, + { + ChanID: randChanID(), + Capacity: btcutil.Amount(btcutil.SatoshiPerBitcoin), + }, + }, + btcutil.Amount(btcutil.SatoshiPerBitcoin), + false, + 0, + }, + } + + prefAttatch := NewConstrainedPrefAttachment(minChanSize, maxChanSize, + chanLimit, threshold) + + for i, testCase := range testCases { + amtToAllocate, needMore := prefAttatch.NeedMoreChans(testCase.channels, + testCase.walletAmt) + + if amtToAllocate != testCase.amtAvailable { + t.Fatalf("test #%v: expected %v, got %v", + i, testCase.amtAvailable, amtToAllocate) + } + if needMore != testCase.needMore { + t.Fatalf("test #%v: expected %v, got %v", + i, testCase.needMore, needMore) + } + } +} + +type genGraphFunc func() (testGraph, func(), error) + +type testGraph interface { + ChannelGraph + + addRandChannel(*btcec.PublicKey, *btcec.PublicKey, + btcutil.Amount) (*ChannelEdge, *ChannelEdge, error) +} + +func newDiskChanGraph() (testGraph, func(), error) { + // First, create a temporary directory to be used for the duration of + // this test. + tempDirName, err := ioutil.TempDir("", "channeldb") + if err != nil { + return nil, nil, err + } + + // Next, create channeldb for the first time. + cdb, err := channeldb.Open(tempDirName) + if err != nil { + return nil, nil, err + } + + cleanUp := func() { + cdb.Close() + os.RemoveAll(tempDirName) + } + + return &databaseChannelGraph{ + db: cdb.ChannelGraph(), + }, cleanUp, nil +} + +var _ testGraph = (*databaseChannelGraph)(nil) + +func newMemChanGraph() (testGraph, func(), error) { + return newMemChannelGraph(), nil, nil +} + +var _ testGraph = (*memChannelGraph)(nil) + +var chanGraphs = []struct { + name string + genFunc genGraphFunc +}{ + { + name: "disk_graph", + genFunc: newDiskChanGraph, + }, + { + name: "mem_graph", + genFunc: newMemChanGraph, + }, +} + +// TestConstrainedPrefAttachmentSelectEmptyGraph ensures that when passed en +// empty graph, the Select function always detects the state, and returns nil. +// Otherwise, it would be possible for the main Select loop to entire an +// infinite loop. +func TestConstrainedPrefAttachmentSelectEmptyGraph(t *testing.T) { + const ( + minChanSize = 0 + maxChanSize = btcutil.Amount(btcutil.SatoshiPerBitcoin) + chanLimit = 3 + threshold = 0.5 + ) + + // First, we'll generate a random key that represents "us", and create + // a new instance of the heuristic with our set parameters. + self, err := randKey() + if err != nil { + t.Fatalf("unable to generate self key: %v", err) + } + prefAttatch := NewConstrainedPrefAttachment(minChanSize, maxChanSize, + chanLimit, threshold) + + skipNodes := make(map[NodeID]struct{}) + for _, graph := range chanGraphs { + success := t.Run(graph.name, func(t1 *testing.T) { + graph, cleanup, err := graph.genFunc() + if err != nil { + t1.Fatalf("unable to create graph: %v", err) + } + if cleanup != nil { + defer cleanup() + } + + // With the necessary state initialized, we'll not + // attempt to select a set of candidates channel for + // creation given the current state of the graph. + const walletFunds = btcutil.SatoshiPerBitcoin + directives, err := prefAttatch.Select(self, graph, + walletFunds, skipNodes) + if err != nil { + t1.Fatalf("unable to select attachment "+ + "directives: %v", err) + } + + // We shouldn't have selected any new directives as we + // started with an empty graph. + if len(directives) != 0 { + t1.Fatalf("zero attachment directives "+ + "should've been returned instead %v were", + len(directives)) + } + }) + if !success { + break + } + } +} + +// TestConstrainedPrefAttachmentSelectTwoVertexes ensures that when passed a +// graph with only two eligible vertexes, then both are selected (without any +// repeats), and the funds are appropriately allocated across each peer. +func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) { + t.Parallel() + + prand.Seed(time.Now().Unix()) + + const ( + minChanSize = 0 + maxChanSize = btcutil.Amount(btcutil.SatoshiPerBitcoin) + chanLimit = 3 + threshold = 0.5 + ) + + skipNodes := make(map[NodeID]struct{}) + for _, graph := range chanGraphs { + success := t.Run(graph.name, func(t1 *testing.T) { + graph, cleanup, err := graph.genFunc() + if err != nil { + t1.Fatalf("unable to create graph: %v", err) + } + if cleanup != nil { + defer cleanup() + } + + // First, we'll generate a random key that represents + // "us", and create a new instance of the heuristic + // with our set parameters. + self, err := randKey() + if err != nil { + t1.Fatalf("unable to generate self key: %v", err) + } + prefAttatch := NewConstrainedPrefAttachment(minChanSize, maxChanSize, + chanLimit, threshold) + + // For this set, we'll load the memory graph with two + // nodes, and a random channel connecting them. + const chanCapacity = btcutil.SatoshiPerBitcoin + edge1, edge2, err := graph.addRandChannel(nil, nil, chanCapacity) + if err != nil { + t1.Fatalf("unable to generate channel: %v", err) + } + + // With the necessary state initialized, we'll not + // attempt to select a set of candidates channel for + // creation given the current state of the graph. + const walletFunds = btcutil.SatoshiPerBitcoin * 10 + directives, err := prefAttatch.Select(self, graph, + walletFunds, skipNodes) + if err != nil { + t1.Fatalf("unable to select attachment directives: %v", err) + } + + // Two new directives should have been selected, one + // for each node already present within the graph. + if len(directives) != 2 { + t1.Fatalf("two attachment directives should've been "+ + "returned instead %v were", len(directives)) + } + + // The node attached to should be amongst the two edges + // created above. + for _, directive := range directives { + switch { + case directive.PeerKey.IsEqual(edge1.Peer.PubKey()): + case directive.PeerKey.IsEqual(edge2.Peer.PubKey()): + default: + t1.Fatalf("attache to unknown node: %x", + directive.PeerKey.SerializeCompressed()) + } + + // As the number of funds available exceed the + // max channel size, both edges should consume + // the maximum channel size. + if directive.ChanAmt != maxChanSize { + t1.Fatalf("max channel size should be allocated, "+ + "instead %v was: ", maxChanSize) + } + } + }) + if !success { + break + } + } +} + +// TestConstrainedPrefAttachmentSelectInsufficientFunds ensures that if the +// balance of the backing wallet is below the set min channel size, then it +// never recommends candidates to attach to. +func TestConstrainedPrefAttachmentSelectInsufficientFunds(t *testing.T) { + t.Parallel() + + prand.Seed(time.Now().Unix()) + + const ( + minChanSize = 0 + maxChanSize = btcutil.Amount(btcutil.SatoshiPerBitcoin) + chanLimit = 3 + threshold = 0.5 + ) + + skipNodes := make(map[NodeID]struct{}) + for _, graph := range chanGraphs { + success := t.Run(graph.name, func(t1 *testing.T) { + graph, cleanup, err := graph.genFunc() + if err != nil { + t1.Fatalf("unable to create graph: %v", err) + } + if cleanup != nil { + defer cleanup() + } + + // First, we'll generate a random key that represents + // "us", and create a new instance of the heuristic + // with our set parameters. + self, err := randKey() + if err != nil { + t1.Fatalf("unable to generate self key: %v", err) + } + prefAttatch := NewConstrainedPrefAttachment( + minChanSize, maxChanSize, chanLimit, threshold, + ) + + // Next, we'll attempt to select a set of candidates, + // passing zero for the amount of wallet funds. This + // should return an empty slice of directives. + directives, err := prefAttatch.Select(self, graph, 0, + skipNodes) + if err != nil { + t1.Fatalf("unable to select attachment "+ + "directives: %v", err) + } + if len(directives) != 0 { + t1.Fatalf("zero attachment directives "+ + "should've been returned instead %v were", + len(directives)) + } + }) + if !success { + break + } + } +} + +// TestConstrainedPrefAttachmentSelectGreedyAllocation tests that if upon +// deciding a set of candidates, we're unable to evenly split our funds, then +// we attempt to greedily allocate all funds to each selected vertex (up to the +// max channel size). +func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) { + t.Parallel() + + prand.Seed(time.Now().Unix()) + + const ( + minChanSize = 0 + maxChanSize = btcutil.Amount(btcutil.SatoshiPerBitcoin) + chanLimit = 3 + threshold = 0.5 + ) + + skipNodes := make(map[NodeID]struct{}) + for _, graph := range chanGraphs { + success := t.Run(graph.name, func(t1 *testing.T) { + graph, cleanup, err := graph.genFunc() + if err != nil { + t1.Fatalf("unable to create graph: %v", err) + } + if cleanup != nil { + defer cleanup() + } + + // First, we'll generate a random key that represents + // "us", and create a new instance of the heuristic + // with our set parameters. + self, err := randKey() + if err != nil { + t1.Fatalf("unable to generate self key: %v", err) + } + prefAttatch := NewConstrainedPrefAttachment( + minChanSize, maxChanSize, chanLimit, threshold, + ) + + const chanCapcity = btcutil.SatoshiPerBitcoin + + // Next, we'll add 3 nodes to the graph, creating an + // "open triangle topology". + edge1, _, err := graph.addRandChannel(nil, nil, + chanCapcity) + if err != nil { + t1.Fatalf("unable to create channel: %v", err) + } + _, _, err = graph.addRandChannel( + edge1.Peer.PubKey(), nil, chanCapcity, + ) + if err != nil { + t1.Fatalf("unable to create channel: %v", err) + } + + // At this point, there should be three nodes in the + // graph, with node node having two edges. + numNodes := 0 + twoChans := false + if err := graph.ForEachNode(func(n Node) error { + numNodes++ + + numChans := 0 + err := n.ForEachChannel(func(c ChannelEdge) error { + numChans++ + return nil + }) + if err != nil { + return err + } + + twoChans = twoChans || (numChans == 2) + + return nil + }); err != nil { + t1.Fatalf("unable to traverse graph: %v", err) + } + if numNodes != 3 { + t1.Fatalf("expected 3 nodes, instead have: %v", + numNodes) + } + if !twoChans { + t1.Fatalf("expected node to have two channels") + } + + // We'll now begin our test, modeling the available + // wallet balance to be 5.5 BTC. We're shooting for a + // 50/50 allocation, and have 3 BTC in channels. As a + // result, the heuristic should try to greedily + // allocate funds to channels. + const availableBalance = btcutil.SatoshiPerBitcoin * 2.5 + directives, err := prefAttatch.Select(self, graph, + availableBalance, skipNodes) + if err != nil { + t1.Fatalf("unable to select attachment "+ + "directives: %v", err) + } + + // Three directives should have been returned. + if len(directives) != 3 { + t1.Fatalf("expected 3 directives, instead "+ + "got: %v", len(directives)) + } + + // The two directive should have the max channel amount + // allocated. + if directives[0].ChanAmt != maxChanSize { + t1.Fatalf("expected recommendation of %v, "+ + "instead got %v", maxChanSize, + directives[0].ChanAmt) + } + if directives[1].ChanAmt != maxChanSize { + t1.Fatalf("expected recommendation of %v, "+ + "instead got %v", maxChanSize, + directives[1].ChanAmt) + } + + // The third channel should have been allocated the + // remainder, or 0.5 BTC. + if directives[2].ChanAmt != (btcutil.SatoshiPerBitcoin * 0.5) { + t1.Fatalf("expected recommendation of %v, "+ + "instead got %v", maxChanSize, + directives[2].ChanAmt) + } + }) + if !success { + break + } + } +} + +// TestConstrainedPrefAttachmentSelectSkipNodes ensures that if a node was +// already select for attachment, then that node is excluded from the set of +// candidate nodes. +func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) { + t.Parallel() + + prand.Seed(time.Now().Unix()) + + const ( + minChanSize = 0 + maxChanSize = btcutil.Amount(btcutil.SatoshiPerBitcoin) + chanLimit = 3 + threshold = 0.5 + ) + + skipNodes := make(map[NodeID]struct{}) + for _, graph := range chanGraphs { + success := t.Run(graph.name, func(t1 *testing.T) { + graph, cleanup, err := graph.genFunc() + if err != nil { + t1.Fatalf("unable to create graph: %v", err) + } + if cleanup != nil { + defer cleanup() + } + + // First, we'll generate a random key that represents + // "us", and create a new instance of the heuristic + // with our set parameters. + self, err := randKey() + if err != nil { + t1.Fatalf("unable to generate self key: %v", err) + } + prefAttatch := NewConstrainedPrefAttachment( + minChanSize, maxChanSize, chanLimit, threshold, + ) + + // Next, we'll create a simple topology of two nodes, + // with a single channel connecting them. + const chanCapcity = btcutil.SatoshiPerBitcoin + _, _, err = graph.addRandChannel(nil, nil, + chanCapcity) + if err != nil { + t1.Fatalf("unable to create channel: %v", err) + } + + // With our graph created, we'll now execute the Select + // function to recommend potential attachment + // candidates. + const availableBalance = btcutil.SatoshiPerBitcoin * 2.5 + directives, err := prefAttatch.Select(self, graph, + availableBalance, skipNodes) + if err != nil { + t1.Fatalf("unable to select attachment "+ + "directives: %v", err) + } + + // As the channel limit is three, and two nodes are + // present in the graph, both should be selected. + if len(directives) != 2 { + t1.Fatalf("expected two directives, instead "+ + "got %v", len(directives)) + } + + // We'll simulate a channel update by adding the nodes + // we just establish channel with the to set of nodes + // to be skipped. + skipNodes[NewNodeID(directives[0].PeerKey)] = struct{}{} + skipNodes[NewNodeID(directives[1].PeerKey)] = struct{}{} + + // If we attempt to make a call to the Select function, + // without providing any new information, then we + // should get no new directives as both nodes has + // already been attached to. + directives, err = prefAttatch.Select(self, graph, + availableBalance, skipNodes) + if err != nil { + t1.Fatalf("unable to select attachment "+ + "directives: %v", err) + } + + if len(directives) != 0 { + t1.Fatalf("zero new directives should have been "+ + "selected, but %v were", len(directives)) + } + }) + if !success { + break + } + } +}