From 3e992f094d1ca07266ad4e565f8a1869d1c547c0 Mon Sep 17 00:00:00 2001
From: "Johan T. Halseth"
Date: Fri, 31 Aug 2018 08:52:47 +0200
Subject: [PATCH 1/5] autopilot/agent: move signal processing out of select

---
 autopilot/agent.go | 413 ++++++++++++++++++++++-----------------
 1 file changed, 201 insertions(+), 212 deletions(-)

diff --git a/autopilot/agent.go b/autopilot/agent.go
index 6f137583..a401d247 100644
--- a/autopilot/agent.go
+++ b/autopilot/agent.go
@@ -440,222 +440,211 @@ func (a *Agent) controller() {
 					"need for more channels")
 			}
 
-			pendingMtx.Lock()
-			log.Debugf("Pending channels: %v", spew.Sdump(pendingOpens))
-			pendingMtx.Unlock()
-
-			// With all the updates applied, we'll obtain a set of
-			// the current active channels (confirmed channels),
-			// and also factor in our set of unconfirmed channels.
-			confirmedChans := a.chanState
-			pendingMtx.Lock()
-			totalChans := mergeChanState(pendingOpens, confirmedChans)
-			pendingMtx.Unlock()
-
-			// Now that we've updated our internal state, we'll
-			// consult our channel attachment heuristic to
-			// determine if we should open up any additional
-			// channels or modify existing channels.
-			availableFunds, numChans, needMore := a.cfg.Heuristic.NeedMoreChans(
-				totalChans, a.totalBalance,
-			)
-			if !needMore {
-				continue
-			}
-
-			log.Infof("Triggering attachment directive dispatch, "+
-				"total_funds=%v", a.totalBalance)
-
-			// We're to attempt an attachment so we'll o obtain the
-			// set of nodes that we currently have channels with so
-			// we avoid duplicate edges.
-			connectedNodes := a.chanState.ConnectedNodes()
-			pendingMtx.Lock()
-			nodesToSkip := mergeNodeMaps(connectedNodes, failedNodes, pendingOpens)
-			pendingMtx.Unlock()
-
-			// If we reach this point, then according to our
-			// heuristic we should modify our channel state to tend
-			// towards what it determines to the optimal state. So
-			// we'll call Select to get a fresh batch of attachment
-			// directives, passing in the amount of funds available
-			// for us to use.
-			chanCandidates, err := a.cfg.Heuristic.Select(
-				a.cfg.Self, a.cfg.Graph, availableFunds,
-				numChans, nodesToSkip,
-			)
-			if err != nil {
-				log.Errorf("Unable to select candidates for "+
-					"attachment: %v", err)
-				continue
-			}
-
-			if len(chanCandidates) == 0 {
-				log.Infof("No eligible candidates to connect to")
-				continue
-			}
-
-			log.Infof("Attempting to execute channel attachment "+
-				"directives: %v", spew.Sdump(chanCandidates))
-
-			// For each recommended attachment directive, we'll
-			// launch a new goroutine to attempt to carry out the
-			// directive. If any of these succeed, then we'll
-			// receive a new state update, taking us back to the
-			// top of our controller loop.
-			pendingMtx.Lock()
-			for _, chanCandidate := range chanCandidates {
-				// Before we proceed, we'll check to see if
-				// this attempt would take us past the total
-				// number of allowed pending opens. If so, then
-				// we'll skip this round and wait for an
-				// attempt to either fail or succeed.
-				if uint16(len(pendingOpens))+1 >
-					a.cfg.MaxPendingOpens {
-
-					log.Debugf("Reached cap of %v pending "+
-						"channel opens, will retry "+
-						"after success/failure",
-						a.cfg.MaxPendingOpens)
-					continue
-				}
-
-				go func(directive AttachmentDirective) {
-					// We'll start out by attempting to
-					// connect to the peer in order to begin
-					// the funding workflow.
-					pub := directive.PeerKey
-					alreadyConnected, err := a.cfg.ConnectToPeer(
-						pub, directive.Addrs,
-					)
-					if err != nil {
-						log.Warnf("Unable to connect "+
-							"to %x: %v",
-							pub.SerializeCompressed(),
-							err)
-
-						// Since we failed to connect to
-						// them, we'll mark them as
-						// failed so that we don't
-						// attempt to connect to them
-						// again.
-						nodeID := NewNodeID(pub)
-						pendingMtx.Lock()
-						failedNodes[nodeID] = struct{}{}
-						pendingMtx.Unlock()
-
-						// Finally, we'll trigger the
-						// agent to select new peers to
-						// connect to.
-						a.OnChannelOpenFailure()
-
-						return
-					}
-
-					// If we were succesful, we'll track
-					// this peer in our set of pending
-					// opens. We do this here to ensure we
-					// don't stall on selecting new peers if
-					// the connection attempt happens to
-					// take too long.
-					pendingMtx.Lock()
-					if uint16(len(pendingOpens))+1 >
-						a.cfg.MaxPendingOpens {
-
-						pendingMtx.Unlock()
-
-						// Since we've reached our max
-						// number of pending opens,
-						// we'll disconnect this peer
-						// and exit. However, if we were
-						// previously connected to them,
-						// then we'll make sure to
-						// maintain the connection
-						// alive.
-						if alreadyConnected {
-							return
-						}
-
-						err = a.cfg.DisconnectPeer(
-							pub,
-						)
-						if err != nil {
-							log.Warnf("Unable to "+
-								"disconnect peer "+
-								"%x: %v",
-								pub.SerializeCompressed(),
-								err)
-						}
-						return
-					}
-
-					nodeID := NewNodeID(directive.PeerKey)
-					pendingOpens[nodeID] = Channel{
-						Capacity: directive.ChanAmt,
-						Node:     nodeID,
-					}
-					pendingMtx.Unlock()
-
-					// We can then begin the funding
-					// workflow with this peer.
-					err = a.cfg.ChanController.OpenChannel(
-						pub, directive.ChanAmt,
-					)
-					if err != nil {
-						log.Warnf("Unable to open "+
-							"channel to %x of %v: %v",
-							pub.SerializeCompressed(),
-							directive.ChanAmt, err)
-
-						// As the attempt failed, we'll
-						// clear the peer from the set of
-						// pending opens and mark them
-						// as failed so we don't attempt
-						// to open a channel to them
-						// again.
-						pendingMtx.Lock()
-						delete(pendingOpens, nodeID)
-						failedNodes[nodeID] = struct{}{}
-						pendingMtx.Unlock()
-
-						// Trigger the agent to
-						// re-evaluate everything and
-						// possibly retry with a
-						// different node.
-						a.OnChannelOpenFailure()
-
-						// Finally, we should also
-						// disconnect the peer if we
-						// weren't already connected to
-						// them beforehand by an
-						// external subsystem.
-						if alreadyConnected {
-							return
-						}
-
-						err = a.cfg.DisconnectPeer(pub)
-						if err != nil {
-							log.Warnf("Unable to "+
-								"disconnect peer "+
-								"%x: %v",
-								pub.SerializeCompressed(),
-								err)
-						}
-					}
-
-					// Since the channel open was successful
-					// and is currently pending, we'll
-					// trigger the autopilot agent to query
-					// for more peers.
-					a.OnChannelPendingOpen()
-				}(chanCandidate)
-			}
-			pendingMtx.Unlock()
-
 		// The agent has been signalled to exit, so we'll bail out
 		// immediately.
 		case <-a.quit:
 			return
 		}
+
+		pendingMtx.Lock()
+		log.Debugf("Pending channels: %v", spew.Sdump(pendingOpens))
+		pendingMtx.Unlock()
+
+		// With all the updates applied, we'll obtain a set of the
+		// current active channels (confirmed channels), and also
+		// factor in our set of unconfirmed channels.
+		confirmedChans := a.chanState
+		pendingMtx.Lock()
+		totalChans := mergeChanState(pendingOpens, confirmedChans)
+		pendingMtx.Unlock()
+
+		// Now that we've updated our internal state, we'll consult our
+		// channel attachment heuristic to determine if we should open
+		// up any additional channels or modify existing channels.
+		availableFunds, numChans, needMore := a.cfg.Heuristic.NeedMoreChans(
+			totalChans, a.totalBalance,
+		)
+		if !needMore {
+			continue
+		}
+
+		log.Infof("Triggering attachment directive dispatch, "+
+			"total_funds=%v", a.totalBalance)
+
+		// We're to attempt an attachment so we'll obtain the set of
+		// nodes that we currently have channels with so we avoid
+		// duplicate edges.
+		connectedNodes := a.chanState.ConnectedNodes()
+		pendingMtx.Lock()
+		nodesToSkip := mergeNodeMaps(connectedNodes, failedNodes, pendingOpens)
+		pendingMtx.Unlock()
+
+		// If we reach this point, then according to our heuristic we
+		// should modify our channel state to tend towards what it
+		// determines to be the optimal state. So we'll call Select to
+		// get a fresh batch of attachment directives, passing in the
+		// amount of funds available for us to use.
+		chanCandidates, err := a.cfg.Heuristic.Select(
+			a.cfg.Self, a.cfg.Graph, availableFunds,
+			numChans, nodesToSkip,
+		)
+		if err != nil {
+			log.Errorf("Unable to select candidates for "+
+				"attachment: %v", err)
+			continue
+		}
+
+		if len(chanCandidates) == 0 {
+			log.Infof("No eligible candidates to connect to")
+			continue
+		}
+
+		log.Infof("Attempting to execute channel attachment "+
+			"directives: %v", spew.Sdump(chanCandidates))
+
+		// For each recommended attachment directive, we'll launch a
+		// new goroutine to attempt to carry out the directive. If any
+		// of these succeed, then we'll receive a new state update,
+		// taking us back to the top of our controller loop.
+		pendingMtx.Lock()
+		for _, chanCandidate := range chanCandidates {
+			// Before we proceed, we'll check to see if this
+			// attempt would take us past the total number of
+			// allowed pending opens. If so, then we'll skip this
+			// round and wait for an attempt to either fail or
+			// succeed.
+			if uint16(len(pendingOpens))+1 >
+				a.cfg.MaxPendingOpens {
+
+				log.Debugf("Reached cap of %v pending "+
+					"channel opens, will retry "+
+					"after success/failure",
+					a.cfg.MaxPendingOpens)
+				continue
+			}
+
+			go func(directive AttachmentDirective) {
+				// We'll start out by attempting to connect to
+				// the peer in order to begin the funding
+				// workflow.
+				pub := directive.PeerKey
+				alreadyConnected, err := a.cfg.ConnectToPeer(
+					pub, directive.Addrs,
+				)
+				if err != nil {
+					log.Warnf("Unable to connect "+
+						"to %x: %v",
+						pub.SerializeCompressed(),
+						err)
+
+					// Since we failed to connect to them,
+					// we'll mark them as failed so that we
+					// don't attempt to connect to them
+					// again.
+					nodeID := NewNodeID(pub)
+					pendingMtx.Lock()
+					failedNodes[nodeID] = struct{}{}
+					pendingMtx.Unlock()
+
+					// Finally, we'll trigger the agent to
+					// select new peers to connect to.
+					a.OnChannelOpenFailure()
+
+					return
+				}
+
+				// If we were successful, we'll track this peer
+				// in our set of pending opens. We do this here
+				// to ensure we don't stall on selecting new
+				// peers if the connection attempt happens to
+				// take too long.
+				pendingMtx.Lock()
+				if uint16(len(pendingOpens))+1 >
+					a.cfg.MaxPendingOpens {
+
+					pendingMtx.Unlock()
+
+					// Since we've reached our max number
+					// of pending opens, we'll disconnect
+					// this peer and exit. However, if we
+					// were previously connected to them,
+					// then we'll make sure to maintain the
+					// connection alive.
+					if alreadyConnected {
+						return
+					}
+
+					err = a.cfg.DisconnectPeer(
+						pub,
+					)
+					if err != nil {
+						log.Warnf("Unable to "+
+							"disconnect peer "+
+							"%x: %v",
+							pub.SerializeCompressed(),
+							err)
+					}
+					return
+				}
+
+				nodeID := NewNodeID(directive.PeerKey)
+				pendingOpens[nodeID] = Channel{
+					Capacity: directive.ChanAmt,
+					Node:     nodeID,
+				}
+				pendingMtx.Unlock()
+
+				// We can then begin the funding workflow with
+				// this peer.
+				err = a.cfg.ChanController.OpenChannel(
+					pub, directive.ChanAmt,
+				)
+				if err != nil {
+					log.Warnf("Unable to open "+
+						"channel to %x of %v: %v",
+						pub.SerializeCompressed(),
+						directive.ChanAmt, err)
+
+					// As the attempt failed, we'll clear
+					// the peer from the set of pending
+					// opens and mark them as failed so we
+					// don't attempt to open a channel to
+					// them again.
+					pendingMtx.Lock()
+					delete(pendingOpens, nodeID)
+					failedNodes[nodeID] = struct{}{}
+					pendingMtx.Unlock()
+
+					// Trigger the agent to re-evaluate
+					// everything and possibly retry with a
+					// different node.
+					a.OnChannelOpenFailure()
+
+					// Finally, we should also disconnect
+					// the peer if we weren't already
+					// connected to them beforehand by an
+					// external subsystem.
+					if alreadyConnected {
+						return
+					}
+
+					err = a.cfg.DisconnectPeer(pub)
+					if err != nil {
+						log.Warnf("Unable to "+
+							"disconnect peer "+
+							"%x: %v",
+							pub.SerializeCompressed(),
+							err)
+					}
+				}
+
+				// Since the channel open was successful and is
+				// currently pending, we'll trigger the
+				// autopilot agent to query for more peers.
+				a.OnChannelPendingOpen()
+			}(chanCandidate)
+		}
+		pendingMtx.Unlock()
+
 	}
 }

From 4a88c61a90135bb8eb685a4920fd377973590044 Mon Sep 17 00:00:00 2001
From: "Johan T. Halseth"
Date: Fri, 31 Aug 2018 08:55:41 +0200
Subject: [PATCH 2/5] autopilot/agent: signal nodeUpdates on own channel

We do this to avoid a huge number of goroutines piling up on initial
graph sync, as they will all block trying to send the node update on
the stateUpdates channel. Now we instead make a new buffered channel
nodeUpdates, and just return immediately if there is already a signal
in the channel waiting to be processed.
---
 autopilot/agent.go | 34 +++++++++++++++++-----------------
 1 file changed, 17 insertions(+), 17 deletions(-)

diff --git a/autopilot/agent.go b/autopilot/agent.go
index a401d247..1f04c9d6 100644
--- a/autopilot/agent.go
+++ b/autopilot/agent.go
@@ -117,6 +117,11 @@ type Agent struct {
 	// affect the heuristics of the agent will be sent over.
 	stateUpdates chan interface{}
 
+	// nodeUpdates is a channel that changes to the graph node landscape
+	// will be sent over. This channel will be buffered to ensure we have
+	// at most one pending update of this type to handle at a given time.
+	nodeUpdates chan *nodeUpdates
+
 	// totalBalance is the total number of satoshis the backing wallet is
 	// known to control at any given instance. This value will be updated
 	// when the agent receives external balance update signals.
@@ -136,6 +141,7 @@ func New(cfg Config, initialState []Channel) (*Agent, error) {
 		chanState:    make(map[lnwire.ShortChannelID]Channel),
 		quit:         make(chan struct{}),
 		stateUpdates: make(chan interface{}),
+		nodeUpdates:  make(chan *nodeUpdates, 1),
 	}
 
 	for _, c := range initialState {
@@ -223,15 +229,10 @@ func (a *Agent) OnBalanceChange() {
 
 // OnNodeUpdates is a callback that should be executed each time our channel
 // graph has new nodes or their node announcements are updated.
 func (a *Agent) OnNodeUpdates() {
-	a.wg.Add(1)
-	go func() {
-		defer a.wg.Done()
-
-		select {
-		case a.stateUpdates <- &nodeUpdates{}:
-		case <-a.quit:
-		}
-	}()
+	select {
+	case a.nodeUpdates <- &nodeUpdates{}:
+	default:
+	}
 }
 
 // OnChannelOpen is a callback that should be executed each time a new channel
@@ -430,16 +431,15 @@ func (a *Agent) controller() {
 				}
 
 				updateBalance()
-
-			// New nodes have been added to the graph or their node
-			// announcements have been updated. We will consider
-			// opening channels to these nodes if we haven't
-			// stabilized.
-			case *nodeUpdates:
-				log.Infof("Node updates received, assessing " +
-					"need for more channels")
 			}
 
+		// New nodes have been added to the graph or their node
+		// announcements have been updated. We will consider opening
+		// channels to these nodes if we haven't stabilized.
+		case <-a.nodeUpdates:
+			log.Infof("Node updates received, assessing " +
+				"need for more channels")
+
 		// The agent has been signalled to exit, so we'll bail out
 		// immediately.
 		case <-a.quit:
 			return

From 186e6d4da4a41652f9f79500010ea0a58b2e7f4c Mon Sep 17 00:00:00 2001
From: "Johan T. Halseth"
Date: Fri, 31 Aug 2018 14:45:00 +0200
Subject: [PATCH 3/5] autopilot/agent: signal chanOpenFailureUpdates on own channel

We do this to avoid a huge number of goroutines piling up when
autopilot is trying to open many channels, as they will all block
trying to send the update on the stateUpdates channel. Now we instead
send them on a buffered channel, similar to what is done with the
nodeUpdates.
---
 autopilot/agent.go | 46 ++++++++++++++++++++++++----------------------
 1 file changed, 24 insertions(+), 22 deletions(-)

diff --git a/autopilot/agent.go b/autopilot/agent.go
index 1f04c9d6..1b9bb233 100644
--- a/autopilot/agent.go
+++ b/autopilot/agent.go
@@ -122,6 +122,12 @@ type Agent struct {
 	// at most one pending update of this type to handle at a given time.
 	nodeUpdates chan *nodeUpdates
 
+	// chanOpenFailures is a channel where updates about channel open
+	// failures will be sent. This channel will be buffered to ensure we
+	// have at most one pending update of this type to handle at a given
+	// time.
+	chanOpenFailures chan *chanOpenFailureUpdate
+
 	// totalBalance is the total number of satoshis the backing wallet is
 	// known to control at any given instance. This value will be updated
 	// when the agent receives external balance update signals.
@@ -137,11 +143,12 @@ type Agent struct {
 // the backing Lightning Node.
 func New(cfg Config, initialState []Channel) (*Agent, error) {
 	a := &Agent{
-		cfg:          cfg,
-		chanState:    make(map[lnwire.ShortChannelID]Channel),
-		quit:         make(chan struct{}),
-		stateUpdates: make(chan interface{}),
-		nodeUpdates:  make(chan *nodeUpdates, 1),
+		cfg:              cfg,
+		chanState:        make(map[lnwire.ShortChannelID]Channel),
+		quit:             make(chan struct{}),
+		stateUpdates:     make(chan interface{}),
+		nodeUpdates:      make(chan *nodeUpdates, 1),
+		chanOpenFailures: make(chan *chanOpenFailureUpdate, 1),
 	}
 
 	for _, c := range initialState {
@@ -265,15 +272,10 @@ func (a *Agent) OnChannelPendingOpen() {
 
 // OnChannelOpenFailure is a callback that should be executed when the
 // autopilot has attempted to open a channel, but failed. In this case we can
 // retry channel creation with a different node.
 func (a *Agent) OnChannelOpenFailure() {
-	a.wg.Add(1)
-	go func() {
-		defer a.wg.Done()
-
-		select {
-		case a.stateUpdates <- &chanOpenFailureUpdate{}:
-		case <-a.quit:
-		}
-	}()
+	select {
+	case a.chanOpenFailures <- &chanOpenFailureUpdate{}:
+	default:
+	}
 }
 
 // OnChannelClose is a callback that should be executed each time a prior
@@ -388,14 +390,6 @@ func (a *Agent) controller() {
 
 				updateBalance()
 
-			// The channel we tried to open previously failed for
-			// whatever reason.
-			case *chanOpenFailureUpdate:
-				log.Debug("Retrying after previous channel " +
-					"open failure.")
-
-				updateBalance()
-
 			// A new channel has been opened successfully. This was
 			// either opened by the Agent, or an external system
 			// that is able to drive the Lightning Node.
@@ -433,6 +427,14 @@ func (a *Agent) controller() {
 				updateBalance()
 			}
 
+		// The channel we tried to open previously failed for whatever
+		// reason.
+		case <-a.chanOpenFailures:
+			log.Debug("Retrying after previous channel open " +
+				"failure.")
+
+			updateBalance()
+
 		// New nodes have been added to the graph or their node
 		// announcements have been updated. We will consider opening
 		// channels to these nodes if we haven't stabilized.

From a9a9c9aeb431240f944c5020bc6c736d7fb50adb Mon Sep 17 00:00:00 2001
From: "Johan T. Halseth"
Date: Fri, 31 Aug 2018 14:55:07 +0200
Subject: [PATCH 4/5] autopilot/agent: signal chanPendingOpenUpdates on own channel

---
 autopilot/agent.go | 41 ++++++++++++++++++++++-------------------
 1 file changed, 22 insertions(+), 19 deletions(-)

diff --git a/autopilot/agent.go b/autopilot/agent.go
index 1b9bb233..2d0ebf5b 100644
--- a/autopilot/agent.go
+++ b/autopilot/agent.go
@@ -122,6 +122,12 @@ type Agent struct {
 	// at most one pending update of this type to handle at a given time.
 	nodeUpdates chan *nodeUpdates
 
+	// pendingOpenUpdates is a channel where updates about channel pending
+	// opening will be sent. This channel will be buffered to ensure we
+	// have at most one pending update of this type to handle at a given
+	// time.
+	pendingOpenUpdates chan *chanPendingOpenUpdate
+
 	// chanOpenFailures is a channel where updates about channel open
 	// failures will be sent. This channel will be buffered to ensure we
 	// have at most one pending update of this type to handle at a given
@@ -143,12 +149,13 @@ type Agent struct {
 // the backing Lightning Node.
 func New(cfg Config, initialState []Channel) (*Agent, error) {
 	a := &Agent{
-		cfg:              cfg,
-		chanState:        make(map[lnwire.ShortChannelID]Channel),
-		quit:             make(chan struct{}),
-		stateUpdates:     make(chan interface{}),
-		nodeUpdates:      make(chan *nodeUpdates, 1),
-		chanOpenFailures: make(chan *chanOpenFailureUpdate, 1),
+		cfg:                cfg,
+		chanState:          make(map[lnwire.ShortChannelID]Channel),
+		quit:               make(chan struct{}),
+		stateUpdates:       make(chan interface{}),
+		nodeUpdates:        make(chan *nodeUpdates, 1),
+		chanOpenFailures:   make(chan *chanOpenFailureUpdate, 1),
+		pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1),
 	}
 
 	for _, c := range initialState {
@@ -260,12 +267,10 @@ func (a *Agent) OnChannelOpen(c Channel) {
 
 // channel is opened, either by the agent or an external subsystems, but is
 // still pending.
 func (a *Agent) OnChannelPendingOpen() {
-	go func() {
-		select {
-		case a.stateUpdates <- &chanPendingOpenUpdate{}:
-		case <-a.quit:
-		}
-	}()
+	select {
+	case a.pendingOpenUpdates <- &chanPendingOpenUpdate{}:
+	default:
+	}
 }
 
 // OnChannelOpenFailure is a callback that should be executed when the
@@ -406,13 +411,6 @@ func (a *Agent) controller() {
 				pendingMtx.Unlock()
 
 				updateBalance()
-
-			// A new channel has been opened by the agent or an
-			// external subsystem, but is still pending
-			// confirmation.
-			case *chanPendingOpenUpdate:
-				updateBalance()
-
 			// A channel has been closed, this may free up an
 			// available slot, triggering a new channel update.
 			case *chanCloseUpdate:
@@ -427,6 +425,11 @@ func (a *Agent) controller() {
 				updateBalance()
 			}
 
+		// A new channel has been opened by the agent or an external
+		// subsystem, but is still pending confirmation.
+		case <-a.pendingOpenUpdates:
+			updateBalance()
+
 		// The channel we tried to open previously failed for whatever
 		// reason.
 		case <-a.chanOpenFailures:

From 0d4df54118ba6e72a5f67a07ea86111dc2c12036 Mon Sep 17 00:00:00 2001
From: "Johan T. Halseth"
Date: Tue, 4 Sep 2018 10:16:02 +0200
Subject: [PATCH 5/5] autopilot/agent: signal balanceUpdates on own channel

---
 autopilot/agent.go | 42 +++++++++++++++++++++---------------------
 1 file changed, 21 insertions(+), 21 deletions(-)

diff --git a/autopilot/agent.go b/autopilot/agent.go
index 2d0ebf5b..8cd93be6 100644
--- a/autopilot/agent.go
+++ b/autopilot/agent.go
@@ -117,6 +117,12 @@ type Agent struct {
 	// affect the heuristics of the agent will be sent over.
 	stateUpdates chan interface{}
 
+	// balanceUpdates is a channel where notifications about updates to the
+	// wallet's balance will be sent. This channel will be buffered to
+	// ensure we have at most one pending update of this type to handle at
+	// a given time.
+	balanceUpdates chan *balanceUpdate
+
 	// nodeUpdates is a channel that changes to the graph node landscape
 	// will be sent over. This channel will be buffered to ensure we have
 	// at most one pending update of this type to handle at a given time.
@@ -153,6 +159,7 @@ func New(cfg Config, initialState []Channel) (*Agent, error) {
 		chanState:          make(map[lnwire.ShortChannelID]Channel),
 		quit:               make(chan struct{}),
 		stateUpdates:       make(chan interface{}),
+		balanceUpdates:     make(chan *balanceUpdate, 1),
 		nodeUpdates:        make(chan *nodeUpdates, 1),
 		chanOpenFailures:   make(chan *chanOpenFailureUpdate, 1),
 		pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1),
@@ -226,18 +233,13 @@ type chanCloseUpdate struct {
 	closedChans []lnwire.ShortChannelID
 }
 
-// OnBalanceChange is a callback that should be executed each time the balance of
-// the backing wallet changes.
+// OnBalanceChange is a callback that should be executed each time the balance
+// of the backing wallet changes.
 func (a *Agent) OnBalanceChange() {
-	a.wg.Add(1)
-	go func() {
-		defer a.wg.Done()
-
-		select {
-		case a.stateUpdates <- &balanceUpdate{}:
-		case <-a.quit:
-		}
-	}()
+	select {
+	case a.balanceUpdates <- &balanceUpdate{}:
+	default:
+	}
 }
 
 // OnNodeUpdates is a callback that should be executed each time our channel
@@ -385,16 +387,6 @@ func (a *Agent) controller() {
 		log.Infof("Processing new external signal")
 
 		switch update := signal.(type) {
-		// The balance of the backing wallet has changed, if
-		// more funds are now available, we may attempt to open
-		// up an additional channel, or splice in funds to an
-		// existing one.
-		case *balanceUpdate:
-			log.Debug("Applying external balance state " +
-				"update")
-
-			updateBalance()
-
 		// A new channel has been opened successfully. This was
 		// either opened by the Agent, or an external system
 		// that is able to drive the Lightning Node.
@@ -430,6 +422,14 @@ func (a *Agent) controller() {
 		case <-a.pendingOpenUpdates:
 			updateBalance()
 
+		// The balance of the backing wallet has changed, if more funds
+		// are now available, we may attempt to open up an additional
+		// channel, or splice in funds to an existing one.
+		case <-a.balanceUpdates:
+			log.Debug("Applying external balance state update")
+
+			updateBalance()
+
 		// The channel we tried to open previously failed for whatever
 		// reason.
 		case <-a.chanOpenFailures: