diff --git a/autopilot/agent.go b/autopilot/agent.go
index 10be87b9..42370fd0 100644
--- a/autopilot/agent.go
+++ b/autopilot/agent.go
@@ -548,151 +548,121 @@ func (a *Agent) controller() {
 			a.pendingConns[nodeID] = struct{}{}
 
 			a.wg.Add(1)
-			go func(directive AttachmentDirective) {
-				defer a.wg.Done()
-
-				// We'll start out by attempting to connect to
-				// the peer in order to begin the funding
-				// workflow.
-				pub := directive.NodeKey
-				alreadyConnected, err := a.cfg.ConnectToPeer(
-					pub, directive.Addrs,
-				)
-				if err != nil {
-					log.Warnf("Unable to connect "+
-						"to %x: %v",
-						pub.SerializeCompressed(),
-						err)
-
-					// Since we failed to connect to them,
-					// we'll mark them as failed so that we
-					// don't attempt to connect to them
-					// again.
-					nodeID := NewNodeID(pub)
-					a.pendingMtx.Lock()
-					delete(a.pendingConns, nodeID)
-					a.failedNodes[nodeID] = struct{}{}
-					a.pendingMtx.Unlock()
-
-					// Finally, we'll trigger the agent to
-					// select new peers to connect to.
-					a.OnChannelOpenFailure()
-
-					return
-				}
-
-				// The connection was successful, though before
-				// progressing we must check that we have not
-				// already met our quota for max pending open
-				// channels. This can happen if multiple
-				// directives were spawned but fewer slots were
-				// available, and other successful attempts
-				// finished first.
-				a.pendingMtx.Lock()
-				if uint16(len(a.pendingOpens)) >=
-					a.cfg.Constraints.MaxPendingOpens {
-					// Since we've reached our max number of
-					// pending opens, we'll disconnect this
-					// peer and exit. However, if we were
-					// previously connected to them, then
-					// we'll make sure to maintain the
-					// connection alive.
-					if alreadyConnected {
-						// Since we succeeded in
-						// connecting, we won't add this
-						// peer to the failed nodes map,
-						// but we will remove it from
-						// a.pendingConns so that it can
-						// be retried in the future.
-						delete(a.pendingConns, nodeID)
-						a.pendingMtx.Unlock()
-						return
-					}
-
-					err = a.cfg.DisconnectPeer(
-						pub,
-					)
-					if err != nil {
-						log.Warnf("Unable to "+
-							"disconnect peer "+
-							"%x: %v",
-							pub.SerializeCompressed(),
-							err)
-					}
-
-					// Now that we have disconnected, we can
-					// remove this node from our pending
-					// conns map, permitting subsequent
-					// connection attempts.
-					delete(a.pendingConns, nodeID)
-					a.pendingMtx.Unlock()
-					return
-				}
-
-				// If we were successful, we'll track this peer
-				// in our set of pending opens. We do this here
-				// to ensure we don't stall on selecting new
-				// peers if the connection attempt happens to
-				// take too long.
-				nodeID := directive.NodeID
-				delete(a.pendingConns, nodeID)
-				a.pendingOpens[nodeID] = Channel{
-					Capacity: directive.ChanAmt,
-					Node:     nodeID,
-				}
-				a.pendingMtx.Unlock()
-
-				// We can then begin the funding workflow with
-				// this peer.
-				err = a.cfg.ChanController.OpenChannel(
-					pub, directive.ChanAmt,
-				)
-				if err != nil {
-					log.Warnf("Unable to open "+
-						"channel to %x of %v: %v",
-						pub.SerializeCompressed(),
-						directive.ChanAmt, err)
-
-					// As the attempt failed, we'll clear
-					// the peer from the set of pending
-					// opens and mark them as failed so we
-					// don't attempt to open a channel to
-					// them again.
-					a.pendingMtx.Lock()
-					delete(a.pendingOpens, nodeID)
-					a.failedNodes[nodeID] = struct{}{}
-					a.pendingMtx.Unlock()
-
-					// Trigger the agent to re-evaluate
-					// everything and possibly retry with a
-					// different node.
-					a.OnChannelOpenFailure()
-
-					// Finally, we should also disconnect
-					// the peer if we weren't already
-					// connected to them beforehand by an
-					// external subsystem.
-					if alreadyConnected {
-						return
-					}
-
-					err = a.cfg.DisconnectPeer(pub)
-					if err != nil {
-						log.Warnf("Unable to "+
-							"disconnect peer "+
-							"%x: %v",
-							pub.SerializeCompressed(),
-							err)
-					}
-				}
-
-				// Since the channel open was successful and is
-				// currently pending, we'll trigger the
-				// autopilot agent to query for more peers.
-				a.OnChannelPendingOpen()
-			}(chanCandidate)
+			go a.executeDirective(chanCandidate)
 		}
 		a.pendingMtx.Unlock()
-
 	}
 }
+
+// executeDirective attempts to connect to the channel candidate specified by
+// the given attachment directive, and open a channel of the given size.
+//
+// NOTE: MUST be run as a goroutine.
+func (a *Agent) executeDirective(directive AttachmentDirective) {
+	defer a.wg.Done()
+
+	// We'll start out by attempting to connect to the peer in order to
+	// begin the funding workflow.
+	pub := directive.NodeKey
+	nodeID := directive.NodeID
+	alreadyConnected, err := a.cfg.ConnectToPeer(pub, directive.Addrs)
+	if err != nil {
+		log.Warnf("Unable to connect to %x: %v",
+			pub.SerializeCompressed(), err)
+
+		// Since we failed to connect to them, we'll mark them as
+		// failed so that we don't attempt to connect to them again.
+		a.pendingMtx.Lock()
+		delete(a.pendingConns, nodeID)
+		a.failedNodes[nodeID] = struct{}{}
+		a.pendingMtx.Unlock()
+
+		// Finally, we'll trigger the agent to select new peers to
+		// connect to.
+		a.OnChannelOpenFailure()
+
+		return
+	}
+
+	// The connection was successful, though before progressing we must
+	// check that we have not already met our quota for max pending open
+	// channels. This can happen if multiple directives were spawned but
+	// fewer slots were available, and other successful attempts finished
+	// first.
+	a.pendingMtx.Lock()
+	if uint16(len(a.pendingOpens)) >=
+		a.cfg.Constraints.MaxPendingOpens {
+		// Since we've reached our max number of pending opens, we'll
+		// disconnect this peer and exit. However, if we were
+		// previously connected to them, then we'll make sure to
+		// maintain the connection alive.
+		if alreadyConnected {
+			// Since we succeeded in connecting, we won't add this
+			// peer to the failed nodes map, but we will remove it
+			// from a.pendingConns so that it can be retried in the
+			// future.
+			delete(a.pendingConns, nodeID)
+			a.pendingMtx.Unlock()
+			return
+		}
+
+		err = a.cfg.DisconnectPeer(pub)
+		if err != nil {
+			log.Warnf("Unable to disconnect peer %x: %v",
+				pub.SerializeCompressed(), err)
+		}
+
+		// Now that we have disconnected, we can remove this node from
+		// our pending conns map, permitting subsequent connection
+		// attempts.
+		delete(a.pendingConns, nodeID)
+		a.pendingMtx.Unlock()
+		return
+	}
+
+	// If we were successful, we'll track this peer in our set of pending
+	// opens. We do this here to ensure we don't stall on selecting new
+	// peers if the connection attempt happens to take too long.
+	delete(a.pendingConns, nodeID)
+	a.pendingOpens[nodeID] = Channel{
+		Capacity: directive.ChanAmt,
+		Node:     nodeID,
+	}
+	a.pendingMtx.Unlock()
+
+	// We can then begin the funding workflow with this peer.
+	err = a.cfg.ChanController.OpenChannel(pub, directive.ChanAmt)
+	if err != nil {
+		log.Warnf("Unable to open channel to %x of %v: %v",
+			pub.SerializeCompressed(), directive.ChanAmt, err)
+
+		// As the attempt failed, we'll clear the peer from the set of
+		// pending opens and mark them as failed so we don't attempt to
+		// open a channel to them again.
+		a.pendingMtx.Lock()
+		delete(a.pendingOpens, nodeID)
+		a.failedNodes[nodeID] = struct{}{}
+		a.pendingMtx.Unlock()
+
+		// Trigger the agent to re-evaluate everything and possibly
+		// retry with a different node.
+		a.OnChannelOpenFailure()
+
+		// Finally, we should also disconnect the peer if we weren't
+		// already connected to them beforehand by an external
+		// subsystem.
+		if alreadyConnected {
+			return
+		}
+
+		err = a.cfg.DisconnectPeer(pub)
+		if err != nil {
+			log.Warnf("Unable to disconnect peer %x: %v",
+				pub.SerializeCompressed(), err)
+		}
+	}
+
+	// Since the channel open was successful and is currently pending,
+	// we'll trigger the autopilot agent to query for more peers.
+	a.OnChannelPendingOpen()
+}
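
For reviewers: the refactor keeps the original concurrency contract intact. The controller still calls `a.wg.Add(1)` before spawning, while the extracted method owns the matching `wg.Done()` (hence the "MUST be run as a goroutine" note), and the directive is still passed by value so each goroutine gets its own copy, just as the old `go func(directive AttachmentDirective)` closure did. Below is a minimal standalone sketch of that Add-before-spawn / Done-in-callee pattern; the `Worker` and `Directive` names are hypothetical stand-ins for `Agent` and `AttachmentDirective`, not lnd code.

```go
package main

import (
	"fmt"
	"sync"
)

// Directive is a hypothetical stand-in for AttachmentDirective.
type Directive struct {
	Node string
}

// Worker is a hypothetical stand-in for Agent.
type Worker struct {
	wg sync.WaitGroup
}

// executeDirective mirrors the contract in the diff: it MUST be run as a
// goroutine, since it assumes the caller already incremented the WaitGroup
// and it owns the matching Done.
func (w *Worker) executeDirective(d Directive) {
	defer w.wg.Done()
	fmt.Println("executing directive for", d.Node)
}

func main() {
	w := &Worker{}
	for _, d := range []Directive{{Node: "alice"}, {Node: "bob"}} {
		// Add happens in the caller, before the goroutine starts, so
		// a later Wait can never race ahead of the spawn.
		w.wg.Add(1)
		go w.executeDirective(d)
	}
	w.wg.Wait() // Blocks until every directive goroutine has called Done.
}
```

Keeping `Add` in the caller and `Done` in the callee means the pair stays balanced even if the scheduler delays the goroutine, which is why the extracted method can be spawned directly with `go a.executeDirective(chanCandidate)` without any further synchronization changes.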