diff --git a/contractcourt/chain_watcher.go b/contractcourt/chain_watcher.go new file mode 100644 index 00000000..d6e6f363 --- /dev/null +++ b/contractcourt/chain_watcher.go @@ -0,0 +1,462 @@ +package contractcourt + +import ( + "fmt" + "sync" + + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwallet" + "github.com/roasbeef/btcd/wire" +) + +// ChainEventSubscription is a struct that houses a subscription to be notified +// for any on-chain events related to a channel. There are three types of +// possible on-chain events: a cooperative channel closure, a unilateral +// channel closure, and a channel breach. The fourth type: a force close is +// locally initiated, so we don't provide any event stream for said event. +type ChainEventSubscription struct { + // ChanPoint is that channel that chain events will be dispatched fo. + ChanPoint wire.OutPoint + + // UnilateralClosure is a channel that will be sent upon in the event that + // the remote party broadcasts their latest version of the commitment + // transaction. + UnilateralClosure chan *lnwallet.UnilateralCloseSummary + + // CooperativeClosure is a signal that will be sent upon once a cooperative + // channel closure has been detected. + // + // TODO(roasbeef): or something else + CooperativeClosure chan struct{} + + // ContractBreach is a channel that will be sent upon if we detect a + // contract breach. The struct sent across the channel contains all the + // material required to bring the cheating channel peer to justice. + ContractBreach chan *lnwallet.BreachRetribution + + // ProcessACK is a channel that'll be used by the chainWatcher to + // synchronize dispatch and processing of the notification with the act + // of updating the state of the channel on disk. This ensures that the + // event can be reliably handed off. + ProcessACK chan struct{} + + // Cancel cancels the subscription to the event stream for a particular + // channel. This method should be called once the caller no longer needs to + // be notified of any on-chain events for a particular channel. + Cancel func() +} + +// chainWatcher is a system that's assigned to every active channel. The duty +// of this system is to watch the chain for spends of the channels chan point. +// If a spend is detected then with chain watcher will notify all subscribers +// that the channel has been closed, and also give them the materials necessary +// to sweep the funds of the channel on chain eventually. +type chainWatcher struct { + quit chan struct{} + wg sync.WaitGroup + + // chanState is a snapshot of the persistent state of the channel that + // we're watching. In the event of an on-chain event, we'll query the + // database to ensure that we act using the most up to date state. + chanState *channeldb.OpenChannel + + // stateHintObfuscator is a 48-bit state hint that's used to obfsucate + // the current state number on the commitment transactions. + stateHintObfuscator [lnwallet.StateHintSize]byte + + // notifier is a reference to the channel notifier that we'll use to be + // notified of output spends and when transactions are confirmed. + notifier chainntnfs.ChainNotifier + + // pCache is a reference to the shared preimage cache. We'll use this + // to see if we can settle any incoming HTLC's during a remote + // commitment close event. + pCache WitnessBeacon + + // signer is the main signer instances that will be responsible for + // signing any HTLC and commitment transaction generated by the state + // machine. + signer lnwallet.Signer + + // All the fields below are protected by this mutex. + sync.RWMutex + + // clientID is an ephemeral counter used to keep track of each + // individual client subscription. + clientID uint64 + + // clientSubscriptions is a map that keeps track of all the active + // client subscriptions for events related to this channel. + clientSubscriptions map[uint64]*ChainEventSubscription +} + +// newChainWatcher returns a new instance of a chainWatcher for a channel given +// the chan point to watch, and also a notifier instance that will allow us to +// detect on chain events. +func newChainWatcher(chanState *channeldb.OpenChannel, + notifier chainntnfs.ChainNotifier, pCache WitnessBeacon, + signer lnwallet.Signer) (*chainWatcher, error) { + + // In order to be able to detect the nature of a potential channel + // closure we'll need to reconstruct the state hint bytes used to + // obfuscate the commitment state number encoded in the lock time and + // sequence fields. + var stateHint [lnwallet.StateHintSize]byte + if chanState.IsInitiator { + stateHint = lnwallet.DeriveStateHintObfuscator( + chanState.LocalChanCfg.PaymentBasePoint, + chanState.RemoteChanCfg.PaymentBasePoint, + ) + } else { + stateHint = lnwallet.DeriveStateHintObfuscator( + chanState.RemoteChanCfg.PaymentBasePoint, + chanState.LocalChanCfg.PaymentBasePoint, + ) + } + + return &chainWatcher{ + chanState: chanState, + stateHintObfuscator: stateHint, + notifier: notifier, + pCache: pCache, + signer: signer, + quit: make(chan struct{}), + clientSubscriptions: make(map[uint64]*ChainEventSubscription), + }, nil +} + +// Start starts all goroutines that the chainWatcher needs to perform its +// duties. +func (c *chainWatcher) Start() error { + log.Debugf("Starting chain watcher for ChannelPoint(%v)", + c.chanState.FundingOutpoint) + + // First, we'll register for a notification to be dispatched if the + // funding output is spent. + fundingOut := &c.chanState.FundingOutpoint + + // As a height hint, we'll try to use the opening height, but if the + // channel isn't yet open, then we'll use the height it was broadcast + // at. + heightHint := c.chanState.ShortChanID.BlockHeight + if heightHint == 0 { + heightHint = c.chanState.FundingBroadcastHeight + } + + spendNtfn, err := c.notifier.RegisterSpendNtfn( + fundingOut, heightHint, + ) + if err != nil { + return err + } + + // With the spend notification obtained, we'll now dispatch the + // closeObserver which will properly react to any changes. + c.wg.Add(1) + go c.closeObserver(spendNtfn) + + return nil +} + +// Stop signals the close observer to gracefully exit. +func (c *chainWatcher) Stop() error { + close(c.quit) + + c.wg.Wait() + + return nil +} + +// SubscribeChannelEvents returns a n active subscription to the set of channel +// events for the channel watched by this chain watcher. Once clients no longer +// require the subscription, they should call the Cancel() method to allow the +// watcher to regain those committed resources. +func (c *chainWatcher) SubscribeChannelEvents() *ChainEventSubscription { + c.Lock() + defer c.Unlock() + + clientID := c.clientID + c.clientID++ + + log.Debugf("New ChainEventSubscription(id=%v) for ChannelPoint(%v)", + clientID, c.chanState.FundingOutpoint) + + sub := &ChainEventSubscription{ + ChanPoint: c.chanState.FundingOutpoint, + UnilateralClosure: make(chan *lnwallet.UnilateralCloseSummary, 1), + CooperativeClosure: make(chan struct{}, 1), + ContractBreach: make(chan *lnwallet.BreachRetribution, 1), + Cancel: func() { + c.Lock() + delete(c.clientSubscriptions, clientID) + c.Unlock() + return + }, + } + + c.clientSubscriptions[clientID] = sub + + return sub +} + +// closeObserver is a dedicated goroutine that will watch for any closes of the +// channel that it's watching on chain. In the event of an on-chain event, the +// close observer will assembled the proper materials required to claim the +// funds of the channel on-chain (if required), then dispatch these as +// notifications to all subscribers. +func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) { + defer c.wg.Done() + + log.Infof("Close observer for ChannelPoint(%v) active", + c.chanState.FundingOutpoint) + + for { + select { + // We've detected a spend of the channel onchain! Depending on + // the type of spend, we'll act accordingly , so we'll examine + // the spending transaction to determine what we should do. + case commitSpend, ok := <-spendNtfn.Spend: + // If the channel was closed, then this means that the + // notifier exited, so we will as well. + if !ok { + return + } + + // Otherwise, the remote party might have broadcast a + // prior revoked state...!!! + commitTxBroadcast := commitSpend.SpendingTx + + localCommit, remoteCommit, err := c.chanState.LatestCommitments() + if err != nil { + log.Errorf("Unable to fetch channel state for "+ + "chan_point=%v", c.chanState.FundingOutpoint) + return + } + + // We'll not retrieve the latest sate of the revocation + // store so we can populate the information within the + // channel state object that we have. + // + // TODO(roasbeef): mutation is bad mkay + _, err = c.chanState.RemoteRevocationStore() + if err != nil { + log.Errorf("Unable to fetch revocation state for "+ + "chan_point=%v", c.chanState.FundingOutpoint) + return + } + + // If this is our commitment transaction, then we can + // exit here as we don't have any further processing we + // need to do (we can't cheat ourselves :p). + commitmentHash := localCommit.CommitTx.TxHash() + isOurCommitment := commitSpend.SpenderTxHash.IsEqual( + &commitmentHash, + ) + if isOurCommitment { + return + } + + // Next, we'll check to see if this is a cooperative + // channel closure or not. This is characterized by + // + // TODO(roasbeef): check to see if txid amongst those + // that we know are co-op channel closes + + log.Warnf("Unprompted commitment broadcast for "+ + "ChannelPoint(%v) ", c.chanState.FundingOutpoint) + + // Decode the state hint encoded within the commitment + // transaction to determine if this is a revoked state + // or not. + obfuscator := c.stateHintObfuscator + broadcastStateNum := lnwallet.GetStateNumHint( + commitTxBroadcast, obfuscator, + ) + remoteStateNum := remoteCommit.CommitHeight + + switch { + // If state number spending transaction matches the + // current latest state, then they've initiated a + // unilateral close. So we'll trigger the unilateral + // close signal so subscribers can clean up the state + // as necessary. + // + // We'll also handle the case of the remote party + // broadcasting their commitment transaction which is + // one height above ours. This case can arise when we + // initiate a state transition, but the remote party + // has a fail crash _after_ accepting the new state, + // but _before_ sending their signature to us. + case broadcastStateNum >= remoteStateNum: + if err := c.dispatchRemoteClose( + commitSpend, *remoteCommit, + ); err != nil { + log.Errorf("unable to handle remote "+ + "close for chan_point=%v", + c.chanState.FundingOutpoint, err) + } + + // If the state number broadcast is lower than the + // remote node's current un-revoked height, then + // THEY'RE ATTEMPTING TO VIOLATE THE CONTRACT LAID OUT + // WITHIN THE PAYMENT CHANNEL. Therefore we close the + // signal indicating a revoked broadcast to allow + // subscribers to + // swiftly dispatch justice!!! + case broadcastStateNum < remoteStateNum: + if err := c.dispatchContractBreach( + commitSpend, remoteCommit, + ); err != nil { + log.Errorf("unable to handle channel "+ + "breach for chan_point=%v: %v", + c.chanState.FundingOutpoint, err) + } + } + + // Now that a spend has been detected, we've done our + // job, so we'll exit immediately. + return + + // The chainWatcher has been signalled to exit, so we'll do so now. + case <-c.quit: + } + } +} + +// dispatchRemoteClose processes a detected unilateral channel closure by the +// remote party. This function will prepare a UnilateralCloseSummary which will +// then be sent to any subscribers allowing them to resolve all our funds in +// the channel on chain. Once this close summary is prepared, all registered +// subscribers will receive a notification of this event. +func (c *chainWatcher) dispatchRemoteClose(commitSpend *chainntnfs.SpendDetail, + remoteCommit channeldb.ChannelCommitment) error { + + log.Infof("Unilateral close of ChannelPoint(%v) "+ + "detected", c.chanState.FundingOutpoint) + + // First, we'll create a closure summary that contains all the + // materials required to let each subscriber sweep the funds in the + // channel on-chain. + uniClose, err := lnwallet.NewUnilateralCloseSummary(c.chanState, + c.signer, c.pCache, commitSpend, remoteCommit, + ) + if err != nil { + return err + } + + // As we've detected that the channel has been closed, immediately + // delete the state from disk, creating a close summary for future + // usage by related sub-systems. + err = c.chanState.CloseChannel(&uniClose.ChannelCloseSummary) + if err != nil { + return fmt.Errorf("unable to delete channel state: %v", err) + } + + // With the event processed, we'll now notify all subscribers of the + // event. + c.Lock() + for _, sub := range c.clientSubscriptions { + // TODO(roasbeef): send msg before writing to disk + // * need to ensure proper fault tolerance in all cases + // * get ACK from the consumer of the ntfn before writing to disk? + // * no harm in repeated ntfns: at least once semantics + select { + case sub.UnilateralClosure <- uniClose: + case <-c.quit: + return fmt.Errorf("exiting") + } + } + c.Unlock() + + return nil +} + +// dispatchContractBreach processes a detected contract breached by the remote +// party. This method is to be called once we detect that the remote party has +// broadcast a prior revoked commitment state. This method well prepare all the +// materials required to bring the cheater to justice, then notify all +// registered subscribers of this event. +func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail, + remoteCommit *channeldb.ChannelCommitment) error { + + log.Warnf("Remote peer has breached the channel contract for "+ + "ChannelPoint(%v). Revoked state #%v was broadcast!!!", + c.chanState.FundingOutpoint, remoteCommit.CommitHeight) + + if err := c.chanState.MarkBorked(); err != nil { + return fmt.Errorf("unable to mark channel as borked: %v", err) + } + + var ( + broadcastStateNum = remoteCommit.CommitHeight + commitTxBroadcast = spendEvent.SpendingTx + spendHeight = uint32(spendEvent.SpendingHeight) + ) + + // Create a new reach retribution struct which contains all the data + // needed to swiftly bring the cheating peer to justice. + // + // TODO(roasbeef): move to same package + retribution, err := lnwallet.NewBreachRetribution( + c.chanState, broadcastStateNum, commitTxBroadcast, + spendHeight, + ) + if err != nil { + return fmt.Errorf("unable to create breach retribution: %v", err) + } + + log.Debugf("Punishment breach retribution created: %v", + spew.Sdump(retribution)) + + // With the event processed, we'll now notify all subscribers of the + // event. + c.Lock() + for _, sub := range c.clientSubscriptions { + select { + case sub.ContractBreach <- retribution: + case <-c.quit: + return fmt.Errorf("quitting") + } + + // Wait for the breach arbiter to ACK the handoff before + // marking the channel as pending force closed in channeldb. + select { + case <-sub.ProcessACK: + // Bail if the handoff failed. + if err != nil { + return fmt.Errorf("unable to handoff "+ + "retribution info: %v", err) + } + + case <-c.quit: + return fmt.Errorf("quitting") + } + } + c.Unlock() + + // At this point, we've successfully received an ack for the breach + // close. We now construct and persist the close summary, marking the + // channel as pending force closed. + // + // TODO(roasbeef): instead mark we got all the monies? + settledBalance := remoteCommit.LocalBalance.ToSatoshis() + closeSummary := channeldb.ChannelCloseSummary{ + ChanPoint: c.chanState.FundingOutpoint, + ChainHash: c.chanState.ChainHash, + ClosingTXID: *spendEvent.SpenderTxHash, + CloseHeight: spendHeight, + RemotePub: c.chanState.IdentityPub, + Capacity: c.chanState.Capacity, + SettledBalance: settledBalance, + CloseType: channeldb.BreachClose, + IsPending: true, + ShortChanID: c.chanState.ShortChanID, + } + + log.Infof("Breached channel=%v marked pending-closed", + c.chanState.FundingOutpoint) + + return c.chanState.CloseChannel(&closeSummary) +}