routing: abandon ChainNotifier for FilteredChainView

This commit modifies the routing package to no longer use the
ChainNotifier for pruning the channel graph. Instead, we now use the
FilteredChainView interface to more (from the ChannelRouter’s PoV)
efficiently maintain the channel graph.

Rather than scanning the _entire_ block manually, we now rely on the
FilteredChainView to provide us with FilteredBlocks which include
_only_ the relevant transactions that we care about.
This commit is contained in:
Olaoluwa Osuntokun 2017-05-10 17:22:26 -07:00
parent 2ec0f5788e
commit 828d28581a
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
4 changed files with 150 additions and 113 deletions

@ -5,11 +5,12 @@ import (
"io"
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/routing/chainview"
)
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
// log is a logger that is initialized with no output filters. This means the
// package will not perform any logging by default until the caller requests
// it.
var log btclog.Logger
// The default amount of logging is none.
@ -17,17 +18,18 @@ func init() {
DisableLog()
}
// DisableLog disables all library log output. Logging output is disabled
// by default until either UseLogger or SetLogWriter are called.
// DisableLog disables all library log output. Logging output is disabled by
// default until either UseLogger or SetLogWriter are called.
func DisableLog() {
log = btclog.Disabled
}
// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
// UseLogger uses a specified Logger to output package logging info. This
// should be used in preference to SetLogWriter if the caller is also using
// btclog.
func UseLogger(logger btclog.Logger) {
log = logger
chainview.UseLogger(logger)
}
// SetLogWriter uses a specified io.Writer to output package logging info.
@ -53,8 +55,8 @@ func SetLogWriter(w io.Writer, level string) error {
return nil
}
// logClosure is used to provide a closure over expensive logging operations
// so don't have to be performed when the logging level doesn't warrant it.
// logClosure is used to provide a closure over expensive logging operations so
// don't have to be performed when the logging level doesn't warrant it.
type logClosure func() string
// String invokes the underlying function and returns the result.

@ -11,10 +11,10 @@ import (
prand "math/rand"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/chainview"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/wire"
@ -122,6 +122,10 @@ type mockChain struct {
sync.RWMutex
}
// A compile time check to ensure mockChain implements the
// lnwallet.BlockChainIO interface.
var _ lnwallet.BlockChainIO = (*mockChain)(nil)
func newMockChain(currentHeight uint32) *mockChain {
return &mockChain{
bestHeight: int32(currentHeight),
@ -168,16 +172,11 @@ func (m *mockChain) addUtxo(op wire.OutPoint, out *wire.TxOut) {
m.utxos[op] = *out
m.Unlock()
}
func (m *mockChain) GetUtxo(txid *chainhash.Hash, index uint32) (*wire.TxOut, error) {
func (m *mockChain) GetUtxo(op *wire.OutPoint, _ uint32) (*wire.TxOut, error) {
m.RLock()
defer m.RUnlock()
op := wire.OutPoint{
Hash: *txid,
Index: index,
}
utxo, ok := m.utxos[op]
utxo, ok := m.utxos[*op]
if !ok {
return nil, fmt.Errorf("utxo not found")
}
@ -205,61 +204,68 @@ func (m *mockChain) GetBlock(blockHash *chainhash.Hash) (*wire.MsgBlock, error)
return block, nil
}
type mockNotifier struct {
clientCounter uint32
epochClients map[uint32]chan *chainntnfs.BlockEpoch
type mockChainView struct {
sync.RWMutex
newBlocks chan *chainview.FilteredBlock
staleBlocks chan *chainview.FilteredBlock
filter map[wire.OutPoint]struct{}
}
func newMockNotifier() *mockNotifier {
return &mockNotifier{
epochClients: make(map[uint32]chan *chainntnfs.BlockEpoch),
// A compile time check to ensure mockChainView implements the
// chainview.FilteredChainView.
var _ chainview.FilteredChainView = (*mockChainView)(nil)
func newMockChainView() *mockChainView {
return &mockChainView{
newBlocks: make(chan *chainview.FilteredBlock, 10),
staleBlocks: make(chan *chainview.FilteredBlock, 10),
filter: make(map[wire.OutPoint]struct{}),
}
}
func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
numConfs uint32) (*chainntnfs.ConfirmationEvent, error) {
func (m *mockChainView) UpdateFilter(ops []wire.OutPoint, updateHeight uint32) error {
m.Lock()
defer m.Unlock()
return nil, nil
}
func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.SpendEvent, error) {
return nil, nil
}
func (m *mockNotifier) notifyBlock(hash chainhash.Hash, height uint32) {
m.RLock()
defer m.RUnlock()
for _, client := range m.epochClients {
client <- &chainntnfs.BlockEpoch{
Height: int32(height),
Hash: &hash,
for _, op := range ops {
m.filter[op] = struct{}{}
}
}
}
func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) {
m.RLock()
defer m.RUnlock()
epochChan := make(chan *chainntnfs.BlockEpoch)
clientID := m.clientCounter
m.clientCounter++
m.epochClients[clientID] = epochChan
return &chainntnfs.BlockEpochEvent{
Epochs: epochChan,
Cancel: func() {},
}, nil
}
func (m *mockNotifier) Start() error {
return nil
}
func (m *mockNotifier) Stop() error {
func (m *mockChainView) notifyBlock(hash chainhash.Hash, height uint32,
txns []*wire.MsgTx) {
m.RLock()
defer m.RUnlock()
m.newBlocks <- &chainview.FilteredBlock{
Hash: hash,
Height: height,
Transactions: txns,
}
}
func (m *mockChainView) FilteredBlocks() <-chan *chainview.FilteredBlock {
return m.newBlocks
}
func (m *mockChainView) DisconnectedBlocks() <-chan *chainview.FilteredBlock {
return m.staleBlocks
}
func (m *mockChainView) FilterBlock(blockHash *chainhash.Hash) (*chainview.FilteredBlock, error) {
return nil, nil
}
func (m *mockChainView) Start() error {
return nil
}
func (m *mockChainView) Stop() error {
return nil
}
@ -643,7 +649,8 @@ func TestChannelCloseNotification(t *testing.T) {
},
}
ctx.chain.addBlock(newBlock, blockHeight)
ctx.notifier.notifyBlock(newBlock.Header.BlockHash(), blockHeight)
ctx.chainView.notifyBlock(newBlock.Header.BlockHash(), blockHeight,
newBlock.Transactions)
// The notification registered above should be sent, if not we'll time
// out and mark the test as failed.

@ -9,10 +9,10 @@ import (
"github.com/boltdb/bolt"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/chainview"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
@ -97,13 +97,10 @@ type Config struct {
// to ensure that the channels advertised are still open.
Chain lnwallet.BlockChainIO
// Notifier is an instance of the ChainNotifier that the router uses to
// received notifications of incoming blocks. With each new incoming
// block found, the router may be able to partially prune the channel
// graph as channels may have been pruned.
// TODO(roasbeef): could possibly just replace this with an epoch
// channel.
Notifier chainntnfs.ChainNotifier
// ChainView is an instance of a FilteredChainView which is used to
// watch the sub-set of the UTXO set (the set of active channels) that
// we need in order to properly maintain the channel graph.
ChainView chainview.FilteredChainView
// FeeSchema is the set fee schema that will be announced on to the
// network.
@ -150,6 +147,8 @@ type ChannelRouter struct {
started uint32
stopped uint32
bestHeight uint32
// cfg is a copy of the configuration struct that the ChannelRouter was
// initialized with.
cfg *Config
@ -172,7 +171,7 @@ type ChannelRouter struct {
// newBlocks is a channel in which new blocks connected to the end of
// the main chain are sent over.
newBlocks <-chan *chainntnfs.BlockEpoch
newBlocks <-chan *chainview.FilteredBlock
// networkUpdates is a channel that carries new topology updates
// messages from outside the ChannelRouter to be processed by the
@ -231,15 +230,15 @@ func (r *ChannelRouter) Start() error {
log.Tracef("Channel Router starting")
// First we register for new notifications of newly discovered blocks.
// We do this immediately so we'll later be able to consume any/all
// blocks which were discovered as we prune the channel graph using a
// snapshot of the chain state.
blockEpochs, err := r.cfg.Notifier.RegisterBlockEpochNtfn()
if err != nil {
// First, we'll start the chain view instance (if it isn't already
// started).
if err := r.cfg.ChainView.Start(); err != nil {
return err
}
r.newBlocks = blockEpochs.Epochs
// Once the instance is active, we'll fetch the channel we'll receive
// notifications over.
r.newBlocks = r.cfg.ChainView.FilteredBlocks()
// Before we begin normal operation of the router, we first need to
// synchronize the channel graph to the latest state of the UTXO set.
@ -247,6 +246,18 @@ func (r *ChannelRouter) Start() error {
return err
}
// Once we've concluded our manual block pruning, we'll constrcut and
// apply a fresh chain filter to the active FilteredChainView instance.
channelView, err := r.cfg.Graph.ChannelView()
if err != nil && err != channeldb.ErrGraphNoEdgesFound {
return err
}
log.Infof("Filtering chain using %v channels active", len(channelView))
err = r.cfg.ChainView.UpdateFilter(channelView, r.bestHeight)
if err != nil {
return err
}
r.wg.Add(1)
go r.networkHandler()
@ -263,6 +274,10 @@ func (r *ChannelRouter) Stop() error {
log.Infof("Channel Router shutting down")
if err := r.cfg.ChainView.Stop(); err != nil {
return err
}
close(r.quit)
r.wg.Wait()
@ -280,6 +295,7 @@ func (r *ChannelRouter) syncGraphWithChain() error {
if err != nil {
return err
}
r.bestHeight = uint32(bestHeight)
pruneHash, pruneHeight, err := r.cfg.Graph.PruneTip()
if err != nil {
switch {
@ -317,13 +333,13 @@ func (r *ChannelRouter) syncGraphWithChain() error {
// that hasn't yet been consumed by the channel graph.
var numChansClosed uint32
for nextHeight := pruneHeight + 1; nextHeight <= uint32(bestHeight); nextHeight++ {
// Using the next height, fetch the next block to use in our
// incremental graph pruning routine.
// Using the next height, request a manual block pruning from
// the chainview for the particular block hash.
nextHash, err := r.cfg.Chain.GetBlockHash(int64(nextHeight))
if err != nil {
return err
}
nextBlock, err := r.cfg.Chain.GetBlock(nextHash)
filterBlock, err := r.cfg.ChainView.FilterBlock(nextHash)
if err != nil {
return err
}
@ -332,7 +348,7 @@ func (r *ChannelRouter) syncGraphWithChain() error {
// spent in the block, so collate all the referenced previous
// outpoints within each tx and input.
var spentOutputs []*wire.OutPoint
for _, tx := range nextBlock.Transactions {
for _, tx := range filterBlock.Transactions {
for _, txIn := range tx.TxIn {
spentOutputs = append(spentOutputs,
&txIn.PreviousOutPoint)
@ -357,7 +373,6 @@ func (r *ChannelRouter) syncGraphWithChain() error {
log.Infof("Graph pruning complete: %v channels we're closed since "+
"height %v", numChansClosed, pruneHeight)
return nil
}
@ -407,7 +422,7 @@ func (r *ChannelRouter) networkHandler() {
// A new block has arrived, so we can prune the channel graph
// of any channels which were closed in the block.
case newBlock, ok := <-r.newBlocks:
case chainUpdate, ok := <-r.newBlocks:
// If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves.
if !ok {
@ -416,22 +431,17 @@ func (r *ChannelRouter) networkHandler() {
// Once a new block arrives, we update our running
// track of the height of the chain tip.
blockHeight := uint32(newBlock.Height)
blockHeight := uint32(chainUpdate.Height)
r.bestHeight = blockHeight
log.Infof("Pruning channel graph using block %v (height=%v)",
newBlock.Hash, blockHeight)
block, err := r.cfg.Chain.GetBlock(newBlock.Hash)
if err != nil {
log.Errorf("unable to get block: %v", err)
continue
}
chainUpdate.Hash, blockHeight)
// We're only interested in all prior outputs that've
// been spent in the block, so collate all the
// referenced previous outpoints within each tx and
// input.
var spentOutputs []*wire.OutPoint
for _, tx := range block.Transactions {
for _, tx := range chainUpdate.Transactions {
for _, txIn := range tx.TxIn {
spentOutputs = append(spentOutputs,
&txIn.PreviousOutPoint)
@ -443,14 +453,14 @@ func (r *ChannelRouter) networkHandler() {
// of the block being pruned so the prune tip can be
// updated.
chansClosed, err := r.cfg.Graph.PruneGraph(spentOutputs,
newBlock.Hash, blockHeight)
&chainUpdate.Hash, chainUpdate.Height)
if err != nil {
log.Errorf("unable to prune routing table: %v", err)
continue
}
log.Infof("Block %v (height=%v) closed %v channels",
newBlock.Hash, blockHeight, len(chansClosed))
chainUpdate.Hash, blockHeight, len(chansClosed))
if len(chansClosed) == 0 {
continue
@ -563,7 +573,8 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
// Now that we have the funding outpoint of the channel, ensure
// that it hasn't yet been spent. If so, then this channel has
// been closed so we'll ignore it.
chanUtxo, err := r.cfg.Chain.GetUtxo(&fundingPoint.Hash, fundingPoint.Index)
chanUtxo, err := r.cfg.Chain.GetUtxo(fundingPoint,
channelID.BlockHeight)
if err != nil {
return errors.Errorf("unable to fetch utxo for "+
"chan_id=%v: %v", msg.ChannelID, err)
@ -588,9 +599,8 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
// channel edge and also that the announced channel value is
// right.
if !bytes.Equal(witnessOutput.PkScript, chanUtxo.PkScript) {
return errors.New("pkscipts aren't equal, " +
"which means that either bitcoin keys" +
" are wrong or value don't correponds")
return errors.Errorf("pkScript mismatch: expected %v, "+
"got %v", witnessOutput.PkScript, chanUtxo.PkScript)
}
// TODO(roasbeef): this is a hack, needs to be removed
@ -609,6 +619,17 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
msg.NodeKey2.SerializeCompressed(),
fundingPoint, msg.ChannelID, msg.Capacity)
// As a new edge has been added to the channel graph, we'll
// update the current UTXO filter within our active
// FilteredChainView so we are notified if/when this channel is
// closed.
filterUpdate := []wire.OutPoint{*fundingPoint}
err = r.cfg.ChainView.UpdateFilter(filterUpdate, r.bestHeight)
if err != nil {
return errors.Errorf("unable to update chain "+
"view: %v", err)
}
case *channeldb.ChannelEdgePolicy:
channelID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
edge1Timestamp, edge2Timestamp, _, err := r.cfg.Graph.HasChannelEdge(msg.ChannelID)
@ -647,19 +668,23 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
}
}
// Before we can update the channel information, we need to get
// the UTXO itself so we can store the proper capacity.
// Before we can update the channel information, we'll ensure
// that the target channel is still open by querying the
// utxo-set for its existence.
chanPoint, err := r.fetchChanPoint(&channelID)
if err != nil {
return errors.Errorf("unable to fetch chan point for "+
"chan_id=%v: %v", msg.ChannelID, err)
}
if _, err := r.cfg.Chain.GetUtxo(&chanPoint.Hash,
chanPoint.Index); err != nil {
_, err = r.cfg.Chain.GetUtxo(chanPoint, channelID.BlockHeight)
if err != nil {
return errors.Errorf("unable to fetch utxo for "+
"chan_id=%v: %v", msg.ChannelID, err)
}
// Now that we know this isn't a stale update, we'll apply the
// new edge policy to the proper directional edge within the
// channel graph.
if err = r.cfg.Graph.UpdateEdgePolicy(msg); err != nil {
err := errors.Errorf("unable to add channel: %v", err)
log.Error(err)
@ -688,6 +713,9 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
// fetchChanPoint retrieves the original outpoint which is encoded within the
// channelID.
//
// TODO(roasbeef): replace iwth call to GetBlockTransaction? (woudl allow to
// later use getblocktxn)
func (r *ChannelRouter) fetchChanPoint(chanID *lnwire.ShortChannelID) (*wire.OutPoint, error) {
// First fetch the block hash by the block number encoded, then use
// that hash to fetch the block itself.

@ -24,7 +24,7 @@ type testCtx struct {
chain *mockChain
notifier *mockNotifier
chainView *mockChainView
}
func createTestCtx(startingHeight uint32, testGraph ...string) (*testCtx, func(), error) {
@ -74,11 +74,11 @@ func createTestCtx(startingHeight uint32, testGraph ...string) (*testCtx, func()
// any p2p functionality, the peer send and switch send messages won't
// be populated.
chain := newMockChain(startingHeight)
notifier := newMockNotifier()
chainView := newMockChainView()
router, err := New(Config{
Graph: graph,
Chain: chain,
Notifier: notifier,
ChainView: chainView,
SendToSwitch: func(_ *btcec.PublicKey,
_ *lnwire.UpdateAddHTLC) ([32]byte, error) {
return [32]byte{}, nil
@ -101,7 +101,7 @@ func createTestCtx(startingHeight uint32, testGraph ...string) (*testCtx, func()
graph: graph,
aliases: aliasMap,
chain: chain,
notifier: notifier,
chainView: chainView,
}, cleanUp, nil
}