diff --git a/autopilot/agent.go b/autopilot/agent.go index 6f137583..8cd93be6 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -117,6 +117,29 @@ type Agent struct { // affect the heuristics of the agent will be sent over. stateUpdates chan interface{} + // balanceUpdates is a channel where notifications about updates to the + // wallet's balance will be sent. This channel will be buffered to + // ensure we have at most one pending update of this type to handle at + // a given time. + balanceUpdates chan *balanceUpdate + + // nodeUpdates is a channel that changes to the graph node landscape + // will be sent over. This channel will be buffered to ensure we have + // at most one pending update of this type to handle at a given time. + nodeUpdates chan *nodeUpdates + + // pendingOpenUpdates is a channel where updates about channel pending + // opening will be sent. This channel will be buffered to ensure we + // have at most one pending update of this type to handle at a given + // time. + pendingOpenUpdates chan *chanPendingOpenUpdate + + // chanOpenFailures is a channel where updates about channel open + // failures will be sent. This channel will be buffered to ensure we + // have at most one pending update of this type to handle at a given + // time. + chanOpenFailures chan *chanOpenFailureUpdate + // totalBalance is the total number of satoshis the backing wallet is // known to control at any given instance. This value will be updated // when the agent receives external balance update signals. @@ -132,10 +155,14 @@ type Agent struct { // the backing Lightning Node. 
func New(cfg Config, initialState []Channel) (*Agent, error) { a := &Agent{ - cfg: cfg, - chanState: make(map[lnwire.ShortChannelID]Channel), - quit: make(chan struct{}), - stateUpdates: make(chan interface{}), + cfg: cfg, + chanState: make(map[lnwire.ShortChannelID]Channel), + quit: make(chan struct{}), + stateUpdates: make(chan interface{}), + balanceUpdates: make(chan *balanceUpdate, 1), + nodeUpdates: make(chan *nodeUpdates, 1), + chanOpenFailures: make(chan *chanOpenFailureUpdate, 1), + pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1), } for _, c := range initialState { @@ -206,32 +233,22 @@ type chanCloseUpdate struct { closedChans []lnwire.ShortChannelID } -// OnBalanceChange is a callback that should be executed each time the balance of -// the backing wallet changes. +// OnBalanceChange is a callback that should be executed each time the balance +// of the backing wallet changes. func (a *Agent) OnBalanceChange() { - a.wg.Add(1) - go func() { - defer a.wg.Done() - - select { - case a.stateUpdates <- &balanceUpdate{}: - case <-a.quit: - } - }() + select { + case a.balanceUpdates <- &balanceUpdate{}: + default: + } } // OnNodeUpdates is a callback that should be executed each time our channel // graph has new nodes or their node announcements are updated. func (a *Agent) OnNodeUpdates() { - a.wg.Add(1) - go func() { - defer a.wg.Done() - - select { - case a.stateUpdates <- &nodeUpdates{}: - case <-a.quit: - } - }() + select { + case a.nodeUpdates <- &nodeUpdates{}: + default: + } } // OnChannelOpen is a callback that should be executed each time a new channel @@ -252,27 +269,20 @@ func (a *Agent) OnChannelOpen(c Channel) { // channel is opened, either by the agent or an external subsystems, but is // still pending. 
func (a *Agent) OnChannelPendingOpen() { - go func() { - select { - case a.stateUpdates <- &chanPendingOpenUpdate{}: - case <-a.quit: - } - }() + select { + case a.pendingOpenUpdates <- &chanPendingOpenUpdate{}: + default: + } } // OnChannelOpenFailure is a callback that should be executed when the // autopilot has attempted to open a channel, but failed. In this case we can // retry channel creation with a different node. func (a *Agent) OnChannelOpenFailure() { - a.wg.Add(1) - go func() { - defer a.wg.Done() - - select { - case a.stateUpdates <- &chanOpenFailureUpdate{}: - case <-a.quit: - } - }() + select { + case a.chanOpenFailures <- &chanOpenFailureUpdate{}: + default: + } } // OnChannelClose is a callback that should be executed each time a prior @@ -377,24 +387,6 @@ func (a *Agent) controller() { log.Infof("Processing new external signal") switch update := signal.(type) { - // The balance of the backing wallet has changed, if - // more funds are now available, we may attempt to open - // up an additional channel, or splice in funds to an - // existing one. - case *balanceUpdate: - log.Debug("Applying external balance state " + - "update") - - updateBalance() - - // The channel we tried to open previously failed for - // whatever reason. - case *chanOpenFailureUpdate: - log.Debug("Retrying after previous channel " + - "open failure.") - - updateBalance() - // A new channel has been opened successfully. This was // either opened by the Agent, or an external system // that is able to drive the Lightning Node. @@ -411,13 +403,6 @@ func (a *Agent) controller() { pendingMtx.Unlock() updateBalance() - - // A new channel has been opened by the agent or an - // external subsystem, but is still pending - // confirmation. - case *chanPendingOpenUpdate: - updateBalance() - // A channel has been closed, this may free up an // available slot, triggering a new channel update. 
case *chanCloseUpdate: @@ -430,232 +415,241 @@ func (a *Agent) controller() { } updateBalance() - - // New nodes have been added to the graph or their node - // announcements have been updated. We will consider - // opening channels to these nodes if we haven't - // stabilized. - case *nodeUpdates: - log.Infof("Node updates received, assessing " + - "need for more channels") } - pendingMtx.Lock() - log.Debugf("Pending channels: %v", spew.Sdump(pendingOpens)) - pendingMtx.Unlock() + // A new channel has been opened by the agent or an external + // subsystem, but is still pending confirmation. + case <-a.pendingOpenUpdates: + updateBalance() - // With all the updates applied, we'll obtain a set of - // the current active channels (confirmed channels), - // and also factor in our set of unconfirmed channels. - confirmedChans := a.chanState - pendingMtx.Lock() - totalChans := mergeChanState(pendingOpens, confirmedChans) - pendingMtx.Unlock() + // The balance of the backing wallet has changed, if more funds + // are now available, we may attempt to open up an additional + // channel, or splice in funds to an existing one. + case <-a.balanceUpdates: + log.Debug("Applying external balance state update") - // Now that we've updated our internal state, we'll - // consult our channel attachment heuristic to - // determine if we should open up any additional - // channels or modify existing channels. - availableFunds, numChans, needMore := a.cfg.Heuristic.NeedMoreChans( - totalChans, a.totalBalance, - ) - if !needMore { - continue - } + updateBalance() - log.Infof("Triggering attachment directive dispatch, "+ - "total_funds=%v", a.totalBalance) + // The channel we tried to open previously failed for whatever + // reason. + case <-a.chanOpenFailures: + log.Debug("Retrying after previous channel open " + + "failure.") - // We're to attempt an attachment so we'll o obtain the - // set of nodes that we currently have channels with so - // we avoid duplicate edges. 
- connectedNodes := a.chanState.ConnectedNodes() - pendingMtx.Lock() - nodesToSkip := mergeNodeMaps(connectedNodes, failedNodes, pendingOpens) - pendingMtx.Unlock() + updateBalance() - // If we reach this point, then according to our - // heuristic we should modify our channel state to tend - // towards what it determines to the optimal state. So - // we'll call Select to get a fresh batch of attachment - // directives, passing in the amount of funds available - // for us to use. - chanCandidates, err := a.cfg.Heuristic.Select( - a.cfg.Self, a.cfg.Graph, availableFunds, - numChans, nodesToSkip, - ) - if err != nil { - log.Errorf("Unable to select candidates for "+ - "attachment: %v", err) - continue - } - - if len(chanCandidates) == 0 { - log.Infof("No eligible candidates to connect to") - continue - } - - log.Infof("Attempting to execute channel attachment "+ - "directives: %v", spew.Sdump(chanCandidates)) - - // For each recommended attachment directive, we'll - // launch a new goroutine to attempt to carry out the - // directive. If any of these succeed, then we'll - // receive a new state update, taking us back to the - // top of our controller loop. - pendingMtx.Lock() - for _, chanCandidate := range chanCandidates { - // Before we proceed, we'll check to see if - // this attempt would take us past the total - // number of allowed pending opens. If so, then - // we'll skip this round and wait for an - // attempt to either fail or succeed. - if uint16(len(pendingOpens))+1 > - a.cfg.MaxPendingOpens { - - log.Debugf("Reached cap of %v pending "+ - "channel opens, will retry "+ - "after success/failure", - a.cfg.MaxPendingOpens) - continue - } - - go func(directive AttachmentDirective) { - // We'll start out by attempting to - // connect to the peer in order to begin - // the funding workflow. 
- pub := directive.PeerKey - alreadyConnected, err := a.cfg.ConnectToPeer( - pub, directive.Addrs, - ) - if err != nil { - log.Warnf("Unable to connect "+ - "to %x: %v", - pub.SerializeCompressed(), - err) - - // Since we failed to connect to - // them, we'll mark them as - // failed so that we don't - // attempt to connect to them - // again. - nodeID := NewNodeID(pub) - pendingMtx.Lock() - failedNodes[nodeID] = struct{}{} - pendingMtx.Unlock() - - // Finally, we'll trigger the - // agent to select new peers to - // connect to. - a.OnChannelOpenFailure() - - return - } - - // If we were succesful, we'll track - // this peer in our set of pending - // opens. We do this here to ensure we - // don't stall on selecting new peers if - // the connection attempt happens to - // take too long. - pendingMtx.Lock() - if uint16(len(pendingOpens))+1 > - a.cfg.MaxPendingOpens { - - pendingMtx.Unlock() - - // Since we've reached our max - // number of pending opens, - // we'll disconnect this peer - // and exit. However, if we were - // previously connected to them, - // then we'll make sure to - // maintain the connection - // alive. - if alreadyConnected { - return - } - - err = a.cfg.DisconnectPeer( - pub, - ) - if err != nil { - log.Warnf("Unable to "+ - "disconnect peer "+ - "%x: %v", - pub.SerializeCompressed(), - err) - } - return - } - - nodeID := NewNodeID(directive.PeerKey) - pendingOpens[nodeID] = Channel{ - Capacity: directive.ChanAmt, - Node: nodeID, - } - pendingMtx.Unlock() - - // We can then begin the funding - // workflow with this peer. - err = a.cfg.ChanController.OpenChannel( - pub, directive.ChanAmt, - ) - if err != nil { - log.Warnf("Unable to open "+ - "channel to %x of %v: %v", - pub.SerializeCompressed(), - directive.ChanAmt, err) - - // As the attempt failed, we'll - // clear the peer from the set of - // pending opens and mark them - // as failed so we don't attempt - // to open a channel to them - // again. 
- pendingMtx.Lock() - delete(pendingOpens, nodeID) - failedNodes[nodeID] = struct{}{} - pendingMtx.Unlock() - - // Trigger the agent to - // re-evaluate everything and - // possibly retry with a - // different node. - a.OnChannelOpenFailure() - - // Finally, we should also - // disconnect the peer if we - // weren't already connected to - // them beforehand by an - // external subsystem. - if alreadyConnected { - return - } - - err = a.cfg.DisconnectPeer(pub) - if err != nil { - log.Warnf("Unable to "+ - "disconnect peer "+ - "%x: %v", - pub.SerializeCompressed(), - err) - } - } - - // Since the channel open was successful - // and is currently pending, we'll - // trigger the autopilot agent to query - // for more peers. - a.OnChannelPendingOpen() - }(chanCandidate) - } - pendingMtx.Unlock() + // New nodes have been added to the graph or their node + // announcements have been updated. We will consider opening + // channels to these nodes if we haven't stabilized. + case <-a.nodeUpdates: + log.Infof("Node updates received, assessing " + + "need for more channels") // The agent has been signalled to exit, so we'll bail out // immediately. case <-a.quit: return } + + pendingMtx.Lock() + log.Debugf("Pending channels: %v", spew.Sdump(pendingOpens)) + pendingMtx.Unlock() + + // With all the updates applied, we'll obtain a set of the + // current active channels (confirmed channels), and also + // factor in our set of unconfirmed channels. + confirmedChans := a.chanState + pendingMtx.Lock() + totalChans := mergeChanState(pendingOpens, confirmedChans) + pendingMtx.Unlock() + + // Now that we've updated our internal state, we'll consult our + // channel attachment heuristic to determine if we should open + // up any additional channels or modify existing channels. 
+ availableFunds, numChans, needMore := a.cfg.Heuristic.NeedMoreChans( + totalChans, a.totalBalance, + ) + if !needMore { + continue + } + + log.Infof("Triggering attachment directive dispatch, "+ + "total_funds=%v", a.totalBalance) + + // We're to attempt an attachment so we'll obtain the set of + // nodes that we currently have channels with so we avoid + // duplicate edges. + connectedNodes := a.chanState.ConnectedNodes() + pendingMtx.Lock() + nodesToSkip := mergeNodeMaps(connectedNodes, failedNodes, pendingOpens) + pendingMtx.Unlock() + + // If we reach this point, then according to our heuristic we + // should modify our channel state to tend towards what it + // determines to be the optimal state. So we'll call Select to get + // a fresh batch of attachment directives, passing in the + // amount of funds available for us to use. + chanCandidates, err := a.cfg.Heuristic.Select( + a.cfg.Self, a.cfg.Graph, availableFunds, + numChans, nodesToSkip, + ) + if err != nil { + log.Errorf("Unable to select candidates for "+ + "attachment: %v", err) + continue + } + + if len(chanCandidates) == 0 { + log.Infof("No eligible candidates to connect to") + continue + } + + log.Infof("Attempting to execute channel attachment "+ + "directives: %v", spew.Sdump(chanCandidates)) + + // For each recommended attachment directive, we'll launch a + // new goroutine to attempt to carry out the directive. If any + // of these succeed, then we'll receive a new state update, + // taking us back to the top of our controller loop. + pendingMtx.Lock() + for _, chanCandidate := range chanCandidates { + // Before we proceed, we'll check to see if this + // attempt would take us past the total number of + // allowed pending opens. If so, then we'll skip this + // round and wait for an attempt to either fail or + // succeed.
+ if uint16(len(pendingOpens))+1 > + a.cfg.MaxPendingOpens { + + log.Debugf("Reached cap of %v pending "+ + "channel opens, will retry "+ + "after success/failure", + a.cfg.MaxPendingOpens) + continue + } + + go func(directive AttachmentDirective) { + // We'll start out by attempting to connect to + // the peer in order to begin the funding + // workflow. + pub := directive.PeerKey + alreadyConnected, err := a.cfg.ConnectToPeer( + pub, directive.Addrs, + ) + if err != nil { + log.Warnf("Unable to connect "+ + "to %x: %v", + pub.SerializeCompressed(), + err) + + // Since we failed to connect to them, + // we'll mark them as failed so that we + // don't attempt to connect to them + // again. + nodeID := NewNodeID(pub) + pendingMtx.Lock() + failedNodes[nodeID] = struct{}{} + pendingMtx.Unlock() + + // Finally, we'll trigger the agent to + // select new peers to connect to. + a.OnChannelOpenFailure() + + return + } + + // If we were successful, we'll track this peer + // in our set of pending opens. We do this here + // to ensure we don't stall on selecting new + // peers if the connection attempt happens to + // take too long. + pendingMtx.Lock() + if uint16(len(pendingOpens))+1 > + a.cfg.MaxPendingOpens { + + pendingMtx.Unlock() + + // Since we've reached our max number + // of pending opens, we'll disconnect + // this peer and exit. However, if we + // were previously connected to them, + // then we'll make sure to maintain the + // connection alive. + if alreadyConnected { + return + } + + err = a.cfg.DisconnectPeer( + pub, + ) + if err != nil { + log.Warnf("Unable to "+ + "disconnect peer "+ + "%x: %v", + pub.SerializeCompressed(), + err) + } + return + } + + nodeID := NewNodeID(directive.PeerKey) + pendingOpens[nodeID] = Channel{ + Capacity: directive.ChanAmt, + Node: nodeID, + } + pendingMtx.Unlock() + + // We can then begin the funding workflow with + // this peer.
+ err = a.cfg.ChanController.OpenChannel( + pub, directive.ChanAmt, + ) + if err != nil { + log.Warnf("Unable to open "+ + "channel to %x of %v: %v", + pub.SerializeCompressed(), + directive.ChanAmt, err) + + // As the attempt failed, we'll clear + // the peer from the set of pending + // opens and mark them as failed so we + // don't attempt to open a channel to + // them again. + pendingMtx.Lock() + delete(pendingOpens, nodeID) + failedNodes[nodeID] = struct{}{} + pendingMtx.Unlock() + + // Trigger the agent to re-evaluate + // everything and possibly retry with a + // different node. + a.OnChannelOpenFailure() + + // Finally, we should also disconnect + // the peer if we weren't already + // connected to them beforehand by an + // external subsystem. + if alreadyConnected { + return + } + + err = a.cfg.DisconnectPeer(pub) + if err != nil { + log.Warnf("Unable to "+ + "disconnect peer "+ + "%x: %v", + pub.SerializeCompressed(), + err) + } + } + + // Since the channel open was successful and is + // currently pending, we'll trigger the + // autopilot agent to query for more peers. + a.OnChannelPendingOpen() + }(chanCandidate) + } + pendingMtx.Unlock() + } }