autopilot/manager: only set m.pilot if started successfully

This commit fixes a subtle bug within the autopilot manager, that would
cause the active pilot to not be reset in case it wasn't started
successfully.

We also make sure the associated goroutines close over the started
pilot, and not the active pilot.
This commit is contained in:
Johan T. Halseth 2018-12-19 14:54:56 +01:00
parent 2f17030e8c
commit dbf7b380a9
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26

@ -115,12 +115,12 @@ func (m *Manager) StartAgent() error {
// Now that we have all the initial dependencies, we can create the // Now that we have all the initial dependencies, we can create the
// auto-pilot instance itself. // auto-pilot instance itself.
m.pilot, err = New(*m.cfg.PilotCfg, initialChanState) pilot, err := New(*m.cfg.PilotCfg, initialChanState)
if err != nil { if err != nil {
return err return err
} }
if err := m.pilot.Start(); err != nil { if err := pilot.Start(); err != nil {
return err return err
} }
@ -129,16 +129,18 @@ func (m *Manager) StartAgent() error {
// topology updates. // topology updates.
txnSubscription, err := m.cfg.SubscribeTransactions() txnSubscription, err := m.cfg.SubscribeTransactions()
if err != nil { if err != nil {
defer m.pilot.Stop() pilot.Stop()
return err return err
} }
graphSubscription, err := m.cfg.SubscribeTopology() graphSubscription, err := m.cfg.SubscribeTopology()
if err != nil { if err != nil {
defer m.pilot.Stop() txnSubscription.Cancel()
defer txnSubscription.Cancel() pilot.Stop()
return err return err
} }
m.pilot = pilot
// We'll launch a goroutine to provide the agent with notifications // We'll launch a goroutine to provide the agent with notifications
// whenever the balance of the wallet changes. // whenever the balance of the wallet changes.
// TODO(halseth): can lead to panic if in process of shutting down. // TODO(halseth): can lead to panic if in process of shutting down.
@ -150,7 +152,7 @@ func (m *Manager) StartAgent() error {
for { for {
select { select {
case <-txnSubscription.ConfirmedTransactions(): case <-txnSubscription.ConfirmedTransactions():
m.pilot.OnBalanceChange() pilot.OnBalanceChange()
// We won't act upon new unconfirmed transaction, as // We won't act upon new unconfirmed transaction, as
// we'll only use confirmed outputs when funding. // we'll only use confirmed outputs when funding.
@ -158,7 +160,7 @@ func (m *Manager) StartAgent() error {
// to avoid goroutine leaks, and ensure we promptly // to avoid goroutine leaks, and ensure we promptly
// read from the channel if available. // read from the channel if available.
case <-txnSubscription.UnconfirmedTransactions(): case <-txnSubscription.UnconfirmedTransactions():
case <-m.pilot.quit: case <-pilot.quit:
return return
case <-m.quit: case <-m.quit:
return return
@ -209,7 +211,7 @@ func (m *Manager) StartAgent() error {
Capacity: edgeUpdate.Capacity, Capacity: edgeUpdate.Capacity,
Node: chanNode, Node: chanNode,
} }
m.pilot.OnChannelOpen(edge) pilot.OnChannelOpen(edge)
} }
// For each closed channel, we'll obtain // For each closed channel, we'll obtain
@ -220,17 +222,17 @@ func (m *Manager) StartAgent() error {
chanClose.ChanID, chanClose.ChanID,
) )
m.pilot.OnChannelClose(chanID) pilot.OnChannelClose(chanID)
} }
// If new nodes were added to the graph, or nod // If new nodes were added to the graph, or nod
// information has changed, we'll poke autopilot // information has changed, we'll poke autopilot
// to see if it can make use of them. // to see if it can make use of them.
if len(topChange.NodeUpdates) > 0 { if len(topChange.NodeUpdates) > 0 {
m.pilot.OnNodeUpdates() pilot.OnNodeUpdates()
} }
case <-m.pilot.quit: case <-pilot.quit:
return return
case <-m.quit: case <-m.quit:
return return