diff --git a/autopilot/agent.go b/autopilot/agent.go index 26859a7f..be6480c4 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -1,6 +1,7 @@ package autopilot import ( + "bytes" "fmt" "math/rand" "net" @@ -578,18 +579,49 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32, ) a.pendingMtx.Unlock() - // If we reach this point, then according to our heuristic we - // should modify our channel state to tend towards what it - // determines to the optimal state. So we'll call Select to get - // a fresh batch of attachment directives, passing in the - // amount of funds available for us to use. - chanCandidates, err := a.cfg.Heuristic.Select( - a.cfg.Self, a.cfg.Graph, availableFunds, - numChans, nodesToSkip, + // Gather the set of all nodes in the graph, except those we + // want to skip. + selfPubBytes := a.cfg.Self.SerializeCompressed() + nodes := make(map[NodeID]struct{}) + if err := a.cfg.Graph.ForEachNode(func(node Node) error { + nID := NodeID(node.PubKey()) + + // If we come across ourselves, them we'll continue in + // order to avoid attempting to make a channel with + // ourselves. + if bytes.Equal(nID[:], selfPubBytes) { + return nil + } + + // Additionally, if this node is in the blacklist, then + // we'll skip it. + if _, ok := nodesToSkip[nID]; ok { + return nil + } + + nodes[nID] = struct{}{} + return nil + }); err != nil { + return fmt.Errorf("unable to get graph nodes: %v", err) + } + + // Use the heuristic to calculate a score for each node in the + // graph. + scores, err := a.cfg.Heuristic.NodeScores( + a.cfg.Graph, totalChans, availableFunds, nodes, ) if err != nil { - return fmt.Errorf("Unable to select candidates for "+ - "attachment: %v", err) + return fmt.Errorf("unable to calculate node scores : %v", err) + } + + log.Debugf("Got scores for %d nodes", len(scores)) + + // Now use the score to make a weighted choice which + // nodes to attempt to open channels to. + chanCandidates, err := chooseN(int(numChans), scores) + if err != nil { + return fmt.Errorf("Unable to make weighted choice: %v", + err) } if len(chanCandidates) == 0 { @@ -630,7 +662,7 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32, a.pendingConns[nodeID] = struct{}{} a.wg.Add(1) - go a.executeDirective(chanCandidate) + go a.executeDirective(*chanCandidate) } return nil } diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index 0017eb45..9cf8aae2 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -29,8 +29,8 @@ type mockHeuristic struct { moreChansResps chan moreChansResp moreChanArgs chan moreChanArg - directiveResps chan []AttachmentDirective - directiveArgs chan directiveArg + nodeScoresResps chan map[NodeID]*AttachmentDirective + nodeScoresArgs chan directiveArg quit chan struct{} } @@ -60,44 +60,43 @@ func (m *mockHeuristic) NeedMoreChans(chans []Channel, } type directiveArg struct { - self *btcec.PublicKey graph ChannelGraph amt btcutil.Amount - skip map[NodeID]struct{} + chans []Channel + nodes map[NodeID]struct{} } func (m *mockHeuristic) Select(self *btcec.PublicKey, graph ChannelGraph, amtToUse btcutil.Amount, numChans uint32, skipChans map[NodeID]struct{}) ([]AttachmentDirective, error) { - - if m.directiveArgs != nil { - directive := directiveArg{ - self: self, - graph: graph, - amt: amtToUse, - skip: skipChans, - } - - select { - case m.directiveArgs <- directive: - case <-m.quit: - return nil, errors.New("exiting") - } - } - - select { - case resp := <-m.directiveResps: - return resp, nil - case <-m.quit: - return nil, errors.New("exiting") - } + return nil, nil } func (m *mockHeuristic) NodeScores(g ChannelGraph, chans []Channel, fundsAvailable btcutil.Amount, nodes map[NodeID]struct{}) ( map[NodeID]*AttachmentDirective, error) { - return nil, nil + if m.nodeScoresArgs != nil { + directive := directiveArg{ + graph: g, + amt: fundsAvailable, + chans: chans, + nodes: nodes, + } + + select { + case m.nodeScoresArgs <- directive: + case <-m.quit: + return nil, errors.New("exiting") + } + } + + select { + case resp := <-m.nodeScoresResps: + return resp, nil + case <-m.quit: + return nil, errors.New("exiting") + } } var _ AttachmentHeuristic = (*mockHeuristic)(nil) @@ -151,8 +150,8 @@ func TestAgentChannelOpenSignal(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent, 10), @@ -233,7 +232,7 @@ func TestAgentChannelOpenSignal(t *testing.T) { // If this send success, then Select was erroneously called and the // test should be failed. - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: t.Fatalf("Select was called but shouldn't have been") // This is the correct path as Select should've be called. @@ -276,8 +275,8 @@ func TestAgentChannelFailureSignal(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockFailingChanController{} memGraph, _, _ := newMemChanGraph() @@ -331,7 +330,7 @@ func TestAgentChannelFailureSignal(t *testing.T) { // At this point, the agent should now be querying the heuristic to // request attachment directives, return a fake so the agent will // attempt to open a channel. - var fakeDirective = AttachmentDirective{ + var fakeDirective = &AttachmentDirective{ NodeID: NewNodeID(self), ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -339,10 +338,13 @@ func TestAgentChannelFailureSignal(t *testing.T) { IP: bytes.Repeat([]byte("a"), 16), }, }, + Score: 0.5, } select { - case heuristic.directiveResps <- []AttachmentDirective{fakeDirective}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ + NewNodeID(self): fakeDirective, + }: case <-time.After(time.Second * 10): t.Fatal("heuristic wasn't queried in time") } @@ -357,7 +359,7 @@ func TestAgentChannelFailureSignal(t *testing.T) { } select { - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: case <-time.After(time.Second * 10): t.Fatal("heuristic wasn't queried in time") } @@ -376,8 +378,8 @@ func TestAgentChannelCloseSignal(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -465,7 +467,7 @@ func TestAgentChannelCloseSignal(t *testing.T) { // If this send success, then Select was erroneously called and the // test should be failed. - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: t.Fatalf("Select was called but shouldn't have been") // This is the correct path as Select should've be called. @@ -486,8 +488,8 @@ func TestAgentBalanceUpdate(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -576,7 +578,7 @@ func TestAgentBalanceUpdate(t *testing.T) { // If this send success, then Select was erroneously called and the // test should be failed. - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: t.Fatalf("Select was called but shouldn't have been") // This is the correct path as Select should've be called. @@ -596,8 +598,8 @@ func TestAgentImmediateAttach(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -666,7 +668,7 @@ func TestAgentImmediateAttach(t *testing.T) { // At this point, the agent should now be querying the heuristic to // requests attachment directives. We'll generate 5 mock directives so // it can progress within its loop. - directives := make([]AttachmentDirective, numChans) + directives := make(map[NodeID]*AttachmentDirective) nodeKeys := make(map[NodeID]struct{}) for i := 0; i < numChans; i++ { pub, err := randKey() @@ -674,7 +676,7 @@ func TestAgentImmediateAttach(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } nodeID := NewNodeID(pub) - directives[i] = AttachmentDirective{ + directives[nodeID] = &AttachmentDirective{ NodeID: nodeID, ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -682,6 +684,7 @@ func TestAgentImmediateAttach(t *testing.T) { IP: bytes.Repeat([]byte("a"), 16), }, }, + Score: 0.5, } nodeKeys[nodeID] = struct{}{} } @@ -689,7 +692,7 @@ func TestAgentImmediateAttach(t *testing.T) { // With our fake directives created, we'll now send then to the agent // as a return value for the Select function. select { - case heuristic.directiveResps <- directives: + case heuristic.nodeScoresResps <- directives: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } @@ -711,6 +714,7 @@ func TestAgentImmediateAttach(t *testing.T) { nodeID) } delete(nodeKeys, nodeID) + case <-time.After(time.Second * 10): t.Fatalf("channel not opened in time") } @@ -729,8 +733,8 @@ func TestAgentPrivateChannels(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } // The chanController should be initialized such that all of its open // channel requests are for private channels. @@ -800,13 +804,13 @@ func TestAgentPrivateChannels(t *testing.T) { // At this point, the agent should now be querying the heuristic to // requests attachment directives. We'll generate 5 mock directives so // it can progress within its loop. - directives := make([]AttachmentDirective, numChans) + directives := make(map[NodeID]*AttachmentDirective) for i := 0; i < numChans; i++ { pub, err := randKey() if err != nil { t.Fatalf("unable to generate key: %v", err) } - directives[i] = AttachmentDirective{ + directives[NewNodeID(pub)] = &AttachmentDirective{ NodeID: NewNodeID(pub), ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -814,13 +818,14 @@ func TestAgentPrivateChannels(t *testing.T) { IP: bytes.Repeat([]byte("a"), 16), }, }, + Score: 0.5, } } // With our fake directives created, we'll now send then to the agent // as a return value for the Select function. select { - case heuristic.directiveResps <- directives: + case heuristic.nodeScoresResps <- directives: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } @@ -852,8 +857,8 @@ func TestAgentPendingChannelState(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -928,7 +933,7 @@ func TestAgentPendingChannelState(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } nodeID := NewNodeID(nodeKey) - nodeDirective := AttachmentDirective{ + nodeDirective := &AttachmentDirective{ NodeID: nodeID, ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -936,14 +941,18 @@ func TestAgentPendingChannelState(t *testing.T) { IP: bytes.Repeat([]byte("a"), 16), }, }, + Score: 0.5, } + select { - case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ + nodeID: nodeDirective, + }: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } - heuristic.directiveArgs = make(chan directiveArg) + heuristic.nodeScoresArgs = make(chan directiveArg) // A request to open the channel should've also been sent. select { @@ -1006,12 +1015,12 @@ func TestAgentPendingChannelState(t *testing.T) { // Select method. The arguments passed should reflect the fact that the // node we have a pending channel to, should be ignored. select { - case req := <-heuristic.directiveArgs: - if len(req.skip) == 0 { + case req := <-heuristic.nodeScoresArgs: + if len(req.chans) == 0 { t.Fatalf("expected to skip %v nodes, instead "+ - "skipping %v", 1, len(req.skip)) + "skipping %v", 1, len(req.chans)) } - if _, ok := req.skip[nodeID]; !ok { + if req.chans[0].Node != nodeID { t.Fatalf("pending node not included in skip arguments") } case <-time.After(time.Second * 10): @@ -1032,8 +1041,8 @@ func TestAgentPendingOpenChannel(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -1096,7 +1105,7 @@ func TestAgentPendingOpenChannel(t *testing.T) { // There shouldn't be a call to the Select method as we've returned // "false" for NeedMoreChans above. select { - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: t.Fatalf("Select was called but shouldn't have been") default: } @@ -1117,8 +1126,8 @@ func TestAgentOnNodeUpdates(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -1174,7 +1183,7 @@ func TestAgentOnNodeUpdates(t *testing.T) { // Send over an empty list of attachment directives, which should cause // the agent to return to waiting on a new signal. select { - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: case <-time.After(time.Second * 10): t.Fatalf("Select was not called but should have been") } @@ -1200,7 +1209,7 @@ func TestAgentOnNodeUpdates(t *testing.T) { // It's not important that this list is also empty, so long as the node // updates signal is causing the agent to make this attempt. select { - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: case <-time.After(time.Second * 10): t.Fatalf("Select was not called but should have been") } @@ -1222,8 +1231,8 @@ func TestAgentSkipPendingConns(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -1311,7 +1320,7 @@ func TestAgentSkipPendingConns(t *testing.T) { if err != nil { t.Fatalf("unable to generate key: %v", err) } - nodeDirective := AttachmentDirective{ + nodeDirective := &AttachmentDirective{ NodeID: NewNodeID(nodeKey), ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -1319,9 +1328,13 @@ func TestAgentSkipPendingConns(t *testing.T) { IP: bytes.Repeat([]byte("a"), 16), }, }, + Score: 0.5, } + select { - case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ + NewNodeID(nodeKey): nodeDirective, + }: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } @@ -1349,7 +1362,9 @@ func TestAgentSkipPendingConns(t *testing.T) { // Send a directive for the same node, which already has a pending conn. select { - case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ + NewNodeID(nodeKey): nodeDirective, + }: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } @@ -1386,7 +1401,9 @@ func TestAgentSkipPendingConns(t *testing.T) { // Send a directive for the same node, which already has a pending conn. select { - case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ + NewNodeID(nodeKey): nodeDirective, + }: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") }