diff --git a/autopilot/agent.go b/autopilot/agent.go index 42370fd0..e984c8fd 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -1,6 +1,7 @@ package autopilot import ( + "fmt" "net" "sync" "sync/atomic" @@ -485,75 +486,85 @@ func (a *Agent) controller() { log.Infof("Triggering attachment directive dispatch, "+ "total_funds=%v", a.totalBalance) - // We're to attempt an attachment so we'll obtain the set of - // nodes that we currently have channels with so we avoid - // duplicate edges. - connectedNodes := a.chanState.ConnectedNodes() - a.pendingMtx.Lock() - nodesToSkip := mergeNodeMaps(a.pendingOpens, - a.pendingConns, connectedNodes, a.failedNodes, - ) - a.pendingMtx.Unlock() - - // If we reach this point, then according to our heuristic we - // should modify our channel state to tend towards what it - // determines to the optimal state. So we'll call Select to get - // a fresh batch of attachment directives, passing in the - // amount of funds available for us to use. - chanCandidates, err := a.cfg.Heuristic.Select( - a.cfg.Self, a.cfg.Graph, availableFunds, - numChans, nodesToSkip, - ) + err := a.openChans(availableFunds, numChans, totalChans) if err != nil { - log.Errorf("Unable to select candidates for "+ - "attachment: %v", err) - continue + log.Errorf("Unable to open channels: %v", err) } - - if len(chanCandidates) == 0 { - log.Infof("No eligible candidates to connect to") - continue - } - - log.Infof("Attempting to execute channel attachment "+ - "directives: %v", spew.Sdump(chanCandidates)) - - // Before proceeding, check to see if we have any slots - // available to open channels. If there are any, we will attempt - // to dispatch the retrieved directives since we can't be - // certain which ones may actually succeed. If too many - // connections succeed, we will they will be ignored and made - // available to future heuristic selections. - a.pendingMtx.Lock() - if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens { - a.pendingMtx.Unlock() - log.Debugf("Reached cap of %v pending "+ - "channel opens, will retry "+ - "after success/failure", - a.cfg.Constraints.MaxPendingOpens) - continue - } - - // For each recommended attachment directive, we'll launch a - // new goroutine to attempt to carry out the directive. If any - // of these succeed, then we'll receive a new state update, - // taking us back to the top of our controller loop. - for _, chanCandidate := range chanCandidates { - // Skip candidates which we are already trying - // to establish a connection with. - nodeID := chanCandidate.NodeID - if _, ok := a.pendingConns[nodeID]; ok { - continue - } - a.pendingConns[nodeID] = struct{}{} - - a.wg.Add(1) - go a.executeDirective(chanCandidate) - } - a.pendingMtx.Unlock() } } +// openChans queries the agent's heuristic for a set of channel candidates, and +// attempts to open channels to them. +func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32, + totalChans []Channel) error { + + // We're to attempt an attachment so we'll obtain the set of + // nodes that we currently have channels with so we avoid + // duplicate edges. + connectedNodes := a.chanState.ConnectedNodes() + a.pendingMtx.Lock() + nodesToSkip := mergeNodeMaps(a.pendingOpens, + a.pendingConns, connectedNodes, a.failedNodes, + ) + a.pendingMtx.Unlock() + + // If we reach this point, then according to our heuristic we + // should modify our channel state to tend towards what it + // determines to the optimal state. So we'll call Select to get + // a fresh batch of attachment directives, passing in the + // amount of funds available for us to use. + chanCandidates, err := a.cfg.Heuristic.Select( + a.cfg.Self, a.cfg.Graph, availableFunds, + numChans, nodesToSkip, + ) + if err != nil { + return fmt.Errorf("Unable to select candidates for "+ + "attachment: %v", err) + } + + if len(chanCandidates) == 0 { + log.Infof("No eligible candidates to connect to") + return nil + } + + log.Infof("Attempting to execute channel attachment "+ + "directives: %v", spew.Sdump(chanCandidates)) + + // Before proceeding, check to see if we have any slots + // available to open channels. If there are any, we will attempt + // to dispatch the retrieved directives since we can't be + // certain which ones may actually succeed. If too many + // connections succeed, we will they will be ignored and made + // available to future heuristic selections. + a.pendingMtx.Lock() + defer a.pendingMtx.Unlock() + if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens { + log.Debugf("Reached cap of %v pending "+ + "channel opens, will retry "+ + "after success/failure", + a.cfg.Constraints.MaxPendingOpens) + return nil + } + + // For each recommended attachment directive, we'll launch a + // new goroutine to attempt to carry out the directive. If any + // of these succeed, then we'll receive a new state update, + // taking us back to the top of our controller loop. + for _, chanCandidate := range chanCandidates { + // Skip candidates which we are already trying + // to establish a connection with. + nodeID := chanCandidate.NodeID + if _, ok := a.pendingConns[nodeID]; ok { + continue + } + a.pendingConns[nodeID] = struct{}{} + + a.wg.Add(1) + go a.executeDirective(chanCandidate) + } + return nil +} + // executeDirective attempts to connect to the channel candidate specified by // the given attachment directive, and open a channel of the given size. //