autopilot/agent: add maps to agent struct

This commit moves the maps used by the controller loop onto the Agent
struct, in preparation for breaking the loop up into smaller parts.
Johan T. Halseth 2018-11-22 23:18:09 +01:00
parent 86e6d230f2
commit fb10175ea5
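Every hunk below follows the same pattern: state that used to live as locals inside controller() now lives on the Agent struct, and every read or write of those maps happens while holding the struct's pendingMtx. A minimal sketch of that pattern, assuming only the fields added in the first hunk (the helper name numPendingOpens is illustrative and not part of this commit):

// numPendingOpens shows the locking discipline used throughout the
// diff: the shared maps are only touched while pendingMtx is held.
func (a *Agent) numPendingOpens() uint16 {
	a.pendingMtx.Lock()
	defer a.pendingMtx.Unlock()

	return uint16(len(a.pendingOpens))
}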

@@ -143,6 +143,22 @@ type Agent struct {
 	// when the agent receives external balance update signals.
 	totalBalance btcutil.Amount
 
+	// failedNodes lists nodes that we've previously attempted to initiate
+	// channels with, but didn't succeed.
+	failedNodes map[NodeID]struct{}
+
+	// pendingConns tracks the nodes that we are attempting to make
+	// connections to. This prevents us from making duplicate connection
+	// requests to the same node.
+	pendingConns map[NodeID]struct{}
+
+	// pendingOpens tracks the channels that we've requested to be
+	// initiated, but haven't yet been confirmed as being fully opened.
+	// This state is required as otherwise, we may go over our allotted
+	// channel limit, or open multiple channels to the same node.
+	pendingOpens map[NodeID]Channel
+	pendingMtx   sync.Mutex
+
 	quit chan struct{}
 	wg   sync.WaitGroup
 }
@@ -161,6 +177,9 @@ func New(cfg Config, initialState []Channel) (*Agent, error) {
 		nodeUpdates:        make(chan *nodeUpdates, 1),
 		chanOpenFailures:   make(chan *chanOpenFailureUpdate, 1),
 		pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1),
+		failedNodes:        make(map[NodeID]struct{}),
+		pendingConns:       make(map[NodeID]struct{}),
+		pendingOpens:       make(map[NodeID]Channel),
 	}
 
 	for _, c := range initialState {
@@ -357,23 +376,6 @@ func (a *Agent) controller() {
 	// TODO(roasbeef): do we in fact need to maintain order?
 	//  * use sync.Cond if so
 
-	// failedNodes lists nodes that we've previously attempted to initiate
-	// channels with, but didn't succeed.
-	failedNodes := make(map[NodeID]struct{})
-
-	// pendingConns tracks the nodes that we are attempting to make
-	// connections to. This prevents us from making duplicate connection
-	// requests to the same node.
-	pendingConns := make(map[NodeID]struct{})
-
-	// pendingOpens tracks the channels that we've requested to be
-	// initiated, but haven't yet been confirmed as being fully opened.
-	// This state is required as otherwise, we may go over our allotted
-	// channel limit, or open multiple channels to the same node.
-	pendingOpens := make(map[NodeID]Channel)
-
-	var pendingMtx sync.Mutex
-
 	updateBalance := func() {
 		newBalance, err := a.cfg.WalletBalance()
 		if err != nil {
@@ -405,9 +407,9 @@ func (a *Agent) controller() {
 				newChan := update.newChan
 				a.chanState[newChan.ChanID] = newChan
 
-				pendingMtx.Lock()
-				delete(pendingOpens, newChan.Node)
-				pendingMtx.Unlock()
+				a.pendingMtx.Lock()
+				delete(a.pendingOpens, newChan.Node)
+				a.pendingMtx.Unlock()
 
 				updateBalance()
 
 			// A channel has been closed, this may free up an
@@ -458,17 +460,17 @@ func (a *Agent) controller() {
 			return
 		}
 
-		pendingMtx.Lock()
-		log.Debugf("Pending channels: %v", spew.Sdump(pendingOpens))
-		pendingMtx.Unlock()
+		a.pendingMtx.Lock()
+		log.Debugf("Pending channels: %v", spew.Sdump(a.pendingOpens))
+		a.pendingMtx.Unlock()
 
 		// With all the updates applied, we'll obtain a set of the
 		// current active channels (confirmed channels), and also
 		// factor in our set of unconfirmed channels.
 		confirmedChans := a.chanState
-		pendingMtx.Lock()
-		totalChans := mergeChanState(pendingOpens, confirmedChans)
-		pendingMtx.Unlock()
+		a.pendingMtx.Lock()
+		totalChans := mergeChanState(a.pendingOpens, confirmedChans)
+		a.pendingMtx.Unlock()
 
 		// Now that we've updated our internal state, we'll consult our
 		// channel attachment heuristic to determine if we should open
@@ -487,11 +489,11 @@ func (a *Agent) controller() {
 		// nodes that we currently have channels with so we avoid
 		// duplicate edges.
 		connectedNodes := a.chanState.ConnectedNodes()
-		pendingMtx.Lock()
-		nodesToSkip := mergeNodeMaps(pendingOpens,
-			pendingConns, connectedNodes, failedNodes,
+		a.pendingMtx.Lock()
+		nodesToSkip := mergeNodeMaps(a.pendingOpens,
+			a.pendingConns, connectedNodes, a.failedNodes,
 		)
-		pendingMtx.Unlock()
+		a.pendingMtx.Unlock()
 
 		// If we reach this point, then according to our heuristic we
 		// should modify our channel state to tend towards what it
@@ -522,9 +524,9 @@ func (a *Agent) controller() {
 		// certain which ones may actually succeed. If too many
 		// connections succeed, they will be ignored and made
 		// available to future heuristic selections.
-		pendingMtx.Lock()
-		if uint16(len(pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens {
-			pendingMtx.Unlock()
+		a.pendingMtx.Lock()
+		if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens {
+			a.pendingMtx.Unlock()
 			log.Debugf("Reached cap of %v pending "+
 				"channel opens, will retry "+
 				"after success/failure",
@@ -540,10 +542,10 @@ func (a *Agent) controller() {
 			// Skip candidates which we are already trying
 			// to establish a connection with.
 			nodeID := chanCandidate.NodeID
-			if _, ok := pendingConns[nodeID]; ok {
+			if _, ok := a.pendingConns[nodeID]; ok {
 				continue
 			}
-			pendingConns[nodeID] = struct{}{}
+			a.pendingConns[nodeID] = struct{}{}
 
 			a.wg.Add(1)
 			go func(directive AttachmentDirective) {
@@ -567,10 +569,10 @@ func (a *Agent) controller() {
 					// don't attempt to connect to them
 					// again.
 					nodeID := NewNodeID(pub)
-					pendingMtx.Lock()
-					delete(pendingConns, nodeID)
-					failedNodes[nodeID] = struct{}{}
-					pendingMtx.Unlock()
+					a.pendingMtx.Lock()
+					delete(a.pendingConns, nodeID)
+					a.failedNodes[nodeID] = struct{}{}
+					a.pendingMtx.Unlock()
 
 					// Finally, we'll trigger the agent to
 					// select new peers to connect to.
@@ -586,8 +588,8 @@ func (a *Agent) controller() {
 				// directives were spawned but fewer slots were
 				// available, and other successful attempts
 				// finished first.
-				pendingMtx.Lock()
-				if uint16(len(pendingOpens)) >=
+				a.pendingMtx.Lock()
+				if uint16(len(a.pendingOpens)) >=
 					a.cfg.Constraints.MaxPendingOpens {
 					// Since we've reached our max number of
 					// pending opens, we'll disconnect this
@@ -600,10 +602,10 @@ func (a *Agent) controller() {
 						// connecting, we won't add this
 						// peer to the failed nodes map,
 						// but we will remove it from
-						// pendingConns so that it can
+						// a.pendingConns so that it can
 						// be retried in the future.
-						delete(pendingConns, nodeID)
-						pendingMtx.Unlock()
+						delete(a.pendingConns, nodeID)
+						a.pendingMtx.Unlock()
 
 						return
 					}
@@ -622,8 +624,8 @@ func (a *Agent) controller() {
 					// remove this node from our pending
 					// conns map, permitting subsequent
 					// connection attempts.
-					delete(pendingConns, nodeID)
-					pendingMtx.Unlock()
+					delete(a.pendingConns, nodeID)
+					a.pendingMtx.Unlock()
 
 					return
 				}
@@ -633,12 +635,12 @@ func (a *Agent) controller() {
 				// peers if the connection attempt happens to
 				// take too long.
 				nodeID := directive.NodeID
-				delete(pendingConns, nodeID)
-				pendingOpens[nodeID] = Channel{
+				delete(a.pendingConns, nodeID)
+				a.pendingOpens[nodeID] = Channel{
 					Capacity: directive.ChanAmt,
 					Node:     nodeID,
 				}
-				pendingMtx.Unlock()
+				a.pendingMtx.Unlock()
 
 				// We can then begin the funding workflow with
 				// this peer.
@@ -656,10 +658,10 @@ func (a *Agent) controller() {
 					// opens and mark them as failed so we
 					// don't attempt to open a channel to
 					// them again.
-					pendingMtx.Lock()
-					delete(pendingOpens, nodeID)
-					failedNodes[nodeID] = struct{}{}
-					pendingMtx.Unlock()
+					a.pendingMtx.Lock()
+					delete(a.pendingOpens, nodeID)
+					a.failedNodes[nodeID] = struct{}{}
+					a.pendingMtx.Unlock()
 
 					// Trigger the agent to re-evaluate
 					// everything and possibly retry with a
@@ -690,7 +692,7 @@ func (a *Agent) controller() {
 				a.OnChannelPendingOpen()
 			}(chanCandidate)
 		}
 
-		pendingMtx.Unlock()
+		a.pendingMtx.Unlock()
 	}
 }
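With the maps and mutex on the Agent struct, later commits can extract pieces of the controller loop into methods on *Agent rather than closures over local state. A rough sketch of the kind of helper this enables, mirroring the clean-up the goroutines above do inline (the method name markNodeFailed and its exact shape are hypothetical, not taken from this change):

// markNodeFailed records a failed attempt for a node under pendingMtx:
// the node is no longer pending a connection or a channel open, and
// should not be selected again by the heuristic.
func (a *Agent) markNodeFailed(nodeID NodeID) {
	a.pendingMtx.Lock()
	defer a.pendingMtx.Unlock()

	delete(a.pendingConns, nodeID)
	delete(a.pendingOpens, nodeID)
	a.failedNodes[nodeID] = struct{}{}
}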