Merge pull request #2633 from halseth/autpilot-chansize-allocation

[autopilot] distribute available funds among channels
This commit is contained in:
Olaoluwa Osuntokun 2019-03-14 12:51:02 -07:00 committed by GitHub
commit 36cc1da8ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 515 additions and 886 deletions

@ -567,10 +567,15 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
// As channel size we'll use the maximum channel size available. // As channel size we'll use the maximum channel size available.
chanSize := a.cfg.Constraints.MaxChanSize() chanSize := a.cfg.Constraints.MaxChanSize()
if availableFunds-chanSize < 0 { if availableFunds < chanSize {
chanSize = availableFunds chanSize = availableFunds
} }
if chanSize < a.cfg.Constraints.MinChanSize() {
return fmt.Errorf("not enough funds available to open a " +
"single channel")
}
// Use the heuristic to calculate a score for each node in the // Use the heuristic to calculate a score for each node in the
// graph. // graph.
scores, err := a.cfg.Heuristic.NodeScores( scores, err := a.cfg.Heuristic.NodeScores(
@ -601,6 +606,17 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
continue continue
} }
// Track the available funds we have left.
if availableFunds < chanSize {
chanSize = availableFunds
}
availableFunds -= chanSize
// If we run out of funds, we can break early.
if chanSize < a.cfg.Constraints.MinChanSize() {
break
}
chanCandidates[nID] = &AttachmentDirective{ chanCandidates[nID] = &AttachmentDirective{
NodeID: nID, NodeID: nID,
ChanAmt: chanSize, ChanAmt: chanSize,
@ -725,8 +741,7 @@ func (a *Agent) executeDirective(directive AttachmentDirective) {
// fewer slots were available, and other successful attempts finished // fewer slots were available, and other successful attempts finished
// first. // first.
a.pendingMtx.Lock() a.pendingMtx.Lock()
if uint16(len(a.pendingOpens)) >= if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens() {
a.cfg.Constraints.MaxPendingOpens() {
// Since we've reached our max number of pending opens, we'll // Since we've reached our max number of pending opens, we'll
// disconnect this peer and exit. However, if we were // disconnect this peer and exit. However, if we were
// previously connected to them, then we'll make sure to // previously connected to them, then we'll make sure to
@ -799,6 +814,9 @@ func (a *Agent) executeDirective(directive AttachmentDirective) {
// Since the channel open was successful and is currently pending, // Since the channel open was successful and is currently pending,
// we'll trigger the autopilot agent to query for more peers. // we'll trigger the autopilot agent to query for more peers.
// TODO(halseth): this triggers a new loop before all the new channels
// are added to the pending channels map. Should add before executing
// directive in goroutine?
a.OnChannelPendingOpen() a.OnChannelPendingOpen()
} }

@ -58,7 +58,7 @@ func (m *mockConstraints) MaxPendingOpens() uint16 {
} }
func (m *mockConstraints) MinChanSize() btcutil.Amount { func (m *mockConstraints) MinChanSize() btcutil.Amount {
return 0 return 1e7
} }
func (m *mockConstraints) MaxChanSize() btcutil.Amount { func (m *mockConstraints) MaxChanSize() btcutil.Amount {
return 1e8 return 1e8
@ -85,13 +85,13 @@ func (m *mockHeuristic) Name() string {
} }
func (m *mockHeuristic) NodeScores(g ChannelGraph, chans []Channel, func (m *mockHeuristic) NodeScores(g ChannelGraph, chans []Channel,
fundsAvailable btcutil.Amount, nodes map[NodeID]struct{}) ( chanSize btcutil.Amount, nodes map[NodeID]struct{}) (
map[NodeID]*NodeScore, error) { map[NodeID]*NodeScore, error) {
if m.nodeScoresArgs != nil { if m.nodeScoresArgs != nil {
directive := directiveArg{ directive := directiveArg{
graph: g, graph: g,
amt: fundsAvailable, amt: chanSize,
chans: chans, chans: chans,
nodes: nodes, nodes: nodes,
} }
@ -150,10 +150,20 @@ func (m *mockChanController) SpliceOut(chanPoint *wire.OutPoint,
var _ ChannelController = (*mockChanController)(nil) var _ ChannelController = (*mockChanController)(nil)
// TestAgentChannelOpenSignal tests that upon receipt of a chanOpenUpdate, then type testContext struct {
// agent modifies its local state accordingly, and reconsults the heuristic. constraints *mockConstraints
func TestAgentChannelOpenSignal(t *testing.T) { heuristic *mockHeuristic
t.Parallel() chanController ChannelController
graph testGraph
agent *Agent
walletBalance btcutil.Amount
quit chan struct{}
sync.Mutex
}
func setup(t *testing.T, initialChans []Channel) (*testContext, func()) {
t.Helper()
// First, we'll create all the dependencies that we'll need in order to // First, we'll create all the dependencies that we'll need in order to
// create the autopilot agent. // create the autopilot agent.
@ -164,11 +174,13 @@ func TestAgentChannelOpenSignal(t *testing.T) {
quit := make(chan struct{}) quit := make(chan struct{})
heuristic := &mockHeuristic{ heuristic := &mockHeuristic{
nodeScoresArgs: make(chan directiveArg),
nodeScoresResps: make(chan map[NodeID]*NodeScore), nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit, quit: quit,
} }
constraints := &mockConstraints{ constraints := &mockConstraints{
moreChansResps: make(chan moreChansResp), moreChansResps: make(chan moreChansResp),
moreChanArgs: make(chan moreChanArg),
quit: quit, quit: quit,
} }
@ -177,6 +189,19 @@ func TestAgentChannelOpenSignal(t *testing.T) {
} }
memGraph, _, _ := newMemChanGraph() memGraph, _, _ := newMemChanGraph()
// We'll keep track of the funds available to the agent, to make sure
// it correctly uses this value when querying the ChannelBudget.
var availableFunds btcutil.Amount = 10 * btcutil.SatoshiPerBitcoin
ctx := &testContext{
constraints: constraints,
heuristic: heuristic,
chanController: chanController,
graph: memGraph,
walletBalance: availableFunds,
quit: quit,
}
// With the dependencies we created, we can now create the initial // With the dependencies we created, we can now create the initial
// agent itself. // agent itself.
testCfg := Config{ testCfg := Config{
@ -184,7 +209,9 @@ func TestAgentChannelOpenSignal(t *testing.T) {
Heuristic: heuristic, Heuristic: heuristic,
ChanController: chanController, ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
return 0, nil ctx.Lock()
defer ctx.Unlock()
return ctx.walletBalance, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil return false, nil
@ -195,35 +222,80 @@ func TestAgentChannelOpenSignal(t *testing.T) {
Graph: memGraph, Graph: memGraph,
Constraints: constraints, Constraints: constraints,
} }
initialChans := []Channel{}
agent, err := New(testCfg, initialChans) agent, err := New(testCfg, initialChans)
if err != nil { if err != nil {
t.Fatalf("unable to create agent: %v", err) t.Fatalf("unable to create agent: %v", err)
} }
ctx.agent = agent
// To ensure the heuristic doesn't block on quitting the agent, we'll // With the autopilot agent and all its dependencies we'll start the
// use the agent's quit chan to signal when it should also stop.
heuristic.quit = agent.quit
// With the autopilot agent and all its dependencies we'll star the
// primary controller goroutine. // primary controller goroutine.
if err := agent.Start(); err != nil { if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err) t.Fatalf("unable to start agent: %v", err)
} }
defer agent.Stop()
// We must defer the closing of quit after the defer agent.Stop(), to cleanup := func() {
// make sure ConnectToPeer won't block preventing the agent from // We must close quit before agent.Stop(), to make sure
// exiting. // ChannelBudget won't block preventing the agent from exiting.
defer close(quit) close(quit)
agent.Stop()
}
return ctx, cleanup
}
// respondMoreChans consumes the moreChanArgs element and responds to the agent
// with the given moreChansResp.
func respondMoreChans(t *testing.T, testCtx *testContext, resp moreChansResp) {
t.Helper()
// The agent should now query the heuristic.
select {
case <-testCtx.constraints.moreChanArgs:
case <-time.After(time.Second * 3):
t.Fatalf("heuristic wasn't queried in time")
}
// We'll send the response.
select {
case testCtx.constraints.moreChansResps <- resp:
case <-time.After(time.Second * 10):
t.Fatalf("response wasn't sent in time")
}
}
// respondMoreChans consumes the nodeScoresArgs element and responds to the
// agent with the given node scores.
func respondNodeScores(t *testing.T, testCtx *testContext,
resp map[NodeID]*NodeScore) {
t.Helper()
// Send over an empty list of attachment directives, which should cause
// the agent to return to waiting on a new signal.
select {
case <-testCtx.heuristic.nodeScoresArgs:
case <-time.After(time.Second * 3):
t.Fatalf("node scores weren't queried in time")
}
select {
case testCtx.heuristic.nodeScoresResps <- resp:
case <-time.After(time.Second * 10):
t.Fatalf("node scores were not sent in time")
}
}
// TestAgentChannelOpenSignal tests that upon receipt of a chanOpenUpdate, then
// agent modifies its local state accordingly, and reconsults the heuristic.
func TestAgentChannelOpenSignal(t *testing.T) {
t.Parallel()
testCtx, cleanup := setup(t, nil)
defer cleanup()
// We'll send an initial "no" response to advance the agent past its // We'll send an initial "no" response to advance the agent past its
// initial check. // initial check.
select { respondMoreChans(t, testCtx, moreChansResp{0, 0})
case constraints.moreChansResps <- moreChansResp{0, 0}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// Next we'll signal a new channel being opened by the backing LN node, // Next we'll signal a new channel being opened by the backing LN node,
// with a capacity of 1 BTC. // with a capacity of 1 BTC.
@ -231,30 +303,26 @@ func TestAgentChannelOpenSignal(t *testing.T) {
ChanID: randChanID(), ChanID: randChanID(),
Capacity: btcutil.SatoshiPerBitcoin, Capacity: btcutil.SatoshiPerBitcoin,
} }
agent.OnChannelOpen(newChan) testCtx.agent.OnChannelOpen(newChan)
// The agent should now query the heuristic in order to determine its // The agent should now query the heuristic in order to determine its
// next action as it local state has now been modified. // next action as it local state has now been modified.
select { respondMoreChans(t, testCtx, moreChansResp{0, 0})
case constraints.moreChansResps <- moreChansResp{0, 0}:
// At this point, the local state of the agent should // At this point, the local state of the agent should
// have also been updated to reflect that the LN node // have also been updated to reflect that the LN node
// now has an additional channel with one BTC. // now has an additional channel with one BTC.
if _, ok := agent.chanState[newChan.ChanID]; !ok { if _, ok := testCtx.agent.chanState[newChan.ChanID]; !ok {
t.Fatalf("internal channel state wasn't updated") t.Fatalf("internal channel state wasn't updated")
} }
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// There shouldn't be a call to the Select method as we've returned // There shouldn't be a call to the Select method as we've returned
// "false" for NeedMoreChans above. // "false" for NeedMoreChans above.
select { select {
// If this send success, then Select was erroneously called and the // If this send success, then Select was erroneously called and the
// test should be failed. // test should be failed.
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}:
t.Fatalf("Select was called but shouldn't have been") t.Fatalf("Select was called but shouldn't have been")
// This is the correct path as Select should've be called. // This is the correct path as Select should've be called.
@ -290,79 +358,19 @@ var _ ChannelController = (*mockFailingChanController)(nil)
func TestAgentChannelFailureSignal(t *testing.T) { func TestAgentChannelFailureSignal(t *testing.T) {
t.Parallel() t.Parallel()
// First, we'll create all the dependencies that we'll need in order to testCtx, cleanup := setup(t, nil)
// create the autopilot agent. defer cleanup()
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
quit := make(chan struct{}) testCtx.chanController = &mockFailingChanController{}
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
moreChansResps: make(chan moreChansResp),
quit: quit,
}
chanController := &mockFailingChanController{} node, err := testCtx.graph.addRandNode()
memGraph, _, _ := newMemChanGraph()
node, err := memGraph.addRandNode()
if err != nil { if err != nil {
t.Fatalf("unable to add node: %v", err) t.Fatalf("unable to add node: %v", err)
} }
// With the dependencies we created, we can now create the initial
// agent itself.
testCfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
return 0, nil
},
// TODO: move address check to agent.
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph,
Constraints: constraints,
}
initialChans := []Channel{}
agent, err := New(testCfg, initialChans)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// To ensure the heuristic doesn't block on quitting the agent, we'll
// use the agent's quit chan to signal when it should also stop.
heuristic.quit = agent.quit
// With the autopilot agent and all its dependencies we'll start the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
// We must defer the closing of quit after the defer agent.Stop(), to
// make sure ConnectToPeer won't block preventing the agent from
// exiting.
defer close(quit)
// First ensure the agent will attempt to open a new channel. Return // First ensure the agent will attempt to open a new channel. Return
// that we need more channels, and have 5BTC to use. // that we need more channels, and have 5BTC to use.
select { respondMoreChans(t, testCtx, moreChansResp{1, 5 * btcutil.SatoshiPerBitcoin})
case constraints.moreChansResps <- moreChansResp{1, 5 * btcutil.SatoshiPerBitcoin}:
case <-time.After(time.Second * 10):
t.Fatal("heuristic wasn't queried in time")
}
// At this point, the agent should now be querying the heuristic to // At this point, the agent should now be querying the heuristic to
// request attachment directives, return a fake so the agent will // request attachment directives, return a fake so the agent will
@ -372,28 +380,17 @@ func TestAgentChannelFailureSignal(t *testing.T) {
Score: 0.5, Score: 0.5,
} }
select { respondNodeScores(
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ t, testCtx, map[NodeID]*NodeScore{
NewNodeID(node): fakeDirective, NewNodeID(node): fakeDirective,
}: },
case <-time.After(time.Second * 10): )
t.Fatal("heuristic wasn't queried in time")
}
// At this point the agent will attempt to create a channel and fail. // At this point the agent will attempt to create a channel and fail.
// Now ensure that the controller loop is re-executed. // Now ensure that the controller loop is re-executed.
select { respondMoreChans(t, testCtx, moreChansResp{1, 5 * btcutil.SatoshiPerBitcoin})
case constraints.moreChansResps <- moreChansResp{1, 5 * btcutil.SatoshiPerBitcoin}: respondNodeScores(t, testCtx, map[NodeID]*NodeScore{})
case <-time.After(time.Second * 10):
t.Fatal("heuristic wasn't queried in time")
}
select {
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}:
case <-time.After(time.Second * 10):
t.Fatal("heuristic wasn't queried in time")
}
} }
// TestAgentChannelCloseSignal ensures that once the agent receives an outside // TestAgentChannelCloseSignal ensures that once the agent receives an outside
@ -401,48 +398,6 @@ func TestAgentChannelFailureSignal(t *testing.T) {
// will query the heuristic to make its next decision. // will query the heuristic to make its next decision.
func TestAgentChannelCloseSignal(t *testing.T) { func TestAgentChannelCloseSignal(t *testing.T) {
t.Parallel() t.Parallel()
// First, we'll create all the dependencies that we'll need in order to
// create the autopilot agent.
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
moreChansResps: make(chan moreChansResp),
quit: quit,
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
}
memGraph, _, _ := newMemChanGraph()
// With the dependencies we created, we can now create the initial
// agent itself.
testCfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
return 0, nil
},
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph,
Constraints: constraints,
}
// We'll start the agent with two channels already being active. // We'll start the agent with two channels already being active.
initialChans := []Channel{ initialChans := []Channel{
{ {
@ -454,61 +409,36 @@ func TestAgentChannelCloseSignal(t *testing.T) {
Capacity: btcutil.SatoshiPerBitcoin * 2, Capacity: btcutil.SatoshiPerBitcoin * 2,
}, },
} }
agent, err := New(testCfg, initialChans)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// To ensure the heuristic doesn't block on quitting the agent, we'll testCtx, cleanup := setup(t, initialChans)
// use the agent's quit chan to signal when it should also stop. defer cleanup()
heuristic.quit = agent.quit
// With the autopilot agent and all its dependencies we'll star the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
// We must defer the closing of quit after the defer agent.Stop(), to
// make sure ConnectToPeer won't block preventing the agent from
// exiting.
defer close(quit)
// We'll send an initial "no" response to advance the agent past its // We'll send an initial "no" response to advance the agent past its
// initial check. // initial check.
select { respondMoreChans(t, testCtx, moreChansResp{0, 0})
case constraints.moreChansResps <- moreChansResp{0, 0}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// Next, we'll close both channels which should force the agent to // Next, we'll close both channels which should force the agent to
// re-query the heuristic. // re-query the heuristic.
agent.OnChannelClose(initialChans[0].ChanID, initialChans[1].ChanID) testCtx.agent.OnChannelClose(initialChans[0].ChanID, initialChans[1].ChanID)
// The agent should now query the heuristic in order to determine its // The agent should now query the heuristic in order to determine its
// next action as it local state has now been modified. // next action as it local state has now been modified.
select { respondMoreChans(t, testCtx, moreChansResp{0, 0})
case constraints.moreChansResps <- moreChansResp{0, 0}:
// At this point, the local state of the agent should // At this point, the local state of the agent should
// have also been updated to reflect that the LN node // have also been updated to reflect that the LN node
// has no existing open channels. // has no existing open channels.
if len(agent.chanState) != 0 { if len(testCtx.agent.chanState) != 0 {
t.Fatalf("internal channel state wasn't updated") t.Fatalf("internal channel state wasn't updated")
} }
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// There shouldn't be a call to the Select method as we've returned // There shouldn't be a call to the Select method as we've returned
// "false" for NeedMoreChans above. // "false" for NeedMoreChans above.
select { select {
// If this send success, then Select was erroneously called and the // If this send success, then Select was erroneously called and the
// test should be failed. // test should be failed.
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}:
t.Fatalf("Select was called but shouldn't have been") t.Fatalf("Select was called but shouldn't have been")
// This is the correct path as Select should've be called. // This is the correct path as Select should've be called.
@ -522,105 +452,32 @@ func TestAgentChannelCloseSignal(t *testing.T) {
func TestAgentBalanceUpdate(t *testing.T) { func TestAgentBalanceUpdate(t *testing.T) {
t.Parallel() t.Parallel()
// First, we'll create all the dependencies that we'll need in order to testCtx, cleanup := setup(t, nil)
// create the autopilot agent. defer cleanup()
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
moreChansResps: make(chan moreChansResp),
quit: quit,
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
}
memGraph, _, _ := newMemChanGraph()
// The wallet will start with 2 BTC available.
var walletBalanceMtx sync.Mutex
walletBalance := btcutil.Amount(btcutil.SatoshiPerBitcoin * 2)
// With the dependencies we created, we can now create the initial
// agent itself.
testCfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
walletBalanceMtx.Lock()
defer walletBalanceMtx.Unlock()
return walletBalance, nil
},
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph,
Constraints: constraints,
}
initialChans := []Channel{}
agent, err := New(testCfg, initialChans)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// To ensure the heuristic doesn't block on quitting the agent, we'll
// use the agent's quit chan to signal when it should also stop.
heuristic.quit = agent.quit
// With the autopilot agent and all its dependencies we'll star the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
// We must defer the closing of quit after the defer agent.Stop(), to
// make sure ConnectToPeer won't block preventing the agent from
// exiting.
defer close(quit)
// We'll send an initial "no" response to advance the agent past its // We'll send an initial "no" response to advance the agent past its
// initial check. // initial check.
select { respondMoreChans(t, testCtx, moreChansResp{0, 0})
case constraints.moreChansResps <- moreChansResp{0, 0}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// Next we'll send a new balance update signal to the agent, adding 5 // Next we'll send a new balance update signal to the agent, adding 5
// BTC to the amount of available funds. // BTC to the amount of available funds.
walletBalanceMtx.Lock() testCtx.Lock()
walletBalance += btcutil.SatoshiPerBitcoin * 5 testCtx.walletBalance += btcutil.SatoshiPerBitcoin * 5
walletBalanceMtx.Unlock() testCtx.Unlock()
agent.OnBalanceChange() testCtx.agent.OnBalanceChange()
// The agent should now query the heuristic in order to determine its // The agent should now query the heuristic in order to determine its
// next action as it local state has now been modified. // next action as it local state has now been modified.
select { respondMoreChans(t, testCtx, moreChansResp{0, 0})
case constraints.moreChansResps <- moreChansResp{0, 0}:
// At this point, the local state of the agent should // At this point, the local state of the agent should
// have also been updated to reflect that the LN node // have also been updated to reflect that the LN node
// now has an additional 5BTC available. // now has an additional 5BTC available.
if agent.totalBalance != walletBalance { if testCtx.agent.totalBalance != testCtx.walletBalance {
t.Fatalf("expected %v wallet balance "+ t.Fatalf("expected %v wallet balance "+
"instead have %v", agent.totalBalance, "instead have %v", testCtx.agent.totalBalance,
walletBalance) testCtx.walletBalance)
}
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
} }
// There shouldn't be a call to the Select method as we've returned // There shouldn't be a call to the Select method as we've returned
@ -629,7 +486,7 @@ func TestAgentBalanceUpdate(t *testing.T) {
// If this send success, then Select was erroneously called and the // If this send success, then Select was erroneously called and the
// test should be failed. // test should be failed.
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}:
t.Fatalf("Select was called but shouldn't have been") t.Fatalf("Select was called but shouldn't have been")
// This is the correct path as Select should've be called. // This is the correct path as Select should've be called.
@ -642,70 +499,8 @@ func TestAgentBalanceUpdate(t *testing.T) {
func TestAgentImmediateAttach(t *testing.T) { func TestAgentImmediateAttach(t *testing.T) {
t.Parallel() t.Parallel()
// First, we'll create all the dependencies that we'll need in order to testCtx, cleanup := setup(t, nil)
// create the autopilot agent. defer cleanup()
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
moreChansResps: make(chan moreChansResp),
quit: quit,
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
}
memGraph, _, _ := newMemChanGraph()
// The wallet will start with 10 BTC available.
const walletBalance = btcutil.SatoshiPerBitcoin * 10
// With the dependencies we created, we can now create the initial
// agent itself.
testCfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil
},
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph,
Constraints: constraints,
}
initialChans := []Channel{}
agent, err := New(testCfg, initialChans)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// To ensure the heuristic doesn't block on quitting the agent, we'll
// use the agent's quit chan to signal when it should also stop.
heuristic.quit = agent.quit
// With the autopilot agent and all its dependencies we'll star the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
// We must defer the closing of quit after the defer agent.Stop(), to
// make sure ConnectToPeer won't block preventing the agent from
// exiting.
defer close(quit)
const numChans = 5 const numChans = 5
@ -713,7 +508,7 @@ func TestAgentImmediateAttach(t *testing.T) {
directives := make(map[NodeID]*NodeScore) directives := make(map[NodeID]*NodeScore)
nodeKeys := make(map[NodeID]struct{}) nodeKeys := make(map[NodeID]struct{})
for i := 0; i < numChans; i++ { for i := 0; i < numChans; i++ {
pub, err := memGraph.addRandNode() pub, err := testCtx.graph.addRandNode()
if err != nil { if err != nil {
t.Fatalf("unable to generate key: %v", err) t.Fatalf("unable to generate key: %v", err)
} }
@ -727,32 +522,23 @@ func TestAgentImmediateAttach(t *testing.T) {
// The very first thing the agent should do is query the NeedMoreChans // The very first thing the agent should do is query the NeedMoreChans
// method on the passed heuristic. So we'll provide it with a response // method on the passed heuristic. So we'll provide it with a response
// that will kick off the main loop. // that will kick off the main loop.
select { respondMoreChans(t, testCtx,
moreChansResp{
// We'll send over a response indicating that it should
// establish more channels, and give it a budget of 5 BTC to do
// so.
case constraints.moreChansResps <- moreChansResp{
numMore: numChans, numMore: numChans,
amt: 5 * btcutil.SatoshiPerBitcoin, amt: 5 * btcutil.SatoshiPerBitcoin,
}: },
case <-time.After(time.Second * 10): )
t.Fatalf("heuristic wasn't queried in time")
}
// At this point, the agent should now be querying the heuristic to // At this point, the agent should now be querying the heuristic to
// requests attachment directives. With our fake directives created, // requests attachment directives. With our fake directives created,
// we'll now send then to the agent as a return value for the Select // we'll now send then to the agent as a return value for the Select
// function. // function.
select { respondNodeScores(t, testCtx, directives)
case heuristic.nodeScoresResps <- directives:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// Finally, we should receive 5 calls to the OpenChannel method with // Finally, we should receive 5 calls to the OpenChannel method with
// the exact same parameters that we specified within the attachment // the exact same parameters that we specified within the attachment
// directives. // directives.
chanController := testCtx.chanController.(*mockChanController)
for i := 0; i < numChans; i++ { for i := 0; i < numChans; i++ {
select { select {
case openChan := <-chanController.openChanSignals: case openChan := <-chanController.openChanSignals:
@ -779,72 +565,12 @@ func TestAgentImmediateAttach(t *testing.T) {
func TestAgentPrivateChannels(t *testing.T) { func TestAgentPrivateChannels(t *testing.T) {
t.Parallel() t.Parallel()
// First, we'll create all the dependencies that we'll need in order to testCtx, cleanup := setup(t, nil)
// create the autopilot agent. defer cleanup()
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
moreChansResps: make(chan moreChansResp),
quit: quit,
}
// The chanController should be initialized such that all of its open // The chanController should be initialized such that all of its open
// channel requests are for private channels. // channel requests are for private channels.
chanController := &mockChanController{ testCtx.chanController.(*mockChanController).private = true
openChanSignals: make(chan openChanIntent),
private: true,
}
memGraph, _, _ := newMemChanGraph()
// The wallet will start with 10 BTC available.
const walletBalance = btcutil.SatoshiPerBitcoin * 10
// With the dependencies we created, we can now create the initial
// agent itself.
cfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil
},
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph,
Constraints: constraints,
}
agent, err := New(cfg, nil)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// To ensure the heuristic doesn't block on quitting the agent, we'll
// use the agent's quit chan to signal when it should also stop.
heuristic.quit = agent.quit
// With the autopilot agent and all its dependencies we'll star the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
// We must defer the closing of quit after the defer agent.Stop(), to
// make sure ConnectToPeer won't block preventing the agent from
// exiting.
defer close(quit)
const numChans = 5 const numChans = 5
@ -852,7 +578,7 @@ func TestAgentPrivateChannels(t *testing.T) {
// agent's graph, and it can progress within its loop. // agent's graph, and it can progress within its loop.
directives := make(map[NodeID]*NodeScore) directives := make(map[NodeID]*NodeScore)
for i := 0; i < numChans; i++ { for i := 0; i < numChans; i++ {
pub, err := memGraph.addRandNode() pub, err := testCtx.graph.addRandNode()
if err != nil { if err != nil {
t.Fatalf("unable to generate key: %v", err) t.Fatalf("unable to generate key: %v", err)
} }
@ -871,23 +597,17 @@ func TestAgentPrivateChannels(t *testing.T) {
numMore: numChans, numMore: numChans,
amt: 5 * btcutil.SatoshiPerBitcoin, amt: 5 * btcutil.SatoshiPerBitcoin,
} }
select { respondMoreChans(t, testCtx, resp)
case constraints.moreChansResps <- resp:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// At this point, the agent should now be querying the heuristic to // At this point, the agent should now be querying the heuristic to
// requests attachment directives. With our fake directives created, // requests attachment directives. With our fake directives created,
// we'll now send then to the agent as a return value for the Select // we'll now send then to the agent as a return value for the Select
// function. // function.
select { respondNodeScores(t, testCtx, directives)
case heuristic.nodeScoresResps <- directives:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// Finally, we should receive 5 calls to the OpenChannel method, each // Finally, we should receive 5 calls to the OpenChannel method, each
// specifying that it's for a private channel. // specifying that it's for a private channel.
chanController := testCtx.chanController.(*mockChanController)
for i := 0; i < numChans; i++ { for i := 0; i < numChans; i++ {
select { select {
case openChan := <-chanController.openChanSignals: case openChan := <-chanController.openChanSignals:
@ -906,77 +626,11 @@ func TestAgentPrivateChannels(t *testing.T) {
func TestAgentPendingChannelState(t *testing.T) { func TestAgentPendingChannelState(t *testing.T) {
t.Parallel() t.Parallel()
// First, we'll create all the dependencies that we'll need in order to testCtx, cleanup := setup(t, nil)
// create the autopilot agent. defer cleanup()
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
moreChansResps: make(chan moreChansResp),
quit: quit,
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
}
memGraph, _, _ := newMemChanGraph()
// The wallet will start with 6 BTC available.
var walletBalanceMtx sync.Mutex
walletBalance := btcutil.Amount(btcutil.SatoshiPerBitcoin * 6)
// With the dependencies we created, we can now create the initial
// agent itself.
testCfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
walletBalanceMtx.Lock()
defer walletBalanceMtx.Unlock()
return walletBalance, nil
},
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph,
Constraints: constraints,
}
initialChans := []Channel{}
agent, err := New(testCfg, initialChans)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// To ensure the heuristic doesn't block on quitting the agent, we'll
// use the agent's quit chan to signal when it should also stop.
heuristic.quit = agent.quit
// With the autopilot agent and all its dependencies we'll start the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
// We must defer the closing of quit after the defer agent.Stop(), to
// make sure ConnectToPeer won't block preventing the agent from
// exiting.
defer close(quit)
// We'll only return a single directive for a pre-chosen node. // We'll only return a single directive for a pre-chosen node.
nodeKey, err := memGraph.addRandNode() nodeKey, err := testCtx.graph.addRandNode()
if err != nil { if err != nil {
t.Fatalf("unable to generate key: %v", err) t.Fatalf("unable to generate key: %v", err)
} }
@ -990,31 +644,24 @@ func TestAgentPendingChannelState(t *testing.T) {
// query, that it needs more channels and has 3 BTC available for // query, that it needs more channels and has 3 BTC available for
// attachment. We'll send over a response indicating that it should // attachment. We'll send over a response indicating that it should
// establish more channels, and give it a budget of 1 BTC to do so. // establish more channels, and give it a budget of 1 BTC to do so.
select { respondMoreChans(t, testCtx,
case constraints.moreChansResps <- moreChansResp{ moreChansResp{
numMore: 1, numMore: 1,
amt: btcutil.SatoshiPerBitcoin, amt: btcutil.SatoshiPerBitcoin,
}: },
case <-time.After(time.Second * 10): )
t.Fatalf("heuristic wasn't queried in time")
}
constraints.moreChanArgs = make(chan moreChanArg) respondNodeScores(t, testCtx,
map[NodeID]*NodeScore{
select {
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{
nodeID: nodeDirective, nodeID: nodeDirective,
}: },
case <-time.After(time.Second * 10): )
t.Fatalf("heuristic wasn't queried in time")
}
heuristic.nodeScoresArgs = make(chan directiveArg)
// A request to open the channel should've also been sent. // A request to open the channel should've also been sent.
chanController := testCtx.chanController.(*mockChanController)
select { select {
case openChan := <-chanController.openChanSignals: case openChan := <-chanController.openChanSignals:
chanAmt := constraints.MaxChanSize() chanAmt := testCtx.constraints.MaxChanSize()
if openChan.amt != chanAmt { if openChan.amt != chanAmt {
t.Fatalf("invalid chan amt: expected %v, got %v", t.Fatalf("invalid chan amt: expected %v, got %v",
chanAmt, openChan.amt) chanAmt, openChan.amt)
@ -1031,11 +678,11 @@ func TestAgentPendingChannelState(t *testing.T) {
// Now, in order to test that the pending state was properly updated, // Now, in order to test that the pending state was properly updated,
// we'll trigger a balance update in order to trigger a query to the // we'll trigger a balance update in order to trigger a query to the
// heuristic. // heuristic.
walletBalanceMtx.Lock() testCtx.Lock()
walletBalance += 0.4 * btcutil.SatoshiPerBitcoin testCtx.walletBalance += 0.4 * btcutil.SatoshiPerBitcoin
walletBalanceMtx.Unlock() testCtx.Unlock()
agent.OnBalanceChange() testCtx.agent.OnBalanceChange()
// The heuristic should be queried, and the argument for the set of // The heuristic should be queried, and the argument for the set of
// channels passed in should include the pending channels that // channels passed in should include the pending channels that
@ -1044,8 +691,8 @@ func TestAgentPendingChannelState(t *testing.T) {
// The request that we get should include a pending channel for the // The request that we get should include a pending channel for the
// one that we just created, otherwise the agent isn't properly // one that we just created, otherwise the agent isn't properly
// updating its internal state. // updating its internal state.
case req := <-constraints.moreChanArgs: case req := <-testCtx.constraints.moreChanArgs:
chanAmt := constraints.MaxChanSize() chanAmt := testCtx.constraints.MaxChanSize()
if len(req.chans) != 1 { if len(req.chans) != 1 {
t.Fatalf("should include pending chan in current "+ t.Fatalf("should include pending chan in current "+
"state, instead have %v chans", len(req.chans)) "state, instead have %v chans", len(req.chans))
@ -1065,7 +712,7 @@ func TestAgentPendingChannelState(t *testing.T) {
// We'll send across a response indicating that it *does* need more // We'll send across a response indicating that it *does* need more
// channels. // channels.
select { select {
case constraints.moreChansResps <- moreChansResp{1, btcutil.SatoshiPerBitcoin}: case testCtx.constraints.moreChansResps <- moreChansResp{1, btcutil.SatoshiPerBitcoin}:
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
t.Fatalf("need more chans wasn't queried in time") t.Fatalf("need more chans wasn't queried in time")
} }
@ -1074,7 +721,7 @@ func TestAgentPendingChannelState(t *testing.T) {
// Select method. The arguments passed should reflect the fact that the // Select method. The arguments passed should reflect the fact that the
// node we have a pending channel to, should be ignored. // node we have a pending channel to, should be ignored.
select { select {
case req := <-heuristic.nodeScoresArgs: case req := <-testCtx.heuristic.nodeScoresArgs:
if len(req.chans) == 0 { if len(req.chans) == 0 {
t.Fatalf("expected to skip %v nodes, instead "+ t.Fatalf("expected to skip %v nodes, instead "+
"skipping %v", 1, len(req.chans)) "skipping %v", 1, len(req.chans))
@ -1093,88 +740,25 @@ func TestAgentPendingChannelState(t *testing.T) {
func TestAgentPendingOpenChannel(t *testing.T) { func TestAgentPendingOpenChannel(t *testing.T) {
t.Parallel() t.Parallel()
// First, we'll create all the dependencies that we'll need in order to testCtx, cleanup := setup(t, nil)
// create the autopilot agent. defer cleanup()
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
moreChansResps: make(chan moreChansResp),
quit: quit,
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
}
memGraph, _, _ := newMemChanGraph()
// The wallet will start with 6 BTC available.
const walletBalance = btcutil.SatoshiPerBitcoin * 6
// With the dependencies we created, we can now create the initial
// agent itself.
cfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil
},
Graph: memGraph,
Constraints: constraints,
}
agent, err := New(cfg, nil)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// To ensure the heuristic doesn't block on quitting the agent, we'll
// use the agent's quit chan to signal when it should also stop.
heuristic.quit = agent.quit
// With the autopilot agent and all its dependencies we'll start the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
// We must defer the closing of quit after the defer agent.Stop(), to
// make sure ConnectToPeer won't block preventing the agent from
// exiting.
defer close(quit)
// We'll send an initial "no" response to advance the agent past its // We'll send an initial "no" response to advance the agent past its
// initial check. // initial check.
select { respondMoreChans(t, testCtx, moreChansResp{0, 0})
case constraints.moreChansResps <- moreChansResp{0, 0}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// Next, we'll signal that a new channel has been opened, but it is // Next, we'll signal that a new channel has been opened, but it is
// still pending. // still pending.
agent.OnChannelPendingOpen() testCtx.agent.OnChannelPendingOpen()
// The agent should now query the heuristic in order to determine its // The agent should now query the heuristic in order to determine its
// next action as its local state has now been modified. // next action as its local state has now been modified.
select { respondMoreChans(t, testCtx, moreChansResp{0, 0})
case constraints.moreChansResps <- moreChansResp{0, 0}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
// There shouldn't be a call to the Select method as we've returned // There shouldn't be a call to the Select method as we've returned
// "false" for NeedMoreChans above. // "false" for NeedMoreChans above.
select { select {
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}:
t.Fatalf("Select was called but shouldn't have been") t.Fatalf("Select was called but shouldn't have been")
default: default:
} }
@ -1188,108 +772,43 @@ func TestAgentPendingOpenChannel(t *testing.T) {
func TestAgentOnNodeUpdates(t *testing.T) { func TestAgentOnNodeUpdates(t *testing.T) {
t.Parallel() t.Parallel()
// First, we'll create all the dependencies that we'll need in order to testCtx, cleanup := setup(t, nil)
// create the autopilot agent. defer cleanup()
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
moreChansResps: make(chan moreChansResp),
quit: quit,
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
}
memGraph, _, _ := newMemChanGraph()
// The wallet will start with 6 BTC available.
const walletBalance = btcutil.SatoshiPerBitcoin * 6
// With the dependencies we created, we can now create the initial
// agent itself.
cfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil
},
Graph: memGraph,
Constraints: constraints,
}
agent, err := New(cfg, nil)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// To ensure the heuristic doesn't block on quitting the agent, we'll
// use the agent's quit chan to signal when it should also stop.
heuristic.quit = agent.quit
// With the autopilot agent and all its dependencies we'll start the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
// We must defer the closing of quit after the defer agent.Stop(), to
// make sure ConnectToPeer won't block preventing the agent from
// exiting.
defer close(quit)
// We'll send an initial "yes" response to advance the agent past its // We'll send an initial "yes" response to advance the agent past its
// initial check. This will cause it to try to get directives from an // initial check. This will cause it to try to get directives from an
// empty graph. // empty graph.
select { respondMoreChans(
case constraints.moreChansResps <- moreChansResp{ t, testCtx,
moreChansResp{
numMore: 2, numMore: 2,
amt: walletBalance, amt: testCtx.walletBalance,
}: },
case <-time.After(time.Second * 10): )
t.Fatalf("heuristic wasn't queried in time")
}
// Send over an empty list of attachment directives, which should cause // Send over an empty list of attachment directives, which should cause
// the agent to return to waiting on a new signal. // the agent to return to waiting on a new signal.
select { respondNodeScores(t, testCtx, map[NodeID]*NodeScore{})
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}:
case <-time.After(time.Second * 10):
t.Fatalf("Select was not called but should have been")
}
// Simulate more nodes being added to the graph by informing the agent // Simulate more nodes being added to the graph by informing the agent
// that we have node updates. // that we have node updates.
agent.OnNodeUpdates() testCtx.agent.OnNodeUpdates()
// In response, the agent should wake up and see if it needs more // In response, the agent should wake up and see if it needs more
// channels. Since we haven't done anything, we will send the same // channels. Since we haven't done anything, we will send the same
// response as before since we are still trying to open channels. // response as before since we are still trying to open channels.
select { respondMoreChans(
case constraints.moreChansResps <- moreChansResp{ t, testCtx,
moreChansResp{
numMore: 2, numMore: 2,
amt: walletBalance, amt: testCtx.walletBalance,
}: },
case <-time.After(time.Second * 10): )
t.Fatalf("heuristic wasn't queried in time")
}
// Again the agent should pull in the next set of attachment directives. // Again the agent should pull in the next set of attachment directives.
// It's not important that this list is also empty, so long as the node // It's not important that this list is also empty, so long as the node
// updates signal is causing the agent to make this attempt. // updates signal is causing the agent to make this attempt.
select { respondNodeScores(t, testCtx, map[NodeID]*NodeScore{})
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}:
case <-time.After(time.Second * 10):
t.Fatalf("Select was not called but should have been")
}
} }
// TestAgentSkipPendingConns asserts that the agent will not try to make // TestAgentSkipPendingConns asserts that the agent will not try to make
@ -1301,89 +820,29 @@ func TestAgentOnNodeUpdates(t *testing.T) {
func TestAgentSkipPendingConns(t *testing.T) { func TestAgentSkipPendingConns(t *testing.T) {
t.Parallel() t.Parallel()
// First, we'll create all the dependencies that we'll need in order to testCtx, cleanup := setup(t, nil)
// create the autopilot agent. defer cleanup()
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
quit := make(chan struct{})
heuristic := &mockHeuristic{
nodeScoresArgs: make(chan directiveArg),
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
moreChansResps: make(chan moreChansResp),
quit: quit,
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
}
memGraph, _, _ := newMemChanGraph()
// The wallet will start with 6 BTC available.
const walletBalance = btcutil.SatoshiPerBitcoin * 6
connect := make(chan chan error) connect := make(chan chan error)
testCtx.agent.cfg.ConnectToPeer = func(*btcec.PublicKey, []net.Addr) (bool, error) {
// With the dependencies we created, we can now create the initial
// agent itself.
testCfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil
},
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
errChan := make(chan error) errChan := make(chan error)
select { select {
case connect <- errChan: case connect <- errChan:
case <-quit: case <-testCtx.quit:
return false, errors.New("quit") return false, errors.New("quit")
} }
select { select {
case err := <-errChan: case err := <-errChan:
return false, err return false, err
case <-quit: case <-testCtx.quit:
return false, errors.New("quit") return false, errors.New("quit")
} }
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph,
Constraints: constraints,
} }
initialChans := []Channel{}
agent, err := New(testCfg, initialChans)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// To ensure the heuristic doesn't block on quitting the agent, we'll
// use the agent's quit chan to signal when it should also stop.
heuristic.quit = agent.quit
// With the autopilot agent and all its dependencies we'll start the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
// We must defer the closing of quit after the defer agent.Stop(), to
// make sure ConnectToPeer won't block preventing the agent from
// exiting.
defer close(quit)
// We'll only return a single directive for a pre-chosen node. // We'll only return a single directive for a pre-chosen node.
nodeKey, err := memGraph.addRandNode() nodeKey, err := testCtx.graph.addRandNode()
if err != nil { if err != nil {
t.Fatalf("unable to generate key: %v", err) t.Fatalf("unable to generate key: %v", err)
} }
@ -1395,7 +854,7 @@ func TestAgentSkipPendingConns(t *testing.T) {
// We'll also add a second node to the graph, to keep the first one // We'll also add a second node to the graph, to keep the first one
// company. // company.
nodeKey2, err := memGraph.addRandNode() nodeKey2, err := testCtx.graph.addRandNode()
if err != nil { if err != nil {
t.Fatalf("unable to generate key: %v", err) t.Fatalf("unable to generate key: %v", err)
} }
@ -1404,18 +863,16 @@ func TestAgentSkipPendingConns(t *testing.T) {
// We'll send an initial "yes" response to advance the agent past its // We'll send an initial "yes" response to advance the agent past its
// initial check. This will cause it to try to get directives from the // initial check. This will cause it to try to get directives from the
// graph. // graph.
select { respondMoreChans(t, testCtx,
case constraints.moreChansResps <- moreChansResp{ moreChansResp{
numMore: 1, numMore: 1,
amt: walletBalance, amt: testCtx.walletBalance,
}: },
case <-time.After(time.Second * 10): )
t.Fatalf("heuristic wasn't queried in time")
}
// Both nodes should be part of the arguments. // Both nodes should be part of the arguments.
select { select {
case req := <-heuristic.nodeScoresArgs: case req := <-testCtx.heuristic.nodeScoresArgs:
if len(req.nodes) != 2 { if len(req.nodes) != 2 {
t.Fatalf("expected %v nodes, instead "+ t.Fatalf("expected %v nodes, instead "+
"had %v", 2, len(req.nodes)) "had %v", 2, len(req.nodes))
@ -1433,7 +890,7 @@ func TestAgentSkipPendingConns(t *testing.T) {
// Respond with a scored directive. We skip node2 for now, implicitly // Respond with a scored directive. We skip node2 for now, implicitly
// giving it a zero-score. // giving it a zero-score.
select { select {
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{
NewNodeID(nodeKey): nodeDirective, NewNodeID(nodeKey): nodeDirective,
}: }:
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
@ -1449,22 +906,20 @@ func TestAgentSkipPendingConns(t *testing.T) {
} }
// Signal the agent to go again, now that we've tried to connect. // Signal the agent to go again, now that we've tried to connect.
agent.OnNodeUpdates() testCtx.agent.OnNodeUpdates()
// The heuristic again informs the agent that we need more channels. // The heuristic again informs the agent that we need more channels.
select { respondMoreChans(t, testCtx,
case constraints.moreChansResps <- moreChansResp{ moreChansResp{
numMore: 1, numMore: 1,
amt: walletBalance, amt: testCtx.walletBalance,
}: },
case <-time.After(time.Second * 10): )
t.Fatalf("heuristic wasn't queried in time")
}
// Since the node now has a pending connection, it should be skipped // Since the node now has a pending connection, it should be skipped
// and not part of the nodes attempting to be scored. // and not part of the nodes attempting to be scored.
select { select {
case req := <-heuristic.nodeScoresArgs: case req := <-testCtx.heuristic.nodeScoresArgs:
if len(req.nodes) != 1 { if len(req.nodes) != 1 {
t.Fatalf("expected %v nodes, instead "+ t.Fatalf("expected %v nodes, instead "+
"had %v", 1, len(req.nodes)) "had %v", 1, len(req.nodes))
@ -1478,7 +933,7 @@ func TestAgentSkipPendingConns(t *testing.T) {
// Respond with an emtpty score set. // Respond with an emtpty score set.
select { select {
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}:
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time") t.Fatalf("heuristic wasn't queried in time")
} }
@ -1501,20 +956,18 @@ func TestAgentSkipPendingConns(t *testing.T) {
// The agent will now retry since the last connection attempt failed. // The agent will now retry since the last connection attempt failed.
// The heuristic again informs the agent that we need more channels. // The heuristic again informs the agent that we need more channels.
select { respondMoreChans(t, testCtx,
case constraints.moreChansResps <- moreChansResp{ moreChansResp{
numMore: 1, numMore: 1,
amt: walletBalance, amt: testCtx.walletBalance,
}: },
case <-time.After(time.Second * 10): )
t.Fatalf("heuristic wasn't queried in time")
}
// The node should now be marked as "failed", which should make it // The node should now be marked as "failed", which should make it
// being skipped during scoring. Again check that it won't be among the // being skipped during scoring. Again check that it won't be among the
// score request. // score request.
select { select {
case req := <-heuristic.nodeScoresArgs: case req := <-testCtx.heuristic.nodeScoresArgs:
if len(req.nodes) != 1 { if len(req.nodes) != 1 {
t.Fatalf("expected %v nodes, instead "+ t.Fatalf("expected %v nodes, instead "+
"had %v", 1, len(req.nodes)) "had %v", 1, len(req.nodes))
@ -1532,7 +985,7 @@ func TestAgentSkipPendingConns(t *testing.T) {
Score: 0.5, Score: 0.5,
} }
select { select {
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{
nodeID2: nodeDirective2, nodeID2: nodeDirective2,
}: }:
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
@ -1552,86 +1005,30 @@ func TestAgentSkipPendingConns(t *testing.T) {
func TestAgentQuitWhenPendingConns(t *testing.T) { func TestAgentQuitWhenPendingConns(t *testing.T) {
t.Parallel() t.Parallel()
// First, we'll create all the dependencies that we'll need in order to testCtx, cleanup := setup(t, nil)
// create the autopilot agent. defer cleanup()
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
quit := make(chan struct{})
defer close(quit)
heuristic := &mockHeuristic{
nodeScoresArgs: make(chan directiveArg),
nodeScoresResps: make(chan map[NodeID]*NodeScore),
quit: quit,
}
constraints := &mockConstraints{
moreChansResps: make(chan moreChansResp),
quit: quit,
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
}
memGraph, _, _ := newMemChanGraph()
// The wallet will start with 6 BTC available.
const walletBalance = btcutil.SatoshiPerBitcoin * 6
connect := make(chan chan error) connect := make(chan chan error)
// With the dependencies we created, we can now create the initial testCtx.agent.cfg.ConnectToPeer = func(*btcec.PublicKey, []net.Addr) (bool, error) {
// agent itself.
testCfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil
},
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
errChan := make(chan error) errChan := make(chan error)
select { select {
case connect <- errChan: case connect <- errChan:
case <-quit: case <-testCtx.quit:
return false, errors.New("quit") return false, errors.New("quit")
} }
select { select {
case err := <-errChan: case err := <-errChan:
return false, err return false, err
case <-quit: case <-testCtx.quit:
return false, errors.New("quit") return false, errors.New("quit")
} }
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph,
Constraints: constraints,
} }
initialChans := []Channel{}
agent, err := New(testCfg, initialChans)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// To ensure the heuristic doesn't block on quitting the agent, we'll
// use the agent's quit chan to signal when it should also stop.
heuristic.quit = agent.quit
// With the autopilot agent and all its dependencies we'll start the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
// We'll only return a single directive for a pre-chosen node. // We'll only return a single directive for a pre-chosen node.
nodeKey, err := memGraph.addRandNode() nodeKey, err := testCtx.graph.addRandNode()
if err != nil { if err != nil {
t.Fatalf("unable to generate key: %v", err) t.Fatalf("unable to generate key: %v", err)
} }
@ -1644,18 +1041,16 @@ func TestAgentQuitWhenPendingConns(t *testing.T) {
// We'll send an initial "yes" response to advance the agent past its // We'll send an initial "yes" response to advance the agent past its
// initial check. This will cause it to try to get directives from the // initial check. This will cause it to try to get directives from the
// graph. // graph.
select { respondMoreChans(t, testCtx,
case constraints.moreChansResps <- moreChansResp{ moreChansResp{
numMore: 1, numMore: 1,
amt: walletBalance, amt: testCtx.walletBalance,
}: },
case <-time.After(time.Second * 10): )
t.Fatalf("heuristic wasn't queried in time")
}
// Check the args. // Check the args.
select { select {
case req := <-heuristic.nodeScoresArgs: case req := <-testCtx.heuristic.nodeScoresArgs:
if len(req.nodes) != 1 { if len(req.nodes) != 1 {
t.Fatalf("expected %v nodes, instead "+ t.Fatalf("expected %v nodes, instead "+
"had %v", 1, len(req.nodes)) "had %v", 1, len(req.nodes))
@ -1669,7 +1064,7 @@ func TestAgentQuitWhenPendingConns(t *testing.T) {
// Respond with a scored directive. // Respond with a scored directive.
select { select {
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{
NewNodeID(nodeKey): nodeDirective, NewNodeID(nodeKey): nodeDirective,
}: }:
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
@ -1687,7 +1082,7 @@ func TestAgentQuitWhenPendingConns(t *testing.T) {
// pending connection. // pending connection.
stopped := make(chan error) stopped := make(chan error)
go func() { go func() {
stopped <- agent.Stop() stopped <- testCtx.agent.Stop()
}() }()
select { select {
@ -1699,3 +1094,219 @@ func TestAgentQuitWhenPendingConns(t *testing.T) {
t.Fatalf("unable to stop agent") t.Fatalf("unable to stop agent")
} }
} }
// respondWithScores checks that the moreChansRequest contains what we expect,
// and responds with the given node scores.
func respondWithScores(t *testing.T, testCtx *testContext,
channelBudget btcutil.Amount, existingChans, newChans int,
nodeScores map[NodeID]*NodeScore) {
t.Helper()
select {
case testCtx.constraints.moreChansResps <- moreChansResp{
numMore: uint32(newChans),
amt: channelBudget,
}:
case <-time.After(time.Second * 3):
t.Fatalf("heuristic wasn't queried in time")
}
// The agent should query for scores using the constraints returned
// above. We expect the agent to use the maximum channel size when
// opening channels.
chanSize := testCtx.constraints.MaxChanSize()
select {
case req := <-testCtx.heuristic.nodeScoresArgs:
// All nodes in the graph should be potential channel
// candidates.
if len(req.nodes) != len(nodeScores) {
t.Fatalf("expected %v nodes, instead had %v",
len(nodeScores), len(req.nodes))
}
// 'existingChans' is already open.
if len(req.chans) != existingChans {
t.Fatalf("expected %d existing channel, got %v",
existingChans, len(req.chans))
}
if req.amt != chanSize {
t.Fatalf("expected channel size of %v, got %v",
chanSize, req.amt)
}
case <-time.After(time.Second * 3):
t.Fatalf("select wasn't queried in time")
}
// Respond with the given scores.
select {
case testCtx.heuristic.nodeScoresResps <- nodeScores:
case <-time.After(time.Second * 3):
t.Fatalf("NodeScores wasn't queried in time")
}
}
// checkChannelOpens asserts that the channel controller attempts open the
// number of channels we expect, and with the exact total allocation.
func checkChannelOpens(t *testing.T, testCtx *testContext,
allocation btcutil.Amount, numChans int) []NodeID {
var nodes []NodeID
// The agent should attempt to open channels, totaling what we expect.
var totalAllocation btcutil.Amount
chanController := testCtx.chanController.(*mockChanController)
for i := 0; i < numChans; i++ {
select {
case openChan := <-chanController.openChanSignals:
totalAllocation += openChan.amt
testCtx.Lock()
testCtx.walletBalance -= openChan.amt
testCtx.Unlock()
nodes = append(nodes, NewNodeID(openChan.target))
case <-time.After(time.Second * 3):
t.Fatalf("channel not opened in time")
}
}
if totalAllocation != allocation {
t.Fatalf("expected agent to open channels totalling %v, "+
"instead was %v", allocation, totalAllocation)
}
// Finally, make sure the agent won't try opening more channels.
select {
case <-chanController.openChanSignals:
t.Fatalf("agent unexpectedly opened channel")
case <-time.After(50 * time.Millisecond):
}
return nodes
}
// TestAgentChannelSizeAllocation tests that the autopilot agent opens channel
// of size that stays within the channel budget and size restrictions.
func TestAgentChannelSizeAllocation(t *testing.T) {
t.Parallel()
// Total number of nodes in our mock graph.
const numNodes = 10
testCtx, cleanup := setup(t, nil)
defer cleanup()
nodeScores := make(map[NodeID]*NodeScore)
for i := 0; i < numNodes; i++ {
nodeKey, err := testCtx.graph.addRandNode()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
nodeID := NewNodeID(nodeKey)
nodeScores[nodeID] = &NodeScore{
NodeID: nodeID,
Score: 0.5,
}
}
// The agent should now query the heuristic in order to determine its
// next action as it local state has now been modified.
select {
case arg := <-testCtx.constraints.moreChanArgs:
if len(arg.chans) != 0 {
t.Fatalf("expected agent to have no channels open, "+
"had %v", len(arg.chans))
}
if arg.balance != testCtx.walletBalance {
t.Fatalf("expectd agent to have %v balance, had %v",
testCtx.walletBalance, arg.balance)
}
case <-time.After(time.Second * 3):
t.Fatalf("heuristic wasn't queried in time")
}
// We'll return a response telling the agent to open 5 channels, with a
// total channel budget of 5 BTC.
var channelBudget btcutil.Amount = 5 * btcutil.SatoshiPerBitcoin
numExistingChannels := 0
numNewChannels := 5
respondWithScores(
t, testCtx, channelBudget, numExistingChannels,
numNewChannels, nodeScores,
)
expectedAllocation := testCtx.constraints.MaxChanSize() * btcutil.Amount(numNewChannels)
nodes := checkChannelOpens(
t, testCtx, expectedAllocation, numNewChannels,
)
// Delete the selected nodes from our set of scores, to avoid scoring
// nodes we already have channels to.
for _, node := range nodes {
delete(nodeScores, node)
}
// TODO(halseth): this loop is a hack to ensure all the attempted
// channels are accounted for. This happens because the agent will
// query the ChannelBudget before all the pending channels are added to
// the map. Fix by adding them to the pending channels map before
// executing directives in goroutines?
waitForNumChans := func(expChans int) {
t.Helper()
Loop:
for {
select {
case arg := <-testCtx.constraints.moreChanArgs:
// As long as the number of existing channels
// is below our expected number of channels,
// we'll keep responding with "no more
// channels".
if len(arg.chans) != expChans {
select {
case testCtx.constraints.moreChansResps <- moreChansResp{0, 0}:
case <-time.After(time.Second * 3):
t.Fatalf("heuristic wasn't " +
"queried in time")
}
continue
}
if arg.balance != testCtx.walletBalance {
t.Fatalf("expectd agent to have %v "+
"balance, had %v",
testCtx.walletBalance,
arg.balance)
}
break Loop
case <-time.After(time.Second * 3):
t.Fatalf("heuristic wasn't queried in time")
}
}
}
// Wait for the agent to have 5 channels.
waitForNumChans(numNewChannels)
// Set the channel budget to 1.5 BTC.
channelBudget = btcutil.SatoshiPerBitcoin * 3 / 2
// We'll return a response telling the agent to open 3 channels, with a
// total channel budget of 1.5 BTC.
numExistingChannels = 5
numNewChannels = 3
respondWithScores(
t, testCtx, channelBudget, numExistingChannels,
numNewChannels, nodeScores,
)
// To stay within the budget, we expect the autopilot to open 2
// channels.
checkChannelOpens(t, testCtx, channelBudget, 2)
}