diff --git a/autopilot/agent.go b/autopilot/agent.go index 32d7daa2..045ca998 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -50,7 +50,7 @@ type Config struct { // helper utility methods. type channelState map[lnwire.ShortChannelID]Channel -// CHannels returns a slice of all the active channels. +// Channels returns a slice of all the active channels. func (c channelState) Channels() []Channel { chans := make([]Channel, 0, len(c)) for _, channel := range c { @@ -209,6 +209,43 @@ func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) { }() } +// mergeNodeMaps merges the Agent's set of nodes that it already has active +// channels open to, with the set of nodes that are pending new channels. This +// ensures that the Agent doesn't attempt to open any "duplicate" channels to +// the same node. +func mergeNodeMaps(a map[NodeID]struct{}, + b map[NodeID]Channel) map[NodeID]struct{} { + + c := make(map[NodeID]struct{}, len(a)+len(b)) + for nodeID := range a { + c[nodeID] = struct{}{} + } + for nodeID := range b { + c[nodeID] = struct{}{} + } + + return c +} + +// mergeChanState merges the Agent's set of active channels, with the set of +// channels awaiting confirmation. This ensures that the agent doesn't go over +// the prescribed channel limit or fund allocation limit. +func mergeChanState(pendingChans map[NodeID]Channel, + activeChans channelState) []Channel { + + numChans := len(pendingChans) + len(activeChans) + totalChans := make([]Channel, 0, numChans) + + for _, activeChan := range activeChans.Channels() { + totalChans = append(totalChans, activeChan) + } + for _, pendingChan := range pendingChans { + totalChans = append(totalChans, pendingChan) + } + + return totalChans +} + // controller implements the closed-loop control system of the Agent. The // controller will make a decision w.r.t channel placement within the graph // based on: it's current internal state of the set of active channels open, @@ -218,8 +255,6 @@ func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) { func (a *Agent) controller(startingBalance btcutil.Amount) { defer a.wg.Done() - // TODO(roasbeef): add queries for internal state? - // We'll start off by assigning our starting balance, and injecting // that amount as an initial wake up to the main controller goroutine. a.OnBalanceChange(startingBalance) @@ -227,6 +262,13 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { // TODO(roasbeef): do we in fact need to maintain order? // * use sync.Cond if so + // pendingOpens tracks the channels that we've requested to be + // initiated, but haven't yet been confirmed as being fully opened. + // This state is required as otherwise, we may go over our allotted + // channel limit, or open multiple channels to the same node. + pendingOpens := make(map[NodeID]Channel) + var pendingMtx sync.Mutex + // TODO(roasbeef): add 10-minute wake up timer for { select { @@ -258,6 +300,10 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { newChan := update.newChan a.chanState[newChan.ChanID] = newChan + pendingMtx.Lock() + delete(pendingOpens, newChan.Node) + pendingMtx.Unlock() + // A channel has been closed, this may free up an // available slot, triggering a new channel update. case *chanCloseUpdate: @@ -271,15 +317,17 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { } // With all the updates applied, we'll obtain a set of - // the current active channels. - chans := a.chanState.Channels() + // the current active channels (confirmed channels), + // and also factor in our set of unconfirmed channels. + confirmedChans := a.chanState + totalChans := mergeChanState(pendingOpens, confirmedChans) // Now that we've updated our internal state, we'll // consult our channel attachment heuristic to // determine if we should open up any additional // channels or modify existing channels. availableFunds, needMore := a.cfg.Heuristic.NeedMoreChans( - chans, a.totalBalance, + totalChans, a.totalBalance, ) if !needMore { continue @@ -290,7 +338,10 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { // We're to attempt an attachment so we'll o obtain the // set of nodes that we currently have channels with so // we avoid duplicate edges. - nodesToSkip := a.chanState.ConnectedNodes() + connectedNodes := a.chanState.ConnectedNodes() + pendingMtx.Lock() + nodesToSkip := mergeNodeMaps(connectedNodes, pendingOpens) + pendingMtx.Unlock() // If we reach this point, then according to our // heuristic we should modify our channel state to tend @@ -321,7 +372,14 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { // directive. If any of these succeed, then we'll // receive a new state update, taking us back to the // top of our controller loop. + pendingMtx.Lock() for _, chanCandidate := range chanCandidates { + nID := NewNodeID(chanCandidate.PeerKey) + pendingOpens[nID] = Channel{ + Capacity: chanCandidate.ChanAmt, + Node: nID, + } + go func(directive AttachmentDirective) { pub := directive.PeerKey err := a.cfg.ChanController.OpenChannel( @@ -334,14 +392,20 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { "channel to %x of %v: %v", pub.SerializeCompressed(), directive.ChanAmt, err) - return + + // As the attempt failed, we'll + // clear it from the set of + // pending channels. + pendingMtx.Lock() + nID := NewNodeID(directive.PeerKey) + delete(pendingOpens, nID) + pendingMtx.Unlock() + } - // TODO(roasbeef): on err signal - // failure so attempt to allocate - // again? }(chanCandidate) } + pendingMtx.Unlock() // The agent has been signalled to exit, so we'll bail out // immediately. diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index 03a75c22..0fdc0ebe 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -17,21 +17,54 @@ type moreChansResp struct { amt btcutil.Amount } +type moreChanArg struct { + chans []Channel + balance btcutil.Amount +} + type mockHeuristic struct { moreChansResps chan moreChansResp + moreChanArgs chan moreChanArg + directiveResps chan []AttachmentDirective + directiveArgs chan directiveArg } func (m *mockHeuristic) NeedMoreChans(chans []Channel, balance btcutil.Amount) (btcutil.Amount, bool) { + if m.moreChanArgs != nil { + m.moreChanArgs <- moreChanArg{ + chans: chans, + balance: balance, + } + + } + resp := <-m.moreChansResps return resp.amt, resp.needMore } +type directiveArg struct { + self *btcec.PublicKey + graph ChannelGraph + amt btcutil.Amount + skip map[NodeID]struct{} +} + func (m *mockHeuristic) Select(self *btcec.PublicKey, graph ChannelGraph, + amtToUse btcutil.Amount, skipChans map[NodeID]struct{}) ([]AttachmentDirective, error) { + if m.directiveArgs != nil { + m.directiveArgs <- directiveArg{ + self: self, + graph: graph, + amt: amtToUse, + skip: skipChans, + } + } + resp := <-m.directiveResps return resp, nil } @@ -552,3 +585,179 @@ func TestAgentImmediateAttach(t *testing.T) { } } } + +// TestAgentPendingChannelState ensures that the agent properly factors in its +// pending channel state when making decisions w.r.t if it needs more channels +// or not, and if so, who is eligible to open new channels to. +func TestAgentPendingChannelState(t *testing.T) { + t.Parallel() + + // First, we'll create all the dependencies that we'll need in order to + // create the autopilot agent. + self, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + heuristic := &mockHeuristic{ + moreChansResps: make(chan moreChansResp), + directiveResps: make(chan []AttachmentDirective), + } + chanController := &mockChanController{ + openChanSignals: make(chan openChanIntent), + } + memGraph, _, _ := newMemChanGraph() + + // The wallet will start with 6 BTC available. + const walletBalance = btcutil.SatoshiPerBitcoin * 6 + + // With the dependencies we created, we can now create the initial + // agent itself. + testCfg := Config{ + Self: self, + Heuristic: heuristic, + ChanController: chanController, + WalletBalance: func() (btcutil.Amount, error) { + return walletBalance, nil + }, + Graph: memGraph, + } + initialChans := []Channel{} + agent, err := New(testCfg, initialChans) + if err != nil { + t.Fatalf("unable to create agent: %v", err) + } + + // With the autopilot agent and all its dependencies we'll start the + // primary controller goroutine. + if err := agent.Start(); err != nil { + t.Fatalf("unable to start agent: %v", err) + } + defer agent.Stop() + + var wg sync.WaitGroup + + // Once again, we'll start by telling the agent as part of its first + // query, that it needs more channels and has 3 BTC available for + // attachment. + wg.Add(1) + go func() { + select { + + // We'll send over a response indicating that it should + // establish more channels, and give it a budget of 1 BTC to do + // so. + case heuristic.moreChansResps <- moreChansResp{true, btcutil.SatoshiPerBitcoin}: + wg.Done() + return + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + }() + + // We'll wait for the first query to be consumed. If this doesn't + // happen then the above goroutine will timeout, and fail the test. + wg.Wait() + + heuristic.moreChanArgs = make(chan moreChanArg) + + // Next, the agent should deliver a query to the Select method of the + // heuristic. We'll only return a single directive for a pre-chosen + // node. + nodeKey, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + nodeID := NewNodeID(nodeKey) + nodeDirective := AttachmentDirective{ + PeerKey: nodeKey, + ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin, + Addrs: []net.Addr{ + &net.TCPAddr{ + IP: bytes.Repeat([]byte("a"), 16), + }, + }, + } + select { + case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + return + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + + heuristic.directiveArgs = make(chan directiveArg) + + // A request to open the channel should've also been sent. + select { + case openChan := <-chanController.openChanSignals: + if openChan.amt != nodeDirective.ChanAmt { + t.Fatalf("invalid chan amt: expected %v, got %v", + nodeDirective.ChanAmt, openChan.amt) + } + if !openChan.target.IsEqual(nodeKey) { + t.Fatalf("unexpected key: expected %x, got %x", + nodeKey.SerializeCompressed(), + openChan.target.SerializeCompressed()) + } + if len(openChan.addrs) != 1 { + t.Fatalf("should have single addr, instead have: %v", + len(openChan.addrs)) + } + case <-time.After(time.Second * 10): + t.Fatalf("channel wasn't opened in time") + } + + // Now, in order to test that the pending state was properly updated, + // we'll trigger a balance update in order to trigger a query to the + // heuristic. + agent.OnBalanceChange(0.4 * btcutil.SatoshiPerBitcoin) + + wg = sync.WaitGroup{} + + // The heuristic should be queried, and the argument for the set of + // channels passed in should include the pending channels that + // should've been created above. + select { + // The request that we get should include a pending channel for the + // one that we just created, otherwise the agent isn't properly + // updating its internal state. + case req := <-heuristic.moreChanArgs: + if len(req.chans) != 1 { + t.Fatalf("should include pending chan in current "+ + "state, instead have %v chans", len(req.chans)) + } + if req.chans[0].Capacity != nodeDirective.ChanAmt { + t.Fatalf("wrong chan capacity: expected %v, got %v", + req.chans[0].Capacity, nodeDirective.ChanAmt) + } + if req.chans[0].Node != nodeID { + t.Fatalf("wrong node ID: expected %x, got %x", + req.chans[0].Node[:], nodeID) + } + case <-time.After(time.Second * 10): + t.Fatalf("need more chans wasn't queried in time") + } + + // We'll send across a response indicating that it *does* need more + // channels. + select { + case heuristic.moreChansResps <- moreChansResp{true, btcutil.SatoshiPerBitcoin}: + case <-time.After(time.Second * 10): + t.Fatalf("need more chans wasn't queried in time") + } + + // The response above should prompt the agent to make a query to the + // Select method. The arguments passed should reflect the fact that the + // node we have a pending channel to, should be ignored. + select { + case req := <-heuristic.directiveArgs: + if len(req.skip) == 0 { + t.Fatalf("expected to skip %v nodes, instead "+ + "skipping %v", 1, len(req.skip)) + } + if _, ok := req.skip[nodeID]; !ok { + t.Fatalf("pending node not included in skip arguments") + } + case <-time.After(time.Second * 10): + t.Fatalf("select wasn't queried in time") + } +}