chainntnfs: add height hint caches to chain notifiers

This commit is contained in:
Wilmer Paulino 2018-08-14 17:53:34 -07:00
parent 6be642a033
commit 30fd219b1c
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
10 changed files with 138 additions and 30 deletions

@ -76,6 +76,16 @@ type BitcoindNotifier struct {
bestBlock chainntnfs.BlockEpoch
// spendHintCache is a cache used to query and update the latest height
// hints for an outpoint. Each height hint represents the earliest
// height at which the outpoint could have been spent within the chain.
spendHintCache chainntnfs.SpendHintCache
// confirmHintCache is a cache used to query the latest height hints for
// a transaction. Each height hint represents the earliest height at
// which the transaction could have confirmed within the chain.
confirmHintCache chainntnfs.ConfirmHintCache
wg sync.WaitGroup
quit chan struct{}
}
@ -87,7 +97,9 @@ var _ chainntnfs.ChainNotifier = (*BitcoindNotifier)(nil)
// New returns a new BitcoindNotifier instance. This function assumes the
// bitcoind node detailed in the passed configuration is already running, and
// willing to accept RPC requests and new zmq clients.
func New(chainConn *chain.BitcoindConn) *BitcoindNotifier {
func New(chainConn *chain.BitcoindConn, spendHintCache chainntnfs.SpendHintCache,
confirmHintCache chainntnfs.ConfirmHintCache) *BitcoindNotifier {
notifier := &BitcoindNotifier{
notificationCancels: make(chan interface{}),
notificationRegistry: make(chan interface{}),
@ -96,6 +108,9 @@ func New(chainConn *chain.BitcoindConn) *BitcoindNotifier {
spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification),
spendHintCache: spendHintCache,
confirmHintCache: confirmHintCache,
quit: make(chan struct{}),
}
@ -127,7 +142,8 @@ func (b *BitcoindNotifier) Start() error {
}
b.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(currentHeight), reorgSafetyLimit)
uint32(currentHeight), reorgSafetyLimit, b.confirmHintCache,
)
b.bestBlock = chainntnfs.BlockEpoch{
Height: currentHeight,

@ -29,7 +29,8 @@ func (b *BitcoindNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Has
}
b.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(bestHeight), reorgSafetyLimit)
uint32(bestHeight), reorgSafetyLimit, b.confirmHintCache,
)
if generateBlocks != nil {
// Ensure no block notifications are pending when we start the

@ -1,6 +1,7 @@
package bitcoindnotify
import (
"errors"
"fmt"
"github.com/btcsuite/btcwallet/chain"
@ -10,18 +11,30 @@ import (
// createNewNotifier creates a new instance of the ChainNotifier interface
// implemented by BitcoindNotifier.
func createNewNotifier(args ...interface{}) (chainntnfs.ChainNotifier, error) {
if len(args) != 1 {
if len(args) != 3 {
return nil, fmt.Errorf("incorrect number of arguments to "+
".New(...), expected 1, instead passed %v", len(args))
".New(...), expected 2, instead passed %v", len(args))
}
chainConn, ok := args[0].(*chain.BitcoindConn)
if !ok {
return nil, fmt.Errorf("first argument to bitcoindnotify.New " +
return nil, errors.New("first argument to bitcoindnotify.New " +
"is incorrect, expected a *chain.BitcoindConn")
}
return New(chainConn), nil
spendHintCache, ok := args[1].(chainntnfs.SpendHintCache)
if !ok {
return nil, errors.New("second argument to bitcoindnotify.New " +
"is incorrect, expected a chainntnfs.SpendHintCache")
}
confirmHintCache, ok := args[2].(chainntnfs.ConfirmHintCache)
if !ok {
return nil, errors.New("third argument to bitcoindnotify.New " +
"is incorrect, expected a chainntnfs.ConfirmHintCache")
}
return New(chainConn, spendHintCache, confirmHintCache), nil
}
// init registers a driver for the BtcdNotifier concrete implementation of the

@ -83,6 +83,16 @@ type BtcdNotifier struct {
chainUpdates *chainntnfs.ConcurrentQueue
txUpdates *chainntnfs.ConcurrentQueue
// spendHintCache is a cache used to query and update the latest height
// hints for an outpoint. Each height hint represents the earliest
// height at which the outpoint could have been spent within the chain.
spendHintCache chainntnfs.SpendHintCache
// confirmHintCache is a cache used to query the latest height hints for
// a transaction. Each height hint represents the earliest height at
// which the transaction could have confirmed within the chain.
confirmHintCache chainntnfs.ConfirmHintCache
wg sync.WaitGroup
quit chan struct{}
}
@ -93,7 +103,9 @@ var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil)
// New returns a new BtcdNotifier instance. This function assumes the btcd node
// detailed in the passed configuration is already running, and willing to
// accept new websockets clients.
func New(config *rpcclient.ConnConfig) (*BtcdNotifier, error) {
func New(config *rpcclient.ConnConfig, spendHintCache chainntnfs.SpendHintCache,
confirmHintCache chainntnfs.ConfirmHintCache) (*BtcdNotifier, error) {
notifier := &BtcdNotifier{
notificationCancels: make(chan interface{}),
notificationRegistry: make(chan interface{}),
@ -105,6 +117,9 @@ func New(config *rpcclient.ConnConfig) (*BtcdNotifier, error) {
chainUpdates: chainntnfs.NewConcurrentQueue(10),
txUpdates: chainntnfs.NewConcurrentQueue(10),
spendHintCache: spendHintCache,
confirmHintCache: confirmHintCache,
quit: make(chan struct{}),
}
@ -150,7 +165,8 @@ func (b *BtcdNotifier) Start() error {
}
b.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(currentHeight), reorgSafetyLimit)
uint32(currentHeight), reorgSafetyLimit, b.confirmHintCache,
)
b.bestBlock = chainntnfs.BlockEpoch{
Height: currentHeight,

@ -28,7 +28,8 @@ func (b *BtcdNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Hash,
}
b.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(bestHeight), reorgSafetyLimit)
uint32(bestHeight), reorgSafetyLimit, b.confirmHintCache,
)
b.chainUpdates.Start()
b.txUpdates.Start()

@ -1,6 +1,7 @@
package btcdnotify
import (
"errors"
"fmt"
"github.com/btcsuite/btcd/rpcclient"
@ -10,18 +11,30 @@ import (
// createNewNotifier creates a new instance of the ChainNotifier interface
// implemented by BtcdNotifier.
func createNewNotifier(args ...interface{}) (chainntnfs.ChainNotifier, error) {
if len(args) != 1 {
return nil, fmt.Errorf("incorrect number of arguments to .New(...), "+
"expected 1, instead passed %v", len(args))
if len(args) != 3 {
return nil, fmt.Errorf("incorrect number of arguments to "+
".New(...), expected 2, instead passed %v", len(args))
}
config, ok := args[0].(*rpcclient.ConnConfig)
if !ok {
return nil, fmt.Errorf("first argument to btcdnotifier.New is " +
"incorrect, expected a *rpcclient.ConnConfig")
return nil, errors.New("first argument to btcdnotifier.New " +
"is incorrect, expected a *rpcclient.ConnConfig")
}
return New(config)
spendHintCache, ok := args[1].(chainntnfs.SpendHintCache)
if !ok {
return nil, errors.New("second argument to btcdnotifier.New " +
"is incorrect, expected a chainntnfs.SpendHintCache")
}
confirmHintCache, ok := args[2].(chainntnfs.ConfirmHintCache)
if !ok {
return nil, errors.New("third argument to btcdnotifier.New " +
"is incorrect, expected a chainntnfs.ConfirmHintCache")
}
return New(config, spendHintCache, confirmHintCache)
}
// init registers a driver for the BtcdNotifier concrete implementation of the

@ -26,6 +26,7 @@ import (
"github.com/btcsuite/btcwallet/walletdb"
"github.com/lightninglabs/neutrino"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
// Required to auto-register the bitcoind backed ChainNotifier
// implementation.
@ -1741,10 +1742,22 @@ func TestInterfaces(t *testing.T) {
newNotifier func() (chainntnfs.TestChainNotifier, error)
)
for _, notifierDriver := range chainntnfs.RegisteredNotifiers() {
// Initialize a height hint cache for each notifier.
tempDir, err := ioutil.TempDir("", "channeldb")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
db, err := channeldb.Open(tempDir)
if err != nil {
t.Fatalf("unable to create db: %v", err)
}
hintCache, err := chainntnfs.NewHeightHintCache(db)
if err != nil {
t.Fatalf("unable to create height hint cache: %v", err)
}
notifierType := notifierDriver.NotifierType
switch notifierType {
case "bitcoind":
// Start a bitcoind instance.
tempBitcoindDir, err := ioutil.TempDir("", "bitcoind")
@ -1807,12 +1820,16 @@ func TestInterfaces(t *testing.T) {
cleanUp = cleanUp3
newNotifier = func() (chainntnfs.TestChainNotifier, error) {
return bitcoindnotify.New(chainConn), nil
return bitcoindnotify.New(
chainConn, hintCache, hintCache,
), nil
}
case "btcd":
newNotifier = func() (chainntnfs.TestChainNotifier, error) {
return btcdnotify.New(&rpcConfig)
return btcdnotify.New(
&rpcConfig, hintCache, hintCache,
)
}
cleanUp = func() {}
@ -1855,7 +1872,9 @@ func TestInterfaces(t *testing.T) {
time.Sleep(time.Millisecond * 100)
}
newNotifier = func() (chainntnfs.TestChainNotifier, error) {
return neutrinonotify.New(spvNode)
return neutrinonotify.New(
spvNode, hintCache, hintCache,
)
}
}

@ -1,6 +1,7 @@
package neutrinonotify
import (
"errors"
"fmt"
"github.com/lightninglabs/neutrino"
@ -10,18 +11,30 @@ import (
// createNewNotifier creates a new instance of the ChainNotifier interface
// implemented by NeutrinoNotifier.
func createNewNotifier(args ...interface{}) (chainntnfs.ChainNotifier, error) {
if len(args) != 1 {
return nil, fmt.Errorf("incorrect number of arguments to .New(...), "+
"expected 1, instead passed %v", len(args))
if len(args) != 2 {
return nil, fmt.Errorf("incorrect number of arguments to "+
".New(...), expected 2, instead passed %v", len(args))
}
config, ok := args[0].(*neutrino.ChainService)
if !ok {
return nil, fmt.Errorf("first argument to neutrinonotify.New is " +
"incorrect, expected a *neutrino.ChainService")
return nil, errors.New("first argument to neutrinonotify.New " +
"is incorrect, expected a *neutrino.ChainService")
}
return New(config)
spendHintCache, ok := args[1].(chainntnfs.SpendHintCache)
if !ok {
return nil, errors.New("second argument to neutrinonotify.New " +
"is incorrect, expected a chainntfs.SpendHintCache")
}
confirmHintCache, ok := args[2].(chainntnfs.ConfirmHintCache)
if !ok {
return nil, errors.New("third argument to neutrinonotify.New " +
"is incorrect, expected a chainntfs.ConfirmHintCache")
}
return New(config, spendHintCache, confirmHintCache)
}
// init registers a driver for the NeutrinoNotify concrete implementation of

@ -77,6 +77,16 @@ type NeutrinoNotifier struct {
chainUpdates *chainntnfs.ConcurrentQueue
// spendHintCache is a cache used to query and update the latest height
// hints for an outpoint. Each height hint represents the earliest
// height at which the outpoint could have been spent within the chain.
spendHintCache chainntnfs.SpendHintCache
// confirmHintCache is a cache used to query the latest height hints for
// a transaction. Each height hint represents the earliest height at
// which the transaction could have confirmed within the chain.
confirmHintCache chainntnfs.ConfirmHintCache
wg sync.WaitGroup
quit chan struct{}
}
@ -89,7 +99,9 @@ var _ chainntnfs.ChainNotifier = (*NeutrinoNotifier)(nil)
//
// NOTE: The passed neutrino node should already be running and active before
// being passed into this function.
func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) {
func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache,
confirmHintCache chainntnfs.ConfirmHintCache) (*NeutrinoNotifier, error) {
notifier := &NeutrinoNotifier{
notificationCancels: make(chan interface{}),
notificationRegistry: make(chan interface{}),
@ -104,6 +116,9 @@ func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) {
chainUpdates: chainntnfs.NewConcurrentQueue(10),
spendHintCache: spendHintCache,
confirmHintCache: confirmHintCache,
quit: make(chan struct{}),
}
@ -150,7 +165,7 @@ func (n *NeutrinoNotifier) Start() error {
}
n.txConfNotifier = chainntnfs.NewTxConfNotifier(
bestHeight, reorgSafetyLimit,
bestHeight, reorgSafetyLimit, n.confirmHintCache,
)
n.chainConn = &NeutrinoChainConn{n.p2pNode}

@ -51,7 +51,8 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Has
}
n.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(bestHeight), reorgSafetyLimit)
uint32(bestHeight), reorgSafetyLimit, n.confirmHintCache,
)
n.chainConn = &NeutrinoChainConn{n.p2pNode}