From a8d667ba35faaec53c32a340fb5a4da32a004559 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 1 Sep 2017 03:11:14 -0700 Subject: [PATCH] breacharbiter: sweep incoming + outgoing htlcs This commit also adds a BreachConfig to abstract the instantiation of the breach arbiter, as well as various formatting improvements. --- breacharbiter.go | 191 +++++++++++++++++++++++++++++------------------ 1 file changed, 117 insertions(+), 74 deletions(-) diff --git a/breacharbiter.go b/breacharbiter.go index 30ff4f91..acbbded8 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -30,6 +30,52 @@ import ( // continue from the persisted state. var retributionBucket = []byte("retribution") +// BreachConfig bundles the required subsystems used by the breach arbiter. An +// instance of BreachConfig is passed to newBreachArbiter during instantiation. +type BreachConfig struct { + // Signer is used by the breach arbiter to generate sweep transactions, + // which move coins from previously open channels back to the user's + // wallet. + Signer lnwallet.Signer + + // DB provides access to the user's channels, allowing the breach + // arbiter to determine the current state of a user's channels, and how + // it should respond to channel closure. + DB *channeldb.DB + + // PublishTransaction facilitates the process of broadcasting a + // transaction to the network. + PublishTransaction func(*wire.MsgTx) error + + // Notifier provides a publish/subscribe interface for event driven + // notifications regarding the confirmation of txids. + Notifier chainntnfs.ChainNotifier + + // ChainIO is used by the breach arbiter to determine the current height + // of the blockchain, which is required to subscribe for spend + // notifications from Notifier. + ChainIO lnwallet.BlockChainIO + + // Estimator is used by the breach arbiter to determine an appropriate + // fee level when generating, signing, and broadcasting sweep + // transactions. + Estimator lnwallet.FeeEstimator + + // CloseLink allows the breach arbiter to shutdown any channel links for + // which it detects a breach, ensuring now further activity will + // continue across the link. The method accepts link's channel point and a + // close type to be included in the channel close summary. + CloseLink func(*wire.OutPoint, htlcswitch.ChannelCloseType) + + // Store is a persistent resource that maintains information regarding + // breached channels. This is used in conjunction with DB to recover + // from crashes, restarts, or other failures. + Store RetributionStore + + // GenSweepScript generates the receiving scripts for swept outputs. + GenSweepScript func() ([]byte, error) +} + // breachArbiter is a special subsystem which is responsible for watching and // acting on the detection of any attempted uncooperative channel breaches by // channel counterparties. This file essentially acts as deterrence code for @@ -39,15 +85,7 @@ var retributionBucket = []byte("retribution") // counterparties. // TODO(roasbeef): closures in config for subsystem pointers to decouple? type breachArbiter struct { - wallet *lnwallet.LightningWallet - signer lnwallet.Signer - db *channeldb.DB - notifier chainntnfs.ChainNotifier - chainIO lnwallet.BlockChainIO - estimator lnwallet.FeeEstimator - htlcSwitch *htlcswitch.Switch - - retributionStore RetributionStore + cfg *BreachConfig // breachObservers is a map which tracks all the active breach // observers we're currently managing. The key of the map is the @@ -82,20 +120,9 @@ type breachArbiter struct { // newBreachArbiter creates a new instance of a breachArbiter initialized with // its dependent objects. -func newBreachArbiter(wallet *lnwallet.LightningWallet, db *channeldb.DB, - notifier chainntnfs.ChainNotifier, h *htlcswitch.Switch, - chain lnwallet.BlockChainIO, fe lnwallet.FeeEstimator) *breachArbiter { - +func newBreachArbiter(cfg *BreachConfig) *breachArbiter { return &breachArbiter{ - wallet: wallet, - signer: wallet.Cfg.Signer, - db: db, - notifier: notifier, - chainIO: chain, - htlcSwitch: h, - estimator: fe, - - retributionStore: newRetributionStore(db), + cfg: cfg, breachObservers: make(map[wire.OutPoint]chan struct{}), breachedContracts: make(chan *retributionInfo), @@ -121,7 +148,7 @@ func (b *breachArbiter) Start() error { // breach is reflected in channeldb. breachRetInfos := make(map[wire.OutPoint]retributionInfo) closeSummaries := make(map[wire.OutPoint]channeldb.ChannelCloseSummary) - err := b.retributionStore.ForAll(func(ret *retributionInfo) error { + err := b.cfg.Store.ForAll(func(ret *retributionInfo) error { // Extract emitted retribution information. breachRetInfos[ret.chanPoint] = *ret @@ -148,7 +175,7 @@ func (b *breachArbiter) Start() error { // We need to query that database state for all currently active // channels, each of these channels will need a goroutine assigned to // it to watch for channel breaches. - activeChannels, err := b.db.FetchAllChannels() + activeChannels, err := b.cfg.DB.FetchAllChannels() if err != nil && err != channeldb.ErrNoActiveChannels { brarLog.Errorf("unable to fetch active channels: %v", err) return err @@ -185,8 +212,9 @@ func (b *breachArbiter) Start() error { channelsToWatch := make([]*lnwallet.LightningChannel, 0, nActive) for _, chanState := range activeChannels { // Initialize active channel from persisted channel state. - channel, err := lnwallet.NewLightningChannel(nil, b.notifier, - b.estimator, chanState) + channel, err := lnwallet.NewLightningChannel( + nil, b.cfg.Notifier, b.cfg.Estimator, chanState, + ) if err != nil { brarLog.Errorf("unable to load channel from "+ "disk: %v", err) @@ -205,10 +233,8 @@ func (b *breachArbiter) Start() error { // notify the HTLC switch that this link should be // closed, and that all activity on the link should // cease. - b.htlcSwitch.CloseLink( - &chanState.FundingOutpoint, - htlcswitch.CloseBreach, - ) + b.cfg.CloseLink(&chanState.FundingOutpoint, + htlcswitch.CloseBreach) // Ensure channeldb is consistent with the persisted // breach. @@ -231,7 +257,7 @@ func (b *breachArbiter) Start() error { } // TODO(roasbeef): instead use closure height of channel - _, currentHeight, err := b.chainIO.GetBestBlock() + _, currentHeight, err := b.cfg.ChainIO.GetBestBlock() if err != nil { return err } @@ -249,7 +275,7 @@ func (b *breachArbiter) Start() error { // Register for a notification when the breach transaction is // confirmed on chain. breachTXID := closeSummary.ClosingTXID - confChan, err := b.notifier.RegisterConfirmationsNtfn( + confChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn( &breachTXID, 1, uint32(currentHeight)) if err != nil { brarLog.Errorf("unable to register for conf updates "+ @@ -293,7 +319,7 @@ func (b *breachArbiter) watchForPendingCloseConfs(currentHeight int32) error { pendingClose.ChanPoint) closeTXID := pendingClose.ClosingTXID - confNtfn, err := b.notifier.RegisterConfirmationsNtfn( + confNtfn, err := b.cfg.Notifier.RegisterConfirmationsNtfn( &closeTXID, 1, uint32(currentHeight), ) if err != nil { @@ -321,10 +347,10 @@ func (b *breachArbiter) watchForPendingCloseConfs(currentHeight int32) error { // UnilateralCloseSummary on disk so can // possibly sweep output here - err := b.db.MarkChanFullyClosed(&chanPoint) + err := b.cfg.DB.MarkChanFullyClosed(&chanPoint) if err != nil { - brarLog.Errorf("unable to mark channel "+ - "as closed: %v", err) + brarLog.Errorf("unable to mark channel"+ + " as closed: %v", err) } case <-b.quit: @@ -385,7 +411,7 @@ out: for { select { case breachInfo := <-b.breachedContracts: - _, currentHeight, err := b.chainIO.GetBestBlock() + _, currentHeight, err := b.cfg.ChainIO.GetBestBlock() if err != nil { brarLog.Errorf("unable to get best height: %v", err) @@ -397,7 +423,7 @@ out: // transaction) has been confirmed in the chain to // ensure we're not dealing with a moving target. breachTXID := &breachInfo.commitHash - confChan, err := b.notifier.RegisterConfirmationsNtfn( + cfChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn( breachTXID, 1, uint32(currentHeight), ) if err != nil { @@ -417,7 +443,7 @@ out: // retribution after the breach transaction has been // confirmed. b.wg.Add(1) - go b.exactRetribution(confChan, breachInfo) + go b.exactRetribution(cfChan, breachInfo) delete(b.breachObservers, breachInfo.chanPoint) @@ -523,7 +549,7 @@ func (b *breachArbiter) exactRetribution( return spew.Sdump(justiceTx) })) - _, currentHeight, err := b.chainIO.GetBestBlock() + _, currentHeight, err := b.cfg.ChainIO.GetBestBlock() if err != nil { brarLog.Errorf("unable to get current height: %v", err) return @@ -531,7 +557,7 @@ func (b *breachArbiter) exactRetribution( // Finally, broadcast the transaction, finalizing the channels' // retribution against the cheating counterparty. - if err := b.wallet.PublishTransaction(justiceTx); err != nil { + if err := b.cfg.PublishTransaction(justiceTx); err != nil { brarLog.Errorf("unable to broadcast "+ "justice tx: %v", err) return @@ -542,8 +568,8 @@ func (b *breachArbiter) exactRetribution( // notify the caller that initiated the retribution workflow that the // deed has been done. justiceTXID := justiceTx.TxHash() - confChan, err = b.notifier.RegisterConfirmationsNtfn(&justiceTXID, 1, - uint32(currentHeight)) + confChan, err = b.cfg.Notifier.RegisterConfirmationsNtfn( + &justiceTXID, 1, uint32(currentHeight)) if err != nil { brarLog.Errorf("unable to register for conf for txid: %v", justiceTXID) @@ -566,14 +592,14 @@ func (b *breachArbiter) exactRetribution( revokedFunds, totalFunds) // With the channel closed, mark it in the database as such. - err := b.db.MarkChanFullyClosed(&breachInfo.chanPoint) + err := b.cfg.DB.MarkChanFullyClosed(&breachInfo.chanPoint) if err != nil { brarLog.Errorf("unable to mark chan as closed: %v", err) } // Justice has been carried out; we can safely delete the // retribution info from the database. - err = b.retributionStore.Remove(&breachInfo.chanPoint) + err = b.cfg.Store.Remove(&breachInfo.chanPoint) if err != nil { brarLog.Errorf("unable to remove retribution "+ "from the db: %v", err) @@ -640,7 +666,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, // TODO(roasbeef): also notify utxoNursery, might've had // outbound HTLC's in flight go waitForChanToClose(uint32(closeInfo.SpendingHeight), - b.notifier, nil, chanPoint, closeInfo.SpenderTxHash, + b.cfg.Notifier, nil, chanPoint, closeInfo.SpenderTxHash, func() { // As we just detected a channel was closed via // a unilateral commitment broadcast by the @@ -661,13 +687,11 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, goto close } - brarLog.Infof("Sweeping %v breached "+ + brarLog.Infof("Sweeping breached "+ "outputs with: %v", spew.Sdump(sweepTx)) - err = b.wallet.PublishTransaction( - sweepTx, - ) + err = b.cfg.PublishTransaction(sweepTx) if err != nil { brarLog.Errorf("unable to "+ "broadcast tx: %v", err) @@ -679,7 +703,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, "is fully closed, updating DB", chanPoint) - err := b.db.MarkChanFullyClosed(chanPoint) + err := b.cfg.DB.MarkChanFullyClosed(chanPoint) if err != nil { brarLog.Errorf("unable to mark chan "+ "as closed: %v", err) @@ -699,7 +723,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, // breached in order to ensure any incoming or outgoing // multi-hop HTLCs aren't sent over this link, nor any other // links associated with this peer. - b.htlcSwitch.CloseLink(chanPoint, htlcswitch.CloseBreach) + b.cfg.CloseLink(chanPoint, htlcswitch.CloseBreach) // TODO(roasbeef): need to handle case of remote broadcast // mid-local initiated state-transition, possible @@ -715,7 +739,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, retInfo := newRetributionInfo(chanPoint, breachInfo, chanInfo) // Persist the pending retribution state to disk. - if err := b.retributionStore.Add(retInfo); err != nil { + if err := b.cfg.Store.Add(retInfo); err != nil { brarLog.Errorf("unable to persist retribution info "+ "to db: %v", err) } @@ -795,7 +819,7 @@ type breachedOutput struct { amt btcutil.Amount outpoint wire.OutPoint witnessType lnwallet.WitnessType - signDesc *lnwallet.SignDescriptor + signDesc lnwallet.SignDescriptor witnessFunc lnwallet.WitnessGenerator } @@ -812,7 +836,7 @@ func newBreachedOutput(outpoint *wire.OutPoint, amt: btcutil.Amount(amount), outpoint: *outpoint, witnessType: witnessType, - signDesc: signDescriptor, + signDesc: *signDescriptor, } } @@ -841,8 +865,7 @@ func (bo *breachedOutput) BuildWitness(signer lnwallet.Signer, // been initialized for this breached output. if bo.witnessFunc == nil { bo.witnessFunc = bo.witnessType.GenWitnessFunc( - signer, - bo.signDesc, + signer, &bo.signDesc, ) } @@ -923,13 +946,21 @@ func newRetributionInfo(chanPoint *wire.OutPoint, // deterministically generate a valid witness for each output. This will // allow the breach arbiter to recover from failures, in the event that // it must sign and broadcast the justice transaction. - var htlcOutputs = make([]*breachedOutput, nHtlcs) + htlcOutputs := make([]*breachedOutput, nHtlcs) for i, breachedHtlc := range breachInfo.HtlcRetributions { + // Using the breachedHtlc's incoming flag, determine the + // appropriate witness type that needs to be generated in order + // to sweep the HTLC output. + var htlcWitnessType lnwallet.WitnessType + if breachedHtlc.IsIncoming { + htlcWitnessType = lnwallet.HtlcAcceptedRevoke + } else { + htlcWitnessType = lnwallet.HtlcOfferedRevoke + } + htlcOutputs[i] = newBreachedOutput( - &breachedHtlc.OutPoint, - lnwallet.CommitmentRevoke, - &breachedHtlc.SignDesc, - ) + &breachInfo.HtlcRetributions[i].OutPoint, htlcWitnessType, + &breachInfo.HtlcRetributions[i].SignDesc) } // TODO(conner): remove dependency on channel snapshot after decoupling @@ -960,24 +991,35 @@ func (b *breachArbiter) createJusticeTx( // Assemble the breached outputs into a slice of spendable outputs, // starting with the self and revoked outputs, then adding any htlc // outputs. - var breachedOutputs = make([]SpendableOutput, 2+nHtlcs) + breachedOutputs := make([]SpendableOutput, 2+nHtlcs) breachedOutputs[0] = r.selfOutput breachedOutputs[1] = r.revokedOutput for i, htlcOutput := range r.htlcOutputs { breachedOutputs[2+i] = htlcOutput } + // Compute the transaction weight of the justice transaction, which + // includes 2 + nHtlcs inputs and one output. var txWeight uint64 // Begin with a base txn weight, e.g. version, nLockTime, etc. txWeight += 4*lnwallet.BaseSweepTxSize + lnwallet.WitnessHeaderSize - // Add to_local revoke script and tx input. txWeight += 4*lnwallet.InputSize + lnwallet.ToLocalPenaltyWitnessSize // Add to_remote p2wpkh witness and tx input. txWeight += 4*lnwallet.InputSize + lnwallet.P2WKHWitnessSize - // Add revoked offered-htlc witnesses and tx inputs. - txWeight += uint64(len(r.htlcOutputs)) * - (4*lnwallet.InputSize + lnwallet.OfferedHtlcWitnessSize) + + // Compute the appropriate weight contributed by each revoked accepted + // or offered HTLC witnesses and tx inputs. + for _, htlcOutput := range r.htlcOutputs { + switch htlcOutput.witnessType { + case lnwallet.HtlcOfferedRevoke: + txWeight += 4*lnwallet.InputSize + + lnwallet.OfferedHtlcPenaltyWitnessSize + case lnwallet.HtlcAcceptedRevoke: + txWeight += 4*lnwallet.InputSize + + lnwallet.AcceptedHtlcPenaltyWitnessSize + } + } return b.sweepSpendableOutputsTxn(txWeight, breachedOutputs...) } @@ -999,6 +1041,8 @@ func (b *breachArbiter) craftCommitSweepTx( closeInfo.SelfOutputSignDesc, ) + // Compute the transaction weight of the commit sweep transaction, which + // includes a single input and output. var txWeight uint64 // Begin with a base txn weight, e.g. version, nLockTime, etc. txWeight += 4*lnwallet.BaseSweepTxSize + lnwallet.WitnessHeaderSize @@ -1017,7 +1061,7 @@ func (b *breachArbiter) sweepSpendableOutputsTxn(txWeight uint64, // sweep the funds to. // TODO(roasbeef): possibly create many outputs to minimize change in // the future? - pkScript, err := newSweepPkScript(b.wallet) + pkScript, err := b.cfg.GenSweepScript() if err != nil { return nil, err } @@ -1028,14 +1072,14 @@ func (b *breachArbiter) sweepSpendableOutputsTxn(txWeight uint64, totalAmt += input.Amount() } - feePerWeight := b.estimator.EstimateFeePerWeight(1) + feePerWeight := b.cfg.Estimator.EstimateFeePerWeight(1) txFee := btcutil.Amount(txWeight * feePerWeight) sweepAmt := int64(totalAmt - txFee) // With the fee calculated, we can now create the transaction using the // information gathered above and the provided retribution information. - var txn = wire.NewMsgTx(2) + txn := wire.NewMsgTx(2) // We begin by adding the output to which our funds will be deposited. txn.AddTxOut(&wire.TxOut{ @@ -1072,7 +1116,7 @@ func (b *breachArbiter) sweepSpendableOutputsTxn(txWeight uint64, // transaction using the SpendableOutput's witness generation // function. witness, err := so.BuildWitness( - b.wallet.Cfg.Signer, txn, hashCache, idx, + b.cfg.Signer, txn, hashCache, idx, ) if err != nil { return err @@ -1339,7 +1383,7 @@ func (bo *breachedOutput) Encode(w io.Writer) error { return err } - if err := lnwallet.WriteSignDescriptor(w, bo.signDesc); err != nil { + if err := lnwallet.WriteSignDescriptor(w, &bo.signDesc); err != nil { return err } @@ -1364,8 +1408,7 @@ func (bo *breachedOutput) Decode(r io.Reader) error { return err } - bo.signDesc = &lnwallet.SignDescriptor{} - if err := lnwallet.ReadSignDescriptor(r, bo.signDesc); err != nil { + if err := lnwallet.ReadSignDescriptor(r, &bo.signDesc); err != nil { return err }