diff --git a/breacharbiter.go b/breacharbiter.go index 25a190d3..5af66415 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -34,6 +34,10 @@ var ( // breached contracts. Entries are added to the justice txn bucket just // before broadcasting the sweep txn. justiceTxnBucket = []byte("justice-txn") + + // errBrarShuttingDown is an error returned if the breacharbiter has + // been signalled to exit. + errBrarShuttingDown = errors.New("breacharbiter shutting down") ) // ContractBreachEvent is an event the breachArbiter will receive in case a @@ -302,6 +306,147 @@ func convertToSecondLevelRevoke(bo *breachedOutput, breachInfo *retributionInfo, bo.outpoint) } +// waitForSpendEvent waits for any of the breached outputs to get spent, and +// mutates the breachInfo to be able to sweep it. This method should be used +// when we fail to publish the justice tx because of a double spend, indicating +// that the counter party has taken one of the breached outputs to the second +// level. The spendNtfns map is a cache used to store registered spend +// subscriptions, in case we must call this method multiple times. +func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo, + spendNtfns map[wire.OutPoint]*chainntnfs.SpendEvent) error { + + // spend is used to wrap the index of the output that gets spent + // together with the spend details. + type spend struct { + index int + detail *chainntnfs.SpendDetail + } + + // We create a channel the first goroutine that gets a spend event can + // signal. We make it buffered in case multiple spend events come in at + // the same time. + anySpend := make(chan struct{}, len(breachInfo.breachedOutputs)) + + // The allSpends channel will be used to pass spend events from all the + // goroutines that detects a spend before they are signalled to exit. + allSpends := make(chan spend, len(breachInfo.breachedOutputs)) + + // exit will be used to signal the goroutines that they can exit. + exit := make(chan struct{}) + var wg sync.WaitGroup + + // We'll now launch a goroutine for each of the HTLC outputs, that will + // signal the moment they detect a spend event. + for i := 0; i < len(breachInfo.breachedOutputs); i++ { + breachedOutput := &breachInfo.breachedOutputs[i] + + // If this isn't an HTLC output, then we can skip it. + if breachedOutput.witnessType != lnwallet.HtlcAcceptedRevoke && + breachedOutput.witnessType != lnwallet.HtlcOfferedRevoke { + continue + } + + brarLog.Debugf("Checking for second-level attempt on HTLC(%v) "+ + "for ChannelPoint(%v)", breachedOutput.outpoint, + breachInfo.chanPoint) + + // If we have already registered for a notification for this + // output, we'll reuse it. + spendNtfn, ok := spendNtfns[breachedOutput.outpoint] + if !ok { + var err error + spendNtfn, err = b.cfg.Notifier.RegisterSpendNtfn( + &breachedOutput.outpoint, + breachInfo.breachHeight, true, + ) + if err != nil { + brarLog.Errorf("unable to check for spentness "+ + "of out_point=%v: %v", + breachedOutput.outpoint, err) + + // Registration may have failed if we've been + // instructed to shutdown. If so, return here + // to avoid entering an infinite loop. + select { + case <-b.quit: + return errBrarShuttingDown + default: + continue + } + } + spendNtfns[breachedOutput.outpoint] = spendNtfn + } + + // Launch a goroutine waiting for a spend event. + b.wg.Add(1) + wg.Add(1) + go func(index int, spendEv *chainntnfs.SpendEvent) { + defer b.wg.Done() + defer wg.Done() + + select { + // The output has been taken to the second level! + case sp, ok := <-spendEv.Spend: + if !ok { + return + } + brarLog.Debugf("Detected spend of HTLC(%v) "+ + "for ChannelPoint(%v)", + breachedOutput.outpoint, + breachInfo.chanPoint) + + // First we send the spend event on the + // allSpends channel, such that it can be + // handled after all go routines have exited. + allSpends <- spend{index, sp} + + // Finally we'll signal the anySpend channel + // that a spend was detected, such that the + // other goroutines can be shut down. + anySpend <- struct{}{} + case <-exit: + return + case <-b.quit: + return + } + }(i, spendNtfn) + } + + // We'll wait for any of the outputs to be spent, or that we are + // signalled to exit. + select { + // A goroutine have signalled that a spend occured. + case <-anySpend: + // Signal for the remaining goroutines to exit. + close(exit) + wg.Wait() + + // At this point all goroutines that can send on the allSpends + // channel have exited. We can therefore safely close the + // channel before ranging over its content. + close(allSpends) + for s := range allSpends { + breachedOutput := &breachInfo.breachedOutputs[s.index] + brarLog.Debugf("Detected second-level spend on "+ + "HTLC(%v) for ChannelPoint(%v)", + breachedOutput.outpoint, breachInfo.chanPoint) + + delete(spendNtfns, breachedOutput.outpoint) + + // In this case we'll morph our initial revoke spend to + // instead point to the second level output, and update + // the sign descriptor in the process. + convertToSecondLevelRevoke( + breachedOutput, breachInfo, s.detail, + ) + } + case <-b.quit: + return errBrarShuttingDown + } + + return nil +} + // exactRetribution is a goroutine which is executed once a contract breach has // been detected by a breachObserver. This function is responsible for // punishing a counterparty for violating the channel contract by sweeping ALL @@ -334,6 +479,11 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent, brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+ "revoked funds", breachInfo.commitHash) + // We may have to wait for some of the HTLC outputs to be spent to the + // second level before broadcasting the justice tx. We'll store the + // SpendEvents between each attempt to not re-register uneccessarily. + spendNtfns := make(map[wire.OutPoint]*chainntnfs.SpendEvent) + finalTx, err := b.cfg.Store.GetFinalizedTxn(&breachInfo.chanPoint) if err != nil { brarLog.Errorf("unable to get finalized txn for"+ @@ -345,77 +495,8 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent, // construct a sweep transaction and write it to disk. This will allow // the breach arbiter to re-register for notifications for the justice // txid. - spendNtfns := make(map[wire.OutPoint]*chainntnfs.SpendEvent) - -secondLevelCheck: +justiceTxBroadcast: if finalTx == nil { - // Before we create the justice tx, we need to check to see if - // any of the active HTLC's on the commitment transactions has - // been spent. In this case, we'll need to go to the second - // level to sweep them before the remote party can. - for i := 0; i < len(breachInfo.breachedOutputs); i++ { - breachedOutput := &breachInfo.breachedOutputs[i] - - // If this isn't an HTLC output, then we can skip it. - if breachedOutput.witnessType != lnwallet.HtlcAcceptedRevoke && - breachedOutput.witnessType != lnwallet.HtlcOfferedRevoke { - continue - } - - brarLog.Debugf("Checking for second-level attempt on "+ - "HTLC(%v) for ChannelPoint(%v)", - breachedOutput.outpoint, breachInfo.chanPoint) - - // Now that we have an HTLC output, we'll quickly check - // to see if it has been spent or not. If we have - // already registered for a notification for this - // output, we'll reuse it. - spendNtfn, ok := spendNtfns[breachedOutput.outpoint] - if !ok { - spendNtfn, err = b.cfg.Notifier.RegisterSpendNtfn( - &breachedOutput.outpoint, - breachInfo.breachHeight, true, - ) - if err != nil { - brarLog.Errorf("unable to check for "+ - "spentness of out_point=%v: %v", - breachedOutput.outpoint, err) - - // Registration may have failed if - // we've been instructed to shutdown. - // If so, return here to avoid entering - // an infinite loop. - select { - case <-b.quit: - return - default: - continue - } - } - spendNtfns[breachedOutput.outpoint] = spendNtfn - } - - select { - // The output has been taken to the second level! - case spendDetails, ok := <-spendNtfn.Spend: - if !ok { - return - } - delete(spendNtfns, breachedOutput.outpoint) - - // In this case we'll morph our initial revoke - // spend to instead point to the second level - // output, and update the sign descriptor in - // the process. - convertToSecondLevelRevoke( - breachedOutput, breachInfo, spendDetails, - ) - - // It hasn't been spent so we'll continue. - default: - } - } - // With the breach transaction confirmed, we now create the // justice tx which will claim ALL the funds within the // channel. @@ -443,21 +524,30 @@ secondLevelCheck: // channel's retribution against the cheating counter party. err = b.cfg.PublishTransaction(finalTx) if err != nil { - brarLog.Errorf("unable to broadcast "+ - "justice tx: %v", err) + brarLog.Errorf("unable to broadcast justice tx: %v", err) + if err == lnwallet.ErrDoubleSpend { - brarLog.Infof("Attempting to transfer HTLC revocations " + - "to the second level") + // Broadcasting the transaction failed because of a + // conflict either in the mempool or in chain. We'll + // now create spend subscriptions for all HTLC outputs + // on the commitment transaction that could possibly + // have been spent, and wait for any of them to + // trigger. + brarLog.Infof("Waiting for a spend event before " + + "attempting to craft new justice tx.") finalTx = nil - // Txn publication may fail if we're shutting down. - // If so, return to avoid entering an infinite loop. - select { - case <-b.quit: + err := b.waitForSpendEvent(breachInfo, spendNtfns) + if err != nil { + if err != errBrarShuttingDown { + brarLog.Errorf("error waiting for "+ + "spend event: %v", err) + } return - default: - goto secondLevelCheck } + + brarLog.Infof("Attempting another justice tx broadcast") + goto justiceTxBroadcast } } @@ -469,8 +559,8 @@ secondLevelCheck: confChan, err = b.cfg.Notifier.RegisterConfirmationsNtfn( &justiceTXID, 1, breachConfHeight) if err != nil { - brarLog.Errorf("unable to register for conf for txid: %v", - justiceTXID) + brarLog.Errorf("unable to register for conf for txid(%v): %v", + justiceTXID, err) return }