From fb10175ea504db80915d18a891eb88ba60f0b48e Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 22 Nov 2018 23:18:09 +0100 Subject: [PATCH] autopilot/agent: add maps to agent struct This commit moves the maps used by the controller loop to the Agent struct, in preparation for breaking it up into smaller parts. --- autopilot/agent.go | 110 +++++++++++++++++++++++---------------------- 1 file changed, 56 insertions(+), 54 deletions(-) diff --git a/autopilot/agent.go b/autopilot/agent.go index a59602c0..10be87b9 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -143,6 +143,22 @@ type Agent struct { // when the agent receives external balance update signals. totalBalance btcutil.Amount + // failedNodes lists nodes that we've previously attempted to initiate + // channels with, but didn't succeed. + failedNodes map[NodeID]struct{} + + // pendingConns tracks the nodes that we are attempting to make + // connections to. This prevents us from making duplicate connection + // requests to the same node. + pendingConns map[NodeID]struct{} + + // pendingOpens tracks the channels that we've requested to be + // initiated, but haven't yet been confirmed as being fully opened. + // This state is required as otherwise, we may go over our allotted + // channel limit, or open multiple channels to the same node. + pendingOpens map[NodeID]Channel + pendingMtx sync.Mutex + quit chan struct{} wg sync.WaitGroup } @@ -161,6 +177,9 @@ func New(cfg Config, initialState []Channel) (*Agent, error) { nodeUpdates: make(chan *nodeUpdates, 1), chanOpenFailures: make(chan *chanOpenFailureUpdate, 1), pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1), + failedNodes: make(map[NodeID]struct{}), + pendingConns: make(map[NodeID]struct{}), + pendingOpens: make(map[NodeID]Channel), } for _, c := range initialState { @@ -357,23 +376,6 @@ func (a *Agent) controller() { // TODO(roasbeef): do we in fact need to maintain order? // * use sync.Cond if so - - // failedNodes lists nodes that we've previously attempted to initiate - // channels with, but didn't succeed. - failedNodes := make(map[NodeID]struct{}) - - // pendingConns tracks the nodes that we are attempting to make - // connections to. This prevents us from making duplicate connection - // requests to the same node. - pendingConns := make(map[NodeID]struct{}) - - // pendingOpens tracks the channels that we've requested to be - // initiated, but haven't yet been confirmed as being fully opened. - // This state is required as otherwise, we may go over our allotted - // channel limit, or open multiple channels to the same node. - pendingOpens := make(map[NodeID]Channel) - var pendingMtx sync.Mutex - updateBalance := func() { newBalance, err := a.cfg.WalletBalance() if err != nil { @@ -405,9 +407,9 @@ func (a *Agent) controller() { newChan := update.newChan a.chanState[newChan.ChanID] = newChan - pendingMtx.Lock() - delete(pendingOpens, newChan.Node) - pendingMtx.Unlock() + a.pendingMtx.Lock() + delete(a.pendingOpens, newChan.Node) + a.pendingMtx.Unlock() updateBalance() // A channel has been closed, this may free up an @@ -458,17 +460,17 @@ func (a *Agent) controller() { return } - pendingMtx.Lock() - log.Debugf("Pending channels: %v", spew.Sdump(pendingOpens)) - pendingMtx.Unlock() + a.pendingMtx.Lock() + log.Debugf("Pending channels: %v", spew.Sdump(a.pendingOpens)) + a.pendingMtx.Unlock() // With all the updates applied, we'll obtain a set of the // current active channels (confirmed channels), and also // factor in our set of unconfirmed channels. confirmedChans := a.chanState - pendingMtx.Lock() - totalChans := mergeChanState(pendingOpens, confirmedChans) - pendingMtx.Unlock() + a.pendingMtx.Lock() + totalChans := mergeChanState(a.pendingOpens, confirmedChans) + a.pendingMtx.Unlock() // Now that we've updated our internal state, we'll consult our // channel attachment heuristic to determine if we should open @@ -487,11 +489,11 @@ func (a *Agent) controller() { // nodes that we currently have channels with so we avoid // duplicate edges. connectedNodes := a.chanState.ConnectedNodes() - pendingMtx.Lock() - nodesToSkip := mergeNodeMaps(pendingOpens, - pendingConns, connectedNodes, failedNodes, + a.pendingMtx.Lock() + nodesToSkip := mergeNodeMaps(a.pendingOpens, + a.pendingConns, connectedNodes, a.failedNodes, ) - pendingMtx.Unlock() + a.pendingMtx.Unlock() // If we reach this point, then according to our heuristic we // should modify our channel state to tend towards what it @@ -522,9 +524,9 @@ func (a *Agent) controller() { // certain which ones may actually succeed. If too many // connections succeed, we will they will be ignored and made // available to future heuristic selections. - pendingMtx.Lock() - if uint16(len(pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens { - pendingMtx.Unlock() + a.pendingMtx.Lock() + if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens { + a.pendingMtx.Unlock() log.Debugf("Reached cap of %v pending "+ "channel opens, will retry "+ "after success/failure", @@ -540,10 +542,10 @@ func (a *Agent) controller() { // Skip candidates which we are already trying // to establish a connection with. nodeID := chanCandidate.NodeID - if _, ok := pendingConns[nodeID]; ok { + if _, ok := a.pendingConns[nodeID]; ok { continue } - pendingConns[nodeID] = struct{}{} + a.pendingConns[nodeID] = struct{}{} a.wg.Add(1) go func(directive AttachmentDirective) { @@ -567,10 +569,10 @@ func (a *Agent) controller() { // don't attempt to connect to them // again. nodeID := NewNodeID(pub) - pendingMtx.Lock() - delete(pendingConns, nodeID) - failedNodes[nodeID] = struct{}{} - pendingMtx.Unlock() + a.pendingMtx.Lock() + delete(a.pendingConns, nodeID) + a.failedNodes[nodeID] = struct{}{} + a.pendingMtx.Unlock() // Finally, we'll trigger the agent to // select new peers to connect to. @@ -586,8 +588,8 @@ func (a *Agent) controller() { // directives were spawned but fewer slots were // available, and other successful attempts // finished first. - pendingMtx.Lock() - if uint16(len(pendingOpens)) >= + a.pendingMtx.Lock() + if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens { // Since we've reached our max number of // pending opens, we'll disconnect this @@ -600,10 +602,10 @@ func (a *Agent) controller() { // connecting, we won't add this // peer to the failed nodes map, // but we will remove it from - // pendingConns so that it can + // a.pendingConns so that it can // be retried in the future. - delete(pendingConns, nodeID) - pendingMtx.Unlock() + delete(a.pendingConns, nodeID) + a.pendingMtx.Unlock() return } @@ -622,8 +624,8 @@ func (a *Agent) controller() { // remove this node from our pending // conns map, permitting subsequent // connection attempts. - delete(pendingConns, nodeID) - pendingMtx.Unlock() + delete(a.pendingConns, nodeID) + a.pendingMtx.Unlock() return } @@ -633,12 +635,12 @@ func (a *Agent) controller() { // peers if the connection attempt happens to // take too long. nodeID := directive.NodeID - delete(pendingConns, nodeID) - pendingOpens[nodeID] = Channel{ + delete(a.pendingConns, nodeID) + a.pendingOpens[nodeID] = Channel{ Capacity: directive.ChanAmt, Node: nodeID, } - pendingMtx.Unlock() + a.pendingMtx.Unlock() // We can then begin the funding workflow with // this peer. @@ -656,10 +658,10 @@ func (a *Agent) controller() { // opens and mark them as failed so we // don't attempt to open a channel to // them again. - pendingMtx.Lock() - delete(pendingOpens, nodeID) - failedNodes[nodeID] = struct{}{} - pendingMtx.Unlock() + a.pendingMtx.Lock() + delete(a.pendingOpens, nodeID) + a.failedNodes[nodeID] = struct{}{} + a.pendingMtx.Unlock() // Trigger the agent to re-evaluate // everything and possibly retry with a @@ -690,7 +692,7 @@ func (a *Agent) controller() { a.OnChannelPendingOpen() }(chanCandidate) } - pendingMtx.Unlock() + a.pendingMtx.Unlock() } }