breacharbiter: add retribution state persistence

This commit adds a breached contract retribution storage layer using
boltdb to the breach arbiter. The breach arbiter now stores retribution
state on disk between detecting a contract breach, broadcasting a
justice transaction that sweeps the channel, and finally witnessing the
justice transaction confirm on the blockchain. It is critical that such
state is persisted on disk, so that if our node restarts at any point
during the retribution procedure, we can recover and continue from the
persisted state.
This commit is contained in:
Philip Hayes 2017-05-07 04:09:22 -07:00 committed by Olaoluwa Osuntokun
parent d8c56433e3
commit b19c483a04
3 changed files with 916 additions and 182 deletions

View File

@ -1,21 +1,35 @@
package main
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"sync"
"sync/atomic"
"github.com/boltdb/bolt"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
)
// retributionBucket stores retribution state on disk between detecting a
// contract breach, broadcasting a justice transaction that sweeps the channel,
// and finally witnessing the justice transaction confirm on the blockchain. It
// is critical that such state is persisted on disk, so that if our node
// restarts at any point during the retribution procedure, we can recover and
// continue from the persisted state.
var retributionBucket = []byte("ret")
// breachArbiter is a special subsystem which is responsible for watching and
// acting on the detection of any attempted uncooperative channel breaches by
// channel counterparties. This file essentially acts as deterrence code for
@ -25,12 +39,11 @@ import (
// counterparties.
// TODO(roasbeef): closures in config for subsystem pointers to decouple?
type breachArbiter struct {
wallet *lnwallet.LightningWallet
db *channeldb.DB
notifier chainntnfs.ChainNotifier
chainIO lnwallet.BlockChainIO
estimator lnwallet.FeeEstimator
htlcSwitch *htlcswitch.Switch
wallet *lnwallet.LightningWallet
notifier chainntnfs.ChainNotifier
htlcSwitch *htlcSwitch
db *channeldb.DB
retributionStore *retributionStore
// breachObservers is a map which tracks all the active breach
// observers we're currently managing. The key of the map is the
@ -44,7 +57,7 @@ type breachArbiter struct {
// struct to send the necessary information required to punish a
// counterparty once a channel breach is detected. Breach observers
// use this to communicate with the main contractObserver goroutine.
breachedContracts chan *retributionInfo
breachedContracts chan *retribution
// newContracts is a channel which is used by outside subsystems to
// notify the breachArbiter of a new contract (a channel) that should
@ -70,15 +83,14 @@ func newBreachArbiter(wallet *lnwallet.LightningWallet, db *channeldb.DB,
chain lnwallet.BlockChainIO, fe lnwallet.FeeEstimator) *breachArbiter {
return &breachArbiter{
wallet: wallet,
db: db,
notifier: notifier,
chainIO: chain,
htlcSwitch: h,
estimator: fe,
wallet: wallet,
notifier: notifier,
htlcSwitch: h,
db: db,
retributionStore: newRetributionStore(db),
breachObservers: make(map[wire.OutPoint]chan struct{}),
breachedContracts: make(chan *retributionInfo),
breachedContracts: make(chan *retribution),
newContracts: make(chan *lnwallet.LightningChannel),
settledContracts: make(chan *wire.OutPoint),
quit: make(chan struct{}),
@ -94,7 +106,31 @@ func (b *breachArbiter) Start() error {
brarLog.Tracef("Starting breach arbiter")
// First we need to query that database state for all currently active
// We load any pending retributions from the database. For each retribution
// we need to restart the retribution procedure to claim our just reward.
err := b.retributionStore.ForAll(func(ret *retribution) error {
// Register for a notification when the breach transaction is confirmed
// on chain.
breachTXID := &ret.commitHash
confChan, err := b.notifier.RegisterConfirmationsNtfn(breachTXID, 1)
if err != nil {
brarLog.Errorf("unable to register for conf updates for txid: "+
"%v, err: %v", breachTXID, err)
return err
}
// Launch a new goroutine which to finalize the channel retribution
// after the breach transaction confirms.
b.wg.Add(1)
go b.exactRetribution(confChan, ret)
return nil
})
if err != nil {
return err
}
// We need to query that database state for all currently active
// channels, each of these channels will need a goroutine assigned to
// it to watch for channel breaches.
activeChannels, err := b.db.FetchAllChannels()
@ -248,19 +284,24 @@ out:
breachTXID, 1, uint32(currentHeight),
)
if err != nil {
brarLog.Errorf("unable to register for conf for txid: %v",
breachTXID)
brarLog.Errorf("unable to register for conf updates for txid: "+
"%v, err: %v", breachTXID, err)
continue
}
brarLog.Warnf("A channel has been breached with tx: %v. "+
brarLog.Warnf("A channel has been breached with txid: %v. "+
"Waiting for confirmation, then justice will be served!",
breachTXID)
// With the notification registered, we launch a new
// goroutine which will finalize the channel
// retribution after the breach transaction has been
// confirmed.
// Persist the pending retribution state to disk.
if err := b.retributionStore.Add(breachInfo); err != nil {
brarLog.Errorf("unable to persist breach info to db: %v", err)
continue
}
// With the notification registered and retribution state persisted,
// we launch a new goroutine which will finalize the channel
// retribution after the breach transaction has been confirmed.
b.wg.Add(1)
go b.exactRetribution(confChan, breachInfo)
@ -322,108 +363,6 @@ out:
return
}
// exactRetribution is a goroutine which is executed once a contract breach has
// been detected by a breachObserver. This function is responsible for
// punishing a counterparty for violating the channel contract by sweeping ALL
// the lingering funds within the channel into the daemon's wallet.
//
// NOTE: This MUST be run as a goroutine.
func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
breachInfo *retributionInfo) {
defer b.wg.Done()
// TODO(roasbeef): state needs to be checkpointed here
select {
case _, ok := <-confChan.Confirmed:
// If the second value is !ok, then the channel has been closed
// signifying a daemon shutdown, so we exit.
if !ok {
return
}
// Otherwise, if this is a real confirmation notification, then
// we fall through to complete our duty.
case <-b.quit:
return
}
brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+
"revoked funds", breachInfo.commitHash)
// With the breach transaction confirmed, we now create the justice tx
// which will claim ALL the funds within the channel.
justiceTx, err := b.createJusticeTx(breachInfo)
if err != nil {
brarLog.Errorf("unable to create justice tx: %v", err)
return
}
brarLog.Debugf("Broadcasting justice tx: %v", newLogClosure(func() string {
return spew.Sdump(justiceTx)
}))
_, currentHeight, err := b.chainIO.GetBestBlock()
if err != nil {
brarLog.Errorf("unable to get current height: %v", err)
return
}
// Finally, broadcast the transaction, finalizing the channels'
// retribution against the cheating counterparty.
if err := b.wallet.PublishTransaction(justiceTx); err != nil {
brarLog.Errorf("unable to broadcast "+
"justice tx: %v", err)
return
}
// As a conclusionary step, we register for a notification to be
// dispatched once the justice tx is confirmed. After confirmation we
// notify the caller that initiated the retribution workflow that the
// deed has been done.
justiceTXID := justiceTx.TxHash()
confChan, err = b.notifier.RegisterConfirmationsNtfn(&justiceTXID, 1,
uint32(currentHeight))
if err != nil {
brarLog.Errorf("unable to register for conf for txid: %v",
justiceTXID)
return
}
select {
case _, ok := <-confChan.Confirmed:
if !ok {
return
}
// TODO(roasbeef): factor in HTLCs
revokedFunds := breachInfo.revokedOutput.amt
totalFunds := revokedFunds + breachInfo.selfOutput.amt
brarLog.Infof("Justice for ChannelPoint(%v) has "+
"been served, %v revoked funds (%v total) "+
"have been claimed", breachInfo.chanPoint,
revokedFunds, totalFunds)
// With the channel closed, mark it in the database as such.
err := b.db.MarkChanFullyClosed(&breachInfo.chanPoint)
if err != nil {
brarLog.Errorf("unable to mark chan as closed: %v", err)
}
// TODO(roasbeef): add peer to blacklist?
// TODO(roasbeef): close other active channels with offending peer
close(breachInfo.doneChan)
return
case <-b.quit:
return
}
}
// breachObserver notifies the breachArbiter contract observer goroutine that a
// channel's contract has been breached by the prior counterparty. Once
// notified the breachArbiter will attempt to sweep ALL funds within the
@ -559,26 +498,28 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
return lnwallet.CommitSpendRevoke(b.wallet.Cfg.Signer, &desc, tx)
}
// Finally, with the two witness generation funcs created, we
// send the retribution information to the utxo nursery.
// Finally, we send the retribution information into the breachArbiter
// event loop to deal swift justice.
// TODO(roasbeef): populate htlc breaches
b.breachedContracts <- &retributionInfo{
b.breachedContracts <- &retribution{
commitHash: breachInfo.BreachTransaction.TxHash(),
chanPoint: *chanPoint,
selfOutput: &breachedOutput{
amt: btcutil.Amount(localSignDesc.Output.Value),
outpoint: breachInfo.LocalOutpoint,
witnessFunc: localWitness,
amt: btcutil.Amount(localSignDesc.Output.Value),
outpoint: breachInfo.LocalOutpoint,
signDescriptor: localSignDesc,
witnessType: localWitnessType,
},
revokedOutput: &breachedOutput{
amt: btcutil.Amount(remoteSignDesc.Output.Value),
outpoint: breachInfo.RemoteOutpoint,
witnessFunc: remoteWitness,
amt: btcutil.Amount(remoteSignDesc.Output.Value),
outpoint: breachInfo.RemoteOutpoint,
signDescriptor: remoteSignDesc,
witnessType: remoteWitnessType,
},
doneChan: make(chan struct{}),
htlcOutputs: []*breachedOutput{},
}
case <-b.quit:
@ -586,40 +527,114 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
}
}
// breachedOutput contains all the information needed to sweep a breached
// output. A breached output is an output that we are now entitled to due to a
// revoked commitment transaction being broadcast.
type breachedOutput struct {
amt btcutil.Amount
outpoint wire.OutPoint
witnessFunc witnessGenerator
// exactRetribution is a goroutine which is executed once a contract breach has
// been detected by a breachObserver. This function is responsible for
// punishing a counterparty for violating the channel contract by sweeping ALL
// the lingering funds within the channel into the daemon's wallet.
//
// NOTE: This MUST be run as a goroutine.
func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
breachInfo *retributionInfo) {
twoStageClaim bool
}
defer b.wg.Done()
// retributionInfo encapsulates all the data needed to sweep all the contested
// funds within a channel whose contract has been breached by the prior
// counterparty. This struct is used by the utxoNursery to create the justice
// transaction which spends all outputs of the commitment transaction into an
// output controlled by the wallet.
type retributionInfo struct {
commitHash chainhash.Hash
chanPoint wire.OutPoint
// TODO(roasbeef): state needs to be checkpointed here
selfOutput *breachedOutput
select {
case _, ok := <-confChan.Confirmed:
// If the second value is !ok, then the channel has been closed
// signifying a daemon shutdown, so we exit.
if !ok {
return
}
revokedOutput *breachedOutput
// Otherwise, if this is a real confirmation notification, then
// we fall through to complete our duty.
case <-b.quit:
return
}
htlcOutputs *[]breachedOutput
brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+
"revoked funds", breachInfo.commitHash)
doneChan chan struct{}
// With the breach transaction confirmed, we now create the justice tx
// which will claim ALL the funds within the channel.
justiceTx, err := b.createJusticeTx(breachInfo)
if err != nil {
brarLog.Errorf("unable to create justice tx: %v", err)
return
}
brarLog.Debugf("Broadcasting justice tx: %v", newLogClosure(func() string {
return spew.Sdump(justiceTx)
}))
_, currentHeight, err := b.chainIO.GetBestBlock()
if err != nil {
brarLog.Errorf("unable to get current height: %v", err)
return
}
// Finally, broadcast the transaction, finalizing the channels'
// retribution against the cheating counterparty.
if err := b.wallet.PublishTransaction(justiceTx); err != nil {
brarLog.Errorf("unable to broadcast "+
"justice tx: %v", err)
return
}
// As a conclusionary step, we register for a notification to be
// dispatched once the justice tx is confirmed. After confirmation we
// notify the caller that initiated the retribution workflow that the
// deed has been done.
justiceTXID := justiceTx.TxHash()
confChan, err = b.notifier.RegisterConfirmationsNtfn(&justiceTXID, 1,
uint32(currentHeight))
if err != nil {
brarLog.Errorf("unable to register for conf for txid: %v",
justiceTXID)
return
}
select {
case _, ok := <-confChan.Confirmed:
if !ok {
return
}
// TODO(roasbeef): factor in HTLCs
revokedFunds := breachInfo.revokedOutput.amt
totalFunds := revokedFunds + breachInfo.selfOutput.amt
brarLog.Infof("Justice for ChannelPoint(%v) has "+
"been served, %v revoked funds (%v total) "+
"have been claimed", breachInfo.chanPoint,
revokedFunds, totalFunds)
// With the channel closed, mark it in the database as such.
err := b.db.MarkChanFullyClosed(&breachInfo.chanPoint)
if err != nil {
brarLog.Errorf("unable to mark chan as closed: %v", err)
}
// TODO(roasbeef): add peer to blacklist?
// TODO(roasbeef): close other active channels with offending peer
close(breachInfo.doneChan)
return
case <-b.quit:
return
}
}
// createJusticeTx creates a transaction which exacts "justice" by sweeping ALL
// the funds within the channel which we are now entitled to due to a breach of
// the channel's contract by the counterparty. This function returns a *fully*
// signed transaction with the witness for each input fully in place.
func (b *breachArbiter) createJusticeTx(r *retributionInfo) (*wire.MsgTx, error) {
func (b *breachArbiter) createJusticeTx(r *retribution) (*wire.MsgTx, error) {
// First, we obtain a new public key script from the wallet which we'll
// sweep the funds to.
// TODO(roasbeef): possibly create many outputs to minimize change in
@ -656,13 +671,17 @@ func (b *breachArbiter) createJusticeTx(r *retributionInfo) (*wire.MsgTx, error)
// witnesses for both commitment outputs, and all the pending HTLCs at
// this state in the channel's history.
// TODO(roasbeef): handle the 2-layer HTLCs
localWitness, err := r.selfOutput.witnessFunc(justiceTx, hashCache, 0)
localWitnessFunc := r.selfOutput.witnessType.GenWitnessFunc(
&b.wallet.Signer, r.selfOutput.signDescriptor)
localWitness, err := localWitnessFunc(justiceTx, hashCache, 0)
if err != nil {
return nil, err
}
justiceTx.TxIn[0].Witness = localWitness
remoteWitness, err := r.revokedOutput.witnessFunc(justiceTx, hashCache, 1)
remoteWitnessFunc := r.revokedOutput.witnessType.GenWitnessFunc(
&b.wallet.Signer, r.revokedOutput.signDescriptor)
remoteWitness, err := remoteWitnessFunc(justiceTx, hashCache, 1)
if err != nil {
return nil, err
}
@ -736,3 +755,269 @@ func (b *breachArbiter) craftCommitSweepTx(closeInfo *lnwallet.UnilateralCloseSu
return sweepTx, nil
}
// breachedOutput contains all the information needed to sweep a breached
// output. A breached output is an output that we are now entitled to due to a
// revoked commitment transaction being broadcast.
type breachedOutput struct {
amt btcutil.Amount
outpoint wire.OutPoint
signDescriptor *lnwallet.SignDescriptor
witnessType lnwallet.WitnessType
twoStageClaim bool
}
// retribution encapsulates all the data needed to sweep all the contested
// funds within a channel whose contract has been breached by the prior
// counterparty. This struct is used to create the justice transaction which
// spends all outputs of the commitment transaction into an output controlled
// by the wallet.
type retribution struct {
commitHash chainhash.Hash
chanPoint wire.OutPoint
selfOutput *breachedOutput
revokedOutput *breachedOutput
htlcOutputs []*breachedOutput
}
// retributionStore handles persistence of retribution states to disk and is
// backed by a boltdb bucket. The primary responsibility of the retribution
// store is to ensure that we can recover from a restart in the middle of a
// breached contract retribution.
type retributionStore struct {
db *channeldb.DB
}
// newRetributionStore creates a new instance of a retributionStore.
func newRetributionStore(db *channeldb.DB) *retributionStore {
return &retributionStore{
db: db,
}
}
// Add adds a retribution state to the retributionStore, which is then persisted
// to disk.
func (rs *retributionStore) Add(ret *retribution) error {
return rs.db.Update(func(tx *bolt.Tx) error {
// If this is our first contract breach, the retributionBucket won't
// exist, in which case, we just create a new bucket.
retBucket, err := tx.CreateBucketIfNotExists(retributionBucket)
if err != nil {
return err
}
var outBuf bytes.Buffer
if err := lnwire.WriteOutPoint(&outBuf, &ret.chanPoint); err != nil {
return err
}
var retBuf bytes.Buffer
if err := ret.Encode(&retBuf); err != nil {
return err
}
if err := retBucket.Put(outBuf.Bytes(), retBuf.Bytes()); err != nil {
return err
}
return nil
})
}
// Remove removes a retribution state from the retributionStore database.
func (rs *retributionStore) Remove(key *wire.OutPoint) error {
return rs.db.Update(func(tx *bolt.Tx) error {
retBucket := tx.Bucket(retributionBucket)
// We return an error if the bucket is not already created, since normal
// operation of the breach arbiter should never try to remove a
// finalized retribution state that is not already stored in the db.
if retBucket == nil {
return errors.New("unable to remove retribution because the " +
"db bucket doesn't exist.")
}
var outBuf bytes.Buffer
if err := lnwire.WriteOutPoint(&outBuf, key); err != nil {
return err
}
if err := retBucket.Delete(outBuf.Bytes()); err != nil {
return err
}
return nil
})
}
// ForAll iterates through all stored retributions and executes the passed
// callback function on each retribution.
func (rs *retributionStore) ForAll(cb func(*retribution) error) error {
return rs.db.View(func(tx *bolt.Tx) error {
// If the bucket does not exist, then there are no pending retributions.
retBucket := tx.Bucket(retributionBucket)
if retBucket == nil {
return nil
}
// Otherwise, we fetch each serialized retribution info, deserialize
// it, and execute the passed in callback function on it.
return retBucket.ForEach(func(outBytes, retBytes []byte) error {
ret := &retribution{}
if err := ret.Decode(bytes.NewBuffer(retBytes)); err != nil {
return err
}
return cb(ret)
})
})
}
// Encode serializes the retribution into the passed byte stream.
func (ret *retribution) Encode(w io.Writer) error {
if _, err := w.Write(ret.commitHash[:]); err != nil {
return err
}
if err := lnwire.WriteOutPoint(w, &ret.chanPoint); err != nil {
return err
}
if err := ret.selfOutput.Encode(w); err != nil {
return err
}
if err := ret.revokedOutput.Encode(w); err != nil {
return err
}
numHtlcOutputs := len(ret.htlcOutputs)
if err := wire.WriteVarInt(w, 0, uint64(numHtlcOutputs)); err != nil {
return err
}
for i := 0; i < numHtlcOutputs; i++ {
if err := ret.htlcOutputs[i].Encode(w); err != nil {
return err
}
}
return nil
}
// Dencode deserializes a retribution from the passed byte stream.
func (ret *retribution) Decode(r io.Reader) error {
var scratch [32]byte
if _, err := io.ReadFull(r, scratch[:]); err != nil {
return err
}
hash, err := chainhash.NewHash(scratch[:])
if err != nil {
return err
}
ret.commitHash = *hash
if err := lnwire.ReadOutPoint(r, &ret.chanPoint); err != nil {
return err
}
ret.selfOutput = &breachedOutput{}
if err := ret.selfOutput.Decode(r); err != nil {
return err
}
ret.revokedOutput = &breachedOutput{}
if err := ret.revokedOutput.Decode(r); err != nil {
return err
}
numHtlcOutputsU64, err := wire.ReadVarInt(r, 0)
if err != nil {
return err
}
numHtlcOutputs := int(numHtlcOutputsU64)
ret.htlcOutputs = make([]*breachedOutput, numHtlcOutputs)
for i := 0; i < numHtlcOutputs; i++ {
ret.htlcOutputs[i] = &breachedOutput{}
if err := ret.htlcOutputs[i].Decode(r); err != nil {
return err
}
}
return nil
}
// Encode serializes a breachedOutput into the passed byte stream.
func (bo *breachedOutput) Encode(w io.Writer) error {
var scratch [8]byte
binary.BigEndian.PutUint64(scratch[:8], uint64(bo.amt))
if _, err := w.Write(scratch[:8]); err != nil {
return err
}
if err := lnwire.WriteOutPoint(w, &bo.outpoint); err != nil {
return err
}
if err := lnwallet.WriteSignDescriptor(w, bo.signDescriptor); err != nil {
return err
}
binary.BigEndian.PutUint16(scratch[:2], uint16(bo.witnessType))
if _, err := w.Write(scratch[:2]); err != nil {
return err
}
if bo.twoStageClaim {
scratch[0] = 1
} else {
scratch[0] = 0
}
if _, err := w.Write(scratch[:1]); err != nil {
return err
}
return nil
}
// Decode deserializes a breachedOutput from the passed byte stream.
func (bo *breachedOutput) Decode(r io.Reader) error {
var scratch [8]byte
if _, err := io.ReadFull(r, scratch[:8]); err != nil {
return err
}
bo.amt = btcutil.Amount(binary.BigEndian.Uint64(scratch[:8]))
if err := lnwire.ReadOutPoint(r, &bo.outpoint); err != nil {
return err
}
signDescriptor := lnwallet.SignDescriptor{}
if err := lnwallet.ReadSignDescriptor(r, &signDescriptor); err != nil {
return err
}
bo.signDescriptor = &signDescriptor
if _, err := io.ReadFull(r, scratch[:2]); err != nil {
return err
}
bo.witnessType = lnwallet.WitnessType(binary.BigEndian.Uint16(scratch[:2]))
if _, err := io.ReadFull(r, scratch[:1]); err != nil {
return err
}
if scratch[0] == 1 {
bo.twoStageClaim = true
} else {
bo.twoStageClaim = false
}
return nil
}

398
breacharbiter_test.go Normal file
View File

@ -0,0 +1,398 @@
package main
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"os"
"reflect"
"testing"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
)
var (
breachOutPoints = []wire.OutPoint{
{
Hash: [chainhash.HashSize]byte{
0x51, 0xb6, 0x37, 0xd8, 0xfc, 0xd2, 0xc6, 0xda,
0x48, 0x59, 0xe6, 0x96, 0x31, 0x13, 0xa1, 0x17,
0x2d, 0xe7, 0x93, 0xe4, 0xb7, 0x25, 0xb8, 0x4d,
0x1f, 0xb, 0x4c, 0xf9, 0x9e, 0xc5, 0x8c, 0xe9,
},
Index: 9,
},
{
Hash: [chainhash.HashSize]byte{
0xb7, 0x94, 0x38, 0x5f, 0x2d, 0x1e, 0xf7, 0xab,
0x4d, 0x92, 0x73, 0xd1, 0x90, 0x63, 0x81, 0xb4,
0x4f, 0x2f, 0x6f, 0x25, 0x88, 0xa3, 0xef, 0xb9,
0x6a, 0x49, 0x18, 0x83, 0x31, 0x98, 0x47, 0x53,
},
Index: 49,
},
{
Hash: [chainhash.HashSize]byte{
0x81, 0xb6, 0x37, 0xd8, 0xfc, 0xd2, 0xc6, 0xda,
0x63, 0x59, 0xe6, 0x96, 0x31, 0x13, 0xa1, 0x17,
0xd, 0xe7, 0x95, 0xe4, 0xb7, 0x25, 0xb8, 0x4d,
0x1e, 0xb, 0x4c, 0xfd, 0x9e, 0xc5, 0x8c, 0xe9,
},
Index: 23,
},
}
breachKeys = [][]byte{
{0x04, 0x11, 0xdb, 0x93, 0xe1, 0xdc, 0xdb, 0x8a,
0x01, 0x6b, 0x49, 0x84, 0x0f, 0x8c, 0x53, 0xbc, 0x1e,
0xb6, 0x8a, 0x38, 0x2e, 0x97, 0xb1, 0x48, 0x2e, 0xca,
0xd7, 0xb1, 0x48, 0xa6, 0x90, 0x9a, 0x5c, 0xb2, 0xe0,
0xea, 0xdd, 0xfb, 0x84, 0xcc, 0xf9, 0x74, 0x44, 0x64,
0xf8, 0x2e, 0x16, 0x0b, 0xfa, 0x9b, 0x8b, 0x64, 0xf9,
0xd4, 0xc0, 0x3f, 0x99, 0x9b, 0x86, 0x43, 0xf6, 0x56,
0xb4, 0x12, 0xa3,
},
{0x07, 0x11, 0xdb, 0x93, 0xe1, 0xdc, 0xdb, 0x8a,
0x01, 0x6b, 0x49, 0x84, 0x0f, 0x8c, 0x53, 0xbc, 0x1e,
0xb6, 0x8a, 0x38, 0x2e, 0x97, 0xb1, 0x48, 0x2e, 0xca,
0xd7, 0xb1, 0x48, 0xa6, 0x90, 0x9a, 0x5c, 0xb2, 0xe0,
0xea, 0xdd, 0xfb, 0x84, 0xcc, 0xf9, 0x74, 0x44, 0x64,
0xf8, 0x2e, 0x16, 0x0b, 0xfa, 0x9b, 0x8b, 0x64, 0xf9,
0xd4, 0xc0, 0x3f, 0x99, 0x9b, 0x86, 0x43, 0xf6, 0x56,
0xb4, 0x12, 0xa3,
},
{0x02, 0xce, 0x0b, 0x14, 0xfb, 0x84, 0x2b, 0x1b,
0xa5, 0x49, 0xfd, 0xd6, 0x75, 0xc9, 0x80, 0x75, 0xf1,
0x2e, 0x9c, 0x51, 0x0f, 0x8e, 0xf5, 0x2b, 0xd0, 0x21,
0xa9, 0xa1, 0xf4, 0x80, 0x9d, 0x3b, 0x4d,
},
}
breachSignDescs = []lnwallet.SignDescriptor{
{
PrivateTweak: []byte{
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02,
},
WitnessScript: []byte{
0x00, 0x14, 0xee, 0x91, 0x41, 0x7e, 0x85, 0x6c, 0xde,
0x10, 0xa2, 0x91, 0x1e, 0xdc, 0xbd, 0xbd, 0x69, 0xe2,
0xef, 0xb5, 0x71, 0x48,
},
Output: &wire.TxOut{
Value: 5000000000,
PkScript: []byte{
0x41, // OP_DATA_65
0x04, 0xd6, 0x4b, 0xdf, 0xd0, 0x9e, 0xb1, 0xc5,
0xfe, 0x29, 0x5a, 0xbd, 0xeb, 0x1d, 0xca, 0x42,
0x81, 0xbe, 0x98, 0x8e, 0x2d, 0xa0, 0xb6, 0xc1,
0xc6, 0xa5, 0x9d, 0xc2, 0x26, 0xc2, 0x86, 0x24,
0xe1, 0x81, 0x75, 0xe8, 0x51, 0xc9, 0x6b, 0x97,
0x3d, 0x81, 0xb0, 0x1c, 0xc3, 0x1f, 0x04, 0x78,
0x34, 0xbc, 0x06, 0xd6, 0xd6, 0xed, 0xf6, 0x20,
0xd1, 0x84, 0x24, 0x1a, 0x6a, 0xed, 0x8b, 0x63,
0xa6, // 65-byte signature
0xac, // OP_CHECKSIG
},
},
HashType: txscript.SigHashAll,
},
{
PrivateTweak: []byte{
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02,
},
WitnessScript: []byte{
0x00, 0x14, 0xee, 0x91, 0x41, 0x7e, 0x85, 0x6c, 0xde,
0x10, 0xa2, 0x91, 0x1e, 0xdc, 0xbd, 0xbd, 0x69, 0xe2,
0xef, 0xb5, 0x71, 0x48,
},
Output: &wire.TxOut{
Value: 5000000000,
PkScript: []byte{
0x41, // OP_DATA_65
0x04, 0xd6, 0x4b, 0xdf, 0xd0, 0x9e, 0xb1, 0xc5,
0xfe, 0x29, 0x5a, 0xbd, 0xeb, 0x1d, 0xca, 0x42,
0x81, 0xbe, 0x98, 0x8e, 0x2d, 0xa0, 0xb6, 0xc1,
0xc6, 0xa5, 0x9d, 0xc2, 0x26, 0xc2, 0x86, 0x24,
0xe1, 0x81, 0x75, 0xe8, 0x51, 0xc9, 0x6b, 0x97,
0x3d, 0x81, 0xb0, 0x1c, 0xc3, 0x1f, 0x04, 0x78,
0x34, 0xbc, 0x06, 0xd6, 0xd6, 0xed, 0xf6, 0x20,
0xd1, 0x84, 0x24, 0x1a, 0x6a, 0xed, 0x8b, 0x63,
0xa6, // 65-byte signature
0xac, // OP_CHECKSIG
},
},
HashType: txscript.SigHashAll,
},
{
PrivateTweak: []byte{
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02,
},
WitnessScript: []byte{
0x00, 0x14, 0xee, 0x91, 0x41, 0x7e, 0x85, 0x6c, 0xde,
0x10, 0xa2, 0x91, 0x1e, 0xdc, 0xbd, 0xbd, 0x69, 0xe2,
0xef, 0xb5, 0x71, 0x48,
},
Output: &wire.TxOut{
Value: 5000000000,
PkScript: []byte{
0x41, // OP_DATA_65
0x04, 0xd6, 0x4b, 0xdf, 0xd0, 0x9e, 0xb1, 0xc5,
0xfe, 0x29, 0x5a, 0xbd, 0xeb, 0x1d, 0xca, 0x42,
0x81, 0xbe, 0x98, 0x8e, 0x2d, 0xa0, 0xb6, 0xc1,
0xc6, 0xa5, 0x9d, 0xc2, 0x26, 0xc2, 0x86, 0x24,
0xe1, 0x81, 0x75, 0xe8, 0x51, 0xc9, 0x6b, 0x97,
0x3d, 0x81, 0xb0, 0x1c, 0xc3, 0x1f, 0x04, 0x78,
0x34, 0xbc, 0x06, 0xd6, 0xd6, 0xed, 0xf6, 0x20,
0xd1, 0x84, 0x24, 0x1a, 0x6a, 0xed, 0x8b, 0x63,
0xa6, // 65-byte signature
0xac, // OP_CHECKSIG
},
},
HashType: txscript.SigHashAll,
},
}
breachedOutputs = []breachedOutput{
{
amt: btcutil.Amount(1e7),
outpoint: breachOutPoints[0],
witnessType: lnwallet.CommitmentNoDelay,
twoStageClaim: true,
},
{
amt: btcutil.Amount(2e9),
outpoint: breachOutPoints[1],
witnessType: lnwallet.CommitmentRevoke,
twoStageClaim: false,
},
{
amt: btcutil.Amount(3e4),
outpoint: breachOutPoints[2],
witnessType: lnwallet.CommitmentDelayOutput,
twoStageClaim: false,
},
}
retributions = []retribution{
{
commitHash: [chainhash.HashSize]byte{
0xb7, 0x94, 0x38, 0x5f, 0x2d, 0x1e, 0xf7, 0xab,
0x4d, 0x92, 0x73, 0xd1, 0x90, 0x63, 0x81, 0xb4,
0x4f, 0x2f, 0x6f, 0x25, 0x88, 0xa3, 0xef, 0xb9,
0x6a, 0x49, 0x18, 0x83, 0x31, 0x98, 0x47, 0x53,
},
chanPoint: breachOutPoints[0],
selfOutput: &breachedOutputs[0],
revokedOutput: &breachedOutputs[1],
htlcOutputs: []*breachedOutput{},
},
{
commitHash: [chainhash.HashSize]byte{
0x51, 0xb6, 0x37, 0xd8, 0xfc, 0xd2, 0xc6, 0xda,
0x48, 0x59, 0xe6, 0x96, 0x31, 0x13, 0xa1, 0x17,
0x2d, 0xe7, 0x93, 0xe4, 0xb7, 0x25, 0xb8, 0x4d,
0x1f, 0xb, 0x4c, 0xf9, 0x9e, 0xc5, 0x8c, 0xe9,
},
chanPoint: breachOutPoints[1],
selfOutput: &breachedOutputs[0],
revokedOutput: &breachedOutputs[1],
htlcOutputs: []*breachedOutput{
&breachedOutputs[1],
&breachedOutputs[2],
},
},
}
)
// Parse the pubkeys in the breached outputs.
func initBreachedOutputs() error {
for i := 0; i < len(breachedOutputs); i++ {
bo := &breachedOutputs[i]
// Parse the sign descriptor's pubkey.
sd := &breachSignDescs[i]
pubkey, err := btcec.ParsePubKey(breachKeys[i], btcec.S256())
if err != nil {
fmt.Errorf("unable to parse pubkey: %v", breachKeys[i])
}
sd.PubKey = pubkey
bo.signDescriptor = sd
}
return nil
}
// Test that breachedOutput Encode/Decode works.
func TestBreachedOutputSerialization(t *testing.T) {
if err := initBreachedOutputs(); err != nil {
t.Fatalf("unable to init breached outputs: %v", err)
}
for i := 0; i < len(breachedOutputs); i++ {
bo := &breachedOutputs[i]
var buf bytes.Buffer
if err := bo.Encode(&buf); err != nil {
t.Fatalf("unable to serialize breached output [%v]: %v", i, err)
}
desBo := &breachedOutput{}
if err := desBo.Decode(&buf); err != nil {
t.Fatalf("unable to deserialize breached output [%v]: %v", i, err)
}
if !reflect.DeepEqual(bo, desBo) {
t.Fatalf("original and deserialized breached outputs not equal:\n"+
"original : %+v\n"+
"deserialized : %+v\n",
bo, desBo)
}
}
}
// Test that retribution Encode/Decode works.
func TestRetributionSerialization(t *testing.T) {
if err := initBreachedOutputs(); err != nil {
t.Fatalf("unable to init breached outputs: %v", err)
}
for i := 0; i < len(retributions); i++ {
ret := &retributions[i]
var buf bytes.Buffer
if err := ret.Encode(&buf); err != nil {
t.Fatalf("unable to serialize retribution [%v]: %v", i, err)
}
desRet := &retribution{}
if err := desRet.Decode(&buf); err != nil {
t.Fatalf("unable to deserialize retribution [%v]: %v", i, err)
}
if !reflect.DeepEqual(ret, desRet) {
t.Fatalf("original and deserialized retribution infos not equal:\n"+
"original : %+v\n"+
"deserialized : %+v\n",
ret, desRet)
}
}
}
// TODO(phlip9): reuse existing function?
// makeTestDB creates a new instance of the ChannelDB for testing purposes. A
// callback which cleans up the created temporary directories is also returned
// and intended to be executed after the test completes.
func makeTestDB() (*channeldb.DB, func(), error) {
var db *channeldb.DB = nil
// First, create a temporary directory to be used for the duration of
// this test.
tempDirName, err := ioutil.TempDir("", "channeldb")
if err != nil {
return nil, nil, err
}
// Next, create channeldb for the first time.
db, err = channeldb.Open(tempDirName)
if err != nil {
return nil, nil, err
}
cleanUp := func() {
if db != nil {
db.Close()
}
os.RemoveAll(tempDirName)
}
return db, cleanUp, nil
}
func countRetributions(t *testing.T, rs *retributionStore) int {
count := 0
err := rs.ForAll(func(_ *retribution) error {
count += 1
return nil
})
if err != nil {
t.Fatalf("unable to list retributions in db: %v", err)
}
return count
}
// Test that the retribution persistence layer works.
func TestRetributionStore(t *testing.T) {
db, cleanUp, err := makeTestDB()
defer cleanUp()
if err != nil {
t.Fatalf("unable to create test db: %v", err)
}
if err := initBreachedOutputs(); err != nil {
t.Fatalf("unable to init breached outputs: %v", err)
}
rs := newRetributionStore(db)
// Make sure that a new retribution store is actually emtpy.
if count := countRetributions(t, rs); count != 0 {
t.Fatalf("expected 0 retributions, found %v", count)
}
// Add some retribution states to the store.
if err := rs.Add(&retributions[0]); err != nil {
t.Fatalf("unable to add to retribution store: %v", err)
}
if err := rs.Add(&retributions[1]); err != nil {
t.Fatalf("unable to add to retribution store: %v", err)
}
// There should be 2 retributions in the store.
if count := countRetributions(t, rs); count != 2 {
t.Fatalf("expected 2 retributions, found %v", count)
}
// Retrieving the retribution states from the store should yield the same
// values as the originals.
rs.ForAll(func(ret *retribution) error {
equal0 := reflect.DeepEqual(ret, &retributions[0])
equal1 := reflect.DeepEqual(ret, &retributions[1])
if !equal0 || !equal1 {
return errors.New("unexpected retribution retrieved from db")
}
return nil
})
// Remove the retribution states.
if err := rs.Remove(&retributions[0].chanPoint); err != nil {
t.Fatalf("unable to remove from retribution store: %v", err)
}
if err := rs.Remove(&retributions[1].chanPoint); err != nil {
t.Fatalf("unable to remove from retribution store: %v", err)
}
// Ensure that the retribution store is empty again.
if count := countRetributions(t, rs); count != 0 {
t.Fatalf("expected 0 retributions, found %v", count)
}
}

View File

@ -1720,7 +1720,35 @@ func copyFile(dest, src string) error {
}
return d.Close()
}
func waitForTxInMempool(miner *btcrpcclient.Client,
timeout time.Duration) (*chainhash.Hash, error) {
var txid *chainhash.Hash
breakTimeout := time.After(timeout)
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
poll:
for {
select {
case <-breakTimeout:
return nil, errors.New("no tx found in mempool")
case <-ticker.C:
mempool, err := miner.GetRawMempool()
if err != nil {
return nil, err
}
if len(mempool) == 0 {
continue
}
txid = mempool[0]
break poll
}
}
return txid, nil
}
func testRevokedCloseRetribution(net *networkHarness, t *harnessTest) {
@ -1883,34 +1911,48 @@ func testRevokedCloseRetribution(net *networkHarness, t *harnessTest) {
// broadcasting his current channel state. This is actually the
// commitment transaction of a prior *revoked* state, so he'll soon
// feel the wrath of Alice's retribution.
breachTXID := closeChannelAndAssert(ctxb, t, net, net.Bob, chanPoint,
true)
force := true
closeUpdates, _, err := net.CloseChannel(ctxb, net.Bob, chanPoint, force)
if err != nil {
t.Fatalf("unable to close channel: %v", err)
}
// Wait for Bob's breach transaction to show up in the mempool to ensure
// that Alice's node has started waiting for confirmations.
_, err = waitForTxInMempool(net.Miner.Node, 5*time.Second)
if err != nil {
t.Fatalf("unable to find Bob's breach tx in mempool: %v", err)
}
time.Sleep(100 * time.Millisecond)
// Here, Alice sees Bob's breach transaction in the mempool, but is waiting
// for it to confirm before continuing her retribution. We restart Alice to
// ensure that she is persisting her retribution state and continues
// watching for the breach transaction to confirm even after her node
// restarts.
if err := net.RestartNode(net.Alice, nil); err != nil {
t.Fatalf("unable to restart Alice's node: %v", err)
}
// Finally, generate a single block, wait for the final close status
// update, then ensure that the closing transaction was included in the
// block.
block := mineBlocks(t, net, 1)[0]
breachTXID, err := net.WaitForChannelClose(ctxb, closeUpdates)
if err != nil {
t.Fatalf("error while waiting for channel close: %v", err)
}
assertTxInBlock(t, block, breachTXID)
// Query the mempool for Alice's justice transaction, this should be
// broadcast as Bob's contract breaching transaction gets confirmed
// above.
var justiceTXID *chainhash.Hash
breakTimeout := time.After(time.Second * 5)
poll:
for {
select {
case <-breakTimeout:
t.Fatalf("justice tx not found in mempool")
default:
}
mempool, err := net.Miner.Node.GetRawMempool()
if err != nil {
t.Fatalf("unable to get mempool: %v", err)
}
if len(mempool) == 0 {
continue
}
justiceTXID = mempool[0]
break poll
justiceTXID, err := waitForTxInMempool(net.Miner.Node, 5*time.Second)
if err != nil {
t.Fatalf("unable to find Alice's justice tx in mempool: %v", err)
}
time.Sleep(100 * time.Millisecond)
// Query for the mempool transaction found above. Then assert that all
// the inputs of this transaction are spending outputs generated by
@ -1926,9 +1968,18 @@ poll:
}
}
// We restart Alice here to ensure that she persists her retribution state
// and successfully continues exacting retribution after restarting. At
// this point, Alice has broadcast the justice transaction, but it hasn't
// been confirmed yet; when Alice restarts, she should start waiting for
// the justice transaction to confirm again.
if err := net.RestartNode(net.Alice, nil); err != nil {
t.Fatalf("unable to restart Alice's node: %v", err)
}
// Now mine a block, this transaction should include Alice's justice
// transaction which was just accepted into the mempool.
block := mineBlocks(t, net, 1)[0]
block = mineBlocks(t, net, 1)[0]
// The block should have exactly *two* transactions, one of which is
// the justice transaction.