diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go new file mode 100644 index 00000000..8be208b6 --- /dev/null +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -0,0 +1,679 @@ +package bitcoindnotify + +import ( + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/roasbeef/btcd/btcjson" + "github.com/roasbeef/btcd/chaincfg" + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcd/rpcclient" + "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" + "github.com/roasbeef/btcwallet/chain" +) + +const ( + + // notifierType uniquely identifies this concrete implementation of the + // ChainNotifier interface. + notifierType = "bitcoind" + + // reorgSafetyLimit is assumed maximum depth of a chain reorganization. + // After this many confirmation, transaction confirmation info will be + // pruned. + reorgSafetyLimit = 100 +) + +var ( + // ErrChainNotifierShuttingDown is used when we are trying to + // measure a spend notification when notifier is already stopped. + ErrChainNotifierShuttingDown = errors.New("chainntnfs: system interrupt " + + "while attempting to register for spend notification.") +) + +// chainUpdate encapsulates an update to the current main chain. This struct is +// used as an element within an unbounded queue in order to avoid blocking the +// main rpc dispatch rule. +type chainUpdate struct { + blockHash *chainhash.Hash + blockHeight int32 +} + +// txUpdate encapsulates a transaction related notification sent from bitcoind +// to the registered RPC client. This struct is used as an element within an +// unbounded queue in order to avoid blocking the main rpc dispatch rule. +type txUpdate struct { + tx *btcutil.Tx + details *btcjson.BlockDetails +} + +// TODO(roasbeef): generalize struct below: +// * move chans to config, allow outside callers to handle send conditions + +// BitcoindNotifier implements the ChainNotifier interface using a bitcoind +// chain client. Multiple concurrent clients are supported. All notifications +// are achieved via non-blocking sends on client channels. +type BitcoindNotifier struct { + spendClientCounter uint64 // To be used atomically. + epochClientCounter uint64 // To be used atomically. + + started int32 // To be used atomically. + stopped int32 // To be used atomically. + + heightMtx sync.RWMutex + bestHeight int32 + + chainConn *chain.BitcoindClient + + notificationCancels chan interface{} + notificationRegistry chan interface{} + + spendNotifications map[wire.OutPoint]map[uint64]*spendNotification + + txConfNotifier *chainntnfs.TxConfNotifier + + blockEpochClients map[uint64]*blockEpochRegistration + + wg sync.WaitGroup + quit chan struct{} +} + +// Ensure BitcoindNotifier implements the ChainNotifier interface at compile +// time. +var _ chainntnfs.ChainNotifier = (*BitcoindNotifier)(nil) + +// New returns a new BitcoindNotifier instance. This function assumes the +// bitcoind node detailed in the passed configuration is already running, and +// willing to accept RPC requests and new zmq clients. +func New(config *rpcclient.ConnConfig, zmqConnect string, + params chaincfg.Params) (*BitcoindNotifier, error) { + notifier := &BitcoindNotifier{ + notificationCancels: make(chan interface{}), + notificationRegistry: make(chan interface{}), + + blockEpochClients: make(map[uint64]*blockEpochRegistration), + + spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), + + quit: make(chan struct{}), + } + + // Disable connecting to bitcoind within the rpcclient.New method. We + // defer establishing the connection to our .Start() method. + config.DisableConnectOnNew = true + config.DisableAutoReconnect = false + chainConn, err := chain.NewBitcoindClient(¶ms, config.Host, + config.User, config.Pass, zmqConnect, 100*time.Millisecond) + if err != nil { + return nil, err + } + notifier.chainConn = chainConn + + return notifier, nil +} + +// Start connects to the running bitcoind node over websockets, registers for +// block notifications, and finally launches all related helper goroutines. +func (b *BitcoindNotifier) Start() error { + // Already started? + if atomic.AddInt32(&b.started, 1) != 1 { + return nil + } + + // Connect to bitcoind, and register for notifications on connected, + // and disconnected blocks. + if err := b.chainConn.Start(); err != nil { + return err + } + if err := b.chainConn.NotifyBlocks(); err != nil { + return err + } + + _, currentHeight, err := b.chainConn.GetBestBlock() + if err != nil { + return err + } + + b.heightMtx.Lock() + b.bestHeight = currentHeight + b.heightMtx.Unlock() + + b.txConfNotifier = chainntnfs.NewTxConfNotifier( + uint32(currentHeight), reorgSafetyLimit) + + b.wg.Add(1) + go b.notificationDispatcher() + + return nil +} + +// Stop shutsdown the BitcoindNotifier. +func (b *BitcoindNotifier) Stop() error { + // Already shutting down? + if atomic.AddInt32(&b.stopped, 1) != 1 { + return nil + } + + // Shutdown the rpc client, this gracefully disconnects from bitcoind, + // and cleans up all related resources. + b.chainConn.Stop() + + close(b.quit) + b.wg.Wait() + + // Notify all pending clients of our shutdown by closing the related + // notification channels. + for _, spendClients := range b.spendNotifications { + for _, spendClient := range spendClients { + close(spendClient.spendChan) + } + } + for _, epochClient := range b.blockEpochClients { + close(epochClient.epochChan) + } + b.txConfNotifier.TearDown() + + return nil +} + +// blockNtfn packages a notification of a connected/disconnected block along +// with its height at the time. +type blockNtfn struct { + sha *chainhash.Hash + height int32 +} + +// notificationDispatcher is the primary goroutine which handles client +// notification registrations, as well as notification dispatches. +func (b *BitcoindNotifier) notificationDispatcher() { +out: + for { + select { + case cancelMsg := <-b.notificationCancels: + switch msg := cancelMsg.(type) { + case *spendCancel: + chainntnfs.Log.Infof("Cancelling spend "+ + "notification for out_point=%v, "+ + "spend_id=%v", msg.op, msg.spendID) + + // Before we attempt to close the spendChan, + // ensure that the notification hasn't already + // yet been dispatched. + if outPointClients, ok := b.spendNotifications[msg.op]; ok { + close(outPointClients[msg.spendID].spendChan) + delete(b.spendNotifications[msg.op], msg.spendID) + } + + case *epochCancel: + chainntnfs.Log.Infof("Cancelling epoch "+ + "notification, epoch_id=%v", msg.epochID) + + // First, close the cancel channel for this + // specific client, and wait for the client to + // exit. + close(b.blockEpochClients[msg.epochID].cancelChan) + b.blockEpochClients[msg.epochID].wg.Wait() + + // Once the client has exited, we can then + // safely close the channel used to send epoch + // notifications, in order to notify any + // listeners that the intent has been + // cancelled. + close(b.blockEpochClients[msg.epochID].epochChan) + delete(b.blockEpochClients, msg.epochID) + + } + case registerMsg := <-b.notificationRegistry: + switch msg := registerMsg.(type) { + case *spendNotification: + chainntnfs.Log.Infof("New spend subscription: "+ + "utxo=%v", msg.targetOutpoint) + op := *msg.targetOutpoint + + if _, ok := b.spendNotifications[op]; !ok { + b.spendNotifications[op] = make(map[uint64]*spendNotification) + } + b.spendNotifications[op][msg.spendID] = msg + b.chainConn.NotifySpent([]*wire.OutPoint{&op}) + case *confirmationsNotification: + chainntnfs.Log.Infof("New confirmations "+ + "subscription: txid=%v, numconfs=%v", + msg.TxID, msg.NumConfirmations) + + // Lookup whether the transaction is already included in the + // active chain. + txConf, err := b.historicalConfDetails(msg.TxID) + if err != nil { + chainntnfs.Log.Error(err) + } + b.heightMtx.RLock() + err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf) + if err != nil { + chainntnfs.Log.Error(err) + } + b.heightMtx.RUnlock() + case *blockEpochRegistration: + chainntnfs.Log.Infof("New block epoch subscription") + b.blockEpochClients[msg.epochID] = msg + } + + case ntfn := <-b.chainConn.Notifications(): + switch item := ntfn.(type) { + case chain.BlockConnected: + b.heightMtx.Lock() + if item.Height != b.bestHeight+1 { + chainntnfs.Log.Warnf("Received blocks out of order: "+ + "current height=%d, new height=%d", + b.bestHeight, item.Height) + b.heightMtx.Unlock() + continue + } + b.bestHeight = item.Height + + rawBlock, err := b.chainConn.GetBlock(&item.Hash) + if err != nil { + chainntnfs.Log.Errorf("Unable to get block: %v", err) + b.heightMtx.Unlock() + continue + } + + chainntnfs.Log.Infof("New block: height=%v, sha=%v", + item.Height, item.Hash) + + b.notifyBlockEpochs(item.Height, &item.Hash) + + txns := btcutil.NewBlock(rawBlock).Transactions() + err = b.txConfNotifier.ConnectTip(&item.Hash, + uint32(item.Height), txns) + if err != nil { + chainntnfs.Log.Error(err) + } + b.heightMtx.Unlock() + continue + + case chain.BlockDisconnected: + b.heightMtx.Lock() + if item.Height != b.bestHeight { + chainntnfs.Log.Warnf("Received blocks "+ + "out of order: current height="+ + "%d, disconnected height=%d", + b.bestHeight, item.Height) + b.heightMtx.Unlock() + continue + } + b.bestHeight = item.Height - 1 + + chainntnfs.Log.Infof("Block disconnected from "+ + "main chain: height=%v, sha=%v", + item.Height, item.Hash) + + err := b.txConfNotifier.DisconnectTip( + uint32(item.Height)) + if err != nil { + chainntnfs.Log.Error(err) + } + b.heightMtx.Unlock() + + case chain.RelevantTx: + tx := item.TxRecord.MsgTx + // First, check if this transaction spends an output + // that has an existing spend notification for it. + for i, txIn := range tx.TxIn { + prevOut := txIn.PreviousOutPoint + + // If this transaction indeed does spend an + // output which we have a registered + // notification for, then create a spend + // summary, finally sending off the details to + // the notification subscriber. + if clients, ok := b.spendNotifications[prevOut]; ok { + spenderSha := tx.TxHash() + spendDetails := &chainntnfs.SpendDetail{ + SpentOutPoint: &prevOut, + SpenderTxHash: &spenderSha, + SpendingTx: &tx, + SpenderInputIndex: uint32(i), + } + // TODO(roasbeef): after change to + // loadfilter, only notify on block + // inclusion? + if item.Block != nil { + spendDetails.SpendingHeight = item.Block.Height + } else { + b.heightMtx.RLock() + spendDetails.SpendingHeight = b.bestHeight + 1 + b.heightMtx.RUnlock() + } + + for _, ntfn := range clients { + chainntnfs.Log.Infof("Dispatching "+ + "spend notification for "+ + "outpoint=%v", ntfn.targetOutpoint) + ntfn.spendChan <- spendDetails + + // Close spendChan to ensure that any calls to Cancel will not + // block. This is safe to do since the channel is buffered, and the + // message can still be read by the receiver. + close(ntfn.spendChan) + } + delete(b.spendNotifications, prevOut) + } + } + } + + case <-b.quit: + break out + } + } + b.wg.Done() +} + +// historicalConfDetails looks up whether a transaction is already included in a +// block in the active chain and, if so, returns details about the confirmation. +func (b *BitcoindNotifier) historicalConfDetails(txid *chainhash.Hash, +) (*chainntnfs.TxConfirmation, error) { + + // If the transaction already has some or all of the confirmations, + // then we may be able to dispatch it immediately. + // TODO: fall back to scanning blocks if txindex isn't on. + tx, err := b.chainConn.GetRawTransactionVerbose(txid) + if err != nil || tx == nil || tx.BlockHash == "" { + if err == nil { + return nil, nil + } + // Do not return an error if the transaction was not found. + if jsonErr, ok := err.(*btcjson.RPCError); ok { + if jsonErr.Code == btcjson.ErrRPCNoTxInfo { + return nil, nil + } + } + return nil, fmt.Errorf("unable to query for txid(%v): %v", txid, err) + } + + // As we need to fully populate the returned TxConfirmation struct, + // grab the block in which the transaction was confirmed so we can + // locate its exact index within the block. + blockHash, err := chainhash.NewHashFromStr(tx.BlockHash) + if err != nil { + return nil, fmt.Errorf("unable to get block hash %v for historical "+ + "dispatch: %v", tx.BlockHash, err) + } + block, err := b.chainConn.GetBlockVerbose(blockHash) + if err != nil { + return nil, fmt.Errorf("unable to get block hash: %v", err) + } + + // If the block obtained, locate the transaction's index within the + // block so we can give the subscriber full confirmation details. + txIndex := -1 + targetTxidStr := txid.String() + for i, txHash := range block.Tx { + if txHash == targetTxidStr { + txIndex = i + break + } + } + + if txIndex == -1 { + return nil, fmt.Errorf("unable to locate tx %v in block %v", + txid, blockHash) + } + + txConf := chainntnfs.TxConfirmation{ + BlockHash: blockHash, + BlockHeight: uint32(block.Height), + TxIndex: uint32(txIndex), + } + return &txConf, nil +} + +// notifyBlockEpochs notifies all registered block epoch clients of the newly +// connected block to the main chain. +func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { + epoch := &chainntnfs.BlockEpoch{ + Height: newHeight, + Hash: newSha, + } + + for _, epochClient := range b.blockEpochClients { + b.wg.Add(1) + epochClient.wg.Add(1) + go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{}, + clientWg *sync.WaitGroup) { + + // TODO(roasbeef): move to goroutine per client, use sync queue + + defer clientWg.Done() + defer b.wg.Done() + + select { + case ntfnChan <- epoch: + + case <-cancelChan: + return + + case <-b.quit: + return + } + + }(epochClient.epochChan, epochClient.cancelChan, &epochClient.wg) + } +} + +// spendNotification couples a target outpoint along with the channel used for +// notifications once a spend of the outpoint has been detected. +type spendNotification struct { + targetOutpoint *wire.OutPoint + + spendChan chan *chainntnfs.SpendDetail + + spendID uint64 +} + +// spendCancel is a message sent to the BitcoindNotifier when a client wishes +// to cancel an outstanding spend notification that has yet to be dispatched. +type spendCancel struct { + // op is the target outpoint of the notification to be cancelled. + op wire.OutPoint + + // spendID the ID of the notification to cancel. + spendID uint64 +} + +// RegisterSpendNtfn registers an intent to be notified once the target +// outpoint has been spent by a transaction on-chain. Once a spend of the target +// outpoint has been detected, the details of the spending event will be sent +// across the 'Spend' channel. +func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, + _ uint32) (*chainntnfs.SpendEvent, error) { + + if err := b.chainConn.NotifySpent([]*wire.OutPoint{outpoint}); err != nil { + return nil, err + } + + ntfn := &spendNotification{ + targetOutpoint: outpoint, + spendChan: make(chan *chainntnfs.SpendDetail, 1), + spendID: atomic.AddUint64(&b.spendClientCounter, 1), + } + + select { + case <-b.quit: + return nil, ErrChainNotifierShuttingDown + case b.notificationRegistry <- ntfn: + } + + // The following conditional checks to ensure that when a spend notification + // is registered, the output hasn't already been spent. If the output + // is no longer in the UTXO set, the chain will be rescanned from the point + // where the output was added. The rescan will dispatch the notification. + txout, err := b.chainConn.GetTxOut(&outpoint.Hash, outpoint.Index, true) + if err != nil { + return nil, err + } + + if txout == nil { + // TODO: fall back to scanning blocks if txindex isn't on. + transaction, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash) + if err != nil { + jsonErr, ok := err.(*btcjson.RPCError) + if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo { + return nil, err + } + } + + if transaction != nil { + blockhash, err := chainhash.NewHashFromStr(transaction.BlockHash) + if err != nil { + return nil, err + } + + // Rewind the rescan, since the btcwallet bitcoind + // back-end doesn't support that. + blockHeight, err := b.chainConn.GetBlockHeight(blockhash) + if err != nil { + return nil, err + } + b.heightMtx.Lock() + currentHeight := b.bestHeight + b.bestHeight = blockHeight + for i := currentHeight; i > blockHeight; i-- { + err = b.txConfNotifier.DisconnectTip(uint32(i)) + if err != nil { + return nil, err + } + } + b.heightMtx.Unlock() + + ops := []*wire.OutPoint{outpoint} + if err := b.chainConn.Rescan(blockhash, nil, ops); err != nil { + chainntnfs.Log.Errorf("Rescan for spend "+ + "notification txout failed: %v", err) + return nil, err + } + } + } + + return &chainntnfs.SpendEvent{ + Spend: ntfn.spendChan, + Cancel: func() { + cancel := &spendCancel{ + op: *outpoint, + spendID: ntfn.spendID, + } + + // Submit spend cancellation to notification dispatcher. + select { + case b.notificationCancels <- cancel: + // Cancellation is being handled, drain the spend chan until it is + // closed before yielding to the caller. + for { + select { + case _, ok := <-ntfn.spendChan: + if !ok { + return + } + case <-b.quit: + return + } + } + case <-b.quit: + } + }, + }, nil +} + +// confirmationNotification represents a client's intent to receive a +// notification once the target txid reaches numConfirmations confirmations. +type confirmationsNotification struct { + chainntnfs.ConfNtfn +} + +// RegisterConfirmationsNtfn registers a notification with BitcoindNotifier +// which will be triggered once the txid reaches numConfs number of +// confirmations. +func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, + numConfs, _ uint32) (*chainntnfs.ConfirmationEvent, error) { + + ntfn := &confirmationsNotification{ + chainntnfs.ConfNtfn{ + TxID: txid, + NumConfirmations: numConfs, + Event: chainntnfs.NewConfirmationEvent(), + }, + } + + select { + case <-b.quit: + return nil, ErrChainNotifierShuttingDown + case b.notificationRegistry <- ntfn: + return ntfn.Event, nil + } +} + +// blockEpochRegistration represents a client's intent to receive a +// notification with each newly connected block. +type blockEpochRegistration struct { + epochID uint64 + + epochChan chan *chainntnfs.BlockEpoch + + cancelChan chan struct{} + + wg sync.WaitGroup +} + +// epochCancel is a message sent to the BitcoindNotifier when a client wishes +// to cancel an outstanding epoch notification that has yet to be dispatched. +type epochCancel struct { + epochID uint64 +} + +// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the +// caller to receive notifications, of each new block connected to the main +// chain. +func (b *BitcoindNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { + registration := &blockEpochRegistration{ + epochChan: make(chan *chainntnfs.BlockEpoch, 20), + cancelChan: make(chan struct{}), + epochID: atomic.AddUint64(&b.epochClientCounter, 1), + } + + select { + case <-b.quit: + return nil, errors.New("chainntnfs: system interrupt while " + + "attempting to register for block epoch notification.") + case b.notificationRegistry <- registration: + return &chainntnfs.BlockEpochEvent{ + Epochs: registration.epochChan, + Cancel: func() { + cancel := &epochCancel{ + epochID: registration.epochID, + } + + // Submit epoch cancellation to notification dispatcher. + select { + case b.notificationCancels <- cancel: + // Cancellation is being handled, drain the epoch channel until it is + // closed before yielding to caller. + for { + select { + case _, ok := <-registration.epochChan: + if !ok { + return + } + case <-b.quit: + return + } + } + case <-b.quit: + } + }, + }, nil + } +} diff --git a/chainntnfs/bitcoindnotify/driver.go b/chainntnfs/bitcoindnotify/driver.go new file mode 100644 index 00000000..52dae673 --- /dev/null +++ b/chainntnfs/bitcoindnotify/driver.go @@ -0,0 +1,53 @@ +package bitcoindnotify + +import ( + "fmt" + + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/roasbeef/btcd/chaincfg" + "github.com/roasbeef/btcd/rpcclient" +) + +// createNewNotifier creates a new instance of the ChainNotifier interface +// implemented by BitcoindNotifier. +func createNewNotifier(args ...interface{}) (chainntnfs.ChainNotifier, error) { + if len(args) != 3 { + return nil, fmt.Errorf("incorrect number of arguments to "+ + ".New(...), expected 3, instead passed %v", len(args)) + } + + config, ok := args[0].(*rpcclient.ConnConfig) + if !ok { + return nil, fmt.Errorf("first argument to bitcoindnotifier." + + "New is incorrect, expected a *rpcclient.ConnConfig") + } + + zmqConnect, ok := args[1].(string) + if !ok { + return nil, fmt.Errorf("second argument to bitcoindnotifier." + + "New is incorrect, expected a string") + } + + params, ok := args[2].(chaincfg.Params) + if !ok { + return nil, fmt.Errorf("third argument to bitcoindnotifier." + + "New is incorrect, expected a chaincfg.Params") + } + + return New(config, zmqConnect, params) +} + +// init registers a driver for the BtcdNotifier concrete implementation of the +// chainntnfs.ChainNotifier interface. +func init() { + // Register the driver. + notifier := &chainntnfs.NotifierDriver{ + NotifierType: notifierType, + New: createNewNotifier, + } + + if err := chainntnfs.RegisterNotifier(notifier); err != nil { + panic(fmt.Sprintf("failed to register notifier driver '%s': %v", + notifierType, err)) + } +} diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index dc859926..1a1635dc 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -5,7 +5,9 @@ import ( "fmt" "io/ioutil" "log" + "math/rand" "os" + "os/exec" "path/filepath" "sync" "testing" @@ -13,6 +15,7 @@ import ( "github.com/lightninglabs/neutrino" "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/ltcsuite/ltcd/btcjson" "github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcwallet/walletdb" @@ -24,6 +27,10 @@ import ( "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" + // Required to auto-register the bitcoind backed ChainNotifier + // implementation. + _ "github.com/lightningnetwork/lnd/chainntnfs/bitcoindnotify" + // Required to auto-register the btcd backed ChainNotifier // implementation. _ "github.com/lightningnetwork/lnd/chainntnfs/btcdnotify" @@ -32,7 +39,8 @@ import ( // implementation. _ "github.com/lightningnetwork/lnd/chainntnfs/neutrinonotify" - _ "github.com/roasbeef/btcwallet/walletdb/bdb" // Required to register the boltdb walletdb implementation. + // Required to register the boltdb walletdb implementation. + _ "github.com/roasbeef/btcwallet/walletdb/bdb" ) var ( @@ -43,7 +51,7 @@ var ( 0x1e, 0xb, 0x4c, 0xfd, 0x9e, 0xc5, 0x8c, 0xe9, } - netParams = &chaincfg.SimNetParams + netParams = &chaincfg.RegressionNetParams privKey, pubKey = btcec.PrivKeyFromBytes(btcec.S256(), testPrivKey) addrPk, _ = btcutil.NewAddressPubKey(pubKey.SerializeCompressed(), netParams) @@ -65,6 +73,39 @@ func getTestTxId(miner *rpctest.Harness) (*chainhash.Hash, error) { return miner.SendOutputs(outputs, 10) } +func waitForMempoolTx(r *rpctest.Harness, txid *chainhash.Hash) error { + var found bool + var tx *btcutil.Tx + var err error + timeout := time.After(10 * time.Second) + for !found { + // Do a short wait + select { + case <-timeout: + return fmt.Errorf("timeout after 10s") + default: + } + time.Sleep(100 * time.Millisecond) + + // Check for the harness' knowledge of the txid + tx, err = r.Node.GetRawTransaction(txid) + if err != nil { + switch e := err.(type) { + case *btcjson.RPCError: + if e.Code == btcjson.ErrRPCNoTxInfo { + continue + } + default: + } + return err + } + if tx != nil && tx.MsgTx().TxHash() == *txid { + found = true + } + } + return nil +} + func testSingleConfirmationNotification(miner *rpctest.Harness, notifier chainntnfs.ChainNotifier, t *testing.T) { @@ -80,6 +121,11 @@ func testSingleConfirmationNotification(miner *rpctest.Harness, t.Fatalf("unable to create test tx: %v", err) } + err = waitForMempoolTx(miner, txid) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } + _, currentHeight, err := miner.Node.GetBestBlock() if err != nil { t.Fatalf("unable to get current height: %v", err) @@ -143,6 +189,11 @@ func testMultiConfirmationNotification(miner *rpctest.Harness, t.Fatalf("unable to create test addr: %v", err) } + err = waitForMempoolTx(miner, txid) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } + _, currentHeight, err := miner.Node.GetBestBlock() if err != nil { t.Fatalf("unable to get current height: %v", err) @@ -201,6 +252,11 @@ func testBatchConfirmationNotification(miner *rpctest.Harness, t.Fatalf("unable to register ntfn: %v", err) } confIntents[i] = confIntent + err = waitForMempoolTx(miner, txid) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } + } initialConfHeight := uint32(currentHeight + 1) @@ -252,6 +308,11 @@ func createSpendableOutput(miner *rpctest.Harness, t.Fatalf("unable to create test addr: %v", err) } + err = waitForMempoolTx(miner, txid) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } + // Mine a single block which should include that txid above. if _, err := miner.Node.Generate(1); err != nil { t.Fatalf("unable to generate single block: %v", err) @@ -342,6 +403,11 @@ func testSpendNotification(miner *rpctest.Harness, t.Fatalf("unable to broadcast tx: %v", err) } + err = waitForMempoolTx(miner, spenderSha) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } + // Now we mine a single block, which should include our spend. The // notification should also be sent off. if _, err := miner.Node.Generate(1); err != nil { @@ -453,6 +519,11 @@ func testMultiClientConfirmationNotification(miner *rpctest.Harness, t.Fatalf("unable to create test tx: %v", err) } + err = waitForMempoolTx(miner, txid) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } + var wg sync.WaitGroup const ( numConfsClients = 5 @@ -514,6 +585,11 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, t.Fatalf("unable to create test tx: %v", err) } + err = waitForMempoolTx(miner, txid3) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } + // Generate another block containing tx 3, but we won't register conf // notifications for this tx until much later. The notifier must check // older blocks when the confirmation event is registered below to ensure @@ -529,11 +605,21 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, t.Fatalf("unable to create test tx: %v", err) } + err = waitForMempoolTx(miner, txid1) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } + txid2, err := getTestTxId(miner) if err != nil { t.Fatalf("unable to create test tx: %v", err) } + err = waitForMempoolTx(miner, txid2) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } + _, currentHeight, err := miner.Node.GetBestBlock() if err != nil { t.Fatalf("unable to get current height: %v", err) @@ -654,6 +740,11 @@ func testLazyNtfnConsumer(miner *rpctest.Harness, t.Fatalf("unable to create test tx: %v", err) } + err = waitForMempoolTx(miner, txid) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } + _, currentHeight, err := miner.Node.GetBestBlock() if err != nil { t.Fatalf("unable to get current height: %v", err) @@ -686,6 +777,11 @@ func testLazyNtfnConsumer(miner *rpctest.Harness, t.Fatalf("unable to create test tx: %v", err) } + err = waitForMempoolTx(miner, txid) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } + _, currentHeight, err = miner.Node.GetBestBlock() if err != nil { t.Fatalf("unable to get current height: %v", err) @@ -736,6 +832,11 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness, t.Fatalf("unable to create test addr: %v", err) } + err = waitForMempoolTx(miner, txid) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } + // Mine a single block which should include that txid above. if _, err := miner.Node.Generate(1); err != nil { t.Fatalf("unable to generate single block: %v", err) @@ -789,6 +890,11 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness, t.Fatalf("unable to brodacst tx: %v", err) } + err = waitForMempoolTx(miner, spenderSha) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } + // Now we mine an additional block, which should include our spend. if _, err := miner.Node.Generate(1); err != nil { t.Fatalf("unable to generate single block: %v", err) @@ -877,6 +983,11 @@ func testCancelSpendNtfn(node *rpctest.Harness, t.Fatalf("unable to brodacst tx: %v", err) } + err = waitForMempoolTx(node, spenderSha) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } + // Now we mine a single block, which should include our spend. The // notification should also be sent off. if _, err := node.Node.Generate(1); err != nil { @@ -1021,6 +1132,11 @@ func testReorgConf(miner *rpctest.Harness, notifier chainntnfs.ChainNotifier, t.Fatalf("unable to create test tx: %v", err) } + err = waitForMempoolTx(miner, txid) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } + _, currentHeight, err := miner.Node.GetBestBlock() if err != nil { t.Fatalf("unable to get current height: %v", err) @@ -1094,11 +1210,16 @@ func testReorgConf(miner *rpctest.Harness, notifier chainntnfs.ChainNotifier, t.Fatalf("unable to get raw tx: %v", err) } - _, err = miner2.Node.SendRawTransaction(tx.MsgTx(), false) + txid, err = miner2.Node.SendRawTransaction(tx.MsgTx(), false) if err != nil { t.Fatalf("unable to get send tx: %v", err) } + err = waitForMempoolTx(miner, txid) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } + _, err = miner.Node.Generate(3) if err != nil { t.Fatalf("unable to generate single block: %v", err) @@ -1206,12 +1327,73 @@ func TestInterfaces(t *testing.T) { switch notifierType { + case "bitcoind": + // Start a bitcoind instance. + tempBitcoindDir, err := ioutil.TempDir("", "bitcoind") + if err != nil { + t.Fatalf("Unable to create temp dir: %v", err) + } + zmqPath := "ipc:///" + tempBitcoindDir + "/weks.socket" + cleanUp1 := func() { + os.RemoveAll(tempBitcoindDir) + } + cleanUp = cleanUp1 + rpcPort := rand.Int()%(65536-1024) + 1024 + bitcoind := exec.Command( + "bitcoind", + "-datadir="+tempBitcoindDir, + "-regtest", + "-connect="+p2pAddr, + "-txindex", + "-rpcauth=weks:469e9bb14ab2360f8e226efed5ca6f"+ + "d$507c670e800a95284294edb5773b05544b"+ + "220110063096c221be9933c82d38e1", + fmt.Sprintf("-rpcport=%d", rpcPort), + "-disablewallet", + "-zmqpubrawblock="+zmqPath, + "-zmqpubrawtx="+zmqPath, + ) + err = bitcoind.Start() + if err != nil { + cleanUp1() + t.Fatalf("Couldn't start bitcoind: %v", err) + } + cleanUp2 := func() { + bitcoind.Process.Kill() + bitcoind.Wait() + cleanUp1() + } + cleanUp = cleanUp2 + + // Wait for the bitcoind instance to start up. + time.Sleep(time.Second) + + // Start the FilteredChainView implementation instance. + config := rpcclient.ConnConfig{ + Host: fmt.Sprintf( + "127.0.0.1:%d", rpcPort), + User: "weks", + Pass: "weks", + DisableAutoReconnect: false, + DisableConnectOnNew: true, + DisableTLS: true, + HTTPPostMode: true, + } + + notifier, err = notifierDriver.New(&config, zmqPath, + *netParams) + if err != nil { + t.Fatalf("unable to create %v notifier: %v", + notifierType, err) + } + case "btcd": notifier, err = notifierDriver.New(&rpcConfig) if err != nil { t.Fatalf("unable to create %v notifier: %v", notifierType, err) } + cleanUp = func() {} case "neutrino": spvDir, err := ioutil.TempDir("", "neutrino") diff --git a/glide.lock b/glide.lock index 9113b471..b6721ecc 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 8438e391bed32638a8e14402992af50b7656b380f69741cf8422584c2e1f2b31 -updated: 2017-12-13T15:13:22.311098343-08:00 +hash: d145c16f2f9cfdf4937eb8b7cdd65919a8c351593a179acc23f2cbca5b42f34b +updated: 2017-12-22T23:45:25.148488338-07:00 imports: - name: github.com/aead/chacha20 version: d31a916ded42d1640b9d89a26f8abd53cc96790c @@ -86,6 +86,8 @@ imports: version: 946bd9fbed05568b0f3cd188353d8aa28f38b688 subpackages: - internal/socket +- name: github.com/pebbe/zmq4 + version: 90d69e412a09549f2e90bac70fbb449081f1e5c1 - name: github.com/roasbeef/btcd version: 9978b939c33973be19b932fa7b936079bb7ba38d subpackages: diff --git a/glide.yaml b/glide.yaml index 00ef7b0c..0174fc59 100644 --- a/glide.yaml +++ b/glide.yaml @@ -24,6 +24,7 @@ import: - txscript - wire - connmgr +- package: github.com/pebbe/zmq4 - package: github.com/roasbeef/btcrpcclient version: d0f4db8b4dad0ca3d569b804f21247c3dd96acbb - package: github.com/roasbeef/btcutil diff --git a/lnwallet/btcwallet/blockchain.go b/lnwallet/btcwallet/blockchain.go index d930ac55..39f73b01 100644 --- a/lnwallet/btcwallet/blockchain.go +++ b/lnwallet/btcwallet/blockchain.go @@ -29,23 +29,7 @@ var ( // // This method is a part of the lnwallet.BlockChainIO interface. func (b *BtcWallet) GetBestBlock() (*chainhash.Hash, int32, error) { - switch backend := b.chain.(type) { - - case *chain.NeutrinoClient: - header, height, err := backend.CS.BlockHeaders.ChainTip() - if err != nil { - return nil, -1, err - } - - blockHash := header.BlockHash() - return &blockHash, int32(height), nil - - case *chain.RPCClient: - return backend.GetBestBlock() - - default: - return nil, -1, fmt.Errorf("unknown backend") - } + return b.chain.GetBestBlock() } // GetUtxo returns the original output referenced by the passed outpoint. @@ -100,6 +84,26 @@ func (b *BtcWallet) GetUtxo(op *wire.OutPoint, heightHint uint32) (*wire.TxOut, PkScript: pkScript, }, nil + case *chain.BitcoindClient: + txout, err := backend.GetTxOut(&op.Hash, op.Index, false) + if err != nil { + return nil, err + } else if txout == nil { + return nil, ErrOutputSpent + } + + pkScript, err := hex.DecodeString(txout.ScriptPubKey.Hex) + if err != nil { + return nil, err + } + + return &wire.TxOut{ + // Sadly, gettxout returns the output value in BTC + // instead of satoshis. + Value: int64(txout.Value * 1e8), + PkScript: pkScript, + }, nil + default: return nil, fmt.Errorf("unknown backend") } @@ -109,27 +113,7 @@ func (b *BtcWallet) GetUtxo(op *wire.OutPoint, heightHint uint32) (*wire.TxOut, // // This method is a part of the lnwallet.BlockChainIO interface. func (b *BtcWallet) GetBlock(blockHash *chainhash.Hash) (*wire.MsgBlock, error) { - switch backend := b.chain.(type) { - - case *chain.NeutrinoClient: - block, err := backend.CS.GetBlockFromNetwork(*blockHash) - if err != nil { - return nil, err - } - - return block.MsgBlock(), nil - - case *chain.RPCClient: - block, err := backend.GetBlock(blockHash) - if err != nil { - return nil, err - } - - return block, nil - - default: - return nil, fmt.Errorf("unknown backend") - } + return b.chain.GetBlock(blockHash) } // GetBlockHash returns the hash of the block in the best blockchain at the @@ -137,29 +121,7 @@ func (b *BtcWallet) GetBlock(blockHash *chainhash.Hash) (*wire.MsgBlock, error) // // This method is a part of the lnwallet.BlockChainIO interface. func (b *BtcWallet) GetBlockHash(blockHeight int64) (*chainhash.Hash, error) { - switch backend := b.chain.(type) { - - case *chain.NeutrinoClient: - height := uint32(blockHeight) - blockHeader, err := backend.CS.BlockHeaders.FetchHeaderByHeight(height) - if err != nil { - return nil, err - } - - blockHash := blockHeader.BlockHash() - return &blockHash, nil - - case *chain.RPCClient: - blockHash, err := backend.GetBlockHash(blockHeight) - if err != nil { - return nil, err - } - - return blockHash, nil - - default: - return nil, fmt.Errorf("unknown backend") - } + return b.chain.GetBlockHash(blockHeight) } // A compile time check to ensure that BtcWallet implements the BlockChainIO diff --git a/lnwallet/btcwallet/btcwallet.go b/lnwallet/btcwallet/btcwallet.go index 72c783b1..72bba4c9 100644 --- a/lnwallet/btcwallet/btcwallet.go +++ b/lnwallet/btcwallet/btcwallet.go @@ -119,6 +119,17 @@ func New(cfg Config) (*BtcWallet, error) { }, nil } +// BackEnd returns the underlying ChainService's name as a string. +// +// This is a part of the WalletController interface. +func (b *BtcWallet) BackEnd() string { + if b.chain != nil { + return b.chain.BackEnd() + } + + return "" +} + // Start initializes the underlying rpc connection, the wallet itself, and // begins syncing to the current available blockchain state. // @@ -668,22 +679,9 @@ func (b *BtcWallet) IsSynced() (bool, error) { // Next, query the chain backend to grab the info about the tip of the // main chain. - switch backend := b.cfg.ChainSource.(type) { - case *chain.NeutrinoClient: - header, height, err := backend.CS.BlockHeaders.ChainTip() - if err != nil { - return false, err - } - - bh := header.BlockHash() - bestHash = &bh - bestHeight = int32(height) - - case *chain.RPCClient: - bestHash, bestHeight, err = backend.GetBestBlock() - if err != nil { - return false, err - } + bestHash, bestHeight, err = b.cfg.ChainSource.GetBestBlock() + if err != nil { + return false, err } // If the wallet hasn't yet fully synced to the node's best chain tip, @@ -696,21 +694,9 @@ func (b *BtcWallet) IsSynced() (bool, error) { // still may not yet be synced as the chain backend may still be // catching up to the main chain. So we'll grab the block header in // order to make a guess based on the current time stamp. - var blockHeader *wire.BlockHeader - switch backend := b.cfg.ChainSource.(type) { - - case *chain.NeutrinoClient: - bh, _, err := backend.CS.BlockHeaders.FetchHeader(bestHash) - if err != nil { - return false, err - } - blockHeader = bh - - case *chain.RPCClient: - blockHeader, err = backend.GetBlockHeader(bestHash) - if err != nil { - return false, err - } + blockHeader, err := b.cfg.ChainSource.GetBlockHeader(bestHash) + if err != nil { + return false, err } // If the timestamp no the best header is more than 2 hours in the diff --git a/lnwallet/btcwallet/driver.go b/lnwallet/btcwallet/driver.go index 16078572..91f64973 100644 --- a/lnwallet/btcwallet/driver.go +++ b/lnwallet/btcwallet/driver.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/lightningnetwork/lnd/lnwallet" + "github.com/roasbeef/btcwallet/chain" ) const ( @@ -36,6 +37,7 @@ func init() { driver := &lnwallet.WalletDriver{ WalletType: walletType, New: createNewWallet, + BackEnds: chain.BackEnds, } if err := lnwallet.RegisterWallet(driver); err != nil { diff --git a/lnwallet/fee_estimator.go b/lnwallet/fee_estimator.go index a36fa81a..c98adf29 100644 --- a/lnwallet/fee_estimator.go +++ b/lnwallet/fee_estimator.go @@ -1,6 +1,8 @@ package lnwallet import ( + "encoding/json" + "github.com/roasbeef/btcd/blockchain" "github.com/roasbeef/btcd/rpcclient" "github.com/roasbeef/btcutil" @@ -200,3 +202,145 @@ func (b *BtcdFeeEstimator) fetchEstimatePerByte(confTarget uint32) (btcutil.Amou // A compile-time assertion to ensure that BtcdFeeEstimator implements the // FeeEstimator interface. var _ FeeEstimator = (*BtcdFeeEstimator)(nil) + +// BitcoindFeeEstimator is an implementation of the FeeEstimator interface +// backed by the RPC interface of an active bitcoind node. This implementation +// will proxy any fee estimation requests to bitcoind's RPC interace. +type BitcoindFeeEstimator struct { + // fallBackFeeRate is the fall back fee rate in satoshis per byte that + // is returned if the fee estimator does not yet have enough data to + // actually produce fee estimates. + fallBackFeeRate btcutil.Amount + + bitcoindConn *rpcclient.Client +} + +// NewBitcoindFeeEstimator creates a new BitcoindFeeEstimator given a fully +// populated rpc config that is able to successfully connect and authenticate +// with the bitcoind node, and also a fall back fee rate. The fallback fee rate +// is used in the occasion that the estimator has insufficient data, or returns +// zero for a fee estimate. +func NewBitcoindFeeEstimator(rpcConfig rpcclient.ConnConfig, + fallBackFeeRate btcutil.Amount) (*BitcoindFeeEstimator, error) { + + rpcConfig.DisableConnectOnNew = true + rpcConfig.DisableAutoReconnect = false + rpcConfig.DisableTLS = true + rpcConfig.HTTPPostMode = true + chainConn, err := rpcclient.New(&rpcConfig, nil) + if err != nil { + return nil, err + } + + return &BitcoindFeeEstimator{ + fallBackFeeRate: fallBackFeeRate, + bitcoindConn: chainConn, + }, nil +} + +// Start signals the FeeEstimator to start any processes or goroutines +// it needs to perform its duty. +// +// NOTE: This method is part of the FeeEstimator interface. +func (b *BitcoindFeeEstimator) Start() error { + return nil +} + +// Stop stops any spawned goroutines and cleans up the resources used +// by the fee estimator. +// +// NOTE: This method is part of the FeeEstimator interface. +func (b *BitcoindFeeEstimator) Stop() error { + return nil +} + +// EstimateFeePerByte takes in a target for the number of blocks until an +// initial confirmation and returns the estimated fee expressed in +// satoshis/byte. +func (b *BitcoindFeeEstimator) EstimateFeePerByte(numBlocks uint32) (btcutil.Amount, error) { + feeEstimate, err := b.fetchEstimatePerByte(numBlocks) + switch { + // If the estimator doesn't have enough data, or returns an error, then + // to return a proper value, then we'll return the default fall back + // fee rate. + case err != nil: + walletLog.Errorf("unable to query estimator: %v", err) + fallthrough + + case feeEstimate == 0: + return b.fallBackFeeRate, nil + } + + return feeEstimate, nil +} + +// EstimateFeePerWeight takes in a target for the number of blocks until an +// initial confirmation and returns the estimated fee expressed in +// satoshis/weight. +func (b *BitcoindFeeEstimator) EstimateFeePerWeight(numBlocks uint32) (btcutil.Amount, error) { + feePerByte, err := b.EstimateFeePerByte(numBlocks) + if err != nil { + return 0, err + } + + // We'll scale down the fee per byte to fee per weight, as for each raw + // byte, there's 1/4 unit of weight mapped to it. + satWeight := feePerByte / blockchain.WitnessScaleFactor + + // If this ends up scaling down to a zero sat/weight amount, then we'll + // use the default fallback fee rate. + // TODO(aakselrod): maybe use the per-byte rate if it's non-zero? + // Otherwise, we can return a higher sat/byte than sat/weight. + if satWeight == 0 { + return b.fallBackFeeRate / blockchain.WitnessScaleFactor, nil + } + + return satWeight, nil +} + +// fetchEstimate returns a fee estimate for a transaction be be confirmed in +// confTarget blocks. The estimate is returned in sat/byte. +func (b *BitcoindFeeEstimator) fetchEstimatePerByte(confTarget uint32) (btcutil.Amount, error) { + // First, we'll send an "estimatesmartfee" command as a raw request, + // since it isn't supported by btcd but is available in bitcoind. + target, err := json.Marshal(uint64(confTarget)) + if err != nil { + return 0, err + } + // TODO: Allow selection of economical/conservative modifiers. + resp, err := b.bitcoindConn.RawRequest("estimatesmartfee", + []json.RawMessage{target}) + if err != nil { + return 0, err + } + + // Next, we'll parse the response to get the BTC per KB. + feeEstimate := struct { + Feerate float64 `json:"feerate"` + }{} + err = json.Unmarshal(resp, &feeEstimate) + if err != nil { + return 0, err + } + + // Next, we'll convert the returned value to satoshis, as it's + // currently returned in BTC. + satPerKB, err := btcutil.NewAmount(feeEstimate.Feerate) + if err != nil { + return 0, err + } + + // The value returned is expressed in fees per KB, while we want + // fee-per-byte, so we'll divide by 1024 to map to satoshis-per-byte + // before returning the estimate. + satPerByte := satPerKB / 1024 + + walletLog.Debugf("Returning %v sat/byte for conf target of %v", + int64(satPerByte), confTarget) + + return satPerByte, nil +} + +// A compile-time assertion to ensure that BitcoindFeeEstimator implements the +// FeeEstimator interface. +var _ FeeEstimator = (*BitcoindFeeEstimator)(nil) diff --git a/lnwallet/interface.go b/lnwallet/interface.go index c257d3e6..88b261e0 100644 --- a/lnwallet/interface.go +++ b/lnwallet/interface.go @@ -207,6 +207,11 @@ type WalletController interface { // Stop signals the wallet for shutdown. Shutdown may entail closing // any active sockets, database handles, stopping goroutines, etc. Stop() error + + // BackEnd returns a name for the wallet's backing chain service, + // which could be e.g. btcd, bitcoind, neutrino, or another consensus + // service. + BackEnd() string } // BlockChainIO is a dedicated source which will be used to obtain queries @@ -288,6 +293,10 @@ type WalletDriver struct { // initialization flexibility, thereby accommodating several potential // WalletController implementations. New func(args ...interface{}) (WalletController, error) + + // BackEnds returns a list of available chain service drivers for the + // wallet driver. This could be e.g. bitcoind, btcd, neutrino, etc. + BackEnds func() []string } var ( diff --git a/lnwallet/interface_test.go b/lnwallet/interface_test.go index 9d09d595..7ba38e34 100644 --- a/lnwallet/interface_test.go +++ b/lnwallet/interface_test.go @@ -5,8 +5,10 @@ import ( "encoding/hex" "fmt" "io/ioutil" + "math/rand" "net" "os" + "os/exec" "path/filepath" "reflect" "runtime" @@ -17,7 +19,10 @@ import ( "github.com/boltdb/bolt" "github.com/davecgh/go-spew/spew" + "github.com/lightninglabs/neutrino" "github.com/roasbeef/btcwallet/chain" + "github.com/roasbeef/btcwallet/walletdb" + _ "github.com/roasbeef/btcwallet/walletdb/bdb" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs/btcdnotify" @@ -25,10 +30,10 @@ import ( "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/btcwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/btcjson" "github.com/roasbeef/btcd/chaincfg" "github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/rpcclient" - _ "github.com/roasbeef/btcwallet/walletdb/bdb" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/integration/rpctest" @@ -76,7 +81,7 @@ var ( 0x69, 0x49, 0x18, 0x83, 0x31, 0x98, 0x47, 0x53, } - netParams = &chaincfg.SimNetParams + netParams = &chaincfg.RegressionNetParams chainHash = netParams.GenesisHash _, alicePub = btcec.PrivKeyFromBytes(btcec.S256(), testHdSeed[:]) @@ -145,6 +150,12 @@ func calcStaticFee(numHTLCs int) btcutil.Amount { func loadTestCredits(miner *rpctest.Harness, w *lnwallet.LightningWallet, numOutputs, btcPerOutput int) error { + // For initial neutrino connection, wait a second. + // TODO(aakselrod): Eliminate the need for this. + switch w.BackEnd() { + case "neutrino": + time.Sleep(time.Second) + } // Using the mining node, spend from a coinbase output numOutputs to // give us btcPerOutput with each output. satoshiPerOutput := int64(btcPerOutput * 1e8) @@ -188,6 +199,7 @@ func loadTestCredits(miner *rpctest.Harness, w *lnwallet.LightningWallet, // Wait until the wallet has finished syncing up to the main chain. ticker := time.NewTicker(100 * time.Millisecond) + timeout := time.After(30 * time.Second) for range ticker.C { balance, err := w.ConfirmedBalance(1, false) @@ -197,6 +209,17 @@ func loadTestCredits(miner *rpctest.Harness, w *lnwallet.LightningWallet, if balance == expectedBalance { break } + select { + case <-timeout: + synced, err := w.IsSynced() + if err != nil { + return err + } + return fmt.Errorf("timed out after 30 seconds "+ + "waiting for balance %v, current balance %v, "+ + "synced: %t", expectedBalance, balance, synced) + default: + } } ticker.Stop() @@ -222,7 +245,7 @@ func createTestWallet(tempTestDir string, miningNode *rpctest.Harness, WalletController: wc, Signer: signer, ChainIO: bio, - FeeEstimator: lnwallet.StaticFeeEstimator{FeeRate: 250}, + FeeEstimator: lnwallet.StaticFeeEstimator{FeeRate: 10}, DefaultConstraints: channeldb.ChannelConstraints{ DustLimit: 500, MaxPendingAmount: lnwire.NewMSatFromSatoshis(btcutil.SatoshiPerBitcoin) * 100, @@ -343,6 +366,9 @@ func testDualFundingReservationWorkflow(miner *rpctest.Harness, bobFundingSigs, bobCommitSig, ) if err != nil { + for _, in := range aliceChanReservation.FinalFundingTx().TxIn { + fmt.Println(in.PreviousOutPoint.String()) + } t.Fatalf("unable to consume alice's sigs: %v", err) } _, err = bobChanReservation.CompleteReservation( @@ -384,6 +410,10 @@ func testDualFundingReservationWorkflow(miner *rpctest.Harness, // Mine a single block, the funding transaction should be included // within this block. + err = waitForMempoolTx(miner, &fundingSha) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } blockHashes, err := miner.Node.Generate(1) if err != nil { t.Fatalf("unable to generate block: %v", err) @@ -402,6 +432,16 @@ func testDualFundingReservationWorkflow(miner *rpctest.Harness, assertReservationDeleted(aliceChanReservation, t) assertReservationDeleted(bobChanReservation, t) + + // Wait for wallets to catch up to prevent issues in subsequent tests. + err = waitForWalletSync(miner, alice) + if err != nil { + t.Fatalf("unable to sync alice: %v", err) + } + err = waitForWalletSync(miner, bob) + if err != nil { + t.Fatalf("unable to sync bob: %v", err) + } } func testFundingTransactionLockedOutputs(miner *rpctest.Harness, @@ -759,6 +799,10 @@ func testSingleFunderReservationWorkflow(miner *rpctest.Harness, // Mine a single block, the funding transaction should be included // within this block. + err = waitForMempoolTx(miner, &fundingSha) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } blockHashes, err := miner.Node.Generate(1) if err != nil { t.Fatalf("unable to generate block: %v", err) @@ -768,7 +812,8 @@ func testSingleFunderReservationWorkflow(miner *rpctest.Harness, t.Fatalf("unable to find block: %v", err) } if len(block.Transactions) != 2 { - t.Fatalf("funding transaction wasn't mined: %v", err) + t.Fatalf("funding transaction wasn't mined: %d", + len(block.Transactions)) } blockTx := block.Transactions[1] if blockTx.TxHash() != fundingSha { @@ -815,8 +860,10 @@ func testListTransactionDetails(miner *rpctest.Harness, } // Next, fetch all the current transaction details. - // TODO(roasbeef): use ntfn client here instead? - time.Sleep(time.Second * 2) + err = waitForWalletSync(miner, alice) + if err != nil { + t.Fatalf("Couldn't sync Alice's wallet: %v", err) + } txDetails, err := alice.ListTransactionDetails() if err != nil { t.Fatalf("unable to fetch tx details: %v", err) @@ -905,6 +952,10 @@ func testListTransactionDetails(miner *rpctest.Harness, if err != nil { t.Fatalf("unable to create burn tx: %v", err) } + err = waitForMempoolTx(miner, burnTXID) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } burnBlock, err := miner.Node.Generate(1) if err != nil { t.Fatalf("unable to mine block: %v", err) @@ -912,7 +963,10 @@ func testListTransactionDetails(miner *rpctest.Harness, // Fetch the transaction details again, the new transaction should be // shown as debiting from the wallet's balance. - time.Sleep(time.Second * 2) + err = waitForWalletSync(miner, alice) + if err != nil { + t.Fatalf("Couldn't sync Alice's wallet: %v", err) + } txDetails, err = alice.ListTransactionDetails() if err != nil { t.Fatalf("unable to fetch tx details: %v", err) @@ -955,7 +1009,7 @@ func testTransactionSubscriptions(miner *rpctest.Harness, // implementation of the WalletController. txClient, err := alice.SubscribeTransactions() if err != nil { - t.Fatalf("unable to generate tx subscription: %v", err) + t.Skipf("unable to generate tx subscription: %v", err) } defer txClient.Cancel() @@ -964,25 +1018,33 @@ func testTransactionSubscriptions(miner *rpctest.Harness, numTxns = 3 ) unconfirmedNtfns := make(chan struct{}) - go func() { - for i := 0; i < numTxns; i++ { - txDetail := <-txClient.UnconfirmedTransactions() - if txDetail.NumConfirmations != 0 { - t.Fatalf("incorrect number of confs, expected %v got %v", - 0, txDetail.NumConfirmations) + switch alice.BackEnd() { + case "neutrino": + // Neutrino doesn't listen for unconfirmed transactions. + default: + go func() { + for i := 0; i < numTxns; i++ { + txDetail := <-txClient.UnconfirmedTransactions() + if txDetail.NumConfirmations != 0 { + t.Fatalf("incorrect number of confs, "+ + "expected %v got %v", 0, + txDetail.NumConfirmations) + } + if txDetail.Value != outputAmt { + t.Fatalf("incorrect output amt, "+ + "expected %v got %v", outputAmt, + txDetail.Value) + } + if txDetail.BlockHash != nil { + t.Fatalf("block hash should be nil, "+ + "is instead %v", + txDetail.BlockHash) + } } - if txDetail.Value != outputAmt { - t.Fatalf("incorrect output amt, expected %v got %v", - outputAmt, txDetail.Value) - } - if txDetail.BlockHash != nil { - t.Fatalf("block hash should be nil, is instead %v", - txDetail.BlockHash) - } - } - close(unconfirmedNtfns) - }() + close(unconfirmedNtfns) + }() + } // Next, fetch a fresh address from the wallet, create 3 new outputs // with the pkScript. @@ -1000,17 +1062,27 @@ func testTransactionSubscriptions(miner *rpctest.Harness, Value: outputAmt, PkScript: script, } - if _, err := miner.SendOutputs([]*wire.TxOut{output}, 10); err != nil { + txid, err := miner.SendOutputs([]*wire.TxOut{output}, 10) + if err != nil { t.Fatalf("unable to send coinbase: %v", err) } + err = waitForMempoolTx(miner, txid) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } } - // We should receive a notification for all three transactions - // generated above. - select { - case <-time.After(time.Second * 5): - t.Fatalf("transactions not received after 3 seconds") - case <-unconfirmedNtfns: // Fall through on successs + switch alice.BackEnd() { + case "neutrino": + // Neutrino doesn't listen for on unconfirmed transactions. + default: + // We should receive a notification for all three transactions + // generated above. + select { + case <-time.After(time.Second * 10): + t.Fatalf("transactions not received after 10 seconds") + case <-unconfirmedNtfns: // Fall through on successs + } } confirmedNtfns := make(chan struct{}) @@ -1018,12 +1090,12 @@ func testTransactionSubscriptions(miner *rpctest.Harness, for i := 0; i < numTxns; i++ { txDetail := <-txClient.ConfirmedTransactions() if txDetail.NumConfirmations != 1 { - t.Fatalf("incorrect number of confs, expected %v got %v", - 1, txDetail.NumConfirmations) + t.Fatalf("incorrect number of confs for %s, expected %v got %v", + txDetail.Hash, 1, txDetail.NumConfirmations) } if txDetail.Value != outputAmt { - t.Fatalf("incorrect output amt, expected %v got %v", - outputAmt, txDetail.Value) + t.Fatalf("incorrect output amt, expected %v got %v in txid %s", + outputAmt, txDetail.Value, txDetail.Hash) } } close(confirmedNtfns) @@ -1039,7 +1111,7 @@ func testTransactionSubscriptions(miner *rpctest.Harness, // since they should be mined in the next block. select { case <-time.After(time.Second * 5): - t.Fatalf("transactions not received after 3 seconds") + t.Fatalf("transactions not received after 5 seconds") case <-confirmedNtfns: // Fall through on success } } @@ -1088,7 +1160,7 @@ func testSignOutputUsingTweaks(r *rpctest.Harness, // generate a regular p2wkh from that. pubkeyHash := btcutil.Hash160(tweakedKey.SerializeCompressed()) keyAddr, err := btcutil.NewAddressWitnessPubKeyHash(pubkeyHash, - &chaincfg.SimNetParams) + &chaincfg.RegressionNetParams) if err != nil { t.Fatalf("unable to create addr: %v", err) } @@ -1110,6 +1182,10 @@ func testSignOutputUsingTweaks(r *rpctest.Harness, // Query for the transaction generated above so we can located // the index of our output. + err = waitForMempoolTx(r, txid) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } tx, err := r.Node.GetRawTransaction(txid) if err != nil { t.Fatalf("unable to query for tx: %v", err) @@ -1197,7 +1273,7 @@ func testReorgWalletBalance(r *rpctest.Harness, w *lnwallet.LightningWallet, } // Give wallet time to catch up. - err = waitForWalletSync(w) + err = waitForWalletSync(r, w) if err != nil { t.Fatalf("unable to sync wallet: %v", err) } @@ -1222,16 +1298,21 @@ func testReorgWalletBalance(r *rpctest.Harness, w *lnwallet.LightningWallet, Value: 1e8, PkScript: script, } - if _, err = w.SendOutputs([]*wire.TxOut{output}, 10); err != nil { + txid, err := w.SendOutputs([]*wire.TxOut{output}, 10) + if err != nil { t.Fatalf("unable to send outputs: %v", err) } + err = waitForMempoolTx(r, txid) + if err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } _, err = r.Node.Generate(50) if err != nil { t.Fatalf("unable to generate blocks on passed node: %v", err) } // Give wallet time to catch up. - err = waitForWalletSync(w) + err = waitForWalletSync(r, w) if err != nil { t.Fatalf("unable to sync wallet: %v", err) } @@ -1277,32 +1358,34 @@ func testReorgWalletBalance(r *rpctest.Harness, w *lnwallet.LightningWallet, // one block on the passed miner and two on the created miner, // connecting them, and waiting for them to sync. for i := 0; i < 5; i++ { - peers, err := r2.Node.GetPeerInfo() - if err != nil { - t.Fatalf("unable to get peer info: %v", err) - } - numPeers := len(peers) - err = r2.Node.AddNode(r.P2PAddress(), rpcclient.ANRemove) - if err != nil { - t.Fatalf("unable to disconnect mining nodes: %v", err) - } // Wait for disconnection timeout := time.After(30 * time.Second) - for true { + stillConnected := true + var peers []btcjson.GetPeerInfoResult + for stillConnected { // Allow for timeout + time.Sleep(100 * time.Millisecond) select { case <-timeout: t.Fatalf("timeout waiting for miner disconnect") default: } + err = r2.Node.AddNode(r.P2PAddress(), rpcclient.ANRemove) + if err != nil { + t.Fatalf("unable to disconnect mining nodes: %v", + err) + } peers, err = r2.Node.GetPeerInfo() if err != nil { t.Fatalf("unable to get peer info: %v", err) } - if len(peers) < numPeers { - break + stillConnected = false + for _, peer := range peers { + if peer.Addr == r.P2PAddress() { + stillConnected = true + break + } } - time.Sleep(100 * time.Millisecond) } _, err = r.Node.Generate(2) if err != nil { @@ -1318,8 +1401,16 @@ func testReorgWalletBalance(r *rpctest.Harness, w *lnwallet.LightningWallet, // Step 5: Reconnect the miners and wait for them to synchronize. err = r2.Node.AddNode(r.P2PAddress(), rpcclient.ANAdd) if err != nil { - t.Fatalf("unable to connect mining nodes together: %v", - err) + switch err := err.(type) { + case *btcjson.RPCError: + if err.Code != -8 { + t.Fatalf("unable to connect mining "+ + "nodes together: %v", err) + } + default: + t.Fatalf("unable to connect mining nodes "+ + "together: %v", err) + } } err = rpctest.JoinNodes([]*rpctest.Harness{r2, r}, rpctest.Blocks) @@ -1328,7 +1419,7 @@ func testReorgWalletBalance(r *rpctest.Harness, w *lnwallet.LightningWallet, } // Give wallet time to catch up. - err = waitForWalletSync(w) + err = waitForWalletSync(r, w) if err != nil { t.Fatalf("unable to sync wallet: %v", err) } @@ -1405,21 +1496,78 @@ func clearWalletStates(a, b *lnwallet.LightningWallet) error { return b.Cfg.Database.Wipe() } -func waitForWalletSync(w *lnwallet.LightningWallet) error { - var synced bool +func waitForMempoolTx(r *rpctest.Harness, txid *chainhash.Hash) error { + var found bool + var tx *btcutil.Tx var err error timeout := time.After(10 * time.Second) - for !synced { - synced, err = w.IsSynced() - if err != nil { - return err - } + for !found { + // Do a short wait select { case <-timeout: return fmt.Errorf("timeout after 10s") default: } time.Sleep(100 * time.Millisecond) + + // Check for the harness' knowledge of the txid + tx, err = r.Node.GetRawTransaction(txid) + if err != nil { + switch e := err.(type) { + case *btcjson.RPCError: + if e.Code == btcjson.ErrRPCNoTxInfo { + continue + } + default: + } + return err + } + if tx != nil && tx.MsgTx().TxHash() == *txid { + found = true + } + } + return nil +} + +func waitForWalletSync(r *rpctest.Harness, w *lnwallet.LightningWallet) error { + var synced bool + var err error + var bestHash, knownHash *chainhash.Hash + var bestHeight, knownHeight int32 + timeout := time.After(10 * time.Second) + for !synced { + // Do a short wait + select { + case <-timeout: + return fmt.Errorf("timeout after 10s") + default: + } + time.Sleep(100 * time.Millisecond) + + // Check whether the chain source of the wallet is caught up to + // the harness it's supposed to be catching up to. + bestHash, bestHeight, err = r.Node.GetBestBlock() + if err != nil { + return err + } + knownHash, knownHeight, err = w.Cfg.ChainIO.GetBestBlock() + if err != nil { + return err + } + if knownHeight != bestHeight { + continue + } + if *knownHash != *bestHash { + return fmt.Errorf("hash at height %d doesn't match: "+ + "expected %s, got %s", bestHeight, bestHash, + knownHash) + } + + // Check for synchronization. + synced, err = w.IsSynced() + if err != nil { + return err + } } return nil } @@ -1454,7 +1602,7 @@ func TestLightningWallet(t *testing.T) { } // Next mine enough blocks in order for segwit and the CSV package - // soft-fork to activate on SimNet. + // soft-fork to activate on RegNet. numBlocks := netParams.MinerConfirmationWindow * 2 if _, err := miningNode.Node.Generate(numBlocks); err != nil { t.Fatalf("unable to generate blocks: %v", err) @@ -1470,6 +1618,22 @@ func TestLightningWallet(t *testing.T) { t.Fatalf("unable to start notifier: %v", err) } + for _, walletDriver := range lnwallet.RegisteredWallets() { + for _, backEnd := range walletDriver.BackEnds() { + runTests(t, walletDriver, backEnd, miningNode, + rpcConfig, chainNotifier) + } + } +} + +// runTests runs all of the tests for a single interface implementation and +// chain back-end combination. This makes it easier to use `defer` as well as +// factoring out the test logic from the loop which cycles through the +// interface implementations. +func runTests(t *testing.T, walletDriver *lnwallet.WalletDriver, + backEnd string, miningNode *rpctest.Harness, + rpcConfig rpcclient.ConnConfig, + chainNotifier *btcdnotify.BtcdNotifier) { var ( bio lnwallet.BlockChainIO @@ -1478,107 +1642,230 @@ func TestLightningWallet(t *testing.T) { aliceWalletController lnwallet.WalletController bobWalletController lnwallet.WalletController + + feeEstimator lnwallet.FeeEstimator ) - for _, walletDriver := range lnwallet.RegisteredWallets() { - tempTestDirAlice, err := ioutil.TempDir("", "lnwallet") - if err != nil { - t.Fatalf("unable to create temp directory: %v", err) - } - defer os.RemoveAll(tempTestDirAlice) - tempTestDirBob, err := ioutil.TempDir("", "lnwallet") - if err != nil { - t.Fatalf("unable to create temp directory: %v", err) - } - defer os.RemoveAll(tempTestDirBob) + tempTestDirAlice, err := ioutil.TempDir("", "lnwallet") + if err != nil { + t.Fatalf("unable to create temp directory: %v", err) + } + defer os.RemoveAll(tempTestDirAlice) - walletType := walletDriver.WalletType - switch walletType { - case "btcwallet": - aliceChainRPC, err := chain.NewRPCClient(netParams, + tempTestDirBob, err := ioutil.TempDir("", "lnwallet") + if err != nil { + t.Fatalf("unable to create temp directory: %v", err) + } + defer os.RemoveAll(tempTestDirBob) + + walletType := walletDriver.WalletType + switch walletType { + case "btcwallet": + var aliceClient, bobClient chain.Interface + switch backEnd { + case "btcd": + feeEstimator, err = lnwallet.NewBtcdFeeEstimator( + rpcConfig, 250) + if err != nil { + t.Fatalf("unable to create btcd fee estimator: %v", + err) + } + aliceClient, err = chain.NewRPCClient(netParams, rpcConfig.Host, rpcConfig.User, rpcConfig.Pass, rpcConfig.Certificates, false, 20) if err != nil { t.Fatalf("unable to make chain rpc: %v", err) } - aliceWalletConfig := &btcwallet.Config{ - PrivatePass: []byte("alice-pass"), - HdSeed: aliceHDSeed[:], - DataDir: tempTestDirAlice, - NetParams: netParams, - ChainSource: aliceChainRPC, - FeeEstimator: lnwallet.StaticFeeEstimator{FeeRate: 250}, - } - aliceWalletController, err = walletDriver.New(aliceWalletConfig) - if err != nil { - t.Fatalf("unable to create btcwallet: %v", err) - } - aliceSigner = aliceWalletController.(*btcwallet.BtcWallet) - - bobChainRPC, err := chain.NewRPCClient(netParams, + bobClient, err = chain.NewRPCClient(netParams, rpcConfig.Host, rpcConfig.User, rpcConfig.Pass, rpcConfig.Certificates, false, 20) if err != nil { t.Fatalf("unable to make chain rpc: %v", err) } - bobWalletConfig := &btcwallet.Config{ - PrivatePass: []byte("bob-pass"), - HdSeed: bobHDSeed[:], - DataDir: tempTestDirBob, - NetParams: netParams, - ChainSource: bobChainRPC, - FeeEstimator: lnwallet.StaticFeeEstimator{FeeRate: 250}, - } - bobWalletController, err = walletDriver.New(bobWalletConfig) + case "neutrino": + feeEstimator = lnwallet.StaticFeeEstimator{FeeRate: 250} + // Set some package-level variable to speed up + // operation for tests. + neutrino.WaitForMoreCFHeaders = time.Millisecond * 100 + neutrino.BanDuration = time.Millisecond * 100 + neutrino.QueryTimeout = time.Millisecond * 500 + neutrino.QueryNumRetries = 2 + // Start Alice - open a database, start a neutrino + // instance, and initialize a btcwallet driver for it. + aliceDB, err := walletdb.Create("bdb", + tempTestDirAlice+"/neutrino.db") if err != nil { - t.Fatalf("unable to create btcwallet: %v", err) + t.Fatalf("unable to create DB: %v", err) + } + defer aliceDB.Close() + aliceChain, err := neutrino.NewChainService( + neutrino.Config{ + DataDir: tempTestDirAlice, + Database: aliceDB, + Namespace: []byte("alice"), + ChainParams: *netParams, + ConnectPeers: []string{ + miningNode.P2PAddress(), + }, + }, + ) + if err != nil { + t.Fatalf("unable to make neutrino: %v", err) + } + aliceChain.Start() + defer aliceChain.Stop() + aliceClient = chain.NewNeutrinoClient(aliceChain) + + // Start Bob - open a database, start a neutrino + // instance, and initialize a btcwallet driver for it. + bobDB, err := walletdb.Create("bdb", + tempTestDirBob+"/neutrino.db") + if err != nil { + t.Fatalf("unable to create DB: %v", err) + } + defer bobDB.Close() + bobChain, err := neutrino.NewChainService( + neutrino.Config{ + DataDir: tempTestDirBob, + Database: bobDB, + Namespace: []byte("bob"), + ChainParams: *netParams, + ConnectPeers: []string{ + miningNode.P2PAddress(), + }, + }, + ) + if err != nil { + t.Fatalf("unable to make neutrino: %v", err) + } + bobChain.Start() + defer bobChain.Stop() + bobClient = chain.NewNeutrinoClient(bobChain) + case "bitcoind": + feeEstimator, err = lnwallet.NewBitcoindFeeEstimator( + rpcConfig, 250) + if err != nil { + t.Fatalf("unable to create bitcoind fee estimator: %v", + err) + } + // Start a bitcoind instance. + tempBitcoindDir, err := ioutil.TempDir("", "bitcoind") + if err != nil { + t.Fatalf("unable to create temp directory: %v", err) + } + zmqPath := "ipc:///" + tempBitcoindDir + "/weks.socket" + defer os.RemoveAll(tempBitcoindDir) + rpcPort := rand.Int()%(65536-1024) + 1024 + bitcoind := exec.Command( + "bitcoind", + "-datadir="+tempBitcoindDir, + "-regtest", + "-connect="+miningNode.P2PAddress(), + "-txindex", + "-rpcauth=weks:469e9bb14ab2360f8e226efed5ca6f"+ + "d$507c670e800a95284294edb5773b05544b"+ + "220110063096c221be9933c82d38e1", + fmt.Sprintf("-rpcport=%d", rpcPort), + "-disablewallet", + "-zmqpubrawblock="+zmqPath, + "-zmqpubrawtx="+zmqPath, + ) + err = bitcoind.Start() + if err != nil { + t.Fatalf("couldn't start bitcoind: %v", err) + } + defer bitcoind.Wait() + defer bitcoind.Process.Kill() + + // Start an Alice btcwallet bitcoind back end instance. + aliceClient, err = chain.NewBitcoindClient(netParams, + fmt.Sprintf("127.0.0.1:%d", rpcPort), "weks", + "weks", zmqPath, 100*time.Millisecond) + if err != nil { + t.Fatalf("couldn't start alice client: %v", err) + } + + // Start a Bob btcwallet bitcoind back end instance. + bobClient, err = chain.NewBitcoindClient(netParams, + fmt.Sprintf("127.0.0.1:%d", rpcPort), "weks", + "weks", zmqPath, 100*time.Millisecond) + if err != nil { + t.Fatalf("couldn't start bob client: %v", err) } - bobSigner = bobWalletController.(*btcwallet.BtcWallet) - bio = bobWalletController.(*btcwallet.BtcWallet) default: - // TODO(roasbeef): add neutrino case - t.Fatalf("unknown wallet driver: %v", walletType) + t.Fatalf("unknown chain driver: %v", backEnd) } - // Funding via 20 outputs with 4BTC each. - alice, err := createTestWallet(tempTestDirAlice, miningNode, - netParams, chainNotifier, aliceWalletController, - aliceSigner, bio) + aliceWalletConfig := &btcwallet.Config{ + PrivatePass: []byte("alice-pass"), + HdSeed: aliceHDSeed[:], + DataDir: tempTestDirAlice, + NetParams: netParams, + ChainSource: aliceClient, + FeeEstimator: feeEstimator, + } + aliceWalletController, err = walletDriver.New(aliceWalletConfig) if err != nil { - t.Fatalf("unable to create test ln wallet: %v", err) + t.Fatalf("unable to create btcwallet: %v", err) } - defer alice.Shutdown() + aliceSigner = aliceWalletController.(*btcwallet.BtcWallet) - bob, err := createTestWallet(tempTestDirBob, miningNode, - netParams, chainNotifier, bobWalletController, - bobSigner, bio) + bobWalletConfig := &btcwallet.Config{ + PrivatePass: []byte("bob-pass"), + HdSeed: bobHDSeed[:], + DataDir: tempTestDirBob, + NetParams: netParams, + ChainSource: bobClient, + FeeEstimator: feeEstimator, + } + bobWalletController, err = walletDriver.New(bobWalletConfig) if err != nil { - t.Fatalf("unable to create test ln wallet: %v", err) + t.Fatalf("unable to create btcwallet: %v", err) } - defer bob.Shutdown() + bobSigner = bobWalletController.(*btcwallet.BtcWallet) + bio = bobWalletController.(*btcwallet.BtcWallet) + default: + t.Fatalf("unknown wallet driver: %v", walletType) + } - // Both wallets should now have 80BTC available for spending. - assertProperBalance(t, alice, 1, 80) - assertProperBalance(t, bob, 1, 80) + // Funding via 20 outputs with 4BTC each. + alice, err := createTestWallet(tempTestDirAlice, miningNode, netParams, + chainNotifier, aliceWalletController, aliceSigner, bio) + if err != nil { + t.Fatalf("unable to create test ln wallet: %v", err) + } + defer alice.Shutdown() - // Execute every test, clearing possibly mutated wallet state - // after each step. - for _, walletTest := range walletTests { - testName := fmt.Sprintf("%v:%v", walletType, - walletTest.name) - success := t.Run(testName, func(t *testing.T) { - walletTest.test(miningNode, alice, bob, t) - }) - if !success { - break - } + bob, err := createTestWallet(tempTestDirBob, miningNode, netParams, + chainNotifier, bobWalletController, bobSigner, bio) + if err != nil { + t.Fatalf("unable to create test ln wallet: %v", err) + } + defer bob.Shutdown() - // TODO(roasbeef): possible reset mining node's - // chainstate to initial level, cleanly wipe buckets - if err := clearWalletStates(alice, bob); err != nil && - err != bolt.ErrBucketNotFound { - t.Fatalf("unable to wipe wallet state: %v", err) - } + // Both wallets should now have 80BTC available for + // spending. + assertProperBalance(t, alice, 1, 80) + assertProperBalance(t, bob, 1, 80) + + // Execute every test, clearing possibly mutated + // wallet state after each step. + for _, walletTest := range walletTests { + testName := fmt.Sprintf("%v/%v:%v", walletType, backEnd, + walletTest.name) + success := t.Run(testName, func(t *testing.T) { + walletTest.test(miningNode, alice, bob, t) + }) + if !success { + break + } + + // TODO(roasbeef): possible reset mining + // node's chainstate to initial level, cleanly + // wipe buckets + if err := clearWalletStates(alice, bob); err != + nil && err != bolt.ErrBucketNotFound { + t.Fatalf("unable to wipe wallet state: %v", err) } } } diff --git a/routing/chainview/bitcoind.go b/routing/chainview/bitcoind.go new file mode 100644 index 00000000..6ec0b43f --- /dev/null +++ b/routing/chainview/bitcoind.go @@ -0,0 +1,460 @@ +package chainview + +import ( + "bytes" + "encoding/hex" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/roasbeef/btcd/btcjson" + "github.com/roasbeef/btcd/chaincfg" + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcd/rpcclient" + "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" + "github.com/roasbeef/btcwallet/chain" + "github.com/roasbeef/btcwallet/wtxmgr" +) + +// BitcoindFilteredChainView is an implementation of the FilteredChainView +// interface which is backed by bitcoind. +type BitcoindFilteredChainView struct { + started int32 + stopped int32 + + // bestHeight is the height of the latest block added to the + // blockQueue from the onFilteredConnectedMethod. It is used to + // determine up to what height we would need to rescan in case + // of a filter update. + bestHeightMtx sync.Mutex + bestHeight uint32 + + // TODO: Factor out common logic between bitcoind and btcd into a + // NodeFilteredView interface. + chainClient *chain.BitcoindClient + + // blockEventQueue is the ordered queue used to keep the order + // of connected and disconnected blocks sent to the reader of the + // chainView. + blockQueue *blockEventQueue + + // filterUpdates is a channel in which updates to the utxo filter + // attached to this instance are sent over. + filterUpdates chan filterUpdate + + // chainFilter is the set of utox's that we're currently watching + // spends for within the chain. + filterMtx sync.RWMutex + chainFilter map[wire.OutPoint]struct{} + + // filterBlockReqs is a channel in which requests to filter select + // blocks will be sent over. + filterBlockReqs chan *filterBlockReq + + quit chan struct{} + wg sync.WaitGroup +} + +// A compile time check to ensure BitcoindFilteredChainView implements the +// chainview.FilteredChainView. +var _ FilteredChainView = (*BitcoindFilteredChainView)(nil) + +// NewBitcoindFilteredChainView creates a new instance of a FilteredChainView +// from RPC credentials and a ZMQ socket address for a bitcoind instance. +func NewBitcoindFilteredChainView(config rpcclient.ConnConfig, + zmqConnect string, params chaincfg.Params) (*BitcoindFilteredChainView, + error) { + chainView := &BitcoindFilteredChainView{ + chainFilter: make(map[wire.OutPoint]struct{}), + filterUpdates: make(chan filterUpdate), + filterBlockReqs: make(chan *filterBlockReq), + quit: make(chan struct{}), + } + + chainConn, err := chain.NewBitcoindClient(¶ms, config.Host, + config.User, config.Pass, zmqConnect, 100*time.Millisecond) + if err != nil { + return nil, err + } + chainView.chainClient = chainConn + + chainView.blockQueue = newBlockEventQueue() + + return chainView, nil +} + +// Start starts all goroutines necessary for normal operation. +// +// NOTE: This is part of the FilteredChainView interface. +func (b *BitcoindFilteredChainView) Start() error { + // Already started? + if atomic.AddInt32(&b.started, 1) != 1 { + return nil + } + + log.Infof("FilteredChainView starting") + + err := b.chainClient.Start() + if err != nil { + return err + } + + _, bestHeight, err := b.chainClient.GetBestBlock() + if err != nil { + return err + } + + b.bestHeightMtx.Lock() + b.bestHeight = uint32(bestHeight) + b.bestHeightMtx.Unlock() + + b.blockQueue.Start() + + b.wg.Add(1) + go b.chainFilterer() + + return nil +} + +// Stop stops all goroutines which we launched by the prior call to the Start +// method. +// +// NOTE: This is part of the FilteredChainView interface. +func (b *BitcoindFilteredChainView) Stop() error { + // Already shutting down? + if atomic.AddInt32(&b.stopped, 1) != 1 { + return nil + } + + // Shutdown the rpc client, this gracefully disconnects from bitcoind's + // zmq socket, and cleans up all related resources. + b.chainClient.Stop() + + b.blockQueue.Stop() + + log.Infof("FilteredChainView stopping") + + close(b.quit) + b.wg.Wait() + + return nil +} + +// onFilteredBlockConnected is called for each block that's connected to the +// end of the main chain. Based on our current chain filter, the block may or +// may not include any relevant transactions. +func (b *BitcoindFilteredChainView) onFilteredBlockConnected(height int32, + hash chainhash.Hash, txns []*wtxmgr.TxRecord) { + + mtxs := make([]*wire.MsgTx, len(txns)) + for i, tx := range txns { + mtxs[i] = &tx.MsgTx + + for _, txIn := range mtxs[i].TxIn { + // We can delete this outpoint from the chainFilter, as + // we just received a block where it was spent. In case + // of a reorg, this outpoint might get "un-spent", but + // that's okay since it would never be wise to consider + // the channel open again (since a spending transaction + // exists on the network). + b.filterMtx.Lock() + delete(b.chainFilter, txIn.PreviousOutPoint) + b.filterMtx.Unlock() + } + + } + + // We record the height of the last connected block added to the + // blockQueue such that we can scan up to this height in case of + // a rescan. It must be protected by a mutex since a filter update + // might be trying to read it concurrently. + b.bestHeightMtx.Lock() + b.bestHeight = uint32(height) + b.bestHeightMtx.Unlock() + + block := &FilteredBlock{ + Hash: hash, + Height: uint32(height), + Transactions: mtxs, + } + + b.blockQueue.Add(&blockEvent{ + eventType: connected, + block: block, + }) +} + +// onFilteredBlockDisconnected is a callback which is executed once a block is +// disconnected from the end of the main chain. +func (b *BitcoindFilteredChainView) onFilteredBlockDisconnected(height int32, + hash chainhash.Hash) { + + log.Debugf("got disconnected block at height %d: %v", height, + hash) + + filteredBlock := &FilteredBlock{ + Hash: hash, + Height: uint32(height), + } + + b.blockQueue.Add(&blockEvent{ + eventType: disconnected, + block: filteredBlock, + }) +} + +// FilterBlock takes a block hash, and returns a FilteredBlocks which is the +// result of applying the current registered UTXO sub-set on the block +// corresponding to that block hash. If any watched UTOX's are spent by the +// selected lock, then the internal chainFilter will also be updated. +// +// NOTE: This is part of the FilteredChainView interface. +func (b *BitcoindFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) { + req := &filterBlockReq{ + blockHash: blockHash, + resp: make(chan *FilteredBlock, 1), + err: make(chan error, 1), + } + + select { + case b.filterBlockReqs <- req: + case <-b.quit: + return nil, fmt.Errorf("FilteredChainView shutting down") + } + + return <-req.resp, <-req.err +} + +// chainFilterer is the primary goroutine which: listens for new blocks coming +// and dispatches the relevent FilteredBlock notifications, updates the filter +// due to requests by callers, and finally is able to preform targeted block +// filtration. +// +// TODO(roasbeef): change to use loadfilter RPC's +func (b *BitcoindFilteredChainView) chainFilterer() { + defer b.wg.Done() + + // filterBlock is a helper funciton that scans the given block, and + // notes which transactions spend outputs which are currently being + // watched. Additionally, the chain filter will also be updated by + // removing any spent outputs. + filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx { + var filteredTxns []*wire.MsgTx + for _, tx := range blk.Transactions { + for _, txIn := range tx.TxIn { + prevOp := txIn.PreviousOutPoint + if _, ok := b.chainFilter[prevOp]; ok { + filteredTxns = append(filteredTxns, tx) + + b.filterMtx.Lock() + delete(b.chainFilter, prevOp) + b.filterMtx.Unlock() + + break + } + } + } + + return filteredTxns + } + + decodeJSONBlock := func(block *btcjson.RescannedBlock, + height uint32) (*FilteredBlock, error) { + hash, err := chainhash.NewHashFromStr(block.Hash) + if err != nil { + return nil, err + + } + txs := make([]*wire.MsgTx, 0, len(block.Transactions)) + for _, str := range block.Transactions { + b, err := hex.DecodeString(str) + if err != nil { + return nil, err + } + tx := &wire.MsgTx{} + err = tx.Deserialize(bytes.NewReader(b)) + if err != nil { + return nil, err + } + txs = append(txs, tx) + } + return &FilteredBlock{ + Hash: *hash, + Height: height, + Transactions: txs, + }, nil + } + + for { + select { + // The caller has just sent an update to the current chain + // filter, so we'll apply the update, possibly rewinding our + // state partially. + case update := <-b.filterUpdates: + + // First, we'll add all the new UTXO's to the set of + // watched UTXO's, eliminating any duplicates in the + // process. + log.Debugf("Updating chain filter with new UTXO's: %v", + update.newUtxos) + for _, newOp := range update.newUtxos { + b.filterMtx.Lock() + b.chainFilter[newOp] = struct{}{} + b.filterMtx.Unlock() + } + + // Apply the new TX filter to the chain client, which + // will cause all following notifications from and + // calls to it return blocks filtered with the new + // filter. + b.chainClient.LoadTxFilter(false, []btcutil.Address{}, + update.newUtxos) + + // All blocks gotten after we loaded the filter will + // have the filter applied, but we will need to rescan + // the blocks up to the height of the block we last + // added to the blockQueue. + b.bestHeightMtx.Lock() + bestHeight := b.bestHeight + b.bestHeightMtx.Unlock() + + // If the update height matches our best known height, + // then we don't need to do any rewinding. + if update.updateHeight == bestHeight { + continue + } + + // Otherwise, we'll rewind the state to ensure the + // caller doesn't miss any relevant notifications. + // Starting from the height _after_ the update height, + // we'll walk forwards, rescanning one block at a time + // with the chain client applying the newly loaded + // filter to each block. + for i := update.updateHeight + 1; i < bestHeight+1; i++ { + blockHash, err := b.chainClient.GetBlockHash(int64(i)) + if err != nil { + log.Warnf("Unable to get block hash "+ + "for block at height %d: %v", + i, err) + continue + } + + // To avoid dealing with the case where a reorg + // is happening while we rescan, we scan one + // block at a time, skipping blocks that might + // have gone missing. + rescanned, err := b.chainClient.RescanBlocks( + []chainhash.Hash{*blockHash}) + if err != nil { + log.Warnf("Unable to rescan block "+ + "with hash %v at height %d: %v", + blockHash, i, err) + continue + } + + // If no block was returned from the rescan, it + // means no matching transactions were found. + if len(rescanned) != 1 { + log.Tracef("rescan of block %v at "+ + "height=%d yielded no "+ + "transactions", blockHash, i) + continue + } + decoded, err := decodeJSONBlock( + &rescanned[0], i) + if err != nil { + log.Errorf("Unable to decode block: %v", + err) + continue + } + b.blockQueue.Add(&blockEvent{ + eventType: connected, + block: decoded, + }) + } + + // We've received a new request to manually filter a block. + case req := <-b.filterBlockReqs: + // First we'll fetch the block itself as well as some + // additional information including its height. + block, err := b.chainClient.GetBlock(req.blockHash) + if err != nil { + req.err <- err + req.resp <- nil + continue + } + header, err := b.chainClient.GetBlockHeaderVerbose( + req.blockHash) + if err != nil { + req.err <- err + req.resp <- nil + continue + } + + // Once we have this info, we can directly filter the + // block and dispatch the proper notification. + req.resp <- &FilteredBlock{ + Hash: *req.blockHash, + Height: uint32(header.Height), + Transactions: filterBlock(block), + } + req.err <- err + + // We've received a new event from the chain client. + case event := <-b.chainClient.Notifications(): + switch e := event.(type) { + case chain.FilteredBlockConnected: + b.onFilteredBlockConnected(e.Block.Height, + e.Block.Hash, e.RelevantTxs) + case chain.BlockDisconnected: + b.onFilteredBlockDisconnected(e.Height, e.Hash) + } + + case <-b.quit: + return + } + } +} + +// UpdateFilter updates the UTXO filter which is to be consulted when creating +// FilteredBlocks to be sent to subscribed clients. This method is cumulative +// meaning repeated calls to this method should _expand_ the size of the UTXO +// sub-set currently being watched. If the set updateHeight is _lower_ than +// the best known height of the implementation, then the state should be +// rewound to ensure all relevant notifications are dispatched. +// +// NOTE: This is part of the FilteredChainView interface. +func (b *BitcoindFilteredChainView) UpdateFilter(ops []wire.OutPoint, updateHeight uint32) error { + select { + + case b.filterUpdates <- filterUpdate{ + newUtxos: ops, + updateHeight: updateHeight, + }: + return nil + + case <-b.quit: + return fmt.Errorf("chain filter shutting down") + } +} + +// FilteredBlocks returns the channel that filtered blocks are to be sent over. +// Each time a block is connected to the end of a main chain, and appropriate +// FilteredBlock which contains the transactions which mutate our watched UTXO +// set is to be returned. +// +// NOTE: This is part of the FilteredChainView interface. +func (b *BitcoindFilteredChainView) FilteredBlocks() <-chan *FilteredBlock { + return b.blockQueue.newBlocks +} + +// DisconnectedBlocks returns a receive only channel which will be sent upon +// with the empty filtered blocks of blocks which are disconnected from the +// main chain in the case of a re-org. +// +// NOTE: This is part of the FilteredChainView interface. +func (b *BitcoindFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock { + return b.blockQueue.staleBlocks +} diff --git a/routing/chainview/interface_test.go b/routing/chainview/interface_test.go index d0b0da07..c8c03c7a 100644 --- a/routing/chainview/interface_test.go +++ b/routing/chainview/interface_test.go @@ -4,13 +4,16 @@ import ( "bytes" "fmt" "io/ioutil" + "math/rand" "os" + "os/exec" "path/filepath" "runtime" "testing" "time" "github.com/lightninglabs/neutrino" + "github.com/ltcsuite/ltcd/btcjson" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/chaincfg" "github.com/roasbeef/btcd/chaincfg/chainhash" @@ -25,7 +28,7 @@ import ( ) var ( - netParams = &chaincfg.SimNetParams + netParams = &chaincfg.RegressionNetParams testPrivKey = []byte{ 0x81, 0xb6, 0x37, 0xd8, 0xfc, 0xd2, 0xc6, 0xda, @@ -42,6 +45,39 @@ var ( testScript, _ = txscript.PayToAddrScript(testAddr) ) +func waitForMempoolTx(r *rpctest.Harness, txid *chainhash.Hash) error { + var found bool + var tx *btcutil.Tx + var err error + timeout := time.After(10 * time.Second) + for !found { + // Do a short wait + select { + case <-timeout: + return fmt.Errorf("timeout after 10s") + default: + } + time.Sleep(100 * time.Millisecond) + + // Check for the harness' knowledge of the txid + tx, err = r.Node.GetRawTransaction(txid) + if err != nil { + switch e := err.(type) { + case *btcjson.RPCError: + if e.Code == btcjson.ErrRPCNoTxInfo { + continue + } + default: + } + return err + } + if tx != nil && tx.MsgTx().TxHash() == *txid { + found = true + } + } + return nil +} + func getTestTXID(miner *rpctest.Harness) (*chainhash.Hash, error) { script, err := txscript.PayToAddrScript(testAddr) if err != nil { @@ -131,11 +167,19 @@ func testFilterBlockNotifications(node *rpctest.Harness, // private key that we generated above. txid1, err := getTestTXID(node) if err != nil { - t.Fatalf("unable to get test txid") + t.Fatalf("unable to get test txid: %v", err) + } + err = waitForMempoolTx(node, txid1) + if err != nil { + t.Fatalf("unable to get test txid in mempool: %v", err) } txid2, err := getTestTXID(node) if err != nil { - t.Fatalf("unable to get test txid") + t.Fatalf("unable to get test txid: %v", err) + } + err = waitForMempoolTx(node, txid2) + if err != nil { + t.Fatalf("unable to get test txid in mempool: %v", err) } blockChan := chainView.FilteredBlocks() @@ -218,6 +262,10 @@ func testFilterBlockNotifications(node *rpctest.Harness, if err != nil { t.Fatalf("unable to broadcast transaction: %v", err) } + err = waitForMempoolTx(node, spendTxid1) + if err != nil { + t.Fatalf("unable to get spending txid in mempool: %v", err) + } newBlockHashes, err = node.Node.Generate(1) if err != nil { t.Fatalf("unable to generate block: %v", err) @@ -240,6 +288,10 @@ func testFilterBlockNotifications(node *rpctest.Harness, if err != nil { t.Fatalf("unable to broadcast transaction: %v", err) } + err = waitForMempoolTx(node, spendTxid2) + if err != nil { + t.Fatalf("unable to get spending txid in mempool: %v", err) + } newBlockHashes, err = node.Node.Generate(1) if err != nil { t.Fatalf("unable to generate block: %v", err) @@ -264,6 +316,10 @@ func testUpdateFilterBackTrack(node *rpctest.Harness, if err != nil { t.Fatalf("unable to get test txid") } + err = waitForMempoolTx(node, txid) + if err != nil { + t.Fatalf("unable to get test txid in mempool: %v", err) + } // Next we'll mine a block confirming the output generated above. initBlockHashes, err := node.Node.Generate(1) @@ -306,6 +362,10 @@ func testUpdateFilterBackTrack(node *rpctest.Harness, if err != nil { t.Fatalf("unable to broadcast transaction: %v", err) } + err = waitForMempoolTx(node, spendTxid) + if err != nil { + t.Fatalf("unable to get spending txid in mempool: %v", err) + } newBlockHashes, err := node.Node.Generate(1) if err != nil { t.Fatalf("unable to generate block: %v", err) @@ -352,10 +412,18 @@ func testFilterSingleBlock(node *rpctest.Harness, chainView FilteredChainView, if err != nil { t.Fatalf("unable to get test txid") } + err = waitForMempoolTx(node, txid1) + if err != nil { + t.Fatalf("unable to get test txid in mempool: %v", err) + } txid2, err := getTestTXID(node) if err != nil { t.Fatalf("unable to get test txid") } + err = waitForMempoolTx(node, txid2) + if err != nil { + t.Fatalf("unable to get test txid in mempool: %v", err) + } blockChan := chainView.FilteredBlocks() @@ -671,7 +739,7 @@ var chainViewTests = []testCase{ test: testUpdateFilterBackTrack, }, { - name: "fitler single block", + name: "filter single block", test: testFilterSingleBlock, }, { @@ -684,6 +752,68 @@ var interfaceImpls = []struct { name string chainViewInit chainViewInitFunc }{ + { + name: "bitcoind_zmq", + chainViewInit: func(_ rpcclient.ConnConfig, p2pAddr string) (func(), FilteredChainView, error) { + // Start a bitcoind instance. + tempBitcoindDir, err := ioutil.TempDir("", "bitcoind") + if err != nil { + return nil, nil, err + } + zmqPath := "ipc:///" + tempBitcoindDir + "/weks.socket" + cleanUp1 := func() { + os.RemoveAll(tempBitcoindDir) + } + rpcPort := rand.Int()%(65536-1024) + 1024 + bitcoind := exec.Command( + "bitcoind", + "-datadir="+tempBitcoindDir, + "-regtest", + "-connect="+p2pAddr, + "-txindex", + "-rpcauth=weks:469e9bb14ab2360f8e226efed5ca6f"+ + "d$507c670e800a95284294edb5773b05544b"+ + "220110063096c221be9933c82d38e1", + fmt.Sprintf("-rpcport=%d", rpcPort), + "-disablewallet", + "-zmqpubrawblock="+zmqPath, + "-zmqpubrawtx="+zmqPath, + ) + err = bitcoind.Start() + if err != nil { + cleanUp1() + return nil, nil, err + } + cleanUp2 := func() { + bitcoind.Process.Kill() + bitcoind.Wait() + cleanUp1() + } + + // Wait for the bitcoind instance to start up. + time.Sleep(time.Second) + + // Start the FilteredChainView implementation instance. + config := rpcclient.ConnConfig{ + Host: fmt.Sprintf( + "127.0.0.1:%d", rpcPort), + User: "weks", + Pass: "weks", + DisableAutoReconnect: false, + DisableConnectOnNew: true, + DisableTLS: true, + HTTPPostMode: true, + } + + chainView, err := NewBitcoindFilteredChainView(config, + zmqPath, chaincfg.RegressionNetParams) + if err != nil { + cleanUp2() + return nil, nil, err + } + return cleanUp2, chainView, nil + }, + }, { name: "p2p_neutrino", chainViewInit: func(_ rpcclient.ConnConfig, p2pAddr string) (func(), FilteredChainView, error) {