From 256db86b0247abb04be05c8a2fb75a03cceb5821 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Mon, 2 Oct 2017 17:54:29 +0200 Subject: [PATCH] routing: make channel router aware of stale blocks This commit makes the channel router handle the case where it wakes up on a stale branch, and also the case where a reorg happens while it is active. --- routing/notifications_test.go | 34 ++- routing/router.go | 77 ++++++- routing/router_test.go | 401 +++++++++++++++++++++++++++++++++- 3 files changed, 498 insertions(+), 14 deletions(-) diff --git a/routing/notifications_test.go b/routing/notifications_test.go index e93bae46..1f24c398 100644 --- a/routing/notifications_test.go +++ b/routing/notifications_test.go @@ -147,7 +147,9 @@ func (m *mockChain) GetBestBlock() (*chainhash.Hash, int32, error) { m.RLock() defer m.RUnlock() - return nil, m.bestHeight, nil + blockHash := m.blockIndex[uint32(m.bestHeight)] + + return &blockHash, m.bestHeight, nil } func (m *mockChain) GetTransaction(txid *chainhash.Hash) (*wire.MsgTx, error) { @@ -162,7 +164,6 @@ func (m *mockChain) GetBlockHash(blockHeight int64) (*chainhash.Hash, error) { if !ok { return nil, fmt.Errorf("can't find block hash, for "+ "height %v", blockHeight) - } return &hash, nil @@ -185,9 +186,9 @@ func (m *mockChain) GetUtxo(op *wire.OutPoint, _ uint32) (*wire.TxOut, error) { return &utxo, nil } -func (m *mockChain) addBlock(block *wire.MsgBlock, height uint32) { +func (m *mockChain) addBlock(block *wire.MsgBlock, height uint32, nonce uint32) { m.Lock() - block.Header.Nonce = height + block.Header.Nonce = nonce hash := block.Header.BlockHash() m.blocks[hash] = block m.blockIndex[height] = hash @@ -250,6 +251,19 @@ func (m *mockChainView) notifyBlock(hash chainhash.Hash, height uint32, } } +func (m *mockChainView) notifyStaleBlock(hash chainhash.Hash, height uint32, + txns []*wire.MsgTx) { + + m.RLock() + defer m.RUnlock() + + m.staleBlocks <- &chainview.FilteredBlock{ + Hash: hash, + Height: height, + 
Transactions: txns, + } +} + func (m *mockChainView) FilteredBlocks() <-chan *chainview.FilteredBlock { return m.newBlocks } @@ -259,7 +273,7 @@ func (m *mockChainView) DisconnectedBlocks() <-chan *chainview.FilteredBlock { } func (m *mockChainView) FilterBlock(blockHash *chainhash.Hash) (*chainview.FilteredBlock, error) { - return nil, nil + return &chainview.FilteredBlock{}, nil } func (m *mockChainView) Start() error { @@ -295,7 +309,7 @@ func TestEdgeUpdateNotification(t *testing.T) { fundingBlock := &wire.MsgBlock{ Transactions: []*wire.MsgTx{fundingTx}, } - ctx.chain.addBlock(fundingBlock, chanID.BlockHeight) + ctx.chain.addBlock(fundingBlock, chanID.BlockHeight, chanID.BlockHeight) // Next we'll create two test nodes that the fake channel will be open // between. @@ -477,7 +491,7 @@ func TestNodeUpdateNotification(t *testing.T) { fundingBlock := &wire.MsgBlock{ Transactions: []*wire.MsgTx{fundingTx}, } - ctx.chain.addBlock(fundingBlock, chanID.BlockHeight) + ctx.chain.addBlock(fundingBlock, chanID.BlockHeight, chanID.BlockHeight) // Create two nodes acting as endpoints in the created channel, and use // them to trigger notifications by sending updated node announcement @@ -658,7 +672,7 @@ func TestNotificationCancellation(t *testing.T) { fundingBlock := &wire.MsgBlock{ Transactions: []*wire.MsgTx{fundingTx}, } - ctx.chain.addBlock(fundingBlock, chanID.BlockHeight) + ctx.chain.addBlock(fundingBlock, chanID.BlockHeight, chanID.BlockHeight) // We'll create a fresh new node topology update to feed to the channel // router. @@ -743,7 +757,7 @@ func TestChannelCloseNotification(t *testing.T) { fundingBlock := &wire.MsgBlock{ Transactions: []*wire.MsgTx{fundingTx}, } - ctx.chain.addBlock(fundingBlock, chanID.BlockHeight) + ctx.chain.addBlock(fundingBlock, chanID.BlockHeight, chanID.BlockHeight) // Next we'll create two test nodes that the fake channel will be open // between. 
@@ -797,7 +811,7 @@ func TestChannelCloseNotification(t *testing.T) { }, }, } - ctx.chain.addBlock(newBlock, blockHeight) + ctx.chain.addBlock(newBlock, blockHeight, blockHeight) ctx.chainView.notifyBlock(newBlock.Header.BlockHash(), blockHeight, newBlock.Transactions) diff --git a/routing/router.go b/routing/router.go index 59ceb22c..904aeade 100644 --- a/routing/router.go +++ b/routing/router.go @@ -185,9 +185,14 @@ type ChannelRouter struct { routeCache map[routeTuple][]*Route // newBlocks is a channel in which new blocks connected to the end of - // the main chain are sent over. + // the main chain are sent over, and blocks updated after a call to + // UpdateFilter. newBlocks <-chan *chainview.FilteredBlock + // staleBlocks is a channel in which blocks disconnected from the end + // of our currently known best chain are sent over. + staleBlocks <-chan *chainview.FilteredBlock + // networkUpdates is a channel that carries new topology updates // messages from outside the ChannelRouter to be processed by the // networkHandler. @@ -266,6 +271,7 @@ func (r *ChannelRouter) Start() error { // Once the instance is active, we'll fetch the channel we'll receive // notifications over. r.newBlocks = r.cfg.ChainView.FilteredBlocks() + r.staleBlocks = r.cfg.ChainView.DisconnectedBlocks() // Before we begin normal operation of the router, we first need to // synchronize the channel graph to the latest state of the UTXO set. @@ -352,6 +358,46 @@ func (r *ChannelRouter) syncGraphWithChain() error { return nil } + // If the main chain blockhash at prune height is different from the + // prune hash, this might indicate the database is on a stale branch. + mainBlockHash, err := r.cfg.Chain.GetBlockHash(int64(pruneHeight)) + if err != nil { + return err + } + + // While we are on a stale branch of the chain, walk backwards to find + // first common block. + for !pruneHash.IsEqual(mainBlockHash) { + log.Infof("channel graph is stale. 
Disconnecting block %v "+ "(hash=%v)", pruneHeight, pruneHash) + // Prune the graph for every channel that was opened at height + // >= pruneHeight. + _, err := r.cfg.Graph.DisconnectBlockAtHeight(pruneHeight) + if err != nil { + return err + } + + pruneHash, pruneHeight, err = r.cfg.Graph.PruneTip() + if err != nil { + switch { + // If at this point the graph has never been pruned, we + // can exit as this entails we are back to the point + // where it hasn't seen any block or created channels, + // alas there's nothing left to prune. + case err == channeldb.ErrGraphNeverPruned: + return nil + case err == channeldb.ErrGraphNotFound: + return nil + default: + return err + } + } + mainBlockHash, err = r.cfg.Chain.GetBlockHash(int64(pruneHeight)) + if err != nil { + return err + } + } + log.Infof("Syncing channel graph from height=%v (hash=%v) to height=%v "+ "(hash=%v)", pruneHeight, pruneHash, bestHeight, bestHash) @@ -449,6 +495,35 @@ func (r *ChannelRouter) networkHandler() { // after N blocks pass with no corresponding // announcements. + case chainUpdate, ok := <-r.staleBlocks: + // If the channel has been closed, then this indicates + // the daemon is shutting down, so we exit ourselves. + if !ok { + return + } + + // Since this block is stale, we update our best height + // to the previous block. + blockHeight := uint32(chainUpdate.Height) + r.bestHeight = blockHeight - 1 + + // Update the channel graph to reflect that this block + // was disconnected. + _, err := r.cfg.Graph.DisconnectBlockAtHeight(blockHeight) + if err != nil { + log.Errorf("unable to prune graph with stale "+ + "block: %v", err) + continue + } + + // Invalidate the route cache, as some channels might + // not be confirmed anymore. + r.routeCacheMtx.Lock() + r.routeCache = make(map[routeTuple][]*Route) + r.routeCacheMtx.Unlock() + + // TODO(halseth): notify client about the reorg? 
+ // A new block has arrived, so we can prune the channel graph // of any channels which were closed in the block. case chainUpdate, ok := <-r.newBlocks: diff --git a/routing/router_test.go b/routing/router_test.go index 765b9297..c13c11c3 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "image/color" + "math/rand" "strings" "testing" "time" @@ -400,7 +401,7 @@ func TestAddProof(t *testing.T) { fundingBlock := &wire.MsgBlock{ Transactions: []*wire.MsgTx{fundingTx}, } - ctx.chain.addBlock(fundingBlock, chanID.BlockHeight) + ctx.chain.addBlock(fundingBlock, chanID.BlockHeight, chanID.BlockHeight) // After utxo was recreated adding the edge without the proof. edge := &channeldb.ChannelEdgeInfo{ @@ -502,7 +503,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { fundingBlock := &wire.MsgBlock{ Transactions: []*wire.MsgTx{fundingTx}, } - ctx.chain.addBlock(fundingBlock, chanID.BlockHeight) + ctx.chain.addBlock(fundingBlock, chanID.BlockHeight, chanID.BlockHeight) edge := &channeldb.ChannelEdgeInfo{ ChannelID: chanID.ToUint64(), @@ -600,7 +601,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { fundingBlock = &wire.MsgBlock{ Transactions: []*wire.MsgTx{fundingTx}, } - ctx.chain.addBlock(fundingBlock, chanID.BlockHeight) + ctx.chain.addBlock(fundingBlock, chanID.BlockHeight, chanID.BlockHeight) edge = &channeldb.ChannelEdgeInfo{ ChannelID: chanID.ToUint64(), @@ -718,3 +719,397 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { t.Fatalf("fetched node not equal to original") } } + +// TestWakeUpOnStaleBranch tests that upon startup of the ChannelRouter, if the +// the chain previously reflected in the channel graph is stale (overtaken by a +// longer chain), the channel router will prune the graph for any channels +// confirmed on the stale chain, and resync to the main chain. 
+func TestWakeUpOnStaleBranch(t *testing.T) { + t.Parallel() + + const startingBlockHeight = 101 + ctx, cleanUp, err := createTestCtx(startingBlockHeight) + defer cleanUp() + if err != nil { + t.Fatalf("unable to create router: %v", err) + } + + const chanValue = 10000 + + // chanID1 will not be reorged out. + var chanID1 uint64 + + // chanID2 will be reorged out. + var chanID2 uint64 + + // Create 10 common blocks, confirming chanID1. + for i := uint32(1); i <= 10; i++ { + block := &wire.MsgBlock{ + Transactions: []*wire.MsgTx{}, + } + height := startingBlockHeight + i + if i == 5 { + fundingTx, _, chanID, err := createChannelEdge(ctx, + bitcoinKey1.SerializeCompressed(), + bitcoinKey2.SerializeCompressed(), + chanValue, height) + if err != nil { + t.Fatalf("unable create channel edge: %v", err) + } + block.Transactions = append(block.Transactions, + fundingTx) + chanID1 = chanID.ToUint64() + + } + ctx.chain.addBlock(block, height, rand.Uint32()) + ctx.chain.setBestBlock(int32(height)) + ctx.chainView.notifyBlock(block.BlockHash(), height, + []*wire.MsgTx{}) + } + + // Give time to process new blocks + time.Sleep(time.Millisecond * 500) + + _, forkHeight, err := ctx.chain.GetBestBlock() + if err != nil { + t.Fatalf("unable to get best block: %v", err) + } + + // Create 10 blocks on the minority chain, confirming chanID2. 
+ for i := uint32(1); i <= 10; i++ { + block := &wire.MsgBlock{ + Transactions: []*wire.MsgTx{}, + } + height := uint32(forkHeight) + i + if i == 5 { + fundingTx, _, chanID, err := createChannelEdge(ctx, + bitcoinKey1.SerializeCompressed(), + bitcoinKey2.SerializeCompressed(), + chanValue, height) + if err != nil { + t.Fatalf("unable create channel edge: %v", err) + } + block.Transactions = append(block.Transactions, + fundingTx) + chanID2 = chanID.ToUint64() + } + ctx.chain.addBlock(block, height, rand.Uint32()) + ctx.chain.setBestBlock(int32(height)) + ctx.chainView.notifyBlock(block.BlockHash(), height, + []*wire.MsgTx{}) + } + // Give time to process new blocks + time.Sleep(time.Millisecond * 500) + + // Now add the two edges to the channel graph, and check that they + // correctly show up in the database. + node1, err := createTestNode() + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + node2, err := createTestNode() + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + + edge1 := &channeldb.ChannelEdgeInfo{ + ChannelID: chanID1, + NodeKey1: node1.PubKey, + NodeKey2: node2.PubKey, + BitcoinKey1: bitcoinKey1, + BitcoinKey2: bitcoinKey2, + AuthProof: &channeldb.ChannelAuthProof{ + NodeSig1: testSig, + NodeSig2: testSig, + BitcoinSig1: testSig, + BitcoinSig2: testSig, + }, + } + + if err := ctx.router.AddEdge(edge1); err != nil { + t.Fatalf("unable to add edge: %v", err) + } + + edge2 := &channeldb.ChannelEdgeInfo{ + ChannelID: chanID2, + NodeKey1: node1.PubKey, + NodeKey2: node2.PubKey, + BitcoinKey1: bitcoinKey1, + BitcoinKey2: bitcoinKey2, + AuthProof: &channeldb.ChannelAuthProof{ + NodeSig1: testSig, + NodeSig2: testSig, + BitcoinSig1: testSig, + BitcoinSig2: testSig, + }, + } + + if err := ctx.router.AddEdge(edge2); err != nil { + t.Fatalf("unable to add edge: %v", err) + } + + // Check that the fundingTxs are in the graph db. 
+ _, _, has, err := ctx.graph.HasChannelEdge(chanID1) + if err != nil { + t.Fatalf("error looking for edge: %v", chanID1) + } + if !has { + t.Fatalf("could not find edge in graph") + } + + _, _, has, err = ctx.graph.HasChannelEdge(chanID2) + if err != nil { + t.Fatalf("error looking for edge: %v", chanID2) + } + if !has { + t.Fatalf("could not find edge in graph") + } + + // Stop the router, so we can reorg the chain while its offline. + if err := ctx.router.Stop(); err != nil { + t.Fatalf("unable to stop router: %v", err) + } + + // Create a 15 block fork. + for i := uint32(1); i <= 15; i++ { + block := &wire.MsgBlock{ + Transactions: []*wire.MsgTx{}, + } + height := uint32(forkHeight) + i + ctx.chain.addBlock(block, height, rand.Uint32()) + ctx.chain.setBestBlock(int32(height)) + } + + // Give time to process new blocks. + time.Sleep(time.Millisecond * 500) + + // Create new router with same graph database. + router, err := New(Config{ + Graph: ctx.graph, + Chain: ctx.chain, + ChainView: ctx.chainView, + SendToSwitch: func(_ *btcec.PublicKey, + _ *lnwire.UpdateAddHTLC, _ *sphinx.Circuit) ([32]byte, error) { + return [32]byte{}, nil + }, + ChannelPruneExpiry: time.Hour * 24, + GraphPruneInterval: time.Hour * 2, + }) + if err != nil { + t.Fatalf("unable to create router %v", err) + } + + // It should resync to the longer chain on startup. + if err := router.Start(); err != nil { + t.Fatalf("unable to start router: %v", err) + } + + // The channel with chanID2 should not be in the database anymore, + // since it is not confirmed on the longest chain. chanID1 should + // still be. 
+ _, _, has, err = ctx.graph.HasChannelEdge(chanID1) + if err != nil { + t.Fatalf("error looking for edge: %v", chanID1) + } + if !has { + t.Fatalf("did not find edge in graph") + } + + _, _, has, err = ctx.graph.HasChannelEdge(chanID2) + if err != nil { + t.Fatalf("error looking for edge: %v", chanID2) + } + if has { + t.Fatalf("found edge in graph") + } + +} + +// TestDisconnectedBlocks checks that the router handles a reorg happening +// when it is active. +func TestDisconnectedBlocks(t *testing.T) { + t.Parallel() + + const startingBlockHeight = 101 + ctx, cleanUp, err := createTestCtx(startingBlockHeight) + defer cleanUp() + if err != nil { + t.Fatalf("unable to create router: %v", err) + } + + const chanValue = 10000 + + // chanID1 will not be reorged out. + var chanID1 uint64 + + // chanID2 will be reorged out. + var chanID2 uint64 + + // Create 10 common blocks, confirming chanID1. + for i := uint32(1); i <= 10; i++ { + block := &wire.MsgBlock{ + Transactions: []*wire.MsgTx{}, + } + height := startingBlockHeight + i + if i == 5 { + fundingTx, _, chanID, err := createChannelEdge(ctx, + bitcoinKey1.SerializeCompressed(), + bitcoinKey2.SerializeCompressed(), + chanValue, height) + if err != nil { + t.Fatalf("unable create channel edge: %v", err) + } + block.Transactions = append(block.Transactions, + fundingTx) + chanID1 = chanID.ToUint64() + + } + ctx.chain.addBlock(block, height, rand.Uint32()) + ctx.chain.setBestBlock(int32(height)) + ctx.chainView.notifyBlock(block.BlockHash(), height, + []*wire.MsgTx{}) + } + + // Give time to process new blocks + time.Sleep(time.Millisecond * 500) + + _, forkHeight, err := ctx.chain.GetBestBlock() + if err != nil { + t.Fatalf("unable to get best block: %v", err) + } + + // Create 10 blocks on the minority chain, confirming chanID2. 
+ var minorityChain []*wire.MsgBlock + for i := uint32(1); i <= 10; i++ { + block := &wire.MsgBlock{ + Transactions: []*wire.MsgTx{}, + } + height := uint32(forkHeight) + i + if i == 5 { + fundingTx, _, chanID, err := createChannelEdge(ctx, + bitcoinKey1.SerializeCompressed(), + bitcoinKey2.SerializeCompressed(), + chanValue, height) + if err != nil { + t.Fatalf("unable create channel edge: %v", err) + } + block.Transactions = append(block.Transactions, + fundingTx) + chanID2 = chanID.ToUint64() + } + minorityChain = append(minorityChain, block) + ctx.chain.addBlock(block, height, rand.Uint32()) + ctx.chain.setBestBlock(int32(height)) + ctx.chainView.notifyBlock(block.BlockHash(), height, + []*wire.MsgTx{}) + } + // Give time to process new blocks + time.Sleep(time.Millisecond * 500) + + // Now add the two edges to the channel graph, and check that they + // correctly show up in the database. + node1, err := createTestNode() + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + node2, err := createTestNode() + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + + edge1 := &channeldb.ChannelEdgeInfo{ + ChannelID: chanID1, + NodeKey1: node1.PubKey, + NodeKey2: node2.PubKey, + BitcoinKey1: bitcoinKey1, + BitcoinKey2: bitcoinKey2, + AuthProof: &channeldb.ChannelAuthProof{ + NodeSig1: testSig, + NodeSig2: testSig, + BitcoinSig1: testSig, + BitcoinSig2: testSig, + }, + } + + if err := ctx.router.AddEdge(edge1); err != nil { + t.Fatalf("unable to add edge: %v", err) + } + + edge2 := &channeldb.ChannelEdgeInfo{ + ChannelID: chanID2, + NodeKey1: node1.PubKey, + NodeKey2: node2.PubKey, + BitcoinKey1: bitcoinKey1, + BitcoinKey2: bitcoinKey2, + AuthProof: &channeldb.ChannelAuthProof{ + NodeSig1: testSig, + NodeSig2: testSig, + BitcoinSig1: testSig, + BitcoinSig2: testSig, + }, + } + + if err := ctx.router.AddEdge(edge2); err != nil { + t.Fatalf("unable to add edge: %v", err) + } + + // Check that the fundingTxs are in the graph db. 
+ _, _, has, err := ctx.graph.HasChannelEdge(chanID1) + if err != nil { + t.Fatalf("error looking for edge: %v", chanID1) + } + if !has { + t.Fatalf("could not find edge in graph") + } + + _, _, has, err = ctx.graph.HasChannelEdge(chanID2) + if err != nil { + t.Fatalf("error looking for edge: %v", chanID2) + } + if !has { + t.Fatalf("could not find edge in graph") + } + + // Create a 15 block fork. We first let the chainView notify the + // router about stale blocks, before sending the now connected + // blocks. We do this because we expect this order from the + // chainview. + for i := len(minorityChain) - 1; i >= 0; i-- { + block := minorityChain[i] + height := uint32(forkHeight) + uint32(i) + 1 + ctx.chainView.notifyStaleBlock(block.BlockHash(), height, + block.Transactions) + } + for i := uint32(1); i <= 15; i++ { + block := &wire.MsgBlock{ + Transactions: []*wire.MsgTx{}, + } + height := uint32(forkHeight) + i + ctx.chain.addBlock(block, height, rand.Uint32()) + ctx.chain.setBestBlock(int32(height)) + ctx.chainView.notifyBlock(block.BlockHash(), height, + block.Transactions) + } + + // Give time to process new blocks + time.Sleep(time.Millisecond * 500) + + // The channel with chanID2 should not be in the database anymore, since it is + // not confirmed on the longest chain. chanID1 should still be. + _, _, has, err = ctx.graph.HasChannelEdge(chanID1) + if err != nil { + t.Fatalf("error looking for edge: %v", chanID1) + } + if !has { + t.Fatalf("did not find edge in graph") + } + + _, _, has, err = ctx.graph.HasChannelEdge(chanID2) + if err != nil { + t.Fatalf("error looking for edge: %v", chanID2) + } + if has { + t.Fatalf("found edge in graph") + } + +}