Merge pull request #5229 from halseth/breacharbiter-twoway-handoff

breacharbiter<->chainwatcher two-way handoff
This commit is contained in:
Conner Fromknecht 2021-05-06 16:17:03 -07:00 committed by GitHub
commit 6e5ccb4efb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 113 additions and 72 deletions

@ -46,17 +46,19 @@ var (
// ContractBreachEvent is an event the breachArbiter will receive in case a // ContractBreachEvent is an event the breachArbiter will receive in case a
// contract breach is observed on-chain. It contains the necessary information // contract breach is observed on-chain. It contains the necessary information
// to handle the breach, and a ProcessACK channel we will use to ACK the event // to handle the breach, and a ProcessACK closure we will use to ACK the event
// when we have safely stored all the necessary information. // when we have safely stored all the necessary information.
type ContractBreachEvent struct { type ContractBreachEvent struct {
// ChanPoint is the channel point of the breached channel. // ChanPoint is the channel point of the breached channel.
ChanPoint wire.OutPoint ChanPoint wire.OutPoint
// ProcessACK is an error channel where a nil error should be sent // ProcessACK is an closure that should be called with a nil error iff
// iff the breach retribution info is safely stored in the retribution // the breach retribution info is safely stored in the retribution
// store. In case storing the information to the store fails, a non-nil // store. In case storing the information to the store fails, a non-nil
// error should be sent. // error should be used. When this closure returns, it means that the
ProcessACK chan error // contract court has marked the channel pending close in the DB, and
// it is safe for the BreachArbiter to carry on its duty.
ProcessACK func(error)
// BreachRetribution is the information needed to act on this contract // BreachRetribution is the information needed to act on this contract
// breach. // breach.
@ -745,10 +747,8 @@ func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) {
b.Unlock() b.Unlock()
brarLog.Errorf("Unable to check breach info in DB: %v", err) brarLog.Errorf("Unable to check breach info in DB: %v", err)
select { // Notify about the failed lookup and return.
case breachEvent.ProcessACK <- err: breachEvent.ProcessACK(err)
case <-b.quit:
}
return return
} }
@ -757,11 +757,7 @@ func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) {
// case we can safely ACK the handoff, and return. // case we can safely ACK the handoff, and return.
if breached { if breached {
b.Unlock() b.Unlock()
breachEvent.ProcessACK(nil)
select {
case breachEvent.ProcessACK <- nil:
case <-b.quit:
}
return return
} }
@ -782,14 +778,10 @@ func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) {
// acknowledgment back to the close observer with the error. If // acknowledgment back to the close observer with the error. If
// the ack is successful, the close observer will mark the // the ack is successful, the close observer will mark the
// channel as pending-closed in the channeldb. // channel as pending-closed in the channeldb.
select { breachEvent.ProcessACK(err)
case breachEvent.ProcessACK <- err:
// Bail if we failed to persist retribution info.
if err != nil {
return
}
case <-b.quit: // Bail if we failed to persist retribution info.
if err != nil {
return return
} }

@ -1059,9 +1059,12 @@ func TestBreachHandoffSuccess(t *testing.T) {
// Signal a spend of the funding transaction and wait for the close // Signal a spend of the funding transaction and wait for the close
// observer to exit. // observer to exit.
processACK := make(chan error)
breach := &ContractBreachEvent{ breach := &ContractBreachEvent{
ChanPoint: *chanPoint, ChanPoint: *chanPoint,
ProcessACK: make(chan error, 1), ProcessACK: func(brarErr error) {
processACK <- brarErr
},
BreachRetribution: &lnwallet.BreachRetribution{ BreachRetribution: &lnwallet.BreachRetribution{
BreachTransaction: bobClose.CloseTx, BreachTransaction: bobClose.CloseTx,
LocalOutputSignDesc: &input.SignDescriptor{ LocalOutputSignDesc: &input.SignDescriptor{
@ -1075,7 +1078,7 @@ func TestBreachHandoffSuccess(t *testing.T) {
// We'll also wait to consume the ACK back from the breach arbiter. // We'll also wait to consume the ACK back from the breach arbiter.
select { select {
case err := <-breach.ProcessACK: case err := <-processACK:
if err != nil { if err != nil {
t.Fatalf("handoff failed: %v", err) t.Fatalf("handoff failed: %v", err)
} }
@ -1092,8 +1095,10 @@ func TestBreachHandoffSuccess(t *testing.T) {
// already ACKed, the breach arbiter should immediately ACK and ignore // already ACKed, the breach arbiter should immediately ACK and ignore
// this event. // this event.
breach = &ContractBreachEvent{ breach = &ContractBreachEvent{
ChanPoint: *chanPoint, ChanPoint: *chanPoint,
ProcessACK: make(chan error, 1), ProcessACK: func(brarErr error) {
processACK <- brarErr
},
BreachRetribution: &lnwallet.BreachRetribution{ BreachRetribution: &lnwallet.BreachRetribution{
BreachTransaction: bobClose.CloseTx, BreachTransaction: bobClose.CloseTx,
LocalOutputSignDesc: &input.SignDescriptor{ LocalOutputSignDesc: &input.SignDescriptor{
@ -1108,7 +1113,7 @@ func TestBreachHandoffSuccess(t *testing.T) {
// We'll also wait to consume the ACK back from the breach arbiter. // We'll also wait to consume the ACK back from the breach arbiter.
select { select {
case err := <-breach.ProcessACK: case err := <-processACK:
if err != nil { if err != nil {
t.Fatalf("handoff failed: %v", err) t.Fatalf("handoff failed: %v", err)
} }
@ -1140,9 +1145,12 @@ func TestBreachHandoffFail(t *testing.T) {
// Signal the notifier to dispatch spend notifications of the funding // Signal the notifier to dispatch spend notifications of the funding
// transaction using the transaction from bob's closing summary. // transaction using the transaction from bob's closing summary.
chanPoint := alice.ChanPoint chanPoint := alice.ChanPoint
processACK := make(chan error)
breach := &ContractBreachEvent{ breach := &ContractBreachEvent{
ChanPoint: *chanPoint, ChanPoint: *chanPoint,
ProcessACK: make(chan error, 1), ProcessACK: func(brarErr error) {
processACK <- brarErr
},
BreachRetribution: &lnwallet.BreachRetribution{ BreachRetribution: &lnwallet.BreachRetribution{
BreachTransaction: bobClose.CloseTx, BreachTransaction: bobClose.CloseTx,
LocalOutputSignDesc: &input.SignDescriptor{ LocalOutputSignDesc: &input.SignDescriptor{
@ -1156,7 +1164,7 @@ func TestBreachHandoffFail(t *testing.T) {
// We'll also wait to consume the ACK back from the breach arbiter. // We'll also wait to consume the ACK back from the breach arbiter.
select { select {
case err := <-breach.ProcessACK: case err := <-processACK:
if err == nil { if err == nil {
t.Fatalf("breach write should have failed") t.Fatalf("breach write should have failed")
} }
@ -1181,8 +1189,10 @@ func TestBreachHandoffFail(t *testing.T) {
// Signal a spend of the funding transaction and wait for the close // Signal a spend of the funding transaction and wait for the close
// observer to exit. This time we are allowing the handoff to succeed. // observer to exit. This time we are allowing the handoff to succeed.
breach = &ContractBreachEvent{ breach = &ContractBreachEvent{
ChanPoint: *chanPoint, ChanPoint: *chanPoint,
ProcessACK: make(chan error, 1), ProcessACK: func(brarErr error) {
processACK <- brarErr
},
BreachRetribution: &lnwallet.BreachRetribution{ BreachRetribution: &lnwallet.BreachRetribution{
BreachTransaction: bobClose.CloseTx, BreachTransaction: bobClose.CloseTx,
LocalOutputSignDesc: &input.SignDescriptor{ LocalOutputSignDesc: &input.SignDescriptor{
@ -1196,7 +1206,7 @@ func TestBreachHandoffFail(t *testing.T) {
contractBreaches <- breach contractBreaches <- breach
select { select {
case err := <-breach.ProcessACK: case err := <-processACK:
if err != nil { if err != nil {
t.Fatalf("handoff failed: %v", err) t.Fatalf("handoff failed: %v", err)
} }
@ -1399,16 +1409,19 @@ func testBreachSpends(t *testing.T, test breachTest) {
t.Fatalf("unable to create breach retribution: %v", err) t.Fatalf("unable to create breach retribution: %v", err)
} }
processACK := make(chan error)
breach := &ContractBreachEvent{ breach := &ContractBreachEvent{
ChanPoint: *chanPoint, ChanPoint: *chanPoint,
ProcessACK: make(chan error, 1), ProcessACK: func(brarErr error) {
processACK <- brarErr
},
BreachRetribution: retribution, BreachRetribution: retribution,
} }
contractBreaches <- breach contractBreaches <- breach
// We'll also wait to consume the ACK back from the breach arbiter. // We'll also wait to consume the ACK back from the breach arbiter.
select { select {
case err := <-breach.ProcessACK: case err := <-processACK:
if err != nil { if err != nil {
t.Fatalf("handoff failed: %v", err) t.Fatalf("handoff failed: %v", err)
} }

@ -100,11 +100,14 @@ type ChainArbitratorConfig struct {
MarkLinkInactive func(wire.OutPoint) error MarkLinkInactive func(wire.OutPoint) error
// ContractBreach is a function closure that the ChainArbitrator will // ContractBreach is a function closure that the ChainArbitrator will
// use to notify the breachArbiter about a contract breach. It should // use to notify the breachArbiter about a contract breach. A callback
// only return a non-nil error when the breachArbiter has preserved the // should be passed that when called will mark the channel pending
// necessary breach info for this channel point, and it is safe to mark // close in the databae. It should only return a non-nil error when the
// the channel as pending close in the database. // breachArbiter has preserved the necessary breach info for this
ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution) error // channel point, and the callback has succeeded, meaning it is safe to
// stop watching the channel.
ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution,
func() error) error
// IsOurAddress is a function that returns true if the passed address // IsOurAddress is a function that returns true if the passed address
// is known to the underlying wallet. Otherwise, false should be // is known to the underlying wallet. Otherwise, false should be
@ -488,8 +491,12 @@ func (c *ChainArbitrator) Start() error {
notifier: c.cfg.Notifier, notifier: c.cfg.Notifier,
signer: c.cfg.Signer, signer: c.cfg.Signer,
isOurAddr: c.cfg.IsOurAddress, isOurAddr: c.cfg.IsOurAddress,
contractBreach: func(retInfo *lnwallet.BreachRetribution) error { contractBreach: func(retInfo *lnwallet.BreachRetribution,
return c.cfg.ContractBreach(chanPoint, retInfo) markClosed func() error) error {
return c.cfg.ContractBreach(
chanPoint, retInfo, markClosed,
)
}, },
extractStateNumHint: lnwallet.GetStateNumHint, extractStateNumHint: lnwallet.GetStateNumHint,
}, },
@ -1078,8 +1085,12 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error
notifier: c.cfg.Notifier, notifier: c.cfg.Notifier,
signer: c.cfg.Signer, signer: c.cfg.Signer,
isOurAddr: c.cfg.IsOurAddress, isOurAddr: c.cfg.IsOurAddress,
contractBreach: func(retInfo *lnwallet.BreachRetribution) error { contractBreach: func(retInfo *lnwallet.BreachRetribution,
return c.cfg.ContractBreach(chanPoint, retInfo) markClosed func() error) error {
return c.cfg.ContractBreach(
chanPoint, retInfo, markClosed,
)
}, },
extractStateNumHint: lnwallet.GetStateNumHint, extractStateNumHint: lnwallet.GetStateNumHint,
}, },

@ -150,10 +150,13 @@ type chainWatcherConfig struct {
signer input.Signer signer input.Signer
// contractBreach is a method that will be called by the watcher if it // contractBreach is a method that will be called by the watcher if it
// detects that a contract breach transaction has been confirmed. Only // detects that a contract breach transaction has been confirmed. A
// when this method returns with a non-nil error it will be safe to mark // callback should be passed that when called will mark the channel
// the channel as pending close in the database. // pending close in the database. It will only return a non-nil error
contractBreach func(*lnwallet.BreachRetribution) error // when the breachArbiter has preserved the necessary breach info for
// this channel point, and the callback has succeeded, meaning it is
// safe to stop watching the channel.
contractBreach func(*lnwallet.BreachRetribution, func() error) error
// isOurAddr is a function that returns true if the passed address is // isOurAddr is a function that returns true if the passed address is
// known to us. // known to us.
@ -1121,19 +1124,6 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail
return spew.Sdump(retribution) return spew.Sdump(retribution)
})) }))
// Hand the retribution info over to the breach arbiter.
if err := c.cfg.contractBreach(retribution); err != nil {
log.Errorf("unable to hand breached contract off to "+
"breachArbiter: %v", err)
return err
}
// At this point, we've successfully received an ack for the breach
// close. We now construct and persist the close summary, marking the
// channel as pending force closed.
//
// TODO(roasbeef): instead mark we got all the monies?
// TODO(halseth): move responsibility to breach arbiter?
settledBalance := remoteCommit.LocalBalance.ToSatoshis() settledBalance := remoteCommit.LocalBalance.ToSatoshis()
closeSummary := channeldb.ChannelCloseSummary{ closeSummary := channeldb.ChannelCloseSummary{
ChanPoint: c.cfg.chanState.FundingOutpoint, ChanPoint: c.cfg.chanState.FundingOutpoint,
@ -1160,14 +1150,31 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail
closeSummary.LastChanSyncMsg = chanSync closeSummary.LastChanSyncMsg = chanSync
} }
if err := c.cfg.chanState.CloseChannel( // We create a function closure that will mark the channel as pending
&closeSummary, channeldb.ChanStatusRemoteCloseInitiator, // close in the database. We pass it to the contracBreach method such
); err != nil { // that it can ensure safe handoff of the breach before we close the
return err // channel.
markClosed := func() error {
// At this point, we've successfully received an ack for the
// breach close, and we can mark the channel as pending force
// closed.
if err := c.cfg.chanState.CloseChannel(
&closeSummary, channeldb.ChanStatusRemoteCloseInitiator,
); err != nil {
return err
}
log.Infof("Breached channel=%v marked pending-closed",
c.cfg.chanState.FundingOutpoint)
return nil
} }
log.Infof("Breached channel=%v marked pending-closed", // Hand the retribution info over to the breach arbiter.
c.cfg.chanState.FundingOutpoint) if err := c.cfg.contractBreach(retribution, markClosed); err != nil {
log.Errorf("unable to hand breached contract off to "+
"breachArbiter: %v", err)
return err
}
// With the event processed and channel closed, we'll now notify all // With the event processed and channel closed, we'll now notify all
// subscribers of the event. // subscribers of the event.

@ -949,10 +949,26 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
}, },
IsOurAddress: cc.Wallet.IsOurAddress, IsOurAddress: cc.Wallet.IsOurAddress,
ContractBreach: func(chanPoint wire.OutPoint, ContractBreach: func(chanPoint wire.OutPoint,
breachRet *lnwallet.BreachRetribution) error { breachRet *lnwallet.BreachRetribution,
markClosed func() error) error {
// processACK will handle the breachArbiter ACKing the
// event.
finalErr := make(chan error, 1)
processACK := func(brarErr error) {
if brarErr != nil {
finalErr <- brarErr
return
}
// If the breachArbiter successfully handled
// the event, we can mark the channel closed.
finalErr <- markClosed()
}
event := &ContractBreachEvent{ event := &ContractBreachEvent{
ChanPoint: chanPoint, ChanPoint: chanPoint,
ProcessACK: make(chan error, 1), ProcessACK: processACK,
BreachRetribution: breachRet, BreachRetribution: breachRet,
} }
@ -963,9 +979,11 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
return ErrServerShuttingDown return ErrServerShuttingDown
} }
// Wait for the breachArbiter to ACK the event. // We'll wait for a final error to be available, either
// from the breachArbiter or from our markClosed
// function closure.
select { select {
case err := <-event.ProcessACK: case err := <-finalErr:
return err return err
case <-s.quit: case <-s.quit:
return ErrServerShuttingDown return ErrServerShuttingDown