breacharbiter: receive breaches to ACK on separate channel

This commit changes how the breachArbiter gets notified about channel
breaches. Previously it would need to SubscribeToChannelEvents to get
get notified if any breach happened, now we send all seen breaches on a
new channel ContractBreaches.

By having the breachArbiter subscribe to channel events, we risked
events getting lost when we were either starting up or shutting down,
since events could happen before we had been able to subscribe, or right
after we had cancelled our subscription.

Now it is the server's responsibility to reliably forward events from
the ChainArbitrator to the breachArbiter, and forward the ACK the
breachArbiter responds with. This makes sure that the messages aren't
lost in the event of starting up or shutting down, since the connection
between the subsystems now are static.

A result of this change is that the internals of the breachArbiter can
be simplified significantly, as we will get all channel breaches
forwarded on one channel. This lets us get rid of the observer
goroutines, and we spin up goroutines handling the channel breaches only
when they happen.
This commit is contained in:
Johan T. Halseth 2018-04-18 13:56:14 +02:00
parent 79341fc63e
commit b9970aec47
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26

@ -12,7 +12,6 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/roasbeef/btcd/blockchain"
@ -37,6 +36,25 @@ var (
justiceTxnBucket = []byte("justice-txn")
)
// ContractBreachEvent is an event the breachArbiter will receive in case a
// contract breach is observed on-chain. It contains the necessary information
// to handle the breach, and a ProcessACK channel we will use to ACK the event
// when we have safely stored all the necessary information.
type ContractBreachEvent struct {
// ChanPoint is the channel point of the breached channel.
ChanPoint wire.OutPoint
// ProcessACK is an error channel where a nil error should be sent
// iff the breach retribution info is safely stored in the retribution
// store. In case storing the information to the store fails, a non-nil
// error should be sent.
ProcessACK chan error
// BreachRetribution is the information needed to act on this contract
// breach.
BreachRetribution *lnwallet.BreachRetribution
}
// BreachConfig bundles the required subsystems used by the breach arbiter. An
// instance of BreachConfig is passed to newBreachArbiter during instantiation.
type BreachConfig struct {
@ -67,10 +85,11 @@ type BreachConfig struct {
// transaction to the network.
PublishTransaction func(*wire.MsgTx) error
// SubscribeChannelEvents is a function closure that allows goroutines
// within the breachArbiter to be notified of potential on-chain events
// related to the channels they're watching.
SubscribeChannelEvents func(wire.OutPoint) (*contractcourt.ChainEventSubscription, error)
// ContractBreaches is a channel where the breachArbiter will receive
// notifications in the event of a contract breach being observed. A
// ContractBreachEvent must be ACKed by the breachArbiter, such that
// the sending subsystem knows that the event is properly handed off.
ContractBreaches <-chan *ContractBreachEvent
// Signer is used by the breach arbiter to generate sweep transactions,
// which move coins from previously open channels back to the user's
@ -97,45 +116,17 @@ type breachArbiter struct {
cfg *BreachConfig
// breachObservers is a map which tracks all the active breach
// observers we're currently managing. The key of the map is the
// funding outpoint of the channel, and the value is a channel which
// will be closed once we detect that the channel has been
// cooperatively closed, thereby killing the goroutine and freeing up
// resources.
breachObservers map[wire.OutPoint]chan struct{}
// breachedContracts is a channel which is used internally within the
// struct to send the necessary information required to punish a
// counterparty once a channel breach is detected. Breach observers
// use this to communicate with the main contractObserver goroutine.
breachedContracts chan *retributionInfo
// settledContracts is a channel by outside subsystems to notify
// the breachArbiter that a channel has peacefully been closed. Once a
// channel has been closed the arbiter no longer needs to watch for
// breach closes.
settledContracts chan wire.OutPoint
// newContracts is a channel which is used by outside subsystems to
// notify the breachArbiter of a new contract (a channel) that should
// be watched.
newContracts chan wire.OutPoint
quit chan struct{}
wg sync.WaitGroup
sync.Mutex
}
// newBreachArbiter creates a new instance of a breachArbiter initialized with
// its dependent objects.
func newBreachArbiter(cfg *BreachConfig) *breachArbiter {
return &breachArbiter{
cfg: cfg,
breachObservers: make(map[wire.OutPoint]chan struct{}),
breachedContracts: make(chan *retributionInfo),
newContracts: make(chan wire.OutPoint),
settledContracts: make(chan wire.OutPoint),
quit: make(chan struct{}),
cfg: cfg,
quit: make(chan struct{}),
}
}
@ -191,49 +182,6 @@ func (b *breachArbiter) Start() error {
}
}
// We need to query that database state for all currently active
// channels, these channels will represent a super set of all channels
// that may be assigned a go routine to monitor for channel breaches.
activeChannels, err := b.cfg.DB.FetchAllChannels()
if err != nil && err != channeldb.ErrNoActiveChannels {
brarLog.Errorf("unable to fetch active channels: %v", err)
return err
}
nActive := len(activeChannels)
if nActive > 0 {
brarLog.Infof("Retrieved %v channels from database, watching "+
"with vigilance!", nActive)
}
// Here we will determine a set of channels that will need to be managed
// by the contractObserver. This should comprise all active channels
// that have not been breached. If the channel point has an entry in the
// retribution store, we skip it to avoid creating a breach observer.
// Resolving breached channels will be handled later by spawning an
// exactRetribution task for each.
channelsToWatch := make([]*contractcourt.ChainEventSubscription, 0, nActive)
for _, chanState := range activeChannels {
// If this channel was previously breached, we skip it here to
// avoid creating a breach observer, as we can go straight to
// the task of exacting retribution.
chanPoint := chanState.FundingOutpoint
if _, ok := breachRetInfos[chanPoint]; ok {
continue
}
// For each active channels, we'll request a chain event
// subscription form the system that's overseeing the channel.
chainEvents, err := b.cfg.SubscribeChannelEvents(chanPoint)
if err != nil {
return err
}
// Finally, add this channel event stream to breach arbiter's
// list of channels to watch.
channelsToWatch = append(channelsToWatch, chainEvents)
}
// Spawn the exactRetribution tasks to monitor and resolve any breaches
// that were loaded from the retribution store.
for chanPoint := range breachRetInfos {
@ -258,7 +206,7 @@ func (b *breachArbiter) Start() error {
// Start watching the remaining active channels!
b.wg.Add(1)
go b.contractObserver(channelsToWatch)
go b.contractObserver()
return nil
}
@ -286,127 +234,32 @@ func (b *breachArbiter) IsBreached(chanPoint *wire.OutPoint) (bool, error) {
}
// contractObserver is the primary goroutine for the breachArbiter. This
// goroutine is responsible for managing goroutines that watch for breaches for
// all current active and newly created channels. If a channel breach is
// detected by a spawned child goroutine, then the contractObserver will
// execute the retribution logic required to sweep ALL outputs from a contested
// channel into the daemon's wallet.
// goroutine is responsible for handling breach events coming from the
// contractcourt on the ContractBreaches channel. If a channel breach is
// detected, then the contractObserver will execute the retribution logic
// required to sweep ALL outputs from a contested channel into the daemon's
// wallet.
//
// NOTE: This MUST be run as a goroutine.
func (b *breachArbiter) contractObserver(channelEvents []*contractcourt.ChainEventSubscription) {
func (b *breachArbiter) contractObserver() {
defer b.wg.Done()
brarLog.Infof("Starting contract observer with %v active channels",
len(channelEvents))
brarLog.Infof("Starting contract observer, watching for breaches.")
// For each active channel found within the database, we launch a
// detected breachObserver goroutine for that channel and also track
// the new goroutine within the breachObservers map so we can cancel it
// later if necessary.
for _, channelEvent := range channelEvents {
settleSignal := make(chan struct{})
chanPoint := channelEvent.ChanPoint
b.breachObservers[chanPoint] = settleSignal
b.wg.Add(1)
go b.breachObserver(channelEvent, settleSignal)
}
// TODO(roasbeef): need to ensure currentHeight passed in doesn't
// result in lost notification
out:
for {
select {
case breachInfo := <-b.breachedContracts:
// A new channel contract has just been breached! We
// first register for a notification to be dispatched
// once the breach transaction (the revoked commitment
// transaction) has been confirmed in the chain to
// ensure we're not dealing with a moving target.
breachTXID := &breachInfo.commitHash
cfChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn(
breachTXID, 1, breachInfo.breachHeight)
if err != nil {
brarLog.Errorf("unable to register for conf "+
"updates for txid: %v, err: %v",
breachTXID, err)
continue
}
brarLog.Warnf("A channel has been breached with "+
"txid: %v. Waiting for confirmation, then "+
"justice will be served!", breachTXID)
// With the retribution state persisted, channel close
// persisted, and notification registered, we launch a
// new goroutine which will finalize the channel
// retribution after the breach transaction has been
// confirmed.
case breachEvent := <-b.cfg.ContractBreaches:
// We have been notified about a contract breach!
// Handle the handoff, making sure we ACK the event
// after we have safely added it to the retribution
// store.
b.wg.Add(1)
go b.exactRetribution(cfChan, breachInfo)
go b.handleBreachHandoff(breachEvent)
delete(b.breachObservers, breachInfo.chanPoint)
case chanPoint := <-b.newContracts:
// A new channel has just been opened within the
// daemon, so we launch a new breachObserver to handle
// the detection of attempted contract breaches.
settleSignal := make(chan struct{})
// If the contract is already being watched, then an
// additional send indicates we have a stale version of
// the contract. So we'll cancel active watcher
// goroutine to create a new instance with the latest
// contract reference.
if oldSignal, ok := b.breachObservers[chanPoint]; ok {
brarLog.Infof("ChannelPoint(%v) is now live, "+
"abandoning state contract for live "+
"version", chanPoint)
close(oldSignal)
}
b.breachObservers[chanPoint] = settleSignal
brarLog.Debugf("New contract detected, launching " +
"breachObserver")
chainEvents, err := b.cfg.SubscribeChannelEvents(chanPoint)
if err != nil {
// TODO(roasbeef); panic?
brarLog.Errorf("unable to register for event "+
"sub for chan_point=%v: %v", chanPoint, err)
}
b.wg.Add(1)
go b.breachObserver(chainEvents, settleSignal)
case chanPoint := <-b.settledContracts:
// A new channel has been closed either unilaterally or
// cooperatively, as a result we no longer need a
// breachObserver detected to the channel.
killSignal, ok := b.breachObservers[chanPoint]
if !ok {
brarLog.Errorf("Unable to find contract: %v",
chanPoint)
continue
}
brarLog.Debugf("ChannelPoint(%v) has been settled, "+
"cancelling breachObserver", chanPoint)
// If we had a breachObserver active, then we signal it
// for exit and also delete its state from our tracking
// map.
close(killSignal)
delete(b.breachObservers, chanPoint)
case <-b.quit:
break out
return
}
}
return
}
// convertToSecondLevelRevoke takes a breached output, and a transaction that
@ -675,116 +528,123 @@ secondLevelCheck:
}
}
// breachObserver notifies the breachArbiter contract observer goroutine that a
// channel's contract has been breached by the prior counterparty. Once
// notified the breachArbiter will attempt to sweep ALL funds within the
// channel using the information provided within the BreachRetribution
// generated due to the breach of channel contract. The funds will be swept
// only after the breaching transaction receives a necessary number of
// confirmations.
func (b *breachArbiter) breachObserver(
chainEvents *contractcourt.ChainEventSubscription,
settleSignal chan struct{}) {
// handleBreachHandoff handles a new breach event, by writing it to disk, then
// notifies the breachArbiter contract observer goroutine that a channel's
// contract has been breached by the prior counterparty. Once notified the
// breachArbiter will attempt to sweep ALL funds within the channel using the
// information provided within the BreachRetribution generated due to the
// breach of channel contract. The funds will be swept only after the breaching
// transaction receives a necessary number of confirmations.
//
// NOTE: This MUST be run as a goroutine.
func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) {
defer b.wg.Done()
defer func() {
b.wg.Done()
chainEvents.Cancel()
}()
chanPoint := chainEvents.ChanPoint
brarLog.Debugf("Breach observer for ChannelPoint(%v) started ",
chanPoint := breachEvent.ChanPoint
brarLog.Debugf("Handling breach handoff for ChannelPoint(%v)",
chanPoint)
gracefullyExit := func() {
// Launch a goroutine to cancel out this contract within the
// breachArbiter's main goroutine.
b.wg.Add(1)
go func() {
defer b.wg.Done()
select {
case b.settledContracts <- chanPoint:
case <-b.quit:
}
}()
b.cfg.CloseLink(&chanPoint, htlcswitch.CloseBreach)
}
select {
// A read from this channel indicates that the contract has been
// settled cooperatively so we exit as our duties are no longer needed.
case <-settleSignal:
return
// The channel has been closed cooperatively, so we're done here.
case <-chainEvents.CooperativeClosure:
gracefullyExit()
// The channel has been closed by a normal means: force closing with
// the latest commitment transaction.
case <-chainEvents.LocalUnilateralClosure:
gracefullyExit()
case <-chainEvents.RemoteUnilateralClosure:
gracefullyExit()
// A read from this channel indicates that a channel breach has been
// detected! So we notify the main coordination goroutine with the
// information needed to bring the counterparty to justice.
case breachInfo := <-chainEvents.ContractBreach:
brarLog.Warnf("REVOKED STATE #%v FOR ChannelPoint(%v) "+
"broadcast, REMOTE PEER IS DOING SOMETHING "+
"SKETCHY!!!", breachInfo.RevokedStateNum,
chanPoint)
breachInfo := breachEvent.BreachRetribution
brarLog.Warnf("REVOKED STATE #%v FOR ChannelPoint(%v) "+
"broadcast, REMOTE PEER IS DOING SOMETHING "+
"SKETCHY!!!", breachInfo.RevokedStateNum,
chanPoint)
// Immediately notify the HTLC switch that this link has been
// breached in order to ensure any incoming or outgoing
// multi-hop HTLCs aren't sent over this link, nor any other
// links associated with this peer.
b.cfg.CloseLink(&chanPoint, htlcswitch.CloseBreach)
// Immediately notify the HTLC switch that this link has been
// breached in order to ensure any incoming or outgoing
// multi-hop HTLCs aren't sent over this link, nor any other
// links associated with this peer.
b.cfg.CloseLink(&chanPoint, htlcswitch.CloseBreach)
// TODO(roasbeef): need to handle case of remote broadcast
// mid-local initiated state-transition, possible
// false-positive?
// TODO(roasbeef): need to handle case of remote broadcast
// mid-local initiated state-transition, possible
// false-positive?
// Using the breach information provided by the wallet and the
// channel snapshot, construct the retribution information that
// will be persisted to disk.
retInfo := newRetributionInfo(&chanPoint, breachInfo)
// Acquire the mutex to ensure consistency between the call to
// IsBreached and Add below.
b.Lock()
// Persist the pending retribution state to disk.
err := b.cfg.Store.Add(retInfo)
// We first check if this breach info is already added to the
// retribution store.
breached, err := b.cfg.Store.IsBreached(&chanPoint)
if err != nil {
b.Unlock()
brarLog.Errorf("unable to check breach info in DB: %v", err)
select {
case breachEvent.ProcessACK <- err:
case <-b.quit:
}
return
}
// If this channel is already marked as breached in the retribution
// store, we already have handled the handoff for this breach. In this
// case we can safely ACK the handoff, and return.
if breached {
b.Unlock()
select {
case breachEvent.ProcessACK <- nil:
case <-b.quit:
}
return
}
// Using the breach information provided by the wallet and the
// channel snapshot, construct the retribution information that
// will be persisted to disk.
retInfo := newRetributionInfo(&chanPoint, breachInfo)
// Persist the pending retribution state to disk.
err = b.cfg.Store.Add(retInfo)
b.Unlock()
if err != nil {
brarLog.Errorf("unable to persist retribution "+
"info to db: %v", err)
}
// Now that the breach has been persisted, try to send an
// acknowledgment back to the close observer with the error. If
// the ack is successful, the close observer will mark the
// channel as pending-closed in the channeldb.
select {
case breachEvent.ProcessACK <- err:
// Bail if we failed to persist retribution info.
if err != nil {
brarLog.Errorf("unable to persist retribution "+
"info to db: %v", err)
}
// Now that the breach has been persisted, try to send an
// acknowledgment back to the close observer with the error. If
// the ack is successful, the close observer will mark the
// channel as pending-closed in the channeldb.
select {
case chainEvents.ProcessACK <- err:
// Bail if we failed to persist retribution info.
if err != nil {
return
}
case <-b.quit:
return
}
// Finally, we send the retribution information into the
// breachArbiter event loop to deal swift justice.
select {
case b.breachedContracts <- retInfo:
case <-b.quit:
}
case <-b.quit:
return
}
// Now that a new channel contract has been added to the retribution
// store, we first register for a notification to be dispatched once
// the breach transaction (the revoked commitment transaction) has been
// confirmed in the chain to ensure we're not dealing with a moving
// target.
breachTXID := &retInfo.commitHash
cfChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn(breachTXID, 1,
retInfo.breachHeight)
if err != nil {
brarLog.Errorf("unable to register for conf updates for "+
"txid: %v, err: %v", breachTXID, err)
return
}
brarLog.Warnf("A channel has been breached with txid: %v. Waiting "+
"for confirmation, then justice will be served!", breachTXID)
// With the retribution state persisted, channel close persisted, and
// notification registered, we launch a new goroutine which will
// finalize the channel retribution after the breach transaction has
// been confirmed.
b.wg.Add(1)
go b.exactRetribution(cfChan, retInfo)
}
// SpendableOutput an interface which can be used by the breach arbiter to