diff --git a/routing/log.go b/routing/log.go index 6e249457..8e26fe03 100644 --- a/routing/log.go +++ b/routing/log.go @@ -5,11 +5,12 @@ import ( "io" "github.com/btcsuite/btclog" + "github.com/lightningnetwork/lnd/routing/chainview" ) -// log is a logger that is initialized with no output filters. This -// means the package will not perform any logging by default until the caller -// requests it. +// log is a logger that is initialized with no output filters. This means the +// package will not perform any logging by default until the caller requests +// it. var log btclog.Logger // The default amount of logging is none. @@ -17,17 +18,18 @@ func init() { DisableLog() } -// DisableLog disables all library log output. Logging output is disabled -// by default until either UseLogger or SetLogWriter are called. +// DisableLog disables all library log output. Logging output is disabled by +// default until either UseLogger or SetLogWriter are called. func DisableLog() { log = btclog.Disabled } -// UseLogger uses a specified Logger to output package logging info. -// This should be used in preference to SetLogWriter if the caller is also -// using btclog. +// UseLogger uses a specified Logger to output package logging info. This +// should be used in preference to SetLogWriter if the caller is also using +// btclog. func UseLogger(logger btclog.Logger) { log = logger + chainview.UseLogger(logger) } // SetLogWriter uses a specified io.Writer to output package logging info. @@ -53,8 +55,8 @@ func SetLogWriter(w io.Writer, level string) error { return nil } -// logClosure is used to provide a closure over expensive logging operations -// so don't have to be performed when the logging level doesn't warrant it. +// logClosure is used to provide a closure over expensive logging operations so +// don't have to be performed when the logging level doesn't warrant it. type logClosure func() string // String invokes the underlying function and returns the result. diff --git a/routing/notifications_test.go b/routing/notifications_test.go index ab783317..bd4c7123 100644 --- a/routing/notifications_test.go +++ b/routing/notifications_test.go @@ -11,10 +11,10 @@ import ( prand "math/rand" "github.com/go-errors/errors" - "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/chainview" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/wire" @@ -122,6 +122,10 @@ type mockChain struct { sync.RWMutex } +// A compile time check to ensure mockChain implements the +// lnwallet.BlockChainIO interface. +var _ lnwallet.BlockChainIO = (*mockChain)(nil) + func newMockChain(currentHeight uint32) *mockChain { return &mockChain{ bestHeight: int32(currentHeight), @@ -168,16 +172,11 @@ func (m *mockChain) addUtxo(op wire.OutPoint, out *wire.TxOut) { m.utxos[op] = *out m.Unlock() } -func (m *mockChain) GetUtxo(txid *chainhash.Hash, index uint32) (*wire.TxOut, error) { +func (m *mockChain) GetUtxo(op *wire.OutPoint, _ uint32) (*wire.TxOut, error) { m.RLock() defer m.RUnlock() - op := wire.OutPoint{ - Hash: *txid, - Index: index, - } - - utxo, ok := m.utxos[op] + utxo, ok := m.utxos[*op] if !ok { return nil, fmt.Errorf("utxo not found") } @@ -205,61 +204,68 @@ func (m *mockChain) GetBlock(blockHash *chainhash.Hash) (*wire.MsgBlock, error) return block, nil } -type mockNotifier struct { - clientCounter uint32 - epochClients map[uint32]chan *chainntnfs.BlockEpoch - +type mockChainView struct { sync.RWMutex + + newBlocks chan *chainview.FilteredBlock + staleBlocks chan *chainview.FilteredBlock + + filter map[wire.OutPoint]struct{} } -func newMockNotifier() *mockNotifier { - return &mockNotifier{ - epochClients: make(map[uint32]chan *chainntnfs.BlockEpoch), +// A compile time check to ensure mockChainView implements the +// chainview.FilteredChainView. +var _ chainview.FilteredChainView = (*mockChainView)(nil) + +func newMockChainView() *mockChainView { + return &mockChainView{ + newBlocks: make(chan *chainview.FilteredBlock, 10), + staleBlocks: make(chan *chainview.FilteredBlock, 10), + filter: make(map[wire.OutPoint]struct{}), } } -func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, - numConfs uint32) (*chainntnfs.ConfirmationEvent, error) { +func (m *mockChainView) UpdateFilter(ops []wire.OutPoint, updateHeight uint32) error { + m.Lock() + defer m.Unlock() - return nil, nil -} - -func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.SpendEvent, error) { - return nil, nil -} - -func (m *mockNotifier) notifyBlock(hash chainhash.Hash, height uint32) { - m.RLock() - defer m.RUnlock() - - for _, client := range m.epochClients { - client <- &chainntnfs.BlockEpoch{ - Height: int32(height), - Hash: &hash, - } + for _, op := range ops { + m.filter[op] = struct{}{} } -} -func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { - m.RLock() - defer m.RUnlock() - - epochChan := make(chan *chainntnfs.BlockEpoch) - clientID := m.clientCounter - m.clientCounter++ - m.epochClients[clientID] = epochChan - - return &chainntnfs.BlockEpochEvent{ - Epochs: epochChan, - Cancel: func() {}, - }, nil -} - -func (m *mockNotifier) Start() error { return nil } -func (m *mockNotifier) Stop() error { +func (m *mockChainView) notifyBlock(hash chainhash.Hash, height uint32, + txns []*wire.MsgTx) { + + m.RLock() + defer m.RUnlock() + + m.newBlocks <- &chainview.FilteredBlock{ + Hash: hash, + Height: height, + Transactions: txns, + } +} + +func (m *mockChainView) FilteredBlocks() <-chan *chainview.FilteredBlock { + return m.newBlocks +} + +func (m *mockChainView) DisconnectedBlocks() <-chan *chainview.FilteredBlock { + return m.staleBlocks +} + +func (m *mockChainView) FilterBlock(blockHash *chainhash.Hash) (*chainview.FilteredBlock, error) { + return nil, nil +} + +func (m *mockChainView) Start() error { + return nil +} + +func (m *mockChainView) Stop() error { return nil } @@ -643,7 +649,8 @@ func TestChannelCloseNotification(t *testing.T) { }, } ctx.chain.addBlock(newBlock, blockHeight) - ctx.notifier.notifyBlock(newBlock.Header.BlockHash(), blockHeight) + ctx.chainView.notifyBlock(newBlock.Header.BlockHash(), blockHeight, + newBlock.Transactions) // The notification registered above should be sent, if not we'll time // out and mark the test as failed. diff --git a/routing/router.go b/routing/router.go index 1353b99e..33e8bda1 100644 --- a/routing/router.go +++ b/routing/router.go @@ -9,10 +9,10 @@ import ( "github.com/boltdb/bolt" "github.com/davecgh/go-spew/spew" - "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/chainview" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" @@ -97,13 +97,10 @@ type Config struct { // to ensure that the channels advertised are still open. Chain lnwallet.BlockChainIO - // Notifier is an instance of the ChainNotifier that the router uses to - // received notifications of incoming blocks. With each new incoming - // block found, the router may be able to partially prune the channel - // graph as channels may have been pruned. - // TODO(roasbeef): could possibly just replace this with an epoch - // channel. - Notifier chainntnfs.ChainNotifier + // ChainView is an instance of a FilteredChainView which is used to + // watch the sub-set of the UTXO set (the set of active channels) that + // we need in order to properly maintain the channel graph. + ChainView chainview.FilteredChainView // FeeSchema is the set fee schema that will be announced on to the // network. @@ -150,6 +147,8 @@ type ChannelRouter struct { started uint32 stopped uint32 + bestHeight uint32 + // cfg is a copy of the configuration struct that the ChannelRouter was // initialized with. cfg *Config @@ -172,7 +171,7 @@ type ChannelRouter struct { // newBlocks is a channel in which new blocks connected to the end of // the main chain are sent over. - newBlocks <-chan *chainntnfs.BlockEpoch + newBlocks <-chan *chainview.FilteredBlock // networkUpdates is a channel that carries new topology updates // messages from outside the ChannelRouter to be processed by the @@ -231,15 +230,15 @@ func (r *ChannelRouter) Start() error { log.Tracef("Channel Router starting") - // First we register for new notifications of newly discovered blocks. - // We do this immediately so we'll later be able to consume any/all - // blocks which were discovered as we prune the channel graph using a - // snapshot of the chain state. - blockEpochs, err := r.cfg.Notifier.RegisterBlockEpochNtfn() - if err != nil { + // First, we'll start the chain view instance (if it isn't already + // started). + if err := r.cfg.ChainView.Start(); err != nil { return err } - r.newBlocks = blockEpochs.Epochs + + // Once the instance is active, we'll fetch the channel we'll receive + // notifications over. + r.newBlocks = r.cfg.ChainView.FilteredBlocks() // Before we begin normal operation of the router, we first need to // synchronize the channel graph to the latest state of the UTXO set. @@ -247,6 +246,18 @@ func (r *ChannelRouter) Start() error { return err } + // Once we've concluded our manual block pruning, we'll constrcut and + // apply a fresh chain filter to the active FilteredChainView instance. + channelView, err := r.cfg.Graph.ChannelView() + if err != nil && err != channeldb.ErrGraphNoEdgesFound { + return err + } + log.Infof("Filtering chain using %v channels active", len(channelView)) + err = r.cfg.ChainView.UpdateFilter(channelView, r.bestHeight) + if err != nil { + return err + } + r.wg.Add(1) go r.networkHandler() @@ -263,6 +274,10 @@ func (r *ChannelRouter) Stop() error { log.Infof("Channel Router shutting down") + if err := r.cfg.ChainView.Stop(); err != nil { + return err + } + close(r.quit) r.wg.Wait() @@ -280,6 +295,7 @@ func (r *ChannelRouter) syncGraphWithChain() error { if err != nil { return err } + r.bestHeight = uint32(bestHeight) pruneHash, pruneHeight, err := r.cfg.Graph.PruneTip() if err != nil { switch { @@ -317,13 +333,13 @@ func (r *ChannelRouter) syncGraphWithChain() error { // that hasn't yet been consumed by the channel graph. var numChansClosed uint32 for nextHeight := pruneHeight + 1; nextHeight <= uint32(bestHeight); nextHeight++ { - // Using the next height, fetch the next block to use in our - // incremental graph pruning routine. + // Using the next height, request a manual block pruning from + // the chainview for the particular block hash. nextHash, err := r.cfg.Chain.GetBlockHash(int64(nextHeight)) if err != nil { return err } - nextBlock, err := r.cfg.Chain.GetBlock(nextHash) + filterBlock, err := r.cfg.ChainView.FilterBlock(nextHash) if err != nil { return err } @@ -332,7 +348,7 @@ func (r *ChannelRouter) syncGraphWithChain() error { // spent in the block, so collate all the referenced previous // outpoints within each tx and input. var spentOutputs []*wire.OutPoint - for _, tx := range nextBlock.Transactions { + for _, tx := range filterBlock.Transactions { for _, txIn := range tx.TxIn { spentOutputs = append(spentOutputs, &txIn.PreviousOutPoint) @@ -357,7 +373,6 @@ func (r *ChannelRouter) syncGraphWithChain() error { log.Infof("Graph pruning complete: %v channels we're closed since "+ "height %v", numChansClosed, pruneHeight) - return nil } @@ -407,7 +422,7 @@ func (r *ChannelRouter) networkHandler() { // A new block has arrived, so we can prune the channel graph // of any channels which were closed in the block. - case newBlock, ok := <-r.newBlocks: + case chainUpdate, ok := <-r.newBlocks: // If the channel has been closed, then this indicates // the daemon is shutting down, so we exit ourselves. if !ok { @@ -416,22 +431,17 @@ func (r *ChannelRouter) networkHandler() { // Once a new block arrives, we update our running // track of the height of the chain tip. - blockHeight := uint32(newBlock.Height) + blockHeight := uint32(chainUpdate.Height) + r.bestHeight = blockHeight log.Infof("Pruning channel graph using block %v (height=%v)", - newBlock.Hash, blockHeight) - - block, err := r.cfg.Chain.GetBlock(newBlock.Hash) - if err != nil { - log.Errorf("unable to get block: %v", err) - continue - } + chainUpdate.Hash, blockHeight) // We're only interested in all prior outputs that've // been spent in the block, so collate all the // referenced previous outpoints within each tx and // input. var spentOutputs []*wire.OutPoint - for _, tx := range block.Transactions { + for _, tx := range chainUpdate.Transactions { for _, txIn := range tx.TxIn { spentOutputs = append(spentOutputs, &txIn.PreviousOutPoint) @@ -443,14 +453,14 @@ func (r *ChannelRouter) networkHandler() { // of the block being pruned so the prune tip can be // updated. chansClosed, err := r.cfg.Graph.PruneGraph(spentOutputs, - newBlock.Hash, blockHeight) + &chainUpdate.Hash, chainUpdate.Height) if err != nil { log.Errorf("unable to prune routing table: %v", err) continue } log.Infof("Block %v (height=%v) closed %v channels", - newBlock.Hash, blockHeight, len(chansClosed)) + chainUpdate.Hash, blockHeight, len(chansClosed)) if len(chansClosed) == 0 { continue @@ -563,7 +573,8 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { // Now that we have the funding outpoint of the channel, ensure // that it hasn't yet been spent. If so, then this channel has // been closed so we'll ignore it. - chanUtxo, err := r.cfg.Chain.GetUtxo(&fundingPoint.Hash, fundingPoint.Index) + chanUtxo, err := r.cfg.Chain.GetUtxo(fundingPoint, + channelID.BlockHeight) if err != nil { return errors.Errorf("unable to fetch utxo for "+ "chan_id=%v: %v", msg.ChannelID, err) @@ -588,9 +599,8 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { // channel edge and also that the announced channel value is // right. if !bytes.Equal(witnessOutput.PkScript, chanUtxo.PkScript) { - return errors.New("pkscipts aren't equal, " + - "which means that either bitcoin keys" + - " are wrong or value don't correponds") + return errors.Errorf("pkScript mismatch: expected %v, "+ + "got %v", witnessOutput.PkScript, chanUtxo.PkScript) } // TODO(roasbeef): this is a hack, needs to be removed @@ -609,6 +619,17 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { msg.NodeKey2.SerializeCompressed(), fundingPoint, msg.ChannelID, msg.Capacity) + // As a new edge has been added to the channel graph, we'll + // update the current UTXO filter within our active + // FilteredChainView so we are notified if/when this channel is + // closed. + filterUpdate := []wire.OutPoint{*fundingPoint} + err = r.cfg.ChainView.UpdateFilter(filterUpdate, r.bestHeight) + if err != nil { + return errors.Errorf("unable to update chain "+ + "view: %v", err) + } + case *channeldb.ChannelEdgePolicy: channelID := lnwire.NewShortChanIDFromInt(msg.ChannelID) edge1Timestamp, edge2Timestamp, _, err := r.cfg.Graph.HasChannelEdge(msg.ChannelID) @@ -647,19 +668,23 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { } } - // Before we can update the channel information, we need to get - // the UTXO itself so we can store the proper capacity. + // Before we can update the channel information, we'll ensure + // that the target channel is still open by querying the + // utxo-set for its existence. chanPoint, err := r.fetchChanPoint(&channelID) if err != nil { return errors.Errorf("unable to fetch chan point for "+ "chan_id=%v: %v", msg.ChannelID, err) } - if _, err := r.cfg.Chain.GetUtxo(&chanPoint.Hash, - chanPoint.Index); err != nil { + _, err = r.cfg.Chain.GetUtxo(chanPoint, channelID.BlockHeight) + if err != nil { return errors.Errorf("unable to fetch utxo for "+ "chan_id=%v: %v", msg.ChannelID, err) } + // Now that we know this isn't a stale update, we'll apply the + // new edge policy to the proper directional edge within the + // channel graph. if err = r.cfg.Graph.UpdateEdgePolicy(msg); err != nil { err := errors.Errorf("unable to add channel: %v", err) log.Error(err) @@ -688,6 +713,9 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { // fetchChanPoint retrieves the original outpoint which is encoded within the // channelID. +// +// TODO(roasbeef): replace iwth call to GetBlockTransaction? (woudl allow to +// later use getblocktxn) func (r *ChannelRouter) fetchChanPoint(chanID *lnwire.ShortChannelID) (*wire.OutPoint, error) { // First fetch the block hash by the block number encoded, then use // that hash to fetch the block itself. diff --git a/routing/router_test.go b/routing/router_test.go index 9b363fdc..ee2bd6c5 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -24,7 +24,7 @@ type testCtx struct { chain *mockChain - notifier *mockNotifier + chainView *mockChainView } func createTestCtx(startingHeight uint32, testGraph ...string) (*testCtx, func(), error) { @@ -74,11 +74,11 @@ func createTestCtx(startingHeight uint32, testGraph ...string) (*testCtx, func() // any p2p functionality, the peer send and switch send messages won't // be populated. chain := newMockChain(startingHeight) - notifier := newMockNotifier() + chainView := newMockChainView() router, err := New(Config{ - Graph: graph, - Chain: chain, - Notifier: notifier, + Graph: graph, + Chain: chain, + ChainView: chainView, SendToSwitch: func(_ *btcec.PublicKey, _ *lnwire.UpdateAddHTLC) ([32]byte, error) { return [32]byte{}, nil @@ -97,11 +97,11 @@ func createTestCtx(startingHeight uint32, testGraph ...string) (*testCtx, func() } return &testCtx{ - router: router, - graph: graph, - aliases: aliasMap, - chain: chain, - notifier: notifier, + router: router, + graph: graph, + aliases: aliasMap, + chain: chain, + chainView: chainView, }, cleanUp, nil }