From 602856750b3b5b4eea1fd9d979c336a755e51806 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 23 Aug 2018 18:52:49 -0700 Subject: [PATCH 1/3] autopilot/agent: add OnNodeUpdates signal Adds a new external signal alerting autopilot that new nodes have been added to the channel graph or an existing node has modified its channel announcment. This allows autopilot to examine its current state, and attempt to open channels if our target state is not yet met. --- autopilot/agent.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/autopilot/agent.go b/autopilot/agent.go index 6e521ba6..d7c14e23 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -186,6 +186,10 @@ type balanceUpdate struct { balanceDelta btcutil.Amount } +// nodeUpdates is a type of external state update that reflects an addition or +// modification in channel graph node membership. +type nodeUpdates struct{} + // chanOpenUpdate is a type of external state update that indicates a new // channel has been opened, either by the Agent itself (within the main // controller loop), or by an external user to the system. @@ -222,6 +226,20 @@ func (a *Agent) OnBalanceChange(delta btcutil.Amount) { }() } +// OnNodeUpdates is a callback that should be executed each time our channel +// graph has new nodes or their node announcements are updated. +func (a *Agent) OnNodeUpdates() { + a.wg.Add(1) + go func() { + defer a.wg.Done() + + select { + case a.stateUpdates <- &nodeUpdates{}: + case <-a.quit: + } + }() +} + // OnChannelOpen is a callback that should be executed each time a new channel // is manually opened by the user or any system outside the autopilot agent. func (a *Agent) OnChannelOpen(c Channel) { @@ -417,6 +435,14 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { } updateBalance() + + // New nodes have been added to the graph or their node + // announcements have been updated. We will consider + // opening channels to these nodes if we haven't + // stabilized. + case *nodeUpdates: + log.Infof("Node updates received, assessing " + + "need for more channels") } pendingMtx.Lock() From 93f1994dc52a4000da824dd95b47f8fa4586793d Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 23 Aug 2018 18:54:44 -0700 Subject: [PATCH 2/3] pilot: signal OnNodeUpdates to autopilot --- pilot.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pilot.go b/pilot.go index d9a1a200..219702a7 100644 --- a/pilot.go +++ b/pilot.go @@ -306,6 +306,13 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error) pilot.OnChannelClose(chanID) } + // If new nodes were added to the graph, or nod + // information has changed, we'll poke autopilot + // to see if it can make use of them. + if len(topChange.NodeUpdates) > 0 { + pilot.OnNodeUpdates() + } + case <-svr.quit: return } From 1898e57807171de42fa1af630c4d4580ec7481fd Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 23 Aug 2018 18:55:03 -0700 Subject: [PATCH 3/3] autopilot/agent_test: test that agent recovers from initial empty graph --- autopilot/agent_test.go | 114 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index d89913bf..ff47c321 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -1183,3 +1183,117 @@ func TestAgentPendingOpenChannel(t *testing.T) { default: } } + +// TestAgentOnNodeUpdates tests that the agent will wake up in response to the +// OnNodeUpdates signal. This is useful in ensuring that autopilot is always +// pulling in the latest graph updates into its decision making. It also +// prevents the agent from stalling after an initial attempt that finds no nodes +// in the graph. +func TestAgentOnNodeUpdates(t *testing.T) { + t.Parallel() + + // First, we'll create all the dependencies that we'll need in order to + // create the autopilot agent. + self, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + heuristic := &mockHeuristic{ + moreChansResps: make(chan moreChansResp), + directiveResps: make(chan []AttachmentDirective), + } + chanController := &mockChanController{ + openChanSignals: make(chan openChanIntent), + } + memGraph, _, _ := newMemChanGraph() + + // The wallet will start with 6 BTC available. + const walletBalance = btcutil.SatoshiPerBitcoin * 6 + + // With the dependencies we created, we can now create the initial + // agent itself. + cfg := Config{ + Self: self, + Heuristic: heuristic, + ChanController: chanController, + WalletBalance: func() (btcutil.Amount, error) { + return walletBalance, nil + }, + Graph: memGraph, + MaxPendingOpens: 10, + } + agent, err := New(cfg, nil) + if err != nil { + t.Fatalf("unable to create agent: %v", err) + } + + // To ensure the heuristic doesn't block on quitting the agent, we'll + // use the agent's quit chan to signal when it should also stop. + heuristic.quit = agent.quit + + // With the autopilot agent and all its dependencies we'll start the + // primary controller goroutine. + if err := agent.Start(); err != nil { + t.Fatalf("unable to start agent: %v", err) + } + defer agent.Stop() + + // We'll send an initial "yes" response to advance the agent past its + // initial check. This will cause it to try to get directives from an + // empty graph. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: 2, + amt: walletBalance, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + }() + wg.Wait() + + // Send over an empty list of attachment directives, which should cause + // the agent to return to waiting on a new signal. + select { + case heuristic.directiveResps <- []AttachmentDirective{}: + case <-time.After(time.Second * 10): + t.Fatalf("Select was not called but should have been") + } + + // Simulate more nodes being added to the graph by informing the agent + // that we have node updates. + agent.OnNodeUpdates() + + // In response, the agent should wake up and see if it needs more + // channels. Since we haven't done anything, we will send the same + // response as before since we are still trying to open channels. + var wg2 sync.WaitGroup + wg2.Add(1) + go func() { + defer wg2.Done() + select { + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: 2, + amt: walletBalance, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + }() + wg2.Wait() + + // Again the agent should pull in the next set of attachment directives. + // It's not important that this list is also empty, so long as the node + // updates signal is causing the agent to make this attempt. + select { + case heuristic.directiveResps <- []AttachmentDirective{}: + case <-time.After(time.Second * 10): + t.Fatalf("Select was not called but should have been") + } +}