breacharbiter: reliable handoff from wallet

This commit is contained in:
Conner Fromknecht 2017-11-20 23:56:42 -08:00
parent e2fe4c2955
commit fb228a0f7d
No known key found for this signature in database
GPG Key ID: 39DE78FBE6ACB0EF

@ -15,33 +15,34 @@ import (
"github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/roasbeef/btcd/blockchain" "github.com/roasbeef/btcd/blockchain"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil" "github.com/roasbeef/btcutil"
) )
// retributionBucket stores retribution state on disk between detecting a var (
// contract breach, broadcasting a justice transaction that sweeps the channel, // retributionBucket stores retribution state on disk between detecting
// and finally witnessing the justice transaction confirm on the blockchain. It // a contract breach, broadcasting a justice transaction that sweeps the
// is critical that such state is persisted on disk, so that if our node // channel, and finally witnessing the justice transaction confirm on
// restarts at any point during the retribution procedure, we can recover and // the blockchain. It is critical that such state is persisted on disk,
// continue from the persisted state. // so that if our node restarts at any point during the retribution
var retributionBucket = []byte("retribution") // procedure, we can recover and continue from the persisted state.
retributionBucket = []byte("retribution")
// justiceTxnBucket holds the finalized justice transactions for all
// breached contracts. Entries are added to the justice txn bucket just
// before broadcasting the sweep txn.
justiceTxnBucket = []byte("justice-txn")
)
// BreachConfig bundles the required subsystems used by the breach arbiter. An // BreachConfig bundles the required subsystems used by the breach arbiter. An
// instance of BreachConfig is passed to newBreachArbiter during instantiation. // instance of BreachConfig is passed to newBreachArbiter during instantiation.
type BreachConfig struct { type BreachConfig struct {
// ChainIO is used by the breach arbiter to determine the current height
// of the blockchain, which is required to subscribe for spend
// notifications from Notifier.
ChainIO lnwallet.BlockChainIO
// CloseLink allows the breach arbiter to shutdown any channel links for // CloseLink allows the breach arbiter to shutdown any channel links for
// which it detects a breach, ensuring now further activity will // which it detects a breach, ensuring now further activity will
// continue across the link. The method accepts link's channel point and a // continue across the link. The method accepts link's channel point and
// close type to be included in the channel close summary. // a close type to be included in the channel close summary.
CloseLink func(*wire.OutPoint, htlcswitch.ChannelCloseType) CloseLink func(*wire.OutPoint, htlcswitch.ChannelCloseType)
// DB provides access to the user's channels, allowing the breach // DB provides access to the user's channels, allowing the breach
@ -85,6 +86,9 @@ type BreachConfig struct {
// counterparties. // counterparties.
// TODO(roasbeef): closures in config for subsystem pointers to decouple? // TODO(roasbeef): closures in config for subsystem pointers to decouple?
type breachArbiter struct { type breachArbiter struct {
started uint32
stopped uint32
cfg *BreachConfig cfg *BreachConfig
// breachObservers is a map which tracks all the active breach // breachObservers is a map which tracks all the active breach
@ -112,10 +116,8 @@ type breachArbiter struct {
// breach closes. // breach closes.
settledContracts chan *wire.OutPoint settledContracts chan *wire.OutPoint
started uint32 quit chan struct{}
stopped uint32 wg sync.WaitGroup
quit chan struct{}
wg sync.WaitGroup
} }
// newBreachArbiter creates a new instance of a breachArbiter initialized with // newBreachArbiter creates a new instance of a breachArbiter initialized with
@ -141,40 +143,52 @@ func (b *breachArbiter) Start() error {
brarLog.Tracef("Starting breach arbiter") brarLog.Tracef("Starting breach arbiter")
// We load all pending retributions from the database and // Load all retributions currently persisted in the retribution store.
// deterministically reconstruct a channel close summary for each. In
// the event that a channel is still open after being breached, we can
// use the close summary to reinitiate a channel close so that the
// breach is reflected in channeldb.
breachRetInfos := make(map[wire.OutPoint]retributionInfo) breachRetInfos := make(map[wire.OutPoint]retributionInfo)
closeSummaries := make(map[wire.OutPoint]channeldb.ChannelCloseSummary) if err := b.cfg.Store.ForAll(func(ret *retributionInfo) error {
err := b.cfg.Store.ForAll(func(ret *retributionInfo) error {
// Extract emitted retribution information.
breachRetInfos[ret.chanPoint] = *ret breachRetInfos[ret.chanPoint] = *ret
// Deterministically reconstruct channel close summary from
// persisted retribution information and record in breach close
// summaries map under the corresponding channel point.
closeSummary := channeldb.ChannelCloseSummary{
ChanPoint: ret.chanPoint,
ClosingTXID: ret.commitHash,
RemotePub: ret.remoteIdentity,
Capacity: ret.capacity,
SettledBalance: ret.settledBalance,
CloseType: channeldb.BreachClose,
IsPending: true,
}
closeSummaries[ret.chanPoint] = closeSummary
return nil return nil
}) }); err != nil {
if err != nil {
return err return err
} }
// Load all currently closed channels from disk, we will use the
// channels that have been marked fully closed to filter the retribution
// information loaded from disk. This is necessary in the event that the
// channel was marked fully closed, but was not removed from the
// retribution store.
closedChans, err := b.cfg.DB.FetchClosedChannels(false)
if err != nil {
brarLog.Errorf("unable to fetch closing channels: %v", err)
return err
}
// Using the set of non-pending, closed channels, reconcile any
// discrepancies between the channeldb and the retribution store by
// removing any retribution information for which we have already
// finished our responsibilities. If the removal is successful, we also
// remove the entry from our in-memory map, to avoid any further action
// for this channel.
for _, chanSummary := range closedChans {
if chanSummary.IsPending {
continue
}
chanPoint := &chanSummary.ChanPoint
if _, ok := breachRetInfos[*chanPoint]; ok {
if err := b.cfg.Store.Remove(chanPoint); err != nil {
brarLog.Errorf("unable to remove closed "+
"chanid=%v from breach arbiter: %v",
chanPoint, err)
return err
}
delete(breachRetInfos, *chanPoint)
}
}
// We need to query that database state for all currently active // We need to query that database state for all currently active
// channels, each of these channels will need a goroutine assigned to // channels, these channels will represent a super set of all channels
// it to watch for channel breaches. // that may be assigned a go routine to monitor for channel breaches.
activeChannels, err := b.cfg.DB.FetchAllChannels() activeChannels, err := b.cfg.DB.FetchAllChannels()
if err != nil && err != channeldb.ErrNoActiveChannels { if err != nil && err != channeldb.ErrNoActiveChannels {
brarLog.Errorf("unable to fetch active channels: %v", err) brarLog.Errorf("unable to fetch active channels: %v", err)
@ -188,29 +202,20 @@ func (b *breachArbiter) Start() error {
} }
// Here we will determine a set of channels that will need to be managed // Here we will determine a set of channels that will need to be managed
// by the contractObserver. For each of the open channels read from // by the contractObserver. This should comprise all active channels
// disk, we will create a channel state machine that can be used to // that have not been breached. If the channel point has an entry in the
// watch for any potential channel closures. We must first exclude any // retribution store, we skip it to avoid creating a breach observer.
// channel whose retribution process has been initiated, and proceed to // Resolving breached channels will be handled later by spawning an
// mark them as closed. The state machines generated for these filtered // exactRetribution task for each.
// channels can be discarded, as their fate will be placed in the hands
// of an exactRetribution task spawned later.
//
// NOTE: Spawning of the exactRetribution task is intentionally
// postponed until after this step in order to ensure that the all
// breached channels are reflected as closed in channeldb and consistent
// with what is checkpointed by the breach arbiter. Instead of treating
// the breached-and-closed and breached-but-still-active channels as
// separate sets of channels, we first ensure that all
// breached-but-still-active channels are promoted to
// breached-and-closed during restart, allowing us to treat them as a
// single set from here on out. This approach also has the added benefit
// of minimizing the likelihood that the wrong number of tasks are
// spawned per breached channel, and prevents us from being in a
// position where retribution has completed but the channel is still
// marked as open in channeldb.
channelsToWatch := make([]*lnwallet.LightningChannel, 0, nActive) channelsToWatch := make([]*lnwallet.LightningChannel, 0, nActive)
for _, chanState := range activeChannels { for _, chanState := range activeChannels {
// If this channel was previously breached, we skip it here to
// avoid creating a breach observer, as we can go straight to
// the task of exacting retribution.
if _, ok := breachRetInfos[chanState.FundingOutpoint]; ok {
continue
}
// Initialize active channel from persisted channel state. // Initialize active channel from persisted channel state.
channel, err := lnwallet.NewLightningChannel(nil, channel, err := lnwallet.NewLightningChannel(nil,
b.cfg.Notifier, b.cfg.Estimator, chanState) b.cfg.Notifier, b.cfg.Estimator, chanState)
@ -220,62 +225,28 @@ func (b *breachArbiter) Start() error {
return err return err
} }
// Before marking this as an active channel that the breach
// arbiter should watch, check to see if this channel was
// previously breached. If so, we attempt to reflect this in the
// channeldb by closing the channel. Upon success, we continue
// because the channel is no longer open, and thus does not need
// to be managed by the contractObserver.
chanPoint := chanState.FundingOutpoint
if closeSummary, ok := closeSummaries[chanPoint]; ok {
// Since this channel should not be open, we
// immediately notify the HTLC switch that this link
// should be closed, and that all activity on the link
// should cease.
b.cfg.CloseLink(&chanState.FundingOutpoint,
htlcswitch.CloseBreach)
// Ensure channeldb is consistent with the persisted
// breach.
err := channel.DeleteState(&closeSummary)
if err != nil {
brarLog.Errorf("unable to delete channel "+
"state: %v", err)
return err
}
// Now that this channel is both breached _and_ closed,
// we can skip adding it to the `channelsToWatch` since
// we can begin the retribution process immediately.
continue
}
// Finally, add this channel to breach arbiter's list of // Finally, add this channel to breach arbiter's list of
// channels to watch. // channels to watch.
channelsToWatch = append(channelsToWatch, channel) channelsToWatch = append(channelsToWatch, channel)
} }
// TODO(roasbeef): instead use closure height of channel
_, currentHeight, err := b.cfg.ChainIO.GetBestBlock()
if err != nil {
return err
}
// Additionally, we'll also want to watch any pending close or force // Additionally, we'll also want to watch any pending close or force
// close transactions so we can properly mark them as resolved in the // close transactions so we can properly mark them as resolved in the
// database. // database.
if err := b.watchForPendingCloseConfs(currentHeight); err != nil { if err := b.watchForPendingCloseConfs(); err != nil {
return err return err
} }
// Spawn the exactRetribution tasks to monitor and resolve any breaches // Spawn the exactRetribution tasks to monitor and resolve any breaches
// that were loaded from the retribution store. // that were loaded from the retribution store.
for chanPoint, closeSummary := range closeSummaries { for chanPoint := range breachRetInfos {
retInfo := breachRetInfos[chanPoint]
// Register for a notification when the breach transaction is // Register for a notification when the breach transaction is
// confirmed on chain. // confirmed on chain.
breachTXID := closeSummary.ClosingTXID breachTXID := retInfo.commitHash
confChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn( confChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn(
&breachTXID, 1, uint32(currentHeight)) &breachTXID, 1, retInfo.breachHeight)
if err != nil { if err != nil {
brarLog.Errorf("unable to register for conf updates "+ brarLog.Errorf("unable to register for conf updates "+
"for txid: %v, err: %v", breachTXID, err) "for txid: %v, err: %v", breachTXID, err)
@ -284,7 +255,6 @@ func (b *breachArbiter) Start() error {
// Launch a new goroutine which to finalize the channel // Launch a new goroutine which to finalize the channel
// retribution after the breach transaction confirms. // retribution after the breach transaction confirms.
retInfo := breachRetInfos[chanPoint]
b.wg.Add(1) b.wg.Add(1)
go b.exactRetribution(confChan, &retInfo) go b.exactRetribution(confChan, &retInfo)
} }
@ -298,12 +268,13 @@ func (b *breachArbiter) Start() error {
// watchForPendingCloseConfs dispatches confirmation notification subscribers // watchForPendingCloseConfs dispatches confirmation notification subscribers
// that mark any pending channels as fully closed when signaled. // that mark any pending channels as fully closed when signaled.
func (b *breachArbiter) watchForPendingCloseConfs(currentHeight int32) error { func (b *breachArbiter) watchForPendingCloseConfs() error {
pendingCloseChans, err := b.cfg.DB.FetchClosedChannels(true) pendingCloseChans, err := b.cfg.DB.FetchClosedChannels(true)
if err != nil { if err != nil {
brarLog.Errorf("unable to fetch closing channels: %v", err) brarLog.Errorf("unable to fetch closing channels: %v", err)
return err return err
} }
for _, pendingClose := range pendingCloseChans { for _, pendingClose := range pendingCloseChans {
// If this channel was force closed, and we have a non-zero // If this channel was force closed, and we have a non-zero
// time-locked balance, then the utxoNursery is currently // time-locked balance, then the utxoNursery is currently
@ -319,7 +290,7 @@ func (b *breachArbiter) watchForPendingCloseConfs(currentHeight int32) error {
closeTXID := pendingClose.ClosingTXID closeTXID := pendingClose.ClosingTXID
confNtfn, err := b.cfg.Notifier.RegisterConfirmationsNtfn( confNtfn, err := b.cfg.Notifier.RegisterConfirmationsNtfn(
&closeTXID, 1, uint32(currentHeight)) &closeTXID, 1, pendingClose.CloseHeight)
if err != nil { if err != nil {
return err return err
} }
@ -376,6 +347,12 @@ func (b *breachArbiter) Stop() error {
return nil return nil
} }
// IsBreached queries the breach arbiter's retribution store to see if it is
// aware of any channel breaches for a particular channel point.
func (b *breachArbiter) IsBreached(chanPoint *wire.OutPoint) (bool, error) {
return b.cfg.Store.IsBreached(chanPoint)
}
// contractObserver is the primary goroutine for the breachArbiter. This // contractObserver is the primary goroutine for the breachArbiter. This
// goroutine is responsible for managing goroutines that watch for breaches for // goroutine is responsible for managing goroutines that watch for breaches for
// all current active and newly created channels. If a channel breach is // all current active and newly created channels. If a channel breach is
@ -389,13 +366,16 @@ func (b *breachArbiter) contractObserver(
defer b.wg.Done() defer b.wg.Done()
brarLog.Infof("Starting contract observer with %v active channels",
len(activeChannels))
// For each active channel found within the database, we launch a // For each active channel found within the database, we launch a
// detected breachObserver goroutine for that channel and also track // detected breachObserver goroutine for that channel and also track
// the new goroutine within the breachObservers map so we can cancel it // the new goroutine within the breachObservers map so we can cancel it
// later if necessary. // later if necessary.
for _, channel := range activeChannels { for _, channel := range activeChannels {
settleSignal := make(chan struct{}) settleSignal := make(chan struct{})
chanPoint := channel.ChannelPoint() chanPoint := channel.ChanPoint
b.breachObservers[*chanPoint] = settleSignal b.breachObservers[*chanPoint] = settleSignal
b.wg.Add(1) b.wg.Add(1)
@ -409,12 +389,6 @@ out:
for { for {
select { select {
case breachInfo := <-b.breachedContracts: case breachInfo := <-b.breachedContracts:
_, currentHeight, err := b.cfg.ChainIO.GetBestBlock()
if err != nil {
brarLog.Errorf("unable to get best height: %v",
err)
}
// A new channel contract has just been breached! We // A new channel contract has just been breached! We
// first register for a notification to be dispatched // first register for a notification to be dispatched
// once the breach transaction (the revoked commitment // once the breach transaction (the revoked commitment
@ -422,7 +396,7 @@ out:
// ensure we're not dealing with a moving target. // ensure we're not dealing with a moving target.
breachTXID := &breachInfo.commitHash breachTXID := &breachInfo.commitHash
cfChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn( cfChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn(
breachTXID, 1, uint32(currentHeight)) breachTXID, 1, breachInfo.breachHeight)
if err != nil { if err != nil {
brarLog.Errorf("unable to register for conf "+ brarLog.Errorf("unable to register for conf "+
"updates for txid: %v, err: %v", "updates for txid: %v, err: %v",
@ -449,7 +423,7 @@ out:
// daemon, so we launch a new breachObserver to handle // daemon, so we launch a new breachObserver to handle
// the detection of attempted contract breaches. // the detection of attempted contract breaches.
settleSignal := make(chan struct{}) settleSignal := make(chan struct{})
chanPoint := contract.ChannelPoint() chanPoint := contract.ChanPoint
// If the contract is already being watched, then an // If the contract is already being watched, then an
// additional send indicates we have a stale version of // additional send indicates we have a stale version of
@ -516,14 +490,17 @@ func (b *breachArbiter) exactRetribution(
// TODO(roasbeef): state needs to be checkpointed here // TODO(roasbeef): state needs to be checkpointed here
var breachConfHeight uint32
select { select {
case _, ok := <-confChan.Confirmed: case breachConf, ok := <-confChan.Confirmed:
// If the second value is !ok, then the channel has been closed // If the second value is !ok, then the channel has been closed
// signifying a daemon shutdown, so we exit. // signifying a daemon shutdown, so we exit.
if !ok { if !ok {
return return
} }
breachConfHeight = breachConf.BlockHeight
// Otherwise, if this is a real confirmation notification, then // Otherwise, if this is a real confirmation notification, then
// we fall through to complete our duty. // we fall through to complete our duty.
case <-b.quit: case <-b.quit:
@ -533,40 +510,55 @@ func (b *breachArbiter) exactRetribution(
brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+ brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+
"revoked funds", breachInfo.commitHash) "revoked funds", breachInfo.commitHash)
// With the breach transaction confirmed, we now create the justice tx finalTx, err := b.cfg.Store.GetFinalizedTxn(&breachInfo.chanPoint)
// which will claim ALL the funds within the channel.
justiceTx, err := b.createJusticeTx(breachInfo)
if err != nil { if err != nil {
brarLog.Errorf("unable to create justice tx: %v", err) brarLog.Errorf("unable to get finalized txn for"+
"chanid=%v: %v", &breachInfo.chanPoint, err)
return return
} }
// If this retribution has not been finalized before, we will first
// construct a sweep transaction and write it to disk. This will allow
// the breach arbiter to re-register for notifications for the justice
// txid.
if finalTx == nil {
// With the breach transaction confirmed, we now create the
// justice tx which will claim ALL the funds within the channel.
finalTx, err = b.createJusticeTx(breachInfo)
if err != nil {
brarLog.Errorf("unable to create justice tx: %v", err)
return
}
// Persist our finalized justice transaction before making an
// attempt to broadcast.
err := b.cfg.Store.Finalize(&breachInfo.chanPoint, finalTx)
if err != nil {
brarLog.Errorf("unable to finalize justice tx for "+
"chanid=%v: %v", &breachInfo.chanPoint, err)
return
}
}
brarLog.Debugf("Broadcasting justice tx: %v", brarLog.Debugf("Broadcasting justice tx: %v",
newLogClosure(func() string { newLogClosure(func() string {
return spew.Sdump(justiceTx) return spew.Sdump(finalTx)
})) }))
_, currentHeight, err := b.cfg.ChainIO.GetBestBlock()
if err != nil {
brarLog.Errorf("unable to get current height: %v", err)
return
}
// Finally, broadcast the transaction, finalizing the channels' // Finally, broadcast the transaction, finalizing the channels'
// retribution against the cheating counterparty. // retribution against the cheating counterparty.
if err := b.cfg.PublishTransaction(justiceTx); err != nil { if err := b.cfg.PublishTransaction(finalTx); err != nil {
brarLog.Errorf("unable to broadcast "+ brarLog.Errorf("unable to broadcast "+
"justice tx: %v", err) "justice tx: %v", err)
return
} }
// As a conclusionary step, we register for a notification to be // As a conclusionary step, we register for a notification to be
// dispatched once the justice tx is confirmed. After confirmation we // dispatched once the justice tx is confirmed. After confirmation we
// notify the caller that initiated the retribution workflow that the // notify the caller that initiated the retribution workflow that the
// deed has been done. // deed has been done.
justiceTXID := justiceTx.TxHash() justiceTXID := finalTx.TxHash()
confChan, err = b.cfg.Notifier.RegisterConfirmationsNtfn( confChan, err = b.cfg.Notifier.RegisterConfirmationsNtfn(
&justiceTXID, 1, uint32(currentHeight)) &justiceTXID, 1, breachConfHeight)
if err != nil { if err != nil {
brarLog.Errorf("unable to register for conf for txid: %v", brarLog.Errorf("unable to register for conf for txid: %v",
justiceTXID) justiceTXID)
@ -607,6 +599,7 @@ func (b *breachArbiter) exactRetribution(
err := b.cfg.DB.MarkChanFullyClosed(&breachInfo.chanPoint) err := b.cfg.DB.MarkChanFullyClosed(&breachInfo.chanPoint)
if err != nil { if err != nil {
brarLog.Errorf("unable to mark chan as closed: %v", err) brarLog.Errorf("unable to mark chan as closed: %v", err)
return
} }
// Justice has been carried out; we can safely delete the // Justice has been carried out; we can safely delete the
@ -640,7 +633,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
defer b.wg.Done() defer b.wg.Done()
chanPoint := contract.ChannelPoint() chanPoint := contract.ChanPoint
brarLog.Debugf("Breach observer for ChannelPoint(%v) started ", brarLog.Debugf("Breach observer for ChannelPoint(%v) started ",
chanPoint) chanPoint)
@ -746,57 +739,39 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
// mid-local initiated state-transition, possible // mid-local initiated state-transition, possible
// false-positive? // false-positive?
// Obtain a snapshot of the final channel state, which can be
// used to reclose a breached channel in the event of a failure.
chanInfo := contract.StateSnapshot()
// Using the breach information provided by the wallet and the // Using the breach information provided by the wallet and the
// channel snapshot, construct the retribution information that // channel snapshot, construct the retribution information that
// will be persisted to disk. // will be persisted to disk.
retInfo := newRetributionInfo(chanPoint, breachInfo, chanInfo) retInfo := newRetributionInfo(chanPoint, breachInfo)
// Persist the pending retribution state to disk. // Persist the pending retribution state to disk.
if err := b.cfg.Store.Add(retInfo); err != nil { err := b.cfg.Store.Add(retInfo)
brarLog.Errorf("unable to persist retribution info "+ if err != nil {
"to db: %v", err) brarLog.Errorf("unable to persist retribution "+
"info to db: %v", err)
} }
// TODO(conner): move responsibility of channel closure into // Now that the breach has been persisted, try to send an
// lnwallet. Have breach arbiter ACK after writing to disk, then // acknowledgment back to the close observer with the error. If
// have wallet mark channel as closed. This allows the wallet to // the ack is successful, the close observer will mark the
// attempt to retransmit the breach info if the either arbiter // channel as pending-closed in the channeldb.
// or the wallet goes down before completing the hand off. select {
case breachInfo.Err <- err:
// Bail if we failed to persist retribution info.
if err != nil {
return
}
// Now that the breach arbiter has persisted the information, case <-contract.ObserverQuit():
// we can go ahead and mark the channel as closed in the // If the close observer has already exited, it will
// channeldb. This step is done after persisting the // never read the acknowledgment, so we exit.
// retribution information so that a failure between these steps return
// will cause an attempt to monitor the still-open channel.
// However, since the retribution information was persisted
// before, the arbiter will recognize that the channel should be
// closed, and proceed to mark it as such after a restart, and
// forgo monitoring it for breaches.
// Construct the breached channel's close summary marking the case <-b.quit:
// channel using the snapshot from before, and marking this as a // Cancel the close observer if the breach arbiter is
// BreachClose. // shutting down, dropping the acknowledgment.
closeInfo := &channeldb.ChannelCloseSummary{ contract.CancelObserver()
ChanPoint: *chanPoint, return
ChainHash: breachInfo.ChainHash,
ClosingTXID: breachInfo.BreachTransaction.TxHash(),
RemotePub: &chanInfo.RemoteIdentity,
Capacity: chanInfo.Capacity,
SettledBalance: chanInfo.LocalBalance.ToSatoshis(),
CloseType: channeldb.BreachClose,
IsPending: true,
}
// Next, persist the channel close to disk. Upon restart, the
// arbiter will recognize that this channel has been breached
// and marked close, and fast track its path to justice.
if err := contract.DeleteState(closeInfo); err != nil {
brarLog.Errorf("unable to delete channel state: %v",
err)
} }
// Finally, we send the retribution information into the // Finally, we send the retribution information into the
@ -807,6 +782,8 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
} }
case <-b.quit: case <-b.quit:
contract.Stop()
contract.CancelObserver()
return return
} }
} }
@ -922,20 +899,10 @@ var _ SpendableOutput = (*breachedOutput)(nil)
// spends all outputs of the commitment transaction into an output controlled // spends all outputs of the commitment transaction into an output controlled
// by the wallet. // by the wallet.
type retributionInfo struct { type retributionInfo struct {
commitHash chainhash.Hash commitHash chainhash.Hash
chanPoint wire.OutPoint chanPoint wire.OutPoint
chainHash chainhash.Hash chainHash chainhash.Hash
breachHeight uint32
// TODO(conner): remove the following group of fields after decoupling
// the breach arbiter from the wallet.
// Fields copied from channel snapshot when a breach is detected. This
// is necessary for deterministically constructing the channel close
// summary in the event that the breach arbiter crashes before closing
// the channel.
remoteIdentity *btcec.PublicKey
capacity btcutil.Amount
settledBalance btcutil.Amount
breachedOutputs []breachedOutput breachedOutputs []breachedOutput
} }
@ -945,8 +912,7 @@ type retributionInfo struct {
// channels. The information is primarily populated using the BreachRetribution // channels. The information is primarily populated using the BreachRetribution
// delivered by the wallet when it detects a channel breach. // delivered by the wallet when it detects a channel breach.
func newRetributionInfo(chanPoint *wire.OutPoint, func newRetributionInfo(chanPoint *wire.OutPoint,
breachInfo *lnwallet.BreachRetribution, breachInfo *lnwallet.BreachRetribution) *retributionInfo {
chanInfo *channeldb.ChannelSnapshot) *retributionInfo {
// Determine the number of second layer HTLCs we will attempt to sweep. // Determine the number of second layer HTLCs we will attempt to sweep.
nHtlcs := len(breachInfo.HtlcRetributions) nHtlcs := len(breachInfo.HtlcRetributions)
@ -1009,16 +975,10 @@ func newRetributionInfo(chanPoint *wire.OutPoint,
breachedOutputs = append(breachedOutputs, htlcOutput) breachedOutputs = append(breachedOutputs, htlcOutput)
} }
// TODO(conner): remove dependency on channel snapshot after decoupling
// channel closure from the breach arbiter.
return &retributionInfo{ return &retributionInfo{
commitHash: breachInfo.BreachTransaction.TxHash(), commitHash: breachInfo.BreachTransaction.TxHash(),
chainHash: chanInfo.ChainHash, chainHash: breachInfo.ChainHash,
chanPoint: *chanPoint, chanPoint: *chanPoint,
remoteIdentity: &chanInfo.RemoteIdentity,
capacity: chanInfo.Capacity,
settledBalance: chanInfo.LocalBalance.ToSatoshis(),
breachedOutputs: breachedOutputs, breachedOutputs: breachedOutputs,
} }
} }
@ -1226,6 +1186,19 @@ type RetributionStore interface {
// the addition fails. // the addition fails.
Add(retInfo *retributionInfo) error Add(retInfo *retributionInfo) error
// IsBreached queries the retribution store to see if the breach arbiter
// is aware of any breaches for the provided channel point.
IsBreached(chanPoint *wire.OutPoint) (bool, error)
// Finalize persists the finalized justice transaction for a particular
// channel.
Finalize(chanPoint *wire.OutPoint, finalTx *wire.MsgTx) error
// GetFinalizedTxn loads the finalized justice transaction, if any, from
// the retribution store. The finalized transaction will be nil if
// Finalize has not yet been called for this channel point.
GetFinalizedTxn(chanPoint *wire.OutPoint) (*wire.MsgTx, error)
// Remove deletes the retributionInfo from disk, if any exists, under // Remove deletes the retributionInfo from disk, if any exists, under
// the given key. An error should be re raised if the removal fails. // the given key. An error should be re raised if the removal fails.
Remove(key *wire.OutPoint) error Remove(key *wire.OutPoint) error
@ -1276,8 +1249,97 @@ func (rs *retributionStore) Add(ret *retributionInfo) error {
}) })
} }
// Remove removes a retribution state from the retributionStore database. // Finalize writes a signed justice transaction to the retribution store. This
func (rs *retributionStore) Remove(key *wire.OutPoint) error { // is done before publishing the transaction, so that we can recover the txid on
// startup and re-register for confirmation notifications.
func (rs *retributionStore) Finalize(chanPoint *wire.OutPoint,
finalTx *wire.MsgTx) error {
return rs.db.Update(func(tx *bolt.Tx) error {
justiceBkt, err := tx.CreateBucketIfNotExists(justiceTxnBucket)
if err != nil {
return err
}
var chanBuf bytes.Buffer
if err := writeOutpoint(&chanBuf, chanPoint); err != nil {
return err
}
var txBuf bytes.Buffer
if err := finalTx.Serialize(&txBuf); err != nil {
return err
}
return justiceBkt.Put(chanBuf.Bytes(), txBuf.Bytes())
})
}
// GetFinalizedTxn loads the finalized justice transaction for the provided
// channel point. The finalized transaction will be nil if Finalize has yet to
// be called for this channel point.
func (rs *retributionStore) GetFinalizedTxn(
chanPoint *wire.OutPoint) (*wire.MsgTx, error) {
var finalTxBytes []byte
if err := rs.db.View(func(tx *bolt.Tx) error {
justiceBkt := tx.Bucket(justiceTxnBucket)
if justiceBkt == nil {
return nil
}
var chanBuf bytes.Buffer
if err := writeOutpoint(&chanBuf, chanPoint); err != nil {
return err
}
finalTxBytes = justiceBkt.Get(chanBuf.Bytes())
return nil
}); err != nil {
return nil, err
}
if finalTxBytes == nil {
return nil, nil
}
finalTx := &wire.MsgTx{}
err := finalTx.Deserialize(bytes.NewReader(finalTxBytes))
return finalTx, err
}
// IsBreached queries the retribution store to discern if this channel was
// previously breached. This is used when connecting to a peer to determine if
// it is safe to add a link to the htlcswitch, as we should never add a channel
// that has already been breached.
func (rs *retributionStore) IsBreached(chanPoint *wire.OutPoint) (bool, error) {
var found bool
err := rs.db.View(func(tx *bolt.Tx) error {
retBucket := tx.Bucket(retributionBucket)
if retBucket == nil {
return nil
}
var chanBuf bytes.Buffer
if err := writeOutpoint(&chanBuf, chanPoint); err != nil {
return err
}
retInfo := retBucket.Get(chanBuf.Bytes())
if retInfo != nil {
found = true
}
return nil
})
return found, err
}
// Remove removes a retribution state and finalized justice transaction by
// channel point from the retribution store.
func (rs *retributionStore) Remove(chanPoint *wire.OutPoint) error {
return rs.db.Update(func(tx *bolt.Tx) error { return rs.db.Update(func(tx *bolt.Tx) error {
retBucket := tx.Bucket(retributionBucket) retBucket := tx.Bucket(retributionBucket)
@ -1287,15 +1349,30 @@ func (rs *retributionStore) Remove(key *wire.OutPoint) error {
// stored in the db. // stored in the db.
if retBucket == nil { if retBucket == nil {
return errors.New("unable to remove retribution " + return errors.New("unable to remove retribution " +
"because the db bucket doesn't exist.") "because the retribution bucket doesn't exist.")
} }
var outBuf bytes.Buffer // Serialize the channel point we are intending to remove.
if err := writeOutpoint(&outBuf, key); err != nil { var chanBuf bytes.Buffer
if err := writeOutpoint(&chanBuf, chanPoint); err != nil {
return err
}
chanBytes := chanBuf.Bytes()
// Remove the persisted retribution info and finalized justice
// transaction.
if err := retBucket.Delete(chanBytes); err != nil {
return err return err
} }
return retBucket.Delete(outBuf.Bytes()) // If we have not finalized this channel breach, we can exit
// early.
justiceBkt := tx.Bucket(justiceTxnBucket)
if justiceBkt == nil {
return nil
}
return justiceBkt.Delete(chanBytes)
}) })
} }
@ -1313,11 +1390,10 @@ func (rs *retributionStore) ForAll(cb func(*retributionInfo) error) error {
// Otherwise, we fetch each serialized retribution info, // Otherwise, we fetch each serialized retribution info,
// deserialize it, and execute the passed in callback function // deserialize it, and execute the passed in callback function
// on it. // on it.
return retBucket.ForEach(func(outBytes, retBytes []byte) error { return retBucket.ForEach(func(_, retBytes []byte) error {
ret := &retributionInfo{} ret := &retributionInfo{}
if err := ret.Decode( err := ret.Decode(bytes.NewBuffer(retBytes))
bytes.NewBuffer(retBytes), if err != nil {
); err != nil {
return err return err
} }
@ -1328,7 +1404,7 @@ func (rs *retributionStore) ForAll(cb func(*retributionInfo) error) error {
// Encode serializes the retribution into the passed byte stream. // Encode serializes the retribution into the passed byte stream.
func (ret *retributionInfo) Encode(w io.Writer) error { func (ret *retributionInfo) Encode(w io.Writer) error {
var scratch [8]byte var scratch [4]byte
if _, err := w.Write(ret.commitHash[:]); err != nil { if _, err := w.Write(ret.commitHash[:]); err != nil {
return err return err
@ -1342,18 +1418,8 @@ func (ret *retributionInfo) Encode(w io.Writer) error {
return err return err
} }
if _, err := w.Write( binary.BigEndian.PutUint32(scratch[:], ret.breachHeight)
ret.remoteIdentity.SerializeCompressed()); err != nil { if _, err := w.Write(scratch[:]); err != nil {
return err
}
binary.BigEndian.PutUint64(scratch[:8], uint64(ret.capacity))
if _, err := w.Write(scratch[:8]); err != nil {
return err
}
binary.BigEndian.PutUint64(scratch[:8], uint64(ret.settledBalance))
if _, err := w.Write(scratch[:8]); err != nil {
return err return err
} }
@ -1373,12 +1439,12 @@ func (ret *retributionInfo) Encode(w io.Writer) error {
// Dencode deserializes a retribution from the passed byte stream. // Dencode deserializes a retribution from the passed byte stream.
func (ret *retributionInfo) Decode(r io.Reader) error { func (ret *retributionInfo) Decode(r io.Reader) error {
var scratch [33]byte var scratch [32]byte
if _, err := io.ReadFull(r, scratch[:32]); err != nil { if _, err := io.ReadFull(r, scratch[:]); err != nil {
return err return err
} }
hash, err := chainhash.NewHash(scratch[:32]) hash, err := chainhash.NewHash(scratch[:])
if err != nil { if err != nil {
return err return err
} }
@ -1388,34 +1454,19 @@ func (ret *retributionInfo) Decode(r io.Reader) error {
return err return err
} }
if _, err := io.ReadFull(r, scratch[:32]); err != nil { if _, err := io.ReadFull(r, scratch[:]); err != nil {
return err return err
} }
chainHash, err := chainhash.NewHash(scratch[:32]) chainHash, err := chainhash.NewHash(scratch[:])
if err != nil { if err != nil {
return err return err
} }
ret.chainHash = *chainHash ret.chainHash = *chainHash
if _, err = io.ReadFull(r, scratch[:33]); err != nil { if _, err := io.ReadFull(r, scratch[:4]); err != nil {
return err return err
} }
remoteIdentity, err := btcec.ParsePubKey(scratch[:33], btcec.S256()) ret.breachHeight = binary.BigEndian.Uint32(scratch[:4])
if err != nil {
return err
}
ret.remoteIdentity = remoteIdentity
if _, err := io.ReadFull(r, scratch[:8]); err != nil {
return err
}
ret.capacity = btcutil.Amount(binary.BigEndian.Uint64(scratch[:8]))
if _, err := io.ReadFull(r, scratch[:8]); err != nil {
return err
}
ret.settledBalance = btcutil.Amount(
binary.BigEndian.Uint64(scratch[:8]))
nOutputsU64, err := wire.ReadVarInt(r, 0) nOutputsU64, err := wire.ReadVarInt(r, 0)
if err != nil { if err != nil {