diff --git a/autopilot/agent.go b/autopilot/agent.go index 8cd93be6..e91a9fc6 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -301,22 +301,26 @@ func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) { } // mergeNodeMaps merges the Agent's set of nodes that it already has active -// channels open to, with the set of nodes that are pending new channels. This -// ensures that the Agent doesn't attempt to open any "duplicate" channels to -// the same node. -func mergeNodeMaps(a map[NodeID]struct{}, b map[NodeID]struct{}, - c map[NodeID]Channel) map[NodeID]struct{} { +// channels open to, with the other sets of nodes that should be removed from +// consideration during heuristic selection. This ensures that the Agent doesn't +// attempt to open any "duplicate" channels to the same node. +func mergeNodeMaps(c map[NodeID]Channel, + skips ...map[NodeID]struct{}) map[NodeID]struct{} { - res := make(map[NodeID]struct{}, len(a)+len(b)+len(c)) - for nodeID := range a { - res[nodeID] = struct{}{} - } - for nodeID := range b { - res[nodeID] = struct{}{} + numNodes := len(c) + for _, skip := range skips { + numNodes += len(skip) } + + res := make(map[NodeID]struct{}, len(c)+numNodes) for nodeID := range c { res[nodeID] = struct{}{} } + for _, skip := range skips { + for nodeID := range skip { + res[nodeID] = struct{}{} + } + } return res } @@ -360,6 +364,11 @@ func (a *Agent) controller() { // channels with, but didn't succeed. failedNodes := make(map[NodeID]struct{}) + // pendingConns tracks the nodes that we are attempting to make + // connections to. This prevents us from making duplicate connection + // requests to the same node. + pendingConns := make(map[NodeID]struct{}) + // pendingOpens tracks the channels that we've requested to be // initiated, but haven't yet been confirmed as being fully opened. // This state is required as otherwise, we may go over our allotted @@ -481,7 +490,9 @@ func (a *Agent) controller() { // duplicate edges. 
connectedNodes := a.chanState.ConnectedNodes() pendingMtx.Lock() - nodesToSkip := mergeNodeMaps(connectedNodes, failedNodes, pendingOpens) + nodesToSkip := mergeNodeMaps(pendingOpens, + pendingConns, connectedNodes, failedNodes, + ) pendingMtx.Unlock() // If we reach this point, then according to our heuristic we @@ -507,32 +518,40 @@ func (a *Agent) controller() { log.Infof("Attempting to execute channel attachment "+ "directives: %v", spew.Sdump(chanCandidates)) + // Before proceeding, check to see if we have any slots + // available to open channels. If there are any, we will attempt + // to dispatch the retrieved directives since we can't be + // certain which ones may actually succeed. If too many + // connections succeed, the excess ones will be ignored and made + // available to future heuristic selections. + pendingMtx.Lock() + if uint16(len(pendingOpens)) >= a.cfg.MaxPendingOpens { + pendingMtx.Unlock() + log.Debugf("Reached cap of %v pending "+ + "channel opens, will retry "+ + "after success/failure", + a.cfg.MaxPendingOpens) + continue + } + // For each recommended attachment directive, we'll launch a // new goroutine to attempt to carry out the directive. If any // of these succeed, then we'll receive a new state update, // taking us back to the top of our controller loop. - pendingMtx.Lock() for _, chanCandidate := range chanCandidates { - // Before we proceed, we'll check to see if this - // attempt would take us past the total number of - // allowed pending opens. If so, then we'll skip this - // round and wait for an attempt to either fail or - // succeed. - if uint16(len(pendingOpens))+1 > - a.cfg.MaxPendingOpens { - - log.Debugf("Reached cap of %v pending "+ - "channel opens, will retry "+ - "after success/failure", - a.cfg.MaxPendingOpens) + // Skip candidates which we are already trying + // to establish a connection with. 
+ nodeID := chanCandidate.NodeID + if _, ok := pendingConns[nodeID]; ok { continue } + pendingConns[nodeID] = struct{}{} go func(directive AttachmentDirective) { // We'll start out by attempting to connect to // the peer in order to begin the funding // workflow. - pub := directive.PeerKey + pub := directive.NodeKey alreadyConnected, err := a.cfg.ConnectToPeer( pub, directive.Addrs, ) @@ -548,6 +567,7 @@ func (a *Agent) controller() { // again. nodeID := NewNodeID(pub) pendingMtx.Lock() + delete(pendingConns, nodeID) failedNodes[nodeID] = struct{}{} pendingMtx.Unlock() @@ -558,24 +578,31 @@ func (a *Agent) controller() { return } - // If we were succesful, we'll track this peer - // in our set of pending opens. We do this here - // to ensure we don't stall on selecting new - // peers if the connection attempt happens to - // take too long. + // The connection was successful, though before + // progressing we must check that we have not + // already met our quota for max pending open + // channels. This can happen if multiple + // directives were spawned but fewer slots were + // available, and other successful attempts + // finished first. pendingMtx.Lock() - if uint16(len(pendingOpens))+1 > + if uint16(len(pendingOpens)) >= a.cfg.MaxPendingOpens { - - pendingMtx.Unlock() - - // Since we've reached our max number - // of pending opens, we'll disconnect - // this peer and exit. However, if we - // were previously connected to them, - // then we'll make sure to maintain the + // Since we've reached our max number of + // pending opens, we'll disconnect this + // peer and exit. However, if we were + // previously connected to them, then + // we'll make sure to maintain the // connection alive. if alreadyConnected { + // Since we succeeded in + // connecting, we won't add this + // peer to the failed nodes map, + // but we will remove it from + // pendingConns so that it can + // be retried in the future. 
+ delete(pendingConns, nodeID) + pendingMtx.Unlock() return } @@ -589,10 +616,23 @@ func (a *Agent) controller() { pub.SerializeCompressed(), err) } + + // Now that we have disconnected, we can + // remove this node from our pending + // conns map, permitting subsequent + // connection attempts. + delete(pendingConns, nodeID) + pendingMtx.Unlock() return } - nodeID := NewNodeID(directive.PeerKey) + // If we were successful, we'll track this peer + // in our set of pending opens. We do this here + // to ensure we don't stall on selecting new + // peers if the connection attempt happens to + // take too long. + nodeID := directive.NodeID + delete(pendingConns, nodeID) pendingOpens[nodeID] = Channel{ Capacity: directive.ChanAmt, Node: nodeID,