contractcourt: use single block subscription for block epochs

This commit is contained in:
carla 2020-11-12 15:23:24 +02:00
parent 3f9a707531
commit 89fe21b79a
No known key found for this signature in database
GPG Key ID: 4CA7FE54A6213C91
3 changed files with 128 additions and 50 deletions

@ -312,18 +312,8 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)", log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)",
channel.FundingOutpoint) channel.FundingOutpoint)
// We'll start by registering for a block epoch notifications so this
// channel can keep track of the current state of the main chain.
//
// TODO(roasbeef): fetch best height (or pass in) so can ensure block // TODO(roasbeef): fetch best height (or pass in) so can ensure block
// epoch delivers all the notifications to // epoch delivers all the notifications to
//
// TODO(roasbeef): instead 1 block epoch that multi-plexes to the rest?
// * reduces the number of goroutines
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return nil, err
}
chanPoint := channel.FundingOutpoint chanPoint := channel.FundingOutpoint
@ -333,7 +323,6 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
ChanPoint: chanPoint, ChanPoint: chanPoint,
Channel: c.getArbChannel(channel), Channel: c.getArbChannel(channel),
ShortChanID: channel.ShortChanID(), ShortChanID: channel.ShortChanID(),
BlockEpochs: blockEpoch,
MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted, MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary, MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary,
@ -369,7 +358,6 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint, c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
) )
if err != nil { if err != nil {
blockEpoch.Cancel()
return nil, err return nil, err
} }
@ -385,7 +373,6 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
pendingRemoteCommitment, err := channel.RemoteCommitChainTip() pendingRemoteCommitment, err := channel.RemoteCommitChainTip()
if err != nil && err != channeldb.ErrNoPendingCommit { if err != nil && err != channeldb.ErrNoPendingCommit {
blockEpoch.Cancel()
return nil, err return nil, err
} }
if pendingRemoteCommitment != nil { if pendingRemoteCommitment != nil {
@ -545,18 +532,12 @@ func (c *ChainArbitrator) Start() error {
// the chain any longer, only resolve the contracts on the confirmed // the chain any longer, only resolve the contracts on the confirmed
// commitment. // commitment.
for _, closeChanInfo := range closingChannels { for _, closeChanInfo := range closingChannels {
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return err
}
// We can leave off the CloseContract and ForceCloseChan // We can leave off the CloseContract and ForceCloseChan
// methods as the channel is already closed at this point. // methods as the channel is already closed at this point.
chanPoint := closeChanInfo.ChanPoint chanPoint := closeChanInfo.ChanPoint
arbCfg := ChannelArbitratorConfig{ arbCfg := ChannelArbitratorConfig{
ChanPoint: chanPoint, ChanPoint: chanPoint,
ShortChanID: closeChanInfo.ShortChanID, ShortChanID: closeChanInfo.ShortChanID,
BlockEpochs: blockEpoch,
ChainArbitratorConfig: c.cfg, ChainArbitratorConfig: c.cfg,
ChainEvents: &ChainEventSubscription{}, ChainEvents: &ChainEventSubscription{},
IsPendingClose: true, IsPendingClose: true,
@ -574,7 +555,6 @@ func (c *ChainArbitrator) Start() error {
c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint, c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
) )
if err != nil { if err != nil {
blockEpoch.Cancel()
return err return err
} }
arbCfg.MarkChannelResolved = func() error { arbCfg.MarkChannelResolved = func() error {
@ -627,8 +607,8 @@ func (c *ChainArbitrator) Start() error {
} }
} }
// Finally, we'll launch all the goroutines for each arbitrator so they // Launch all the goroutines for each arbitrator so they can carry out
// can carry out their duties. // their duties.
for _, arbitrator := range c.activeChannels { for _, arbitrator := range c.activeChannels {
if err := arbitrator.Start(); err != nil { if err := arbitrator.Start(); err != nil {
c.Stop() c.Stop()
@ -636,11 +616,121 @@ func (c *ChainArbitrator) Start() error {
} }
} }
// Subscribe to a single stream of block epoch notifications that we
// will dispatch to all active arbitrators.
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return err
}
// Start our goroutine which will dispatch blocks to each arbitrator.
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.dispatchBlocks(blockEpoch)
}()
// TODO(roasbeef): eventually move all breach watching here // TODO(roasbeef): eventually move all breach watching here
return nil return nil
} }
// blockRecipient contains the information we need to dispatch a block to a
// channel arbitrator.
type blockRecipient struct {
// chanPoint is the funding outpoint of the channel.
chanPoint wire.OutPoint
// blocks is the channel that new block heights are sent into. This
// channel should be sufficiently buffered as to not block the sender.
blocks chan<- int32
// quit is closed if the receiving entity is shutting down.
quit chan struct{}
}
// dispatchBlocks consumes a block epoch notification stream and dispatches
// blocks to each of the chain arb's active channel arbitrators. This function
// must be run in a goroutine.
func (c *ChainArbitrator) dispatchBlocks(
blockEpoch *chainntnfs.BlockEpochEvent) {
// getRecipients is a helper function which acquires the chain arb
// lock and returns a set of block recipients which can be used to
// dispatch blocks.
getRecipients := func() []blockRecipient {
c.Lock()
blocks := make([]blockRecipient, 0, len(c.activeChannels))
for _, channel := range c.activeChannels {
blocks = append(blocks, blockRecipient{
chanPoint: channel.cfg.ChanPoint,
blocks: channel.blocks,
quit: channel.quit,
})
}
c.Unlock()
return blocks
}
// On exit, cancel our blocks subscription and close each block channel
// so that the arbitrators know they will no longer be receiving blocks.
defer func() {
blockEpoch.Cancel()
recipients := getRecipients()
for _, recipient := range recipients {
close(recipient.blocks)
}
}()
// Consume block epochs until we receive the instruction to shutdown.
for {
select {
// Consume block epochs, exiting if our subscription is
// terminated.
case block, ok := <-blockEpoch.Epochs:
if !ok {
log.Trace("dispatchBlocks block epoch " +
"cancelled")
return
}
// Get the set of currently active channels block
// subscription channels and dispatch the block to
// each.
for _, recipient := range getRecipients() {
select {
// Deliver the block to the arbitrator.
case recipient.blocks <- block.Height:
// If the recipient is shutting down, exit
// without delivering the block. This may be
// the case when two blocks are mined in quick
// succession, and the arbitrator resolves
// after the first block, and does not need to
// consume the second block.
case <-recipient.quit:
log.Debugf("channel: %v exit without "+
"receiving block: %v",
recipient.chanPoint,
block.Height)
// If the chain arb is shutting down, we don't
// need to deliver any more blocks (everything
// will be shutting down).
case <-c.quit:
return
}
}
// Exit if the chain arbitrator is shutting down.
case <-c.quit:
return
}
}
}
// publishClosingTxs will load any stored cooperative or unilater closing // publishClosingTxs will load any stored cooperative or unilater closing
// transactions and republish them. This helps ensure propagation of the // transactions and republish them. This helps ensure propagation of the
// transactions in the event that prior publications failed. // transactions in the event that prior publications failed.

@ -12,7 +12,6 @@ import (
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/input"
@ -34,6 +33,10 @@ const (
// anchorSweepConfTarget is the conf target used when sweeping // anchorSweepConfTarget is the conf target used when sweeping
// commitment anchors. // commitment anchors.
anchorSweepConfTarget = 6 anchorSweepConfTarget = 6
// arbitratorBlockBufferSize is the size of the buffer we give to each
// channel arbitrator.
arbitratorBlockBufferSize = 20
) )
// WitnessSubscription represents an intent to be notified once new witnesses // WitnessSubscription represents an intent to be notified once new witnesses
@ -108,12 +111,6 @@ type ChannelArbitratorConfig struct {
// to the switch during contract resolution. // to the switch during contract resolution.
ShortChanID lnwire.ShortChannelID ShortChanID lnwire.ShortChannelID
// BlockEpochs is an active block epoch event stream backed by an
// active ChainNotifier instance. We will use new block notifications
// sent over this channel to decide when we should go on chain to
// reclaim/redeem the funds in an HTLC sent to/from us.
BlockEpochs *chainntnfs.BlockEpochEvent
// ChainEvents is an active subscription to the chain watcher for this // ChainEvents is an active subscription to the chain watcher for this
// channel to be notified of any on-chain activity related to this // channel to be notified of any on-chain activity related to this
// channel. // channel.
@ -325,6 +322,11 @@ type ChannelArbitrator struct {
// to do its duty. // to do its duty.
cfg ChannelArbitratorConfig cfg ChannelArbitratorConfig
// blocks is a channel that the arbitrator will receive new blocks on.
// This channel should be buffered by so that it does not block the
// sender.
blocks chan int32
// signalUpdates is a channel that any new live signals for the channel // signalUpdates is a channel that any new live signals for the channel
// we're watching over will be sent. // we're watching over will be sent.
signalUpdates chan *signalUpdateMsg signalUpdates chan *signalUpdateMsg
@ -366,6 +368,7 @@ func NewChannelArbitrator(cfg ChannelArbitratorConfig,
return &ChannelArbitrator{ return &ChannelArbitrator{
log: log, log: log,
blocks: make(chan int32, arbitratorBlockBufferSize),
signalUpdates: make(chan *signalUpdateMsg), signalUpdates: make(chan *signalUpdateMsg),
htlcUpdates: make(<-chan *ContractUpdate), htlcUpdates: make(<-chan *ContractUpdate),
resolutionSignal: make(chan struct{}), resolutionSignal: make(chan struct{}),
@ -397,13 +400,11 @@ func (c *ChannelArbitrator) Start() error {
// machine can act accordingly. // machine can act accordingly.
c.state, err = c.log.CurrentState() c.state, err = c.log.CurrentState()
if err != nil { if err != nil {
c.cfg.BlockEpochs.Cancel()
return err return err
} }
_, bestHeight, err := c.cfg.ChainIO.GetBestBlock() _, bestHeight, err := c.cfg.ChainIO.GetBestBlock()
if err != nil { if err != nil {
c.cfg.BlockEpochs.Cancel()
return err return err
} }
@ -479,7 +480,6 @@ func (c *ChannelArbitrator) Start() error {
c.cfg.ChanPoint) c.cfg.ChanPoint)
default: default:
c.cfg.BlockEpochs.Cancel()
return err return err
} }
} }
@ -501,7 +501,6 @@ func (c *ChannelArbitrator) Start() error {
// commitment has been confirmed on chain, and before we // commitment has been confirmed on chain, and before we
// advance our state step, we call InsertConfirmedCommitSet. // advance our state step, we call InsertConfirmedCommitSet.
if err := c.relaunchResolvers(commitSet, triggerHeight); err != nil { if err := c.relaunchResolvers(commitSet, triggerHeight); err != nil {
c.cfg.BlockEpochs.Cancel()
return err return err
} }
} }
@ -2111,7 +2110,6 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
// TODO(roasbeef): tell top chain arb we're done // TODO(roasbeef): tell top chain arb we're done
defer func() { defer func() {
c.cfg.BlockEpochs.Cancel()
c.wg.Done() c.wg.Done()
}() }()
@ -2121,11 +2119,11 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
// A new block has arrived, we'll examine all the active HTLC's // A new block has arrived, we'll examine all the active HTLC's
// to see if any of them have expired, and also update our // to see if any of them have expired, and also update our
// track of the best current height. // track of the best current height.
case blockEpoch, ok := <-c.cfg.BlockEpochs.Epochs: case blockHeight, ok := <-c.blocks:
if !ok { if !ok {
return return
} }
bestHeight = blockEpoch.Height bestHeight = blockHeight
// If we're not in the default state, then we can // If we're not in the default state, then we can
// ignore this signal as we're waiting for contract // ignore this signal as we're waiting for contract

@ -197,8 +197,6 @@ type chanArbTestCtx struct {
resolvedChan chan struct{} resolvedChan chan struct{}
blockEpochs chan *chainntnfs.BlockEpoch
incubationRequests chan struct{} incubationRequests chan struct{}
resolutions chan []ResolutionMsg resolutions chan []ResolutionMsg
@ -304,12 +302,6 @@ func withMarkClosed(markClosed func(*channeldb.ChannelCloseSummary,
func createTestChannelArbitrator(t *testing.T, log ArbitratorLog, func createTestChannelArbitrator(t *testing.T, log ArbitratorLog,
opts ...testChanArbOption) (*chanArbTestCtx, error) { opts ...testChanArbOption) (*chanArbTestCtx, error) {
blockEpochs := make(chan *chainntnfs.BlockEpoch)
blockEpoch := &chainntnfs.BlockEpochEvent{
Epochs: blockEpochs,
Cancel: func() {},
}
chanPoint := wire.OutPoint{} chanPoint := wire.OutPoint{}
shortChanID := lnwire.ShortChannelID{} shortChanID := lnwire.ShortChannelID{}
chanEvents := &ChainEventSubscription{ chanEvents := &ChainEventSubscription{
@ -366,7 +358,6 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog,
arbCfg := &ChannelArbitratorConfig{ arbCfg := &ChannelArbitratorConfig{
ChanPoint: chanPoint, ChanPoint: chanPoint,
ShortChanID: shortChanID, ShortChanID: shortChanID,
BlockEpochs: blockEpoch,
MarkChannelResolved: func() error { MarkChannelResolved: func() error {
resolvedChan <- struct{}{} resolvedChan <- struct{}{}
return nil return nil
@ -433,7 +424,6 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog,
cleanUp: cleanUp, cleanUp: cleanUp,
resolvedChan: resolvedChan, resolvedChan: resolvedChan,
resolutions: resolutionChan, resolutions: resolutionChan,
blockEpochs: blockEpochs,
log: log, log: log,
incubationRequests: incubateChan, incubationRequests: incubateChan,
sweeper: mockSweeper, sweeper: mockSweeper,
@ -1759,7 +1749,7 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) {
// now mine a block (height 5), which is 5 blocks away // now mine a block (height 5), which is 5 blocks away
// (our grace delta) from the expiry of that HTLC. // (our grace delta) from the expiry of that HTLC.
case testCase.htlcExpired: case testCase.htlcExpired:
chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 5} chanArbCtx.chanArb.blocks <- 5
// Otherwise, we'll just trigger a regular force close // Otherwise, we'll just trigger a regular force close
// request. // request.
@ -1863,7 +1853,7 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) {
// so instead, we'll mine another block which'll cause // so instead, we'll mine another block which'll cause
// it to re-examine its state and realize there're no // it to re-examine its state and realize there're no
// more HTLCs. // more HTLCs.
chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 6} chanArbCtx.chanArb.blocks <- 6
chanArbCtx.AssertStateTransitions(StateFullyResolved) chanArbCtx.AssertStateTransitions(StateFullyResolved)
}) })
} }
@ -1940,13 +1930,13 @@ func TestChannelArbitratorPendingExpiredHTLC(t *testing.T) {
// We will advance the uptime to 10 seconds which should be still within // We will advance the uptime to 10 seconds which should be still within
// the grace period and should not trigger going to chain. // the grace period and should not trigger going to chain.
testClock.SetTime(startTime.Add(time.Second * 10)) testClock.SetTime(startTime.Add(time.Second * 10))
chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 5} chanArbCtx.chanArb.blocks <- 5
chanArbCtx.AssertState(StateDefault) chanArbCtx.AssertState(StateDefault)
// We will advance the uptime to 16 seconds which should trigger going // We will advance the uptime to 16 seconds which should trigger going
// to chain. // to chain.
testClock.SetTime(startTime.Add(time.Second * 16)) testClock.SetTime(startTime.Add(time.Second * 16))
chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 6} chanArbCtx.chanArb.blocks <- 6
chanArbCtx.AssertStateTransitions( chanArbCtx.AssertStateTransitions(
StateBroadcastCommit, StateBroadcastCommit,
StateCommitmentBroadcasted, StateCommitmentBroadcasted,