From b26560e0f4ccf5d1db43dd92a64c4282f3df7c8e Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Mon, 2 Oct 2017 16:38:45 +0200 Subject: [PATCH] channeldb: add DisconnectBlockAtHeight This commit adds the method DisconnectBlockAtHeight to the channel graph database, making it possible to "rewind" the database in case a block is disconnected from the main chain. To accomplish this, a prune log is introduced, making it possible to keep track of the point in time where the database was pruned. This is necessary for the case where lnd might wake up on a stale branch, and must "walk backwards" on this branch after it finds a common block fro the graph database and the new main chain. --- channeldb/graph.go | 182 +++++++++++++++++++++++++++++++++------- channeldb/graph_test.go | 163 +++++++++++++++++++++++++++++++++++ 2 files changed, 317 insertions(+), 28 deletions(-) diff --git a/channeldb/graph.go b/channeldb/graph.go index d9f21ef1..74c58908 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -3,8 +3,10 @@ package channeldb import ( "bytes" "encoding/binary" + "fmt" "image/color" "io" + "math" "net" "time" @@ -86,12 +88,17 @@ var ( // number of channels, etc. graphMetaBucket = []byte("graph-meta") - // pruneTipKey is a key within the above graphMetaBucket that stores - // the best known blockhash+height that the channel graph has been - // known to be pruned to. Once a new block is discovered, any channels - // that have been closed (by spending the outpoint) can safely be - // removed from the graph. - pruneTipKey = []byte("prune-tip") + // pruneLogBucket is a bucket within the graphMetaBucket that stores + // a mapping from the block height to the hash for the blocks used to + // prune the graph. + // Once a new block is discovered, any channels that have been closed + // (by spending the outpoint) can safely be removed from the graph, and + // the block is added to the prune log. We need to keep such a log for + // the case where a reorg happens, and we must "rewind" the state of the + // graph by removing channels that were previously confirmed. In such a + // case we'll remove all entries from the prune log with a block height + // that no longer exists. + pruneLogBucket = []byte("prune-log") edgeBloomKey = []byte("edge-bloom") nodeBloomKey = []byte("node-bloom") @@ -560,11 +567,12 @@ func (c *ChannelGraph) UpdateChannelEdge(edge *ChannelEdgeInfo) error { } const ( - // pruneTipBytes is the total size of the value which stores the - // current prune tip of the graph. The prune tip indicates if the - // channel graph is in sync with the current UTXO state. The structure - // is: blockHash || blockHeight, taking 36 bytes total. - pruneTipBytes = 32 + 4 + // pruneTipBytes is the total size of the value which stores a prune + // entry of the graph in the prune log. The "prune tip" is the last + // entry in the prune log, and indicates if the channel graph is in + // sync with the current UTXO state. The structure of the value + // is: blockHash, taking 32 bytes total. + pruneTipBytes = 32 ) // PruneGraph prunes newly closed channels from the channel graph in response @@ -641,14 +649,21 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, return err } - // With the graph pruned, update the current "prune tip" which - // can be used to check if the graph is fully synced with the - // current UTXO state. + pruneBucket, err := metaBucket.CreateBucketIfNotExists(pruneLogBucket) + if err != nil { + return err + } + + // With the graph pruned, add a new entry to the prune log, + // which can be used to check if the graph is fully synced with + // the current UTXO state. + var blockHeightBytes [4]byte + byteOrder.PutUint32(blockHeightBytes[:], blockHeight) + var newTip [pruneTipBytes]byte copy(newTip[:], blockHash[:]) - byteOrder.PutUint32(newTip[32:], blockHeight) - return metaBucket.Put(pruneTipKey, newTip[:]) + return pruneBucket.Put(blockHeightBytes[:], newTip[:]) }) if err != nil { return nil, err @@ -657,15 +672,115 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, return chansClosed, nil } +// DisconnectBlockAtHeight is used to indicate that the block specified +// by the passed height has been disconnected from the main chain. This +// will "rewind" the graph back to the height below, deleting channels +// that are no longer confirmed from the graph. The prune log will be +// set to the last prune height valid for the remaining chain. +// Channels that were removed from the graph resulting from the +// disconnected block are returned. +func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInfo, + error) { + + // Every channel having a ShortChannelID starting at 'height' + // will no longer be confirmed. + startShortChanID := lnwire.ShortChannelID{ + BlockHeight: height, + } + + // Delete everything after this height from the db. + endShortChanID := lnwire.ShortChannelID{ + BlockHeight: math.MaxUint32 & 0x00ffffff, + TxIndex: math.MaxUint32 & 0x00ffffff, + TxPosition: math.MaxUint16, + } + // The block height will be the 3 first bytes of the channel IDs. + var chanIDStart [8]byte + byteOrder.PutUint64(chanIDStart[:], startShortChanID.ToUint64()) + var chanIDEnd [8]byte + byteOrder.PutUint64(chanIDEnd[:], endShortChanID.ToUint64()) + + // Keep track of the channels that are removed from the graph. + var removedChans []*ChannelEdgeInfo + + if err := c.db.Update(func(tx *bolt.Tx) error { + edges, err := tx.CreateBucketIfNotExists(edgeBucket) + if err != nil { + return err + } + + edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket) + if err != nil { + return err + } + + chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket) + if err != nil { + return err + } + + // Scan from chanIDStart to chanIDEnd, deleting every + // found edge. + cursor := edgeIndex.Cursor() + for k, v := cursor.Seek(chanIDStart[:]); k != nil && + bytes.Compare(k, chanIDEnd[:]) <= 0; k, v = cursor.Next() { + + edgeInfoReader := bytes.NewReader(v) + edgeInfo, err := deserializeChanEdgeInfo(edgeInfoReader) + if err != nil { + return err + } + err = delChannelByEdge(edges, edgeIndex, chanIndex, + &edgeInfo.ChannelPoint) + if err != nil && err != ErrEdgeNotFound { + return err + } + + removedChans = append(removedChans, edgeInfo) + } + + // Delete all the entries in the prune log having a height + // greater or equal to the block disconnected. + metaBucket, err := tx.CreateBucketIfNotExists(graphMetaBucket) + if err != nil { + return err + } + + pruneBucket, err := metaBucket.CreateBucketIfNotExists(pruneLogBucket) + if err != nil { + return err + } + + var pruneKeyStart [4]byte + byteOrder.PutUint32(pruneKeyStart[:], height) + + var pruneKeyEnd [4]byte + byteOrder.PutUint32(pruneKeyEnd[:], math.MaxUint32) + + pruneCursor := pruneBucket.Cursor() + for k, _ := pruneCursor.Seek(pruneKeyStart[:]); k != nil && + bytes.Compare(k, pruneKeyEnd[:]) <= 0; k, _ = pruneCursor.Next() { + if err := pruneCursor.Delete(); err != nil { + return err + } + } + + return nil + }); err != nil { + return nil, err + } + + return removedChans, nil +} + // PruneTip returns the block height and hash of the latest block that has been // used to prune channels in the graph. Knowing the "prune tip" allows callers // to tell if the graph is currently in sync with the current best known UTXO // state. func (c *ChannelGraph) PruneTip() (*chainhash.Hash, uint32, error) { var ( - currentTip [pruneTipBytes]byte - tipHash chainhash.Hash - tipHeight uint32 + tipHash chainhash.Hash + tipHeight uint32 ) err := c.db.View(func(tx *bolt.Tx) error { @@ -673,12 +788,24 @@ func (c *ChannelGraph) PruneTip() (*chainhash.Hash, uint32, error) { if graphMeta == nil { return ErrGraphNotFound } - - tipBytes := graphMeta.Get(pruneTipKey) - if tipBytes == nil { + pruneBucket := graphMeta.Bucket(pruneLogBucket) + if pruneBucket == nil { return ErrGraphNeverPruned } - copy(currentTip[:], tipBytes) + + pruneCursor := pruneBucket.Cursor() + + // The prune key with the largest block height will be our + // prune tip. + k, v := pruneCursor.Last() + if k == nil { + return ErrGraphNeverPruned + } + + // Once we have the prune tip, the value will be the block hash, + // and the key the block height. + copy(tipHash[:], v[:]) + tipHeight = byteOrder.Uint32(k[:]) return nil }) @@ -686,11 +813,6 @@ func (c *ChannelGraph) PruneTip() (*chainhash.Hash, uint32, error) { return nil, 0, err } - // Once we have the prune tip, the first 32 bytes are the block hash, - // with the latter 4 bytes being the block height. - copy(tipHash[:], currentTip[:32]) - tipHeight = byteOrder.Uint32(currentTip[32:]) - return &tipHash, tipHeight, nil } @@ -778,6 +900,10 @@ func delChannelByEdge(edges *bolt.Bucket, edgeIndex *bolt.Bucket, // the keys which house both of the directed edges for this // channel. nodeKeys := edgeIndex.Get(chanID) + if nodeKeys == nil { + return fmt.Errorf("could not find nodekeys for chanID %v", + chanID) + } // The edge key is of the format pubKey || chanID. First we // construct the latter half, populating the channel ID. diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 587ade7d..e48d49ab 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "fmt" "image/color" + "math" "math/big" prand "math/rand" "net" @@ -354,6 +355,168 @@ func TestEdgeInsertionDeletion(t *testing.T) { } } +// TestDisconnecteBlockAtHeight checks that the pruned state of the channel +// database is what we expect after calling DisconnectBlockAtHeight. +func TestDisconnecteBlockAtHeight(t *testing.T) { + t.Parallel() + + db, cleanUp, err := makeTestDB() + defer cleanUp() + if err != nil { + t.Fatalf("unable to make test database: %v", err) + } + + graph := db.ChannelGraph() + + // We'd like to test the insertion/deletion of edges, so we create two + // vertexes to connect. + node1, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + node2, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + + // In addition to the fake vertexes we create some fake channel + // identifiers. + var spendOutputs []*wire.OutPoint + var blockHash chainhash.Hash + copy(blockHash[:], bytes.Repeat([]byte{1}, 32)) + + // Prune the graph a few times to make sure we have entries in the + // prune log. + _, err = graph.PruneGraph(spendOutputs, &blockHash, 155) + if err != nil { + t.Fatalf("unable to prune graph: %v", err) + } + var blockHash2 chainhash.Hash + copy(blockHash2[:], bytes.Repeat([]byte{2}, 32)) + + _, err = graph.PruneGraph(spendOutputs, &blockHash2, 156) + if err != nil { + t.Fatalf("unable to prune graph: %v", err) + } + + // We'll create 3 almost identical edges, so first create a helper + // method containing all logic for doing so. + createEdge := func(height uint32, txIndex uint32, txPosition uint16, + outPointIndex uint32) ChannelEdgeInfo { + shortChanID := lnwire.ShortChannelID{ + BlockHeight: height, + TxIndex: txIndex, + TxPosition: txPosition, + } + outpoint := wire.OutPoint{ + Hash: rev, + Index: outPointIndex, + } + + edgeInfo := ChannelEdgeInfo{ + ChannelID: shortChanID.ToUint64(), + ChainHash: key, + NodeKey1: node1.PubKey, + NodeKey2: node2.PubKey, + BitcoinKey1: node1.PubKey, + BitcoinKey2: node2.PubKey, + AuthProof: &ChannelAuthProof{ + NodeSig1: testSig, + NodeSig2: testSig, + BitcoinSig1: testSig, + BitcoinSig2: testSig, + }, + ChannelPoint: outpoint, + Capacity: 9000, + } + return edgeInfo + } + + // Create an edge which has its block height at 156. + height := uint32(156) + edgeInfo := createEdge(height, 0, 0, 0) + + // Create an edge with block height 157. We give it + // maximum values for tx index and position, to make + // sure our database range scan get edges from the + // entire range. + edgeInfo2 := createEdge(height+1, math.MaxUint32&0x00ffffff, + math.MaxUint16, 1) + + // Create a third edge, this with a block height of 155. + edgeInfo3 := createEdge(height-1, 0, 0, 2) + + // Now add all these new edges to the database. + if err := graph.AddChannelEdge(&edgeInfo); err != nil { + t.Fatalf("unable to create channel edge: %v", err) + } + + if err := graph.AddChannelEdge(&edgeInfo2); err != nil { + t.Fatalf("unable to create channel edge: %v", err) + } + + if err := graph.AddChannelEdge(&edgeInfo3); err != nil { + t.Fatalf("unable to create channel edge: %v", err) + } + + // Call DisconnectBlockAtHeight, which should prune every channel + // that has an funding height of 'height' or greater. + removed, err := graph.DisconnectBlockAtHeight(uint32(height)) + if err != nil { + t.Fatalf("unable to prune %v", err) + } + + // The two edges should have been removed. + if len(removed) != 2 { + t.Fatalf("expected two edges to be removed from graph, "+ + "only %d were", len(removed)) + } + if removed[0].ChannelID != edgeInfo.ChannelID { + t.Fatalf("expected edge to be removed from graph") + } + if removed[1].ChannelID != edgeInfo2.ChannelID { + t.Fatalf("expected edge to be removed from graph") + } + + // The two first edges should be removed from the db. + _, _, has, err := graph.HasChannelEdge(edgeInfo.ChannelID) + if err != nil { + t.Fatalf("unable to query for edge: %v", err) + } + if has { + t.Fatalf("edge1 was not pruned from the graph") + } + _, _, has, err = graph.HasChannelEdge(edgeInfo2.ChannelID) + if err != nil { + t.Fatalf("unable to query for edge: %v", err) + } + if has { + t.Fatalf("edge2 was not pruned from the graph") + } + + // Edge 3 should not be removed. + _, _, has, err = graph.HasChannelEdge(edgeInfo3.ChannelID) + if err != nil { + t.Fatalf("unable to query for edge: %v", err) + } + if !has { + t.Fatalf("edge3 was pruned from the graph") + } + + // PruneTip should be set to the blockHash we specified for the block + // at height 155. + hash, h, err := graph.PruneTip() + if err != nil { + t.Fatalf("unable to get prune tip: %v", err) + } + if !blockHash.IsEqual(hash) { + t.Fatalf("expected best block to be %x, was %x", blockHash, hash) + } + if h != height-1 { + t.Fatalf("expected best block height to be %d, was %d", height-1, h) + } +} + func assertEdgeInfoEqual(t *testing.T, e1 *ChannelEdgeInfo, e2 *ChannelEdgeInfo) {