Merge pull request #1818 from cfromknecht/aplt-track-pending-conns

autopilot: prevent duplicate conns to same peer
This commit is contained in:
Olaoluwa Osuntokun 2018-09-05 21:01:18 -07:00 committed by GitHub
commit a1137a4f68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 419 additions and 324 deletions

@ -301,22 +301,26 @@ func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) {
} }
// mergeNodeMaps merges the Agent's set of nodes that it already has active // mergeNodeMaps merges the Agent's set of nodes that it already has active
// channels open to, with the set of nodes that are pending new channels. This // channels open to, with the other sets of nodes that should be removed from
// ensures that the Agent doesn't attempt to open any "duplicate" channels to // consideration during heuristic selection. This ensures that the Agent doesn't
// the same node. // attempt to open any "duplicate" channels to the same node.
func mergeNodeMaps(a map[NodeID]struct{}, b map[NodeID]struct{}, func mergeNodeMaps(c map[NodeID]Channel,
c map[NodeID]Channel) map[NodeID]struct{} { skips ...map[NodeID]struct{}) map[NodeID]struct{} {
res := make(map[NodeID]struct{}, len(a)+len(b)+len(c)) numNodes := len(c)
for nodeID := range a { for _, skip := range skips {
res[nodeID] = struct{}{} numNodes += len(skip)
}
for nodeID := range b {
res[nodeID] = struct{}{}
} }
res := make(map[NodeID]struct{}, len(c)+numNodes)
for nodeID := range c { for nodeID := range c {
res[nodeID] = struct{}{} res[nodeID] = struct{}{}
} }
for _, skip := range skips {
for nodeID := range skip {
res[nodeID] = struct{}{}
}
}
return res return res
} }
@ -360,6 +364,11 @@ func (a *Agent) controller() {
// channels with, but didn't succeed. // channels with, but didn't succeed.
failedNodes := make(map[NodeID]struct{}) failedNodes := make(map[NodeID]struct{})
// pendingConns tracks the nodes that we are attempting to make
// connections to. This prevents us from making duplicate connection
// requests to the same node.
pendingConns := make(map[NodeID]struct{})
// pendingOpens tracks the channels that we've requested to be // pendingOpens tracks the channels that we've requested to be
// initiated, but haven't yet been confirmed as being fully opened. // initiated, but haven't yet been confirmed as being fully opened.
// This state is required as otherwise, we may go over our allotted // This state is required as otherwise, we may go over our allotted
@ -481,7 +490,9 @@ func (a *Agent) controller() {
// duplicate edges. // duplicate edges.
connectedNodes := a.chanState.ConnectedNodes() connectedNodes := a.chanState.ConnectedNodes()
pendingMtx.Lock() pendingMtx.Lock()
nodesToSkip := mergeNodeMaps(connectedNodes, failedNodes, pendingOpens) nodesToSkip := mergeNodeMaps(pendingOpens,
pendingConns, connectedNodes, failedNodes,
)
pendingMtx.Unlock() pendingMtx.Unlock()
// If we reach this point, then according to our heuristic we // If we reach this point, then according to our heuristic we
@ -507,32 +518,40 @@ func (a *Agent) controller() {
log.Infof("Attempting to execute channel attachment "+ log.Infof("Attempting to execute channel attachment "+
"directives: %v", spew.Sdump(chanCandidates)) "directives: %v", spew.Sdump(chanCandidates))
// Before proceeding, check to see if we have any slots
// available to open channels. If there are any, we will attempt
// to dispatch the retrieved directives since we can't be
// certain which ones may actually succeed. If too many
// connections succeed, we will they will be ignored and made
// available to future heuristic selections.
pendingMtx.Lock()
if uint16(len(pendingOpens)) >= a.cfg.MaxPendingOpens {
pendingMtx.Unlock()
log.Debugf("Reached cap of %v pending "+
"channel opens, will retry "+
"after success/failure",
a.cfg.MaxPendingOpens)
continue
}
// For each recommended attachment directive, we'll launch a // For each recommended attachment directive, we'll launch a
// new goroutine to attempt to carry out the directive. If any // new goroutine to attempt to carry out the directive. If any
// of these succeed, then we'll receive a new state update, // of these succeed, then we'll receive a new state update,
// taking us back to the top of our controller loop. // taking us back to the top of our controller loop.
pendingMtx.Lock()
for _, chanCandidate := range chanCandidates { for _, chanCandidate := range chanCandidates {
// Before we proceed, we'll check to see if this // Skip candidates which we are already trying
// attempt would take us past the total number of // to establish a connection with.
// allowed pending opens. If so, then we'll skip this nodeID := chanCandidate.NodeID
// round and wait for an attempt to either fail or if _, ok := pendingConns[nodeID]; ok {
// succeed.
if uint16(len(pendingOpens))+1 >
a.cfg.MaxPendingOpens {
log.Debugf("Reached cap of %v pending "+
"channel opens, will retry "+
"after success/failure",
a.cfg.MaxPendingOpens)
continue continue
} }
pendingConns[nodeID] = struct{}{}
go func(directive AttachmentDirective) { go func(directive AttachmentDirective) {
// We'll start out by attempting to connect to // We'll start out by attempting to connect to
// the peer in order to begin the funding // the peer in order to begin the funding
// workflow. // workflow.
pub := directive.PeerKey pub := directive.NodeKey
alreadyConnected, err := a.cfg.ConnectToPeer( alreadyConnected, err := a.cfg.ConnectToPeer(
pub, directive.Addrs, pub, directive.Addrs,
) )
@ -548,6 +567,7 @@ func (a *Agent) controller() {
// again. // again.
nodeID := NewNodeID(pub) nodeID := NewNodeID(pub)
pendingMtx.Lock() pendingMtx.Lock()
delete(pendingConns, nodeID)
failedNodes[nodeID] = struct{}{} failedNodes[nodeID] = struct{}{}
pendingMtx.Unlock() pendingMtx.Unlock()
@ -558,24 +578,31 @@ func (a *Agent) controller() {
return return
} }
// If we were succesful, we'll track this peer // The connection was successful, though before
// in our set of pending opens. We do this here // progressing we must check that we have not
// to ensure we don't stall on selecting new // already met our quota for max pending open
// peers if the connection attempt happens to // channels. This can happen if multiple
// take too long. // directives were spawned but fewer slots were
// available, and other successful attempts
// finished first.
pendingMtx.Lock() pendingMtx.Lock()
if uint16(len(pendingOpens))+1 > if uint16(len(pendingOpens)) >=
a.cfg.MaxPendingOpens { a.cfg.MaxPendingOpens {
// Since we've reached our max number of
pendingMtx.Unlock() // pending opens, we'll disconnect this
// peer and exit. However, if we were
// Since we've reached our max number // previously connected to them, then
// of pending opens, we'll disconnect // we'll make sure to maintain the
// this peer and exit. However, if we
// were previously connected to them,
// then we'll make sure to maintain the
// connection alive. // connection alive.
if alreadyConnected { if alreadyConnected {
// Since we succeeded in
// connecting, we won't add this
// peer to the failed nodes map,
// but we will remove it from
// pendingConns so that it can
// be retried in the future.
delete(pendingConns, nodeID)
pendingMtx.Unlock()
return return
} }
@ -589,10 +616,23 @@ func (a *Agent) controller() {
pub.SerializeCompressed(), pub.SerializeCompressed(),
err) err)
} }
// Now that we have disconnected, we can
// remove this node from our pending
// conns map, permitting subsequent
// connection attempts.
delete(pendingConns, nodeID)
pendingMtx.Unlock()
return return
} }
nodeID := NewNodeID(directive.PeerKey) // If we were successful, we'll track this peer
// in our set of pending opens. We do this here
// to ensure we don't stall on selecting new
// peers if the connection attempt happens to
// take too long.
nodeID := directive.NodeID
delete(pendingConns, nodeID)
pendingOpens[nodeID] = Channel{ pendingOpens[nodeID] = Channel{
Capacity: directive.ChanAmt, Capacity: directive.ChanAmt,
Node: nodeID, Node: nodeID,

@ -3,6 +3,7 @@ package autopilot
import ( import (
"bytes" "bytes"
"errors" "errors"
"fmt"
"net" "net"
"sync" "sync"
"testing" "testing"
@ -186,22 +187,13 @@ func TestAgentChannelOpenSignal(t *testing.T) {
} }
defer agent.Stop() defer agent.Stop()
var wg sync.WaitGroup
// We'll send an initial "no" response to advance the agent past its // We'll send an initial "no" response to advance the agent past its
// initial check. // initial check.
wg.Add(1) select {
go func() { case heuristic.moreChansResps <- moreChansResp{false, 0, 0}:
select { case <-time.After(time.Second * 10):
case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: t.Fatalf("heuristic wasn't queried in time")
wg.Done() }
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
wg.Wait()
// Next we'll signal a new channel being opened by the backing LN node, // Next we'll signal a new channel being opened by the backing LN node,
// with a capacity of 1 BTC. // with a capacity of 1 BTC.
@ -211,34 +203,20 @@ func TestAgentChannelOpenSignal(t *testing.T) {
} }
agent.OnChannelOpen(newChan) agent.OnChannelOpen(newChan)
wg = sync.WaitGroup{}
// The agent should now query the heuristic in order to determine its // The agent should now query the heuristic in order to determine its
// next action as it local state has now been modified. // next action as it local state has now been modified.
wg.Add(1) select {
go func() { case heuristic.moreChansResps <- moreChansResp{false, 0, 0}:
select { // At this point, the local state of the agent should
case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: // have also been updated to reflect that the LN node
// At this point, the local state of the agent should // now has an additional channel with one BTC.
// have also been updated to reflect that the LN node if _, ok := agent.chanState[newChan.ChanID]; !ok {
// now has an additional channel with one BTC. t.Fatalf("internal channel state wasn't updated")
if _, ok := agent.chanState[newChan.ChanID]; !ok {
t.Fatalf("internal channel state wasn't updated")
}
// With all of our assertions passed, we'll signal the
// main test goroutine to continue the test.
wg.Done()
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
} }
}()
// We'll wait here for either the agent to query the heuristic to be case <-time.After(time.Second * 10):
// queried, or for the timeout above to tick. t.Fatalf("heuristic wasn't queried in time")
wg.Wait() }
// There shouldn't be a call to the Select method as we've returned // There shouldn't be a call to the Select method as we've returned
// "false" for NeedMoreChans above. // "false" for NeedMoreChans above.
@ -343,7 +321,8 @@ func TestAgentChannelFailureSignal(t *testing.T) {
// request attachment directives, return a fake so the agent will // request attachment directives, return a fake so the agent will
// attempt to open a channel. // attempt to open a channel.
var fakeDirective = AttachmentDirective{ var fakeDirective = AttachmentDirective{
PeerKey: self, NodeKey: self,
NodeID: NewNodeID(self),
ChanAmt: btcutil.SatoshiPerBitcoin, ChanAmt: btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{ Addrs: []net.Addr{
&net.TCPAddr{ &net.TCPAddr{
@ -441,55 +420,32 @@ func TestAgentChannelCloseSignal(t *testing.T) {
} }
defer agent.Stop() defer agent.Stop()
var wg sync.WaitGroup
// We'll send an initial "no" response to advance the agent past its // We'll send an initial "no" response to advance the agent past its
// initial check. // initial check.
wg.Add(1) select {
go func() { case heuristic.moreChansResps <- moreChansResp{false, 0, 0}:
select { case <-time.After(time.Second * 10):
case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: t.Fatalf("heuristic wasn't queried in time")
wg.Done() }
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
wg.Wait()
// Next, we'll close both channels which should force the agent to // Next, we'll close both channels which should force the agent to
// re-query the heuristic. // re-query the heuristic.
agent.OnChannelClose(initialChans[0].ChanID, initialChans[1].ChanID) agent.OnChannelClose(initialChans[0].ChanID, initialChans[1].ChanID)
wg = sync.WaitGroup{}
// The agent should now query the heuristic in order to determine its // The agent should now query the heuristic in order to determine its
// next action as it local state has now been modified. // next action as it local state has now been modified.
wg.Add(1) select {
go func() { case heuristic.moreChansResps <- moreChansResp{false, 0, 0}:
select { // At this point, the local state of the agent should
case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: // have also been updated to reflect that the LN node
// At this point, the local state of the agent should // has no existing open channels.
// have also been updated to reflect that the LN node if len(agent.chanState) != 0 {
// has no existing open channels. t.Fatalf("internal channel state wasn't updated")
if len(agent.chanState) != 0 {
t.Fatalf("internal channel state wasn't updated")
}
// With all of our assertions passed, we'll signal the
// main test goroutine to continue the test.
wg.Done()
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
} }
}()
// We'll wait here for either the agent to query the heuristic to be case <-time.After(time.Second * 10):
// queried, or for the timeout above to tick. t.Fatalf("heuristic wasn't queried in time")
wg.Wait() }
// There shouldn't be a call to the Select method as we've returned // There shouldn't be a call to the Select method as we've returned
// "false" for NeedMoreChans above. // "false" for NeedMoreChans above.
@ -567,22 +523,13 @@ func TestAgentBalanceUpdate(t *testing.T) {
} }
defer agent.Stop() defer agent.Stop()
var wg sync.WaitGroup
// We'll send an initial "no" response to advance the agent past its // We'll send an initial "no" response to advance the agent past its
// initial check. // initial check.
wg.Add(1) select {
go func() { case heuristic.moreChansResps <- moreChansResp{false, 0, 0}:
select { case <-time.After(time.Second * 10):
case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: t.Fatalf("heuristic wasn't queried in time")
wg.Done() }
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
wg.Wait()
// Next we'll send a new balance update signal to the agent, adding 5 // Next we'll send a new balance update signal to the agent, adding 5
// BTC to the amount of available funds. // BTC to the amount of available funds.
@ -592,36 +539,22 @@ func TestAgentBalanceUpdate(t *testing.T) {
agent.OnBalanceChange() agent.OnBalanceChange()
wg = sync.WaitGroup{}
// The agent should now query the heuristic in order to determine its // The agent should now query the heuristic in order to determine its
// next action as it local state has now been modified. // next action as it local state has now been modified.
wg.Add(1) select {
go func() { case heuristic.moreChansResps <- moreChansResp{false, 0, 0}:
select { // At this point, the local state of the agent should
case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: // have also been updated to reflect that the LN node
// At this point, the local state of the agent should // now has an additional 5BTC available.
// have also been updated to reflect that the LN node if agent.totalBalance != walletBalance {
// now has an additional 5BTC available. t.Fatalf("expected %v wallet balance "+
if agent.totalBalance != walletBalance { "instead have %v", agent.totalBalance,
t.Fatalf("expected %v wallet balance "+ walletBalance)
"instead have %v", agent.totalBalance,
walletBalance)
}
// With all of our assertions passed, we'll signal the
// main test goroutine to continue the test.
wg.Done()
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
} }
}()
// We'll wait here for either the agent to query the heuristic to be case <-time.After(time.Second * 10):
// queried, or for the timeout above to tick. t.Fatalf("heuristic wasn't queried in time")
wg.Wait() }
// There shouldn't be a call to the Select method as we've returned // There shouldn't be a call to the Select method as we've returned
// "false" for NeedMoreChans above. // "false" for NeedMoreChans above.
@ -695,39 +628,39 @@ func TestAgentImmediateAttach(t *testing.T) {
} }
defer agent.Stop() defer agent.Stop()
var wg sync.WaitGroup
const numChans = 5 const numChans = 5
// The very first thing the agent should do is query the NeedMoreChans // The very first thing the agent should do is query the NeedMoreChans
// method on the passed heuristic. So we'll provide it with a response // method on the passed heuristic. So we'll provide it with a response
// that will kick off the main loop. // that will kick off the main loop.
wg.Add(1) select {
go func() {
select {
// We'll send over a response indicating that it should // We'll send over a response indicating that it should
// establish more channels, and give it a budget of 5 BTC to do // establish more channels, and give it a budget of 5 BTC to do
// so. // so.
case heuristic.moreChansResps <- moreChansResp{true, numChans, 5 * btcutil.SatoshiPerBitcoin}: case heuristic.moreChansResps <- moreChansResp{
wg.Done() needMore: true,
return numMore: numChans,
case <-time.After(time.Second * 10): amt: 5 * btcutil.SatoshiPerBitcoin,
t.Fatalf("heuristic wasn't queried in time") }:
} case <-time.After(time.Second * 10):
}() t.Fatalf("heuristic wasn't queried in time")
}
// We'll wait here for the agent to query the heuristic. If ti doesn't
// do so within 10 seconds, then the test will fail out.
wg.Wait()
// At this point, the agent should now be querying the heuristic to // At this point, the agent should now be querying the heuristic to
// requests attachment directives. We'll generate 5 mock directives so // requests attachment directives. We'll generate 5 mock directives so
// it can progress within its loop. // it can progress within its loop.
directives := make([]AttachmentDirective, numChans) directives := make([]AttachmentDirective, numChans)
nodeKeys := make(map[NodeID]struct{})
for i := 0; i < numChans; i++ { for i := 0; i < numChans; i++ {
pub, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
nodeID := NewNodeID(pub)
directives[i] = AttachmentDirective{ directives[i] = AttachmentDirective{
PeerKey: self, NodeKey: pub,
NodeID: nodeID,
ChanAmt: btcutil.SatoshiPerBitcoin, ChanAmt: btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{ Addrs: []net.Addr{
&net.TCPAddr{ &net.TCPAddr{
@ -735,26 +668,16 @@ func TestAgentImmediateAttach(t *testing.T) {
}, },
}, },
} }
nodeKeys[nodeID] = struct{}{}
} }
wg = sync.WaitGroup{}
// With our fake directives created, we'll now send then to the agent // With our fake directives created, we'll now send then to the agent
// as a return value for the Select function. // as a return value for the Select function.
wg.Add(1) select {
go func() { case heuristic.directiveResps <- directives:
select { case <-time.After(time.Second * 10):
case heuristic.directiveResps <- directives: t.Fatalf("heuristic wasn't queried in time")
wg.Done() }
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
// We'll wait here for either the agent to query the heuristic to be
// queried, or for the timeout above to tick.
wg.Wait()
// Finally, we should receive 5 calls to the OpenChannel method with // Finally, we should receive 5 calls to the OpenChannel method with
// the exact same parameters that we specified within the attachment // the exact same parameters that we specified within the attachment
@ -766,11 +689,13 @@ func TestAgentImmediateAttach(t *testing.T) {
t.Fatalf("invalid chan amt: expected %v, got %v", t.Fatalf("invalid chan amt: expected %v, got %v",
btcutil.SatoshiPerBitcoin, openChan.amt) btcutil.SatoshiPerBitcoin, openChan.amt)
} }
if !openChan.target.IsEqual(self) { nodeID := NewNodeID(openChan.target)
t.Fatalf("unexpected key: expected %x, got %x", _, ok := nodeKeys[nodeID]
self.SerializeCompressed(), if !ok {
openChan.target.SerializeCompressed()) t.Fatalf("unexpected key: %v, not found",
nodeID)
} }
delete(nodeKeys, nodeID)
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
t.Fatalf("channel not opened in time") t.Fatalf("channel not opened in time")
} }
@ -838,42 +763,35 @@ func TestAgentPrivateChannels(t *testing.T) {
defer agent.Stop() defer agent.Stop()
const numChans = 5 const numChans = 5
var wg sync.WaitGroup
// The very first thing the agent should do is query the NeedMoreChans // The very first thing the agent should do is query the NeedMoreChans
// method on the passed heuristic. So we'll provide it with a response // method on the passed heuristic. So we'll provide it with a response
// that will kick off the main loop. // that will kick off the main loop. We'll send over a response
wg.Add(1) // indicating that it should establish more channels, and give it a
go func() { // budget of 5 BTC to do so.
defer wg.Done() resp := moreChansResp{
needMore: true,
// We'll send over a response indicating that it should numMore: numChans,
// establish more channels, and give it a budget of 5 BTC to do amt: 5 * btcutil.SatoshiPerBitcoin,
// so. }
resp := moreChansResp{ select {
needMore: true, case heuristic.moreChansResps <- resp:
numMore: numChans, case <-time.After(time.Second * 10):
amt: 5 * btcutil.SatoshiPerBitcoin, t.Fatalf("heuristic wasn't queried in time")
} }
select {
case heuristic.moreChansResps <- resp:
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
// We'll wait here for the agent to query the heuristic. If it doesn't
// do so within 10 seconds, then the test will fail out.
wg.Wait()
// At this point, the agent should now be querying the heuristic to // At this point, the agent should now be querying the heuristic to
// requests attachment directives. We'll generate 5 mock directives so // requests attachment directives. We'll generate 5 mock directives so
// it can progress within its loop. // it can progress within its loop.
directives := make([]AttachmentDirective, numChans) directives := make([]AttachmentDirective, numChans)
for i := 0; i < numChans; i++ { for i := 0; i < numChans; i++ {
pub, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
directives[i] = AttachmentDirective{ directives[i] = AttachmentDirective{
PeerKey: self, NodeKey: pub,
NodeID: NewNodeID(pub),
ChanAmt: btcutil.SatoshiPerBitcoin, ChanAmt: btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{ Addrs: []net.Addr{
&net.TCPAddr{ &net.TCPAddr{
@ -885,21 +803,11 @@ func TestAgentPrivateChannels(t *testing.T) {
// With our fake directives created, we'll now send then to the agent // With our fake directives created, we'll now send then to the agent
// as a return value for the Select function. // as a return value for the Select function.
wg.Add(1) select {
go func() { case heuristic.directiveResps <- directives:
defer wg.Done() case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
select { }
case heuristic.directiveResps <- directives:
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
// We'll wait here for either the agent to query the heuristic to be
// queried, or for the timeout above to tick.
wg.Wait()
// Finally, we should receive 5 calls to the OpenChannel method, each // Finally, we should receive 5 calls to the OpenChannel method, each
// specifying that it's for a private channel. // specifying that it's for a private channel.
@ -978,29 +886,19 @@ func TestAgentPendingChannelState(t *testing.T) {
} }
defer agent.Stop() defer agent.Stop()
var wg sync.WaitGroup
// Once again, we'll start by telling the agent as part of its first // Once again, we'll start by telling the agent as part of its first
// query, that it needs more channels and has 3 BTC available for // query, that it needs more channels and has 3 BTC available for
// attachment. // attachment. We'll send over a response indicating that it should
wg.Add(1) // establish more channels, and give it a budget of 1 BTC to do so.
go func() { select {
select { case heuristic.moreChansResps <- moreChansResp{
needMore: true,
// We'll send over a response indicating that it should numMore: 1,
// establish more channels, and give it a budget of 1 BTC to do amt: btcutil.SatoshiPerBitcoin,
// so. }:
case heuristic.moreChansResps <- moreChansResp{true, 1, btcutil.SatoshiPerBitcoin}: case <-time.After(time.Second * 10):
wg.Done() t.Fatalf("heuristic wasn't queried in time")
return }
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
// We'll wait for the first query to be consumed. If this doesn't
// happen then the above goroutine will timeout, and fail the test.
wg.Wait()
heuristic.moreChanArgs = make(chan moreChanArg) heuristic.moreChanArgs = make(chan moreChanArg)
@ -1013,7 +911,8 @@ func TestAgentPendingChannelState(t *testing.T) {
} }
nodeID := NewNodeID(nodeKey) nodeID := NewNodeID(nodeKey)
nodeDirective := AttachmentDirective{ nodeDirective := AttachmentDirective{
PeerKey: nodeKey, NodeKey: nodeKey,
NodeID: nodeID,
ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin, ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{ Addrs: []net.Addr{
&net.TCPAddr{ &net.TCPAddr{
@ -1072,7 +971,7 @@ func TestAgentPendingChannelState(t *testing.T) {
} }
if req.chans[0].Node != nodeID { if req.chans[0].Node != nodeID {
t.Fatalf("wrong node ID: expected %x, got %x", t.Fatalf("wrong node ID: expected %x, got %x",
req.chans[0].Node[:], nodeID) nodeID, req.chans[0].Node[:])
} }
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
t.Fatalf("need more chans wasn't queried in time") t.Fatalf("need more chans wasn't queried in time")
@ -1157,16 +1056,11 @@ func TestAgentPendingOpenChannel(t *testing.T) {
// We'll send an initial "no" response to advance the agent past its // We'll send an initial "no" response to advance the agent past its
// initial check. // initial check.
var wg sync.WaitGroup select {
wg.Add(1) case heuristic.moreChansResps <- moreChansResp{false, 0, 0}:
go func() { case <-time.After(time.Second * 10):
defer wg.Done() t.Fatalf("heuristic wasn't queried in time")
select { }
case heuristic.moreChansResps <- moreChansResp{false, 0, 0}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
// Next, we'll signal that a new channel has been opened, but it is // Next, we'll signal that a new channel has been opened, but it is
// still pending. // still pending.
@ -1174,19 +1068,11 @@ func TestAgentPendingOpenChannel(t *testing.T) {
// The agent should now query the heuristic in order to determine its // The agent should now query the heuristic in order to determine its
// next action as its local state has now been modified. // next action as its local state has now been modified.
wg.Add(1) select {
go func() { case heuristic.moreChansResps <- moreChansResp{false, 0, 0}:
defer wg.Done() case <-time.After(time.Second * 10):
select { t.Fatalf("heuristic wasn't queried in time")
case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: }
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
// We'll wait here for either the agent to query the heuristic to be
// queried, or for the timeout above to tick.
wg.Wait()
// There shouldn't be a call to the Select method as we've returned // There shouldn't be a call to the Select method as we've returned
// "false" for NeedMoreChans above. // "false" for NeedMoreChans above.
@ -1254,21 +1140,15 @@ func TestAgentOnNodeUpdates(t *testing.T) {
// We'll send an initial "yes" response to advance the agent past its // We'll send an initial "yes" response to advance the agent past its
// initial check. This will cause it to try to get directives from an // initial check. This will cause it to try to get directives from an
// empty graph. // empty graph.
var wg sync.WaitGroup select {
wg.Add(1) case heuristic.moreChansResps <- moreChansResp{
go func() { needMore: true,
defer wg.Done() numMore: 2,
select { amt: walletBalance,
case heuristic.moreChansResps <- moreChansResp{ }:
needMore: true, case <-time.After(time.Second * 10):
numMore: 2, t.Fatalf("heuristic wasn't queried in time")
amt: walletBalance, }
}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
wg.Wait()
// Send over an empty list of attachment directives, which should cause // Send over an empty list of attachment directives, which should cause
// the agent to return to waiting on a new signal. // the agent to return to waiting on a new signal.
@ -1285,21 +1165,15 @@ func TestAgentOnNodeUpdates(t *testing.T) {
// In response, the agent should wake up and see if it needs more // In response, the agent should wake up and see if it needs more
// channels. Since we haven't done anything, we will send the same // channels. Since we haven't done anything, we will send the same
// response as before since we are still trying to open channels. // response as before since we are still trying to open channels.
var wg2 sync.WaitGroup select {
wg2.Add(1) case heuristic.moreChansResps <- moreChansResp{
go func() { needMore: true,
defer wg2.Done() numMore: 2,
select { amt: walletBalance,
case heuristic.moreChansResps <- moreChansResp{ }:
needMore: true, case <-time.After(time.Second * 10):
numMore: 2, t.Fatalf("heuristic wasn't queried in time")
amt: walletBalance, }
}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
wg2.Wait()
// Again the agent should pull in the next set of attachment directives. // Again the agent should pull in the next set of attachment directives.
// It's not important that this list is also empty, so long as the node // It's not important that this list is also empty, so long as the node
@ -1310,3 +1184,180 @@ func TestAgentOnNodeUpdates(t *testing.T) {
t.Fatalf("Select was not called but should have been") t.Fatalf("Select was not called but should have been")
} }
} }
// TestAgentSkipPendingConns asserts that the agent will not try to make
// duplicate connection requests to the same node, even if the attachment
// heuristic instructs the agent to do so. It also asserts that the agent
// stops tracking the pending connection once it finishes. Note that in
// practice, a failed connection would be inserted into the skip map passed to
// the attachment heuristic, though this does not assert that case.
func TestAgentSkipPendingConns(t *testing.T) {
t.Parallel()
// First, we'll create all the dependencies that we'll need in order to
// create the autopilot agent.
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
}
memGraph, _, _ := newMemChanGraph()
// The wallet will start with 6 BTC available.
const walletBalance = btcutil.SatoshiPerBitcoin * 6
connect := make(chan chan error)
// With the dependencies we created, we can now create the initial
// agent itself.
testCfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil
},
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
errChan := make(chan error)
connect <- errChan
err := <-errChan
return false, err
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph,
MaxPendingOpens: 10,
}
initialChans := []Channel{}
agent, err := New(testCfg, initialChans)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// To ensure the heuristic doesn't block on quitting the agent, we'll
// use the agent's quit chan to signal when it should also stop.
heuristic.quit = agent.quit
// With the autopilot agent and all its dependencies we'll start the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
// We'll send an initial "yes" response to advance the agent past its
// initial check. This will cause it to try to get directives from the
// graph.
select {
case heuristic.moreChansResps <- moreChansResp{
needMore: true,
numMore: 1,
amt: walletBalance,
}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// Next, the agent should deliver a query to the Select method of the
// heuristic. We'll only return a single directive for a pre-chosen
// node.
nodeKey, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
nodeDirective := AttachmentDirective{
NodeKey: nodeKey,
NodeID: NewNodeID(nodeKey),
ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{
&net.TCPAddr{
IP: bytes.Repeat([]byte("a"), 16),
},
},
}
select {
case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
var errChan chan error
select {
case errChan = <-connect:
case <-time.After(time.Second * 10):
t.Fatalf("agent did not attempt connection")
}
// Signal the agent to go again, now that we've tried to connect.
agent.OnNodeUpdates()
// The heuristic again informs the agent that we need more channels.
select {
case heuristic.moreChansResps <- moreChansResp{
needMore: true,
numMore: 1,
amt: walletBalance,
}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// Send a directive for the same node, which already has a pending conn.
select {
case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// This time, the agent should skip trying to connect to the node with a
// pending connection.
select {
case <-connect:
t.Fatalf("agent should not have attempted connection")
case <-time.After(time.Second * 3):
}
// Now, timeout the original request, which should still be waiting for
// a response.
select {
case errChan <- fmt.Errorf("connection timeout"):
case <-time.After(time.Second * 10):
t.Fatalf("agent did not receive connection timeout")
}
// Signal the agent to try again, now that there are no pending conns.
agent.OnNodeUpdates()
// The heuristic again informs the agent that we need more channels.
select {
case heuristic.moreChansResps <- moreChansResp{
needMore: true,
numMore: 1,
amt: walletBalance,
}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// Send a directive for the same node, which already has a pending conn.
select {
case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// This time, the agent should try the connection since the peer has
// been removed from the pending map.
select {
case <-connect:
case <-time.After(time.Second * 10):
t.Fatalf("agent have attempted connection")
}
}

@ -85,10 +85,13 @@ type ChannelGraph interface {
// AttachmentHeuristic. It details to which node a channel should be created // AttachmentHeuristic. It details to which node a channel should be created
// to, and also the parameters which should be used in the channel creation. // to, and also the parameters which should be used in the channel creation.
type AttachmentDirective struct { type AttachmentDirective struct {
// PeerKey is the target node for this attachment directive. It can be // NodeKey is the target node for this attachment directive. It can be
// identified by its public key, and therefore can be used along with // identified by its public key, and therefore can be used along with
// a ChannelOpener implementation to execute the directive. // a ChannelOpener implementation to execute the directive.
PeerKey *btcec.PublicKey NodeKey *btcec.PublicKey
// NodeID is the serialized compressed pubkey of the target node.
NodeID NodeID
// ChanAmt is the size of the channel that should be opened, expressed // ChanAmt is the size of the channel that should be opened, expressed
// in satoshis. // in satoshis.

@ -245,11 +245,12 @@ func (p *ConstrainedPrefAttachment) Select(self *btcec.PublicKey, g ChannelGraph
} }
directives = append(directives, AttachmentDirective{ directives = append(directives, AttachmentDirective{
// TODO(roasbeef): need curve? // TODO(roasbeef): need curve?
PeerKey: &btcec.PublicKey{ NodeKey: &btcec.PublicKey{
X: pub.X, X: pub.X,
Y: pub.Y, Y: pub.Y,
}, },
Addrs: selectedNode.Addrs(), NodeID: NewNodeID(pub),
Addrs: selectedNode.Addrs(),
}) })
// With the node selected, we'll add it to the set of visited // With the node selected, we'll add it to the set of visited

@ -349,11 +349,11 @@ func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) {
edge2Pub := edge2.Peer.PubKey() edge2Pub := edge2.Peer.PubKey()
switch { switch {
case bytes.Equal(directive.PeerKey.SerializeCompressed(), edge1Pub[:]): case bytes.Equal(directive.NodeKey.SerializeCompressed(), edge1Pub[:]):
case bytes.Equal(directive.PeerKey.SerializeCompressed(), edge2Pub[:]): case bytes.Equal(directive.NodeKey.SerializeCompressed(), edge2Pub[:]):
default: default:
t1.Fatalf("attached to unknown node: %x", t1.Fatalf("attached to unknown node: %x",
directive.PeerKey.SerializeCompressed()) directive.NodeKey.SerializeCompressed())
} }
// As the number of funds available exceed the // As the number of funds available exceed the
@ -634,8 +634,8 @@ func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) {
// We'll simulate a channel update by adding the nodes // We'll simulate a channel update by adding the nodes
// we just establish channel with the to set of nodes // we just establish channel with the to set of nodes
// to be skipped. // to be skipped.
skipNodes[NewNodeID(directives[0].PeerKey)] = struct{}{} skipNodes[NewNodeID(directives[0].NodeKey)] = struct{}{}
skipNodes[NewNodeID(directives[1].PeerKey)] = struct{}{} skipNodes[NewNodeID(directives[1].NodeKey)] = struct{}{}
// If we attempt to make a call to the Select function, // If we attempt to make a call to the Select function,
// without providing any new information, then we // without providing any new information, then we