autopilot: move address lookup from heuristic to agent

To avoid having the heuristics deal with (possibly conflicting) address
lookups, we let the agent handle them.
This commit is contained in:
Johan T. Halseth 2018-12-19 14:54:54 +01:00
parent cfd237bf1f
commit ccf4b7feab
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26
4 changed files with 85 additions and 128 deletions

@ -525,6 +525,7 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
// want to skip. // want to skip.
selfPubBytes := a.cfg.Self.SerializeCompressed() selfPubBytes := a.cfg.Self.SerializeCompressed()
nodes := make(map[NodeID]struct{}) nodes := make(map[NodeID]struct{})
addresses := make(map[NodeID][]net.Addr)
if err := a.cfg.Graph.ForEachNode(func(node Node) error { if err := a.cfg.Graph.ForEachNode(func(node Node) error {
nID := NodeID(node.PubKey()) nID := NodeID(node.PubKey())
@ -535,6 +536,14 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
return nil return nil
} }
// If the node has no known addresses, we cannot connect to it,
// so we'll skip it.
addrs := node.Addrs()
if len(addrs) == 0 {
return nil
}
addresses[nID] = addrs
// Additionally, if this node is in the blacklist, then // Additionally, if this node is in the blacklist, then
// we'll skip it. // we'll skip it.
if _, ok := nodesToSkip[nID]; ok { if _, ok := nodesToSkip[nID]; ok {
@ -562,6 +571,12 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
return fmt.Errorf("unable to calculate node scores : %v", err) return fmt.Errorf("unable to calculate node scores : %v", err)
} }
// Add addresses to the candidates.
for nID, c := range scores {
addrs := addresses[nID]
c.Addrs = addrs
}
log.Debugf("Got scores for %d nodes", len(scores)) log.Debugf("Got scores for %d nodes", len(scores))
// Now use the score to make a weighted choice which // Now use the score to make a weighted choice which

@ -1,7 +1,6 @@
package autopilot package autopilot
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
"net" "net"
@ -306,6 +305,10 @@ func TestAgentChannelFailureSignal(t *testing.T) {
chanController := &mockFailingChanController{} chanController := &mockFailingChanController{}
memGraph, _, _ := newMemChanGraph() memGraph, _, _ := newMemChanGraph()
node, err := memGraph.addRandNode()
if err != nil {
t.Fatalf("unable to add node: %v", err)
}
// With the dependencies we created, we can now create the initial // With the dependencies we created, we can now create the initial
// agent itself. // agent itself.
@ -316,6 +319,7 @@ func TestAgentChannelFailureSignal(t *testing.T) {
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
return 0, nil return 0, nil
}, },
// TODO: move address check to agent.
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil return false, nil
}, },
@ -360,19 +364,14 @@ func TestAgentChannelFailureSignal(t *testing.T) {
// request attachment directives, return a fake so the agent will // request attachment directives, return a fake so the agent will
// attempt to open a channel. // attempt to open a channel.
var fakeDirective = &AttachmentDirective{ var fakeDirective = &AttachmentDirective{
NodeID: NewNodeID(self), NodeID: NewNodeID(node),
ChanAmt: btcutil.SatoshiPerBitcoin, ChanAmt: btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{ Score: 0.5,
&net.TCPAddr{
IP: bytes.Repeat([]byte("a"), 16),
},
},
Score: 0.5,
} }
select { select {
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
NewNodeID(self): fakeDirective, NewNodeID(node): fakeDirective,
}: }:
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
t.Fatal("heuristic wasn't queried in time") t.Fatal("heuristic wasn't queried in time")
@ -707,6 +706,22 @@ func TestAgentImmediateAttach(t *testing.T) {
const numChans = 5 const numChans = 5
// We'll generate 5 mock directives so it can progress within its loop.
directives := make(map[NodeID]*AttachmentDirective)
nodeKeys := make(map[NodeID]struct{})
for i := 0; i < numChans; i++ {
pub, err := memGraph.addRandNode()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
nodeID := NewNodeID(pub)
directives[nodeID] = &AttachmentDirective{
NodeID: nodeID,
ChanAmt: btcutil.SatoshiPerBitcoin,
Score: 0.5,
}
nodeKeys[nodeID] = struct{}{}
}
// The very first thing the agent should do is query the NeedMoreChans // The very first thing the agent should do is query the NeedMoreChans
// method on the passed heuristic. So we'll provide it with a response // method on the passed heuristic. So we'll provide it with a response
// that will kick off the main loop. // that will kick off the main loop.
@ -724,31 +739,9 @@ func TestAgentImmediateAttach(t *testing.T) {
} }
// At this point, the agent should now be querying the heuristic to // At this point, the agent should now be querying the heuristic to
// requests attachment directives. We'll generate 5 mock directives so // requests attachment directives. With our fake directives created,
// it can progress within its loop. // we'll now send then to the agent as a return value for the Select
directives := make(map[NodeID]*AttachmentDirective) // function.
nodeKeys := make(map[NodeID]struct{})
for i := 0; i < numChans; i++ {
pub, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
nodeID := NewNodeID(pub)
directives[nodeID] = &AttachmentDirective{
NodeID: nodeID,
ChanAmt: btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{
&net.TCPAddr{
IP: bytes.Repeat([]byte("a"), 16),
},
},
Score: 0.5,
}
nodeKeys[nodeID] = struct{}{}
}
// With our fake directives created, we'll now send then to the agent
// as a return value for the Select function.
select { select {
case heuristic.nodeScoresResps <- directives: case heuristic.nodeScoresResps <- directives:
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
@ -853,6 +846,21 @@ func TestAgentPrivateChannels(t *testing.T) {
const numChans = 5 const numChans = 5
// We'll generate 5 mock directives so the pubkeys will be found in the
// agent's graph, and it can progress within its loop.
directives := make(map[NodeID]*AttachmentDirective)
for i := 0; i < numChans; i++ {
pub, err := memGraph.addRandNode()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
directives[NewNodeID(pub)] = &AttachmentDirective{
NodeID: NewNodeID(pub),
ChanAmt: btcutil.SatoshiPerBitcoin,
Score: 0.5,
}
}
// The very first thing the agent should do is query the NeedMoreChans // The very first thing the agent should do is query the NeedMoreChans
// method on the passed heuristic. So we'll provide it with a response // method on the passed heuristic. So we'll provide it with a response
// that will kick off the main loop. We'll send over a response // that will kick off the main loop. We'll send over a response
@ -867,30 +875,10 @@ func TestAgentPrivateChannels(t *testing.T) {
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time") t.Fatalf("heuristic wasn't queried in time")
} }
// At this point, the agent should now be querying the heuristic to // At this point, the agent should now be querying the heuristic to
// requests attachment directives. We'll generate 5 mock directives so // requests attachment directives. With our fake directives created,
// it can progress within its loop. // we'll now send then to the agent as a return value for the Select
directives := make(map[NodeID]*AttachmentDirective) // function.
for i := 0; i < numChans; i++ {
pub, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
directives[NewNodeID(pub)] = &AttachmentDirective{
NodeID: NewNodeID(pub),
ChanAmt: btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{
&net.TCPAddr{
IP: bytes.Repeat([]byte("a"), 16),
},
},
Score: 0.5,
}
}
// With our fake directives created, we'll now send then to the agent
// as a return value for the Select function.
select { select {
case heuristic.nodeScoresResps <- directives: case heuristic.nodeScoresResps <- directives:
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
@ -986,6 +974,18 @@ func TestAgentPendingChannelState(t *testing.T) {
// exiting. // exiting.
defer close(quit) defer close(quit)
// We'll only return a single directive for a pre-chosen node.
nodeKey, err := memGraph.addRandNode()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
nodeID := NewNodeID(nodeKey)
nodeDirective := &AttachmentDirective{
NodeID: nodeID,
ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin,
Score: 0.5,
}
// Once again, we'll start by telling the agent as part of its first // Once again, we'll start by telling the agent as part of its first
// query, that it needs more channels and has 3 BTC available for // query, that it needs more channels and has 3 BTC available for
// attachment. We'll send over a response indicating that it should // attachment. We'll send over a response indicating that it should
@ -1001,25 +1001,6 @@ func TestAgentPendingChannelState(t *testing.T) {
constraints.moreChanArgs = make(chan moreChanArg) constraints.moreChanArgs = make(chan moreChanArg)
// Next, the agent should deliver a query to the Select method of the
// heuristic. We'll only return a single directive for a pre-chosen
// node.
nodeKey, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
nodeID := NewNodeID(nodeKey)
nodeDirective := &AttachmentDirective{
NodeID: nodeID,
ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{
&net.TCPAddr{
IP: bytes.Repeat([]byte("a"), 16),
},
},
Score: 0.5,
}
select { select {
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
nodeID: nodeDirective, nodeID: nodeDirective,
@ -1398,6 +1379,17 @@ func TestAgentSkipPendingConns(t *testing.T) {
// exiting. // exiting.
defer close(quit) defer close(quit)
// We'll only return a single directive for a pre-chosen node.
nodeKey, err := memGraph.addRandNode()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
nodeDirective := &AttachmentDirective{
NodeID: NewNodeID(nodeKey),
ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin,
Score: 0.5,
}
// We'll send an initial "yes" response to advance the agent past its // We'll send an initial "yes" response to advance the agent past its
// initial check. This will cause it to try to get directives from the // initial check. This will cause it to try to get directives from the
// graph. // graph.
@ -1410,24 +1402,6 @@ func TestAgentSkipPendingConns(t *testing.T) {
t.Fatalf("heuristic wasn't queried in time") t.Fatalf("heuristic wasn't queried in time")
} }
// Next, the agent should deliver a query to the Select method of the
// heuristic. We'll only return a single directive for a pre-chosen
// node.
nodeKey, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
nodeDirective := &AttachmentDirective{
NodeID: NewNodeID(nodeKey),
ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{
&net.TCPAddr{
IP: bytes.Repeat([]byte("a"), 16),
},
},
Score: 0.5,
}
select { select {
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
NewNodeID(nodeKey): nodeDirective, NewNodeID(nodeKey): nodeDirective,

@ -2,7 +2,6 @@ package autopilot
import ( import (
prand "math/rand" prand "math/rand"
"net"
"time" "time"
"github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/btcec"
@ -75,11 +74,9 @@ func (p *ConstrainedPrefAttachment) NodeScores(g ChannelGraph, chans []Channel,
map[NodeID]*AttachmentDirective, error) { map[NodeID]*AttachmentDirective, error) {
// Count the number of channels in the graph. We'll also count the // Count the number of channels in the graph. We'll also count the
// number of channels as we go for the nodes we are interested in, and // number of channels as we go for the nodes we are interested in.
// record their addresses found in the db.
var graphChans int var graphChans int
nodeChanNum := make(map[NodeID]int) nodeChanNum := make(map[NodeID]int)
addresses := make(map[NodeID][]net.Addr)
if err := g.ForEachNode(func(n Node) error { if err := g.ForEachNode(func(n Node) error {
var nodeChans int var nodeChans int
err := n.ForEachChannel(func(_ ChannelEdge) error { err := n.ForEachChannel(func(_ ChannelEdge) error {
@ -98,10 +95,8 @@ func (p *ConstrainedPrefAttachment) NodeScores(g ChannelGraph, chans []Channel,
return nil return nil
} }
// Otherwise we'll record the number of channels, and also // Otherwise we'll record the number of channels.
// populate the address in our channel candidates map.
nodeChanNum[nID] = nodeChans nodeChanNum[nID] = nodeChans
addresses[nID] = n.Addrs()
return nil return nil
}); err != nil { }); err != nil {
@ -126,7 +121,6 @@ func (p *ConstrainedPrefAttachment) NodeScores(g ChannelGraph, chans []Channel,
for nID, nodeChans := range nodeChanNum { for nID, nodeChans := range nodeChanNum {
_, ok := existingPeers[nID] _, ok := existingPeers[nID]
addrs := addresses[nID]
switch { switch {
@ -135,11 +129,6 @@ func (p *ConstrainedPrefAttachment) NodeScores(g ChannelGraph, chans []Channel,
case ok: case ok:
continue continue
// If the node has no addresses, we cannot connect to it, so we
// skip it for now, which implicitly gives it a score of 0.
case len(addrs) == 0:
continue
// If the node had no channels, we skip it, since it would have // If the node had no channels, we skip it, since it would have
// gotten a zero score anyway. // gotten a zero score anyway.
case nodeChans == 0: case nodeChans == 0:
@ -152,7 +141,6 @@ func (p *ConstrainedPrefAttachment) NodeScores(g ChannelGraph, chans []Channel,
candidates[nID] = &AttachmentDirective{ candidates[nID] = &AttachmentDirective{
NodeID: nID, NodeID: nID,
ChanAmt: chanSize, ChanAmt: chanSize,
Addrs: addrs,
Score: score, Score: score,
} }
} }

@ -287,11 +287,6 @@ func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) {
"to be %v, instead was %v", "to be %v, instead was %v",
expScore, candidate.Score) expScore, candidate.Score)
} }
if len(candidate.Addrs) == 0 {
t1.Fatalf("expected node to have " +
"available addresses, didn't")
}
} }
}) })
if !success { if !success {
@ -492,11 +487,6 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) {
"of %v, instead got %v", "of %v, instead got %v",
maxChanSize, candidate.ChanAmt) maxChanSize, candidate.ChanAmt)
} }
if len(candidate.Addrs) == 0 {
t1.Fatalf("expected node to have " +
"available addresses, didn't")
}
} }
// Imagine a few channels are being opened, and there's // Imagine a few channels are being opened, and there's
@ -527,11 +517,6 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) {
"of %v, instead got %v", "of %v, instead got %v",
remBalance, candidate.ChanAmt) remBalance, candidate.ChanAmt)
} }
if len(candidate.Addrs) == 0 {
t1.Fatalf("expected node to have " +
"available addresses, didn't")
}
} }
}) })
if !success { if !success {
@ -622,11 +607,6 @@ func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) {
"of %v, instead got %v", "of %v, instead got %v",
maxChanSize, candidate.ChanAmt) maxChanSize, candidate.ChanAmt)
} }
if len(candidate.Addrs) == 0 {
t1.Fatalf("expected node to have " +
"available addresses, didn't")
}
} }
// We'll simulate a channel update by adding the nodes // We'll simulate a channel update by adding the nodes