lnwallet/channel: delete state after ack from breach arb

This commit is contained in:
Conner Fromknecht 2017-11-20 23:57:33 -08:00
parent ff3a1389e5
commit bb8c5f82da
No known key found for this signature in database
GPG Key ID: 39DE78FBE6ACB0EF

@ -1111,6 +1111,9 @@ type LightningChannel struct {
status channelState status channelState
// ChanPoint is the funding outpoint of this channel.
ChanPoint *wire.OutPoint
// sigPool is a pool of workers that are capable of signing and // sigPool is a pool of workers that are capable of signing and
// validating signatures in parallel. This is utilized as an // validating signatures in parallel. This is utilized as an
// optimization to void serially signing or validating the HTLC // optimization to void serially signing or validating the HTLC
@ -1269,6 +1272,7 @@ func NewLightningChannel(signer Signer, events chainntnfs.ChainNotifier,
remoteChanCfg: &state.RemoteChanCfg, remoteChanCfg: &state.RemoteChanCfg,
localUpdateLog: localUpdateLog, localUpdateLog: localUpdateLog,
remoteUpdateLog: remoteUpdateLog, remoteUpdateLog: remoteUpdateLog,
ChanPoint: &state.FundingOutpoint,
Capacity: state.Capacity, Capacity: state.Capacity,
FundingWitnessScript: multiSigScript, FundingWitnessScript: multiSigScript,
ForceCloseSignal: make(chan struct{}), ForceCloseSignal: make(chan struct{}),
@ -1717,6 +1721,11 @@ type BreachRetribution struct {
// commitment transaction. // commitment transaction.
BreachTransaction *wire.MsgTx BreachTransaction *wire.MsgTx
// BreachHeight records the block height confirming the breach
// transaction, used as a height hint when registering for
// confirmations.
BreachHeight uint32
// ChainHash is the chain that the contract beach was identified // ChainHash is the chain that the contract beach was identified
// within. This is also the resident chain of the contract (the chain // within. This is also the resident chain of the contract (the chain
// the contract was created on). // the contract was created on).
@ -1757,13 +1766,18 @@ type BreachRetribution struct {
// HtlcRetributions is a slice of HTLC retributions for each output // HtlcRetributions is a slice of HTLC retributions for each output
// active HTLC output within the breached commitment transaction. // active HTLC output within the breached commitment transaction.
HtlcRetributions []HtlcRetribution HtlcRetributions []HtlcRetribution
// Err is used to reliably hand-off the breach retribution to the breach
// arbiter.
Err chan error
} }
// newBreachRetribution creates a new fully populated BreachRetribution for the // newBreachRetribution creates a new fully populated BreachRetribution for the
// passed channel, at a particular revoked state number, and one which targets // passed channel, at a particular revoked state number, and one which targets
// the passed commitment transaction. // the passed commitment transaction.
func newBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64, func newBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64,
broadcastCommitment *wire.MsgTx) (*BreachRetribution, error) { broadcastCommitment *wire.MsgTx,
breachHeight uint32) (*BreachRetribution, error) {
commitHash := broadcastCommitment.TxHash() commitHash := broadcastCommitment.TxHash()
@ -1926,6 +1940,7 @@ func newBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64,
return &BreachRetribution{ return &BreachRetribution{
ChainHash: chanState.ChainHash, ChainHash: chanState.ChainHash,
BreachTransaction: broadcastCommitment, BreachTransaction: broadcastCommitment,
BreachHeight: breachHeight,
RevokedStateNum: stateNum, RevokedStateNum: stateNum,
PendingHTLCs: revokedSnapshot.Htlcs, PendingHTLCs: revokedSnapshot.Htlcs,
LocalOutpoint: localOutpoint, LocalOutpoint: localOutpoint,
@ -1933,6 +1948,7 @@ func newBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64,
RemoteOutpoint: remoteOutpoint, RemoteOutpoint: remoteOutpoint,
RemoteOutputSignDesc: remoteSignDesc, RemoteOutputSignDesc: remoteSignDesc,
HtlcRetributions: htlcRetributions, HtlcRetributions: htlcRetributions,
Err: make(chan error, 1),
}, nil }, nil
} }
@ -1951,6 +1967,7 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven
var ( var (
commitSpend *chainntnfs.SpendDetail commitSpend *chainntnfs.SpendDetail
spendHeight uint32
ok bool ok bool
) )
@ -1962,6 +1979,8 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven
return return
} }
spendHeight = uint32(commitSpend.SpendingHeight)
// Otherwise, we've been signalled to bail out early by the // Otherwise, we've been signalled to bail out early by the
// caller/maintainer of this channel. // caller/maintainer of this channel.
case <-lc.observerQuit: case <-lc.observerQuit:
@ -2012,6 +2031,7 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven
remoteStateNum := lc.channelState.RemoteCommitment.CommitHeight remoteStateNum := lc.channelState.RemoteCommitment.CommitHeight
// TODO(roasbeef): track heights distinctly? // TODO(roasbeef): track heights distinctly?
switch { switch {
// If state number spending transaction matches the current latest // If state number spending transaction matches the current latest
// state, then they've initiated a unilateral close. So we'll trigger // state, then they've initiated a unilateral close. So we'll trigger
@ -2037,15 +2057,17 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven
ChanPoint: lc.channelState.FundingOutpoint, ChanPoint: lc.channelState.FundingOutpoint,
ChainHash: lc.channelState.ChainHash, ChainHash: lc.channelState.ChainHash,
ClosingTXID: *commitSpend.SpenderTxHash, ClosingTXID: *commitSpend.SpenderTxHash,
CloseHeight: spendHeight,
RemotePub: lc.channelState.IdentityPub, RemotePub: lc.channelState.IdentityPub,
Capacity: lc.Capacity, Capacity: lc.Capacity,
SettledBalance: lc.channelState.LocalCommitment.LocalBalance.ToSatoshis(), SettledBalance: lc.channelState.LocalCommitment.LocalBalance.ToSatoshis(),
CloseType: channeldb.ForceClose, CloseType: channeldb.ForceClose,
IsPending: true, IsPending: true,
} }
if err := lc.DeleteState(&closeSummary); err != nil { if err := lc.DeleteState(&closeSummary); err != nil {
walletLog.Errorf("unable to delete channel state: %v", walletLog.Errorf("unable to delete channel state: %v", err)
err) return
} }
// TODO(roasbeef): need to handle case of if > // TODO(roasbeef): need to handle case of if >
@ -2109,6 +2131,17 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven
} }
} }
// We'll also send all the details necessary to re-claim funds
// that are suspended within any contracts.
unilateralCloseSummary := &UnilateralCloseSummary{
SpendDetail: commitSpend,
ChannelCloseSummary: closeSummary,
SelfOutPoint: selfPoint,
SelfOutputSignDesc: selfSignDesc,
MaturityDelay: uint32(lc.remoteChanCfg.CsvDelay),
HtlcResolutions: htlcResolutions,
}
// TODO(roasbeef): send msg before writing to disk // TODO(roasbeef): send msg before writing to disk
// * need to ensure proper fault tolerance in all cases // * need to ensure proper fault tolerance in all cases
// * get ACK from the consumer of the ntfn before writing to disk? // * get ACK from the consumer of the ntfn before writing to disk?
@ -2118,15 +2151,11 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven
// commitment transaction broadcast. // commitment transaction broadcast.
close(lc.UnilateralCloseSignal) close(lc.UnilateralCloseSignal)
// We'll also send all the details necessary to re-claim funds select {
// that are suspended within any contracts. case lc.UnilateralClose <- unilateralCloseSummary:
lc.UnilateralClose <- &UnilateralCloseSummary{ case <-lc.observerQuit:
SpendDetail: commitSpend, walletLog.Errorf("channel shutting down")
ChannelCloseSummary: closeSummary, return
SelfOutPoint: selfPoint,
SelfOutputSignDesc: selfSignDesc,
MaturityDelay: uint32(lc.remoteChanCfg.CsvDelay),
HtlcResolutions: htlcResolutions,
} }
// If the state number broadcast is lower than the remote node's // If the state number broadcast is lower than the remote node's
@ -2140,10 +2169,15 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven
"broadcast!!!", lc.channelState.FundingOutpoint, "broadcast!!!", lc.channelState.FundingOutpoint,
remoteStateNum) remoteStateNum)
if err := lc.channelState.MarkBorked(true); err != nil {
walletLog.Errorf("unable to mark channel as borked: %v", err)
return
}
// Create a new reach retribution struct which contains all the // Create a new reach retribution struct which contains all the
// data needed to swiftly bring the cheating peer to justice. // data needed to swiftly bring the cheating peer to justice.
retribution, err := newBreachRetribution(lc.channelState, retribution, err := newBreachRetribution(lc.channelState,
broadcastStateNum, commitTxBroadcast) broadcastStateNum, commitTxBroadcast, spendHeight)
if err != nil { if err != nil {
walletLog.Errorf("unable to create breach retribution: %v", err) walletLog.Errorf("unable to create breach retribution: %v", err)
return return
@ -2155,7 +2189,54 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven
// Finally, send the retribution struct over the contract beach // Finally, send the retribution struct over the contract beach
// channel to allow the observer the use the breach retribution // channel to allow the observer the use the breach retribution
// to sweep ALL funds. // to sweep ALL funds.
lc.ContractBreach <- retribution select {
case lc.ContractBreach <- retribution:
case <-lc.observerQuit:
walletLog.Errorf("channel shutting down")
return
}
// Wait for the breach arbiter to ACK the handoff before marking
// the channel as pending force closed in channeldb.
select {
case err := <-retribution.Err:
// Bail if the handoff failed.
if err != nil {
walletLog.Errorf("unable to handoff "+
"retribution info: %v", err)
return
}
case <-lc.observerQuit:
walletLog.Errorf("channel shutting down")
return
}
// At this point, we've successfully received an ack for the
// breach close. We now construct and persist the close
// summary, marking the channel as pending force closed.
settledBalance := lc.channelState.LocalCommitment.
LocalBalance.ToSatoshis()
closeSummary := channeldb.ChannelCloseSummary{
ChanPoint: lc.channelState.FundingOutpoint,
ChainHash: lc.channelState.ChainHash,
ClosingTXID: *commitSpend.SpenderTxHash,
CloseHeight: spendHeight,
RemotePub: lc.channelState.IdentityPub,
Capacity: lc.Capacity,
SettledBalance: settledBalance,
CloseType: channeldb.BreachClose,
IsPending: true,
}
err = lc.DeleteState(&closeSummary)
if err != nil {
walletLog.Errorf("unable to delete channel state: %v", err)
return
}
walletLog.Infof("Breached channel=%v marked pending-closed",
lc.channelState.FundingOutpoint)
} }
} }
@ -3190,6 +3271,10 @@ func (lc *LightningChannel) ProcessChanSyncMsg(msg *lnwire.ChannelReestablish) (
// chain reported by the remote party is not equal to our chain tail, // chain reported by the remote party is not equal to our chain tail,
// then we cannot sync. // then we cannot sync.
case !oweRevocation && localChainTail.height != msg.RemoteCommitTailHeight: case !oweRevocation && localChainTail.height != msg.RemoteCommitTailHeight:
if err := lc.channelState.MarkBorked(true); err != nil {
return nil, err
}
return nil, ErrCannotSyncCommitChains return nil, ErrCannotSyncCommitChains
} }
@ -3218,6 +3303,10 @@ func (lc *LightningChannel) ProcessChanSyncMsg(msg *lnwire.ChannelReestablish) (
} else if !oweCommitment && remoteChainTip.height+1 != } else if !oweCommitment && remoteChainTip.height+1 !=
msg.NextLocalCommitHeight { msg.NextLocalCommitHeight {
if err := lc.channelState.MarkBorked(true); err != nil {
return nil, err
}
// If we don't owe them a commitment, yet the tip of their // If we don't owe them a commitment, yet the tip of their
// chain isn't one more than the next local commit height they // chain isn't one more than the next local commit height they
// report, we'll fail the channel. // report, we'll fail the channel.
@ -5118,3 +5207,14 @@ func (lc *LightningChannel) IsPending() bool {
return lc.channelState.IsPending return lc.channelState.IsPending
} }
// State provides access to the channel's internal state for testing.
func (lc *LightningChannel) State() *channeldb.OpenChannel {
return lc.channelState
}
// ObserverQuit returns the quit channel used to coordinate the shutdown of the
// close observer.
func (lc *LightningChannel) ObserverQuit() chan struct{} {
return lc.observerQuit
}