breacharbiter: sweep incoming + outgoing htlcs

This commit also adds a BreachConfig to abstract
  the instantiation of the breach arbiter, as well
  as various formatting improvements.
This commit is contained in:
Conner Fromknecht 2017-09-01 03:11:14 -07:00
parent b64d4356c1
commit a8d667ba35
No known key found for this signature in database
GPG Key ID: 39DE78FBE6ACB0EF

@ -30,6 +30,52 @@ import (
// continue from the persisted state.
var retributionBucket = []byte("retribution")
// BreachConfig bundles the required subsystems used by the breach arbiter. An
// instance of BreachConfig is passed to newBreachArbiter during instantiation.
type BreachConfig struct {
// Signer is used by the breach arbiter to generate sweep transactions,
// which move coins from previously open channels back to the user's
// wallet.
Signer lnwallet.Signer
// DB provides access to the user's channels, allowing the breach
// arbiter to determine the current state of a user's channels, and how
// it should respond to channel closure.
DB *channeldb.DB
// PublishTransaction facilitates the process of broadcasting a
// transaction to the network.
PublishTransaction func(*wire.MsgTx) error
// Notifier provides a publish/subscribe interface for event driven
// notifications regarding the confirmation of txids.
Notifier chainntnfs.ChainNotifier
// ChainIO is used by the breach arbiter to determine the current height
// of the blockchain, which is required to subscribe for spend
// notifications from Notifier.
ChainIO lnwallet.BlockChainIO
// Estimator is used by the breach arbiter to determine an appropriate
// fee level when generating, signing, and broadcasting sweep
// transactions.
Estimator lnwallet.FeeEstimator
// CloseLink allows the breach arbiter to shutdown any channel links for
// which it detects a breach, ensuring now further activity will
// continue across the link. The method accepts link's channel point and a
// close type to be included in the channel close summary.
CloseLink func(*wire.OutPoint, htlcswitch.ChannelCloseType)
// Store is a persistent resource that maintains information regarding
// breached channels. This is used in conjunction with DB to recover
// from crashes, restarts, or other failures.
Store RetributionStore
// GenSweepScript generates the receiving scripts for swept outputs.
GenSweepScript func() ([]byte, error)
}
// breachArbiter is a special subsystem which is responsible for watching and
// acting on the detection of any attempted uncooperative channel breaches by
// channel counterparties. This file essentially acts as deterrence code for
@ -39,15 +85,7 @@ var retributionBucket = []byte("retribution")
// counterparties.
// TODO(roasbeef): closures in config for subsystem pointers to decouple?
type breachArbiter struct {
wallet *lnwallet.LightningWallet
signer lnwallet.Signer
db *channeldb.DB
notifier chainntnfs.ChainNotifier
chainIO lnwallet.BlockChainIO
estimator lnwallet.FeeEstimator
htlcSwitch *htlcswitch.Switch
retributionStore RetributionStore
cfg *BreachConfig
// breachObservers is a map which tracks all the active breach
// observers we're currently managing. The key of the map is the
@ -82,20 +120,9 @@ type breachArbiter struct {
// newBreachArbiter creates a new instance of a breachArbiter initialized with
// its dependent objects.
func newBreachArbiter(wallet *lnwallet.LightningWallet, db *channeldb.DB,
notifier chainntnfs.ChainNotifier, h *htlcswitch.Switch,
chain lnwallet.BlockChainIO, fe lnwallet.FeeEstimator) *breachArbiter {
func newBreachArbiter(cfg *BreachConfig) *breachArbiter {
return &breachArbiter{
wallet: wallet,
signer: wallet.Cfg.Signer,
db: db,
notifier: notifier,
chainIO: chain,
htlcSwitch: h,
estimator: fe,
retributionStore: newRetributionStore(db),
cfg: cfg,
breachObservers: make(map[wire.OutPoint]chan struct{}),
breachedContracts: make(chan *retributionInfo),
@ -121,7 +148,7 @@ func (b *breachArbiter) Start() error {
// breach is reflected in channeldb.
breachRetInfos := make(map[wire.OutPoint]retributionInfo)
closeSummaries := make(map[wire.OutPoint]channeldb.ChannelCloseSummary)
err := b.retributionStore.ForAll(func(ret *retributionInfo) error {
err := b.cfg.Store.ForAll(func(ret *retributionInfo) error {
// Extract emitted retribution information.
breachRetInfos[ret.chanPoint] = *ret
@ -148,7 +175,7 @@ func (b *breachArbiter) Start() error {
// We need to query that database state for all currently active
// channels, each of these channels will need a goroutine assigned to
// it to watch for channel breaches.
activeChannels, err := b.db.FetchAllChannels()
activeChannels, err := b.cfg.DB.FetchAllChannels()
if err != nil && err != channeldb.ErrNoActiveChannels {
brarLog.Errorf("unable to fetch active channels: %v", err)
return err
@ -185,8 +212,9 @@ func (b *breachArbiter) Start() error {
channelsToWatch := make([]*lnwallet.LightningChannel, 0, nActive)
for _, chanState := range activeChannels {
// Initialize active channel from persisted channel state.
channel, err := lnwallet.NewLightningChannel(nil, b.notifier,
b.estimator, chanState)
channel, err := lnwallet.NewLightningChannel(
nil, b.cfg.Notifier, b.cfg.Estimator, chanState,
)
if err != nil {
brarLog.Errorf("unable to load channel from "+
"disk: %v", err)
@ -205,10 +233,8 @@ func (b *breachArbiter) Start() error {
// notify the HTLC switch that this link should be
// closed, and that all activity on the link should
// cease.
b.htlcSwitch.CloseLink(
&chanState.FundingOutpoint,
htlcswitch.CloseBreach,
)
b.cfg.CloseLink(&chanState.FundingOutpoint,
htlcswitch.CloseBreach)
// Ensure channeldb is consistent with the persisted
// breach.
@ -231,7 +257,7 @@ func (b *breachArbiter) Start() error {
}
// TODO(roasbeef): instead use closure height of channel
_, currentHeight, err := b.chainIO.GetBestBlock()
_, currentHeight, err := b.cfg.ChainIO.GetBestBlock()
if err != nil {
return err
}
@ -249,7 +275,7 @@ func (b *breachArbiter) Start() error {
// Register for a notification when the breach transaction is
// confirmed on chain.
breachTXID := closeSummary.ClosingTXID
confChan, err := b.notifier.RegisterConfirmationsNtfn(
confChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn(
&breachTXID, 1, uint32(currentHeight))
if err != nil {
brarLog.Errorf("unable to register for conf updates "+
@ -293,7 +319,7 @@ func (b *breachArbiter) watchForPendingCloseConfs(currentHeight int32) error {
pendingClose.ChanPoint)
closeTXID := pendingClose.ClosingTXID
confNtfn, err := b.notifier.RegisterConfirmationsNtfn(
confNtfn, err := b.cfg.Notifier.RegisterConfirmationsNtfn(
&closeTXID, 1, uint32(currentHeight),
)
if err != nil {
@ -321,10 +347,10 @@ func (b *breachArbiter) watchForPendingCloseConfs(currentHeight int32) error {
// UnilateralCloseSummary on disk so can
// possibly sweep output here
err := b.db.MarkChanFullyClosed(&chanPoint)
err := b.cfg.DB.MarkChanFullyClosed(&chanPoint)
if err != nil {
brarLog.Errorf("unable to mark channel "+
"as closed: %v", err)
brarLog.Errorf("unable to mark channel"+
" as closed: %v", err)
}
case <-b.quit:
@ -385,7 +411,7 @@ out:
for {
select {
case breachInfo := <-b.breachedContracts:
_, currentHeight, err := b.chainIO.GetBestBlock()
_, currentHeight, err := b.cfg.ChainIO.GetBestBlock()
if err != nil {
brarLog.Errorf("unable to get best height: %v",
err)
@ -397,7 +423,7 @@ out:
// transaction) has been confirmed in the chain to
// ensure we're not dealing with a moving target.
breachTXID := &breachInfo.commitHash
confChan, err := b.notifier.RegisterConfirmationsNtfn(
cfChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn(
breachTXID, 1, uint32(currentHeight),
)
if err != nil {
@ -417,7 +443,7 @@ out:
// retribution after the breach transaction has been
// confirmed.
b.wg.Add(1)
go b.exactRetribution(confChan, breachInfo)
go b.exactRetribution(cfChan, breachInfo)
delete(b.breachObservers, breachInfo.chanPoint)
@ -523,7 +549,7 @@ func (b *breachArbiter) exactRetribution(
return spew.Sdump(justiceTx)
}))
_, currentHeight, err := b.chainIO.GetBestBlock()
_, currentHeight, err := b.cfg.ChainIO.GetBestBlock()
if err != nil {
brarLog.Errorf("unable to get current height: %v", err)
return
@ -531,7 +557,7 @@ func (b *breachArbiter) exactRetribution(
// Finally, broadcast the transaction, finalizing the channels'
// retribution against the cheating counterparty.
if err := b.wallet.PublishTransaction(justiceTx); err != nil {
if err := b.cfg.PublishTransaction(justiceTx); err != nil {
brarLog.Errorf("unable to broadcast "+
"justice tx: %v", err)
return
@ -542,8 +568,8 @@ func (b *breachArbiter) exactRetribution(
// notify the caller that initiated the retribution workflow that the
// deed has been done.
justiceTXID := justiceTx.TxHash()
confChan, err = b.notifier.RegisterConfirmationsNtfn(&justiceTXID, 1,
uint32(currentHeight))
confChan, err = b.cfg.Notifier.RegisterConfirmationsNtfn(
&justiceTXID, 1, uint32(currentHeight))
if err != nil {
brarLog.Errorf("unable to register for conf for txid: %v",
justiceTXID)
@ -566,14 +592,14 @@ func (b *breachArbiter) exactRetribution(
revokedFunds, totalFunds)
// With the channel closed, mark it in the database as such.
err := b.db.MarkChanFullyClosed(&breachInfo.chanPoint)
err := b.cfg.DB.MarkChanFullyClosed(&breachInfo.chanPoint)
if err != nil {
brarLog.Errorf("unable to mark chan as closed: %v", err)
}
// Justice has been carried out; we can safely delete the
// retribution info from the database.
err = b.retributionStore.Remove(&breachInfo.chanPoint)
err = b.cfg.Store.Remove(&breachInfo.chanPoint)
if err != nil {
brarLog.Errorf("unable to remove retribution "+
"from the db: %v", err)
@ -640,7 +666,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
// TODO(roasbeef): also notify utxoNursery, might've had
// outbound HTLC's in flight
go waitForChanToClose(uint32(closeInfo.SpendingHeight),
b.notifier, nil, chanPoint, closeInfo.SpenderTxHash,
b.cfg.Notifier, nil, chanPoint, closeInfo.SpenderTxHash,
func() {
// As we just detected a channel was closed via
// a unilateral commitment broadcast by the
@ -661,13 +687,11 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
goto close
}
brarLog.Infof("Sweeping %v breached "+
brarLog.Infof("Sweeping breached "+
"outputs with: %v",
spew.Sdump(sweepTx))
err = b.wallet.PublishTransaction(
sweepTx,
)
err = b.cfg.PublishTransaction(sweepTx)
if err != nil {
brarLog.Errorf("unable to "+
"broadcast tx: %v", err)
@ -679,7 +703,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
"is fully closed, updating DB",
chanPoint)
err := b.db.MarkChanFullyClosed(chanPoint)
err := b.cfg.DB.MarkChanFullyClosed(chanPoint)
if err != nil {
brarLog.Errorf("unable to mark chan "+
"as closed: %v", err)
@ -699,7 +723,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
// breached in order to ensure any incoming or outgoing
// multi-hop HTLCs aren't sent over this link, nor any other
// links associated with this peer.
b.htlcSwitch.CloseLink(chanPoint, htlcswitch.CloseBreach)
b.cfg.CloseLink(chanPoint, htlcswitch.CloseBreach)
// TODO(roasbeef): need to handle case of remote broadcast
// mid-local initiated state-transition, possible
@ -715,7 +739,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
retInfo := newRetributionInfo(chanPoint, breachInfo, chanInfo)
// Persist the pending retribution state to disk.
if err := b.retributionStore.Add(retInfo); err != nil {
if err := b.cfg.Store.Add(retInfo); err != nil {
brarLog.Errorf("unable to persist retribution info "+
"to db: %v", err)
}
@ -795,7 +819,7 @@ type breachedOutput struct {
amt btcutil.Amount
outpoint wire.OutPoint
witnessType lnwallet.WitnessType
signDesc *lnwallet.SignDescriptor
signDesc lnwallet.SignDescriptor
witnessFunc lnwallet.WitnessGenerator
}
@ -812,7 +836,7 @@ func newBreachedOutput(outpoint *wire.OutPoint,
amt: btcutil.Amount(amount),
outpoint: *outpoint,
witnessType: witnessType,
signDesc: signDescriptor,
signDesc: *signDescriptor,
}
}
@ -841,8 +865,7 @@ func (bo *breachedOutput) BuildWitness(signer lnwallet.Signer,
// been initialized for this breached output.
if bo.witnessFunc == nil {
bo.witnessFunc = bo.witnessType.GenWitnessFunc(
signer,
bo.signDesc,
signer, &bo.signDesc,
)
}
@ -923,13 +946,21 @@ func newRetributionInfo(chanPoint *wire.OutPoint,
// deterministically generate a valid witness for each output. This will
// allow the breach arbiter to recover from failures, in the event that
// it must sign and broadcast the justice transaction.
var htlcOutputs = make([]*breachedOutput, nHtlcs)
htlcOutputs := make([]*breachedOutput, nHtlcs)
for i, breachedHtlc := range breachInfo.HtlcRetributions {
// Using the breachedHtlc's incoming flag, determine the
// appropriate witness type that needs to be generated in order
// to sweep the HTLC output.
var htlcWitnessType lnwallet.WitnessType
if breachedHtlc.IsIncoming {
htlcWitnessType = lnwallet.HtlcAcceptedRevoke
} else {
htlcWitnessType = lnwallet.HtlcOfferedRevoke
}
htlcOutputs[i] = newBreachedOutput(
&breachedHtlc.OutPoint,
lnwallet.CommitmentRevoke,
&breachedHtlc.SignDesc,
)
&breachInfo.HtlcRetributions[i].OutPoint, htlcWitnessType,
&breachInfo.HtlcRetributions[i].SignDesc)
}
// TODO(conner): remove dependency on channel snapshot after decoupling
@ -960,24 +991,35 @@ func (b *breachArbiter) createJusticeTx(
// Assemble the breached outputs into a slice of spendable outputs,
// starting with the self and revoked outputs, then adding any htlc
// outputs.
var breachedOutputs = make([]SpendableOutput, 2+nHtlcs)
breachedOutputs := make([]SpendableOutput, 2+nHtlcs)
breachedOutputs[0] = r.selfOutput
breachedOutputs[1] = r.revokedOutput
for i, htlcOutput := range r.htlcOutputs {
breachedOutputs[2+i] = htlcOutput
}
// Compute the transaction weight of the justice transaction, which
// includes 2 + nHtlcs inputs and one output.
var txWeight uint64
// Begin with a base txn weight, e.g. version, nLockTime, etc.
txWeight += 4*lnwallet.BaseSweepTxSize + lnwallet.WitnessHeaderSize
// Add to_local revoke script and tx input.
txWeight += 4*lnwallet.InputSize + lnwallet.ToLocalPenaltyWitnessSize
// Add to_remote p2wpkh witness and tx input.
txWeight += 4*lnwallet.InputSize + lnwallet.P2WKHWitnessSize
// Add revoked offered-htlc witnesses and tx inputs.
txWeight += uint64(len(r.htlcOutputs)) *
(4*lnwallet.InputSize + lnwallet.OfferedHtlcWitnessSize)
// Compute the appropriate weight contributed by each revoked accepted
// or offered HTLC witnesses and tx inputs.
for _, htlcOutput := range r.htlcOutputs {
switch htlcOutput.witnessType {
case lnwallet.HtlcOfferedRevoke:
txWeight += 4*lnwallet.InputSize +
lnwallet.OfferedHtlcPenaltyWitnessSize
case lnwallet.HtlcAcceptedRevoke:
txWeight += 4*lnwallet.InputSize +
lnwallet.AcceptedHtlcPenaltyWitnessSize
}
}
return b.sweepSpendableOutputsTxn(txWeight, breachedOutputs...)
}
@ -999,6 +1041,8 @@ func (b *breachArbiter) craftCommitSweepTx(
closeInfo.SelfOutputSignDesc,
)
// Compute the transaction weight of the commit sweep transaction, which
// includes a single input and output.
var txWeight uint64
// Begin with a base txn weight, e.g. version, nLockTime, etc.
txWeight += 4*lnwallet.BaseSweepTxSize + lnwallet.WitnessHeaderSize
@ -1017,7 +1061,7 @@ func (b *breachArbiter) sweepSpendableOutputsTxn(txWeight uint64,
// sweep the funds to.
// TODO(roasbeef): possibly create many outputs to minimize change in
// the future?
pkScript, err := newSweepPkScript(b.wallet)
pkScript, err := b.cfg.GenSweepScript()
if err != nil {
return nil, err
}
@ -1028,14 +1072,14 @@ func (b *breachArbiter) sweepSpendableOutputsTxn(txWeight uint64,
totalAmt += input.Amount()
}
feePerWeight := b.estimator.EstimateFeePerWeight(1)
feePerWeight := b.cfg.Estimator.EstimateFeePerWeight(1)
txFee := btcutil.Amount(txWeight * feePerWeight)
sweepAmt := int64(totalAmt - txFee)
// With the fee calculated, we can now create the transaction using the
// information gathered above and the provided retribution information.
var txn = wire.NewMsgTx(2)
txn := wire.NewMsgTx(2)
// We begin by adding the output to which our funds will be deposited.
txn.AddTxOut(&wire.TxOut{
@ -1072,7 +1116,7 @@ func (b *breachArbiter) sweepSpendableOutputsTxn(txWeight uint64,
// transaction using the SpendableOutput's witness generation
// function.
witness, err := so.BuildWitness(
b.wallet.Cfg.Signer, txn, hashCache, idx,
b.cfg.Signer, txn, hashCache, idx,
)
if err != nil {
return err
@ -1339,7 +1383,7 @@ func (bo *breachedOutput) Encode(w io.Writer) error {
return err
}
if err := lnwallet.WriteSignDescriptor(w, bo.signDesc); err != nil {
if err := lnwallet.WriteSignDescriptor(w, &bo.signDesc); err != nil {
return err
}
@ -1364,8 +1408,7 @@ func (bo *breachedOutput) Decode(r io.Reader) error {
return err
}
bo.signDesc = &lnwallet.SignDescriptor{}
if err := lnwallet.ReadSignDescriptor(r, bo.signDesc); err != nil {
if err := lnwallet.ReadSignDescriptor(r, &bo.signDesc); err != nil {
return err
}