Merge pull request #1278 from wpaulino/persistent-chain-notifiers

chainntnfs: introduce persistent height hint layer to ChainNotifier implementations
This commit is contained in:
Olaoluwa Osuntokun 2018-08-22 21:02:27 -07:00 committed by GitHub
commit c9bead7c21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1410 additions and 225 deletions

@ -76,6 +76,16 @@ type BitcoindNotifier struct {
bestBlock chainntnfs.BlockEpoch
// spendHintCache is a cache used to query and update the latest height
// hints for an outpoint. Each height hint represents the earliest
// height at which the outpoint could have been spent within the chain.
spendHintCache chainntnfs.SpendHintCache
// confirmHintCache is a cache used to query the latest height hints for
// a transaction. Each height hint represents the earliest height at
// which the transaction could have confirmed within the chain.
confirmHintCache chainntnfs.ConfirmHintCache
wg sync.WaitGroup
quit chan struct{}
}
@ -87,7 +97,9 @@ var _ chainntnfs.ChainNotifier = (*BitcoindNotifier)(nil)
// New returns a new BitcoindNotifier instance. This function assumes the
// bitcoind node detailed in the passed configuration is already running, and
// willing to accept RPC requests and new zmq clients.
func New(chainConn *chain.BitcoindConn) *BitcoindNotifier {
func New(chainConn *chain.BitcoindConn, spendHintCache chainntnfs.SpendHintCache,
confirmHintCache chainntnfs.ConfirmHintCache) *BitcoindNotifier {
notifier := &BitcoindNotifier{
notificationCancels: make(chan interface{}),
notificationRegistry: make(chan interface{}),
@ -96,6 +108,9 @@ func New(chainConn *chain.BitcoindConn) *BitcoindNotifier {
spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification),
spendHintCache: spendHintCache,
confirmHintCache: confirmHintCache,
quit: make(chan struct{}),
}
@ -127,7 +142,8 @@ func (b *BitcoindNotifier) Start() error {
}
b.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(currentHeight), reorgSafetyLimit)
uint32(currentHeight), reorgSafetyLimit, b.confirmHintCache,
)
b.bestBlock = chainntnfs.BlockEpoch{
Height: currentHeight,
@ -571,9 +587,6 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err
return fmt.Errorf("unable to get block: %v", err)
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
block.Height, block.Hash)
txns := btcutil.NewBlock(rawBlock).Transactions()
err = b.txConfNotifier.ConnectTip(
block.Hash, uint32(block.Height), txns)
@ -581,6 +594,9 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err
return fmt.Errorf("unable to connect tip: %v", err)
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v", block.Height,
block.Hash)
// We want to set the best block before dispatching notifications so
// if any subscribers make queries based on their received block epoch,
// our state is fully updated in time.
@ -588,6 +604,26 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err
b.notifyBlockEpochs(block.Height, block.Hash)
// Finally, we'll update the spend height hint for all of our watched
// outpoints that have not been spent yet. This is safe to do as we do
// not watch already spent outpoints for spend notifications.
ops := make([]wire.OutPoint, 0, len(b.spendNotifications))
for op := range b.spendNotifications {
ops = append(ops, op)
}
if len(ops) > 0 {
err := b.spendHintCache.CommitSpendHint(
uint32(block.Height), ops...,
)
if err != nil {
// The error is not fatal, so we should not return an
// error to the caller.
chainntnfs.Log.Errorf("Unable to update spend hint to "+
"%d for %v: %v", block.Height, ops, err)
}
}
return nil
}
@ -646,6 +682,18 @@ type spendCancel struct {
func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {
// Before proceeding to register the notification, we'll query our
// height hint cache to determine whether a better one exists.
if hint, err := b.spendHintCache.QuerySpendHint(*outpoint); err == nil {
if hint > heightHint {
chainntnfs.Log.Debugf("Using height hint %d retrieved "+
"from cache for %v", hint, outpoint)
heightHint = hint
}
}
// Construct a notification request for the outpoint and send it to the
// main event loop.
ntfn := &spendNotification{
targetOutpoint: outpoint,
spendChan: make(chan *chainntnfs.SpendDetail, 1),
@ -672,7 +720,20 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
return nil, err
}
if txOut == nil {
// If the output is unspent, then we'll write it to the cache with the
// given height hint. This allows us to increase the height hint as the
// chain extends and the output remains unspent.
if txOut != nil {
err := b.spendHintCache.CommitSpendHint(heightHint, *outpoint)
if err != nil {
// The error is not fatal, so we should not return an
// error to the caller.
chainntnfs.Log.Error("Unable to update spend hint to "+
"%d for %v: %v", heightHint, *outpoint, err)
}
} else {
// Otherwise, we'll determine when the output was spent.
//
// First, we'll attempt to retrieve the transaction's block hash
// using the backend's transaction index.
tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash)
@ -702,7 +763,9 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
int64(heightHint),
)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to retrieve "+
"hash for block with height %d: %v",
heightHint, err)
}
}
@ -794,11 +857,13 @@ func (b *BitcoindNotifier) dispatchSpendDetailsManually(op wire.OutPoint,
blockHash, err := b.chainConn.GetBlockHash(int64(height))
if err != nil {
return err
return fmt.Errorf("unable to retrieve hash for block "+
"with height %d: %v", height, err)
}
block, err := b.chainConn.GetBlock(blockHash)
if err != nil {
return err
return fmt.Errorf("unable to retrieve block with hash "+
"%v: %v", blockHash, err)
}
for _, tx := range block.Transactions {
@ -853,6 +918,18 @@ type confirmationNotification struct {
func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
_ []byte, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) {
// Before proceeding to register the notification, we'll query our
// height hint cache to determine whether a better one exists.
if hint, err := b.confirmHintCache.QueryConfirmHint(*txid); err == nil {
if hint > heightHint {
chainntnfs.Log.Debugf("Using height hint %d retrieved "+
"from cache for %v", hint, txid)
heightHint = hint
}
}
// Construct a notification request for the transaction and send it to
// the main event loop.
ntfn := &confirmationNotification{
ConfNtfn: chainntnfs.ConfNtfn{
ConfID: atomic.AddUint64(&b.confClientCounter, 1),

@ -29,7 +29,8 @@ func (b *BitcoindNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Has
}
b.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(bestHeight), reorgSafetyLimit)
uint32(bestHeight), reorgSafetyLimit, b.confirmHintCache,
)
if generateBlocks != nil {
// Ensure no block notifications are pending when we start the

@ -1,6 +1,7 @@
package bitcoindnotify
import (
"errors"
"fmt"
"github.com/btcsuite/btcwallet/chain"
@ -10,18 +11,30 @@ import (
// createNewNotifier creates a new instance of the ChainNotifier interface
// implemented by BitcoindNotifier.
func createNewNotifier(args ...interface{}) (chainntnfs.ChainNotifier, error) {
if len(args) != 1 {
if len(args) != 3 {
return nil, fmt.Errorf("incorrect number of arguments to "+
".New(...), expected 1, instead passed %v", len(args))
".New(...), expected 2, instead passed %v", len(args))
}
chainConn, ok := args[0].(*chain.BitcoindConn)
if !ok {
return nil, fmt.Errorf("first argument to bitcoindnotify.New " +
return nil, errors.New("first argument to bitcoindnotify.New " +
"is incorrect, expected a *chain.BitcoindConn")
}
return New(chainConn), nil
spendHintCache, ok := args[1].(chainntnfs.SpendHintCache)
if !ok {
return nil, errors.New("second argument to bitcoindnotify.New " +
"is incorrect, expected a chainntnfs.SpendHintCache")
}
confirmHintCache, ok := args[2].(chainntnfs.ConfirmHintCache)
if !ok {
return nil, errors.New("third argument to bitcoindnotify.New " +
"is incorrect, expected a chainntnfs.ConfirmHintCache")
}
return New(chainConn, spendHintCache, confirmHintCache), nil
}
// init registers a driver for the BtcdNotifier concrete implementation of the

@ -83,6 +83,16 @@ type BtcdNotifier struct {
chainUpdates *chainntnfs.ConcurrentQueue
txUpdates *chainntnfs.ConcurrentQueue
// spendHintCache is a cache used to query and update the latest height
// hints for an outpoint. Each height hint represents the earliest
// height at which the outpoint could have been spent within the chain.
spendHintCache chainntnfs.SpendHintCache
// confirmHintCache is a cache used to query the latest height hints for
// a transaction. Each height hint represents the earliest height at
// which the transaction could have confirmed within the chain.
confirmHintCache chainntnfs.ConfirmHintCache
wg sync.WaitGroup
quit chan struct{}
}
@ -93,7 +103,9 @@ var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil)
// New returns a new BtcdNotifier instance. This function assumes the btcd node
// detailed in the passed configuration is already running, and willing to
// accept new websockets clients.
func New(config *rpcclient.ConnConfig) (*BtcdNotifier, error) {
func New(config *rpcclient.ConnConfig, spendHintCache chainntnfs.SpendHintCache,
confirmHintCache chainntnfs.ConfirmHintCache) (*BtcdNotifier, error) {
notifier := &BtcdNotifier{
notificationCancels: make(chan interface{}),
notificationRegistry: make(chan interface{}),
@ -105,6 +117,9 @@ func New(config *rpcclient.ConnConfig) (*BtcdNotifier, error) {
chainUpdates: chainntnfs.NewConcurrentQueue(10),
txUpdates: chainntnfs.NewConcurrentQueue(10),
spendHintCache: spendHintCache,
confirmHintCache: confirmHintCache,
quit: make(chan struct{}),
}
@ -150,7 +165,8 @@ func (b *BtcdNotifier) Start() error {
}
b.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(currentHeight), reorgSafetyLimit)
uint32(currentHeight), reorgSafetyLimit, b.confirmHintCache,
)
b.bestBlock = chainntnfs.BlockEpoch{
Height: currentHeight,
@ -646,23 +662,23 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
return fmt.Errorf("unable to get block: %v", err)
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
epoch.Height, epoch.Hash)
txns := btcutil.NewBlock(rawBlock).Transactions()
newBlock := &filteredBlock{
hash: *epoch.Hash,
height: uint32(epoch.Height),
txns: txns,
txns: btcutil.NewBlock(rawBlock).Transactions(),
connect: true,
}
err = b.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height,
newBlock.txns)
err = b.txConfNotifier.ConnectTip(
&newBlock.hash, newBlock.height, newBlock.txns,
)
if err != nil {
return fmt.Errorf("unable to connect tip: %v", err)
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v", epoch.Height,
epoch.Hash)
// We want to set the best block before dispatching notifications
// so if any subscribers make queries based on their received
// block epoch, our state is fully updated in time.
@ -671,8 +687,8 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
// Next we'll notify any subscribed clients of the block.
b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
// Finally, we'll scan over the list of relevant transactions and
// possibly dispatch notifications for confirmations and spends.
// Scan over the list of relevant transactions and possibly dispatch
// notifications for spends.
for _, tx := range newBlock.txns {
mtx := tx.MsgTx()
txSha := mtx.TxHash()
@ -698,8 +714,10 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
}
for _, ntfn := range clients {
chainntnfs.Log.Infof("Dispatching spend notification for "+
"outpoint=%v", ntfn.targetOutpoint)
chainntnfs.Log.Infof("Dispatching spend "+
"notification for outpoint=%v",
ntfn.targetOutpoint)
ntfn.spendChan <- spendDetails
// Close spendChan to ensure that any calls to
@ -713,6 +731,26 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
}
}
// Finally, we'll update the spend height hint for all of our watched
// outpoints that have not been spent yet. This is safe to do as we do
// not watch already spent outpoints for spend notifications.
ops := make([]wire.OutPoint, 0, len(b.spendNotifications))
for op := range b.spendNotifications {
ops = append(ops, op)
}
if len(ops) > 0 {
err := b.spendHintCache.CommitSpendHint(
uint32(epoch.Height), ops...,
)
if err != nil {
// The error is not fatal, so we should not return an
// error to the caller.
chainntnfs.Log.Errorf("Unable to update spend hint to "+
"%d for %v: %v", epoch.Height, ops, err)
}
}
return nil
}
@ -771,6 +809,18 @@ type spendCancel struct {
func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {
// Before proceeding to register the notification, we'll query our
// height hint cache to determine whether a better one exists.
if hint, err := b.spendHintCache.QuerySpendHint(*outpoint); err == nil {
if hint > heightHint {
chainntnfs.Log.Debugf("Using height hint %d retrieved "+
"from cache for %v", hint, outpoint)
heightHint = hint
}
}
// Construct a notification request for the outpoint and send it to the
// main event loop.
ntfn := &spendNotification{
targetOutpoint: outpoint,
spendChan: make(chan *chainntnfs.SpendDetail, 1),
@ -799,7 +849,20 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
return nil, err
}
if txOut == nil {
// If the output is unspent, then we'll write it to the cache with the
// given height hint. This allows us to increase the height hint as the
// chain extends and the output remains unspent.
if txOut != nil {
err := b.spendHintCache.CommitSpendHint(heightHint, *outpoint)
if err != nil {
// The error is not fatal, so we should not return an
// error to the caller.
chainntnfs.Log.Error("Unable to update spend hint to "+
"%d for %v: %v", heightHint, *outpoint, err)
}
} else {
// Otherwise, we'll determine when the output was spent.
//
// First, we'll attempt to retrieve the transaction's block hash
// using the backend's transaction index.
tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash)
@ -900,6 +963,18 @@ type confirmationNotification struct {
func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, _ []byte,
numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) {
// Before proceeding to register the notification, we'll query our
// height hint cache to determine whether a better one exists.
if hint, err := b.confirmHintCache.QueryConfirmHint(*txid); err == nil {
if hint > heightHint {
chainntnfs.Log.Debugf("Using height hint %d retrieved "+
"from cache for %v", hint, txid)
heightHint = hint
}
}
// Construct a notification request for the transaction and send it to
// the main event loop.
ntfn := &confirmationNotification{
ConfNtfn: chainntnfs.ConfNtfn{
ConfID: atomic.AddUint64(&b.confClientCounter, 1),

@ -28,7 +28,8 @@ func (b *BtcdNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Hash,
}
b.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(bestHeight), reorgSafetyLimit)
uint32(bestHeight), reorgSafetyLimit, b.confirmHintCache,
)
b.chainUpdates.Start()
b.txUpdates.Start()

@ -1,6 +1,7 @@
package btcdnotify
import (
"errors"
"fmt"
"github.com/btcsuite/btcd/rpcclient"
@ -10,18 +11,30 @@ import (
// createNewNotifier creates a new instance of the ChainNotifier interface
// implemented by BtcdNotifier.
func createNewNotifier(args ...interface{}) (chainntnfs.ChainNotifier, error) {
if len(args) != 1 {
return nil, fmt.Errorf("incorrect number of arguments to .New(...), "+
"expected 1, instead passed %v", len(args))
if len(args) != 3 {
return nil, fmt.Errorf("incorrect number of arguments to "+
".New(...), expected 2, instead passed %v", len(args))
}
config, ok := args[0].(*rpcclient.ConnConfig)
if !ok {
return nil, fmt.Errorf("first argument to btcdnotifier.New is " +
"incorrect, expected a *rpcclient.ConnConfig")
return nil, errors.New("first argument to btcdnotifier.New " +
"is incorrect, expected a *rpcclient.ConnConfig")
}
return New(config)
spendHintCache, ok := args[1].(chainntnfs.SpendHintCache)
if !ok {
return nil, errors.New("second argument to btcdnotifier.New " +
"is incorrect, expected a chainntnfs.SpendHintCache")
}
confirmHintCache, ok := args[2].(chainntnfs.ConfirmHintCache)
if !ok {
return nil, errors.New("third argument to btcdnotifier.New " +
"is incorrect, expected a chainntnfs.ConfirmHintCache")
}
return New(config, spendHintCache, confirmHintCache)
}
// init registers a driver for the BtcdNotifier concrete implementation of the

@ -0,0 +1,297 @@
package chainntnfs
import (
"bytes"
"errors"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
bolt "github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/channeldb"
)
const (
// dbName is the default name of the database storing the height hints.
dbName = "heighthint.db"
// dbFilePermission is the default permission of the database file
// storing the height hints.
dbFilePermission = 0600
)
var (
// spendHintBucket is the name of the bucket which houses the height
// hint for outpoints. Each height hint represents the earliest height
// at which its corresponding outpoint could have been spent within.
spendHintBucket = []byte("spend-hints")
// confirmHintBucket is the name of the bucket which houses the height
// hints for transactions. Each height hint represents the earliest
// height at which its corresponding transaction could have been
// confirmed within.
confirmHintBucket = []byte("confirm-hints")
// ErrCorruptedHeightHintCache indicates that the on-disk bucketing
// structure has altered since the height hint cache instance was
// initialized.
ErrCorruptedHeightHintCache = errors.New("height hint cache has been " +
"corrupted")
// ErrSpendHintNotFound is an error returned when a spend hint for an
// outpoint was not found.
ErrSpendHintNotFound = errors.New("spend hint not found")
// ErrConfirmHintNotFound is an error returned when a confirm hint for a
// transaction was not found.
ErrConfirmHintNotFound = errors.New("confirm hint not found")
)
// SpendHintCache is an interface whose duty is to cache spend hints for
// outpoints. A spend hint is defined as the earliest height in the chain at
// which an outpoint could have been spent within.
type SpendHintCache interface {
// CommitSpendHint commits a spend hint for the outpoints to the cache.
CommitSpendHint(height uint32, ops ...wire.OutPoint) error
// QuerySpendHint returns the latest spend hint for an outpoint.
// ErrSpendHintNotFound is returned if a spend hint does not exist
// within the cache for the outpoint.
QuerySpendHint(op wire.OutPoint) (uint32, error)
// PurgeSpendHint removes the spend hint for the outpoints from the
// cache.
PurgeSpendHint(ops ...wire.OutPoint) error
}
// ConfirmHintCache is an interface whose duty is to cache confirm hints for
// transactions. A confirm hint is defined as the earliest height in the chain
// at which a transaction could have been included in a block.
type ConfirmHintCache interface {
// CommitConfirmHint commits a confirm hint for the transactions to the
// cache.
CommitConfirmHint(height uint32, txids ...chainhash.Hash) error
// QueryConfirmHint returns the latest confirm hint for a transaction
// hash. ErrConfirmHintNotFound is returned if a confirm hint does not
// exist within the cache for the transaction hash.
QueryConfirmHint(txid chainhash.Hash) (uint32, error)
// PurgeConfirmHint removes the confirm hint for the transactions from
// the cache.
PurgeConfirmHint(txids ...chainhash.Hash) error
}
// HeightHintCache is an implementation of the SpendHintCache and
// ConfirmHintCache interfaces backed by a channeldb DB instance where the hints
// will be stored.
type HeightHintCache struct {
db *channeldb.DB
}
// Compile-time checks to ensure HeightHintCache satisfies the SpendHintCache
// and ConfirmHintCache interfaces.
var _ SpendHintCache = (*HeightHintCache)(nil)
var _ ConfirmHintCache = (*HeightHintCache)(nil)
// NewHeightHintCache returns a new height hint cache backed by a database.
func NewHeightHintCache(db *channeldb.DB) (*HeightHintCache, error) {
cache := &HeightHintCache{db}
if err := cache.initBuckets(); err != nil {
return nil, err
}
return cache, nil
}
// initBuckets ensures that the primary buckets used by the circuit are
// initialized so that we can assume their existence after startup.
func (c *HeightHintCache) initBuckets() error {
return c.db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(spendHintBucket)
if err != nil {
return err
}
_, err = tx.CreateBucketIfNotExists(confirmHintBucket)
return err
})
}
// CommitSpendHint commits a spend hint for the outpoints to the cache.
func (c *HeightHintCache) CommitSpendHint(height uint32, ops ...wire.OutPoint) error {
Log.Tracef("Updating spend hint to height %d for %v", height, ops)
return c.db.Batch(func(tx *bolt.Tx) error {
spendHints := tx.Bucket(spendHintBucket)
if spendHints == nil {
return ErrCorruptedHeightHintCache
}
var hint bytes.Buffer
if err := channeldb.WriteElement(&hint, height); err != nil {
return err
}
for _, op := range ops {
var outpoint bytes.Buffer
err := channeldb.WriteElement(&outpoint, op)
if err != nil {
return err
}
err = spendHints.Put(outpoint.Bytes(), hint.Bytes())
if err != nil {
return err
}
}
return nil
})
}
// QuerySpendHint returns the latest spend hint for an outpoint.
// ErrSpendHintNotFound is returned if a spend hint does not exist within the
// cache for the outpoint.
func (c *HeightHintCache) QuerySpendHint(op wire.OutPoint) (uint32, error) {
var hint uint32
err := c.db.View(func(tx *bolt.Tx) error {
spendHints := tx.Bucket(spendHintBucket)
if spendHints == nil {
return ErrCorruptedHeightHintCache
}
var outpoint bytes.Buffer
if err := channeldb.WriteElement(&outpoint, op); err != nil {
return err
}
spendHint := spendHints.Get(outpoint.Bytes())
if spendHint == nil {
return ErrSpendHintNotFound
}
return channeldb.ReadElement(bytes.NewReader(spendHint), &hint)
})
if err != nil {
return 0, err
}
return hint, nil
}
// PurgeSpendHint removes the spend hint for the outpoints from the cache.
func (c *HeightHintCache) PurgeSpendHint(ops ...wire.OutPoint) error {
Log.Tracef("Removing spend hints for %v", ops)
return c.db.Batch(func(tx *bolt.Tx) error {
spendHints := tx.Bucket(spendHintBucket)
if spendHints == nil {
return ErrCorruptedHeightHintCache
}
for _, op := range ops {
var outpoint bytes.Buffer
err := channeldb.WriteElement(&outpoint, op)
if err != nil {
return err
}
err = spendHints.Delete(outpoint.Bytes())
if err != nil {
return err
}
}
return nil
})
}
// CommitConfirmHint commits a confirm hint for the transactions to the cache.
func (c *HeightHintCache) CommitConfirmHint(height uint32, txids ...chainhash.Hash) error {
Log.Tracef("Updating confirm hints to height %d for %v", height, txids)
return c.db.Batch(func(tx *bolt.Tx) error {
confirmHints := tx.Bucket(confirmHintBucket)
if confirmHints == nil {
return ErrCorruptedHeightHintCache
}
var hint bytes.Buffer
if err := channeldb.WriteElement(&hint, height); err != nil {
return err
}
for _, txid := range txids {
var txHash bytes.Buffer
err := channeldb.WriteElement(&txHash, txid)
if err != nil {
return err
}
err = confirmHints.Put(txHash.Bytes(), hint.Bytes())
if err != nil {
return err
}
}
return nil
})
}
// QueryConfirmHint returns the latest confirm hint for a transaction hash.
// ErrConfirmHintNotFound is returned if a confirm hint does not exist within
// the cache for the transaction hash.
func (c *HeightHintCache) QueryConfirmHint(txid chainhash.Hash) (uint32, error) {
var hint uint32
err := c.db.View(func(tx *bolt.Tx) error {
confirmHints := tx.Bucket(confirmHintBucket)
if confirmHints == nil {
return ErrCorruptedHeightHintCache
}
var txHash bytes.Buffer
if err := channeldb.WriteElement(&txHash, txid); err != nil {
return err
}
confirmHint := confirmHints.Get(txHash.Bytes())
if confirmHint == nil {
return ErrConfirmHintNotFound
}
return channeldb.ReadElement(bytes.NewReader(confirmHint), &hint)
})
if err != nil {
return 0, err
}
return hint, nil
}
// PurgeConfirmHint removes the confirm hint for the transactions from the
// cache.
func (c *HeightHintCache) PurgeConfirmHint(txids ...chainhash.Hash) error {
Log.Tracef("Removing confirm hints for %v", txids)
return c.db.Batch(func(tx *bolt.Tx) error {
confirmHints := tx.Bucket(confirmHintBucket)
if confirmHints == nil {
return ErrCorruptedHeightHintCache
}
for _, txid := range txids {
var txHash bytes.Buffer
err := channeldb.WriteElement(&txHash, txid)
if err != nil {
return err
}
err = confirmHints.Delete(txHash.Bytes())
if err != nil {
return err
}
}
return nil
})
}

@ -0,0 +1,148 @@
package chainntnfs
import (
"bytes"
"io/ioutil"
"testing"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
)
func initHintCache(t *testing.T) *HeightHintCache {
t.Helper()
tempDir, err := ioutil.TempDir("", "kek")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
db, err := channeldb.Open(tempDir)
if err != nil {
t.Fatalf("unable to create db: %v", err)
}
hintCache, err := NewHeightHintCache(db)
if err != nil {
t.Fatalf("unable to create hint cache: %v", err)
}
return hintCache
}
// TestHeightHintCacheConfirms ensures that the height hint cache properly
// caches confirm hints for transactions.
func TestHeightHintCacheConfirms(t *testing.T) {
t.Parallel()
hintCache := initHintCache(t)
// Querying for a transaction hash not found within the cache should
// return an error indication so.
var unknownHash chainhash.Hash
_, err := hintCache.QueryConfirmHint(unknownHash)
if err != ErrConfirmHintNotFound {
t.Fatalf("expected ErrConfirmHintNotFound, got: %v", err)
}
// Now, we'll create some transaction hashes and commit them to the
// cache with the same confirm hint.
const height = 100
const numHashes = 5
txHashes := make([]chainhash.Hash, numHashes)
for i := 0; i < numHashes; i++ {
var txHash chainhash.Hash
copy(txHash[:], bytes.Repeat([]byte{byte(i)}, 32))
txHashes[i] = txHash
}
if err := hintCache.CommitConfirmHint(height, txHashes...); err != nil {
t.Fatalf("unable to add entries to cache: %v", err)
}
// With the hashes committed, we'll now query the cache to ensure that
// we're able to properly retrieve the confirm hints.
for _, txHash := range txHashes {
confirmHint, err := hintCache.QueryConfirmHint(txHash)
if err != nil {
t.Fatalf("unable to query for hint: %v", err)
}
if confirmHint != height {
t.Fatalf("expected confirm hint %d, got %d", height,
confirmHint)
}
}
// We'll also attempt to purge all of them in a single database
// transaction.
if err := hintCache.PurgeConfirmHint(txHashes...); err != nil {
t.Fatalf("unable to remove confirm hints: %v", err)
}
// Finally, we'll attempt to query for each hash. We should expect not
// to find a hint for any of them.
for _, txHash := range txHashes {
_, err := hintCache.QueryConfirmHint(txHash)
if err != ErrConfirmHintNotFound {
t.Fatalf("expected ErrConfirmHintNotFound, got :%v", err)
}
}
}
// TestHeightHintCacheSpends ensures that the height hint cache properly caches
// spend hints for outpoints.
func TestHeightHintCacheSpends(t *testing.T) {
t.Parallel()
hintCache := initHintCache(t)
// Querying for an outpoint not found within the cache should return an
// error indication so.
var unknownOutPoint wire.OutPoint
_, err := hintCache.QuerySpendHint(unknownOutPoint)
if err != ErrSpendHintNotFound {
t.Fatalf("expected ErrSpendHintNotFound, got: %v", err)
}
// Now, we'll create some outpoints and commit them to the cache with
// the same spend hint.
const height = 100
const numOutpoints = 5
var txHash chainhash.Hash
copy(txHash[:], bytes.Repeat([]byte{0xFF}, 32))
outpoints := make([]wire.OutPoint, numOutpoints)
for i := uint32(0); i < numOutpoints; i++ {
outpoints[i] = wire.OutPoint{Hash: txHash, Index: i}
}
if err := hintCache.CommitSpendHint(height, outpoints...); err != nil {
t.Fatalf("unable to add entry to cache: %v", err)
}
// With the outpoints committed, we'll now query the cache to ensure
// that we're able to properly retrieve the confirm hints.
for _, op := range outpoints {
spendHint, err := hintCache.QuerySpendHint(op)
if err != nil {
t.Fatalf("unable to query for hint: %v", err)
}
if spendHint != height {
t.Fatalf("expected spend hint %d, got %d", height,
spendHint)
}
}
// We'll also attempt to purge all of them in a single database
// transaction.
if err := hintCache.PurgeSpendHint(outpoints...); err != nil {
t.Fatalf("unable to remove spend hint: %v", err)
}
// Finally, we'll attempt to query for each outpoint. We should expect
// not to find a hint for any of them.
for _, op := range outpoints {
_, err = hintCache.QuerySpendHint(op)
if err != ErrSpendHintNotFound {
t.Fatalf("expected ErrSpendHintNotFound, got: %v", err)
}
}
}

@ -26,6 +26,7 @@ import (
"github.com/btcsuite/btcwallet/walletdb"
"github.com/lightninglabs/neutrino"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
// Required to auto-register the bitcoind backed ChainNotifier
// implementation.
@ -1741,10 +1742,22 @@ func TestInterfaces(t *testing.T) {
newNotifier func() (chainntnfs.TestChainNotifier, error)
)
for _, notifierDriver := range chainntnfs.RegisteredNotifiers() {
// Initialize a height hint cache for each notifier.
tempDir, err := ioutil.TempDir("", "channeldb")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
db, err := channeldb.Open(tempDir)
if err != nil {
t.Fatalf("unable to create db: %v", err)
}
hintCache, err := chainntnfs.NewHeightHintCache(db)
if err != nil {
t.Fatalf("unable to create height hint cache: %v", err)
}
notifierType := notifierDriver.NotifierType
switch notifierType {
case "bitcoind":
// Start a bitcoind instance.
tempBitcoindDir, err := ioutil.TempDir("", "bitcoind")
@ -1807,12 +1820,16 @@ func TestInterfaces(t *testing.T) {
cleanUp = cleanUp3
newNotifier = func() (chainntnfs.TestChainNotifier, error) {
return bitcoindnotify.New(chainConn), nil
return bitcoindnotify.New(
chainConn, hintCache, hintCache,
), nil
}
case "btcd":
newNotifier = func() (chainntnfs.TestChainNotifier, error) {
return btcdnotify.New(&rpcConfig)
return btcdnotify.New(
&rpcConfig, hintCache, hintCache,
)
}
cleanUp = func() {}
@ -1855,7 +1872,9 @@ func TestInterfaces(t *testing.T) {
time.Sleep(time.Millisecond * 100)
}
newNotifier = func() (chainntnfs.TestChainNotifier, error) {
return neutrinonotify.New(spvNode)
return neutrinonotify.New(
spvNode, hintCache, hintCache,
)
}
}

@ -1,6 +1,7 @@
package neutrinonotify
import (
"errors"
"fmt"
"github.com/lightninglabs/neutrino"
@ -10,18 +11,30 @@ import (
// createNewNotifier creates a new instance of the ChainNotifier interface
// implemented by NeutrinoNotifier.
func createNewNotifier(args ...interface{}) (chainntnfs.ChainNotifier, error) {
if len(args) != 1 {
return nil, fmt.Errorf("incorrect number of arguments to .New(...), "+
"expected 1, instead passed %v", len(args))
if len(args) != 2 {
return nil, fmt.Errorf("incorrect number of arguments to "+
".New(...), expected 2, instead passed %v", len(args))
}
config, ok := args[0].(*neutrino.ChainService)
if !ok {
return nil, fmt.Errorf("first argument to neutrinonotify.New is " +
"incorrect, expected a *neutrino.ChainService")
return nil, errors.New("first argument to neutrinonotify.New " +
"is incorrect, expected a *neutrino.ChainService")
}
return New(config)
spendHintCache, ok := args[1].(chainntnfs.SpendHintCache)
if !ok {
return nil, errors.New("second argument to neutrinonotify.New " +
"is incorrect, expected a chainntfs.SpendHintCache")
}
confirmHintCache, ok := args[2].(chainntnfs.ConfirmHintCache)
if !ok {
return nil, errors.New("third argument to neutrinonotify.New " +
"is incorrect, expected a chainntfs.ConfirmHintCache")
}
return New(config, spendHintCache, confirmHintCache)
}
// init registers a driver for the NeutrinoNotify concrete implementation of

@ -77,6 +77,16 @@ type NeutrinoNotifier struct {
chainUpdates *chainntnfs.ConcurrentQueue
// spendHintCache is a cache used to query and update the latest height
// hints for an outpoint. Each height hint represents the earliest
// height at which the outpoint could have been spent within the chain.
spendHintCache chainntnfs.SpendHintCache
// confirmHintCache is a cache used to query the latest height hints for
// a transaction. Each height hint represents the earliest height at
// which the transaction could have confirmed within the chain.
confirmHintCache chainntnfs.ConfirmHintCache
wg sync.WaitGroup
quit chan struct{}
}
@ -89,7 +99,9 @@ var _ chainntnfs.ChainNotifier = (*NeutrinoNotifier)(nil)
//
// NOTE: The passed neutrino node should already be running and active before
// being passed into this function.
func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) {
func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache,
confirmHintCache chainntnfs.ConfirmHintCache) (*NeutrinoNotifier, error) {
notifier := &NeutrinoNotifier{
notificationCancels: make(chan interface{}),
notificationRegistry: make(chan interface{}),
@ -104,6 +116,9 @@ func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) {
chainUpdates: chainntnfs.NewConcurrentQueue(10),
spendHintCache: spendHintCache,
confirmHintCache: confirmHintCache,
quit: make(chan struct{}),
}
@ -150,7 +165,7 @@ func (n *NeutrinoNotifier) Start() error {
}
n.txConfNotifier = chainntnfs.NewTxConfNotifier(
bestHeight, reorgSafetyLimit,
bestHeight, reorgSafetyLimit, n.confirmHintCache,
)
n.chainConn = &NeutrinoChainConn{n.p2pNode}
@ -358,6 +373,7 @@ out:
rescanUpdate := []neutrino.UpdateOption{
neutrino.AddAddrs(addrs...),
neutrino.Rewind(currentHeight),
neutrino.DisableDisconnectedNtfns(true),
}
err = n.chainView.Update(rescanUpdate...)
if err != nil {
@ -588,22 +604,23 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
// First process the block for our internal state. A new block has
// been connected to the main chain. Send out any N confirmation
// notifications which may have been triggered by this new block.
err := n.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height,
newBlock.txns)
err := n.txConfNotifier.ConnectTip(
&newBlock.hash, newBlock.height, newBlock.txns,
)
if err != nil {
return fmt.Errorf("unable to connect tip: %v", err)
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
newBlock.height, newBlock.hash)
chainntnfs.Log.Infof("New block: height=%v, sha=%v", newBlock.height,
newBlock.hash)
n.bestHeight = newBlock.height
// Next, notify any subscribed clients of the block.
n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
// Finally, we'll scan over the list of relevant transactions and
// possibly dispatch notifications for confirmations and spends.
// Scan over the list of relevant transactions and possibly dispatch
// notifications for spends.
for _, tx := range newBlock.txns {
mtx := tx.MsgTx()
txSha := mtx.TxHash()
@ -646,6 +663,24 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
}
}
// Finally, we'll update the spend height hint for all of our watched
// outpoints that have not been spent yet. This is safe to do as we do
// not watch already spent outpoints for spend notifications.
ops := make([]wire.OutPoint, 0, len(n.spendNotifications))
for op := range n.spendNotifications {
ops = append(ops, op)
}
if len(ops) > 0 {
err := n.spendHintCache.CommitSpendHint(newBlock.height, ops...)
if err != nil {
// The error is not fatal, so we should not return an
// error to the caller.
chainntnfs.Log.Errorf("Unable to update spend hint to "+
"%d for %v: %v", newBlock.height, ops, err)
}
}
return nil
}
@ -725,15 +760,26 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
currentHeight := n.bestHeight
n.heightMtx.RUnlock()
chainntnfs.Log.Infof("New spend notification for outpoint=%v, "+
"height_hint=%v", outpoint, heightHint)
// Before proceeding to register the notification, we'll query our
// height hint cache to determine whether a better one exists.
if hint, err := n.spendHintCache.QuerySpendHint(*outpoint); err == nil {
if hint > heightHint {
chainntnfs.Log.Debugf("Using height hint %d retrieved "+
"from cache for %v", hint, outpoint)
heightHint = hint
}
}
// Construct a notification request for the outpoint. We'll defer
// sending it to the main event loop until after we've guaranteed that
// the outpoint has not been spent.
ntfn := &spendNotification{
targetOutpoint: outpoint,
spendChan: make(chan *chainntnfs.SpendDetail, 1),
spendID: atomic.AddUint64(&n.spendClientCounter, 1),
heightHint: heightHint,
}
spendEvent := &chainntnfs.SpendEvent{
Spend: ntfn.spendChan,
Cancel: func() {
@ -745,8 +791,9 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
// Submit spend cancellation to notification dispatcher.
select {
case n.notificationCancels <- cancel:
// Cancellation is being handled, drain the spend chan until it is
// closed before yielding to the caller.
// Cancellation is being handled, drain the
// spend chan until it is closed before yielding
// to the caller.
for {
select {
case _, ok := <-ntfn.spendChan:
@ -824,6 +871,7 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
rescanUpdate := []neutrino.UpdateOption{
neutrino.AddInputs(inputToWatch),
neutrino.Rewind(currentHeight),
neutrino.DisableDisconnectedNtfns(true),
}
if err := n.chainView.Update(rescanUpdate...); err != nil {
@ -836,6 +884,16 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
return nil, ErrChainNotifierShuttingDown
}
// Finally, we'll add a spent hint with the current height to the cache
// in order to better keep track of when this outpoint is spent.
err = n.spendHintCache.CommitSpendHint(currentHeight, *outpoint)
if err != nil {
// The error is not fatal, so we should not return an error to
// the caller.
chainntnfs.Log.Errorf("Unable to update spend hint to %d for "+
"%v: %v", currentHeight, outpoint, err)
}
return spendEvent, nil
}
@ -854,6 +912,18 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
pkScript []byte,
numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) {
// Before proceeding to register the notification, we'll query our
// height hint cache to determine whether a better one exists.
if hint, err := n.confirmHintCache.QueryConfirmHint(*txid); err == nil {
if hint > heightHint {
chainntnfs.Log.Debugf("Using height hint %d retrieved "+
"from cache for %v", hint, txid)
heightHint = hint
}
}
// Construct a notification request for the transaction and send it to
// the main event loop.
ntfn := &confirmationsNotification{
ConfNtfn: chainntnfs.ConfNtfn{
ConfID: atomic.AddUint64(&n.confClientCounter, 1),

@ -51,7 +51,8 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Has
}
n.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(bestHeight), reorgSafetyLimit)
uint32(bestHeight), reorgSafetyLimit, n.confirmHintCache,
)
n.chainConn = &NeutrinoChainConn{n.p2pNode}

@ -89,6 +89,11 @@ type TxConfNotifier struct {
// at which the transaction will have sufficient confirmations.
ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{}
// hintCache is a cache used to maintain the latest height hints for
// transactions. Each height hint represents the earliest height at
// which the transactions could have been confirmed within the chain.
hintCache ConfirmHintCache
// quit is closed in order to signal that the notifier is gracefully
// exiting.
quit chan struct{}
@ -98,13 +103,16 @@ type TxConfNotifier struct {
// NewTxConfNotifier creates a TxConfNotifier. The current height of the
// blockchain is accepted as a parameter.
func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotifier {
func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32,
hintCache ConfirmHintCache) *TxConfNotifier {
return &TxConfNotifier{
currentHeight: startHeight,
reorgSafetyLimit: reorgSafetyLimit,
confNotifications: make(map[chainhash.Hash]map[uint64]*ConfNtfn),
txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}),
ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}),
hintCache: hintCache,
quit: make(chan struct{}),
}
}
@ -130,6 +138,16 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) error {
if !ok {
ntfns = make(map[uint64]*ConfNtfn)
tcn.confNotifications[*ntfn.TxID] = ntfns
err := tcn.hintCache.CommitConfirmHint(
tcn.currentHeight, *ntfn.TxID,
)
if err != nil {
// The error is not fatal, so we should not return an
// error to the caller.
Log.Errorf("Unable to update confirm hint to %d for "+
"%v: %v", tcn.currentHeight, *ntfn.TxID, err)
}
}
ntfns[ntfn.ConfID] = ntfn
@ -175,6 +193,14 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash,
return nil
}
err := tcn.hintCache.CommitConfirmHint(details.BlockHeight, txid)
if err != nil {
// The error is not fatal, so we should not return an error to
// the caller.
Log.Errorf("Unable to update confirm hint to %d for %v: %v",
details.BlockHeight, txid, err)
}
// The notifier has yet to reach the height at which the transaction was
// included in a block, so we should defer until handling it then within
// ConnectTip.
@ -297,6 +323,48 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash,
}
}
// In order to update the height hint for all the required transactions
// under one database transaction, we'll gather the set of unconfirmed
// transactions along with the ones that confirmed at the current
// height. To do so, we'll iterate over the confNotifications map, which
// contains the transactions we currently have notifications for. Since
// this map doesn't tell us whether the transaction hsa confirmed or
// not, we'll need to look at txsByInitialHeight to determine so.
var txsToUpdateHints []chainhash.Hash
for confirmedTx := range tcn.txsByInitialHeight[tcn.currentHeight] {
txsToUpdateHints = append(txsToUpdateHints, confirmedTx)
}
out:
for maybeUnconfirmedTx := range tcn.confNotifications {
for height, confirmedTxs := range tcn.txsByInitialHeight {
// Skip the transactions that confirmed at the new block
// height as those have already been added.
if height == blockHeight {
continue
}
// If the transaction was found within the set of
// confirmed transactions at this height, we'll skip it.
if _, ok := confirmedTxs[maybeUnconfirmedTx]; ok {
continue out
}
}
txsToUpdateHints = append(txsToUpdateHints, maybeUnconfirmedTx)
}
if len(txsToUpdateHints) > 0 {
err := tcn.hintCache.CommitConfirmHint(
tcn.currentHeight, txsToUpdateHints...,
)
if err != nil {
// The error is not fatal, so we should not return an
// error to the caller.
Log.Errorf("Unable to update confirm hint to %d for "+
"%v: %v", tcn.currentHeight, txsToUpdateHints,
err)
}
}
// Next, we'll dispatch an update to all of the notification clients for
// our watched transactions with the number of confirmations left at
// this new height.
@ -447,6 +515,20 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error {
}
}
// Rewind the height hint for all watched transactions.
var txs []chainhash.Hash
for tx := range tcn.confNotifications {
txs = append(txs, tx)
}
err := tcn.hintCache.CommitConfirmHint(tcn.currentHeight, txs...)
if err != nil {
// The error is not fatal, so we should not return an error to
// the caller.
Log.Errorf("Unable to update confirm hint to %d for %v: %v",
tcn.currentHeight, txs, err)
}
// Finally, we can remove the transactions we're currently watching that
// were included in this block height.
delete(tcn.txsByInitialHeight, blockHeight)

@ -1,6 +1,7 @@
package chainntnfs_test
import (
"sync"
"testing"
"github.com/btcsuite/btcd/chaincfg/chainhash"
@ -11,6 +12,90 @@ import (
var zeroHash chainhash.Hash
type mockHintCache struct {
mu sync.Mutex
confHints map[chainhash.Hash]uint32
spendHints map[wire.OutPoint]uint32
}
var _ chainntnfs.SpendHintCache = (*mockHintCache)(nil)
var _ chainntnfs.ConfirmHintCache = (*mockHintCache)(nil)
func (c *mockHintCache) CommitSpendHint(heightHint uint32, ops ...wire.OutPoint) error {
c.mu.Lock()
defer c.mu.Unlock()
for _, op := range ops {
c.spendHints[op] = heightHint
}
return nil
}
func (c *mockHintCache) QuerySpendHint(op wire.OutPoint) (uint32, error) {
c.mu.Lock()
defer c.mu.Unlock()
hint, ok := c.spendHints[op]
if !ok {
return 0, chainntnfs.ErrSpendHintNotFound
}
return hint, nil
}
func (c *mockHintCache) PurgeSpendHint(ops ...wire.OutPoint) error {
c.mu.Lock()
defer c.mu.Unlock()
for _, op := range ops {
delete(c.spendHints, op)
}
return nil
}
func (c *mockHintCache) CommitConfirmHint(heightHint uint32, txids ...chainhash.Hash) error {
c.mu.Lock()
defer c.mu.Unlock()
for _, txid := range txids {
c.confHints[txid] = heightHint
}
return nil
}
func (c *mockHintCache) QueryConfirmHint(txid chainhash.Hash) (uint32, error) {
c.mu.Lock()
defer c.mu.Unlock()
hint, ok := c.confHints[txid]
if !ok {
return 0, chainntnfs.ErrConfirmHintNotFound
}
return hint, nil
}
func (c *mockHintCache) PurgeConfirmHint(txids ...chainhash.Hash) error {
c.mu.Lock()
defer c.mu.Unlock()
for _, txid := range txids {
delete(c.confHints, txid)
}
return nil
}
func newMockHintCache() *mockHintCache {
return &mockHintCache{
confHints: make(map[chainhash.Hash]uint32),
spendHints: make(map[wire.OutPoint]uint32),
}
}
// TestTxConfFutureDispatch tests that the TxConfNotifier dispatches
// registered notifications when the transaction confirms after registration.
func TestTxConfFutureDispatch(t *testing.T) {
@ -27,7 +112,8 @@ func TestTxConfFutureDispatch(t *testing.T) {
tx3 = wire.MsgTx{Version: 3}
)
txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100)
hintCache := newMockHintCache()
txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache)
// Create the test transactions and register them with the
// TxConfNotifier before including them in a block to receive future
@ -200,7 +286,8 @@ func TestTxConfHistoricalDispatch(t *testing.T) {
tx3 = wire.MsgTx{Version: 3}
)
txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100)
hintCache := newMockHintCache()
txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache)
// Create the test transactions at a height before the TxConfNotifier's
// starting height so that they are confirmed once registering them.
@ -351,7 +438,8 @@ func TestTxConfChainReorg(t *testing.T) {
tx3 = wire.MsgTx{Version: 3}
)
txConfNotifier := chainntnfs.NewTxConfNotifier(7, 100)
hintCache := newMockHintCache()
txConfNotifier := chainntnfs.NewTxConfNotifier(7, 100, hintCache)
// Tx 1 will be confirmed in block 9 and requires 2 confs.
tx1Hash := tx1.TxHash()
@ -586,6 +674,147 @@ func TestTxConfChainReorg(t *testing.T) {
}
}
// TestTxConfHeightHintCache ensures that the height hints for transactions are
// kept track of correctly with each new block connected/disconnected.
func TestTxConfHeightHintCache(t *testing.T) {
t.Parallel()
const (
startingHeight = 10
tx1Height = 11
tx2Height = 12
)
// Initialize our TxConfNotifier instance backed by a height hint cache.
hintCache := newMockHintCache()
txConfNotifier := chainntnfs.NewTxConfNotifier(
startingHeight, 100, hintCache,
)
// Create two test transactions and register them for notifications.
tx1 := wire.MsgTx{Version: 1}
tx1Hash := tx1.TxHash()
ntfn1 := &chainntnfs.ConfNtfn{
TxID: &tx1Hash,
NumConfirmations: 1,
Event: chainntnfs.NewConfirmationEvent(1),
}
tx2 := wire.MsgTx{Version: 2}
tx2Hash := tx2.TxHash()
ntfn2 := &chainntnfs.ConfNtfn{
TxID: &tx2Hash,
NumConfirmations: 2,
Event: chainntnfs.NewConfirmationEvent(2),
}
if err := txConfNotifier.Register(ntfn1); err != nil {
t.Fatalf("unable to register tx1: %v", err)
}
if err := txConfNotifier.Register(ntfn2); err != nil {
t.Fatalf("unable to register tx2: %v", err)
}
// Both transactions should have a height hint of the starting height
// due to registering notifications for them.
hint, err := hintCache.QueryConfirmHint(tx1Hash)
if err != nil {
t.Fatalf("unable to query for hint: %v", err)
}
if hint != startingHeight {
t.Fatalf("expected hint %d, got %d", startingHeight, hint)
}
hint, err = hintCache.QueryConfirmHint(tx2Hash)
if err != nil {
t.Fatalf("unable to query for hint: %v", err)
}
if hint != startingHeight {
t.Fatalf("expected hint %d, got %d", startingHeight, hint)
}
// Create a new block that will include the first transaction and extend
// the chain.
block1 := btcutil.NewBlock(&wire.MsgBlock{
Transactions: []*wire.MsgTx{&tx1},
})
err = txConfNotifier.ConnectTip(
block1.Hash(), tx1Height, block1.Transactions(),
)
if err != nil {
t.Fatalf("Failed to connect block: %v", err)
}
// The height hint for the first transaction should now be updated to
// reflect its confirmation.
hint, err = hintCache.QueryConfirmHint(tx1Hash)
if err != nil {
t.Fatalf("unable to query for hint: %v", err)
}
if hint != tx1Height {
t.Fatalf("expected hint %d, got %d", tx1Height, hint)
}
// The height hint for the second transaction should also be updated due
// to it still being unconfirmed.
hint, err = hintCache.QueryConfirmHint(tx2Hash)
if err != nil {
t.Fatalf("unable to query for hint: %v", err)
}
if hint != tx1Height {
t.Fatalf("expected hint %d, got %d", tx1Height, hint)
}
// Now, we'll create another block that will include the second
// transaction and extend the chain.
block2 := btcutil.NewBlock(&wire.MsgBlock{
Transactions: []*wire.MsgTx{&tx2},
})
err = txConfNotifier.ConnectTip(
block2.Hash(), tx2Height, block2.Transactions(),
)
if err != nil {
t.Fatalf("Failed to connect block: %v", err)
}
// The height hint for the first transaction should remain the same.
hint, err = hintCache.QueryConfirmHint(tx1Hash)
if err != nil {
t.Fatalf("unable to query for hint: %v", err)
}
if hint != tx1Height {
t.Fatalf("expected hint %d, got %d", tx1Height, hint)
}
// The height hint for the second transaction should now be updated to
// reflect its confirmation.
hint, err = hintCache.QueryConfirmHint(tx2Hash)
if err != nil {
t.Fatalf("unable to query for hint: %v", err)
}
if hint != tx2Height {
t.Fatalf("expected hint %d, got %d", tx2Height, hint)
}
// Now, we'll attempt do disconnect the last block in order to simulate
// a chain reorg.
if err := txConfNotifier.DisconnectTip(tx2Height); err != nil {
t.Fatalf("Failed to disconnect block: %v", err)
}
// This should update the second transaction's height hint within the
// cache to the previous height.
hint, err = hintCache.QueryConfirmHint(tx2Hash)
if err != nil {
t.Fatalf("unable to query for hint: %v", err)
}
if hint != tx1Height {
t.Fatalf("expected hint %d, got %d", tx1Height, hint)
}
}
func TestTxConfTearDown(t *testing.T) {
t.Parallel()
@ -594,7 +823,8 @@ func TestTxConfTearDown(t *testing.T) {
tx2 = wire.MsgTx{Version: 2}
)
txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100)
hintCache := newMockHintCache()
txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache)
// Create the test transactions and register them with the
// TxConfNotifier to receive notifications.

@ -181,6 +181,13 @@ func newChainControlFromConfig(cfg *config, chanDB *channeldb.DB,
cleanUp func()
)
// Initialize the height hint cache within the chain directory.
hintCache, err := chainntnfs.NewHeightHintCache(chanDB)
if err != nil {
return nil, nil, fmt.Errorf("unable to initialize height hint "+
"cache: %v", err)
}
// If spv mode is active, then we'll be using a distinct set of
// chainControl interfaces that interface directly with the p2p network
// of the selected chain.
@ -245,7 +252,9 @@ func newChainControlFromConfig(cfg *config, chanDB *channeldb.DB,
// Next we'll create the instances of the ChainNotifier and
// FilteredChainView interface which is backed by the neutrino
// light client.
cc.chainNotifier, err = neutrinonotify.New(svc)
cc.chainNotifier, err = neutrinonotify.New(
svc, hintCache, hintCache,
)
if err != nil {
return nil, nil, err
}
@ -322,7 +331,9 @@ func newChainControlFromConfig(cfg *config, chanDB *channeldb.DB,
"bitcoind: %v", err)
}
cc.chainNotifier = bitcoindnotify.New(bitcoindConn)
cc.chainNotifier = bitcoindnotify.New(
bitcoindConn, hintCache, hintCache,
)
cc.chainView = chainview.NewBitcoindFilteredChainView(bitcoindConn)
walletConfig.ChainSource = bitcoindConn.NewBitcoindClient(birthday)
@ -430,7 +441,9 @@ func newChainControlFromConfig(cfg *config, chanDB *channeldb.DB,
DisableConnectOnNew: true,
DisableAutoReconnect: false,
}
cc.chainNotifier, err = btcdnotify.New(rpcConfig)
cc.chainNotifier, err = btcdnotify.New(
rpcConfig, hintCache, hintCache,
)
if err != nil {
return nil, nil, err
}

@ -1628,122 +1628,119 @@ func testChannelBalance(net *lntest.NetworkHarness, t *harnessTest) {
// findForceClosedChannel searches a pending channel response for a particular
// channel, returning the force closed channel upon success.
func findForceClosedChannel(t *harnessTest,
pendingChanResp *lnrpc.PendingChannelsResponse,
op *wire.OutPoint) *lnrpc.PendingChannelsResponse_ForceClosedChannel {
func findForceClosedChannel(pendingChanResp *lnrpc.PendingChannelsResponse,
op *wire.OutPoint) (*lnrpc.PendingChannelsResponse_ForceClosedChannel, error) {
var found bool
var forceClose *lnrpc.PendingChannelsResponse_ForceClosedChannel
for _, forceClose = range pendingChanResp.PendingForceClosingChannels {
for _, forceClose := range pendingChanResp.PendingForceClosingChannels {
if forceClose.Channel.ChannelPoint == op.String() {
found = true
break
return forceClose, nil
}
}
if !found {
t.Fatalf("channel not marked as force closed")
}
return forceClose
return nil, errors.New("channel not marked as force closed")
}
// findWaitingCloseChannel searches a pending channel response for a particular
// channel, returning the waiting close channel upon success.
func findWaitingCloseChannel(t *harnessTest,
pendingChanResp *lnrpc.PendingChannelsResponse,
op *wire.OutPoint) *lnrpc.PendingChannelsResponse_WaitingCloseChannel {
func findWaitingCloseChannel(pendingChanResp *lnrpc.PendingChannelsResponse,
op *wire.OutPoint) (*lnrpc.PendingChannelsResponse_WaitingCloseChannel, error) {
var found bool
var waitingClose *lnrpc.PendingChannelsResponse_WaitingCloseChannel
for _, waitingClose = range pendingChanResp.WaitingCloseChannels {
for _, waitingClose := range pendingChanResp.WaitingCloseChannels {
if waitingClose.Channel.ChannelPoint == op.String() {
found = true
break
return waitingClose, nil
}
}
if !found {
t.Fatalf("channel not marked as waiting close")
}
return waitingClose
return nil, errors.New("channel not marked as waiting close")
}
func assertCommitmentMaturity(t *harnessTest,
func checkCommitmentMaturity(
forceClose *lnrpc.PendingChannelsResponse_ForceClosedChannel,
maturityHeight uint32, blocksTilMaturity int32) {
maturityHeight uint32, blocksTilMaturity int32) error {
if forceClose.MaturityHeight != maturityHeight {
t.Fatalf("expected commitment maturity height to be %d, "+
"found %d instead", maturityHeight,
return fmt.Errorf("expected commitment maturity height to be "+
"%d, found %d instead", maturityHeight,
forceClose.MaturityHeight)
}
if forceClose.BlocksTilMaturity != blocksTilMaturity {
t.Fatalf("expected commitment blocks til maturity to be %d, "+
"found %d instead", blocksTilMaturity,
return fmt.Errorf("expected commitment blocks til maturity to "+
"be %d, found %d instead", blocksTilMaturity,
forceClose.BlocksTilMaturity)
}
return nil
}
// assertForceClosedChannelNumHtlcs verifies that a force closed channel has the
// checkForceClosedChannelNumHtlcs verifies that a force closed channel has the
// proper number of htlcs.
func assertPendingChannelNumHtlcs(t *harnessTest,
func checkPendingChannelNumHtlcs(
forceClose *lnrpc.PendingChannelsResponse_ForceClosedChannel,
expectedNumHtlcs int) {
expectedNumHtlcs int) error {
if len(forceClose.PendingHtlcs) != expectedNumHtlcs {
t.Fatalf("expected force closed channel to have %d pending "+
"htlcs, found %d instead", expectedNumHtlcs,
return fmt.Errorf("expected force closed channel to have %d "+
"pending htlcs, found %d instead", expectedNumHtlcs,
len(forceClose.PendingHtlcs))
}
return nil
}
// assertNumForceClosedChannels checks that a pending channel response has the
// checkNumForceClosedChannels checks that a pending channel response has the
// expected number of force closed channels.
func assertNumForceClosedChannels(t *harnessTest,
pendingChanResp *lnrpc.PendingChannelsResponse, expectedNumChans int) {
func checkNumForceClosedChannels(pendingChanResp *lnrpc.PendingChannelsResponse,
expectedNumChans int) error {
if len(pendingChanResp.PendingForceClosingChannels) != expectedNumChans {
t.Fatalf("expected to find %d force closed channels, got %d",
expectedNumChans,
return fmt.Errorf("expected to find %d force closed channels, "+
"got %d", expectedNumChans,
len(pendingChanResp.PendingForceClosingChannels))
}
return nil
}
// assertNumWaitingCloseChannels checks that a pending channel response has the
// checkNumWaitingCloseChannels checks that a pending channel response has the
// expected number of channels waiting for closing tx to confirm.
func assertNumWaitingCloseChannels(t *harnessTest,
pendingChanResp *lnrpc.PendingChannelsResponse, expectedNumChans int) {
func checkNumWaitingCloseChannels(pendingChanResp *lnrpc.PendingChannelsResponse,
expectedNumChans int) error {
if len(pendingChanResp.WaitingCloseChannels) != expectedNumChans {
t.Fatalf("expected to find %d channels waiting closure, got %d",
expectedNumChans,
return fmt.Errorf("expected to find %d channels waiting "+
"closure, got %d", expectedNumChans,
len(pendingChanResp.WaitingCloseChannels))
}
return nil
}
// assertPendingHtlcStageAndMaturity uniformly tests all pending htlc's
// belonging to a force closed channel, testing for the expected stage number,
// blocks till maturity, and the maturity height.
func assertPendingHtlcStageAndMaturity(t *harnessTest,
// checkPendingHtlcStageAndMaturity uniformly tests all pending htlc's belonging
// to a force closed channel, testing for the expected stage number, blocks till
// maturity, and the maturity height.
func checkPendingHtlcStageAndMaturity(
forceClose *lnrpc.PendingChannelsResponse_ForceClosedChannel,
stage, maturityHeight uint32, blocksTillMaturity int32) {
stage, maturityHeight uint32, blocksTillMaturity int32) error {
for _, pendingHtlc := range forceClose.PendingHtlcs {
if pendingHtlc.Stage != stage {
t.Fatalf("expected pending htlc to be stage %d, "+
"found %d", stage, pendingHtlc.Stage)
return fmt.Errorf("expected pending htlc to be stage "+
"%d, found %d", stage, pendingHtlc.Stage)
}
if pendingHtlc.MaturityHeight != maturityHeight {
t.Fatalf("expected pending htlc maturity height to be "+
"%d, instead has %d", maturityHeight,
pendingHtlc.MaturityHeight)
return fmt.Errorf("expected pending htlc maturity "+
"height to be %d, instead has %d",
maturityHeight, pendingHtlc.MaturityHeight)
}
if pendingHtlc.BlocksTilMaturity != blocksTillMaturity {
t.Fatalf("expected pending htlc blocks til maturity "+
"to be %d, instead has %d", blocksTillMaturity,
return fmt.Errorf("expected pending htlc blocks til "+
"maturity to be %d, instead has %d",
blocksTillMaturity,
pendingHtlc.BlocksTilMaturity)
}
}
return nil
}
// testChannelForceClosure performs a test to exercise the behavior of "force"
@ -1887,8 +1884,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) {
htlcCsvMaturityHeight = startHeight + defaultCLTV + 1 + defaultCSV
)
time.Sleep(200 * time.Millisecond)
aliceChan, err := getAliceChanInfo()
if err != nil {
t.Fatalf("unable to get alice's channel info: %v", err)
@ -1913,7 +1908,10 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) {
if err != nil {
t.Fatalf("unable to query for pending channels: %v", err)
}
assertNumWaitingCloseChannels(t, pendingChanResp, 1)
err = checkNumWaitingCloseChannels(pendingChanResp, 1)
if err != nil {
t.Fatalf(err.Error())
}
// Compute the outpoint of the channel, which we will use repeatedly to
// locate the pending channel information in the rpc responses.
@ -1930,7 +1928,10 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) {
Index: chanPoint.OutputIndex,
}
waitingClose := findWaitingCloseChannel(t, pendingChanResp, &op)
waitingClose, err := findWaitingCloseChannel(pendingChanResp, &op)
if err != nil {
t.Fatalf(err.Error())
}
// Immediately after force closing, all of the funds should be in limbo.
if waitingClose.LimboBalance == 0 {
@ -1953,37 +1954,56 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) {
t.Fatalf("unable to generate block: %v", err)
}
// The following sleep provides time for the UTXO nursery to move the
// output from the preschool to the kindergarten database buckets
// prior to RestartNode() being triggered. Without this sleep, the
// database update may fail, causing the UTXO nursery to retry the move
// operation upon restart. This will change the blockheights from what
// is expected by the test.
// TODO(bvu): refactor out this sleep.
duration := time.Millisecond * 300
time.Sleep(duration)
// Now that the commitment has been confirmed, the channel should be
// marked as force closed.
pendingChanResp, err = net.Alice.PendingChannels(ctxb, pendingChansRequest)
err = lntest.WaitPredicate(func() bool {
pendingChanResp, err := net.Alice.PendingChannels(
ctxb, pendingChansRequest,
)
if err != nil {
predErr = fmt.Errorf("unable to query for pending "+
"channels: %v", err)
return false
}
predErr = checkNumForceClosedChannels(pendingChanResp, 1)
if predErr != nil {
return false
}
forceClose, predErr := findForceClosedChannel(
pendingChanResp, &op,
)
if predErr != nil {
return false
}
// Now that the channel has been force closed, it should now
// have the height and number of blocks to confirm populated.
predErr = checkCommitmentMaturity(
forceClose, commCsvMaturityHeight, int32(defaultCSV),
)
if predErr != nil {
return false
}
// None of our outputs have been swept, so they should all be in
// limbo.
if forceClose.LimboBalance == 0 {
predErr = errors.New("all funds should still be in " +
"limbo")
return false
}
if forceClose.RecoveredBalance != 0 {
predErr = errors.New("no funds should yet be shown " +
"as recovered")
return false
}
return true
}, 15*time.Second)
if err != nil {
t.Fatalf("unable to query for pending channels: %v", err)
}
assertNumForceClosedChannels(t, pendingChanResp, 1)
forceClose := findForceClosedChannel(t, pendingChanResp, &op)
// Now that the channel has been force closed, it should now have the
// height and number of blocks to confirm populated.
assertCommitmentMaturity(t, forceClose, commCsvMaturityHeight,
int32(defaultCSV))
// None of our outputs have been swept, so they should all be limbo.
if forceClose.LimboBalance == 0 {
t.Fatalf("all funds should still be in limbo")
}
if forceClose.RecoveredBalance != 0 {
t.Fatalf("no funds should yet be shown as recovered")
t.Fatalf(predErr.Error())
}
// The following restart is intended to ensure that outputs from the
@ -2009,26 +2029,58 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) {
t.Fatalf("Node restart failed: %v", err)
}
pendingChanResp, err = net.Alice.PendingChannels(ctxb, pendingChansRequest)
// Alice should see the channel in her set of pending force closed
// channels with her funds still in limbo.
err = lntest.WaitPredicate(func() bool {
pendingChanResp, err := net.Alice.PendingChannels(
ctxb, pendingChansRequest,
)
if err != nil {
predErr = fmt.Errorf("unable to query for pending "+
"channels: %v", err)
return false
}
predErr = checkNumForceClosedChannels(pendingChanResp, 1)
if predErr != nil {
return false
}
forceClose, predErr := findForceClosedChannel(
pendingChanResp, &op,
)
if predErr != nil {
return false
}
// At this point, the nursery should show that the commitment
// output has 1 block left before its CSV delay expires. In
// total, we have mined exactly defaultCSV blocks, so the htlc
// outputs should also reflect that this many blocks have
// passed.
predErr = checkCommitmentMaturity(
forceClose, commCsvMaturityHeight, 1,
)
if predErr != nil {
return false
}
// All funds should still be shown in limbo.
if forceClose.LimboBalance == 0 {
predErr = errors.New("all funds should still be in " +
"limbo")
return false
}
if forceClose.RecoveredBalance != 0 {
predErr = errors.New("no funds should yet be shown " +
"as recovered")
return false
}
return true
}, 15*time.Second)
if err != nil {
t.Fatalf("unable to query for pending channels: %v", err)
}
assertNumForceClosedChannels(t, pendingChanResp, 1)
forceClose = findForceClosedChannel(t, pendingChanResp, &op)
// At this point, the nursery should show that the commitment output has
// 1 block left before its CSV delay expires. In total, we have mined
// exactly defaultCSV blocks, so the htlc outputs should also reflect
// that this many blocks have passed.
assertCommitmentMaturity(t, forceClose, commCsvMaturityHeight, 1)
// All funds should still be shown in limbo.
if forceClose.LimboBalance == 0 {
t.Fatalf("all funds should still be in limbo")
}
if forceClose.RecoveredBalance != 0 {
t.Fatalf("no funds should yet be shown as recovered")
t.Fatalf(predErr.Error())
}
// Generate an additional block, which should cause the CSV delayed
@ -2079,22 +2131,25 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) {
assertTxInBlock(t, block, sweepTx.Hash())
// We sleep here to ensure that Alice has enough time to receive a
// confirmation for the commitment transaction, which we already
// asserted was in the last block.
time.Sleep(300 * time.Millisecond)
// Now that the commit output has been fully swept, check to see that
// the channel remains open for the pending htlc outputs.
pendingChanResp, err = net.Alice.PendingChannels(ctxb, pendingChansRequest)
if err != nil {
t.Fatalf("unable to query for pending channels: %v", err)
}
assertNumForceClosedChannels(t, pendingChanResp, 1)
err = checkNumForceClosedChannels(pendingChanResp, 1)
if err != nil {
t.Fatalf(err.Error())
}
// The htlc funds will still be shown as limbo, since they are still in
// their first stage. The commitment funds will have been recovered
// after the commit txn was included in the last block.
forceClose, err := findForceClosedChannel(pendingChanResp, &op)
if err != nil {
t.Fatalf(err.Error())
}
if forceClose.LimboBalance == 0 {
t.Fatalf("htlc funds should still be in limbo")
}
@ -2113,7 +2168,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) {
if err != nil {
t.Fatalf("unable to generate block: %v", err)
}
time.Sleep(duration)
// We now restart Alice, to ensure that she will broadcast the presigned
// htlc timeout txns after the delay expires after experiencing a while
@ -2121,26 +2175,58 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) {
if err := net.RestartNode(net.Alice, nil); err != nil {
t.Fatalf("Node restart failed: %v", err)
}
time.Sleep(duration)
pendingChanResp, err = net.Alice.PendingChannels(ctxb, pendingChansRequest)
// Alice should now see the channel in her set of pending force closed
// channels with one pending HTLC.
err = lntest.WaitPredicate(func() bool {
pendingChanResp, err = net.Alice.PendingChannels(
ctxb, pendingChansRequest,
)
if err != nil {
predErr = fmt.Errorf("unable to query for pending "+
"channels: %v", err)
return false
}
predErr = checkNumForceClosedChannels(pendingChanResp, 1)
if predErr != nil {
return false
}
forceClose, predErr = findForceClosedChannel(
pendingChanResp, &op,
)
if predErr != nil {
return false
}
// We should now be at the block just before the utxo nursery
// will attempt to broadcast the htlc timeout transactions.
predErr = checkPendingChannelNumHtlcs(forceClose, numInvoices)
if predErr != nil {
return false
}
predErr = checkPendingHtlcStageAndMaturity(
forceClose, 1, htlcExpiryHeight, 1,
)
if predErr != nil {
return false
}
// Now that our commitment confirmation depth has been
// surpassed, we should now see a non-zero recovered balance.
// All htlc outputs are still left in limbo, so it should be
// non-zero as well.
if forceClose.LimboBalance == 0 {
predErr = errors.New("htlc funds should still be in " +
"limbo")
return false
}
return true
}, 15*time.Second)
if err != nil {
t.Fatalf("unable to query for pending channels: %v", err)
}
assertNumForceClosedChannels(t, pendingChanResp, 1)
forceClose = findForceClosedChannel(t, pendingChanResp, &op)
// We should now be at the block just before the utxo nursery will
// attempt to broadcast the htlc timeout transactions.
assertPendingChannelNumHtlcs(t, forceClose, numInvoices)
assertPendingHtlcStageAndMaturity(t, forceClose, 1, htlcExpiryHeight, 1)
// Now that our commitment confirmation depth has been surpassed, we
// should now see a non-zero recovered balance. All htlc outputs are
// still left in limbo, so it should be non-zero as well.
if forceClose.LimboBalance == 0 {
t.Fatalf("htlc funds should still be in limbo")
t.Fatalf(predErr.Error())
}
// Now, generate the block which will cause Alice to broadcast the
@ -2191,7 +2277,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) {
if err := net.RestartNode(net.Alice, nil); err != nil {
t.Fatalf("Node restart failed: %v", err)
}
time.Sleep(duration)
// Generate a block that mines the htlc timeout txns. Doing so now
// activates the 2nd-stage CSV delayed outputs.
@ -2199,9 +2284,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) {
if err != nil {
t.Fatalf("unable to generate block: %v", err)
}
// This sleep gives Alice enough to time move the crib outputs into the
// kindergarten bucket.
time.Sleep(duration)
// Alice is restarted here to ensure that she promptly moved the crib
// outputs to the kindergarten bucket after the htlc timeout txns were
@ -2229,15 +2311,24 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) {
if err != nil {
t.Fatalf("unable to query for pending channels: %v", err)
}
assertNumForceClosedChannels(t, pendingChanResp, 1)
err = checkNumForceClosedChannels(pendingChanResp, 1)
if err != nil {
t.Fatalf(err.Error())
}
forceClose = findForceClosedChannel(t, pendingChanResp, &op)
forceClose, err = findForceClosedChannel(pendingChanResp, &op)
if err != nil {
t.Fatalf(err.Error())
}
if forceClose.LimboBalance == 0 {
t.Fatalf("htlc funds should still be in limbo")
}
assertPendingChannelNumHtlcs(t, forceClose, numInvoices)
err = checkPendingChannelNumHtlcs(forceClose, numInvoices)
if err != nil {
t.Fatalf(err.Error())
}
// Generate a block that causes Alice to sweep the htlc outputs in the
// kindergarten bucket.
@ -2296,7 +2387,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) {
if err := net.RestartNode(net.Alice, nil); err != nil {
t.Fatalf("Node restart failed: %v", err)
}
time.Sleep(duration)
// Now that the channel has been fully swept, it should no longer show
// incubated, check to see that Alice's node still reports the channel
@ -2305,14 +2395,27 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) {
if err != nil {
t.Fatalf("unable to query for pending channels: %v", err)
}
assertNumForceClosedChannels(t, pendingChanResp, 1)
err = checkNumForceClosedChannels(pendingChanResp, 1)
if err != nil {
t.Fatalf(err.Error())
}
// All htlcs should show zero blocks until maturity, as evidenced by
// having checked the sweep transaction in the mempool.
forceClose = findForceClosedChannel(t, pendingChanResp, &op)
assertPendingChannelNumHtlcs(t, forceClose, numInvoices)
assertPendingHtlcStageAndMaturity(t, forceClose, 2,
htlcCsvMaturityHeight, 0)
forceClose, err = findForceClosedChannel(pendingChanResp, &op)
if err != nil {
t.Fatalf(err.Error())
}
err = checkPendingChannelNumHtlcs(forceClose, numInvoices)
if err != nil {
t.Fatalf(err.Error())
}
err = checkPendingHtlcStageAndMaturity(
forceClose, 2, htlcCsvMaturityHeight, 0,
)
if err != nil {
t.Fatalf(err.Error())
}
// Generate the final block that sweeps all htlc funds into the user's
// wallet.
@ -2320,20 +2423,37 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) {
if err != nil {
t.Fatalf("unable to generate block: %v", err)
}
time.Sleep(3 * duration)
// Now that the channel has been fully swept, it should no longer show
// up within the pending channels RPC.
pendingChanResp, err = net.Alice.PendingChannels(ctxb, pendingChansRequest)
if err != nil {
t.Fatalf("unable to query for pending channels: %v", err)
}
assertNumForceClosedChannels(t, pendingChanResp, 0)
err = lntest.WaitPredicate(func() bool {
pendingChanResp, err := net.Alice.PendingChannels(
ctxb, pendingChansRequest,
)
if err != nil {
predErr = fmt.Errorf("unable to query for pending "+
"channels: %v", err)
return false
}
// In addition to there being no pending channels, we verify that
// pending channels does not report any money still in limbo.
if pendingChanResp.TotalLimboBalance != 0 {
t.Fatalf("no user funds should be left in limbo after incubation")
predErr = checkNumForceClosedChannels(pendingChanResp, 0)
if predErr != nil {
return false
}
// In addition to there being no pending channels, we verify
// that pending channels does not report any money still in
// limbo.
if pendingChanResp.TotalLimboBalance != 0 {
predErr = errors.New("no user funds should be left " +
"in limbo after incubation")
return false
}
return true
}, 15*time.Second)
if err != nil {
t.Fatalf(predErr.Error())
}
// At this point, Bob should now be aware of his new immediately

@ -2068,7 +2068,19 @@ func TestLightningWallet(t *testing.T) {
rpcConfig := miningNode.RPCConfig()
chainNotifier, err := btcdnotify.New(&rpcConfig)
tempDir, err := ioutil.TempDir("", "channeldb")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
db, err := channeldb.Open(tempDir)
if err != nil {
t.Fatalf("unable to create db: %v", err)
}
hintCache, err := chainntnfs.NewHeightHintCache(db)
if err != nil {
t.Fatalf("unable to create height hint cache: %v", err)
}
chainNotifier, err := btcdnotify.New(&rpcConfig, hintCache, hintCache)
if err != nil {
t.Fatalf("unable to create notifier: %v", err)
}