From 4599b0eac358428edab58dab26f5bebf4da3ab78 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 30 Aug 2018 20:33:57 -0700 Subject: [PATCH] autopilot/agent: track pending connections This commit modifies the autopilot agent to track all pending connection requests, and forgo further attempts if a connection is already present. Previously, the agent would try and establish hundreds of requests to a node, especially if the connections were timing out and not returning. This resulted in an OOM OMM when cranking up maxchannels to 200, since there would be close to 10k pending connections before the program was terminated. The issue was compounded by periodic batch timeouts, causing autopilot to try and process thousands of triggers for failing connections to the same peer. With these fixes, autopilot will skip nodes that we are trying to connect to during heuristic selection. The CPU and memory utilization have been significantly reduced as a result. --- autopilot/agent.go | 122 ++++++++++++++++++++++++++++++--------------- 1 file changed, 81 insertions(+), 41 deletions(-) diff --git a/autopilot/agent.go b/autopilot/agent.go index 8cd93be6..e91a9fc6 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -301,22 +301,26 @@ func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) { } // mergeNodeMaps merges the Agent's set of nodes that it already has active -// channels open to, with the set of nodes that are pending new channels. This -// ensures that the Agent doesn't attempt to open any "duplicate" channels to -// the same node. -func mergeNodeMaps(a map[NodeID]struct{}, b map[NodeID]struct{}, - c map[NodeID]Channel) map[NodeID]struct{} { +// channels open to, with the other sets of nodes that should be removed from +// consideration during heuristic selection. This ensures that the Agent doesn't +// attempt to open any "duplicate" channels to the same node. +func mergeNodeMaps(c map[NodeID]Channel, + skips ...map[NodeID]struct{}) map[NodeID]struct{} { - res := make(map[NodeID]struct{}, len(a)+len(b)+len(c)) - for nodeID := range a { - res[nodeID] = struct{}{} - } - for nodeID := range b { - res[nodeID] = struct{}{} + numNodes := len(c) + for _, skip := range skips { + numNodes += len(skip) } + + res := make(map[NodeID]struct{}, len(c)+numNodes) for nodeID := range c { res[nodeID] = struct{}{} } + for _, skip := range skips { + for nodeID := range skip { + res[nodeID] = struct{}{} + } + } return res } @@ -360,6 +364,11 @@ func (a *Agent) controller() { // channels with, but didn't succeed. failedNodes := make(map[NodeID]struct{}) + // pendingConns tracks the nodes that we are attempting to make + // connections to. This prevents us from making duplicate connection + // requests to the same node. + pendingConns := make(map[NodeID]struct{}) + // pendingOpens tracks the channels that we've requested to be // initiated, but haven't yet been confirmed as being fully opened. // This state is required as otherwise, we may go over our allotted @@ -481,7 +490,9 @@ func (a *Agent) controller() { // duplicate edges. connectedNodes := a.chanState.ConnectedNodes() pendingMtx.Lock() - nodesToSkip := mergeNodeMaps(connectedNodes, failedNodes, pendingOpens) + nodesToSkip := mergeNodeMaps(pendingOpens, + pendingConns, connectedNodes, failedNodes, + ) pendingMtx.Unlock() // If we reach this point, then according to our heuristic we @@ -507,32 +518,40 @@ func (a *Agent) controller() { log.Infof("Attempting to execute channel attachment "+ "directives: %v", spew.Sdump(chanCandidates)) + // Before proceeding, check to see if we have any slots + // available to open channels. If there are any, we will attempt + // to dispatch the retrieved directives since we can't be + // certain which ones may actually succeed. If too many + // connections succeed, we will they will be ignored and made + // available to future heuristic selections. + pendingMtx.Lock() + if uint16(len(pendingOpens)) >= a.cfg.MaxPendingOpens { + pendingMtx.Unlock() + log.Debugf("Reached cap of %v pending "+ + "channel opens, will retry "+ + "after success/failure", + a.cfg.MaxPendingOpens) + continue + } + // For each recommended attachment directive, we'll launch a // new goroutine to attempt to carry out the directive. If any // of these succeed, then we'll receive a new state update, // taking us back to the top of our controller loop. - pendingMtx.Lock() for _, chanCandidate := range chanCandidates { - // Before we proceed, we'll check to see if this - // attempt would take us past the total number of - // allowed pending opens. If so, then we'll skip this - // round and wait for an attempt to either fail or - // succeed. - if uint16(len(pendingOpens))+1 > - a.cfg.MaxPendingOpens { - - log.Debugf("Reached cap of %v pending "+ - "channel opens, will retry "+ - "after success/failure", - a.cfg.MaxPendingOpens) + // Skip candidates which we are already trying + // to establish a connection with. + nodeID := chanCandidate.NodeID + if _, ok := pendingConns[nodeID]; ok { continue } + pendingConns[nodeID] = struct{}{} go func(directive AttachmentDirective) { // We'll start out by attempting to connect to // the peer in order to begin the funding // workflow. - pub := directive.PeerKey + pub := directive.NodeKey alreadyConnected, err := a.cfg.ConnectToPeer( pub, directive.Addrs, ) @@ -548,6 +567,7 @@ func (a *Agent) controller() { // again. nodeID := NewNodeID(pub) pendingMtx.Lock() + delete(pendingConns, nodeID) failedNodes[nodeID] = struct{}{} pendingMtx.Unlock() @@ -558,24 +578,31 @@ func (a *Agent) controller() { return } - // If we were succesful, we'll track this peer - // in our set of pending opens. We do this here - // to ensure we don't stall on selecting new - // peers if the connection attempt happens to - // take too long. + // The connection was successful, though before + // progressing we must check that we have not + // already met our quota for max pending open + // channels. This can happen if multiple + // directives were spawned but fewer slots were + // available, and other successful attempts + // finished first. pendingMtx.Lock() - if uint16(len(pendingOpens))+1 > + if uint16(len(pendingOpens)) >= a.cfg.MaxPendingOpens { - - pendingMtx.Unlock() - - // Since we've reached our max number - // of pending opens, we'll disconnect - // this peer and exit. However, if we - // were previously connected to them, - // then we'll make sure to maintain the + // Since we've reached our max number of + // pending opens, we'll disconnect this + // peer and exit. However, if we were + // previously connected to them, then + // we'll make sure to maintain the // connection alive. if alreadyConnected { + // Since we succeeded in + // connecting, we won't add this + // peer to the failed nodes map, + // but we will remove it from + // pendingConns so that it can + // be retried in the future. + delete(pendingConns, nodeID) + pendingMtx.Unlock() return } @@ -589,10 +616,23 @@ func (a *Agent) controller() { pub.SerializeCompressed(), err) } + + // Now that we have disconnected, we can + // remove this node from our pending + // conns map, permitting subsequent + // connection attempts. + delete(pendingConns, nodeID) + pendingMtx.Unlock() return } - nodeID := NewNodeID(directive.PeerKey) + // If we were successful, we'll track this peer + // in our set of pending opens. We do this here + // to ensure we don't stall on selecting new + // peers if the connection attempt happens to + // take too long. + nodeID := directive.NodeID + delete(pendingConns, nodeID) pendingOpens[nodeID] = Channel{ Capacity: directive.ChanAmt, Node: nodeID,