From 711b695a2f515a657190f57b1a7306273a2eb2a1 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 15 Aug 2017 18:23:52 -0700 Subject: [PATCH] autopilot: add tracking of pending channel state This commit adds tracking of the pending channels state within the autopilot.Agent. This fixes a class of bugs which was discovered during the latest test net block storm wherein the Agent would attempt to repeatedly attach to the same node due to rapid closure of other channels. In this commit we fix this issue by ensuring that we always factor in the pending channel state when querying the heuristic w.r.t if we need more channels, and if so to which nodes should be attached to. --- autopilot/agent.go | 86 ++++++++++++++--- autopilot/agent_test.go | 209 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 284 insertions(+), 11 deletions(-) diff --git a/autopilot/agent.go b/autopilot/agent.go index 32d7daa2..045ca998 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -50,7 +50,7 @@ type Config struct { // helper utility methods. type channelState map[lnwire.ShortChannelID]Channel -// CHannels returns a slice of all the active channels. +// Channels returns a slice of all the active channels. func (c channelState) Channels() []Channel { chans := make([]Channel, 0, len(c)) for _, channel := range c { @@ -209,6 +209,43 @@ func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) { }() } +// mergeNodeMaps merges the Agent's set of nodes that it already has active +// channels open to, with the set of nodes that are pending new channels. This +// ensures that the Agent doesn't attempt to open any "duplicate" channels to +// the same node. +func mergeNodeMaps(a map[NodeID]struct{}, + b map[NodeID]Channel) map[NodeID]struct{} { + + c := make(map[NodeID]struct{}, len(a)+len(b)) + for nodeID := range a { + c[nodeID] = struct{}{} + } + for nodeID := range b { + c[nodeID] = struct{}{} + } + + return c +} + +// mergeChanState merges the Agent's set of active channels, with the set of +// channels awaiting confirmation. This ensures that the agent doesn't go over +// the prescribed channel limit or fund allocation limit. +func mergeChanState(pendingChans map[NodeID]Channel, + activeChans channelState) []Channel { + + numChans := len(pendingChans) + len(activeChans) + totalChans := make([]Channel, 0, numChans) + + for _, activeChan := range activeChans.Channels() { + totalChans = append(totalChans, activeChan) + } + for _, pendingChan := range pendingChans { + totalChans = append(totalChans, pendingChan) + } + + return totalChans +} + // controller implements the closed-loop control system of the Agent. The // controller will make a decision w.r.t channel placement within the graph // based on: it's current internal state of the set of active channels open, @@ -218,8 +255,6 @@ func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) { func (a *Agent) controller(startingBalance btcutil.Amount) { defer a.wg.Done() - // TODO(roasbeef): add queries for internal state? - // We'll start off by assigning our starting balance, and injecting // that amount as an initial wake up to the main controller goroutine. a.OnBalanceChange(startingBalance) @@ -227,6 +262,13 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { // TODO(roasbeef): do we in fact need to maintain order? // * use sync.Cond if so + // pendingOpens tracks the channels that we've requested to be + // initiated, but haven't yet been confirmed as being fully opened. + // This state is required as otherwise, we may go over our allotted + // channel limit, or open multiple channels to the same node. + pendingOpens := make(map[NodeID]Channel) + var pendingMtx sync.Mutex + // TODO(roasbeef): add 10-minute wake up timer for { select { @@ -258,6 +300,10 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { newChan := update.newChan a.chanState[newChan.ChanID] = newChan + pendingMtx.Lock() + delete(pendingOpens, newChan.Node) + pendingMtx.Unlock() + // A channel has been closed, this may free up an // available slot, triggering a new channel update. case *chanCloseUpdate: @@ -271,15 +317,17 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { } // With all the updates applied, we'll obtain a set of - // the current active channels. - chans := a.chanState.Channels() + // the current active channels (confirmed channels), + // and also factor in our set of unconfirmed channels. + confirmedChans := a.chanState + totalChans := mergeChanState(pendingOpens, confirmedChans) // Now that we've updated our internal state, we'll // consult our channel attachment heuristic to // determine if we should open up any additional // channels or modify existing channels. availableFunds, needMore := a.cfg.Heuristic.NeedMoreChans( - chans, a.totalBalance, + totalChans, a.totalBalance, ) if !needMore { continue @@ -290,7 +338,10 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { // We're to attempt an attachment so we'll o obtain the // set of nodes that we currently have channels with so // we avoid duplicate edges. - nodesToSkip := a.chanState.ConnectedNodes() + connectedNodes := a.chanState.ConnectedNodes() + pendingMtx.Lock() + nodesToSkip := mergeNodeMaps(connectedNodes, pendingOpens) + pendingMtx.Unlock() // If we reach this point, then according to our // heuristic we should modify our channel state to tend @@ -321,7 +372,14 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { // directive. If any of these succeed, then we'll // receive a new state update, taking us back to the // top of our controller loop. + pendingMtx.Lock() for _, chanCandidate := range chanCandidates { + nID := NewNodeID(chanCandidate.PeerKey) + pendingOpens[nID] = Channel{ + Capacity: chanCandidate.ChanAmt, + Node: nID, + } + go func(directive AttachmentDirective) { pub := directive.PeerKey err := a.cfg.ChanController.OpenChannel( @@ -334,14 +392,20 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { "channel to %x of %v: %v", pub.SerializeCompressed(), directive.ChanAmt, err) - return + + // As the attempt failed, we'll + // clear it from the set of + // pending channels. + pendingMtx.Lock() + nID := NewNodeID(directive.PeerKey) + delete(pendingOpens, nID) + pendingMtx.Unlock() + } - // TODO(roasbeef): on err signal - // failure so attempt to allocate - // again? }(chanCandidate) } + pendingMtx.Unlock() // The agent has been signalled to exit, so we'll bail out // immediately. diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index 03a75c22..0fdc0ebe 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -17,21 +17,54 @@ type moreChansResp struct { amt btcutil.Amount } +type moreChanArg struct { + chans []Channel + balance btcutil.Amount +} + type mockHeuristic struct { moreChansResps chan moreChansResp + moreChanArgs chan moreChanArg + directiveResps chan []AttachmentDirective + directiveArgs chan directiveArg } func (m *mockHeuristic) NeedMoreChans(chans []Channel, balance btcutil.Amount) (btcutil.Amount, bool) { + if m.moreChanArgs != nil { + m.moreChanArgs <- moreChanArg{ + chans: chans, + balance: balance, + } + + } + resp := <-m.moreChansResps return resp.amt, resp.needMore } +type directiveArg struct { + self *btcec.PublicKey + graph ChannelGraph + amt btcutil.Amount + skip map[NodeID]struct{} +} + func (m *mockHeuristic) Select(self *btcec.PublicKey, graph ChannelGraph, + amtToUse btcutil.Amount, skipChans map[NodeID]struct{}) ([]AttachmentDirective, error) { + if m.directiveArgs != nil { + m.directiveArgs <- directiveArg{ + self: self, + graph: graph, + amt: amtToUse, + skip: skipChans, + } + } + resp := <-m.directiveResps return resp, nil } @@ -552,3 +585,179 @@ func TestAgentImmediateAttach(t *testing.T) { } } } + +// TestAgentPendingChannelState ensures that the agent properly factors in its +// pending channel state when making decisions w.r.t if it needs more channels +// or not, and if so, who is eligible to open new channels to. +func TestAgentPendingChannelState(t *testing.T) { + t.Parallel() + + // First, we'll create all the dependencies that we'll need in order to + // create the autopilot agent. + self, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + heuristic := &mockHeuristic{ + moreChansResps: make(chan moreChansResp), + directiveResps: make(chan []AttachmentDirective), + } + chanController := &mockChanController{ + openChanSignals: make(chan openChanIntent), + } + memGraph, _, _ := newMemChanGraph() + + // The wallet will start with 6 BTC available. + const walletBalance = btcutil.SatoshiPerBitcoin * 6 + + // With the dependencies we created, we can now create the initial + // agent itself. + testCfg := Config{ + Self: self, + Heuristic: heuristic, + ChanController: chanController, + WalletBalance: func() (btcutil.Amount, error) { + return walletBalance, nil + }, + Graph: memGraph, + } + initialChans := []Channel{} + agent, err := New(testCfg, initialChans) + if err != nil { + t.Fatalf("unable to create agent: %v", err) + } + + // With the autopilot agent and all its dependencies we'll start the + // primary controller goroutine. + if err := agent.Start(); err != nil { + t.Fatalf("unable to start agent: %v", err) + } + defer agent.Stop() + + var wg sync.WaitGroup + + // Once again, we'll start by telling the agent as part of its first + // query, that it needs more channels and has 3 BTC available for + // attachment. + wg.Add(1) + go func() { + select { + + // We'll send over a response indicating that it should + // establish more channels, and give it a budget of 1 BTC to do + // so. + case heuristic.moreChansResps <- moreChansResp{true, btcutil.SatoshiPerBitcoin}: + wg.Done() + return + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + }() + + // We'll wait for the first query to be consumed. If this doesn't + // happen then the above goroutine will timeout, and fail the test. + wg.Wait() + + heuristic.moreChanArgs = make(chan moreChanArg) + + // Next, the agent should deliver a query to the Select method of the + // heuristic. We'll only return a single directive for a pre-chosen + // node. + nodeKey, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + nodeID := NewNodeID(nodeKey) + nodeDirective := AttachmentDirective{ + PeerKey: nodeKey, + ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin, + Addrs: []net.Addr{ + &net.TCPAddr{ + IP: bytes.Repeat([]byte("a"), 16), + }, + }, + } + select { + case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + return + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + + heuristic.directiveArgs = make(chan directiveArg) + + // A request to open the channel should've also been sent. + select { + case openChan := <-chanController.openChanSignals: + if openChan.amt != nodeDirective.ChanAmt { + t.Fatalf("invalid chan amt: expected %v, got %v", + nodeDirective.ChanAmt, openChan.amt) + } + if !openChan.target.IsEqual(nodeKey) { + t.Fatalf("unexpected key: expected %x, got %x", + nodeKey.SerializeCompressed(), + openChan.target.SerializeCompressed()) + } + if len(openChan.addrs) != 1 { + t.Fatalf("should have single addr, instead have: %v", + len(openChan.addrs)) + } + case <-time.After(time.Second * 10): + t.Fatalf("channel wasn't opened in time") + } + + // Now, in order to test that the pending state was properly updated, + // we'll trigger a balance update in order to trigger a query to the + // heuristic. + agent.OnBalanceChange(0.4 * btcutil.SatoshiPerBitcoin) + + wg = sync.WaitGroup{} + + // The heuristic should be queried, and the argument for the set of + // channels passed in should include the pending channels that + // should've been created above. + select { + // The request that we get should include a pending channel for the + // one that we just created, otherwise the agent isn't properly + // updating its internal state. + case req := <-heuristic.moreChanArgs: + if len(req.chans) != 1 { + t.Fatalf("should include pending chan in current "+ + "state, instead have %v chans", len(req.chans)) + } + if req.chans[0].Capacity != nodeDirective.ChanAmt { + t.Fatalf("wrong chan capacity: expected %v, got %v", + req.chans[0].Capacity, nodeDirective.ChanAmt) + } + if req.chans[0].Node != nodeID { + t.Fatalf("wrong node ID: expected %x, got %x", + req.chans[0].Node[:], nodeID) + } + case <-time.After(time.Second * 10): + t.Fatalf("need more chans wasn't queried in time") + } + + // We'll send across a response indicating that it *does* need more + // channels. + select { + case heuristic.moreChansResps <- moreChansResp{true, btcutil.SatoshiPerBitcoin}: + case <-time.After(time.Second * 10): + t.Fatalf("need more chans wasn't queried in time") + } + + // The response above should prompt the agent to make a query to the + // Select method. The arguments passed should reflect the fact that the + // node we have a pending channel to, should be ignored. + select { + case req := <-heuristic.directiveArgs: + if len(req.skip) == 0 { + t.Fatalf("expected to skip %v nodes, instead "+ + "skipping %v", 1, len(req.skip)) + } + if _, ok := req.skip[nodeID]; !ok { + t.Fatalf("pending node not included in skip arguments") + } + case <-time.After(time.Second * 10): + t.Fatalf("select wasn't queried in time") + } +}