diff --git a/autopilot/agent.go b/autopilot/agent.go index dc8a3af9..6fe72f98 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -141,6 +141,10 @@ type Agent struct { // time. chanOpenFailures chan *chanOpenFailureUpdate + // heuristicUpdates is a channel where updates from active heurstics + // will be sent. + heuristicUpdates chan *heuristicUpdate + // totalBalance is the total number of satoshis the backing wallet is // known to control at any given instance. This value will be updated // when the agent receives external balance update signals. @@ -179,6 +183,7 @@ func New(cfg Config, initialState []Channel) (*Agent, error) { balanceUpdates: make(chan *balanceUpdate, 1), nodeUpdates: make(chan *nodeUpdates, 1), chanOpenFailures: make(chan *chanOpenFailureUpdate, 1), + heuristicUpdates: make(chan *heuristicUpdate, 1), pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1), failedNodes: make(map[NodeID]struct{}), pendingConns: make(map[NodeID]struct{}), @@ -256,6 +261,13 @@ type chanPendingOpenUpdate struct{} // a previous channel open failed, and that it might be possible to try again. type chanOpenFailureUpdate struct{} +// heuristicUpdate is an update sent when one of the autopilot heuristics has +// changed, and prompts the agent to make a new attempt at opening more +// channels. +type heuristicUpdate struct { + heuristic AttachmentHeuristic +} + // chanCloseUpdate is a type of external state update that indicates that the // backing Lightning Node has closed a previously open channel. type chanCloseUpdate struct { @@ -329,6 +341,17 @@ func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) { }() } +// OnHeuristicUpdate is a method called when a heuristic has been updated, to +// trigger the agent to do a new state assessment. +func (a *Agent) OnHeuristicUpdate(h AttachmentHeuristic) { + select { + case a.heuristicUpdates <- &heuristicUpdate{ + heuristic: h, + }: + default: + } +} + // mergeNodeMaps merges the Agent's set of nodes that it already has active // channels open to, with the other sets of nodes that should be removed from // consideration during heuristic selection. This ensures that the Agent doesn't @@ -470,6 +493,12 @@ func (a *Agent) controller() { log.Debugf("Node updates received, assessing " + "need for more channels") + // Any of the deployed heuristics has been updated, check + // whether we have new channel candidates available. + case upd := <-a.heuristicUpdates: + log.Debugf("Heuristic %v updated, assessing need for "+ + "more channels", upd.heuristic.Name()) + // The agent has been signalled to exit, so we'll bail out // immediately. case <-a.quit: @@ -541,10 +570,28 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32, connectedNodes := a.chanState.ConnectedNodes() a.chanStateMtx.Unlock() + for nID := range connectedNodes { + log.Tracef("Skipping node %x with open channel", nID[:]) + } + a.pendingMtx.Lock() + + for nID := range a.pendingOpens { + log.Tracef("Skipping node %x with pending channel open", nID[:]) + } + + for nID := range a.pendingConns { + log.Tracef("Skipping node %x with pending connection", nID[:]) + } + + for nID := range a.failedNodes { + log.Tracef("Skipping failed node %v", nID[:]) + } + nodesToSkip := mergeNodeMaps(a.pendingOpens, a.pendingConns, connectedNodes, a.failedNodes, ) + a.pendingMtx.Unlock() // Gather the set of all nodes in the graph, except those we diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index bd703c9e..26e63bf3 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -330,6 +330,54 @@ func TestAgentChannelOpenSignal(t *testing.T) { } } +// TestAgentHeuristicUpdateSignal tests that upon notification about a +// heuristic update, the agent reconsults the heuristic. +func TestAgentHeuristicUpdateSignal(t *testing.T) { + t.Parallel() + + testCtx, cleanup := setup(t, nil) + defer cleanup() + + // We'll send an initial "no" response to advance the agent past its + // initial check. + respondMoreChans(t, testCtx, moreChansResp{0, 0}) + + // Next we'll signal that one of the heuristcs have been updated. + testCtx.agent.OnHeuristicUpdate(testCtx.heuristic) + + // The update should trigger the agent to ask for a channel budget.so + // we'll respond that there is a budget for opening 1 more channel. + respondMoreChans(t, testCtx, + moreChansResp{ + numMore: 1, + amt: 1 * btcutil.SatoshiPerBitcoin, + }, + ) + + // At this point, the agent should now be querying the heuristic for + // scores. We'll respond. + pub, err := testCtx.graph.addRandNode() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + nodeID := NewNodeID(pub) + scores := map[NodeID]*NodeScore{ + nodeID: { + NodeID: nodeID, + Score: 0.5, + }, + } + respondNodeScores(t, testCtx, scores) + + // Finally, this should result in the agent opening a channel. + chanController := testCtx.chanController.(*mockChanController) + select { + case <-chanController.openChanSignals: + case <-time.After(time.Second * 10): + t.Fatalf("channel not opened in time") + } +} + // A mockFailingChanController always fails to open a channel. type mockFailingChanController struct { } diff --git a/autopilot/manager.go b/autopilot/manager.go index 3253fdaf..88f6c37d 100644 --- a/autopilot/manager.go +++ b/autopilot/manager.go @@ -319,8 +319,12 @@ func (m *Manager) queryHeuristics(nodes map[NodeID]struct{}, localState bool) ( // We'll start by getting the scores from each available sub-heuristic, // in addition the current agent heuristic. + var heuristics []AttachmentHeuristic + heuristics = append(heuristics, availableHeuristics...) + heuristics = append(heuristics, m.cfg.PilotCfg.Heuristic) + report := make(HeuristicScores) - for _, h := range append(availableHeuristics, m.cfg.PilotCfg.Heuristic) { + for _, h := range heuristics { name := h.Name() // If the agent heuristic is among the simple heuristics it @@ -354,6 +358,9 @@ func (m *Manager) queryHeuristics(nodes map[NodeID]struct{}, localState bool) ( // SetNodeScores is used to set the scores of the given heuristic, if it is // active, and ScoreSettable. func (m *Manager) SetNodeScores(name string, scores map[NodeID]float64) error { + m.Lock() + defer m.Unlock() + // It must be ScoreSettable to be available for external // scores. s, ok := m.cfg.PilotCfg.Heuristic.(ScoreSettable) @@ -372,5 +379,11 @@ func (m *Manager) SetNodeScores(name string, scores map[NodeID]float64) error { return fmt.Errorf("heuristic with name %v not found", name) } + // If the autopilot agent is active, notify about the updated + // heuristic. + if m.pilot != nil { + m.pilot.OnHeuristicUpdate(m.cfg.PilotCfg.Heuristic) + } + return nil } diff --git a/pilot.go b/pilot.go index b783c5b6..6fa3a6c6 100644 --- a/pilot.go +++ b/pilot.go @@ -31,7 +31,7 @@ func validateAtplCfg(cfg *autoPilotConfig) ([]*autopilot.WeightedHeuristic, for _, a := range autopilot.AvailableHeuristics { heuristicsStr += fmt.Sprintf(" '%v' ", a.Name()) } - availStr := fmt.Sprintf("Avaiblable heuristcs are: [%v]", heuristicsStr) + availStr := fmt.Sprintf("Available heuristics are: [%v]", heuristicsStr) // We'll go through the config and make sure all the heuristics exists, // and that the sum of their weights is 1.0.