autopilot/agent: signal chanOpenFailureUpdates on own channel

We do this to avoid a huge amount of goroutines piling up when autopilot
is trying to open many channels, as they will all block trying to send
the update on the stateUpdates channel. Now we instead send them on a
buffered channel, similar to what is done with the nodeUpdates.
This commit is contained in:
Johan T. Halseth 2018-08-31 14:45:00 +02:00
parent 4a88c61a90
commit 186e6d4da4
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26

@ -122,6 +122,12 @@ type Agent struct {
// at most one pending update of this type to handle at a given time. // at most one pending update of this type to handle at a given time.
nodeUpdates chan *nodeUpdates nodeUpdates chan *nodeUpdates
// chanOpenFailures is a channel where updates about channel open
// failures will be sent. This channel will be buffered to ensure we
// have at most one pending update of this type to handle at a given
// time.
chanOpenFailures chan *chanOpenFailureUpdate
// totalBalance is the total number of satoshis the backing wallet is // totalBalance is the total number of satoshis the backing wallet is
// known to control at any given instance. This value will be updated // known to control at any given instance. This value will be updated
// when the agent receives external balance update signals. // when the agent receives external balance update signals.
@ -137,11 +143,12 @@ type Agent struct {
// the backing Lightning Node. // the backing Lightning Node.
func New(cfg Config, initialState []Channel) (*Agent, error) { func New(cfg Config, initialState []Channel) (*Agent, error) {
a := &Agent{ a := &Agent{
cfg: cfg, cfg: cfg,
chanState: make(map[lnwire.ShortChannelID]Channel), chanState: make(map[lnwire.ShortChannelID]Channel),
quit: make(chan struct{}), quit: make(chan struct{}),
stateUpdates: make(chan interface{}), stateUpdates: make(chan interface{}),
nodeUpdates: make(chan *nodeUpdates, 1), nodeUpdates: make(chan *nodeUpdates, 1),
chanOpenFailures: make(chan *chanOpenFailureUpdate, 1),
} }
for _, c := range initialState { for _, c := range initialState {
@ -265,15 +272,10 @@ func (a *Agent) OnChannelPendingOpen() {
// autopilot has attempted to open a channel, but failed. In this case we can // autopilot has attempted to open a channel, but failed. In this case we can
// retry channel creation with a different node. // retry channel creation with a different node.
func (a *Agent) OnChannelOpenFailure() { func (a *Agent) OnChannelOpenFailure() {
a.wg.Add(1) select {
go func() { case a.chanOpenFailures <- &chanOpenFailureUpdate{}:
defer a.wg.Done() default:
}
select {
case a.stateUpdates <- &chanOpenFailureUpdate{}:
case <-a.quit:
}
}()
} }
// OnChannelClose is a callback that should be executed each time a prior // OnChannelClose is a callback that should be executed each time a prior
@ -388,14 +390,6 @@ func (a *Agent) controller() {
updateBalance() updateBalance()
// The channel we tried to open previously failed for
// whatever reason.
case *chanOpenFailureUpdate:
log.Debug("Retrying after previous channel " +
"open failure.")
updateBalance()
// A new channel has been opened successfully. This was // A new channel has been opened successfully. This was
// either opened by the Agent, or an external system // either opened by the Agent, or an external system
// that is able to drive the Lightning Node. // that is able to drive the Lightning Node.
@ -433,6 +427,14 @@ func (a *Agent) controller() {
updateBalance() updateBalance()
} }
// The channel we tried to open previously failed for whatever
// reason.
case <-a.chanOpenFailures:
log.Debug("Retrying after previous channel open " +
"failure.")
updateBalance()
// New nodes have been added to the graph or their node // New nodes have been added to the graph or their node
// announcements have been updated. We will consider opening // announcements have been updated. We will consider opening
// channels to these nodes if we haven't stabilized. // channels to these nodes if we haven't stabilized.