autopilot/agent: use NodeScores to select channel candidates

This commit makes the autopilot agent use the new NodeScores heuristic
API to select channel candiates, instead of the Select API. The result
will be similar, but instead of selecting a set of nodes to open
channels to, we get a score based results which can later be used
together with other heuristics to choose nodes to open channels to.

This commit also makes the existing autopilot agent tests compatible
with the new NodeScores API.
This commit is contained in:
Johan T. Halseth 2018-11-22 23:18:09 +01:00
parent e84bd29836
commit b3d315298c
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26
2 changed files with 133 additions and 84 deletions

@ -1,6 +1,7 @@
package autopilot
import (
"bytes"
"fmt"
"math/rand"
"net"
@ -578,18 +579,49 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
)
a.pendingMtx.Unlock()
// If we reach this point, then according to our heuristic we
// should modify our channel state to tend towards what it
// determines to the optimal state. So we'll call Select to get
// a fresh batch of attachment directives, passing in the
// amount of funds available for us to use.
chanCandidates, err := a.cfg.Heuristic.Select(
a.cfg.Self, a.cfg.Graph, availableFunds,
numChans, nodesToSkip,
// Gather the set of all nodes in the graph, except those we
// want to skip.
selfPubBytes := a.cfg.Self.SerializeCompressed()
nodes := make(map[NodeID]struct{})
if err := a.cfg.Graph.ForEachNode(func(node Node) error {
nID := NodeID(node.PubKey())
// If we come across ourselves, them we'll continue in
// order to avoid attempting to make a channel with
// ourselves.
if bytes.Equal(nID[:], selfPubBytes) {
return nil
}
// Additionally, if this node is in the blacklist, then
// we'll skip it.
if _, ok := nodesToSkip[nID]; ok {
return nil
}
nodes[nID] = struct{}{}
return nil
}); err != nil {
return fmt.Errorf("unable to get graph nodes: %v", err)
}
// Use the heuristic to calculate a score for each node in the
// graph.
scores, err := a.cfg.Heuristic.NodeScores(
a.cfg.Graph, totalChans, availableFunds, nodes,
)
if err != nil {
return fmt.Errorf("Unable to select candidates for "+
"attachment: %v", err)
return fmt.Errorf("unable to calculate node scores : %v", err)
}
log.Debugf("Got scores for %d nodes", len(scores))
// Now use the score to make a weighted choice which
// nodes to attempt to open channels to.
chanCandidates, err := chooseN(int(numChans), scores)
if err != nil {
return fmt.Errorf("Unable to make weighted choice: %v",
err)
}
if len(chanCandidates) == 0 {
@ -630,7 +662,7 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
a.pendingConns[nodeID] = struct{}{}
a.wg.Add(1)
go a.executeDirective(chanCandidate)
go a.executeDirective(*chanCandidate)
}
return nil
}

@ -29,8 +29,8 @@ type mockHeuristic struct {
moreChansResps chan moreChansResp
moreChanArgs chan moreChanArg
directiveResps chan []AttachmentDirective
directiveArgs chan directiveArg
nodeScoresResps chan map[NodeID]*AttachmentDirective
nodeScoresArgs chan directiveArg
quit chan struct{}
}
@ -60,44 +60,43 @@ func (m *mockHeuristic) NeedMoreChans(chans []Channel,
}
type directiveArg struct {
self *btcec.PublicKey
graph ChannelGraph
amt btcutil.Amount
skip map[NodeID]struct{}
chans []Channel
nodes map[NodeID]struct{}
}
func (m *mockHeuristic) Select(self *btcec.PublicKey, graph ChannelGraph,
amtToUse btcutil.Amount, numChans uint32,
skipChans map[NodeID]struct{}) ([]AttachmentDirective, error) {
if m.directiveArgs != nil {
directive := directiveArg{
self: self,
graph: graph,
amt: amtToUse,
skip: skipChans,
}
select {
case m.directiveArgs <- directive:
case <-m.quit:
return nil, errors.New("exiting")
}
}
select {
case resp := <-m.directiveResps:
return resp, nil
case <-m.quit:
return nil, errors.New("exiting")
}
return nil, nil
}
func (m *mockHeuristic) NodeScores(g ChannelGraph, chans []Channel,
fundsAvailable btcutil.Amount, nodes map[NodeID]struct{}) (
map[NodeID]*AttachmentDirective, error) {
return nil, nil
if m.nodeScoresArgs != nil {
directive := directiveArg{
graph: g,
amt: fundsAvailable,
chans: chans,
nodes: nodes,
}
select {
case m.nodeScoresArgs <- directive:
case <-m.quit:
return nil, errors.New("exiting")
}
}
select {
case resp := <-m.nodeScoresResps:
return resp, nil
case <-m.quit:
return nil, errors.New("exiting")
}
}
var _ AttachmentHeuristic = (*mockHeuristic)(nil)
@ -152,7 +151,7 @@ func TestAgentChannelOpenSignal(t *testing.T) {
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent, 10),
@ -233,7 +232,7 @@ func TestAgentChannelOpenSignal(t *testing.T) {
// If this send success, then Select was erroneously called and the
// test should be failed.
case heuristic.directiveResps <- []AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
t.Fatalf("Select was called but shouldn't have been")
// This is the correct path as Select should've be called.
@ -277,7 +276,7 @@ func TestAgentChannelFailureSignal(t *testing.T) {
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockFailingChanController{}
memGraph, _, _ := newMemChanGraph()
@ -331,7 +330,7 @@ func TestAgentChannelFailureSignal(t *testing.T) {
// At this point, the agent should now be querying the heuristic to
// request attachment directives, return a fake so the agent will
// attempt to open a channel.
var fakeDirective = AttachmentDirective{
var fakeDirective = &AttachmentDirective{
NodeID: NewNodeID(self),
ChanAmt: btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{
@ -339,10 +338,13 @@ func TestAgentChannelFailureSignal(t *testing.T) {
IP: bytes.Repeat([]byte("a"), 16),
},
},
Score: 0.5,
}
select {
case heuristic.directiveResps <- []AttachmentDirective{fakeDirective}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
NewNodeID(self): fakeDirective,
}:
case <-time.After(time.Second * 10):
t.Fatal("heuristic wasn't queried in time")
}
@ -357,7 +359,7 @@ func TestAgentChannelFailureSignal(t *testing.T) {
}
select {
case heuristic.directiveResps <- []AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
case <-time.After(time.Second * 10):
t.Fatal("heuristic wasn't queried in time")
}
@ -377,7 +379,7 @@ func TestAgentChannelCloseSignal(t *testing.T) {
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
@ -465,7 +467,7 @@ func TestAgentChannelCloseSignal(t *testing.T) {
// If this send success, then Select was erroneously called and the
// test should be failed.
case heuristic.directiveResps <- []AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
t.Fatalf("Select was called but shouldn't have been")
// This is the correct path as Select should've be called.
@ -487,7 +489,7 @@ func TestAgentBalanceUpdate(t *testing.T) {
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
@ -576,7 +578,7 @@ func TestAgentBalanceUpdate(t *testing.T) {
// If this send success, then Select was erroneously called and the
// test should be failed.
case heuristic.directiveResps <- []AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
t.Fatalf("Select was called but shouldn't have been")
// This is the correct path as Select should've be called.
@ -597,7 +599,7 @@ func TestAgentImmediateAttach(t *testing.T) {
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
@ -666,7 +668,7 @@ func TestAgentImmediateAttach(t *testing.T) {
// At this point, the agent should now be querying the heuristic to
// requests attachment directives. We'll generate 5 mock directives so
// it can progress within its loop.
directives := make([]AttachmentDirective, numChans)
directives := make(map[NodeID]*AttachmentDirective)
nodeKeys := make(map[NodeID]struct{})
for i := 0; i < numChans; i++ {
pub, err := randKey()
@ -674,7 +676,7 @@ func TestAgentImmediateAttach(t *testing.T) {
t.Fatalf("unable to generate key: %v", err)
}
nodeID := NewNodeID(pub)
directives[i] = AttachmentDirective{
directives[nodeID] = &AttachmentDirective{
NodeID: nodeID,
ChanAmt: btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{
@ -682,6 +684,7 @@ func TestAgentImmediateAttach(t *testing.T) {
IP: bytes.Repeat([]byte("a"), 16),
},
},
Score: 0.5,
}
nodeKeys[nodeID] = struct{}{}
}
@ -689,7 +692,7 @@ func TestAgentImmediateAttach(t *testing.T) {
// With our fake directives created, we'll now send then to the agent
// as a return value for the Select function.
select {
case heuristic.directiveResps <- directives:
case heuristic.nodeScoresResps <- directives:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
@ -711,6 +714,7 @@ func TestAgentImmediateAttach(t *testing.T) {
nodeID)
}
delete(nodeKeys, nodeID)
case <-time.After(time.Second * 10):
t.Fatalf("channel not opened in time")
}
@ -730,7 +734,7 @@ func TestAgentPrivateChannels(t *testing.T) {
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
// The chanController should be initialized such that all of its open
// channel requests are for private channels.
@ -800,13 +804,13 @@ func TestAgentPrivateChannels(t *testing.T) {
// At this point, the agent should now be querying the heuristic to
// requests attachment directives. We'll generate 5 mock directives so
// it can progress within its loop.
directives := make([]AttachmentDirective, numChans)
directives := make(map[NodeID]*AttachmentDirective)
for i := 0; i < numChans; i++ {
pub, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
directives[i] = AttachmentDirective{
directives[NewNodeID(pub)] = &AttachmentDirective{
NodeID: NewNodeID(pub),
ChanAmt: btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{
@ -814,13 +818,14 @@ func TestAgentPrivateChannels(t *testing.T) {
IP: bytes.Repeat([]byte("a"), 16),
},
},
Score: 0.5,
}
}
// With our fake directives created, we'll now send then to the agent
// as a return value for the Select function.
select {
case heuristic.directiveResps <- directives:
case heuristic.nodeScoresResps <- directives:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
@ -853,7 +858,7 @@ func TestAgentPendingChannelState(t *testing.T) {
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
@ -928,7 +933,7 @@ func TestAgentPendingChannelState(t *testing.T) {
t.Fatalf("unable to generate key: %v", err)
}
nodeID := NewNodeID(nodeKey)
nodeDirective := AttachmentDirective{
nodeDirective := &AttachmentDirective{
NodeID: nodeID,
ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{
@ -936,14 +941,18 @@ func TestAgentPendingChannelState(t *testing.T) {
IP: bytes.Repeat([]byte("a"), 16),
},
},
Score: 0.5,
}
select {
case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
nodeID: nodeDirective,
}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
heuristic.directiveArgs = make(chan directiveArg)
heuristic.nodeScoresArgs = make(chan directiveArg)
// A request to open the channel should've also been sent.
select {
@ -1006,12 +1015,12 @@ func TestAgentPendingChannelState(t *testing.T) {
// Select method. The arguments passed should reflect the fact that the
// node we have a pending channel to, should be ignored.
select {
case req := <-heuristic.directiveArgs:
if len(req.skip) == 0 {
case req := <-heuristic.nodeScoresArgs:
if len(req.chans) == 0 {
t.Fatalf("expected to skip %v nodes, instead "+
"skipping %v", 1, len(req.skip))
"skipping %v", 1, len(req.chans))
}
if _, ok := req.skip[nodeID]; !ok {
if req.chans[0].Node != nodeID {
t.Fatalf("pending node not included in skip arguments")
}
case <-time.After(time.Second * 10):
@ -1033,7 +1042,7 @@ func TestAgentPendingOpenChannel(t *testing.T) {
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
@ -1096,7 +1105,7 @@ func TestAgentPendingOpenChannel(t *testing.T) {
// There shouldn't be a call to the Select method as we've returned
// "false" for NeedMoreChans above.
select {
case heuristic.directiveResps <- []AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
t.Fatalf("Select was called but shouldn't have been")
default:
}
@ -1118,7 +1127,7 @@ func TestAgentOnNodeUpdates(t *testing.T) {
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
@ -1174,7 +1183,7 @@ func TestAgentOnNodeUpdates(t *testing.T) {
// Send over an empty list of attachment directives, which should cause
// the agent to return to waiting on a new signal.
select {
case heuristic.directiveResps <- []AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
case <-time.After(time.Second * 10):
t.Fatalf("Select was not called but should have been")
}
@ -1200,7 +1209,7 @@ func TestAgentOnNodeUpdates(t *testing.T) {
// It's not important that this list is also empty, so long as the node
// updates signal is causing the agent to make this attempt.
select {
case heuristic.directiveResps <- []AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
case <-time.After(time.Second * 10):
t.Fatalf("Select was not called but should have been")
}
@ -1223,7 +1232,7 @@ func TestAgentSkipPendingConns(t *testing.T) {
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
@ -1311,7 +1320,7 @@ func TestAgentSkipPendingConns(t *testing.T) {
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
nodeDirective := AttachmentDirective{
nodeDirective := &AttachmentDirective{
NodeID: NewNodeID(nodeKey),
ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{
@ -1319,9 +1328,13 @@ func TestAgentSkipPendingConns(t *testing.T) {
IP: bytes.Repeat([]byte("a"), 16),
},
},
Score: 0.5,
}
select {
case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
NewNodeID(nodeKey): nodeDirective,
}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
@ -1349,7 +1362,9 @@ func TestAgentSkipPendingConns(t *testing.T) {
// Send a directive for the same node, which already has a pending conn.
select {
case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
NewNodeID(nodeKey): nodeDirective,
}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
@ -1386,7 +1401,9 @@ func TestAgentSkipPendingConns(t *testing.T) {
// Send a directive for the same node, which already has a pending conn.
select {
case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
NewNodeID(nodeKey): nodeDirective,
}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}