From fb228a0f7d09ca67408e449688dcede6ac885e26 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 20 Nov 2017 23:56:42 -0800 Subject: [PATCH 01/13] breacharbiter: reliable handoff from wallet --- breacharbiter.go | 543 ++++++++++++++++++++++++++--------------------- 1 file changed, 297 insertions(+), 246 deletions(-) diff --git a/breacharbiter.go b/breacharbiter.go index a94608a1..7f9a5dcb 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -15,33 +15,34 @@ import ( "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/lnwallet" "github.com/roasbeef/btcd/blockchain" - "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" ) -// retributionBucket stores retribution state on disk between detecting a -// contract breach, broadcasting a justice transaction that sweeps the channel, -// and finally witnessing the justice transaction confirm on the blockchain. It -// is critical that such state is persisted on disk, so that if our node -// restarts at any point during the retribution procedure, we can recover and -// continue from the persisted state. -var retributionBucket = []byte("retribution") +var ( + // retributionBucket stores retribution state on disk between detecting + // a contract breach, broadcasting a justice transaction that sweeps the + // channel, and finally witnessing the justice transaction confirm on + // the blockchain. It is critical that such state is persisted on disk, + // so that if our node restarts at any point during the retribution + // procedure, we can recover and continue from the persisted state. + retributionBucket = []byte("retribution") + + // justiceTxnBucket holds the finalized justice transactions for all + // breached contracts. Entries are added to the justice txn bucket just + // before broadcasting the sweep txn. + justiceTxnBucket = []byte("justice-txn") +) // BreachConfig bundles the required subsystems used by the breach arbiter. An // instance of BreachConfig is passed to newBreachArbiter during instantiation. type BreachConfig struct { - // ChainIO is used by the breach arbiter to determine the current height - // of the blockchain, which is required to subscribe for spend - // notifications from Notifier. - ChainIO lnwallet.BlockChainIO - // CloseLink allows the breach arbiter to shutdown any channel links for // which it detects a breach, ensuring now further activity will - // continue across the link. The method accepts link's channel point and a - // close type to be included in the channel close summary. + // continue across the link. The method accepts link's channel point and + // a close type to be included in the channel close summary. CloseLink func(*wire.OutPoint, htlcswitch.ChannelCloseType) // DB provides access to the user's channels, allowing the breach @@ -85,6 +86,9 @@ type BreachConfig struct { // counterparties. // TODO(roasbeef): closures in config for subsystem pointers to decouple? type breachArbiter struct { + started uint32 + stopped uint32 + cfg *BreachConfig // breachObservers is a map which tracks all the active breach @@ -112,10 +116,8 @@ type breachArbiter struct { // breach closes. settledContracts chan *wire.OutPoint - started uint32 - stopped uint32 - quit chan struct{} - wg sync.WaitGroup + quit chan struct{} + wg sync.WaitGroup } // newBreachArbiter creates a new instance of a breachArbiter initialized with @@ -141,40 +143,52 @@ func (b *breachArbiter) Start() error { brarLog.Tracef("Starting breach arbiter") - // We load all pending retributions from the database and - // deterministically reconstruct a channel close summary for each. In - // the event that a channel is still open after being breached, we can - // use the close summary to reinitiate a channel close so that the - // breach is reflected in channeldb. + // Load all retributions currently persisted in the retribution store. breachRetInfos := make(map[wire.OutPoint]retributionInfo) - closeSummaries := make(map[wire.OutPoint]channeldb.ChannelCloseSummary) - err := b.cfg.Store.ForAll(func(ret *retributionInfo) error { - // Extract emitted retribution information. + if err := b.cfg.Store.ForAll(func(ret *retributionInfo) error { breachRetInfos[ret.chanPoint] = *ret - - // Deterministically reconstruct channel close summary from - // persisted retribution information and record in breach close - // summaries map under the corresponding channel point. - closeSummary := channeldb.ChannelCloseSummary{ - ChanPoint: ret.chanPoint, - ClosingTXID: ret.commitHash, - RemotePub: ret.remoteIdentity, - Capacity: ret.capacity, - SettledBalance: ret.settledBalance, - CloseType: channeldb.BreachClose, - IsPending: true, - } - closeSummaries[ret.chanPoint] = closeSummary - return nil - }) - if err != nil { + }); err != nil { return err } + // Load all currently closed channels from disk, we will use the + // channels that have been marked fully closed to filter the retribution + // information loaded from disk. This is necessary in the event that the + // channel was marked fully closed, but was not removed from the + // retribution store. + closedChans, err := b.cfg.DB.FetchClosedChannels(false) + if err != nil { + brarLog.Errorf("unable to fetch closing channels: %v", err) + return err + } + + // Using the set of non-pending, closed channels, reconcile any + // discrepancies between the channeldb and the retribution store by + // removing any retribution information for which we have already + // finished our responsibilities. If the removal is successful, we also + // remove the entry from our in-memory map, to avoid any further action + // for this channel. + for _, chanSummary := range closedChans { + if chanSummary.IsPending { + continue + } + + chanPoint := &chanSummary.ChanPoint + if _, ok := breachRetInfos[*chanPoint]; ok { + if err := b.cfg.Store.Remove(chanPoint); err != nil { + brarLog.Errorf("unable to remove closed "+ + "chanid=%v from breach arbiter: %v", + chanPoint, err) + return err + } + delete(breachRetInfos, *chanPoint) + } + } + // We need to query that database state for all currently active - // channels, each of these channels will need a goroutine assigned to - // it to watch for channel breaches. + // channels, these channels will represent a super set of all channels + // that may be assigned a go routine to monitor for channel breaches. activeChannels, err := b.cfg.DB.FetchAllChannels() if err != nil && err != channeldb.ErrNoActiveChannels { brarLog.Errorf("unable to fetch active channels: %v", err) @@ -188,29 +202,20 @@ func (b *breachArbiter) Start() error { } // Here we will determine a set of channels that will need to be managed - // by the contractObserver. For each of the open channels read from - // disk, we will create a channel state machine that can be used to - // watch for any potential channel closures. We must first exclude any - // channel whose retribution process has been initiated, and proceed to - // mark them as closed. The state machines generated for these filtered - // channels can be discarded, as their fate will be placed in the hands - // of an exactRetribution task spawned later. - // - // NOTE: Spawning of the exactRetribution task is intentionally - // postponed until after this step in order to ensure that the all - // breached channels are reflected as closed in channeldb and consistent - // with what is checkpointed by the breach arbiter. Instead of treating - // the breached-and-closed and breached-but-still-active channels as - // separate sets of channels, we first ensure that all - // breached-but-still-active channels are promoted to - // breached-and-closed during restart, allowing us to treat them as a - // single set from here on out. This approach also has the added benefit - // of minimizing the likelihood that the wrong number of tasks are - // spawned per breached channel, and prevents us from being in a - // position where retribution has completed but the channel is still - // marked as open in channeldb. + // by the contractObserver. This should comprise all active channels + // that have not been breached. If the channel point has an entry in the + // retribution store, we skip it to avoid creating a breach observer. + // Resolving breached channels will be handled later by spawning an + // exactRetribution task for each. channelsToWatch := make([]*lnwallet.LightningChannel, 0, nActive) for _, chanState := range activeChannels { + // If this channel was previously breached, we skip it here to + // avoid creating a breach observer, as we can go straight to + // the task of exacting retribution. + if _, ok := breachRetInfos[chanState.FundingOutpoint]; ok { + continue + } + // Initialize active channel from persisted channel state. channel, err := lnwallet.NewLightningChannel(nil, b.cfg.Notifier, b.cfg.Estimator, chanState) @@ -220,62 +225,28 @@ func (b *breachArbiter) Start() error { return err } - // Before marking this as an active channel that the breach - // arbiter should watch, check to see if this channel was - // previously breached. If so, we attempt to reflect this in the - // channeldb by closing the channel. Upon success, we continue - // because the channel is no longer open, and thus does not need - // to be managed by the contractObserver. - chanPoint := chanState.FundingOutpoint - if closeSummary, ok := closeSummaries[chanPoint]; ok { - // Since this channel should not be open, we - // immediately notify the HTLC switch that this link - // should be closed, and that all activity on the link - // should cease. - b.cfg.CloseLink(&chanState.FundingOutpoint, - htlcswitch.CloseBreach) - - // Ensure channeldb is consistent with the persisted - // breach. - err := channel.DeleteState(&closeSummary) - if err != nil { - brarLog.Errorf("unable to delete channel "+ - "state: %v", err) - return err - } - - // Now that this channel is both breached _and_ closed, - // we can skip adding it to the `channelsToWatch` since - // we can begin the retribution process immediately. - continue - } - // Finally, add this channel to breach arbiter's list of // channels to watch. channelsToWatch = append(channelsToWatch, channel) } - // TODO(roasbeef): instead use closure height of channel - _, currentHeight, err := b.cfg.ChainIO.GetBestBlock() - if err != nil { - return err - } - // Additionally, we'll also want to watch any pending close or force // close transactions so we can properly mark them as resolved in the // database. - if err := b.watchForPendingCloseConfs(currentHeight); err != nil { + if err := b.watchForPendingCloseConfs(); err != nil { return err } // Spawn the exactRetribution tasks to monitor and resolve any breaches // that were loaded from the retribution store. - for chanPoint, closeSummary := range closeSummaries { + for chanPoint := range breachRetInfos { + retInfo := breachRetInfos[chanPoint] + // Register for a notification when the breach transaction is // confirmed on chain. - breachTXID := closeSummary.ClosingTXID + breachTXID := retInfo.commitHash confChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn( - &breachTXID, 1, uint32(currentHeight)) + &breachTXID, 1, retInfo.breachHeight) if err != nil { brarLog.Errorf("unable to register for conf updates "+ "for txid: %v, err: %v", breachTXID, err) @@ -284,7 +255,6 @@ func (b *breachArbiter) Start() error { // Launch a new goroutine which to finalize the channel // retribution after the breach transaction confirms. - retInfo := breachRetInfos[chanPoint] b.wg.Add(1) go b.exactRetribution(confChan, &retInfo) } @@ -298,12 +268,13 @@ func (b *breachArbiter) Start() error { // watchForPendingCloseConfs dispatches confirmation notification subscribers // that mark any pending channels as fully closed when signaled. -func (b *breachArbiter) watchForPendingCloseConfs(currentHeight int32) error { +func (b *breachArbiter) watchForPendingCloseConfs() error { pendingCloseChans, err := b.cfg.DB.FetchClosedChannels(true) if err != nil { brarLog.Errorf("unable to fetch closing channels: %v", err) return err } + for _, pendingClose := range pendingCloseChans { // If this channel was force closed, and we have a non-zero // time-locked balance, then the utxoNursery is currently @@ -319,7 +290,7 @@ func (b *breachArbiter) watchForPendingCloseConfs(currentHeight int32) error { closeTXID := pendingClose.ClosingTXID confNtfn, err := b.cfg.Notifier.RegisterConfirmationsNtfn( - &closeTXID, 1, uint32(currentHeight)) + &closeTXID, 1, pendingClose.CloseHeight) if err != nil { return err } @@ -376,6 +347,12 @@ func (b *breachArbiter) Stop() error { return nil } +// IsBreached queries the breach arbiter's retribution store to see if it is +// aware of any channel breaches for a particular channel point. +func (b *breachArbiter) IsBreached(chanPoint *wire.OutPoint) (bool, error) { + return b.cfg.Store.IsBreached(chanPoint) +} + // contractObserver is the primary goroutine for the breachArbiter. This // goroutine is responsible for managing goroutines that watch for breaches for // all current active and newly created channels. If a channel breach is @@ -389,13 +366,16 @@ func (b *breachArbiter) contractObserver( defer b.wg.Done() + brarLog.Infof("Starting contract observer with %v active channels", + len(activeChannels)) + // For each active channel found within the database, we launch a // detected breachObserver goroutine for that channel and also track // the new goroutine within the breachObservers map so we can cancel it // later if necessary. for _, channel := range activeChannels { settleSignal := make(chan struct{}) - chanPoint := channel.ChannelPoint() + chanPoint := channel.ChanPoint b.breachObservers[*chanPoint] = settleSignal b.wg.Add(1) @@ -409,12 +389,6 @@ out: for { select { case breachInfo := <-b.breachedContracts: - _, currentHeight, err := b.cfg.ChainIO.GetBestBlock() - if err != nil { - brarLog.Errorf("unable to get best height: %v", - err) - } - // A new channel contract has just been breached! We // first register for a notification to be dispatched // once the breach transaction (the revoked commitment @@ -422,7 +396,7 @@ out: // ensure we're not dealing with a moving target. breachTXID := &breachInfo.commitHash cfChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn( - breachTXID, 1, uint32(currentHeight)) + breachTXID, 1, breachInfo.breachHeight) if err != nil { brarLog.Errorf("unable to register for conf "+ "updates for txid: %v, err: %v", @@ -449,7 +423,7 @@ out: // daemon, so we launch a new breachObserver to handle // the detection of attempted contract breaches. settleSignal := make(chan struct{}) - chanPoint := contract.ChannelPoint() + chanPoint := contract.ChanPoint // If the contract is already being watched, then an // additional send indicates we have a stale version of @@ -516,14 +490,17 @@ func (b *breachArbiter) exactRetribution( // TODO(roasbeef): state needs to be checkpointed here + var breachConfHeight uint32 select { - case _, ok := <-confChan.Confirmed: + case breachConf, ok := <-confChan.Confirmed: // If the second value is !ok, then the channel has been closed // signifying a daemon shutdown, so we exit. if !ok { return } + breachConfHeight = breachConf.BlockHeight + // Otherwise, if this is a real confirmation notification, then // we fall through to complete our duty. case <-b.quit: @@ -533,40 +510,55 @@ func (b *breachArbiter) exactRetribution( brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+ "revoked funds", breachInfo.commitHash) - // With the breach transaction confirmed, we now create the justice tx - // which will claim ALL the funds within the channel. - justiceTx, err := b.createJusticeTx(breachInfo) + finalTx, err := b.cfg.Store.GetFinalizedTxn(&breachInfo.chanPoint) if err != nil { - brarLog.Errorf("unable to create justice tx: %v", err) + brarLog.Errorf("unable to get finalized txn for"+ + "chanid=%v: %v", &breachInfo.chanPoint, err) return } + // If this retribution has not been finalized before, we will first + // construct a sweep transaction and write it to disk. This will allow + // the breach arbiter to re-register for notifications for the justice + // txid. + if finalTx == nil { + // With the breach transaction confirmed, we now create the + // justice tx which will claim ALL the funds within the channel. + finalTx, err = b.createJusticeTx(breachInfo) + if err != nil { + brarLog.Errorf("unable to create justice tx: %v", err) + return + } + + // Persist our finalized justice transaction before making an + // attempt to broadcast. + err := b.cfg.Store.Finalize(&breachInfo.chanPoint, finalTx) + if err != nil { + brarLog.Errorf("unable to finalize justice tx for "+ + "chanid=%v: %v", &breachInfo.chanPoint, err) + return + } + } + brarLog.Debugf("Broadcasting justice tx: %v", newLogClosure(func() string { - return spew.Sdump(justiceTx) + return spew.Sdump(finalTx) })) - _, currentHeight, err := b.cfg.ChainIO.GetBestBlock() - if err != nil { - brarLog.Errorf("unable to get current height: %v", err) - return - } - // Finally, broadcast the transaction, finalizing the channels' // retribution against the cheating counterparty. - if err := b.cfg.PublishTransaction(justiceTx); err != nil { + if err := b.cfg.PublishTransaction(finalTx); err != nil { brarLog.Errorf("unable to broadcast "+ "justice tx: %v", err) - return } // As a conclusionary step, we register for a notification to be // dispatched once the justice tx is confirmed. After confirmation we // notify the caller that initiated the retribution workflow that the // deed has been done. - justiceTXID := justiceTx.TxHash() + justiceTXID := finalTx.TxHash() confChan, err = b.cfg.Notifier.RegisterConfirmationsNtfn( - &justiceTXID, 1, uint32(currentHeight)) + &justiceTXID, 1, breachConfHeight) if err != nil { brarLog.Errorf("unable to register for conf for txid: %v", justiceTXID) @@ -607,6 +599,7 @@ func (b *breachArbiter) exactRetribution( err := b.cfg.DB.MarkChanFullyClosed(&breachInfo.chanPoint) if err != nil { brarLog.Errorf("unable to mark chan as closed: %v", err) + return } // Justice has been carried out; we can safely delete the @@ -640,7 +633,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, defer b.wg.Done() - chanPoint := contract.ChannelPoint() + chanPoint := contract.ChanPoint brarLog.Debugf("Breach observer for ChannelPoint(%v) started ", chanPoint) @@ -746,57 +739,39 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, // mid-local initiated state-transition, possible // false-positive? - // Obtain a snapshot of the final channel state, which can be - // used to reclose a breached channel in the event of a failure. - chanInfo := contract.StateSnapshot() - // Using the breach information provided by the wallet and the // channel snapshot, construct the retribution information that // will be persisted to disk. - retInfo := newRetributionInfo(chanPoint, breachInfo, chanInfo) + retInfo := newRetributionInfo(chanPoint, breachInfo) // Persist the pending retribution state to disk. - if err := b.cfg.Store.Add(retInfo); err != nil { - brarLog.Errorf("unable to persist retribution info "+ - "to db: %v", err) + err := b.cfg.Store.Add(retInfo) + if err != nil { + brarLog.Errorf("unable to persist retribution "+ + "info to db: %v", err) } - // TODO(conner): move responsibility of channel closure into - // lnwallet. Have breach arbiter ACK after writing to disk, then - // have wallet mark channel as closed. This allows the wallet to - // attempt to retransmit the breach info if the either arbiter - // or the wallet goes down before completing the hand off. + // Now that the breach has been persisted, try to send an + // acknowledgment back to the close observer with the error. If + // the ack is successful, the close observer will mark the + // channel as pending-closed in the channeldb. + select { + case breachInfo.Err <- err: + // Bail if we failed to persist retribution info. + if err != nil { + return + } - // Now that the breach arbiter has persisted the information, - // we can go ahead and mark the channel as closed in the - // channeldb. This step is done after persisting the - // retribution information so that a failure between these steps - // will cause an attempt to monitor the still-open channel. - // However, since the retribution information was persisted - // before, the arbiter will recognize that the channel should be - // closed, and proceed to mark it as such after a restart, and - // forgo monitoring it for breaches. + case <-contract.ObserverQuit(): + // If the close observer has already exited, it will + // never read the acknowledgment, so we exit. + return - // Construct the breached channel's close summary marking the - // channel using the snapshot from before, and marking this as a - // BreachClose. - closeInfo := &channeldb.ChannelCloseSummary{ - ChanPoint: *chanPoint, - ChainHash: breachInfo.ChainHash, - ClosingTXID: breachInfo.BreachTransaction.TxHash(), - RemotePub: &chanInfo.RemoteIdentity, - Capacity: chanInfo.Capacity, - SettledBalance: chanInfo.LocalBalance.ToSatoshis(), - CloseType: channeldb.BreachClose, - IsPending: true, - } - - // Next, persist the channel close to disk. Upon restart, the - // arbiter will recognize that this channel has been breached - // and marked close, and fast track its path to justice. - if err := contract.DeleteState(closeInfo); err != nil { - brarLog.Errorf("unable to delete channel state: %v", - err) + case <-b.quit: + // Cancel the close observer if the breach arbiter is + // shutting down, dropping the acknowledgment. + contract.CancelObserver() + return } // Finally, we send the retribution information into the @@ -807,6 +782,8 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, } case <-b.quit: + contract.Stop() + contract.CancelObserver() return } } @@ -922,20 +899,10 @@ var _ SpendableOutput = (*breachedOutput)(nil) // spends all outputs of the commitment transaction into an output controlled // by the wallet. type retributionInfo struct { - commitHash chainhash.Hash - chanPoint wire.OutPoint - chainHash chainhash.Hash - - // TODO(conner): remove the following group of fields after decoupling - // the breach arbiter from the wallet. - - // Fields copied from channel snapshot when a breach is detected. This - // is necessary for deterministically constructing the channel close - // summary in the event that the breach arbiter crashes before closing - // the channel. - remoteIdentity *btcec.PublicKey - capacity btcutil.Amount - settledBalance btcutil.Amount + commitHash chainhash.Hash + chanPoint wire.OutPoint + chainHash chainhash.Hash + breachHeight uint32 breachedOutputs []breachedOutput } @@ -945,8 +912,7 @@ type retributionInfo struct { // channels. The information is primarily populated using the BreachRetribution // delivered by the wallet when it detects a channel breach. func newRetributionInfo(chanPoint *wire.OutPoint, - breachInfo *lnwallet.BreachRetribution, - chanInfo *channeldb.ChannelSnapshot) *retributionInfo { + breachInfo *lnwallet.BreachRetribution) *retributionInfo { // Determine the number of second layer HTLCs we will attempt to sweep. nHtlcs := len(breachInfo.HtlcRetributions) @@ -1009,16 +975,10 @@ func newRetributionInfo(chanPoint *wire.OutPoint, breachedOutputs = append(breachedOutputs, htlcOutput) } - // TODO(conner): remove dependency on channel snapshot after decoupling - // channel closure from the breach arbiter. - return &retributionInfo{ commitHash: breachInfo.BreachTransaction.TxHash(), - chainHash: chanInfo.ChainHash, + chainHash: breachInfo.ChainHash, chanPoint: *chanPoint, - remoteIdentity: &chanInfo.RemoteIdentity, - capacity: chanInfo.Capacity, - settledBalance: chanInfo.LocalBalance.ToSatoshis(), breachedOutputs: breachedOutputs, } } @@ -1226,6 +1186,19 @@ type RetributionStore interface { // the addition fails. Add(retInfo *retributionInfo) error + // IsBreached queries the retribution store to see if the breach arbiter + // is aware of any breaches for the provided channel point. + IsBreached(chanPoint *wire.OutPoint) (bool, error) + + // Finalize persists the finalized justice transaction for a particular + // channel. + Finalize(chanPoint *wire.OutPoint, finalTx *wire.MsgTx) error + + // GetFinalizedTxn loads the finalized justice transaction, if any, from + // the retribution store. The finalized transaction will be nil if + // Finalize has not yet been called for this channel point. + GetFinalizedTxn(chanPoint *wire.OutPoint) (*wire.MsgTx, error) + // Remove deletes the retributionInfo from disk, if any exists, under // the given key. An error should be re raised if the removal fails. Remove(key *wire.OutPoint) error @@ -1276,8 +1249,97 @@ func (rs *retributionStore) Add(ret *retributionInfo) error { }) } -// Remove removes a retribution state from the retributionStore database. -func (rs *retributionStore) Remove(key *wire.OutPoint) error { +// Finalize writes a signed justice transaction to the retribution store. This +// is done before publishing the transaction, so that we can recover the txid on +// startup and re-register for confirmation notifications. +func (rs *retributionStore) Finalize(chanPoint *wire.OutPoint, + finalTx *wire.MsgTx) error { + return rs.db.Update(func(tx *bolt.Tx) error { + justiceBkt, err := tx.CreateBucketIfNotExists(justiceTxnBucket) + if err != nil { + return err + } + + var chanBuf bytes.Buffer + if err := writeOutpoint(&chanBuf, chanPoint); err != nil { + return err + } + + var txBuf bytes.Buffer + if err := finalTx.Serialize(&txBuf); err != nil { + return err + } + + return justiceBkt.Put(chanBuf.Bytes(), txBuf.Bytes()) + }) +} + +// GetFinalizedTxn loads the finalized justice transaction for the provided +// channel point. The finalized transaction will be nil if Finalize has yet to +// be called for this channel point. +func (rs *retributionStore) GetFinalizedTxn( + chanPoint *wire.OutPoint) (*wire.MsgTx, error) { + + var finalTxBytes []byte + if err := rs.db.View(func(tx *bolt.Tx) error { + justiceBkt := tx.Bucket(justiceTxnBucket) + if justiceBkt == nil { + return nil + } + + var chanBuf bytes.Buffer + if err := writeOutpoint(&chanBuf, chanPoint); err != nil { + return err + } + + finalTxBytes = justiceBkt.Get(chanBuf.Bytes()) + + return nil + }); err != nil { + return nil, err + } + + if finalTxBytes == nil { + return nil, nil + } + + finalTx := &wire.MsgTx{} + err := finalTx.Deserialize(bytes.NewReader(finalTxBytes)) + + return finalTx, err +} + +// IsBreached queries the retribution store to discern if this channel was +// previously breached. This is used when connecting to a peer to determine if +// it is safe to add a link to the htlcswitch, as we should never add a channel +// that has already been breached. +func (rs *retributionStore) IsBreached(chanPoint *wire.OutPoint) (bool, error) { + var found bool + err := rs.db.View(func(tx *bolt.Tx) error { + retBucket := tx.Bucket(retributionBucket) + if retBucket == nil { + return nil + } + + var chanBuf bytes.Buffer + if err := writeOutpoint(&chanBuf, chanPoint); err != nil { + return err + } + + retInfo := retBucket.Get(chanBuf.Bytes()) + if retInfo != nil { + found = true + } + + return nil + }) + + return found, err +} + +// Remove removes a retribution state and finalized justice transaction by +// channel point from the retribution store. +func (rs *retributionStore) Remove(chanPoint *wire.OutPoint) error { return rs.db.Update(func(tx *bolt.Tx) error { retBucket := tx.Bucket(retributionBucket) @@ -1287,15 +1349,30 @@ func (rs *retributionStore) Remove(key *wire.OutPoint) error { // stored in the db. if retBucket == nil { return errors.New("unable to remove retribution " + - "because the db bucket doesn't exist.") + "because the retribution bucket doesn't exist.") } - var outBuf bytes.Buffer - if err := writeOutpoint(&outBuf, key); err != nil { + // Serialize the channel point we are intending to remove. + var chanBuf bytes.Buffer + if err := writeOutpoint(&chanBuf, chanPoint); err != nil { + return err + } + chanBytes := chanBuf.Bytes() + + // Remove the persisted retribution info and finalized justice + // transaction. + if err := retBucket.Delete(chanBytes); err != nil { return err } - return retBucket.Delete(outBuf.Bytes()) + // If we have not finalized this channel breach, we can exit + // early. + justiceBkt := tx.Bucket(justiceTxnBucket) + if justiceBkt == nil { + return nil + } + + return justiceBkt.Delete(chanBytes) }) } @@ -1313,11 +1390,10 @@ func (rs *retributionStore) ForAll(cb func(*retributionInfo) error) error { // Otherwise, we fetch each serialized retribution info, // deserialize it, and execute the passed in callback function // on it. - return retBucket.ForEach(func(outBytes, retBytes []byte) error { + return retBucket.ForEach(func(_, retBytes []byte) error { ret := &retributionInfo{} - if err := ret.Decode( - bytes.NewBuffer(retBytes), - ); err != nil { + err := ret.Decode(bytes.NewBuffer(retBytes)) + if err != nil { return err } @@ -1328,7 +1404,7 @@ func (rs *retributionStore) ForAll(cb func(*retributionInfo) error) error { // Encode serializes the retribution into the passed byte stream. func (ret *retributionInfo) Encode(w io.Writer) error { - var scratch [8]byte + var scratch [4]byte if _, err := w.Write(ret.commitHash[:]); err != nil { return err @@ -1342,18 +1418,8 @@ func (ret *retributionInfo) Encode(w io.Writer) error { return err } - if _, err := w.Write( - ret.remoteIdentity.SerializeCompressed()); err != nil { - return err - } - - binary.BigEndian.PutUint64(scratch[:8], uint64(ret.capacity)) - if _, err := w.Write(scratch[:8]); err != nil { - return err - } - - binary.BigEndian.PutUint64(scratch[:8], uint64(ret.settledBalance)) - if _, err := w.Write(scratch[:8]); err != nil { + binary.BigEndian.PutUint32(scratch[:], ret.breachHeight) + if _, err := w.Write(scratch[:]); err != nil { return err } @@ -1373,12 +1439,12 @@ func (ret *retributionInfo) Encode(w io.Writer) error { // Dencode deserializes a retribution from the passed byte stream. func (ret *retributionInfo) Decode(r io.Reader) error { - var scratch [33]byte + var scratch [32]byte - if _, err := io.ReadFull(r, scratch[:32]); err != nil { + if _, err := io.ReadFull(r, scratch[:]); err != nil { return err } - hash, err := chainhash.NewHash(scratch[:32]) + hash, err := chainhash.NewHash(scratch[:]) if err != nil { return err } @@ -1388,34 +1454,19 @@ func (ret *retributionInfo) Decode(r io.Reader) error { return err } - if _, err := io.ReadFull(r, scratch[:32]); err != nil { + if _, err := io.ReadFull(r, scratch[:]); err != nil { return err } - chainHash, err := chainhash.NewHash(scratch[:32]) + chainHash, err := chainhash.NewHash(scratch[:]) if err != nil { return err } ret.chainHash = *chainHash - if _, err = io.ReadFull(r, scratch[:33]); err != nil { + if _, err := io.ReadFull(r, scratch[:4]); err != nil { return err } - remoteIdentity, err := btcec.ParsePubKey(scratch[:33], btcec.S256()) - if err != nil { - return err - } - ret.remoteIdentity = remoteIdentity - - if _, err := io.ReadFull(r, scratch[:8]); err != nil { - return err - } - ret.capacity = btcutil.Amount(binary.BigEndian.Uint64(scratch[:8])) - - if _, err := io.ReadFull(r, scratch[:8]); err != nil { - return err - } - ret.settledBalance = btcutil.Amount( - binary.BigEndian.Uint64(scratch[:8])) + ret.breachHeight = binary.BigEndian.Uint32(scratch[:4]) nOutputsU64, err := wire.ReadVarInt(r, 0) if err != nil { From ff3a1389e5feac58c07589af9bb0276b38cf1434 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 20 Nov 2017 23:57:08 -0800 Subject: [PATCH 02/13] breacharbiter_test: remove channel close summary fields --- breacharbiter_test.go | 737 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 712 insertions(+), 25 deletions(-) diff --git a/breacharbiter_test.go b/breacharbiter_test.go index 9cdaadea..828ab7d7 100644 --- a/breacharbiter_test.go +++ b/breacharbiter_test.go @@ -4,16 +4,24 @@ package main import ( "bytes" + "crypto/sha256" "fmt" "io/ioutil" + "math/rand" "os" "reflect" "sync" "testing" + "time" "github.com/btcsuite/btclog" + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/lnwallet" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/shachain" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/txscript" @@ -225,9 +233,8 @@ var ( 0xb7, 0x94, 0x38, 0x5f, 0x2d, 0x1e, 0xf7, 0xab, 0x6b, 0x49, 0x18, 0x83, 0x31, 0x98, 0x47, 0x53, }, - chanPoint: breachOutPoints[0], - capacity: btcutil.Amount(1e7), - settledBalance: btcutil.Amount(1e7), + chanPoint: breachOutPoints[0], + breachHeight: 337, // Set to breachedOutputs 0 and 1 in init() breachedOutputs: []breachedOutput{{}, {}}, }, @@ -244,9 +251,8 @@ var ( 0x6b, 0x49, 0x18, 0x83, 0x31, 0x98, 0x47, 0x53, 0x4d, 0x92, 0x73, 0xd1, 0x90, 0x63, 0x81, 0xb4, }, - chanPoint: breachOutPoints[1], - capacity: btcutil.Amount(1e7), - settledBalance: btcutil.Amount(1e7), + chanPoint: breachOutPoints[1], + breachHeight: 420420, // Set to breachedOutputs 1 and 2 in init() breachedOutputs: []breachedOutput{{}, {}}, }, @@ -254,6 +260,10 @@ var ( ) func init() { + channeldb.UseLogger(btclog.Disabled) + lnwallet.UseLogger(btclog.Disabled) + brarLog = btclog.Disabled + // Ensure that breached outputs are initialized before starting tests. if err := initBreachedOutputs(); err != nil { panic(err) @@ -263,7 +273,6 @@ func init() { // channel point. for i := range retributions { retInfo := &retributions[i] - retInfo.remoteIdentity = breachedOutputs[i].signDesc.PubKey retInfo.breachedOutputs[0] = breachedOutputs[i] retInfo.breachedOutputs[1] = breachedOutputs[i+1] @@ -291,6 +300,8 @@ type failingRetributionStore struct { rs RetributionStore + nextAddErr error + restart func() RetributionStore } @@ -307,19 +318,64 @@ func newFailingRetributionStore( } } +// FailNextAdd instructs the retribution store to return the provided error. If +// the error is nil, a generic default will be used. +func (frs *failingRetributionStore) FailNextAdd(err error) { + if err == nil { + err = errors.New("retribution store failed") + } + + frs.mu.Lock() + frs.nextAddErr = err + frs.mu.Unlock() +} + func (frs *failingRetributionStore) Restart() { frs.mu.Lock() frs.rs = frs.restart() frs.mu.Unlock() } +// Add forwards the call to the underlying retribution store, unless this Add +// has been previously instructed to fail. func (frs *failingRetributionStore) Add(retInfo *retributionInfo) error { frs.mu.Lock() defer frs.mu.Unlock() + if frs.nextAddErr != nil { + err := frs.nextAddErr + frs.nextAddErr = nil + return err + } + return frs.rs.Add(retInfo) } +func (frs *failingRetributionStore) IsBreached(chanPoint *wire.OutPoint) (bool, error) { + frs.mu.Lock() + defer frs.mu.Unlock() + + return frs.rs.IsBreached(chanPoint) +} + +func (frs *failingRetributionStore) Finalize(chanPoint *wire.OutPoint, + finalTx *wire.MsgTx) error { + + frs.mu.Lock() + defer frs.mu.Unlock() + + return frs.rs.Finalize(chanPoint, finalTx) +} + +func (frs *failingRetributionStore) GetFinalizedTxn( + chanPoint *wire.OutPoint) (*wire.MsgTx, error) { + + frs.mu.Lock() + defer frs.mu.Unlock() + + return frs.rs.GetFinalizedTxn(chanPoint) +} + func (frs *failingRetributionStore) Remove(key *wire.OutPoint) error { frs.mu.Lock() defer frs.mu.Unlock() @@ -415,9 +471,7 @@ func copyRetInfo(retInfo *retributionInfo) *retributionInfo { commitHash: retInfo.commitHash, chainHash: retInfo.chainHash, chanPoint: retInfo.chanPoint, - remoteIdentity: retInfo.remoteIdentity, - capacity: retInfo.capacity, - settledBalance: retInfo.settledBalance, + breachHeight: retInfo.breachHeight, breachedOutputs: make([]breachedOutput, nOutputs), } @@ -432,14 +486,16 @@ func copyRetInfo(retInfo *retributionInfo) *retributionInfo { // by an in-memory map. Access to the internal state is provided by a mutex. // TODO(cfromknecht) extend to support and test controlled failures. type mockRetributionStore struct { - mu sync.Mutex - state map[wire.OutPoint]*retributionInfo + mu sync.Mutex + state map[wire.OutPoint]*retributionInfo + finalTxs map[wire.OutPoint]*wire.MsgTx } func newMockRetributionStore() *mockRetributionStore { return &mockRetributionStore{ - mu: sync.Mutex{}, - state: make(map[wire.OutPoint]*retributionInfo), + mu: sync.Mutex{}, + state: make(map[wire.OutPoint]*retributionInfo), + finalTxs: make(map[wire.OutPoint]*wire.MsgTx), } } @@ -451,9 +507,38 @@ func (rs *mockRetributionStore) Add(retInfo *retributionInfo) error { return nil } +func (rs *mockRetributionStore) IsBreached(chanPoint *wire.OutPoint) (bool, error) { + rs.mu.Lock() + _, ok := rs.state[*chanPoint] + rs.mu.Unlock() + + return ok, nil +} + +func (rs *mockRetributionStore) Finalize(chanPoint *wire.OutPoint, + finalTx *wire.MsgTx) error { + + rs.mu.Lock() + rs.finalTxs[*chanPoint] = finalTx + rs.mu.Unlock() + + return nil +} + +func (rs *mockRetributionStore) GetFinalizedTxn( + chanPoint *wire.OutPoint) (*wire.MsgTx, error) { + + rs.mu.Lock() + finalTx := rs.finalTxs[*chanPoint] + rs.mu.Unlock() + + return finalTx, nil +} + func (rs *mockRetributionStore) Remove(key *wire.OutPoint) error { rs.mu.Lock() delete(rs.state, *key) + delete(rs.finalTxs, *key) rs.mu.Unlock() return nil @@ -515,28 +600,37 @@ func TestMockRetributionStore(t *testing.T) { } } -// TestChannelDBRetributionStore instantiates a retributionStore backed by a -// channeldb.DB, and tests its behavior using the general RetributionStore test -// suite. -func TestChannelDBRetributionStore(t *testing.T) { +func makeTestChannelDB() (*channeldb.DB, func(), error) { // First, create a temporary directory to be used for the duration of // this test. tempDirName, err := ioutil.TempDir("", "channeldb") if err != nil { - t.Fatalf("unable to initialize temp "+ - "directory for channeldb: %v", err) + return nil, nil, err } - defer os.RemoveAll(tempDirName) - // Disable logging to prevent panics bc. of global state - channeldb.UseLogger(btclog.Disabled) + cleanUp := func() { + os.RemoveAll(tempDirName) + } - // Next, create channeldb for the first time. db, err := channeldb.Open(tempDirName) + if err != nil { + cleanUp() + return nil, nil, err + } + + return db, cleanUp, nil +} + +// TestChannelDBRetributionStore instantiates a retributionStore backed by a +// channeldb.DB, and tests its behavior using the general RetributionStore test +// suite. +func TestChannelDBRetributionStore(t *testing.T) { + db, cleanUp, err := makeTestChannelDB() if err != nil { t.Fatalf("unable to open channeldb: %v", err) } defer db.Close() + defer cleanUp() restartDb := func() RetributionStore { // Close and reopen channeldb @@ -544,7 +638,7 @@ func TestChannelDBRetributionStore(t *testing.T) { t.Fatalf("unalbe to close channeldb during restart: %v", err) } - db, err = channeldb.Open(tempDirName) + db, err = channeldb.Open(db.Path()) if err != nil { t.Fatalf("unable to open channeldb: %v", err) } @@ -831,3 +925,596 @@ restartCheck: goto restartCheck } } + +// TestBreachHandoffSuccess tests that a channel's close observer properly +// delivers retribution information to the breach arbiter in response to a +// breach close. This test verifies correctness in the event that the handoff +// experiences no interruptions. +func TestBreachHandoffSuccess(t *testing.T) { + // Create a pair of channels using a notifier that allows us to signal + // a spend of the funding transaction. Alice's channel will be the on + // observing a breach. + notifier := makeMockSpendNotifier() + alice, bob, cleanUpChans, err := createInitChannelsWithNotifier( + 1, notifier) + if err != nil { + t.Fatalf("unable to create test channels: %v", err) + } + defer cleanUpChans() + + // Instantiate a breach arbiter to handle the breach of alice's channel. + brar, cleanUpArb, err := createTestArbiter(t, notifier, alice.State().Db) + if err != nil { + t.Fatalf("unable to initialize test breach arbiter: %v", err) + } + defer cleanUpArb() + + // Send the channel to the arbiter so that it set up the receiving end + // of the handoff. + select { + case brar.newContracts <- alice: + case <-time.After(500 * time.Millisecond): + t.Fatalf("unable to register alice with breach arbiter: %v", err) + } + + // Send one HTLC to Bob and perform a state transition to lock it in. + htlcAmount := lnwire.NewMSatFromSatoshis(20000) + htlc, _ := createHTLC(0, htlcAmount) + if _, err := alice.AddHTLC(htlc); err != nil { + t.Fatalf("alice unable to add htlc: %v", err) + } + if _, err := bob.ReceiveHTLC(htlc); err != nil { + t.Fatalf("bob unable to recv add htlc: %v", err) + } + if err := forceStateTransition(alice, bob); err != nil { + t.Fatalf("Can't update the channel state: %v", err) + } + + // Generate the force close summary at this point in time, this will + // serve as the old state bob will broadcast. + forceCloseSummary, err := bob.ForceClose() + if err != nil { + t.Fatalf("unable to force close bob's channel: %v", err) + } + + // Now send another HTLC and perform a state transition, this ensures + // Alice is ahead of the state Bob will broadcast. + htlc2, _ := createHTLC(1, htlcAmount) + if _, err := alice.AddHTLC(htlc2); err != nil { + t.Fatalf("alice unable to add htlc: %v", err) + } + if _, err := bob.ReceiveHTLC(htlc2); err != nil { + t.Fatalf("bob unable to recv add htlc: %v", err) + } + if err := forceStateTransition(alice, bob); err != nil { + t.Fatalf("Can't update the channel state: %v", err) + } + + chanPoint := alice.ChanPoint + breachTxn := forceCloseSummary.CloseTx + + // Signal a spend of the funding transaction and wait for the close + // observer to exit. + notifier.Spend(chanPoint, 100, breachTxn) + alice.WaitForClose() + + // After exiting, the breach arbiter should have persisted the + // retribution information and the channel should be shown as pending + // force closed. + assertArbiterBreach(t, brar, chanPoint) + assertPendingClosed(t, alice) +} + +// TestBreachHandoffFail tests that a channel's close observer properly +// delivers retribution information to the breach arbiter in response to a +// breach close. This test verifies correctness in the event that the breach +// arbiter fails to write the information to disk, and that a subsequent attempt +// at the handoff succeeds. +func TestBreachHandoffFail(t *testing.T) { + // Create a pair of channels using a notifier that allows us to signal + // a spend of the funding transaction. Alice's channel will be the on + // observing a breach. + notifier := makeMockSpendNotifier() + alice, bob, cleanUpChans, err := createInitChannelsWithNotifier( + 1, notifier) + if err != nil { + t.Fatalf("unable to create test channels: %v", err) + } + defer cleanUpChans() + + // Instantiate a breach arbiter to handle the breach of alice's channel. + brar, cleanUpArb, err := createTestArbiter(t, notifier, alice.State().Db) + if err != nil { + t.Fatalf("unable to initialize test breach arbiter: %v", err) + } + defer cleanUpArb() + + // Send the channel to the arbiter so that it set up the receiving end + // of the handoff. + select { + case brar.newContracts <- alice: + case <-time.After(500 * time.Millisecond): + t.Fatalf("unable to register alice with breach arbiter: %v", err) + } + + // Send one HTLC to Bob and perform a state transition to lock it in. + htlcAmount := lnwire.NewMSatFromSatoshis(20000) + htlc, _ := createHTLC(0, htlcAmount) + if _, err := alice.AddHTLC(htlc); err != nil { + t.Fatalf("alice unable to add htlc: %v", err) + } + if _, err := bob.ReceiveHTLC(htlc); err != nil { + t.Fatalf("bob unable to recv add htlc: %v", err) + } + if err := forceStateTransition(alice, bob); err != nil { + t.Fatalf("Can't update the channel state: %v", err) + } + + // Generate the force close summary at this point in time, this will + // serve as the old state bob will broadcast. + forceCloseSummary, err := bob.ForceClose() + if err != nil { + t.Fatalf("unable to force close bob's channel: %v", err) + } + + // Now send another HTLC and perform a state transition, this ensures + // Alice is ahead of the state Bob will broadcast. + htlc2, _ := createHTLC(1, htlcAmount) + if _, err := alice.AddHTLC(htlc2); err != nil { + t.Fatalf("alice unable to add htlc: %v", err) + } + if _, err := bob.ReceiveHTLC(htlc2); err != nil { + t.Fatalf("bob unable to recv add htlc: %v", err) + } + if err := forceStateTransition(alice, bob); err != nil { + t.Fatalf("Can't update the channel state: %v", err) + } + + // Before alerting Alice of the breach, instruct our failing retribution + // store to fail the next database operation, which we expect to write + // the information handed off by the channel's close observer. + fstore := brar.cfg.Store.(*failingRetributionStore) + fstore.FailNextAdd(nil) + + // Signal the notifier to dispatch spend notifications of the funding + // transaction using the transaction from bob's closing summary. + chanPoint := alice.ChanPoint + breachTxn := forceCloseSummary.CloseTx + notifier.Spend(chanPoint, 100, breachTxn) + + // Wait for the close observer to exit, all persistent effects should be + // observable after this point. + alice.WaitForClose() + + // Since the handoff failed, the breach arbiter should not show the + // channel as breached, and the channel should also not have been marked + // pending closed. + assertNoArbiterBreach(t, brar, chanPoint) + assertNotPendingClosed(t, alice) + + // Instantiate a second lightning channel for alice, using the state of + // her last channel. + aliceKeyPriv, _ := btcec.PrivKeyFromBytes(btcec.S256(), + alicesPrivKey) + aliceSigner := &mockSigner{aliceKeyPriv} + estimator := &lnwallet.StaticFeeEstimator{FeeRate: 50} + + alice2, err := lnwallet.NewLightningChannel(aliceSigner, notifier, + estimator, alice.State()) + if err != nil { + t.Fatalf("unable to create test channels: %v", err) + } + defer alice2.Stop() + + // Send this newer channel to breach arbiter, which should replace the + // prior. + select { + case brar.newContracts <- alice2: + case <-time.After(500 * time.Millisecond): + t.Fatalf("unable to register alice with breach arbiter: %v", err) + } + + // Signal a spend of the funding transaction and wait for the close + // observer to exit. This time we are allowing the handoff to succeed. + notifier.Spend(chanPoint, 100, breachTxn) + alice2.WaitForClose() + + // Check that the breach was properly recorded in the breach arbiter, + // and that the close observer marked the channel as pending closed + // before exiting. + assertArbiterBreach(t, brar, chanPoint) + assertPendingClosed(t, alice) +} + +// assertArbiterBreach checks that the breach arbiter has persisted the breach +// information for a particular channel. +func assertArbiterBreach(t *testing.T, brar *breachArbiter, + chanPoint *wire.OutPoint) { + + isBreached, err := brar.IsBreached(chanPoint) + if err != nil { + t.Fatalf("unable to determine if channel is "+ + "breached: %v", err) + } + + if !isBreached { + t.Fatalf("channel %v was never marked breached", + chanPoint) + } + +} + +// assertNoArbiterBreach checks that the breach arbiter has not persisted the +// breach information for a particular channel. +func assertNoArbiterBreach(t *testing.T, brar *breachArbiter, + chanPoint *wire.OutPoint) { + + isBreached, err := brar.IsBreached(chanPoint) + if err != nil { + t.Fatalf("unable to determine if channel is "+ + "breached: %v", err) + } + + if isBreached { + t.Fatalf("channel %v was marked breached", + chanPoint) + } +} + +// assertPendingClosed checks that the channel has been marked pending closed in +// the channel database. +func assertPendingClosed(t *testing.T, c *lnwallet.LightningChannel) { + closedChans, err := c.State().Db.FetchClosedChannels(true) + if err != nil { + t.Fatalf("unable to load pending closed channels: %v", err) + } + + for _, chanSummary := range closedChans { + if chanSummary.ChanPoint == *c.ChanPoint { + return + } + } + + t.Fatalf("channel %v was not marked pending closed", + c.ChanPoint) +} + +// assertNotPendingClosed checks that the channel has not been marked pending +// closed in the channel database. +func assertNotPendingClosed(t *testing.T, c *lnwallet.LightningChannel) { + closedChans, err := c.State().Db.FetchClosedChannels(true) + if err != nil { + t.Fatalf("unable to load pending closed channels: %v", err) + } + + for _, chanSummary := range closedChans { + if chanSummary.ChanPoint == *c.ChanPoint { + t.Fatalf("channel %v was marked pending closed", + c.ChanPoint) + } + } +} + +// createTestArbiter instantiates a breach arbiter with a failing retribution +// store, so that controlled failures can be tested. +func createTestArbiter(t *testing.T, notifier chainntnfs.ChainNotifier, + db *channeldb.DB) (*breachArbiter, func(), error) { + + // Create a failing retribution store, that wraps a normal one. + store := newFailingRetributionStore(func() RetributionStore { + return newRetributionStore(db) + }) + + aliceKeyPriv, _ := btcec.PrivKeyFromBytes(btcec.S256(), + alicesPrivKey) + signer := &mockSigner{key: aliceKeyPriv} + + // Assemble our test arbiter. + ba := newBreachArbiter(&BreachConfig{ + CloseLink: func(_ *wire.OutPoint, _ htlcswitch.ChannelCloseType) {}, + DB: db, + Estimator: &lnwallet.StaticFeeEstimator{FeeRate: 50}, + GenSweepScript: func() ([]byte, error) { return nil, nil }, + Notifier: notifier, + Signer: signer, + PublishTransaction: func(_ *wire.MsgTx) error { return nil }, + Store: store, + }) + + if err := ba.Start(); err != nil { + return nil, nil, err + } + + // The caller is responsible for closing the database. + cleanUp := func() { + ba.Stop() + } + + return ba, cleanUp, nil +} + +// createInitChannelsWithNotifier creates two initialized test channels funded +// with 10 BTC, with 5 BTC allocated to each side. Within the channel, Alice is +// the initiator. +func createInitChannelsWithNotifier(revocationWindow int, + notifier chainntnfs.ChainNotifier) (*lnwallet.LightningChannel, + *lnwallet.LightningChannel, func(), error) { + + aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes(btcec.S256(), + alicesPrivKey) + bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes(btcec.S256(), + bobsPrivKey) + + channelCapacity := btcutil.Amount(10 * 1e8) + channelBal := channelCapacity / 2 + aliceDustLimit := btcutil.Amount(200) + bobDustLimit := btcutil.Amount(1300) + csvTimeoutAlice := uint32(5) + csvTimeoutBob := uint32(4) + + prevOut := &wire.OutPoint{ + Hash: chainhash.Hash(testHdSeed), + Index: 0, + } + fundingTxIn := wire.NewTxIn(prevOut, nil, nil) + + aliceCfg := channeldb.ChannelConfig{ + ChannelConstraints: channeldb.ChannelConstraints{ + DustLimit: aliceDustLimit, + MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()), + ChanReserve: btcutil.Amount(rand.Int63()), + MinHTLC: lnwire.MilliSatoshi(rand.Int63()), + MaxAcceptedHtlcs: uint16(rand.Int31()), + }, + CsvDelay: uint16(csvTimeoutAlice), + MultiSigKey: aliceKeyPub, + RevocationBasePoint: aliceKeyPub, + PaymentBasePoint: aliceKeyPub, + DelayBasePoint: aliceKeyPub, + HtlcBasePoint: aliceKeyPub, + } + bobCfg := channeldb.ChannelConfig{ + ChannelConstraints: channeldb.ChannelConstraints{ + DustLimit: bobDustLimit, + MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()), + ChanReserve: btcutil.Amount(rand.Int63()), + MinHTLC: lnwire.MilliSatoshi(rand.Int63()), + MaxAcceptedHtlcs: uint16(rand.Int31()), + }, + CsvDelay: uint16(csvTimeoutBob), + MultiSigKey: bobKeyPub, + RevocationBasePoint: bobKeyPub, + PaymentBasePoint: bobKeyPub, + DelayBasePoint: bobKeyPub, + HtlcBasePoint: bobKeyPub, + } + + bobRoot := lnwallet.DeriveRevocationRoot(bobKeyPriv, testHdSeed, aliceKeyPub) + bobPreimageProducer := shachain.NewRevocationProducer(bobRoot) + bobFirstRevoke, err := bobPreimageProducer.AtIndex(0) + if err != nil { + return nil, nil, nil, err + } + bobCommitPoint := lnwallet.ComputeCommitmentPoint(bobFirstRevoke[:]) + + aliceRoot := lnwallet.DeriveRevocationRoot(aliceKeyPriv, testHdSeed, bobKeyPub) + alicePreimageProducer := shachain.NewRevocationProducer(aliceRoot) + aliceFirstRevoke, err := alicePreimageProducer.AtIndex(0) + if err != nil { + return nil, nil, nil, err + } + aliceCommitPoint := lnwallet.ComputeCommitmentPoint(aliceFirstRevoke[:]) + + aliceCommitTx, bobCommitTx, err := lnwallet.CreateCommitmentTxns(channelBal, + channelBal, &aliceCfg, &bobCfg, aliceCommitPoint, bobCommitPoint, + *fundingTxIn) + if err != nil { + return nil, nil, nil, err + } + + alicePath, err := ioutil.TempDir("", "alicedb") + dbAlice, err := channeldb.Open(alicePath) + if err != nil { + return nil, nil, nil, err + } + + bobPath, err := ioutil.TempDir("", "bobdb") + dbBob, err := channeldb.Open(bobPath) + if err != nil { + return nil, nil, nil, err + } + + estimator := &lnwallet.StaticFeeEstimator{FeeRate: 50} + feePerWeight, err := estimator.EstimateFeePerWeight(1) + if err != nil { + return nil, nil, nil, err + } + feePerKw := feePerWeight * 1000 + + // TODO(roasbeef): need to factor in commit fee? + aliceCommit := channeldb.ChannelCommitment{ + CommitHeight: 0, + LocalBalance: lnwire.NewMSatFromSatoshis(channelBal), + RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal), + FeePerKw: feePerKw, + CommitFee: 8688, + CommitTx: aliceCommitTx, + CommitSig: bytes.Repeat([]byte{1}, 71), + } + bobCommit := channeldb.ChannelCommitment{ + CommitHeight: 0, + LocalBalance: lnwire.NewMSatFromSatoshis(channelBal), + RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal), + FeePerKw: feePerKw, + CommitFee: 8688, + CommitTx: bobCommitTx, + CommitSig: bytes.Repeat([]byte{1}, 71), + } + + aliceChannelState := &channeldb.OpenChannel{ + LocalChanCfg: aliceCfg, + RemoteChanCfg: bobCfg, + IdentityPub: aliceKeyPub, + FundingOutpoint: *prevOut, + ChanType: channeldb.SingleFunder, + IsInitiator: true, + Capacity: channelCapacity, + RemoteCurrentRevocation: bobCommitPoint, + RevocationProducer: alicePreimageProducer, + RevocationStore: shachain.NewRevocationStore(), + LocalCommitment: aliceCommit, + RemoteCommitment: aliceCommit, + Db: dbAlice, + } + bobChannelState := &channeldb.OpenChannel{ + LocalChanCfg: bobCfg, + RemoteChanCfg: aliceCfg, + IdentityPub: bobKeyPub, + FundingOutpoint: *prevOut, + ChanType: channeldb.SingleFunder, + IsInitiator: false, + Capacity: channelCapacity, + RemoteCurrentRevocation: aliceCommitPoint, + RevocationProducer: bobPreimageProducer, + RevocationStore: shachain.NewRevocationStore(), + LocalCommitment: bobCommit, + RemoteCommitment: bobCommit, + Db: dbBob, + } + + aliceSigner := &mockSigner{aliceKeyPriv} + bobSigner := &mockSigner{bobKeyPriv} + + channelAlice, err := lnwallet.NewLightningChannel(aliceSigner, notifier, + estimator, aliceChannelState) + if err != nil { + return nil, nil, nil, err + } + channelBob, err := lnwallet.NewLightningChannel(bobSigner, notifier, + estimator, bobChannelState) + if err != nil { + return nil, nil, nil, err + } + + if err := channelAlice.State().FullSync(); err != nil { + return nil, nil, nil, err + } + if err := channelBob.State().FullSync(); err != nil { + return nil, nil, nil, err + } + + cleanUpFunc := func() { + dbBob.Close() + dbAlice.Close() + os.RemoveAll(bobPath) + os.RemoveAll(alicePath) + } + + // Now that the channel are open, simulate the start of a session by + // having Alice and Bob extend their revocation windows to each other. + err = initRevocationWindows(channelAlice, channelBob, revocationWindow) + if err != nil { + return nil, nil, nil, err + } + + return channelAlice, channelBob, cleanUpFunc, nil +} + +// initRevocationWindows simulates a new channel being opened within the p2p +// network by populating the initial revocation windows of the passed +// commitment state machines. +// +// TODO(conner) remove code duplication +func initRevocationWindows(chanA, chanB *lnwallet.LightningChannel, windowSize int) error { + aliceNextRevoke, err := chanA.NextRevocationKey() + if err != nil { + return err + } + if err := chanB.InitNextRevocation(aliceNextRevoke); err != nil { + return err + } + + bobNextRevoke, err := chanB.NextRevocationKey() + if err != nil { + return err + } + if err := chanA.InitNextRevocation(bobNextRevoke); err != nil { + return err + } + + return nil +} + +// createHTLC is a utility function for generating an HTLC with a given +// preimage and a given amount. +// TODO(conner) remove code duplication +func createHTLC(data int, amount lnwire.MilliSatoshi) (*lnwire.UpdateAddHTLC, [32]byte) { + preimage := bytes.Repeat([]byte{byte(data)}, 32) + paymentHash := sha256.Sum256(preimage) + + var returnPreimage [32]byte + copy(returnPreimage[:], preimage) + + return &lnwire.UpdateAddHTLC{ + ID: uint64(data), + PaymentHash: paymentHash, + Amount: amount, + Expiry: uint32(5), + }, returnPreimage +} + +// forceStateTransition executes the necessary interaction between the two +// commitment state machines to transition to a new state locking in any +// pending updates. +// TODO(conner) remove code duplication +func forceStateTransition(chanA, chanB *lnwallet.LightningChannel) error { + aliceSig, aliceHtlcSigs, err := chanA.SignNextCommitment() + if err != nil { + return err + } + if err = chanB.ReceiveNewCommitment(aliceSig, aliceHtlcSigs); err != nil { + return err + } + + bobRevocation, err := chanB.RevokeCurrentCommitment() + if err != nil { + return err + } + bobSig, bobHtlcSigs, err := chanB.SignNextCommitment() + if err != nil { + return err + } + + if _, err := chanA.ReceiveRevocation(bobRevocation); err != nil { + return err + } + if err := chanA.ReceiveNewCommitment(bobSig, bobHtlcSigs); err != nil { + return err + } + + aliceRevocation, err := chanA.RevokeCurrentCommitment() + if err != nil { + return err + } + if _, err := chanB.ReceiveRevocation(aliceRevocation); err != nil { + return err + } + + return nil +} + +// calcStaticFee calculates appropriate fees for commitment transactions. This +// function provides a simple way to allow test balance assertions to take fee +// calculations into account. +// +// TODO(bvu): Refactor when dynamic fee estimation is added. +// TODO(conner) remove code duplication +func calcStaticFee(numHTLCs int) btcutil.Amount { + const ( + commitWeight = btcutil.Amount(724) + htlcWeight = 172 + feePerKw = btcutil.Amount(24/4) * 1000 + ) + return feePerKw * (commitWeight + + btcutil.Amount(htlcWeight*numHTLCs)) / 1000 +} From bb8c5f82da04b6d2a4c36569517f0192446cef0a Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 20 Nov 2017 23:57:33 -0800 Subject: [PATCH 03/13] lnwallet/channel: delete state after ack from breach arb --- lnwallet/channel.go | 128 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 114 insertions(+), 14 deletions(-) diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 2f2e17ad..f1ab217d 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -1111,6 +1111,9 @@ type LightningChannel struct { status channelState + // ChanPoint is the funding outpoint of this channel. + ChanPoint *wire.OutPoint + // sigPool is a pool of workers that are capable of signing and // validating signatures in parallel. This is utilized as an // optimization to void serially signing or validating the HTLC @@ -1269,6 +1272,7 @@ func NewLightningChannel(signer Signer, events chainntnfs.ChainNotifier, remoteChanCfg: &state.RemoteChanCfg, localUpdateLog: localUpdateLog, remoteUpdateLog: remoteUpdateLog, + ChanPoint: &state.FundingOutpoint, Capacity: state.Capacity, FundingWitnessScript: multiSigScript, ForceCloseSignal: make(chan struct{}), @@ -1717,6 +1721,11 @@ type BreachRetribution struct { // commitment transaction. BreachTransaction *wire.MsgTx + // BreachHeight records the block height confirming the breach + // transaction, used as a height hint when registering for + // confirmations. + BreachHeight uint32 + // ChainHash is the chain that the contract beach was identified // within. This is also the resident chain of the contract (the chain // the contract was created on). @@ -1757,13 +1766,18 @@ type BreachRetribution struct { // HtlcRetributions is a slice of HTLC retributions for each output // active HTLC output within the breached commitment transaction. HtlcRetributions []HtlcRetribution + + // Err is used to reliably hand-off the breach retribution to the breach + // arbiter. + Err chan error } // newBreachRetribution creates a new fully populated BreachRetribution for the // passed channel, at a particular revoked state number, and one which targets // the passed commitment transaction. func newBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64, - broadcastCommitment *wire.MsgTx) (*BreachRetribution, error) { + broadcastCommitment *wire.MsgTx, + breachHeight uint32) (*BreachRetribution, error) { commitHash := broadcastCommitment.TxHash() @@ -1926,6 +1940,7 @@ func newBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64, return &BreachRetribution{ ChainHash: chanState.ChainHash, BreachTransaction: broadcastCommitment, + BreachHeight: breachHeight, RevokedStateNum: stateNum, PendingHTLCs: revokedSnapshot.Htlcs, LocalOutpoint: localOutpoint, @@ -1933,6 +1948,7 @@ func newBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64, RemoteOutpoint: remoteOutpoint, RemoteOutputSignDesc: remoteSignDesc, HtlcRetributions: htlcRetributions, + Err: make(chan error, 1), }, nil } @@ -1951,6 +1967,7 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven var ( commitSpend *chainntnfs.SpendDetail + spendHeight uint32 ok bool ) @@ -1962,6 +1979,8 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven return } + spendHeight = uint32(commitSpend.SpendingHeight) + // Otherwise, we've been signalled to bail out early by the // caller/maintainer of this channel. case <-lc.observerQuit: @@ -2012,6 +2031,7 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven remoteStateNum := lc.channelState.RemoteCommitment.CommitHeight // TODO(roasbeef): track heights distinctly? + switch { // If state number spending transaction matches the current latest // state, then they've initiated a unilateral close. So we'll trigger @@ -2037,15 +2057,17 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven ChanPoint: lc.channelState.FundingOutpoint, ChainHash: lc.channelState.ChainHash, ClosingTXID: *commitSpend.SpenderTxHash, + CloseHeight: spendHeight, RemotePub: lc.channelState.IdentityPub, Capacity: lc.Capacity, SettledBalance: lc.channelState.LocalCommitment.LocalBalance.ToSatoshis(), CloseType: channeldb.ForceClose, IsPending: true, } + if err := lc.DeleteState(&closeSummary); err != nil { - walletLog.Errorf("unable to delete channel state: %v", - err) + walletLog.Errorf("unable to delete channel state: %v", err) + return } // TODO(roasbeef): need to handle case of if > @@ -2109,6 +2131,17 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven } } + // We'll also send all the details necessary to re-claim funds + // that are suspended within any contracts. + unilateralCloseSummary := &UnilateralCloseSummary{ + SpendDetail: commitSpend, + ChannelCloseSummary: closeSummary, + SelfOutPoint: selfPoint, + SelfOutputSignDesc: selfSignDesc, + MaturityDelay: uint32(lc.remoteChanCfg.CsvDelay), + HtlcResolutions: htlcResolutions, + } + // TODO(roasbeef): send msg before writing to disk // * need to ensure proper fault tolerance in all cases // * get ACK from the consumer of the ntfn before writing to disk? @@ -2118,15 +2151,11 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven // commitment transaction broadcast. close(lc.UnilateralCloseSignal) - // We'll also send all the details necessary to re-claim funds - // that are suspended within any contracts. - lc.UnilateralClose <- &UnilateralCloseSummary{ - SpendDetail: commitSpend, - ChannelCloseSummary: closeSummary, - SelfOutPoint: selfPoint, - SelfOutputSignDesc: selfSignDesc, - MaturityDelay: uint32(lc.remoteChanCfg.CsvDelay), - HtlcResolutions: htlcResolutions, + select { + case lc.UnilateralClose <- unilateralCloseSummary: + case <-lc.observerQuit: + walletLog.Errorf("channel shutting down") + return } // If the state number broadcast is lower than the remote node's @@ -2140,10 +2169,15 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven "broadcast!!!", lc.channelState.FundingOutpoint, remoteStateNum) + if err := lc.channelState.MarkBorked(true); err != nil { + walletLog.Errorf("unable to mark channel as borked: %v", err) + return + } + // Create a new reach retribution struct which contains all the // data needed to swiftly bring the cheating peer to justice. retribution, err := newBreachRetribution(lc.channelState, - broadcastStateNum, commitTxBroadcast) + broadcastStateNum, commitTxBroadcast, spendHeight) if err != nil { walletLog.Errorf("unable to create breach retribution: %v", err) return @@ -2155,7 +2189,54 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven // Finally, send the retribution struct over the contract beach // channel to allow the observer the use the breach retribution // to sweep ALL funds. - lc.ContractBreach <- retribution + select { + case lc.ContractBreach <- retribution: + case <-lc.observerQuit: + walletLog.Errorf("channel shutting down") + return + } + + // Wait for the breach arbiter to ACK the handoff before marking + // the channel as pending force closed in channeldb. + select { + case err := <-retribution.Err: + // Bail if the handoff failed. + if err != nil { + walletLog.Errorf("unable to handoff "+ + "retribution info: %v", err) + return + } + + case <-lc.observerQuit: + walletLog.Errorf("channel shutting down") + return + } + + // At this point, we've successfully received an ack for the + // breach close. We now construct and persist the close + // summary, marking the channel as pending force closed. + settledBalance := lc.channelState.LocalCommitment. + LocalBalance.ToSatoshis() + closeSummary := channeldb.ChannelCloseSummary{ + ChanPoint: lc.channelState.FundingOutpoint, + ChainHash: lc.channelState.ChainHash, + ClosingTXID: *commitSpend.SpenderTxHash, + CloseHeight: spendHeight, + RemotePub: lc.channelState.IdentityPub, + Capacity: lc.Capacity, + SettledBalance: settledBalance, + CloseType: channeldb.BreachClose, + IsPending: true, + } + + err = lc.DeleteState(&closeSummary) + if err != nil { + walletLog.Errorf("unable to delete channel state: %v", err) + return + } + + walletLog.Infof("Breached channel=%v marked pending-closed", + lc.channelState.FundingOutpoint) } } @@ -3190,6 +3271,10 @@ func (lc *LightningChannel) ProcessChanSyncMsg(msg *lnwire.ChannelReestablish) ( // chain reported by the remote party is not equal to our chain tail, // then we cannot sync. case !oweRevocation && localChainTail.height != msg.RemoteCommitTailHeight: + if err := lc.channelState.MarkBorked(true); err != nil { + return nil, err + } + return nil, ErrCannotSyncCommitChains } @@ -3218,6 +3303,10 @@ func (lc *LightningChannel) ProcessChanSyncMsg(msg *lnwire.ChannelReestablish) ( } else if !oweCommitment && remoteChainTip.height+1 != msg.NextLocalCommitHeight { + if err := lc.channelState.MarkBorked(true); err != nil { + return nil, err + } + // If we don't owe them a commitment, yet the tip of their // chain isn't one more than the next local commit height they // report, we'll fail the channel. @@ -5118,3 +5207,14 @@ func (lc *LightningChannel) IsPending() bool { return lc.channelState.IsPending } + +// State provides access to the channel's internal state for testing. +func (lc *LightningChannel) State() *channeldb.OpenChannel { + return lc.channelState +} + +// ObserverQuit returns the quit channel used to coordinate the shutdown of the +// close observer. +func (lc *LightningChannel) ObserverQuit() chan struct{} { + return lc.observerQuit +} From 9703ab9161378673d37c6ee1aae64f5e341f05e9 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 13 Dec 2017 01:29:36 -0800 Subject: [PATCH 04/13] lnwallet/channel: exposes channelState via State for testing --- lnwallet/channel.go | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/lnwallet/channel.go b/lnwallet/channel.go index f1ab217d..46cd97eb 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -1208,7 +1208,8 @@ type LightningChannel struct { sync.RWMutex - wg sync.WaitGroup + cowg sync.WaitGroup + wg sync.WaitGroup shutdown int32 quit chan struct{} @@ -1342,6 +1343,7 @@ func NewLightningChannel(signer Signer, events chainntnfs.ChainNotifier, // Launch the close observer which will vigilantly watch the // network for any broadcasts the current or prior commitment // transactions, taking action accordingly. + lc.cowg.Add(1) go lc.closeObserver(channelCloseNtfn) } @@ -1376,6 +1378,11 @@ func (lc *LightningChannel) CancelObserver() { close(lc.observerQuit) } +// WaitForClose blocks until the channel's close observer has terminated. +func (lc *LightningChannel) WaitForClose() { + lc.cowg.Wait() +} + // ResetState resets the state of the channel back to the default state. This // ensures that any active goroutines which need to act based on on-chain // events do so properly. @@ -1962,6 +1969,8 @@ func newBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64, // // NOTE: This MUST be run as a goroutine. func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEvent) { + defer lc.cowg.Done() + walletLog.Infof("Close observer for ChannelPoint(%v) active", lc.channelState.FundingOutpoint) @@ -2169,8 +2178,9 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven "broadcast!!!", lc.channelState.FundingOutpoint, remoteStateNum) - if err := lc.channelState.MarkBorked(true); err != nil { - walletLog.Errorf("unable to mark channel as borked: %v", err) + if err := lc.channelState.MarkBorked(); err != nil { + walletLog.Errorf("unable to mark channel as borked: %v", + err) return } @@ -2231,7 +2241,8 @@ func (lc *LightningChannel) closeObserver(channelCloseNtfn *chainntnfs.SpendEven err = lc.DeleteState(&closeSummary) if err != nil { - walletLog.Errorf("unable to delete channel state: %v", err) + walletLog.Errorf("unable to delete channel state: %v", + err) return } @@ -3271,7 +3282,7 @@ func (lc *LightningChannel) ProcessChanSyncMsg(msg *lnwire.ChannelReestablish) ( // chain reported by the remote party is not equal to our chain tail, // then we cannot sync. case !oweRevocation && localChainTail.height != msg.RemoteCommitTailHeight: - if err := lc.channelState.MarkBorked(true); err != nil { + if err := lc.channelState.MarkBorked(); err != nil { return nil, err } @@ -3303,7 +3314,7 @@ func (lc *LightningChannel) ProcessChanSyncMsg(msg *lnwire.ChannelReestablish) ( } else if !oweCommitment && remoteChainTip.height+1 != msg.NextLocalCommitHeight { - if err := lc.channelState.MarkBorked(true); err != nil { + if err := lc.channelState.MarkBorked(); err != nil { return nil, err } From 20f4c61c8b6df7f483594b4ef93b2f37a626782e Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 13 Dec 2017 01:30:12 -0800 Subject: [PATCH 05/13] lnwallet/channel_test: adds TestBreachClose --- lnwallet/channel_test.go | 187 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 181 insertions(+), 6 deletions(-) diff --git a/lnwallet/channel_test.go b/lnwallet/channel_test.go index 3787f160..9ad4a4ca 100644 --- a/lnwallet/channel_test.go +++ b/lnwallet/channel_test.go @@ -3,13 +3,16 @@ package lnwallet import ( "bytes" "crypto/sha256" + "errors" "fmt" "io/ioutil" "math/rand" "os" "reflect" "runtime" + "sync" "testing" + "time" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" @@ -79,7 +82,7 @@ func (m *mockSigner) SignOutputRaw(tx *wire.MsgTx, signDesc *SignDescriptor) ([] } sig, err := txscript.RawTxInWitnessSignature(tx, signDesc.SigHashes, - signDesc.InputIndex, amt, witnessScript, txscript.SigHashAll, + signDesc.InputIndex, amt, witnessScript, signDesc.HashType, privKey) if err != nil { return nil, err @@ -105,7 +108,7 @@ func (m *mockSigner) ComputeInputScript(tx *wire.MsgTx, signDesc *SignDescriptor witnessScript, err := txscript.WitnessSignature(tx, signDesc.SigHashes, signDesc.InputIndex, signDesc.Output.Value, signDesc.Output.PkScript, - txscript.SigHashAll, privKey, true) + signDesc.HashType, privKey, true) if err != nil { return nil, err } @@ -121,6 +124,7 @@ type mockNotfier struct { func (m *mockNotfier) RegisterConfirmationsNtfn(txid *chainhash.Hash, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { return nil, nil } + func (m *mockNotfier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { return nil, nil } @@ -132,6 +136,7 @@ func (m *mockNotfier) Start() error { func (m *mockNotfier) Stop() error { return nil } + func (m *mockNotfier) RegisterSpendNtfn(outpoint *wire.OutPoint, heightHint uint32) (*chainntnfs.SpendEvent, error) { return &chainntnfs.SpendEvent{ Spend: make(chan *chainntnfs.SpendDetail), @@ -140,6 +145,51 @@ func (m *mockNotfier) RegisterSpendNtfn(outpoint *wire.OutPoint, heightHint uint }, nil } +// mockSpendNotifier extends the mockNotifier so that spend notifications can be +// triggered and delivered to subscribers. +type mockSpendNotifier struct { + *mockNotfier + spendMap map[wire.OutPoint][]chan *chainntnfs.SpendDetail +} + +func makeMockSpendNotifier() *mockSpendNotifier { + return &mockSpendNotifier{ + spendMap: make(map[wire.OutPoint][]chan *chainntnfs.SpendDetail), + } +} + +func (m *mockSpendNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, + heightHint uint32) (*chainntnfs.SpendEvent, error) { + + spendChan := make(chan *chainntnfs.SpendDetail, 1) + m.spendMap[*outpoint] = append(m.spendMap[*outpoint], spendChan) + return &chainntnfs.SpendEvent{ + Spend: spendChan, + Cancel: func() { + }, + }, nil +} + +// Spend dispatches SpendDetails to all subscribers of the outpoint. The details +// will include the transaction and height provided by the caller. +func (m *mockSpendNotifier) Spend(outpoint *wire.OutPoint, height int32, + txn *wire.MsgTx) { + + if spendChans, ok := m.spendMap[*outpoint]; ok { + delete(m.spendMap, *outpoint) + for _, spendChan := range spendChans { + txnHash := txn.TxHash() + spendChan <- &chainntnfs.SpendDetail{ + SpentOutPoint: outpoint, + SpendingHeight: height, + SpendingTx: txn, + SpenderTxHash: &txnHash, + SpenderInputIndex: outpoint.Index, + } + } + } +} + // initRevocationWindows simulates a new channel being opened within the p2p // network by populating the initial revocation windows of the passed // commitment state machines. @@ -204,9 +254,36 @@ func forceStateTransition(chanA, chanB *LightningChannel) error { return nil } -// createTestChannels creates two test channels funded with 10 BTC, with 5 BTC +// createSpendableTestChannels initializes a pair of channels using a +// mockSpendNotifier. This allows us to test the behavior of the closeObserver, +// which is activated when the funding transaction is spent. +func createSpendableTestChannels(revocationWindow int) (*LightningChannel, + *LightningChannel, *mockSpendNotifier, func(), error) { + + notifier := makeMockSpendNotifier() + alice, bob, cleanup, err := createTestChannelsWithNotifier( + revocationWindow, notifier, + ) + + return alice, bob, notifier, cleanup, err +} + +// createTestChannels initializes a pair of channels using a mock notifier. +func createTestChannels(revocationWindow int) (*LightningChannel, + *LightningChannel, func(), error) { + + notifier := &mockNotfier{} + + return createTestChannelsWithNotifier(revocationWindow, notifier) +} + +// createTestChannelsWithNotifier creates two test lightning channels using the +// provided notifier. The channel itself is funded with 10 BTC, with 5 BTC // allocated to each side. Within the channel, Alice is the initiator. -func createTestChannels(revocationWindow int) (*LightningChannel, *LightningChannel, func(), error) { +func createTestChannelsWithNotifier(revocationWindow int, + notifier chainntnfs.ChainNotifier) (*LightningChannel, + *LightningChannel, func(), error) { + aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes(btcec.S256(), testWalletPrivKey) bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes(btcec.S256(), @@ -353,8 +430,6 @@ func createTestChannels(revocationWindow int) (*LightningChannel, *LightningChan aliceSigner := &mockSigner{aliceKeyPriv} bobSigner := &mockSigner{bobKeyPriv} - notifier := &mockNotfier{} - channelAlice, err := NewLightningChannel(aliceSigner, notifier, estimator, aliceChannelState) if err != nil { @@ -1151,6 +1226,106 @@ func TestForceCloseDustOutput(t *testing.T) { } } +// TestBreachClose checks that the resulting ForceCloseSummary is correct when a +// peer is ForceClosing the channel. Will check outputs both above and below +// the dust limit. +func TestBreachClose(t *testing.T) { + t.Parallel() + + // Create a test channel which will be used for the duration of this + // unittest. The channel will be funded evenly with Alice having 5 BTC, + // and Bob having 5 BTC. + aliceChannel, bobChannel, notifier, cleanUp, err := + createSpendableTestChannels(1) + if err != nil { + t.Fatalf("unable to create test channels: %v", err) + } + defer cleanUp() + + // Send one HTLC from Alice to Bob, and advance the state of both + // channels. + htlcAmount := lnwire.NewMSatFromSatoshis(20000) + htlc, _ := createHTLC(0, htlcAmount) + if _, err := aliceChannel.AddHTLC(htlc); err != nil { + t.Fatalf("alice unable to add htlc: %v", err) + } + if _, err := bobChannel.ReceiveHTLC(htlc); err != nil { + t.Fatalf("bob unable to recv add htlc: %v", err) + } + if err := forceStateTransition(aliceChannel, bobChannel); err != nil { + t.Fatalf("Can't update the channel state: %v", err) + } + + // Construct a force close summary of Bob's channel, this includes the + // breach transaction that will be used to spend the funding point. + forceCloseSummary, err := bobChannel.ForceClose() + if err != nil { + t.Fatalf("unable to force close bob's channel: %v", err) + } + + // Send another HTLC and advance the state of both channels again. This + // ensures that Alice's state will be ahead of the breach transaction + // generated above. + htlc2, _ := createHTLC(1, htlcAmount) + if _, err := aliceChannel.AddHTLC(htlc2); err != nil { + t.Fatalf("alice unable to add htlc: %v", err) + } + if _, err := bobChannel.ReceiveHTLC(htlc2); err != nil { + t.Fatalf("bob unable to recv add htlc: %v", err) + } + if err := forceStateTransition(aliceChannel, bobChannel); err != nil { + t.Fatalf("Can't update the channel state: %v", err) + } + + chanPoint := aliceChannel.ChanPoint + breachTxn := forceCloseSummary.CloseTx + + // Spend the funding point using the breach transaction. + notifier.Spend(chanPoint, 100, breachTxn) + + // Set up a separate routine to monitor alice's channel for a response + // to the spend. We use a generous timeout to ensure the test doesn't + // stall indefinitely, but allows us to block the main routine until the + // close observer exits. + errChan := make(chan error, 1) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + select { + case ret := <-aliceChannel.ContractBreach: + errChan <- nil + // Acknowledge a successful processing of the + // retribution information. + ret.Err <- nil + case <-aliceChannel.UnilateralClose: + errChan <- errors.New("expected breach close to " + + "be signaled, not unilateral") + case <-time.After(60 * time.Second): + errChan <- errors.New("breach was not signaled") + } + }() + + // Wait for both the close observer to exit and our background process + // to exit before attempting to read from the error channel. + aliceChannel.WaitForClose() + wg.Wait() + + // Now that all tasks have been shutdown, handle the result. The result + // should be available immediately, we allow five seconds to handle any + // variance in scheduling on travis. + select { + case err := <-errChan: + if err != nil { + t.Fatalf(err.Error()) + } + + case <-time.After(5 * time.Second): + t.Fatalf("breach was not received") + } +} + // TestDustHTLCFees checks that fees are calculated correctly when HTLCs fall // below the nodes' dust limit. In these cases, the amount of the dust HTLCs // should be applied to the commitment transaction fee. From 1d69526874de3c3f3a919792166ec6ea1c33e7b3 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 30 Nov 2017 18:59:32 -0800 Subject: [PATCH 06/13] channeldb/channel: adds IsBorked to OpenChannel --- channeldb/channel.go | 65 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 53 insertions(+), 12 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index 6c3a362a..9c938cd1 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -305,6 +305,11 @@ type OpenChannel struct { // negotiate fees, or close the channel. IsInitiator bool + // IsBorked indicates that the channel has entered an irreconcilable + // state, triggered by a state desynchronization or channel breach. + // Channels in this state should never be added to the htlc switch. + IsBorked bool + // FundingBroadcastHeight is the height in which the funding // transaction was broadcast. This value can be used by higher level // sub-systems to determine if a channel is stale and/or should have @@ -522,6 +527,37 @@ func (c *OpenChannel) MarkAsOpen(openLoc lnwire.ShortChannelID) error { }) } +// MarkBorked marks the event when the channel as reached an irreconcilable +// state, such as a channel breach or state desynchronization. Borked channels +// should never be added to the switch. +func (c *OpenChannel) MarkBorked() error { + c.Lock() + defer c.Unlock() + + if err := c.Db.Update(func(tx *bolt.Tx) error { + chanBucket, err := updateChanBucket(tx, c.IdentityPub, + &c.FundingOutpoint, c.ChainHash) + if err != nil { + return err + } + + channel, err := fetchOpenChannel(chanBucket, &c.FundingOutpoint) + if err != nil { + return err + } + + channel.IsBorked = true + + return putOpenChannel(chanBucket, channel) + }); err != nil { + return err + } + + c.IsBorked = true + + return nil +} + // putChannel serializes, and stores the current state of the channel in its // entirety. func putOpenChannel(chanBucket *bolt.Bucket, channel *OpenChannel) error { @@ -1217,6 +1253,9 @@ type ChannelCloseSummary struct { // Capacity was the total capacity of the channel. Capacity btcutil.Amount + // CloseHeight is the height at which the funding transaction was spent. + CloseHeight uint32 + // SettledBalance is our total balance settled balance at the time of // channel closure. This _does not_ include the sum of any outputs that // have been time-locked as a result of the unilateral channel closure. @@ -1402,9 +1441,9 @@ func putChannelCloseSummary(tx *bolt.Tx, chanID []byte, func serializeChannelCloseSummary(w io.Writer, cs *ChannelCloseSummary) error { return writeElements(w, - cs.ChanPoint, cs.ChainHash, cs.ClosingTXID, cs.RemotePub, cs.Capacity, - cs.SettledBalance, cs.TimeLockedBalance, cs.CloseType, - cs.IsPending, + cs.ChanPoint, cs.ChainHash, cs.ClosingTXID, cs.CloseHeight, + cs.RemotePub, cs.Capacity, cs.SettledBalance, + cs.TimeLockedBalance, cs.CloseType, cs.IsPending, ) } @@ -1429,9 +1468,9 @@ func deserializeCloseChannelSummary(r io.Reader) (*ChannelCloseSummary, error) { c := &ChannelCloseSummary{} err := readElements(r, - &c.ChanPoint, &c.ChainHash, &c.ClosingTXID, &c.RemotePub, &c.Capacity, - &c.SettledBalance, &c.TimeLockedBalance, &c.CloseType, - &c.IsPending, + &c.ChanPoint, &c.ChainHash, &c.ClosingTXID, &c.CloseHeight, + &c.RemotePub, &c.Capacity, &c.SettledBalance, + &c.TimeLockedBalance, &c.CloseType, &c.IsPending, ) if err != nil { return nil, err @@ -1445,9 +1484,10 @@ func putChanInfo(chanBucket *bolt.Bucket, channel *OpenChannel) error { if err := writeElements(&w, channel.ChanType, channel.ChainHash, channel.FundingOutpoint, channel.ShortChanID, channel.IsPending, channel.IsInitiator, - channel.FundingBroadcastHeight, channel.NumConfsRequired, - channel.ChannelFlags, channel.IdentityPub, channel.Capacity, - channel.TotalMSatSent, channel.TotalMSatReceived, + channel.IsBorked, channel.FundingBroadcastHeight, + channel.NumConfsRequired, channel.ChannelFlags, + channel.IdentityPub, channel.Capacity, channel.TotalMSatSent, + channel.TotalMSatReceived, ); err != nil { return err } @@ -1545,9 +1585,10 @@ func fetchChanInfo(chanBucket *bolt.Bucket, channel *OpenChannel) error { if err := readElements(r, &channel.ChanType, &channel.ChainHash, &channel.FundingOutpoint, &channel.ShortChanID, &channel.IsPending, &channel.IsInitiator, - &channel.FundingBroadcastHeight, &channel.NumConfsRequired, - &channel.ChannelFlags, &channel.IdentityPub, &channel.Capacity, - &channel.TotalMSatSent, &channel.TotalMSatReceived, + &channel.IsBorked, &channel.FundingBroadcastHeight, + &channel.NumConfsRequired, &channel.ChannelFlags, + &channel.IdentityPub, &channel.Capacity, &channel.TotalMSatSent, + &channel.TotalMSatReceived, ); err != nil { return err } From 95b788e9a602198090bec4e6ef87e86f6ba1339c Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 13 Dec 2017 01:28:58 -0800 Subject: [PATCH 07/13] channeldb/db: exposes Path method, useful for testing --- channeldb/db.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/channeldb/db.go b/channeldb/db.go index 74437289..26d14fd2 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -88,6 +88,11 @@ func Open(dbPath string) (*DB, error) { return chanDB, nil } +// Path returns the file path to the channel database. +func (d *DB) Path() string { + return d.dbPath +} + // Wipe completely deletes all saved state within all used buckets within the // database. The deletion is done in a single transaction, therefore this // operation is fully atomic. From 20738885efd0ae1e4c3fae0fc01d516ec792813b Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 13 Dec 2017 01:30:57 -0800 Subject: [PATCH 08/13] mock: adds mockSpendNotifier and updates mockSigner --- mock.go | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 80 insertions(+), 2 deletions(-) diff --git a/mock.go b/mock.go index 4b437fe7..3a846f6a 100644 --- a/mock.go +++ b/mock.go @@ -1,6 +1,8 @@ package main import ( + "fmt" + "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/lnwallet" "github.com/roasbeef/btcd/btcec" @@ -24,6 +26,19 @@ func (m *mockSigner) SignOutputRaw(tx *wire.MsgTx, witnessScript := signDesc.WitnessScript privKey := m.key + if !privKey.PubKey().IsEqual(signDesc.PubKey) { + return nil, fmt.Errorf("incorrect key passed") + } + + switch { + case signDesc.SingleTweak != nil: + privKey = lnwallet.TweakPrivKey(privKey, + signDesc.SingleTweak) + case signDesc.DoubleTweak != nil: + privKey = lnwallet.DeriveRevocationPrivKey(privKey, + signDesc.DoubleTweak) + } + sig, err := txscript.RawTxInWitnessSignature(tx, signDesc.SigHashes, signDesc.InputIndex, amt, witnessScript, signDesc.HashType, privKey) @@ -36,9 +51,24 @@ func (m *mockSigner) SignOutputRaw(tx *wire.MsgTx, func (m *mockSigner) ComputeInputScript(tx *wire.MsgTx, signDesc *lnwallet.SignDescriptor) (*lnwallet.InputScript, error) { + + // TODO(roasbeef): expose tweaked signer from lnwallet so don't need to + // duplicate this code? + + privKey := m.key + + switch { + case signDesc.SingleTweak != nil: + privKey = lnwallet.TweakPrivKey(privKey, + signDesc.SingleTweak) + case signDesc.DoubleTweak != nil: + privKey = lnwallet.DeriveRevocationPrivKey(privKey, + signDesc.DoubleTweak) + } + witnessScript, err := txscript.WitnessSignature(tx, signDesc.SigHashes, - signDesc.InputIndex, signDesc.Output.Value, - signDesc.Output.PkScript, signDesc.HashType, m.key, true) + signDesc.InputIndex, signDesc.Output.Value, signDesc.Output.PkScript, + signDesc.HashType, privKey, true) if err != nil { return nil, err } @@ -78,6 +108,54 @@ func (m *mockNotfier) RegisterSpendNtfn(outpoint *wire.OutPoint, }, nil } +// mockSpendNotifier extends the mockNotifier so that spend notifications can be +// triggered and delivered to subscribers. +type mockSpendNotifier struct { + *mockNotfier + spendMap map[wire.OutPoint][]chan *chainntnfs.SpendDetail +} + +func makeMockSpendNotifier() *mockSpendNotifier { + return &mockSpendNotifier{ + mockNotfier: &mockNotfier{ + confChannel: make(chan *chainntnfs.TxConfirmation), + }, + spendMap: make(map[wire.OutPoint][]chan *chainntnfs.SpendDetail), + } +} + +func (m *mockSpendNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, + heightHint uint32) (*chainntnfs.SpendEvent, error) { + + spendChan := make(chan *chainntnfs.SpendDetail) + m.spendMap[*outpoint] = append(m.spendMap[*outpoint], spendChan) + return &chainntnfs.SpendEvent{ + Spend: spendChan, + Cancel: func() { + }, + }, nil +} + +// Spend dispatches SpendDetails to all subscribers of the outpoint. The details +// will include the transaction and height provided by the caller. +func (m *mockSpendNotifier) Spend(outpoint *wire.OutPoint, height int32, + txn *wire.MsgTx) { + + if spendChans, ok := m.spendMap[*outpoint]; ok { + delete(m.spendMap, *outpoint) + for _, spendChan := range spendChans { + txnHash := txn.TxHash() + spendChan <- &chainntnfs.SpendDetail{ + SpentOutPoint: outpoint, + SpendingHeight: height, + SpendingTx: txn, + SpenderTxHash: &txnHash, + SpenderInputIndex: outpoint.Index, + } + } + } +} + type mockChainIO struct{} func (*mockChainIO) GetBestBlock() (*chainhash.Hash, int32, error) { From e86900b412f7ce16448dbc122c248d292327c31c Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 13 Dec 2017 01:31:39 -0800 Subject: [PATCH 09/13] server: remove ChainIO dep from BreachConfig --- server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server.go b/server.go index 1239a4b9..957ae18e 100644 --- a/server.go +++ b/server.go @@ -344,7 +344,6 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl, } s.breachArbiter = newBreachArbiter(&BreachConfig{ - ChainIO: s.cc.chainIO, CloseLink: closeLink, DB: chanDB, Estimator: s.cc.feeEstimator, From 21c257f1062026bfe00cce1183487553422f9904 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 13 Dec 2017 01:55:09 -0800 Subject: [PATCH 10/13] fundingmanager_test: remove race condition in fmgr log --- fundingmanager_test.go | 20 +------------------- 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 02698fef..692bc8a9 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -132,7 +132,7 @@ type testNode struct { shutdownChannel chan struct{} } -func disableFndgLogger(t *testing.T) { +func init() { channeldb.UseLogger(btclog.Disabled) lnwallet.UseLogger(btclog.Disabled) fndgLog = btclog.Disabled @@ -802,8 +802,6 @@ func assertHandleFundingLocked(t *testing.T, alice, bob *testNode) { } func TestFundingManagerNormalWorkflow(t *testing.T) { - disableFndgLogger(t) - alice, bob := setupFundingManagers(t) defer tearDownFundingManagers(t, alice, bob) @@ -866,8 +864,6 @@ func TestFundingManagerNormalWorkflow(t *testing.T) { } func TestFundingManagerRestartBehavior(t *testing.T) { - disableFndgLogger(t) - alice, bob := setupFundingManagers(t) defer tearDownFundingManagers(t, alice, bob) @@ -986,8 +982,6 @@ func TestFundingManagerRestartBehavior(t *testing.T) { // server to notify when the peer comes online, in case sending the // fundingLocked message fails the first time. func TestFundingManagerOfflinePeer(t *testing.T) { - disableFndgLogger(t) - alice, bob := setupFundingManagers(t) defer tearDownFundingManagers(t, alice, bob) @@ -1114,8 +1108,6 @@ func TestFundingManagerOfflinePeer(t *testing.T) { } func TestFundingManagerFundingTimeout(t *testing.T) { - disableFndgLogger(t) - alice, bob := setupFundingManagers(t) defer tearDownFundingManagers(t, alice, bob) @@ -1158,8 +1150,6 @@ func TestFundingManagerFundingTimeout(t *testing.T) { // continues to operate as expected in case we receive a duplicate fundingLocked // message. func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) { - disableFndgLogger(t) - alice, bob := setupFundingManagers(t) defer tearDownFundingManagers(t, alice, bob) @@ -1257,8 +1247,6 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) { // handles receiving a fundingLocked after the its own fundingLocked and channel // announcement is sent and gets restarted. func TestFundingManagerRestartAfterChanAnn(t *testing.T) { - disableFndgLogger(t) - alice, bob := setupFundingManagers(t) defer tearDownFundingManagers(t, alice, bob) @@ -1329,8 +1317,6 @@ func TestFundingManagerRestartAfterChanAnn(t *testing.T) { // fundingManager continues to operate as expected after it has received // fundingLocked and then gets restarted. func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) { - disableFndgLogger(t) - alice, bob := setupFundingManagers(t) defer tearDownFundingManagers(t, alice, bob) @@ -1397,8 +1383,6 @@ func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) { // (a channel not supposed to be announced to the rest of the network), // the announcementSignatures nor the nodeAnnouncement messages are sent. func TestFundingManagerPrivateChannel(t *testing.T) { - disableFndgLogger(t) - alice, bob := setupFundingManagers(t) defer tearDownFundingManagers(t, alice, bob) @@ -1475,8 +1459,6 @@ func TestFundingManagerPrivateChannel(t *testing.T) { // announcement signatures nor the node announcement messages are sent upon // restart. func TestFundingManagerPrivateRestart(t *testing.T) { - disableFndgLogger(t) - alice, bob := setupFundingManagers(t) defer tearDownFundingManagers(t, alice, bob) From d6998ed30618ca376c75ecadb9b9af1f5e8639d2 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 13 Dec 2017 02:08:22 -0800 Subject: [PATCH 11/13] peer_test: move log disable to init() --- peer_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/peer_test.go b/peer_test.go index 0e71a3a0..ff6eb70a 100644 --- a/peer_test.go +++ b/peer_test.go @@ -17,7 +17,7 @@ import ( "github.com/roasbeef/btcutil" ) -func disablePeerLogger(t *testing.T) { +func init() { peerLog = btclog.Disabled srvrLog = btclog.Disabled lnwallet.UseLogger(btclog.Disabled) @@ -28,7 +28,6 @@ func disablePeerLogger(t *testing.T) { // TestPeerChannelClosureAcceptFeeResponder tests the shutdown responder's // behavior if we can agree on the fee immediately. func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) { - disablePeerLogger(t) t.Parallel() notifier := &mockNotfier{ @@ -118,7 +117,6 @@ func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) { // TestPeerChannelClosureAcceptFeeInitiator tests the shutdown initiator's // behavior if we can agree on the fee immediately. func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) { - disablePeerLogger(t) t.Parallel() notifier := &mockNotfier{ @@ -228,7 +226,6 @@ func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) { // responder's behavior in the case where we must do several rounds of fee // negotiation before we agree on a fee. func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) { - disablePeerLogger(t) t.Parallel() notifier := &mockNotfier{ @@ -409,7 +406,6 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) { // initiator's behavior in the case where we must do several rounds of fee // negotiation before we agree on a fee. func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) { - disablePeerLogger(t) t.Parallel() notifier := &mockNotfier{ From 978c0dc512eb3b6c62cd9e437e3e5348aa4f35b8 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 14 Dec 2017 13:56:10 -0800 Subject: [PATCH 12/13] lnd_test: adds polling num channels assertion in breach itests --- lnd_test.go | 60 ++++++++++++++++++++++++----------------------------- 1 file changed, 27 insertions(+), 33 deletions(-) diff --git a/lnd_test.go b/lnd_test.go index ef84661e..e649ef47 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -2768,18 +2768,7 @@ func testRevokedCloseRetribution(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("justice tx wasn't mined") } - // Finally, obtain Alice's channel state, she shouldn't report any - // channel as she just successfully brought Bob to justice by sweeping - // all the channel funds. - req := &lnrpc.ListChannelsRequest{} - aliceChanInfo, err := net.Alice.ListChannels(ctxb, req) - if err != nil { - t.Fatalf("unable to query for alice's channels: %v", err) - } - if len(aliceChanInfo.Channels) != 0 { - t.Fatalf("alice shouldn't have a channel: %v", - spew.Sdump(aliceChanInfo.Channels)) - } + assertNumChannels(t, ctxb, net.Alice, 0) } // testRevokedCloseRetributionZeroValueRemoteOutput tests that Alice is able @@ -2997,18 +2986,7 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness t.Fatalf("justice tx wasn't mined") } - // Finally, obtain Alice's channel state, she shouldn't report any - // channel as she just successfully brought Carol to justice by sweeping - // all the channel funds. - req := &lnrpc.ListChannelsRequest{} - aliceChanInfo, err := net.Alice.ListChannels(ctxb, req) - if err != nil { - t.Fatalf("unable to query for alice's channels: %v", err) - } - if len(aliceChanInfo.Channels) != 0 { - t.Fatalf("alice shouldn't have a channel: %v", - spew.Sdump(aliceChanInfo.Channels)) - } + assertNumChannels(t, ctxb, net.Alice, 0) } // testRevokedCloseRetributionRemoteHodl tests that Alice properly responds to a @@ -3295,17 +3273,33 @@ func testRevokedCloseRetributionRemoteHodl(net *lntest.NetworkHarness, t.Fatalf("justice tx wasn't mined") } - // Finally, obtain Alice's channel state, she shouldn't report any - // channel as she just successfully brought Carol to justice by sweeping - // all the channel funds. + assertNumChannels(t, ctxb, net.Alice, 0) +} + +// assertNumChannels polls the provided node's list channels rpc until it +// reaches the desired number of total channels. +func assertNumChannels(t *harnessTest, ctxb context.Context, + node *lntest.HarnessNode, numChannels int) { + + // Poll alice for her list of channels. req := &lnrpc.ListChannelsRequest{} - aliceChanInfo, err := net.Alice.ListChannels(ctxb, req) - if err != nil { - t.Fatalf("unable to query for alice's channels: %v", err) + + var predErr error + pred := func() bool { + chanInfo, err := node.ListChannels(ctxb, req) + if err != nil { + predErr = fmt.Errorf("unable to query for alice's "+ + "channels: %v", err) + return false + } + + // Return true if the query returned the expected number of + // channels. + return len(chanInfo.Channels) == numChannels } - if len(aliceChanInfo.Channels) != 0 { - t.Fatalf("alice shouldn't have a channel: %v", - spew.Sdump(aliceChanInfo.Channels)) + + if err := lntest.WaitPredicate(pred, time.Second*15); err != nil { + t.Fatalf("node has incorrect number of channels: %v", predErr) } } From 44805be8d95d59dc3b52a23d66b8f20e7127d5ad Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Sun, 17 Dec 2017 21:45:35 -0800 Subject: [PATCH 13/13] peer: filter borked channels when loading active chans --- peer.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/peer.go b/peer.go index 33cee2ae..37f87e75 100644 --- a/peer.go +++ b/peer.go @@ -318,6 +318,12 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { return fmt.Errorf("peer shutting down") } + // Skip adding any permanently irreconcilable channels to the + // htlcswitch. + if dbChan.IsBorked { + continue + } + blockEpoch, err := p.server.cc.chainNotifier.RegisterBlockEpochNtfn() if err != nil { return err