chainntnfs: add Done chan to events to signal reorg safety

This commit is contained in:
Wilmer Paulino 2018-12-10 18:29:22 -08:00
parent 52a80f2d37
commit 060f2f7774
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
3 changed files with 163 additions and 0 deletions

@ -199,6 +199,12 @@ type ConfirmationEvent struct {
// NOTE: This channel must be buffered. // NOTE: This channel must be buffered.
NegativeConf chan int32 NegativeConf chan int32
// Done is a channel that gets sent upon once the confirmation request
// is no longer under the risk of being reorged out of the chain.
//
// NOTE: This channel must be buffered.
Done chan struct{}
// Cancel is a closure that should be executed by the caller in the case // Cancel is a closure that should be executed by the caller in the case
// that they wish to prematurely abandon their registered confirmation // that they wish to prematurely abandon their registered confirmation
// notification. // notification.
@ -212,6 +218,7 @@ func NewConfirmationEvent(numConfs uint32, cancel func()) *ConfirmationEvent {
Confirmed: make(chan *TxConfirmation, 1), Confirmed: make(chan *TxConfirmation, 1),
Updates: make(chan uint32, numConfs), Updates: make(chan uint32, numConfs),
NegativeConf: make(chan int32, 1), NegativeConf: make(chan int32, 1),
Done: make(chan struct{}, 1),
Cancel: cancel, Cancel: cancel,
} }
} }
@ -249,6 +256,12 @@ type SpendEvent struct {
// NOTE: This channel must be buffered. // NOTE: This channel must be buffered.
Reorg chan struct{} Reorg chan struct{}
// Done is a channel that gets sent upon once the confirmation request
// is no longer under the risk of being reorged out of the chain.
//
// NOTE: This channel must be buffered.
Done chan struct{}
// Cancel is a closure that should be executed by the caller in the case // Cancel is a closure that should be executed by the caller in the case
// that they wish to prematurely abandon their registered spend // that they wish to prematurely abandon their registered spend
// notification. // notification.
@ -260,6 +273,7 @@ func NewSpendEvent(cancel func()) *SpendEvent {
return &SpendEvent{ return &SpendEvent{
Spend: make(chan *SpendDetail, 1), Spend: make(chan *SpendDetail, 1),
Reorg: make(chan struct{}, 1), Reorg: make(chan struct{}, 1),
Done: make(chan struct{}, 1),
Cancel: cancel, Cancel: cancel,
} }
} }

@ -952,6 +952,7 @@ func (n *TxNotifier) CancelSpend(spendRequest SpendRequest, spendID uint64) {
// their cancel request has been fulfilled. // their cancel request has been fulfilled.
close(ntfn.Event.Spend) close(ntfn.Event.Spend)
close(ntfn.Event.Reorg) close(ntfn.Event.Reorg)
close(ntfn.Event.Done)
delete(spendSet.ntfns, spendID) delete(spendSet.ntfns, spendID)
} }
@ -1749,6 +1750,7 @@ func (n *TxNotifier) TearDown() {
close(ntfn.Event.Confirmed) close(ntfn.Event.Confirmed)
close(ntfn.Event.Updates) close(ntfn.Event.Updates)
close(ntfn.Event.NegativeConf) close(ntfn.Event.NegativeConf)
close(ntfn.Event.Done)
} }
} }
@ -1756,6 +1758,7 @@ func (n *TxNotifier) TearDown() {
for _, ntfn := range spendSet.ntfns { for _, ntfn := range spendSet.ntfns {
close(ntfn.Event.Spend) close(ntfn.Event.Spend)
close(ntfn.Event.Reorg) close(ntfn.Event.Reorg)
close(ntfn.Event.Done)
} }
} }
} }

@ -2110,6 +2110,152 @@ func TestTxNotifierSpendHintCache(t *testing.T) {
} }
} }
// TestTxNotifierNtfnDone ensures that a notification is sent to registered
// clients through the Done channel once the notification request is no longer
// under the risk of being reorged out of the chain.
func TestTxNotifierNtfnDone(t *testing.T) {
t.Parallel()
hintCache := newMockHintCache()
const reorgSafetyLimit = 100
n := chainntnfs.NewTxNotifier(10, reorgSafetyLimit, hintCache, hintCache)
// We'll start by creating two notification requests: one confirmation
// and one spend.
confNtfn := &chainntnfs.ConfNtfn{
ConfID: 1,
ConfRequest: chainntnfs.ConfRequest{
TxID: chainntnfs.ZeroHash,
PkScript: testScript,
},
NumConfirmations: 1,
Event: chainntnfs.NewConfirmationEvent(1, nil),
}
if _, _, err := n.RegisterConf(confNtfn); err != nil {
t.Fatalf("unable to register conf ntfn: %v", err)
}
spendNtfn := &chainntnfs.SpendNtfn{
SpendID: 2,
SpendRequest: chainntnfs.SpendRequest{
OutPoint: chainntnfs.ZeroOutPoint,
PkScript: testScript,
},
Event: chainntnfs.NewSpendEvent(nil),
}
if _, _, err := n.RegisterSpend(spendNtfn); err != nil {
t.Fatalf("unable to register spend: %v", err)
}
// We'll create two transactions that will satisfy the notification
// requests above and include them in the next block of the chain.
tx := wire.NewMsgTx(1)
tx.AddTxOut(&wire.TxOut{PkScript: testRawScript})
spendTx := wire.NewMsgTx(1)
spendTx.AddTxIn(&wire.TxIn{
PreviousOutPoint: wire.OutPoint{Index: 1},
SignatureScript: testSigScript,
})
block := btcutil.NewBlock(&wire.MsgBlock{
Transactions: []*wire.MsgTx{tx, spendTx},
})
err := n.ConnectTip(block.Hash(), 11, block.Transactions())
if err != nil {
t.Fatalf("unable to connect block: %v", err)
}
if err := n.NotifyHeight(11); err != nil {
t.Fatalf("unable to dispatch notifications: %v", err)
}
// With the chain extended, we should see notifications dispatched for
// both requests.
select {
case <-confNtfn.Event.Confirmed:
default:
t.Fatal("expected to receive confirmation notification")
}
select {
case <-spendNtfn.Event.Spend:
default:
t.Fatal("expected to receive spend notification")
}
// The done notifications should not be dispatched yet as the requests
// are still under the risk of being reorged out the chain.
select {
case <-confNtfn.Event.Done:
t.Fatal("received unexpected done notification for confirmation")
case <-spendNtfn.Event.Done:
t.Fatal("received unexpected done notification for spend")
default:
}
// Now, we'll disconnect the block at tip to simulate a reorg. The reorg
// notifications should be dispatched to the respective clients.
if err := n.DisconnectTip(11); err != nil {
t.Fatalf("unable to disconnect block: %v", err)
}
select {
case <-confNtfn.Event.NegativeConf:
default:
t.Fatal("expected to receive reorg notification for confirmation")
}
select {
case <-spendNtfn.Event.Reorg:
default:
t.Fatal("expected to receive reorg notification for spend")
}
// We'll reconnect the block that satisfies both of these requests.
// We should see notifications dispatched for both once again.
err = n.ConnectTip(block.Hash(), 11, block.Transactions())
if err != nil {
t.Fatalf("unable to connect block: %v", err)
}
if err := n.NotifyHeight(11); err != nil {
t.Fatalf("unable to dispatch notifications: %v", err)
}
select {
case <-confNtfn.Event.Confirmed:
default:
t.Fatal("expected to receive confirmation notification")
}
select {
case <-spendNtfn.Event.Spend:
default:
t.Fatal("expected to receive spend notification")
}
// Finally, we'll extend the chain with blocks until the requests are no
// longer under the risk of being reorged out of the chain. We should
// expect the done notifications to be dispatched.
nextHeight := uint32(12)
for i := nextHeight; i < nextHeight+reorgSafetyLimit; i++ {
dummyBlock := btcutil.NewBlock(&wire.MsgBlock{})
if err := n.ConnectTip(dummyBlock.Hash(), i, nil); err != nil {
t.Fatalf("unable to connect block: %v", err)
}
}
select {
case <-confNtfn.Event.Done:
default:
t.Fatal("expected to receive done notification for confirmation")
}
select {
case <-spendNtfn.Event.Done:
default:
t.Fatal("expected to receive done notification for spend")
}
}
// TestTxNotifierTearDown ensures that the TxNotifier properly alerts clients // TestTxNotifierTearDown ensures that the TxNotifier properly alerts clients
// that it is shutting down and will be unable to deliver notifications. // that it is shutting down and will be unable to deliver notifications.
func TestTxNotifierTearDown(t *testing.T) { func TestTxNotifierTearDown(t *testing.T) {