breacharbiter: reverts retributionInfo naming and realign diffs

This commit is contained in:
Conner Fromknecht 2017-07-25 20:14:03 -07:00 committed by Olaoluwa Osuntokun
parent 4cdce1fc0a
commit 8698085e35
2 changed files with 148 additions and 116 deletions

@ -59,7 +59,7 @@ type breachArbiter struct {
// struct to send the necessary information required to punish a // struct to send the necessary information required to punish a
// counterparty once a channel breach is detected. Breach observers // counterparty once a channel breach is detected. Breach observers
// use this to communicate with the main contractObserver goroutine. // use this to communicate with the main contractObserver goroutine.
breachedContracts chan *retribution breachedContracts chan *retributionInfo
// newContracts is a channel which is used by outside subsystems to // newContracts is a channel which is used by outside subsystems to
// notify the breachArbiter of a new contract (a channel) that should // notify the breachArbiter of a new contract (a channel) that should
@ -92,7 +92,7 @@ func newBreachArbiter(wallet *lnwallet.LightningWallet, db *channeldb.DB,
retributionStore: newRetributionStore(db), retributionStore: newRetributionStore(db),
breachObservers: make(map[wire.OutPoint]chan struct{}), breachObservers: make(map[wire.OutPoint]chan struct{}),
breachedContracts: make(chan *retribution), breachedContracts: make(chan *retributionInfo),
newContracts: make(chan *lnwallet.LightningChannel), newContracts: make(chan *lnwallet.LightningChannel),
settledContracts: make(chan *wire.OutPoint), settledContracts: make(chan *wire.OutPoint),
quit: make(chan struct{}), quit: make(chan struct{}),
@ -116,7 +116,7 @@ func (b *breachArbiter) Start() error {
// We load any pending retributions from the database. For each retribution // We load any pending retributions from the database. For each retribution
// we need to restart the retribution procedure to claim our just reward. // we need to restart the retribution procedure to claim our just reward.
err = b.retributionStore.ForAll(func(ret *retribution) error { err = b.retributionStore.ForAll(func(ret *retributionInfo) error {
// Register for a notification when the breach transaction is confirmed // Register for a notification when the breach transaction is confirmed
// on chain. // on chain.
breachTXID := &ret.commitHash breachTXID := &ret.commitHash
@ -366,6 +366,115 @@ out:
return return
} }
// exactRetribution is a goroutine which is executed once a contract breach has
// been detected by a breachObserver. This function is responsible for
// punishing a counterparty for violating the channel contract by sweeping ALL
// the lingering funds within the channel into the daemon's wallet.
//
// NOTE: This MUST be run as a goroutine.
func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
breachInfo *retributionInfo) {
defer b.wg.Done()
// TODO(roasbeef): state needs to be checkpointed here
select {
case _, ok := <-confChan.Confirmed:
// If the second value is !ok, then the channel has been closed
// signifying a daemon shutdown, so we exit.
if !ok {
return
}
// Otherwise, if this is a real confirmation notification, then
// we fall through to complete our duty.
case <-b.quit:
return
}
brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+
"revoked funds", breachInfo.commitHash)
// With the breach transaction confirmed, we now create the justice tx
// which will claim ALL the funds within the channel.
justiceTx, err := b.createJusticeTx(breachInfo)
if err != nil {
brarLog.Errorf("unable to create justice tx: %v", err)
return
}
brarLog.Debugf("Broadcasting justice tx: %v", newLogClosure(func() string {
return spew.Sdump(justiceTx)
}))
_, currentHeight, err := b.chainIO.GetBestBlock()
if err != nil {
brarLog.Errorf("unable to get current height: %v", err)
return
}
// Finally, broadcast the transaction, finalizing the channels'
// retribution against the cheating counterparty.
if err := b.wallet.PublishTransaction(justiceTx); err != nil {
brarLog.Errorf("unable to broadcast "+
"justice tx: %v", err)
return
}
// As a conclusionary step, we register for a notification to be
// dispatched once the justice tx is confirmed. After confirmation we
// notify the caller that initiated the retribution workflow that the
// deed has been done.
justiceTXID := justiceTx.TxHash()
confChan, err = b.notifier.RegisterConfirmationsNtfn(&justiceTXID, 1,
uint32(currentHeight))
if err != nil {
brarLog.Errorf("unable to register for conf for txid: %v",
justiceTXID)
return
}
select {
case _, ok := <-confChan.Confirmed:
if !ok {
return
}
// TODO(roasbeef): factor in HTLCs
revokedFunds := breachInfo.revokedOutput.amt
totalFunds := revokedFunds + breachInfo.selfOutput.amt
brarLog.Infof("Justice for ChannelPoint(%v) has "+
"been served, %v revoked funds (%v total) "+
"have been claimed", breachInfo.chanPoint,
revokedFunds, totalFunds)
// With the channel closed, mark it in the database as such.
err := b.db.MarkChanFullyClosed(&breachInfo.chanPoint)
if err != nil {
brarLog.Errorf("unable to mark chan as closed: %v", err)
}
// Justice has been carried out; we can safely delete the retribution
// info from the database.
err = b.retributionStore.Remove(&breachInfo.chanPoint)
if err != nil {
brarLog.Errorf("unable to remove retribution from the db: %v", err)
}
// TODO(roasbeef): add peer to blacklist?
// TODO(roasbeef): close other active channels with offending peer
close(breachInfo.doneChan)
return
case <-b.quit:
return
}
}
// breachObserver notifies the breachArbiter contract observer goroutine that a // breachObserver notifies the breachArbiter contract observer goroutine that a
// channel's contract has been breached by the prior counterparty. Once // channel's contract has been breached by the prior counterparty. Once
// notified the breachArbiter will attempt to sweep ALL funds within the // notified the breachArbiter will attempt to sweep ALL funds within the
@ -504,7 +613,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
// Finally, we send the retribution information into the breachArbiter // Finally, we send the retribution information into the breachArbiter
// event loop to deal swift justice. // event loop to deal swift justice.
// TODO(roasbeef): populate htlc breaches // TODO(roasbeef): populate htlc breaches
b.breachedContracts <- &retribution{ b.breachedContracts <- &retributionInfo{
commitHash: breachInfo.BreachTransaction.TxHash(), commitHash: breachInfo.BreachTransaction.TxHash(),
chanPoint: *chanPoint, chanPoint: *chanPoint,
@ -523,6 +632,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
}, },
htlcOutputs: []*breachedOutput{}, htlcOutputs: []*breachedOutput{},
doneChan: make(chan struct{}),
} }
case <-b.quit: case <-b.quit:
@ -530,120 +640,42 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
} }
} }
// exactRetribution is a goroutine which is executed once a contract breach has // breachedOutput contains all the information needed to sweep a breached
// been detected by a breachObserver. This function is responsible for // output. A breached output is an output that we are now entitled to due to a
// punishing a counterparty for violating the channel contract by sweeping ALL // revoked commitment transaction being broadcast.
// the lingering funds within the channel into the daemon's wallet. type breachedOutput struct {
// amt btcutil.Amount
// NOTE: This MUST be run as a goroutine. outpoint wire.OutPoint
func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
breachInfo *retribution) {
defer b.wg.Done() signDescriptor *lnwallet.SignDescriptor
witnessType lnwallet.WitnessType
// TODO(roasbeef): state needs to be checkpointed here twoStageClaim bool
}
select { // retributionInfo encapsulates all the data needed to sweep all the contested
case _, ok := <-confChan.Confirmed: // funds within a channel whose contract has been breached by the prior
// If the second value is !ok, then the channel has been closed // counterparty. This struct is used to create the justice transaction which
// signifying a daemon shutdown, so we exit. // spends all outputs of the commitment transaction into an output controlled
if !ok { // by the wallet.
return type retributionInfo struct {
} commitHash chainhash.Hash
chanPoint wire.OutPoint
// Otherwise, if this is a real confirmation notification, then selfOutput *breachedOutput
// we fall through to complete our duty.
case <-b.quit:
return
}
brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+ revokedOutput *breachedOutput
"revoked funds", breachInfo.commitHash)
// With the breach transaction confirmed, we now create the justice tx htlcOutputs []*breachedOutput
// which will claim ALL the funds within the channel.
justiceTx, err := b.createJusticeTx(breachInfo)
if err != nil {
brarLog.Errorf("unable to create justice tx: %v", err)
return
}
brarLog.Debugf("Broadcasting justice tx: %v", newLogClosure(func() string { doneChan chan struct{}
return spew.Sdump(justiceTx)
}))
_, currentHeight, err := b.chainIO.GetBestBlock()
if err != nil {
brarLog.Errorf("unable to get current height: %v", err)
return
}
// Finally, broadcast the transaction, finalizing the channels'
// retribution against the cheating counterparty.
if err := b.wallet.PublishTransaction(justiceTx); err != nil {
brarLog.Errorf("unable to broadcast "+
"justice tx: %v", err)
return
}
// As a conclusionary step, we register for a notification to be
// dispatched once the justice tx is confirmed. After confirmation we
// notify the caller that initiated the retribution workflow that the
// deed has been done.
justiceTXID := justiceTx.TxHash()
confChan, err = b.notifier.RegisterConfirmationsNtfn(&justiceTXID, 1,
uint32(currentHeight))
if err != nil {
brarLog.Errorf("unable to register for conf for txid: %v",
justiceTXID)
return
}
select {
case _, ok := <-confChan.Confirmed:
if !ok {
return
}
// TODO(roasbeef): factor in HTLCs
revokedFunds := breachInfo.revokedOutput.amt
totalFunds := revokedFunds + breachInfo.selfOutput.amt
brarLog.Infof("Justice for ChannelPoint(%v) has "+
"been served, %v revoked funds (%v total) "+
"have been claimed", breachInfo.chanPoint,
revokedFunds, totalFunds)
// With the channel closed, mark it in the database as such.
err := b.db.MarkChanFullyClosed(&breachInfo.chanPoint)
if err != nil {
brarLog.Errorf("unable to mark chan as closed: %v", err)
}
// Justice has been carried out; we can safely delete the retribution
// info from the database.
err = b.retributionStore.Remove(&breachInfo.chanPoint)
if err != nil {
brarLog.Errorf("unable to remove retribution from the db: %v", err)
}
// TODO(roasbeef): add peer to blacklist?
// TODO(roasbeef): close other active channels with offending peer
close(breachInfo.doneChan)
return
case <-b.quit:
return
}
} }
// createJusticeTx creates a transaction which exacts "justice" by sweeping ALL // createJusticeTx creates a transaction which exacts "justice" by sweeping ALL
// the funds within the channel which we are now entitled to due to a breach of // the funds within the channel which we are now entitled to due to a breach of
// the channel's contract by the counterparty. This function returns a *fully* // the channel's contract by the counterparty. This function returns a *fully*
// signed transaction with the witness for each input fully in place. // signed transaction with the witness for each input fully in place.
func (b *breachArbiter) createJusticeTx(r *retribution) (*wire.MsgTx, error) { func (b *breachArbiter) createJusticeTx(r *retributionInfo) (*wire.MsgTx, error) {
// First, we obtain a new public key script from the wallet which we'll // First, we obtain a new public key script from the wallet which we'll
// sweep the funds to. // sweep the funds to.
@ -784,7 +816,7 @@ type breachedOutput struct {
// counterparty. This struct is used to create the justice transaction which // counterparty. This struct is used to create the justice transaction which
// spends all outputs of the commitment transaction into an output controlled // spends all outputs of the commitment transaction into an output controlled
// by the wallet. // by the wallet.
type retribution struct { type retributionInfo struct {
commitHash chainhash.Hash commitHash chainhash.Hash
chanPoint wire.OutPoint chanPoint wire.OutPoint
@ -792,7 +824,7 @@ type retribution struct {
revokedOutput *breachedOutput revokedOutput *breachedOutput
htlcOutputs []*breachedOutput htlcOutputs []*breachedOutput
doneChan chan struct{} doneChan chan struct{}
} }
// retributionStore handles persistence of retribution states to disk and is // retributionStore handles persistence of retribution states to disk and is
@ -812,7 +844,7 @@ func newRetributionStore(db *channeldb.DB) *retributionStore {
// Add adds a retribution state to the retributionStore, which is then persisted // Add adds a retribution state to the retributionStore, which is then persisted
// to disk. // to disk.
func (rs *retributionStore) Add(ret *retribution) error { func (rs *retributionStore) Add(ret *retributionInfo) error {
return rs.db.Update(func(tx *bolt.Tx) error { return rs.db.Update(func(tx *bolt.Tx) error {
// If this is our first contract breach, the retributionBucket won't // If this is our first contract breach, the retributionBucket won't
// exist, in which case, we just create a new bucket. // exist, in which case, we just create a new bucket.
@ -867,7 +899,7 @@ func (rs *retributionStore) Remove(key *wire.OutPoint) error {
// ForAll iterates through all stored retributions and executes the passed // ForAll iterates through all stored retributions and executes the passed
// callback function on each retribution. // callback function on each retribution.
func (rs *retributionStore) ForAll(cb func(*retribution) error) error { func (rs *retributionStore) ForAll(cb func(*retributionInfo) error) error {
return rs.db.View(func(tx *bolt.Tx) error { return rs.db.View(func(tx *bolt.Tx) error {
// If the bucket does not exist, then there are no pending retributions. // If the bucket does not exist, then there are no pending retributions.
retBucket := tx.Bucket(retributionBucket) retBucket := tx.Bucket(retributionBucket)
@ -878,7 +910,7 @@ func (rs *retributionStore) ForAll(cb func(*retribution) error) error {
// Otherwise, we fetch each serialized retribution info, deserialize // Otherwise, we fetch each serialized retribution info, deserialize
// it, and execute the passed in callback function on it. // it, and execute the passed in callback function on it.
return retBucket.ForEach(func(outBytes, retBytes []byte) error { return retBucket.ForEach(func(outBytes, retBytes []byte) error {
ret := &retribution{} ret := &retributionInfo{}
if err := ret.Decode(bytes.NewBuffer(retBytes)); err != nil { if err := ret.Decode(bytes.NewBuffer(retBytes)); err != nil {
return err return err
} }
@ -889,7 +921,7 @@ func (rs *retributionStore) ForAll(cb func(*retribution) error) error {
} }
// Encode serializes the retribution into the passed byte stream. // Encode serializes the retribution into the passed byte stream.
func (ret *retribution) Encode(w io.Writer) error { func (ret *retributionInfo) Encode(w io.Writer) error {
if _, err := w.Write(ret.commitHash[:]); err != nil { if _, err := w.Write(ret.commitHash[:]); err != nil {
return err return err
} }
@ -921,7 +953,7 @@ func (ret *retribution) Encode(w io.Writer) error {
} }
// Dencode deserializes a retribution from the passed byte stream. // Dencode deserializes a retribution from the passed byte stream.
func (ret *retribution) Decode(r io.Reader) error { func (ret *retributionInfo) Decode(r io.Reader) error {
var scratch [32]byte var scratch [32]byte
if _, err := io.ReadFull(r, scratch[:]); err != nil { if _, err := io.ReadFull(r, scratch[:]); err != nil {

@ -191,7 +191,7 @@ var (
}, },
} }
retributions = []retribution{ retributions = []retributionInfo{
{ {
commitHash: [chainhash.HashSize]byte{ commitHash: [chainhash.HashSize]byte{
0xb7, 0x94, 0x38, 0x5f, 0x2d, 0x1e, 0xf7, 0xab, 0xb7, 0x94, 0x38, 0x5f, 0x2d, 0x1e, 0xf7, 0xab,
@ -284,7 +284,7 @@ func TestRetributionSerialization(t *testing.T) {
t.Fatalf("unable to serialize retribution [%v]: %v", i, err) t.Fatalf("unable to serialize retribution [%v]: %v", i, err)
} }
desRet := &retribution{} desRet := &retributionInfo{}
if err := desRet.Decode(&buf); err != nil { if err := desRet.Decode(&buf); err != nil {
t.Fatalf("unable to deserialize retribution [%v]: %v", i, err) t.Fatalf("unable to deserialize retribution [%v]: %v", i, err)
} }
@ -330,7 +330,7 @@ func makeTestDB() (*channeldb.DB, func(), error) {
func countRetributions(t *testing.T, rs *retributionStore) int { func countRetributions(t *testing.T, rs *retributionStore) int {
count := 0 count := 0
err := rs.ForAll(func(_ *retribution) error { err := rs.ForAll(func(_ *retributionInfo) error {
count++ count++
return nil return nil
}) })
@ -374,7 +374,7 @@ func TestRetributionStore(t *testing.T) {
// Retrieving the retribution states from the store should yield the same // Retrieving the retribution states from the store should yield the same
// values as the originals. // values as the originals.
rs.ForAll(func(ret *retribution) error { rs.ForAll(func(ret *retributionInfo) error {
equal0 := reflect.DeepEqual(ret, &retributions[0]) equal0 := reflect.DeepEqual(ret, &retributions[0])
equal1 := reflect.DeepEqual(ret, &retributions[1]) equal1 := reflect.DeepEqual(ret, &retributions[1])
if !equal0 || !equal1 { if !equal0 || !equal1 {