breacharbiter: wait on spend events instead of timeout
This commit handles a racy condition within the breacharbiter's justice tx procedure. For backends that have no mempool we would check if an HTLC output was spent and then try broadcasting the justice tx, but this would fail since we wouldn't detect the spend before it was in a block. The result was that we would continuously attempt to broadcast the transaction, effectively ending up in an endless (until the second-level tx actually comfirmed) loop. Instead we now register for spend notifications in case broadcasting the transaction fails, and then wait for any of the notifications to be sent before trying again. This is a necessary step to be able to make lnd work well only with confimed transactions, and was a better solution than introducing timeouts within the broadcast loop (which complicates integration tests).
This commit is contained in:
parent
e0560741b4
commit
3bdc968f39
254
breacharbiter.go
254
breacharbiter.go
@ -34,6 +34,10 @@ var (
|
|||||||
// breached contracts. Entries are added to the justice txn bucket just
|
// breached contracts. Entries are added to the justice txn bucket just
|
||||||
// before broadcasting the sweep txn.
|
// before broadcasting the sweep txn.
|
||||||
justiceTxnBucket = []byte("justice-txn")
|
justiceTxnBucket = []byte("justice-txn")
|
||||||
|
|
||||||
|
// errBrarShuttingDown is an error returned if the breacharbiter has
|
||||||
|
// been signalled to exit.
|
||||||
|
errBrarShuttingDown = errors.New("breacharbiter shutting down")
|
||||||
)
|
)
|
||||||
|
|
||||||
// ContractBreachEvent is an event the breachArbiter will receive in case a
|
// ContractBreachEvent is an event the breachArbiter will receive in case a
|
||||||
@ -302,6 +306,147 @@ func convertToSecondLevelRevoke(bo *breachedOutput, breachInfo *retributionInfo,
|
|||||||
bo.outpoint)
|
bo.outpoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// waitForSpendEvent waits for any of the breached outputs to get spent, and
|
||||||
|
// mutates the breachInfo to be able to sweep it. This method should be used
|
||||||
|
// when we fail to publish the justice tx because of a double spend, indicating
|
||||||
|
// that the counter party has taken one of the breached outputs to the second
|
||||||
|
// level. The spendNtfns map is a cache used to store registered spend
|
||||||
|
// subscriptions, in case we must call this method multiple times.
|
||||||
|
func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo,
|
||||||
|
spendNtfns map[wire.OutPoint]*chainntnfs.SpendEvent) error {
|
||||||
|
|
||||||
|
// spend is used to wrap the index of the output that gets spent
|
||||||
|
// together with the spend details.
|
||||||
|
type spend struct {
|
||||||
|
index int
|
||||||
|
detail *chainntnfs.SpendDetail
|
||||||
|
}
|
||||||
|
|
||||||
|
// We create a channel the first goroutine that gets a spend event can
|
||||||
|
// signal. We make it buffered in case multiple spend events come in at
|
||||||
|
// the same time.
|
||||||
|
anySpend := make(chan struct{}, len(breachInfo.breachedOutputs))
|
||||||
|
|
||||||
|
// The allSpends channel will be used to pass spend events from all the
|
||||||
|
// goroutines that detects a spend before they are signalled to exit.
|
||||||
|
allSpends := make(chan spend, len(breachInfo.breachedOutputs))
|
||||||
|
|
||||||
|
// exit will be used to signal the goroutines that they can exit.
|
||||||
|
exit := make(chan struct{})
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
// We'll now launch a goroutine for each of the HTLC outputs, that will
|
||||||
|
// signal the moment they detect a spend event.
|
||||||
|
for i := 0; i < len(breachInfo.breachedOutputs); i++ {
|
||||||
|
breachedOutput := &breachInfo.breachedOutputs[i]
|
||||||
|
|
||||||
|
// If this isn't an HTLC output, then we can skip it.
|
||||||
|
if breachedOutput.witnessType != lnwallet.HtlcAcceptedRevoke &&
|
||||||
|
breachedOutput.witnessType != lnwallet.HtlcOfferedRevoke {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
brarLog.Debugf("Checking for second-level attempt on HTLC(%v) "+
|
||||||
|
"for ChannelPoint(%v)", breachedOutput.outpoint,
|
||||||
|
breachInfo.chanPoint)
|
||||||
|
|
||||||
|
// If we have already registered for a notification for this
|
||||||
|
// output, we'll reuse it.
|
||||||
|
spendNtfn, ok := spendNtfns[breachedOutput.outpoint]
|
||||||
|
if !ok {
|
||||||
|
var err error
|
||||||
|
spendNtfn, err = b.cfg.Notifier.RegisterSpendNtfn(
|
||||||
|
&breachedOutput.outpoint,
|
||||||
|
breachInfo.breachHeight, true,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
brarLog.Errorf("unable to check for spentness "+
|
||||||
|
"of out_point=%v: %v",
|
||||||
|
breachedOutput.outpoint, err)
|
||||||
|
|
||||||
|
// Registration may have failed if we've been
|
||||||
|
// instructed to shutdown. If so, return here
|
||||||
|
// to avoid entering an infinite loop.
|
||||||
|
select {
|
||||||
|
case <-b.quit:
|
||||||
|
return errBrarShuttingDown
|
||||||
|
default:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
spendNtfns[breachedOutput.outpoint] = spendNtfn
|
||||||
|
}
|
||||||
|
|
||||||
|
// Launch a goroutine waiting for a spend event.
|
||||||
|
b.wg.Add(1)
|
||||||
|
wg.Add(1)
|
||||||
|
go func(index int, spendEv *chainntnfs.SpendEvent) {
|
||||||
|
defer b.wg.Done()
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
select {
|
||||||
|
// The output has been taken to the second level!
|
||||||
|
case sp, ok := <-spendEv.Spend:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
brarLog.Debugf("Detected spend of HTLC(%v) "+
|
||||||
|
"for ChannelPoint(%v)",
|
||||||
|
breachedOutput.outpoint,
|
||||||
|
breachInfo.chanPoint)
|
||||||
|
|
||||||
|
// First we send the spend event on the
|
||||||
|
// allSpends channel, such that it can be
|
||||||
|
// handled after all go routines have exited.
|
||||||
|
allSpends <- spend{index, sp}
|
||||||
|
|
||||||
|
// Finally we'll signal the anySpend channel
|
||||||
|
// that a spend was detected, such that the
|
||||||
|
// other goroutines can be shut down.
|
||||||
|
anySpend <- struct{}{}
|
||||||
|
case <-exit:
|
||||||
|
return
|
||||||
|
case <-b.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}(i, spendNtfn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We'll wait for any of the outputs to be spent, or that we are
|
||||||
|
// signalled to exit.
|
||||||
|
select {
|
||||||
|
// A goroutine have signalled that a spend occured.
|
||||||
|
case <-anySpend:
|
||||||
|
// Signal for the remaining goroutines to exit.
|
||||||
|
close(exit)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// At this point all goroutines that can send on the allSpends
|
||||||
|
// channel have exited. We can therefore safely close the
|
||||||
|
// channel before ranging over its content.
|
||||||
|
close(allSpends)
|
||||||
|
for s := range allSpends {
|
||||||
|
breachedOutput := &breachInfo.breachedOutputs[s.index]
|
||||||
|
brarLog.Debugf("Detected second-level spend on "+
|
||||||
|
"HTLC(%v) for ChannelPoint(%v)",
|
||||||
|
breachedOutput.outpoint, breachInfo.chanPoint)
|
||||||
|
|
||||||
|
delete(spendNtfns, breachedOutput.outpoint)
|
||||||
|
|
||||||
|
// In this case we'll morph our initial revoke spend to
|
||||||
|
// instead point to the second level output, and update
|
||||||
|
// the sign descriptor in the process.
|
||||||
|
convertToSecondLevelRevoke(
|
||||||
|
breachedOutput, breachInfo, s.detail,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
case <-b.quit:
|
||||||
|
return errBrarShuttingDown
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// exactRetribution is a goroutine which is executed once a contract breach has
|
// exactRetribution is a goroutine which is executed once a contract breach has
|
||||||
// been detected by a breachObserver. This function is responsible for
|
// been detected by a breachObserver. This function is responsible for
|
||||||
// punishing a counterparty for violating the channel contract by sweeping ALL
|
// punishing a counterparty for violating the channel contract by sweeping ALL
|
||||||
@ -334,6 +479,11 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
|
|||||||
brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+
|
brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+
|
||||||
"revoked funds", breachInfo.commitHash)
|
"revoked funds", breachInfo.commitHash)
|
||||||
|
|
||||||
|
// We may have to wait for some of the HTLC outputs to be spent to the
|
||||||
|
// second level before broadcasting the justice tx. We'll store the
|
||||||
|
// SpendEvents between each attempt to not re-register uneccessarily.
|
||||||
|
spendNtfns := make(map[wire.OutPoint]*chainntnfs.SpendEvent)
|
||||||
|
|
||||||
finalTx, err := b.cfg.Store.GetFinalizedTxn(&breachInfo.chanPoint)
|
finalTx, err := b.cfg.Store.GetFinalizedTxn(&breachInfo.chanPoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
brarLog.Errorf("unable to get finalized txn for"+
|
brarLog.Errorf("unable to get finalized txn for"+
|
||||||
@ -345,77 +495,8 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
|
|||||||
// construct a sweep transaction and write it to disk. This will allow
|
// construct a sweep transaction and write it to disk. This will allow
|
||||||
// the breach arbiter to re-register for notifications for the justice
|
// the breach arbiter to re-register for notifications for the justice
|
||||||
// txid.
|
// txid.
|
||||||
spendNtfns := make(map[wire.OutPoint]*chainntnfs.SpendEvent)
|
justiceTxBroadcast:
|
||||||
|
|
||||||
secondLevelCheck:
|
|
||||||
if finalTx == nil {
|
if finalTx == nil {
|
||||||
// Before we create the justice tx, we need to check to see if
|
|
||||||
// any of the active HTLC's on the commitment transactions has
|
|
||||||
// been spent. In this case, we'll need to go to the second
|
|
||||||
// level to sweep them before the remote party can.
|
|
||||||
for i := 0; i < len(breachInfo.breachedOutputs); i++ {
|
|
||||||
breachedOutput := &breachInfo.breachedOutputs[i]
|
|
||||||
|
|
||||||
// If this isn't an HTLC output, then we can skip it.
|
|
||||||
if breachedOutput.witnessType != lnwallet.HtlcAcceptedRevoke &&
|
|
||||||
breachedOutput.witnessType != lnwallet.HtlcOfferedRevoke {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
brarLog.Debugf("Checking for second-level attempt on "+
|
|
||||||
"HTLC(%v) for ChannelPoint(%v)",
|
|
||||||
breachedOutput.outpoint, breachInfo.chanPoint)
|
|
||||||
|
|
||||||
// Now that we have an HTLC output, we'll quickly check
|
|
||||||
// to see if it has been spent or not. If we have
|
|
||||||
// already registered for a notification for this
|
|
||||||
// output, we'll reuse it.
|
|
||||||
spendNtfn, ok := spendNtfns[breachedOutput.outpoint]
|
|
||||||
if !ok {
|
|
||||||
spendNtfn, err = b.cfg.Notifier.RegisterSpendNtfn(
|
|
||||||
&breachedOutput.outpoint,
|
|
||||||
breachInfo.breachHeight, true,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
brarLog.Errorf("unable to check for "+
|
|
||||||
"spentness of out_point=%v: %v",
|
|
||||||
breachedOutput.outpoint, err)
|
|
||||||
|
|
||||||
// Registration may have failed if
|
|
||||||
// we've been instructed to shutdown.
|
|
||||||
// If so, return here to avoid entering
|
|
||||||
// an infinite loop.
|
|
||||||
select {
|
|
||||||
case <-b.quit:
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
spendNtfns[breachedOutput.outpoint] = spendNtfn
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
// The output has been taken to the second level!
|
|
||||||
case spendDetails, ok := <-spendNtfn.Spend:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
delete(spendNtfns, breachedOutput.outpoint)
|
|
||||||
|
|
||||||
// In this case we'll morph our initial revoke
|
|
||||||
// spend to instead point to the second level
|
|
||||||
// output, and update the sign descriptor in
|
|
||||||
// the process.
|
|
||||||
convertToSecondLevelRevoke(
|
|
||||||
breachedOutput, breachInfo, spendDetails,
|
|
||||||
)
|
|
||||||
|
|
||||||
// It hasn't been spent so we'll continue.
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// With the breach transaction confirmed, we now create the
|
// With the breach transaction confirmed, we now create the
|
||||||
// justice tx which will claim ALL the funds within the
|
// justice tx which will claim ALL the funds within the
|
||||||
// channel.
|
// channel.
|
||||||
@ -443,21 +524,30 @@ secondLevelCheck:
|
|||||||
// channel's retribution against the cheating counter party.
|
// channel's retribution against the cheating counter party.
|
||||||
err = b.cfg.PublishTransaction(finalTx)
|
err = b.cfg.PublishTransaction(finalTx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
brarLog.Errorf("unable to broadcast "+
|
brarLog.Errorf("unable to broadcast justice tx: %v", err)
|
||||||
"justice tx: %v", err)
|
|
||||||
if err == lnwallet.ErrDoubleSpend {
|
if err == lnwallet.ErrDoubleSpend {
|
||||||
brarLog.Infof("Attempting to transfer HTLC revocations " +
|
// Broadcasting the transaction failed because of a
|
||||||
"to the second level")
|
// conflict either in the mempool or in chain. We'll
|
||||||
|
// now create spend subscriptions for all HTLC outputs
|
||||||
|
// on the commitment transaction that could possibly
|
||||||
|
// have been spent, and wait for any of them to
|
||||||
|
// trigger.
|
||||||
|
brarLog.Infof("Waiting for a spend event before " +
|
||||||
|
"attempting to craft new justice tx.")
|
||||||
finalTx = nil
|
finalTx = nil
|
||||||
|
|
||||||
// Txn publication may fail if we're shutting down.
|
err := b.waitForSpendEvent(breachInfo, spendNtfns)
|
||||||
// If so, return to avoid entering an infinite loop.
|
if err != nil {
|
||||||
select {
|
if err != errBrarShuttingDown {
|
||||||
case <-b.quit:
|
brarLog.Errorf("error waiting for "+
|
||||||
|
"spend event: %v", err)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
default:
|
|
||||||
goto secondLevelCheck
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
brarLog.Infof("Attempting another justice tx broadcast")
|
||||||
|
goto justiceTxBroadcast
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -469,8 +559,8 @@ secondLevelCheck:
|
|||||||
confChan, err = b.cfg.Notifier.RegisterConfirmationsNtfn(
|
confChan, err = b.cfg.Notifier.RegisterConfirmationsNtfn(
|
||||||
&justiceTXID, 1, breachConfHeight)
|
&justiceTXID, 1, breachConfHeight)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
brarLog.Errorf("unable to register for conf for txid: %v",
|
brarLog.Errorf("unable to register for conf for txid(%v): %v",
|
||||||
justiceTXID)
|
justiceTXID, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user