Merge remote-tracking branch 'origin/pr/445'

Olaoluwa Osuntokun 2018-01-06 17:22:30 -08:00
commit 882b1313ba
12 changed files with 1489 additions and 364 deletions


@ -15,33 +15,34 @@ import (
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/roasbeef/btcd/blockchain"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
)
// retributionBucket stores retribution state on disk between detecting a
// contract breach, broadcasting a justice transaction that sweeps the channel,
// and finally witnessing the justice transaction confirm on the blockchain. It
// is critical that such state is persisted on disk, so that if our node
// restarts at any point during the retribution procedure, we can recover and
// continue from the persisted state.
var retributionBucket = []byte("retribution")
var (
// retributionBucket stores retribution state on disk between detecting
// a contract breach, broadcasting a justice transaction that sweeps the
// channel, and finally witnessing the justice transaction confirm on
// the blockchain. It is critical that such state is persisted on disk,
// so that if our node restarts at any point during the retribution
// procedure, we can recover and continue from the persisted state.
retributionBucket = []byte("retribution")
// justiceTxnBucket holds the finalized justice transactions for all
// breached contracts. Entries are added to the justice txn bucket just
// before broadcasting the sweep txn.
justiceTxnBucket = []byte("justice-txn")
)
// BreachConfig bundles the required subsystems used by the breach arbiter. An
// instance of BreachConfig is passed to newBreachArbiter during instantiation.
type BreachConfig struct {
// ChainIO is used by the breach arbiter to determine the current height
// of the blockchain, which is required to subscribe for spend
// notifications from Notifier.
ChainIO lnwallet.BlockChainIO
// CloseLink allows the breach arbiter to shut down any channel links for
// which it detects a breach, ensuring no further activity will
// continue across the link. The method accepts the link's channel point
// and a close type to be included in the channel close summary.
CloseLink func(*wire.OutPoint, htlcswitch.ChannelCloseType)
// DB provides access to the user's channels, allowing the breach
@ -85,6 +86,9 @@ type BreachConfig struct {
// counterparties.
// TODO(roasbeef): closures in config for subsystem pointers to decouple?
type breachArbiter struct {
started uint32
stopped uint32
cfg *BreachConfig
// breachObservers is a map which tracks all the active breach
@ -112,10 +116,8 @@ type breachArbiter struct {
// breach closes.
settledContracts chan *wire.OutPoint
started uint32
stopped uint32
quit chan struct{}
wg sync.WaitGroup
quit chan struct{}
wg sync.WaitGroup
}
// newBreachArbiter creates a new instance of a breachArbiter initialized with
@ -141,40 +143,52 @@ func (b *breachArbiter) Start() error {
brarLog.Tracef("Starting breach arbiter")
// We load all pending retributions from the database and
// deterministically reconstruct a channel close summary for each. In
// the event that a channel is still open after being breached, we can
// use the close summary to reinitiate a channel close so that the
// breach is reflected in channeldb.
// Load all retributions currently persisted in the retribution store.
breachRetInfos := make(map[wire.OutPoint]retributionInfo)
closeSummaries := make(map[wire.OutPoint]channeldb.ChannelCloseSummary)
err := b.cfg.Store.ForAll(func(ret *retributionInfo) error {
// Extract emitted retribution information.
if err := b.cfg.Store.ForAll(func(ret *retributionInfo) error {
breachRetInfos[ret.chanPoint] = *ret
// Deterministically reconstruct channel close summary from
// persisted retribution information and record in breach close
// summaries map under the corresponding channel point.
closeSummary := channeldb.ChannelCloseSummary{
ChanPoint: ret.chanPoint,
ClosingTXID: ret.commitHash,
RemotePub: ret.remoteIdentity,
Capacity: ret.capacity,
SettledBalance: ret.settledBalance,
CloseType: channeldb.BreachClose,
IsPending: true,
}
closeSummaries[ret.chanPoint] = closeSummary
return nil
})
if err != nil {
}); err != nil {
return err
}
// Load all currently closed channels from disk; we will use the
// channels that have been marked fully closed to filter the retribution
// information loaded above. This is necessary in the event that a
// channel was marked fully closed, but was not removed from the
// retribution store.
closedChans, err := b.cfg.DB.FetchClosedChannels(false)
if err != nil {
brarLog.Errorf("unable to fetch closing channels: %v", err)
return err
}
// Using the set of non-pending, closed channels, reconcile any
// discrepancies between the channeldb and the retribution store by
// removing any retribution information for which we have already
// finished our responsibilities. If the removal is successful, we also
// remove the entry from our in-memory map, to avoid any further action
// for this channel.
for _, chanSummary := range closedChans {
if chanSummary.IsPending {
continue
}
chanPoint := &chanSummary.ChanPoint
if _, ok := breachRetInfos[*chanPoint]; ok {
if err := b.cfg.Store.Remove(chanPoint); err != nil {
brarLog.Errorf("unable to remove closed "+
"chanid=%v from breach arbiter: %v",
chanPoint, err)
return err
}
delete(breachRetInfos, *chanPoint)
}
}
// We need to query the database state for all currently active
// channels, each of these channels will need a goroutine assigned to
// it to watch for channel breaches.
// channels; these will represent a superset of all channels that may
// be assigned a goroutine to monitor for channel breaches.
activeChannels, err := b.cfg.DB.FetchAllChannels()
if err != nil && err != channeldb.ErrNoActiveChannels {
brarLog.Errorf("unable to fetch active channels: %v", err)
@ -188,29 +202,20 @@ func (b *breachArbiter) Start() error {
}
// Here we will determine a set of channels that will need to be managed
// by the contractObserver. For each of the open channels read from
// disk, we will create a channel state machine that can be used to
// watch for any potential channel closures. We must first exclude any
// channel whose retribution process has been initiated, and proceed to
// mark them as closed. The state machines generated for these filtered
// channels can be discarded, as their fate will be placed in the hands
// of an exactRetribution task spawned later.
//
// NOTE: Spawning of the exactRetribution task is intentionally
// postponed until after this step in order to ensure that all
// breached channels are reflected as closed in channeldb and consistent
// with what is checkpointed by the breach arbiter. Instead of treating
// the breached-and-closed and breached-but-still-active channels as
// separate sets of channels, we first ensure that all
// breached-but-still-active channels are promoted to
// breached-and-closed during restart, allowing us to treat them as a
// single set from here on out. This approach also has the added benefit
// of minimizing the likelihood that the wrong number of tasks are
// spawned per breached channel, and prevents us from being in a
// position where retribution has completed but the channel is still
// marked as open in channeldb.
// by the contractObserver. This should comprise all active channels
// that have not been breached. If the channel point has an entry in the
// retribution store, we skip it to avoid creating a breach observer.
// Resolving breached channels will be handled later by spawning an
// exactRetribution task for each.
channelsToWatch := make([]*lnwallet.LightningChannel, 0, nActive)
for _, chanState := range activeChannels {
// If this channel was previously breached, we skip it here to
// avoid creating a breach observer, as we can go straight to
// the task of exacting retribution.
if _, ok := breachRetInfos[chanState.FundingOutpoint]; ok {
continue
}
// Initialize active channel from persisted channel state.
channel, err := lnwallet.NewLightningChannel(nil,
b.cfg.Notifier, b.cfg.Estimator, chanState)
@ -220,62 +225,28 @@ func (b *breachArbiter) Start() error {
return err
}
// Before marking this as an active channel that the breach
// arbiter should watch, check to see if this channel was
// previously breached. If so, we attempt to reflect this in the
// channeldb by closing the channel. Upon success, we continue
// because the channel is no longer open, and thus does not need
// to be managed by the contractObserver.
chanPoint := chanState.FundingOutpoint
if closeSummary, ok := closeSummaries[chanPoint]; ok {
// Since this channel should not be open, we
// immediately notify the HTLC switch that this link
// should be closed, and that all activity on the link
// should cease.
b.cfg.CloseLink(&chanState.FundingOutpoint,
htlcswitch.CloseBreach)
// Ensure channeldb is consistent with the persisted
// breach.
err := channel.DeleteState(&closeSummary)
if err != nil {
brarLog.Errorf("unable to delete channel "+
"state: %v", err)
return err
}
// Now that this channel is both breached _and_ closed,
// we can skip adding it to the `channelsToWatch` since
// we can begin the retribution process immediately.
continue
}
// Finally, add this channel to breach arbiter's list of
// channels to watch.
channelsToWatch = append(channelsToWatch, channel)
}
// TODO(roasbeef): instead use closure height of channel
_, currentHeight, err := b.cfg.ChainIO.GetBestBlock()
if err != nil {
return err
}
// Additionally, we'll also want to watch any pending close or force
// close transactions so we can properly mark them as resolved in the
// database.
if err := b.watchForPendingCloseConfs(currentHeight); err != nil {
if err := b.watchForPendingCloseConfs(); err != nil {
return err
}
// Spawn the exactRetribution tasks to monitor and resolve any breaches
// that were loaded from the retribution store.
for chanPoint, closeSummary := range closeSummaries {
for chanPoint := range breachRetInfos {
retInfo := breachRetInfos[chanPoint]
// Register for a notification when the breach transaction is
// confirmed on chain.
breachTXID := closeSummary.ClosingTXID
breachTXID := retInfo.commitHash
confChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn(
&breachTXID, 1, uint32(currentHeight))
&breachTXID, 1, retInfo.breachHeight)
if err != nil {
brarLog.Errorf("unable to register for conf updates "+
"for txid: %v, err: %v", breachTXID, err)
@ -284,7 +255,6 @@ func (b *breachArbiter) Start() error {
// Launch a new goroutine to finalize the channel
// retribution after the breach transaction confirms.
retInfo := breachRetInfos[chanPoint]
b.wg.Add(1)
go b.exactRetribution(confChan, &retInfo)
}
@ -298,12 +268,13 @@ func (b *breachArbiter) Start() error {
// watchForPendingCloseConfs registers confirmation notification subscribers
// that mark any pending channels as fully closed when signaled.
func (b *breachArbiter) watchForPendingCloseConfs(currentHeight int32) error {
func (b *breachArbiter) watchForPendingCloseConfs() error {
pendingCloseChans, err := b.cfg.DB.FetchClosedChannels(true)
if err != nil {
brarLog.Errorf("unable to fetch closing channels: %v", err)
return err
}
for _, pendingClose := range pendingCloseChans {
// If this channel was force closed, and we have a non-zero
// time-locked balance, then the utxoNursery is currently
@ -319,7 +290,7 @@ func (b *breachArbiter) watchForPendingCloseConfs(currentHeight int32) error {
closeTXID := pendingClose.ClosingTXID
confNtfn, err := b.cfg.Notifier.RegisterConfirmationsNtfn(
&closeTXID, 1, uint32(currentHeight))
&closeTXID, 1, pendingClose.CloseHeight)
if err != nil {
return err
}
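The dispatch goroutine that consumes confNtfn is elided by the next hunk. As a rough sketch only, reconstructed from the confirmation-handling patterns used elsewhere in this diff rather than from the file's verbatim body, the handler plausibly looks like:

	// Sketch: wait for the closing transaction to confirm, then promote
	// the channel from pending-closed to fully closed.
	b.wg.Add(1)
	go func(conf *chainntnfs.ConfirmationEvent, chanPoint wire.OutPoint) {
		defer b.wg.Done()
		select {
		case _, ok := <-conf.Confirmed:
			// A closed channel signals daemon shutdown.
			if !ok {
				return
			}
			if err := b.cfg.DB.MarkChanFullyClosed(&chanPoint); err != nil {
				brarLog.Errorf("unable to mark chan as closed: %v", err)
			}
		case <-b.quit:
			return
		}
	}(confNtfn, pendingClose.ChanPoint)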
@ -376,6 +347,12 @@ func (b *breachArbiter) Stop() error {
return nil
}
// IsBreached queries the breach arbiter's retribution store to see if it is
// aware of any channel breaches for a particular channel point.
func (b *breachArbiter) IsBreached(chanPoint *wire.OutPoint) (bool, error) {
return b.cfg.Store.IsBreached(chanPoint)
}
// contractObserver is the primary goroutine for the breachArbiter. This
// goroutine is responsible for managing goroutines that watch for breaches for
// all current active and newly created channels. If a channel breach is
@ -389,13 +366,16 @@ func (b *breachArbiter) contractObserver(
defer b.wg.Done()
brarLog.Infof("Starting contract observer with %v active channels",
len(activeChannels))
// For each active channel found within the database, we launch a
// dedicated breachObserver goroutine for that channel and also track
// the new goroutine within the breachObservers map so we can cancel it
// later if necessary.
for _, channel := range activeChannels {
settleSignal := make(chan struct{})
chanPoint := channel.ChannelPoint()
chanPoint := channel.ChanPoint
b.breachObservers[*chanPoint] = settleSignal
b.wg.Add(1)
@ -409,12 +389,6 @@ out:
for {
select {
case breachInfo := <-b.breachedContracts:
_, currentHeight, err := b.cfg.ChainIO.GetBestBlock()
if err != nil {
brarLog.Errorf("unable to get best height: %v",
err)
}
// A new channel contract has just been breached! We
// first register for a notification to be dispatched
// once the breach transaction (the revoked commitment
@ -422,7 +396,7 @@ out:
// ensure we're not dealing with a moving target.
breachTXID := &breachInfo.commitHash
cfChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn(
breachTXID, 1, uint32(currentHeight))
breachTXID, 1, breachInfo.breachHeight)
if err != nil {
brarLog.Errorf("unable to register for conf "+
"updates for txid: %v, err: %v",
@ -449,7 +423,7 @@ out:
// daemon, so we launch a new breachObserver to handle
// the detection of attempted contract breaches.
settleSignal := make(chan struct{})
chanPoint := contract.ChannelPoint()
chanPoint := contract.ChanPoint
// If the contract is already being watched, then an
// additional send indicates we have a stale version of
@ -516,14 +490,17 @@ func (b *breachArbiter) exactRetribution(
// TODO(roasbeef): state needs to be checkpointed here
var breachConfHeight uint32
select {
case _, ok := <-confChan.Confirmed:
case breachConf, ok := <-confChan.Confirmed:
// If the second value is !ok, then the channel has been closed
// signifying a daemon shutdown, so we exit.
if !ok {
return
}
breachConfHeight = breachConf.BlockHeight
// Otherwise, if this is a real confirmation notification, then
// we fall through to complete our duty.
case <-b.quit:
@ -533,40 +510,55 @@ func (b *breachArbiter) exactRetribution(
brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+
"revoked funds", breachInfo.commitHash)
// With the breach transaction confirmed, we now create the justice tx
// which will claim ALL the funds within the channel.
justiceTx, err := b.createJusticeTx(breachInfo)
finalTx, err := b.cfg.Store.GetFinalizedTxn(&breachInfo.chanPoint)
if err != nil {
brarLog.Errorf("unable to create justice tx: %v", err)
brarLog.Errorf("unable to get finalized txn for"+
"chanid=%v: %v", &breachInfo.chanPoint, err)
return
}
// If this retribution has not been finalized before, we will first
// construct a sweep transaction and write it to disk. This will allow
// the breach arbiter to re-register for notifications for the justice
// txid.
if finalTx == nil {
// With the breach transaction confirmed, we now create the
// justice tx which will claim ALL the funds within the channel.
finalTx, err = b.createJusticeTx(breachInfo)
if err != nil {
brarLog.Errorf("unable to create justice tx: %v", err)
return
}
// Persist our finalized justice transaction before making an
// attempt to broadcast.
err := b.cfg.Store.Finalize(&breachInfo.chanPoint, finalTx)
if err != nil {
brarLog.Errorf("unable to finalize justice tx for "+
"chanid=%v: %v", &breachInfo.chanPoint, err)
return
}
}
brarLog.Debugf("Broadcasting justice tx: %v",
newLogClosure(func() string {
return spew.Sdump(justiceTx)
return spew.Sdump(finalTx)
}))
_, currentHeight, err := b.cfg.ChainIO.GetBestBlock()
if err != nil {
brarLog.Errorf("unable to get current height: %v", err)
return
}
// Finally, broadcast the transaction, finalizing the channel's
// retribution against the cheating counterparty.
if err := b.cfg.PublishTransaction(justiceTx); err != nil {
if err := b.cfg.PublishTransaction(finalTx); err != nil {
brarLog.Errorf("unable to broadcast "+
"justice tx: %v", err)
return
}
// As a concluding step, we register for a notification to be
// dispatched once the justice tx is confirmed. After confirmation we
// notify the caller that initiated the retribution workflow that the
// deed has been done.
justiceTXID := justiceTx.TxHash()
justiceTXID := finalTx.TxHash()
confChan, err = b.cfg.Notifier.RegisterConfirmationsNtfn(
&justiceTXID, 1, uint32(currentHeight))
&justiceTXID, 1, breachConfHeight)
if err != nil {
brarLog.Errorf("unable to register for conf for txid: %v",
justiceTXID)
@ -607,6 +599,7 @@ func (b *breachArbiter) exactRetribution(
err := b.cfg.DB.MarkChanFullyClosed(&breachInfo.chanPoint)
if err != nil {
brarLog.Errorf("unable to mark chan as closed: %v", err)
return
}
// Justice has been carried out; we can safely delete the
@ -640,7 +633,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
defer b.wg.Done()
chanPoint := contract.ChannelPoint()
chanPoint := contract.ChanPoint
brarLog.Debugf("Breach observer for ChannelPoint(%v) started ",
chanPoint)
@ -746,57 +739,39 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
// mid-local initiated state-transition, possible
// false-positive?
// Obtain a snapshot of the final channel state, which can be
// used to reclose a breached channel in the event of a failure.
chanInfo := contract.StateSnapshot()
// Using the breach information provided by the wallet and the
// channel snapshot, construct the retribution information that
// will be persisted to disk.
retInfo := newRetributionInfo(chanPoint, breachInfo, chanInfo)
retInfo := newRetributionInfo(chanPoint, breachInfo)
// Persist the pending retribution state to disk.
if err := b.cfg.Store.Add(retInfo); err != nil {
brarLog.Errorf("unable to persist retribution info "+
"to db: %v", err)
err := b.cfg.Store.Add(retInfo)
if err != nil {
brarLog.Errorf("unable to persist retribution "+
"info to db: %v", err)
}
// TODO(conner): move responsibility of channel closure into
// lnwallet. Have breach arbiter ACK after writing to disk, then
// have wallet mark channel as closed. This allows the wallet to
// attempt to retransmit the breach info if either the arbiter
// or the wallet goes down before completing the hand off.
// Now that the breach has been persisted, try to send an
// acknowledgment back to the close observer with the error. If
// the ack is successful, the close observer will mark the
// channel as pending-closed in the channeldb.
select {
case breachInfo.Err <- err:
// Bail if we failed to persist retribution info.
if err != nil {
return
}
// Now that the breach arbiter has persisted the information,
// we can go ahead and mark the channel as closed in the
// channeldb. This step is done after persisting the
// retribution information so that a failure between these steps
// will cause an attempt to monitor the still-open channel.
// However, since the retribution information was persisted
// before, the arbiter will recognize that the channel should be
// closed, and proceed to mark it as such after a restart, and
// forgo monitoring it for breaches.
case <-contract.ObserverQuit():
// If the close observer has already exited, it will
// never read the acknowledgment, so we exit.
return
// Construct the breached channel's close summary marking the
// channel using the snapshot from before, and marking this as a
// BreachClose.
closeInfo := &channeldb.ChannelCloseSummary{
ChanPoint: *chanPoint,
ChainHash: breachInfo.ChainHash,
ClosingTXID: breachInfo.BreachTransaction.TxHash(),
RemotePub: &chanInfo.RemoteIdentity,
Capacity: chanInfo.Capacity,
SettledBalance: chanInfo.LocalBalance.ToSatoshis(),
CloseType: channeldb.BreachClose,
IsPending: true,
}
// Next, persist the channel close to disk. Upon restart, the
// arbiter will recognize that this channel has been breached
// and marked close, and fast track its path to justice.
if err := contract.DeleteState(closeInfo); err != nil {
brarLog.Errorf("unable to delete channel state: %v",
err)
case <-b.quit:
// Cancel the close observer if the breach arbiter is
// shutting down, dropping the acknowledgment.
contract.CancelObserver()
return
}
// Finally, we send the retribution information into the
@ -807,6 +782,8 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
}
case <-b.quit:
contract.Stop()
contract.CancelObserver()
return
}
}
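The wallet-side counterpart of this handoff is not part of this diff; per the TODO above, it still lives with the close observer in lnwallet. A hypothetical sketch of that side, assuming the observer attaches the Err channel to the BreachRetribution and defers marking the channel closed until the arbiter ACKs (all surrounding names are illustrative):

	// Illustrative close-observer side of the handoff.
	retribution.Err = make(chan error, 1)

	// Deliver the breach to the arbiter.
	select {
	case breachedContracts <- retribution:
	case <-quit:
		return
	}

	// Block until the arbiter ACKs that the retribution info is on disk.
	select {
	case err := <-retribution.Err:
		if err != nil {
			// Persistence failed; leave the channel open so the
			// breach is re-detected and re-delivered after restart.
			return
		}
		// Safe to mark the channel as pending-closed in channeldb now.
	case <-quit:
		return
	}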
@ -922,20 +899,10 @@ var _ SpendableOutput = (*breachedOutput)(nil)
// spends all outputs of the commitment transaction into an output controlled
// by the wallet.
type retributionInfo struct {
commitHash chainhash.Hash
chanPoint wire.OutPoint
chainHash chainhash.Hash
// TODO(conner): remove the following group of fields after decoupling
// the breach arbiter from the wallet.
// Fields copied from channel snapshot when a breach is detected. This
// is necessary for deterministically constructing the channel close
// summary in the event that the breach arbiter crashes before closing
// the channel.
remoteIdentity *btcec.PublicKey
capacity btcutil.Amount
settledBalance btcutil.Amount
commitHash chainhash.Hash
chanPoint wire.OutPoint
chainHash chainhash.Hash
breachHeight uint32
breachedOutputs []breachedOutput
}
@ -945,8 +912,7 @@ type retributionInfo struct {
// channels. The information is primarily populated using the BreachRetribution
// delivered by the wallet when it detects a channel breach.
func newRetributionInfo(chanPoint *wire.OutPoint,
breachInfo *lnwallet.BreachRetribution,
chanInfo *channeldb.ChannelSnapshot) *retributionInfo {
breachInfo *lnwallet.BreachRetribution) *retributionInfo {
// Determine the number of second layer HTLCs we will attempt to sweep.
nHtlcs := len(breachInfo.HtlcRetributions)
@ -1009,16 +975,10 @@ func newRetributionInfo(chanPoint *wire.OutPoint,
breachedOutputs = append(breachedOutputs, htlcOutput)
}
// TODO(conner): remove dependency on channel snapshot after decoupling
// channel closure from the breach arbiter.
return &retributionInfo{
commitHash: breachInfo.BreachTransaction.TxHash(),
chainHash: chanInfo.ChainHash,
chainHash: breachInfo.ChainHash,
chanPoint: *chanPoint,
remoteIdentity: &chanInfo.RemoteIdentity,
capacity: chanInfo.Capacity,
settledBalance: chanInfo.LocalBalance.ToSatoshis(),
breachedOutputs: breachedOutputs,
}
}
@ -1226,6 +1186,19 @@ type RetributionStore interface {
// the addition fails.
Add(retInfo *retributionInfo) error
// IsBreached queries the retribution store to see if the breach arbiter
// is aware of any breaches for the provided channel point.
IsBreached(chanPoint *wire.OutPoint) (bool, error)
// Finalize persists the finalized justice transaction for a particular
// channel.
Finalize(chanPoint *wire.OutPoint, finalTx *wire.MsgTx) error
// GetFinalizedTxn loads the finalized justice transaction, if any, from
// the retribution store. The finalized transaction will be nil if
// Finalize has not yet been called for this channel point.
GetFinalizedTxn(chanPoint *wire.OutPoint) (*wire.MsgTx, error)
// Remove deletes the retributionInfo from disk, if any exists, under
the given key. An error should be raised if the removal fails.
Remove(key *wire.OutPoint) error
@ -1276,8 +1249,97 @@ func (rs *retributionStore) Add(ret *retributionInfo) error {
})
}
// Remove removes a retribution state from the retributionStore database.
func (rs *retributionStore) Remove(key *wire.OutPoint) error {
// Finalize writes a signed justice transaction to the retribution store. This
// is done before publishing the transaction, so that we can recover the txid on
// startup and re-register for confirmation notifications.
func (rs *retributionStore) Finalize(chanPoint *wire.OutPoint,
finalTx *wire.MsgTx) error {
return rs.db.Update(func(tx *bolt.Tx) error {
justiceBkt, err := tx.CreateBucketIfNotExists(justiceTxnBucket)
if err != nil {
return err
}
var chanBuf bytes.Buffer
if err := writeOutpoint(&chanBuf, chanPoint); err != nil {
return err
}
var txBuf bytes.Buffer
if err := finalTx.Serialize(&txBuf); err != nil {
return err
}
return justiceBkt.Put(chanBuf.Bytes(), txBuf.Bytes())
})
}
// GetFinalizedTxn loads the finalized justice transaction for the provided
// channel point. The finalized transaction will be nil if Finalize has yet to
// be called for this channel point.
func (rs *retributionStore) GetFinalizedTxn(
chanPoint *wire.OutPoint) (*wire.MsgTx, error) {
var finalTxBytes []byte
if err := rs.db.View(func(tx *bolt.Tx) error {
justiceBkt := tx.Bucket(justiceTxnBucket)
if justiceBkt == nil {
return nil
}
var chanBuf bytes.Buffer
if err := writeOutpoint(&chanBuf, chanPoint); err != nil {
return err
}
finalTxBytes = justiceBkt.Get(chanBuf.Bytes())
return nil
}); err != nil {
return nil, err
}
if finalTxBytes == nil {
return nil, nil
}
finalTx := &wire.MsgTx{}
err := finalTx.Deserialize(bytes.NewReader(finalTxBytes))
return finalTx, err
}
// IsBreached queries the retribution store to discern if this channel was
// previously breached. This is used when connecting to a peer to determine if
// it is safe to add a link to the htlcswitch, as we should never add a channel
// that has already been breached.
func (rs *retributionStore) IsBreached(chanPoint *wire.OutPoint) (bool, error) {
var found bool
err := rs.db.View(func(tx *bolt.Tx) error {
retBucket := tx.Bucket(retributionBucket)
if retBucket == nil {
return nil
}
var chanBuf bytes.Buffer
if err := writeOutpoint(&chanBuf, chanPoint); err != nil {
return err
}
retInfo := retBucket.Get(chanBuf.Bytes())
if retInfo != nil {
found = true
}
return nil
})
return found, err
}
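As its comment notes, IsBreached is intended to gate link creation when connecting to a peer, though the peer-side wiring is outside this diff. A hedged sketch of such a call site, where loadedChan and addLink are illustrative names:

	// Illustrative call site: never attach a link for a channel in an
	// irreconcilable state.
	breached, err := breachArbiter.IsBreached(&loadedChan.FundingOutpoint)
	if err != nil {
		return err
	}
	if breached || loadedChan.IsBorked {
		return nil
	}
	if err := addLink(loadedChan); err != nil {
		return err
	}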
// Remove removes a retribution state and finalized justice transaction by
// channel point from the retribution store.
func (rs *retributionStore) Remove(chanPoint *wire.OutPoint) error {
return rs.db.Update(func(tx *bolt.Tx) error {
retBucket := tx.Bucket(retributionBucket)
@ -1287,15 +1349,30 @@ func (rs *retributionStore) Remove(key *wire.OutPoint) error {
// stored in the db.
if retBucket == nil {
return errors.New("unable to remove retribution " +
"because the db bucket doesn't exist.")
"because the retribution bucket doesn't exist.")
}
var outBuf bytes.Buffer
if err := writeOutpoint(&outBuf, key); err != nil {
// Serialize the channel point we are intending to remove.
var chanBuf bytes.Buffer
if err := writeOutpoint(&chanBuf, chanPoint); err != nil {
return err
}
chanBytes := chanBuf.Bytes()
// Remove the persisted retribution info and finalized justice
// transaction.
if err := retBucket.Delete(chanBytes); err != nil {
return err
}
return retBucket.Delete(outBuf.Bytes())
// If we have not finalized this channel breach, we can exit
// early.
justiceBkt := tx.Bucket(justiceTxnBucket)
if justiceBkt == nil {
return nil
}
return justiceBkt.Delete(chanBytes)
})
}
@ -1313,11 +1390,10 @@ func (rs *retributionStore) ForAll(cb func(*retributionInfo) error) error {
// Otherwise, we fetch each serialized retribution info,
// deserialize it, and execute the passed in callback function
// on it.
return retBucket.ForEach(func(outBytes, retBytes []byte) error {
return retBucket.ForEach(func(_, retBytes []byte) error {
ret := &retributionInfo{}
if err := ret.Decode(
bytes.NewBuffer(retBytes),
); err != nil {
err := ret.Decode(bytes.NewBuffer(retBytes))
if err != nil {
return err
}
@ -1328,7 +1404,7 @@ func (rs *retributionStore) ForAll(cb func(*retributionInfo) error) error {
// Encode serializes the retribution into the passed byte stream.
func (ret *retributionInfo) Encode(w io.Writer) error {
var scratch [8]byte
var scratch [4]byte
if _, err := w.Write(ret.commitHash[:]); err != nil {
return err
@ -1342,18 +1418,8 @@ func (ret *retributionInfo) Encode(w io.Writer) error {
return err
}
if _, err := w.Write(
ret.remoteIdentity.SerializeCompressed()); err != nil {
return err
}
binary.BigEndian.PutUint64(scratch[:8], uint64(ret.capacity))
if _, err := w.Write(scratch[:8]); err != nil {
return err
}
binary.BigEndian.PutUint64(scratch[:8], uint64(ret.settledBalance))
if _, err := w.Write(scratch[:8]); err != nil {
binary.BigEndian.PutUint32(scratch[:], ret.breachHeight)
if _, err := w.Write(scratch[:]); err != nil {
return err
}
@ -1373,12 +1439,12 @@ func (ret *retributionInfo) Encode(w io.Writer) error {
// Decode deserializes a retribution from the passed byte stream.
func (ret *retributionInfo) Decode(r io.Reader) error {
var scratch [33]byte
var scratch [32]byte
if _, err := io.ReadFull(r, scratch[:32]); err != nil {
if _, err := io.ReadFull(r, scratch[:]); err != nil {
return err
}
hash, err := chainhash.NewHash(scratch[:32])
hash, err := chainhash.NewHash(scratch[:])
if err != nil {
return err
}
@ -1388,34 +1454,19 @@ func (ret *retributionInfo) Decode(r io.Reader) error {
return err
}
if _, err := io.ReadFull(r, scratch[:32]); err != nil {
if _, err := io.ReadFull(r, scratch[:]); err != nil {
return err
}
chainHash, err := chainhash.NewHash(scratch[:32])
chainHash, err := chainhash.NewHash(scratch[:])
if err != nil {
return err
}
ret.chainHash = *chainHash
if _, err = io.ReadFull(r, scratch[:33]); err != nil {
if _, err := io.ReadFull(r, scratch[:4]); err != nil {
return err
}
remoteIdentity, err := btcec.ParsePubKey(scratch[:33], btcec.S256())
if err != nil {
return err
}
ret.remoteIdentity = remoteIdentity
if _, err := io.ReadFull(r, scratch[:8]); err != nil {
return err
}
ret.capacity = btcutil.Amount(binary.BigEndian.Uint64(scratch[:8]))
if _, err := io.ReadFull(r, scratch[:8]); err != nil {
return err
}
ret.settledBalance = btcutil.Amount(
binary.BigEndian.Uint64(scratch[:8]))
ret.breachHeight = binary.BigEndian.Uint32(scratch[:4])
nOutputsU64, err := wire.ReadVarInt(r, 0)
if err != nil {
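Net effect of the Encode/Decode changes on the serialized retribution record (the chanPoint read/write sits in the elided portions of these hunks): commitHash (32 bytes), the chanPoint outpoint, chainHash (32 bytes), breachHeight (4 bytes, big-endian), a varint output count, then the breached outputs. The remoteIdentity (33 bytes), capacity (8 bytes), and settledBalance (8 bytes) fields are dropped from the record.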


@ -4,16 +4,24 @@ package main
import (
"bytes"
"crypto/sha256"
"fmt"
"io/ioutil"
"math/rand"
"os"
"reflect"
"sync"
"testing"
"time"
"github.com/btcsuite/btclog"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/shachain"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/txscript"
@ -225,9 +233,8 @@ var (
0xb7, 0x94, 0x38, 0x5f, 0x2d, 0x1e, 0xf7, 0xab,
0x6b, 0x49, 0x18, 0x83, 0x31, 0x98, 0x47, 0x53,
},
chanPoint: breachOutPoints[0],
capacity: btcutil.Amount(1e7),
settledBalance: btcutil.Amount(1e7),
chanPoint: breachOutPoints[0],
breachHeight: 337,
// Set to breachedOutputs 0 and 1 in init()
breachedOutputs: []breachedOutput{{}, {}},
},
@ -244,9 +251,8 @@ var (
0x6b, 0x49, 0x18, 0x83, 0x31, 0x98, 0x47, 0x53,
0x4d, 0x92, 0x73, 0xd1, 0x90, 0x63, 0x81, 0xb4,
},
chanPoint: breachOutPoints[1],
capacity: btcutil.Amount(1e7),
settledBalance: btcutil.Amount(1e7),
chanPoint: breachOutPoints[1],
breachHeight: 420420,
// Set to breachedOutputs 1 and 2 in init()
breachedOutputs: []breachedOutput{{}, {}},
},
@ -254,6 +260,10 @@ var (
)
func init() {
channeldb.UseLogger(btclog.Disabled)
lnwallet.UseLogger(btclog.Disabled)
brarLog = btclog.Disabled
// Ensure that breached outputs are initialized before starting tests.
if err := initBreachedOutputs(); err != nil {
panic(err)
@ -263,7 +273,6 @@ func init() {
// channel point.
for i := range retributions {
retInfo := &retributions[i]
retInfo.remoteIdentity = breachedOutputs[i].signDesc.PubKey
retInfo.breachedOutputs[0] = breachedOutputs[i]
retInfo.breachedOutputs[1] = breachedOutputs[i+1]
@ -291,6 +300,8 @@ type failingRetributionStore struct {
rs RetributionStore
nextAddErr error
restart func() RetributionStore
}
@ -307,19 +318,64 @@ func newFailingRetributionStore(
}
}
// FailNextAdd instructs the retribution store to return the provided error. If
// the error is nil, a generic default will be used.
func (frs *failingRetributionStore) FailNextAdd(err error) {
if err == nil {
err = errors.New("retribution store failed")
}
frs.mu.Lock()
frs.nextAddErr = err
frs.mu.Unlock()
}
func (frs *failingRetributionStore) Restart() {
frs.mu.Lock()
frs.rs = frs.restart()
frs.mu.Unlock()
}
// Add forwards the call to the underlying retribution store, unless this Add
// has been previously instructed to fail.
func (frs *failingRetributionStore) Add(retInfo *retributionInfo) error {
frs.mu.Lock()
defer frs.mu.Unlock()
if frs.nextAddErr != nil {
err := frs.nextAddErr
frs.nextAddErr = nil
return err
}
return frs.rs.Add(retInfo)
}
func (frs *failingRetributionStore) IsBreached(chanPoint *wire.OutPoint) (bool, error) {
frs.mu.Lock()
defer frs.mu.Unlock()
return frs.rs.IsBreached(chanPoint)
}
func (frs *failingRetributionStore) Finalize(chanPoint *wire.OutPoint,
finalTx *wire.MsgTx) error {
frs.mu.Lock()
defer frs.mu.Unlock()
return frs.rs.Finalize(chanPoint, finalTx)
}
func (frs *failingRetributionStore) GetFinalizedTxn(
chanPoint *wire.OutPoint) (*wire.MsgTx, error) {
frs.mu.Lock()
defer frs.mu.Unlock()
return frs.rs.GetFinalizedTxn(chanPoint)
}
func (frs *failingRetributionStore) Remove(key *wire.OutPoint) error {
frs.mu.Lock()
defer frs.mu.Unlock()
@ -415,9 +471,7 @@ func copyRetInfo(retInfo *retributionInfo) *retributionInfo {
commitHash: retInfo.commitHash,
chainHash: retInfo.chainHash,
chanPoint: retInfo.chanPoint,
remoteIdentity: retInfo.remoteIdentity,
capacity: retInfo.capacity,
settledBalance: retInfo.settledBalance,
breachHeight: retInfo.breachHeight,
breachedOutputs: make([]breachedOutput, nOutputs),
}
@ -432,14 +486,16 @@ func copyRetInfo(retInfo *retributionInfo) *retributionInfo {
// by an in-memory map. Access to the internal state is provided by a mutex.
// TODO(cfromknecht) extend to support and test controlled failures.
type mockRetributionStore struct {
mu sync.Mutex
state map[wire.OutPoint]*retributionInfo
mu sync.Mutex
state map[wire.OutPoint]*retributionInfo
finalTxs map[wire.OutPoint]*wire.MsgTx
}
func newMockRetributionStore() *mockRetributionStore {
return &mockRetributionStore{
mu: sync.Mutex{},
state: make(map[wire.OutPoint]*retributionInfo),
mu: sync.Mutex{},
state: make(map[wire.OutPoint]*retributionInfo),
finalTxs: make(map[wire.OutPoint]*wire.MsgTx),
}
}
@ -451,9 +507,38 @@ func (rs *mockRetributionStore) Add(retInfo *retributionInfo) error {
return nil
}
func (rs *mockRetributionStore) IsBreached(chanPoint *wire.OutPoint) (bool, error) {
rs.mu.Lock()
_, ok := rs.state[*chanPoint]
rs.mu.Unlock()
return ok, nil
}
func (rs *mockRetributionStore) Finalize(chanPoint *wire.OutPoint,
finalTx *wire.MsgTx) error {
rs.mu.Lock()
rs.finalTxs[*chanPoint] = finalTx
rs.mu.Unlock()
return nil
}
func (rs *mockRetributionStore) GetFinalizedTxn(
chanPoint *wire.OutPoint) (*wire.MsgTx, error) {
rs.mu.Lock()
finalTx := rs.finalTxs[*chanPoint]
rs.mu.Unlock()
return finalTx, nil
}
func (rs *mockRetributionStore) Remove(key *wire.OutPoint) error {
rs.mu.Lock()
delete(rs.state, *key)
delete(rs.finalTxs, *key)
rs.mu.Unlock()
return nil
@ -515,28 +600,37 @@ func TestMockRetributionStore(t *testing.T) {
}
}
// TestChannelDBRetributionStore instantiates a retributionStore backed by a
// channeldb.DB, and tests its behavior using the general RetributionStore test
// suite.
func TestChannelDBRetributionStore(t *testing.T) {
func makeTestChannelDB() (*channeldb.DB, func(), error) {
// First, create a temporary directory to be used for the duration of
// this test.
tempDirName, err := ioutil.TempDir("", "channeldb")
if err != nil {
t.Fatalf("unable to initialize temp "+
"directory for channeldb: %v", err)
return nil, nil, err
}
defer os.RemoveAll(tempDirName)
// Disable logging to prevent panics because of global state
channeldb.UseLogger(btclog.Disabled)
cleanUp := func() {
os.RemoveAll(tempDirName)
}
// Next, create channeldb for the first time.
db, err := channeldb.Open(tempDirName)
if err != nil {
cleanUp()
return nil, nil, err
}
return db, cleanUp, nil
}
// TestChannelDBRetributionStore instantiates a retributionStore backed by a
// channeldb.DB, and tests its behavior using the general RetributionStore test
// suite.
func TestChannelDBRetributionStore(t *testing.T) {
db, cleanUp, err := makeTestChannelDB()
if err != nil {
t.Fatalf("unable to open channeldb: %v", err)
}
defer db.Close()
defer cleanUp()
restartDb := func() RetributionStore {
// Close and reopen channeldb
@ -544,7 +638,7 @@ func TestChannelDBRetributionStore(t *testing.T) {
t.Fatalf("unalbe to close channeldb during restart: %v",
err)
}
db, err = channeldb.Open(tempDirName)
db, err = channeldb.Open(db.Path())
if err != nil {
t.Fatalf("unable to open channeldb: %v", err)
}
@ -831,3 +925,596 @@ restartCheck:
goto restartCheck
}
}
// TestBreachHandoffSuccess tests that a channel's close observer properly
// delivers retribution information to the breach arbiter in response to a
// breach close. This test verifies correctness in the event that the handoff
// experiences no interruptions.
func TestBreachHandoffSuccess(t *testing.T) {
// Create a pair of channels using a notifier that allows us to signal
// a spend of the funding transaction. Alice's channel will be the one
// observing a breach.
notifier := makeMockSpendNotifier()
alice, bob, cleanUpChans, err := createInitChannelsWithNotifier(
1, notifier)
if err != nil {
t.Fatalf("unable to create test channels: %v", err)
}
defer cleanUpChans()
// Instantiate a breach arbiter to handle the breach of alice's channel.
brar, cleanUpArb, err := createTestArbiter(t, notifier, alice.State().Db)
if err != nil {
t.Fatalf("unable to initialize test breach arbiter: %v", err)
}
defer cleanUpArb()
// Send the channel to the arbiter so that it sets up the receiving end
// of the handoff.
select {
case brar.newContracts <- alice:
case <-time.After(500 * time.Millisecond):
t.Fatalf("unable to register alice with breach arbiter: %v", err)
}
// Send one HTLC to Bob and perform a state transition to lock it in.
htlcAmount := lnwire.NewMSatFromSatoshis(20000)
htlc, _ := createHTLC(0, htlcAmount)
if _, err := alice.AddHTLC(htlc); err != nil {
t.Fatalf("alice unable to add htlc: %v", err)
}
if _, err := bob.ReceiveHTLC(htlc); err != nil {
t.Fatalf("bob unable to recv add htlc: %v", err)
}
if err := forceStateTransition(alice, bob); err != nil {
t.Fatalf("Can't update the channel state: %v", err)
}
// Generate the force close summary at this point in time; this will
// serve as the old state bob will broadcast.
forceCloseSummary, err := bob.ForceClose()
if err != nil {
t.Fatalf("unable to force close bob's channel: %v", err)
}
// Now send another HTLC and perform a state transition; this ensures
// Alice is ahead of the state Bob will broadcast.
htlc2, _ := createHTLC(1, htlcAmount)
if _, err := alice.AddHTLC(htlc2); err != nil {
t.Fatalf("alice unable to add htlc: %v", err)
}
if _, err := bob.ReceiveHTLC(htlc2); err != nil {
t.Fatalf("bob unable to recv add htlc: %v", err)
}
if err := forceStateTransition(alice, bob); err != nil {
t.Fatalf("Can't update the channel state: %v", err)
}
chanPoint := alice.ChanPoint
breachTxn := forceCloseSummary.CloseTx
// Signal a spend of the funding transaction and wait for the close
// observer to exit.
notifier.Spend(chanPoint, 100, breachTxn)
alice.WaitForClose()
// After exiting, the breach arbiter should have persisted the
// retribution information and the channel should be shown as pending
// force closed.
assertArbiterBreach(t, brar, chanPoint)
assertPendingClosed(t, alice)
}
// TestBreachHandoffFail tests that a channel's close observer properly
// delivers retribution information to the breach arbiter in response to a
// breach close. This test verifies correctness in the event that the breach
// arbiter fails to write the information to disk, and that a subsequent attempt
// at the handoff succeeds.
func TestBreachHandoffFail(t *testing.T) {
// Create a pair of channels using a notifier that allows us to signal
// a spend of the funding transaction. Alice's channel will be the one
// observing a breach.
notifier := makeMockSpendNotifier()
alice, bob, cleanUpChans, err := createInitChannelsWithNotifier(
1, notifier)
if err != nil {
t.Fatalf("unable to create test channels: %v", err)
}
defer cleanUpChans()
// Instantiate a breach arbiter to handle the breach of alice's channel.
brar, cleanUpArb, err := createTestArbiter(t, notifier, alice.State().Db)
if err != nil {
t.Fatalf("unable to initialize test breach arbiter: %v", err)
}
defer cleanUpArb()
// Send the channel to the arbiter so that it sets up the receiving end
// of the handoff.
select {
case brar.newContracts <- alice:
case <-time.After(500 * time.Millisecond):
t.Fatalf("unable to register alice with breach arbiter: %v", err)
}
// Send one HTLC to Bob and perform a state transition to lock it in.
htlcAmount := lnwire.NewMSatFromSatoshis(20000)
htlc, _ := createHTLC(0, htlcAmount)
if _, err := alice.AddHTLC(htlc); err != nil {
t.Fatalf("alice unable to add htlc: %v", err)
}
if _, err := bob.ReceiveHTLC(htlc); err != nil {
t.Fatalf("bob unable to recv add htlc: %v", err)
}
if err := forceStateTransition(alice, bob); err != nil {
t.Fatalf("Can't update the channel state: %v", err)
}
// Generate the force close summary at this point in time; this will
// serve as the old state bob will broadcast.
forceCloseSummary, err := bob.ForceClose()
if err != nil {
t.Fatalf("unable to force close bob's channel: %v", err)
}
// Now send another HTLC and perform a state transition; this ensures
// Alice is ahead of the state Bob will broadcast.
htlc2, _ := createHTLC(1, htlcAmount)
if _, err := alice.AddHTLC(htlc2); err != nil {
t.Fatalf("alice unable to add htlc: %v", err)
}
if _, err := bob.ReceiveHTLC(htlc2); err != nil {
t.Fatalf("bob unable to recv add htlc: %v", err)
}
if err := forceStateTransition(alice, bob); err != nil {
t.Fatalf("Can't update the channel state: %v", err)
}
// Before alerting Alice of the breach, instruct our failing retribution
// store to fail the next database operation, which we expect to write
// the information handed off by the channel's close observer.
fstore := brar.cfg.Store.(*failingRetributionStore)
fstore.FailNextAdd(nil)
// Signal the notifier to dispatch spend notifications of the funding
// transaction using the transaction from bob's closing summary.
chanPoint := alice.ChanPoint
breachTxn := forceCloseSummary.CloseTx
notifier.Spend(chanPoint, 100, breachTxn)
// Wait for the close observer to exit, all persistent effects should be
// observable after this point.
alice.WaitForClose()
// Since the handoff failed, the breach arbiter should not show the
// channel as breached, and the channel should also not have been marked
// pending closed.
assertNoArbiterBreach(t, brar, chanPoint)
assertNotPendingClosed(t, alice)
// Instantiate a second lightning channel for alice, using the state of
// her last channel.
aliceKeyPriv, _ := btcec.PrivKeyFromBytes(btcec.S256(),
alicesPrivKey)
aliceSigner := &mockSigner{aliceKeyPriv}
estimator := &lnwallet.StaticFeeEstimator{FeeRate: 50}
alice2, err := lnwallet.NewLightningChannel(aliceSigner, notifier,
estimator, alice.State())
if err != nil {
t.Fatalf("unable to create test channels: %v", err)
}
defer alice2.Stop()
// Send this newer channel to the breach arbiter, which should replace the
// prior.
select {
case brar.newContracts <- alice2:
case <-time.After(500 * time.Millisecond):
t.Fatalf("unable to register alice with breach arbiter: %v", err)
}
// Signal a spend of the funding transaction and wait for the close
// observer to exit. This time we are allowing the handoff to succeed.
notifier.Spend(chanPoint, 100, breachTxn)
alice2.WaitForClose()
// Check that the breach was properly recorded in the breach arbiter,
// and that the close observer marked the channel as pending closed
// before exiting.
assertArbiterBreach(t, brar, chanPoint)
assertPendingClosed(t, alice)
}
// assertArbiterBreach checks that the breach arbiter has persisted the breach
// information for a particular channel.
func assertArbiterBreach(t *testing.T, brar *breachArbiter,
chanPoint *wire.OutPoint) {
isBreached, err := brar.IsBreached(chanPoint)
if err != nil {
t.Fatalf("unable to determine if channel is "+
"breached: %v", err)
}
if !isBreached {
t.Fatalf("channel %v was never marked breached",
chanPoint)
}
}
// assertNoArbiterBreach checks that the breach arbiter has not persisted the
// breach information for a particular channel.
func assertNoArbiterBreach(t *testing.T, brar *breachArbiter,
chanPoint *wire.OutPoint) {
isBreached, err := brar.IsBreached(chanPoint)
if err != nil {
t.Fatalf("unable to determine if channel is "+
"breached: %v", err)
}
if isBreached {
t.Fatalf("channel %v was marked breached",
chanPoint)
}
}
// assertPendingClosed checks that the channel has been marked pending closed in
// the channel database.
func assertPendingClosed(t *testing.T, c *lnwallet.LightningChannel) {
closedChans, err := c.State().Db.FetchClosedChannels(true)
if err != nil {
t.Fatalf("unable to load pending closed channels: %v", err)
}
for _, chanSummary := range closedChans {
if chanSummary.ChanPoint == *c.ChanPoint {
return
}
}
t.Fatalf("channel %v was not marked pending closed",
c.ChanPoint)
}
// assertNotPendingClosed checks that the channel has not been marked pending
// closed in the channel database.
func assertNotPendingClosed(t *testing.T, c *lnwallet.LightningChannel) {
closedChans, err := c.State().Db.FetchClosedChannels(true)
if err != nil {
t.Fatalf("unable to load pending closed channels: %v", err)
}
for _, chanSummary := range closedChans {
if chanSummary.ChanPoint == *c.ChanPoint {
t.Fatalf("channel %v was marked pending closed",
c.ChanPoint)
}
}
}
// createTestArbiter instantiates a breach arbiter with a failing retribution
// store, so that controlled failures can be tested.
func createTestArbiter(t *testing.T, notifier chainntnfs.ChainNotifier,
db *channeldb.DB) (*breachArbiter, func(), error) {
// Create a failing retribution store, that wraps a normal one.
store := newFailingRetributionStore(func() RetributionStore {
return newRetributionStore(db)
})
aliceKeyPriv, _ := btcec.PrivKeyFromBytes(btcec.S256(),
alicesPrivKey)
signer := &mockSigner{key: aliceKeyPriv}
// Assemble our test arbiter.
ba := newBreachArbiter(&BreachConfig{
CloseLink: func(_ *wire.OutPoint, _ htlcswitch.ChannelCloseType) {},
DB: db,
Estimator: &lnwallet.StaticFeeEstimator{FeeRate: 50},
GenSweepScript: func() ([]byte, error) { return nil, nil },
Notifier: notifier,
Signer: signer,
PublishTransaction: func(_ *wire.MsgTx) error { return nil },
Store: store,
})
if err := ba.Start(); err != nil {
return nil, nil, err
}
// The caller is responsible for closing the database.
cleanUp := func() {
ba.Stop()
}
return ba, cleanUp, nil
}
// createInitChannelsWithNotifier creates two initialized test channels funded
// with 10 BTC, with 5 BTC allocated to each side. Within the channel, Alice is
// the initiator.
func createInitChannelsWithNotifier(revocationWindow int,
notifier chainntnfs.ChainNotifier) (*lnwallet.LightningChannel,
*lnwallet.LightningChannel, func(), error) {
aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes(btcec.S256(),
alicesPrivKey)
bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes(btcec.S256(),
bobsPrivKey)
channelCapacity := btcutil.Amount(10 * 1e8)
channelBal := channelCapacity / 2
aliceDustLimit := btcutil.Amount(200)
bobDustLimit := btcutil.Amount(1300)
csvTimeoutAlice := uint32(5)
csvTimeoutBob := uint32(4)
prevOut := &wire.OutPoint{
Hash: chainhash.Hash(testHdSeed),
Index: 0,
}
fundingTxIn := wire.NewTxIn(prevOut, nil, nil)
aliceCfg := channeldb.ChannelConfig{
ChannelConstraints: channeldb.ChannelConstraints{
DustLimit: aliceDustLimit,
MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()),
ChanReserve: btcutil.Amount(rand.Int63()),
MinHTLC: lnwire.MilliSatoshi(rand.Int63()),
MaxAcceptedHtlcs: uint16(rand.Int31()),
},
CsvDelay: uint16(csvTimeoutAlice),
MultiSigKey: aliceKeyPub,
RevocationBasePoint: aliceKeyPub,
PaymentBasePoint: aliceKeyPub,
DelayBasePoint: aliceKeyPub,
HtlcBasePoint: aliceKeyPub,
}
bobCfg := channeldb.ChannelConfig{
ChannelConstraints: channeldb.ChannelConstraints{
DustLimit: bobDustLimit,
MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()),
ChanReserve: btcutil.Amount(rand.Int63()),
MinHTLC: lnwire.MilliSatoshi(rand.Int63()),
MaxAcceptedHtlcs: uint16(rand.Int31()),
},
CsvDelay: uint16(csvTimeoutBob),
MultiSigKey: bobKeyPub,
RevocationBasePoint: bobKeyPub,
PaymentBasePoint: bobKeyPub,
DelayBasePoint: bobKeyPub,
HtlcBasePoint: bobKeyPub,
}
bobRoot := lnwallet.DeriveRevocationRoot(bobKeyPriv, testHdSeed, aliceKeyPub)
bobPreimageProducer := shachain.NewRevocationProducer(bobRoot)
bobFirstRevoke, err := bobPreimageProducer.AtIndex(0)
if err != nil {
return nil, nil, nil, err
}
bobCommitPoint := lnwallet.ComputeCommitmentPoint(bobFirstRevoke[:])
aliceRoot := lnwallet.DeriveRevocationRoot(aliceKeyPriv, testHdSeed, bobKeyPub)
alicePreimageProducer := shachain.NewRevocationProducer(aliceRoot)
aliceFirstRevoke, err := alicePreimageProducer.AtIndex(0)
if err != nil {
return nil, nil, nil, err
}
aliceCommitPoint := lnwallet.ComputeCommitmentPoint(aliceFirstRevoke[:])
aliceCommitTx, bobCommitTx, err := lnwallet.CreateCommitmentTxns(channelBal,
channelBal, &aliceCfg, &bobCfg, aliceCommitPoint, bobCommitPoint,
*fundingTxIn)
if err != nil {
return nil, nil, nil, err
}
alicePath, err := ioutil.TempDir("", "alicedb")
dbAlice, err := channeldb.Open(alicePath)
if err != nil {
return nil, nil, nil, err
}
bobPath, err := ioutil.TempDir("", "bobdb")
dbBob, err := channeldb.Open(bobPath)
if err != nil {
return nil, nil, nil, err
}
estimator := &lnwallet.StaticFeeEstimator{FeeRate: 50}
feePerWeight, err := estimator.EstimateFeePerWeight(1)
if err != nil {
return nil, nil, nil, err
}
feePerKw := feePerWeight * 1000
// TODO(roasbeef): need to factor in commit fee?
aliceCommit := channeldb.ChannelCommitment{
CommitHeight: 0,
LocalBalance: lnwire.NewMSatFromSatoshis(channelBal),
RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal),
FeePerKw: feePerKw,
CommitFee: 8688,
CommitTx: aliceCommitTx,
CommitSig: bytes.Repeat([]byte{1}, 71),
}
bobCommit := channeldb.ChannelCommitment{
CommitHeight: 0,
LocalBalance: lnwire.NewMSatFromSatoshis(channelBal),
RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal),
FeePerKw: feePerKw,
CommitFee: 8688,
CommitTx: bobCommitTx,
CommitSig: bytes.Repeat([]byte{1}, 71),
}
aliceChannelState := &channeldb.OpenChannel{
LocalChanCfg: aliceCfg,
RemoteChanCfg: bobCfg,
IdentityPub: aliceKeyPub,
FundingOutpoint: *prevOut,
ChanType: channeldb.SingleFunder,
IsInitiator: true,
Capacity: channelCapacity,
RemoteCurrentRevocation: bobCommitPoint,
RevocationProducer: alicePreimageProducer,
RevocationStore: shachain.NewRevocationStore(),
LocalCommitment: aliceCommit,
RemoteCommitment: aliceCommit,
Db: dbAlice,
}
bobChannelState := &channeldb.OpenChannel{
LocalChanCfg: bobCfg,
RemoteChanCfg: aliceCfg,
IdentityPub: bobKeyPub,
FundingOutpoint: *prevOut,
ChanType: channeldb.SingleFunder,
IsInitiator: false,
Capacity: channelCapacity,
RemoteCurrentRevocation: aliceCommitPoint,
RevocationProducer: bobPreimageProducer,
RevocationStore: shachain.NewRevocationStore(),
LocalCommitment: bobCommit,
RemoteCommitment: bobCommit,
Db: dbBob,
}
aliceSigner := &mockSigner{aliceKeyPriv}
bobSigner := &mockSigner{bobKeyPriv}
channelAlice, err := lnwallet.NewLightningChannel(aliceSigner, notifier,
estimator, aliceChannelState)
if err != nil {
return nil, nil, nil, err
}
channelBob, err := lnwallet.NewLightningChannel(bobSigner, notifier,
estimator, bobChannelState)
if err != nil {
return nil, nil, nil, err
}
if err := channelAlice.State().FullSync(); err != nil {
return nil, nil, nil, err
}
if err := channelBob.State().FullSync(); err != nil {
return nil, nil, nil, err
}
cleanUpFunc := func() {
dbBob.Close()
dbAlice.Close()
os.RemoveAll(bobPath)
os.RemoveAll(alicePath)
}
// Now that the channels are open, simulate the start of a session by
// having Alice and Bob extend their revocation windows to each other.
err = initRevocationWindows(channelAlice, channelBob, revocationWindow)
if err != nil {
return nil, nil, nil, err
}
return channelAlice, channelBob, cleanUpFunc, nil
}
// initRevocationWindows simulates a new channel being opened within the p2p
// network by populating the initial revocation windows of the passed
// commitment state machines.
//
// TODO(conner) remove code duplication
func initRevocationWindows(chanA, chanB *lnwallet.LightningChannel, windowSize int) error {
aliceNextRevoke, err := chanA.NextRevocationKey()
if err != nil {
return err
}
if err := chanB.InitNextRevocation(aliceNextRevoke); err != nil {
return err
}
bobNextRevoke, err := chanB.NextRevocationKey()
if err != nil {
return err
}
if err := chanA.InitNextRevocation(bobNextRevoke); err != nil {
return err
}
return nil
}
// createHTLC is a utility function for generating an HTLC with a given
// preimage and a given amount.
// TODO(conner) remove code duplication
func createHTLC(data int, amount lnwire.MilliSatoshi) (*lnwire.UpdateAddHTLC, [32]byte) {
preimage := bytes.Repeat([]byte{byte(data)}, 32)
paymentHash := sha256.Sum256(preimage)
var returnPreimage [32]byte
copy(returnPreimage[:], preimage)
return &lnwire.UpdateAddHTLC{
ID: uint64(data),
PaymentHash: paymentHash,
Amount: amount,
Expiry: uint32(5),
}, returnPreimage
}
// forceStateTransition executes the necessary interaction between the two
// commitment state machines to transition to a new state locking in any
// pending updates.
// TODO(conner) remove code duplication
func forceStateTransition(chanA, chanB *lnwallet.LightningChannel) error {
aliceSig, aliceHtlcSigs, err := chanA.SignNextCommitment()
if err != nil {
return err
}
if err = chanB.ReceiveNewCommitment(aliceSig, aliceHtlcSigs); err != nil {
return err
}
bobRevocation, err := chanB.RevokeCurrentCommitment()
if err != nil {
return err
}
bobSig, bobHtlcSigs, err := chanB.SignNextCommitment()
if err != nil {
return err
}
if _, err := chanA.ReceiveRevocation(bobRevocation); err != nil {
return err
}
if err := chanA.ReceiveNewCommitment(bobSig, bobHtlcSigs); err != nil {
return err
}
aliceRevocation, err := chanA.RevokeCurrentCommitment()
if err != nil {
return err
}
if _, err := chanB.ReceiveRevocation(aliceRevocation); err != nil {
return err
}
return nil
}
// calcStaticFee calculates appropriate fees for commitment transactions. This
// function provides a simple way to allow test balance assertions to take fee
// calculations into account.
//
// TODO(bvu): Refactor when dynamic fee estimation is added.
// TODO(conner) remove code duplication
func calcStaticFee(numHTLCs int) btcutil.Amount {
const (
commitWeight = btcutil.Amount(724)
htlcWeight = 172
feePerKw = btcutil.Amount(24/4) * 1000
)
return feePerKw * (commitWeight +
btcutil.Amount(htlcWeight*numHTLCs)) / 1000
}
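For reference, with feePerKw = (24/4) * 1000 = 6000, a commitment with no HTLCs carries a static fee of 6000 * 724 / 1000 = 4344 satoshis, and each additional HTLC adds 6000 * 172 / 1000 = 1032 satoshis.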


@ -305,6 +305,11 @@ type OpenChannel struct {
// negotiate fees, or close the channel.
IsInitiator bool
// IsBorked indicates that the channel has entered an irreconcilable
// state, triggered by a state desynchronization or channel breach.
// Channels in this state should never be added to the htlc switch.
IsBorked bool
// FundingBroadcastHeight is the height in which the funding
// transaction was broadcast. This value can be used by higher level
// sub-systems to determine if a channel is stale and/or should have
@ -522,6 +527,37 @@ func (c *OpenChannel) MarkAsOpen(openLoc lnwire.ShortChannelID) error {
})
}
// MarkBorked marks the event when the channel has reached an irreconcilable
// state, such as a channel breach or state desynchronization. Borked channels
// should never be added to the switch.
func (c *OpenChannel) MarkBorked() error {
c.Lock()
defer c.Unlock()
if err := c.Db.Update(func(tx *bolt.Tx) error {
chanBucket, err := updateChanBucket(tx, c.IdentityPub,
&c.FundingOutpoint, c.ChainHash)
if err != nil {
return err
}
channel, err := fetchOpenChannel(chanBucket, &c.FundingOutpoint)
if err != nil {
return err
}
channel.IsBorked = true
return putOpenChannel(chanBucket, channel)
}); err != nil {
return err
}
c.IsBorked = true
return nil
}
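MarkBorked is meant to be invoked by whichever subsystem detects the irreconcilable state. A hypothetical call site, where channel is an lnwallet.LightningChannel as in the test code earlier in this diff:

	// Illustrative: on detecting a breach or a state desynchronization,
	// bork the channel so it is never re-added to the htlc switch.
	if err := channel.State().MarkBorked(); err != nil {
		return err
	}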
// putOpenChannel serializes and stores the current state of the channel in
// its entirety.
func putOpenChannel(chanBucket *bolt.Bucket, channel *OpenChannel) error {
@ -1217,6 +1253,9 @@ type ChannelCloseSummary struct {
// Capacity was the total capacity of the channel.
Capacity btcutil.Amount
// CloseHeight is the height at which the funding transaction was spent.
CloseHeight uint32
// SettledBalance is our total settled balance at the time of
// channel closure. This _does not_ include the sum of any outputs that
// have been time-locked as a result of the unilateral channel closure.
@ -1402,9 +1441,9 @@ func putChannelCloseSummary(tx *bolt.Tx, chanID []byte,
func serializeChannelCloseSummary(w io.Writer, cs *ChannelCloseSummary) error {
return writeElements(w,
cs.ChanPoint, cs.ChainHash, cs.ClosingTXID, cs.CloseHeight,
cs.RemotePub, cs.Capacity, cs.SettledBalance,
cs.TimeLockedBalance, cs.CloseType, cs.IsPending,
)
}
@ -1429,9 +1468,9 @@ func deserializeCloseChannelSummary(r io.Reader) (*ChannelCloseSummary, error) {
c := &ChannelCloseSummary{}
err := readElements(r,
&c.ChanPoint, &c.ChainHash, &c.ClosingTXID, &c.CloseHeight,
&c.RemotePub, &c.Capacity, &c.SettledBalance,
&c.TimeLockedBalance, &c.CloseType, &c.IsPending,
)
if err != nil {
return nil, err
@ -1445,9 +1484,10 @@ func putChanInfo(chanBucket *bolt.Bucket, channel *OpenChannel) error {
if err := writeElements(&w,
channel.ChanType, channel.ChainHash, channel.FundingOutpoint,
channel.ShortChanID, channel.IsPending, channel.IsInitiator,
channel.IsBorked, channel.FundingBroadcastHeight,
channel.NumConfsRequired, channel.ChannelFlags,
channel.IdentityPub, channel.Capacity, channel.TotalMSatSent,
channel.TotalMSatReceived,
); err != nil {
return err
}
@ -1545,9 +1585,10 @@ func fetchChanInfo(chanBucket *bolt.Bucket, channel *OpenChannel) error {
if err := readElements(r,
&channel.ChanType, &channel.ChainHash, &channel.FundingOutpoint,
&channel.ShortChanID, &channel.IsPending, &channel.IsInitiator,
&channel.IsBorked, &channel.FundingBroadcastHeight,
&channel.NumConfsRequired, &channel.ChannelFlags,
&channel.IdentityPub, &channel.Capacity, &channel.TotalMSatSent,
&channel.TotalMSatReceived,
); err != nil {
return err
}

View File

@ -88,6 +88,11 @@ func Open(dbPath string) (*DB, error) {
return chanDB, nil
}
// Path returns the file path to the channel database.
func (d *DB) Path() string {
return d.dbPath
}
// Wipe completely deletes all saved state within all used buckets within the
// database. The deletion is done in a single transaction, therefore this
// operation is fully atomic.
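// A minimal sketch (illustrative, not this commit's code) of why the
// wipe is atomic: every bucket is deleted inside a single bolt Update
// transaction, so either all state is removed or, on any error, none
// is. The helper name and bucket list here are hypothetical.
func wipeBuckets(db *bolt.DB, buckets [][]byte) error {
	return db.Update(func(tx *bolt.Tx) error {
		for _, bucket := range buckets {
			err := tx.DeleteBucket(bucket)
			if err != nil && err != bolt.ErrBucketNotFound {
				return err
			}
		}
		return nil
	})
}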

View File

@ -132,7 +132,7 @@ type testNode struct {
shutdownChannel chan struct{}
}
func init() {
channeldb.UseLogger(btclog.Disabled)
lnwallet.UseLogger(btclog.Disabled)
fndgLog = btclog.Disabled
@ -802,8 +802,6 @@ func assertHandleFundingLocked(t *testing.T, alice, bob *testNode) {
}
func TestFundingManagerNormalWorkflow(t *testing.T) {
alice, bob := setupFundingManagers(t)
defer tearDownFundingManagers(t, alice, bob)
@ -866,8 +864,6 @@ func TestFundingManagerNormalWorkflow(t *testing.T) {
}
func TestFundingManagerRestartBehavior(t *testing.T) {
alice, bob := setupFundingManagers(t)
defer tearDownFundingManagers(t, alice, bob)
@ -986,8 +982,6 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
// server to notify when the peer comes online, in case sending the
// fundingLocked message fails the first time.
func TestFundingManagerOfflinePeer(t *testing.T) {
alice, bob := setupFundingManagers(t)
defer tearDownFundingManagers(t, alice, bob)
@ -1114,8 +1108,6 @@ func TestFundingManagerOfflinePeer(t *testing.T) {
}
func TestFundingManagerFundingTimeout(t *testing.T) {
alice, bob := setupFundingManagers(t)
defer tearDownFundingManagers(t, alice, bob)
@ -1158,8 +1150,6 @@ func TestFundingManagerFundingTimeout(t *testing.T) {
// continues to operate as expected in case we receive a duplicate fundingLocked
// message.
func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) {
alice, bob := setupFundingManagers(t)
defer tearDownFundingManagers(t, alice, bob)
@ -1257,8 +1247,6 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) {
// handles receiving a fundingLocked after its own fundingLocked and channel
// announcement is sent and gets restarted.
func TestFundingManagerRestartAfterChanAnn(t *testing.T) {
alice, bob := setupFundingManagers(t)
defer tearDownFundingManagers(t, alice, bob)
@ -1329,8 +1317,6 @@ func TestFundingManagerRestartAfterChanAnn(t *testing.T) {
// fundingManager continues to operate as expected after it has received
// fundingLocked and then gets restarted.
func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) {
alice, bob := setupFundingManagers(t)
defer tearDownFundingManagers(t, alice, bob)
@ -1397,8 +1383,6 @@ func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) {
// (a channel not supposed to be announced to the rest of the network),
// neither the announcementSignatures nor the nodeAnnouncement messages are sent.
func TestFundingManagerPrivateChannel(t *testing.T) {
alice, bob := setupFundingManagers(t)
defer tearDownFundingManagers(t, alice, bob)
@ -1475,8 +1459,6 @@ func TestFundingManagerPrivateChannel(t *testing.T) {
// announcement signatures nor the node announcement messages are sent upon
// restart.
func TestFundingManagerPrivateRestart(t *testing.T) {
alice, bob := setupFundingManagers(t)
defer tearDownFundingManagers(t, alice, bob)

View File

@ -3057,18 +3057,7 @@ func testRevokedCloseRetribution(net *lntest.NetworkHarness, t *harnessTest) {
t.Fatalf("justice tx wasn't mined")
}
// Finally, obtain Alice's channel state; she shouldn't report any
// channel as she just successfully brought Bob to justice by sweeping
// all the channel funds.
assertNumChannels(t, ctxb, net.Alice, 0)
}
// testRevokedCloseRetributionZeroValueRemoteOutput tests that Alice is able
@ -3286,18 +3275,7 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness
t.Fatalf("justice tx wasn't mined")
}
// Finally, obtain Alice's channel state; she shouldn't report any
// channel as she just successfully brought Carol to justice by sweeping
// all the channel funds.
assertNumChannels(t, ctxb, net.Alice, 0)
}
// testRevokedCloseRetributionRemoteHodl tests that Alice properly responds to a
@ -3584,17 +3562,33 @@ func testRevokedCloseRetributionRemoteHodl(net *lntest.NetworkHarness,
t.Fatalf("justice tx wasn't mined")
}
// Finally, obtain Alice's channel state; she shouldn't report any
// channel as she just successfully brought Carol to justice by sweeping
// all the channel funds.
assertNumChannels(t, ctxb, net.Alice, 0)
}
// assertNumChannels polls the provided node's ListChannels RPC until it
// reports the desired number of total channels.
func assertNumChannels(t *harnessTest, ctxb context.Context,
node *lntest.HarnessNode, numChannels int) {
// Poll the node for its list of channels.
req := &lnrpc.ListChannelsRequest{}
var predErr error
pred := func() bool {
chanInfo, err := node.ListChannels(ctxb, req)
if err != nil {
predErr = fmt.Errorf("unable to query for node's "+
"channels: %v", err)
return false
}
// Return true if the query returned the expected number of
// channels.
return len(chanInfo.Channels) == numChannels
}
if err := lntest.WaitPredicate(pred, time.Second*15); err != nil {
t.Fatalf("node has incorrect number of channels: %v", predErr)
}
}

View File

@ -1111,6 +1111,9 @@ type LightningChannel struct {
status channelState
// ChanPoint is the funding outpoint of this channel.
ChanPoint *wire.OutPoint
// sigPool is a pool of workers that are capable of signing and
// validating signatures in parallel. This is utilized as an
// optimization to avoid serially signing or validating the HTLC
@ -1205,7 +1208,8 @@ type LightningChannel struct {
sync.RWMutex
cowg sync.WaitGroup
wg sync.WaitGroup
shutdown int32
quit chan struct{}
@ -1269,6 +1273,7 @@ func NewLightningChannel(signer Signer, events chainntnfs.ChainNotifier,
remoteChanCfg: &state.RemoteChanCfg,
localUpdateLog: localUpdateLog,
remoteUpdateLog: remoteUpdateLog,
ChanPoint: &state.FundingOutpoint,
Capacity: state.Capacity,
FundingWitnessScript: multiSigScript,
ForceCloseSignal: make(chan struct{}),
@ -1338,6 +1343,7 @@ func NewLightningChannel(signer Signer, events chainntnfs.ChainNotifier,
// Launch the close observer which will vigilantly watch the
// network for any broadcasts of the current or prior commitment
// transactions, taking action accordingly.
lc.cowg.Add(1)
go lc.closeObserver(channelCloseNtfn)
}
@ -1372,6 +1378,11 @@ func (lc *LightningChannel) CancelObserver() {
close(lc.observerQuit)
}
// WaitForClose blocks until the channel's close observer has terminated.
func (lc *LightningChannel) WaitForClose() {
lc.cowg.Wait()
}
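// Illustration (not part of this commit): a typical teardown pairs
// CancelObserver with WaitForClose, first signalling the observer to
// exit and then blocking until it has:
//
//	lc.CancelObserver() // closes observerQuit
//	lc.WaitForClose()   // returns once closeObserver is done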
// ResetState resets the state of the channel back to the default state. This
// ensures that any active goroutines which need to act based on on-chain
// events do so properly.
@ -1717,6 +1728,11 @@ type BreachRetribution struct {
// commitment transaction.
BreachTransaction *wire.MsgTx
// BreachHeight records the block height confirming the breach
// transaction, used as a height hint when registering for
// confirmations.
BreachHeight uint32
// ChainHash is the chain that the contract breach was identified
// within. This is also the resident chain of the contract (the chain
// the contract was created on).
@ -1757,13 +1773,18 @@ type BreachRetribution struct {
// HtlcRetributions is a slice of HTLC retributions, one for each active
// HTLC output within the breached commitment transaction.
HtlcRetributions []HtlcRetribution
// Err is used to reliably hand off the breach retribution to the breach
// arbiter.
Err chan error
}
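// Illustration (not part of this commit): the consumer side of the
// hand-off. After receiving a retribution over ContractBreach, the
// breach arbiter persists it and then ACKs on Err, unblocking the
// close observer; persistRetribution is a hypothetical helper.
//
//	retribution := <-lc.ContractBreach
//	if err := persistRetribution(retribution); err != nil {
//		retribution.Err <- err
//		return
//	}
//	retribution.Err <- nil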
// newBreachRetribution creates a new fully populated BreachRetribution for the
// passed channel, at a particular revoked state number, and one which targets
// the passed commitment transaction.
func newBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64,
broadcastCommitment *wire.MsgTx) (*BreachRetribution, error) {
broadcastCommitment *wire.MsgTx,
breachHeight uint32) (*BreachRetribution, error) {
commitHash := broadcastCommitment.TxHash()
@ -1926,6 +1947,7 @@ func newBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64,
return &BreachRetribution{
ChainHash: chanState.ChainHash,
BreachTransaction: broadcastCommitment,
BreachHeight: breachHeight,
RevokedStateNum: stateNum,
PendingHTLCs: revokedSnapshot.Htlcs,
LocalOutpoint: localOutpoint,
@ -1933,6 +1955,7 @@ func newBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64,
RemoteOutpoint: remoteOutpoint,
RemoteOutputSignDesc: remoteSignDesc,
HtlcRetributions: htlcRetributions,
Err: make(chan error, 1),
}, nil
}
@ -1946,11 +1969,14 @@ func newBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64,
//
// NOTE: This MUST be run as a goroutine.
func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEvent) {
defer lc.cowg.Done()
walletLog.Infof("Close observer for ChannelPoint(%v) active",
lc.channelState.FundingOutpoint)
var (
commitSpend *chainntnfs.SpendDetail
spendHeight uint32
ok bool
)
@ -1962,6 +1988,8 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven
return
}
spendHeight = uint32(commitSpend.SpendingHeight)
// Otherwise, we've been signalled to bail out early by the
// caller/maintainer of this channel.
case <-lc.observerQuit:
@ -2012,6 +2040,7 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven
remoteStateNum := lc.channelState.RemoteCommitment.CommitHeight
// TODO(roasbeef): track heights distinctly?
switch {
// If the state number of the spending transaction matches the current
// latest state, then they've initiated a unilateral close. So we'll trigger
@ -2037,15 +2066,17 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven
ChanPoint: lc.channelState.FundingOutpoint,
ChainHash: lc.channelState.ChainHash,
ClosingTXID: *commitSpend.SpenderTxHash,
CloseHeight: spendHeight,
RemotePub: lc.channelState.IdentityPub,
Capacity: lc.Capacity,
SettledBalance: lc.channelState.LocalCommitment.LocalBalance.ToSatoshis(),
CloseType: channeldb.ForceClose,
IsPending: true,
}
if err := lc.DeleteState(&closeSummary); err != nil {
walletLog.Errorf("unable to delete channel state: %v",
err)
walletLog.Errorf("unable to delete channel state: %v", err)
return
}
// TODO(roasbeef): need to handle case of if >
@ -2109,6 +2140,17 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven
}
}
// We'll also send all the details necessary to re-claim funds
// that are suspended within any contracts.
unilateralCloseSummary := &UnilateralCloseSummary{
SpendDetail: commitSpend,
ChannelCloseSummary: closeSummary,
SelfOutPoint: selfPoint,
SelfOutputSignDesc: selfSignDesc,
MaturityDelay: uint32(lc.remoteChanCfg.CsvDelay),
HtlcResolutions: htlcResolutions,
}
// TODO(roasbeef): send msg before writing to disk
// * need to ensure proper fault tolerance in all cases
// * get ACK from the consumer of the ntfn before writing to disk?
@ -2118,15 +2160,11 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven
// commitment transaction broadcast.
close(lc.UnilateralCloseSignal)
select {
case lc.UnilateralClose <- unilateralCloseSummary:
case <-lc.observerQuit:
walletLog.Errorf("channel shutting down")
return
}
// If the state number broadcast is lower than the remote node's
@ -2140,10 +2178,16 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven
"broadcast!!!", lc.channelState.FundingOutpoint,
remoteStateNum)
if err := lc.channelState.MarkBorked(); err != nil {
walletLog.Errorf("unable to mark channel as borked: %v",
err)
return
}
// Create a new breach retribution struct which contains all the
// data needed to swiftly bring the cheating peer to justice.
retribution, err := newBreachRetribution(lc.channelState,
broadcastStateNum, commitTxBroadcast)
broadcastStateNum, commitTxBroadcast, spendHeight)
if err != nil {
walletLog.Errorf("unable to create breach retribution: %v", err)
return
@ -2155,7 +2199,55 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven
// Finally, send the retribution struct over the contract breach
// channel to allow the observer to use the breach retribution
// to sweep ALL funds.
lc.ContractBreach <- retribution
select {
case lc.ContractBreach <- retribution:
case <-lc.observerQuit:
walletLog.Errorf("channel shutting down")
return
}
// Wait for the breach arbiter to ACK the handoff before marking
// the channel as pending force closed in channeldb.
select {
case err := <-retribution.Err:
// Bail if the handoff failed.
if err != nil {
walletLog.Errorf("unable to handoff "+
"retribution info: %v", err)
return
}
case <-lc.observerQuit:
walletLog.Errorf("channel shutting down")
return
}
// At this point, we've successfully received an ack for the
// breach close. We now construct and persist the close
// summary, marking the channel as pending force closed.
settledBalance := lc.channelState.LocalCommitment.
LocalBalance.ToSatoshis()
closeSummary := channeldb.ChannelCloseSummary{
ChanPoint: lc.channelState.FundingOutpoint,
ChainHash: lc.channelState.ChainHash,
ClosingTXID: *commitSpend.SpenderTxHash,
CloseHeight: spendHeight,
RemotePub: lc.channelState.IdentityPub,
Capacity: lc.Capacity,
SettledBalance: settledBalance,
CloseType: channeldb.BreachClose,
IsPending: true,
}
err = lc.DeleteState(&closeSummary)
if err != nil {
walletLog.Errorf("unable to delete channel state: %v",
err)
return
}
walletLog.Infof("Breached channel=%v marked pending-closed",
lc.channelState.FundingOutpoint)
}
}
@ -3190,6 +3282,10 @@ func (lc *LightningChannel) ProcessChanSyncMsg(msg *lnwire.ChannelReestablish) (
// chain reported by the remote party is not equal to our chain tail,
// then we cannot sync.
case !oweRevocation && localChainTail.height != msg.RemoteCommitTailHeight:
if err := lc.channelState.MarkBorked(); err != nil {
return nil, err
}
return nil, ErrCannotSyncCommitChains
}
@ -3218,6 +3314,10 @@ func (lc *LightningChannel) ProcessChanSyncMsg(msg *lnwire.ChannelReestablish) (
} else if !oweCommitment && remoteChainTip.height+1 !=
msg.NextLocalCommitHeight {
if err := lc.channelState.MarkBorked(); err != nil {
return nil, err
}
// If we don't owe them a commitment, yet the tip of their
// chain isn't one more than the next local commit height they
// report, we'll fail the channel.
@ -5118,3 +5218,14 @@ func (lc *LightningChannel) IsPending() bool {
return lc.channelState.IsPending
}
// State provides access to the channel's internal state for testing.
func (lc *LightningChannel) State() *channeldb.OpenChannel {
return lc.channelState
}
// ObserverQuit returns the quit channel used to coordinate the shutdown of the
// close observer.
func (lc *LightningChannel) ObserverQuit() chan struct{} {
return lc.observerQuit
}

View File

@ -3,13 +3,16 @@ package lnwallet
import (
"bytes"
"crypto/sha256"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"os"
"reflect"
"runtime"
"sync"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
@ -79,7 +82,7 @@ func (m *mockSigner) SignOutputRaw(tx *wire.MsgTx, signDesc *SignDescriptor) ([]
}
sig, err := txscript.RawTxInWitnessSignature(tx, signDesc.SigHashes,
signDesc.InputIndex, amt, witnessScript, signDesc.HashType,
privKey)
if err != nil {
return nil, err
@ -105,7 +108,7 @@ func (m *mockSigner) ComputeInputScript(tx *wire.MsgTx, signDesc *SignDescriptor
witnessScript, err := txscript.WitnessSignature(tx, signDesc.SigHashes,
signDesc.InputIndex, signDesc.Output.Value, signDesc.Output.PkScript,
signDesc.HashType, privKey, true)
if err != nil {
return nil, err
}
@ -121,6 +124,7 @@ type mockNotfier struct {
func (m *mockNotfier) RegisterConfirmationsNtfn(txid *chainhash.Hash, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) {
return nil, nil
}
func (m *mockNotfier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) {
return nil, nil
}
@ -132,6 +136,7 @@ func (m *mockNotfier) Start() error {
func (m *mockNotfier) Stop() error {
return nil
}
func (m *mockNotfier) RegisterSpendNtfn(outpoint *wire.OutPoint, heightHint uint32) (*chainntnfs.SpendEvent, error) {
return &chainntnfs.SpendEvent{
Spend: make(chan *chainntnfs.SpendDetail),
@ -140,6 +145,51 @@ func (m *mockNotfier) RegisterSpendNtfn(outpoint *wire.OutPoint, heightHint uint
}, nil
}
// mockSpendNotifier extends the mockNotifier so that spend notifications can be
// triggered and delivered to subscribers.
type mockSpendNotifier struct {
*mockNotfier
spendMap map[wire.OutPoint][]chan *chainntnfs.SpendDetail
}
func makeMockSpendNotifier() *mockSpendNotifier {
return &mockSpendNotifier{
spendMap: make(map[wire.OutPoint][]chan *chainntnfs.SpendDetail),
}
}
func (m *mockSpendNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
heightHint uint32) (*chainntnfs.SpendEvent, error) {
spendChan := make(chan *chainntnfs.SpendDetail, 1)
m.spendMap[*outpoint] = append(m.spendMap[*outpoint], spendChan)
return &chainntnfs.SpendEvent{
Spend: spendChan,
Cancel: func() {
},
}, nil
}
// Spend dispatches SpendDetails to all subscribers of the outpoint. The details
// will include the transaction and height provided by the caller.
func (m *mockSpendNotifier) Spend(outpoint *wire.OutPoint, height int32,
txn *wire.MsgTx) {
if spendChans, ok := m.spendMap[*outpoint]; ok {
delete(m.spendMap, *outpoint)
for _, spendChan := range spendChans {
txnHash := txn.TxHash()
spendChan <- &chainntnfs.SpendDetail{
SpentOutPoint: outpoint,
SpendingHeight: height,
SpendingTx: txn,
SpenderTxHash: &txnHash,
SpenderInputIndex: outpoint.Index,
}
}
}
}
// initRevocationWindows simulates a new channel being opened within the p2p
// network by populating the initial revocation windows of the passed
// commitment state machines.
@ -204,9 +254,36 @@ func forceStateTransition(chanA, chanB *LightningChannel) error {
return nil
}
// createSpendableTestChannels initializes a pair of channels using a
// mockSpendNotifier. This allows us to test the behavior of the closeObserver,
// which is activated when the funding transaction is spent.
func createSpendableTestChannels(revocationWindow int) (*LightningChannel,
*LightningChannel, *mockSpendNotifier, func(), error) {
notifier := makeMockSpendNotifier()
alice, bob, cleanup, err := createTestChannelsWithNotifier(
revocationWindow, notifier,
)
return alice, bob, notifier, cleanup, err
}
// createTestChannels initializes a pair of channels using a mock notifier.
func createTestChannels(revocationWindow int) (*LightningChannel,
*LightningChannel, func(), error) {
notifier := &mockNotfier{}
return createTestChannelsWithNotifier(revocationWindow, notifier)
}
// createTestChannelsWithNotifier creates two test lightning channels using the
// provided notifier. The channel itself is funded with 10 BTC, with 5 BTC
// allocated to each side. Within the channel, Alice is the initiator.
func createTestChannelsWithNotifier(revocationWindow int,
notifier chainntnfs.ChainNotifier) (*LightningChannel,
*LightningChannel, func(), error) {
aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes(btcec.S256(),
testWalletPrivKey)
bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes(btcec.S256(),
@ -353,8 +430,6 @@ func createTestChannels(revocationWindow int) (*LightningChannel, *LightningChan
aliceSigner := &mockSigner{aliceKeyPriv}
bobSigner := &mockSigner{bobKeyPriv}
channelAlice, err := NewLightningChannel(aliceSigner, notifier,
estimator, aliceChannelState)
if err != nil {
@ -1151,6 +1226,106 @@ func TestForceCloseDustOutput(t *testing.T) {
}
}
// TestBreachClose checks that a breach of the channel, i.e. a spend of
// the funding output by a revoked commitment transaction, is signaled
// over the ContractBreach channel rather than as a unilateral close.
func TestBreachClose(t *testing.T) {
t.Parallel()
// Create a test channel which will be used for the duration of this
// unittest. The channel will be funded evenly with Alice having 5 BTC,
// and Bob having 5 BTC.
aliceChannel, bobChannel, notifier, cleanUp, err :=
createSpendableTestChannels(1)
if err != nil {
t.Fatalf("unable to create test channels: %v", err)
}
defer cleanUp()
// Send one HTLC from Alice to Bob, and advance the state of both
// channels.
htlcAmount := lnwire.NewMSatFromSatoshis(20000)
htlc, _ := createHTLC(0, htlcAmount)
if _, err := aliceChannel.AddHTLC(htlc); err != nil {
t.Fatalf("alice unable to add htlc: %v", err)
}
if _, err := bobChannel.ReceiveHTLC(htlc); err != nil {
t.Fatalf("bob unable to recv add htlc: %v", err)
}
if err := forceStateTransition(aliceChannel, bobChannel); err != nil {
t.Fatalf("Can't update the channel state: %v", err)
}
// Construct a force close summary of Bob's channel; this includes the
// breach transaction that will be used to spend the funding point.
forceCloseSummary, err := bobChannel.ForceClose()
if err != nil {
t.Fatalf("unable to force close bob's channel: %v", err)
}
// Send another HTLC and advance the state of both channels again. This
// ensures that Alice's state will be ahead of the breach transaction
// generated above.
htlc2, _ := createHTLC(1, htlcAmount)
if _, err := aliceChannel.AddHTLC(htlc2); err != nil {
t.Fatalf("alice unable to add htlc: %v", err)
}
if _, err := bobChannel.ReceiveHTLC(htlc2); err != nil {
t.Fatalf("bob unable to recv add htlc: %v", err)
}
if err := forceStateTransition(aliceChannel, bobChannel); err != nil {
t.Fatalf("Can't update the channel state: %v", err)
}
chanPoint := aliceChannel.ChanPoint
breachTxn := forceCloseSummary.CloseTx
// Spend the funding point using the breach transaction.
notifier.Spend(chanPoint, 100, breachTxn)
// Set up a separate routine to monitor Alice's channel for a response
// to the spend. We use a generous timeout to ensure the test doesn't
// stall indefinitely, while still allowing us to block the main routine
// until the close observer exits.
errChan := make(chan error, 1)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case ret := <-aliceChannel.ContractBreach:
errChan <- nil
// Acknowledge a successful processing of the
// retribution information.
ret.Err <- nil
case <-aliceChannel.UnilateralClose:
errChan <- errors.New("expected breach close to " +
"be signaled, not unilateral")
case <-time.After(60 * time.Second):
errChan <- errors.New("breach was not signaled")
}
}()
// Wait for both the close observer to exit and our background process
// to exit before attempting to read from the error channel.
aliceChannel.WaitForClose()
wg.Wait()
// Now that all tasks have been shut down, handle the result. The result
// should be available immediately; we allow five seconds to absorb any
// variance in scheduling on Travis.
select {
case err := <-errChan:
if err != nil {
t.Fatalf(err.Error())
}
case <-time.After(5 * time.Second):
t.Fatalf("breach was not received")
}
}
// TestDustHTLCFees checks that fees are calculated correctly when HTLCs fall
// below the nodes' dust limit. In these cases, the amount of the dust HTLCs
// should be applied to the commitment transaction fee.

mock.go
View File

@ -1,6 +1,8 @@
package main
import (
"fmt"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/roasbeef/btcd/btcec"
@ -24,6 +26,19 @@ func (m *mockSigner) SignOutputRaw(tx *wire.MsgTx,
witnessScript := signDesc.WitnessScript
privKey := m.key
if !privKey.PubKey().IsEqual(signDesc.PubKey) {
return nil, fmt.Errorf("incorrect key passed")
}
switch {
case signDesc.SingleTweak != nil:
privKey = lnwallet.TweakPrivKey(privKey,
signDesc.SingleTweak)
case signDesc.DoubleTweak != nil:
privKey = lnwallet.DeriveRevocationPrivKey(privKey,
signDesc.DoubleTweak)
}
sig, err := txscript.RawTxInWitnessSignature(tx, signDesc.SigHashes,
signDesc.InputIndex, amt, witnessScript, signDesc.HashType,
privKey)
@ -36,9 +51,24 @@ func (m *mockSigner) SignOutputRaw(tx *wire.MsgTx,
func (m *mockSigner) ComputeInputScript(tx *wire.MsgTx,
signDesc *lnwallet.SignDescriptor) (*lnwallet.InputScript, error) {
// TODO(roasbeef): expose tweaked signer from lnwallet so don't need to
// duplicate this code?
privKey := m.key
switch {
case signDesc.SingleTweak != nil:
privKey = lnwallet.TweakPrivKey(privKey,
signDesc.SingleTweak)
case signDesc.DoubleTweak != nil:
privKey = lnwallet.DeriveRevocationPrivKey(privKey,
signDesc.DoubleTweak)
}
witnessScript, err := txscript.WitnessSignature(tx, signDesc.SigHashes,
signDesc.InputIndex, signDesc.Output.Value, signDesc.Output.PkScript,
signDesc.HashType, privKey, true)
if err != nil {
return nil, err
}
@ -78,6 +108,54 @@ func (m *mockNotfier) RegisterSpendNtfn(outpoint *wire.OutPoint,
}, nil
}
// mockSpendNotifier extends the mockNotifier so that spend notifications can be
// triggered and delivered to subscribers.
type mockSpendNotifier struct {
*mockNotfier
spendMap map[wire.OutPoint][]chan *chainntnfs.SpendDetail
}
func makeMockSpendNotifier() *mockSpendNotifier {
return &mockSpendNotifier{
mockNotfier: &mockNotfier{
confChannel: make(chan *chainntnfs.TxConfirmation),
},
spendMap: make(map[wire.OutPoint][]chan *chainntnfs.SpendDetail),
}
}
func (m *mockSpendNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
heightHint uint32) (*chainntnfs.SpendEvent, error) {
spendChan := make(chan *chainntnfs.SpendDetail)
m.spendMap[*outpoint] = append(m.spendMap[*outpoint], spendChan)
return &chainntnfs.SpendEvent{
Spend: spendChan,
Cancel: func() {
},
}, nil
}
// Spend dispatches SpendDetails to all subscribers of the outpoint. The details
// will include the transaction and height provided by the caller.
func (m *mockSpendNotifier) Spend(outpoint *wire.OutPoint, height int32,
txn *wire.MsgTx) {
if spendChans, ok := m.spendMap[*outpoint]; ok {
delete(m.spendMap, *outpoint)
for _, spendChan := range spendChans {
txnHash := txn.TxHash()
spendChan <- &chainntnfs.SpendDetail{
SpentOutPoint: outpoint,
SpendingHeight: height,
SpendingTx: txn,
SpenderTxHash: &txnHash,
SpenderInputIndex: outpoint.Index,
}
}
}
}
type mockChainIO struct{}
func (*mockChainIO) GetBestBlock() (*chainhash.Hash, int32, error) {

View File

@ -318,6 +318,12 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
return fmt.Errorf("peer shutting down")
}
// Skip adding any permanently irreconcilable channels to the
// htlcswitch.
if dbChan.IsBorked {
continue
}
blockEpoch, err := p.server.cc.chainNotifier.RegisterBlockEpochNtfn()
if err != nil {
return err

View File

@ -17,7 +17,7 @@ import (
"github.com/roasbeef/btcutil"
)
func init() {
peerLog = btclog.Disabled
srvrLog = btclog.Disabled
lnwallet.UseLogger(btclog.Disabled)
@ -28,7 +28,6 @@ func disablePeerLogger(t *testing.T) {
// TestPeerChannelClosureAcceptFeeResponder tests the shutdown responder's
// behavior if we can agree on the fee immediately.
func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) {
t.Parallel()
notifier := &mockNotfier{
@ -118,7 +117,6 @@ func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) {
// TestPeerChannelClosureAcceptFeeInitiator tests the shutdown initiator's
// behavior if we can agree on the fee immediately.
func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) {
t.Parallel()
notifier := &mockNotfier{
@ -228,7 +226,6 @@ func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) {
// responder's behavior in the case where we must do several rounds of fee
// negotiation before we agree on a fee.
func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) {
t.Parallel()
notifier := &mockNotfier{
@ -409,7 +406,6 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) {
// initiator's behavior in the case where we must do several rounds of fee
// negotiation before we agree on a fee.
func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) {
t.Parallel()
notifier := &mockNotfier{

View File

@ -344,7 +344,6 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
}
s.breachArbiter = newBreachArbiter(&BreachConfig{
ChainIO: s.cc.chainIO,
CloseLink: closeLink,
DB: chanDB,
Estimator: s.cc.feeEstimator,