multi: add bitcoind drivers and tests

This commit is contained in:
Alex 2017-11-09 17:30:20 -07:00 committed by Olaoluwa Osuntokun
parent 244ae4b571
commit 187f59556a
13 changed files with 2138 additions and 241 deletions

@ -0,0 +1,679 @@
package bitcoindnotify
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/roasbeef/btcd/btcjson"
"github.com/roasbeef/btcd/chaincfg"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/rpcclient"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
"github.com/roasbeef/btcwallet/chain"
)
const (
// notifierType uniquely identifies this concrete implementation of the
// ChainNotifier interface.
notifierType = "bitcoind"
// reorgSafetyLimit is assumed maximum depth of a chain reorganization.
// After this many confirmation, transaction confirmation info will be
// pruned.
reorgSafetyLimit = 100
)
var (
// ErrChainNotifierShuttingDown is used when we are trying to
// measure a spend notification when notifier is already stopped.
ErrChainNotifierShuttingDown = errors.New("chainntnfs: system interrupt " +
"while attempting to register for spend notification.")
)
// chainUpdate encapsulates an update to the current main chain. This struct is
// used as an element within an unbounded queue in order to avoid blocking the
// main rpc dispatch rule.
type chainUpdate struct {
blockHash *chainhash.Hash
blockHeight int32
}
// txUpdate encapsulates a transaction related notification sent from bitcoind
// to the registered RPC client. This struct is used as an element within an
// unbounded queue in order to avoid blocking the main rpc dispatch rule.
type txUpdate struct {
tx *btcutil.Tx
details *btcjson.BlockDetails
}
// TODO(roasbeef): generalize struct below:
// * move chans to config, allow outside callers to handle send conditions
// BitcoindNotifier implements the ChainNotifier interface using a bitcoind
// chain client. Multiple concurrent clients are supported. All notifications
// are achieved via non-blocking sends on client channels.
type BitcoindNotifier struct {
spendClientCounter uint64 // To be used atomically.
epochClientCounter uint64 // To be used atomically.
started int32 // To be used atomically.
stopped int32 // To be used atomically.
heightMtx sync.RWMutex
bestHeight int32
chainConn *chain.BitcoindClient
notificationCancels chan interface{}
notificationRegistry chan interface{}
spendNotifications map[wire.OutPoint]map[uint64]*spendNotification
txConfNotifier *chainntnfs.TxConfNotifier
blockEpochClients map[uint64]*blockEpochRegistration
wg sync.WaitGroup
quit chan struct{}
}
// Ensure BitcoindNotifier implements the ChainNotifier interface at compile
// time.
var _ chainntnfs.ChainNotifier = (*BitcoindNotifier)(nil)
// New returns a new BitcoindNotifier instance. This function assumes the
// bitcoind node detailed in the passed configuration is already running, and
// willing to accept RPC requests and new zmq clients.
func New(config *rpcclient.ConnConfig, zmqConnect string,
params chaincfg.Params) (*BitcoindNotifier, error) {
notifier := &BitcoindNotifier{
notificationCancels: make(chan interface{}),
notificationRegistry: make(chan interface{}),
blockEpochClients: make(map[uint64]*blockEpochRegistration),
spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification),
quit: make(chan struct{}),
}
// Disable connecting to bitcoind within the rpcclient.New method. We
// defer establishing the connection to our .Start() method.
config.DisableConnectOnNew = true
config.DisableAutoReconnect = false
chainConn, err := chain.NewBitcoindClient(&params, config.Host,
config.User, config.Pass, zmqConnect, 100*time.Millisecond)
if err != nil {
return nil, err
}
notifier.chainConn = chainConn
return notifier, nil
}
// Start connects to the running bitcoind node over websockets, registers for
// block notifications, and finally launches all related helper goroutines.
func (b *BitcoindNotifier) Start() error {
// Already started?
if atomic.AddInt32(&b.started, 1) != 1 {
return nil
}
// Connect to bitcoind, and register for notifications on connected,
// and disconnected blocks.
if err := b.chainConn.Start(); err != nil {
return err
}
if err := b.chainConn.NotifyBlocks(); err != nil {
return err
}
_, currentHeight, err := b.chainConn.GetBestBlock()
if err != nil {
return err
}
b.heightMtx.Lock()
b.bestHeight = currentHeight
b.heightMtx.Unlock()
b.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(currentHeight), reorgSafetyLimit)
b.wg.Add(1)
go b.notificationDispatcher()
return nil
}
// Stop shutsdown the BitcoindNotifier.
func (b *BitcoindNotifier) Stop() error {
// Already shutting down?
if atomic.AddInt32(&b.stopped, 1) != 1 {
return nil
}
// Shutdown the rpc client, this gracefully disconnects from bitcoind,
// and cleans up all related resources.
b.chainConn.Stop()
close(b.quit)
b.wg.Wait()
// Notify all pending clients of our shutdown by closing the related
// notification channels.
for _, spendClients := range b.spendNotifications {
for _, spendClient := range spendClients {
close(spendClient.spendChan)
}
}
for _, epochClient := range b.blockEpochClients {
close(epochClient.epochChan)
}
b.txConfNotifier.TearDown()
return nil
}
// blockNtfn packages a notification of a connected/disconnected block along
// with its height at the time.
type blockNtfn struct {
sha *chainhash.Hash
height int32
}
// notificationDispatcher is the primary goroutine which handles client
// notification registrations, as well as notification dispatches.
func (b *BitcoindNotifier) notificationDispatcher() {
out:
for {
select {
case cancelMsg := <-b.notificationCancels:
switch msg := cancelMsg.(type) {
case *spendCancel:
chainntnfs.Log.Infof("Cancelling spend "+
"notification for out_point=%v, "+
"spend_id=%v", msg.op, msg.spendID)
// Before we attempt to close the spendChan,
// ensure that the notification hasn't already
// yet been dispatched.
if outPointClients, ok := b.spendNotifications[msg.op]; ok {
close(outPointClients[msg.spendID].spendChan)
delete(b.spendNotifications[msg.op], msg.spendID)
}
case *epochCancel:
chainntnfs.Log.Infof("Cancelling epoch "+
"notification, epoch_id=%v", msg.epochID)
// First, close the cancel channel for this
// specific client, and wait for the client to
// exit.
close(b.blockEpochClients[msg.epochID].cancelChan)
b.blockEpochClients[msg.epochID].wg.Wait()
// Once the client has exited, we can then
// safely close the channel used to send epoch
// notifications, in order to notify any
// listeners that the intent has been
// cancelled.
close(b.blockEpochClients[msg.epochID].epochChan)
delete(b.blockEpochClients, msg.epochID)
}
case registerMsg := <-b.notificationRegistry:
switch msg := registerMsg.(type) {
case *spendNotification:
chainntnfs.Log.Infof("New spend subscription: "+
"utxo=%v", msg.targetOutpoint)
op := *msg.targetOutpoint
if _, ok := b.spendNotifications[op]; !ok {
b.spendNotifications[op] = make(map[uint64]*spendNotification)
}
b.spendNotifications[op][msg.spendID] = msg
b.chainConn.NotifySpent([]*wire.OutPoint{&op})
case *confirmationsNotification:
chainntnfs.Log.Infof("New confirmations "+
"subscription: txid=%v, numconfs=%v",
msg.TxID, msg.NumConfirmations)
// Lookup whether the transaction is already included in the
// active chain.
txConf, err := b.historicalConfDetails(msg.TxID)
if err != nil {
chainntnfs.Log.Error(err)
}
b.heightMtx.RLock()
err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf)
if err != nil {
chainntnfs.Log.Error(err)
}
b.heightMtx.RUnlock()
case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription")
b.blockEpochClients[msg.epochID] = msg
}
case ntfn := <-b.chainConn.Notifications():
switch item := ntfn.(type) {
case chain.BlockConnected:
b.heightMtx.Lock()
if item.Height != b.bestHeight+1 {
chainntnfs.Log.Warnf("Received blocks out of order: "+
"current height=%d, new height=%d",
b.bestHeight, item.Height)
b.heightMtx.Unlock()
continue
}
b.bestHeight = item.Height
rawBlock, err := b.chainConn.GetBlock(&item.Hash)
if err != nil {
chainntnfs.Log.Errorf("Unable to get block: %v", err)
b.heightMtx.Unlock()
continue
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
item.Height, item.Hash)
b.notifyBlockEpochs(item.Height, &item.Hash)
txns := btcutil.NewBlock(rawBlock).Transactions()
err = b.txConfNotifier.ConnectTip(&item.Hash,
uint32(item.Height), txns)
if err != nil {
chainntnfs.Log.Error(err)
}
b.heightMtx.Unlock()
continue
case chain.BlockDisconnected:
b.heightMtx.Lock()
if item.Height != b.bestHeight {
chainntnfs.Log.Warnf("Received blocks "+
"out of order: current height="+
"%d, disconnected height=%d",
b.bestHeight, item.Height)
b.heightMtx.Unlock()
continue
}
b.bestHeight = item.Height - 1
chainntnfs.Log.Infof("Block disconnected from "+
"main chain: height=%v, sha=%v",
item.Height, item.Hash)
err := b.txConfNotifier.DisconnectTip(
uint32(item.Height))
if err != nil {
chainntnfs.Log.Error(err)
}
b.heightMtx.Unlock()
case chain.RelevantTx:
tx := item.TxRecord.MsgTx
// First, check if this transaction spends an output
// that has an existing spend notification for it.
for i, txIn := range tx.TxIn {
prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an
// output which we have a registered
// notification for, then create a spend
// summary, finally sending off the details to
// the notification subscriber.
if clients, ok := b.spendNotifications[prevOut]; ok {
spenderSha := tx.TxHash()
spendDetails := &chainntnfs.SpendDetail{
SpentOutPoint: &prevOut,
SpenderTxHash: &spenderSha,
SpendingTx: &tx,
SpenderInputIndex: uint32(i),
}
// TODO(roasbeef): after change to
// loadfilter, only notify on block
// inclusion?
if item.Block != nil {
spendDetails.SpendingHeight = item.Block.Height
} else {
b.heightMtx.RLock()
spendDetails.SpendingHeight = b.bestHeight + 1
b.heightMtx.RUnlock()
}
for _, ntfn := range clients {
chainntnfs.Log.Infof("Dispatching "+
"spend notification for "+
"outpoint=%v", ntfn.targetOutpoint)
ntfn.spendChan <- spendDetails
// Close spendChan to ensure that any calls to Cancel will not
// block. This is safe to do since the channel is buffered, and the
// message can still be read by the receiver.
close(ntfn.spendChan)
}
delete(b.spendNotifications, prevOut)
}
}
}
case <-b.quit:
break out
}
}
b.wg.Done()
}
// historicalConfDetails looks up whether a transaction is already included in a
// block in the active chain and, if so, returns details about the confirmation.
func (b *BitcoindNotifier) historicalConfDetails(txid *chainhash.Hash,
) (*chainntnfs.TxConfirmation, error) {
// If the transaction already has some or all of the confirmations,
// then we may be able to dispatch it immediately.
// TODO: fall back to scanning blocks if txindex isn't on.
tx, err := b.chainConn.GetRawTransactionVerbose(txid)
if err != nil || tx == nil || tx.BlockHash == "" {
if err == nil {
return nil, nil
}
// Do not return an error if the transaction was not found.
if jsonErr, ok := err.(*btcjson.RPCError); ok {
if jsonErr.Code == btcjson.ErrRPCNoTxInfo {
return nil, nil
}
}
return nil, fmt.Errorf("unable to query for txid(%v): %v", txid, err)
}
// As we need to fully populate the returned TxConfirmation struct,
// grab the block in which the transaction was confirmed so we can
// locate its exact index within the block.
blockHash, err := chainhash.NewHashFromStr(tx.BlockHash)
if err != nil {
return nil, fmt.Errorf("unable to get block hash %v for historical "+
"dispatch: %v", tx.BlockHash, err)
}
block, err := b.chainConn.GetBlockVerbose(blockHash)
if err != nil {
return nil, fmt.Errorf("unable to get block hash: %v", err)
}
// If the block obtained, locate the transaction's index within the
// block so we can give the subscriber full confirmation details.
txIndex := -1
targetTxidStr := txid.String()
for i, txHash := range block.Tx {
if txHash == targetTxidStr {
txIndex = i
break
}
}
if txIndex == -1 {
return nil, fmt.Errorf("unable to locate tx %v in block %v",
txid, blockHash)
}
txConf := chainntnfs.TxConfirmation{
BlockHash: blockHash,
BlockHeight: uint32(block.Height),
TxIndex: uint32(txIndex),
}
return &txConf, nil
}
// notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain.
func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {
epoch := &chainntnfs.BlockEpoch{
Height: newHeight,
Hash: newSha,
}
for _, epochClient := range b.blockEpochClients {
b.wg.Add(1)
epochClient.wg.Add(1)
go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{},
clientWg *sync.WaitGroup) {
// TODO(roasbeef): move to goroutine per client, use sync queue
defer clientWg.Done()
defer b.wg.Done()
select {
case ntfnChan <- epoch:
case <-cancelChan:
return
case <-b.quit:
return
}
}(epochClient.epochChan, epochClient.cancelChan, &epochClient.wg)
}
}
// spendNotification couples a target outpoint along with the channel used for
// notifications once a spend of the outpoint has been detected.
type spendNotification struct {
targetOutpoint *wire.OutPoint
spendChan chan *chainntnfs.SpendDetail
spendID uint64
}
// spendCancel is a message sent to the BitcoindNotifier when a client wishes
// to cancel an outstanding spend notification that has yet to be dispatched.
type spendCancel struct {
// op is the target outpoint of the notification to be cancelled.
op wire.OutPoint
// spendID the ID of the notification to cancel.
spendID uint64
}
// RegisterSpendNtfn registers an intent to be notified once the target
// outpoint has been spent by a transaction on-chain. Once a spend of the target
// outpoint has been detected, the details of the spending event will be sent
// across the 'Spend' channel.
func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
_ uint32) (*chainntnfs.SpendEvent, error) {
if err := b.chainConn.NotifySpent([]*wire.OutPoint{outpoint}); err != nil {
return nil, err
}
ntfn := &spendNotification{
targetOutpoint: outpoint,
spendChan: make(chan *chainntnfs.SpendDetail, 1),
spendID: atomic.AddUint64(&b.spendClientCounter, 1),
}
select {
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
case b.notificationRegistry <- ntfn:
}
// The following conditional checks to ensure that when a spend notification
// is registered, the output hasn't already been spent. If the output
// is no longer in the UTXO set, the chain will be rescanned from the point
// where the output was added. The rescan will dispatch the notification.
txout, err := b.chainConn.GetTxOut(&outpoint.Hash, outpoint.Index, true)
if err != nil {
return nil, err
}
if txout == nil {
// TODO: fall back to scanning blocks if txindex isn't on.
transaction, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash)
if err != nil {
jsonErr, ok := err.(*btcjson.RPCError)
if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo {
return nil, err
}
}
if transaction != nil {
blockhash, err := chainhash.NewHashFromStr(transaction.BlockHash)
if err != nil {
return nil, err
}
// Rewind the rescan, since the btcwallet bitcoind
// back-end doesn't support that.
blockHeight, err := b.chainConn.GetBlockHeight(blockhash)
if err != nil {
return nil, err
}
b.heightMtx.Lock()
currentHeight := b.bestHeight
b.bestHeight = blockHeight
for i := currentHeight; i > blockHeight; i-- {
err = b.txConfNotifier.DisconnectTip(uint32(i))
if err != nil {
return nil, err
}
}
b.heightMtx.Unlock()
ops := []*wire.OutPoint{outpoint}
if err := b.chainConn.Rescan(blockhash, nil, ops); err != nil {
chainntnfs.Log.Errorf("Rescan for spend "+
"notification txout failed: %v", err)
return nil, err
}
}
}
return &chainntnfs.SpendEvent{
Spend: ntfn.spendChan,
Cancel: func() {
cancel := &spendCancel{
op: *outpoint,
spendID: ntfn.spendID,
}
// Submit spend cancellation to notification dispatcher.
select {
case b.notificationCancels <- cancel:
// Cancellation is being handled, drain the spend chan until it is
// closed before yielding to the caller.
for {
select {
case _, ok := <-ntfn.spendChan:
if !ok {
return
}
case <-b.quit:
return
}
}
case <-b.quit:
}
},
}, nil
}
// confirmationNotification represents a client's intent to receive a
// notification once the target txid reaches numConfirmations confirmations.
type confirmationsNotification struct {
chainntnfs.ConfNtfn
}
// RegisterConfirmationsNtfn registers a notification with BitcoindNotifier
// which will be triggered once the txid reaches numConfs number of
// confirmations.
func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
numConfs, _ uint32) (*chainntnfs.ConfirmationEvent, error) {
ntfn := &confirmationsNotification{
chainntnfs.ConfNtfn{
TxID: txid,
NumConfirmations: numConfs,
Event: chainntnfs.NewConfirmationEvent(),
},
}
select {
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
case b.notificationRegistry <- ntfn:
return ntfn.Event, nil
}
}
// blockEpochRegistration represents a client's intent to receive a
// notification with each newly connected block.
type blockEpochRegistration struct {
epochID uint64
epochChan chan *chainntnfs.BlockEpoch
cancelChan chan struct{}
wg sync.WaitGroup
}
// epochCancel is a message sent to the BitcoindNotifier when a client wishes
// to cancel an outstanding epoch notification that has yet to be dispatched.
type epochCancel struct {
epochID uint64
}
// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the
// caller to receive notifications, of each new block connected to the main
// chain.
func (b *BitcoindNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) {
registration := &blockEpochRegistration{
epochChan: make(chan *chainntnfs.BlockEpoch, 20),
cancelChan: make(chan struct{}),
epochID: atomic.AddUint64(&b.epochClientCounter, 1),
}
select {
case <-b.quit:
return nil, errors.New("chainntnfs: system interrupt while " +
"attempting to register for block epoch notification.")
case b.notificationRegistry <- registration:
return &chainntnfs.BlockEpochEvent{
Epochs: registration.epochChan,
Cancel: func() {
cancel := &epochCancel{
epochID: registration.epochID,
}
// Submit epoch cancellation to notification dispatcher.
select {
case b.notificationCancels <- cancel:
// Cancellation is being handled, drain the epoch channel until it is
// closed before yielding to caller.
for {
select {
case _, ok := <-registration.epochChan:
if !ok {
return
}
case <-b.quit:
return
}
}
case <-b.quit:
}
},
}, nil
}
}

@ -0,0 +1,53 @@
package bitcoindnotify
import (
"fmt"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/roasbeef/btcd/chaincfg"
"github.com/roasbeef/btcd/rpcclient"
)
// createNewNotifier creates a new instance of the ChainNotifier interface
// implemented by BitcoindNotifier.
func createNewNotifier(args ...interface{}) (chainntnfs.ChainNotifier, error) {
if len(args) != 3 {
return nil, fmt.Errorf("incorrect number of arguments to "+
".New(...), expected 3, instead passed %v", len(args))
}
config, ok := args[0].(*rpcclient.ConnConfig)
if !ok {
return nil, fmt.Errorf("first argument to bitcoindnotifier." +
"New is incorrect, expected a *rpcclient.ConnConfig")
}
zmqConnect, ok := args[1].(string)
if !ok {
return nil, fmt.Errorf("second argument to bitcoindnotifier." +
"New is incorrect, expected a string")
}
params, ok := args[2].(chaincfg.Params)
if !ok {
return nil, fmt.Errorf("third argument to bitcoindnotifier." +
"New is incorrect, expected a chaincfg.Params")
}
return New(config, zmqConnect, params)
}
// init registers a driver for the BtcdNotifier concrete implementation of the
// chainntnfs.ChainNotifier interface.
func init() {
// Register the driver.
notifier := &chainntnfs.NotifierDriver{
NotifierType: notifierType,
New: createNewNotifier,
}
if err := chainntnfs.RegisterNotifier(notifier); err != nil {
panic(fmt.Sprintf("failed to register notifier driver '%s': %v",
notifierType, err))
}
}

@ -5,7 +5,9 @@ import (
"fmt"
"io/ioutil"
"log"
"math/rand"
"os"
"os/exec"
"path/filepath"
"sync"
"testing"
@ -13,6 +15,7 @@ import (
"github.com/lightninglabs/neutrino"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/ltcsuite/ltcd/btcjson"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcwallet/walletdb"
@ -24,6 +27,10 @@ import (
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
// Required to auto-register the bitcoind backed ChainNotifier
// implementation.
_ "github.com/lightningnetwork/lnd/chainntnfs/bitcoindnotify"
// Required to auto-register the btcd backed ChainNotifier
// implementation.
_ "github.com/lightningnetwork/lnd/chainntnfs/btcdnotify"
@ -32,7 +39,8 @@ import (
// implementation.
_ "github.com/lightningnetwork/lnd/chainntnfs/neutrinonotify"
_ "github.com/roasbeef/btcwallet/walletdb/bdb" // Required to register the boltdb walletdb implementation.
// Required to register the boltdb walletdb implementation.
_ "github.com/roasbeef/btcwallet/walletdb/bdb"
)
var (
@ -43,7 +51,7 @@ var (
0x1e, 0xb, 0x4c, 0xfd, 0x9e, 0xc5, 0x8c, 0xe9,
}
netParams = &chaincfg.SimNetParams
netParams = &chaincfg.RegressionNetParams
privKey, pubKey = btcec.PrivKeyFromBytes(btcec.S256(), testPrivKey)
addrPk, _ = btcutil.NewAddressPubKey(pubKey.SerializeCompressed(),
netParams)
@ -65,6 +73,39 @@ func getTestTxId(miner *rpctest.Harness) (*chainhash.Hash, error) {
return miner.SendOutputs(outputs, 10)
}
func waitForMempoolTx(r *rpctest.Harness, txid *chainhash.Hash) error {
var found bool
var tx *btcutil.Tx
var err error
timeout := time.After(10 * time.Second)
for !found {
// Do a short wait
select {
case <-timeout:
return fmt.Errorf("timeout after 10s")
default:
}
time.Sleep(100 * time.Millisecond)
// Check for the harness' knowledge of the txid
tx, err = r.Node.GetRawTransaction(txid)
if err != nil {
switch e := err.(type) {
case *btcjson.RPCError:
if e.Code == btcjson.ErrRPCNoTxInfo {
continue
}
default:
}
return err
}
if tx != nil && tx.MsgTx().TxHash() == *txid {
found = true
}
}
return nil
}
func testSingleConfirmationNotification(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) {
@ -80,6 +121,11 @@ func testSingleConfirmationNotification(miner *rpctest.Harness,
t.Fatalf("unable to create test tx: %v", err)
}
err = waitForMempoolTx(miner, txid)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
_, currentHeight, err := miner.Node.GetBestBlock()
if err != nil {
t.Fatalf("unable to get current height: %v", err)
@ -143,6 +189,11 @@ func testMultiConfirmationNotification(miner *rpctest.Harness,
t.Fatalf("unable to create test addr: %v", err)
}
err = waitForMempoolTx(miner, txid)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
_, currentHeight, err := miner.Node.GetBestBlock()
if err != nil {
t.Fatalf("unable to get current height: %v", err)
@ -201,6 +252,11 @@ func testBatchConfirmationNotification(miner *rpctest.Harness,
t.Fatalf("unable to register ntfn: %v", err)
}
confIntents[i] = confIntent
err = waitForMempoolTx(miner, txid)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
}
initialConfHeight := uint32(currentHeight + 1)
@ -252,6 +308,11 @@ func createSpendableOutput(miner *rpctest.Harness,
t.Fatalf("unable to create test addr: %v", err)
}
err = waitForMempoolTx(miner, txid)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
// Mine a single block which should include that txid above.
if _, err := miner.Node.Generate(1); err != nil {
t.Fatalf("unable to generate single block: %v", err)
@ -342,6 +403,11 @@ func testSpendNotification(miner *rpctest.Harness,
t.Fatalf("unable to broadcast tx: %v", err)
}
err = waitForMempoolTx(miner, spenderSha)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
// Now we mine a single block, which should include our spend. The
// notification should also be sent off.
if _, err := miner.Node.Generate(1); err != nil {
@ -453,6 +519,11 @@ func testMultiClientConfirmationNotification(miner *rpctest.Harness,
t.Fatalf("unable to create test tx: %v", err)
}
err = waitForMempoolTx(miner, txid)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
var wg sync.WaitGroup
const (
numConfsClients = 5
@ -514,6 +585,11 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness,
t.Fatalf("unable to create test tx: %v", err)
}
err = waitForMempoolTx(miner, txid3)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
// Generate another block containing tx 3, but we won't register conf
// notifications for this tx until much later. The notifier must check
// older blocks when the confirmation event is registered below to ensure
@ -529,11 +605,21 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness,
t.Fatalf("unable to create test tx: %v", err)
}
err = waitForMempoolTx(miner, txid1)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
txid2, err := getTestTxId(miner)
if err != nil {
t.Fatalf("unable to create test tx: %v", err)
}
err = waitForMempoolTx(miner, txid2)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
_, currentHeight, err := miner.Node.GetBestBlock()
if err != nil {
t.Fatalf("unable to get current height: %v", err)
@ -654,6 +740,11 @@ func testLazyNtfnConsumer(miner *rpctest.Harness,
t.Fatalf("unable to create test tx: %v", err)
}
err = waitForMempoolTx(miner, txid)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
_, currentHeight, err := miner.Node.GetBestBlock()
if err != nil {
t.Fatalf("unable to get current height: %v", err)
@ -686,6 +777,11 @@ func testLazyNtfnConsumer(miner *rpctest.Harness,
t.Fatalf("unable to create test tx: %v", err)
}
err = waitForMempoolTx(miner, txid)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
_, currentHeight, err = miner.Node.GetBestBlock()
if err != nil {
t.Fatalf("unable to get current height: %v", err)
@ -736,6 +832,11 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness,
t.Fatalf("unable to create test addr: %v", err)
}
err = waitForMempoolTx(miner, txid)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
// Mine a single block which should include that txid above.
if _, err := miner.Node.Generate(1); err != nil {
t.Fatalf("unable to generate single block: %v", err)
@ -789,6 +890,11 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness,
t.Fatalf("unable to brodacst tx: %v", err)
}
err = waitForMempoolTx(miner, spenderSha)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
// Now we mine an additional block, which should include our spend.
if _, err := miner.Node.Generate(1); err != nil {
t.Fatalf("unable to generate single block: %v", err)
@ -877,6 +983,11 @@ func testCancelSpendNtfn(node *rpctest.Harness,
t.Fatalf("unable to brodacst tx: %v", err)
}
err = waitForMempoolTx(node, spenderSha)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
// Now we mine a single block, which should include our spend. The
// notification should also be sent off.
if _, err := node.Node.Generate(1); err != nil {
@ -1021,6 +1132,11 @@ func testReorgConf(miner *rpctest.Harness, notifier chainntnfs.ChainNotifier,
t.Fatalf("unable to create test tx: %v", err)
}
err = waitForMempoolTx(miner, txid)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
_, currentHeight, err := miner.Node.GetBestBlock()
if err != nil {
t.Fatalf("unable to get current height: %v", err)
@ -1094,11 +1210,16 @@ func testReorgConf(miner *rpctest.Harness, notifier chainntnfs.ChainNotifier,
t.Fatalf("unable to get raw tx: %v", err)
}
_, err = miner2.Node.SendRawTransaction(tx.MsgTx(), false)
txid, err = miner2.Node.SendRawTransaction(tx.MsgTx(), false)
if err != nil {
t.Fatalf("unable to get send tx: %v", err)
}
err = waitForMempoolTx(miner, txid)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
_, err = miner.Node.Generate(3)
if err != nil {
t.Fatalf("unable to generate single block: %v", err)
@ -1206,12 +1327,73 @@ func TestInterfaces(t *testing.T) {
switch notifierType {
case "bitcoind":
// Start a bitcoind instance.
tempBitcoindDir, err := ioutil.TempDir("", "bitcoind")
if err != nil {
t.Fatalf("Unable to create temp dir: %v", err)
}
zmqPath := "ipc:///" + tempBitcoindDir + "/weks.socket"
cleanUp1 := func() {
os.RemoveAll(tempBitcoindDir)
}
cleanUp = cleanUp1
rpcPort := rand.Int()%(65536-1024) + 1024
bitcoind := exec.Command(
"bitcoind",
"-datadir="+tempBitcoindDir,
"-regtest",
"-connect="+p2pAddr,
"-txindex",
"-rpcauth=weks:469e9bb14ab2360f8e226efed5ca6f"+
"d$507c670e800a95284294edb5773b05544b"+
"220110063096c221be9933c82d38e1",
fmt.Sprintf("-rpcport=%d", rpcPort),
"-disablewallet",
"-zmqpubrawblock="+zmqPath,
"-zmqpubrawtx="+zmqPath,
)
err = bitcoind.Start()
if err != nil {
cleanUp1()
t.Fatalf("Couldn't start bitcoind: %v", err)
}
cleanUp2 := func() {
bitcoind.Process.Kill()
bitcoind.Wait()
cleanUp1()
}
cleanUp = cleanUp2
// Wait for the bitcoind instance to start up.
time.Sleep(time.Second)
// Start the FilteredChainView implementation instance.
config := rpcclient.ConnConfig{
Host: fmt.Sprintf(
"127.0.0.1:%d", rpcPort),
User: "weks",
Pass: "weks",
DisableAutoReconnect: false,
DisableConnectOnNew: true,
DisableTLS: true,
HTTPPostMode: true,
}
notifier, err = notifierDriver.New(&config, zmqPath,
*netParams)
if err != nil {
t.Fatalf("unable to create %v notifier: %v",
notifierType, err)
}
case "btcd":
notifier, err = notifierDriver.New(&rpcConfig)
if err != nil {
t.Fatalf("unable to create %v notifier: %v",
notifierType, err)
}
cleanUp = func() {}
case "neutrino":
spvDir, err := ioutil.TempDir("", "neutrino")

6
glide.lock generated

@ -1,5 +1,5 @@
hash: 8438e391bed32638a8e14402992af50b7656b380f69741cf8422584c2e1f2b31
updated: 2017-12-13T15:13:22.311098343-08:00
hash: d145c16f2f9cfdf4937eb8b7cdd65919a8c351593a179acc23f2cbca5b42f34b
updated: 2017-12-22T23:45:25.148488338-07:00
imports:
- name: github.com/aead/chacha20
version: d31a916ded42d1640b9d89a26f8abd53cc96790c
@ -86,6 +86,8 @@ imports:
version: 946bd9fbed05568b0f3cd188353d8aa28f38b688
subpackages:
- internal/socket
- name: github.com/pebbe/zmq4
version: 90d69e412a09549f2e90bac70fbb449081f1e5c1
- name: github.com/roasbeef/btcd
version: 9978b939c33973be19b932fa7b936079bb7ba38d
subpackages:

@ -24,6 +24,7 @@ import:
- txscript
- wire
- connmgr
- package: github.com/pebbe/zmq4
- package: github.com/roasbeef/btcrpcclient
version: d0f4db8b4dad0ca3d569b804f21247c3dd96acbb
- package: github.com/roasbeef/btcutil

@ -29,23 +29,7 @@ var (
//
// This method is a part of the lnwallet.BlockChainIO interface.
func (b *BtcWallet) GetBestBlock() (*chainhash.Hash, int32, error) {
switch backend := b.chain.(type) {
case *chain.NeutrinoClient:
header, height, err := backend.CS.BlockHeaders.ChainTip()
if err != nil {
return nil, -1, err
}
blockHash := header.BlockHash()
return &blockHash, int32(height), nil
case *chain.RPCClient:
return backend.GetBestBlock()
default:
return nil, -1, fmt.Errorf("unknown backend")
}
return b.chain.GetBestBlock()
}
// GetUtxo returns the original output referenced by the passed outpoint.
@ -100,6 +84,26 @@ func (b *BtcWallet) GetUtxo(op *wire.OutPoint, heightHint uint32) (*wire.TxOut,
PkScript: pkScript,
}, nil
case *chain.BitcoindClient:
txout, err := backend.GetTxOut(&op.Hash, op.Index, false)
if err != nil {
return nil, err
} else if txout == nil {
return nil, ErrOutputSpent
}
pkScript, err := hex.DecodeString(txout.ScriptPubKey.Hex)
if err != nil {
return nil, err
}
return &wire.TxOut{
// Sadly, gettxout returns the output value in BTC
// instead of satoshis.
Value: int64(txout.Value * 1e8),
PkScript: pkScript,
}, nil
default:
return nil, fmt.Errorf("unknown backend")
}
@ -109,27 +113,7 @@ func (b *BtcWallet) GetUtxo(op *wire.OutPoint, heightHint uint32) (*wire.TxOut,
//
// This method is a part of the lnwallet.BlockChainIO interface.
func (b *BtcWallet) GetBlock(blockHash *chainhash.Hash) (*wire.MsgBlock, error) {
switch backend := b.chain.(type) {
case *chain.NeutrinoClient:
block, err := backend.CS.GetBlockFromNetwork(*blockHash)
if err != nil {
return nil, err
}
return block.MsgBlock(), nil
case *chain.RPCClient:
block, err := backend.GetBlock(blockHash)
if err != nil {
return nil, err
}
return block, nil
default:
return nil, fmt.Errorf("unknown backend")
}
return b.chain.GetBlock(blockHash)
}
// GetBlockHash returns the hash of the block in the best blockchain at the
@ -137,29 +121,7 @@ func (b *BtcWallet) GetBlock(blockHash *chainhash.Hash) (*wire.MsgBlock, error)
//
// This method is a part of the lnwallet.BlockChainIO interface.
func (b *BtcWallet) GetBlockHash(blockHeight int64) (*chainhash.Hash, error) {
switch backend := b.chain.(type) {
case *chain.NeutrinoClient:
height := uint32(blockHeight)
blockHeader, err := backend.CS.BlockHeaders.FetchHeaderByHeight(height)
if err != nil {
return nil, err
}
blockHash := blockHeader.BlockHash()
return &blockHash, nil
case *chain.RPCClient:
blockHash, err := backend.GetBlockHash(blockHeight)
if err != nil {
return nil, err
}
return blockHash, nil
default:
return nil, fmt.Errorf("unknown backend")
}
return b.chain.GetBlockHash(blockHeight)
}
// A compile time check to ensure that BtcWallet implements the BlockChainIO

@ -119,6 +119,17 @@ func New(cfg Config) (*BtcWallet, error) {
}, nil
}
// BackEnd returns the underlying ChainService's name as a string.
//
// This is a part of the WalletController interface.
func (b *BtcWallet) BackEnd() string {
if b.chain != nil {
return b.chain.BackEnd()
}
return ""
}
// Start initializes the underlying rpc connection, the wallet itself, and
// begins syncing to the current available blockchain state.
//
@ -668,22 +679,9 @@ func (b *BtcWallet) IsSynced() (bool, error) {
// Next, query the chain backend to grab the info about the tip of the
// main chain.
switch backend := b.cfg.ChainSource.(type) {
case *chain.NeutrinoClient:
header, height, err := backend.CS.BlockHeaders.ChainTip()
if err != nil {
return false, err
}
bh := header.BlockHash()
bestHash = &bh
bestHeight = int32(height)
case *chain.RPCClient:
bestHash, bestHeight, err = backend.GetBestBlock()
if err != nil {
return false, err
}
bestHash, bestHeight, err = b.cfg.ChainSource.GetBestBlock()
if err != nil {
return false, err
}
// If the wallet hasn't yet fully synced to the node's best chain tip,
@ -696,21 +694,9 @@ func (b *BtcWallet) IsSynced() (bool, error) {
// still may not yet be synced as the chain backend may still be
// catching up to the main chain. So we'll grab the block header in
// order to make a guess based on the current time stamp.
var blockHeader *wire.BlockHeader
switch backend := b.cfg.ChainSource.(type) {
case *chain.NeutrinoClient:
bh, _, err := backend.CS.BlockHeaders.FetchHeader(bestHash)
if err != nil {
return false, err
}
blockHeader = bh
case *chain.RPCClient:
blockHeader, err = backend.GetBlockHeader(bestHash)
if err != nil {
return false, err
}
blockHeader, err := b.cfg.ChainSource.GetBlockHeader(bestHash)
if err != nil {
return false, err
}
// If the timestamp no the best header is more than 2 hours in the

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/roasbeef/btcwallet/chain"
)
const (
@ -36,6 +37,7 @@ func init() {
driver := &lnwallet.WalletDriver{
WalletType: walletType,
New: createNewWallet,
BackEnds: chain.BackEnds,
}
if err := lnwallet.RegisterWallet(driver); err != nil {

@ -1,6 +1,8 @@
package lnwallet
import (
"encoding/json"
"github.com/roasbeef/btcd/blockchain"
"github.com/roasbeef/btcd/rpcclient"
"github.com/roasbeef/btcutil"
@ -200,3 +202,145 @@ func (b *BtcdFeeEstimator) fetchEstimatePerByte(confTarget uint32) (btcutil.Amou
// A compile-time assertion to ensure that BtcdFeeEstimator implements the
// FeeEstimator interface.
var _ FeeEstimator = (*BtcdFeeEstimator)(nil)
// BitcoindFeeEstimator is an implementation of the FeeEstimator interface
// backed by the RPC interface of an active bitcoind node. This implementation
// will proxy any fee estimation requests to bitcoind's RPC interace.
type BitcoindFeeEstimator struct {
// fallBackFeeRate is the fall back fee rate in satoshis per byte that
// is returned if the fee estimator does not yet have enough data to
// actually produce fee estimates.
fallBackFeeRate btcutil.Amount
bitcoindConn *rpcclient.Client
}
// NewBitcoindFeeEstimator creates a new BitcoindFeeEstimator given a fully
// populated rpc config that is able to successfully connect and authenticate
// with the bitcoind node, and also a fall back fee rate. The fallback fee rate
// is used in the occasion that the estimator has insufficient data, or returns
// zero for a fee estimate.
func NewBitcoindFeeEstimator(rpcConfig rpcclient.ConnConfig,
fallBackFeeRate btcutil.Amount) (*BitcoindFeeEstimator, error) {
rpcConfig.DisableConnectOnNew = true
rpcConfig.DisableAutoReconnect = false
rpcConfig.DisableTLS = true
rpcConfig.HTTPPostMode = true
chainConn, err := rpcclient.New(&rpcConfig, nil)
if err != nil {
return nil, err
}
return &BitcoindFeeEstimator{
fallBackFeeRate: fallBackFeeRate,
bitcoindConn: chainConn,
}, nil
}
// Start signals the FeeEstimator to start any processes or goroutines
// it needs to perform its duty.
//
// NOTE: This method is part of the FeeEstimator interface.
func (b *BitcoindFeeEstimator) Start() error {
return nil
}
// Stop stops any spawned goroutines and cleans up the resources used
// by the fee estimator.
//
// NOTE: This method is part of the FeeEstimator interface.
func (b *BitcoindFeeEstimator) Stop() error {
return nil
}
// EstimateFeePerByte takes in a target for the number of blocks until an
// initial confirmation and returns the estimated fee expressed in
// satoshis/byte.
func (b *BitcoindFeeEstimator) EstimateFeePerByte(numBlocks uint32) (btcutil.Amount, error) {
feeEstimate, err := b.fetchEstimatePerByte(numBlocks)
switch {
// If the estimator doesn't have enough data, or returns an error, then
// to return a proper value, then we'll return the default fall back
// fee rate.
case err != nil:
walletLog.Errorf("unable to query estimator: %v", err)
fallthrough
case feeEstimate == 0:
return b.fallBackFeeRate, nil
}
return feeEstimate, nil
}
// EstimateFeePerWeight takes in a target for the number of blocks until an
// initial confirmation and returns the estimated fee expressed in
// satoshis/weight.
func (b *BitcoindFeeEstimator) EstimateFeePerWeight(numBlocks uint32) (btcutil.Amount, error) {
feePerByte, err := b.EstimateFeePerByte(numBlocks)
if err != nil {
return 0, err
}
// We'll scale down the fee per byte to fee per weight, as for each raw
// byte, there's 1/4 unit of weight mapped to it.
satWeight := feePerByte / blockchain.WitnessScaleFactor
// If this ends up scaling down to a zero sat/weight amount, then we'll
// use the default fallback fee rate.
// TODO(aakselrod): maybe use the per-byte rate if it's non-zero?
// Otherwise, we can return a higher sat/byte than sat/weight.
if satWeight == 0 {
return b.fallBackFeeRate / blockchain.WitnessScaleFactor, nil
}
return satWeight, nil
}
// fetchEstimate returns a fee estimate for a transaction be be confirmed in
// confTarget blocks. The estimate is returned in sat/byte.
func (b *BitcoindFeeEstimator) fetchEstimatePerByte(confTarget uint32) (btcutil.Amount, error) {
// First, we'll send an "estimatesmartfee" command as a raw request,
// since it isn't supported by btcd but is available in bitcoind.
target, err := json.Marshal(uint64(confTarget))
if err != nil {
return 0, err
}
// TODO: Allow selection of economical/conservative modifiers.
resp, err := b.bitcoindConn.RawRequest("estimatesmartfee",
[]json.RawMessage{target})
if err != nil {
return 0, err
}
// Next, we'll parse the response to get the BTC per KB.
feeEstimate := struct {
Feerate float64 `json:"feerate"`
}{}
err = json.Unmarshal(resp, &feeEstimate)
if err != nil {
return 0, err
}
// Next, we'll convert the returned value to satoshis, as it's
// currently returned in BTC.
satPerKB, err := btcutil.NewAmount(feeEstimate.Feerate)
if err != nil {
return 0, err
}
// The value returned is expressed in fees per KB, while we want
// fee-per-byte, so we'll divide by 1024 to map to satoshis-per-byte
// before returning the estimate.
satPerByte := satPerKB / 1024
walletLog.Debugf("Returning %v sat/byte for conf target of %v",
int64(satPerByte), confTarget)
return satPerByte, nil
}
// A compile-time assertion to ensure that BitcoindFeeEstimator implements the
// FeeEstimator interface.
var _ FeeEstimator = (*BitcoindFeeEstimator)(nil)

@ -207,6 +207,11 @@ type WalletController interface {
// Stop signals the wallet for shutdown. Shutdown may entail closing
// any active sockets, database handles, stopping goroutines, etc.
Stop() error
// BackEnd returns a name for the wallet's backing chain service,
// which could be e.g. btcd, bitcoind, neutrino, or another consensus
// service.
BackEnd() string
}
// BlockChainIO is a dedicated source which will be used to obtain queries
@ -288,6 +293,10 @@ type WalletDriver struct {
// initialization flexibility, thereby accommodating several potential
// WalletController implementations.
New func(args ...interface{}) (WalletController, error)
// BackEnds returns a list of available chain service drivers for the
// wallet driver. This could be e.g. bitcoind, btcd, neutrino, etc.
BackEnds func() []string
}
var (

@ -5,8 +5,10 @@ import (
"encoding/hex"
"fmt"
"io/ioutil"
"math/rand"
"net"
"os"
"os/exec"
"path/filepath"
"reflect"
"runtime"
@ -17,7 +19,10 @@ import (
"github.com/boltdb/bolt"
"github.com/davecgh/go-spew/spew"
"github.com/lightninglabs/neutrino"
"github.com/roasbeef/btcwallet/chain"
"github.com/roasbeef/btcwallet/walletdb"
_ "github.com/roasbeef/btcwallet/walletdb/bdb"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/chainntnfs/btcdnotify"
@ -25,10 +30,10 @@ import (
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/btcwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcjson"
"github.com/roasbeef/btcd/chaincfg"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/rpcclient"
_ "github.com/roasbeef/btcwallet/walletdb/bdb"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/integration/rpctest"
@ -76,7 +81,7 @@ var (
0x69, 0x49, 0x18, 0x83, 0x31, 0x98, 0x47, 0x53,
}
netParams = &chaincfg.SimNetParams
netParams = &chaincfg.RegressionNetParams
chainHash = netParams.GenesisHash
_, alicePub = btcec.PrivKeyFromBytes(btcec.S256(), testHdSeed[:])
@ -145,6 +150,12 @@ func calcStaticFee(numHTLCs int) btcutil.Amount {
func loadTestCredits(miner *rpctest.Harness, w *lnwallet.LightningWallet,
numOutputs, btcPerOutput int) error {
// For initial neutrino connection, wait a second.
// TODO(aakselrod): Eliminate the need for this.
switch w.BackEnd() {
case "neutrino":
time.Sleep(time.Second)
}
// Using the mining node, spend from a coinbase output numOutputs to
// give us btcPerOutput with each output.
satoshiPerOutput := int64(btcPerOutput * 1e8)
@ -188,6 +199,7 @@ func loadTestCredits(miner *rpctest.Harness, w *lnwallet.LightningWallet,
// Wait until the wallet has finished syncing up to the main chain.
ticker := time.NewTicker(100 * time.Millisecond)
timeout := time.After(30 * time.Second)
for range ticker.C {
balance, err := w.ConfirmedBalance(1, false)
@ -197,6 +209,17 @@ func loadTestCredits(miner *rpctest.Harness, w *lnwallet.LightningWallet,
if balance == expectedBalance {
break
}
select {
case <-timeout:
synced, err := w.IsSynced()
if err != nil {
return err
}
return fmt.Errorf("timed out after 30 seconds "+
"waiting for balance %v, current balance %v, "+
"synced: %t", expectedBalance, balance, synced)
default:
}
}
ticker.Stop()
@ -222,7 +245,7 @@ func createTestWallet(tempTestDir string, miningNode *rpctest.Harness,
WalletController: wc,
Signer: signer,
ChainIO: bio,
FeeEstimator: lnwallet.StaticFeeEstimator{FeeRate: 250},
FeeEstimator: lnwallet.StaticFeeEstimator{FeeRate: 10},
DefaultConstraints: channeldb.ChannelConstraints{
DustLimit: 500,
MaxPendingAmount: lnwire.NewMSatFromSatoshis(btcutil.SatoshiPerBitcoin) * 100,
@ -343,6 +366,9 @@ func testDualFundingReservationWorkflow(miner *rpctest.Harness,
bobFundingSigs, bobCommitSig,
)
if err != nil {
for _, in := range aliceChanReservation.FinalFundingTx().TxIn {
fmt.Println(in.PreviousOutPoint.String())
}
t.Fatalf("unable to consume alice's sigs: %v", err)
}
_, err = bobChanReservation.CompleteReservation(
@ -384,6 +410,10 @@ func testDualFundingReservationWorkflow(miner *rpctest.Harness,
// Mine a single block, the funding transaction should be included
// within this block.
err = waitForMempoolTx(miner, &fundingSha)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
blockHashes, err := miner.Node.Generate(1)
if err != nil {
t.Fatalf("unable to generate block: %v", err)
@ -402,6 +432,16 @@ func testDualFundingReservationWorkflow(miner *rpctest.Harness,
assertReservationDeleted(aliceChanReservation, t)
assertReservationDeleted(bobChanReservation, t)
// Wait for wallets to catch up to prevent issues in subsequent tests.
err = waitForWalletSync(miner, alice)
if err != nil {
t.Fatalf("unable to sync alice: %v", err)
}
err = waitForWalletSync(miner, bob)
if err != nil {
t.Fatalf("unable to sync bob: %v", err)
}
}
func testFundingTransactionLockedOutputs(miner *rpctest.Harness,
@ -759,6 +799,10 @@ func testSingleFunderReservationWorkflow(miner *rpctest.Harness,
// Mine a single block, the funding transaction should be included
// within this block.
err = waitForMempoolTx(miner, &fundingSha)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
blockHashes, err := miner.Node.Generate(1)
if err != nil {
t.Fatalf("unable to generate block: %v", err)
@ -768,7 +812,8 @@ func testSingleFunderReservationWorkflow(miner *rpctest.Harness,
t.Fatalf("unable to find block: %v", err)
}
if len(block.Transactions) != 2 {
t.Fatalf("funding transaction wasn't mined: %v", err)
t.Fatalf("funding transaction wasn't mined: %d",
len(block.Transactions))
}
blockTx := block.Transactions[1]
if blockTx.TxHash() != fundingSha {
@ -815,8 +860,10 @@ func testListTransactionDetails(miner *rpctest.Harness,
}
// Next, fetch all the current transaction details.
// TODO(roasbeef): use ntfn client here instead?
time.Sleep(time.Second * 2)
err = waitForWalletSync(miner, alice)
if err != nil {
t.Fatalf("Couldn't sync Alice's wallet: %v", err)
}
txDetails, err := alice.ListTransactionDetails()
if err != nil {
t.Fatalf("unable to fetch tx details: %v", err)
@ -905,6 +952,10 @@ func testListTransactionDetails(miner *rpctest.Harness,
if err != nil {
t.Fatalf("unable to create burn tx: %v", err)
}
err = waitForMempoolTx(miner, burnTXID)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
burnBlock, err := miner.Node.Generate(1)
if err != nil {
t.Fatalf("unable to mine block: %v", err)
@ -912,7 +963,10 @@ func testListTransactionDetails(miner *rpctest.Harness,
// Fetch the transaction details again, the new transaction should be
// shown as debiting from the wallet's balance.
time.Sleep(time.Second * 2)
err = waitForWalletSync(miner, alice)
if err != nil {
t.Fatalf("Couldn't sync Alice's wallet: %v", err)
}
txDetails, err = alice.ListTransactionDetails()
if err != nil {
t.Fatalf("unable to fetch tx details: %v", err)
@ -955,7 +1009,7 @@ func testTransactionSubscriptions(miner *rpctest.Harness,
// implementation of the WalletController.
txClient, err := alice.SubscribeTransactions()
if err != nil {
t.Fatalf("unable to generate tx subscription: %v", err)
t.Skipf("unable to generate tx subscription: %v", err)
}
defer txClient.Cancel()
@ -964,25 +1018,33 @@ func testTransactionSubscriptions(miner *rpctest.Harness,
numTxns = 3
)
unconfirmedNtfns := make(chan struct{})
go func() {
for i := 0; i < numTxns; i++ {
txDetail := <-txClient.UnconfirmedTransactions()
if txDetail.NumConfirmations != 0 {
t.Fatalf("incorrect number of confs, expected %v got %v",
0, txDetail.NumConfirmations)
switch alice.BackEnd() {
case "neutrino":
// Neutrino doesn't listen for unconfirmed transactions.
default:
go func() {
for i := 0; i < numTxns; i++ {
txDetail := <-txClient.UnconfirmedTransactions()
if txDetail.NumConfirmations != 0 {
t.Fatalf("incorrect number of confs, "+
"expected %v got %v", 0,
txDetail.NumConfirmations)
}
if txDetail.Value != outputAmt {
t.Fatalf("incorrect output amt, "+
"expected %v got %v", outputAmt,
txDetail.Value)
}
if txDetail.BlockHash != nil {
t.Fatalf("block hash should be nil, "+
"is instead %v",
txDetail.BlockHash)
}
}
if txDetail.Value != outputAmt {
t.Fatalf("incorrect output amt, expected %v got %v",
outputAmt, txDetail.Value)
}
if txDetail.BlockHash != nil {
t.Fatalf("block hash should be nil, is instead %v",
txDetail.BlockHash)
}
}
close(unconfirmedNtfns)
}()
close(unconfirmedNtfns)
}()
}
// Next, fetch a fresh address from the wallet, create 3 new outputs
// with the pkScript.
@ -1000,17 +1062,27 @@ func testTransactionSubscriptions(miner *rpctest.Harness,
Value: outputAmt,
PkScript: script,
}
if _, err := miner.SendOutputs([]*wire.TxOut{output}, 10); err != nil {
txid, err := miner.SendOutputs([]*wire.TxOut{output}, 10)
if err != nil {
t.Fatalf("unable to send coinbase: %v", err)
}
err = waitForMempoolTx(miner, txid)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
}
// We should receive a notification for all three transactions
// generated above.
select {
case <-time.After(time.Second * 5):
t.Fatalf("transactions not received after 3 seconds")
case <-unconfirmedNtfns: // Fall through on successs
switch alice.BackEnd() {
case "neutrino":
// Neutrino doesn't listen for on unconfirmed transactions.
default:
// We should receive a notification for all three transactions
// generated above.
select {
case <-time.After(time.Second * 10):
t.Fatalf("transactions not received after 10 seconds")
case <-unconfirmedNtfns: // Fall through on successs
}
}
confirmedNtfns := make(chan struct{})
@ -1018,12 +1090,12 @@ func testTransactionSubscriptions(miner *rpctest.Harness,
for i := 0; i < numTxns; i++ {
txDetail := <-txClient.ConfirmedTransactions()
if txDetail.NumConfirmations != 1 {
t.Fatalf("incorrect number of confs, expected %v got %v",
1, txDetail.NumConfirmations)
t.Fatalf("incorrect number of confs for %s, expected %v got %v",
txDetail.Hash, 1, txDetail.NumConfirmations)
}
if txDetail.Value != outputAmt {
t.Fatalf("incorrect output amt, expected %v got %v",
outputAmt, txDetail.Value)
t.Fatalf("incorrect output amt, expected %v got %v in txid %s",
outputAmt, txDetail.Value, txDetail.Hash)
}
}
close(confirmedNtfns)
@ -1039,7 +1111,7 @@ func testTransactionSubscriptions(miner *rpctest.Harness,
// since they should be mined in the next block.
select {
case <-time.After(time.Second * 5):
t.Fatalf("transactions not received after 3 seconds")
t.Fatalf("transactions not received after 5 seconds")
case <-confirmedNtfns: // Fall through on success
}
}
@ -1088,7 +1160,7 @@ func testSignOutputUsingTweaks(r *rpctest.Harness,
// generate a regular p2wkh from that.
pubkeyHash := btcutil.Hash160(tweakedKey.SerializeCompressed())
keyAddr, err := btcutil.NewAddressWitnessPubKeyHash(pubkeyHash,
&chaincfg.SimNetParams)
&chaincfg.RegressionNetParams)
if err != nil {
t.Fatalf("unable to create addr: %v", err)
}
@ -1110,6 +1182,10 @@ func testSignOutputUsingTweaks(r *rpctest.Harness,
// Query for the transaction generated above so we can located
// the index of our output.
err = waitForMempoolTx(r, txid)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
tx, err := r.Node.GetRawTransaction(txid)
if err != nil {
t.Fatalf("unable to query for tx: %v", err)
@ -1197,7 +1273,7 @@ func testReorgWalletBalance(r *rpctest.Harness, w *lnwallet.LightningWallet,
}
// Give wallet time to catch up.
err = waitForWalletSync(w)
err = waitForWalletSync(r, w)
if err != nil {
t.Fatalf("unable to sync wallet: %v", err)
}
@ -1222,16 +1298,21 @@ func testReorgWalletBalance(r *rpctest.Harness, w *lnwallet.LightningWallet,
Value: 1e8,
PkScript: script,
}
if _, err = w.SendOutputs([]*wire.TxOut{output}, 10); err != nil {
txid, err := w.SendOutputs([]*wire.TxOut{output}, 10)
if err != nil {
t.Fatalf("unable to send outputs: %v", err)
}
err = waitForMempoolTx(r, txid)
if err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
_, err = r.Node.Generate(50)
if err != nil {
t.Fatalf("unable to generate blocks on passed node: %v", err)
}
// Give wallet time to catch up.
err = waitForWalletSync(w)
err = waitForWalletSync(r, w)
if err != nil {
t.Fatalf("unable to sync wallet: %v", err)
}
@ -1277,32 +1358,34 @@ func testReorgWalletBalance(r *rpctest.Harness, w *lnwallet.LightningWallet,
// one block on the passed miner and two on the created miner,
// connecting them, and waiting for them to sync.
for i := 0; i < 5; i++ {
peers, err := r2.Node.GetPeerInfo()
if err != nil {
t.Fatalf("unable to get peer info: %v", err)
}
numPeers := len(peers)
err = r2.Node.AddNode(r.P2PAddress(), rpcclient.ANRemove)
if err != nil {
t.Fatalf("unable to disconnect mining nodes: %v", err)
}
// Wait for disconnection
timeout := time.After(30 * time.Second)
for true {
stillConnected := true
var peers []btcjson.GetPeerInfoResult
for stillConnected {
// Allow for timeout
time.Sleep(100 * time.Millisecond)
select {
case <-timeout:
t.Fatalf("timeout waiting for miner disconnect")
default:
}
err = r2.Node.AddNode(r.P2PAddress(), rpcclient.ANRemove)
if err != nil {
t.Fatalf("unable to disconnect mining nodes: %v",
err)
}
peers, err = r2.Node.GetPeerInfo()
if err != nil {
t.Fatalf("unable to get peer info: %v", err)
}
if len(peers) < numPeers {
break
stillConnected = false
for _, peer := range peers {
if peer.Addr == r.P2PAddress() {
stillConnected = true
break
}
}
time.Sleep(100 * time.Millisecond)
}
_, err = r.Node.Generate(2)
if err != nil {
@ -1318,8 +1401,16 @@ func testReorgWalletBalance(r *rpctest.Harness, w *lnwallet.LightningWallet,
// Step 5: Reconnect the miners and wait for them to synchronize.
err = r2.Node.AddNode(r.P2PAddress(), rpcclient.ANAdd)
if err != nil {
t.Fatalf("unable to connect mining nodes together: %v",
err)
switch err := err.(type) {
case *btcjson.RPCError:
if err.Code != -8 {
t.Fatalf("unable to connect mining "+
"nodes together: %v", err)
}
default:
t.Fatalf("unable to connect mining nodes "+
"together: %v", err)
}
}
err = rpctest.JoinNodes([]*rpctest.Harness{r2, r},
rpctest.Blocks)
@ -1328,7 +1419,7 @@ func testReorgWalletBalance(r *rpctest.Harness, w *lnwallet.LightningWallet,
}
// Give wallet time to catch up.
err = waitForWalletSync(w)
err = waitForWalletSync(r, w)
if err != nil {
t.Fatalf("unable to sync wallet: %v", err)
}
@ -1405,21 +1496,78 @@ func clearWalletStates(a, b *lnwallet.LightningWallet) error {
return b.Cfg.Database.Wipe()
}
func waitForWalletSync(w *lnwallet.LightningWallet) error {
var synced bool
func waitForMempoolTx(r *rpctest.Harness, txid *chainhash.Hash) error {
var found bool
var tx *btcutil.Tx
var err error
timeout := time.After(10 * time.Second)
for !synced {
synced, err = w.IsSynced()
if err != nil {
return err
}
for !found {
// Do a short wait
select {
case <-timeout:
return fmt.Errorf("timeout after 10s")
default:
}
time.Sleep(100 * time.Millisecond)
// Check for the harness' knowledge of the txid
tx, err = r.Node.GetRawTransaction(txid)
if err != nil {
switch e := err.(type) {
case *btcjson.RPCError:
if e.Code == btcjson.ErrRPCNoTxInfo {
continue
}
default:
}
return err
}
if tx != nil && tx.MsgTx().TxHash() == *txid {
found = true
}
}
return nil
}
func waitForWalletSync(r *rpctest.Harness, w *lnwallet.LightningWallet) error {
var synced bool
var err error
var bestHash, knownHash *chainhash.Hash
var bestHeight, knownHeight int32
timeout := time.After(10 * time.Second)
for !synced {
// Do a short wait
select {
case <-timeout:
return fmt.Errorf("timeout after 10s")
default:
}
time.Sleep(100 * time.Millisecond)
// Check whether the chain source of the wallet is caught up to
// the harness it's supposed to be catching up to.
bestHash, bestHeight, err = r.Node.GetBestBlock()
if err != nil {
return err
}
knownHash, knownHeight, err = w.Cfg.ChainIO.GetBestBlock()
if err != nil {
return err
}
if knownHeight != bestHeight {
continue
}
if *knownHash != *bestHash {
return fmt.Errorf("hash at height %d doesn't match: "+
"expected %s, got %s", bestHeight, bestHash,
knownHash)
}
// Check for synchronization.
synced, err = w.IsSynced()
if err != nil {
return err
}
}
return nil
}
@ -1454,7 +1602,7 @@ func TestLightningWallet(t *testing.T) {
}
// Next mine enough blocks in order for segwit and the CSV package
// soft-fork to activate on SimNet.
// soft-fork to activate on RegNet.
numBlocks := netParams.MinerConfirmationWindow * 2
if _, err := miningNode.Node.Generate(numBlocks); err != nil {
t.Fatalf("unable to generate blocks: %v", err)
@ -1470,6 +1618,22 @@ func TestLightningWallet(t *testing.T) {
t.Fatalf("unable to start notifier: %v", err)
}
for _, walletDriver := range lnwallet.RegisteredWallets() {
for _, backEnd := range walletDriver.BackEnds() {
runTests(t, walletDriver, backEnd, miningNode,
rpcConfig, chainNotifier)
}
}
}
// runTests runs all of the tests for a single interface implementation and
// chain back-end combination. This makes it easier to use `defer` as well as
// factoring out the test logic from the loop which cycles through the
// interface implementations.
func runTests(t *testing.T, walletDriver *lnwallet.WalletDriver,
backEnd string, miningNode *rpctest.Harness,
rpcConfig rpcclient.ConnConfig,
chainNotifier *btcdnotify.BtcdNotifier) {
var (
bio lnwallet.BlockChainIO
@ -1478,107 +1642,230 @@ func TestLightningWallet(t *testing.T) {
aliceWalletController lnwallet.WalletController
bobWalletController lnwallet.WalletController
feeEstimator lnwallet.FeeEstimator
)
for _, walletDriver := range lnwallet.RegisteredWallets() {
tempTestDirAlice, err := ioutil.TempDir("", "lnwallet")
if err != nil {
t.Fatalf("unable to create temp directory: %v", err)
}
defer os.RemoveAll(tempTestDirAlice)
tempTestDirBob, err := ioutil.TempDir("", "lnwallet")
if err != nil {
t.Fatalf("unable to create temp directory: %v", err)
}
defer os.RemoveAll(tempTestDirBob)
tempTestDirAlice, err := ioutil.TempDir("", "lnwallet")
if err != nil {
t.Fatalf("unable to create temp directory: %v", err)
}
defer os.RemoveAll(tempTestDirAlice)
walletType := walletDriver.WalletType
switch walletType {
case "btcwallet":
aliceChainRPC, err := chain.NewRPCClient(netParams,
tempTestDirBob, err := ioutil.TempDir("", "lnwallet")
if err != nil {
t.Fatalf("unable to create temp directory: %v", err)
}
defer os.RemoveAll(tempTestDirBob)
walletType := walletDriver.WalletType
switch walletType {
case "btcwallet":
var aliceClient, bobClient chain.Interface
switch backEnd {
case "btcd":
feeEstimator, err = lnwallet.NewBtcdFeeEstimator(
rpcConfig, 250)
if err != nil {
t.Fatalf("unable to create btcd fee estimator: %v",
err)
}
aliceClient, err = chain.NewRPCClient(netParams,
rpcConfig.Host, rpcConfig.User, rpcConfig.Pass,
rpcConfig.Certificates, false, 20)
if err != nil {
t.Fatalf("unable to make chain rpc: %v", err)
}
aliceWalletConfig := &btcwallet.Config{
PrivatePass: []byte("alice-pass"),
HdSeed: aliceHDSeed[:],
DataDir: tempTestDirAlice,
NetParams: netParams,
ChainSource: aliceChainRPC,
FeeEstimator: lnwallet.StaticFeeEstimator{FeeRate: 250},
}
aliceWalletController, err = walletDriver.New(aliceWalletConfig)
if err != nil {
t.Fatalf("unable to create btcwallet: %v", err)
}
aliceSigner = aliceWalletController.(*btcwallet.BtcWallet)
bobChainRPC, err := chain.NewRPCClient(netParams,
bobClient, err = chain.NewRPCClient(netParams,
rpcConfig.Host, rpcConfig.User, rpcConfig.Pass,
rpcConfig.Certificates, false, 20)
if err != nil {
t.Fatalf("unable to make chain rpc: %v", err)
}
bobWalletConfig := &btcwallet.Config{
PrivatePass: []byte("bob-pass"),
HdSeed: bobHDSeed[:],
DataDir: tempTestDirBob,
NetParams: netParams,
ChainSource: bobChainRPC,
FeeEstimator: lnwallet.StaticFeeEstimator{FeeRate: 250},
}
bobWalletController, err = walletDriver.New(bobWalletConfig)
case "neutrino":
feeEstimator = lnwallet.StaticFeeEstimator{FeeRate: 250}
// Set some package-level variable to speed up
// operation for tests.
neutrino.WaitForMoreCFHeaders = time.Millisecond * 100
neutrino.BanDuration = time.Millisecond * 100
neutrino.QueryTimeout = time.Millisecond * 500
neutrino.QueryNumRetries = 2
// Start Alice - open a database, start a neutrino
// instance, and initialize a btcwallet driver for it.
aliceDB, err := walletdb.Create("bdb",
tempTestDirAlice+"/neutrino.db")
if err != nil {
t.Fatalf("unable to create btcwallet: %v", err)
t.Fatalf("unable to create DB: %v", err)
}
defer aliceDB.Close()
aliceChain, err := neutrino.NewChainService(
neutrino.Config{
DataDir: tempTestDirAlice,
Database: aliceDB,
Namespace: []byte("alice"),
ChainParams: *netParams,
ConnectPeers: []string{
miningNode.P2PAddress(),
},
},
)
if err != nil {
t.Fatalf("unable to make neutrino: %v", err)
}
aliceChain.Start()
defer aliceChain.Stop()
aliceClient = chain.NewNeutrinoClient(aliceChain)
// Start Bob - open a database, start a neutrino
// instance, and initialize a btcwallet driver for it.
bobDB, err := walletdb.Create("bdb",
tempTestDirBob+"/neutrino.db")
if err != nil {
t.Fatalf("unable to create DB: %v", err)
}
defer bobDB.Close()
bobChain, err := neutrino.NewChainService(
neutrino.Config{
DataDir: tempTestDirBob,
Database: bobDB,
Namespace: []byte("bob"),
ChainParams: *netParams,
ConnectPeers: []string{
miningNode.P2PAddress(),
},
},
)
if err != nil {
t.Fatalf("unable to make neutrino: %v", err)
}
bobChain.Start()
defer bobChain.Stop()
bobClient = chain.NewNeutrinoClient(bobChain)
case "bitcoind":
feeEstimator, err = lnwallet.NewBitcoindFeeEstimator(
rpcConfig, 250)
if err != nil {
t.Fatalf("unable to create bitcoind fee estimator: %v",
err)
}
// Start a bitcoind instance.
tempBitcoindDir, err := ioutil.TempDir("", "bitcoind")
if err != nil {
t.Fatalf("unable to create temp directory: %v", err)
}
zmqPath := "ipc:///" + tempBitcoindDir + "/weks.socket"
defer os.RemoveAll(tempBitcoindDir)
rpcPort := rand.Int()%(65536-1024) + 1024
bitcoind := exec.Command(
"bitcoind",
"-datadir="+tempBitcoindDir,
"-regtest",
"-connect="+miningNode.P2PAddress(),
"-txindex",
"-rpcauth=weks:469e9bb14ab2360f8e226efed5ca6f"+
"d$507c670e800a95284294edb5773b05544b"+
"220110063096c221be9933c82d38e1",
fmt.Sprintf("-rpcport=%d", rpcPort),
"-disablewallet",
"-zmqpubrawblock="+zmqPath,
"-zmqpubrawtx="+zmqPath,
)
err = bitcoind.Start()
if err != nil {
t.Fatalf("couldn't start bitcoind: %v", err)
}
defer bitcoind.Wait()
defer bitcoind.Process.Kill()
// Start an Alice btcwallet bitcoind back end instance.
aliceClient, err = chain.NewBitcoindClient(netParams,
fmt.Sprintf("127.0.0.1:%d", rpcPort), "weks",
"weks", zmqPath, 100*time.Millisecond)
if err != nil {
t.Fatalf("couldn't start alice client: %v", err)
}
// Start a Bob btcwallet bitcoind back end instance.
bobClient, err = chain.NewBitcoindClient(netParams,
fmt.Sprintf("127.0.0.1:%d", rpcPort), "weks",
"weks", zmqPath, 100*time.Millisecond)
if err != nil {
t.Fatalf("couldn't start bob client: %v", err)
}
bobSigner = bobWalletController.(*btcwallet.BtcWallet)
bio = bobWalletController.(*btcwallet.BtcWallet)
default:
// TODO(roasbeef): add neutrino case
t.Fatalf("unknown wallet driver: %v", walletType)
t.Fatalf("unknown chain driver: %v", backEnd)
}
// Funding via 20 outputs with 4BTC each.
alice, err := createTestWallet(tempTestDirAlice, miningNode,
netParams, chainNotifier, aliceWalletController,
aliceSigner, bio)
aliceWalletConfig := &btcwallet.Config{
PrivatePass: []byte("alice-pass"),
HdSeed: aliceHDSeed[:],
DataDir: tempTestDirAlice,
NetParams: netParams,
ChainSource: aliceClient,
FeeEstimator: feeEstimator,
}
aliceWalletController, err = walletDriver.New(aliceWalletConfig)
if err != nil {
t.Fatalf("unable to create test ln wallet: %v", err)
t.Fatalf("unable to create btcwallet: %v", err)
}
defer alice.Shutdown()
aliceSigner = aliceWalletController.(*btcwallet.BtcWallet)
bob, err := createTestWallet(tempTestDirBob, miningNode,
netParams, chainNotifier, bobWalletController,
bobSigner, bio)
bobWalletConfig := &btcwallet.Config{
PrivatePass: []byte("bob-pass"),
HdSeed: bobHDSeed[:],
DataDir: tempTestDirBob,
NetParams: netParams,
ChainSource: bobClient,
FeeEstimator: feeEstimator,
}
bobWalletController, err = walletDriver.New(bobWalletConfig)
if err != nil {
t.Fatalf("unable to create test ln wallet: %v", err)
t.Fatalf("unable to create btcwallet: %v", err)
}
defer bob.Shutdown()
bobSigner = bobWalletController.(*btcwallet.BtcWallet)
bio = bobWalletController.(*btcwallet.BtcWallet)
default:
t.Fatalf("unknown wallet driver: %v", walletType)
}
// Both wallets should now have 80BTC available for spending.
assertProperBalance(t, alice, 1, 80)
assertProperBalance(t, bob, 1, 80)
// Funding via 20 outputs with 4BTC each.
alice, err := createTestWallet(tempTestDirAlice, miningNode, netParams,
chainNotifier, aliceWalletController, aliceSigner, bio)
if err != nil {
t.Fatalf("unable to create test ln wallet: %v", err)
}
defer alice.Shutdown()
// Execute every test, clearing possibly mutated wallet state
// after each step.
for _, walletTest := range walletTests {
testName := fmt.Sprintf("%v:%v", walletType,
walletTest.name)
success := t.Run(testName, func(t *testing.T) {
walletTest.test(miningNode, alice, bob, t)
})
if !success {
break
}
bob, err := createTestWallet(tempTestDirBob, miningNode, netParams,
chainNotifier, bobWalletController, bobSigner, bio)
if err != nil {
t.Fatalf("unable to create test ln wallet: %v", err)
}
defer bob.Shutdown()
// TODO(roasbeef): possible reset mining node's
// chainstate to initial level, cleanly wipe buckets
if err := clearWalletStates(alice, bob); err != nil &&
err != bolt.ErrBucketNotFound {
t.Fatalf("unable to wipe wallet state: %v", err)
}
// Both wallets should now have 80BTC available for
// spending.
assertProperBalance(t, alice, 1, 80)
assertProperBalance(t, bob, 1, 80)
// Execute every test, clearing possibly mutated
// wallet state after each step.
for _, walletTest := range walletTests {
testName := fmt.Sprintf("%v/%v:%v", walletType, backEnd,
walletTest.name)
success := t.Run(testName, func(t *testing.T) {
walletTest.test(miningNode, alice, bob, t)
})
if !success {
break
}
// TODO(roasbeef): possible reset mining
// node's chainstate to initial level, cleanly
// wipe buckets
if err := clearWalletStates(alice, bob); err !=
nil && err != bolt.ErrBucketNotFound {
t.Fatalf("unable to wipe wallet state: %v", err)
}
}
}

@ -0,0 +1,460 @@
package chainview
import (
"bytes"
"encoding/hex"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/roasbeef/btcd/btcjson"
"github.com/roasbeef/btcd/chaincfg"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/rpcclient"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
"github.com/roasbeef/btcwallet/chain"
"github.com/roasbeef/btcwallet/wtxmgr"
)
// BitcoindFilteredChainView is an implementation of the FilteredChainView
// interface which is backed by bitcoind.
type BitcoindFilteredChainView struct {
started int32
stopped int32
// bestHeight is the height of the latest block added to the
// blockQueue from the onFilteredConnectedMethod. It is used to
// determine up to what height we would need to rescan in case
// of a filter update.
bestHeightMtx sync.Mutex
bestHeight uint32
// TODO: Factor out common logic between bitcoind and btcd into a
// NodeFilteredView interface.
chainClient *chain.BitcoindClient
// blockEventQueue is the ordered queue used to keep the order
// of connected and disconnected blocks sent to the reader of the
// chainView.
blockQueue *blockEventQueue
// filterUpdates is a channel in which updates to the utxo filter
// attached to this instance are sent over.
filterUpdates chan filterUpdate
// chainFilter is the set of utox's that we're currently watching
// spends for within the chain.
filterMtx sync.RWMutex
chainFilter map[wire.OutPoint]struct{}
// filterBlockReqs is a channel in which requests to filter select
// blocks will be sent over.
filterBlockReqs chan *filterBlockReq
quit chan struct{}
wg sync.WaitGroup
}
// A compile time check to ensure BitcoindFilteredChainView implements the
// chainview.FilteredChainView.
var _ FilteredChainView = (*BitcoindFilteredChainView)(nil)
// NewBitcoindFilteredChainView creates a new instance of a FilteredChainView
// from RPC credentials and a ZMQ socket address for a bitcoind instance.
func NewBitcoindFilteredChainView(config rpcclient.ConnConfig,
zmqConnect string, params chaincfg.Params) (*BitcoindFilteredChainView,
error) {
chainView := &BitcoindFilteredChainView{
chainFilter: make(map[wire.OutPoint]struct{}),
filterUpdates: make(chan filterUpdate),
filterBlockReqs: make(chan *filterBlockReq),
quit: make(chan struct{}),
}
chainConn, err := chain.NewBitcoindClient(&params, config.Host,
config.User, config.Pass, zmqConnect, 100*time.Millisecond)
if err != nil {
return nil, err
}
chainView.chainClient = chainConn
chainView.blockQueue = newBlockEventQueue()
return chainView, nil
}
// Start starts all goroutines necessary for normal operation.
//
// NOTE: This is part of the FilteredChainView interface.
func (b *BitcoindFilteredChainView) Start() error {
// Already started?
if atomic.AddInt32(&b.started, 1) != 1 {
return nil
}
log.Infof("FilteredChainView starting")
err := b.chainClient.Start()
if err != nil {
return err
}
_, bestHeight, err := b.chainClient.GetBestBlock()
if err != nil {
return err
}
b.bestHeightMtx.Lock()
b.bestHeight = uint32(bestHeight)
b.bestHeightMtx.Unlock()
b.blockQueue.Start()
b.wg.Add(1)
go b.chainFilterer()
return nil
}
// Stop stops all goroutines which we launched by the prior call to the Start
// method.
//
// NOTE: This is part of the FilteredChainView interface.
func (b *BitcoindFilteredChainView) Stop() error {
// Already shutting down?
if atomic.AddInt32(&b.stopped, 1) != 1 {
return nil
}
// Shutdown the rpc client, this gracefully disconnects from bitcoind's
// zmq socket, and cleans up all related resources.
b.chainClient.Stop()
b.blockQueue.Stop()
log.Infof("FilteredChainView stopping")
close(b.quit)
b.wg.Wait()
return nil
}
// onFilteredBlockConnected is called for each block that's connected to the
// end of the main chain. Based on our current chain filter, the block may or
// may not include any relevant transactions.
func (b *BitcoindFilteredChainView) onFilteredBlockConnected(height int32,
hash chainhash.Hash, txns []*wtxmgr.TxRecord) {
mtxs := make([]*wire.MsgTx, len(txns))
for i, tx := range txns {
mtxs[i] = &tx.MsgTx
for _, txIn := range mtxs[i].TxIn {
// We can delete this outpoint from the chainFilter, as
// we just received a block where it was spent. In case
// of a reorg, this outpoint might get "un-spent", but
// that's okay since it would never be wise to consider
// the channel open again (since a spending transaction
// exists on the network).
b.filterMtx.Lock()
delete(b.chainFilter, txIn.PreviousOutPoint)
b.filterMtx.Unlock()
}
}
// We record the height of the last connected block added to the
// blockQueue such that we can scan up to this height in case of
// a rescan. It must be protected by a mutex since a filter update
// might be trying to read it concurrently.
b.bestHeightMtx.Lock()
b.bestHeight = uint32(height)
b.bestHeightMtx.Unlock()
block := &FilteredBlock{
Hash: hash,
Height: uint32(height),
Transactions: mtxs,
}
b.blockQueue.Add(&blockEvent{
eventType: connected,
block: block,
})
}
// onFilteredBlockDisconnected is a callback which is executed once a block is
// disconnected from the end of the main chain.
func (b *BitcoindFilteredChainView) onFilteredBlockDisconnected(height int32,
hash chainhash.Hash) {
log.Debugf("got disconnected block at height %d: %v", height,
hash)
filteredBlock := &FilteredBlock{
Hash: hash,
Height: uint32(height),
}
b.blockQueue.Add(&blockEvent{
eventType: disconnected,
block: filteredBlock,
})
}
// FilterBlock takes a block hash, and returns a FilteredBlocks which is the
// result of applying the current registered UTXO sub-set on the block
// corresponding to that block hash. If any watched UTOX's are spent by the
// selected lock, then the internal chainFilter will also be updated.
//
// NOTE: This is part of the FilteredChainView interface.
func (b *BitcoindFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) {
req := &filterBlockReq{
blockHash: blockHash,
resp: make(chan *FilteredBlock, 1),
err: make(chan error, 1),
}
select {
case b.filterBlockReqs <- req:
case <-b.quit:
return nil, fmt.Errorf("FilteredChainView shutting down")
}
return <-req.resp, <-req.err
}
// chainFilterer is the primary goroutine which: listens for new blocks coming
// and dispatches the relevent FilteredBlock notifications, updates the filter
// due to requests by callers, and finally is able to preform targeted block
// filtration.
//
// TODO(roasbeef): change to use loadfilter RPC's
func (b *BitcoindFilteredChainView) chainFilterer() {
defer b.wg.Done()
// filterBlock is a helper funciton that scans the given block, and
// notes which transactions spend outputs which are currently being
// watched. Additionally, the chain filter will also be updated by
// removing any spent outputs.
filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx {
var filteredTxns []*wire.MsgTx
for _, tx := range blk.Transactions {
for _, txIn := range tx.TxIn {
prevOp := txIn.PreviousOutPoint
if _, ok := b.chainFilter[prevOp]; ok {
filteredTxns = append(filteredTxns, tx)
b.filterMtx.Lock()
delete(b.chainFilter, prevOp)
b.filterMtx.Unlock()
break
}
}
}
return filteredTxns
}
decodeJSONBlock := func(block *btcjson.RescannedBlock,
height uint32) (*FilteredBlock, error) {
hash, err := chainhash.NewHashFromStr(block.Hash)
if err != nil {
return nil, err
}
txs := make([]*wire.MsgTx, 0, len(block.Transactions))
for _, str := range block.Transactions {
b, err := hex.DecodeString(str)
if err != nil {
return nil, err
}
tx := &wire.MsgTx{}
err = tx.Deserialize(bytes.NewReader(b))
if err != nil {
return nil, err
}
txs = append(txs, tx)
}
return &FilteredBlock{
Hash: *hash,
Height: height,
Transactions: txs,
}, nil
}
for {
select {
// The caller has just sent an update to the current chain
// filter, so we'll apply the update, possibly rewinding our
// state partially.
case update := <-b.filterUpdates:
// First, we'll add all the new UTXO's to the set of
// watched UTXO's, eliminating any duplicates in the
// process.
log.Debugf("Updating chain filter with new UTXO's: %v",
update.newUtxos)
for _, newOp := range update.newUtxos {
b.filterMtx.Lock()
b.chainFilter[newOp] = struct{}{}
b.filterMtx.Unlock()
}
// Apply the new TX filter to the chain client, which
// will cause all following notifications from and
// calls to it return blocks filtered with the new
// filter.
b.chainClient.LoadTxFilter(false, []btcutil.Address{},
update.newUtxos)
// All blocks gotten after we loaded the filter will
// have the filter applied, but we will need to rescan
// the blocks up to the height of the block we last
// added to the blockQueue.
b.bestHeightMtx.Lock()
bestHeight := b.bestHeight
b.bestHeightMtx.Unlock()
// If the update height matches our best known height,
// then we don't need to do any rewinding.
if update.updateHeight == bestHeight {
continue
}
// Otherwise, we'll rewind the state to ensure the
// caller doesn't miss any relevant notifications.
// Starting from the height _after_ the update height,
// we'll walk forwards, rescanning one block at a time
// with the chain client applying the newly loaded
// filter to each block.
for i := update.updateHeight + 1; i < bestHeight+1; i++ {
blockHash, err := b.chainClient.GetBlockHash(int64(i))
if err != nil {
log.Warnf("Unable to get block hash "+
"for block at height %d: %v",
i, err)
continue
}
// To avoid dealing with the case where a reorg
// is happening while we rescan, we scan one
// block at a time, skipping blocks that might
// have gone missing.
rescanned, err := b.chainClient.RescanBlocks(
[]chainhash.Hash{*blockHash})
if err != nil {
log.Warnf("Unable to rescan block "+
"with hash %v at height %d: %v",
blockHash, i, err)
continue
}
// If no block was returned from the rescan, it
// means no matching transactions were found.
if len(rescanned) != 1 {
log.Tracef("rescan of block %v at "+
"height=%d yielded no "+
"transactions", blockHash, i)
continue
}
decoded, err := decodeJSONBlock(
&rescanned[0], i)
if err != nil {
log.Errorf("Unable to decode block: %v",
err)
continue
}
b.blockQueue.Add(&blockEvent{
eventType: connected,
block: decoded,
})
}
// We've received a new request to manually filter a block.
case req := <-b.filterBlockReqs:
// First we'll fetch the block itself as well as some
// additional information including its height.
block, err := b.chainClient.GetBlock(req.blockHash)
if err != nil {
req.err <- err
req.resp <- nil
continue
}
header, err := b.chainClient.GetBlockHeaderVerbose(
req.blockHash)
if err != nil {
req.err <- err
req.resp <- nil
continue
}
// Once we have this info, we can directly filter the
// block and dispatch the proper notification.
req.resp <- &FilteredBlock{
Hash: *req.blockHash,
Height: uint32(header.Height),
Transactions: filterBlock(block),
}
req.err <- err
// We've received a new event from the chain client.
case event := <-b.chainClient.Notifications():
switch e := event.(type) {
case chain.FilteredBlockConnected:
b.onFilteredBlockConnected(e.Block.Height,
e.Block.Hash, e.RelevantTxs)
case chain.BlockDisconnected:
b.onFilteredBlockDisconnected(e.Height, e.Hash)
}
case <-b.quit:
return
}
}
}
// UpdateFilter updates the UTXO filter which is to be consulted when creating
// FilteredBlocks to be sent to subscribed clients. This method is cumulative
// meaning repeated calls to this method should _expand_ the size of the UTXO
// sub-set currently being watched. If the set updateHeight is _lower_ than
// the best known height of the implementation, then the state should be
// rewound to ensure all relevant notifications are dispatched.
//
// NOTE: This is part of the FilteredChainView interface.
func (b *BitcoindFilteredChainView) UpdateFilter(ops []wire.OutPoint, updateHeight uint32) error {
select {
case b.filterUpdates <- filterUpdate{
newUtxos: ops,
updateHeight: updateHeight,
}:
return nil
case <-b.quit:
return fmt.Errorf("chain filter shutting down")
}
}
// FilteredBlocks returns the channel that filtered blocks are to be sent over.
// Each time a block is connected to the end of a main chain, and appropriate
// FilteredBlock which contains the transactions which mutate our watched UTXO
// set is to be returned.
//
// NOTE: This is part of the FilteredChainView interface.
func (b *BitcoindFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
return b.blockQueue.newBlocks
}
// DisconnectedBlocks returns a receive only channel which will be sent upon
// with the empty filtered blocks of blocks which are disconnected from the
// main chain in the case of a re-org.
//
// NOTE: This is part of the FilteredChainView interface.
func (b *BitcoindFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
return b.blockQueue.staleBlocks
}

@ -4,13 +4,16 @@ import (
"bytes"
"fmt"
"io/ioutil"
"math/rand"
"os"
"os/exec"
"path/filepath"
"runtime"
"testing"
"time"
"github.com/lightninglabs/neutrino"
"github.com/ltcsuite/ltcd/btcjson"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/chaincfg"
"github.com/roasbeef/btcd/chaincfg/chainhash"
@ -25,7 +28,7 @@ import (
)
var (
netParams = &chaincfg.SimNetParams
netParams = &chaincfg.RegressionNetParams
testPrivKey = []byte{
0x81, 0xb6, 0x37, 0xd8, 0xfc, 0xd2, 0xc6, 0xda,
@ -42,6 +45,39 @@ var (
testScript, _ = txscript.PayToAddrScript(testAddr)
)
func waitForMempoolTx(r *rpctest.Harness, txid *chainhash.Hash) error {
var found bool
var tx *btcutil.Tx
var err error
timeout := time.After(10 * time.Second)
for !found {
// Do a short wait
select {
case <-timeout:
return fmt.Errorf("timeout after 10s")
default:
}
time.Sleep(100 * time.Millisecond)
// Check for the harness' knowledge of the txid
tx, err = r.Node.GetRawTransaction(txid)
if err != nil {
switch e := err.(type) {
case *btcjson.RPCError:
if e.Code == btcjson.ErrRPCNoTxInfo {
continue
}
default:
}
return err
}
if tx != nil && tx.MsgTx().TxHash() == *txid {
found = true
}
}
return nil
}
func getTestTXID(miner *rpctest.Harness) (*chainhash.Hash, error) {
script, err := txscript.PayToAddrScript(testAddr)
if err != nil {
@ -131,11 +167,19 @@ func testFilterBlockNotifications(node *rpctest.Harness,
// private key that we generated above.
txid1, err := getTestTXID(node)
if err != nil {
t.Fatalf("unable to get test txid")
t.Fatalf("unable to get test txid: %v", err)
}
err = waitForMempoolTx(node, txid1)
if err != nil {
t.Fatalf("unable to get test txid in mempool: %v", err)
}
txid2, err := getTestTXID(node)
if err != nil {
t.Fatalf("unable to get test txid")
t.Fatalf("unable to get test txid: %v", err)
}
err = waitForMempoolTx(node, txid2)
if err != nil {
t.Fatalf("unable to get test txid in mempool: %v", err)
}
blockChan := chainView.FilteredBlocks()
@ -218,6 +262,10 @@ func testFilterBlockNotifications(node *rpctest.Harness,
if err != nil {
t.Fatalf("unable to broadcast transaction: %v", err)
}
err = waitForMempoolTx(node, spendTxid1)
if err != nil {
t.Fatalf("unable to get spending txid in mempool: %v", err)
}
newBlockHashes, err = node.Node.Generate(1)
if err != nil {
t.Fatalf("unable to generate block: %v", err)
@ -240,6 +288,10 @@ func testFilterBlockNotifications(node *rpctest.Harness,
if err != nil {
t.Fatalf("unable to broadcast transaction: %v", err)
}
err = waitForMempoolTx(node, spendTxid2)
if err != nil {
t.Fatalf("unable to get spending txid in mempool: %v", err)
}
newBlockHashes, err = node.Node.Generate(1)
if err != nil {
t.Fatalf("unable to generate block: %v", err)
@ -264,6 +316,10 @@ func testUpdateFilterBackTrack(node *rpctest.Harness,
if err != nil {
t.Fatalf("unable to get test txid")
}
err = waitForMempoolTx(node, txid)
if err != nil {
t.Fatalf("unable to get test txid in mempool: %v", err)
}
// Next we'll mine a block confirming the output generated above.
initBlockHashes, err := node.Node.Generate(1)
@ -306,6 +362,10 @@ func testUpdateFilterBackTrack(node *rpctest.Harness,
if err != nil {
t.Fatalf("unable to broadcast transaction: %v", err)
}
err = waitForMempoolTx(node, spendTxid)
if err != nil {
t.Fatalf("unable to get spending txid in mempool: %v", err)
}
newBlockHashes, err := node.Node.Generate(1)
if err != nil {
t.Fatalf("unable to generate block: %v", err)
@ -352,10 +412,18 @@ func testFilterSingleBlock(node *rpctest.Harness, chainView FilteredChainView,
if err != nil {
t.Fatalf("unable to get test txid")
}
err = waitForMempoolTx(node, txid1)
if err != nil {
t.Fatalf("unable to get test txid in mempool: %v", err)
}
txid2, err := getTestTXID(node)
if err != nil {
t.Fatalf("unable to get test txid")
}
err = waitForMempoolTx(node, txid2)
if err != nil {
t.Fatalf("unable to get test txid in mempool: %v", err)
}
blockChan := chainView.FilteredBlocks()
@ -671,7 +739,7 @@ var chainViewTests = []testCase{
test: testUpdateFilterBackTrack,
},
{
name: "fitler single block",
name: "filter single block",
test: testFilterSingleBlock,
},
{
@ -684,6 +752,68 @@ var interfaceImpls = []struct {
name string
chainViewInit chainViewInitFunc
}{
{
name: "bitcoind_zmq",
chainViewInit: func(_ rpcclient.ConnConfig, p2pAddr string) (func(), FilteredChainView, error) {
// Start a bitcoind instance.
tempBitcoindDir, err := ioutil.TempDir("", "bitcoind")
if err != nil {
return nil, nil, err
}
zmqPath := "ipc:///" + tempBitcoindDir + "/weks.socket"
cleanUp1 := func() {
os.RemoveAll(tempBitcoindDir)
}
rpcPort := rand.Int()%(65536-1024) + 1024
bitcoind := exec.Command(
"bitcoind",
"-datadir="+tempBitcoindDir,
"-regtest",
"-connect="+p2pAddr,
"-txindex",
"-rpcauth=weks:469e9bb14ab2360f8e226efed5ca6f"+
"d$507c670e800a95284294edb5773b05544b"+
"220110063096c221be9933c82d38e1",
fmt.Sprintf("-rpcport=%d", rpcPort),
"-disablewallet",
"-zmqpubrawblock="+zmqPath,
"-zmqpubrawtx="+zmqPath,
)
err = bitcoind.Start()
if err != nil {
cleanUp1()
return nil, nil, err
}
cleanUp2 := func() {
bitcoind.Process.Kill()
bitcoind.Wait()
cleanUp1()
}
// Wait for the bitcoind instance to start up.
time.Sleep(time.Second)
// Start the FilteredChainView implementation instance.
config := rpcclient.ConnConfig{
Host: fmt.Sprintf(
"127.0.0.1:%d", rpcPort),
User: "weks",
Pass: "weks",
DisableAutoReconnect: false,
DisableConnectOnNew: true,
DisableTLS: true,
HTTPPostMode: true,
}
chainView, err := NewBitcoindFilteredChainView(config,
zmqPath, chaincfg.RegressionNetParams)
if err != nil {
cleanUp2()
return nil, nil, err
}
return cleanUp2, chainView, nil
},
},
{
name: "p2p_neutrino",
chainViewInit: func(_ rpcclient.ConnConfig, p2pAddr string) (func(), FilteredChainView, error) {