breacharbiter: adds persistence to retribution flow

This commit introduces a RetributionStore interface, which
  establishes the methods used to access persisted information
  regarding breached channels. A RetributionStore is used to
  persist retributionInfo regarding all channels for which
  the wallet has signaled a breach.

  The current design could be improved by moving certain
  functionality, e.g. closing channels and htlc links, such
  that they are handled by upstream by their respective
  subsystems. This was investigated, but deemed preferable to
  postpone to a later update to prevent the current
  implementation from sprawling amongst too many packages.

  The test suite creates a mockRetributionStore and ensures that
  it exhibits the same behavior as the retribution store backed
  by a channeldb.DB.
This commit is contained in:
Conner Fromknecht 2017-07-25 22:57:29 -07:00 committed by Olaoluwa Osuntokun
parent 6ffe33f01a
commit c3736e6893
2 changed files with 514 additions and 212 deletions

@ -15,6 +15,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire"
@ -27,7 +28,7 @@ import (
// is critical that such state is persisted on disk, so that if our node
// restarts at any point during the retribution procedure, we can recover and
// continue from the persisted state.
var retributionBucket = []byte("ret")
var retributionBucket = []byte("retribution")
// breachArbiter is a special subsystem which is responsible for watching and
// acting on the detection of any attempted uncooperative channel breaches by
@ -38,13 +39,14 @@ var retributionBucket = []byte("ret")
// counterparties.
// TODO(roasbeef): closures in config for subsystem pointers to decouple?
type breachArbiter struct {
wallet *lnwallet.LightningWallet
db *channeldb.DB
notifier chainntnfs.ChainNotifier
htlcSwitch *htlcswitch.Switch
chainIO lnwallet.BlockChainIO
estimator lnwallet.FeeEstimator
retributionStore *retributionStore
wallet *lnwallet.LightningWallet
db *channeldb.DB
notifier chainntnfs.ChainNotifier
chainIO lnwallet.BlockChainIO
estimator lnwallet.FeeEstimator
htlcSwitch *htlcswitch.Switch
retributionStore RetributionStore
// breachObservers is a map which tracks all the active breach
// observers we're currently managing. The key of the map is the
@ -84,10 +86,13 @@ func newBreachArbiter(wallet *lnwallet.LightningWallet, db *channeldb.DB,
chain lnwallet.BlockChainIO, fe lnwallet.FeeEstimator) *breachArbiter {
return &breachArbiter{
wallet: wallet,
notifier: notifier,
htlcSwitch: h,
db: db,
wallet: wallet,
db: db,
notifier: notifier,
chainIO: chain,
htlcSwitch: h,
estimator: fe,
retributionStore: newRetributionStore(db),
breachObservers: make(map[wire.OutPoint]chan struct{}),
@ -107,30 +112,30 @@ func (b *breachArbiter) Start() error {
brarLog.Tracef("Starting breach arbiter")
// TODO(roasbeef): instead use closure height of channel
_, currentHeight, err := b.chainIO.GetBestBlock()
if err != nil {
return err
}
// We load all pending retributions from the database and
// deterministically reconstruct a channel close summary for each. In
// the event that a channel is still open after being breached, we can
// use the close summary to reinitiate a channel close so that the
// breach is reflected in channeldb.
breachRetInfos := make(map[wire.OutPoint]retributionInfo)
closeSummaries := make(map[wire.OutPoint]channeldb.ChannelCloseSummary)
err := b.retributionStore.ForAll(func(ret *retributionInfo) error {
// Extract emitted retribution information.
breachRetInfos[ret.chanPoint] = *ret
// We load any pending retributions from the database. For each retribution
// we need to restart the retribution procedure to claim our just reward.
err = b.retributionStore.ForAll(func(ret *retributionInfo) error {
// Register for a notification when the breach transaction is confirmed
// on chain.
breachTXID := &ret.commitHash
confChan, err := b.notifier.RegisterConfirmationsNtfn(breachTXID, 1,
uint32(currentHeight))
if err != nil {
brarLog.Errorf("unable to register for conf updates for txid: "+
"%v, err: %v", breachTXID, err)
return err
// Deterministically reconstruct channel close summary from
// persisted retribution information and record in breach close
// summaries map under the corresponding channel point.
closeSummary := channeldb.ChannelCloseSummary{
ChanPoint: ret.chanPoint,
ClosingTXID: ret.commitHash,
RemotePub: &ret.remoteIdentity,
Capacity: ret.capacity,
SettledBalance: ret.settledBalance,
CloseType: channeldb.BreachClose,
IsPending: true,
}
// Launch a new goroutine which to finalize the channel retribution
// after the breach transaction confirms.
b.wg.Add(1)
go b.exactRetribution(confChan, ret)
closeSummaries[ret.chanPoint] = closeSummary
return nil
})
@ -147,15 +152,39 @@ func (b *breachArbiter) Start() error {
return err
}
if len(activeChannels) > 0 {
nActive := len(activeChannels)
if nActive > 0 {
brarLog.Infof("Retrieved %v channels from database, watching "+
"with vigilance!", len(activeChannels))
}
// For each of the channels read from disk, we'll create a channel
// state machine in order to watch for any potential channel closures.
channelsToWatch := make([]*lnwallet.LightningChannel, len(activeChannels))
for i, chanState := range activeChannels {
// Here we will determine a set of channels that will need to be managed
// by the contractObserver. For each of the open channels read from
// disk, we will create a channel state machine that can be used to
// watch for any potential channel closures. We must first exclude any
// channel whose retribution process has been initiated, and proceed to
// mark them as closed.
// The state machines generated for these filtered channels can be
// discarded, as their fate will be placed in the hands of an
// exactRetribution task spawned later.
//
// NOTE Spawning of the exactRetribution task is intentionally postponed
// until after this step in order to ensure that the all breached
// channels are reflected as closed in channeldb and consistent with
// what is checkpointed by the breach arbiter. Instead of treating the
// breached-and-closed and breached-but-still-active channels as
// separate sets of channels, we first
// ensure that all breach-but-still-active channels are promoted to
// breached-and-closed during restart, allowing us to treat them as a
// single set from here on out. This approach also has the added benefit
// of minimizing the likelihood that the wrong number of tasks are
// spawned per breached channel, and prevents us from being in a
// position where
// retribution has completed but the channel is still marked as open in
// channeldb.
channelsToWatch := make([]*lnwallet.LightningChannel, 0, nActive)
for _, chanState := range activeChannels {
// Initialize active channel from persisted channel state.
channel, err := lnwallet.NewLightningChannel(nil, b.notifier,
b.estimator, chanState)
if err != nil {
@ -164,9 +193,73 @@ func (b *breachArbiter) Start() error {
return err
}
channelsToWatch[i] = channel
// Before marking this as an active channel that the breach
// arbiter should watch, check to see if this channel was
// previously breached. If so, we attempt to reflect this in the
// channeldb by closing the channel. Upon success, we continue
// because the channel is no longer open, and thus does not need
// to be managed by the contractObserver.
chanPoint := chanState.FundingOutpoint
if closeSummary, ok := closeSummaries[chanPoint]; ok {
// Since this channel should not be open, we immediately
// notify the HTLC switch that this link should be
// closed, and that all activity on the link should
// cease.
b.htlcSwitch.CloseLink(
&chanState.FundingOutpoint,
htlcswitch.CloseBreach,
)
// Ensure channeldb is consistent with the persisted
// breach.
err := channel.DeleteState(&closeSummary)
if err != nil {
brarLog.Errorf("unable to delete channel "+
"state: %v", err)
return err
}
// Now that this channel is both breached _and_ closed,
// we can skip adding it to the `channelsToWatch` since
// we can begin the retribution process immediately.
continue
}
// Finally, add this channel to breach arbiter's list of
// channels to watch.
channelsToWatch = append(channelsToWatch, channel)
}
// Trim channels in the event that some were filtered.
channelsToWatch = channelsToWatch[:]
// TODO(roasbeef): instead use closure height of channel
_, currentHeight, err := b.chainIO.GetBestBlock()
if err != nil {
return err
}
// Spawn the exactRetribution tasks to monitor and resolve any breaches
// that were loaded from the retribution store.
for chanPoint, closeSummary := range closeSummaries {
// Register for a notification when the breach transaction is
// confirmed on chain.
breachTXID := closeSummary.ClosingTXID
confChan, err := b.notifier.RegisterConfirmationsNtfn(
&breachTXID, 1, uint32(currentHeight))
if err != nil {
brarLog.Errorf("unable to register for conf updates "+
"for txid: %v, err: %v", breachTXID, err)
return err
}
// Launch a new goroutine which to finalize the channel
// retribution after the breach transaction confirms.
retInfo := breachRetInfos[chanPoint]
b.wg.Add(1)
go b.exactRetribution(confChan, &retInfo)
}
// Start watching the remaining active channels!
b.wg.Add(1)
go b.contractObserver(channelsToWatch)
@ -191,34 +284,45 @@ func (b *breachArbiter) Start() error {
brarLog.Infof("Watching for the closure of ChannelPoint(%v)",
pendingClose.ChanPoint)
chanPoint := &pendingClose.ChanPoint
closeTXID := &pendingClose.ClosingTXID
closeTXID := pendingClose.ClosingTXID
confNtfn, err := b.notifier.RegisterConfirmationsNtfn(
closeTXID, 1, uint32(currentHeight),
&closeTXID, 1, uint32(currentHeight),
)
if err != nil {
return err
}
go func() {
b.wg.Add(1)
go func(chanPoint wire.OutPoint) {
defer b.wg.Done()
// In the case that the ChainNotifier is shutting down,
// all subscriber notification channels will be closed,
// generating a nil receive.
confInfo, ok := <-confNtfn.Confirmed
if !ok {
select {
case confInfo, ok := <-confNtfn.Confirmed:
if !ok {
return
}
brarLog.Infof("ChannelPoint(%v) is "+
"fully closed, at height: %v",
chanPoint, confInfo.BlockHeight)
// TODO(roasbeef): need to store
// UnilateralCloseSummary on disk so can
// possibly sweep output here
err := b.db.MarkChanFullyClosed(&chanPoint)
if err != nil {
brarLog.Errorf("unable to mark chan "+
"as closed: %v", err)
}
case <-b.quit:
return
}
brarLog.Infof("ChannelPoint(%v) is fully closed, "+
"at height: %v", chanPoint, confInfo.BlockHeight)
// TODO(roasbeef): need to store UnilateralCloseSummary
// on disk so can possibly sweep output here
if err := b.db.MarkChanFullyClosed(chanPoint); err != nil {
brarLog.Errorf("unable to mark chan as closed: %v", err)
}
}()
}(pendingClose.ChanPoint)
}
return nil
@ -248,7 +352,9 @@ func (b *breachArbiter) Stop() error {
// channel into the daemon's wallet.
//
// NOTE: This MUST be run as a goroutine.
func (b *breachArbiter) contractObserver(activeChannels []*lnwallet.LightningChannel) {
func (b *breachArbiter) contractObserver(
activeChannels []*lnwallet.LightningChannel) {
defer b.wg.Done()
// For each active channel found within the database, we launch a
@ -273,7 +379,8 @@ out:
case breachInfo := <-b.breachedContracts:
_, currentHeight, err := b.chainIO.GetBestBlock()
if err != nil {
brarLog.Errorf("unable to get best height: %v", err)
brarLog.Errorf(
"unable to get best height: %v", err)
}
// A new channel contract has just been breached! We
@ -286,28 +393,26 @@ out:
breachTXID, 1, uint32(currentHeight),
)
if err != nil {
brarLog.Errorf("unable to register for conf updates for txid: "+
"%v, err: %v", breachTXID, err)
brarLog.Errorf("unable to register for conf "+
"updates for txid: %v, err: %v",
breachTXID, err)
continue
}
brarLog.Warnf("A channel has been breached with txid: %v. "+
"Waiting for confirmation, then justice will be served!",
breachTXID)
brarLog.Warnf("A channel has been breached with "+
"txid: %v. Waiting for confirmation, then "+
"justice will be served!", breachTXID)
// Persist the pending retribution state to disk.
if err := b.retributionStore.Add(breachInfo); err != nil {
brarLog.Errorf("unable to persist breach info to db: %v", err)
continue
}
// With the notification registered and retribution state persisted,
// we launch a new goroutine which will finalize the channel
// retribution after the breach transaction has been confirmed.
// With the retribution state persisted, channel close
// persisted, and notification registered, we launch a
// new goroutine which will finalize the channel
// retribution after the breach transaction has been
// confirmed.
b.wg.Add(1)
go b.exactRetribution(confChan, breachInfo)
delete(b.breachObservers, breachInfo.chanPoint)
case contract := <-b.newContracts:
// A new channel has just been opened within the
// daemon, so we launch a new breachObserver to handle
@ -335,9 +440,10 @@ out:
b.wg.Add(1)
go b.breachObserver(contract, settleSignal)
// TODO(roasbeef): add doneChan to signal to peer continue
// * peer send over to us on loadActiveChanenls, sync
// until we're aware so no state transitions
// TODO(roasbeef): add doneChan to signal to peer
// continue * peer send over to us on
// loadActiveChanenls, sync until we're aware so no
// state transitions
case chanPoint := <-b.settledContracts:
// A new channel has been closed either unilaterally or
// cooperatively, as a result we no longer need a
@ -371,7 +477,8 @@ out:
// the lingering funds within the channel into the daemon's wallet.
//
// NOTE: This MUST be run as a goroutine.
func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
func (b *breachArbiter) exactRetribution(
confChan *chainntnfs.ConfirmationEvent,
breachInfo *retributionInfo) {
defer b.wg.Done()
@ -403,9 +510,11 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
return
}
brarLog.Debugf("Broadcasting justice tx: %v", newLogClosure(func() string {
return spew.Sdump(justiceTx)
}))
brarLog.Debugf(
"Broadcasting justice tx: %v",
newLogClosure(func() string {
return spew.Sdump(justiceTx)
}))
_, currentHeight, err := b.chainIO.GetBestBlock()
if err != nil {
@ -455,16 +564,18 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
brarLog.Errorf("unable to mark chan as closed: %v", err)
}
// Justice has been carried out; we can safely delete the retribution
// info from the database.
// Justice has been carried out; we can safely delete the
// retribution info from the database.
err = b.retributionStore.Remove(&breachInfo.chanPoint)
if err != nil {
brarLog.Errorf("unable to remove retribution from the db: %v", err)
brarLog.Errorf("unable to remove retribution "+
"from the db: %v", err)
}
// TODO(roasbeef): add peer to blacklist?
// TODO(roasbeef): close other active channels with offending peer
// TODO(roasbeef): close other active channels with offending
// peer
close(breachInfo.doneChan)
@ -488,7 +599,8 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
chanPoint := contract.ChannelPoint()
brarLog.Debugf("Breach observer for ChannelPoint(%v) started", chanPoint)
brarLog.Debugf(
"Breach observer for ChannelPoint(%v) started", chanPoint)
select {
// A read from this channel indicates that the contract has been
@ -502,18 +614,32 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
case closeInfo := <-contract.UnilateralClose:
// Launch a goroutine to cancel out this contract within the
// breachArbiter's main goroutine.
b.wg.Add(1)
go func() {
b.settledContracts <- chanPoint
defer b.wg.Done()
select {
case b.settledContracts <- chanPoint:
case <-b.quit:
}
}()
// Next, we'll launch a goroutine to wait until the closing
// transaction has been confirmed so we can mark the contract
// as resolved in the database.
// as resolved in the database. This go routine is _not_
// tracked by the breach aribter's wait group since the callback
// may not be executed before shutdown, potentially leading to
// a deadlock.
//
// TODO(roasbeef): also notify utxoNursery, might've had
// outbound HTLC's in flight
go waitForChanToClose(uint32(closeInfo.SpendingHeight), b.notifier,
nil, chanPoint, closeInfo.SpenderTxHash, func() {
go waitForChanToClose(
uint32(closeInfo.SpendingHeight),
b.notifier,
nil,
chanPoint,
closeInfo.SpenderTxHash,
func() {
// As we just detected a channel was closed via
// a unilateral commitment broadcast by the
// remote party, we'll need to sweep our main
@ -528,11 +654,14 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
)
if err != nil {
brarLog.Errorf("unable to "+
"generate sweep tx: %v", err)
"generate sweep tx: %v",
err)
goto close
}
err = b.wallet.PublishTransaction(sweepTx)
err = b.wallet.PublishTransaction(
sweepTx,
)
if err != nil {
brarLog.Errorf("unable to "+
"broadcast tx: %v", err)
@ -540,11 +669,14 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
}
close:
brarLog.Infof("Force closed ChannelPoint(%v) is "+
"fully closed, updating DB", chanPoint)
brarLog.Infof("Force closed ChannelPoint(%v) "+
"is fully closed, updating DB",
chanPoint)
if err := b.db.MarkChanFullyClosed(chanPoint); err != nil {
brarLog.Errorf("unable to mark chan as closed: %v", err)
err := b.db.MarkChanFullyClosed(chanPoint)
if err != nil {
brarLog.Errorf("unable to mark chan "+
"as closed: %v", err)
}
})
@ -563,21 +695,10 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
// links associated with this peer.
b.htlcSwitch.CloseLink(chanPoint, htlcswitch.CloseBreach)
chanInfo := contract.StateSnapshot()
closeInfo := &channeldb.ChannelCloseSummary{
ChanPoint: *chanPoint,
ClosingTXID: breachInfo.BreachTransaction.TxHash(),
RemotePub: &chanInfo.RemoteIdentity,
Capacity: chanInfo.Capacity,
SettledBalance: chanInfo.LocalBalance.ToSatoshis(),
CloseType: channeldb.BreachClose,
IsPending: true,
}
if err := contract.DeleteState(closeInfo); err != nil {
brarLog.Errorf("unable to delete channel state: %v", err)
}
// TODO(roasbeef): need to handle case of remote broadcast
// mid-local initiated state-transition, possible false-positive?
// mid-local initiated state-transition, possible
// false-positive?
// First we generate the witness generation function which will
// be used to sweep the output only we can satisfy on the
@ -591,7 +712,8 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
desc.SigHashes = hc
desc.InputIndex = inputIndex
return lnwallet.CommitSpendNoDelay(b.wallet.Cfg.Signer, &desc, tx)
return lnwallet.CommitSpendNoDelay(
b.wallet.Cfg.Signer, &desc, tx)
}
// Next we create the witness generation function that will be
@ -606,32 +728,67 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
desc.SigHashes = hc
desc.InputIndex = inputIndex
return lnwallet.CommitSpendRevoke(b.wallet.Cfg.Signer, &desc, tx)
return lnwallet.CommitSpendRevoke(
b.wallet.Cfg.Signer, &desc, tx)
}
// Finally, we send the retribution information into the breachArbiter
// event loop to deal swift justice.
// Assemble the retribution information that parameterizes the
// construction of transactions required to correct the breach.
// TODO(roasbeef): populate htlc breaches
b.breachedContracts <- &retributionInfo{
retInfo := &retributionInfo{
commitHash: breachInfo.BreachTransaction.TxHash(),
chanPoint: *chanPoint,
remoteIdentity: chanInfo.RemoteIdentity,
capacity: chanInfo.Capacity,
settledBalance: chanInfo.LocalBalance.ToSatoshis(),
selfOutput: &breachedOutput{
amt: btcutil.Amount(localSignDesc.Output.Value),
outpoint: breachInfo.LocalOutpoint,
signDescriptor: localSignDesc,
witnessType: localWitnessType,
witnessType: lnwallet.CommitmentNoDelay,
witnessFunc: localWitness,
},
revokedOutput: &breachedOutput{
amt: btcutil.Amount(remoteSignDesc.Output.Value),
outpoint: breachInfo.RemoteOutpoint,
signDescriptor: remoteSignDesc,
witnessType: remoteWitnessType,
witnessType: lnwallet.CommitmentRevoke,
witnessFunc: remoteWitness,
},
htlcOutputs: []*breachedOutput{},
doneChan: make(chan struct{}),
doneChan: make(chan struct{}),
}
// Persist the pending retribution state to disk.
if err := b.retributionStore.Add(retInfo); err != nil {
brarLog.Errorf("unable to persist "+
"retribution info to db: %v", err)
}
closeInfo := &channeldb.ChannelCloseSummary{
ChanPoint: *chanPoint,
ClosingTXID: breachInfo.BreachTransaction.TxHash(),
RemotePub: &chanInfo.RemoteIdentity,
Capacity: chanInfo.Capacity,
SettledBalance: chanInfo.LocalBalance.ToSatoshis(),
CloseType: channeldb.BreachClose,
IsPending: true,
}
if err := contract.DeleteState(closeInfo); err != nil {
brarLog.Errorf(
"unable to delete channel state: %v", err)
}
// Finally, we send the retribution information into the
// breachArbiter event loop to deal swift justice.
select {
case b.breachedContracts <- retInfo:
case <-b.quit:
}
case <-b.quit:
@ -646,8 +803,9 @@ type breachedOutput struct {
amt btcutil.Amount
outpoint wire.OutPoint
signDescriptor *lnwallet.SignDescriptor
signDescriptor lnwallet.SignDescriptor
witnessType lnwallet.WitnessType
witnessFunc lnwallet.WitnessGenerator
twoStageClaim bool
}
@ -661,6 +819,14 @@ type retributionInfo struct {
commitHash chainhash.Hash
chanPoint wire.OutPoint
// Fields copied from channel snapshot when a breach is detected. This
// is necessary for deterministically constructing the channel close
// summary in the event that the breach arbiter crashes before closing
// the channel.
remoteIdentity btcec.PublicKey
capacity btcutil.Amount
settledBalance btcutil.Amount
selfOutput *breachedOutput
revokedOutput *breachedOutput
@ -674,7 +840,9 @@ type retributionInfo struct {
// the funds within the channel which we are now entitled to due to a breach of
// the channel's contract by the counterparty. This function returns a *fully*
// signed transaction with the witness for each input fully in place.
func (b *breachArbiter) createJusticeTx(r *retributionInfo) (*wire.MsgTx, error) {
func (b *breachArbiter) createJusticeTx(
r *retributionInfo) (*wire.MsgTx, error) {
// First, we obtain a new public key script from the wallet which we'll
// sweep the funds to.
// TODO(roasbeef): possibly create many outputs to minimize change in
@ -684,8 +852,19 @@ func (b *breachArbiter) createJusticeTx(r *retributionInfo) (*wire.MsgTx, error)
return nil, err
}
// Before creating the actual TxOut, we'll need to calculate the proper fee
// to attach to the transaction to ensure a timely confirmation.
r.selfOutput.witnessFunc = r.selfOutput.witnessType.GenWitnessFunc(
&b.wallet.Cfg.Signer, &r.selfOutput.signDescriptor)
r.revokedOutput.witnessFunc = r.revokedOutput.witnessType.GenWitnessFunc(
&b.wallet.Cfg.Signer, &r.revokedOutput.signDescriptor)
for i := range r.htlcOutputs {
r.htlcOutputs[i].witnessFunc = r.htlcOutputs[i].witnessType.GenWitnessFunc(
&b.wallet.Cfg.Signer, &r.htlcOutputs[i].signDescriptor)
}
// Before creating the actual TxOut, we'll need to calculate the proper
// fee to attach to the transaction to ensure a timely confirmation.
// TODO(roasbeef): remove hard-coded fee
totalAmt := r.selfOutput.amt + r.revokedOutput.amt
sweepedAmt := int64(totalAmt - 5000)
@ -711,17 +890,13 @@ func (b *breachArbiter) createJusticeTx(r *retributionInfo) (*wire.MsgTx, error)
// witnesses for both commitment outputs, and all the pending HTLCs at
// this state in the channel's history.
// TODO(roasbeef): handle the 2-layer HTLCs
localWitnessFunc := r.selfOutput.witnessType.GenWitnessFunc(
&b.wallet.Signer, r.selfOutput.signDescriptor)
localWitness, err := localWitnessFunc(justiceTx, hashCache, 0)
localWitness, err := r.selfOutput.witnessFunc(justiceTx, hashCache, 0)
if err != nil {
return nil, err
}
justiceTx.TxIn[0].Witness = localWitness
remoteWitnessFunc := r.revokedOutput.witnessType.GenWitnessFunc(
&b.wallet.Signer, r.revokedOutput.signDescriptor)
remoteWitness, err := remoteWitnessFunc(justiceTx, hashCache, 1)
remoteWitness, err := r.revokedOutput.witnessFunc(justiceTx, hashCache, 1)
if err != nil {
return nil, err
}
@ -738,7 +913,9 @@ func (b *breachArbiter) createJusticeTx(r *retributionInfo) (*wire.MsgTx, error)
// TODO(roasbeef): alternative options
// * leave the output in the chain, use as input to future funding tx
// * leave output in the chain, extend wallet to add knowledge of how to claim
func (b *breachArbiter) craftCommitSweepTx(closeInfo *lnwallet.UnilateralCloseSummary) (*wire.MsgTx, error) {
func (b *breachArbiter) craftCommitSweepTx(
closeInfo *lnwallet.UnilateralCloseSummary) (*wire.MsgTx, error) {
// First, we'll fetch a fresh script that we can use to sweep the funds
// under the control of the wallet.
sweepPkScript, err := newSweepPkScript(b.wallet)
@ -796,33 +973,32 @@ func (b *breachArbiter) craftCommitSweepTx(closeInfo *lnwallet.UnilateralCloseSu
return sweepTx, nil
}
// breachedOutput contains all the information needed to sweep a breached
// output. A breached output is an output that we are now entitled to due to a
// revoked commitment transaction being broadcast.
type breachedOutput struct {
amt btcutil.Amount
outpoint wire.OutPoint
// RetributionStore provides an interface for managing a persistent map from
// wire.OutPoint -> retributionInfo. Upon learning of a breach, a BreachArbiter
// should record the retributionInfo for the breached channel, which serves a
// checkpoint in the event that retribution needs to be resumed after failure.
// A RetributionStore provides an interface for managing the persisted set, as
// well as mapping user defined functions over the entire on-disk contents.
//
// Calls to RetributionStore may occur concurrently. A concrete instance of
// RetributionStore should use appropriate synchronization primitives, or
// be otherwise safe for concurrent access.
type RetributionStore interface {
signDescriptor *lnwallet.SignDescriptor
witnessType lnwallet.WitnessType
// Add persists the retributionInfo to disk, using the information's
// chanPoint as the key. This method should overwrite any existing
// entires found under the same key, and an error should be raised if
// the addition fails.
Add(retInfo *retributionInfo) error
twoStageClaim bool
}
// Remove deletes the retributionInfo from disk, if any exists, under
// the given key. An error should be re raised if the removal fails.
Remove(key *wire.OutPoint) error
// retribution encapsulates all the data needed to sweep all the contested
// funds within a channel whose contract has been breached by the prior
// counterparty. This struct is used to create the justice transaction which
// spends all outputs of the commitment transaction into an output controlled
// by the wallet.
type retributionInfo struct {
commitHash chainhash.Hash
chanPoint wire.OutPoint
selfOutput *breachedOutput
revokedOutput *breachedOutput
htlcOutputs []*breachedOutput
doneChan chan struct{}
// ForAll iterates over the existing on-disk contents and applies a
// chosen, read-only callback to each. This method should ensure that it
// immediately propagate any errors generated by the callback.
ForAll(cb func(*retributionInfo) error) error
}
// retributionStore handles persistence of retribution states to disk and is
@ -844,8 +1020,8 @@ func newRetributionStore(db *channeldb.DB) *retributionStore {
// to disk.
func (rs *retributionStore) Add(ret *retributionInfo) error {
return rs.db.Update(func(tx *bolt.Tx) error {
// If this is our first contract breach, the retributionBucket won't
// exist, in which case, we just create a new bucket.
// If this is our first contract breach, the retributionBucket
// won't exist, in which case, we just create a new bucket.
retBucket, err := tx.CreateBucketIfNotExists(retributionBucket)
if err != nil {
return err
@ -861,7 +1037,10 @@ func (rs *retributionStore) Add(ret *retributionInfo) error {
return err
}
if err := retBucket.Put(outBuf.Bytes(), retBuf.Bytes()); err != nil {
if err := retBucket.Put(
outBuf.Bytes(),
retBuf.Bytes(),
); err != nil {
return err
}
@ -874,12 +1053,13 @@ func (rs *retributionStore) Remove(key *wire.OutPoint) error {
return rs.db.Update(func(tx *bolt.Tx) error {
retBucket := tx.Bucket(retributionBucket)
// We return an error if the bucket is not already created, since normal
// operation of the breach arbiter should never try to remove a
// finalized retribution state that is not already stored in the db.
// We return an error if the bucket is not already created,
// since normal operation of the breach arbiter should never try
// to remove a finalized retribution state that is not already
// stored in the db.
if retBucket == nil {
return errors.New("unable to remove retribution because the " +
"db bucket doesn't exist.")
return errors.New("unable to remove retribution " +
"because the db bucket doesn't exist.")
}
var outBuf bytes.Buffer
@ -899,17 +1079,21 @@ func (rs *retributionStore) Remove(key *wire.OutPoint) error {
// callback function on each retribution.
func (rs *retributionStore) ForAll(cb func(*retributionInfo) error) error {
return rs.db.View(func(tx *bolt.Tx) error {
// If the bucket does not exist, then there are no pending retributions.
// If the bucket does not exist, then there are no pending
// retributions.
retBucket := tx.Bucket(retributionBucket)
if retBucket == nil {
return nil
}
// Otherwise, we fetch each serialized retribution info, deserialize
// it, and execute the passed in callback function on it.
// Otherwise, we fetch each serialized retribution info,
// deserialize it, and execute the passed in callback function
// on it.
return retBucket.ForEach(func(outBytes, retBytes []byte) error {
ret := &retributionInfo{}
if err := ret.Decode(bytes.NewBuffer(retBytes)); err != nil {
if err := ret.Decode(
bytes.NewBuffer(retBytes),
); err != nil {
return err
}
@ -920,6 +1104,8 @@ func (rs *retributionStore) ForAll(cb func(*retributionInfo) error) error {
// Encode serializes the retribution into the passed byte stream.
func (ret *retributionInfo) Encode(w io.Writer) error {
var scratch [8]byte
if _, err := w.Write(ret.commitHash[:]); err != nil {
return err
}
@ -928,6 +1114,21 @@ func (ret *retributionInfo) Encode(w io.Writer) error {
return err
}
if _, err := w.Write(
ret.remoteIdentity.SerializeCompressed()); err != nil {
return err
}
binary.BigEndian.PutUint64(scratch[:8], uint64(ret.capacity))
if _, err := w.Write(scratch[:8]); err != nil {
return err
}
binary.BigEndian.PutUint64(scratch[:8], uint64(ret.settledBalance))
if _, err := w.Write(scratch[:8]); err != nil {
return err
}
if err := ret.selfOutput.Encode(w); err != nil {
return err
}
@ -952,12 +1153,12 @@ func (ret *retributionInfo) Encode(w io.Writer) error {
// Dencode deserializes a retribution from the passed byte stream.
func (ret *retributionInfo) Decode(r io.Reader) error {
var scratch [32]byte
var scratch [33]byte
if _, err := io.ReadFull(r, scratch[:]); err != nil {
if _, err := io.ReadFull(r, scratch[:32]); err != nil {
return err
}
hash, err := chainhash.NewHash(scratch[:])
hash, err := chainhash.NewHash(scratch[:32])
if err != nil {
return err
}
@ -967,6 +1168,26 @@ func (ret *retributionInfo) Decode(r io.Reader) error {
return err
}
if _, err = io.ReadFull(r, scratch[:33]); err != nil {
return err
}
remoteIdentity, err := btcec.ParsePubKey(scratch[:33], btcec.S256())
if err != nil {
return err
}
ret.remoteIdentity = *remoteIdentity
if _, err := io.ReadFull(r, scratch[:8]); err != nil {
return err
}
ret.capacity = btcutil.Amount(binary.BigEndian.Uint64(scratch[:8]))
if _, err := io.ReadFull(r, scratch[:8]); err != nil {
return err
}
ret.settledBalance = btcutil.Amount(
binary.BigEndian.Uint64(scratch[:8]))
ret.selfOutput = &breachedOutput{}
if err := ret.selfOutput.Decode(r); err != nil {
return err
@ -1007,7 +1228,8 @@ func (bo *breachedOutput) Encode(w io.Writer) error {
return err
}
if err := lnwallet.WriteSignDescriptor(w, bo.signDescriptor); err != nil {
if err := lnwallet.WriteSignDescriptor(
w, &bo.signDescriptor); err != nil {
return err
}
@ -1041,16 +1263,16 @@ func (bo *breachedOutput) Decode(r io.Reader) error {
return err
}
signDescriptor := lnwallet.SignDescriptor{}
if err := lnwallet.ReadSignDescriptor(r, &signDescriptor); err != nil {
if err := lnwallet.ReadSignDescriptor(
r, &bo.signDescriptor); err != nil {
return err
}
bo.signDescriptor = &signDescriptor
if _, err := io.ReadFull(r, scratch[:2]); err != nil {
return err
}
bo.witnessType = lnwallet.WitnessType(binary.BigEndian.Uint16(scratch[:2]))
bo.witnessType = lnwallet.WitnessType(
binary.BigEndian.Uint16(scratch[:2]))
if _, err := io.ReadFull(r, scratch[:1]); err != nil {
return err

@ -7,6 +7,7 @@ import (
"io/ioutil"
"os"
"reflect"
"sync"
"testing"
"github.com/lightningnetwork/lnd/channeldb"
@ -77,7 +78,7 @@ var (
breachSignDescs = []lnwallet.SignDescriptor{
{
PrivateTweak: []byte{
SingleTweak: []byte{
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
@ -107,7 +108,7 @@ var (
HashType: txscript.SigHashAll,
},
{
PrivateTweak: []byte{
SingleTweak: []byte{
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
@ -137,7 +138,7 @@ var (
HashType: txscript.SigHashAll,
},
{
PrivateTweak: []byte{
SingleTweak: []byte{
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
@ -199,10 +200,12 @@ var (
0x4f, 0x2f, 0x6f, 0x25, 0x88, 0xa3, 0xef, 0xb9,
0x6a, 0x49, 0x18, 0x83, 0x31, 0x98, 0x47, 0x53,
},
chanPoint: breachOutPoints[0],
selfOutput: &breachedOutputs[0],
revokedOutput: &breachedOutputs[1],
htlcOutputs: []*breachedOutput{},
chanPoint: breachOutPoints[0],
capacity: btcutil.Amount(1e7),
settledBalance: btcutil.Amount(1e7),
selfOutput: &breachedOutputs[0],
revokedOutput: &breachedOutputs[1],
htlcOutputs: []*breachedOutput{},
},
{
commitHash: [chainhash.HashSize]byte{
@ -211,9 +214,11 @@ var (
0x2d, 0xe7, 0x93, 0xe4, 0xb7, 0x25, 0xb8, 0x4d,
0x1f, 0xb, 0x4c, 0xf9, 0x9e, 0xc5, 0x8c, 0xe9,
},
chanPoint: breachOutPoints[1],
selfOutput: &breachedOutputs[0],
revokedOutput: &breachedOutputs[1],
chanPoint: breachOutPoints[1],
capacity: btcutil.Amount(1e7),
settledBalance: btcutil.Amount(1e7),
selfOutput: &breachedOutputs[0],
revokedOutput: &breachedOutputs[1],
htlcOutputs: []*breachedOutput{
&breachedOutputs[1],
&breachedOutputs[2],
@ -224,7 +229,7 @@ var (
// Parse the pubkeys in the breached outputs.
func initBreachedOutputs() error {
for i := 0; i < len(breachedOutputs); i++ {
for i := range breachedOutputs {
bo := &breachedOutputs[i]
// Parse the sign descriptor's pubkey.
@ -234,7 +239,7 @@ func initBreachedOutputs() error {
return fmt.Errorf("unable to parse pubkey: %v", breachKeys[i])
}
sd.PubKey = pubkey
bo.signDescriptor = sd
bo.signDescriptor = *sd
}
return nil
@ -278,6 +283,12 @@ func TestRetributionSerialization(t *testing.T) {
for i := 0; i < len(retributions); i++ {
ret := &retributions[i]
remoteIdentity, err := btcec.ParsePubKey(breachKeys[i], btcec.S256())
if err != nil {
t.Fatalf("unable to parse public key [%v]: %v", i, err)
}
ret.remoteIdentity = *remoteIdentity
var buf bytes.Buffer
if err := ret.Encode(&buf); err != nil {
@ -298,37 +309,104 @@ func TestRetributionSerialization(t *testing.T) {
}
}
// TODO(phlip9): reuse existing function?
// makeTestDB creates a new instance of the ChannelDB for testing purposes. A
// callback which cleans up the created temporary directories is also returned
// and intended to be executed after the test completes.
func makeTestDB() (*channeldb.DB, func(), error) {
var db *channeldb.DB
// copyRetInfo creates a complete copy of the given retributionInfo.
func copyRetInfo(retInfo *retributionInfo) *retributionInfo {
ret := &retributionInfo{
commitHash: retInfo.commitHash,
chanPoint: retInfo.chanPoint,
remoteIdentity: retInfo.remoteIdentity,
capacity: retInfo.capacity,
settledBalance: retInfo.settledBalance,
selfOutput: retInfo.selfOutput,
revokedOutput: retInfo.revokedOutput,
htlcOutputs: make([]*breachedOutput, len(retInfo.htlcOutputs)),
doneChan: make(chan struct{}),
}
for i, htlco := range retInfo.htlcOutputs {
ret.htlcOutputs[i] = htlco
}
return ret
}
// mockRetributionStore implements the RetributionStore interface and is backed
// by an in-memory map. Access to the internal state is provided by a mutex.
// TODO(cfromknecht) extend to support and test controlled failures.
type mockRetributionStore struct {
mu sync.Mutex
state map[wire.OutPoint]*retributionInfo
}
func newMockRetributionStore() *mockRetributionStore {
return &mockRetributionStore{
mu: sync.Mutex{},
state: make(map[wire.OutPoint]*retributionInfo),
}
}
func (rs *mockRetributionStore) Add(retInfo *retributionInfo) error {
rs.mu.Lock()
rs.state[retInfo.chanPoint] = copyRetInfo(retInfo)
rs.mu.Unlock()
return nil
}
func (rs *mockRetributionStore) Remove(key *wire.OutPoint) error {
rs.mu.Lock()
delete(rs.state, *key)
rs.mu.Unlock()
return nil
}
func (rs *mockRetributionStore) ForAll(cb func(*retributionInfo) error) error {
rs.mu.Lock()
defer rs.mu.Unlock()
for _, retInfo := range rs.state {
if err := cb(copyRetInfo(retInfo)); err != nil {
return err
}
}
return nil
}
// TestMockRetributionStore instantiates a mockRetributionStore and tests its
// behavior using the general RetributionStore test suite.
func TestMockRetributionStore(t *testing.T) {
mrs := newMockRetributionStore()
testRetributionStore(mrs, t)
}
// TestChannelDBRetributionStore instantiates a retributionStore backed by a
// channeldb.DB, and tests its behavior using the general RetributionStore test
// suite.
func TestChannelDBRetributionStore(t *testing.T) {
// First, create a temporary directory to be used for the duration of
// this test.
tempDirName, err := ioutil.TempDir("", "channeldb")
if err != nil {
return nil, nil, err
t.Fatalf("unable to initialize temp directory for channeldb: %v", err)
}
defer os.RemoveAll(tempDirName)
// Next, create channeldb for the first time.
db, err = channeldb.Open(tempDirName)
db, err := channeldb.Open(tempDirName)
if err != nil {
return nil, nil, err
t.Fatalf("unable to open channeldb: %v", err)
}
defer db.Close()
cleanUp := func() {
if db != nil {
db.Close()
}
os.RemoveAll(tempDirName)
}
return db, cleanUp, nil
// Finally, instantiate retribution store and execute RetributionStore test
// suite.
rs := newRetributionStore(db)
testRetributionStore(rs, t)
}
func countRetributions(t *testing.T, rs *retributionStore) int {
func countRetributions(t *testing.T, rs RetributionStore) int {
count := 0
err := rs.ForAll(func(_ *retributionInfo) error {
count++
@ -341,32 +419,29 @@ func countRetributions(t *testing.T, rs *retributionStore) int {
}
// Test that the retribution persistence layer works.
func TestRetributionStore(t *testing.T) {
db, cleanUp, err := makeTestDB()
defer cleanUp()
if err != nil {
t.Fatalf("unable to create test db: %v", err)
}
func testRetributionStore(rs RetributionStore, t *testing.T) {
if err := initBreachedOutputs(); err != nil {
t.Fatalf("unable to init breached outputs: %v", err)
}
rs := newRetributionStore(db)
// Make sure that a new retribution store is actually emtpy.
if count := countRetributions(t, rs); count != 0 {
t.Fatalf("expected 0 retributions, found %v", count)
}
// Add some retribution states to the store.
// Add first retribution state to the store.
if err := rs.Add(&retributions[0]); err != nil {
t.Fatalf("unable to add to retribution store: %v", err)
}
// Ensure that the retribution store has one retribution.
if count := countRetributions(t, rs); count != 1 {
t.Fatalf("expected 1 retributions, found %v", count)
}
// Add second retribution state to the store.
if err := rs.Add(&retributions[1]); err != nil {
t.Fatalf("unable to add to retribution store: %v", err)
}
// There should be 2 retributions in the store.
if count := countRetributions(t, rs); count != 2 {
t.Fatalf("expected 2 retributions, found %v", count)
@ -387,6 +462,11 @@ func TestRetributionStore(t *testing.T) {
if err := rs.Remove(&retributions[0].chanPoint); err != nil {
t.Fatalf("unable to remove from retribution store: %v", err)
}
// Ensure that the retribution store has one retribution.
if count := countRetributions(t, rs); count != 1 {
t.Fatalf("expected 1 retributions, found %v", count)
}
if err := rs.Remove(&retributions[1].chanPoint); err != nil {
t.Fatalf("unable to remove from retribution store: %v", err)
}