chainntnfs/btcdnotify: ensure block epoch coroutines exit before closing ntfn channel
This commit is contained in:
parent
ab2af76b44
commit
625d57aea6
@ -185,7 +185,6 @@ func (b *BtcdNotifier) Stop() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, epochClient := range b.blockEpochClients {
|
for _, epochClient := range b.blockEpochClients {
|
||||||
close(epochClient.cancelChan)
|
|
||||||
close(epochClient.epochChan)
|
close(epochClient.epochChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -261,7 +260,17 @@ out:
|
|||||||
chainntnfs.Log.Infof("Cancelling epoch "+
|
chainntnfs.Log.Infof("Cancelling epoch "+
|
||||||
"notification, epoch_id=%v", msg.epochID)
|
"notification, epoch_id=%v", msg.epochID)
|
||||||
|
|
||||||
|
// First, close the cancel channel for this
|
||||||
|
// specific client, and wait for the client to
|
||||||
|
// exit.
|
||||||
close(b.blockEpochClients[msg.epochID].cancelChan)
|
close(b.blockEpochClients[msg.epochID].cancelChan)
|
||||||
|
b.blockEpochClients[msg.epochID].wg.Wait()
|
||||||
|
|
||||||
|
// Once the client has exited, we can then
|
||||||
|
// safely close the channel used to send epoch
|
||||||
|
// notifications, in order to notify any
|
||||||
|
// listeners that the intent has been
|
||||||
|
// cancelled.
|
||||||
close(b.blockEpochClients[msg.epochID].epochChan)
|
close(b.blockEpochClients[msg.epochID].epochChan)
|
||||||
delete(b.blockEpochClients, msg.epochID)
|
delete(b.blockEpochClients, msg.epochID)
|
||||||
|
|
||||||
@ -486,7 +495,11 @@ func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash
|
|||||||
|
|
||||||
for _, epochClient := range b.blockEpochClients {
|
for _, epochClient := range b.blockEpochClients {
|
||||||
b.wg.Add(1)
|
b.wg.Add(1)
|
||||||
go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{}) {
|
epochClient.wg.Add(1)
|
||||||
|
go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{},
|
||||||
|
clientWg sync.WaitGroup) {
|
||||||
|
|
||||||
|
defer clientWg.Done()
|
||||||
defer b.wg.Done()
|
defer b.wg.Done()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -498,7 +511,8 @@ func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash
|
|||||||
case <-b.quit:
|
case <-b.quit:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}(epochClient.epochChan, epochClient.cancelChan)
|
|
||||||
|
}(epochClient.epochChan, epochClient.cancelChan, epochClient.wg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -712,11 +726,13 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
|
|||||||
// blockEpochRegistration represents a client's intent to receive a
|
// blockEpochRegistration represents a client's intent to receive a
|
||||||
// notification with each newly connected block.
|
// notification with each newly connected block.
|
||||||
type blockEpochRegistration struct {
|
type blockEpochRegistration struct {
|
||||||
|
epochID uint64
|
||||||
|
|
||||||
epochChan chan *chainntnfs.BlockEpoch
|
epochChan chan *chainntnfs.BlockEpoch
|
||||||
|
|
||||||
cancelChan chan struct{}
|
cancelChan chan struct{}
|
||||||
|
|
||||||
epochID uint64
|
wg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// epochCancel is a message sent to the BtcdNotifier when a client wishes to
|
// epochCancel is a message sent to the BtcdNotifier when a client wishes to
|
||||||
|
Loading…
Reference in New Issue
Block a user