Merge pull request #1817 from halseth/autopilot-node-update

Reduce goroutine overhead on autopilot updates
This commit is contained in:
Olaoluwa Osuntokun 2018-09-05 18:08:37 -07:00 committed by GitHub
commit b0347879db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -117,6 +117,29 @@ type Agent struct {
// affect the heuristics of the agent will be sent over.
stateUpdates chan interface{}
// balanceUpdates is a channel where notifications about updates to the
// wallet's balance will be sent. This channel will be buffered to
// ensure we have at most one pending update of this type to handle at
// a given time.
balanceUpdates chan *balanceUpdate
// nodeUpdates is a channel that changes to the graph node landscape
// will be sent over. This channel will be buffered to ensure we have
// at most one pending update of this type to handle at a given time.
nodeUpdates chan *nodeUpdates
// pendingOpenUpdates is a channel where updates about channel pending
// opening will be sent. This channel will be buffered to ensure we
// have at most one pending update of this type to handle at a given
// time.
pendingOpenUpdates chan *chanPendingOpenUpdate
// chanOpenFailures is a channel where updates about channel open
// failures will be sent. This channel will be buffered to ensure we
// have at most one pending update of this type to handle at a given
// time.
chanOpenFailures chan *chanOpenFailureUpdate
// totalBalance is the total number of satoshis the backing wallet is
// known to control at any given instance. This value will be updated
// when the agent receives external balance update signals.
@ -132,10 +155,14 @@ type Agent struct {
// the backing Lightning Node.
func New(cfg Config, initialState []Channel) (*Agent, error) {
a := &Agent{
cfg: cfg,
chanState: make(map[lnwire.ShortChannelID]Channel),
quit: make(chan struct{}),
stateUpdates: make(chan interface{}),
cfg: cfg,
chanState: make(map[lnwire.ShortChannelID]Channel),
quit: make(chan struct{}),
stateUpdates: make(chan interface{}),
balanceUpdates: make(chan *balanceUpdate, 1),
nodeUpdates: make(chan *nodeUpdates, 1),
chanOpenFailures: make(chan *chanOpenFailureUpdate, 1),
pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1),
}
for _, c := range initialState {
@ -206,32 +233,22 @@ type chanCloseUpdate struct {
closedChans []lnwire.ShortChannelID
}
// OnBalanceChange is a callback that should be executed each time the balance of
// the backing wallet changes.
// OnBalanceChange is a callback that should be executed each time the balance
// of the backing wallet changes.
func (a *Agent) OnBalanceChange() {
a.wg.Add(1)
go func() {
defer a.wg.Done()
select {
case a.stateUpdates <- &balanceUpdate{}:
case <-a.quit:
}
}()
select {
case a.balanceUpdates <- &balanceUpdate{}:
default:
}
}
// OnNodeUpdates is a callback that should be executed each time our channel
// graph has new nodes or their node announcements are updated.
func (a *Agent) OnNodeUpdates() {
a.wg.Add(1)
go func() {
defer a.wg.Done()
select {
case a.stateUpdates <- &nodeUpdates{}:
case <-a.quit:
}
}()
select {
case a.nodeUpdates <- &nodeUpdates{}:
default:
}
}
// OnChannelOpen is a callback that should be executed each time a new channel
@ -252,27 +269,20 @@ func (a *Agent) OnChannelOpen(c Channel) {
// channel is opened, either by the agent or an external subsystems, but is
// still pending.
func (a *Agent) OnChannelPendingOpen() {
go func() {
select {
case a.stateUpdates <- &chanPendingOpenUpdate{}:
case <-a.quit:
}
}()
select {
case a.pendingOpenUpdates <- &chanPendingOpenUpdate{}:
default:
}
}
// OnChannelOpenFailure is a callback that should be executed when the
// autopilot has attempted to open a channel, but failed. In this case we can
// retry channel creation with a different node.
func (a *Agent) OnChannelOpenFailure() {
a.wg.Add(1)
go func() {
defer a.wg.Done()
select {
case a.stateUpdates <- &chanOpenFailureUpdate{}:
case <-a.quit:
}
}()
select {
case a.chanOpenFailures <- &chanOpenFailureUpdate{}:
default:
}
}
// OnChannelClose is a callback that should be executed each time a prior
@ -377,24 +387,6 @@ func (a *Agent) controller() {
log.Infof("Processing new external signal")
switch update := signal.(type) {
// The balance of the backing wallet has changed, if
// more funds are now available, we may attempt to open
// up an additional channel, or splice in funds to an
// existing one.
case *balanceUpdate:
log.Debug("Applying external balance state " +
"update")
updateBalance()
// The channel we tried to open previously failed for
// whatever reason.
case *chanOpenFailureUpdate:
log.Debug("Retrying after previous channel " +
"open failure.")
updateBalance()
// A new channel has been opened successfully. This was
// either opened by the Agent, or an external system
// that is able to drive the Lightning Node.
@ -411,13 +403,6 @@ func (a *Agent) controller() {
pendingMtx.Unlock()
updateBalance()
// A new channel has been opened by the agent or an
// external subsystem, but is still pending
// confirmation.
case *chanPendingOpenUpdate:
updateBalance()
// A channel has been closed, this may free up an
// available slot, triggering a new channel update.
case *chanCloseUpdate:
@ -430,232 +415,241 @@ func (a *Agent) controller() {
}
updateBalance()
// New nodes have been added to the graph or their node
// announcements have been updated. We will consider
// opening channels to these nodes if we haven't
// stabilized.
case *nodeUpdates:
log.Infof("Node updates received, assessing " +
"need for more channels")
}
pendingMtx.Lock()
log.Debugf("Pending channels: %v", spew.Sdump(pendingOpens))
pendingMtx.Unlock()
// A new channel has been opened by the agent or an external
// subsystem, but is still pending confirmation.
case <-a.pendingOpenUpdates:
updateBalance()
// With all the updates applied, we'll obtain a set of
// the current active channels (confirmed channels),
// and also factor in our set of unconfirmed channels.
confirmedChans := a.chanState
pendingMtx.Lock()
totalChans := mergeChanState(pendingOpens, confirmedChans)
pendingMtx.Unlock()
// The balance of the backing wallet has changed, if more funds
// are now available, we may attempt to open up an additional
// channel, or splice in funds to an existing one.
case <-a.balanceUpdates:
log.Debug("Applying external balance state update")
// Now that we've updated our internal state, we'll
// consult our channel attachment heuristic to
// determine if we should open up any additional
// channels or modify existing channels.
availableFunds, numChans, needMore := a.cfg.Heuristic.NeedMoreChans(
totalChans, a.totalBalance,
)
if !needMore {
continue
}
updateBalance()
log.Infof("Triggering attachment directive dispatch, "+
"total_funds=%v", a.totalBalance)
// The channel we tried to open previously failed for whatever
// reason.
case <-a.chanOpenFailures:
log.Debug("Retrying after previous channel open " +
"failure.")
// We're to attempt an attachment so we'll o obtain the
// set of nodes that we currently have channels with so
// we avoid duplicate edges.
connectedNodes := a.chanState.ConnectedNodes()
pendingMtx.Lock()
nodesToSkip := mergeNodeMaps(connectedNodes, failedNodes, pendingOpens)
pendingMtx.Unlock()
updateBalance()
// If we reach this point, then according to our
// heuristic we should modify our channel state to tend
// towards what it determines to the optimal state. So
// we'll call Select to get a fresh batch of attachment
// directives, passing in the amount of funds available
// for us to use.
chanCandidates, err := a.cfg.Heuristic.Select(
a.cfg.Self, a.cfg.Graph, availableFunds,
numChans, nodesToSkip,
)
if err != nil {
log.Errorf("Unable to select candidates for "+
"attachment: %v", err)
continue
}
if len(chanCandidates) == 0 {
log.Infof("No eligible candidates to connect to")
continue
}
log.Infof("Attempting to execute channel attachment "+
"directives: %v", spew.Sdump(chanCandidates))
// For each recommended attachment directive, we'll
// launch a new goroutine to attempt to carry out the
// directive. If any of these succeed, then we'll
// receive a new state update, taking us back to the
// top of our controller loop.
pendingMtx.Lock()
for _, chanCandidate := range chanCandidates {
// Before we proceed, we'll check to see if
// this attempt would take us past the total
// number of allowed pending opens. If so, then
// we'll skip this round and wait for an
// attempt to either fail or succeed.
if uint16(len(pendingOpens))+1 >
a.cfg.MaxPendingOpens {
log.Debugf("Reached cap of %v pending "+
"channel opens, will retry "+
"after success/failure",
a.cfg.MaxPendingOpens)
continue
}
go func(directive AttachmentDirective) {
// We'll start out by attempting to
// connect to the peer in order to begin
// the funding workflow.
pub := directive.PeerKey
alreadyConnected, err := a.cfg.ConnectToPeer(
pub, directive.Addrs,
)
if err != nil {
log.Warnf("Unable to connect "+
"to %x: %v",
pub.SerializeCompressed(),
err)
// Since we failed to connect to
// them, we'll mark them as
// failed so that we don't
// attempt to connect to them
// again.
nodeID := NewNodeID(pub)
pendingMtx.Lock()
failedNodes[nodeID] = struct{}{}
pendingMtx.Unlock()
// Finally, we'll trigger the
// agent to select new peers to
// connect to.
a.OnChannelOpenFailure()
return
}
// If we were succesful, we'll track
// this peer in our set of pending
// opens. We do this here to ensure we
// don't stall on selecting new peers if
// the connection attempt happens to
// take too long.
pendingMtx.Lock()
if uint16(len(pendingOpens))+1 >
a.cfg.MaxPendingOpens {
pendingMtx.Unlock()
// Since we've reached our max
// number of pending opens,
// we'll disconnect this peer
// and exit. However, if we were
// previously connected to them,
// then we'll make sure to
// maintain the connection
// alive.
if alreadyConnected {
return
}
err = a.cfg.DisconnectPeer(
pub,
)
if err != nil {
log.Warnf("Unable to "+
"disconnect peer "+
"%x: %v",
pub.SerializeCompressed(),
err)
}
return
}
nodeID := NewNodeID(directive.PeerKey)
pendingOpens[nodeID] = Channel{
Capacity: directive.ChanAmt,
Node: nodeID,
}
pendingMtx.Unlock()
// We can then begin the funding
// workflow with this peer.
err = a.cfg.ChanController.OpenChannel(
pub, directive.ChanAmt,
)
if err != nil {
log.Warnf("Unable to open "+
"channel to %x of %v: %v",
pub.SerializeCompressed(),
directive.ChanAmt, err)
// As the attempt failed, we'll
// clear the peer from the set of
// pending opens and mark them
// as failed so we don't attempt
// to open a channel to them
// again.
pendingMtx.Lock()
delete(pendingOpens, nodeID)
failedNodes[nodeID] = struct{}{}
pendingMtx.Unlock()
// Trigger the agent to
// re-evaluate everything and
// possibly retry with a
// different node.
a.OnChannelOpenFailure()
// Finally, we should also
// disconnect the peer if we
// weren't already connected to
// them beforehand by an
// external subsystem.
if alreadyConnected {
return
}
err = a.cfg.DisconnectPeer(pub)
if err != nil {
log.Warnf("Unable to "+
"disconnect peer "+
"%x: %v",
pub.SerializeCompressed(),
err)
}
}
// Since the channel open was successful
// and is currently pending, we'll
// trigger the autopilot agent to query
// for more peers.
a.OnChannelPendingOpen()
}(chanCandidate)
}
pendingMtx.Unlock()
// New nodes have been added to the graph or their node
// announcements have been updated. We will consider opening
// channels to these nodes if we haven't stabilized.
case <-a.nodeUpdates:
log.Infof("Node updates received, assessing " +
"need for more channels")
// The agent has been signalled to exit, so we'll bail out
// immediately.
case <-a.quit:
return
}
pendingMtx.Lock()
log.Debugf("Pending channels: %v", spew.Sdump(pendingOpens))
pendingMtx.Unlock()
// With all the updates applied, we'll obtain a set of the
// current active channels (confirmed channels), and also
// factor in our set of unconfirmed channels.
confirmedChans := a.chanState
pendingMtx.Lock()
totalChans := mergeChanState(pendingOpens, confirmedChans)
pendingMtx.Unlock()
// Now that we've updated our internal state, we'll consult our
// channel attachment heuristic to determine if we should open
// up any additional channels or modify existing channels.
availableFunds, numChans, needMore := a.cfg.Heuristic.NeedMoreChans(
totalChans, a.totalBalance,
)
if !needMore {
continue
}
log.Infof("Triggering attachment directive dispatch, "+
"total_funds=%v", a.totalBalance)
// We're to attempt an attachment so we'll o obtain the set of
// nodes that we currently have channels with so we avoid
// duplicate edges.
connectedNodes := a.chanState.ConnectedNodes()
pendingMtx.Lock()
nodesToSkip := mergeNodeMaps(connectedNodes, failedNodes, pendingOpens)
pendingMtx.Unlock()
// If we reach this point, then according to our heuristic we
// should modify our channel state to tend towards what it
// determines to the optimal state. So we'll call Select to get
// a fresh batch of attachment directives, passing in the
// amount of funds available for us to use.
chanCandidates, err := a.cfg.Heuristic.Select(
a.cfg.Self, a.cfg.Graph, availableFunds,
numChans, nodesToSkip,
)
if err != nil {
log.Errorf("Unable to select candidates for "+
"attachment: %v", err)
continue
}
if len(chanCandidates) == 0 {
log.Infof("No eligible candidates to connect to")
continue
}
log.Infof("Attempting to execute channel attachment "+
"directives: %v", spew.Sdump(chanCandidates))
// For each recommended attachment directive, we'll launch a
// new goroutine to attempt to carry out the directive. If any
// of these succeed, then we'll receive a new state update,
// taking us back to the top of our controller loop.
pendingMtx.Lock()
for _, chanCandidate := range chanCandidates {
// Before we proceed, we'll check to see if this
// attempt would take us past the total number of
// allowed pending opens. If so, then we'll skip this
// round and wait for an attempt to either fail or
// succeed.
if uint16(len(pendingOpens))+1 >
a.cfg.MaxPendingOpens {
log.Debugf("Reached cap of %v pending "+
"channel opens, will retry "+
"after success/failure",
a.cfg.MaxPendingOpens)
continue
}
go func(directive AttachmentDirective) {
// We'll start out by attempting to connect to
// the peer in order to begin the funding
// workflow.
pub := directive.PeerKey
alreadyConnected, err := a.cfg.ConnectToPeer(
pub, directive.Addrs,
)
if err != nil {
log.Warnf("Unable to connect "+
"to %x: %v",
pub.SerializeCompressed(),
err)
// Since we failed to connect to them,
// we'll mark them as failed so that we
// don't attempt to connect to them
// again.
nodeID := NewNodeID(pub)
pendingMtx.Lock()
failedNodes[nodeID] = struct{}{}
pendingMtx.Unlock()
// Finally, we'll trigger the agent to
// select new peers to connect to.
a.OnChannelOpenFailure()
return
}
// If we were succesful, we'll track this peer
// in our set of pending opens. We do this here
// to ensure we don't stall on selecting new
// peers if the connection attempt happens to
// take too long.
pendingMtx.Lock()
if uint16(len(pendingOpens))+1 >
a.cfg.MaxPendingOpens {
pendingMtx.Unlock()
// Since we've reached our max number
// of pending opens, we'll disconnect
// this peer and exit. However, if we
// were previously connected to them,
// then we'll make sure to maintain the
// connection alive.
if alreadyConnected {
return
}
err = a.cfg.DisconnectPeer(
pub,
)
if err != nil {
log.Warnf("Unable to "+
"disconnect peer "+
"%x: %v",
pub.SerializeCompressed(),
err)
}
return
}
nodeID := NewNodeID(directive.PeerKey)
pendingOpens[nodeID] = Channel{
Capacity: directive.ChanAmt,
Node: nodeID,
}
pendingMtx.Unlock()
// We can then begin the funding workflow with
// this peer.
err = a.cfg.ChanController.OpenChannel(
pub, directive.ChanAmt,
)
if err != nil {
log.Warnf("Unable to open "+
"channel to %x of %v: %v",
pub.SerializeCompressed(),
directive.ChanAmt, err)
// As the attempt failed, we'll clear
// the peer from the set of pending
// opens and mark them as failed so we
// don't attempt to open a channel to
// them again.
pendingMtx.Lock()
delete(pendingOpens, nodeID)
failedNodes[nodeID] = struct{}{}
pendingMtx.Unlock()
// Trigger the agent to re-evaluate
// everything and possibly retry with a
// different node.
a.OnChannelOpenFailure()
// Finally, we should also disconnect
// the peer if we weren't already
// connected to them beforehand by an
// external subsystem.
if alreadyConnected {
return
}
err = a.cfg.DisconnectPeer(pub)
if err != nil {
log.Warnf("Unable to "+
"disconnect peer "+
"%x: %v",
pub.SerializeCompressed(),
err)
}
}
// Since the channel open was successful and is
// currently pending, we'll trigger the
// autopilot agent to query for more peers.
a.OnChannelPendingOpen()
}(chanCandidate)
}
pendingMtx.Unlock()
}
}