autopilot: limit the number of outstanding channel open goroutines
In this commit, we fix an existing bug that would at times cause us to spiral out of control and potentially created thousands of outbound connections. Our fix is simple: limit the total number of outstanding channel establishment attempts. Without this limit, we have no way to bound the number of active goroutines. Fixes #772.
This commit is contained in:
parent
2cb7d5b570
commit
ce2d5a2156
@ -41,6 +41,12 @@ type Config struct {
|
|||||||
// within the graph.
|
// within the graph.
|
||||||
Graph ChannelGraph
|
Graph ChannelGraph
|
||||||
|
|
||||||
|
// MaxPendingOpens is the maximum number of pending channel
|
||||||
|
// establishment goroutines that can be lingering. We cap this value in
|
||||||
|
// order to control the level of parallelism caused by the autopiloit
|
||||||
|
// agent.
|
||||||
|
MaxPendingOpens uint16
|
||||||
|
|
||||||
// TODO(roasbeef): add additional signals from fee rates and revenue of
|
// TODO(roasbeef): add additional signals from fee rates and revenue of
|
||||||
// currently opened channels
|
// currently opened channels
|
||||||
}
|
}
|
||||||
@ -410,6 +416,21 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
|
|||||||
// top of our controller loop.
|
// top of our controller loop.
|
||||||
pendingMtx.Lock()
|
pendingMtx.Lock()
|
||||||
for _, chanCandidate := range chanCandidates {
|
for _, chanCandidate := range chanCandidates {
|
||||||
|
// Before we proceed, we'll check to see if
|
||||||
|
// this attempt would take us past the total
|
||||||
|
// number of allowed pending opens. If so, then
|
||||||
|
// we'll skip this round and wait for an
|
||||||
|
// attempt to either fail or succeed.
|
||||||
|
if uint16(len(pendingOpens))+1 >
|
||||||
|
a.cfg.MaxPendingOpens {
|
||||||
|
|
||||||
|
log.Debugf("Reached cap of %v pending "+
|
||||||
|
"channel opens, will retry "+
|
||||||
|
"after success/failure",
|
||||||
|
a.cfg.MaxPendingOpens)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
nID := NewNodeID(chanCandidate.PeerKey)
|
nID := NewNodeID(chanCandidate.PeerKey)
|
||||||
pendingOpens[nID] = Channel{
|
pendingOpens[nID] = Channel{
|
||||||
Capacity: chanCandidate.ChanAmt,
|
Capacity: chanCandidate.ChanAmt,
|
||||||
@ -417,6 +438,7 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
go func(directive AttachmentDirective) {
|
go func(directive AttachmentDirective) {
|
||||||
|
|
||||||
pub := directive.PeerKey
|
pub := directive.PeerKey
|
||||||
err := a.cfg.ChanController.OpenChannel(
|
err := a.cfg.ChanController.OpenChannel(
|
||||||
|
|
||||||
|
@ -139,6 +139,7 @@ func TestAgentChannelOpenSignal(t *testing.T) {
|
|||||||
return 0, nil
|
return 0, nil
|
||||||
},
|
},
|
||||||
Graph: memGraph,
|
Graph: memGraph,
|
||||||
|
MaxPendingOpens: 10,
|
||||||
}
|
}
|
||||||
initialChans := []Channel{}
|
initialChans := []Channel{}
|
||||||
agent, err := New(testCfg, initialChans)
|
agent, err := New(testCfg, initialChans)
|
||||||
@ -272,6 +273,7 @@ func TestAgentChannelFailureSignal(t *testing.T) {
|
|||||||
return 0, nil
|
return 0, nil
|
||||||
},
|
},
|
||||||
Graph: memGraph,
|
Graph: memGraph,
|
||||||
|
MaxPendingOpens: 10,
|
||||||
}
|
}
|
||||||
|
|
||||||
initialChans := []Channel{}
|
initialChans := []Channel{}
|
||||||
@ -361,6 +363,7 @@ func TestAgentChannelCloseSignal(t *testing.T) {
|
|||||||
return 0, nil
|
return 0, nil
|
||||||
},
|
},
|
||||||
Graph: memGraph,
|
Graph: memGraph,
|
||||||
|
MaxPendingOpens: 10,
|
||||||
}
|
}
|
||||||
|
|
||||||
// We'll start the agent with two channels already being active.
|
// We'll start the agent with two channels already being active.
|
||||||
@ -484,6 +487,7 @@ func TestAgentBalanceUpdate(t *testing.T) {
|
|||||||
return walletBalance, nil
|
return walletBalance, nil
|
||||||
},
|
},
|
||||||
Graph: memGraph,
|
Graph: memGraph,
|
||||||
|
MaxPendingOpens: 10,
|
||||||
}
|
}
|
||||||
initialChans := []Channel{}
|
initialChans := []Channel{}
|
||||||
agent, err := New(testCfg, initialChans)
|
agent, err := New(testCfg, initialChans)
|
||||||
@ -599,6 +603,7 @@ func TestAgentImmediateAttach(t *testing.T) {
|
|||||||
return walletBalance, nil
|
return walletBalance, nil
|
||||||
},
|
},
|
||||||
Graph: memGraph,
|
Graph: memGraph,
|
||||||
|
MaxPendingOpens: 10,
|
||||||
}
|
}
|
||||||
initialChans := []Channel{}
|
initialChans := []Channel{}
|
||||||
agent, err := New(testCfg, initialChans)
|
agent, err := New(testCfg, initialChans)
|
||||||
@ -733,6 +738,7 @@ func TestAgentPendingChannelState(t *testing.T) {
|
|||||||
return walletBalance, nil
|
return walletBalance, nil
|
||||||
},
|
},
|
||||||
Graph: memGraph,
|
Graph: memGraph,
|
||||||
|
MaxPendingOpens: 10,
|
||||||
}
|
}
|
||||||
initialChans := []Channel{}
|
initialChans := []Channel{}
|
||||||
agent, err := New(testCfg, initialChans)
|
agent, err := New(testCfg, initialChans)
|
||||||
|
Loading…
Reference in New Issue
Block a user