diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 8cb4cbb4..cffe3991 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -256,6 +256,10 @@ out: close(outPointClients[msg.spendID].spendChan) delete(b.spendNotifications[msg.op], msg.spendID) } + + // Signal that it's safe yield from Cancel to application. + close(msg.done) + case *epochCancel: chainntnfs.Log.Infof("Cancelling epoch "+ "notification, epoch_id=%v", msg.epochID) @@ -274,6 +278,7 @@ out: close(b.blockEpochClients[msg.epochID].epochChan) delete(b.blockEpochClients, msg.epochID) + // Signal that it's safe yield from Cancel to application. close(msg.done) } case registerMsg := <-b.notificationRegistry: @@ -620,6 +625,8 @@ type spendCancel struct { // spendID the ID of the notification to cancel. spendID uint64 + + done chan struct{} } // RegisterSpendNtfn registers an intent to be notified once the target @@ -675,13 +682,22 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, return &chainntnfs.SpendEvent{ Spend: ntfn.spendChan, Cancel: func() { - select { - case b.notificationCancels <- &spendCancel{ + cancel := &spendCancel{ op: *outpoint, spendID: ntfn.spendID, - }: + done: make(chan struct{}), + } + + // Submit spend cancellation to notification dispatcher. + select { + case b.notificationCancels <- cancel: + // Cancellation is being handled, wait for close before yielding to + // caller. + select { + case <-cancel.done: + case <-b.quit: + } case <-b.quit: - return } }, }, nil @@ -766,14 +782,16 @@ func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, er done: make(chan struct{}), } + // Submit epoch cancellation to notification dispatcher. select { case b.notificationCancels <- cancel: + // Cancellation is being handled, wait for close before yielding to + // caller. select { case <-cancel.done: case <-b.quit: } case <-b.quit: - return } }, }, nil diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 4e9f2034..a0d777a2 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -267,6 +267,10 @@ func (n *NeutrinoNotifier) notificationDispatcher() { close(outPointClients[msg.spendID].spendChan) delete(n.spendNotifications[msg.op], msg.spendID) } + + // Signal that it's safe yield from Cancel to application. + close(msg.done) + case *epochCancel: chainntnfs.Log.Infof("Cancelling epoch "+ "notification, epoch_id=%v", msg.epochID) @@ -285,6 +289,7 @@ func (n *NeutrinoNotifier) notificationDispatcher() { close(n.blockEpochClients[msg.epochID].epochChan) delete(n.blockEpochClients, msg.epochID) + // Signal that it's safe yield from Cancel to application. close(msg.done) } @@ -682,6 +687,8 @@ type spendCancel struct { // spendID the ID of the notification to cancel. spendID uint64 + + done chan struct{} } // RegisterSpendNtfn registers an intent to be notified once the target @@ -706,13 +713,22 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, spendEvent := &chainntnfs.SpendEvent{ Spend: ntfn.spendChan, Cancel: func() { - select { - case n.notificationCancels <- &spendCancel{ + cancel := &spendCancel{ op: *outpoint, spendID: ntfn.spendID, - }: + done: make(chan struct{}), + } + + // Submit spend cancellation to notification dispatcher. + select { + case n.notificationCancels <- cancel: + // Cancellation is being handled, wait for close before yielding to + // caller. + select { + case <-cancel.done: + case <-n.quit: + } case <-n.quit: - return } }, } @@ -870,14 +886,16 @@ func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent done: make(chan struct{}), } + // Submit epoch cancellation to notification dispatcher. select { case n.notificationCancels <- cancel: + // Cancellation is being handled, wait for close before yielding to + // caller. select { case <-cancel.done: case <-n.quit: } case <-n.quit: - return } }, }, nil