Merge pull request #4697 from carlaKC/4481-batchstartandblocks

contractcourt: batch startup reads and block epoch notifications
This commit is contained in:
Olaoluwa Osuntokun 2020-11-13 16:16:39 -08:00 committed by GitHub
commit c0247583cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 317 additions and 141 deletions

@ -54,8 +54,10 @@ type ArbitratorLog interface {
// TODO(roasbeef): document on interface the errors expected to be // TODO(roasbeef): document on interface the errors expected to be
// returned // returned
// CurrentState returns the current state of the ChannelArbitrator. // CurrentState returns the current state of the ChannelArbitrator. It
CurrentState() (ArbitratorState, error) // takes an optional database transaction, which will be used if it is
// non-nil, otherwise the lookup will be done in its own transaction.
CurrentState(tx kvdb.RTx) (ArbitratorState, error)
// CommitState persists, the current state of the chain attendant. // CommitState persists, the current state of the chain attendant.
CommitState(ArbitratorState) error CommitState(ArbitratorState) error
@ -96,8 +98,10 @@ type ArbitratorLog interface {
InsertConfirmedCommitSet(c *CommitSet) error InsertConfirmedCommitSet(c *CommitSet) error
// FetchConfirmedCommitSet fetches the known confirmed active HTLC set // FetchConfirmedCommitSet fetches the known confirmed active HTLC set
// from the database. // from the database. It takes an optional database transaction, which
FetchConfirmedCommitSet() (*CommitSet, error) // will be used if it is non-nil, otherwise the lookup will be done in
// its own transaction.
FetchConfirmedCommitSet(tx kvdb.RTx) (*CommitSet, error)
// FetchChainActions attempts to fetch the set of previously stored // FetchChainActions attempts to fetch the set of previously stored
// chain actions. We'll use this upon restart to properly advance our // chain actions. We'll use this upon restart to properly advance our
@ -412,27 +416,28 @@ func (b *boltArbitratorLog) writeResolver(contractBucket kvdb.RwBucket,
return contractBucket.Put(resKey, buf.Bytes()) return contractBucket.Put(resKey, buf.Bytes())
} }
// CurrentState returns the current state of the ChannelArbitrator. // CurrentState returns the current state of the ChannelArbitrator. It takes an
// optional database transaction, which will be used if it is non-nil, otherwise
// the lookup will be done in its own transaction.
// //
// NOTE: Part of the ContractResolver interface. // NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) CurrentState() (ArbitratorState, error) { func (b *boltArbitratorLog) CurrentState(tx kvdb.RTx) (ArbitratorState, error) {
var s ArbitratorState var (
err := kvdb.View(b.db, func(tx kvdb.RTx) error { s ArbitratorState
scopeBucket := tx.ReadBucket(b.scopeKey[:]) err error
if scopeBucket == nil { )
return errScopeBucketNoExist
}
stateBytes := scopeBucket.Get(stateKey) if tx != nil {
if stateBytes == nil { s, err = b.currentState(tx)
return nil } else {
} err = kvdb.View(b.db, func(tx kvdb.RTx) error {
s, err = b.currentState(tx)
s = ArbitratorState(stateBytes[0]) return err
return nil
}, func() { }, func() {
s = 0 s = 0
}) })
}
if err != nil && err != errScopeBucketNoExist { if err != nil && err != errScopeBucketNoExist {
return s, err return s, err
} }
@ -440,6 +445,20 @@ func (b *boltArbitratorLog) CurrentState() (ArbitratorState, error) {
return s, nil return s, nil
} }
func (b *boltArbitratorLog) currentState(tx kvdb.RTx) (ArbitratorState, error) {
scopeBucket := tx.ReadBucket(b.scopeKey[:])
if scopeBucket == nil {
return 0, errScopeBucketNoExist
}
stateBytes := scopeBucket.Get(stateKey)
if stateBytes == nil {
return 0, nil
}
return ArbitratorState(stateBytes[0]), nil
}
// CommitState persists, the current state of the chain attendant. // CommitState persists, the current state of the chain attendant.
// //
// NOTE: Part of the ContractResolver interface. // NOTE: Part of the ContractResolver interface.
@ -851,29 +870,20 @@ func (b *boltArbitratorLog) InsertConfirmedCommitSet(c *CommitSet) error {
} }
// FetchConfirmedCommitSet fetches the known confirmed active HTLC set from the // FetchConfirmedCommitSet fetches the known confirmed active HTLC set from the
// database. // database. It takes an optional database transaction, which will be used if it
// is non-nil, otherwise the lookup will be done in its own transaction.
// //
// NOTE: Part of the ContractResolver interface. // NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) FetchConfirmedCommitSet() (*CommitSet, error) { func (b *boltArbitratorLog) FetchConfirmedCommitSet(tx kvdb.RTx) (*CommitSet, error) {
if tx != nil {
return b.fetchConfirmedCommitSet(tx)
}
var c *CommitSet var c *CommitSet
err := kvdb.View(b.db, func(tx kvdb.RTx) error { err := kvdb.View(b.db, func(tx kvdb.RTx) error {
scopeBucket := tx.ReadBucket(b.scopeKey[:]) var err error
if scopeBucket == nil { c, err = b.fetchConfirmedCommitSet(tx)
return errScopeBucketNoExist
}
commitSetBytes := scopeBucket.Get(commitSetKey)
if commitSetBytes == nil {
return errNoCommitSet
}
commitSet, err := decodeCommitSet(bytes.NewReader(commitSetBytes))
if err != nil {
return err return err
}
c = commitSet
return nil
}, func() { }, func() {
c = nil c = nil
}) })
@ -884,6 +894,22 @@ func (b *boltArbitratorLog) FetchConfirmedCommitSet() (*CommitSet, error) {
return c, nil return c, nil
} }
func (b *boltArbitratorLog) fetchConfirmedCommitSet(tx kvdb.RTx) (*CommitSet,
error) {
scopeBucket := tx.ReadBucket(b.scopeKey[:])
if scopeBucket == nil {
return nil, errScopeBucketNoExist
}
commitSetBytes := scopeBucket.Get(commitSetKey)
if commitSetBytes == nil {
return nil, errNoCommitSet
}
return decodeCommitSet(bytes.NewReader(commitSetBytes))
}
// WipeHistory is to be called ONLY once *all* contracts have been fully // WipeHistory is to be called ONLY once *all* contracts have been fully
// resolved, and the channel closure if finalized. This method will delete all // resolved, and the channel closure if finalized. This method will delete all
// on-disk state within the persistent log. // on-disk state within the persistent log.

@ -611,7 +611,7 @@ func TestStateMutation(t *testing.T) {
defer cleanUp() defer cleanUp()
// The default state of an arbitrator should be StateDefault. // The default state of an arbitrator should be StateDefault.
arbState, err := testLog.CurrentState() arbState, err := testLog.CurrentState(nil)
if err != nil { if err != nil {
t.Fatalf("unable to read arb state: %v", err) t.Fatalf("unable to read arb state: %v", err)
} }
@ -625,7 +625,7 @@ func TestStateMutation(t *testing.T) {
if err := testLog.CommitState(StateFullyResolved); err != nil { if err := testLog.CommitState(StateFullyResolved); err != nil {
t.Fatalf("unable to write state: %v", err) t.Fatalf("unable to write state: %v", err)
} }
arbState, err = testLog.CurrentState() arbState, err = testLog.CurrentState(nil)
if err != nil { if err != nil {
t.Fatalf("unable to read arb state: %v", err) t.Fatalf("unable to read arb state: %v", err)
} }
@ -643,7 +643,7 @@ func TestStateMutation(t *testing.T) {
// If we try to query for the state again, we should get the default // If we try to query for the state again, we should get the default
// state again. // state again.
arbState, err = testLog.CurrentState() arbState, err = testLog.CurrentState(nil)
if err != nil { if err != nil {
t.Fatalf("unable to query current state: %v", err) t.Fatalf("unable to query current state: %v", err)
} }
@ -687,11 +687,11 @@ func TestScopeIsolation(t *testing.T) {
// Querying each log, the states should be the prior one we set, and be // Querying each log, the states should be the prior one we set, and be
// disjoint. // disjoint.
log1State, err := testLog1.CurrentState() log1State, err := testLog1.CurrentState(nil)
if err != nil { if err != nil {
t.Fatalf("unable to read arb state: %v", err) t.Fatalf("unable to read arb state: %v", err)
} }
log2State, err := testLog2.CurrentState() log2State, err := testLog2.CurrentState(nil)
if err != nil { if err != nil {
t.Fatalf("unable to read arb state: %v", err) t.Fatalf("unable to read arb state: %v", err)
} }
@ -752,7 +752,7 @@ func TestCommitSetStorage(t *testing.T) {
t.Fatalf("unable to write commit set: %v", err) t.Fatalf("unable to write commit set: %v", err)
} }
diskCommitSet, err := testLog.FetchConfirmedCommitSet() diskCommitSet, err := testLog.FetchConfirmedCommitSet(nil)
if err != nil { if err != nil {
t.Fatalf("unable to read commit set: %v", err) t.Fatalf("unable to read commit set: %v", err)
} }

@ -10,6 +10,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/channeldb/kvdb"
@ -312,18 +313,8 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)", log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)",
channel.FundingOutpoint) channel.FundingOutpoint)
// We'll start by registering for a block epoch notifications so this
// channel can keep track of the current state of the main chain.
//
// TODO(roasbeef): fetch best height (or pass in) so can ensure block // TODO(roasbeef): fetch best height (or pass in) so can ensure block
// epoch delivers all the notifications to // epoch delivers all the notifications to
//
// TODO(roasbeef): instead 1 block epoch that multi-plexes to the rest?
// * reduces the number of goroutines
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return nil, err
}
chanPoint := channel.FundingOutpoint chanPoint := channel.FundingOutpoint
@ -333,7 +324,6 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
ChanPoint: chanPoint, ChanPoint: chanPoint,
Channel: c.getArbChannel(channel), Channel: c.getArbChannel(channel),
ShortChanID: channel.ShortChanID(), ShortChanID: channel.ShortChanID(),
BlockEpochs: blockEpoch,
MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted, MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary, MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary,
@ -369,7 +359,6 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint, c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
) )
if err != nil { if err != nil {
blockEpoch.Cancel()
return nil, err return nil, err
} }
@ -385,7 +374,6 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
pendingRemoteCommitment, err := channel.RemoteCommitChainTip() pendingRemoteCommitment, err := channel.RemoteCommitChainTip()
if err != nil && err != channeldb.ErrNoPendingCommit { if err != nil && err != channeldb.ErrNoPendingCommit {
blockEpoch.Cancel()
return nil, err return nil, err
} }
if pendingRemoteCommitment != nil { if pendingRemoteCommitment != nil {
@ -545,18 +533,12 @@ func (c *ChainArbitrator) Start() error {
// the chain any longer, only resolve the contracts on the confirmed // the chain any longer, only resolve the contracts on the confirmed
// commitment. // commitment.
for _, closeChanInfo := range closingChannels { for _, closeChanInfo := range closingChannels {
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return err
}
// We can leave off the CloseContract and ForceCloseChan // We can leave off the CloseContract and ForceCloseChan
// methods as the channel is already closed at this point. // methods as the channel is already closed at this point.
chanPoint := closeChanInfo.ChanPoint chanPoint := closeChanInfo.ChanPoint
arbCfg := ChannelArbitratorConfig{ arbCfg := ChannelArbitratorConfig{
ChanPoint: chanPoint, ChanPoint: chanPoint,
ShortChanID: closeChanInfo.ShortChanID, ShortChanID: closeChanInfo.ShortChanID,
BlockEpochs: blockEpoch,
ChainArbitratorConfig: c.cfg, ChainArbitratorConfig: c.cfg,
ChainEvents: &ChainEventSubscription{}, ChainEvents: &ChainEventSubscription{},
IsPendingClose: true, IsPendingClose: true,
@ -574,7 +556,6 @@ func (c *ChainArbitrator) Start() error {
c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint, c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
) )
if err != nil { if err != nil {
blockEpoch.Cancel()
return err return err
} }
arbCfg.MarkChannelResolved = func() error { arbCfg.MarkChannelResolved = func() error {
@ -617,30 +598,182 @@ func (c *ChainArbitrator) Start() error {
close(watcherErrs) close(watcherErrs)
}() }()
// stopAndLog is a helper function which shuts down the chain arb and
// logs errors if they occur.
stopAndLog := func() {
if err := c.Stop(); err != nil {
log.Errorf("ChainArbitrator could not shutdown: %v", err)
}
}
// Handle all errors returned from spawning our chain watchers. If any // Handle all errors returned from spawning our chain watchers. If any
// of them failed, we will stop the chain arb to shutdown any active // of them failed, we will stop the chain arb to shutdown any active
// goroutines. // goroutines.
for err := range watcherErrs { for err := range watcherErrs {
if err != nil { if err != nil {
c.Stop() stopAndLog()
return err return err
} }
} }
// Finally, we'll launch all the goroutines for each arbitrator so they // Before we start all of our arbitrators, we do a preliminary state
// can carry out their duties. // lookup so that we can combine all of these lookups in a single db
// transaction.
var startStates map[wire.OutPoint]*chanArbStartState
err = kvdb.View(c.chanSource, func(tx walletdb.ReadTx) error {
for _, arbitrator := range c.activeChannels { for _, arbitrator := range c.activeChannels {
if err := arbitrator.Start(); err != nil { startState, err := arbitrator.getStartState(tx)
c.Stop() if err != nil {
return err
}
startStates[arbitrator.cfg.ChanPoint] = startState
}
return nil
}, func() {
startStates = make(
map[wire.OutPoint]*chanArbStartState,
len(c.activeChannels),
)
})
if err != nil {
stopAndLog()
return err
}
// Launch all the goroutines for each arbitrator so they can carry out
// their duties.
for _, arbitrator := range c.activeChannels {
startState, ok := startStates[arbitrator.cfg.ChanPoint]
if !ok {
stopAndLog()
return fmt.Errorf("arbitrator: %v has no start state",
arbitrator.cfg.ChanPoint)
}
if err := arbitrator.Start(startState); err != nil {
stopAndLog()
return err return err
} }
} }
// Subscribe to a single stream of block epoch notifications that we
// will dispatch to all active arbitrators.
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return err
}
// Start our goroutine which will dispatch blocks to each arbitrator.
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.dispatchBlocks(blockEpoch)
}()
// TODO(roasbeef): eventually move all breach watching here // TODO(roasbeef): eventually move all breach watching here
return nil return nil
} }
// blockRecipient contains the information we need to dispatch a block to a
// channel arbitrator.
type blockRecipient struct {
// chanPoint is the funding outpoint of the channel.
chanPoint wire.OutPoint
// blocks is the channel that new block heights are sent into. This
// channel should be sufficiently buffered as to not block the sender.
blocks chan<- int32
// quit is closed if the receiving entity is shutting down.
quit chan struct{}
}
// dispatchBlocks consumes a block epoch notification stream and dispatches
// blocks to each of the chain arb's active channel arbitrators. This function
// must be run in a goroutine.
func (c *ChainArbitrator) dispatchBlocks(
blockEpoch *chainntnfs.BlockEpochEvent) {
// getRecipients is a helper function which acquires the chain arb
// lock and returns a set of block recipients which can be used to
// dispatch blocks.
getRecipients := func() []blockRecipient {
c.Lock()
blocks := make([]blockRecipient, 0, len(c.activeChannels))
for _, channel := range c.activeChannels {
blocks = append(blocks, blockRecipient{
chanPoint: channel.cfg.ChanPoint,
blocks: channel.blocks,
quit: channel.quit,
})
}
c.Unlock()
return blocks
}
// On exit, cancel our blocks subscription and close each block channel
// so that the arbitrators know they will no longer be receiving blocks.
defer func() {
blockEpoch.Cancel()
recipients := getRecipients()
for _, recipient := range recipients {
close(recipient.blocks)
}
}()
// Consume block epochs until we receive the instruction to shutdown.
for {
select {
// Consume block epochs, exiting if our subscription is
// terminated.
case block, ok := <-blockEpoch.Epochs:
if !ok {
log.Trace("dispatchBlocks block epoch " +
"cancelled")
return
}
// Get the set of currently active channels block
// subscription channels and dispatch the block to
// each.
for _, recipient := range getRecipients() {
select {
// Deliver the block to the arbitrator.
case recipient.blocks <- block.Height:
// If the recipient is shutting down, exit
// without delivering the block. This may be
// the case when two blocks are mined in quick
// succession, and the arbitrator resolves
// after the first block, and does not need to
// consume the second block.
case <-recipient.quit:
log.Debugf("channel: %v exit without "+
"receiving block: %v",
recipient.chanPoint,
block.Height)
// If the chain arb is shutting down, we don't
// need to deliver any more blocks (everything
// will be shutting down).
case <-c.quit:
return
}
}
// Exit if the chain arbitrator is shutting down.
case <-c.quit:
return
}
}
}
// publishClosingTxs will load any stored cooperative or unilater closing // publishClosingTxs will load any stored cooperative or unilater closing
// transactions and republish them. This helps ensure propagation of the // transactions and republish them. This helps ensure propagation of the
// transactions in the event that prior publications failed. // transactions in the event that prior publications failed.
@ -970,7 +1103,7 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error
// arbitrators, then launch it. // arbitrators, then launch it.
c.activeChannels[chanPoint] = channelArb c.activeChannels[chanPoint] = channelArb
if err := channelArb.Start(); err != nil { if err := channelArb.Start(nil); err != nil {
return err return err
} }

@ -12,7 +12,6 @@ import (
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/input"
@ -34,6 +33,10 @@ const (
// anchorSweepConfTarget is the conf target used when sweeping // anchorSweepConfTarget is the conf target used when sweeping
// commitment anchors. // commitment anchors.
anchorSweepConfTarget = 6 anchorSweepConfTarget = 6
// arbitratorBlockBufferSize is the size of the buffer we give to each
// channel arbitrator.
arbitratorBlockBufferSize = 20
) )
// WitnessSubscription represents an intent to be notified once new witnesses // WitnessSubscription represents an intent to be notified once new witnesses
@ -108,12 +111,6 @@ type ChannelArbitratorConfig struct {
// to the switch during contract resolution. // to the switch during contract resolution.
ShortChanID lnwire.ShortChannelID ShortChanID lnwire.ShortChannelID
// BlockEpochs is an active block epoch event stream backed by an
// active ChainNotifier instance. We will use new block notifications
// sent over this channel to decide when we should go on chain to
// reclaim/redeem the funds in an HTLC sent to/from us.
BlockEpochs *chainntnfs.BlockEpochEvent
// ChainEvents is an active subscription to the chain watcher for this // ChainEvents is an active subscription to the chain watcher for this
// channel to be notified of any on-chain activity related to this // channel to be notified of any on-chain activity related to this
// channel. // channel.
@ -325,6 +322,11 @@ type ChannelArbitrator struct {
// to do its duty. // to do its duty.
cfg ChannelArbitratorConfig cfg ChannelArbitratorConfig
// blocks is a channel that the arbitrator will receive new blocks on.
// This channel should be buffered by so that it does not block the
// sender.
blocks chan int32
// signalUpdates is a channel that any new live signals for the channel // signalUpdates is a channel that any new live signals for the channel
// we're watching over will be sent. // we're watching over will be sent.
signalUpdates chan *signalUpdateMsg signalUpdates chan *signalUpdateMsg
@ -366,6 +368,7 @@ func NewChannelArbitrator(cfg ChannelArbitratorConfig,
return &ChannelArbitrator{ return &ChannelArbitrator{
log: log, log: log,
blocks: make(chan int32, arbitratorBlockBufferSize),
signalUpdates: make(chan *signalUpdateMsg), signalUpdates: make(chan *signalUpdateMsg),
htlcUpdates: make(<-chan *ContractUpdate), htlcUpdates: make(<-chan *ContractUpdate),
resolutionSignal: make(chan struct{}), resolutionSignal: make(chan struct{}),
@ -376,16 +379,58 @@ func NewChannelArbitrator(cfg ChannelArbitratorConfig,
} }
} }
// chanArbStartState contains the information from disk that we need to start
// up a channel arbitrator.
type chanArbStartState struct {
currentState ArbitratorState
commitSet *CommitSet
}
// getStartState retrieves the information from disk that our channel arbitrator
// requires to start.
func (c *ChannelArbitrator) getStartState(tx kvdb.RTx) (*chanArbStartState,
error) {
// First, we'll read our last state from disk, so our internal state
// machine can act accordingly.
state, err := c.log.CurrentState(tx)
if err != nil {
return nil, err
}
// Next we'll fetch our confirmed commitment set. This will only exist
// if the channel has been closed out on chain for modern nodes. For
// older nodes, this won't be found at all, and will rely on the
// existing written chain actions. Additionally, if this channel hasn't
// logged any actions in the log, then this field won't be present.
commitSet, err := c.log.FetchConfirmedCommitSet(tx)
if err != nil && err != errNoCommitSet && err != errScopeBucketNoExist {
return nil, err
}
return &chanArbStartState{
currentState: state,
commitSet: commitSet,
}, nil
}
// Start starts all the goroutines that the ChannelArbitrator needs to operate. // Start starts all the goroutines that the ChannelArbitrator needs to operate.
func (c *ChannelArbitrator) Start() error { // If takes a start state, which will be looked up on disk if it is not
// provided.
func (c *ChannelArbitrator) Start(state *chanArbStartState) error {
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) { if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
return nil return nil
} }
c.startTimestamp = c.cfg.Clock.Now() c.startTimestamp = c.cfg.Clock.Now()
var ( // If the state passed in is nil, we look it up now.
err error if state == nil {
) var err error
state, err = c.getStartState(nil)
if err != nil {
return err
}
}
log.Debugf("Starting ChannelArbitrator(%v), htlc_set=%v", log.Debugf("Starting ChannelArbitrator(%v), htlc_set=%v",
c.cfg.ChanPoint, newLogClosure(func() string { c.cfg.ChanPoint, newLogClosure(func() string {
@ -393,17 +438,11 @@ func (c *ChannelArbitrator) Start() error {
}), }),
) )
// First, we'll read our last state from disk, so our internal state // Set our state from our starting state.
// machine can act accordingly. c.state = state.currentState
c.state, err = c.log.CurrentState()
if err != nil {
c.cfg.BlockEpochs.Cancel()
return err
}
_, bestHeight, err := c.cfg.ChainIO.GetBestBlock() _, bestHeight, err := c.cfg.ChainIO.GetBestBlock()
if err != nil { if err != nil {
c.cfg.BlockEpochs.Cancel()
return err return err
} }
@ -448,21 +487,11 @@ func (c *ChannelArbitrator) Start() error {
"triggerHeight=%v", c.cfg.ChanPoint, c.state, trigger, "triggerHeight=%v", c.cfg.ChanPoint, c.state, trigger,
triggerHeight) triggerHeight)
// Next we'll fetch our confirmed commitment set. This will only exist
// if the channel has been closed out on chain for modern nodes. For
// older nodes, this won't be found at all, and will rely on the
// existing written chain actions. Additionally, if this channel hasn't
// logged any actions in the log, then this field won't be present.
commitSet, err := c.log.FetchConfirmedCommitSet()
if err != nil && err != errNoCommitSet && err != errScopeBucketNoExist {
return err
}
// We'll now attempt to advance our state forward based on the current // We'll now attempt to advance our state forward based on the current
// on-chain state, and our set of active contracts. // on-chain state, and our set of active contracts.
startingState := c.state startingState := c.state
nextState, _, err := c.advanceState( nextState, _, err := c.advanceState(
triggerHeight, trigger, commitSet, triggerHeight, trigger, state.commitSet,
) )
if err != nil { if err != nil {
switch err { switch err {
@ -479,7 +508,6 @@ func (c *ChannelArbitrator) Start() error {
c.cfg.ChanPoint) c.cfg.ChanPoint)
default: default:
c.cfg.BlockEpochs.Cancel()
return err return err
} }
} }
@ -500,8 +528,8 @@ func (c *ChannelArbitrator) Start() error {
// receive a chain event from the chain watcher than the // receive a chain event from the chain watcher than the
// commitment has been confirmed on chain, and before we // commitment has been confirmed on chain, and before we
// advance our state step, we call InsertConfirmedCommitSet. // advance our state step, we call InsertConfirmedCommitSet.
if err := c.relaunchResolvers(commitSet, triggerHeight); err != nil { err := c.relaunchResolvers(state.commitSet, triggerHeight)
c.cfg.BlockEpochs.Cancel() if err != nil {
return err return err
} }
} }
@ -2111,7 +2139,6 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
// TODO(roasbeef): tell top chain arb we're done // TODO(roasbeef): tell top chain arb we're done
defer func() { defer func() {
c.cfg.BlockEpochs.Cancel()
c.wg.Done() c.wg.Done()
}() }()
@ -2121,11 +2148,11 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
// A new block has arrived, we'll examine all the active HTLC's // A new block has arrived, we'll examine all the active HTLC's
// to see if any of them have expired, and also update our // to see if any of them have expired, and also update our
// track of the best current height. // track of the best current height.
case blockEpoch, ok := <-c.cfg.BlockEpochs.Epochs: case blockHeight, ok := <-c.blocks:
if !ok { if !ok {
return return
} }
bestHeight = blockEpoch.Height bestHeight = blockHeight
// If we're not in the default state, then we can // If we're not in the default state, then we can
// ignore this signal as we're waiting for contract // ignore this signal as we're waiting for contract

@ -51,7 +51,7 @@ type mockArbitratorLog struct {
// interface. // interface.
var _ ArbitratorLog = (*mockArbitratorLog)(nil) var _ ArbitratorLog = (*mockArbitratorLog)(nil)
func (b *mockArbitratorLog) CurrentState() (ArbitratorState, error) { func (b *mockArbitratorLog) CurrentState(kvdb.RTx) (ArbitratorState, error) {
return b.state, nil return b.state, nil
} }
@ -140,7 +140,7 @@ func (b *mockArbitratorLog) InsertConfirmedCommitSet(c *CommitSet) error {
return nil return nil
} }
func (b *mockArbitratorLog) FetchConfirmedCommitSet() (*CommitSet, error) { func (b *mockArbitratorLog) FetchConfirmedCommitSet(kvdb.RTx) (*CommitSet, error) {
return b.commitSet, nil return b.commitSet, nil
} }
@ -197,8 +197,6 @@ type chanArbTestCtx struct {
resolvedChan chan struct{} resolvedChan chan struct{}
blockEpochs chan *chainntnfs.BlockEpoch
incubationRequests chan struct{} incubationRequests chan struct{}
resolutions chan []ResolutionMsg resolutions chan []ResolutionMsg
@ -277,7 +275,7 @@ func (c *chanArbTestCtx) Restart(restartClosure func(*chanArbTestCtx)) (*chanArb
restartClosure(newCtx) restartClosure(newCtx)
} }
if err := newCtx.chanArb.Start(); err != nil { if err := newCtx.chanArb.Start(nil); err != nil {
return nil, err return nil, err
} }
@ -304,12 +302,6 @@ func withMarkClosed(markClosed func(*channeldb.ChannelCloseSummary,
func createTestChannelArbitrator(t *testing.T, log ArbitratorLog, func createTestChannelArbitrator(t *testing.T, log ArbitratorLog,
opts ...testChanArbOption) (*chanArbTestCtx, error) { opts ...testChanArbOption) (*chanArbTestCtx, error) {
blockEpochs := make(chan *chainntnfs.BlockEpoch)
blockEpoch := &chainntnfs.BlockEpochEvent{
Epochs: blockEpochs,
Cancel: func() {},
}
chanPoint := wire.OutPoint{} chanPoint := wire.OutPoint{}
shortChanID := lnwire.ShortChannelID{} shortChanID := lnwire.ShortChannelID{}
chanEvents := &ChainEventSubscription{ chanEvents := &ChainEventSubscription{
@ -366,7 +358,6 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog,
arbCfg := &ChannelArbitratorConfig{ arbCfg := &ChannelArbitratorConfig{
ChanPoint: chanPoint, ChanPoint: chanPoint,
ShortChanID: shortChanID, ShortChanID: shortChanID,
BlockEpochs: blockEpoch,
MarkChannelResolved: func() error { MarkChannelResolved: func() error {
resolvedChan <- struct{}{} resolvedChan <- struct{}{}
return nil return nil
@ -433,7 +424,6 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog,
cleanUp: cleanUp, cleanUp: cleanUp,
resolvedChan: resolvedChan, resolvedChan: resolvedChan,
resolutions: resolutionChan, resolutions: resolutionChan,
blockEpochs: blockEpochs,
log: log, log: log,
incubationRequests: incubateChan, incubationRequests: incubateChan,
sweeper: mockSweeper, sweeper: mockSweeper,
@ -454,7 +444,7 @@ func TestChannelArbitratorCooperativeClose(t *testing.T) {
t.Fatalf("unable to create ChannelArbitrator: %v", err) t.Fatalf("unable to create ChannelArbitrator: %v", err)
} }
if err := chanArbCtx.chanArb.Start(); err != nil { if err := chanArbCtx.chanArb.Start(nil); err != nil {
t.Fatalf("unable to start ChannelArbitrator: %v", err) t.Fatalf("unable to start ChannelArbitrator: %v", err)
} }
defer func() { defer func() {
@ -516,7 +506,7 @@ func TestChannelArbitratorRemoteForceClose(t *testing.T) {
} }
chanArb := chanArbCtx.chanArb chanArb := chanArbCtx.chanArb
if err := chanArb.Start(); err != nil { if err := chanArb.Start(nil); err != nil {
t.Fatalf("unable to start ChannelArbitrator: %v", err) t.Fatalf("unable to start ChannelArbitrator: %v", err)
} }
defer chanArb.Stop() defer chanArb.Stop()
@ -571,7 +561,7 @@ func TestChannelArbitratorLocalForceClose(t *testing.T) {
} }
chanArb := chanArbCtx.chanArb chanArb := chanArbCtx.chanArb
if err := chanArb.Start(); err != nil { if err := chanArb.Start(nil); err != nil {
t.Fatalf("unable to start ChannelArbitrator: %v", err) t.Fatalf("unable to start ChannelArbitrator: %v", err)
} }
defer chanArb.Stop() defer chanArb.Stop()
@ -677,7 +667,7 @@ func TestChannelArbitratorBreachClose(t *testing.T) {
} }
chanArb := chanArbCtx.chanArb chanArb := chanArbCtx.chanArb
if err := chanArb.Start(); err != nil { if err := chanArb.Start(nil); err != nil {
t.Fatalf("unable to start ChannelArbitrator: %v", err) t.Fatalf("unable to start ChannelArbitrator: %v", err)
} }
defer func() { defer func() {
@ -722,7 +712,7 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) {
chanArb.cfg.PreimageDB = newMockWitnessBeacon() chanArb.cfg.PreimageDB = newMockWitnessBeacon()
chanArb.cfg.Registry = &mockRegistry{} chanArb.cfg.Registry = &mockRegistry{}
if err := chanArb.Start(); err != nil { if err := chanArb.Start(nil); err != nil {
t.Fatalf("unable to start ChannelArbitrator: %v", err) t.Fatalf("unable to start ChannelArbitrator: %v", err)
} }
defer chanArb.Stop() defer chanArb.Stop()
@ -994,7 +984,7 @@ func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) {
} }
chanArb := chanArbCtx.chanArb chanArb := chanArbCtx.chanArb
if err := chanArb.Start(); err != nil { if err := chanArb.Start(nil); err != nil {
t.Fatalf("unable to start ChannelArbitrator: %v", err) t.Fatalf("unable to start ChannelArbitrator: %v", err)
} }
defer chanArb.Stop() defer chanArb.Stop()
@ -1103,7 +1093,7 @@ func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) {
} }
chanArb := chanArbCtx.chanArb chanArb := chanArbCtx.chanArb
if err := chanArb.Start(); err != nil { if err := chanArb.Start(nil); err != nil {
t.Fatalf("unable to start ChannelArbitrator: %v", err) t.Fatalf("unable to start ChannelArbitrator: %v", err)
} }
defer chanArb.Stop() defer chanArb.Stop()
@ -1211,7 +1201,7 @@ func TestChannelArbitratorPersistence(t *testing.T) {
} }
chanArb := chanArbCtx.chanArb chanArb := chanArbCtx.chanArb
if err := chanArb.Start(); err != nil { if err := chanArb.Start(nil); err != nil {
t.Fatalf("unable to start ChannelArbitrator: %v", err) t.Fatalf("unable to start ChannelArbitrator: %v", err)
} }
@ -1335,7 +1325,7 @@ func TestChannelArbitratorForceCloseBreachedChannel(t *testing.T) {
} }
chanArb := chanArbCtx.chanArb chanArb := chanArbCtx.chanArb
if err := chanArb.Start(); err != nil { if err := chanArb.Start(nil); err != nil {
t.Fatalf("unable to start ChannelArbitrator: %v", err) t.Fatalf("unable to start ChannelArbitrator: %v", err)
} }
@ -1497,7 +1487,7 @@ func TestChannelArbitratorCommitFailure(t *testing.T) {
} }
chanArb := chanArbCtx.chanArb chanArb := chanArbCtx.chanArb
if err := chanArb.Start(); err != nil { if err := chanArb.Start(nil); err != nil {
t.Fatalf("unable to start ChannelArbitrator: %v", err) t.Fatalf("unable to start ChannelArbitrator: %v", err)
} }
@ -1582,7 +1572,7 @@ func TestChannelArbitratorEmptyResolutions(t *testing.T) {
chanArb.cfg.ClosingHeight = 100 chanArb.cfg.ClosingHeight = 100
chanArb.cfg.CloseType = channeldb.RemoteForceClose chanArb.cfg.CloseType = channeldb.RemoteForceClose
if err := chanArb.Start(); err != nil { if err := chanArb.Start(nil); err != nil {
t.Fatalf("unable to start ChannelArbitrator: %v", err) t.Fatalf("unable to start ChannelArbitrator: %v", err)
} }
@ -1614,7 +1604,7 @@ func TestChannelArbitratorAlreadyForceClosed(t *testing.T) {
t.Fatalf("unable to create ChannelArbitrator: %v", err) t.Fatalf("unable to create ChannelArbitrator: %v", err)
} }
chanArb := chanArbCtx.chanArb chanArb := chanArbCtx.chanArb
if err := chanArb.Start(); err != nil { if err := chanArb.Start(nil); err != nil {
t.Fatalf("unable to start ChannelArbitrator: %v", err) t.Fatalf("unable to start ChannelArbitrator: %v", err)
} }
defer chanArb.Stop() defer chanArb.Stop()
@ -1712,7 +1702,7 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) {
t.Fatalf("unable to create ChannelArbitrator: %v", err) t.Fatalf("unable to create ChannelArbitrator: %v", err)
} }
chanArb := chanArbCtx.chanArb chanArb := chanArbCtx.chanArb
if err := chanArb.Start(); err != nil { if err := chanArb.Start(nil); err != nil {
t.Fatalf("unable to start ChannelArbitrator: %v", err) t.Fatalf("unable to start ChannelArbitrator: %v", err)
} }
defer chanArb.Stop() defer chanArb.Stop()
@ -1759,7 +1749,7 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) {
// now mine a block (height 5), which is 5 blocks away // now mine a block (height 5), which is 5 blocks away
// (our grace delta) from the expiry of that HTLC. // (our grace delta) from the expiry of that HTLC.
case testCase.htlcExpired: case testCase.htlcExpired:
chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 5} chanArbCtx.chanArb.blocks <- 5
// Otherwise, we'll just trigger a regular force close // Otherwise, we'll just trigger a regular force close
// request. // request.
@ -1863,7 +1853,7 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) {
// so instead, we'll mine another block which'll cause // so instead, we'll mine another block which'll cause
// it to re-examine its state and realize there're no // it to re-examine its state and realize there're no
// more HTLCs. // more HTLCs.
chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 6} chanArbCtx.chanArb.blocks <- 6
chanArbCtx.AssertStateTransitions(StateFullyResolved) chanArbCtx.AssertStateTransitions(StateFullyResolved)
}) })
} }
@ -1903,7 +1893,7 @@ func TestChannelArbitratorPendingExpiredHTLC(t *testing.T) {
return false return false
} }
if err := chanArb.Start(); err != nil { if err := chanArb.Start(nil); err != nil {
t.Fatalf("unable to start ChannelArbitrator: %v", err) t.Fatalf("unable to start ChannelArbitrator: %v", err)
} }
defer func() { defer func() {
@ -1940,13 +1930,13 @@ func TestChannelArbitratorPendingExpiredHTLC(t *testing.T) {
// We will advance the uptime to 10 seconds which should be still within // We will advance the uptime to 10 seconds which should be still within
// the grace period and should not trigger going to chain. // the grace period and should not trigger going to chain.
testClock.SetTime(startTime.Add(time.Second * 10)) testClock.SetTime(startTime.Add(time.Second * 10))
chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 5} chanArbCtx.chanArb.blocks <- 5
chanArbCtx.AssertState(StateDefault) chanArbCtx.AssertState(StateDefault)
// We will advance the uptime to 16 seconds which should trigger going // We will advance the uptime to 16 seconds which should trigger going
// to chain. // to chain.
testClock.SetTime(startTime.Add(time.Second * 16)) testClock.SetTime(startTime.Add(time.Second * 16))
chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 6} chanArbCtx.chanArb.blocks <- 6
chanArbCtx.AssertStateTransitions( chanArbCtx.AssertStateTransitions(
StateBroadcastCommit, StateBroadcastCommit,
StateCommitmentBroadcasted, StateCommitmentBroadcasted,
@ -2060,7 +2050,7 @@ func TestRemoteCloseInitiator(t *testing.T) {
} }
chanArb := chanArbCtx.chanArb chanArb := chanArbCtx.chanArb
if err := chanArb.Start(); err != nil { if err := chanArb.Start(nil); err != nil {
t.Fatalf("unable to start "+ t.Fatalf("unable to start "+
"ChannelArbitrator: %v", err) "ChannelArbitrator: %v", err)
} }
@ -2130,7 +2120,7 @@ func TestChannelArbitratorAnchors(t *testing.T) {
{}, {}, {}, {},
} }
if err := chanArb.Start(); err != nil { if err := chanArb.Start(nil); err != nil {
t.Fatalf("unable to start ChannelArbitrator: %v", err) t.Fatalf("unable to start ChannelArbitrator: %v", err)
} }
defer func() { defer func() {