chainntfns/btcdnotify: eliminate block epoch race condition, use diff cancel channel

This commit fixes a race condition that was uncovered by the race
condition detector surrounding cancelling active block epoch
notifications. Previously we would close the main notification channel
for each client, at tine this would cause a read/write race condition
if an active grouting was attempting to dispatch a notification. We now
fix this use by using a distinct channel for signaling cancellation to
the active grouting, and another to signal cancellation to any
notification observers.
This commit is contained in:
Olaoluwa Osuntokun 2017-05-05 15:53:09 -07:00
parent 5b7fe7de9e
commit 4b15310c08
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

@ -68,7 +68,7 @@ type BtcdNotifier struct {
confNotifications map[chainhash.Hash][]*confirmationsNotification confNotifications map[chainhash.Hash][]*confirmationsNotification
confHeap *confirmationHeap confHeap *confirmationHeap
blockEpochClients map[uint64]chan *chainntnfs.BlockEpoch blockEpochClients map[uint64]*blockEpochRegistration
disconnectedBlockHashes chan *blockNtfn disconnectedBlockHashes chan *blockNtfn
@ -95,7 +95,7 @@ func New(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) {
notificationCancels: make(chan interface{}), notificationCancels: make(chan interface{}),
notificationRegistry: make(chan interface{}), notificationRegistry: make(chan interface{}),
blockEpochClients: make(map[uint64]chan *chainntnfs.BlockEpoch), blockEpochClients: make(map[uint64]*blockEpochRegistration),
spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification),
@ -185,7 +185,8 @@ func (b *BtcdNotifier) Stop() error {
} }
} }
for _, epochClient := range b.blockEpochClients { for _, epochClient := range b.blockEpochClients {
close(epochClient) close(epochClient.cancelChan)
close(epochClient.epochChan)
} }
return nil return nil
@ -260,7 +261,8 @@ out:
chainntnfs.Log.Infof("Cancelling epoch "+ chainntnfs.Log.Infof("Cancelling epoch "+
"notification, epoch_id=%v", msg.epochID) "notification, epoch_id=%v", msg.epochID)
close(b.blockEpochClients[msg.epochID]) close(b.blockEpochClients[msg.epochID].cancelChan)
close(b.blockEpochClients[msg.epochID].epochChan)
delete(b.blockEpochClients, msg.epochID) delete(b.blockEpochClients, msg.epochID)
close(msg.done) close(msg.done)
@ -292,7 +294,7 @@ out:
b.confNotifications[txid] = append(b.confNotifications[txid], msg) b.confNotifications[txid] = append(b.confNotifications[txid], msg)
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
b.blockEpochClients[msg.epochID] = msg.epochChan b.blockEpochClients[msg.epochID] = msg
} }
case staleBlockHash := <-b.disconnectedBlockHashes: case staleBlockHash := <-b.disconnectedBlockHashes:
@ -481,17 +483,21 @@ func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash
Hash: newSha, Hash: newSha,
} }
for _, epochChan := range b.blockEpochClients { for _, epochClient := range b.blockEpochClients {
b.wg.Add(1) b.wg.Add(1)
go func(ntfnChan chan *chainntnfs.BlockEpoch) { go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{}) {
defer b.wg.Done() defer b.wg.Done()
select { select {
case ntfnChan <- epoch: case ntfnChan <- epoch:
case <-cancelChan:
return
case <-b.quit: case <-b.quit:
return return
} }
}(epochChan) }(epochClient.epochChan, epochClient.cancelChan)
} }
} }
@ -540,9 +546,9 @@ func (b *BtcdNotifier) checkConfirmationTrigger(txSha *chainhash.Hash,
// requested, or place the notification on the // requested, or place the notification on the
// confirmation heap for future usage. // confirmation heap for future usage.
if confClients, ok := b.confNotifications[*txSha]; ok { if confClients, ok := b.confNotifications[*txSha]; ok {
// Either all of the registered confirmations wtill be // Either all of the registered confirmations will be
// dispatched due to a single confirmation, or added to the // dispatched due to a single confirmation, or added to the
// conf head. Therefor we unconditioanlly delete the registered // conf head. Therefor we unconditionally delete the registered
// confirmations from the staging zone. // confirmations from the staging zone.
defer func() { defer func() {
delete(b.confNotifications, *txSha) delete(b.confNotifications, *txSha)
@ -706,6 +712,8 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
type blockEpochRegistration struct { type blockEpochRegistration struct {
epochChan chan *chainntnfs.BlockEpoch epochChan chan *chainntnfs.BlockEpoch
cancelChan chan struct{}
epochID uint64 epochID uint64
} }
@ -722,8 +730,9 @@ type epochCancel struct {
// chain. // chain.
func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) {
registration := &blockEpochRegistration{ registration := &blockEpochRegistration{
epochChan: make(chan *chainntnfs.BlockEpoch, 20), epochChan: make(chan *chainntnfs.BlockEpoch, 20),
epochID: atomic.AddUint64(&b.epochClientCounter, 1), cancelChan: make(chan struct{}),
epochID: atomic.AddUint64(&b.epochClientCounter, 1),
} }
select { select {