Merge pull request #3520 from halseth/autopilot-heuristic-update-trigger

[autopilot] Trigger channel opening on external score update
This commit is contained in:
Olaoluwa Osuntokun 2019-11-01 17:24:22 -07:00 committed by GitHub
commit bef1cd01c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 110 additions and 2 deletions

@ -141,6 +141,10 @@ type Agent struct {
// time. // time.
chanOpenFailures chan *chanOpenFailureUpdate chanOpenFailures chan *chanOpenFailureUpdate
// heuristicUpdates is a channel where updates from active heurstics
// will be sent.
heuristicUpdates chan *heuristicUpdate
// totalBalance is the total number of satoshis the backing wallet is // totalBalance is the total number of satoshis the backing wallet is
// known to control at any given instance. This value will be updated // known to control at any given instance. This value will be updated
// when the agent receives external balance update signals. // when the agent receives external balance update signals.
@ -179,6 +183,7 @@ func New(cfg Config, initialState []Channel) (*Agent, error) {
balanceUpdates: make(chan *balanceUpdate, 1), balanceUpdates: make(chan *balanceUpdate, 1),
nodeUpdates: make(chan *nodeUpdates, 1), nodeUpdates: make(chan *nodeUpdates, 1),
chanOpenFailures: make(chan *chanOpenFailureUpdate, 1), chanOpenFailures: make(chan *chanOpenFailureUpdate, 1),
heuristicUpdates: make(chan *heuristicUpdate, 1),
pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1), pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1),
failedNodes: make(map[NodeID]struct{}), failedNodes: make(map[NodeID]struct{}),
pendingConns: make(map[NodeID]struct{}), pendingConns: make(map[NodeID]struct{}),
@ -256,6 +261,13 @@ type chanPendingOpenUpdate struct{}
// a previous channel open failed, and that it might be possible to try again. // a previous channel open failed, and that it might be possible to try again.
type chanOpenFailureUpdate struct{} type chanOpenFailureUpdate struct{}
// heuristicUpdate is an update sent when one of the autopilot heuristics has
// changed, and prompts the agent to make a new attempt at opening more
// channels.
type heuristicUpdate struct {
heuristic AttachmentHeuristic
}
// chanCloseUpdate is a type of external state update that indicates that the // chanCloseUpdate is a type of external state update that indicates that the
// backing Lightning Node has closed a previously open channel. // backing Lightning Node has closed a previously open channel.
type chanCloseUpdate struct { type chanCloseUpdate struct {
@ -329,6 +341,17 @@ func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) {
}() }()
} }
// OnHeuristicUpdate is a method called when a heuristic has been updated, to
// trigger the agent to do a new state assessment.
func (a *Agent) OnHeuristicUpdate(h AttachmentHeuristic) {
select {
case a.heuristicUpdates <- &heuristicUpdate{
heuristic: h,
}:
default:
}
}
// mergeNodeMaps merges the Agent's set of nodes that it already has active // mergeNodeMaps merges the Agent's set of nodes that it already has active
// channels open to, with the other sets of nodes that should be removed from // channels open to, with the other sets of nodes that should be removed from
// consideration during heuristic selection. This ensures that the Agent doesn't // consideration during heuristic selection. This ensures that the Agent doesn't
@ -470,6 +493,12 @@ func (a *Agent) controller() {
log.Debugf("Node updates received, assessing " + log.Debugf("Node updates received, assessing " +
"need for more channels") "need for more channels")
// Any of the deployed heuristics has been updated, check
// whether we have new channel candidates available.
case upd := <-a.heuristicUpdates:
log.Debugf("Heuristic %v updated, assessing need for "+
"more channels", upd.heuristic.Name())
// The agent has been signalled to exit, so we'll bail out // The agent has been signalled to exit, so we'll bail out
// immediately. // immediately.
case <-a.quit: case <-a.quit:
@ -541,10 +570,28 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
connectedNodes := a.chanState.ConnectedNodes() connectedNodes := a.chanState.ConnectedNodes()
a.chanStateMtx.Unlock() a.chanStateMtx.Unlock()
for nID := range connectedNodes {
log.Tracef("Skipping node %x with open channel", nID[:])
}
a.pendingMtx.Lock() a.pendingMtx.Lock()
for nID := range a.pendingOpens {
log.Tracef("Skipping node %x with pending channel open", nID[:])
}
for nID := range a.pendingConns {
log.Tracef("Skipping node %x with pending connection", nID[:])
}
for nID := range a.failedNodes {
log.Tracef("Skipping failed node %v", nID[:])
}
nodesToSkip := mergeNodeMaps(a.pendingOpens, nodesToSkip := mergeNodeMaps(a.pendingOpens,
a.pendingConns, connectedNodes, a.failedNodes, a.pendingConns, connectedNodes, a.failedNodes,
) )
a.pendingMtx.Unlock() a.pendingMtx.Unlock()
// Gather the set of all nodes in the graph, except those we // Gather the set of all nodes in the graph, except those we

@ -330,6 +330,54 @@ func TestAgentChannelOpenSignal(t *testing.T) {
} }
} }
// TestAgentHeuristicUpdateSignal tests that upon notification about a
// heuristic update, the agent reconsults the heuristic.
func TestAgentHeuristicUpdateSignal(t *testing.T) {
t.Parallel()
testCtx, cleanup := setup(t, nil)
defer cleanup()
// We'll send an initial "no" response to advance the agent past its
// initial check.
respondMoreChans(t, testCtx, moreChansResp{0, 0})
// Next we'll signal that one of the heuristcs have been updated.
testCtx.agent.OnHeuristicUpdate(testCtx.heuristic)
// The update should trigger the agent to ask for a channel budget.so
// we'll respond that there is a budget for opening 1 more channel.
respondMoreChans(t, testCtx,
moreChansResp{
numMore: 1,
amt: 1 * btcutil.SatoshiPerBitcoin,
},
)
// At this point, the agent should now be querying the heuristic for
// scores. We'll respond.
pub, err := testCtx.graph.addRandNode()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
nodeID := NewNodeID(pub)
scores := map[NodeID]*NodeScore{
nodeID: {
NodeID: nodeID,
Score: 0.5,
},
}
respondNodeScores(t, testCtx, scores)
// Finally, this should result in the agent opening a channel.
chanController := testCtx.chanController.(*mockChanController)
select {
case <-chanController.openChanSignals:
case <-time.After(time.Second * 10):
t.Fatalf("channel not opened in time")
}
}
// A mockFailingChanController always fails to open a channel. // A mockFailingChanController always fails to open a channel.
type mockFailingChanController struct { type mockFailingChanController struct {
} }

@ -319,8 +319,12 @@ func (m *Manager) queryHeuristics(nodes map[NodeID]struct{}, localState bool) (
// We'll start by getting the scores from each available sub-heuristic, // We'll start by getting the scores from each available sub-heuristic,
// in addition the current agent heuristic. // in addition the current agent heuristic.
var heuristics []AttachmentHeuristic
heuristics = append(heuristics, availableHeuristics...)
heuristics = append(heuristics, m.cfg.PilotCfg.Heuristic)
report := make(HeuristicScores) report := make(HeuristicScores)
for _, h := range append(availableHeuristics, m.cfg.PilotCfg.Heuristic) { for _, h := range heuristics {
name := h.Name() name := h.Name()
// If the agent heuristic is among the simple heuristics it // If the agent heuristic is among the simple heuristics it
@ -354,6 +358,9 @@ func (m *Manager) queryHeuristics(nodes map[NodeID]struct{}, localState bool) (
// SetNodeScores is used to set the scores of the given heuristic, if it is // SetNodeScores is used to set the scores of the given heuristic, if it is
// active, and ScoreSettable. // active, and ScoreSettable.
func (m *Manager) SetNodeScores(name string, scores map[NodeID]float64) error { func (m *Manager) SetNodeScores(name string, scores map[NodeID]float64) error {
m.Lock()
defer m.Unlock()
// It must be ScoreSettable to be available for external // It must be ScoreSettable to be available for external
// scores. // scores.
s, ok := m.cfg.PilotCfg.Heuristic.(ScoreSettable) s, ok := m.cfg.PilotCfg.Heuristic.(ScoreSettable)
@ -372,5 +379,11 @@ func (m *Manager) SetNodeScores(name string, scores map[NodeID]float64) error {
return fmt.Errorf("heuristic with name %v not found", name) return fmt.Errorf("heuristic with name %v not found", name)
} }
// If the autopilot agent is active, notify about the updated
// heuristic.
if m.pilot != nil {
m.pilot.OnHeuristicUpdate(m.cfg.PilotCfg.Heuristic)
}
return nil return nil
} }

@ -31,7 +31,7 @@ func validateAtplCfg(cfg *autoPilotConfig) ([]*autopilot.WeightedHeuristic,
for _, a := range autopilot.AvailableHeuristics { for _, a := range autopilot.AvailableHeuristics {
heuristicsStr += fmt.Sprintf(" '%v' ", a.Name()) heuristicsStr += fmt.Sprintf(" '%v' ", a.Name())
} }
availStr := fmt.Sprintf("Avaiblable heuristcs are: [%v]", heuristicsStr) availStr := fmt.Sprintf("Available heuristics are: [%v]", heuristicsStr)
// We'll go through the config and make sure all the heuristics exists, // We'll go through the config and make sure all the heuristics exists,
// and that the sum of their weights is 1.0. // and that the sum of their weights is 1.0.