Merge pull request #1696 from halseth/contractcourt-handoff3
contractcourt handoff 2
This commit is contained in:
commit
47788c3cec
@ -532,6 +532,13 @@ func TestContractResolutionsStorage(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// First make sure that fetching unlogged contract resolutions will
|
||||||
|
// fail.
|
||||||
|
_, err = testLog.FetchContractResolutions()
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected reading unlogged resolution from db to fail")
|
||||||
|
}
|
||||||
|
|
||||||
// Insert the resolution into the database, then immediately retrieve
|
// Insert the resolution into the database, then immediately retrieve
|
||||||
// them so we can compare equality against the original version.
|
// them so we can compare equality against the original version.
|
||||||
if err := testLog.LogContractResolutions(&res); err != nil {
|
if err := testLog.LogContractResolutions(&res); err != nil {
|
||||||
|
@ -243,6 +243,8 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
|
|||||||
return chanMachine.ForceClose()
|
return chanMachine.ForceClose()
|
||||||
},
|
},
|
||||||
MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
|
MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
|
||||||
|
MarkChannelClosed: channel.CloseChannel,
|
||||||
|
IsPendingClose: false,
|
||||||
ChainArbitratorConfig: c.cfg,
|
ChainArbitratorConfig: c.cfg,
|
||||||
ChainEvents: chanEvents,
|
ChainEvents: chanEvents,
|
||||||
}
|
}
|
||||||
@ -398,6 +400,9 @@ func (c *ChainArbitrator) Start() error {
|
|||||||
BlockEpochs: blockEpoch,
|
BlockEpochs: blockEpoch,
|
||||||
ChainArbitratorConfig: c.cfg,
|
ChainArbitratorConfig: c.cfg,
|
||||||
ChainEvents: &ChainEventSubscription{},
|
ChainEvents: &ChainEventSubscription{},
|
||||||
|
IsPendingClose: true,
|
||||||
|
ClosingHeight: closeChanInfo.CloseHeight,
|
||||||
|
CloseType: closeChanInfo.CloseType,
|
||||||
}
|
}
|
||||||
chanLog, err := newBoltArbitratorLog(
|
chanLog, err := newBoltArbitratorLog(
|
||||||
c.chanSource.DB, arbCfg, c.cfg.ChainHash, chanPoint,
|
c.chanSource.DB, arbCfg, c.cfg.ChainHash, chanPoint,
|
||||||
|
@ -21,6 +21,13 @@ import (
|
|||||||
type LocalUnilateralCloseInfo struct {
|
type LocalUnilateralCloseInfo struct {
|
||||||
*chainntnfs.SpendDetail
|
*chainntnfs.SpendDetail
|
||||||
*lnwallet.LocalForceCloseSummary
|
*lnwallet.LocalForceCloseSummary
|
||||||
|
*channeldb.ChannelCloseSummary
|
||||||
|
}
|
||||||
|
|
||||||
|
// CooperativeCloseInfo encapsulates all the informnation we need to act
|
||||||
|
// on a cooperative close that gets confirmed.
|
||||||
|
type CooperativeCloseInfo struct {
|
||||||
|
*channeldb.ChannelCloseSummary
|
||||||
}
|
}
|
||||||
|
|
||||||
// ChainEventSubscription is a struct that houses a subscription to be notified
|
// ChainEventSubscription is a struct that houses a subscription to be notified
|
||||||
@ -42,9 +49,7 @@ type ChainEventSubscription struct {
|
|||||||
|
|
||||||
// CooperativeClosure is a signal that will be sent upon once a
|
// CooperativeClosure is a signal that will be sent upon once a
|
||||||
// cooperative channel closure has been detected confirmed.
|
// cooperative channel closure has been detected confirmed.
|
||||||
//
|
CooperativeClosure chan *CooperativeCloseInfo
|
||||||
// TODO(roasbeef): or something else
|
|
||||||
CooperativeClosure chan struct{}
|
|
||||||
|
|
||||||
// ContractBreach is a channel that will be sent upon if we detect a
|
// ContractBreach is a channel that will be sent upon if we detect a
|
||||||
// contract breach. The struct sent across the channel contains all the
|
// contract breach. The struct sent across the channel contains all the
|
||||||
@ -232,7 +237,7 @@ func (c *chainWatcher) SubscribeChannelEvents() *ChainEventSubscription {
|
|||||||
ChanPoint: c.cfg.chanState.FundingOutpoint,
|
ChanPoint: c.cfg.chanState.FundingOutpoint,
|
||||||
RemoteUnilateralClosure: make(chan *lnwallet.UnilateralCloseSummary, 1),
|
RemoteUnilateralClosure: make(chan *lnwallet.UnilateralCloseSummary, 1),
|
||||||
LocalUnilateralClosure: make(chan *LocalUnilateralCloseInfo, 1),
|
LocalUnilateralClosure: make(chan *LocalUnilateralCloseInfo, 1),
|
||||||
CooperativeClosure: make(chan struct{}, 1),
|
CooperativeClosure: make(chan *CooperativeCloseInfo, 1),
|
||||||
ContractBreach: make(chan *lnwallet.BreachRetribution, 1),
|
ContractBreach: make(chan *lnwallet.BreachRetribution, 1),
|
||||||
Cancel: func() {
|
Cancel: func() {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
@ -511,26 +516,24 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet
|
|||||||
SettledBalance: localAmt,
|
SettledBalance: localAmt,
|
||||||
CloseType: channeldb.CooperativeClose,
|
CloseType: channeldb.CooperativeClose,
|
||||||
ShortChanID: c.cfg.chanState.ShortChanID(),
|
ShortChanID: c.cfg.chanState.ShortChanID(),
|
||||||
IsPending: false,
|
IsPending: true,
|
||||||
RemoteCurrentRevocation: c.cfg.chanState.RemoteCurrentRevocation,
|
RemoteCurrentRevocation: c.cfg.chanState.RemoteCurrentRevocation,
|
||||||
RemoteNextRevocation: c.cfg.chanState.RemoteNextRevocation,
|
RemoteNextRevocation: c.cfg.chanState.RemoteNextRevocation,
|
||||||
LocalChanConfig: c.cfg.chanState.LocalChanCfg,
|
LocalChanConfig: c.cfg.chanState.LocalChanCfg,
|
||||||
}
|
}
|
||||||
err := c.cfg.chanState.CloseChannel(closeSummary)
|
|
||||||
if err != nil && err != channeldb.ErrNoActiveChannels &&
|
// Create a summary of all the information needed to handle the
|
||||||
err != channeldb.ErrNoChanDBExists {
|
// cooperative closure.
|
||||||
return fmt.Errorf("unable to close chan state: %v", err)
|
closeInfo := &CooperativeCloseInfo{
|
||||||
|
ChannelCloseSummary: closeSummary,
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("closeObserver: ChannelPoint(%v) is fully "+
|
// With the event processed, we'll now notify all subscribers of the
|
||||||
"closed, at height: %v",
|
// event.
|
||||||
c.cfg.chanState.FundingOutpoint,
|
|
||||||
commitSpend.SpendingHeight)
|
|
||||||
|
|
||||||
c.Lock()
|
c.Lock()
|
||||||
for _, sub := range c.clientSubscriptions {
|
for _, sub := range c.clientSubscriptions {
|
||||||
select {
|
select {
|
||||||
case sub.CooperativeClosure <- struct{}{}:
|
case sub.CooperativeClosure <- closeInfo:
|
||||||
case <-c.quit:
|
case <-c.quit:
|
||||||
c.Unlock()
|
c.Unlock()
|
||||||
return fmt.Errorf("exiting")
|
return fmt.Errorf("exiting")
|
||||||
@ -558,8 +561,7 @@ func (c *chainWatcher) dispatchLocalForceClose(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// As we've detected that the channel has been closed, immediately
|
// As we've detected that the channel has been closed, immediately
|
||||||
// delete the state from disk, creating a close summary for future
|
// creating a close summary for future usage by related sub-systems.
|
||||||
// usage by related sub-systems.
|
|
||||||
chanSnapshot := forceClose.ChanSnapshot
|
chanSnapshot := forceClose.ChanSnapshot
|
||||||
closeSummary := &channeldb.ChannelCloseSummary{
|
closeSummary := &channeldb.ChannelCloseSummary{
|
||||||
ChanPoint: chanSnapshot.ChannelPoint,
|
ChanPoint: chanSnapshot.ChannelPoint,
|
||||||
@ -587,14 +589,12 @@ func (c *chainWatcher) dispatchLocalForceClose(
|
|||||||
htlcValue := btcutil.Amount(htlc.SweepSignDesc.Output.Value)
|
htlcValue := btcutil.Amount(htlc.SweepSignDesc.Output.Value)
|
||||||
closeSummary.TimeLockedBalance += htlcValue
|
closeSummary.TimeLockedBalance += htlcValue
|
||||||
}
|
}
|
||||||
err = c.cfg.chanState.CloseChannel(closeSummary)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("unable to delete channel state: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// With the event processed, we'll now notify all subscribers of the
|
// With the event processed, we'll now notify all subscribers of the
|
||||||
// event.
|
// event.
|
||||||
closeInfo := &LocalUnilateralCloseInfo{commitSpend, forceClose}
|
closeInfo := &LocalUnilateralCloseInfo{
|
||||||
|
commitSpend, forceClose, closeSummary,
|
||||||
|
}
|
||||||
c.Lock()
|
c.Lock()
|
||||||
for _, sub := range c.clientSubscriptions {
|
for _, sub := range c.clientSubscriptions {
|
||||||
select {
|
select {
|
||||||
@ -641,22 +641,10 @@ func (c *chainWatcher) dispatchRemoteForceClose(
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// As we've detected that the channel has been closed, immediately
|
|
||||||
// delete the state from disk, creating a close summary for future
|
|
||||||
// usage by related sub-systems.
|
|
||||||
err = c.cfg.chanState.CloseChannel(&uniClose.ChannelCloseSummary)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("unable to delete channel state: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// With the event processed, we'll now notify all subscribers of the
|
// With the event processed, we'll now notify all subscribers of the
|
||||||
// event.
|
// event.
|
||||||
c.Lock()
|
c.Lock()
|
||||||
for _, sub := range c.clientSubscriptions {
|
for _, sub := range c.clientSubscriptions {
|
||||||
// TODO(roasbeef): send msg before writing to disk
|
|
||||||
// * need to ensure proper fault tolerance in all cases
|
|
||||||
// * get ACK from the consumer of the ntfn before writing to disk?
|
|
||||||
// * no harm in repeated ntfns: at least once semantics
|
|
||||||
select {
|
select {
|
||||||
case sub.RemoteUnilateralClosure <- uniClose:
|
case sub.RemoteUnilateralClosure <- uniClose:
|
||||||
case <-c.quit:
|
case <-c.quit:
|
||||||
@ -743,6 +731,7 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail
|
|||||||
// channel as pending force closed.
|
// channel as pending force closed.
|
||||||
//
|
//
|
||||||
// TODO(roasbeef): instead mark we got all the monies?
|
// TODO(roasbeef): instead mark we got all the monies?
|
||||||
|
// TODO(halseth): move responsibility to breach arbiter?
|
||||||
settledBalance := remoteCommit.LocalBalance.ToSatoshis()
|
settledBalance := remoteCommit.LocalBalance.ToSatoshis()
|
||||||
closeSummary := channeldb.ChannelCloseSummary{
|
closeSummary := channeldb.ChannelCloseSummary{
|
||||||
ChanPoint: c.cfg.chanState.FundingOutpoint,
|
ChanPoint: c.cfg.chanState.FundingOutpoint,
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package contractcourt
|
package contractcourt
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
@ -95,6 +94,25 @@ type ChannelArbitratorConfig struct {
|
|||||||
// being broadcast, and we are waiting for the commitment to confirm.
|
// being broadcast, and we are waiting for the commitment to confirm.
|
||||||
MarkCommitmentBroadcasted func() error
|
MarkCommitmentBroadcasted func() error
|
||||||
|
|
||||||
|
// MarkChannelClosed marks the channel closed in the database, with the
|
||||||
|
// passed close summary. After this method successfully returns we can
|
||||||
|
// no longer expect to receive chain events for this channel, and must
|
||||||
|
// be able to recover from a failure without getting the close event
|
||||||
|
// again.
|
||||||
|
MarkChannelClosed func(*channeldb.ChannelCloseSummary) error
|
||||||
|
|
||||||
|
// IsPendingClose is a boolean indicating whether the channel is marked
|
||||||
|
// as pending close in the database.
|
||||||
|
IsPendingClose bool
|
||||||
|
|
||||||
|
// ClosingHeight is the height at which the channel was closed. Note
|
||||||
|
// that this value is only valid if IsPendingClose is true.
|
||||||
|
ClosingHeight uint32
|
||||||
|
|
||||||
|
// CloseType is the type of the close event in case IsPendingClose is
|
||||||
|
// true. Otherwise this value is unset.
|
||||||
|
CloseType channeldb.ClosureType
|
||||||
|
|
||||||
// MarkChannelResolved is a function closure that serves to mark a
|
// MarkChannelResolved is a function closure that serves to mark a
|
||||||
// channel as "fully resolved". A channel itself can be considered
|
// channel as "fully resolved". A channel itself can be considered
|
||||||
// fully resolved once all active contracts have individually been
|
// fully resolved once all active contracts have individually been
|
||||||
@ -244,16 +262,57 @@ func (c *ChannelArbitrator) Start() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the channel has been marked pending close in the database, and we
|
||||||
|
// haven't transitioned the state machine to StateContractClosed (or a
|
||||||
|
// suceeding state), then a state transition most likely failed. We'll
|
||||||
|
// try to recover from this by manually advancing the state by setting
|
||||||
|
// the corresponding close trigger.
|
||||||
|
trigger := chainTrigger
|
||||||
|
triggerHeight := uint32(bestHeight)
|
||||||
|
if c.cfg.IsPendingClose {
|
||||||
|
switch c.state {
|
||||||
|
case StateDefault:
|
||||||
|
fallthrough
|
||||||
|
case StateBroadcastCommit:
|
||||||
|
fallthrough
|
||||||
|
case StateCommitmentBroadcasted:
|
||||||
|
switch c.cfg.CloseType {
|
||||||
|
case channeldb.LocalForceClose:
|
||||||
|
trigger = localCloseTrigger
|
||||||
|
case channeldb.RemoteForceClose:
|
||||||
|
trigger = remoteCloseTrigger
|
||||||
|
}
|
||||||
|
triggerHeight = c.cfg.ClosingHeight
|
||||||
|
|
||||||
|
log.Warnf("ChannelArbitrator(%v): detected stalled "+
|
||||||
|
"state=%v for closed channel, using "+
|
||||||
|
"trigger=%v", c.cfg.ChanPoint, c.state, trigger)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// We'll now attempt to advance our state forward based on the current
|
// We'll now attempt to advance our state forward based on the current
|
||||||
// on-chain state, and our set of active contracts.
|
// on-chain state, and our set of active contracts.
|
||||||
startingState := c.state
|
startingState := c.state
|
||||||
nextState, _, err := c.advanceState(
|
nextState, _, err := c.advanceState(triggerHeight, trigger)
|
||||||
uint32(bestHeight), chainTrigger, nil,
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
switch err {
|
||||||
|
|
||||||
|
// If we detect that we tried to fetch resolutions, but failed,
|
||||||
|
// this channel was marked closed in the database before
|
||||||
|
// resolutions successfully written. In this case there is not
|
||||||
|
// much we can do, so we don't return the error.
|
||||||
|
case errScopeBucketNoExist:
|
||||||
|
fallthrough
|
||||||
|
case errNoResolutions:
|
||||||
|
log.Warnf("ChannelArbitrator(%v): detected closed"+
|
||||||
|
"channel with no contract resolutions written.",
|
||||||
|
c.cfg.ChanPoint)
|
||||||
|
|
||||||
|
default:
|
||||||
c.cfg.BlockEpochs.Cancel()
|
c.cfg.BlockEpochs.Cancel()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// If we start and ended at the awaiting full resolution state, then
|
// If we start and ended at the awaiting full resolution state, then
|
||||||
// we'll relaunch our set of unresolved contracts.
|
// we'll relaunch our set of unresolved contracts.
|
||||||
@ -332,6 +391,10 @@ const (
|
|||||||
// localCloseTrigger is a transition trigger driven by our commitment
|
// localCloseTrigger is a transition trigger driven by our commitment
|
||||||
// being confirmed.
|
// being confirmed.
|
||||||
localCloseTrigger
|
localCloseTrigger
|
||||||
|
|
||||||
|
// coopCloseTrigger is a transition trigger driven by a cooperative
|
||||||
|
// close transaction being confirmed.
|
||||||
|
coopCloseTrigger
|
||||||
)
|
)
|
||||||
|
|
||||||
// String returns a human readable string describing the passed
|
// String returns a human readable string describing the passed
|
||||||
@ -350,6 +413,9 @@ func (t transitionTrigger) String() string {
|
|||||||
case localCloseTrigger:
|
case localCloseTrigger:
|
||||||
return "localCloseTrigger"
|
return "localCloseTrigger"
|
||||||
|
|
||||||
|
case coopCloseTrigger:
|
||||||
|
return "coopCloseTrigger"
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return "unknown trigger"
|
return "unknown trigger"
|
||||||
}
|
}
|
||||||
@ -415,10 +481,16 @@ func (c *ChannelArbitrator) stateStep(triggerHeight uint32,
|
|||||||
case userTrigger:
|
case userTrigger:
|
||||||
nextState = StateBroadcastCommit
|
nextState = StateBroadcastCommit
|
||||||
|
|
||||||
|
// If the trigger is a cooperative close being confirmed, then
|
||||||
|
// we can go straight to StateFullyResolved, as there won't be
|
||||||
|
// any contracts to resolve.
|
||||||
|
case coopCloseTrigger:
|
||||||
|
nextState = StateFullyResolved
|
||||||
|
|
||||||
// Otherwise, if this state advance was triggered by a
|
// Otherwise, if this state advance was triggered by a
|
||||||
// commitment being confirmed on chain, then we'll jump
|
// commitment being confirmed on chain, then we'll jump
|
||||||
// straight to the state where the contract has already been
|
// straight to the state where the contract has already been
|
||||||
// closed.
|
// closed, and we will inspect the set of unresolved contracts.
|
||||||
case localCloseTrigger:
|
case localCloseTrigger:
|
||||||
log.Errorf("ChannelArbitrator(%v): unexpected local "+
|
log.Errorf("ChannelArbitrator(%v): unexpected local "+
|
||||||
"commitment confirmed while in StateDefault",
|
"commitment confirmed while in StateDefault",
|
||||||
@ -613,17 +685,16 @@ func (c *ChannelArbitrator) stateStep(triggerHeight uint32,
|
|||||||
|
|
||||||
log.Infof("ChannelPoint(%v) has been fully resolved "+
|
log.Infof("ChannelPoint(%v) has been fully resolved "+
|
||||||
"on-chain at height=%v", c.cfg.ChanPoint, triggerHeight)
|
"on-chain at height=%v", c.cfg.ChanPoint, triggerHeight)
|
||||||
return nextState, closeTx, c.cfg.MarkChannelResolved()
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.log.CommitState(nextState); err != nil {
|
if err := c.cfg.MarkChannelResolved(); err != nil {
|
||||||
return StateError, nil, err
|
log.Errorf("unable to mark channel resolved: %v", err)
|
||||||
|
return StateError, closeTx, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Tracef("ChannelArbitrator(%v): next_state=%v", c.cfg.ChanPoint,
|
log.Tracef("ChannelArbitrator(%v): next_state=%v", c.cfg.ChanPoint,
|
||||||
nextState)
|
nextState)
|
||||||
|
|
||||||
c.state = nextState
|
|
||||||
return nextState, closeTx, nil
|
return nextState, closeTx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -634,8 +705,7 @@ func (c *ChannelArbitrator) stateStep(triggerHeight uint32,
|
|||||||
// param is a callback that allows the caller to execute an arbitrary action
|
// param is a callback that allows the caller to execute an arbitrary action
|
||||||
// after each state transition.
|
// after each state transition.
|
||||||
func (c *ChannelArbitrator) advanceState(triggerHeight uint32,
|
func (c *ChannelArbitrator) advanceState(triggerHeight uint32,
|
||||||
trigger transitionTrigger, stateCallback func(ArbitratorState) error) (
|
trigger transitionTrigger) (ArbitratorState, *wire.MsgTx, error) {
|
||||||
ArbitratorState, *wire.MsgTx, error) {
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
priorState ArbitratorState
|
priorState ArbitratorState
|
||||||
@ -654,7 +724,8 @@ func (c *ChannelArbitrator) advanceState(triggerHeight uint32,
|
|||||||
triggerHeight, trigger,
|
triggerHeight, trigger,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("unable to advance state: %v", err)
|
log.Errorf("ChannelArbitrator(%v): unable to advance "+
|
||||||
|
"state: %v", c.cfg.ChanPoint, err)
|
||||||
return priorState, nil, err
|
return priorState, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -662,23 +733,25 @@ func (c *ChannelArbitrator) advanceState(triggerHeight uint32,
|
|||||||
forceCloseTx = closeTx
|
forceCloseTx = closeTx
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we have a state callback, then we'll attempt to execute
|
|
||||||
// it. If the callback doesn't execute successfully, then we'll
|
|
||||||
// exit early.
|
|
||||||
if stateCallback != nil {
|
|
||||||
if err := stateCallback(nextState); err != nil {
|
|
||||||
return nextState, closeTx, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Our termination transition is a noop transition. If we get
|
// Our termination transition is a noop transition. If we get
|
||||||
// our prior state back as the next state, then we'll
|
// our prior state back as the next state, then we'll
|
||||||
// terminate.
|
// terminate.
|
||||||
if nextState == priorState {
|
if nextState == priorState {
|
||||||
log.Tracef("ChannelArbitrator(%v): terminating at state=%v",
|
log.Tracef("ChannelArbitrator(%v): terminating at "+
|
||||||
c.cfg.ChanPoint, nextState)
|
"state=%v", c.cfg.ChanPoint, nextState)
|
||||||
return nextState, forceCloseTx, nil
|
return nextState, forceCloseTx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// As the prior state was successfully executed, we can now
|
||||||
|
// commit the next state. This ensures that we will re-execute
|
||||||
|
// the prior state if anything fails.
|
||||||
|
if err := c.log.CommitState(nextState); err != nil {
|
||||||
|
log.Errorf("ChannelArbitrator(%v): unable to commit "+
|
||||||
|
"next state(%v): %v", c.cfg.ChanPoint,
|
||||||
|
nextState, err)
|
||||||
|
return priorState, nil, err
|
||||||
|
}
|
||||||
|
c.state = nextState
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1318,7 +1391,7 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
|
|||||||
// Now that a new block has arrived, we'll attempt to
|
// Now that a new block has arrived, we'll attempt to
|
||||||
// advance our state forward.
|
// advance our state forward.
|
||||||
nextState, _, err := c.advanceState(
|
nextState, _, err := c.advanceState(
|
||||||
uint32(bestHeight), chainTrigger, nil,
|
uint32(bestHeight), chainTrigger,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("unable to advance state: %v", err)
|
log.Errorf("unable to advance state: %v", err)
|
||||||
@ -1365,16 +1438,28 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
|
|||||||
|
|
||||||
// We've cooperatively closed the channel, so we're no longer
|
// We've cooperatively closed the channel, so we're no longer
|
||||||
// needed. We'll mark the channel as resolved and exit.
|
// needed. We'll mark the channel as resolved and exit.
|
||||||
case <-c.cfg.ChainEvents.CooperativeClosure:
|
case closeInfo := <-c.cfg.ChainEvents.CooperativeClosure:
|
||||||
log.Infof("ChannelArbitrator(%v) closing due to co-op "+
|
log.Infof("ChannelArbitrator(%v) marking channel "+
|
||||||
"closure", c.cfg.ChanPoint)
|
"cooperatively closed", c.cfg.ChanPoint)
|
||||||
|
|
||||||
if err := c.cfg.MarkChannelResolved(); err != nil {
|
err := c.cfg.MarkChannelClosed(
|
||||||
log.Errorf("Unable to mark contract "+
|
closeInfo.ChannelCloseSummary,
|
||||||
"resolved: %v", err)
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("unable to mark channel closed: "+
|
||||||
|
"%v", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We'll now advance our state machine until it reaches
|
||||||
|
// a terminal state, and the channel is marked resolved.
|
||||||
|
_, _, err = c.advanceState(
|
||||||
|
closeInfo.CloseHeight, coopCloseTrigger,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("unable to advance state: %v", err)
|
||||||
return
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// We have broadcasted our commitment, and it is now confirmed
|
// We have broadcasted our commitment, and it is now confirmed
|
||||||
// on-chain.
|
// on-chain.
|
||||||
@ -1396,31 +1481,40 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// When processing a unilateral close event, we'll
|
// When processing a unilateral close event, we'll
|
||||||
// transition directly to the ContractClosed state.
|
// transition to the ContractClosed state. We'll log
|
||||||
// When the state machine reaches that state, we'll log
|
// out the set of resolutions such that they are
|
||||||
// out the set of resolutions.
|
// available to fetch in that state.
|
||||||
stateCb := func(nextState ArbitratorState) error {
|
err := c.log.LogContractResolutions(contractRes)
|
||||||
if nextState != StateContractClosed {
|
if err != nil {
|
||||||
return nil
|
log.Errorf("unable to write resolutions: %v",
|
||||||
|
err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.log.LogContractResolutions(
|
// After the set of resolutions are successfully
|
||||||
contractRes,
|
// logged, we can safely close the channel. After this
|
||||||
|
// succeeds we won't be getting chain events anymore,
|
||||||
|
// so we must make sure we can recover on restart after
|
||||||
|
// it is marked closed. If the next state transation
|
||||||
|
// fails, we'll start up in the prior state again, and
|
||||||
|
// we won't be longer getting chain events. In this
|
||||||
|
// case we must manually re-trigger the state
|
||||||
|
// transition into StateContractClosed based on the
|
||||||
|
// close status of the channel.
|
||||||
|
err = c.cfg.MarkChannelClosed(
|
||||||
|
closeInfo.ChannelCloseSummary,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to "+
|
log.Errorf("unable to mark "+
|
||||||
"write resolutions: %v",
|
"channel closed: %v", err)
|
||||||
err)
|
return
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We'll now advance our state machine until it reaches
|
// We'll now advance our state machine until it reaches
|
||||||
// a terminal state.
|
// a terminal state.
|
||||||
_, _, err := c.advanceState(
|
_, _, err = c.advanceState(
|
||||||
uint32(closeInfo.SpendingHeight),
|
uint32(closeInfo.SpendingHeight),
|
||||||
localCloseTrigger, stateCb,
|
localCloseTrigger,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("unable to advance state: %v", err)
|
log.Errorf("unable to advance state: %v", err)
|
||||||
@ -1443,9 +1537,6 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
|
|||||||
HtlcResolutions: *uniClosure.HtlcResolutions,
|
HtlcResolutions: *uniClosure.HtlcResolutions,
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(roasbeef): modify signal to also detect
|
|
||||||
// cooperative closures?
|
|
||||||
|
|
||||||
// As we're now acting upon an event triggered by the
|
// As we're now acting upon an event triggered by the
|
||||||
// broadcast of the remote commitment transaction,
|
// broadcast of the remote commitment transaction,
|
||||||
// we'll swap out our active HTLC set with the set
|
// we'll swap out our active HTLC set with the set
|
||||||
@ -1453,30 +1544,39 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
|
|||||||
c.activeHTLCs = newHtlcSet(uniClosure.RemoteCommit.Htlcs)
|
c.activeHTLCs = newHtlcSet(uniClosure.RemoteCommit.Htlcs)
|
||||||
|
|
||||||
// When processing a unilateral close event, we'll
|
// When processing a unilateral close event, we'll
|
||||||
// transition directly to the ContractClosed state.
|
// transition to the ContractClosed state. We'll log
|
||||||
// When the state machine reaches that state, we'll log
|
// out the set of resolutions such that they are
|
||||||
// out the set of resolutions.
|
// available to fetch in that state.
|
||||||
stateCb := func(nextState ArbitratorState) error {
|
err := c.log.LogContractResolutions(contractRes)
|
||||||
if nextState != StateContractClosed {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
err := c.log.LogContractResolutions(
|
|
||||||
contractRes,
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to write "+
|
log.Errorf("unable to write resolutions: %v",
|
||||||
"resolutions: %v", err)
|
err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
// After the set of resolutions are successfully
|
||||||
|
// logged, we can safely close the channel. After this
|
||||||
|
// succeeds we won't be getting chain events anymore,
|
||||||
|
// so we must make sure we can recover on restart after
|
||||||
|
// it is marked closed. If the next state transation
|
||||||
|
// fails, we'll start up in the prior state again, and
|
||||||
|
// we won't be longer getting chain events. In this
|
||||||
|
// case we must manually re-trigger the state
|
||||||
|
// transition into StateContractClosed based on the
|
||||||
|
// close status of the channel.
|
||||||
|
closeSummary := &uniClosure.ChannelCloseSummary
|
||||||
|
err = c.cfg.MarkChannelClosed(closeSummary)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("unable to mark channel closed: %v",
|
||||||
|
err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// We'll now advance our state machine until it reaches
|
// We'll now advance our state machine until it reaches
|
||||||
// a terminal state.
|
// a terminal state.
|
||||||
_, _, err := c.advanceState(
|
_, _, err = c.advanceState(
|
||||||
uint32(uniClosure.SpendingHeight),
|
uint32(uniClosure.SpendingHeight),
|
||||||
remoteCloseTrigger, stateCb,
|
remoteCloseTrigger,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("unable to advance state: %v", err)
|
log.Errorf("unable to advance state: %v", err)
|
||||||
@ -1521,7 +1621,7 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
nextState, closeTx, err := c.advanceState(
|
nextState, closeTx, err := c.advanceState(
|
||||||
uint32(bestHeight), userTrigger, nil,
|
uint32(bestHeight), userTrigger,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("unable to advance state: %v", err)
|
log.Errorf("unable to advance state: %v", err)
|
||||||
|
@ -8,10 +8,84 @@ import (
|
|||||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||||
"github.com/btcsuite/btcd/wire"
|
"github.com/btcsuite/btcd/wire"
|
||||||
"github.com/lightningnetwork/lnd/chainntnfs"
|
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||||
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
"github.com/lightningnetwork/lnd/lnwallet"
|
"github.com/lightningnetwork/lnd/lnwallet"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type mockArbitratorLog struct {
|
||||||
|
state ArbitratorState
|
||||||
|
newStates chan ArbitratorState
|
||||||
|
failLog bool
|
||||||
|
failFetch error
|
||||||
|
failCommit bool
|
||||||
|
failCommitState ArbitratorState
|
||||||
|
}
|
||||||
|
|
||||||
|
// A compile time check to ensure mockArbitratorLog meets the ArbitratorLog
|
||||||
|
// interface.
|
||||||
|
var _ ArbitratorLog = (*mockArbitratorLog)(nil)
|
||||||
|
|
||||||
|
func (b *mockArbitratorLog) CurrentState() (ArbitratorState, error) {
|
||||||
|
return b.state, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *mockArbitratorLog) CommitState(s ArbitratorState) error {
|
||||||
|
if b.failCommit && s == b.failCommitState {
|
||||||
|
return fmt.Errorf("intentional commit error at state %v",
|
||||||
|
b.failCommitState)
|
||||||
|
}
|
||||||
|
b.state = s
|
||||||
|
b.newStates <- s
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *mockArbitratorLog) FetchUnresolvedContracts() ([]ContractResolver, error) {
|
||||||
|
var contracts []ContractResolver
|
||||||
|
return contracts, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *mockArbitratorLog) InsertUnresolvedContracts(resolvers ...ContractResolver) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *mockArbitratorLog) SwapContract(oldContract, newContract ContractResolver) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *mockArbitratorLog) ResolveContract(res ContractResolver) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *mockArbitratorLog) LogContractResolutions(c *ContractResolutions) error {
|
||||||
|
if b.failLog {
|
||||||
|
return fmt.Errorf("intentional log failure")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *mockArbitratorLog) FetchContractResolutions() (*ContractResolutions, error) {
|
||||||
|
if b.failFetch != nil {
|
||||||
|
return nil, b.failFetch
|
||||||
|
}
|
||||||
|
c := &ContractResolutions{}
|
||||||
|
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *mockArbitratorLog) LogChainActions(actions ChainActionMap) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *mockArbitratorLog) FetchChainActions() (ChainActionMap, error) {
|
||||||
|
actionsMap := make(ChainActionMap)
|
||||||
|
return actionsMap, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *mockArbitratorLog) WipeHistory() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type mockChainIO struct{}
|
type mockChainIO struct{}
|
||||||
|
|
||||||
func (*mockChainIO) GetBestBlock() (*chainhash.Hash, int32, error) {
|
func (*mockChainIO) GetBestBlock() (*chainhash.Hash, int32, error) {
|
||||||
@ -31,7 +105,8 @@ func (*mockChainIO) GetBlock(blockHash *chainhash.Hash) (*wire.MsgBlock, error)
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createTestChannelArbitrator() (*ChannelArbitrator, chan struct{}, func(), error) {
|
func createTestChannelArbitrator(log ArbitratorLog) (*ChannelArbitrator,
|
||||||
|
chan struct{}, error) {
|
||||||
blockEpoch := &chainntnfs.BlockEpochEvent{
|
blockEpoch := &chainntnfs.BlockEpochEvent{
|
||||||
Cancel: func() {},
|
Cancel: func() {},
|
||||||
}
|
}
|
||||||
@ -41,7 +116,7 @@ func createTestChannelArbitrator() (*ChannelArbitrator, chan struct{}, func(), e
|
|||||||
chanEvents := &ChainEventSubscription{
|
chanEvents := &ChainEventSubscription{
|
||||||
RemoteUnilateralClosure: make(chan *lnwallet.UnilateralCloseSummary, 1),
|
RemoteUnilateralClosure: make(chan *lnwallet.UnilateralCloseSummary, 1),
|
||||||
LocalUnilateralClosure: make(chan *LocalUnilateralCloseInfo, 1),
|
LocalUnilateralClosure: make(chan *LocalUnilateralCloseInfo, 1),
|
||||||
CooperativeClosure: make(chan struct{}, 1),
|
CooperativeClosure: make(chan *CooperativeCloseInfo, 1),
|
||||||
ContractBreach: make(chan *lnwallet.BreachRetribution, 1),
|
ContractBreach: make(chan *lnwallet.BreachRetribution, 1),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -77,20 +152,15 @@ func createTestChannelArbitrator() (*ChannelArbitrator, chan struct{}, func(), e
|
|||||||
MarkCommitmentBroadcasted: func() error {
|
MarkCommitmentBroadcasted: func() error {
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
|
MarkChannelClosed: func(*channeldb.ChannelCloseSummary) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
IsPendingClose: false,
|
||||||
ChainArbitratorConfig: chainArbCfg,
|
ChainArbitratorConfig: chainArbCfg,
|
||||||
ChainEvents: chanEvents,
|
ChainEvents: chanEvents,
|
||||||
}
|
}
|
||||||
testLog, cleanUp, err := newTestBoltArbLog(
|
|
||||||
testChainHash, testChanPoint1,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, nil, fmt.Errorf("unable to create test log: %v",
|
|
||||||
err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return NewChannelArbitrator(arbCfg, nil, testLog),
|
return NewChannelArbitrator(arbCfg, nil, log), resolvedChan, nil
|
||||||
resolvedChan, cleanUp, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// assertState checks that the ChannelArbitrator is in the state we expect it
|
// assertState checks that the ChannelArbitrator is in the state we expect it
|
||||||
@ -102,13 +172,18 @@ func assertState(t *testing.T, c *ChannelArbitrator, expected ArbitratorState) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TestChannelArbitratorCooperativeClose tests that the ChannelArbitertor
|
// TestChannelArbitratorCooperativeClose tests that the ChannelArbitertor
|
||||||
// correctly does nothing in case a cooperative close is confirmed.
|
// correctly marks the channel resolved in case a cooperative close is
|
||||||
|
// confirmed.
|
||||||
func TestChannelArbitratorCooperativeClose(t *testing.T) {
|
func TestChannelArbitratorCooperativeClose(t *testing.T) {
|
||||||
chanArb, _, cleanUp, err := createTestChannelArbitrator()
|
log := &mockArbitratorLog{
|
||||||
|
state: StateDefault,
|
||||||
|
newStates: make(chan ArbitratorState, 5),
|
||||||
|
}
|
||||||
|
|
||||||
|
chanArb, resolved, err := createTestChannelArbitrator(log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unable to create ChannelArbitrator: %v", err)
|
t.Fatalf("unable to create ChannelArbitrator: %v", err)
|
||||||
}
|
}
|
||||||
defer cleanUp()
|
|
||||||
|
|
||||||
if err := chanArb.Start(); err != nil {
|
if err := chanArb.Start(); err != nil {
|
||||||
t.Fatalf("unable to start ChannelArbitrator: %v", err)
|
t.Fatalf("unable to start ChannelArbitrator: %v", err)
|
||||||
@ -118,21 +193,70 @@ func TestChannelArbitratorCooperativeClose(t *testing.T) {
|
|||||||
// It should start out in the default state.
|
// It should start out in the default state.
|
||||||
assertState(t, chanArb, StateDefault)
|
assertState(t, chanArb, StateDefault)
|
||||||
|
|
||||||
// Cooperative close should do nothing.
|
// We set up a channel to detect when MarkChannelClosed is called.
|
||||||
// TODO: this will change?
|
closeInfos := make(chan *channeldb.ChannelCloseSummary)
|
||||||
chanArb.cfg.ChainEvents.CooperativeClosure <- struct{}{}
|
chanArb.cfg.MarkChannelClosed = func(
|
||||||
assertState(t, chanArb, StateDefault)
|
closeInfo *channeldb.ChannelCloseSummary) error {
|
||||||
|
closeInfos <- closeInfo
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestChannelArbitratorRemoteForceClose checks that the ChannelArbitrotor goes
|
// Cooperative close should do trigger a MarkChannelClosed +
|
||||||
|
// MarkChannelResolved.
|
||||||
|
closeInfo := &CooperativeCloseInfo{
|
||||||
|
&channeldb.ChannelCloseSummary{},
|
||||||
|
}
|
||||||
|
chanArb.cfg.ChainEvents.CooperativeClosure <- closeInfo
|
||||||
|
|
||||||
|
select {
|
||||||
|
case c := <-closeInfos:
|
||||||
|
if c.CloseType != channeldb.CooperativeClose {
|
||||||
|
t.Fatalf("expected cooperative close, got %v", c.CloseType)
|
||||||
|
}
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("timeout waiting for channel close")
|
||||||
|
}
|
||||||
|
|
||||||
|
// It should mark the channel as resolved.
|
||||||
|
select {
|
||||||
|
case <-resolved:
|
||||||
|
// Expected.
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("contract was not resolved")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertStateTransitions(t *testing.T, newStates <-chan ArbitratorState,
|
||||||
|
expectedStates ...ArbitratorState) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
for _, exp := range expectedStates {
|
||||||
|
var state ArbitratorState
|
||||||
|
select {
|
||||||
|
case state = <-newStates:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("new state not received")
|
||||||
|
}
|
||||||
|
|
||||||
|
if state != exp {
|
||||||
|
t.Fatalf("expected new state %v, got %v", exp, state)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestChannelArbitratorRemoteForceClose checks that the ChannelArbitrator goes
|
||||||
// through the expected states if a remote force close is observed in the
|
// through the expected states if a remote force close is observed in the
|
||||||
// chain.
|
// chain.
|
||||||
func TestChannelArbitratorRemoteForceClose(t *testing.T) {
|
func TestChannelArbitratorRemoteForceClose(t *testing.T) {
|
||||||
chanArb, resolved, cleanUp, err := createTestChannelArbitrator()
|
log := &mockArbitratorLog{
|
||||||
|
state: StateDefault,
|
||||||
|
newStates: make(chan ArbitratorState, 5),
|
||||||
|
}
|
||||||
|
|
||||||
|
chanArb, resolved, err := createTestChannelArbitrator(log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unable to create ChannelArbitrator: %v", err)
|
t.Fatalf("unable to create ChannelArbitrator: %v", err)
|
||||||
}
|
}
|
||||||
defer cleanUp()
|
|
||||||
|
|
||||||
if err := chanArb.Start(); err != nil {
|
if err := chanArb.Start(); err != nil {
|
||||||
t.Fatalf("unable to start ChannelArbitrator: %v", err)
|
t.Fatalf("unable to start ChannelArbitrator: %v", err)
|
||||||
@ -153,28 +277,34 @@ func TestChannelArbitratorRemoteForceClose(t *testing.T) {
|
|||||||
}
|
}
|
||||||
chanArb.cfg.ChainEvents.RemoteUnilateralClosure <- uniClose
|
chanArb.cfg.ChainEvents.RemoteUnilateralClosure <- uniClose
|
||||||
|
|
||||||
// It should mark the channel as resolved.
|
// It should transition StateDefault -> StateContractClosed ->
|
||||||
|
// StateFullyResolved.
|
||||||
|
assertStateTransitions(
|
||||||
|
t, log.newStates, StateContractClosed, StateFullyResolved,
|
||||||
|
)
|
||||||
|
|
||||||
|
// It should alos mark the channel as resolved.
|
||||||
select {
|
select {
|
||||||
case <-resolved:
|
case <-resolved:
|
||||||
// Expected.
|
// Expected.
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(5 * time.Second):
|
||||||
t.Fatalf("contract was not resolved")
|
t.Fatalf("contract was not resolved")
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: intermediate states.
|
|
||||||
// We expect the ChannelArbitrator to end up in the the resolved state.
|
|
||||||
assertState(t, chanArb, StateFullyResolved)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestChannelArbitratorLocalForceClose tests that the ChannelArbitrator goes
|
// TestChannelArbitratorLocalForceClose tests that the ChannelArbitrator goes
|
||||||
// through the expected states in case we request it to force close the channel,
|
// through the expected states in case we request it to force close the channel,
|
||||||
// and the local force close event is observed in chain.
|
// and the local force close event is observed in chain.
|
||||||
func TestChannelArbitratorLocalForceClose(t *testing.T) {
|
func TestChannelArbitratorLocalForceClose(t *testing.T) {
|
||||||
chanArb, resolved, cleanUp, err := createTestChannelArbitrator()
|
log := &mockArbitratorLog{
|
||||||
|
state: StateDefault,
|
||||||
|
newStates: make(chan ArbitratorState, 5),
|
||||||
|
}
|
||||||
|
|
||||||
|
chanArb, resolved, err := createTestChannelArbitrator(log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unable to create ChannelArbitrator: %v", err)
|
t.Fatalf("unable to create ChannelArbitrator: %v", err)
|
||||||
}
|
}
|
||||||
defer cleanUp()
|
|
||||||
|
|
||||||
if err := chanArb.Start(); err != nil {
|
if err := chanArb.Start(); err != nil {
|
||||||
t.Fatalf("unable to start ChannelArbitrator: %v", err)
|
t.Fatalf("unable to start ChannelArbitrator: %v", err)
|
||||||
@ -208,6 +338,9 @@ func TestChannelArbitratorLocalForceClose(t *testing.T) {
|
|||||||
closeTx: respChan,
|
closeTx: respChan,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// It should transition to StateBroadcastCommit.
|
||||||
|
assertStateTransitions(t, log.newStates, StateBroadcastCommit)
|
||||||
|
|
||||||
// When it is broadcasting the force close, its state should be
|
// When it is broadcasting the force close, its state should be
|
||||||
// StateBroadcastCommit.
|
// StateBroadcastCommit.
|
||||||
select {
|
select {
|
||||||
@ -219,12 +352,23 @@ func TestChannelArbitratorLocalForceClose(t *testing.T) {
|
|||||||
t.Fatalf("did not get state update")
|
t.Fatalf("did not get state update")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// After broadcasting, transition should be to
|
||||||
|
// StateCommitmentBroadcasted.
|
||||||
|
assertStateTransitions(t, log.newStates, StateCommitmentBroadcasted)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-respChan:
|
case <-respChan:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("no response received")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
case err := <-errChan:
|
case err := <-errChan:
|
||||||
|
if err != nil {
|
||||||
t.Fatalf("error force closing channel: %v", err)
|
t.Fatalf("error force closing channel: %v", err)
|
||||||
case <-time.After(15 * time.Second):
|
}
|
||||||
t.Fatalf("did not receive reponse")
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("no response received")
|
||||||
}
|
}
|
||||||
|
|
||||||
// After broadcasting the close tx, it should be in state
|
// After broadcasting the close tx, it should be in state
|
||||||
@ -238,29 +382,35 @@ func TestChannelArbitratorLocalForceClose(t *testing.T) {
|
|||||||
CloseTx: &wire.MsgTx{},
|
CloseTx: &wire.MsgTx{},
|
||||||
HtlcResolutions: &lnwallet.HtlcResolutions{},
|
HtlcResolutions: &lnwallet.HtlcResolutions{},
|
||||||
},
|
},
|
||||||
|
&channeldb.ChannelCloseSummary{},
|
||||||
}
|
}
|
||||||
// It should mark the channel as resolved.
|
|
||||||
|
// It should transition StateContractClosed -> StateFullyResolved.
|
||||||
|
assertStateTransitions(t, log.newStates, StateContractClosed,
|
||||||
|
StateFullyResolved)
|
||||||
|
|
||||||
|
// It should also mark the channel as resolved.
|
||||||
select {
|
select {
|
||||||
case <-resolved:
|
case <-resolved:
|
||||||
// Expected.
|
// Expected.
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(5 * time.Second):
|
||||||
t.Fatalf("contract was not resolved")
|
t.Fatalf("contract was not resolved")
|
||||||
}
|
}
|
||||||
|
|
||||||
// And end up in the StateFullyResolved state.
|
|
||||||
// TODO: intermediate states as well.
|
|
||||||
assertState(t, chanArb, StateFullyResolved)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestChannelArbitratorLocalForceCloseRemoteConfiremd tests that the
|
// TestChannelArbitratorLocalForceCloseRemoteConfiremd tests that the
|
||||||
// ChannelArbitrator behaves as expected in the case where we request a local
|
// ChannelArbitrator behaves as expected in the case where we request a local
|
||||||
// force close, but a remote commitment ends up being confirmed in chain.
|
// force close, but a remote commitment ends up being confirmed in chain.
|
||||||
func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) {
|
func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) {
|
||||||
chanArb, resolved, cleanUp, err := createTestChannelArbitrator()
|
log := &mockArbitratorLog{
|
||||||
|
state: StateDefault,
|
||||||
|
newStates: make(chan ArbitratorState, 5),
|
||||||
|
}
|
||||||
|
|
||||||
|
chanArb, resolved, err := createTestChannelArbitrator(log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unable to create ChannelArbitrator: %v", err)
|
t.Fatalf("unable to create ChannelArbitrator: %v", err)
|
||||||
}
|
}
|
||||||
defer cleanUp()
|
|
||||||
|
|
||||||
if err := chanArb.Start(); err != nil {
|
if err := chanArb.Start(); err != nil {
|
||||||
t.Fatalf("unable to start ChannelArbitrator: %v", err)
|
t.Fatalf("unable to start ChannelArbitrator: %v", err)
|
||||||
@ -294,6 +444,9 @@ func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) {
|
|||||||
closeTx: respChan,
|
closeTx: respChan,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// It should transition to StateBroadcastCommit.
|
||||||
|
assertStateTransitions(t, log.newStates, StateBroadcastCommit)
|
||||||
|
|
||||||
// We expect it to be in state StateBroadcastCommit when publishing
|
// We expect it to be in state StateBroadcastCommit when publishing
|
||||||
// the force close.
|
// the force close.
|
||||||
select {
|
select {
|
||||||
@ -305,12 +458,23 @@ func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) {
|
|||||||
t.Fatalf("no state update received")
|
t.Fatalf("no state update received")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// After broadcasting, transition should be to
|
||||||
|
// StateCommitmentBroadcasted.
|
||||||
|
assertStateTransitions(t, log.newStates, StateCommitmentBroadcasted)
|
||||||
|
|
||||||
// Wait for a response to the force close.
|
// Wait for a response to the force close.
|
||||||
select {
|
select {
|
||||||
case <-respChan:
|
case <-respChan:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("no response received")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
case err := <-errChan:
|
case err := <-errChan:
|
||||||
|
if err != nil {
|
||||||
t.Fatalf("error force closing channel: %v", err)
|
t.Fatalf("error force closing channel: %v", err)
|
||||||
case <-time.After(15 * time.Second):
|
}
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
t.Fatalf("no response received")
|
t.Fatalf("no response received")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -327,6 +491,10 @@ func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) {
|
|||||||
}
|
}
|
||||||
chanArb.cfg.ChainEvents.RemoteUnilateralClosure <- uniClose
|
chanArb.cfg.ChainEvents.RemoteUnilateralClosure <- uniClose
|
||||||
|
|
||||||
|
// It should transition StateContractClosed -> StateFullyResolved.
|
||||||
|
assertStateTransitions(t, log.newStates, StateContractClosed,
|
||||||
|
StateFullyResolved)
|
||||||
|
|
||||||
// It should resolve.
|
// It should resolve.
|
||||||
select {
|
select {
|
||||||
case <-resolved:
|
case <-resolved:
|
||||||
@ -334,10 +502,6 @@ func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) {
|
|||||||
case <-time.After(15 * time.Second):
|
case <-time.After(15 * time.Second):
|
||||||
t.Fatalf("contract was not resolved")
|
t.Fatalf("contract was not resolved")
|
||||||
}
|
}
|
||||||
|
|
||||||
// And we expect it to end up in StateFullyResolved.
|
|
||||||
// TODO: intermediate states as well.
|
|
||||||
assertState(t, chanArb, StateFullyResolved)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestChannelArbitratorLocalForceCloseDoubleSpend tests that the
|
// TestChannelArbitratorLocalForceCloseDoubleSpend tests that the
|
||||||
@ -345,11 +509,15 @@ func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) {
|
|||||||
// force close, but we fail broadcasting our commitment because a remote
|
// force close, but we fail broadcasting our commitment because a remote
|
||||||
// commitment has already been published.
|
// commitment has already been published.
|
||||||
func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) {
|
func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) {
|
||||||
chanArb, resolved, cleanUp, err := createTestChannelArbitrator()
|
log := &mockArbitratorLog{
|
||||||
|
state: StateDefault,
|
||||||
|
newStates: make(chan ArbitratorState, 5),
|
||||||
|
}
|
||||||
|
|
||||||
|
chanArb, resolved, err := createTestChannelArbitrator(log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unable to create ChannelArbitrator: %v", err)
|
t.Fatalf("unable to create ChannelArbitrator: %v", err)
|
||||||
}
|
}
|
||||||
defer cleanUp()
|
|
||||||
|
|
||||||
if err := chanArb.Start(); err != nil {
|
if err := chanArb.Start(); err != nil {
|
||||||
t.Fatalf("unable to start ChannelArbitrator: %v", err)
|
t.Fatalf("unable to start ChannelArbitrator: %v", err)
|
||||||
@ -382,6 +550,9 @@ func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) {
|
|||||||
closeTx: respChan,
|
closeTx: respChan,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// It should transition to StateBroadcastCommit.
|
||||||
|
assertStateTransitions(t, log.newStates, StateBroadcastCommit)
|
||||||
|
|
||||||
// We expect it to be in state StateBroadcastCommit when publishing
|
// We expect it to be in state StateBroadcastCommit when publishing
|
||||||
// the force close.
|
// the force close.
|
||||||
select {
|
select {
|
||||||
@ -393,12 +564,23 @@ func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) {
|
|||||||
t.Fatalf("no state update received")
|
t.Fatalf("no state update received")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// After broadcasting, transition should be to
|
||||||
|
// StateCommitmentBroadcasted.
|
||||||
|
assertStateTransitions(t, log.newStates, StateCommitmentBroadcasted)
|
||||||
|
|
||||||
// Wait for a response to the force close.
|
// Wait for a response to the force close.
|
||||||
select {
|
select {
|
||||||
case <-respChan:
|
case <-respChan:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("no response received")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
case err := <-errChan:
|
case err := <-errChan:
|
||||||
|
if err != nil {
|
||||||
t.Fatalf("error force closing channel: %v", err)
|
t.Fatalf("error force closing channel: %v", err)
|
||||||
case <-time.After(15 * time.Second):
|
}
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
t.Fatalf("no response received")
|
t.Fatalf("no response received")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -415,6 +597,10 @@ func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) {
|
|||||||
}
|
}
|
||||||
chanArb.cfg.ChainEvents.RemoteUnilateralClosure <- uniClose
|
chanArb.cfg.ChainEvents.RemoteUnilateralClosure <- uniClose
|
||||||
|
|
||||||
|
// It should transition StateContractClosed -> StateFullyResolved.
|
||||||
|
assertStateTransitions(t, log.newStates, StateContractClosed,
|
||||||
|
StateFullyResolved)
|
||||||
|
|
||||||
// It should resolve.
|
// It should resolve.
|
||||||
select {
|
select {
|
||||||
case <-resolved:
|
case <-resolved:
|
||||||
@ -422,8 +608,262 @@ func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) {
|
|||||||
case <-time.After(15 * time.Second):
|
case <-time.After(15 * time.Second):
|
||||||
t.Fatalf("contract was not resolved")
|
t.Fatalf("contract was not resolved")
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// And we expect it to end up in StateFullyResolved.
|
|
||||||
// TODO: intermediate states as well.
|
// TestChannelArbitratorPersistence tests that the ChannelArbitrator is able to
|
||||||
assertState(t, chanArb, StateFullyResolved)
|
// keep advancing the state machine from various states after restart.
|
||||||
|
func TestChannelArbitratorPersistence(t *testing.T) {
|
||||||
|
// Start out with a log that will fail writing the set of resolutions.
|
||||||
|
log := &mockArbitratorLog{
|
||||||
|
state: StateDefault,
|
||||||
|
newStates: make(chan ArbitratorState, 5),
|
||||||
|
failLog: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
chanArb, resolved, err := createTestChannelArbitrator(log)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create ChannelArbitrator: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := chanArb.Start(); err != nil {
|
||||||
|
t.Fatalf("unable to start ChannelArbitrator: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// It should start in StateDefault.
|
||||||
|
assertState(t, chanArb, StateDefault)
|
||||||
|
|
||||||
|
// Send a remote force close event.
|
||||||
|
commitSpend := &chainntnfs.SpendDetail{
|
||||||
|
SpenderTxHash: &chainhash.Hash{},
|
||||||
|
}
|
||||||
|
|
||||||
|
uniClose := &lnwallet.UnilateralCloseSummary{
|
||||||
|
SpendDetail: commitSpend,
|
||||||
|
HtlcResolutions: &lnwallet.HtlcResolutions{},
|
||||||
|
}
|
||||||
|
chanArb.cfg.ChainEvents.RemoteUnilateralClosure <- uniClose
|
||||||
|
|
||||||
|
// Since writing the resolutions fail, the arbitrator should not
|
||||||
|
// advance to the next state.
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
if log.state != StateDefault {
|
||||||
|
t.Fatalf("expected to stay in StateDefault")
|
||||||
|
}
|
||||||
|
chanArb.Stop()
|
||||||
|
|
||||||
|
// Create a new arbitrator with the same log.
|
||||||
|
chanArb, resolved, err = createTestChannelArbitrator(log)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create ChannelArbitrator: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := chanArb.Start(); err != nil {
|
||||||
|
t.Fatalf("unable to start ChannelArbitrator: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Again, it should start up in the default state.
|
||||||
|
assertState(t, chanArb, StateDefault)
|
||||||
|
|
||||||
|
// Now we make the log succeed writing the resolutions, but fail when
|
||||||
|
// attempting to close the channel.
|
||||||
|
log.failLog = false
|
||||||
|
chanArb.cfg.MarkChannelClosed = func(*channeldb.ChannelCloseSummary) error {
|
||||||
|
return fmt.Errorf("intentional close error")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send a new remote force close event.
|
||||||
|
chanArb.cfg.ChainEvents.RemoteUnilateralClosure <- uniClose
|
||||||
|
|
||||||
|
// Since closing the channel failed, the arbitrator should stay in the
|
||||||
|
// default state.
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
if log.state != StateDefault {
|
||||||
|
t.Fatalf("expected to stay in StateDefault")
|
||||||
|
}
|
||||||
|
chanArb.Stop()
|
||||||
|
|
||||||
|
// Create yet another arbitrator with the same log.
|
||||||
|
chanArb, resolved, err = createTestChannelArbitrator(log)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create ChannelArbitrator: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := chanArb.Start(); err != nil {
|
||||||
|
t.Fatalf("unable to start ChannelArbitrator: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Starts out in StateDefault.
|
||||||
|
assertState(t, chanArb, StateDefault)
|
||||||
|
|
||||||
|
// Now make fetching the resolutions fail.
|
||||||
|
log.failFetch = fmt.Errorf("intentional fetch failure")
|
||||||
|
chanArb.cfg.ChainEvents.RemoteUnilateralClosure <- uniClose
|
||||||
|
|
||||||
|
// Since logging the resolutions and closing the channel now succeeds,
|
||||||
|
// it should advance to StateContractClosed.
|
||||||
|
assertStateTransitions(
|
||||||
|
t, log.newStates, StateContractClosed,
|
||||||
|
)
|
||||||
|
|
||||||
|
// It should not advance further, however, as fetching resolutions
|
||||||
|
// failed.
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
if log.state != StateContractClosed {
|
||||||
|
t.Fatalf("expected to stay in StateContractClosed")
|
||||||
|
}
|
||||||
|
chanArb.Stop()
|
||||||
|
|
||||||
|
// Create a new arbitrator, and now make fetching resolutions succeed.
|
||||||
|
log.failFetch = nil
|
||||||
|
chanArb, resolved, err = createTestChannelArbitrator(log)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create ChannelArbitrator: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := chanArb.Start(); err != nil {
|
||||||
|
t.Fatalf("unable to start ChannelArbitrator: %v", err)
|
||||||
|
}
|
||||||
|
defer chanArb.Stop()
|
||||||
|
|
||||||
|
// Finally it should advance to StateFullyResolved.
|
||||||
|
assertStateTransitions(
|
||||||
|
t, log.newStates, StateFullyResolved,
|
||||||
|
)
|
||||||
|
|
||||||
|
// It should also mark the channel as resolved.
|
||||||
|
select {
|
||||||
|
case <-resolved:
|
||||||
|
// Expected.
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("contract was not resolved")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestChannelArbitratorCommitFailure tests that the channel arbitrator is able
|
||||||
|
// to recover from a failed CommitState call at restart.
|
||||||
|
func TestChannelArbitratorCommitFailure(t *testing.T) {
|
||||||
|
// Start out with a log that will fail committing to StateContractClosed.
|
||||||
|
log := &mockArbitratorLog{
|
||||||
|
state: StateDefault,
|
||||||
|
newStates: make(chan ArbitratorState, 5),
|
||||||
|
failCommit: true,
|
||||||
|
failCommitState: StateContractClosed,
|
||||||
|
}
|
||||||
|
|
||||||
|
chanArb, resolved, err := createTestChannelArbitrator(log)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create ChannelArbitrator: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := chanArb.Start(); err != nil {
|
||||||
|
t.Fatalf("unable to start ChannelArbitrator: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// It should start in StateDefault.
|
||||||
|
assertState(t, chanArb, StateDefault)
|
||||||
|
|
||||||
|
closed := make(chan struct{})
|
||||||
|
chanArb.cfg.MarkChannelClosed = func(*channeldb.ChannelCloseSummary) error {
|
||||||
|
close(closed)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send a remote force close event.
|
||||||
|
commitSpend := &chainntnfs.SpendDetail{
|
||||||
|
SpenderTxHash: &chainhash.Hash{},
|
||||||
|
}
|
||||||
|
|
||||||
|
uniClose := &lnwallet.UnilateralCloseSummary{
|
||||||
|
SpendDetail: commitSpend,
|
||||||
|
HtlcResolutions: &lnwallet.HtlcResolutions{},
|
||||||
|
}
|
||||||
|
chanArb.cfg.ChainEvents.RemoteUnilateralClosure <- uniClose
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-closed:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("channel was not marked closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since the channel was marked closed in the database, but the commit
|
||||||
|
// to the next state failed, the state should still be StateDefault.
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
if log.state != StateDefault {
|
||||||
|
t.Fatalf("expected to stay in StateDefault")
|
||||||
|
}
|
||||||
|
chanArb.Stop()
|
||||||
|
|
||||||
|
// Start the arbitrator again, with IsPendingClose reporting the
|
||||||
|
// channel closed in the database.
|
||||||
|
chanArb, resolved, err = createTestChannelArbitrator(log)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create ChannelArbitrator: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.failCommit = false
|
||||||
|
|
||||||
|
chanArb.cfg.IsPendingClose = true
|
||||||
|
chanArb.cfg.ClosingHeight = 100
|
||||||
|
chanArb.cfg.CloseType = channeldb.RemoteForceClose
|
||||||
|
|
||||||
|
if err := chanArb.Start(); err != nil {
|
||||||
|
t.Fatalf("unable to start ChannelArbitrator: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since the channel is marked closed in the database, it should
|
||||||
|
// advance to StateContractClosed and StateFullyResolved.
|
||||||
|
assertStateTransitions(
|
||||||
|
t, log.newStates, StateContractClosed, StateFullyResolved,
|
||||||
|
)
|
||||||
|
|
||||||
|
// It should also mark the channel as resolved.
|
||||||
|
select {
|
||||||
|
case <-resolved:
|
||||||
|
// Expected.
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("contract was not resolved")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestChannelArbitratorEmptyResolutions makes sure that a channel that is
|
||||||
|
// pending close in the database, but haven't had any resolutions logged will
|
||||||
|
// not be marked resolved. This situation must be handled to avoid closing
|
||||||
|
// channels from earlier versions of the ChannelArbitrator, which didn't have a
|
||||||
|
// proper handoff from the ChainWatcher, and we could risk ending up in a state
|
||||||
|
// where the channel was closed in the DB, but the resolutions weren't properly
|
||||||
|
// written.
|
||||||
|
func TestChannelArbitratorEmptyResolutions(t *testing.T) {
|
||||||
|
// Start out with a log that will fail writing the set of resolutions.
|
||||||
|
log := &mockArbitratorLog{
|
||||||
|
state: StateDefault,
|
||||||
|
newStates: make(chan ArbitratorState, 5),
|
||||||
|
failFetch: errNoResolutions,
|
||||||
|
}
|
||||||
|
|
||||||
|
chanArb, _, err := createTestChannelArbitrator(log)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create ChannelArbitrator: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
chanArb.cfg.IsPendingClose = true
|
||||||
|
chanArb.cfg.ClosingHeight = 100
|
||||||
|
chanArb.cfg.CloseType = channeldb.RemoteForceClose
|
||||||
|
|
||||||
|
if err := chanArb.Start(); err != nil {
|
||||||
|
t.Fatalf("unable to start ChannelArbitrator: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// It should not advance its state beyond StateContractClosed, since
|
||||||
|
// fetching resolutions fails.
|
||||||
|
assertStateTransitions(
|
||||||
|
t, log.newStates, StateContractClosed,
|
||||||
|
)
|
||||||
|
|
||||||
|
// It should not advance further, however, as fetching resolutions
|
||||||
|
// failed.
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
if log.state != StateContractClosed {
|
||||||
|
t.Fatalf("expected to stay in StateContractClosed")
|
||||||
|
}
|
||||||
|
chanArb.Stop()
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user