diff --git a/autopilot/agent.go b/autopilot/agent.go index 6e521ba6..d7c14e23 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -186,6 +186,10 @@ type balanceUpdate struct { balanceDelta btcutil.Amount } +// nodeUpdates is a type of external state update that reflects an addition or +// modification in channel graph node membership. +type nodeUpdates struct{} + // chanOpenUpdate is a type of external state update that indicates a new // channel has been opened, either by the Agent itself (within the main // controller loop), or by an external user to the system. @@ -222,6 +226,20 @@ func (a *Agent) OnBalanceChange(delta btcutil.Amount) { }() } +// OnNodeUpdates is a callback that should be executed each time our channel +// graph has new nodes or their node announcements are updated. +func (a *Agent) OnNodeUpdates() { + a.wg.Add(1) + go func() { + defer a.wg.Done() + + select { + case a.stateUpdates <- &nodeUpdates{}: + case <-a.quit: + } + }() +} + // OnChannelOpen is a callback that should be executed each time a new channel // is manually opened by the user or any system outside the autopilot agent. func (a *Agent) OnChannelOpen(c Channel) { @@ -417,6 +435,14 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { } updateBalance() + + // New nodes have been added to the graph or their node + // announcements have been updated. We will consider + // opening channels to these nodes if we haven't + // stabilized. + case *nodeUpdates: + log.Infof("Node updates received, assessing " + + "need for more channels") } pendingMtx.Lock() diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index d89913bf..ff47c321 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -1183,3 +1183,117 @@ func TestAgentPendingOpenChannel(t *testing.T) { default: } } + +// TestAgentOnNodeUpdates tests that the agent will wake up in response to the +// OnNodeUpdates signal. This is useful in ensuring that autopilot is always +// pulling in the latest graph updates into its decision making. It also +// prevents the agent from stalling after an initial attempt that finds no nodes +// in the graph. +func TestAgentOnNodeUpdates(t *testing.T) { + t.Parallel() + + // First, we'll create all the dependencies that we'll need in order to + // create the autopilot agent. + self, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + heuristic := &mockHeuristic{ + moreChansResps: make(chan moreChansResp), + directiveResps: make(chan []AttachmentDirective), + } + chanController := &mockChanController{ + openChanSignals: make(chan openChanIntent), + } + memGraph, _, _ := newMemChanGraph() + + // The wallet will start with 6 BTC available. + const walletBalance = btcutil.SatoshiPerBitcoin * 6 + + // With the dependencies we created, we can now create the initial + // agent itself. + cfg := Config{ + Self: self, + Heuristic: heuristic, + ChanController: chanController, + WalletBalance: func() (btcutil.Amount, error) { + return walletBalance, nil + }, + Graph: memGraph, + MaxPendingOpens: 10, + } + agent, err := New(cfg, nil) + if err != nil { + t.Fatalf("unable to create agent: %v", err) + } + + // To ensure the heuristic doesn't block on quitting the agent, we'll + // use the agent's quit chan to signal when it should also stop. + heuristic.quit = agent.quit + + // With the autopilot agent and all its dependencies we'll start the + // primary controller goroutine. + if err := agent.Start(); err != nil { + t.Fatalf("unable to start agent: %v", err) + } + defer agent.Stop() + + // We'll send an initial "yes" response to advance the agent past its + // initial check. This will cause it to try to get directives from an + // empty graph. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: 2, + amt: walletBalance, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + }() + wg.Wait() + + // Send over an empty list of attachment directives, which should cause + // the agent to return to waiting on a new signal. + select { + case heuristic.directiveResps <- []AttachmentDirective{}: + case <-time.After(time.Second * 10): + t.Fatalf("Select was not called but should have been") + } + + // Simulate more nodes being added to the graph by informing the agent + // that we have node updates. + agent.OnNodeUpdates() + + // In response, the agent should wake up and see if it needs more + // channels. Since we haven't done anything, we will send the same + // response as before since we are still trying to open channels. + var wg2 sync.WaitGroup + wg2.Add(1) + go func() { + defer wg2.Done() + select { + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: 2, + amt: walletBalance, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + }() + wg2.Wait() + + // Again the agent should pull in the next set of attachment directives. + // It's not important that this list is also empty, so long as the node + // updates signal is causing the agent to make this attempt. + select { + case heuristic.directiveResps <- []AttachmentDirective{}: + case <-time.After(time.Second * 10): + t.Fatalf("Select was not called but should have been") + } +} diff --git a/pilot.go b/pilot.go index d9a1a200..219702a7 100644 --- a/pilot.go +++ b/pilot.go @@ -306,6 +306,13 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error) pilot.OnChannelClose(chanID) } + // If new nodes were added to the graph, or nod + // information has changed, we'll poke autopilot + // to see if it can make use of them. + if len(topChange.NodeUpdates) > 0 { + pilot.OnNodeUpdates() + } + case <-svr.quit: return }