autopilot/agent: split opening logic into own method

This commit takes the logic after the autopilot agent has decided that it
needs to open more channels, and moves it into a new method openChan.
This commit is contained in:
Johan T. Halseth 2018-11-22 23:18:09 +01:00
parent 26810fe928
commit 89c3c5319f
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26

@ -1,6 +1,7 @@
package autopilot package autopilot
import ( import (
"fmt"
"net" "net"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -485,75 +486,85 @@ func (a *Agent) controller() {
log.Infof("Triggering attachment directive dispatch, "+ log.Infof("Triggering attachment directive dispatch, "+
"total_funds=%v", a.totalBalance) "total_funds=%v", a.totalBalance)
// We're to attempt an attachment so we'll obtain the set of err := a.openChans(availableFunds, numChans, totalChans)
// nodes that we currently have channels with so we avoid
// duplicate edges.
connectedNodes := a.chanState.ConnectedNodes()
a.pendingMtx.Lock()
nodesToSkip := mergeNodeMaps(a.pendingOpens,
a.pendingConns, connectedNodes, a.failedNodes,
)
a.pendingMtx.Unlock()
// If we reach this point, then according to our heuristic we
// should modify our channel state to tend towards what it
// determines to the optimal state. So we'll call Select to get
// a fresh batch of attachment directives, passing in the
// amount of funds available for us to use.
chanCandidates, err := a.cfg.Heuristic.Select(
a.cfg.Self, a.cfg.Graph, availableFunds,
numChans, nodesToSkip,
)
if err != nil { if err != nil {
log.Errorf("Unable to select candidates for "+ log.Errorf("Unable to open channels: %v", err)
"attachment: %v", err)
continue
} }
if len(chanCandidates) == 0 {
log.Infof("No eligible candidates to connect to")
continue
}
log.Infof("Attempting to execute channel attachment "+
"directives: %v", spew.Sdump(chanCandidates))
// Before proceeding, check to see if we have any slots
// available to open channels. If there are any, we will attempt
// to dispatch the retrieved directives since we can't be
// certain which ones may actually succeed. If too many
// connections succeed, we will they will be ignored and made
// available to future heuristic selections.
a.pendingMtx.Lock()
if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens {
a.pendingMtx.Unlock()
log.Debugf("Reached cap of %v pending "+
"channel opens, will retry "+
"after success/failure",
a.cfg.Constraints.MaxPendingOpens)
continue
}
// For each recommended attachment directive, we'll launch a
// new goroutine to attempt to carry out the directive. If any
// of these succeed, then we'll receive a new state update,
// taking us back to the top of our controller loop.
for _, chanCandidate := range chanCandidates {
// Skip candidates which we are already trying
// to establish a connection with.
nodeID := chanCandidate.NodeID
if _, ok := a.pendingConns[nodeID]; ok {
continue
}
a.pendingConns[nodeID] = struct{}{}
a.wg.Add(1)
go a.executeDirective(chanCandidate)
}
a.pendingMtx.Unlock()
} }
} }
// openChans queries the agent's heuristic for a set of channel candidates, and
// attempts to open channels to them.
func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
totalChans []Channel) error {
// We're to attempt an attachment so we'll obtain the set of
// nodes that we currently have channels with so we avoid
// duplicate edges.
connectedNodes := a.chanState.ConnectedNodes()
a.pendingMtx.Lock()
nodesToSkip := mergeNodeMaps(a.pendingOpens,
a.pendingConns, connectedNodes, a.failedNodes,
)
a.pendingMtx.Unlock()
// If we reach this point, then according to our heuristic we
// should modify our channel state to tend towards what it
// determines to the optimal state. So we'll call Select to get
// a fresh batch of attachment directives, passing in the
// amount of funds available for us to use.
chanCandidates, err := a.cfg.Heuristic.Select(
a.cfg.Self, a.cfg.Graph, availableFunds,
numChans, nodesToSkip,
)
if err != nil {
return fmt.Errorf("Unable to select candidates for "+
"attachment: %v", err)
}
if len(chanCandidates) == 0 {
log.Infof("No eligible candidates to connect to")
return nil
}
log.Infof("Attempting to execute channel attachment "+
"directives: %v", spew.Sdump(chanCandidates))
// Before proceeding, check to see if we have any slots
// available to open channels. If there are any, we will attempt
// to dispatch the retrieved directives since we can't be
// certain which ones may actually succeed. If too many
// connections succeed, we will they will be ignored and made
// available to future heuristic selections.
a.pendingMtx.Lock()
defer a.pendingMtx.Unlock()
if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens {
log.Debugf("Reached cap of %v pending "+
"channel opens, will retry "+
"after success/failure",
a.cfg.Constraints.MaxPendingOpens)
return nil
}
// For each recommended attachment directive, we'll launch a
// new goroutine to attempt to carry out the directive. If any
// of these succeed, then we'll receive a new state update,
// taking us back to the top of our controller loop.
for _, chanCandidate := range chanCandidates {
// Skip candidates which we are already trying
// to establish a connection with.
nodeID := chanCandidate.NodeID
if _, ok := a.pendingConns[nodeID]; ok {
continue
}
a.pendingConns[nodeID] = struct{}{}
a.wg.Add(1)
go a.executeDirective(chanCandidate)
}
return nil
}
// executeDirective attempts to connect to the channel candidate specified by // executeDirective attempts to connect to the channel candidate specified by
// the given attachment directive, and open a channel of the given size. // the given attachment directive, and open a channel of the given size.
// //