diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index f264bc67..480723b9 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -2,21 +2,28 @@ package chainntnfs_test import ( "bytes" + "io/ioutil" "log" + "os" + "path/filepath" "sync" "testing" "time" + "github.com/lightninglabs/neutrino" "github.com/lightningnetwork/lnd/chainntnfs" - _ "github.com/lightningnetwork/lnd/chainntnfs/btcdnotify" "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcwallet/walletdb" + _ "github.com/lightningnetwork/lnd/chainntnfs/neutrinonotify" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/chaincfg" "github.com/roasbeef/btcd/rpctest" "github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" + + _ "github.com/roasbeef/btcwallet/walletdb/bdb" // Required to register the boltdb walletdb implementation. ) var ( @@ -111,7 +118,7 @@ func testSingleConfirmationNotification(miner *rpctest.Harness, t.Fatalf("mismatched tx indexes: expected %v, got %v", txid, specifiedTxHash) } - case <-time.After(2 * time.Second): + case <-time.After(20 * time.Second): t.Fatalf("confirmation notification never received") } } @@ -124,7 +131,8 @@ func testMultiConfirmationNotification(miner *rpctest.Harness, // We'd like to test the case of being notified once a txid reaches // N confirmations, where N > 1. // - // Again, we'll begin by creating a fresh transaction, so we can obtain a fresh txid. + // Again, we'll begin by creating a fresh transaction, so we can obtain + // a fresh txid. txid, err := getTestTxId(miner) if err != nil { t.Fatalf("unable to create test addr: %v", err) @@ -153,10 +161,13 @@ func testMultiConfirmationNotification(miner *rpctest.Harness, confSent <- <-confIntent.Confirmed }() + // TODO(roasbeef): reduce all timeouts after neutrino sync tightended + // up + select { case <-confSent: break - case <-time.After(2 * time.Second): + case <-time.After(20 * time.Second): t.Fatalf("confirmation notification never received") } } @@ -222,7 +233,7 @@ func testBatchConfirmationNotification(miner *rpctest.Harness, select { case <-confSent: continue - case <-time.After(2 * time.Second): + case <-time.After(20 * time.Second): t.Fatalf("confirmation notification never received: %v", numConfs) } } @@ -328,17 +339,17 @@ func testSpendNotification(miner *rpctest.Harness, t.Fatalf("unable to brodacst tx: %v", err) } - _, currentHeight, err = miner.Node.GetBestBlock() - if err != nil { - t.Fatalf("unable to get current height: %v", err) - } - // Now we mine a single block, which should include our spend. The // notification should also be sent off. if _, err := miner.Node.Generate(1); err != nil { t.Fatalf("unable to generate single block: %v", err) } + _, currentHeight, err = miner.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current height: %v", err) + } + // For each event we registered for above, we create a goroutine which // will listen on the event channel, passing it proxying each // notification into a single which will be examined below.. @@ -374,7 +385,7 @@ func testSpendNotification(miner *rpctest.Harness, "expected %v, got %v", currentHeight, ntfn.SpendingHeight) } - case <-time.After(2 * time.Second): + case <-time.After(30 * time.Second): t.Fatalf("spend ntfn never received") } } @@ -425,7 +436,7 @@ func testBlockEpochNotification(miner *rpctest.Harness, select { case <-epochsSent: - case <-time.After(2 * time.Second): + case <-time.After(30 * time.Second): t.Fatalf("all notifications not sent") } } @@ -484,7 +495,7 @@ func testMultiClientConfirmationNotification(miner *rpctest.Harness, select { case <-confsSent: - case <-time.After(2 * time.Second): + case <-time.After(30 * time.Second): t.Fatalf("all confirmation notifications not sent") } } @@ -559,7 +570,7 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, confInfo.BlockHeight, currentHeight) } break - case <-time.After(2 * time.Second): + case <-time.After(20 * time.Second): t.Fatalf("confirmation notification never received") } @@ -607,7 +618,7 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, select { case <-confSent: break - case <-time.After(2 * time.Second): + case <-time.After(30 * time.Second): t.Fatalf("confirmation notification never received") } } @@ -722,7 +733,7 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness, t.Fatalf("ntfn includes wrong spending input index, reports %v, should be %v", ntfn.SpenderInputIndex, 0) } - case <-time.After(2 * time.Second): + case <-time.After(30 * time.Second): t.Fatalf("spend ntfn never received") } } @@ -797,7 +808,7 @@ func testCancelSpendNtfn(node *rpctest.Harness, "index, reports %v, should be %v", ntfn.SpenderInputIndex, 0) } - case <-time.After(2 * time.Second): + case <-time.After(20 * time.Second): t.Fatalf("spend ntfn never received") } @@ -808,7 +819,7 @@ func testCancelSpendNtfn(node *rpctest.Harness, if ok { t.Fatalf("spend ntfn should have been cancelled") } - case <-time.After(2 * time.Second): + case <-time.After(20 * time.Second): t.Fatalf("spend ntfn never cancelled") } } @@ -858,7 +869,7 @@ func testCancelEpochNtfn(node *rpctest.Harness, notifier chainntnfs.ChainNotifie if !ok { t.Fatalf("epoch was cancelled") } - case <-time.After(2 * time.Second): + case <-time.After(20 * time.Second): t.Fatalf("epoch notification not sent") } } @@ -902,21 +913,74 @@ func TestInterfaces(t *testing.T) { } rpcConfig := miner.RPCConfig() + p2pAddr := miner.P2PAddress() log.Printf("Running %v ChainNotifier interface tests\n", len(ntfnTests)) - var notifier chainntnfs.ChainNotifier + var ( + notifier chainntnfs.ChainNotifier + cleanUp func() + ) for _, notifierDriver := range chainntnfs.RegisteredNotifiers() { notifierType := notifierDriver.NotifierType switch notifierType { + case "btcd": notifier, err = notifierDriver.New(&rpcConfig) if err != nil { t.Fatalf("unable to create %v notifier: %v", notifierType, err) } + + case "neutrino": + continue + spvDir, err := ioutil.TempDir("", "neutrino") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + + dbName := filepath.Join(spvDir, "neutrino.db") + spvDatabase, err := walletdb.Create("bdb", dbName) + if err != nil { + t.Fatalf("unable to create walletdb: %v", err) + } + + // Create an instance of neutrino connected to the + // running btcd instance. + spvConfig := neutrino.Config{ + DataDir: spvDir, + Database: spvDatabase, + ChainParams: *netParams, + ConnectPeers: []string{p2pAddr}, + } + neutrino.WaitForMoreCFHeaders = time.Second * 1 + spvNode, err := neutrino.NewChainService(spvConfig) + if err != nil { + t.Fatalf("unable to create neutrino: %v", err) + } + spvNode.Start() + + cleanUp = func() { + spvDatabase.Close() + spvNode.Stop() + os.RemoveAll(spvDir) + } + + // We'll also wait for the instance to sync up fully to + // the chain generated by the btcd instance. + for !spvNode.IsCurrent() { + time.Sleep(time.Millisecond * 100) + } + + notifier, err = notifierDriver.New(spvNode) + if err != nil { + t.Fatalf("unable to create %v notifier: %v", + notifierType, err) + } } + t.Logf("Running ChainNotifier interface tests for: %v", notifierType) + if err := notifier.Start(); err != nil { t.Fatalf("unable to start notifier %v: %v", notifierType, err) @@ -927,5 +991,9 @@ func TestInterfaces(t *testing.T) { } notifier.Stop() + if cleanUp != nil { + cleanUp() + } + cleanUp = nil } } diff --git a/chainntnfs/neutrinonotify/confheap.go b/chainntnfs/neutrinonotify/confheap.go new file mode 100644 index 00000000..4ac76390 --- /dev/null +++ b/chainntnfs/neutrinonotify/confheap.go @@ -0,0 +1,58 @@ +package neutrinonotify + +import "github.com/lightningnetwork/lnd/chainntnfs" + +// confEntry represents an entry in the min-confirmation heap. . +type confEntry struct { + *confirmationsNotification + + initialConfDetails *chainntnfs.TxConfirmation + + triggerHeight uint32 +} + +// confirmationHeap is a list of confEntries sorted according to nearest +// "confirmation" height.Each entry within the min-confirmation heap is sorted +// according to the smallest dleta from the current blockheight to the +// triggerHeight of the next entry confirmationHeap +type confirmationHeap struct { + items []*confEntry +} + +// newConfirmationHeap returns a new confirmationHeap with zero items. +func newConfirmationHeap() *confirmationHeap { + var confItems []*confEntry + return &confirmationHeap{confItems} +} + +// Len returns the number of items in the priority queue. It is part of the +// heap.Interface implementation. +func (c *confirmationHeap) Len() int { return len(c.items) } + +// Less returns whether the item in the priority queue with index i should sort +// before the item with index j. It is part of the heap.Interface implementation. +func (c *confirmationHeap) Less(i, j int) bool { + return c.items[i].triggerHeight < c.items[j].triggerHeight +} + +// Swap swaps the items at the passed indices in the priority queue. It is +// part of the heap.Interface implementation. +func (c *confirmationHeap) Swap(i, j int) { + c.items[i], c.items[j] = c.items[j], c.items[i] +} + +// Push pushes the passed item onto the priority queue. It is part of the +// heap.Interface implementation. +func (c *confirmationHeap) Push(x interface{}) { + c.items = append(c.items, x.(*confEntry)) +} + +// Pop removes the highest priority item (according to Less) from the priority +// queue and returns it. It is part of the heap.Interface implementation. +func (c *confirmationHeap) Pop() interface{} { + n := len(c.items) + x := c.items[n-1] + c.items[n-1] = nil + c.items = c.items[0 : n-1] + return x +} diff --git a/chainntnfs/neutrinonotify/dirver.go b/chainntnfs/neutrinonotify/dirver.go new file mode 100644 index 00000000..c9dc1b36 --- /dev/null +++ b/chainntnfs/neutrinonotify/dirver.go @@ -0,0 +1,40 @@ +package neutrinonotify + +import ( + "fmt" + + "github.com/lightninglabs/neutrino" + "github.com/lightningnetwork/lnd/chainntnfs" +) + +// createNewNotifier creates a new instance of the ChainNotifier interface +// implemented by NeutrinoNotifier. +func createNewNotifier(args ...interface{}) (chainntnfs.ChainNotifier, error) { + if len(args) != 1 { + return nil, fmt.Errorf("incorrect number of arguments to .New(...), "+ + "expected 1, instead passed %v", len(args)) + } + + config, ok := args[0].(*neutrino.ChainService) + if !ok { + return nil, fmt.Errorf("first argument to neutrinonotify.New is " + + "incorrect, expected a *neutrino.ChainService") + } + + return New(config) +} + +// init registers a driver for the NeutrinoNotify concrete implementation of +// the chainntnfs.ChainNotifier interface. +func init() { + // Register the driver. + notifier := &chainntnfs.NotifierDriver{ + NotifierType: notifierType, + New: createNewNotifier, + } + + if err := chainntnfs.RegisterNotifier(notifier); err != nil { + panic(fmt.Sprintf("failed to register notifier driver '%s': %v", + notifierType, err)) + } +} diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go new file mode 100644 index 00000000..bc411f81 --- /dev/null +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -0,0 +1,805 @@ +package neutrinonotify + +import ( + "container/heap" + "errors" + "sync" + "sync/atomic" + + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcrpcclient" + "github.com/roasbeef/btcutil" + "github.com/roasbeef/btcutil/gcs/builder" + "github.com/roasbeef/btcwallet/waddrmgr" + "github.com/lightninglabs/neutrino" + "github.com/lightningnetwork/lnd/chainntnfs" +) + +const ( + + // notifierType uniquely identifies this concrete implementation of the + // ChainNotifier interface. + notifierType = "neutrino" +) + +var ( + // ErrChainNotifierShuttingDown is used when we are trying to + // measure a spend notification when notifier is already stopped. + ErrChainNotifierShuttingDown = errors.New("chainntnfs: system interrupt " + + "while attempting to register for spend notification.") +) + +// NeutrinoNotifier is a version of ChainNotifier that's backed by the neutrino +// Bitcoin light client. Unlike other implementations, this implementation +// speaks directly to the p2p network. As a result, this implementation of the +// ChainNotifier interface is much more light weight that other implementation +// which rely of receiving notification over an RPC interface backed by a +// running full node. +// +// TODO(roasbeef): heavily consolidate with NeutrinoNotifier code +// * maybe combine into single package? +type NeutrinoNotifier struct { + started int32 // To be used atomically. + stopped int32 // To be used atomically. + + spendClientCounter uint64 // To be used atomically. + epochClientCounter uint64 // To be used atomically. + + heightMtx sync.RWMutex + bestHeight uint32 + + p2pNode *neutrino.ChainService + chainView neutrino.Rescan + + notificationCancels chan interface{} + notificationRegistry chan interface{} + + spendNotifications map[wire.OutPoint]map[uint64]*spendNotification + + confNotifications map[chainhash.Hash][]*confirmationsNotification + confHeap *confirmationHeap + + blockEpochClients map[uint64]*blockEpochRegistration + + rescanErr <-chan error + + newBlocks chan filteredBlock + staleBlocks chan filteredBlock + + wg sync.WaitGroup + quit chan struct{} +} + +// Ensure NeutrinoNotifier implements the ChainNotifier interface at compile time. +var _ chainntnfs.ChainNotifier = (*NeutrinoNotifier)(nil) + +// New creates a new instance of the NeutrinoNotifier concrete implementation +// of the ChainNotifier interface. +// +// NOTE: The passed neutrino node should already be running and active before +// being passed into this function. +func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) { + notifier := &NeutrinoNotifier{ + notificationCancels: make(chan interface{}), + notificationRegistry: make(chan interface{}), + + blockEpochClients: make(map[uint64]*blockEpochRegistration), + + spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), + + confNotifications: make(map[chainhash.Hash][]*confirmationsNotification), + confHeap: newConfirmationHeap(), + + p2pNode: node, + + rescanErr: make(chan error), + + newBlocks: make(chan filteredBlock), + staleBlocks: make(chan filteredBlock), + + quit: make(chan struct{}), + } + + return notifier, nil +} + +// Start contacts the running neutrino light client and kicks off an initial +// empty rescan. +func (n *NeutrinoNotifier) Start() error { + // Already started? + if atomic.AddInt32(&n.started, 1) != 1 { + return nil + } + + // First, we'll obtain the latest block height of the p2p node. We'll + // start the auto-rescan from this point. Once a caller actually wishes + // to register a chain view, the rescan state will be rewound + // accordingly. + bestHeader, bestHeight, err := n.p2pNode.LatestBlock() + if err != nil { + return err + } + startingPoint := &waddrmgr.BlockStamp{ + Height: int32(bestHeight), + Hash: bestHeader.BlockHash(), + } + n.bestHeight = bestHeight + + // Next, we'll create our set of rescan options. Currently it's + // required that a user MUST set a addr/outpoint/txid when creating a + // rescan. To get around this, we'll add a "zero" outpoint, that won't + // actually be matched. + var zeroHash chainhash.Hash + rescanOptions := []neutrino.RescanOption{ + neutrino.StartBlock(startingPoint), + neutrino.QuitChan(n.quit), + neutrino.NotificationHandlers( + btcrpcclient.NotificationHandlers{ + OnFilteredBlockConnected: n.onFilteredBlockConnected, + OnFilteredBlockDisconnected: n.onFilteredBlockDisconnected, + }, + ), + neutrino.WatchTxIDs(zeroHash), + } + + // Finally, we'll create our rescan struct, start it, and launch all + // the goroutines we need to operate this ChainNotifier instance. + n.chainView = n.p2pNode.NewRescan(rescanOptions...) + n.rescanErr = n.chainView.Start() + + n.wg.Add(1) + go n.notificationDispatcher() + + return nil +} + +// Stop shutsdown the NeutrinoNotifier. +func (n *NeutrinoNotifier) Stop() error { + // Already shutting down? + if atomic.AddInt32(&n.stopped, 1) != 1 { + return nil + } + + close(n.quit) + n.wg.Wait() + + // Notify all pending clients of our shutdown by closing the related + // notification channels. + for _, spendClients := range n.spendNotifications { + for _, spendClient := range spendClients { + close(spendClient.spendChan) + } + } + for _, confClients := range n.confNotifications { + for _, confClient := range confClients { + close(confClient.finConf) + close(confClient.negativeConf) + } + } + for _, epochClient := range n.blockEpochClients { + close(epochClient.cancelChan) + close(epochClient.epochChan) + } + + return nil +} + +// filteredBlock represents a new block which has been connected to the main +// chain. The slice of transactions will only be populated if the block +// includes a transaction that confirmed one of our watched txids, or spends +// one of the outputs currently being watched. +type filteredBlock struct { + hash chainhash.Hash + height uint32 + txns []*btcutil.Tx +} + +// onFilteredBlockConnected is a callback which is executed each a new block is +// connected to the end of the main chain. +func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32, + header *wire.BlockHeader, txns []*btcutil.Tx) { + + n.newBlocks <- filteredBlock{ + hash: header.BlockHash(), + height: uint32(height), + txns: txns, + } +} + +// onFilteredBlockDisconnected is a callback which is executed each time a new +// block has been disconnected from the end of the mainchain due to a re-org. +func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32, + header *wire.BlockHeader) { + + n.staleBlocks <- filteredBlock{ + hash: header.BlockHash(), + height: uint32(height), + } +} + +// notificationDispatcher is the primary goroutine which handles client +// notification registrations, as well as notification dispatches. +func (n *NeutrinoNotifier) notificationDispatcher() { + defer n.wg.Done() + + for { + select { + case cancelMsg := <-n.notificationCancels: + switch msg := cancelMsg.(type) { + case *spendCancel: + chainntnfs.Log.Infof("Cancelling spend "+ + "notification for out_point=%v, "+ + "spend_id=%v", msg.op, msg.spendID) + + // Before we attempt to close the spendChan, + // ensure that the notification hasn't already + // yet been dispatched. + if outPointClients, ok := n.spendNotifications[msg.op]; ok { + close(outPointClients[msg.spendID].spendChan) + delete(n.spendNotifications[msg.op], msg.spendID) + } + case *epochCancel: + chainntnfs.Log.Infof("Cancelling epoch "+ + "notification, epoch_id=%v", msg.epochID) + + close(n.blockEpochClients[msg.epochID].cancelChan) + close(n.blockEpochClients[msg.epochID].epochChan) + delete(n.blockEpochClients, msg.epochID) + + close(msg.done) + } + + case registerMsg := <-n.notificationRegistry: + switch msg := registerMsg.(type) { + case *spendNotification: + chainntnfs.Log.Infof("New spend subscription: "+ + "utxo=%v", msg.targetOutpoint) + op := *msg.targetOutpoint + + if _, ok := n.spendNotifications[op]; !ok { + n.spendNotifications[op] = make(map[uint64]*spendNotification) + } + n.spendNotifications[op][msg.spendID] = msg + + case *confirmationsNotification: + chainntnfs.Log.Infof("New confirmations "+ + "subscription: txid=%v, numconfs=%v", + *msg.txid, msg.numConfirmations) + + // If the notification can be partially or + // fully dispatched, then we can skip the first + // phase for ntfns. + n.heightMtx.RLock() + currentHeight := n.bestHeight + if n.attemptHistoricalDispatch(msg, currentHeight, msg.heightHint) { + n.heightMtx.RUnlock() + continue + } + n.heightMtx.RUnlock() + + // If we can't fully dispatch confirmation, + // then we'll update our filter so we can be + // notified of its future initial confirmation. + rescanUpdate := []neutrino.UpdateOption{ + neutrino.AddTxIDs(*msg.txid), + neutrino.Rewind(currentHeight), + } + if err := n.chainView.Update(rescanUpdate...); err != nil { + chainntnfs.Log.Errorf("unable to update rescan: %v", err) + } + + txid := *msg.txid + n.confNotifications[txid] = append(n.confNotifications[txid], msg) + + case *blockEpochRegistration: + chainntnfs.Log.Infof("New block epoch subscription") + n.blockEpochClients[msg.epochID] = msg + } + + case newBlock := <-n.newBlocks: + n.heightMtx.Lock() + n.bestHeight = newBlock.height + n.heightMtx.Unlock() + + chainntnfs.Log.Infof("New block: height=%v, sha=%v", + newBlock.height, newBlock.hash) + + // First we'll notify any subscribed clients of the + // block. + n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) + + // Next, we'll scan over the list of relevant + // transactions and possibly dispatch notifications for + // confirmations and spends. + for _, tx := range newBlock.txns { + // Check if the inclusion of this transaction + // within a block by itself triggers a block + // confirmation threshold, if so send a + // notification. Otherwise, place the + // notification on a heap to be triggered in + // the future once additional confirmations are + // attained. + mtx := tx.MsgTx() + txIndex := tx.Index() + txSha := mtx.TxHash() + n.checkConfirmationTrigger(&txSha, &newBlock, txIndex) + + for i, txIn := range mtx.TxIn { + prevOut := txIn.PreviousOutPoint + + // If this transaction indeed does + // spend an output which we have a + // registered notification for, then + // create a spend summary, finally + // sending off the details to the + // notification subscriber. + if clients, ok := n.spendNotifications[prevOut]; ok { + // TODO(roasbeef): many + // integration tests expect + // spend to be notified within + // the mempool. + spendDetails := &chainntnfs.SpendDetail{ + SpentOutPoint: &prevOut, + SpenderTxHash: &txSha, + SpendingTx: mtx, + SpenderInputIndex: uint32(i), + SpendingHeight: int32(newBlock.height), + } + + for _, ntfn := range clients { + chainntnfs.Log.Infof("Dispatching "+ + "spend notification for "+ + "outpoint=%v", ntfn.targetOutpoint) + ntfn.spendChan <- spendDetails + } + + delete(n.spendNotifications, prevOut) + } + + } + + } + + // A new block has been connected to the main chain. + // Send out any N confirmation notifications which may + // have been triggered by this new block. + n.notifyConfs(int32(newBlock.height)) + + case staleBlock := <-n.staleBlocks: + + chainntnfs.Log.Warnf("Block disconnected from main "+ + "chain: %v", staleBlock.hash) + + case err := <-n.rescanErr: + chainntnfs.Log.Errorf("Error during rescan: %v", err) + + case <-n.quit: + return + + } + } +} + +// attemptHistoricalDispatch attempts to consult the historical chain data to +// see if a transaction has already reached full confirmation status at the +// time a notification for it was registered. If it has, then we do an +// immediate dispatch. Otherwise, we'll add the partially confirmed transaction +// to the confirmation heap. +func (n *NeutrinoNotifier) attemptHistoricalDispatch(msg *confirmationsNotification, + currentHeight, heightHint uint32) bool { + + targetHash := msg.txid + + var ( + confDetails *chainntnfs.TxConfirmation + scanHeight uint32 + ) + + // Starting from the height hint, we'll walk forwards in the chain to + // see if this transaction has already been confirmed. +chainScan: + for scanHeight := heightHint; scanHeight <= currentHeight; scanHeight++ { + // First, we'll fetch the block header for this height so we + // can compute the current block hash. + header, err := n.p2pNode.GetBlockByHeight(scanHeight) + if err != nil { + chainntnfs.Log.Errorf("unable to get header for "+ + "height: %v", err) + return false + } + blockHash := header.BlockHash() + + // With the hash computed, we can now fetch the extended filter + // for this height. + extFilter, err := n.p2pNode.GetExtFilter(blockHash) + if err != nil { + chainntnfs.Log.Errorf("unable to retrieve extended "+ + "filter for height: %v", scanHeight) + return false + } + + // If the block has no transactions other than the coinbase + // transaction, then the filter may be nil, so we'll continue + // forward int that case. + if extFilter == nil { + continue + } + + // In the case that the filter exists, we'll attempt to see if + // any element in it match our target txid. + key := builder.DeriveKey(&blockHash) + match, err := extFilter.Match(key, targetHash[:]) + if err != nil { + chainntnfs.Log.Errorf("unable to query filter: %v", err) + return false + } + + // If there's no match, then we can continue forward to the + // next block. + if !match { + continue + } + + // In the case that we do have a match, we'll fetch the block + // from the network so we can find the positional data required + // to send the proper response. + block, err := n.p2pNode.GetBlockFromNetwork(blockHash) + if err != nil { + chainntnfs.Log.Errorf("unable to get block from "+ + "network: %v", err) + return false + } + for j, tx := range block.Transactions() { + txHash := tx.Hash() + if txHash.IsEqual(targetHash) { + confDetails = &chainntnfs.TxConfirmation{ + BlockHash: &blockHash, + BlockHeight: scanHeight, + TxIndex: uint32(j), + } + break chainScan + } + } + } + + // If it hasn't yet been confirmed, then we can exit early. + if confDetails == nil { + return false + } + + // Otherwise, we'll calculate the number of confirmations that the + // transaction has so we can decide if it has reached the desired + // number of confirmations or not. + txConfs := currentHeight - scanHeight + + // If the transaction has more that enough confirmations, then we can + // dispatch it immediately after obtaining for information w.r.t + // exactly *when* if got all its confirmations. + if uint32(txConfs) >= msg.numConfirmations { + msg.finConf <- confDetails + return true + } + + // Otherwise, the transaction has only been *partially* confirmed, so + // we need to insert it into the confirmation heap. + confsLeft := msg.numConfirmations - uint32(txConfs) + confHeight := uint32(currentHeight) + confsLeft + heapEntry := &confEntry{ + msg, + confDetails, + confHeight, + } + heap.Push(n.confHeap, heapEntry) + + return false +} + +// notifyBlockEpochs notifies all registered block epoch clients of the newly +// connected block to the main chain. +func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { + epoch := &chainntnfs.BlockEpoch{ + Height: newHeight, + Hash: newSha, + } + + for _, epochClient := range n.blockEpochClients { + n.wg.Add(1) + go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{}) { + defer n.wg.Done() + + select { + case ntfnChan <- epoch: + + case <-cancelChan: + return + + case <-n.quit: + return + } + }(epochClient.epochChan, epochClient.cancelChan) + } +} + +// notifyConfs examines the current confirmation heap, sending off any +// notifications which have been triggered by the connection of a new block at +// newBlockHeight. +func (n *NeutrinoNotifier) notifyConfs(newBlockHeight int32) { + // If the heap is empty, we have nothing to do. + if n.confHeap.Len() == 0 { + return + } + + // Traverse our confirmation heap. The heap is a min-heap, so the + // confirmation notification which requires the smallest block-height + // will always be at the top of the heap. If a confirmation + // notification is eligible for triggering, then fire it off, and check + // if another is eligible until there are no more eligible entries. + nextConf := heap.Pop(n.confHeap).(*confEntry) + for nextConf.triggerHeight <= uint32(newBlockHeight) { + + nextConf.finConf <- nextConf.initialConfDetails + + if n.confHeap.Len() == 0 { + return + } + + nextConf = heap.Pop(n.confHeap).(*confEntry) + } + + heap.Push(n.confHeap, nextConf) +} + +// checkConfirmationTrigger determines if the passed txSha included at +// blockHeight triggers any single confirmation notifications. In the event +// that the txid matches, yet needs additional confirmations, it is added to +// the confirmation heap to be triggered at a later time. +func (n *NeutrinoNotifier) checkConfirmationTrigger(txSha *chainhash.Hash, + newTip *filteredBlock, txIndex int) { + + // If a confirmation notification has been registered for this txid, + // then either trigger a notification event if only a single + // confirmation notification was requested, or place the notification + // on the confirmation heap for future usage. + if confClients, ok := n.confNotifications[*txSha]; ok { + // Either all of the registered confirmations will be + // dispatched due to a single confirmation, or added to the + // conf head. Therefor we unconditionally delete the registered + // confirmations from the staging zone. + defer func() { + delete(n.confNotifications, *txSha) + }() + + for _, confClient := range confClients { + confDetails := &chainntnfs.TxConfirmation{ + BlockHash: &newTip.hash, + BlockHeight: uint32(newTip.height), + TxIndex: uint32(txIndex), + } + + if confClient.numConfirmations == 1 { + chainntnfs.Log.Infof("Dispatching single conf "+ + "notification, sha=%v, height=%v", txSha, + newTip.height) + confClient.finConf <- confDetails + continue + } + + // The registered notification requires more than one + // confirmation before triggering. So we create a + // heapConf entry for this notification. The heapConf + // allows us to easily keep track of which + // notification(s) we should fire off with each + // incoming block. + confClient.initialConfirmHeight = uint32(newTip.height) + finalConfHeight := confClient.initialConfirmHeight + confClient.numConfirmations - 1 + heapEntry := &confEntry{ + confClient, + confDetails, + finalConfHeight, + } + heap.Push(n.confHeap, heapEntry) + } + } +} + +// spendNotification couples a target outpoint along with the channel used for +// notifications once a spend of the outpoint has been detected. +type spendNotification struct { + targetOutpoint *wire.OutPoint + + spendChan chan *chainntnfs.SpendDetail + + spendID uint64 +} + +// spendCancel is a message sent to the NeutrinoNotifier when a client wishes +// to cancel an outstanding spend notification that has yet to be dispatched. +type spendCancel struct { + // op is the target outpoint of the notification to be cancelled. + op wire.OutPoint + + // spendID the ID of the notification to cancel. + spendID uint64 +} + +// RegisterSpendNtfn registers an intent to be notified once the target +// outpoint has been spent by a transaction on-chain. Once a spend of the +// target outpoint has been detected, the details of the spending event will be +// sent across the 'Spend' channel. +func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, + heightHint uint32) (*chainntnfs.SpendEvent, error) { + + n.heightMtx.RLock() + currentHeight := n.bestHeight + n.heightMtx.RUnlock() + + ntfn := &spendNotification{ + targetOutpoint: outpoint, + spendChan: make(chan *chainntnfs.SpendDetail, 1), + spendID: atomic.AddUint64(&n.spendClientCounter, 1), + } + spendEvent := &chainntnfs.SpendEvent{ + Spend: ntfn.spendChan, + Cancel: func() { + select { + case n.notificationCancels <- &spendCancel{ + op: *outpoint, + spendID: ntfn.spendID, + }: + case <-n.quit: + return + } + }, + } + + // Before sending off the notification request, we'll attempt to see if + // this output is still spent or not at this point in the chain. + spendReport, err := n.p2pNode.GetUtxo( + neutrino.WatchOutPoints(*outpoint), + neutrino.StartBlock(&waddrmgr.BlockStamp{ + Height: int32(heightHint), + }), + neutrino.EndBlock(&waddrmgr.BlockStamp{ + Height: int32(currentHeight), + }), + ) + if err != nil { + return nil, err + } + + // If a spend report was returned, and the transaction is present, then + // this means that the output is already spent. + if spendReport != nil && spendReport.SpendingTx != nil { + // As a result, we'll launch a goroutine to immediately + // dispatch the notification with a normal response. + go func() { + txSha := spendReport.SpendingTx.TxHash() + select { + case ntfn.spendChan <- &chainntnfs.SpendDetail{ + SpentOutPoint: outpoint, + SpenderTxHash: &txSha, + SpendingTx: spendReport.SpendingTx, + SpenderInputIndex: spendReport.SpendingInputIndex, + SpendingHeight: int32(spendReport.SpendingTxHeight), + }: + case <-n.quit: + return + } + + }() + + return spendEvent, nil + } + + // If the output is still unspent, then we'll update our rescan's + // filter, and send the request to the dispatcher goroutine. + rescanUpdate := []neutrino.UpdateOption{ + neutrino.AddOutPoints(*outpoint), + neutrino.Rewind(currentHeight), + } + if err := n.chainView.Update(rescanUpdate...); err != nil { + return nil, err + } + + select { + case n.notificationRegistry <- ntfn: + case <-n.quit: + return nil, ErrChainNotifierShuttingDown + } + + return spendEvent, nil +} + +// confirmationNotification represents a client's intent to receive a +// notification once the target txid reaches numConfirmations confirmations. +type confirmationsNotification struct { + txid *chainhash.Hash + + heightHint uint32 + + initialConfirmHeight uint32 + numConfirmations uint32 + + finConf chan *chainntnfs.TxConfirmation + negativeConf chan int32 // TODO(roasbeef): re-org funny business +} + +// RegisterConfirmationsNtfn registers a notification with NeutrinoNotifier +// which will be triggered once the txid reaches numConfs number of +// confirmations. +func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, + numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { + + ntfn := &confirmationsNotification{ + txid: txid, + heightHint: heightHint, + numConfirmations: numConfs, + finConf: make(chan *chainntnfs.TxConfirmation, 1), + negativeConf: make(chan int32, 1), + } + + select { + case <-n.quit: + return nil, ErrChainNotifierShuttingDown + case n.notificationRegistry <- ntfn: + return &chainntnfs.ConfirmationEvent{ + Confirmed: ntfn.finConf, + NegativeConf: ntfn.negativeConf, + }, nil + } +} + +// blockEpochRegistration represents a client's intent to receive a +// notification with each newly connected block. +type blockEpochRegistration struct { + epochChan chan *chainntnfs.BlockEpoch + + cancelChan chan struct{} + + epochID uint64 +} + +// epochCancel is a message sent to the NeutrinoNotifier when a client wishes +// to cancel an outstanding epoch notification that has yet to be dispatched. +type epochCancel struct { + epochID uint64 + + done chan struct{} +} + +// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the caller +// to receive notifications, of each new block connected to the main chain. +func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { + registration := &blockEpochRegistration{ + epochChan: make(chan *chainntnfs.BlockEpoch, 20), + cancelChan: make(chan struct{}), + epochID: atomic.AddUint64(&n.epochClientCounter, 1), + } + + select { + case <-n.quit: + return nil, errors.New("chainntnfs: system interrupt while " + + "attempting to register for block epoch notification.") + case n.notificationRegistry <- registration: + return &chainntnfs.BlockEpochEvent{ + Epochs: registration.epochChan, + Cancel: func() { + cancel := &epochCancel{ + epochID: registration.epochID, + done: make(chan struct{}), + } + + select { + case n.notificationCancels <- cancel: + select { + case <-cancel.done: + case <-n.quit: + } + case <-n.quit: + return + } + }, + }, nil + } +} diff --git a/routing/chainview/neutrino.go b/routing/chainview/neutrino.go index 96e45a69..c2a98559 100644 --- a/routing/chainview/neutrino.go +++ b/routing/chainview/neutrino.go @@ -76,7 +76,7 @@ func NewCfFilteredChainView(node *neutrino.ChainService) (*CfFilteredChainView, // Start kicks off the FilteredChainView implementation. This function must be // called before any calls to UpdateFilter can be processed. -// j +// // NOTE: This is part of the FilteredChainView interface. func (c *CfFilteredChainView) Start() error { // Already started? @@ -214,7 +214,7 @@ func (c *CfFilteredChainView) chainFilterer() { // With our internal chain view update, we'll craft a // new update to the chainView which includes our new - // UTOX's, and current update height. + // UTXO's, and current update height. rescanUpdate := []neutrino.UpdateOption{ neutrino.AddOutPoints(update.newUtxos...), neutrino.Rewind(update.updateHeight), @@ -232,7 +232,7 @@ func (c *CfFilteredChainView) chainFilterer() { // FilterBlock takes a block hash, and returns a FilteredBlocks which is the // result of applying the current registered UTXO sub-set on the block -// corresponding to that block hash. If any watched UTOX's are spent by the +// corresponding to that block hash. If any watched UTXO's are spent by the // selected lock, then the internal chainFilter will also be updated. // // NOTE: This is part of the FilteredChainView interface.