autopilot/agent: split attachement directive attempts into method
This commit is contained in:
parent
fb10175ea5
commit
26810fe928
@ -548,151 +548,121 @@ func (a *Agent) controller() {
|
||||
a.pendingConns[nodeID] = struct{}{}
|
||||
|
||||
a.wg.Add(1)
|
||||
go func(directive AttachmentDirective) {
|
||||
defer a.wg.Done()
|
||||
|
||||
// We'll start out by attempting to connect to
|
||||
// the peer in order to begin the funding
|
||||
// workflow.
|
||||
pub := directive.NodeKey
|
||||
alreadyConnected, err := a.cfg.ConnectToPeer(
|
||||
pub, directive.Addrs,
|
||||
)
|
||||
if err != nil {
|
||||
log.Warnf("Unable to connect "+
|
||||
"to %x: %v",
|
||||
pub.SerializeCompressed(),
|
||||
err)
|
||||
|
||||
// Since we failed to connect to them,
|
||||
// we'll mark them as failed so that we
|
||||
// don't attempt to connect to them
|
||||
// again.
|
||||
nodeID := NewNodeID(pub)
|
||||
a.pendingMtx.Lock()
|
||||
delete(a.pendingConns, nodeID)
|
||||
a.failedNodes[nodeID] = struct{}{}
|
||||
a.pendingMtx.Unlock()
|
||||
|
||||
// Finally, we'll trigger the agent to
|
||||
// select new peers to connect to.
|
||||
a.OnChannelOpenFailure()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// The connection was successful, though before
|
||||
// progressing we must check that we have not
|
||||
// already met our quota for max pending open
|
||||
// channels. This can happen if multiple
|
||||
// directives were spawned but fewer slots were
|
||||
// available, and other successful attempts
|
||||
// finished first.
|
||||
a.pendingMtx.Lock()
|
||||
if uint16(len(a.pendingOpens)) >=
|
||||
a.cfg.Constraints.MaxPendingOpens {
|
||||
// Since we've reached our max number of
|
||||
// pending opens, we'll disconnect this
|
||||
// peer and exit. However, if we were
|
||||
// previously connected to them, then
|
||||
// we'll make sure to maintain the
|
||||
// connection alive.
|
||||
if alreadyConnected {
|
||||
// Since we succeeded in
|
||||
// connecting, we won't add this
|
||||
// peer to the failed nodes map,
|
||||
// but we will remove it from
|
||||
// a.pendingConns so that it can
|
||||
// be retried in the future.
|
||||
delete(a.pendingConns, nodeID)
|
||||
a.pendingMtx.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
err = a.cfg.DisconnectPeer(
|
||||
pub,
|
||||
)
|
||||
if err != nil {
|
||||
log.Warnf("Unable to "+
|
||||
"disconnect peer "+
|
||||
"%x: %v",
|
||||
pub.SerializeCompressed(),
|
||||
err)
|
||||
}
|
||||
|
||||
// Now that we have disconnected, we can
|
||||
// remove this node from our pending
|
||||
// conns map, permitting subsequent
|
||||
// connection attempts.
|
||||
delete(a.pendingConns, nodeID)
|
||||
a.pendingMtx.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// If we were successful, we'll track this peer
|
||||
// in our set of pending opens. We do this here
|
||||
// to ensure we don't stall on selecting new
|
||||
// peers if the connection attempt happens to
|
||||
// take too long.
|
||||
nodeID := directive.NodeID
|
||||
delete(a.pendingConns, nodeID)
|
||||
a.pendingOpens[nodeID] = Channel{
|
||||
Capacity: directive.ChanAmt,
|
||||
Node: nodeID,
|
||||
}
|
||||
a.pendingMtx.Unlock()
|
||||
|
||||
// We can then begin the funding workflow with
|
||||
// this peer.
|
||||
err = a.cfg.ChanController.OpenChannel(
|
||||
pub, directive.ChanAmt,
|
||||
)
|
||||
if err != nil {
|
||||
log.Warnf("Unable to open "+
|
||||
"channel to %x of %v: %v",
|
||||
pub.SerializeCompressed(),
|
||||
directive.ChanAmt, err)
|
||||
|
||||
// As the attempt failed, we'll clear
|
||||
// the peer from the set of pending
|
||||
// opens and mark them as failed so we
|
||||
// don't attempt to open a channel to
|
||||
// them again.
|
||||
a.pendingMtx.Lock()
|
||||
delete(a.pendingOpens, nodeID)
|
||||
a.failedNodes[nodeID] = struct{}{}
|
||||
a.pendingMtx.Unlock()
|
||||
|
||||
// Trigger the agent to re-evaluate
|
||||
// everything and possibly retry with a
|
||||
// different node.
|
||||
a.OnChannelOpenFailure()
|
||||
|
||||
// Finally, we should also disconnect
|
||||
// the peer if we weren't already
|
||||
// connected to them beforehand by an
|
||||
// external subsystem.
|
||||
if alreadyConnected {
|
||||
return
|
||||
}
|
||||
|
||||
err = a.cfg.DisconnectPeer(pub)
|
||||
if err != nil {
|
||||
log.Warnf("Unable to "+
|
||||
"disconnect peer "+
|
||||
"%x: %v",
|
||||
pub.SerializeCompressed(),
|
||||
err)
|
||||
}
|
||||
}
|
||||
|
||||
// Since the channel open was successful and is
|
||||
// currently pending, we'll trigger the
|
||||
// autopilot agent to query for more peers.
|
||||
a.OnChannelPendingOpen()
|
||||
}(chanCandidate)
|
||||
go a.executeDirective(chanCandidate)
|
||||
}
|
||||
a.pendingMtx.Unlock()
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// executeDirective attempts to connect to the channel candidate specified by
|
||||
// the given attachment directive, and open a channel of the given size.
|
||||
//
|
||||
// NOTE: MUST be run as a goroutine.
|
||||
func (a *Agent) executeDirective(directive AttachmentDirective) {
|
||||
defer a.wg.Done()
|
||||
|
||||
// We'll start out by attempting to connect to the peer in order to
|
||||
// begin the funding workflow.
|
||||
pub := directive.NodeKey
|
||||
nodeID := directive.NodeID
|
||||
alreadyConnected, err := a.cfg.ConnectToPeer(pub, directive.Addrs)
|
||||
if err != nil {
|
||||
log.Warnf("Unable to connect to %x: %v",
|
||||
pub.SerializeCompressed(), err)
|
||||
|
||||
// Since we failed to connect to them, we'll mark them as
|
||||
// failed so that we don't attempt to connect to them again.
|
||||
a.pendingMtx.Lock()
|
||||
delete(a.pendingConns, nodeID)
|
||||
a.failedNodes[nodeID] = struct{}{}
|
||||
a.pendingMtx.Unlock()
|
||||
|
||||
// Finally, we'll trigger the agent to select new peers to
|
||||
// connect to.
|
||||
a.OnChannelOpenFailure()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// The connection was successful, though before progressing we must
|
||||
// check that we have not already met our quota for max pending open
|
||||
// channels. This can happen if multiple directives were spawned but
|
||||
// fewer slots were available, and other successful attempts finished
|
||||
// first.
|
||||
a.pendingMtx.Lock()
|
||||
if uint16(len(a.pendingOpens)) >=
|
||||
a.cfg.Constraints.MaxPendingOpens {
|
||||
// Since we've reached our max number of pending opens, we'll
|
||||
// disconnect this peer and exit. However, if we were
|
||||
// previously connected to them, then we'll make sure to
|
||||
// maintain the connection alive.
|
||||
if alreadyConnected {
|
||||
// Since we succeeded in connecting, we won't add this
|
||||
// peer to the failed nodes map, but we will remove it
|
||||
// from a.pendingConns so that it can be retried in the
|
||||
// future.
|
||||
delete(a.pendingConns, nodeID)
|
||||
a.pendingMtx.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
err = a.cfg.DisconnectPeer(pub)
|
||||
if err != nil {
|
||||
log.Warnf("Unable to disconnect peer %x: %v",
|
||||
pub.SerializeCompressed(), err)
|
||||
}
|
||||
|
||||
// Now that we have disconnected, we can remove this node from
|
||||
// our pending conns map, permitting subsequent connection
|
||||
// attempts.
|
||||
delete(a.pendingConns, nodeID)
|
||||
a.pendingMtx.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// If we were successful, we'll track this peer in our set of pending
|
||||
// opens. We do this here to ensure we don't stall on selecting new
|
||||
// peers if the connection attempt happens to take too long.
|
||||
delete(a.pendingConns, nodeID)
|
||||
a.pendingOpens[nodeID] = Channel{
|
||||
Capacity: directive.ChanAmt,
|
||||
Node: nodeID,
|
||||
}
|
||||
a.pendingMtx.Unlock()
|
||||
|
||||
// We can then begin the funding workflow with this peer.
|
||||
err = a.cfg.ChanController.OpenChannel(pub, directive.ChanAmt)
|
||||
if err != nil {
|
||||
log.Warnf("Unable to open channel to %x of %v: %v",
|
||||
pub.SerializeCompressed(), directive.ChanAmt, err)
|
||||
|
||||
// As the attempt failed, we'll clear the peer from the set of
|
||||
// pending opens and mark them as failed so we don't attempt to
|
||||
// open a channel to them again.
|
||||
a.pendingMtx.Lock()
|
||||
delete(a.pendingOpens, nodeID)
|
||||
a.failedNodes[nodeID] = struct{}{}
|
||||
a.pendingMtx.Unlock()
|
||||
|
||||
// Trigger the agent to re-evaluate everything and possibly
|
||||
// retry with a different node.
|
||||
a.OnChannelOpenFailure()
|
||||
|
||||
// Finally, we should also disconnect the peer if we weren't
|
||||
// already connected to them beforehand by an external
|
||||
// subsystem.
|
||||
if alreadyConnected {
|
||||
return
|
||||
}
|
||||
|
||||
err = a.cfg.DisconnectPeer(pub)
|
||||
if err != nil {
|
||||
log.Warnf("Unable to disconnect peer %x: %v",
|
||||
pub.SerializeCompressed(), err)
|
||||
}
|
||||
}
|
||||
|
||||
// Since the channel open was successful and is currently pending,
|
||||
// we'll trigger the autopilot agent to query for more peers.
|
||||
a.OnChannelPendingOpen()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user