routing/chainview: use blockEventQueue for neutrino block events

This commit makes use of the blockEventQueue within the neutrino
implementation of FilteredChainView to ensure connected and
disconnected blocks are consumed in order by the reader.

It also specifies that neutrino is not to send disconnected blocks
notifications during rescans, making it consistent with the btcd
implementation.
This commit is contained in:
Johan T. Halseth 2017-10-02 17:15:42 +02:00
parent 082f012fcf
commit 6d15be5b79
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26

@ -35,16 +35,10 @@ type CfFilteredChainView struct {
// rescan will be sent over. // rescan will be sent over.
rescanErrChan <-chan error rescanErrChan <-chan error
// newBlocks is the channel in which new filtered blocks are sent over. // blockEventQueue is the ordered queue used to keep the order
newBlocks chan *FilteredBlock // of connected and disconnected blocks sent to the reader of the
// chainView.
// staleBlocks is the channel in which blocks that have been blockQueue *blockEventQueue
// disconnected from the mainchain are sent over.
staleBlocks chan *FilteredBlock
// filterUpdates is a channel in which updates to the utxo filter
// attached to this instance are sent over.
filterUpdates chan filterUpdate
// chainFilter is the // chainFilter is the
filterMtx sync.RWMutex filterMtx sync.RWMutex
@ -65,11 +59,9 @@ var _ FilteredChainView = (*CfFilteredChainView)(nil)
// this function. // this function.
func NewCfFilteredChainView(node *neutrino.ChainService) (*CfFilteredChainView, error) { func NewCfFilteredChainView(node *neutrino.ChainService) (*CfFilteredChainView, error) {
return &CfFilteredChainView{ return &CfFilteredChainView{
newBlocks: make(chan *FilteredBlock), blockQueue: newBlockEventQueue(),
staleBlocks: make(chan *FilteredBlock),
quit: make(chan struct{}), quit: make(chan struct{}),
rescanErrChan: make(chan error), rescanErrChan: make(chan error),
filterUpdates: make(chan filterUpdate),
chainFilter: make(map[wire.OutPoint]struct{}), chainFilter: make(map[wire.OutPoint]struct{}),
p2pNode: node, p2pNode: node,
}, nil }, nil
@ -122,6 +114,8 @@ func (c *CfFilteredChainView) Start() error {
c.chainView = c.p2pNode.NewRescan(rescanOptions...) c.chainView = c.p2pNode.NewRescan(rescanOptions...)
c.rescanErrChan = c.chainView.Start() c.rescanErrChan = c.chainView.Start()
c.blockQueue.Start()
c.wg.Add(1) c.wg.Add(1)
go c.chainFilterer() go c.chainFilterer()
@ -140,6 +134,7 @@ func (c *CfFilteredChainView) Stop() error {
log.Infof("FilteredChainView stopping") log.Infof("FilteredChainView stopping")
close(c.quit) close(c.quit)
c.blockQueue.Stop()
c.wg.Wait() c.wg.Wait()
return nil return nil
@ -164,13 +159,16 @@ func (c *CfFilteredChainView) onFilteredBlockConnected(height int32,
} }
go func() { block := &FilteredBlock{
c.newBlocks <- &FilteredBlock{
Hash: header.BlockHash(), Hash: header.BlockHash(),
Height: uint32(height), Height: uint32(height),
Transactions: mtxs, Transactions: mtxs,
} }
}()
c.blockQueue.Add(&blockEvent{
eventType: connected,
block: block,
})
} }
// onFilteredBlockDisconnected is a callback which is executed once a block is // onFilteredBlockDisconnected is a callback which is executed once a block is
@ -178,59 +176,29 @@ func (c *CfFilteredChainView) onFilteredBlockConnected(height int32,
func (c *CfFilteredChainView) onFilteredBlockDisconnected(height int32, func (c *CfFilteredChainView) onFilteredBlockDisconnected(height int32,
header *wire.BlockHeader) { header *wire.BlockHeader) {
log.Debugf("got disconnected block at height %d: %v", height,
header.BlockHash())
filteredBlock := &FilteredBlock{ filteredBlock := &FilteredBlock{
Hash: header.BlockHash(), Hash: header.BlockHash(),
Height: uint32(height), Height: uint32(height),
} }
go func() { c.blockQueue.Add(&blockEvent{
c.staleBlocks <- filteredBlock eventType: disconnected,
}() block: filteredBlock,
})
} }
// chainFilterer is the primary coordination goroutine within the // chainFilterer is the primary coordination goroutine within the
// CfFilteredChainView. This goroutine handles errors from the running rescan, // CfFilteredChainView. This goroutine handles errors from the running rescan.
// and also filter updates.
func (c *CfFilteredChainView) chainFilterer() { func (c *CfFilteredChainView) chainFilterer() {
defer c.wg.Done() defer c.wg.Done()
for { for {
select { select {
case err := <-c.rescanErrChan: case err := <-c.rescanErrChan:
log.Errorf("Error encountered during rescan: %v", err) log.Errorf("Error encountered during rescan: %v", err)
// We've received a new update to the filter from the caller to
// mutate their established chain view.
case update := <-c.filterUpdates:
log.Debugf("Updating chain filter with new UTXO's: %v",
update.newUtxos)
// First, we'll update the current chain view, by
// adding any new UTXO's, ignoring duplicates int he
// process.
c.filterMtx.Lock()
for _, op := range update.newUtxos {
c.chainFilter[op] = struct{}{}
}
c.filterMtx.Unlock()
// With our internal chain view update, we'll craft a
// new update to the chainView which includes our new
// UTXO's, and current update height.
rescanUpdate := []neutrino.UpdateOption{
neutrino.AddOutPoints(update.newUtxos...),
neutrino.Rewind(update.updateHeight),
}
err := c.chainView.Update(rescanUpdate...)
if err != nil {
log.Errorf("unable to update rescan: %v", err)
}
if update.done != nil {
close(update.done)
}
case <-c.quit: case <-c.quit:
return return
} }
@ -343,27 +311,32 @@ func (c *CfFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredB
// rewound to ensure all relevant notifications are dispatched. // rewound to ensure all relevant notifications are dispatched.
// //
// NOTE: This is part of the FilteredChainView interface. // NOTE: This is part of the FilteredChainView interface.
func (c *CfFilteredChainView) UpdateFilter(ops []wire.OutPoint, updateHeight uint32) error { func (c *CfFilteredChainView) UpdateFilter(ops []wire.OutPoint,
doneChan := make(chan struct{}) updateHeight uint32) error {
update := filterUpdate{ log.Debugf("Updating chain filter with new UTXO's: %v", ops)
newUtxos: ops,
updateHeight: updateHeight,
done: doneChan,
}
select { // First, we'll update the current chain view, by
case c.filterUpdates <- update: // adding any new UTXO's, ignoring duplicates in the
case <-c.quit: // process.
return fmt.Errorf("chain filter shutting down") c.filterMtx.Lock()
for _, op := range ops {
c.chainFilter[op] = struct{}{}
} }
c.filterMtx.Unlock()
select { // With our internal chain view update, we'll craft a
case <-doneChan: // new update to the chainView which includes our new
// UTXO's, and current update height.
rescanUpdate := []neutrino.UpdateOption{
neutrino.AddOutPoints(ops...),
neutrino.Rewind(updateHeight),
neutrino.DisableDisconnectedNtfns(true),
}
err := c.chainView.Update(rescanUpdate...)
if err != nil {
return fmt.Errorf("unable to update rescan: %v", err)
}
return nil return nil
case <-c.quit:
return fmt.Errorf("chain filter shutting down")
}
} }
// FilteredBlocks returns the channel that filtered blocks are to be sent over. // FilteredBlocks returns the channel that filtered blocks are to be sent over.
@ -373,7 +346,7 @@ func (c *CfFilteredChainView) UpdateFilter(ops []wire.OutPoint, updateHeight uin
// //
// NOTE: This is part of the FilteredChainView interface. // NOTE: This is part of the FilteredChainView interface.
func (c *CfFilteredChainView) FilteredBlocks() <-chan *FilteredBlock { func (c *CfFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
return c.newBlocks return c.blockQueue.newBlocks
} }
// DisconnectedBlocks returns a receive only channel which will be sent upon // DisconnectedBlocks returns a receive only channel which will be sent upon
@ -382,5 +355,5 @@ func (c *CfFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
// //
// NOTE: This is part of the FilteredChainView interface. // NOTE: This is part of the FilteredChainView interface.
func (c *CfFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock { func (c *CfFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
return c.staleBlocks return c.blockQueue.staleBlocks
} }