diff --git a/breacharbiter.go b/breacharbiter.go index 33e1e3e7..a5d12c88 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -12,7 +12,6 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" - "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/lnwallet" "github.com/roasbeef/btcd/blockchain" @@ -37,6 +36,25 @@ var ( justiceTxnBucket = []byte("justice-txn") ) +// ContractBreachEvent is an event the breachArbiter will receive in case a +// contract breach is observed on-chain. It contains the necessary information +// to handle the breach, and a ProcessACK channel we will use to ACK the event +// when we have safely stored all the necessary information. +type ContractBreachEvent struct { + // ChanPoint is the channel point of the breached channel. + ChanPoint wire.OutPoint + + // ProcessACK is an error channel where a nil error should be sent + // iff the breach retribution info is safely stored in the retribution + // store. In case storing the information to the store fails, a non-nil + // error should be sent. + ProcessACK chan error + + // BreachRetribution is the information needed to act on this contract + // breach. + BreachRetribution *lnwallet.BreachRetribution +} + // BreachConfig bundles the required subsystems used by the breach arbiter. An // instance of BreachConfig is passed to newBreachArbiter during instantiation. type BreachConfig struct { @@ -67,10 +85,11 @@ type BreachConfig struct { // transaction to the network. PublishTransaction func(*wire.MsgTx) error - // SubscribeChannelEvents is a function closure that allows goroutines - // within the breachArbiter to be notified of potential on-chain events - // related to the channels they're watching. - SubscribeChannelEvents func(wire.OutPoint) (*contractcourt.ChainEventSubscription, error) + // ContractBreaches is a channel where the breachArbiter will receive + // notifications in the event of a contract breach being observed. A + // ContractBreachEvent must be ACKed by the breachArbiter, such that + // the sending subsystem knows that the event is properly handed off. + ContractBreaches <-chan *ContractBreachEvent // Signer is used by the breach arbiter to generate sweep transactions, // which move coins from previously open channels back to the user's @@ -97,45 +116,17 @@ type breachArbiter struct { cfg *BreachConfig - // breachObservers is a map which tracks all the active breach - // observers we're currently managing. The key of the map is the - // funding outpoint of the channel, and the value is a channel which - // will be closed once we detect that the channel has been - // cooperatively closed, thereby killing the goroutine and freeing up - // resources. - breachObservers map[wire.OutPoint]chan struct{} - - // breachedContracts is a channel which is used internally within the - // struct to send the necessary information required to punish a - // counterparty once a channel breach is detected. Breach observers - // use this to communicate with the main contractObserver goroutine. - breachedContracts chan *retributionInfo - - // settledContracts is a channel by outside subsystems to notify - // the breachArbiter that a channel has peacefully been closed. Once a - // channel has been closed the arbiter no longer needs to watch for - // breach closes. - settledContracts chan wire.OutPoint - - // newContracts is a channel which is used by outside subsystems to - // notify the breachArbiter of a new contract (a channel) that should - // be watched. - newContracts chan wire.OutPoint - quit chan struct{} wg sync.WaitGroup + sync.Mutex } // newBreachArbiter creates a new instance of a breachArbiter initialized with // its dependent objects. func newBreachArbiter(cfg *BreachConfig) *breachArbiter { return &breachArbiter{ - cfg: cfg, - breachObservers: make(map[wire.OutPoint]chan struct{}), - breachedContracts: make(chan *retributionInfo), - newContracts: make(chan wire.OutPoint), - settledContracts: make(chan wire.OutPoint), - quit: make(chan struct{}), + cfg: cfg, + quit: make(chan struct{}), } } @@ -191,49 +182,6 @@ func (b *breachArbiter) Start() error { } } - // We need to query that database state for all currently active - // channels, these channels will represent a super set of all channels - // that may be assigned a go routine to monitor for channel breaches. - activeChannels, err := b.cfg.DB.FetchAllChannels() - if err != nil && err != channeldb.ErrNoActiveChannels { - brarLog.Errorf("unable to fetch active channels: %v", err) - return err - } - - nActive := len(activeChannels) - if nActive > 0 { - brarLog.Infof("Retrieved %v channels from database, watching "+ - "with vigilance!", nActive) - } - - // Here we will determine a set of channels that will need to be managed - // by the contractObserver. This should comprise all active channels - // that have not been breached. If the channel point has an entry in the - // retribution store, we skip it to avoid creating a breach observer. - // Resolving breached channels will be handled later by spawning an - // exactRetribution task for each. - channelsToWatch := make([]*contractcourt.ChainEventSubscription, 0, nActive) - for _, chanState := range activeChannels { - // If this channel was previously breached, we skip it here to - // avoid creating a breach observer, as we can go straight to - // the task of exacting retribution. - chanPoint := chanState.FundingOutpoint - if _, ok := breachRetInfos[chanPoint]; ok { - continue - } - - // For each active channels, we'll request a chain event - // subscription form the system that's overseeing the channel. - chainEvents, err := b.cfg.SubscribeChannelEvents(chanPoint) - if err != nil { - return err - } - - // Finally, add this channel event stream to breach arbiter's - // list of channels to watch. - channelsToWatch = append(channelsToWatch, chainEvents) - } - // Spawn the exactRetribution tasks to monitor and resolve any breaches // that were loaded from the retribution store. for chanPoint := range breachRetInfos { @@ -258,7 +206,7 @@ func (b *breachArbiter) Start() error { // Start watching the remaining active channels! b.wg.Add(1) - go b.contractObserver(channelsToWatch) + go b.contractObserver() return nil } @@ -286,127 +234,32 @@ func (b *breachArbiter) IsBreached(chanPoint *wire.OutPoint) (bool, error) { } // contractObserver is the primary goroutine for the breachArbiter. This -// goroutine is responsible for managing goroutines that watch for breaches for -// all current active and newly created channels. If a channel breach is -// detected by a spawned child goroutine, then the contractObserver will -// execute the retribution logic required to sweep ALL outputs from a contested -// channel into the daemon's wallet. +// goroutine is responsible for handling breach events coming from the +// contractcourt on the ContractBreaches channel. If a channel breach is +// detected, then the contractObserver will execute the retribution logic +// required to sweep ALL outputs from a contested channel into the daemon's +// wallet. // // NOTE: This MUST be run as a goroutine. -func (b *breachArbiter) contractObserver(channelEvents []*contractcourt.ChainEventSubscription) { - +func (b *breachArbiter) contractObserver() { defer b.wg.Done() - brarLog.Infof("Starting contract observer with %v active channels", - len(channelEvents)) + brarLog.Infof("Starting contract observer, watching for breaches.") - // For each active channel found within the database, we launch a - // detected breachObserver goroutine for that channel and also track - // the new goroutine within the breachObservers map so we can cancel it - // later if necessary. - for _, channelEvent := range channelEvents { - settleSignal := make(chan struct{}) - chanPoint := channelEvent.ChanPoint - b.breachObservers[chanPoint] = settleSignal - - b.wg.Add(1) - go b.breachObserver(channelEvent, settleSignal) - } - - // TODO(roasbeef): need to ensure currentHeight passed in doesn't - // result in lost notification - -out: for { select { - case breachInfo := <-b.breachedContracts: - // A new channel contract has just been breached! We - // first register for a notification to be dispatched - // once the breach transaction (the revoked commitment - // transaction) has been confirmed in the chain to - // ensure we're not dealing with a moving target. - breachTXID := &breachInfo.commitHash - cfChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn( - breachTXID, 1, breachInfo.breachHeight) - if err != nil { - brarLog.Errorf("unable to register for conf "+ - "updates for txid: %v, err: %v", - breachTXID, err) - continue - } - - brarLog.Warnf("A channel has been breached with "+ - "txid: %v. Waiting for confirmation, then "+ - "justice will be served!", breachTXID) - - // With the retribution state persisted, channel close - // persisted, and notification registered, we launch a - // new goroutine which will finalize the channel - // retribution after the breach transaction has been - // confirmed. + case breachEvent := <-b.cfg.ContractBreaches: + // We have been notified about a contract breach! + // Handle the handoff, making sure we ACK the event + // after we have safely added it to the retribution + // store. b.wg.Add(1) - go b.exactRetribution(cfChan, breachInfo) + go b.handleBreachHandoff(breachEvent) - delete(b.breachObservers, breachInfo.chanPoint) - - case chanPoint := <-b.newContracts: - // A new channel has just been opened within the - // daemon, so we launch a new breachObserver to handle - // the detection of attempted contract breaches. - settleSignal := make(chan struct{}) - - // If the contract is already being watched, then an - // additional send indicates we have a stale version of - // the contract. So we'll cancel active watcher - // goroutine to create a new instance with the latest - // contract reference. - if oldSignal, ok := b.breachObservers[chanPoint]; ok { - brarLog.Infof("ChannelPoint(%v) is now live, "+ - "abandoning state contract for live "+ - "version", chanPoint) - close(oldSignal) - } - - b.breachObservers[chanPoint] = settleSignal - - brarLog.Debugf("New contract detected, launching " + - "breachObserver") - - chainEvents, err := b.cfg.SubscribeChannelEvents(chanPoint) - if err != nil { - // TODO(roasbeef); panic? - brarLog.Errorf("unable to register for event "+ - "sub for chan_point=%v: %v", chanPoint, err) - } - - b.wg.Add(1) - go b.breachObserver(chainEvents, settleSignal) - - case chanPoint := <-b.settledContracts: - // A new channel has been closed either unilaterally or - // cooperatively, as a result we no longer need a - // breachObserver detected to the channel. - killSignal, ok := b.breachObservers[chanPoint] - if !ok { - brarLog.Errorf("Unable to find contract: %v", - chanPoint) - continue - } - - brarLog.Debugf("ChannelPoint(%v) has been settled, "+ - "cancelling breachObserver", chanPoint) - - // If we had a breachObserver active, then we signal it - // for exit and also delete its state from our tracking - // map. - close(killSignal) - delete(b.breachObservers, chanPoint) case <-b.quit: - break out + return } } - - return } // convertToSecondLevelRevoke takes a breached output, and a transaction that @@ -675,116 +528,123 @@ secondLevelCheck: } } -// breachObserver notifies the breachArbiter contract observer goroutine that a -// channel's contract has been breached by the prior counterparty. Once -// notified the breachArbiter will attempt to sweep ALL funds within the -// channel using the information provided within the BreachRetribution -// generated due to the breach of channel contract. The funds will be swept -// only after the breaching transaction receives a necessary number of -// confirmations. -func (b *breachArbiter) breachObserver( - chainEvents *contractcourt.ChainEventSubscription, - settleSignal chan struct{}) { +// handleBreachHandoff handles a new breach event, by writing it to disk, then +// notifies the breachArbiter contract observer goroutine that a channel's +// contract has been breached by the prior counterparty. Once notified the +// breachArbiter will attempt to sweep ALL funds within the channel using the +// information provided within the BreachRetribution generated due to the +// breach of channel contract. The funds will be swept only after the breaching +// transaction receives a necessary number of confirmations. +// +// NOTE: This MUST be run as a goroutine. +func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) { + defer b.wg.Done() - defer func() { - b.wg.Done() - chainEvents.Cancel() - }() - - chanPoint := chainEvents.ChanPoint - - brarLog.Debugf("Breach observer for ChannelPoint(%v) started ", + chanPoint := breachEvent.ChanPoint + brarLog.Debugf("Handling breach handoff for ChannelPoint(%v)", chanPoint) - gracefullyExit := func() { - // Launch a goroutine to cancel out this contract within the - // breachArbiter's main goroutine. - b.wg.Add(1) - go func() { - defer b.wg.Done() - - select { - case b.settledContracts <- chanPoint: - case <-b.quit: - } - }() - - b.cfg.CloseLink(&chanPoint, htlcswitch.CloseBreach) - } - - select { - // A read from this channel indicates that the contract has been - // settled cooperatively so we exit as our duties are no longer needed. - case <-settleSignal: - return - - // The channel has been closed cooperatively, so we're done here. - case <-chainEvents.CooperativeClosure: - gracefullyExit() - - // The channel has been closed by a normal means: force closing with - // the latest commitment transaction. - case <-chainEvents.LocalUnilateralClosure: - gracefullyExit() - case <-chainEvents.RemoteUnilateralClosure: - gracefullyExit() - // A read from this channel indicates that a channel breach has been // detected! So we notify the main coordination goroutine with the // information needed to bring the counterparty to justice. - case breachInfo := <-chainEvents.ContractBreach: - brarLog.Warnf("REVOKED STATE #%v FOR ChannelPoint(%v) "+ - "broadcast, REMOTE PEER IS DOING SOMETHING "+ - "SKETCHY!!!", breachInfo.RevokedStateNum, - chanPoint) + breachInfo := breachEvent.BreachRetribution + brarLog.Warnf("REVOKED STATE #%v FOR ChannelPoint(%v) "+ + "broadcast, REMOTE PEER IS DOING SOMETHING "+ + "SKETCHY!!!", breachInfo.RevokedStateNum, + chanPoint) - // Immediately notify the HTLC switch that this link has been - // breached in order to ensure any incoming or outgoing - // multi-hop HTLCs aren't sent over this link, nor any other - // links associated with this peer. - b.cfg.CloseLink(&chanPoint, htlcswitch.CloseBreach) + // Immediately notify the HTLC switch that this link has been + // breached in order to ensure any incoming or outgoing + // multi-hop HTLCs aren't sent over this link, nor any other + // links associated with this peer. + b.cfg.CloseLink(&chanPoint, htlcswitch.CloseBreach) - // TODO(roasbeef): need to handle case of remote broadcast - // mid-local initiated state-transition, possible - // false-positive? + // TODO(roasbeef): need to handle case of remote broadcast + // mid-local initiated state-transition, possible + // false-positive? - // Using the breach information provided by the wallet and the - // channel snapshot, construct the retribution information that - // will be persisted to disk. - retInfo := newRetributionInfo(&chanPoint, breachInfo) + // Acquire the mutex to ensure consistency between the call to + // IsBreached and Add below. + b.Lock() - // Persist the pending retribution state to disk. - err := b.cfg.Store.Add(retInfo) + // We first check if this breach info is already added to the + // retribution store. + breached, err := b.cfg.Store.IsBreached(&chanPoint) + if err != nil { + b.Unlock() + brarLog.Errorf("unable to check breach info in DB: %v", err) + + select { + case breachEvent.ProcessACK <- err: + case <-b.quit: + } + return + } + + // If this channel is already marked as breached in the retribution + // store, we already have handled the handoff for this breach. In this + // case we can safely ACK the handoff, and return. + if breached { + b.Unlock() + + select { + case breachEvent.ProcessACK <- nil: + case <-b.quit: + } + return + } + + // Using the breach information provided by the wallet and the + // channel snapshot, construct the retribution information that + // will be persisted to disk. + retInfo := newRetributionInfo(&chanPoint, breachInfo) + + // Persist the pending retribution state to disk. + err = b.cfg.Store.Add(retInfo) + b.Unlock() + if err != nil { + brarLog.Errorf("unable to persist retribution "+ + "info to db: %v", err) + } + + // Now that the breach has been persisted, try to send an + // acknowledgment back to the close observer with the error. If + // the ack is successful, the close observer will mark the + // channel as pending-closed in the channeldb. + select { + case breachEvent.ProcessACK <- err: + // Bail if we failed to persist retribution info. if err != nil { - brarLog.Errorf("unable to persist retribution "+ - "info to db: %v", err) - } - - // Now that the breach has been persisted, try to send an - // acknowledgment back to the close observer with the error. If - // the ack is successful, the close observer will mark the - // channel as pending-closed in the channeldb. - select { - case chainEvents.ProcessACK <- err: - // Bail if we failed to persist retribution info. - if err != nil { - return - } - - case <-b.quit: return } - // Finally, we send the retribution information into the - // breachArbiter event loop to deal swift justice. - select { - case b.breachedContracts <- retInfo: - case <-b.quit: - } - case <-b.quit: return } + + // Now that a new channel contract has been added to the retribution + // store, we first register for a notification to be dispatched once + // the breach transaction (the revoked commitment transaction) has been + // confirmed in the chain to ensure we're not dealing with a moving + // target. + breachTXID := &retInfo.commitHash + cfChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn(breachTXID, 1, + retInfo.breachHeight) + if err != nil { + brarLog.Errorf("unable to register for conf updates for "+ + "txid: %v, err: %v", breachTXID, err) + return + } + + brarLog.Warnf("A channel has been breached with txid: %v. Waiting "+ + "for confirmation, then justice will be served!", breachTXID) + + // With the retribution state persisted, channel close persisted, and + // notification registered, we launch a new goroutine which will + // finalize the channel retribution after the breach transaction has + // been confirmed. + b.wg.Add(1) + go b.exactRetribution(cfChan, retInfo) } // SpendableOutput an interface which can be used by the breach arbiter to