Merge pull request #1312 from halseth/breacharbiter-wait-for-spend

breacharbiter: wait on spend events instead of timeout
This commit is contained in:
Olaoluwa Osuntokun 2018-06-26 17:50:47 -07:00 committed by GitHub
commit b44ce7b366
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 310 additions and 140 deletions

@ -34,6 +34,10 @@ var (
// breached contracts. Entries are added to the justice txn bucket just // breached contracts. Entries are added to the justice txn bucket just
// before broadcasting the sweep txn. // before broadcasting the sweep txn.
justiceTxnBucket = []byte("justice-txn") justiceTxnBucket = []byte("justice-txn")
// errBrarShuttingDown is an error returned if the breacharbiter has
// been signalled to exit.
errBrarShuttingDown = errors.New("breacharbiter shutting down")
) )
// ContractBreachEvent is an event the breachArbiter will receive in case a // ContractBreachEvent is an event the breachArbiter will receive in case a
@ -302,6 +306,147 @@ func convertToSecondLevelRevoke(bo *breachedOutput, breachInfo *retributionInfo,
bo.outpoint) bo.outpoint)
} }
// waitForSpendEvent waits for any of the breached outputs to get spent, and
// mutates the breachInfo to be able to sweep it. This method should be used
// when we fail to publish the justice tx because of a double spend, indicating
// that the counter party has taken one of the breached outputs to the second
// level. The spendNtfns map is a cache used to store registered spend
// subscriptions, in case we must call this method multiple times.
func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo,
spendNtfns map[wire.OutPoint]*chainntnfs.SpendEvent) error {
// spend is used to wrap the index of the output that gets spent
// together with the spend details.
type spend struct {
index int
detail *chainntnfs.SpendDetail
}
// We create a channel the first goroutine that gets a spend event can
// signal. We make it buffered in case multiple spend events come in at
// the same time.
anySpend := make(chan struct{}, len(breachInfo.breachedOutputs))
// The allSpends channel will be used to pass spend events from all the
// goroutines that detects a spend before they are signalled to exit.
allSpends := make(chan spend, len(breachInfo.breachedOutputs))
// exit will be used to signal the goroutines that they can exit.
exit := make(chan struct{})
var wg sync.WaitGroup
// We'll now launch a goroutine for each of the HTLC outputs, that will
// signal the moment they detect a spend event.
for i := 0; i < len(breachInfo.breachedOutputs); i++ {
breachedOutput := &breachInfo.breachedOutputs[i]
// If this isn't an HTLC output, then we can skip it.
if breachedOutput.witnessType != lnwallet.HtlcAcceptedRevoke &&
breachedOutput.witnessType != lnwallet.HtlcOfferedRevoke {
continue
}
brarLog.Debugf("Checking for second-level attempt on HTLC(%v) "+
"for ChannelPoint(%v)", breachedOutput.outpoint,
breachInfo.chanPoint)
// If we have already registered for a notification for this
// output, we'll reuse it.
spendNtfn, ok := spendNtfns[breachedOutput.outpoint]
if !ok {
var err error
spendNtfn, err = b.cfg.Notifier.RegisterSpendNtfn(
&breachedOutput.outpoint,
breachInfo.breachHeight, true,
)
if err != nil {
brarLog.Errorf("unable to check for spentness "+
"of out_point=%v: %v",
breachedOutput.outpoint, err)
// Registration may have failed if we've been
// instructed to shutdown. If so, return here
// to avoid entering an infinite loop.
select {
case <-b.quit:
return errBrarShuttingDown
default:
continue
}
}
spendNtfns[breachedOutput.outpoint] = spendNtfn
}
// Launch a goroutine waiting for a spend event.
b.wg.Add(1)
wg.Add(1)
go func(index int, spendEv *chainntnfs.SpendEvent) {
defer b.wg.Done()
defer wg.Done()
select {
// The output has been taken to the second level!
case sp, ok := <-spendEv.Spend:
if !ok {
return
}
brarLog.Debugf("Detected spend of HTLC(%v) "+
"for ChannelPoint(%v)",
breachedOutput.outpoint,
breachInfo.chanPoint)
// First we send the spend event on the
// allSpends channel, such that it can be
// handled after all go routines have exited.
allSpends <- spend{index, sp}
// Finally we'll signal the anySpend channel
// that a spend was detected, such that the
// other goroutines can be shut down.
anySpend <- struct{}{}
case <-exit:
return
case <-b.quit:
return
}
}(i, spendNtfn)
}
// We'll wait for any of the outputs to be spent, or that we are
// signalled to exit.
select {
// A goroutine have signalled that a spend occured.
case <-anySpend:
// Signal for the remaining goroutines to exit.
close(exit)
wg.Wait()
// At this point all goroutines that can send on the allSpends
// channel have exited. We can therefore safely close the
// channel before ranging over its content.
close(allSpends)
for s := range allSpends {
breachedOutput := &breachInfo.breachedOutputs[s.index]
brarLog.Debugf("Detected second-level spend on "+
"HTLC(%v) for ChannelPoint(%v)",
breachedOutput.outpoint, breachInfo.chanPoint)
delete(spendNtfns, breachedOutput.outpoint)
// In this case we'll morph our initial revoke spend to
// instead point to the second level output, and update
// the sign descriptor in the process.
convertToSecondLevelRevoke(
breachedOutput, breachInfo, s.detail,
)
}
case <-b.quit:
return errBrarShuttingDown
}
return nil
}
// exactRetribution is a goroutine which is executed once a contract breach has // exactRetribution is a goroutine which is executed once a contract breach has
// been detected by a breachObserver. This function is responsible for // been detected by a breachObserver. This function is responsible for
// punishing a counterparty for violating the channel contract by sweeping ALL // punishing a counterparty for violating the channel contract by sweeping ALL
@ -334,6 +479,11 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+ brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+
"revoked funds", breachInfo.commitHash) "revoked funds", breachInfo.commitHash)
// We may have to wait for some of the HTLC outputs to be spent to the
// second level before broadcasting the justice tx. We'll store the
// SpendEvents between each attempt to not re-register uneccessarily.
spendNtfns := make(map[wire.OutPoint]*chainntnfs.SpendEvent)
finalTx, err := b.cfg.Store.GetFinalizedTxn(&breachInfo.chanPoint) finalTx, err := b.cfg.Store.GetFinalizedTxn(&breachInfo.chanPoint)
if err != nil { if err != nil {
brarLog.Errorf("unable to get finalized txn for"+ brarLog.Errorf("unable to get finalized txn for"+
@ -345,77 +495,8 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
// construct a sweep transaction and write it to disk. This will allow // construct a sweep transaction and write it to disk. This will allow
// the breach arbiter to re-register for notifications for the justice // the breach arbiter to re-register for notifications for the justice
// txid. // txid.
spendNtfns := make(map[wire.OutPoint]*chainntnfs.SpendEvent) justiceTxBroadcast:
secondLevelCheck:
if finalTx == nil { if finalTx == nil {
// Before we create the justice tx, we need to check to see if
// any of the active HTLC's on the commitment transactions has
// been spent. In this case, we'll need to go to the second
// level to sweep them before the remote party can.
for i := 0; i < len(breachInfo.breachedOutputs); i++ {
breachedOutput := &breachInfo.breachedOutputs[i]
// If this isn't an HTLC output, then we can skip it.
if breachedOutput.witnessType != lnwallet.HtlcAcceptedRevoke &&
breachedOutput.witnessType != lnwallet.HtlcOfferedRevoke {
continue
}
brarLog.Debugf("Checking for second-level attempt on "+
"HTLC(%v) for ChannelPoint(%v)",
breachedOutput.outpoint, breachInfo.chanPoint)
// Now that we have an HTLC output, we'll quickly check
// to see if it has been spent or not. If we have
// already registered for a notification for this
// output, we'll reuse it.
spendNtfn, ok := spendNtfns[breachedOutput.outpoint]
if !ok {
spendNtfn, err = b.cfg.Notifier.RegisterSpendNtfn(
&breachedOutput.outpoint,
breachInfo.breachHeight, true,
)
if err != nil {
brarLog.Errorf("unable to check for "+
"spentness of out_point=%v: %v",
breachedOutput.outpoint, err)
// Registration may have failed if
// we've been instructed to shutdown.
// If so, return here to avoid entering
// an infinite loop.
select {
case <-b.quit:
return
default:
continue
}
}
spendNtfns[breachedOutput.outpoint] = spendNtfn
}
select {
// The output has been taken to the second level!
case spendDetails, ok := <-spendNtfn.Spend:
if !ok {
return
}
delete(spendNtfns, breachedOutput.outpoint)
// In this case we'll morph our initial revoke
// spend to instead point to the second level
// output, and update the sign descriptor in
// the process.
convertToSecondLevelRevoke(
breachedOutput, breachInfo, spendDetails,
)
// It hasn't been spent so we'll continue.
default:
}
}
// With the breach transaction confirmed, we now create the // With the breach transaction confirmed, we now create the
// justice tx which will claim ALL the funds within the // justice tx which will claim ALL the funds within the
// channel. // channel.
@ -443,21 +524,30 @@ secondLevelCheck:
// channel's retribution against the cheating counter party. // channel's retribution against the cheating counter party.
err = b.cfg.PublishTransaction(finalTx) err = b.cfg.PublishTransaction(finalTx)
if err != nil { if err != nil {
brarLog.Errorf("unable to broadcast "+ brarLog.Errorf("unable to broadcast justice tx: %v", err)
"justice tx: %v", err)
if err == lnwallet.ErrDoubleSpend { if err == lnwallet.ErrDoubleSpend {
brarLog.Infof("Attempting to transfer HTLC revocations " + // Broadcasting the transaction failed because of a
"to the second level") // conflict either in the mempool or in chain. We'll
// now create spend subscriptions for all HTLC outputs
// on the commitment transaction that could possibly
// have been spent, and wait for any of them to
// trigger.
brarLog.Infof("Waiting for a spend event before " +
"attempting to craft new justice tx.")
finalTx = nil finalTx = nil
// Txn publication may fail if we're shutting down. err := b.waitForSpendEvent(breachInfo, spendNtfns)
// If so, return to avoid entering an infinite loop. if err != nil {
select { if err != errBrarShuttingDown {
case <-b.quit: brarLog.Errorf("error waiting for "+
return "spend event: %v", err)
default:
goto secondLevelCheck
} }
return
}
brarLog.Infof("Attempting another justice tx broadcast")
goto justiceTxBroadcast
} }
} }
@ -469,8 +559,8 @@ secondLevelCheck:
confChan, err = b.cfg.Notifier.RegisterConfirmationsNtfn( confChan, err = b.cfg.Notifier.RegisterConfirmationsNtfn(
&justiceTXID, 1, breachConfHeight) &justiceTXID, 1, breachConfHeight)
if err != nil { if err != nil {
brarLog.Errorf("unable to register for conf for txid: %v", brarLog.Errorf("unable to register for conf for txid(%v): %v",
justiceTXID) justiceTXID, err)
return return
} }

@ -20,6 +20,7 @@ import (
"github.com/btcsuite/btclog" "github.com/btcsuite/btclog"
"github.com/go-errors/errors" "github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/keychain"
@ -933,11 +934,10 @@ restartCheck:
} }
} }
// TestBreachHandoffSuccess tests that a channel's close observer properly func initBreachedState(t *testing.T) (*breachArbiter,
// delivers retribution information to the breach arbiter in response to a *lnwallet.LightningChannel, *lnwallet.LightningChannel,
// breach close. This test verifies correctness in the event that the handoff *lnwallet.LocalForceCloseSummary, chan *ContractBreachEvent,
// experiences no interruptions. func(), func()) {
func TestBreachHandoffSuccess(t *testing.T) {
// Create a pair of channels using a notifier that allows us to signal // Create a pair of channels using a notifier that allows us to signal
// a spend of the funding transaction. Alice's channel will be the on // a spend of the funding transaction. Alice's channel will be the on
// observing a breach. // observing a breach.
@ -945,7 +945,6 @@ func TestBreachHandoffSuccess(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unable to create test channels: %v", err) t.Fatalf("unable to create test channels: %v", err)
} }
defer cleanUpChans()
// Instantiate a breach arbiter to handle the breach of alice's channel. // Instantiate a breach arbiter to handle the breach of alice's channel.
contractBreaches := make(chan *ContractBreachEvent) contractBreaches := make(chan *ContractBreachEvent)
@ -956,7 +955,6 @@ func TestBreachHandoffSuccess(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unable to initialize test breach arbiter: %v", err) t.Fatalf("unable to initialize test breach arbiter: %v", err)
} }
defer cleanUpArb()
// Send one HTLC to Bob and perform a state transition to lock it in. // Send one HTLC to Bob and perform a state transition to lock it in.
htlcAmount := lnwire.NewMSatFromSatoshis(20000) htlcAmount := lnwire.NewMSatFromSatoshis(20000)
@ -991,6 +989,20 @@ func TestBreachHandoffSuccess(t *testing.T) {
t.Fatalf("Can't update the channel state: %v", err) t.Fatalf("Can't update the channel state: %v", err)
} }
return brar, alice, bob, bobClose, contractBreaches, cleanUpChans,
cleanUpArb
}
// TestBreachHandoffSuccess tests that a channel's close observer properly
// delivers retribution information to the breach arbiter in response to a
// breach close. This test verifies correctness in the event that the handoff
// experiences no interruptions.
func TestBreachHandoffSuccess(t *testing.T) {
brar, alice, _, bobClose, contractBreaches,
cleanUpChans, cleanUpArb := initBreachedState(t)
defer cleanUpChans()
defer cleanUpArb()
chanPoint := alice.ChanPoint chanPoint := alice.ChanPoint
// Signal a spend of the funding transaction and wait for the close // Signal a spend of the funding transaction and wait for the close
@ -1052,59 +1064,11 @@ func TestBreachHandoffSuccess(t *testing.T) {
// arbiter fails to write the information to disk, and that a subsequent attempt // arbiter fails to write the information to disk, and that a subsequent attempt
// at the handoff succeeds. // at the handoff succeeds.
func TestBreachHandoffFail(t *testing.T) { func TestBreachHandoffFail(t *testing.T) {
// Create a pair of channels using a notifier that allows us to signal brar, alice, _, bobClose, contractBreaches,
// a spend of the funding transaction. Alice's channel will be the on cleanUpChans, cleanUpArb := initBreachedState(t)
// observing a breach.
alice, bob, cleanUpChans, err := createInitChannels(1)
if err != nil {
t.Fatalf("unable to create test channels: %v", err)
}
defer cleanUpChans() defer cleanUpChans()
// Instantiate a breach arbiter to handle the breach of alice's channel.
contractBreaches := make(chan *ContractBreachEvent)
brar, cleanUpArb, err := createTestArbiter(
t, contractBreaches, alice.State().Db,
)
if err != nil {
t.Fatalf("unable to initialize test breach arbiter: %v", err)
}
defer cleanUpArb() defer cleanUpArb()
// Send one HTLC to Bob and perform a state transition to lock it in.
htlcAmount := lnwire.NewMSatFromSatoshis(20000)
htlc, _ := createHTLC(0, htlcAmount)
if _, err := alice.AddHTLC(htlc, nil); err != nil {
t.Fatalf("alice unable to add htlc: %v", err)
}
if _, err := bob.ReceiveHTLC(htlc); err != nil {
t.Fatalf("bob unable to recv add htlc: %v", err)
}
if err := forceStateTransition(alice, bob); err != nil {
t.Fatalf("Can't update the channel state: %v", err)
}
// Generate the force close summary at this point in time, this will
// serve as the old state bob will broadcast.
bobClose, err := bob.ForceClose()
if err != nil {
t.Fatalf("unable to force close bob's channel: %v", err)
}
// Now send another HTLC and perform a state transition, this ensures
// Alice is ahead of the state Bob will broadcast.
htlc2, _ := createHTLC(1, htlcAmount)
if _, err := alice.AddHTLC(htlc2, nil); err != nil {
t.Fatalf("alice unable to add htlc: %v", err)
}
if _, err := bob.ReceiveHTLC(htlc2); err != nil {
t.Fatalf("bob unable to recv add htlc: %v", err)
}
if err := forceStateTransition(alice, bob); err != nil {
t.Fatalf("Can't update the channel state: %v", err)
}
// Before alerting Alice of the breach, instruct our failing retribution // Before alerting Alice of the breach, instruct our failing retribution
// store to fail the next database operation, which we expect to write // store to fail the next database operation, which we expect to write
// the information handed off by the channel's close observer. // the information handed off by the channel's close observer.
@ -1139,7 +1103,7 @@ func TestBreachHandoffFail(t *testing.T) {
assertNoArbiterBreach(t, brar, chanPoint) assertNoArbiterBreach(t, brar, chanPoint)
assertNotPendingClosed(t, alice) assertNotPendingClosed(t, alice)
brar, cleanUpArb, err = createTestArbiter( brar, cleanUpArb, err := createTestArbiter(
t, contractBreaches, alice.State().Db, t, contractBreaches, alice.State().Db,
) )
if err != nil { if err != nil {
@ -1186,6 +1150,117 @@ func TestBreachHandoffFail(t *testing.T) {
assertArbiterBreach(t, brar, chanPoint) assertArbiterBreach(t, brar, chanPoint)
} }
// TestBreachSecondLevelTransfer tests that sweep of a HTLC output on a
// breached commitment is transferred to a second level spend if the output is
// already spent.
func TestBreachSecondLevelTransfer(t *testing.T) {
brar, alice, _, bobClose, contractBreaches,
cleanUpChans, cleanUpArb := initBreachedState(t)
defer cleanUpChans()
defer cleanUpArb()
var (
height = bobClose.ChanSnapshot.CommitHeight
forceCloseTx = bobClose.CloseTx
chanPoint = alice.ChanPoint
publTx = make(chan *wire.MsgTx)
publErr error
)
// Make PublishTransaction always return ErrDoubleSpend to begin with.
publErr = lnwallet.ErrDoubleSpend
brar.cfg.PublishTransaction = func(tx *wire.MsgTx) error {
publTx <- tx
return publErr
}
// Notify the breach arbiter about the breach.
retribution, err := lnwallet.NewBreachRetribution(
alice.State(), height, forceCloseTx, 1)
if err != nil {
t.Fatalf("unable to create breach retribution: %v", err)
}
breach := &ContractBreachEvent{
ChanPoint: *chanPoint,
ProcessACK: make(chan error, 1),
BreachRetribution: retribution,
}
contractBreaches <- breach
// We'll also wait to consume the ACK back from the breach arbiter.
select {
case err := <-breach.ProcessACK:
if err != nil {
t.Fatalf("handoff failed: %v", err)
}
case <-time.After(time.Second * 15):
t.Fatalf("breach arbiter didn't send ack back")
}
// After exiting, the breach arbiter should have persisted the
// retribution information and the channel should be shown as pending
// force closed.
assertArbiterBreach(t, brar, chanPoint)
// Notify that the breaching transaction is confirmed, to trigger the
// retribution logic.
notifier := brar.cfg.Notifier.(*mockSpendNotifier)
notifier.confChannel <- &chainntnfs.TxConfirmation{}
// The breach arbiter should attempt to sweep all outputs on the
// breached commitment. We'll pretend that the HTLC output has been
// spent by the channel counter party's second level tx already.
var tx *wire.MsgTx
select {
case tx = <-publTx:
case <-time.After(5 * time.Second):
t.Fatalf("tx was not published")
}
if tx.TxIn[0].PreviousOutPoint.Hash != forceCloseTx.TxHash() {
t.Fatalf("tx not attempting to spend commitment")
}
// Find the index of the TxIn spending the HTLC output.
htlcOutpoint := &retribution.HtlcRetributions[0].OutPoint
htlcIn := -1
for i, txIn := range tx.TxIn {
if txIn.PreviousOutPoint == *htlcOutpoint {
htlcIn = i
}
}
if htlcIn == -1 {
t.Fatalf("htlc in not found")
}
// Since publishing the transaction failed above, the breach arbiter
// will attempt another second level check. Now notify that the htlc
// output is spent by a second level tx.
secondLvlTx := &wire.MsgTx{
TxOut: []*wire.TxOut{
&wire.TxOut{Value: 1},
},
}
notifier.Spend(htlcOutpoint, 2, secondLvlTx)
// Now a transaction attempting to spend from the second level tx
// should be published instead. Let this publish succeed by setting the
// publishing error to nil.
publErr = nil
select {
case tx = <-publTx:
case <-time.After(5 * time.Second):
t.Fatalf("tx was not published")
}
// The TxIn previously attempting to spend the HTLC outpoint should now
// be spending from the second level tx.
if tx.TxIn[htlcIn].PreviousOutPoint.Hash != secondLvlTx.TxHash() {
t.Fatalf("tx not attempting to spend second level tx, %v", tx.TxIn[0])
}
}
// assertArbiterBreach checks that the breach arbiter has persisted the breach // assertArbiterBreach checks that the breach arbiter has persisted the breach
// information for a particular channel. // information for a particular channel.
func assertArbiterBreach(t *testing.T, brar *breachArbiter, func assertArbiterBreach(t *testing.T, brar *breachArbiter,

@ -118,6 +118,7 @@ func (m *mockNotfier) RegisterSpendNtfn(outpoint *wire.OutPoint,
type mockSpendNotifier struct { type mockSpendNotifier struct {
*mockNotfier *mockNotfier
spendMap map[wire.OutPoint][]chan *chainntnfs.SpendDetail spendMap map[wire.OutPoint][]chan *chainntnfs.SpendDetail
mtx sync.Mutex
} }
func makeMockSpendNotifier() *mockSpendNotifier { func makeMockSpendNotifier() *mockSpendNotifier {
@ -131,6 +132,8 @@ func makeMockSpendNotifier() *mockSpendNotifier {
func (m *mockSpendNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, func (m *mockSpendNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
heightHint uint32, _ bool) (*chainntnfs.SpendEvent, error) { heightHint uint32, _ bool) (*chainntnfs.SpendEvent, error) {
m.mtx.Lock()
defer m.mtx.Unlock()
spendChan := make(chan *chainntnfs.SpendDetail) spendChan := make(chan *chainntnfs.SpendDetail)
m.spendMap[*outpoint] = append(m.spendMap[*outpoint], spendChan) m.spendMap[*outpoint] = append(m.spendMap[*outpoint], spendChan)
@ -145,6 +148,8 @@ func (m *mockSpendNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
// will include the transaction and height provided by the caller. // will include the transaction and height provided by the caller.
func (m *mockSpendNotifier) Spend(outpoint *wire.OutPoint, height int32, func (m *mockSpendNotifier) Spend(outpoint *wire.OutPoint, height int32,
txn *wire.MsgTx) { txn *wire.MsgTx) {
m.mtx.Lock()
defer m.mtx.Unlock()
if spendChans, ok := m.spendMap[*outpoint]; ok { if spendChans, ok := m.spendMap[*outpoint]; ok {
delete(m.spendMap, *outpoint) delete(m.spendMap, *outpoint)