diff --git a/breacharbiter.go b/breacharbiter.go index 33e1e3e7..a5d12c88 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -12,7 +12,6 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" - "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/lnwallet" "github.com/roasbeef/btcd/blockchain" @@ -37,6 +36,25 @@ var ( justiceTxnBucket = []byte("justice-txn") ) +// ContractBreachEvent is an event the breachArbiter will receive in case a +// contract breach is observed on-chain. It contains the necessary information +// to handle the breach, and a ProcessACK channel we will use to ACK the event +// when we have safely stored all the necessary information. +type ContractBreachEvent struct { + // ChanPoint is the channel point of the breached channel. + ChanPoint wire.OutPoint + + // ProcessACK is an error channel where a nil error should be sent + // iff the breach retribution info is safely stored in the retribution + // store. In case storing the information to the store fails, a non-nil + // error should be sent. + ProcessACK chan error + + // BreachRetribution is the information needed to act on this contract + // breach. + BreachRetribution *lnwallet.BreachRetribution +} + // BreachConfig bundles the required subsystems used by the breach arbiter. An // instance of BreachConfig is passed to newBreachArbiter during instantiation. type BreachConfig struct { @@ -67,10 +85,11 @@ type BreachConfig struct { // transaction to the network. PublishTransaction func(*wire.MsgTx) error - // SubscribeChannelEvents is a function closure that allows goroutines - // within the breachArbiter to be notified of potential on-chain events - // related to the channels they're watching. - SubscribeChannelEvents func(wire.OutPoint) (*contractcourt.ChainEventSubscription, error) + // ContractBreaches is a channel where the breachArbiter will receive + // notifications in the event of a contract breach being observed. A + // ContractBreachEvent must be ACKed by the breachArbiter, such that + // the sending subsystem knows that the event is properly handed off. + ContractBreaches <-chan *ContractBreachEvent // Signer is used by the breach arbiter to generate sweep transactions, // which move coins from previously open channels back to the user's @@ -97,45 +116,17 @@ type breachArbiter struct { cfg *BreachConfig - // breachObservers is a map which tracks all the active breach - // observers we're currently managing. The key of the map is the - // funding outpoint of the channel, and the value is a channel which - // will be closed once we detect that the channel has been - // cooperatively closed, thereby killing the goroutine and freeing up - // resources. - breachObservers map[wire.OutPoint]chan struct{} - - // breachedContracts is a channel which is used internally within the - // struct to send the necessary information required to punish a - // counterparty once a channel breach is detected. Breach observers - // use this to communicate with the main contractObserver goroutine. - breachedContracts chan *retributionInfo - - // settledContracts is a channel by outside subsystems to notify - // the breachArbiter that a channel has peacefully been closed. Once a - // channel has been closed the arbiter no longer needs to watch for - // breach closes. - settledContracts chan wire.OutPoint - - // newContracts is a channel which is used by outside subsystems to - // notify the breachArbiter of a new contract (a channel) that should - // be watched. - newContracts chan wire.OutPoint - quit chan struct{} wg sync.WaitGroup + sync.Mutex } // newBreachArbiter creates a new instance of a breachArbiter initialized with // its dependent objects. func newBreachArbiter(cfg *BreachConfig) *breachArbiter { return &breachArbiter{ - cfg: cfg, - breachObservers: make(map[wire.OutPoint]chan struct{}), - breachedContracts: make(chan *retributionInfo), - newContracts: make(chan wire.OutPoint), - settledContracts: make(chan wire.OutPoint), - quit: make(chan struct{}), + cfg: cfg, + quit: make(chan struct{}), } } @@ -191,49 +182,6 @@ func (b *breachArbiter) Start() error { } } - // We need to query that database state for all currently active - // channels, these channels will represent a super set of all channels - // that may be assigned a go routine to monitor for channel breaches. - activeChannels, err := b.cfg.DB.FetchAllChannels() - if err != nil && err != channeldb.ErrNoActiveChannels { - brarLog.Errorf("unable to fetch active channels: %v", err) - return err - } - - nActive := len(activeChannels) - if nActive > 0 { - brarLog.Infof("Retrieved %v channels from database, watching "+ - "with vigilance!", nActive) - } - - // Here we will determine a set of channels that will need to be managed - // by the contractObserver. This should comprise all active channels - // that have not been breached. If the channel point has an entry in the - // retribution store, we skip it to avoid creating a breach observer. - // Resolving breached channels will be handled later by spawning an - // exactRetribution task for each. - channelsToWatch := make([]*contractcourt.ChainEventSubscription, 0, nActive) - for _, chanState := range activeChannels { - // If this channel was previously breached, we skip it here to - // avoid creating a breach observer, as we can go straight to - // the task of exacting retribution. - chanPoint := chanState.FundingOutpoint - if _, ok := breachRetInfos[chanPoint]; ok { - continue - } - - // For each active channels, we'll request a chain event - // subscription form the system that's overseeing the channel. - chainEvents, err := b.cfg.SubscribeChannelEvents(chanPoint) - if err != nil { - return err - } - - // Finally, add this channel event stream to breach arbiter's - // list of channels to watch. - channelsToWatch = append(channelsToWatch, chainEvents) - } - // Spawn the exactRetribution tasks to monitor and resolve any breaches // that were loaded from the retribution store. for chanPoint := range breachRetInfos { @@ -258,7 +206,7 @@ func (b *breachArbiter) Start() error { // Start watching the remaining active channels! b.wg.Add(1) - go b.contractObserver(channelsToWatch) + go b.contractObserver() return nil } @@ -286,127 +234,32 @@ func (b *breachArbiter) IsBreached(chanPoint *wire.OutPoint) (bool, error) { } // contractObserver is the primary goroutine for the breachArbiter. This -// goroutine is responsible for managing goroutines that watch for breaches for -// all current active and newly created channels. If a channel breach is -// detected by a spawned child goroutine, then the contractObserver will -// execute the retribution logic required to sweep ALL outputs from a contested -// channel into the daemon's wallet. +// goroutine is responsible for handling breach events coming from the +// contractcourt on the ContractBreaches channel. If a channel breach is +// detected, then the contractObserver will execute the retribution logic +// required to sweep ALL outputs from a contested channel into the daemon's +// wallet. // // NOTE: This MUST be run as a goroutine. -func (b *breachArbiter) contractObserver(channelEvents []*contractcourt.ChainEventSubscription) { - +func (b *breachArbiter) contractObserver() { defer b.wg.Done() - brarLog.Infof("Starting contract observer with %v active channels", - len(channelEvents)) + brarLog.Infof("Starting contract observer, watching for breaches.") - // For each active channel found within the database, we launch a - // detected breachObserver goroutine for that channel and also track - // the new goroutine within the breachObservers map so we can cancel it - // later if necessary. - for _, channelEvent := range channelEvents { - settleSignal := make(chan struct{}) - chanPoint := channelEvent.ChanPoint - b.breachObservers[chanPoint] = settleSignal - - b.wg.Add(1) - go b.breachObserver(channelEvent, settleSignal) - } - - // TODO(roasbeef): need to ensure currentHeight passed in doesn't - // result in lost notification - -out: for { select { - case breachInfo := <-b.breachedContracts: - // A new channel contract has just been breached! We - // first register for a notification to be dispatched - // once the breach transaction (the revoked commitment - // transaction) has been confirmed in the chain to - // ensure we're not dealing with a moving target. - breachTXID := &breachInfo.commitHash - cfChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn( - breachTXID, 1, breachInfo.breachHeight) - if err != nil { - brarLog.Errorf("unable to register for conf "+ - "updates for txid: %v, err: %v", - breachTXID, err) - continue - } - - brarLog.Warnf("A channel has been breached with "+ - "txid: %v. Waiting for confirmation, then "+ - "justice will be served!", breachTXID) - - // With the retribution state persisted, channel close - // persisted, and notification registered, we launch a - // new goroutine which will finalize the channel - // retribution after the breach transaction has been - // confirmed. + case breachEvent := <-b.cfg.ContractBreaches: + // We have been notified about a contract breach! + // Handle the handoff, making sure we ACK the event + // after we have safely added it to the retribution + // store. b.wg.Add(1) - go b.exactRetribution(cfChan, breachInfo) + go b.handleBreachHandoff(breachEvent) - delete(b.breachObservers, breachInfo.chanPoint) - - case chanPoint := <-b.newContracts: - // A new channel has just been opened within the - // daemon, so we launch a new breachObserver to handle - // the detection of attempted contract breaches. - settleSignal := make(chan struct{}) - - // If the contract is already being watched, then an - // additional send indicates we have a stale version of - // the contract. So we'll cancel active watcher - // goroutine to create a new instance with the latest - // contract reference. - if oldSignal, ok := b.breachObservers[chanPoint]; ok { - brarLog.Infof("ChannelPoint(%v) is now live, "+ - "abandoning state contract for live "+ - "version", chanPoint) - close(oldSignal) - } - - b.breachObservers[chanPoint] = settleSignal - - brarLog.Debugf("New contract detected, launching " + - "breachObserver") - - chainEvents, err := b.cfg.SubscribeChannelEvents(chanPoint) - if err != nil { - // TODO(roasbeef); panic? - brarLog.Errorf("unable to register for event "+ - "sub for chan_point=%v: %v", chanPoint, err) - } - - b.wg.Add(1) - go b.breachObserver(chainEvents, settleSignal) - - case chanPoint := <-b.settledContracts: - // A new channel has been closed either unilaterally or - // cooperatively, as a result we no longer need a - // breachObserver detected to the channel. - killSignal, ok := b.breachObservers[chanPoint] - if !ok { - brarLog.Errorf("Unable to find contract: %v", - chanPoint) - continue - } - - brarLog.Debugf("ChannelPoint(%v) has been settled, "+ - "cancelling breachObserver", chanPoint) - - // If we had a breachObserver active, then we signal it - // for exit and also delete its state from our tracking - // map. - close(killSignal) - delete(b.breachObservers, chanPoint) case <-b.quit: - break out + return } } - - return } // convertToSecondLevelRevoke takes a breached output, and a transaction that @@ -675,116 +528,123 @@ secondLevelCheck: } } -// breachObserver notifies the breachArbiter contract observer goroutine that a -// channel's contract has been breached by the prior counterparty. Once -// notified the breachArbiter will attempt to sweep ALL funds within the -// channel using the information provided within the BreachRetribution -// generated due to the breach of channel contract. The funds will be swept -// only after the breaching transaction receives a necessary number of -// confirmations. -func (b *breachArbiter) breachObserver( - chainEvents *contractcourt.ChainEventSubscription, - settleSignal chan struct{}) { +// handleBreachHandoff handles a new breach event, by writing it to disk, then +// notifies the breachArbiter contract observer goroutine that a channel's +// contract has been breached by the prior counterparty. Once notified the +// breachArbiter will attempt to sweep ALL funds within the channel using the +// information provided within the BreachRetribution generated due to the +// breach of channel contract. The funds will be swept only after the breaching +// transaction receives a necessary number of confirmations. +// +// NOTE: This MUST be run as a goroutine. +func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) { + defer b.wg.Done() - defer func() { - b.wg.Done() - chainEvents.Cancel() - }() - - chanPoint := chainEvents.ChanPoint - - brarLog.Debugf("Breach observer for ChannelPoint(%v) started ", + chanPoint := breachEvent.ChanPoint + brarLog.Debugf("Handling breach handoff for ChannelPoint(%v)", chanPoint) - gracefullyExit := func() { - // Launch a goroutine to cancel out this contract within the - // breachArbiter's main goroutine. - b.wg.Add(1) - go func() { - defer b.wg.Done() - - select { - case b.settledContracts <- chanPoint: - case <-b.quit: - } - }() - - b.cfg.CloseLink(&chanPoint, htlcswitch.CloseBreach) - } - - select { - // A read from this channel indicates that the contract has been - // settled cooperatively so we exit as our duties are no longer needed. - case <-settleSignal: - return - - // The channel has been closed cooperatively, so we're done here. - case <-chainEvents.CooperativeClosure: - gracefullyExit() - - // The channel has been closed by a normal means: force closing with - // the latest commitment transaction. - case <-chainEvents.LocalUnilateralClosure: - gracefullyExit() - case <-chainEvents.RemoteUnilateralClosure: - gracefullyExit() - // A read from this channel indicates that a channel breach has been // detected! So we notify the main coordination goroutine with the // information needed to bring the counterparty to justice. - case breachInfo := <-chainEvents.ContractBreach: - brarLog.Warnf("REVOKED STATE #%v FOR ChannelPoint(%v) "+ - "broadcast, REMOTE PEER IS DOING SOMETHING "+ - "SKETCHY!!!", breachInfo.RevokedStateNum, - chanPoint) + breachInfo := breachEvent.BreachRetribution + brarLog.Warnf("REVOKED STATE #%v FOR ChannelPoint(%v) "+ + "broadcast, REMOTE PEER IS DOING SOMETHING "+ + "SKETCHY!!!", breachInfo.RevokedStateNum, + chanPoint) - // Immediately notify the HTLC switch that this link has been - // breached in order to ensure any incoming or outgoing - // multi-hop HTLCs aren't sent over this link, nor any other - // links associated with this peer. - b.cfg.CloseLink(&chanPoint, htlcswitch.CloseBreach) + // Immediately notify the HTLC switch that this link has been + // breached in order to ensure any incoming or outgoing + // multi-hop HTLCs aren't sent over this link, nor any other + // links associated with this peer. + b.cfg.CloseLink(&chanPoint, htlcswitch.CloseBreach) - // TODO(roasbeef): need to handle case of remote broadcast - // mid-local initiated state-transition, possible - // false-positive? + // TODO(roasbeef): need to handle case of remote broadcast + // mid-local initiated state-transition, possible + // false-positive? - // Using the breach information provided by the wallet and the - // channel snapshot, construct the retribution information that - // will be persisted to disk. - retInfo := newRetributionInfo(&chanPoint, breachInfo) + // Acquire the mutex to ensure consistency between the call to + // IsBreached and Add below. + b.Lock() - // Persist the pending retribution state to disk. - err := b.cfg.Store.Add(retInfo) + // We first check if this breach info is already added to the + // retribution store. + breached, err := b.cfg.Store.IsBreached(&chanPoint) + if err != nil { + b.Unlock() + brarLog.Errorf("unable to check breach info in DB: %v", err) + + select { + case breachEvent.ProcessACK <- err: + case <-b.quit: + } + return + } + + // If this channel is already marked as breached in the retribution + // store, we already have handled the handoff for this breach. In this + // case we can safely ACK the handoff, and return. + if breached { + b.Unlock() + + select { + case breachEvent.ProcessACK <- nil: + case <-b.quit: + } + return + } + + // Using the breach information provided by the wallet and the + // channel snapshot, construct the retribution information that + // will be persisted to disk. + retInfo := newRetributionInfo(&chanPoint, breachInfo) + + // Persist the pending retribution state to disk. + err = b.cfg.Store.Add(retInfo) + b.Unlock() + if err != nil { + brarLog.Errorf("unable to persist retribution "+ + "info to db: %v", err) + } + + // Now that the breach has been persisted, try to send an + // acknowledgment back to the close observer with the error. If + // the ack is successful, the close observer will mark the + // channel as pending-closed in the channeldb. + select { + case breachEvent.ProcessACK <- err: + // Bail if we failed to persist retribution info. if err != nil { - brarLog.Errorf("unable to persist retribution "+ - "info to db: %v", err) - } - - // Now that the breach has been persisted, try to send an - // acknowledgment back to the close observer with the error. If - // the ack is successful, the close observer will mark the - // channel as pending-closed in the channeldb. - select { - case chainEvents.ProcessACK <- err: - // Bail if we failed to persist retribution info. - if err != nil { - return - } - - case <-b.quit: return } - // Finally, we send the retribution information into the - // breachArbiter event loop to deal swift justice. - select { - case b.breachedContracts <- retInfo: - case <-b.quit: - } - case <-b.quit: return } + + // Now that a new channel contract has been added to the retribution + // store, we first register for a notification to be dispatched once + // the breach transaction (the revoked commitment transaction) has been + // confirmed in the chain to ensure we're not dealing with a moving + // target. + breachTXID := &retInfo.commitHash + cfChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn(breachTXID, 1, + retInfo.breachHeight) + if err != nil { + brarLog.Errorf("unable to register for conf updates for "+ + "txid: %v, err: %v", breachTXID, err) + return + } + + brarLog.Warnf("A channel has been breached with txid: %v. Waiting "+ + "for confirmation, then justice will be served!", breachTXID) + + // With the retribution state persisted, channel close persisted, and + // notification registered, we launch a new goroutine which will + // finalize the channel retribution after the breach transaction has + // been confirmed. + b.wg.Add(1) + go b.exactRetribution(cfChan, retInfo) } // SpendableOutput an interface which can be used by the breach arbiter to diff --git a/breacharbiter_test.go b/breacharbiter_test.go index a4e7ba0f..216bf36c 100644 --- a/breacharbiter_test.go +++ b/breacharbiter_test.go @@ -21,7 +21,6 @@ import ( "github.com/btcsuite/btclog" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/channeldb" - "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnwallet" @@ -949,19 +948,10 @@ func TestBreachHandoffSuccess(t *testing.T) { defer cleanUpChans() // Instantiate a breach arbiter to handle the breach of alice's channel. - alicePoint := alice.ChannelPoint() - spendEvents := contractcourt.ChainEventSubscription{ - RemoteUnilateralClosure: make(chan *lnwallet.UnilateralCloseSummary, 1), - LocalUnilateralClosure: make(chan *contractcourt.LocalUnilateralCloseInfo, 1), - CooperativeClosure: make(chan struct{}, 1), - ContractBreach: make(chan *lnwallet.BreachRetribution, 1), - ProcessACK: make(chan error, 1), - ChanPoint: *alicePoint, - Cancel: func() { - }, - } + contractBreaches := make(chan *ContractBreachEvent) + brar, cleanUpArb, err := createTestArbiter( - t, &spendEvents, alice.State().Db, + t, contractBreaches, alice.State().Db, ) if err != nil { t.Fatalf("unable to initialize test breach arbiter: %v", err) @@ -1005,13 +995,21 @@ func TestBreachHandoffSuccess(t *testing.T) { // Signal a spend of the funding transaction and wait for the close // observer to exit. - spendEvents.ContractBreach <- &lnwallet.BreachRetribution{ - BreachTransaction: bobClose.CloseTx, + breach := &ContractBreachEvent{ + ChanPoint: *chanPoint, + ProcessACK: make(chan error, 1), + BreachRetribution: &lnwallet.BreachRetribution{ + BreachTransaction: bobClose.CloseTx, + }, } + contractBreaches <- breach // We'll also wait to consume the ACK back from the breach arbiter. select { - case <-spendEvents.ProcessACK: + case err := <-breach.ProcessACK: + if err != nil { + t.Fatalf("handoff failed: %v", err) + } case <-time.After(time.Second * 15): t.Fatalf("breach arbiter didn't send ack back") } @@ -1020,6 +1018,32 @@ func TestBreachHandoffSuccess(t *testing.T) { // retribution information and the channel should be shown as pending // force closed. assertArbiterBreach(t, brar, chanPoint) + + // Send another breach event. Since the handoff for this channel was + // already ACKed, the breach arbiter should immediately ACK and ignore + // this event. + breach = &ContractBreachEvent{ + ChanPoint: *chanPoint, + ProcessACK: make(chan error, 1), + BreachRetribution: &lnwallet.BreachRetribution{ + BreachTransaction: bobClose.CloseTx, + }, + } + + contractBreaches <- breach + + // We'll also wait to consume the ACK back from the breach arbiter. + select { + case err := <-breach.ProcessACK: + if err != nil { + t.Fatalf("handoff failed: %v", err) + } + case <-time.After(time.Second * 15): + t.Fatalf("breach arbiter didn't send ack back") + } + + // State should not have changed. + assertArbiterBreach(t, brar, chanPoint) } // TestBreachHandoffFail tests that a channel's close observer properly @@ -1038,19 +1062,10 @@ func TestBreachHandoffFail(t *testing.T) { defer cleanUpChans() // Instantiate a breach arbiter to handle the breach of alice's channel. - alicePoint := alice.ChannelPoint() - spendEvents := contractcourt.ChainEventSubscription{ - RemoteUnilateralClosure: make(chan *lnwallet.UnilateralCloseSummary, 1), - LocalUnilateralClosure: make(chan *contractcourt.LocalUnilateralCloseInfo, 1), - CooperativeClosure: make(chan struct{}, 1), - ContractBreach: make(chan *lnwallet.BreachRetribution, 1), - ProcessACK: make(chan error, 1), - ChanPoint: *alicePoint, - Cancel: func() { - }, - } + contractBreaches := make(chan *ContractBreachEvent) + brar, cleanUpArb, err := createTestArbiter( - t, &spendEvents, alice.State().Db, + t, contractBreaches, alice.State().Db, ) if err != nil { t.Fatalf("unable to initialize test breach arbiter: %v", err) @@ -1099,11 +1114,18 @@ func TestBreachHandoffFail(t *testing.T) { // Signal the notifier to dispatch spend notifications of the funding // transaction using the transaction from bob's closing summary. chanPoint := alice.ChanPoint - spendEvents.ContractBreach <- &lnwallet.BreachRetribution{ - BreachTransaction: bobClose.CloseTx, + breach := &ContractBreachEvent{ + ChanPoint: *chanPoint, + ProcessACK: make(chan error, 1), + BreachRetribution: &lnwallet.BreachRetribution{ + BreachTransaction: bobClose.CloseTx, + }, } + contractBreaches <- breach + + // We'll also wait to consume the ACK back from the breach arbiter. select { - case err := <-spendEvents.ProcessACK: + case err := <-breach.ProcessACK: if err == nil { t.Fatalf("breach write should have failed") } @@ -1118,7 +1140,7 @@ func TestBreachHandoffFail(t *testing.T) { assertNotPendingClosed(t, alice) brar, cleanUpArb, err = createTestArbiter( - t, &spendEvents, alice.State().Db, + t, contractBreaches, alice.State().Db, ) if err != nil { t.Fatalf("unable to initialize test breach arbiter: %v", err) @@ -1139,11 +1161,21 @@ func TestBreachHandoffFail(t *testing.T) { // Signal a spend of the funding transaction and wait for the close // observer to exit. This time we are allowing the handoff to succeed. - spendEvents.ContractBreach <- &lnwallet.BreachRetribution{ - BreachTransaction: bobClose.CloseTx, + breach = &ContractBreachEvent{ + ChanPoint: *chanPoint, + ProcessACK: make(chan error, 1), + BreachRetribution: &lnwallet.BreachRetribution{ + BreachTransaction: bobClose.CloseTx, + }, } + + contractBreaches <- breach + select { - case <-spendEvents.ProcessACK: + case err := <-breach.ProcessACK: + if err != nil { + t.Fatalf("handoff failed: %v", err) + } case <-time.After(time.Second * 15): t.Fatalf("breach arbiter didn't send ack back") } @@ -1207,7 +1239,7 @@ func assertNotPendingClosed(t *testing.T, c *lnwallet.LightningChannel) { // createTestArbiter instantiates a breach arbiter with a failing retribution // store, so that controlled failures can be tested. -func createTestArbiter(t *testing.T, chainEvents *contractcourt.ChainEventSubscription, +func createTestArbiter(t *testing.T, contractBreaches chan *ContractBreachEvent, db *channeldb.DB) (*breachArbiter, func(), error) { // Create a failing retribution store, that wraps a normal one. @@ -1222,13 +1254,11 @@ func createTestArbiter(t *testing.T, chainEvents *contractcourt.ChainEventSubscr // Assemble our test arbiter. notifier := makeMockSpendNotifier() ba := newBreachArbiter(&BreachConfig{ - CloseLink: func(_ *wire.OutPoint, _ htlcswitch.ChannelCloseType) {}, - DB: db, - Estimator: &lnwallet.StaticFeeEstimator{FeeRate: 50}, - GenSweepScript: func() ([]byte, error) { return nil, nil }, - SubscribeChannelEvents: func(_ wire.OutPoint) (*contractcourt.ChainEventSubscription, error) { - return chainEvents, nil - }, + CloseLink: func(_ *wire.OutPoint, _ htlcswitch.ChannelCloseType) {}, + DB: db, + Estimator: &lnwallet.StaticFeeEstimator{FeeRate: 50}, + GenSweepScript: func() ([]byte, error) { return nil, nil }, + ContractBreaches: contractBreaches, Signer: signer, Notifier: notifier, PublishTransaction: func(_ *wire.MsgTx) error { return nil }, diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 95bc071c..d84a9b83 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -81,6 +81,13 @@ type ChainArbitratorConfig struct { // TODO(roasbeef): rename, routing based MarkLinkInactive func(wire.OutPoint) error + // ContractBreach is a function closure that the ChainArbitrator will + // use to notify the breachArbiter about a contract breach. It should + // only return a non-nil error when the breachArbiter has preserved the + // necessary breach info for this channel point, and it is safe to mark + // the channel as pending close in the database. + ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution) error + // IsOurAddress is a function that returns true if the passed address // is known to the underlying wallet. Otherwise, false should be // returned. @@ -327,10 +334,19 @@ func (c *ChainArbitrator) Start() error { // First, we'll create an active chainWatcher for this channel // to ensure that we detect any relevant on chain events. chainWatcher, err := newChainWatcher( - channel, c.cfg.Notifier, c.cfg.PreimageDB, c.cfg.Signer, - c.cfg.IsOurAddress, func() error { - // TODO(roasbeef): also need to pass in log? - return c.resolveContract(chanPoint, nil) + chainWatcherConfig{ + chanState: channel, + notifier: c.cfg.Notifier, + pCache: c.cfg.PreimageDB, + signer: c.cfg.Signer, + isOurAddr: c.cfg.IsOurAddress, + markChanClosed: func() error { + // TODO(roasbeef): also need to pass in log? + return c.resolveContract(chanPoint, nil) + }, + contractBreach: func(retInfo *lnwallet.BreachRetribution) error { + return c.cfg.ContractBreach(chanPoint, retInfo) + }, }, ) if err != nil { @@ -339,7 +355,7 @@ func (c *ChainArbitrator) Start() error { c.activeWatchers[chanPoint] = chainWatcher channelArb, err := newActiveChannelArbitrator( - channel, c, chainWatcher.SubscribeChannelEvents(false), + channel, c, chainWatcher.SubscribeChannelEvents(), ) if err != nil { return err @@ -654,9 +670,18 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error // First, also create an active chainWatcher for this channel to ensure // that we detect any relevant on chain events. chainWatcher, err := newChainWatcher( - newChan, c.cfg.Notifier, c.cfg.PreimageDB, c.cfg.Signer, - c.cfg.IsOurAddress, func() error { - return c.resolveContract(chanPoint, nil) + chainWatcherConfig{ + chanState: newChan, + notifier: c.cfg.Notifier, + pCache: c.cfg.PreimageDB, + signer: c.cfg.Signer, + isOurAddr: c.cfg.IsOurAddress, + markChanClosed: func() error { + return c.resolveContract(chanPoint, nil) + }, + contractBreach: func(retInfo *lnwallet.BreachRetribution) error { + return c.cfg.ContractBreach(chanPoint, retInfo) + }, }, ) if err != nil { @@ -668,7 +693,7 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error // We'll also create a new channel arbitrator instance using this new // channel, and our internal state. channelArb, err := newActiveChannelArbitrator( - newChan, c, chainWatcher.SubscribeChannelEvents(false), + newChan, c, chainWatcher.SubscribeChannelEvents(), ) if err != nil { return err @@ -696,7 +721,7 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error // TODO(roasbeef): can be used later to provide RPC hook for all channel // lifetimes func (c *ChainArbitrator) SubscribeChannelEvents( - chanPoint wire.OutPoint, syncDispatch bool) (*ChainEventSubscription, error) { + chanPoint wire.OutPoint) (*ChainEventSubscription, error) { // First, we'll attempt to look up the active watcher for this channel. // If we can't find it, then we'll return an error back to the caller. @@ -708,7 +733,7 @@ func (c *ChainArbitrator) SubscribeChannelEvents( // With the watcher located, we'll request for it to create a new chain // event subscription client. - return watcher.SubscribeChannelEvents(syncDispatch), nil + return watcher.SubscribeChannelEvents(), nil } // BeginCoopChanClose allows the initiator or responder to a cooperative diff --git a/contractcourt/chain_watcher.go b/contractcourt/chain_watcher.go index ffd632b6..cf35d558 100644 --- a/contractcourt/chain_watcher.go +++ b/contractcourt/chain_watcher.go @@ -51,42 +51,20 @@ type ChainEventSubscription struct { // material required to bring the cheating channel peer to justice. ContractBreach chan *lnwallet.BreachRetribution - // ProcessACK is a channel that will be used by the chainWatcher to - // synchronize dispatch and processing of the notification with the act - // of updating the state of the channel on disk. This ensures that the - // event can be reliably handed off. - // - // NOTE: This channel will only be used if the syncDispatch arg passed - // into the constructor is true. - ProcessACK chan error - // Cancel cancels the subscription to the event stream for a particular // channel. This method should be called once the caller no longer needs to // be notified of any on-chain events for a particular channel. Cancel func() } -// chainWatcher is a system that's assigned to every active channel. The duty -// of this system is to watch the chain for spends of the channels chan point. -// If a spend is detected then with chain watcher will notify all subscribers -// that the channel has been closed, and also give them the materials necessary -// to sweep the funds of the channel on chain eventually. -type chainWatcher struct { - started int32 - stopped int32 - - quit chan struct{} - wg sync.WaitGroup - +// chainWatcherConfig encapsulates all the necessary functions and interfaces +// needed to watch and act on on-chain events for a particular channel. +type chainWatcherConfig struct { // chanState is a snapshot of the persistent state of the channel that // we're watching. In the event of an on-chain event, we'll query the // database to ensure that we act using the most up to date state. chanState *channeldb.OpenChannel - // stateHintObfuscator is a 48-bit state hint that's used to obfuscate - // the current state number on the commitment transactions. - stateHintObfuscator [lnwallet.StateHintSize]byte - // notifier is a reference to the channel notifier that we'll use to be // notified of output spends and when transactions are confirmed. notifier chainntnfs.ChainNotifier @@ -101,6 +79,40 @@ type chainWatcher struct { // machine. signer lnwallet.Signer + // markChanClosed is a method that will be called by the watcher if it + // detects that a cooperative closure transaction has successfully been + // confirmed. + markChanClosed func() error + + // contractBreach is a method that will be called by the watcher if it + // detects that a contract breach transaction has been confirmed. Only + // when this method returns with a non-nil error it will be safe to mark + // the channel as pending close in the database. + contractBreach func(*lnwallet.BreachRetribution) error + + // isOurAddr is a function that returns true if the passed address is + // known to us. + isOurAddr func(btcutil.Address) bool +} + +// chainWatcher is a system that's assigned to every active channel. The duty +// of this system is to watch the chain for spends of the channels chan point. +// If a spend is detected then with chain watcher will notify all subscribers +// that the channel has been closed, and also give them the materials necessary +// to sweep the funds of the channel on chain eventually. +type chainWatcher struct { + started int32 + stopped int32 + + quit chan struct{} + wg sync.WaitGroup + + cfg chainWatcherConfig + + // stateHintObfuscator is a 48-bit state hint that's used to obfuscate + // the current state number on the commitment transactions. + stateHintObfuscator [lnwallet.StateHintSize]byte + // All the fields below are protected by this mutex. sync.Mutex @@ -117,30 +129,18 @@ type chainWatcher struct { // We'll use this map to keep track of all possible channel closures to // ensure out db state is correct in the end. possibleCloses map[chainhash.Hash]*channeldb.ChannelCloseSummary - - // markChanClosed is a method that will be called by the watcher if it - // detects that a cooperative closure transaction has successfully been - // confirmed. - markChanClosed func() error - - // isOurAddr is a function that returns true if the passed address is - // known to us. - isOurAddr func(btcutil.Address) bool } // newChainWatcher returns a new instance of a chainWatcher for a channel given // the chan point to watch, and also a notifier instance that will allow us to // detect on chain events. -func newChainWatcher(chanState *channeldb.OpenChannel, - notifier chainntnfs.ChainNotifier, pCache WitnessBeacon, - signer lnwallet.Signer, isOurAddr func(btcutil.Address) bool, - markChanClosed func() error) (*chainWatcher, error) { - +func newChainWatcher(cfg chainWatcherConfig) (*chainWatcher, error) { // In order to be able to detect the nature of a potential channel // closure we'll need to reconstruct the state hint bytes used to // obfuscate the commitment state number encoded in the lock time and // sequence fields. var stateHint [lnwallet.StateHintSize]byte + chanState := cfg.chanState if chanState.IsInitiator { stateHint = lnwallet.DeriveStateHintObfuscator( chanState.LocalChanCfg.PaymentBasePoint.PubKey, @@ -154,15 +154,10 @@ func newChainWatcher(chanState *channeldb.OpenChannel, } return &chainWatcher{ - chanState: chanState, + cfg: cfg, stateHintObfuscator: stateHint, - notifier: notifier, - pCache: pCache, - markChanClosed: markChanClosed, - signer: signer, quit: make(chan struct{}), clientSubscriptions: make(map[uint64]*ChainEventSubscription), - isOurAddr: isOurAddr, possibleCloses: make(map[chainhash.Hash]*channeldb.ChannelCloseSummary), }, nil } @@ -174,22 +169,23 @@ func (c *chainWatcher) Start() error { return nil } + chanState := c.cfg.chanState log.Debugf("Starting chain watcher for ChannelPoint(%v)", - c.chanState.FundingOutpoint) + chanState.FundingOutpoint) // First, we'll register for a notification to be dispatched if the // funding output is spent. - fundingOut := &c.chanState.FundingOutpoint + fundingOut := &chanState.FundingOutpoint // As a height hint, we'll try to use the opening height, but if the // channel isn't yet open, then we'll use the height it was broadcast // at. - heightHint := c.chanState.ShortChanID.BlockHeight + heightHint := chanState.ShortChanID.BlockHeight if heightHint == 0 { - heightHint = c.chanState.FundingBroadcastHeight + heightHint = chanState.FundingBroadcastHeight } - spendNtfn, err := c.notifier.RegisterSpendNtfn( + spendNtfn, err := c.cfg.notifier.RegisterSpendNtfn( fundingOut, heightHint, false, ) if err != nil { @@ -220,12 +216,8 @@ func (c *chainWatcher) Stop() error { // SubscribeChannelEvents returns an active subscription to the set of channel // events for the channel watched by this chain watcher. Once clients no longer // require the subscription, they should call the Cancel() method to allow the -// watcher to regain those committed resources. The syncDispatch bool indicates -// if the caller would like a synchronous dispatch of the notification. This -// means that the main chain watcher goroutine won't proceed with -// post-processing after the notification until the ProcessACK channel is sent -// upon. -func (c *chainWatcher) SubscribeChannelEvents(syncDispatch bool) *ChainEventSubscription { +// watcher to regain those committed resources. +func (c *chainWatcher) SubscribeChannelEvents() *ChainEventSubscription { c.Lock() clientID := c.clientID @@ -233,10 +225,10 @@ func (c *chainWatcher) SubscribeChannelEvents(syncDispatch bool) *ChainEventSubs c.Unlock() log.Debugf("New ChainEventSubscription(id=%v) for ChannelPoint(%v)", - clientID, c.chanState.FundingOutpoint) + clientID, c.cfg.chanState.FundingOutpoint) sub := &ChainEventSubscription{ - ChanPoint: c.chanState.FundingOutpoint, + ChanPoint: c.cfg.chanState.FundingOutpoint, RemoteUnilateralClosure: make(chan *lnwallet.UnilateralCloseSummary, 1), LocalUnilateralClosure: make(chan *LocalUnilateralCloseInfo, 1), CooperativeClosure: make(chan struct{}, 1), @@ -249,34 +241,6 @@ func (c *chainWatcher) SubscribeChannelEvents(syncDispatch bool) *ChainEventSubs }, } - if syncDispatch { - sub.ProcessACK = make(chan error, 1) - - // If this client is syncDispatch, we cannot safely delete it - // from our list of clients. This is because of a potential - // race at shutdown, where the client shuts down and calls - // Cancel(). In this case we must make sure the ChainWatcher - // doesn't think it has successfully handed off a contract - // breach to the client. We start a goroutine that will send an - // error on the ProcessACK channel until the ChainWatcher is - // shutdown. - sub.Cancel = func() { - c.wg.Add(1) - go func() { - defer c.wg.Done() - - err := fmt.Errorf("cancelled") - for { - select { - case sub.ProcessACK <- err: - case <-c.quit: - return - } - } - }() - } - } - c.Lock() c.clientSubscriptions[clientID] = sub c.Unlock() @@ -293,138 +257,136 @@ func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) { defer c.wg.Done() log.Infof("Close observer for ChannelPoint(%v) active", - c.chanState.FundingOutpoint) + c.cfg.chanState.FundingOutpoint) - for { - select { - // We've detected a spend of the channel onchain! Depending on - // the type of spend, we'll act accordingly , so we'll examine - // the spending transaction to determine what we should do. - // - // TODO(Roasbeef): need to be able to ensure this only triggers - // on confirmation, to ensure if multiple txns are broadcast, we - // act on the one that's timestamped - case commitSpend, ok := <-spendNtfn.Spend: - // If the channel was closed, then this means that the - // notifier exited, so we will as well. - if !ok { - return - } - - // Otherwise, the remote party might have broadcast a - // prior revoked state...!!! - commitTxBroadcast := commitSpend.SpendingTx - - localCommit, remoteCommit, err := c.chanState.LatestCommitments() - if err != nil { - log.Errorf("Unable to fetch channel state for "+ - "chan_point=%v", c.chanState.FundingOutpoint) - return - } - - // We'll not retrieve the latest sate of the revocation - // store so we can populate the information within the - // channel state object that we have. - // - // TODO(roasbeef): mutation is bad mkay - _, err = c.chanState.RemoteRevocationStore() - if err != nil { - log.Errorf("Unable to fetch revocation state for "+ - "chan_point=%v", c.chanState.FundingOutpoint) - return - } - - // If this is our commitment transaction, then we can - // exit here as we don't have any further processing we - // need to do (we can't cheat ourselves :p). - commitmentHash := localCommit.CommitTx.TxHash() - isOurCommitment := commitSpend.SpenderTxHash.IsEqual( - &commitmentHash, - ) - if isOurCommitment { - if err := c.dispatchLocalForceClose( - commitSpend, *localCommit, - ); err != nil { - log.Errorf("unable to handle local"+ - "close for chan_point=%v: %v", - c.chanState.FundingOutpoint, err) - } - return - } - - // Next, we'll check to see if this is a cooperative - // channel closure or not. This is characterized by - // having an input sequence number that's finalized. - // This won't happen with regular commitment - // transactions due to the state hint encoding scheme. - if commitTxBroadcast.TxIn[0].Sequence == wire.MaxTxInSequenceNum { - err := c.dispatchCooperativeClose(commitSpend) - if err != nil { - log.Errorf("unable to handle co op close: %v", err) - } - return - } - - log.Warnf("Unprompted commitment broadcast for "+ - "ChannelPoint(%v) ", c.chanState.FundingOutpoint) - - // Decode the state hint encoded within the commitment - // transaction to determine if this is a revoked state - // or not. - obfuscator := c.stateHintObfuscator - broadcastStateNum := lnwallet.GetStateNumHint( - commitTxBroadcast, obfuscator, - ) - remoteStateNum := remoteCommit.CommitHeight - - switch { - // If state number spending transaction matches the - // current latest state, then they've initiated a - // unilateral close. So we'll trigger the unilateral - // close signal so subscribers can clean up the state - // as necessary. - // - // We'll also handle the case of the remote party - // broadcasting their commitment transaction which is - // one height above ours. This case can arise when we - // initiate a state transition, but the remote party - // has a fail crash _after_ accepting the new state, - // but _before_ sending their signature to us. - case broadcastStateNum >= remoteStateNum: - if err := c.dispatchRemoteForceClose( - commitSpend, *remoteCommit, - ); err != nil { - log.Errorf("unable to handle remote "+ - "close for chan_point=%v: %v", - c.chanState.FundingOutpoint, err) - } - - // If the state number broadcast is lower than the - // remote node's current un-revoked height, then - // THEY'RE ATTEMPTING TO VIOLATE THE CONTRACT LAID OUT - // WITHIN THE PAYMENT CHANNEL. Therefore we close the - // signal indicating a revoked broadcast to allow - // subscribers to - // swiftly dispatch justice!!! - case broadcastStateNum < remoteStateNum: - if err := c.dispatchContractBreach( - commitSpend, remoteCommit, - broadcastStateNum, - ); err != nil { - log.Errorf("unable to handle channel "+ - "breach for chan_point=%v: %v", - c.chanState.FundingOutpoint, err) - } - } - - // Now that a spend has been detected, we've done our - // job, so we'll exit immediately. - return - - // The chainWatcher has been signalled to exit, so we'll do so now. - case <-c.quit: + select { + // We've detected a spend of the channel onchain! Depending on + // the type of spend, we'll act accordingly , so we'll examine + // the spending transaction to determine what we should do. + // + // TODO(Roasbeef): need to be able to ensure this only triggers + // on confirmation, to ensure if multiple txns are broadcast, we + // act on the one that's timestamped + case commitSpend, ok := <-spendNtfn.Spend: + // If the channel was closed, then this means that the + // notifier exited, so we will as well. + if !ok { return } + + // Otherwise, the remote party might have broadcast a + // prior revoked state...!!! + commitTxBroadcast := commitSpend.SpendingTx + + localCommit, remoteCommit, err := c.cfg.chanState.LatestCommitments() + if err != nil { + log.Errorf("Unable to fetch channel state for "+ + "chan_point=%v", c.cfg.chanState.FundingOutpoint) + return + } + + // We'll not retrieve the latest sate of the revocation + // store so we can populate the information within the + // channel state object that we have. + // + // TODO(roasbeef): mutation is bad mkay + _, err = c.cfg.chanState.RemoteRevocationStore() + if err != nil { + log.Errorf("Unable to fetch revocation state for "+ + "chan_point=%v", c.cfg.chanState.FundingOutpoint) + return + } + + // If this is our commitment transaction, then we can + // exit here as we don't have any further processing we + // need to do (we can't cheat ourselves :p). + commitmentHash := localCommit.CommitTx.TxHash() + isOurCommitment := commitSpend.SpenderTxHash.IsEqual( + &commitmentHash, + ) + if isOurCommitment { + if err := c.dispatchLocalForceClose( + commitSpend, *localCommit, + ); err != nil { + log.Errorf("unable to handle local"+ + "close for chan_point=%v: %v", + c.cfg.chanState.FundingOutpoint, err) + } + return + } + + // Next, we'll check to see if this is a cooperative + // channel closure or not. This is characterized by + // having an input sequence number that's finalized. + // This won't happen with regular commitment + // transactions due to the state hint encoding scheme. + if commitTxBroadcast.TxIn[0].Sequence == wire.MaxTxInSequenceNum { + err := c.dispatchCooperativeClose(commitSpend) + if err != nil { + log.Errorf("unable to handle co op close: %v", err) + } + return + } + + log.Warnf("Unprompted commitment broadcast for "+ + "ChannelPoint(%v) ", c.cfg.chanState.FundingOutpoint) + + // Decode the state hint encoded within the commitment + // transaction to determine if this is a revoked state + // or not. + obfuscator := c.stateHintObfuscator + broadcastStateNum := lnwallet.GetStateNumHint( + commitTxBroadcast, obfuscator, + ) + remoteStateNum := remoteCommit.CommitHeight + + switch { + // If state number spending transaction matches the + // current latest state, then they've initiated a + // unilateral close. So we'll trigger the unilateral + // close signal so subscribers can clean up the state + // as necessary. + // + // We'll also handle the case of the remote party + // broadcasting their commitment transaction which is + // one height above ours. This case can arise when we + // initiate a state transition, but the remote party + // has a fail crash _after_ accepting the new state, + // but _before_ sending their signature to us. + case broadcastStateNum >= remoteStateNum: + if err := c.dispatchRemoteForceClose( + commitSpend, *remoteCommit, + ); err != nil { + log.Errorf("unable to handle remote "+ + "close for chan_point=%v: %v", + c.cfg.chanState.FundingOutpoint, err) + } + + // If the state number broadcast is lower than the + // remote node's current un-revoked height, then + // THEY'RE ATTEMPTING TO VIOLATE THE CONTRACT LAID OUT + // WITHIN THE PAYMENT CHANNEL. Therefore we close the + // signal indicating a revoked broadcast to allow + // subscribers to + // swiftly dispatch justice!!! + case broadcastStateNum < remoteStateNum: + if err := c.dispatchContractBreach( + commitSpend, remoteCommit, + broadcastStateNum, + ); err != nil { + log.Errorf("unable to handle channel "+ + "breach for chan_point=%v: %v", + c.cfg.chanState.FundingOutpoint, err) + } + } + + // Now that a spend has been detected, we've done our + // job, so we'll exit immediately. + return + + // The chainWatcher has been signalled to exit, so we'll do so now. + case <-c.quit: + return } } @@ -444,7 +406,7 @@ func (c *chainWatcher) toSelfAmount(tx *wire.MsgTx) btcutil.Amount { } for _, addr := range addrs { - if c.isOurAddr(addr) { + if c.cfg.isOurAddr(addr) { selfAmt += btcutil.Amount(txOut.Value) } } @@ -462,7 +424,7 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet broadcastTx := commitSpend.SpendingTx log.Infof("Cooperative closure for ChannelPoint(%v): %v", - c.chanState.FundingOutpoint, spew.Sdump(broadcastTx)) + c.cfg.chanState.FundingOutpoint, spew.Sdump(broadcastTx)) // If the input *is* final, then we'll check to see which output is // ours. @@ -471,18 +433,18 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet // Once this is known, we'll mark the state as pending close in the // database. closeSummary := &channeldb.ChannelCloseSummary{ - ChanPoint: c.chanState.FundingOutpoint, - ChainHash: c.chanState.ChainHash, + ChanPoint: c.cfg.chanState.FundingOutpoint, + ChainHash: c.cfg.chanState.ChainHash, ClosingTXID: *commitSpend.SpenderTxHash, - RemotePub: c.chanState.IdentityPub, - Capacity: c.chanState.Capacity, + RemotePub: c.cfg.chanState.IdentityPub, + Capacity: c.cfg.chanState.Capacity, CloseHeight: uint32(commitSpend.SpendingHeight), SettledBalance: localAmt, CloseType: channeldb.CooperativeClose, - ShortChanID: c.chanState.ShortChanID, + ShortChanID: c.cfg.chanState.ShortChanID, IsPending: true, } - err := c.chanState.CloseChannel(closeSummary) + err := c.cfg.chanState.CloseChannel(closeSummary) if err != nil && err != channeldb.ErrNoActiveChannels && err != channeldb.ErrNoChanDBExists { return fmt.Errorf("unable to close chan state: %v", err) @@ -491,7 +453,7 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet // Finally, we'll launch a goroutine to mark the channel as fully // closed once the transaction confirmed. go func() { - confNtfn, err := c.notifier.RegisterConfirmationsNtfn( + confNtfn, err := c.cfg.notifier.RegisterConfirmationsNtfn( commitSpend.SpenderTxHash, 1, uint32(commitSpend.SpendingHeight), ) @@ -502,7 +464,7 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet log.Infof("closeObserver: waiting for txid=%v to close "+ "ChannelPoint(%v) on chain", commitSpend.SpenderTxHash, - c.chanState.FundingOutpoint) + c.cfg.chanState.FundingOutpoint) select { case confInfo, ok := <-confNtfn.Confirmed: @@ -512,10 +474,11 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet } log.Infof("closeObserver: ChannelPoint(%v) is fully "+ - "closed, at height: %v", c.chanState.FundingOutpoint, + "closed, at height: %v", + c.cfg.chanState.FundingOutpoint, confInfo.BlockHeight) - err := c.markChanClosed() + err := c.cfg.markChanClosed() if err != nil { log.Errorf("unable to mark chan fully "+ "closed: %v", err) @@ -548,11 +511,11 @@ func (c *chainWatcher) dispatchLocalForceClose( localCommit channeldb.ChannelCommitment) error { log.Infof("Local unilateral close of ChannelPoint(%v) "+ - "detected", c.chanState.FundingOutpoint) + "detected", c.cfg.chanState.FundingOutpoint) forceClose, err := lnwallet.NewLocalForceCloseSummary( - c.chanState, c.signer, c.pCache, commitSpend.SpendingTx, - localCommit, + c.cfg.chanState, c.cfg.signer, c.cfg.pCache, + commitSpend.SpendingTx, localCommit, ) if err != nil { return err @@ -570,7 +533,7 @@ func (c *chainWatcher) dispatchLocalForceClose( Capacity: chanSnapshot.Capacity, CloseType: channeldb.LocalForceClose, IsPending: true, - ShortChanID: c.chanState.ShortChanID, + ShortChanID: c.cfg.chanState.ShortChanID, CloseHeight: uint32(commitSpend.SpendingHeight), } @@ -585,7 +548,7 @@ func (c *chainWatcher) dispatchLocalForceClose( htlcValue := btcutil.Amount(htlc.SweepSignDesc.Output.Value) closeSummary.TimeLockedBalance += htlcValue } - err = c.chanState.CloseChannel(closeSummary) + err = c.cfg.chanState.CloseChannel(closeSummary) if err != nil { return fmt.Errorf("unable to delete channel state: %v", err) } @@ -616,13 +579,13 @@ func (c *chainWatcher) dispatchRemoteForceClose(commitSpend *chainntnfs.SpendDet remoteCommit channeldb.ChannelCommitment) error { log.Infof("Unilateral close of ChannelPoint(%v) "+ - "detected", c.chanState.FundingOutpoint) + "detected", c.cfg.chanState.FundingOutpoint) // First, we'll create a closure summary that contains all the // materials required to let each subscriber sweep the funds in the // channel on-chain. - uniClose, err := lnwallet.NewUnilateralCloseSummary(c.chanState, - c.signer, c.pCache, commitSpend, remoteCommit, + uniClose, err := lnwallet.NewUnilateralCloseSummary(c.cfg.chanState, + c.cfg.signer, c.cfg.pCache, commitSpend, remoteCommit, ) if err != nil { return err @@ -631,7 +594,7 @@ func (c *chainWatcher) dispatchRemoteForceClose(commitSpend *chainntnfs.SpendDet // As we've detected that the channel has been closed, immediately // delete the state from disk, creating a close summary for future // usage by related sub-systems. - err = c.chanState.CloseChannel(&uniClose.ChannelCloseSummary) + err = c.cfg.chanState.CloseChannel(&uniClose.ChannelCloseSummary) if err != nil { return fmt.Errorf("unable to delete channel state: %v", err) } @@ -667,9 +630,9 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail log.Warnf("Remote peer has breached the channel contract for "+ "ChannelPoint(%v). Revoked state #%v was broadcast!!!", - c.chanState.FundingOutpoint, broadcastStateNum) + c.cfg.chanState.FundingOutpoint, broadcastStateNum) - if err := c.chanState.MarkBorked(); err != nil { + if err := c.cfg.chanState.MarkBorked(); err != nil { return fmt.Errorf("unable to mark channel as borked: %v", err) } @@ -683,7 +646,7 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail // // TODO(roasbeef): move to same package retribution, err := lnwallet.NewBreachRetribution( - c.chanState, broadcastStateNum, commitTxBroadcast, + c.cfg.chanState, broadcastStateNum, commitTxBroadcast, spendHeight, ) if err != nil { @@ -705,6 +668,13 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail return spew.Sdump(retribution) })) + // Hand the retribution info over to the breach arbiter. + if err := c.cfg.contractBreach(retribution); err != nil { + log.Errorf("unable to hand breached contract off to "+ + "breachArbiter: %v", err) + return err + } + // With the event processed, we'll now notify all subscribers of the // event. c.Lock() @@ -715,25 +685,6 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail c.Unlock() return fmt.Errorf("quitting") } - - // Wait for the breach arbiter to ACK the handoff before - // marking the channel as pending force closed in channeldb, - // but only if the client requested a sync dispatch. - if sub.ProcessACK != nil { - select { - case err := <-sub.ProcessACK: - // Bail if the handoff failed. - if err != nil { - c.Unlock() - return fmt.Errorf("unable to handoff "+ - "retribution info: %v", err) - } - - case <-c.quit: - c.Unlock() - return fmt.Errorf("quitting") - } - } } c.Unlock() @@ -744,22 +695,26 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail // TODO(roasbeef): instead mark we got all the monies? settledBalance := remoteCommit.LocalBalance.ToSatoshis() closeSummary := channeldb.ChannelCloseSummary{ - ChanPoint: c.chanState.FundingOutpoint, - ChainHash: c.chanState.ChainHash, + ChanPoint: c.cfg.chanState.FundingOutpoint, + ChainHash: c.cfg.chanState.ChainHash, ClosingTXID: *spendEvent.SpenderTxHash, CloseHeight: spendHeight, - RemotePub: c.chanState.IdentityPub, - Capacity: c.chanState.Capacity, + RemotePub: c.cfg.chanState.IdentityPub, + Capacity: c.cfg.chanState.Capacity, SettledBalance: settledBalance, CloseType: channeldb.BreachClose, IsPending: true, - ShortChanID: c.chanState.ShortChanID, + ShortChanID: c.cfg.chanState.ShortChanID, + } + + if err := c.cfg.chanState.CloseChannel(&closeSummary); err != nil { + return err } log.Infof("Breached channel=%v marked pending-closed", - c.chanState.FundingOutpoint) + c.cfg.chanState.FundingOutpoint) - return c.chanState.CloseChannel(&closeSummary) + return nil } // CooperativeCloseCtx is a transactional object that's used by external @@ -825,7 +780,7 @@ func (c *CooperativeCloseCtx) LogPotentialClose(potentialClose *channeldb.Channe // potential close gets confirmed, we'll cancel out all other launched // goroutines. go func() { - confNtfn, err := c.watcher.notifier.RegisterConfirmationsNtfn( + confNtfn, err := c.watcher.cfg.notifier.RegisterConfirmationsNtfn( &potentialClose.ClosingTXID, 1, uint32(potentialClose.CloseHeight), ) @@ -836,7 +791,7 @@ func (c *CooperativeCloseCtx) LogPotentialClose(potentialClose *channeldb.Channe log.Infof("closeCtx: waiting for txid=%v to close "+ "ChannelPoint(%v) on chain", potentialClose.ClosingTXID, - c.watcher.chanState.FundingOutpoint) + c.watcher.cfg.chanState.FundingOutpoint) select { case confInfo, ok := <-confNtfn.Confirmed: @@ -846,7 +801,7 @@ func (c *CooperativeCloseCtx) LogPotentialClose(potentialClose *channeldb.Channe } log.Infof("closeCtx: ChannelPoint(%v) is fully closed, at "+ - "height: %v", c.watcher.chanState.FundingOutpoint, + "height: %v", c.watcher.cfg.chanState.FundingOutpoint, confInfo.BlockHeight) close(c.watchCancel) @@ -860,14 +815,14 @@ func (c *CooperativeCloseCtx) LogPotentialClose(potentialClose *channeldb.Channe } c.watcher.Unlock() - err := c.watcher.chanState.CloseChannel(potentialClose) + err := c.watcher.cfg.chanState.CloseChannel(potentialClose) if err != nil { log.Warnf("closeCtx: unable to update latest "+ "close for ChannelPoint(%v): %v", - c.watcher.chanState.FundingOutpoint, err) + c.watcher.cfg.chanState.FundingOutpoint, err) } - err = c.watcher.markChanClosed() + err = c.watcher.cfg.markChanClosed() if err != nil { log.Errorf("closeCtx: unable to mark chan fully "+ "closed: %v", err) @@ -877,7 +832,7 @@ func (c *CooperativeCloseCtx) LogPotentialClose(potentialClose *channeldb.Channe case <-c.watchCancel: log.Debugf("Exiting watch for close of txid=%v for "+ "ChannelPoint(%v)", potentialClose.ClosingTXID, - c.watcher.chanState.FundingOutpoint) + c.watcher.cfg.chanState.FundingOutpoint) case <-c.watcher.quit: return @@ -890,11 +845,11 @@ func (c *CooperativeCloseCtx) LogPotentialClose(potentialClose *channeldb.Channe // pending closed in the database, then launch a goroutine to mark the channel // fully closed upon confirmation. func (c *CooperativeCloseCtx) Finalize(preferredClose *channeldb.ChannelCloseSummary) error { - chanPoint := c.watcher.chanState.FundingOutpoint + chanPoint := c.watcher.cfg.chanState.FundingOutpoint log.Infof("Finalizing chan close for ChannelPoint(%v)", chanPoint) - err := c.watcher.chanState.CloseChannel(preferredClose) + err := c.watcher.cfg.chanState.CloseChannel(preferredClose) if err != nil { log.Errorf("closeCtx: unable to close ChannelPoint(%v): %v", chanPoint, err) diff --git a/fundingmanager.go b/fundingmanager.go index 22b5d47c..36546fb0 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -218,12 +218,6 @@ type fundingConfig struct { // so that the channel creation process can be completed. Notifier chainntnfs.ChainNotifier - // ArbiterChan allows the FundingManager to notify the BreachArbiter - // that a new channel has been created that should be observed to - // ensure that the channel counterparty hasn't broadcast an invalid - // commitment transaction. - ArbiterChan chan<- wire.OutPoint - // SignMessage signs an arbitrary method with a given public key. The // actual digest signed is the double sha-256 of the message. In the // case that the private key corresponding to the passed public key @@ -2195,15 +2189,6 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { return } - // With the channel retrieved, we'll send the breach arbiter the new - // channel so it can watch for attempts to breach the channel's - // contract by the remote party. - select { - case f.cfg.ArbiterChan <- *channel.ChanPoint: - case <-f.quit: - return - } - // The funding locked message contains the next commitment point we'll // need to create the next commitment state for the remote party. So // we'll insert that into the channel now before passing it along to diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 62376c86..4ebf4465 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -136,7 +136,6 @@ type testNode struct { privKey *btcec.PrivateKey msgChan chan lnwire.Message announceChan chan lnwire.Message - arbiterChan chan wire.OutPoint publTxChan chan *wire.MsgTx fundingMgr *fundingManager peer *peer @@ -200,7 +199,6 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, sentMessages := make(chan lnwire.Message) sentAnnouncements := make(chan lnwire.Message) publTxChan := make(chan *wire.MsgTx, 1) - arbiterChan := make(chan wire.OutPoint) shutdownChan := make(chan struct{}) wc := &mockWalletController{ @@ -306,7 +304,6 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, RequiredRemoteMaxHTLCs: func(chanAmt btcutil.Amount) uint16 { return uint16(lnwallet.MaxHTLCNumber / 2) }, - ArbiterChan: arbiterChan, WatchNewChannel: func(*channeldb.OpenChannel, *lnwire.NetAddress) error { return nil }, @@ -328,7 +325,6 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, privKey: privKey, msgChan: sentMessages, announceChan: sentAnnouncements, - arbiterChan: arbiterChan, publTxChan: publTxChan, fundingMgr: f, peer: p, @@ -386,7 +382,6 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) { }, FindPeer: oldCfg.FindPeer, TempChanIDSeed: oldCfg.TempChanIDSeed, - ArbiterChan: alice.arbiterChan, FindChannel: oldCfg.FindChannel, PublishTransaction: func(txn *wire.MsgTx) error { publishChan <- txn @@ -857,20 +852,7 @@ func assertErrChannelNotFound(t *testing.T, node *testNode, } func assertHandleFundingLocked(t *testing.T, alice, bob *testNode) { - // They should both send the new channel to the breach arbiter. - select { - case <-alice.arbiterChan: - case <-time.After(time.Second * 15): - t.Fatalf("alice did not send channel to breach arbiter") - } - - select { - case <-bob.arbiterChan: - case <-time.After(time.Second * 15): - t.Fatalf("bob did not send channel to breach arbiter") - } - - // And send the new channel state to their peer. + // They should both send the new channel state to their peer. select { case c := <-alice.peer.newChannels: close(c.done) @@ -1587,12 +1569,6 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) { // Alice should not send the channel state the second time, as the // second funding locked should just be ignored. select { - case <-alice.arbiterChan: - t.Fatalf("alice sent channel to breach arbiter a second time") - case <-time.After(time.Millisecond * 300): - // Expected - } - select { case <-alice.peer.newChannels: t.Fatalf("alice sent new channel to peer a second time") case <-time.After(time.Millisecond * 300): @@ -1603,12 +1579,6 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) { // have updated her database at this point. alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) select { - case <-alice.arbiterChan: - t.Fatalf("alice sent channel to breach arbiter a second time") - case <-time.After(time.Millisecond * 300): - // Expected - } - select { case <-alice.peer.newChannels: t.Fatalf("alice sent new channel to peer a second time") case <-time.After(time.Millisecond * 300): diff --git a/lnd.go b/lnd.go index 4e1e2dc1..86228460 100644 --- a/lnd.go +++ b/lnd.go @@ -357,7 +357,6 @@ func lndMain() error { idPrivKey.PubKey()) return <-errChan }, - ArbiterChan: server.breachArbiter.newContracts, SendToPeer: server.SendToPeer, NotifyWhenOnline: server.NotifyWhenOnline, FindPeer: server.FindPeer, diff --git a/peer.go b/peer.go index 5dbe904c..ad9dc946 100644 --- a/peer.go +++ b/peer.go @@ -403,7 +403,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { // necessary to properly route multi-hop payments, and forward // new payments triggered by RPC clients. chainEvents, err := p.server.chainArb.SubscribeChannelEvents( - *chanPoint, false, + *chanPoint, ) if err != nil { lnChan.Stop() @@ -1380,7 +1380,7 @@ out: continue } chainEvents, err := p.server.chainArb.SubscribeChannelEvents( - *chanPoint, false, + *chanPoint, ) if err != nil { peerLog.Errorf("unable to subscribe to chain "+ diff --git a/rpcserver.go b/rpcserver.go index 225f5fa2..2db34de9 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -1043,12 +1043,6 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, r.server.htlcSwitch.RemoveLink(chanID) } - select { - case r.server.breachArbiter.settledContracts <- *chanPoint: - case <-r.quit: - return fmt.Errorf("server shutting down") - } - // With the necessary indexes cleaned up, we'll now force close // the channel. chainArbitrator := r.server.chainArb diff --git a/server.go b/server.go index c0b6ea4f..9c259f44 100644 --- a/server.go +++ b/server.go @@ -396,6 +396,10 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl, s.htlcSwitch.CloseLink(chanPoint, closureType, 0) } + // We will use the following channel to reliably hand off contract + // breach events from the ChannelArbitrator to the breachArbiter, + contractBreaches := make(chan *ContractBreachEvent, 1) + s.chainArb = contractcourt.NewChainArbitrator(contractcourt.ChainArbitratorConfig{ ChainHash: *activeNetParams.GenesisHash, // TODO(roasbeef): properly configure @@ -447,6 +451,29 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl, _, err := cc.wallet.GetPrivKey(addr) return err == nil }, + ContractBreach: func(chanPoint wire.OutPoint, + breachRet *lnwallet.BreachRetribution) error { + event := &ContractBreachEvent{ + ChanPoint: chanPoint, + ProcessACK: make(chan error, 1), + BreachRetribution: breachRet, + } + + // Send the contract breach event to the breachArbiter. + select { + case contractBreaches <- event: + case <-s.quit: + return ErrServerShuttingDown + } + + // Wait for the breachArbiter to ACK the event. + select { + case err := <-event.ProcessACK: + return err + case <-s.quit: + return ErrServerShuttingDown + } + }, }, chanDB) s.breachArbiter = newBreachArbiter(&BreachConfig{ @@ -458,14 +485,9 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl, }, Notifier: cc.chainNotifier, PublishTransaction: cc.wallet.PublishTransaction, - SubscribeChannelEvents: func(chanPoint wire.OutPoint) (*contractcourt.ChainEventSubscription, error) { - // We'll request a sync dispatch to ensure that the channel - // is only marked as closed *after* we update our internal - // state. - return s.chainArb.SubscribeChannelEvents(chanPoint, true) - }, - Signer: cc.wallet.Cfg.Signer, - Store: newRetributionStore(chanDB), + ContractBreaches: contractBreaches, + Signer: cc.wallet.Cfg.Signer, + Store: newRetributionStore(chanDB), }) // Create the connection manager which will be responsible for