diff --git a/autopilot/agent.go b/autopilot/agent.go index 2ff5cf0b..2f257173 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -567,10 +567,15 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32, // As channel size we'll use the maximum channel size available. chanSize := a.cfg.Constraints.MaxChanSize() - if availableFunds-chanSize < 0 { + if availableFunds < chanSize { chanSize = availableFunds } + if chanSize < a.cfg.Constraints.MinChanSize() { + return fmt.Errorf("not enough funds available to open a " + + "single channel") + } + // Use the heuristic to calculate a score for each node in the // graph. scores, err := a.cfg.Heuristic.NodeScores( @@ -601,6 +606,17 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32, continue } + // Track the available funds we have left. + if availableFunds < chanSize { + chanSize = availableFunds + } + availableFunds -= chanSize + + // If we run out of funds, we can break early. + if chanSize < a.cfg.Constraints.MinChanSize() { + break + } + chanCandidates[nID] = &AttachmentDirective{ NodeID: nID, ChanAmt: chanSize, @@ -725,8 +741,7 @@ func (a *Agent) executeDirective(directive AttachmentDirective) { // fewer slots were available, and other successful attempts finished // first. a.pendingMtx.Lock() - if uint16(len(a.pendingOpens)) >= - a.cfg.Constraints.MaxPendingOpens() { + if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens() { // Since we've reached our max number of pending opens, we'll // disconnect this peer and exit. However, if we were // previously connected to them, then we'll make sure to @@ -799,6 +814,9 @@ func (a *Agent) executeDirective(directive AttachmentDirective) { // Since the channel open was successful and is currently pending, // we'll trigger the autopilot agent to query for more peers. + // TODO(halseth): this triggers a new loop before all the new channels + // are added to the pending channels map. Should add before executing + // directive in goroutine? a.OnChannelPendingOpen() } diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index 2c0b2280..92b2b358 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -58,7 +58,7 @@ func (m *mockConstraints) MaxPendingOpens() uint16 { } func (m *mockConstraints) MinChanSize() btcutil.Amount { - return 0 + return 1e7 } func (m *mockConstraints) MaxChanSize() btcutil.Amount { return 1e8 @@ -85,13 +85,13 @@ func (m *mockHeuristic) Name() string { } func (m *mockHeuristic) NodeScores(g ChannelGraph, chans []Channel, - fundsAvailable btcutil.Amount, nodes map[NodeID]struct{}) ( + chanSize btcutil.Amount, nodes map[NodeID]struct{}) ( map[NodeID]*NodeScore, error) { if m.nodeScoresArgs != nil { directive := directiveArg{ graph: g, - amt: fundsAvailable, + amt: chanSize, chans: chans, nodes: nodes, } @@ -150,10 +150,20 @@ func (m *mockChanController) SpliceOut(chanPoint *wire.OutPoint, var _ ChannelController = (*mockChanController)(nil) -// TestAgentChannelOpenSignal tests that upon receipt of a chanOpenUpdate, then -// agent modifies its local state accordingly, and reconsults the heuristic. -func TestAgentChannelOpenSignal(t *testing.T) { - t.Parallel() +type testContext struct { + constraints *mockConstraints + heuristic *mockHeuristic + chanController ChannelController + graph testGraph + agent *Agent + walletBalance btcutil.Amount + + quit chan struct{} + sync.Mutex +} + +func setup(t *testing.T, initialChans []Channel) (*testContext, func()) { + t.Helper() // First, we'll create all the dependencies that we'll need in order to // create the autopilot agent. @@ -164,11 +174,13 @@ func TestAgentChannelOpenSignal(t *testing.T) { quit := make(chan struct{}) heuristic := &mockHeuristic{ + nodeScoresArgs: make(chan directiveArg), nodeScoresResps: make(chan map[NodeID]*NodeScore), quit: quit, } constraints := &mockConstraints{ moreChansResps: make(chan moreChansResp), + moreChanArgs: make(chan moreChanArg), quit: quit, } @@ -177,6 +189,19 @@ func TestAgentChannelOpenSignal(t *testing.T) { } memGraph, _, _ := newMemChanGraph() + // We'll keep track of the funds available to the agent, to make sure + // it correctly uses this value when querying the ChannelBudget. + var availableFunds btcutil.Amount = 10 * btcutil.SatoshiPerBitcoin + + ctx := &testContext{ + constraints: constraints, + heuristic: heuristic, + chanController: chanController, + graph: memGraph, + walletBalance: availableFunds, + quit: quit, + } + // With the dependencies we created, we can now create the initial // agent itself. testCfg := Config{ @@ -184,7 +209,9 @@ func TestAgentChannelOpenSignal(t *testing.T) { Heuristic: heuristic, ChanController: chanController, WalletBalance: func() (btcutil.Amount, error) { - return 0, nil + ctx.Lock() + defer ctx.Unlock() + return ctx.walletBalance, nil }, ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { return false, nil @@ -195,35 +222,80 @@ func TestAgentChannelOpenSignal(t *testing.T) { Graph: memGraph, Constraints: constraints, } - initialChans := []Channel{} + agent, err := New(testCfg, initialChans) if err != nil { t.Fatalf("unable to create agent: %v", err) } + ctx.agent = agent - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll star the + // With the autopilot agent and all its dependencies we'll start the // primary controller goroutine. if err := agent.Start(); err != nil { t.Fatalf("unable to start agent: %v", err) } - defer agent.Stop() - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) + cleanup := func() { + // We must close quit before agent.Stop(), to make sure + // ChannelBudget won't block preventing the agent from exiting. + close(quit) + agent.Stop() + } + + return ctx, cleanup +} + +// respondMoreChans consumes the moreChanArgs element and responds to the agent +// with the given moreChansResp. +func respondMoreChans(t *testing.T, testCtx *testContext, resp moreChansResp) { + t.Helper() + + // The agent should now query the heuristic. + select { + case <-testCtx.constraints.moreChanArgs: + case <-time.After(time.Second * 3): + t.Fatalf("heuristic wasn't queried in time") + } + + // We'll send the response. + select { + case testCtx.constraints.moreChansResps <- resp: + case <-time.After(time.Second * 10): + t.Fatalf("response wasn't sent in time") + } +} + +// respondMoreChans consumes the nodeScoresArgs element and responds to the +// agent with the given node scores. +func respondNodeScores(t *testing.T, testCtx *testContext, + resp map[NodeID]*NodeScore) { + t.Helper() + + // Send over an empty list of attachment directives, which should cause + // the agent to return to waiting on a new signal. + select { + case <-testCtx.heuristic.nodeScoresArgs: + case <-time.After(time.Second * 3): + t.Fatalf("node scores weren't queried in time") + } + select { + case testCtx.heuristic.nodeScoresResps <- resp: + case <-time.After(time.Second * 10): + t.Fatalf("node scores were not sent in time") + } +} + +// TestAgentChannelOpenSignal tests that upon receipt of a chanOpenUpdate, then +// agent modifies its local state accordingly, and reconsults the heuristic. +func TestAgentChannelOpenSignal(t *testing.T) { + t.Parallel() + + testCtx, cleanup := setup(t, nil) + defer cleanup() // We'll send an initial "no" response to advance the agent past its // initial check. - select { - case constraints.moreChansResps <- moreChansResp{0, 0}: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, moreChansResp{0, 0}) // Next we'll signal a new channel being opened by the backing LN node, // with a capacity of 1 BTC. @@ -231,21 +303,17 @@ func TestAgentChannelOpenSignal(t *testing.T) { ChanID: randChanID(), Capacity: btcutil.SatoshiPerBitcoin, } - agent.OnChannelOpen(newChan) + testCtx.agent.OnChannelOpen(newChan) // The agent should now query the heuristic in order to determine its // next action as it local state has now been modified. - select { - case constraints.moreChansResps <- moreChansResp{0, 0}: - // At this point, the local state of the agent should - // have also been updated to reflect that the LN node - // now has an additional channel with one BTC. - if _, ok := agent.chanState[newChan.ChanID]; !ok { - t.Fatalf("internal channel state wasn't updated") - } + respondMoreChans(t, testCtx, moreChansResp{0, 0}) - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") + // At this point, the local state of the agent should + // have also been updated to reflect that the LN node + // now has an additional channel with one BTC. + if _, ok := testCtx.agent.chanState[newChan.ChanID]; !ok { + t.Fatalf("internal channel state wasn't updated") } // There shouldn't be a call to the Select method as we've returned @@ -254,7 +322,7 @@ func TestAgentChannelOpenSignal(t *testing.T) { // If this send success, then Select was erroneously called and the // test should be failed. - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: + case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: t.Fatalf("Select was called but shouldn't have been") // This is the correct path as Select should've be called. @@ -290,79 +358,19 @@ var _ ChannelController = (*mockFailingChanController)(nil) func TestAgentChannelFailureSignal(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } + testCtx, cleanup := setup(t, nil) + defer cleanup() - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } + testCtx.chanController = &mockFailingChanController{} - chanController := &mockFailingChanController{} - memGraph, _, _ := newMemChanGraph() - node, err := memGraph.addRandNode() + node, err := testCtx.graph.addRandNode() if err != nil { t.Fatalf("unable to add node: %v", err) } - // With the dependencies we created, we can now create the initial - // agent itself. - testCfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - return 0, nil - }, - // TODO: move address check to agent. - ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { - return false, nil - }, - DisconnectPeer: func(*btcec.PublicKey) error { - return nil - }, - Graph: memGraph, - Constraints: constraints, - } - - initialChans := []Channel{} - agent, err := New(testCfg, initialChans) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll start the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) - // First ensure the agent will attempt to open a new channel. Return // that we need more channels, and have 5BTC to use. - select { - case constraints.moreChansResps <- moreChansResp{1, 5 * btcutil.SatoshiPerBitcoin}: - case <-time.After(time.Second * 10): - t.Fatal("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, moreChansResp{1, 5 * btcutil.SatoshiPerBitcoin}) // At this point, the agent should now be querying the heuristic to // request attachment directives, return a fake so the agent will @@ -372,28 +380,17 @@ func TestAgentChannelFailureSignal(t *testing.T) { Score: 0.5, } - select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ - NewNodeID(node): fakeDirective, - }: - case <-time.After(time.Second * 10): - t.Fatal("heuristic wasn't queried in time") - } + respondNodeScores( + t, testCtx, map[NodeID]*NodeScore{ + NewNodeID(node): fakeDirective, + }, + ) // At this point the agent will attempt to create a channel and fail. // Now ensure that the controller loop is re-executed. - select { - case constraints.moreChansResps <- moreChansResp{1, 5 * btcutil.SatoshiPerBitcoin}: - case <-time.After(time.Second * 10): - t.Fatal("heuristic wasn't queried in time") - } - - select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: - case <-time.After(time.Second * 10): - t.Fatal("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, moreChansResp{1, 5 * btcutil.SatoshiPerBitcoin}) + respondNodeScores(t, testCtx, map[NodeID]*NodeScore{}) } // TestAgentChannelCloseSignal ensures that once the agent receives an outside @@ -401,48 +398,6 @@ func TestAgentChannelFailureSignal(t *testing.T) { // will query the heuristic to make its next decision. func TestAgentChannelCloseSignal(t *testing.T) { t.Parallel() - - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } - - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - } - memGraph, _, _ := newMemChanGraph() - - // With the dependencies we created, we can now create the initial - // agent itself. - testCfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - return 0, nil - }, - ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { - return false, nil - }, - DisconnectPeer: func(*btcec.PublicKey) error { - return nil - }, - Graph: memGraph, - Constraints: constraints, - } - // We'll start the agent with two channels already being active. initialChans := []Channel{ { @@ -454,52 +409,27 @@ func TestAgentChannelCloseSignal(t *testing.T) { Capacity: btcutil.SatoshiPerBitcoin * 2, }, } - agent, err := New(testCfg, initialChans) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll star the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) + testCtx, cleanup := setup(t, initialChans) + defer cleanup() // We'll send an initial "no" response to advance the agent past its // initial check. - select { - case constraints.moreChansResps <- moreChansResp{0, 0}: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, moreChansResp{0, 0}) // Next, we'll close both channels which should force the agent to // re-query the heuristic. - agent.OnChannelClose(initialChans[0].ChanID, initialChans[1].ChanID) + testCtx.agent.OnChannelClose(initialChans[0].ChanID, initialChans[1].ChanID) // The agent should now query the heuristic in order to determine its // next action as it local state has now been modified. - select { - case constraints.moreChansResps <- moreChansResp{0, 0}: - // At this point, the local state of the agent should - // have also been updated to reflect that the LN node - // has no existing open channels. - if len(agent.chanState) != 0 { - t.Fatalf("internal channel state wasn't updated") - } + respondMoreChans(t, testCtx, moreChansResp{0, 0}) - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") + // At this point, the local state of the agent should + // have also been updated to reflect that the LN node + // has no existing open channels. + if len(testCtx.agent.chanState) != 0 { + t.Fatalf("internal channel state wasn't updated") } // There shouldn't be a call to the Select method as we've returned @@ -508,7 +438,7 @@ func TestAgentChannelCloseSignal(t *testing.T) { // If this send success, then Select was erroneously called and the // test should be failed. - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: + case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: t.Fatalf("Select was called but shouldn't have been") // This is the correct path as Select should've be called. @@ -522,105 +452,32 @@ func TestAgentChannelCloseSignal(t *testing.T) { func TestAgentBalanceUpdate(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } - - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - } - memGraph, _, _ := newMemChanGraph() - - // The wallet will start with 2 BTC available. - var walletBalanceMtx sync.Mutex - walletBalance := btcutil.Amount(btcutil.SatoshiPerBitcoin * 2) - - // With the dependencies we created, we can now create the initial - // agent itself. - testCfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - walletBalanceMtx.Lock() - defer walletBalanceMtx.Unlock() - return walletBalance, nil - }, - ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { - return false, nil - }, - DisconnectPeer: func(*btcec.PublicKey) error { - return nil - }, - Graph: memGraph, - Constraints: constraints, - } - initialChans := []Channel{} - agent, err := New(testCfg, initialChans) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll star the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) + testCtx, cleanup := setup(t, nil) + defer cleanup() // We'll send an initial "no" response to advance the agent past its // initial check. - select { - case constraints.moreChansResps <- moreChansResp{0, 0}: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, moreChansResp{0, 0}) // Next we'll send a new balance update signal to the agent, adding 5 // BTC to the amount of available funds. - walletBalanceMtx.Lock() - walletBalance += btcutil.SatoshiPerBitcoin * 5 - walletBalanceMtx.Unlock() + testCtx.Lock() + testCtx.walletBalance += btcutil.SatoshiPerBitcoin * 5 + testCtx.Unlock() - agent.OnBalanceChange() + testCtx.agent.OnBalanceChange() // The agent should now query the heuristic in order to determine its // next action as it local state has now been modified. - select { - case constraints.moreChansResps <- moreChansResp{0, 0}: - // At this point, the local state of the agent should - // have also been updated to reflect that the LN node - // now has an additional 5BTC available. - if agent.totalBalance != walletBalance { - t.Fatalf("expected %v wallet balance "+ - "instead have %v", agent.totalBalance, - walletBalance) - } + respondMoreChans(t, testCtx, moreChansResp{0, 0}) - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") + // At this point, the local state of the agent should + // have also been updated to reflect that the LN node + // now has an additional 5BTC available. + if testCtx.agent.totalBalance != testCtx.walletBalance { + t.Fatalf("expected %v wallet balance "+ + "instead have %v", testCtx.agent.totalBalance, + testCtx.walletBalance) } // There shouldn't be a call to the Select method as we've returned @@ -629,7 +486,7 @@ func TestAgentBalanceUpdate(t *testing.T) { // If this send success, then Select was erroneously called and the // test should be failed. - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: + case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: t.Fatalf("Select was called but shouldn't have been") // This is the correct path as Select should've be called. @@ -642,70 +499,8 @@ func TestAgentBalanceUpdate(t *testing.T) { func TestAgentImmediateAttach(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } - - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - } - memGraph, _, _ := newMemChanGraph() - - // The wallet will start with 10 BTC available. - const walletBalance = btcutil.SatoshiPerBitcoin * 10 - - // With the dependencies we created, we can now create the initial - // agent itself. - testCfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - return walletBalance, nil - }, - ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { - return false, nil - }, - DisconnectPeer: func(*btcec.PublicKey) error { - return nil - }, - Graph: memGraph, - Constraints: constraints, - } - initialChans := []Channel{} - agent, err := New(testCfg, initialChans) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll star the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) + testCtx, cleanup := setup(t, nil) + defer cleanup() const numChans = 5 @@ -713,7 +508,7 @@ func TestAgentImmediateAttach(t *testing.T) { directives := make(map[NodeID]*NodeScore) nodeKeys := make(map[NodeID]struct{}) for i := 0; i < numChans; i++ { - pub, err := memGraph.addRandNode() + pub, err := testCtx.graph.addRandNode() if err != nil { t.Fatalf("unable to generate key: %v", err) } @@ -727,32 +522,23 @@ func TestAgentImmediateAttach(t *testing.T) { // The very first thing the agent should do is query the NeedMoreChans // method on the passed heuristic. So we'll provide it with a response // that will kick off the main loop. - select { - - // We'll send over a response indicating that it should - // establish more channels, and give it a budget of 5 BTC to do - // so. - case constraints.moreChansResps <- moreChansResp{ - numMore: numChans, - amt: 5 * btcutil.SatoshiPerBitcoin, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, + moreChansResp{ + numMore: numChans, + amt: 5 * btcutil.SatoshiPerBitcoin, + }, + ) // At this point, the agent should now be querying the heuristic to // requests attachment directives. With our fake directives created, // we'll now send then to the agent as a return value for the Select // function. - select { - case heuristic.nodeScoresResps <- directives: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondNodeScores(t, testCtx, directives) // Finally, we should receive 5 calls to the OpenChannel method with // the exact same parameters that we specified within the attachment // directives. + chanController := testCtx.chanController.(*mockChanController) for i := 0; i < numChans; i++ { select { case openChan := <-chanController.openChanSignals: @@ -779,72 +565,12 @@ func TestAgentImmediateAttach(t *testing.T) { func TestAgentPrivateChannels(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } + testCtx, cleanup := setup(t, nil) + defer cleanup() // The chanController should be initialized such that all of its open // channel requests are for private channels. - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - private: true, - } - memGraph, _, _ := newMemChanGraph() - - // The wallet will start with 10 BTC available. - const walletBalance = btcutil.SatoshiPerBitcoin * 10 - - // With the dependencies we created, we can now create the initial - // agent itself. - cfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - return walletBalance, nil - }, - ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { - return false, nil - }, - DisconnectPeer: func(*btcec.PublicKey) error { - return nil - }, - Graph: memGraph, - Constraints: constraints, - } - agent, err := New(cfg, nil) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll star the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) + testCtx.chanController.(*mockChanController).private = true const numChans = 5 @@ -852,7 +578,7 @@ func TestAgentPrivateChannels(t *testing.T) { // agent's graph, and it can progress within its loop. directives := make(map[NodeID]*NodeScore) for i := 0; i < numChans; i++ { - pub, err := memGraph.addRandNode() + pub, err := testCtx.graph.addRandNode() if err != nil { t.Fatalf("unable to generate key: %v", err) } @@ -871,23 +597,17 @@ func TestAgentPrivateChannels(t *testing.T) { numMore: numChans, amt: 5 * btcutil.SatoshiPerBitcoin, } - select { - case constraints.moreChansResps <- resp: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, resp) + // At this point, the agent should now be querying the heuristic to // requests attachment directives. With our fake directives created, // we'll now send then to the agent as a return value for the Select // function. - select { - case heuristic.nodeScoresResps <- directives: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondNodeScores(t, testCtx, directives) // Finally, we should receive 5 calls to the OpenChannel method, each // specifying that it's for a private channel. + chanController := testCtx.chanController.(*mockChanController) for i := 0; i < numChans; i++ { select { case openChan := <-chanController.openChanSignals: @@ -906,77 +626,11 @@ func TestAgentPrivateChannels(t *testing.T) { func TestAgentPendingChannelState(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } - - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - } - memGraph, _, _ := newMemChanGraph() - - // The wallet will start with 6 BTC available. - var walletBalanceMtx sync.Mutex - walletBalance := btcutil.Amount(btcutil.SatoshiPerBitcoin * 6) - - // With the dependencies we created, we can now create the initial - // agent itself. - testCfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - walletBalanceMtx.Lock() - defer walletBalanceMtx.Unlock() - - return walletBalance, nil - }, - ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { - return false, nil - }, - DisconnectPeer: func(*btcec.PublicKey) error { - return nil - }, - Graph: memGraph, - Constraints: constraints, - } - initialChans := []Channel{} - agent, err := New(testCfg, initialChans) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll start the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) + testCtx, cleanup := setup(t, nil) + defer cleanup() // We'll only return a single directive for a pre-chosen node. - nodeKey, err := memGraph.addRandNode() + nodeKey, err := testCtx.graph.addRandNode() if err != nil { t.Fatalf("unable to generate key: %v", err) } @@ -990,31 +644,24 @@ func TestAgentPendingChannelState(t *testing.T) { // query, that it needs more channels and has 3 BTC available for // attachment. We'll send over a response indicating that it should // establish more channels, and give it a budget of 1 BTC to do so. - select { - case constraints.moreChansResps <- moreChansResp{ - numMore: 1, - amt: btcutil.SatoshiPerBitcoin, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, + moreChansResp{ + numMore: 1, + amt: btcutil.SatoshiPerBitcoin, + }, + ) - constraints.moreChanArgs = make(chan moreChanArg) - - select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ - nodeID: nodeDirective, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - - heuristic.nodeScoresArgs = make(chan directiveArg) + respondNodeScores(t, testCtx, + map[NodeID]*NodeScore{ + nodeID: nodeDirective, + }, + ) // A request to open the channel should've also been sent. + chanController := testCtx.chanController.(*mockChanController) select { case openChan := <-chanController.openChanSignals: - chanAmt := constraints.MaxChanSize() + chanAmt := testCtx.constraints.MaxChanSize() if openChan.amt != chanAmt { t.Fatalf("invalid chan amt: expected %v, got %v", chanAmt, openChan.amt) @@ -1031,11 +678,11 @@ func TestAgentPendingChannelState(t *testing.T) { // Now, in order to test that the pending state was properly updated, // we'll trigger a balance update in order to trigger a query to the // heuristic. - walletBalanceMtx.Lock() - walletBalance += 0.4 * btcutil.SatoshiPerBitcoin - walletBalanceMtx.Unlock() + testCtx.Lock() + testCtx.walletBalance += 0.4 * btcutil.SatoshiPerBitcoin + testCtx.Unlock() - agent.OnBalanceChange() + testCtx.agent.OnBalanceChange() // The heuristic should be queried, and the argument for the set of // channels passed in should include the pending channels that @@ -1044,8 +691,8 @@ func TestAgentPendingChannelState(t *testing.T) { // The request that we get should include a pending channel for the // one that we just created, otherwise the agent isn't properly // updating its internal state. - case req := <-constraints.moreChanArgs: - chanAmt := constraints.MaxChanSize() + case req := <-testCtx.constraints.moreChanArgs: + chanAmt := testCtx.constraints.MaxChanSize() if len(req.chans) != 1 { t.Fatalf("should include pending chan in current "+ "state, instead have %v chans", len(req.chans)) @@ -1065,7 +712,7 @@ func TestAgentPendingChannelState(t *testing.T) { // We'll send across a response indicating that it *does* need more // channels. select { - case constraints.moreChansResps <- moreChansResp{1, btcutil.SatoshiPerBitcoin}: + case testCtx.constraints.moreChansResps <- moreChansResp{1, btcutil.SatoshiPerBitcoin}: case <-time.After(time.Second * 10): t.Fatalf("need more chans wasn't queried in time") } @@ -1074,7 +721,7 @@ func TestAgentPendingChannelState(t *testing.T) { // Select method. The arguments passed should reflect the fact that the // node we have a pending channel to, should be ignored. select { - case req := <-heuristic.nodeScoresArgs: + case req := <-testCtx.heuristic.nodeScoresArgs: if len(req.chans) == 0 { t.Fatalf("expected to skip %v nodes, instead "+ "skipping %v", 1, len(req.chans)) @@ -1093,88 +740,25 @@ func TestAgentPendingChannelState(t *testing.T) { func TestAgentPendingOpenChannel(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } - - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - } - memGraph, _, _ := newMemChanGraph() - - // The wallet will start with 6 BTC available. - const walletBalance = btcutil.SatoshiPerBitcoin * 6 - - // With the dependencies we created, we can now create the initial - // agent itself. - cfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - return walletBalance, nil - }, - Graph: memGraph, - Constraints: constraints, - } - agent, err := New(cfg, nil) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll start the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) + testCtx, cleanup := setup(t, nil) + defer cleanup() // We'll send an initial "no" response to advance the agent past its // initial check. - select { - case constraints.moreChansResps <- moreChansResp{0, 0}: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, moreChansResp{0, 0}) // Next, we'll signal that a new channel has been opened, but it is // still pending. - agent.OnChannelPendingOpen() + testCtx.agent.OnChannelPendingOpen() // The agent should now query the heuristic in order to determine its // next action as its local state has now been modified. - select { - case constraints.moreChansResps <- moreChansResp{0, 0}: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, moreChansResp{0, 0}) // There shouldn't be a call to the Select method as we've returned // "false" for NeedMoreChans above. select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: + case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: t.Fatalf("Select was called but shouldn't have been") default: } @@ -1188,108 +772,43 @@ func TestAgentPendingOpenChannel(t *testing.T) { func TestAgentOnNodeUpdates(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } - - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - } - memGraph, _, _ := newMemChanGraph() - - // The wallet will start with 6 BTC available. - const walletBalance = btcutil.SatoshiPerBitcoin * 6 - - // With the dependencies we created, we can now create the initial - // agent itself. - cfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - return walletBalance, nil - }, - Graph: memGraph, - Constraints: constraints, - } - agent, err := New(cfg, nil) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll start the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) + testCtx, cleanup := setup(t, nil) + defer cleanup() // We'll send an initial "yes" response to advance the agent past its // initial check. This will cause it to try to get directives from an // empty graph. - select { - case constraints.moreChansResps <- moreChansResp{ - numMore: 2, - amt: walletBalance, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans( + t, testCtx, + moreChansResp{ + numMore: 2, + amt: testCtx.walletBalance, + }, + ) // Send over an empty list of attachment directives, which should cause // the agent to return to waiting on a new signal. - select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: - case <-time.After(time.Second * 10): - t.Fatalf("Select was not called but should have been") - } + respondNodeScores(t, testCtx, map[NodeID]*NodeScore{}) // Simulate more nodes being added to the graph by informing the agent // that we have node updates. - agent.OnNodeUpdates() + testCtx.agent.OnNodeUpdates() // In response, the agent should wake up and see if it needs more // channels. Since we haven't done anything, we will send the same // response as before since we are still trying to open channels. - select { - case constraints.moreChansResps <- moreChansResp{ - numMore: 2, - amt: walletBalance, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans( + t, testCtx, + moreChansResp{ + numMore: 2, + amt: testCtx.walletBalance, + }, + ) // Again the agent should pull in the next set of attachment directives. // It's not important that this list is also empty, so long as the node // updates signal is causing the agent to make this attempt. - select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: - case <-time.After(time.Second * 10): - t.Fatalf("Select was not called but should have been") - } + respondNodeScores(t, testCtx, map[NodeID]*NodeScore{}) } // TestAgentSkipPendingConns asserts that the agent will not try to make @@ -1301,89 +820,29 @@ func TestAgentOnNodeUpdates(t *testing.T) { func TestAgentSkipPendingConns(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresArgs: make(chan directiveArg), - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } - - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - } - memGraph, _, _ := newMemChanGraph() - - // The wallet will start with 6 BTC available. - const walletBalance = btcutil.SatoshiPerBitcoin * 6 + testCtx, cleanup := setup(t, nil) + defer cleanup() connect := make(chan chan error) + testCtx.agent.cfg.ConnectToPeer = func(*btcec.PublicKey, []net.Addr) (bool, error) { + errChan := make(chan error) - // With the dependencies we created, we can now create the initial - // agent itself. - testCfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - return walletBalance, nil - }, - ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { - errChan := make(chan error) + select { + case connect <- errChan: + case <-testCtx.quit: + return false, errors.New("quit") + } - select { - case connect <- errChan: - case <-quit: - return false, errors.New("quit") - } - - select { - case err := <-errChan: - return false, err - case <-quit: - return false, errors.New("quit") - } - }, - DisconnectPeer: func(*btcec.PublicKey) error { - return nil - }, - Graph: memGraph, - Constraints: constraints, + select { + case err := <-errChan: + return false, err + case <-testCtx.quit: + return false, errors.New("quit") + } } - initialChans := []Channel{} - agent, err := New(testCfg, initialChans) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll start the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) // We'll only return a single directive for a pre-chosen node. - nodeKey, err := memGraph.addRandNode() + nodeKey, err := testCtx.graph.addRandNode() if err != nil { t.Fatalf("unable to generate key: %v", err) } @@ -1395,7 +854,7 @@ func TestAgentSkipPendingConns(t *testing.T) { // We'll also add a second node to the graph, to keep the first one // company. - nodeKey2, err := memGraph.addRandNode() + nodeKey2, err := testCtx.graph.addRandNode() if err != nil { t.Fatalf("unable to generate key: %v", err) } @@ -1404,18 +863,16 @@ func TestAgentSkipPendingConns(t *testing.T) { // We'll send an initial "yes" response to advance the agent past its // initial check. This will cause it to try to get directives from the // graph. - select { - case constraints.moreChansResps <- moreChansResp{ - numMore: 1, - amt: walletBalance, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, + moreChansResp{ + numMore: 1, + amt: testCtx.walletBalance, + }, + ) // Both nodes should be part of the arguments. select { - case req := <-heuristic.nodeScoresArgs: + case req := <-testCtx.heuristic.nodeScoresArgs: if len(req.nodes) != 2 { t.Fatalf("expected %v nodes, instead "+ "had %v", 2, len(req.nodes)) @@ -1433,7 +890,7 @@ func TestAgentSkipPendingConns(t *testing.T) { // Respond with a scored directive. We skip node2 for now, implicitly // giving it a zero-score. select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ + case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ NewNodeID(nodeKey): nodeDirective, }: case <-time.After(time.Second * 10): @@ -1449,22 +906,20 @@ func TestAgentSkipPendingConns(t *testing.T) { } // Signal the agent to go again, now that we've tried to connect. - agent.OnNodeUpdates() + testCtx.agent.OnNodeUpdates() // The heuristic again informs the agent that we need more channels. - select { - case constraints.moreChansResps <- moreChansResp{ - numMore: 1, - amt: walletBalance, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, + moreChansResp{ + numMore: 1, + amt: testCtx.walletBalance, + }, + ) // Since the node now has a pending connection, it should be skipped // and not part of the nodes attempting to be scored. select { - case req := <-heuristic.nodeScoresArgs: + case req := <-testCtx.heuristic.nodeScoresArgs: if len(req.nodes) != 1 { t.Fatalf("expected %v nodes, instead "+ "had %v", 1, len(req.nodes)) @@ -1478,7 +933,7 @@ func TestAgentSkipPendingConns(t *testing.T) { // Respond with an emtpty score set. select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: + case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } @@ -1501,20 +956,18 @@ func TestAgentSkipPendingConns(t *testing.T) { // The agent will now retry since the last connection attempt failed. // The heuristic again informs the agent that we need more channels. - select { - case constraints.moreChansResps <- moreChansResp{ - numMore: 1, - amt: walletBalance, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, + moreChansResp{ + numMore: 1, + amt: testCtx.walletBalance, + }, + ) // The node should now be marked as "failed", which should make it // being skipped during scoring. Again check that it won't be among the // score request. select { - case req := <-heuristic.nodeScoresArgs: + case req := <-testCtx.heuristic.nodeScoresArgs: if len(req.nodes) != 1 { t.Fatalf("expected %v nodes, instead "+ "had %v", 1, len(req.nodes)) @@ -1532,7 +985,7 @@ func TestAgentSkipPendingConns(t *testing.T) { Score: 0.5, } select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ + case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ nodeID2: nodeDirective2, }: case <-time.After(time.Second * 10): @@ -1552,86 +1005,30 @@ func TestAgentSkipPendingConns(t *testing.T) { func TestAgentQuitWhenPendingConns(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - defer close(quit) - - heuristic := &mockHeuristic{ - nodeScoresArgs: make(chan directiveArg), - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } - - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - } - memGraph, _, _ := newMemChanGraph() - - // The wallet will start with 6 BTC available. - const walletBalance = btcutil.SatoshiPerBitcoin * 6 + testCtx, cleanup := setup(t, nil) + defer cleanup() connect := make(chan chan error) - // With the dependencies we created, we can now create the initial - // agent itself. - testCfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - return walletBalance, nil - }, - ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { - errChan := make(chan error) + testCtx.agent.cfg.ConnectToPeer = func(*btcec.PublicKey, []net.Addr) (bool, error) { + errChan := make(chan error) - select { - case connect <- errChan: - case <-quit: - return false, errors.New("quit") - } + select { + case connect <- errChan: + case <-testCtx.quit: + return false, errors.New("quit") + } - select { - case err := <-errChan: - return false, err - case <-quit: - return false, errors.New("quit") - } - }, - DisconnectPeer: func(*btcec.PublicKey) error { - return nil - }, - Graph: memGraph, - Constraints: constraints, + select { + case err := <-errChan: + return false, err + case <-testCtx.quit: + return false, errors.New("quit") + } } - initialChans := []Channel{} - agent, err := New(testCfg, initialChans) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll start the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() // We'll only return a single directive for a pre-chosen node. - nodeKey, err := memGraph.addRandNode() + nodeKey, err := testCtx.graph.addRandNode() if err != nil { t.Fatalf("unable to generate key: %v", err) } @@ -1644,18 +1041,16 @@ func TestAgentQuitWhenPendingConns(t *testing.T) { // We'll send an initial "yes" response to advance the agent past its // initial check. This will cause it to try to get directives from the // graph. - select { - case constraints.moreChansResps <- moreChansResp{ - numMore: 1, - amt: walletBalance, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, + moreChansResp{ + numMore: 1, + amt: testCtx.walletBalance, + }, + ) // Check the args. select { - case req := <-heuristic.nodeScoresArgs: + case req := <-testCtx.heuristic.nodeScoresArgs: if len(req.nodes) != 1 { t.Fatalf("expected %v nodes, instead "+ "had %v", 1, len(req.nodes)) @@ -1669,7 +1064,7 @@ func TestAgentQuitWhenPendingConns(t *testing.T) { // Respond with a scored directive. select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ + case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ NewNodeID(nodeKey): nodeDirective, }: case <-time.After(time.Second * 10): @@ -1687,7 +1082,7 @@ func TestAgentQuitWhenPendingConns(t *testing.T) { // pending connection. stopped := make(chan error) go func() { - stopped <- agent.Stop() + stopped <- testCtx.agent.Stop() }() select { @@ -1699,3 +1094,219 @@ func TestAgentQuitWhenPendingConns(t *testing.T) { t.Fatalf("unable to stop agent") } } + +// respondWithScores checks that the moreChansRequest contains what we expect, +// and responds with the given node scores. +func respondWithScores(t *testing.T, testCtx *testContext, + channelBudget btcutil.Amount, existingChans, newChans int, + nodeScores map[NodeID]*NodeScore) { + + t.Helper() + + select { + case testCtx.constraints.moreChansResps <- moreChansResp{ + numMore: uint32(newChans), + amt: channelBudget, + }: + case <-time.After(time.Second * 3): + t.Fatalf("heuristic wasn't queried in time") + } + + // The agent should query for scores using the constraints returned + // above. We expect the agent to use the maximum channel size when + // opening channels. + chanSize := testCtx.constraints.MaxChanSize() + + select { + case req := <-testCtx.heuristic.nodeScoresArgs: + // All nodes in the graph should be potential channel + // candidates. + if len(req.nodes) != len(nodeScores) { + t.Fatalf("expected %v nodes, instead had %v", + len(nodeScores), len(req.nodes)) + } + + // 'existingChans' is already open. + if len(req.chans) != existingChans { + t.Fatalf("expected %d existing channel, got %v", + existingChans, len(req.chans)) + } + if req.amt != chanSize { + t.Fatalf("expected channel size of %v, got %v", + chanSize, req.amt) + } + + case <-time.After(time.Second * 3): + t.Fatalf("select wasn't queried in time") + } + + // Respond with the given scores. + select { + case testCtx.heuristic.nodeScoresResps <- nodeScores: + case <-time.After(time.Second * 3): + t.Fatalf("NodeScores wasn't queried in time") + } +} + +// checkChannelOpens asserts that the channel controller attempts open the +// number of channels we expect, and with the exact total allocation. +func checkChannelOpens(t *testing.T, testCtx *testContext, + allocation btcutil.Amount, numChans int) []NodeID { + + var nodes []NodeID + + // The agent should attempt to open channels, totaling what we expect. + var totalAllocation btcutil.Amount + chanController := testCtx.chanController.(*mockChanController) + for i := 0; i < numChans; i++ { + select { + case openChan := <-chanController.openChanSignals: + totalAllocation += openChan.amt + + testCtx.Lock() + testCtx.walletBalance -= openChan.amt + testCtx.Unlock() + + nodes = append(nodes, NewNodeID(openChan.target)) + + case <-time.After(time.Second * 3): + t.Fatalf("channel not opened in time") + } + } + + if totalAllocation != allocation { + t.Fatalf("expected agent to open channels totalling %v, "+ + "instead was %v", allocation, totalAllocation) + } + + // Finally, make sure the agent won't try opening more channels. + select { + case <-chanController.openChanSignals: + t.Fatalf("agent unexpectedly opened channel") + + case <-time.After(50 * time.Millisecond): + } + + return nodes +} + +// TestAgentChannelSizeAllocation tests that the autopilot agent opens channel +// of size that stays within the channel budget and size restrictions. +func TestAgentChannelSizeAllocation(t *testing.T) { + t.Parallel() + + // Total number of nodes in our mock graph. + const numNodes = 10 + + testCtx, cleanup := setup(t, nil) + defer cleanup() + + nodeScores := make(map[NodeID]*NodeScore) + for i := 0; i < numNodes; i++ { + nodeKey, err := testCtx.graph.addRandNode() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + nodeID := NewNodeID(nodeKey) + nodeScores[nodeID] = &NodeScore{ + NodeID: nodeID, + Score: 0.5, + } + } + + // The agent should now query the heuristic in order to determine its + // next action as it local state has now been modified. + select { + case arg := <-testCtx.constraints.moreChanArgs: + if len(arg.chans) != 0 { + t.Fatalf("expected agent to have no channels open, "+ + "had %v", len(arg.chans)) + } + if arg.balance != testCtx.walletBalance { + t.Fatalf("expectd agent to have %v balance, had %v", + testCtx.walletBalance, arg.balance) + } + case <-time.After(time.Second * 3): + t.Fatalf("heuristic wasn't queried in time") + } + + // We'll return a response telling the agent to open 5 channels, with a + // total channel budget of 5 BTC. + var channelBudget btcutil.Amount = 5 * btcutil.SatoshiPerBitcoin + numExistingChannels := 0 + numNewChannels := 5 + respondWithScores( + t, testCtx, channelBudget, numExistingChannels, + numNewChannels, nodeScores, + ) + + expectedAllocation := testCtx.constraints.MaxChanSize() * btcutil.Amount(numNewChannels) + nodes := checkChannelOpens( + t, testCtx, expectedAllocation, numNewChannels, + ) + + // Delete the selected nodes from our set of scores, to avoid scoring + // nodes we already have channels to. + for _, node := range nodes { + delete(nodeScores, node) + } + + // TODO(halseth): this loop is a hack to ensure all the attempted + // channels are accounted for. This happens because the agent will + // query the ChannelBudget before all the pending channels are added to + // the map. Fix by adding them to the pending channels map before + // executing directives in goroutines? + waitForNumChans := func(expChans int) { + t.Helper() + + Loop: + for { + select { + case arg := <-testCtx.constraints.moreChanArgs: + // As long as the number of existing channels + // is below our expected number of channels, + // we'll keep responding with "no more + // channels". + if len(arg.chans) != expChans { + select { + case testCtx.constraints.moreChansResps <- moreChansResp{0, 0}: + case <-time.After(time.Second * 3): + t.Fatalf("heuristic wasn't " + + "queried in time") + } + continue + } + + if arg.balance != testCtx.walletBalance { + t.Fatalf("expectd agent to have %v "+ + "balance, had %v", + testCtx.walletBalance, + arg.balance) + } + break Loop + + case <-time.After(time.Second * 3): + t.Fatalf("heuristic wasn't queried in time") + } + } + } + + // Wait for the agent to have 5 channels. + waitForNumChans(numNewChannels) + + // Set the channel budget to 1.5 BTC. + channelBudget = btcutil.SatoshiPerBitcoin * 3 / 2 + + // We'll return a response telling the agent to open 3 channels, with a + // total channel budget of 1.5 BTC. + numExistingChannels = 5 + numNewChannels = 3 + respondWithScores( + t, testCtx, channelBudget, numExistingChannels, + numNewChannels, nodeScores, + ) + + // To stay within the budget, we expect the autopilot to open 2 + // channels. + checkChannelOpens(t, testCtx, channelBudget, 2) +}