From 275d55c9e62fa54785b85a7c401459bfb0f0a559 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 18 Mar 2021 13:17:36 +0200 Subject: [PATCH] blockcache: add blockcache package This commit adds a new blockcache package along with the GetBlock method to be used along with the blockcache. --- blockcache/blockcache.go | 70 +++++++++++++ blockcache/blockcache_test.go | 188 ++++++++++++++++++++++++++++++++++ 2 files changed, 258 insertions(+) create mode 100644 blockcache/blockcache.go create mode 100644 blockcache/blockcache_test.go diff --git a/blockcache/blockcache.go b/blockcache/blockcache.go new file mode 100644 index 00000000..34db764a --- /dev/null +++ b/blockcache/blockcache.go @@ -0,0 +1,70 @@ +package blockcache + +import ( + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" + "github.com/lightninglabs/neutrino/cache" + "github.com/lightninglabs/neutrino/cache/lru" + "github.com/lightningnetwork/lnd/lntypes" + "github.com/lightningnetwork/lnd/multimutex" +) + +// BlockCache is an lru cache for blocks. +type BlockCache struct { + Cache *lru.Cache + HashMutex *multimutex.HashMutex +} + +// NewBlockCache creates a new BlockCache with the given maximum capacity. +func NewBlockCache(capacity uint64) *BlockCache { + return &BlockCache{ + Cache: lru.NewCache(capacity), + HashMutex: multimutex.NewHashMutex(), + } +} + +// GetBlock first checks to see if the BlockCache already contains the block +// with the given hash. If it does then the block is fetched from the cache and +// returned. Otherwise the getBlockImpl function is used in order to fetch the +// new block and then it is stored in the block cache and returned. +func (bc *BlockCache) GetBlock(hash *chainhash.Hash, + getBlockImpl func(hash *chainhash.Hash) (*wire.MsgBlock, + error)) (*wire.MsgBlock, error) { + + bc.HashMutex.Lock(lntypes.Hash(*hash)) + defer bc.HashMutex.Unlock(lntypes.Hash(*hash)) + + // Create an inv vector for getting the block. + inv := wire.NewInvVect(wire.InvTypeWitnessBlock, hash) + + // Check if the block corresponding to the given hash is already + // stored in the blockCache and return it if it is. + cacheBlock, err := bc.Cache.Get(*inv) + if err != nil && err != cache.ErrElementNotFound { + return nil, err + } + if cacheBlock != nil { + return cacheBlock.(*cache.CacheableBlock).MsgBlock(), nil + } + + // Fetch the block from the chain backends. + block, err := getBlockImpl(hash) + if err != nil { + return nil, err + } + + // Add the new block to blockCache. If the Cache is at its maximum + // capacity then the LFU item will be evicted in favour of this new + // block. + _, err = bc.Cache.Put( + *inv, &cache.CacheableBlock{ + Block: btcutil.NewBlock(block), + }, + ) + if err != nil { + return nil, err + } + + return block, nil +} diff --git a/blockcache/blockcache_test.go b/blockcache/blockcache_test.go new file mode 100644 index 00000000..5ea5ae9c --- /dev/null +++ b/blockcache/blockcache_test.go @@ -0,0 +1,188 @@ +package blockcache + +import ( + "errors" + "fmt" + "sync" + "testing" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" + "github.com/lightninglabs/neutrino/cache" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type mockChainBackend struct { + blocks map[chainhash.Hash]*wire.MsgBlock + chainCallCount int + + sync.RWMutex +} + +func (m *mockChainBackend) addBlock(block *wire.MsgBlock, nonce uint32) { + m.Lock() + defer m.Unlock() + block.Header.Nonce = nonce + hash := block.Header.BlockHash() + m.blocks[hash] = block +} +func (m *mockChainBackend) GetBlock(blockHash *chainhash.Hash) (*wire.MsgBlock, error) { + m.RLock() + defer m.RUnlock() + m.chainCallCount++ + + block, ok := m.blocks[*blockHash] + if !ok { + return nil, fmt.Errorf("block not found") + } + + return block, nil +} + +func newMockChain() *mockChainBackend { + return &mockChainBackend{ + blocks: make(map[chainhash.Hash]*wire.MsgBlock), + } +} + +func (m *mockChainBackend) resetChainCallCount() { + m.RLock() + defer m.RUnlock() + + m.chainCallCount = 0 +} + +// TestBlockCacheGetBlock tests that the block Cache works correctly as a LFU block +// Cache for the given max capacity. +func TestBlockCacheGetBlock(t *testing.T) { + mc := newMockChain() + getBlockImpl := mc.GetBlock + + block1 := &wire.MsgBlock{Header: wire.BlockHeader{Nonce: 1}} + block2 := &wire.MsgBlock{Header: wire.BlockHeader{Nonce: 2}} + block3 := &wire.MsgBlock{Header: wire.BlockHeader{Nonce: 3}} + + blockhash1 := block1.BlockHash() + blockhash2 := block2.BlockHash() + blockhash3 := block3.BlockHash() + + inv1 := wire.NewInvVect(wire.InvTypeWitnessBlock, &blockhash1) + inv2 := wire.NewInvVect(wire.InvTypeWitnessBlock, &blockhash2) + inv3 := wire.NewInvVect(wire.InvTypeWitnessBlock, &blockhash3) + + // Determine the size of one of the blocks. + sz, _ := (&cache.CacheableBlock{Block: btcutil.NewBlock(block1)}).Size() + + // A new Cache is set up with a capacity of 2 blocks + bc := NewBlockCache(2 * sz) + + mc.addBlock(&wire.MsgBlock{}, 1) + mc.addBlock(&wire.MsgBlock{}, 2) + mc.addBlock(&wire.MsgBlock{}, 3) + + // We expect the initial Cache to be empty + require.Equal(t, 0, bc.Cache.Len()) + + // After calling getBlock for block1, it is expected that the Cache + // will have a size of 1 and will contain block1. One chain backends + // call is expected to fetch the block. + _, err := bc.GetBlock(&blockhash1, getBlockImpl) + require.NoError(t, err) + require.Equal(t, 1, bc.Cache.Len()) + require.Equal(t, 1, mc.chainCallCount) + mc.resetChainCallCount() + + _, err = bc.Cache.Get(*inv1) + require.NoError(t, err) + + // After calling getBlock for block2, it is expected that the Cache + // will have a size of 2 and will contain both block1 and block2. + // One chain backends call is expected to fetch the block. + _, err = bc.GetBlock(&blockhash2, getBlockImpl) + require.NoError(t, err) + require.Equal(t, 2, bc.Cache.Len()) + require.Equal(t, 1, mc.chainCallCount) + mc.resetChainCallCount() + + _, err = bc.Cache.Get(*inv1) + require.NoError(t, err) + + _, err = bc.Cache.Get(*inv2) + require.NoError(t, err) + + // getBlock is called again for block1 to make block2 the LFU block. + // No call to the chain backend is expected since block 1 is already + // in the Cache. + _, err = bc.GetBlock(&blockhash1, getBlockImpl) + require.NoError(t, err) + require.Equal(t, 2, bc.Cache.Len()) + require.Equal(t, 0, mc.chainCallCount) + mc.resetChainCallCount() + + // Since the Cache is now at its max capacity, it is expected that when + // getBlock is called for a new block then the LFU block will be + // evicted. It is expected that block2 will be evicted. After calling + // Getblock for block3, it is expected that the Cache will have a + // length of 2 and will contain block 1 and 3. + _, err = bc.GetBlock(&blockhash3, getBlockImpl) + require.NoError(t, err) + require.Equal(t, 2, bc.Cache.Len()) + require.Equal(t, 1, mc.chainCallCount) + mc.resetChainCallCount() + + _, err = bc.Cache.Get(*inv1) + require.NoError(t, err) + + _, err = bc.Cache.Get(*inv2) + require.True(t, errors.Is(err, cache.ErrElementNotFound)) + + _, err = bc.Cache.Get(*inv3) + require.NoError(t, err) +} + +// TestBlockCacheMutexes is used to test that concurrent calls to GetBlock with +// the same block hash does not result in multiple calls to the chain backend. +// In other words this tests the HashMutex. +func TestBlockCacheMutexes(t *testing.T) { + mc := newMockChain() + getBlockImpl := mc.GetBlock + + block1 := &wire.MsgBlock{Header: wire.BlockHeader{Nonce: 1}} + block2 := &wire.MsgBlock{Header: wire.BlockHeader{Nonce: 2}} + + blockhash1 := block1.BlockHash() + blockhash2 := block2.BlockHash() + + // Determine the size of the block. + sz, _ := (&cache.CacheableBlock{Block: btcutil.NewBlock(block1)}).Size() + + // A new Cache is set up with a capacity of 2 blocks + bc := NewBlockCache(2 * sz) + + mc.addBlock(&wire.MsgBlock{}, 1) + mc.addBlock(&wire.MsgBlock{}, 2) + + // Spin off multiple go routines and ensure that concurrent calls to the + // GetBlock method does not result in multiple calls to the chain + // backend. + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(e int) { + if e%2 == 0 { + _, err := bc.GetBlock(&blockhash1, getBlockImpl) + assert.NoError(t, err) + } else { + _, err := bc.GetBlock(&blockhash2, getBlockImpl) + assert.NoError(t, err) + } + + wg.Done() + }(i) + } + + wg.Wait() + require.Equal(t, 2, mc.chainCallCount) +}