contractcourt/chain_watcher: make breachArbiter handoff a function closure

This commit changes how the ChainWatcher notifies the breachArbiter
about a channel breach. Instead of assuming the breachArbiter is among
the clients subscibing to channel events, it will call a new method
contractBreach(), and assume the breachArbiter has reliably gotten the
breach info when this method returns with a non-nil error.

Since the breachArbiter was the only sybsystem having a sync chain
subsciption, we also remove the (now) unused syncDispatch option.
This commit is contained in:
Johan T. Halseth 2018-04-18 13:41:03 +02:00
parent a6f0dd72ac
commit 3fdc04dff0
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26

@ -51,15 +51,6 @@ type ChainEventSubscription struct {
// material required to bring the cheating channel peer to justice. // material required to bring the cheating channel peer to justice.
ContractBreach chan *lnwallet.BreachRetribution ContractBreach chan *lnwallet.BreachRetribution
// ProcessACK is a channel that will be used by the chainWatcher to
// synchronize dispatch and processing of the notification with the act
// of updating the state of the channel on disk. This ensures that the
// event can be reliably handed off.
//
// NOTE: This channel will only be used if the syncDispatch arg passed
// into the constructor is true.
ProcessACK chan error
// Cancel cancels the subscription to the event stream for a particular // Cancel cancels the subscription to the event stream for a particular
// channel. This method should be called once the caller no longer needs to // channel. This method should be called once the caller no longer needs to
// be notified of any on-chain events for a particular channel. // be notified of any on-chain events for a particular channel.
@ -93,6 +84,12 @@ type chainWatcherConfig struct {
// confirmed. // confirmed.
markChanClosed func() error markChanClosed func() error
// contractBreach is a method that will be called by the watcher if it
// detects that a contract breach transaction has been confirmed. Only
// when this method returns with a non-nil error it will be safe to mark
// the channel as pending close in the database.
contractBreach func(*lnwallet.BreachRetribution) error
// isOurAddr is a function that returns true if the passed address is // isOurAddr is a function that returns true if the passed address is
// known to us. // known to us.
isOurAddr func(btcutil.Address) bool isOurAddr func(btcutil.Address) bool
@ -219,12 +216,8 @@ func (c *chainWatcher) Stop() error {
// SubscribeChannelEvents returns an active subscription to the set of channel // SubscribeChannelEvents returns an active subscription to the set of channel
// events for the channel watched by this chain watcher. Once clients no longer // events for the channel watched by this chain watcher. Once clients no longer
// require the subscription, they should call the Cancel() method to allow the // require the subscription, they should call the Cancel() method to allow the
// watcher to regain those committed resources. The syncDispatch bool indicates // watcher to regain those committed resources.
// if the caller would like a synchronous dispatch of the notification. This func (c *chainWatcher) SubscribeChannelEvents() *ChainEventSubscription {
// means that the main chain watcher goroutine won't proceed with
// post-processing after the notification until the ProcessACK channel is sent
// upon.
func (c *chainWatcher) SubscribeChannelEvents(syncDispatch bool) *ChainEventSubscription {
c.Lock() c.Lock()
clientID := c.clientID clientID := c.clientID
@ -248,34 +241,6 @@ func (c *chainWatcher) SubscribeChannelEvents(syncDispatch bool) *ChainEventSubs
}, },
} }
if syncDispatch {
sub.ProcessACK = make(chan error, 1)
// If this client is syncDispatch, we cannot safely delete it
// from our list of clients. This is because of a potential
// race at shutdown, where the client shuts down and calls
// Cancel(). In this case we must make sure the ChainWatcher
// doesn't think it has successfully handed off a contract
// breach to the client. We start a goroutine that will send an
// error on the ProcessACK channel until the ChainWatcher is
// shutdown.
sub.Cancel = func() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
err := fmt.Errorf("cancelled")
for {
select {
case sub.ProcessACK <- err:
case <-c.quit:
return
}
}
}()
}
}
c.Lock() c.Lock()
c.clientSubscriptions[clientID] = sub c.clientSubscriptions[clientID] = sub
c.Unlock() c.Unlock()
@ -703,6 +668,13 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail
return spew.Sdump(retribution) return spew.Sdump(retribution)
})) }))
// Hand the retribution info over to the breach arbiter.
if err := c.cfg.contractBreach(retribution); err != nil {
log.Errorf("unable to hand breached contract off to "+
"breachArbiter: %v", err)
return err
}
// With the event processed, we'll now notify all subscribers of the // With the event processed, we'll now notify all subscribers of the
// event. // event.
c.Lock() c.Lock()
@ -713,25 +685,6 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail
c.Unlock() c.Unlock()
return fmt.Errorf("quitting") return fmt.Errorf("quitting")
} }
// Wait for the breach arbiter to ACK the handoff before
// marking the channel as pending force closed in channeldb,
// but only if the client requested a sync dispatch.
if sub.ProcessACK != nil {
select {
case err := <-sub.ProcessACK:
// Bail if the handoff failed.
if err != nil {
c.Unlock()
return fmt.Errorf("unable to handoff "+
"retribution info: %v", err)
}
case <-c.quit:
c.Unlock()
return fmt.Errorf("quitting")
}
}
} }
c.Unlock() c.Unlock()