diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index a4069a00..7dc47170 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -14,6 +14,7 @@ import ( "github.com/btcsuite/btcwallet/chain" "github.com/btcsuite/btcwallet/wtxmgr" "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/queue" ) const ( @@ -996,7 +997,7 @@ type blockEpochRegistration struct { epochChan chan *chainntnfs.BlockEpoch - epochQueue *chainntnfs.ConcurrentQueue + epochQueue *queue.ConcurrentQueue bestBlock *chainntnfs.BlockEpoch @@ -1021,7 +1022,7 @@ func (b *BitcoindNotifier) RegisterBlockEpochNtfn( bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { reg := &blockEpochRegistration{ - epochQueue: chainntnfs.NewConcurrentQueue(20), + epochQueue: queue.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&b.epochClientCounter, 1), diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 8a8354d0..209b9744 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -14,6 +14,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/queue" ) const ( @@ -81,8 +82,8 @@ type BtcdNotifier struct { bestBlock chainntnfs.BlockEpoch - chainUpdates *chainntnfs.ConcurrentQueue - txUpdates *chainntnfs.ConcurrentQueue + chainUpdates *queue.ConcurrentQueue + txUpdates *queue.ConcurrentQueue // spendHintCache is a cache used to query and update the latest height // hints for an outpoint. Each height hint represents the earliest @@ -115,8 +116,8 @@ func New(config *rpcclient.ConnConfig, spendHintCache chainntnfs.SpendHintCache, spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), - chainUpdates: chainntnfs.NewConcurrentQueue(10), - txUpdates: chainntnfs.NewConcurrentQueue(10), + chainUpdates: queue.NewConcurrentQueue(10), + txUpdates: queue.NewConcurrentQueue(10), spendHintCache: spendHintCache, confirmHintCache: confirmHintCache, @@ -1056,7 +1057,7 @@ type blockEpochRegistration struct { epochChan chan *chainntnfs.BlockEpoch - epochQueue *chainntnfs.ConcurrentQueue + epochQueue *queue.ConcurrentQueue bestBlock *chainntnfs.BlockEpoch @@ -1081,7 +1082,7 @@ func (b *BtcdNotifier) RegisterBlockEpochNtfn( bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { reg := &blockEpochRegistration{ - epochQueue: chainntnfs.NewConcurrentQueue(20), + epochQueue: queue.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&b.epochClientCounter, 1), diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index b1be7cb2..f410b7b8 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -18,6 +18,7 @@ import ( "github.com/btcsuite/btcwallet/waddrmgr" "github.com/lightninglabs/neutrino" "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/queue" ) const ( @@ -75,7 +76,7 @@ type NeutrinoNotifier struct { rescanErr <-chan error - chainUpdates *chainntnfs.ConcurrentQueue + chainUpdates *queue.ConcurrentQueue // spendHintCache is a cache used to query and update the latest height // hints for an outpoint. Each height hint represents the earliest @@ -114,7 +115,7 @@ func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache, rescanErr: make(chan error), - chainUpdates: chainntnfs.NewConcurrentQueue(10), + chainUpdates: queue.NewConcurrentQueue(10), spendHintCache: spendHintCache, confirmHintCache: confirmHintCache, @@ -968,7 +969,7 @@ type blockEpochRegistration struct { epochChan chan *chainntnfs.BlockEpoch - epochQueue *chainntnfs.ConcurrentQueue + epochQueue *queue.ConcurrentQueue cancelChan chan struct{} @@ -993,7 +994,7 @@ func (n *NeutrinoNotifier) RegisterBlockEpochNtfn( bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { reg := &blockEpochRegistration{ - epochQueue: chainntnfs.NewConcurrentQueue(20), + epochQueue: queue.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&n.epochClientCounter, 1), diff --git a/invoiceregistry.go b/invoiceregistry.go index 7d1a3754..235460f3 100644 --- a/invoiceregistry.go +++ b/invoiceregistry.go @@ -11,9 +11,9 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcutil" "github.com/davecgh/go-spew/spew" - "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/queue" "github.com/lightningnetwork/lnd/zpay32" ) @@ -415,7 +415,7 @@ type invoiceSubscription struct { // are sent out. settleIndex uint64 - ntfnQueue *chainntnfs.ConcurrentQueue + ntfnQueue *queue.ConcurrentQueue id uint32 @@ -456,7 +456,7 @@ func (i *invoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) * addIndex: addIndex, settleIndex: settleIndex, inv: i, - ntfnQueue: chainntnfs.NewConcurrentQueue(20), + ntfnQueue: queue.NewConcurrentQueue(20), cancelChan: make(chan struct{}), } client.ntfnQueue.Start() diff --git a/chainntnfs/queue.go b/queue/queue.go similarity index 99% rename from chainntnfs/queue.go rename to queue/queue.go index 90198a4b..497e6f7e 100644 --- a/chainntnfs/queue.go +++ b/queue/queue.go @@ -1,4 +1,4 @@ -package chainntnfs +package queue import ( "container/list" diff --git a/chainntnfs/queue_test.go b/queue/queue_test.go similarity index 78% rename from chainntnfs/queue_test.go rename to queue/queue_test.go index 44d62735..37d8a291 100644 --- a/chainntnfs/queue_test.go +++ b/queue/queue_test.go @@ -1,13 +1,13 @@ -package chainntnfs_test +package queue_test import ( "testing" - "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/queue" ) func TestConcurrentQueue(t *testing.T) { - queue := chainntnfs.NewConcurrentQueue(100) + queue := queue.NewConcurrentQueue(100) queue.Start() defer queue.Stop()