autopilot/interface+agent: return NodeScore from NodeScores

Since NodeScores no longer returns fully populated AttachmentDirectives,
we make this explicit by defining a new type NodeScore that includes a
subset of what the AttachmentDirective does.
This commit is contained in:
Johan T. Halseth 2018-12-19 15:24:17 +01:00
parent 25de66d27b
commit 5b1e72a019
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26
5 changed files with 129 additions and 173 deletions

@ -573,25 +573,16 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
log.Debugf("Got scores for %d nodes", len(scores))
// Temporary convert to NodeScore.
nodeScores := make(map[NodeID]*NodeScore)
for k, v := range scores {
nodeScores[k] = &NodeScore{
NodeID: v.NodeID,
Score: v.Score,
}
}
// Now use the score to make a weighted choice which nodes to attempt
// to open channels to.
nodeScores, err = chooseN(numChans, nodeScores)
scores, err = chooseN(numChans, scores)
if err != nil {
return fmt.Errorf("Unable to make weighted choice: %v",
err)
}
chanCandidates := make(map[NodeID]*AttachmentDirective)
for nID := range nodeScores {
for nID := range scores {
// Add addresses to the candidates.
addrs := addresses[nID]

@ -67,7 +67,7 @@ func (m *mockConstraints) MaxChanSize() btcutil.Amount {
var _ AgentConstraints = (*mockConstraints)(nil)
type mockHeuristic struct {
nodeScoresResps chan map[NodeID]*AttachmentDirective
nodeScoresResps chan map[NodeID]*NodeScore
nodeScoresArgs chan directiveArg
quit chan struct{}
@ -82,7 +82,7 @@ type directiveArg struct {
func (m *mockHeuristic) NodeScores(g ChannelGraph, chans []Channel,
fundsAvailable btcutil.Amount, nodes map[NodeID]struct{}) (
map[NodeID]*AttachmentDirective, error) {
map[NodeID]*NodeScore, error) {
if m.nodeScoresArgs != nil {
directive := directiveArg{
@ -160,7 +160,7 @@ func TestAgentChannelOpenSignal(t *testing.T) {
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
@ -250,7 +250,7 @@ func TestAgentChannelOpenSignal(t *testing.T) {
// If this send success, then Select was erroneously called and the
// test should be failed.
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}:
t.Fatalf("Select was called but shouldn't have been")
// This is the correct path as Select should've be called.
@ -295,7 +295,7 @@ func TestAgentChannelFailureSignal(t *testing.T) {
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
@ -363,14 +363,13 @@ func TestAgentChannelFailureSignal(t *testing.T) {
// At this point, the agent should now be querying the heuristic to
// request attachment directives, return a fake so the agent will
// attempt to open a channel.
var fakeDirective = &AttachmentDirective{
NodeID: NewNodeID(node),
ChanAmt: btcutil.SatoshiPerBitcoin,
Score: 0.5,
var fakeDirective = &NodeScore{
NodeID: NewNodeID(node),
Score: 0.5,
}
select {
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{
NewNodeID(node): fakeDirective,
}:
case <-time.After(time.Second * 10):
@ -387,7 +386,7 @@ func TestAgentChannelFailureSignal(t *testing.T) {
}
select {
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}:
case <-time.After(time.Second * 10):
t.Fatal("heuristic wasn't queried in time")
}
@ -408,7 +407,7 @@ func TestAgentChannelCloseSignal(t *testing.T) {
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
@ -505,7 +504,7 @@ func TestAgentChannelCloseSignal(t *testing.T) {
// If this send success, then Select was erroneously called and the
// test should be failed.
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}:
t.Fatalf("Select was called but shouldn't have been")
// This is the correct path as Select should've be called.
@ -528,7 +527,7 @@ func TestAgentBalanceUpdate(t *testing.T) {
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
@ -626,7 +625,7 @@ func TestAgentBalanceUpdate(t *testing.T) {
// If this send success, then Select was erroneously called and the
// test should be failed.
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}:
t.Fatalf("Select was called but shouldn't have been")
// This is the correct path as Select should've be called.
@ -648,7 +647,7 @@ func TestAgentImmediateAttach(t *testing.T) {
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
@ -707,7 +706,7 @@ func TestAgentImmediateAttach(t *testing.T) {
const numChans = 5
// We'll generate 5 mock directives so it can progress within its loop.
directives := make(map[NodeID]*AttachmentDirective)
directives := make(map[NodeID]*NodeScore)
nodeKeys := make(map[NodeID]struct{})
for i := 0; i < numChans; i++ {
pub, err := memGraph.addRandNode()
@ -715,10 +714,9 @@ func TestAgentImmediateAttach(t *testing.T) {
t.Fatalf("unable to generate key: %v", err)
}
nodeID := NewNodeID(pub)
directives[nodeID] = &AttachmentDirective{
NodeID: nodeID,
ChanAmt: btcutil.SatoshiPerBitcoin,
Score: 0.5,
directives[nodeID] = &NodeScore{
NodeID: nodeID,
Score: 0.5,
}
nodeKeys[nodeID] = struct{}{}
}
@ -786,7 +784,7 @@ func TestAgentPrivateChannels(t *testing.T) {
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
@ -848,16 +846,15 @@ func TestAgentPrivateChannels(t *testing.T) {
// We'll generate 5 mock directives so the pubkeys will be found in the
// agent's graph, and it can progress within its loop.
directives := make(map[NodeID]*AttachmentDirective)
directives := make(map[NodeID]*NodeScore)
for i := 0; i < numChans; i++ {
pub, err := memGraph.addRandNode()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
directives[NewNodeID(pub)] = &AttachmentDirective{
NodeID: NewNodeID(pub),
ChanAmt: btcutil.SatoshiPerBitcoin,
Score: 0.5,
directives[NewNodeID(pub)] = &NodeScore{
NodeID: NewNodeID(pub),
Score: 0.5,
}
}
@ -914,7 +911,7 @@ func TestAgentPendingChannelState(t *testing.T) {
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
@ -980,10 +977,9 @@ func TestAgentPendingChannelState(t *testing.T) {
t.Fatalf("unable to generate key: %v", err)
}
nodeID := NewNodeID(nodeKey)
nodeDirective := &AttachmentDirective{
NodeID: nodeID,
ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin,
Score: 0.5,
nodeDirective := &NodeScore{
NodeID: nodeID,
Score: 0.5,
}
// Once again, we'll start by telling the agent as part of its first
@ -1002,7 +998,7 @@ func TestAgentPendingChannelState(t *testing.T) {
constraints.moreChanArgs = make(chan moreChanArg)
select {
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{
nodeID: nodeDirective,
}:
case <-time.After(time.Second * 10):
@ -1014,9 +1010,10 @@ func TestAgentPendingChannelState(t *testing.T) {
// A request to open the channel should've also been sent.
select {
case openChan := <-chanController.openChanSignals:
if openChan.amt != nodeDirective.ChanAmt {
chanAmt := constraints.MaxChanSize()
if openChan.amt != chanAmt {
t.Fatalf("invalid chan amt: expected %v, got %v",
nodeDirective.ChanAmt, openChan.amt)
chanAmt, openChan.amt)
}
if !openChan.target.IsEqual(nodeKey) {
t.Fatalf("unexpected key: expected %x, got %x",
@ -1044,13 +1041,14 @@ func TestAgentPendingChannelState(t *testing.T) {
// one that we just created, otherwise the agent isn't properly
// updating its internal state.
case req := <-constraints.moreChanArgs:
chanAmt := constraints.MaxChanSize()
if len(req.chans) != 1 {
t.Fatalf("should include pending chan in current "+
"state, instead have %v chans", len(req.chans))
}
if req.chans[0].Capacity != nodeDirective.ChanAmt {
if req.chans[0].Capacity != chanAmt {
t.Fatalf("wrong chan capacity: expected %v, got %v",
req.chans[0].Capacity, nodeDirective.ChanAmt)
req.chans[0].Capacity, chanAmt)
}
if req.chans[0].Node != nodeID {
t.Fatalf("wrong node ID: expected %x, got %x",
@ -1100,7 +1098,7 @@ func TestAgentPendingOpenChannel(t *testing.T) {
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
@ -1172,7 +1170,7 @@ func TestAgentPendingOpenChannel(t *testing.T) {
// There shouldn't be a call to the Select method as we've returned
// "false" for NeedMoreChans above.
select {
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}:
t.Fatalf("Select was called but shouldn't have been")
default:
}
@ -1195,7 +1193,7 @@ func TestAgentOnNodeUpdates(t *testing.T) {
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
@ -1259,7 +1257,7 @@ func TestAgentOnNodeUpdates(t *testing.T) {
// Send over an empty list of attachment directives, which should cause
// the agent to return to waiting on a new signal.
select {
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}:
case <-time.After(time.Second * 10):
t.Fatalf("Select was not called but should have been")
}
@ -1284,7 +1282,7 @@ func TestAgentOnNodeUpdates(t *testing.T) {
// It's not important that this list is also empty, so long as the node
// updates signal is causing the agent to make this attempt.
select {
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}:
case <-time.After(time.Second * 10):
t.Fatalf("Select was not called but should have been")
}
@ -1308,7 +1306,8 @@ func TestAgentSkipPendingConns(t *testing.T) {
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
nodeScoresArgs: make(chan directiveArg),
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
@ -1384,12 +1383,20 @@ func TestAgentSkipPendingConns(t *testing.T) {
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
nodeDirective := &AttachmentDirective{
NodeID: NewNodeID(nodeKey),
ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin,
Score: 0.5,
nodeID := NewNodeID(nodeKey)
nodeDirective := &NodeScore{
NodeID: nodeID,
Score: 0.5,
}
// We'll also add a second node to the graph, to keep the first one
// company.
nodeKey2, err := memGraph.addRandNode()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
nodeID2 := NewNodeID(nodeKey2)
// We'll send an initial "yes" response to advance the agent past its
// initial check. This will cause it to try to get directives from the
// graph.
@ -1402,14 +1409,34 @@ func TestAgentSkipPendingConns(t *testing.T) {
t.Fatalf("heuristic wasn't queried in time")
}
// Both nodes should be part of the arguments.
select {
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
case req := <-heuristic.nodeScoresArgs:
if len(req.nodes) != 2 {
t.Fatalf("expected %v nodes, instead "+
"had %v", 2, len(req.nodes))
}
if _, ok := req.nodes[nodeID]; !ok {
t.Fatalf("node not included in arguments")
}
if _, ok := req.nodes[nodeID2]; !ok {
t.Fatalf("node not included in arguments")
}
case <-time.After(time.Second * 10):
t.Fatalf("select wasn't queried in time")
}
// Respond with a scored directive. We skip node2 for now, implicitly
// giving it a zero-score.
select {
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{
NewNodeID(nodeKey): nodeDirective,
}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// The agent should attempt connection to the node.
var errChan chan error
select {
case errChan = <-connect:
@ -1430,17 +1457,30 @@ func TestAgentSkipPendingConns(t *testing.T) {
t.Fatalf("heuristic wasn't queried in time")
}
// Send a directive for the same node, which already has a pending conn.
// Since the node now has a pending connection, it should be skipped
// and not part of the nodes attempting to be scored.
select {
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
NewNodeID(nodeKey): nodeDirective,
}:
case req := <-heuristic.nodeScoresArgs:
if len(req.nodes) != 1 {
t.Fatalf("expected %v nodes, instead "+
"had %v", 1, len(req.nodes))
}
if _, ok := req.nodes[nodeID2]; !ok {
t.Fatalf("node not included in arguments")
}
case <-time.After(time.Second * 10):
t.Fatalf("select wasn't queried in time")
}
// Respond with an emtpty score set.
select {
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// This time, the agent should skip trying to connect to the node with a
// pending connection.
// The agent should not attempt any connection, since no nodes were
// scored.
select {
case <-connect:
t.Fatalf("agent should not have attempted connection")
@ -1466,20 +1506,39 @@ func TestAgentSkipPendingConns(t *testing.T) {
t.Fatalf("heuristic wasn't queried in time")
}
// Send a directive for the same node, which already has a pending conn.
// The node should now be marked as "failed", which should make it
// being skipped during scoring. Again check that it won't be among the
// score request.
select {
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
NewNodeID(nodeKey): nodeDirective,
case req := <-heuristic.nodeScoresArgs:
if len(req.nodes) != 1 {
t.Fatalf("expected %v nodes, instead "+
"had %v", 1, len(req.nodes))
}
if _, ok := req.nodes[nodeID2]; !ok {
t.Fatalf("node not included in arguments")
}
case <-time.After(time.Second * 10):
t.Fatalf("select wasn't queried in time")
}
// Send a directive for the second node.
nodeDirective2 := &NodeScore{
NodeID: nodeID2,
Score: 0.5,
}
select {
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{
nodeID2: nodeDirective2,
}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// This time, the agent should try the connection since the peer has
// been removed from the pending map.
// This time, the agent should try the connection to the second node.
select {
case <-connect:
case <-time.After(time.Second * 10):
t.Fatalf("agent have attempted connection")
t.Fatalf("agent should have attempted connection")
}
}

@ -110,10 +110,6 @@ type AttachmentDirective struct {
// Addrs is a list of addresses that the target peer may be reachable
// at.
Addrs []net.Addr
// Score is the score given by the heuristic for opening a channel of
// the given size to this node.
Score float64
}
// AttachmentHeuristic is one of the primary interfaces within this package.
@ -126,8 +122,8 @@ type AttachmentHeuristic interface {
// NodeScores is a method that given the current channel graph and
// current set of local channels, scores the given nodes according to
// the preference of opening a channel of the given size with them. The
// returned channel candidates maps the NodeID to an attachment
// directive containing a score.
// returned channel candidates maps the NodeID to a NodeScore for the
// node.
//
// The scores will be in the range [0, M], where 0 indicates no
// improvement in connectivity if a channel is opened to this node,
@ -139,7 +135,7 @@ type AttachmentHeuristic interface {
// score of 0.
NodeScores(g ChannelGraph, chans []Channel,
chanSize btcutil.Amount, nodes map[NodeID]struct{}) (
map[NodeID]*AttachmentDirective, error)
map[NodeID]*NodeScore, error)
}
// ChannelController is a simple interface that allows an auto-pilot agent to

@ -62,7 +62,7 @@ func NewNodeID(pub *btcec.PublicKey) NodeID {
// NOTE: This is a part of the AttachmentHeuristic interface.
func (p *PrefAttachment) NodeScores(g ChannelGraph, chans []Channel,
chanSize btcutil.Amount, nodes map[NodeID]struct{}) (
map[NodeID]*AttachmentDirective, error) {
map[NodeID]*NodeScore, error) {
// Count the number of channels in the graph. We'll also count the
// number of channels as we go for the nodes we are interested in.
@ -108,7 +108,7 @@ func (p *PrefAttachment) NodeScores(g ChannelGraph, chans []Channel,
// For each node in the set of nodes, count their fraction of channels
// in the graph, and use that as the score.
candidates := make(map[NodeID]*AttachmentDirective)
candidates := make(map[NodeID]*NodeScore)
for nID, nodeChans := range nodeChanNum {
_, ok := existingPeers[nID]
@ -129,10 +129,9 @@ func (p *PrefAttachment) NodeScores(g ChannelGraph, chans []Channel,
// Otherwise we score the node according to its fraction of
// channels in the graph.
score := float64(nodeChans) / float64(graphChans)
candidates[nID] = &AttachmentDirective{
NodeID: nID,
ChanAmt: chanSize,
Score: score,
candidates[nID] = &NodeScore{
NodeID: nID,
Score: score,
}
}

@ -247,15 +247,6 @@ func TestPrefAttachmentSelectTwoVertexes(t *testing.T) {
nodeID[:])
}
// As the number of funds available exceed the
// max channel size, both edges should consume
// the maximum channel size.
if candidate.ChanAmt != maxChanSize {
t1.Fatalf("max channel size should be "+
"allocated, instead %v was: ",
maxChanSize)
}
// Since each of the nodes has 1 channel, out
// of only one channel in the graph, we expect
// their score to be 0.5.
@ -273,68 +264,6 @@ func TestPrefAttachmentSelectTwoVertexes(t *testing.T) {
}
}
// TestPrefAttachmentSelectInsufficientFunds ensures that if the
// balance of the backing wallet is below the set min channel size, then it
// never recommends candidates to attach to.
func TestPrefAttachmentSelectInsufficientFunds(t *testing.T) {
t.Parallel()
prand.Seed(time.Now().Unix())
const (
maxChanSize = btcutil.Amount(btcutil.SatoshiPerBitcoin)
)
for _, graph := range chanGraphs {
success := t.Run(graph.name, func(t1 *testing.T) {
graph, cleanup, err := graph.genFunc()
if err != nil {
t1.Fatalf("unable to create graph: %v", err)
}
if cleanup != nil {
defer cleanup()
}
// Add 10 nodes to the graph, with channels between
// them.
completeGraph(t, graph, 10)
prefAttach := NewPrefAttachment()
nodes := make(map[NodeID]struct{})
if err := graph.ForEachNode(func(n Node) error {
nodes[n.PubKey()] = struct{}{}
return nil
}); err != nil {
t1.Fatalf("unable to traverse graph: %v", err)
}
// With the necessary state initialized, we'll now
// attempt to get the score for our list of nodes,
// passing zero for the amount of wallet funds. This
// should return candidates with zero-value channels.
scores, err := prefAttach.NodeScores(graph, nil,
0, nodes)
if err != nil {
t1.Fatalf("unable to select attachment "+
"directives: %v", err)
}
// Since all should be given a score of 0, the map
// should be empty.
for _, s := range scores {
if s.ChanAmt != 0 {
t1.Fatalf("expected zero channel, "+
"instead got %v ", s.ChanAmt)
}
}
})
if !success {
break
}
}
}
// TestPrefAttachmentSelectGreedyAllocation tests that if upon
// returning node scores, the NodeScores method will attempt to greedily
// allocate all funds to each vertex (up to the max channel size).
@ -437,12 +366,6 @@ func TestPrefAttachmentSelectGreedyAllocation(t *testing.T) {
if candidate.Score == 0 {
t1.Fatalf("Expected non-zero score")
}
if candidate.ChanAmt != maxChanSize {
t1.Fatalf("expected recommendation "+
"of %v, instead got %v",
maxChanSize, candidate.ChanAmt)
}
}
// Imagine a few channels are being opened, and there's
@ -467,12 +390,6 @@ func TestPrefAttachmentSelectGreedyAllocation(t *testing.T) {
if candidate.Score == 0 {
t1.Fatalf("Expected non-zero score")
}
if candidate.ChanAmt != remBalance {
t1.Fatalf("expected recommendation "+
"of %v, instead got %v",
remBalance, candidate.ChanAmt)
}
}
})
if !success {
@ -546,12 +463,6 @@ func TestPrefAttachmentSelectSkipNodes(t *testing.T) {
if candidate.Score == 0 {
t1.Fatalf("Expected non-zero score")
}
if candidate.ChanAmt != maxChanSize {
t1.Fatalf("expected recommendation "+
"of %v, instead got %v",
maxChanSize, candidate.ChanAmt)
}
}
// We'll simulate a channel update by adding the nodes