From 9f85eadde1b4ca355bd2e3ea025ded0575267b65 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Sat, 29 Jul 2017 20:28:48 -0700 Subject: [PATCH] chainntnfs/btcd+neutrino: sync epoch cancel --- chainntnfs/btcdnotify/btcd.go | 41 +++++++++++++------------- chainntnfs/neutrinonotify/neutrino.go | 42 +++++++++++++-------------- 2 files changed, 40 insertions(+), 43 deletions(-) diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index cffe3991..1afd82ae 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -257,9 +257,6 @@ out: delete(b.spendNotifications[msg.op], msg.spendID) } - // Signal that it's safe yield from Cancel to application. - close(msg.done) - case *epochCancel: chainntnfs.Log.Infof("Cancelling epoch "+ "notification, epoch_id=%v", msg.epochID) @@ -278,8 +275,6 @@ out: close(b.blockEpochClients[msg.epochID].epochChan) delete(b.blockEpochClients, msg.epochID) - // Signal that it's safe yield from Cancel to application. - close(msg.done) } case registerMsg := <-b.notificationRegistry: switch msg := registerMsg.(type) { @@ -625,8 +620,6 @@ type spendCancel struct { // spendID the ID of the notification to cancel. spendID uint64 - - done chan struct{} } // RegisterSpendNtfn registers an intent to be notified once the target @@ -685,17 +678,21 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, cancel := &spendCancel{ op: *outpoint, spendID: ntfn.spendID, - done: make(chan struct{}), } // Submit spend cancellation to notification dispatcher. select { case b.notificationCancels <- cancel: - // Cancellation is being handled, wait for close before yielding to - // caller. - select { - case <-cancel.done: - case <-b.quit: + // Cancellation is being handled, drain the spend chan until it is + // closed before yielding to the caller. + for { + select { + case _, ok := <-ntfn.spendChan: + if !ok { + return + } + case <-b.quit: + } } case <-b.quit: } @@ -755,8 +752,6 @@ type blockEpochRegistration struct { // cancel an outstanding epoch notification that has yet to be dispatched. type epochCancel struct { epochID uint64 - - done chan struct{} } // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the @@ -779,17 +774,21 @@ func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, er Cancel: func() { cancel := &epochCancel{ epochID: registration.epochID, - done: make(chan struct{}), } // Submit epoch cancellation to notification dispatcher. select { case b.notificationCancels <- cancel: - // Cancellation is being handled, wait for close before yielding to - // caller. - select { - case <-cancel.done: - case <-b.quit: + // Cancellation is being handled, drain the epoch channel until it is + // closed before yielding to caller. + for { + select { + case _, ok := <-registration.epochChan: + if !ok { + return + } + case <-b.quit: + } } case <-b.quit: } diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index a0d777a2..73484a94 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -268,9 +268,6 @@ func (n *NeutrinoNotifier) notificationDispatcher() { delete(n.spendNotifications[msg.op], msg.spendID) } - // Signal that it's safe yield from Cancel to application. - close(msg.done) - case *epochCancel: chainntnfs.Log.Infof("Cancelling epoch "+ "notification, epoch_id=%v", msg.epochID) @@ -288,9 +285,6 @@ func (n *NeutrinoNotifier) notificationDispatcher() { // cancelled. close(n.blockEpochClients[msg.epochID].epochChan) delete(n.blockEpochClients, msg.epochID) - - // Signal that it's safe yield from Cancel to application. - close(msg.done) } case registerMsg := <-n.notificationRegistry: @@ -687,8 +681,6 @@ type spendCancel struct { // spendID the ID of the notification to cancel. spendID uint64 - - done chan struct{} } // RegisterSpendNtfn registers an intent to be notified once the target @@ -716,17 +708,21 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, cancel := &spendCancel{ op: *outpoint, spendID: ntfn.spendID, - done: make(chan struct{}), } // Submit spend cancellation to notification dispatcher. select { case n.notificationCancels <- cancel: - // Cancellation is being handled, wait for close before yielding to - // caller. - select { - case <-cancel.done: - case <-n.quit: + // Cancellation is being handled, drain the spend chan until it is + // closed before yielding to the caller. + for { + select { + case _, ok := <-ntfn.spendChan: + if !ok { + return + } + case <-n.quit: + } } case <-n.quit: } @@ -860,8 +856,6 @@ type blockEpochRegistration struct { // to cancel an outstanding epoch notification that has yet to be dispatched. type epochCancel struct { epochID uint64 - - done chan struct{} } // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the caller @@ -883,17 +877,21 @@ func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent Cancel: func() { cancel := &epochCancel{ epochID: registration.epochID, - done: make(chan struct{}), } // Submit epoch cancellation to notification dispatcher. select { case n.notificationCancels <- cancel: - // Cancellation is being handled, wait for close before yielding to - // caller. - select { - case <-cancel.done: - case <-n.quit: + // Cancellation is being handled, drain the epoch channel until it is + // closed before yielding to caller. + for { + select { + case _, ok := <-registration.epochChan: + if !ok { + return + } + case <-n.quit: + } } case <-n.quit: }