lnd.xprv/contractcourt/chain_watcher.go

463 lines
16 KiB
Go
Raw Normal View History

package contractcourt
import (
"fmt"
"sync"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/roasbeef/btcd/wire"
)
// ChainEventSubscription is a struct that houses a subscription to be notified
// for any on-chain events related to a channel. There are three types of
// possible on-chain events: a cooperative channel closure, a unilateral
// channel closure, and a channel breach. The fourth type: a force close is
// locally initiated, so we don't provide any event stream for said event.
type ChainEventSubscription struct {
	// ChanPoint is the channel that chain events will be dispatched for.
	ChanPoint wire.OutPoint

	// UnilateralClosure is a channel that will be sent upon in the event that
	// the remote party broadcasts their latest version of the commitment
	// transaction.
	UnilateralClosure chan *lnwallet.UnilateralCloseSummary

	// CooperativeClosure is a signal that will be sent upon once a cooperative
	// channel closure has been detected.
	//
	// NOTE(review): no send on this channel is visible in this file; confirm
	// whether anything currently signals it.
	//
	// TODO(roasbeef): or something else
	CooperativeClosure chan struct{}

	// ContractBreach is a channel that will be sent upon if we detect a
	// contract breach. The struct sent across the channel contains all the
	// material required to bring the cheating channel peer to justice.
	ContractBreach chan *lnwallet.BreachRetribution

	// ProcessACK is a channel that'll be used by the chainWatcher to
	// synchronize dispatch and processing of the notification with the act
	// of updating the state of the channel on disk. This ensures that the
	// event can be reliably handed off. dispatchContractBreach waits for a
	// value on this channel after delivering each ContractBreach
	// notification and before it persists the close summary.
	ProcessACK chan struct{}

	// Cancel cancels the subscription to the event stream for a particular
	// channel. This method should be called once the caller no longer needs to
	// be notified of any on-chain events for a particular channel.
	Cancel func()
}
// chainWatcher is a system that's assigned to every active channel. The duty
// of this system is to watch the chain for spends of the channel's chan point.
// If a spend is detected then the chain watcher will notify all subscribers
// that the channel has been closed, and also give them the materials necessary
// to sweep the funds of the channel on chain eventually.
type chainWatcher struct {
	// quit is closed by Stop. The close observer and the blocking sends to
	// subscribers select on it.
	quit chan struct{}

	// wg tracks the closeObserver goroutine launched by Start so that Stop
	// can wait for it to finish.
	wg sync.WaitGroup

	// chanState is a snapshot of the persistent state of the channel that
	// we're watching. In the event of an on-chain event, we'll query the
	// database to ensure that we act using the most up to date state.
	chanState *channeldb.OpenChannel

	// stateHintObfuscator is a 48-bit state hint that's used to obfuscate
	// the current state number on the commitment transactions.
	stateHintObfuscator [lnwallet.StateHintSize]byte

	// notifier is a reference to the channel notifier that we'll use to be
	// notified of output spends and when transactions are confirmed.
	notifier chainntnfs.ChainNotifier

	// pCache is a reference to the shared preimage cache. We'll use this
	// to see if we can settle any incoming HTLC's during a remote
	// commitment close event.
	pCache WitnessBeacon

	// signer is the main signer instance that will be responsible for
	// signing any HTLC and commitment transaction generated by the state
	// machine.
	signer lnwallet.Signer

	// All the fields below are protected by this mutex.
	sync.RWMutex

	// clientID is an ephemeral counter used to keep track of each
	// individual client subscription.
	clientID uint64

	// clientSubscriptions is a map that keeps track of all the active
	// client subscriptions for events related to this channel.
	clientSubscriptions map[uint64]*ChainEventSubscription
}
// newChainWatcher builds the chainWatcher for the channel described by
// chanState. The supplied notifier, preimage cache and signer are stored on
// the watcher for later use; no goroutines are started here (see Start). The
// returned error is always nil.
func newChainWatcher(chanState *channeldb.OpenChannel,
	notifier chainntnfs.ChainNotifier, pCache WitnessBeacon,
	signer lnwallet.Signer) (*chainWatcher, error) {

	// To later classify a spend of the funding output we need the state
	// hint obfuscator that hides the commitment state number in the lock
	// time and sequence fields. It is derived from both payment base
	// points, and the argument order depends on who initiated the channel:
	// when chanState.IsInitiator is set the local base point goes first,
	// otherwise the remote one does.
	firstPoint := chanState.RemoteChanCfg.PaymentBasePoint
	secondPoint := chanState.LocalChanCfg.PaymentBasePoint
	if chanState.IsInitiator {
		firstPoint, secondPoint = secondPoint, firstPoint
	}

	watcher := &chainWatcher{
		quit:                make(chan struct{}),
		chanState:           chanState,
		notifier:            notifier,
		pCache:              pCache,
		signer:              signer,
		clientSubscriptions: make(map[uint64]*ChainEventSubscription),
		stateHintObfuscator: lnwallet.DeriveStateHintObfuscator(
			firstPoint, secondPoint,
		),
	}

	return watcher, nil
}
// Start registers for a spend notification on the channel's funding outpoint
// and launches the closeObserver goroutine that reacts to it. An error is
// returned only if the spend registration fails, in which case no goroutine
// is started.
func (c *chainWatcher) Start() error {
	log.Debugf("Starting chain watcher for ChannelPoint(%v)",
		c.chanState.FundingOutpoint)

	// Pick the height hint handed to the notifier: prefer the block height
	// recorded in the short channel ID, and fall back to the height the
	// funding transaction was broadcast at while that is still zero (the
	// channel isn't open yet).
	spendHint := c.chanState.FundingBroadcastHeight
	if openHeight := c.chanState.ShortChanID.BlockHeight; openHeight != 0 {
		spendHint = openHeight
	}

	// Ask to be told when the funding output is spent.
	spendEvent, err := c.notifier.RegisterSpendNtfn(
		&c.chanState.FundingOutpoint, spendHint,
	)
	if err != nil {
		return err
	}

	// Hand the notification over to the close observer, tracking the
	// goroutine in the wait group so that Stop can wait for it.
	c.wg.Add(1)
	go c.closeObserver(spendEvent)

	return nil
}
// Stop closes the quit channel and then blocks until every goroutine tracked
// by the watcher's wait group has finished. It always returns nil.
//
// Stop must be called at most once: a second call would close an already
// closed channel and panic.
func (c *chainWatcher) Stop() error {
	// Broadcast the shutdown signal to everything selecting on quit.
	close(c.quit)

	// Wait for the goroutine(s) registered in Start to finish.
	c.wg.Wait()

	return nil
}
// SubscribeChannelEvents returns an active subscription to the set of channel
// events for the channel watched by this chain watcher. Once clients no longer
// require the subscription, they should call the Cancel() method to allow the
// watcher to regain those committed resources.
//
// Every channel on the returned subscription, including ProcessACK, is
// allocated with a buffer of one so that a single pending notification or
// acknowledgement never blocks the sending side.
func (c *chainWatcher) SubscribeChannelEvents() *ChainEventSubscription {
	c.Lock()
	defer c.Unlock()

	// Hand out the next client ID; it keys the subscription in the map and
	// is captured by the Cancel closure below.
	clientID := c.clientID
	c.clientID++

	log.Debugf("New ChainEventSubscription(id=%v) for ChannelPoint(%v)",
		clientID, c.chanState.FundingOutpoint)

	sub := &ChainEventSubscription{
		ChanPoint:          c.chanState.FundingOutpoint,
		UnilateralClosure:  make(chan *lnwallet.UnilateralCloseSummary, 1),
		CooperativeClosure: make(chan struct{}, 1),
		ContractBreach:     make(chan *lnwallet.BreachRetribution, 1),

		// ProcessACK must be allocated: a nil channel can never be sent
		// on or received from, which would leave the breach handoff in
		// dispatchContractBreach waiting until shutdown.
		ProcessACK: make(chan struct{}, 1),

		Cancel: func() {
			c.Lock()
			delete(c.clientSubscriptions, clientID)
			c.Unlock()
		},
	}

	c.clientSubscriptions[clientID] = sub

	return sub
}
// closeObserver is a dedicated goroutine that will watch for any closes of the
// channel that it's watching on chain. In the event of an on-chain event, the
// close observer will assemble the proper materials required to claim the
// funds of the channel on-chain (if required), then dispatch these as
// notifications to all subscribers.
//
// The goroutine handles at most one event and then exits: either the spend of
// the funding output (or the closing of the spend channel), or the closing of
// the watcher's quit channel. It marks itself done on the wait group on every
// exit path.
func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) {
	defer c.wg.Done()

	log.Infof("Close observer for ChannelPoint(%v) active",
		c.chanState.FundingOutpoint)

	select {
	// We've detected a spend of the channel onchain! Depending on the type
	// of spend, we'll act accordingly, so we'll examine the spending
	// transaction to determine what we should do.
	case commitSpend, ok := <-spendNtfn.Spend:
		// If the channel was closed, then this means that the notifier
		// exited, so we will as well.
		if !ok {
			return
		}

		// Otherwise, the remote party might have broadcast a prior
		// revoked state...!!!
		commitTxBroadcast := commitSpend.SpendingTx

		localCommit, remoteCommit, err := c.chanState.LatestCommitments()
		if err != nil {
			log.Errorf("Unable to fetch channel state for "+
				"chan_point=%v: %v",
				c.chanState.FundingOutpoint, err)
			return
		}

		// We'll now retrieve the latest state of the revocation store
		// so we can populate the information within the channel state
		// object that we have.
		//
		// TODO(roasbeef): mutation is bad mkay
		_, err = c.chanState.RemoteRevocationStore()
		if err != nil {
			log.Errorf("Unable to fetch revocation state for "+
				"chan_point=%v: %v",
				c.chanState.FundingOutpoint, err)
			return
		}

		// If this is our commitment transaction, then we can exit here
		// as we don't have any further processing we need to do (we
		// can't cheat ourselves :p).
		commitmentHash := localCommit.CommitTx.TxHash()
		isOurCommitment := commitSpend.SpenderTxHash.IsEqual(
			&commitmentHash,
		)
		if isOurCommitment {
			return
		}

		// Next, we'll check to see if this is a cooperative channel
		// closure or not.
		//
		// TODO(roasbeef): check to see if txid amongst those that we
		// know are co-op channel closes
		log.Warnf("Unprompted commitment broadcast for "+
			"ChannelPoint(%v) ", c.chanState.FundingOutpoint)

		// Decode the state hint encoded within the commitment
		// transaction to determine if this is a revoked state or not.
		obfuscator := c.stateHintObfuscator
		broadcastStateNum := lnwallet.GetStateNumHint(
			commitTxBroadcast, obfuscator,
		)
		remoteStateNum := remoteCommit.CommitHeight

		switch {
		// If the state number of the spending transaction matches the
		// current latest state, then they've initiated a unilateral
		// close. So we'll trigger the unilateral close signal so
		// subscribers can clean up the state as necessary.
		//
		// We'll also handle the case of the remote party broadcasting
		// their commitment transaction which is one height above ours.
		// This case can arise when we initiate a state transition, but
		// the remote party has a fail crash _after_ accepting the new
		// state, but _before_ sending their signature to us.
		case broadcastStateNum >= remoteStateNum:
			if err := c.dispatchRemoteClose(
				commitSpend, *remoteCommit,
			); err != nil {
				log.Errorf("unable to handle remote "+
					"close for chan_point=%v: %v",
					c.chanState.FundingOutpoint, err)
			}

		// If the state number broadcast is lower than the remote
		// node's current un-revoked height, then THEY'RE ATTEMPTING TO
		// VIOLATE THE CONTRACT LAID OUT WITHIN THE PAYMENT CHANNEL.
		// Therefore we dispatch the breach so subscribers can swiftly
		// dispatch justice!!!
		case broadcastStateNum < remoteStateNum:
			if err := c.dispatchContractBreach(
				commitSpend, remoteCommit,
			); err != nil {
				log.Errorf("unable to handle channel "+
					"breach for chan_point=%v: %v",
					c.chanState.FundingOutpoint, err)
			}
		}

		// Now that a spend has been detected, we've done our job, so
		// we'll exit immediately.
		return

	// The chainWatcher has been signalled to exit, so we'll do so now.
	// Returning here is what releases the wait group; without it a closed
	// quit channel would be selected over and over and Stop would never
	// come back from its Wait.
	case <-c.quit:
		return
	}
}
// dispatchRemoteClose processes a detected unilateral channel closure by the
// remote party. This function will prepare a UnilateralCloseSummary which will
// then be sent to any subscribers allowing them to resolve all our funds in
// the channel on chain. Once this close summary is prepared, all registered
// subscribers will receive a notification of this event.
//
// An error is returned if the close summary cannot be built, if the channel
// state cannot be closed on disk, or if the watcher is shutting down before
// every subscriber has been notified.
func (c *chainWatcher) dispatchRemoteClose(commitSpend *chainntnfs.SpendDetail,
	remoteCommit channeldb.ChannelCommitment) error {

	log.Infof("Unilateral close of ChannelPoint(%v) "+
		"detected", c.chanState.FundingOutpoint)

	// First, we'll create a closure summary that contains all the
	// materials required to let each subscriber sweep the funds in the
	// channel on-chain.
	uniClose, err := lnwallet.NewUnilateralCloseSummary(c.chanState,
		c.signer, c.pCache, commitSpend, remoteCommit,
	)
	if err != nil {
		return err
	}

	// As we've detected that the channel has been closed, immediately
	// delete the state from disk, creating a close summary for future
	// usage by related sub-systems.
	err = c.chanState.CloseChannel(&uniClose.ChannelCloseSummary)
	if err != nil {
		return fmt.Errorf("unable to delete channel state: %v", err)
	}

	// With the event processed, we'll now notify all subscribers of the
	// event. The mutex is released via defer so that the early return on
	// shutdown below cannot leave it locked.
	c.Lock()
	defer c.Unlock()

	for _, sub := range c.clientSubscriptions {
		// TODO(roasbeef): send msg before writing to disk
		//  * need to ensure proper fault tolerance in all cases
		//  * get ACK from the consumer of the ntfn before writing to disk?
		//  * no harm in repeated ntfns: at least once semantics
		select {
		case sub.UnilateralClosure <- uniClose:
		case <-c.quit:
			return fmt.Errorf("exiting")
		}
	}

	return nil
}
// dispatchContractBreach processes a detected contract breach by the remote
// party. This method is to be called once we detect that the remote party has
// broadcast a prior revoked commitment state. This method will prepare all the
// materials required to bring the cheater to justice, then notify all
// registered subscribers of this event.
//
// spendEvent describes the spend of the funding output, and remoteCommit is
// the remote party's latest commitment as known to us. After every subscriber
// has received the retribution and acknowledged it on its ProcessACK channel,
// the channel is persisted as pending closed with a BreachClose close type.
//
// An error is returned if the channel cannot be marked borked, if the
// retribution cannot be created, if the watcher shuts down during the handoff,
// or if persisting the close summary fails.
func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail,
	remoteCommit *channeldb.ChannelCommitment) error {

	commitTxBroadcast := spendEvent.SpendingTx
	spendHeight := uint32(spendEvent.SpendingHeight)

	// The state that was actually broadcast has to be decoded from the
	// spending transaction itself. remoteCommit.CommitHeight is the remote
	// party's latest height, which is by definition not the revoked state
	// this method is invoked for.
	broadcastStateNum := lnwallet.GetStateNumHint(
		commitTxBroadcast, c.stateHintObfuscator,
	)

	log.Warnf("Remote peer has breached the channel contract for "+
		"ChannelPoint(%v). Revoked state #%v was broadcast!!!",
		c.chanState.FundingOutpoint, broadcastStateNum)

	if err := c.chanState.MarkBorked(); err != nil {
		return fmt.Errorf("unable to mark channel as borked: %v", err)
	}

	// Create a new breach retribution struct which contains all the data
	// needed to swiftly bring the cheating peer to justice.
	//
	// TODO(roasbeef): move to same package
	retribution, err := lnwallet.NewBreachRetribution(
		c.chanState, broadcastStateNum, commitTxBroadcast,
		spendHeight,
	)
	if err != nil {
		return fmt.Errorf("unable to create breach retribution: %v", err)
	}

	log.Debugf("Punishment breach retribution created: %v",
		spew.Sdump(retribution))

	// With the event processed, we'll now notify all subscribers of the
	// event. The handoff runs inside a closure so the mutex is released
	// via defer on every exit path, including the shutdown returns, and
	// is no longer held while the close summary is written below.
	notifySubscribers := func() error {
		c.Lock()
		defer c.Unlock()

		for _, sub := range c.clientSubscriptions {
			select {
			case sub.ContractBreach <- retribution:
			case <-c.quit:
				return fmt.Errorf("quitting")
			}

			// Wait for the breach arbiter to ACK the handoff
			// before marking the channel as pending force closed
			// in channeldb.
			select {
			case <-sub.ProcessACK:
			case <-c.quit:
				return fmt.Errorf("quitting")
			}
		}

		return nil
	}
	if err := notifySubscribers(); err != nil {
		return err
	}

	// At this point, we've successfully received an ack for the breach
	// close. We now construct and persist the close summary, marking the
	// channel as pending force closed.
	//
	// TODO(roasbeef): instead mark we got all the monies?
	settledBalance := remoteCommit.LocalBalance.ToSatoshis()
	closeSummary := channeldb.ChannelCloseSummary{
		ChanPoint:      c.chanState.FundingOutpoint,
		ChainHash:      c.chanState.ChainHash,
		ClosingTXID:    *spendEvent.SpenderTxHash,
		CloseHeight:    spendHeight,
		RemotePub:      c.chanState.IdentityPub,
		Capacity:       c.chanState.Capacity,
		SettledBalance: settledBalance,
		CloseType:      channeldb.BreachClose,
		IsPending:      true,
		ShortChanID:    c.chanState.ShortChanID,
	}

	log.Infof("Breached channel=%v marked pending-closed",
		c.chanState.FundingOutpoint)

	return c.chanState.CloseChannel(&closeSummary)
}