From 3e992f094d1ca07266ad4e565f8a1869d1c547c0 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 31 Aug 2018 08:52:47 +0200 Subject: [PATCH] autopilot/agent: move signal processing out of select --- autopilot/agent.go | 413 ++++++++++++++++++++++----------------------- 1 file changed, 201 insertions(+), 212 deletions(-) diff --git a/autopilot/agent.go b/autopilot/agent.go index 6f137583..a401d247 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -440,222 +440,211 @@ func (a *Agent) controller() { "need for more channels") } - pendingMtx.Lock() - log.Debugf("Pending channels: %v", spew.Sdump(pendingOpens)) - pendingMtx.Unlock() - - // With all the updates applied, we'll obtain a set of - // the current active channels (confirmed channels), - // and also factor in our set of unconfirmed channels. - confirmedChans := a.chanState - pendingMtx.Lock() - totalChans := mergeChanState(pendingOpens, confirmedChans) - pendingMtx.Unlock() - - // Now that we've updated our internal state, we'll - // consult our channel attachment heuristic to - // determine if we should open up any additional - // channels or modify existing channels. - availableFunds, numChans, needMore := a.cfg.Heuristic.NeedMoreChans( - totalChans, a.totalBalance, - ) - if !needMore { - continue - } - - log.Infof("Triggering attachment directive dispatch, "+ - "total_funds=%v", a.totalBalance) - - // We're to attempt an attachment so we'll o obtain the - // set of nodes that we currently have channels with so - // we avoid duplicate edges. - connectedNodes := a.chanState.ConnectedNodes() - pendingMtx.Lock() - nodesToSkip := mergeNodeMaps(connectedNodes, failedNodes, pendingOpens) - pendingMtx.Unlock() - - // If we reach this point, then according to our - // heuristic we should modify our channel state to tend - // towards what it determines to the optimal state. So - // we'll call Select to get a fresh batch of attachment - // directives, passing in the amount of funds available - // for us to use. - chanCandidates, err := a.cfg.Heuristic.Select( - a.cfg.Self, a.cfg.Graph, availableFunds, - numChans, nodesToSkip, - ) - if err != nil { - log.Errorf("Unable to select candidates for "+ - "attachment: %v", err) - continue - } - - if len(chanCandidates) == 0 { - log.Infof("No eligible candidates to connect to") - continue - } - - log.Infof("Attempting to execute channel attachment "+ - "directives: %v", spew.Sdump(chanCandidates)) - - // For each recommended attachment directive, we'll - // launch a new goroutine to attempt to carry out the - // directive. If any of these succeed, then we'll - // receive a new state update, taking us back to the - // top of our controller loop. - pendingMtx.Lock() - for _, chanCandidate := range chanCandidates { - // Before we proceed, we'll check to see if - // this attempt would take us past the total - // number of allowed pending opens. If so, then - // we'll skip this round and wait for an - // attempt to either fail or succeed. - if uint16(len(pendingOpens))+1 > - a.cfg.MaxPendingOpens { - - log.Debugf("Reached cap of %v pending "+ - "channel opens, will retry "+ - "after success/failure", - a.cfg.MaxPendingOpens) - continue - } - - go func(directive AttachmentDirective) { - // We'll start out by attempting to - // connect to the peer in order to begin - // the funding workflow. 
- pub := directive.PeerKey - alreadyConnected, err := a.cfg.ConnectToPeer( - pub, directive.Addrs, - ) - if err != nil { - log.Warnf("Unable to connect "+ - "to %x: %v", - pub.SerializeCompressed(), - err) - - // Since we failed to connect to - // them, we'll mark them as - // failed so that we don't - // attempt to connect to them - // again. - nodeID := NewNodeID(pub) - pendingMtx.Lock() - failedNodes[nodeID] = struct{}{} - pendingMtx.Unlock() - - // Finally, we'll trigger the - // agent to select new peers to - // connect to. - a.OnChannelOpenFailure() - - return - } - - // If we were succesful, we'll track - // this peer in our set of pending - // opens. We do this here to ensure we - // don't stall on selecting new peers if - // the connection attempt happens to - // take too long. - pendingMtx.Lock() - if uint16(len(pendingOpens))+1 > - a.cfg.MaxPendingOpens { - - pendingMtx.Unlock() - - // Since we've reached our max - // number of pending opens, - // we'll disconnect this peer - // and exit. However, if we were - // previously connected to them, - // then we'll make sure to - // maintain the connection - // alive. - if alreadyConnected { - return - } - - err = a.cfg.DisconnectPeer( - pub, - ) - if err != nil { - log.Warnf("Unable to "+ - "disconnect peer "+ - "%x: %v", - pub.SerializeCompressed(), - err) - } - return - } - - nodeID := NewNodeID(directive.PeerKey) - pendingOpens[nodeID] = Channel{ - Capacity: directive.ChanAmt, - Node: nodeID, - } - pendingMtx.Unlock() - - // We can then begin the funding - // workflow with this peer. - err = a.cfg.ChanController.OpenChannel( - pub, directive.ChanAmt, - ) - if err != nil { - log.Warnf("Unable to open "+ - "channel to %x of %v: %v", - pub.SerializeCompressed(), - directive.ChanAmt, err) - - // As the attempt failed, we'll - // clear the peer from the set of - // pending opens and mark them - // as failed so we don't attempt - // to open a channel to them - // again. - pendingMtx.Lock() - delete(pendingOpens, nodeID) - failedNodes[nodeID] = struct{}{} - pendingMtx.Unlock() - - // Trigger the agent to - // re-evaluate everything and - // possibly retry with a - // different node. - a.OnChannelOpenFailure() - - // Finally, we should also - // disconnect the peer if we - // weren't already connected to - // them beforehand by an - // external subsystem. - if alreadyConnected { - return - } - - err = a.cfg.DisconnectPeer(pub) - if err != nil { - log.Warnf("Unable to "+ - "disconnect peer "+ - "%x: %v", - pub.SerializeCompressed(), - err) - } - } - - // Since the channel open was successful - // and is currently pending, we'll - // trigger the autopilot agent to query - // for more peers. - a.OnChannelPendingOpen() - }(chanCandidate) - } - pendingMtx.Unlock() - // The agent has been signalled to exit, so we'll bail out // immediately. case <-a.quit: return } + + pendingMtx.Lock() + log.Debugf("Pending channels: %v", spew.Sdump(pendingOpens)) + pendingMtx.Unlock() + + // With all the updates applied, we'll obtain a set of the + // current active channels (confirmed channels), and also + // factor in our set of unconfirmed channels. + confirmedChans := a.chanState + pendingMtx.Lock() + totalChans := mergeChanState(pendingOpens, confirmedChans) + pendingMtx.Unlock() + + // Now that we've updated our internal state, we'll consult our + // channel attachment heuristic to determine if we should open + // up any additional channels or modify existing channels. 
+ availableFunds, numChans, needMore := a.cfg.Heuristic.NeedMoreChans( + totalChans, a.totalBalance, + ) + if !needMore { + continue + } + + log.Infof("Triggering attachment directive dispatch, "+ + "total_funds=%v", a.totalBalance) + + // We're to attempt an attachment so we'll o obtain the set of + // nodes that we currently have channels with so we avoid + // duplicate edges. + connectedNodes := a.chanState.ConnectedNodes() + pendingMtx.Lock() + nodesToSkip := mergeNodeMaps(connectedNodes, failedNodes, pendingOpens) + pendingMtx.Unlock() + + // If we reach this point, then according to our heuristic we + // should modify our channel state to tend towards what it + // determines to the optimal state. So we'll call Select to get + // a fresh batch of attachment directives, passing in the + // amount of funds available for us to use. + chanCandidates, err := a.cfg.Heuristic.Select( + a.cfg.Self, a.cfg.Graph, availableFunds, + numChans, nodesToSkip, + ) + if err != nil { + log.Errorf("Unable to select candidates for "+ + "attachment: %v", err) + continue + } + + if len(chanCandidates) == 0 { + log.Infof("No eligible candidates to connect to") + continue + } + + log.Infof("Attempting to execute channel attachment "+ + "directives: %v", spew.Sdump(chanCandidates)) + + // For each recommended attachment directive, we'll launch a + // new goroutine to attempt to carry out the directive. If any + // of these succeed, then we'll receive a new state update, + // taking us back to the top of our controller loop. + pendingMtx.Lock() + for _, chanCandidate := range chanCandidates { + // Before we proceed, we'll check to see if this + // attempt would take us past the total number of + // allowed pending opens. If so, then we'll skip this + // round and wait for an attempt to either fail or + // succeed. + if uint16(len(pendingOpens))+1 > + a.cfg.MaxPendingOpens { + + log.Debugf("Reached cap of %v pending "+ + "channel opens, will retry "+ + "after success/failure", + a.cfg.MaxPendingOpens) + continue + } + + go func(directive AttachmentDirective) { + // We'll start out by attempting to connect to + // the peer in order to begin the funding + // workflow. + pub := directive.PeerKey + alreadyConnected, err := a.cfg.ConnectToPeer( + pub, directive.Addrs, + ) + if err != nil { + log.Warnf("Unable to connect "+ + "to %x: %v", + pub.SerializeCompressed(), + err) + + // Since we failed to connect to them, + // we'll mark them as failed so that we + // don't attempt to connect to them + // again. + nodeID := NewNodeID(pub) + pendingMtx.Lock() + failedNodes[nodeID] = struct{}{} + pendingMtx.Unlock() + + // Finally, we'll trigger the agent to + // select new peers to connect to. + a.OnChannelOpenFailure() + + return + } + + // If we were succesful, we'll track this peer + // in our set of pending opens. We do this here + // to ensure we don't stall on selecting new + // peers if the connection attempt happens to + // take too long. + pendingMtx.Lock() + if uint16(len(pendingOpens))+1 > + a.cfg.MaxPendingOpens { + + pendingMtx.Unlock() + + // Since we've reached our max number + // of pending opens, we'll disconnect + // this peer and exit. However, if we + // were previously connected to them, + // then we'll make sure to maintain the + // connection alive. 
+ if alreadyConnected { + return + } + + err = a.cfg.DisconnectPeer( + pub, + ) + if err != nil { + log.Warnf("Unable to "+ + "disconnect peer "+ + "%x: %v", + pub.SerializeCompressed(), + err) + } + return + } + + nodeID := NewNodeID(directive.PeerKey) + pendingOpens[nodeID] = Channel{ + Capacity: directive.ChanAmt, + Node: nodeID, + } + pendingMtx.Unlock() + + // We can then begin the funding workflow with + // this peer. + err = a.cfg.ChanController.OpenChannel( + pub, directive.ChanAmt, + ) + if err != nil { + log.Warnf("Unable to open "+ + "channel to %x of %v: %v", + pub.SerializeCompressed(), + directive.ChanAmt, err) + + // As the attempt failed, we'll clear + // the peer from the set of pending + // opens and mark them as failed so we + // don't attempt to open a channel to + // them again. + pendingMtx.Lock() + delete(pendingOpens, nodeID) + failedNodes[nodeID] = struct{}{} + pendingMtx.Unlock() + + // Trigger the agent to re-evaluate + // everything and possibly retry with a + // different node. + a.OnChannelOpenFailure() + + // Finally, we should also disconnect + // the peer if we weren't already + // connected to them beforehand by an + // external subsystem. + if alreadyConnected { + return + } + + err = a.cfg.DisconnectPeer(pub) + if err != nil { + log.Warnf("Unable to "+ + "disconnect peer "+ + "%x: %v", + pub.SerializeCompressed(), + err) + } + } + + // Since the channel open was successful and is + // currently pending, we'll trigger the + // autopilot agent to query for more peers. + a.OnChannelPendingOpen() + }(chanCandidate) + } + pendingMtx.Unlock() + } }
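As a minimal sketch of the control-flow change this commit makes (the channel names, the controller signature, and the print statements below are hypothetical stand-ins, not code from the patch): the select now only consumes a state-update signal or the quit signal, and the heavier work, which in the real agent is the pending-channel bookkeeping, the NeedMoreChans and Select heuristic calls, and the attachment goroutines, runs after the select once per loop iteration.

package main

import (
	"fmt"
	"time"
)

// controller demonstrates the restructured loop: the select only waits for a
// signal or a quit, and the (potentially slow) processing happens after the
// select, shared by every signal type. The names here are illustrative only.
func controller(signals <-chan string, quit <-chan struct{}) {
	for {
		select {
		case sig := <-signals:
			// Only apply the state update inside the case arm; no
			// heavy work happens here.
			fmt.Println("received signal:", sig)

		case <-quit:
			// The agent has been signalled to exit, so bail out
			// immediately.
			return
		}

		// Shared processing moved out of the select: in the real
		// agent this is where the heuristic is consulted and the
		// attachment directives are launched.
		fmt.Println("processing state after signal")
	}
}

func main() {
	signals := make(chan string)
	quit := make(chan struct{})

	go controller(signals, quit)

	signals <- "balance update"
	signals <- "channel open update"
	time.Sleep(100 * time.Millisecond)
	close(quit)
}

One consequence visible in the diff is that every signal case now shares a single processing path, so a new trigger only needs to apply its update inside the select rather than duplicating the heuristic and attachment logic in its own case arm.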