autopilot/agent: move signal processing out of select

This commit is contained in:
Johan T. Halseth 2018-08-31 08:52:47 +02:00
parent 4f43c1c943
commit 3e992f094d
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26

@ -440,22 +440,27 @@ func (a *Agent) controller() {
"need for more channels") "need for more channels")
} }
// The agent has been signalled to exit, so we'll bail out
// immediately.
case <-a.quit:
return
}
pendingMtx.Lock() pendingMtx.Lock()
log.Debugf("Pending channels: %v", spew.Sdump(pendingOpens)) log.Debugf("Pending channels: %v", spew.Sdump(pendingOpens))
pendingMtx.Unlock() pendingMtx.Unlock()
// With all the updates applied, we'll obtain a set of // With all the updates applied, we'll obtain a set of the
// the current active channels (confirmed channels), // current active channels (confirmed channels), and also
// and also factor in our set of unconfirmed channels. // factor in our set of unconfirmed channels.
confirmedChans := a.chanState confirmedChans := a.chanState
pendingMtx.Lock() pendingMtx.Lock()
totalChans := mergeChanState(pendingOpens, confirmedChans) totalChans := mergeChanState(pendingOpens, confirmedChans)
pendingMtx.Unlock() pendingMtx.Unlock()
// Now that we've updated our internal state, we'll // Now that we've updated our internal state, we'll consult our
// consult our channel attachment heuristic to // channel attachment heuristic to determine if we should open
// determine if we should open up any additional // up any additional channels or modify existing channels.
// channels or modify existing channels.
availableFunds, numChans, needMore := a.cfg.Heuristic.NeedMoreChans( availableFunds, numChans, needMore := a.cfg.Heuristic.NeedMoreChans(
totalChans, a.totalBalance, totalChans, a.totalBalance,
) )
@ -466,20 +471,19 @@ func (a *Agent) controller() {
log.Infof("Triggering attachment directive dispatch, "+ log.Infof("Triggering attachment directive dispatch, "+
"total_funds=%v", a.totalBalance) "total_funds=%v", a.totalBalance)
// We're to attempt an attachment so we'll o obtain the // We're to attempt an attachment so we'll o obtain the set of
// set of nodes that we currently have channels with so // nodes that we currently have channels with so we avoid
// we avoid duplicate edges. // duplicate edges.
connectedNodes := a.chanState.ConnectedNodes() connectedNodes := a.chanState.ConnectedNodes()
pendingMtx.Lock() pendingMtx.Lock()
nodesToSkip := mergeNodeMaps(connectedNodes, failedNodes, pendingOpens) nodesToSkip := mergeNodeMaps(connectedNodes, failedNodes, pendingOpens)
pendingMtx.Unlock() pendingMtx.Unlock()
// If we reach this point, then according to our // If we reach this point, then according to our heuristic we
// heuristic we should modify our channel state to tend // should modify our channel state to tend towards what it
// towards what it determines to the optimal state. So // determines to the optimal state. So we'll call Select to get
// we'll call Select to get a fresh batch of attachment // a fresh batch of attachment directives, passing in the
// directives, passing in the amount of funds available // amount of funds available for us to use.
// for us to use.
chanCandidates, err := a.cfg.Heuristic.Select( chanCandidates, err := a.cfg.Heuristic.Select(
a.cfg.Self, a.cfg.Graph, availableFunds, a.cfg.Self, a.cfg.Graph, availableFunds,
numChans, nodesToSkip, numChans, nodesToSkip,
@ -498,18 +502,17 @@ func (a *Agent) controller() {
log.Infof("Attempting to execute channel attachment "+ log.Infof("Attempting to execute channel attachment "+
"directives: %v", spew.Sdump(chanCandidates)) "directives: %v", spew.Sdump(chanCandidates))
// For each recommended attachment directive, we'll // For each recommended attachment directive, we'll launch a
// launch a new goroutine to attempt to carry out the // new goroutine to attempt to carry out the directive. If any
// directive. If any of these succeed, then we'll // of these succeed, then we'll receive a new state update,
// receive a new state update, taking us back to the // taking us back to the top of our controller loop.
// top of our controller loop.
pendingMtx.Lock() pendingMtx.Lock()
for _, chanCandidate := range chanCandidates { for _, chanCandidate := range chanCandidates {
// Before we proceed, we'll check to see if // Before we proceed, we'll check to see if this
// this attempt would take us past the total // attempt would take us past the total number of
// number of allowed pending opens. If so, then // allowed pending opens. If so, then we'll skip this
// we'll skip this round and wait for an // round and wait for an attempt to either fail or
// attempt to either fail or succeed. // succeed.
if uint16(len(pendingOpens))+1 > if uint16(len(pendingOpens))+1 >
a.cfg.MaxPendingOpens { a.cfg.MaxPendingOpens {
@ -521,9 +524,9 @@ func (a *Agent) controller() {
} }
go func(directive AttachmentDirective) { go func(directive AttachmentDirective) {
// We'll start out by attempting to // We'll start out by attempting to connect to
// connect to the peer in order to begin // the peer in order to begin the funding
// the funding workflow. // workflow.
pub := directive.PeerKey pub := directive.PeerKey
alreadyConnected, err := a.cfg.ConnectToPeer( alreadyConnected, err := a.cfg.ConnectToPeer(
pub, directive.Addrs, pub, directive.Addrs,
@ -534,29 +537,26 @@ func (a *Agent) controller() {
pub.SerializeCompressed(), pub.SerializeCompressed(),
err) err)
// Since we failed to connect to // Since we failed to connect to them,
// them, we'll mark them as // we'll mark them as failed so that we
// failed so that we don't // don't attempt to connect to them
// attempt to connect to them
// again. // again.
nodeID := NewNodeID(pub) nodeID := NewNodeID(pub)
pendingMtx.Lock() pendingMtx.Lock()
failedNodes[nodeID] = struct{}{} failedNodes[nodeID] = struct{}{}
pendingMtx.Unlock() pendingMtx.Unlock()
// Finally, we'll trigger the // Finally, we'll trigger the agent to
// agent to select new peers to // select new peers to connect to.
// connect to.
a.OnChannelOpenFailure() a.OnChannelOpenFailure()
return return
} }
// If we were succesful, we'll track // If we were succesful, we'll track this peer
// this peer in our set of pending // in our set of pending opens. We do this here
// opens. We do this here to ensure we // to ensure we don't stall on selecting new
// don't stall on selecting new peers if // peers if the connection attempt happens to
// the connection attempt happens to
// take too long. // take too long.
pendingMtx.Lock() pendingMtx.Lock()
if uint16(len(pendingOpens))+1 > if uint16(len(pendingOpens))+1 >
@ -564,14 +564,12 @@ func (a *Agent) controller() {
pendingMtx.Unlock() pendingMtx.Unlock()
// Since we've reached our max // Since we've reached our max number
// number of pending opens, // of pending opens, we'll disconnect
// we'll disconnect this peer // this peer and exit. However, if we
// and exit. However, if we were // were previously connected to them,
// previously connected to them, // then we'll make sure to maintain the
// then we'll make sure to // connection alive.
// maintain the connection
// alive.
if alreadyConnected { if alreadyConnected {
return return
} }
@ -596,8 +594,8 @@ func (a *Agent) controller() {
} }
pendingMtx.Unlock() pendingMtx.Unlock()
// We can then begin the funding // We can then begin the funding workflow with
// workflow with this peer. // this peer.
err = a.cfg.ChanController.OpenChannel( err = a.cfg.ChanController.OpenChannel(
pub, directive.ChanAmt, pub, directive.ChanAmt,
) )
@ -607,27 +605,24 @@ func (a *Agent) controller() {
pub.SerializeCompressed(), pub.SerializeCompressed(),
directive.ChanAmt, err) directive.ChanAmt, err)
// As the attempt failed, we'll // As the attempt failed, we'll clear
// clear the peer from the set of // the peer from the set of pending
// pending opens and mark them // opens and mark them as failed so we
// as failed so we don't attempt // don't attempt to open a channel to
// to open a channel to them // them again.
// again.
pendingMtx.Lock() pendingMtx.Lock()
delete(pendingOpens, nodeID) delete(pendingOpens, nodeID)
failedNodes[nodeID] = struct{}{} failedNodes[nodeID] = struct{}{}
pendingMtx.Unlock() pendingMtx.Unlock()
// Trigger the agent to // Trigger the agent to re-evaluate
// re-evaluate everything and // everything and possibly retry with a
// possibly retry with a
// different node. // different node.
a.OnChannelOpenFailure() a.OnChannelOpenFailure()
// Finally, we should also // Finally, we should also disconnect
// disconnect the peer if we // the peer if we weren't already
// weren't already connected to // connected to them beforehand by an
// them beforehand by an
// external subsystem. // external subsystem.
if alreadyConnected { if alreadyConnected {
return return
@ -643,19 +638,13 @@ func (a *Agent) controller() {
} }
} }
// Since the channel open was successful // Since the channel open was successful and is
// and is currently pending, we'll // currently pending, we'll trigger the
// trigger the autopilot agent to query // autopilot agent to query for more peers.
// for more peers.
a.OnChannelPendingOpen() a.OnChannelPendingOpen()
}(chanCandidate) }(chanCandidate)
} }
pendingMtx.Unlock() pendingMtx.Unlock()
// The agent has been signalled to exit, so we'll bail out
// immediately.
case <-a.quit:
return
}
} }
} }