channelnotifier+discover+invoices: return error in Stop functions

In order to be consistent with other sub systems an error is now
returned from the Stop functions.
This also allows writing a generic cleanup mechanism to stop all
sub systems in case of a failure.
This commit is contained in:
Roei Erez 2019-01-21 13:11:19 +02:00
parent c998264578
commit 3223df74e5
10 changed files with 58 additions and 20 deletions

@ -86,10 +86,12 @@ func (c *ChannelNotifier) Start() error {
} }
// Stop signals the notifier for a graceful shutdown. // Stop signals the notifier for a graceful shutdown.
func (c *ChannelNotifier) Stop() { func (c *ChannelNotifier) Stop() error {
var err error
c.stopped.Do(func() { c.stopped.Do(func() {
c.ntfnServer.Stop() err = c.ntfnServer.Stop()
}) })
return err
} }
// SubscribeChannelEvents returns a subscribe.Client that will receive updates // SubscribeChannelEvents returns a subscribe.Client that will receive updates

@ -461,8 +461,9 @@ func (d *AuthenticatedGossiper) start() error {
} }
// Stop signals any active goroutines for a graceful closure. // Stop signals any active goroutines for a graceful closure.
func (d *AuthenticatedGossiper) Stop() { func (d *AuthenticatedGossiper) Stop() error {
d.stopped.Do(d.stop) d.stopped.Do(d.stop)
return nil
} }
func (d *AuthenticatedGossiper) stop() { func (d *AuthenticatedGossiper) stop() {

@ -675,12 +675,14 @@ func (f *Manager) start() error {
// Stop signals all helper goroutines to execute a graceful shutdown. This // Stop signals all helper goroutines to execute a graceful shutdown. This
// method will block until all goroutines have exited. // method will block until all goroutines have exited.
func (f *Manager) Stop() { func (f *Manager) Stop() error {
f.stopped.Do(func() { f.stopped.Do(func() {
log.Info("Funding manager shutting down") log.Info("Funding manager shutting down")
close(f.quit) close(f.quit)
f.wg.Wait() f.wg.Wait()
}) })
return nil
} }
// nextPendingChanID returns the next free pending channel ID to be used to // nextPendingChanID returns the next free pending channel ID to be used to

@ -484,7 +484,9 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
func recreateAliceFundingManager(t *testing.T, alice *testNode) { func recreateAliceFundingManager(t *testing.T, alice *testNode) {
// Stop the old fundingManager before creating a new one. // Stop the old fundingManager before creating a new one.
close(alice.shutdownChannel) close(alice.shutdownChannel)
alice.fundingMgr.Stop() if err := alice.fundingMgr.Stop(); err != nil {
t.Fatalf("failed stop funding manager: %v", err)
}
aliceMsgChan := make(chan lnwire.Message) aliceMsgChan := make(chan lnwire.Message)
aliceAnnounceChan := make(chan lnwire.Message) aliceAnnounceChan := make(chan lnwire.Message)
@ -622,8 +624,12 @@ func tearDownFundingManagers(t *testing.T, a, b *testNode) {
close(a.shutdownChannel) close(a.shutdownChannel)
close(b.shutdownChannel) close(b.shutdownChannel)
a.fundingMgr.Stop() if err := a.fundingMgr.Stop(); err != nil {
b.fundingMgr.Stop() t.Fatalf("failed stop funding manager: %v", err)
}
if err := b.fundingMgr.Stop(); err != nil {
t.Fatalf("failed stop funding manager: %v", err)
}
os.RemoveAll(a.testDir) os.RemoveAll(a.testDir)
os.RemoveAll(b.testDir) os.RemoveAll(b.testDir)
} }
@ -1502,7 +1508,9 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
// implementation, and expect it to retry sending the fundingLocked // implementation, and expect it to retry sending the fundingLocked
// message. We'll explicitly shut down Alice's funding manager to // message. We'll explicitly shut down Alice's funding manager to
// prevent a race when overriding the sendMessage implementation. // prevent a race when overriding the sendMessage implementation.
alice.fundingMgr.Stop() if err := alice.fundingMgr.Stop(); err != nil {
t.Fatalf("failed stop funding manager: %v", err)
}
bob.sendMessage = workingSendMessage bob.sendMessage = workingSendMessage
recreateAliceFundingManager(t, alice) recreateAliceFundingManager(t, alice)

@ -87,12 +87,14 @@ func (h *HtlcNotifier) Start() error {
} }
// Stop signals the notifier for a graceful shutdown. // Stop signals the notifier for a graceful shutdown.
func (h *HtlcNotifier) Stop() { func (h *HtlcNotifier) Stop() error {
var err error
h.stopped.Do(func() { h.stopped.Do(func() {
if err := h.ntfnServer.Stop(); err != nil { if err = h.ntfnServer.Stop(); err != nil {
log.Warnf("error stopping htlc notifier: %v", err) log.Warnf("error stopping htlc notifier: %v", err)
} }
}) })
return err
} }
// SubscribeHtlcEvents returns a subscribe.Client that will receive updates // SubscribeHtlcEvents returns a subscribe.Client that will receive updates

@ -2867,19 +2867,31 @@ func testHtcNotifier(t *testing.T, testOpts []serverOption, iterations int,
if err := aliceNotifier.Start(); err != nil { if err := aliceNotifier.Start(); err != nil {
t.Fatalf("could not start alice notifier") t.Fatalf("could not start alice notifier")
} }
defer aliceNotifier.Stop() defer func() {
if err := aliceNotifier.Stop(); err != nil {
t.Fatalf("failed to stop alice notifier: %v", err)
}
}()
bobNotifier := NewHtlcNotifier(mockTime) bobNotifier := NewHtlcNotifier(mockTime)
if err := bobNotifier.Start(); err != nil { if err := bobNotifier.Start(); err != nil {
t.Fatalf("could not start bob notifier") t.Fatalf("could not start bob notifier")
} }
defer bobNotifier.Stop() defer func() {
if err := bobNotifier.Stop(); err != nil {
t.Fatalf("failed to stop bob notifier: %v", err)
}
}()
carolNotifier := NewHtlcNotifier(mockTime) carolNotifier := NewHtlcNotifier(mockTime)
if err := carolNotifier.Start(); err != nil { if err := carolNotifier.Start(); err != nil {
t.Fatalf("could not start carol notifier") t.Fatalf("could not start carol notifier")
} }
defer carolNotifier.Stop() defer func() {
if err := carolNotifier.Stop(); err != nil {
t.Fatalf("failed to stop carol notifier: %v", err)
}
}()
// Create a notifier server option which will set our htlc notifiers // Create a notifier server option which will set our htlc notifiers
// for the three hop network. // for the three hop network.

@ -237,7 +237,7 @@ func (i *InvoiceRegistry) Start() error {
// delete them. // delete them.
err = i.scanInvoicesOnStart() err = i.scanInvoicesOnStart()
if err != nil { if err != nil {
i.Stop() _ = i.Stop()
return err return err
} }
@ -245,12 +245,13 @@ func (i *InvoiceRegistry) Start() error {
} }
// Stop signals the registry for a graceful shutdown. // Stop signals the registry for a graceful shutdown.
func (i *InvoiceRegistry) Stop() { func (i *InvoiceRegistry) Stop() error {
i.expiryWatcher.Stop() i.expiryWatcher.Stop()
close(i.quit) close(i.quit)
i.wg.Wait() i.wg.Wait()
return nil
} }
// invoiceEvent represents a new event that has modified on invoice on disk. // invoiceEvent represents a new event that has modified on invoice on disk.

@ -527,7 +527,11 @@ func TestCancelHoldInvoice(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer registry.Stop() defer func() {
if err := registry.Stop(); err != nil {
t.Fatalf("failed to stop invoice registry: %v", err)
}
}()
// Add the invoice. // Add the invoice.
_, err = registry.AddInvoice(testHodlInvoice, testInvoicePaymentHash) _, err = registry.AddInvoice(testHodlInvoice, testInvoicePaymentHash)
@ -1005,7 +1009,9 @@ func TestInvoiceExpiryWithRegistry(t *testing.T) {
// Give some time to the watcher to cancel everything. // Give some time to the watcher to cancel everything.
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
registry.Stop() if err = registry.Stop(); err != nil {
t.Fatalf("failed to stop invoice registry: %v", err)
}
// Create the expected cancellation set before the final check. // Create the expected cancellation set before the final check.
expectedCancellations = append( expectedCancellations = append(

@ -215,7 +215,9 @@ func newTestContext(t *testing.T) *testContext {
clock: clock, clock: clock,
t: t, t: t,
cleanup: func() { cleanup: func() {
registry.Stop() if err = registry.Stop(); err != nil {
t.Fatalf("failed to stop invoice registry: %v", err)
}
cleanup() cleanup()
}, },
} }

@ -49,11 +49,13 @@ func (p *PeerNotifier) Start() error {
} }
// Stop signals the notifier for a graceful shutdown. // Stop signals the notifier for a graceful shutdown.
func (p *PeerNotifier) Stop() { func (p *PeerNotifier) Stop() error {
var err error
p.stopped.Do(func() { p.stopped.Do(func() {
log.Info("Stopping PeerNotifier") log.Info("Stopping PeerNotifier")
p.ntfnServer.Stop() err = p.ntfnServer.Stop()
}) })
return err
} }
// SubscribePeerEvents returns a subscribe.Client that will receive updates // SubscribePeerEvents returns a subscribe.Client that will receive updates