autopilot/agent: signal chanPendingOpenUpdates on own channel

This commit is contained in:
Johan T. Halseth 2018-08-31 14:55:07 +02:00
parent 186e6d4da4
commit a9a9c9aeb4
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26

@ -122,6 +122,12 @@ type Agent struct {
// at most one pending update of this type to handle at a given time. // at most one pending update of this type to handle at a given time.
nodeUpdates chan *nodeUpdates nodeUpdates chan *nodeUpdates
// pendingOpenUpdates is a channel where updates about channel pending
// opening will be sent. This channel will be buffered to ensure we
// have at most one pending update of this type to handle at a given
// time.
pendingOpenUpdates chan *chanPendingOpenUpdate
// chanOpenFailures is a channel where updates about channel open // chanOpenFailures is a channel where updates about channel open
// failures will be sent. This channel will be buffered to ensure we // failures will be sent. This channel will be buffered to ensure we
// have at most one pending update of this type to handle at a given // have at most one pending update of this type to handle at a given
@ -149,6 +155,7 @@ func New(cfg Config, initialState []Channel) (*Agent, error) {
stateUpdates: make(chan interface{}), stateUpdates: make(chan interface{}),
nodeUpdates: make(chan *nodeUpdates, 1), nodeUpdates: make(chan *nodeUpdates, 1),
chanOpenFailures: make(chan *chanOpenFailureUpdate, 1), chanOpenFailures: make(chan *chanOpenFailureUpdate, 1),
pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1),
} }
for _, c := range initialState { for _, c := range initialState {
@ -260,12 +267,10 @@ func (a *Agent) OnChannelOpen(c Channel) {
// channel is opened, either by the agent or an external subsystems, but is // channel is opened, either by the agent or an external subsystems, but is
// still pending. // still pending.
func (a *Agent) OnChannelPendingOpen() { func (a *Agent) OnChannelPendingOpen() {
go func() {
select { select {
case a.stateUpdates <- &chanPendingOpenUpdate{}: case a.pendingOpenUpdates <- &chanPendingOpenUpdate{}:
case <-a.quit: default:
} }
}()
} }
// OnChannelOpenFailure is a callback that should be executed when the // OnChannelOpenFailure is a callback that should be executed when the
@ -406,13 +411,6 @@ func (a *Agent) controller() {
pendingMtx.Unlock() pendingMtx.Unlock()
updateBalance() updateBalance()
// A new channel has been opened by the agent or an
// external subsystem, but is still pending
// confirmation.
case *chanPendingOpenUpdate:
updateBalance()
// A channel has been closed, this may free up an // A channel has been closed, this may free up an
// available slot, triggering a new channel update. // available slot, triggering a new channel update.
case *chanCloseUpdate: case *chanCloseUpdate:
@ -427,6 +425,11 @@ func (a *Agent) controller() {
updateBalance() updateBalance()
} }
// A new channel has been opened by the agent or an external
// subsystem, but is still pending confirmation.
case <-a.pendingOpenUpdates:
updateBalance()
// The channel we tried to open previously failed for whatever // The channel we tried to open previously failed for whatever
// reason. // reason.
case <-a.chanOpenFailures: case <-a.chanOpenFailures: