chainntnfs/btcd+neurtino: unify + sync ntfn cancels

This commit is contained in:
Conner Fromknecht 2017-07-29 19:19:28 -07:00 committed by Olaoluwa Osuntokun
parent 14a06526b8
commit a9b1af4c73
2 changed files with 46 additions and 10 deletions

@ -256,6 +256,10 @@ out:
close(outPointClients[msg.spendID].spendChan)
delete(b.spendNotifications[msg.op], msg.spendID)
}
// Signal that it's safe yield from Cancel to application.
close(msg.done)
case *epochCancel:
chainntnfs.Log.Infof("Cancelling epoch "+
"notification, epoch_id=%v", msg.epochID)
@ -274,6 +278,7 @@ out:
close(b.blockEpochClients[msg.epochID].epochChan)
delete(b.blockEpochClients, msg.epochID)
// Signal that it's safe yield from Cancel to application.
close(msg.done)
}
case registerMsg := <-b.notificationRegistry:
@ -620,6 +625,8 @@ type spendCancel struct {
// spendID the ID of the notification to cancel.
spendID uint64
done chan struct{}
}
// RegisterSpendNtfn registers an intent to be notified once the target
@ -675,13 +682,22 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
return &chainntnfs.SpendEvent{
Spend: ntfn.spendChan,
Cancel: func() {
select {
case b.notificationCancels <- &spendCancel{
cancel := &spendCancel{
op: *outpoint,
spendID: ntfn.spendID,
}:
done: make(chan struct{}),
}
// Submit spend cancellation to notification dispatcher.
select {
case b.notificationCancels <- cancel:
// Cancellation is being handled, wait for close before yielding to
// caller.
select {
case <-cancel.done:
case <-b.quit:
}
case <-b.quit:
return
}
},
}, nil
@ -766,14 +782,16 @@ func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, er
done: make(chan struct{}),
}
// Submit epoch cancellation to notification dispatcher.
select {
case b.notificationCancels <- cancel:
// Cancellation is being handled, wait for close before yielding to
// caller.
select {
case <-cancel.done:
case <-b.quit:
}
case <-b.quit:
return
}
},
}, nil

@ -267,6 +267,10 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
close(outPointClients[msg.spendID].spendChan)
delete(n.spendNotifications[msg.op], msg.spendID)
}
// Signal that it's safe yield from Cancel to application.
close(msg.done)
case *epochCancel:
chainntnfs.Log.Infof("Cancelling epoch "+
"notification, epoch_id=%v", msg.epochID)
@ -285,6 +289,7 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
close(n.blockEpochClients[msg.epochID].epochChan)
delete(n.blockEpochClients, msg.epochID)
// Signal that it's safe yield from Cancel to application.
close(msg.done)
}
@ -682,6 +687,8 @@ type spendCancel struct {
// spendID the ID of the notification to cancel.
spendID uint64
done chan struct{}
}
// RegisterSpendNtfn registers an intent to be notified once the target
@ -706,13 +713,22 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
spendEvent := &chainntnfs.SpendEvent{
Spend: ntfn.spendChan,
Cancel: func() {
select {
case n.notificationCancels <- &spendCancel{
cancel := &spendCancel{
op: *outpoint,
spendID: ntfn.spendID,
}:
done: make(chan struct{}),
}
// Submit spend cancellation to notification dispatcher.
select {
case n.notificationCancels <- cancel:
// Cancellation is being handled, wait for close before yielding to
// caller.
select {
case <-cancel.done:
case <-n.quit:
}
case <-n.quit:
return
}
},
}
@ -870,14 +886,16 @@ func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent
done: make(chan struct{}),
}
// Submit epoch cancellation to notification dispatcher.
select {
case n.notificationCancels <- cancel:
// Cancellation is being handled, wait for close before yielding to
// caller.
select {
case <-cancel.done:
case <-n.quit:
}
case <-n.quit:
return
}
},
}, nil