Merge pull request #2067 from halseth/concurrent-queue

chainntnfs+queue: move ConcurrentQueue to own package 'queue'
This commit is contained in:
Olaoluwa Osuntokun 2018-10-18 18:44:05 -07:00 committed by GitHub
commit c973ef18bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 22 additions and 19 deletions

@ -14,6 +14,7 @@ import (
"github.com/btcsuite/btcwallet/chain" "github.com/btcsuite/btcwallet/chain"
"github.com/btcsuite/btcwallet/wtxmgr" "github.com/btcsuite/btcwallet/wtxmgr"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/queue"
) )
const ( const (
@ -996,7 +997,7 @@ type blockEpochRegistration struct {
epochChan chan *chainntnfs.BlockEpoch epochChan chan *chainntnfs.BlockEpoch
epochQueue *chainntnfs.ConcurrentQueue epochQueue *queue.ConcurrentQueue
bestBlock *chainntnfs.BlockEpoch bestBlock *chainntnfs.BlockEpoch
@ -1021,7 +1022,7 @@ func (b *BitcoindNotifier) RegisterBlockEpochNtfn(
bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
reg := &blockEpochRegistration{ reg := &blockEpochRegistration{
epochQueue: chainntnfs.NewConcurrentQueue(20), epochQueue: queue.NewConcurrentQueue(20),
epochChan: make(chan *chainntnfs.BlockEpoch, 20), epochChan: make(chan *chainntnfs.BlockEpoch, 20),
cancelChan: make(chan struct{}), cancelChan: make(chan struct{}),
epochID: atomic.AddUint64(&b.epochClientCounter, 1), epochID: atomic.AddUint64(&b.epochClientCounter, 1),

@ -14,6 +14,7 @@ import (
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/queue"
) )
const ( const (
@ -81,8 +82,8 @@ type BtcdNotifier struct {
bestBlock chainntnfs.BlockEpoch bestBlock chainntnfs.BlockEpoch
chainUpdates *chainntnfs.ConcurrentQueue chainUpdates *queue.ConcurrentQueue
txUpdates *chainntnfs.ConcurrentQueue txUpdates *queue.ConcurrentQueue
// spendHintCache is a cache used to query and update the latest height // spendHintCache is a cache used to query and update the latest height
// hints for an outpoint. Each height hint represents the earliest // hints for an outpoint. Each height hint represents the earliest
@ -115,8 +116,8 @@ func New(config *rpcclient.ConnConfig, spendHintCache chainntnfs.SpendHintCache,
spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification),
chainUpdates: chainntnfs.NewConcurrentQueue(10), chainUpdates: queue.NewConcurrentQueue(10),
txUpdates: chainntnfs.NewConcurrentQueue(10), txUpdates: queue.NewConcurrentQueue(10),
spendHintCache: spendHintCache, spendHintCache: spendHintCache,
confirmHintCache: confirmHintCache, confirmHintCache: confirmHintCache,
@ -1056,7 +1057,7 @@ type blockEpochRegistration struct {
epochChan chan *chainntnfs.BlockEpoch epochChan chan *chainntnfs.BlockEpoch
epochQueue *chainntnfs.ConcurrentQueue epochQueue *queue.ConcurrentQueue
bestBlock *chainntnfs.BlockEpoch bestBlock *chainntnfs.BlockEpoch
@ -1081,7 +1082,7 @@ func (b *BtcdNotifier) RegisterBlockEpochNtfn(
bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
reg := &blockEpochRegistration{ reg := &blockEpochRegistration{
epochQueue: chainntnfs.NewConcurrentQueue(20), epochQueue: queue.NewConcurrentQueue(20),
epochChan: make(chan *chainntnfs.BlockEpoch, 20), epochChan: make(chan *chainntnfs.BlockEpoch, 20),
cancelChan: make(chan struct{}), cancelChan: make(chan struct{}),
epochID: atomic.AddUint64(&b.epochClientCounter, 1), epochID: atomic.AddUint64(&b.epochClientCounter, 1),

@ -18,6 +18,7 @@ import (
"github.com/btcsuite/btcwallet/waddrmgr" "github.com/btcsuite/btcwallet/waddrmgr"
"github.com/lightninglabs/neutrino" "github.com/lightninglabs/neutrino"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/queue"
) )
const ( const (
@ -75,7 +76,7 @@ type NeutrinoNotifier struct {
rescanErr <-chan error rescanErr <-chan error
chainUpdates *chainntnfs.ConcurrentQueue chainUpdates *queue.ConcurrentQueue
// spendHintCache is a cache used to query and update the latest height // spendHintCache is a cache used to query and update the latest height
// hints for an outpoint. Each height hint represents the earliest // hints for an outpoint. Each height hint represents the earliest
@ -114,7 +115,7 @@ func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache,
rescanErr: make(chan error), rescanErr: make(chan error),
chainUpdates: chainntnfs.NewConcurrentQueue(10), chainUpdates: queue.NewConcurrentQueue(10),
spendHintCache: spendHintCache, spendHintCache: spendHintCache,
confirmHintCache: confirmHintCache, confirmHintCache: confirmHintCache,
@ -968,7 +969,7 @@ type blockEpochRegistration struct {
epochChan chan *chainntnfs.BlockEpoch epochChan chan *chainntnfs.BlockEpoch
epochQueue *chainntnfs.ConcurrentQueue epochQueue *queue.ConcurrentQueue
cancelChan chan struct{} cancelChan chan struct{}
@ -993,7 +994,7 @@ func (n *NeutrinoNotifier) RegisterBlockEpochNtfn(
bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
reg := &blockEpochRegistration{ reg := &blockEpochRegistration{
epochQueue: chainntnfs.NewConcurrentQueue(20), epochQueue: queue.NewConcurrentQueue(20),
epochChan: make(chan *chainntnfs.BlockEpoch, 20), epochChan: make(chan *chainntnfs.BlockEpoch, 20),
cancelChan: make(chan struct{}), cancelChan: make(chan struct{}),
epochID: atomic.AddUint64(&n.epochClientCounter, 1), epochID: atomic.AddUint64(&n.epochClientCounter, 1),

@ -11,9 +11,9 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/zpay32" "github.com/lightningnetwork/lnd/zpay32"
) )
@ -415,7 +415,7 @@ type invoiceSubscription struct {
// are sent out. // are sent out.
settleIndex uint64 settleIndex uint64
ntfnQueue *chainntnfs.ConcurrentQueue ntfnQueue *queue.ConcurrentQueue
id uint32 id uint32
@ -456,7 +456,7 @@ func (i *invoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *
addIndex: addIndex, addIndex: addIndex,
settleIndex: settleIndex, settleIndex: settleIndex,
inv: i, inv: i,
ntfnQueue: chainntnfs.NewConcurrentQueue(20), ntfnQueue: queue.NewConcurrentQueue(20),
cancelChan: make(chan struct{}), cancelChan: make(chan struct{}),
} }
client.ntfnQueue.Start() client.ntfnQueue.Start()

@ -1,4 +1,4 @@
package chainntnfs package queue
import ( import (
"container/list" "container/list"

@ -1,13 +1,13 @@
package chainntnfs_test package queue_test
import ( import (
"testing" "testing"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/queue"
) )
func TestConcurrentQueue(t *testing.T) { func TestConcurrentQueue(t *testing.T) {
queue := chainntnfs.NewConcurrentQueue(100) queue := queue.NewConcurrentQueue(100)
queue.Start() queue.Start()
defer queue.Stop() defer queue.Stop()