chainntnfs: ensure all block epoch notifications are sent *in order*

In this commit, we fix a lingering bug related to the way that we
deliver block epoch notifications to end users. Before this commit, we
would launch a new goroutine for *each block*. This was done in order
to ensure that the notification dispatch wouldn’t block the main
goroutine that was dispatching the notifications. This method archived
the goal, but had a nasty side effect that the goroutines could be
re-ordered during scheduling, meaning that in the case of fast
successive blocks, then notifications would be delivered out of order.
Receiving out of order notifications is either disallowed, or can cause
sub-systems that rely on these notifications to get into weird states.

In order to fix this issue, we’ll no longer launch a new goroutine to
deliver each notification to an awaiting client. Instead, each client
will now gain a concurrent in-order queue for notification delivery.
Due to the internal design of chainntnfs.ConcurrentQueue, the caller
should never block, yet the receivers will receive notifications in
order. This change solves the re-ordering issue and also minimizes the
number of goroutines that we’ll create in order to deliver block epoch
notifications.
This commit is contained in:
Olaoluwa Osuntokun 2018-02-09 16:13:21 -08:00
parent 9f52372cd2
commit 9c18c3d9a4
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21
3 changed files with 172 additions and 57 deletions

@ -174,6 +174,9 @@ func (b *BitcoindNotifier) Stop() error {
}
}
for _, epochClient := range b.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
b.txConfNotifier.TearDown()
@ -213,7 +216,13 @@ out:
chainntnfs.Log.Infof("Cancelling epoch "+
"notification, epoch_id=%v", msg.epochID)
// First, close the cancel channel for this
// First, we'll lookup the original
// registration in order to stop the active
// queue goroutine.
reg := b.blockEpochClients[msg.epochID]
reg.epochQueue.Stop()
// Next, close the cancel channel for this
// specific client, and wait for the client to
// exit.
close(b.blockEpochClients[msg.epochID].cancelChan)
@ -441,27 +450,14 @@ func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.
}
for _, epochClient := range b.blockEpochClients {
b.wg.Add(1)
epochClient.wg.Add(1)
go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{},
clientWg *sync.WaitGroup) {
select {
// TODO(roasbeef): move to goroutine per client, use sync queue
case epochClient.epochQueue.ChanIn() <- epoch:
defer clientWg.Done()
defer b.wg.Done()
case <-epochClient.cancelChan:
select {
case ntfnChan <- epoch:
case <-cancelChan:
return
case <-b.quit:
return
}
}(epochClient.epochChan, epochClient.cancelChan, &epochClient.wg)
case <-b.quit:
}
}
}
@ -628,6 +624,8 @@ type blockEpochRegistration struct {
epochChan chan *chainntnfs.BlockEpoch
epochQueue *chainntnfs.ConcurrentQueue
cancelChan chan struct{}
wg sync.WaitGroup
@ -643,22 +641,58 @@ type epochCancel struct {
// caller to receive notifications, of each new block connected to the main
// chain.
func (b *BitcoindNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) {
registration := &blockEpochRegistration{
reg := &blockEpochRegistration{
epochQueue: chainntnfs.NewConcurrentQueue(20),
epochChan: make(chan *chainntnfs.BlockEpoch, 20),
cancelChan: make(chan struct{}),
epochID: atomic.AddUint64(&b.epochClientCounter, 1),
}
reg.epochQueue.Start()
// Before we send the request to the main goroutine, we'll launch a new
// goroutine to proxy items added to our queue to the client itself.
// This ensures that all notifications are received *in order*.
reg.wg.Add(1)
go func() {
defer reg.wg.Done()
for {
select {
case ntfn := <-reg.epochQueue.ChanOut():
blockNtfn := ntfn.(*chainntnfs.BlockEpoch)
select {
case reg.epochChan <- blockNtfn:
case <-reg.cancelChan:
return
case <-b.quit:
return
}
case <-reg.cancelChan:
return
case <-b.quit:
return
}
}
}()
select {
case <-b.quit:
// As we're exiting before the registration could be sent,
// we'll stop the queue now ourselves.
reg.epochQueue.Stop()
return nil, errors.New("chainntnfs: system interrupt while " +
"attempting to register for block epoch notification.")
case b.notificationRegistry <- registration:
case b.notificationRegistry <- reg:
return &chainntnfs.BlockEpochEvent{
Epochs: registration.epochChan,
Epochs: reg.epochChan,
Cancel: func() {
cancel := &epochCancel{
epochID: registration.epochID,
epochID: reg.epochID,
}
// Submit epoch cancellation to notification dispatcher.
@ -668,7 +702,7 @@ func (b *BitcoindNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent
// closed before yielding to caller.
for {
select {
case _, ok := <-registration.epochChan:
case _, ok := <-reg.epochChan:
if !ok {
return
}

@ -184,6 +184,9 @@ func (b *BtcdNotifier) Stop() error {
}
}
for _, epochClient := range b.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
b.txConfNotifier.TearDown()
@ -247,7 +250,13 @@ out:
chainntnfs.Log.Infof("Cancelling epoch "+
"notification, epoch_id=%v", msg.epochID)
// First, close the cancel channel for this
// First, we'll lookup the original
// registration in order to stop the active
// queue goroutine.
reg := b.blockEpochClients[msg.epochID]
reg.epochQueue.Stop()
// Next, close the cancel channel for this
// specific client, and wait for the client to
// exit.
close(b.blockEpochClients[msg.epochID].cancelChan)
@ -260,7 +269,6 @@ out:
// cancelled.
close(b.blockEpochClients[msg.epochID].epochChan)
delete(b.blockEpochClients, msg.epochID)
}
case registerMsg := <-b.notificationRegistry:
switch msg := registerMsg.(type) {
@ -462,27 +470,14 @@ func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash
}
for _, epochClient := range b.blockEpochClients {
b.wg.Add(1)
epochClient.wg.Add(1)
go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{},
clientWg *sync.WaitGroup) {
select {
// TODO(roasbeef): move to goroutine per client, use sync queue
case epochClient.epochQueue.ChanIn() <- epoch:
defer clientWg.Done()
defer b.wg.Done()
case <-epochClient.cancelChan:
select {
case ntfnChan <- epoch:
case <-cancelChan:
return
case <-b.quit:
return
}
}(epochClient.epochChan, epochClient.cancelChan, &epochClient.wg)
case <-b.quit:
}
}
}
@ -631,6 +626,8 @@ type blockEpochRegistration struct {
epochChan chan *chainntnfs.BlockEpoch
epochQueue *chainntnfs.ConcurrentQueue
cancelChan chan struct{}
wg sync.WaitGroup
@ -646,32 +643,69 @@ type epochCancel struct {
// caller to receive notifications, of each new block connected to the main
// chain.
func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) {
registration := &blockEpochRegistration{
reg := &blockEpochRegistration{
epochQueue: chainntnfs.NewConcurrentQueue(20),
epochChan: make(chan *chainntnfs.BlockEpoch, 20),
cancelChan: make(chan struct{}),
epochID: atomic.AddUint64(&b.epochClientCounter, 1),
}
reg.epochQueue.Start()
// Before we send the request to the main goroutine, we'll launch a new
// goroutine to proxy items added to our queue to the client itself.
// This ensures that all notifications are received *in order*.
reg.wg.Add(1)
go func() {
defer reg.wg.Done()
for {
select {
case ntfn := <-reg.epochQueue.ChanOut():
blockNtfn := ntfn.(*chainntnfs.BlockEpoch)
select {
case reg.epochChan <- blockNtfn:
case <-reg.cancelChan:
return
case <-b.quit:
return
}
case <-reg.cancelChan:
return
case <-b.quit:
return
}
}
}()
select {
case <-b.quit:
// As we're exiting before the registration could be sent,
// we'll stop the queue now ourselves.
reg.epochQueue.Stop()
return nil, errors.New("chainntnfs: system interrupt while " +
"attempting to register for block epoch notification.")
case b.notificationRegistry <- registration:
case b.notificationRegistry <- reg:
return &chainntnfs.BlockEpochEvent{
Epochs: registration.epochChan,
Epochs: reg.epochChan,
Cancel: func() {
cancel := &epochCancel{
epochID: registration.epochID,
epochID: reg.epochID,
}
// Submit epoch cancellation to notification dispatcher.
select {
case b.notificationCancels <- cancel:
// Cancellation is being handled, drain the epoch channel until it is
// closed before yielding to caller.
// Cancellation is being handled, drain
// the epoch channel until it is closed
// before yielding to caller.
for {
select {
case _, ok := <-registration.epochChan:
case _, ok := <-reg.epochChan:
if !ok {
return
}

@ -181,6 +181,9 @@ func (n *NeutrinoNotifier) Stop() error {
}
}
for _, epochClient := range n.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
n.txConfNotifier.TearDown()
@ -257,7 +260,13 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
chainntnfs.Log.Infof("Cancelling epoch "+
"notification, epoch_id=%v", msg.epochID)
// First, close the cancel channel for this
// First, we'll lookup the original
// registration in order to stop the active
// queue goroutine.
reg := n.blockEpochClients[msg.epochID]
reg.epochQueue.Stop()
// Next, close the cancel channel for this
// specific client, and wait for the client to
// exit.
close(n.blockEpochClients[msg.epochID].cancelChan)
@ -715,6 +724,8 @@ type blockEpochRegistration struct {
epochChan chan *chainntnfs.BlockEpoch
epochQueue *chainntnfs.ConcurrentQueue
cancelChan chan struct{}
wg sync.WaitGroup
@ -729,22 +740,58 @@ type epochCancel struct {
// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the caller
// to receive notifications, of each new block connected to the main chain.
func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) {
registration := &blockEpochRegistration{
reg := &blockEpochRegistration{
epochQueue: chainntnfs.NewConcurrentQueue(20),
epochChan: make(chan *chainntnfs.BlockEpoch, 20),
cancelChan: make(chan struct{}),
epochID: atomic.AddUint64(&n.epochClientCounter, 1),
}
reg.epochQueue.Start()
// Before we send the request to the main goroutine, we'll launch a new
// goroutine to proxy items added to our queue to the client itself.
// This ensures that all notifications are received *in order*.
reg.wg.Add(1)
go func() {
defer reg.wg.Done()
for {
select {
case ntfn := <-reg.epochQueue.ChanOut():
blockNtfn := ntfn.(*chainntnfs.BlockEpoch)
select {
case reg.epochChan <- blockNtfn:
case <-reg.cancelChan:
return
case <-n.quit:
return
}
case <-reg.cancelChan:
return
case <-n.quit:
return
}
}
}()
select {
case <-n.quit:
// As we're exiting before the registration could be sent,
// we'll stop the queue now ourselves.
reg.epochQueue.Stop()
return nil, errors.New("chainntnfs: system interrupt while " +
"attempting to register for block epoch notification.")
case n.notificationRegistry <- registration:
case n.notificationRegistry <- reg:
return &chainntnfs.BlockEpochEvent{
Epochs: registration.epochChan,
Epochs: reg.epochChan,
Cancel: func() {
cancel := &epochCancel{
epochID: registration.epochID,
epochID: reg.epochID,
}
// Submit epoch cancellation to notification dispatcher.
@ -754,7 +801,7 @@ func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent
// closed before yielding to caller.
for {
select {
case _, ok := <-registration.epochChan:
case _, ok := <-reg.epochChan:
if !ok {
return
}