lnd.xprv/contractcourt/chain_arbitrator.go
Johan T. Halseth ac49031396
chainwatcher: only continue breach handling after successfully marked
closed

This commit makes the handoff procedure between the breachabiter and
chainwatcher use a function closure to mark the channel pending closed
in the DB. Doing it this way we know that the channel has been markd
pending closed in the DB when ProcessACK returns.

The reason we do this is that we really need a "two-way ACK" to have the
breacharbiter know it can go on with the breach handling. Earlier it
would just send the ACK on the channel and continue. This lead to a race
where breach handling could finish before the chain watcher had marked
the channel pending closed in the database, which again lead to the
breacharbiter failing to mark the channel fully closed.

We saw this causing flakes during itests.
2021-05-06 12:37:11 +02:00

1146 lines
36 KiB
Go

package contractcourt
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/labels"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwire"
)
// ErrChainArbExiting signals that the chain arbitrator is shutting down.
var ErrChainArbExiting = errors.New("ChainArbitrator exiting")
// ResolutionMsg is a message sent by resolvers to outside sub-systems once an
// outgoing contract has been fully resolved. For multi-hop contracts, if we
// resolve the outgoing contract, we'll also need to ensure that the incoming
// contract is resolved as well. We package the items required to resolve the
// incoming contracts within this message.
type ResolutionMsg struct {
// SourceChan identifies the channel that this message is being sent
// from. This is the channel's short channel ID.
SourceChan lnwire.ShortChannelID
// HtlcIndex is the index of the contract within the original
// commitment trace.
HtlcIndex uint64
// Failure will be non-nil if the incoming contract should be canceled
// all together. This can happen if the outgoing contract was dust, if
// if the outgoing HTLC timed out.
Failure lnwire.FailureMessage
// PreImage will be non-nil if the incoming contract can successfully
// be redeemed. This can happen if we learn of the preimage from the
// outgoing HTLC on-chain.
PreImage *[32]byte
}
// ChainArbitratorConfig is a configuration struct that contains all the
// function closures and interface that required to arbitrate on-chain
// contracts for a particular chain.
type ChainArbitratorConfig struct {
// ChainHash is the chain that this arbitrator is to operate within.
ChainHash chainhash.Hash
// IncomingBroadcastDelta is the delta that we'll use to decide when to
// broadcast our commitment transaction if we have incoming htlcs. This
// value should be set based on our current fee estimation of the
// commitment transaction. We use this to determine when we should
// broadcast instead of just the HTLC timeout, as we want to ensure
// that the commitment transaction is already confirmed, by the time the
// HTLC expires. Otherwise we may end up not settling the htlc on-chain
// because the other party managed to time it out.
IncomingBroadcastDelta uint32
// OutgoingBroadcastDelta is the delta that we'll use to decide when to
// broadcast our commitment transaction if there are active outgoing
// htlcs. This value can be lower than the incoming broadcast delta.
OutgoingBroadcastDelta uint32
// NewSweepAddr is a function that returns a new address under control
// by the wallet. We'll use this to sweep any no-delay outputs as a
// result of unilateral channel closes.
//
// NOTE: This SHOULD return a p2wkh script.
NewSweepAddr func() ([]byte, error)
// PublishTx reliably broadcasts a transaction to the network. Once
// this function exits without an error, then they transaction MUST
// continually be rebroadcast if needed.
PublishTx func(*wire.MsgTx, string) error
// DeliverResolutionMsg is a function that will append an outgoing
// message to the "out box" for a ChannelLink. This is used to cancel
// backwards any HTLC's that are either dust, we're timing out, or
// settling on-chain to the incoming link.
DeliverResolutionMsg func(...ResolutionMsg) error
// MarkLinkInactive is a function closure that the ChainArbitrator will
// use to mark that active HTLC's shouldn't be attempted to be routed
// over a particular channel. This function will be called in that a
// ChannelArbitrator decides that it needs to go to chain in order to
// resolve contracts.
//
// TODO(roasbeef): rename, routing based
MarkLinkInactive func(wire.OutPoint) error
// ContractBreach is a function closure that the ChainArbitrator will
// use to notify the breachArbiter about a contract breach. A callback
// should be passed that when called will mark the channel pending
// close in the databae. It should only return a non-nil error when the
// breachArbiter has preserved the necessary breach info for this
// channel point, and the callback has succeeded, meaning it is safe to
// stop watching the channel.
ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution,
func() error) error
// IsOurAddress is a function that returns true if the passed address
// is known to the underlying wallet. Otherwise, false should be
// returned.
IsOurAddress func(btcutil.Address) bool
// IncubateOutput sends either an incoming HTLC, an outgoing HTLC, or
// both to the utxo nursery. Once this function returns, the nursery
// should have safely persisted the outputs to disk, and should start
// the process of incubation. This is used when a resolver wishes to
// pass off the output to the nursery as we're only waiting on an
// absolute/relative item block.
IncubateOutputs func(wire.OutPoint, *lnwallet.OutgoingHtlcResolution,
*lnwallet.IncomingHtlcResolution, uint32) error
// PreimageDB is a global store of all known pre-images. We'll use this
// to decide if we should broadcast a commitment transaction to claim
// an HTLC on-chain.
PreimageDB WitnessBeacon
// Notifier is an instance of a chain notifier we'll use to watch for
// certain on-chain events.
Notifier chainntnfs.ChainNotifier
// Signer is a signer backed by the active lnd node. This should be
// capable of producing a signature as specified by a valid
// SignDescriptor.
Signer input.Signer
// FeeEstimator will be used to return fee estimates.
FeeEstimator chainfee.Estimator
// ChainIO allows us to query the state of the current main chain.
ChainIO lnwallet.BlockChainIO
// DisableChannel disables a channel, resulting in it not being able to
// forward payments.
DisableChannel func(wire.OutPoint) error
// Sweeper allows resolvers to sweep their final outputs.
Sweeper UtxoSweeper
// Registry is the invoice database that is used by resolvers to lookup
// preimages and settle invoices.
Registry Registry
// NotifyClosedChannel is a function closure that the ChainArbitrator
// will use to notify the ChannelNotifier about a newly closed channel.
NotifyClosedChannel func(wire.OutPoint)
// OnionProcessor is used to decode onion payloads for on-chain
// resolution.
OnionProcessor OnionProcessor
// PaymentsExpirationGracePeriod indicates a time window we let the
// other node to cancel an outgoing htlc that our node has initiated and
// has timed out.
PaymentsExpirationGracePeriod time.Duration
// IsForwardedHTLC checks for a given htlc, identified by channel id and
// htlcIndex, if it is a forwarded one.
IsForwardedHTLC func(chanID lnwire.ShortChannelID, htlcIndex uint64) bool
// Clock is the clock implementation that ChannelArbitrator uses.
// It is useful for testing.
Clock clock.Clock
}
// ChainArbitrator is a sub-system that oversees the on-chain resolution of all
// active, and channel that are in the "pending close" state. Within the
// contractcourt package, the ChainArbitrator manages a set of active
// ContractArbitrators. Each ContractArbitrators is responsible for watching
// the chain for any activity that affects the state of the channel, and also
// for monitoring each contract in order to determine if any on-chain activity is
// required. Outside sub-systems interact with the ChainArbitrator in order to
// forcibly exit a contract, update the set of live signals for each contract,
// and to receive reports on the state of contract resolution.
type ChainArbitrator struct {
started int32 // To be used atomically.
stopped int32 // To be used atomically.
sync.Mutex
// activeChannels is a map of all the active contracts that are still
// open, and not fully resolved.
activeChannels map[wire.OutPoint]*ChannelArbitrator
// activeWatchers is a map of all the active chainWatchers for channels
// that are still considered open.
activeWatchers map[wire.OutPoint]*chainWatcher
// cfg is the config struct for the arbitrator that contains all
// methods and interface it needs to operate.
cfg ChainArbitratorConfig
// chanSource will be used by the ChainArbitrator to fetch all the
// active channels that it must still watch over.
chanSource *channeldb.DB
quit chan struct{}
wg sync.WaitGroup
}
// NewChainArbitrator returns a new instance of the ChainArbitrator using the
// passed config struct, and backing persistent database.
func NewChainArbitrator(cfg ChainArbitratorConfig,
db *channeldb.DB) *ChainArbitrator {
return &ChainArbitrator{
cfg: cfg,
activeChannels: make(map[wire.OutPoint]*ChannelArbitrator),
activeWatchers: make(map[wire.OutPoint]*chainWatcher),
chanSource: db,
quit: make(chan struct{}),
}
}
// arbChannel is a wrapper around an open channel that channel arbitrators
// interact with.
type arbChannel struct {
// channel is the in-memory channel state.
channel *channeldb.OpenChannel
// c references the chain arbitrator and is used by arbChannel
// internally.
c *ChainArbitrator
}
// NewAnchorResolutions returns the anchor resolutions for currently valid
// commitment transactions.
//
// NOTE: Part of the ArbChannel interface.
func (a *arbChannel) NewAnchorResolutions() ([]*lnwallet.AnchorResolution,
error) {
// Get a fresh copy of the database state to base the anchor resolutions
// on. Unfortunately the channel instance that we have here isn't the
// same instance that is used by the link.
chanPoint := a.channel.FundingOutpoint
channel, err := a.c.chanSource.FetchChannel(chanPoint)
if err != nil {
return nil, err
}
chanMachine, err := lnwallet.NewLightningChannel(
a.c.cfg.Signer, channel, nil,
)
if err != nil {
return nil, err
}
return chanMachine.NewAnchorResolutions()
}
// ForceCloseChan should force close the contract that this attendant is
// watching over. We'll use this when we decide that we need to go to chain. It
// should in addition tell the switch to remove the corresponding link, such
// that we won't accept any new updates. The returned summary contains all items
// needed to eventually resolve all outputs on chain.
//
// NOTE: Part of the ArbChannel interface.
func (a *arbChannel) ForceCloseChan() (*lnwallet.LocalForceCloseSummary, error) {
// First, we mark the channel as borked, this ensure
// that no new state transitions can happen, and also
// that the link won't be loaded into the switch.
if err := a.channel.MarkBorked(); err != nil {
return nil, err
}
// With the channel marked as borked, we'll now remove
// the link from the switch if its there. If the link
// is active, then this method will block until it
// exits.
chanPoint := a.channel.FundingOutpoint
if err := a.c.cfg.MarkLinkInactive(chanPoint); err != nil {
log.Errorf("unable to mark link inactive: %v", err)
}
// Now that we know the link can't mutate the channel
// state, we'll read the channel from disk the target
// channel according to its channel point.
channel, err := a.c.chanSource.FetchChannel(chanPoint)
if err != nil {
return nil, err
}
// Finally, we'll force close the channel completing
// the force close workflow.
chanMachine, err := lnwallet.NewLightningChannel(
a.c.cfg.Signer, channel, nil,
)
if err != nil {
return nil, err
}
return chanMachine.ForceClose()
}
// newActiveChannelArbitrator creates a new instance of an active channel
// arbitrator given the state of the target channel.
func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
c *ChainArbitrator, chanEvents *ChainEventSubscription) (*ChannelArbitrator, error) {
log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)",
channel.FundingOutpoint)
// TODO(roasbeef): fetch best height (or pass in) so can ensure block
// epoch delivers all the notifications to
chanPoint := channel.FundingOutpoint
// Next we'll create the matching configuration struct that contains
// all interfaces and methods the arbitrator needs to do its job.
arbCfg := ChannelArbitratorConfig{
ChanPoint: chanPoint,
Channel: c.getArbChannel(channel),
ShortChanID: channel.ShortChanID(),
MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary,
statuses ...channeldb.ChannelStatus) error {
err := channel.CloseChannel(summary, statuses...)
if err != nil {
return err
}
c.cfg.NotifyClosedChannel(summary.ChanPoint)
return nil
},
IsPendingClose: false,
ChainArbitratorConfig: c.cfg,
ChainEvents: chanEvents,
PutResolverReport: func(tx kvdb.RwTx,
report *channeldb.ResolverReport) error {
return c.chanSource.PutResolverReport(
tx, c.cfg.ChainHash, &channel.FundingOutpoint,
report,
)
},
}
// The final component needed is an arbitrator log that the arbitrator
// will use to keep track of its internal state using a backed
// persistent log.
//
// TODO(roasbeef); abstraction leak...
// * rework: adaptor method to set log scope w/ factory func
chanLog, err := newBoltArbitratorLog(
c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
)
if err != nil {
return nil, err
}
arbCfg.MarkChannelResolved = func() error {
return c.ResolveContract(chanPoint)
}
// Finally, we'll need to construct a series of htlc Sets based on all
// currently known valid commitments.
htlcSets := make(map[HtlcSetKey]htlcSet)
htlcSets[LocalHtlcSet] = newHtlcSet(channel.LocalCommitment.Htlcs)
htlcSets[RemoteHtlcSet] = newHtlcSet(channel.RemoteCommitment.Htlcs)
pendingRemoteCommitment, err := channel.RemoteCommitChainTip()
if err != nil && err != channeldb.ErrNoPendingCommit {
return nil, err
}
if pendingRemoteCommitment != nil {
htlcSets[RemotePendingHtlcSet] = newHtlcSet(
pendingRemoteCommitment.Commitment.Htlcs,
)
}
return NewChannelArbitrator(
arbCfg, htlcSets, chanLog,
), nil
}
// getArbChannel returns an open channel wrapper for use by channel arbitrators.
func (c *ChainArbitrator) getArbChannel(
channel *channeldb.OpenChannel) *arbChannel {
return &arbChannel{
channel: channel,
c: c,
}
}
// ResolveContract marks a contract as fully resolved within the database.
// This is only to be done once all contracts which were live on the channel
// before hitting the chain have been resolved.
func (c *ChainArbitrator) ResolveContract(chanPoint wire.OutPoint) error {
log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint)
// First, we'll we'll mark the channel as fully closed from the PoV of
// the channel source.
err := c.chanSource.MarkChanFullyClosed(&chanPoint)
if err != nil {
log.Errorf("ChainArbitrator: unable to mark ChannelPoint(%v) "+
"fully closed: %v", chanPoint, err)
return err
}
// Now that the channel has been marked as fully closed, we'll stop
// both the channel arbitrator and chain watcher for this channel if
// they're still active.
var arbLog ArbitratorLog
c.Lock()
chainArb := c.activeChannels[chanPoint]
delete(c.activeChannels, chanPoint)
chainWatcher := c.activeWatchers[chanPoint]
delete(c.activeWatchers, chanPoint)
c.Unlock()
if chainArb != nil {
arbLog = chainArb.log
if err := chainArb.Stop(); err != nil {
log.Warnf("unable to stop ChannelArbitrator(%v): %v",
chanPoint, err)
}
}
if chainWatcher != nil {
if err := chainWatcher.Stop(); err != nil {
log.Warnf("unable to stop ChainWatcher(%v): %v",
chanPoint, err)
}
}
// Once this has been marked as resolved, we'll wipe the log that the
// channel arbitrator was using to store its persistent state. We do
// this after marking the channel resolved, as otherwise, the
// arbitrator would be re-created, and think it was starting from the
// default state.
if arbLog != nil {
if err := arbLog.WipeHistory(); err != nil {
return err
}
}
return nil
}
// Start launches all goroutines that the ChainArbitrator needs to operate.
func (c *ChainArbitrator) Start() error {
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
return nil
}
log.Tracef("Starting ChainArbitrator")
// First, we'll fetch all the channels that are still open, in order to
// collect them within our set of active contracts.
openChannels, err := c.chanSource.FetchAllChannels()
if err != nil {
return err
}
if len(openChannels) > 0 {
log.Infof("Creating ChannelArbitrators for %v active channels",
len(openChannels))
}
// For each open channel, we'll configure then launch a corresponding
// ChannelArbitrator.
for _, channel := range openChannels {
chanPoint := channel.FundingOutpoint
channel := channel
// First, we'll create an active chainWatcher for this channel
// to ensure that we detect any relevant on chain events.
chainWatcher, err := newChainWatcher(
chainWatcherConfig{
chanState: channel,
notifier: c.cfg.Notifier,
signer: c.cfg.Signer,
isOurAddr: c.cfg.IsOurAddress,
contractBreach: func(retInfo *lnwallet.BreachRetribution,
markClosed func() error) error {
return c.cfg.ContractBreach(
chanPoint, retInfo, markClosed,
)
},
extractStateNumHint: lnwallet.GetStateNumHint,
},
)
if err != nil {
return err
}
c.activeWatchers[chanPoint] = chainWatcher
channelArb, err := newActiveChannelArbitrator(
channel, c, chainWatcher.SubscribeChannelEvents(),
)
if err != nil {
return err
}
c.activeChannels[chanPoint] = channelArb
// Republish any closing transactions for this channel.
err = c.publishClosingTxs(channel)
if err != nil {
return err
}
}
// In addition to the channels that we know to be open, we'll also
// launch arbitrators to finishing resolving any channels that are in
// the pending close state.
closingChannels, err := c.chanSource.FetchClosedChannels(true)
if err != nil {
return err
}
if len(closingChannels) > 0 {
log.Infof("Creating ChannelArbitrators for %v closing channels",
len(closingChannels))
}
// Next, for each channel is the closing state, we'll launch a
// corresponding more restricted resolver, as we don't have to watch
// the chain any longer, only resolve the contracts on the confirmed
// commitment.
for _, closeChanInfo := range closingChannels {
// We can leave off the CloseContract and ForceCloseChan
// methods as the channel is already closed at this point.
chanPoint := closeChanInfo.ChanPoint
arbCfg := ChannelArbitratorConfig{
ChanPoint: chanPoint,
ShortChanID: closeChanInfo.ShortChanID,
ChainArbitratorConfig: c.cfg,
ChainEvents: &ChainEventSubscription{},
IsPendingClose: true,
ClosingHeight: closeChanInfo.CloseHeight,
CloseType: closeChanInfo.CloseType,
PutResolverReport: func(tx kvdb.RwTx,
report *channeldb.ResolverReport) error {
return c.chanSource.PutResolverReport(
tx, c.cfg.ChainHash, &chanPoint, report,
)
},
}
chanLog, err := newBoltArbitratorLog(
c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
)
if err != nil {
return err
}
arbCfg.MarkChannelResolved = func() error {
return c.ResolveContract(chanPoint)
}
// We can also leave off the set of HTLC's here as since the
// channel is already in the process of being full resolved, no
// new HTLC's will be added.
c.activeChannels[chanPoint] = NewChannelArbitrator(
arbCfg, nil, chanLog,
)
}
// Now, we'll start all chain watchers in parallel to shorten start up
// duration. In neutrino mode, this allows spend registrations to take
// advantage of batch spend reporting, instead of doing a single rescan
// per chain watcher.
//
// NOTE: After this point, we Stop the chain arb to ensure that any
// lingering goroutines are cleaned up before exiting.
watcherErrs := make(chan error, len(c.activeWatchers))
var wg sync.WaitGroup
for _, watcher := range c.activeWatchers {
wg.Add(1)
go func(w *chainWatcher) {
defer wg.Done()
select {
case watcherErrs <- w.Start():
case <-c.quit:
watcherErrs <- ErrChainArbExiting
}
}(watcher)
}
// Once all chain watchers have been started, seal the err chan to
// signal the end of the err stream.
go func() {
wg.Wait()
close(watcherErrs)
}()
// stopAndLog is a helper function which shuts down the chain arb and
// logs errors if they occur.
stopAndLog := func() {
if err := c.Stop(); err != nil {
log.Errorf("ChainArbitrator could not shutdown: %v", err)
}
}
// Handle all errors returned from spawning our chain watchers. If any
// of them failed, we will stop the chain arb to shutdown any active
// goroutines.
for err := range watcherErrs {
if err != nil {
stopAndLog()
return err
}
}
// Before we start all of our arbitrators, we do a preliminary state
// lookup so that we can combine all of these lookups in a single db
// transaction.
var startStates map[wire.OutPoint]*chanArbStartState
err = kvdb.View(c.chanSource, func(tx walletdb.ReadTx) error {
for _, arbitrator := range c.activeChannels {
startState, err := arbitrator.getStartState(tx)
if err != nil {
return err
}
startStates[arbitrator.cfg.ChanPoint] = startState
}
return nil
}, func() {
startStates = make(
map[wire.OutPoint]*chanArbStartState,
len(c.activeChannels),
)
})
if err != nil {
stopAndLog()
return err
}
// Launch all the goroutines for each arbitrator so they can carry out
// their duties.
for _, arbitrator := range c.activeChannels {
startState, ok := startStates[arbitrator.cfg.ChanPoint]
if !ok {
stopAndLog()
return fmt.Errorf("arbitrator: %v has no start state",
arbitrator.cfg.ChanPoint)
}
if err := arbitrator.Start(startState); err != nil {
stopAndLog()
return err
}
}
// Subscribe to a single stream of block epoch notifications that we
// will dispatch to all active arbitrators.
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return err
}
// Start our goroutine which will dispatch blocks to each arbitrator.
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.dispatchBlocks(blockEpoch)
}()
// TODO(roasbeef): eventually move all breach watching here
return nil
}
// blockRecipient contains the information we need to dispatch a block to a
// channel arbitrator.
type blockRecipient struct {
// chanPoint is the funding outpoint of the channel.
chanPoint wire.OutPoint
// blocks is the channel that new block heights are sent into. This
// channel should be sufficiently buffered as to not block the sender.
blocks chan<- int32
// quit is closed if the receiving entity is shutting down.
quit chan struct{}
}
// dispatchBlocks consumes a block epoch notification stream and dispatches
// blocks to each of the chain arb's active channel arbitrators. This function
// must be run in a goroutine.
func (c *ChainArbitrator) dispatchBlocks(
blockEpoch *chainntnfs.BlockEpochEvent) {
// getRecipients is a helper function which acquires the chain arb
// lock and returns a set of block recipients which can be used to
// dispatch blocks.
getRecipients := func() []blockRecipient {
c.Lock()
blocks := make([]blockRecipient, 0, len(c.activeChannels))
for _, channel := range c.activeChannels {
blocks = append(blocks, blockRecipient{
chanPoint: channel.cfg.ChanPoint,
blocks: channel.blocks,
quit: channel.quit,
})
}
c.Unlock()
return blocks
}
// On exit, cancel our blocks subscription and close each block channel
// so that the arbitrators know they will no longer be receiving blocks.
defer func() {
blockEpoch.Cancel()
recipients := getRecipients()
for _, recipient := range recipients {
close(recipient.blocks)
}
}()
// Consume block epochs until we receive the instruction to shutdown.
for {
select {
// Consume block epochs, exiting if our subscription is
// terminated.
case block, ok := <-blockEpoch.Epochs:
if !ok {
log.Trace("dispatchBlocks block epoch " +
"cancelled")
return
}
// Get the set of currently active channels block
// subscription channels and dispatch the block to
// each.
for _, recipient := range getRecipients() {
select {
// Deliver the block to the arbitrator.
case recipient.blocks <- block.Height:
// If the recipient is shutting down, exit
// without delivering the block. This may be
// the case when two blocks are mined in quick
// succession, and the arbitrator resolves
// after the first block, and does not need to
// consume the second block.
case <-recipient.quit:
log.Debugf("channel: %v exit without "+
"receiving block: %v",
recipient.chanPoint,
block.Height)
// If the chain arb is shutting down, we don't
// need to deliver any more blocks (everything
// will be shutting down).
case <-c.quit:
return
}
}
// Exit if the chain arbitrator is shutting down.
case <-c.quit:
return
}
}
}
// publishClosingTxs will load any stored cooperative or unilater closing
// transactions and republish them. This helps ensure propagation of the
// transactions in the event that prior publications failed.
func (c *ChainArbitrator) publishClosingTxs(
channel *channeldb.OpenChannel) error {
// If the channel has had its unilateral close broadcasted already,
// republish it in case it didn't propagate.
if channel.HasChanStatus(channeldb.ChanStatusCommitBroadcasted) {
err := c.rebroadcast(
channel, channeldb.ChanStatusCommitBroadcasted,
)
if err != nil {
return err
}
}
// If the channel has had its cooperative close broadcasted
// already, republish it in case it didn't propagate.
if channel.HasChanStatus(channeldb.ChanStatusCoopBroadcasted) {
err := c.rebroadcast(
channel, channeldb.ChanStatusCoopBroadcasted,
)
if err != nil {
return err
}
}
return nil
}
// rebroadcast is a helper method which will republish the unilateral or
// cooperative close transaction or a channel in a particular state.
//
// NOTE: There is no risk to caling this method if the channel isn't in either
// CommimentBroadcasted or CoopBroadcasted, but the logs will be misleading.
func (c *ChainArbitrator) rebroadcast(channel *channeldb.OpenChannel,
state channeldb.ChannelStatus) error {
chanPoint := channel.FundingOutpoint
var (
closeTx *wire.MsgTx
kind string
err error
)
switch state {
case channeldb.ChanStatusCommitBroadcasted:
kind = "force"
closeTx, err = channel.BroadcastedCommitment()
case channeldb.ChanStatusCoopBroadcasted:
kind = "coop"
closeTx, err = channel.BroadcastedCooperative()
default:
return fmt.Errorf("unknown closing state: %v", state)
}
switch {
// This can happen for channels that had their closing tx published
// before we started storing it to disk.
case err == channeldb.ErrNoCloseTx:
log.Warnf("Channel %v is in state %v, but no %s closing tx "+
"to re-publish...", chanPoint, state, kind)
return nil
case err != nil:
return err
}
log.Infof("Re-publishing %s close tx(%v) for channel %v",
kind, closeTx.TxHash(), chanPoint)
label := labels.MakeLabel(
labels.LabelTypeChannelClose, &channel.ShortChannelID,
)
err = c.cfg.PublishTx(closeTx, label)
if err != nil && err != lnwallet.ErrDoubleSpend {
log.Warnf("Unable to broadcast %s close tx(%v): %v",
kind, closeTx.TxHash(), err)
}
return nil
}
// Stop signals the ChainArbitrator to trigger a graceful shutdown. Any active
// channel arbitrators will be signalled to exit, and this method will block
// until they've all exited.
func (c *ChainArbitrator) Stop() error {
if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
return nil
}
log.Infof("Stopping ChainArbitrator")
close(c.quit)
var (
activeWatchers = make(map[wire.OutPoint]*chainWatcher)
activeChannels = make(map[wire.OutPoint]*ChannelArbitrator)
)
// Copy the current set of active watchers and arbitrators to shutdown.
// We don't want to hold the lock when shutting down each watcher or
// arbitrator individually, as they may need to acquire this mutex.
c.Lock()
for chanPoint, watcher := range c.activeWatchers {
activeWatchers[chanPoint] = watcher
}
for chanPoint, arbitrator := range c.activeChannels {
activeChannels[chanPoint] = arbitrator
}
c.Unlock()
for chanPoint, watcher := range activeWatchers {
log.Tracef("Attempting to stop ChainWatcher(%v)",
chanPoint)
if err := watcher.Stop(); err != nil {
log.Errorf("unable to stop watcher for "+
"ChannelPoint(%v): %v", chanPoint, err)
}
}
for chanPoint, arbitrator := range activeChannels {
log.Tracef("Attempting to stop ChannelArbitrator(%v)",
chanPoint)
if err := arbitrator.Stop(); err != nil {
log.Errorf("unable to stop arbitrator for "+
"ChannelPoint(%v): %v", chanPoint, err)
}
}
c.wg.Wait()
return nil
}
// ContractUpdate is a message packages the latest set of active HTLCs on a
// commitment, and also identifies which commitment received a new set of
// HTLCs.
type ContractUpdate struct {
// HtlcKey identifies which commitment the HTLCs below are present on.
HtlcKey HtlcSetKey
// Htlcs are the of active HTLCs on the commitment identified by the
// above HtlcKey.
Htlcs []channeldb.HTLC
}
// ContractSignals wraps the two signals that affect the state of a channel
// being watched by an arbitrator. The two signals we care about are: the
// channel has a new set of HTLC's, and the remote party has just broadcast
// their version of the commitment transaction.
type ContractSignals struct {
// HtlcUpdates is a channel that the link will use to update the
// designated channel arbitrator when the set of HTLCs on any valid
// commitment changes.
HtlcUpdates chan *ContractUpdate
// ShortChanID is the up to date short channel ID for a contract. This
// can change either if when the contract was added it didn't yet have
// a stable identifier, or in the case of a reorg.
ShortChanID lnwire.ShortChannelID
}
// UpdateContractSignals sends a set of active, up to date contract signals to
// the ChannelArbitrator which is has been assigned to the channel infield by
// the passed channel point.
func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint,
signals *ContractSignals) error {
log.Infof("Attempting to update ContractSignals for ChannelPoint(%v)",
chanPoint)
c.Lock()
arbitrator, ok := c.activeChannels[chanPoint]
c.Unlock()
if !ok {
return fmt.Errorf("unable to find arbitrator")
}
arbitrator.UpdateContractSignals(signals)
return nil
}
// GetChannelArbitrator safely returns the channel arbitrator for a given
// channel outpoint.
func (c *ChainArbitrator) GetChannelArbitrator(chanPoint wire.OutPoint) (
*ChannelArbitrator, error) {
c.Lock()
arbitrator, ok := c.activeChannels[chanPoint]
c.Unlock()
if !ok {
return nil, fmt.Errorf("unable to find arbitrator")
}
return arbitrator, nil
}
// forceCloseReq is a request sent from an outside sub-system to the arbitrator
// that watches a particular channel to broadcast the commitment transaction,
// and enter the resolution phase of the channel.
type forceCloseReq struct {
// errResp is a channel that will be sent upon either in the case of
// force close success (nil error), or in the case on an error.
//
// NOTE; This channel MUST be buffered.
errResp chan error
// closeTx is a channel that carries the transaction which ultimately
// closed out the channel.
closeTx chan *wire.MsgTx
}
// ForceCloseContract attempts to force close the channel infield by the passed
// channel point. A force close will immediately terminate the contract,
// causing it to enter the resolution phase. If the force close was successful,
// then the force close transaction itself will be returned.
//
// TODO(roasbeef): just return the summary itself?
func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.MsgTx, error) {
c.Lock()
arbitrator, ok := c.activeChannels[chanPoint]
c.Unlock()
if !ok {
return nil, fmt.Errorf("unable to find arbitrator")
}
log.Infof("Attempting to force close ChannelPoint(%v)", chanPoint)
// Before closing, we'll attempt to send a disable update for the
// channel. We do so before closing the channel as otherwise the current
// edge policy won't be retrievable from the graph.
if err := c.cfg.DisableChannel(chanPoint); err != nil {
log.Warnf("Unable to disable channel %v on "+
"close: %v", chanPoint, err)
}
errChan := make(chan error, 1)
respChan := make(chan *wire.MsgTx, 1)
// With the channel found, and the request crafted, we'll send over a
// force close request to the arbitrator that watches this channel.
select {
case arbitrator.forceCloseReqs <- &forceCloseReq{
errResp: errChan,
closeTx: respChan,
}:
case <-c.quit:
return nil, ErrChainArbExiting
}
// We'll await two responses: the error response, and the transaction
// that closed out the channel.
select {
case err := <-errChan:
if err != nil {
return nil, err
}
case <-c.quit:
return nil, ErrChainArbExiting
}
var closeTx *wire.MsgTx
select {
case closeTx = <-respChan:
case <-c.quit:
return nil, ErrChainArbExiting
}
return closeTx, nil
}
// WatchNewChannel sends the ChainArbitrator a message to create a
// ChannelArbitrator tasked with watching over a new channel. Once a new
// channel has finished its final funding flow, it should be registered with
// the ChainArbitrator so we can properly react to any on-chain events.
func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error {
c.Lock()
defer c.Unlock()
log.Infof("Creating new ChannelArbitrator for ChannelPoint(%v)",
newChan.FundingOutpoint)
// If we're already watching this channel, then we'll ignore this
// request.
chanPoint := newChan.FundingOutpoint
if _, ok := c.activeChannels[chanPoint]; ok {
return nil
}
// First, also create an active chainWatcher for this channel to ensure
// that we detect any relevant on chain events.
chainWatcher, err := newChainWatcher(
chainWatcherConfig{
chanState: newChan,
notifier: c.cfg.Notifier,
signer: c.cfg.Signer,
isOurAddr: c.cfg.IsOurAddress,
contractBreach: func(retInfo *lnwallet.BreachRetribution,
markClosed func() error) error {
return c.cfg.ContractBreach(
chanPoint, retInfo, markClosed,
)
},
extractStateNumHint: lnwallet.GetStateNumHint,
},
)
if err != nil {
return err
}
c.activeWatchers[newChan.FundingOutpoint] = chainWatcher
// We'll also create a new channel arbitrator instance using this new
// channel, and our internal state.
channelArb, err := newActiveChannelArbitrator(
newChan, c, chainWatcher.SubscribeChannelEvents(),
)
if err != nil {
return err
}
// With the arbitrator created, we'll add it to our set of active
// arbitrators, then launch it.
c.activeChannels[chanPoint] = channelArb
if err := channelArb.Start(nil); err != nil {
return err
}
return chainWatcher.Start()
}
// SubscribeChannelEvents returns a new active subscription for the set of
// possible on-chain events for a particular channel. The struct can be used by
// callers to be notified whenever an event that changes the state of the
// channel on-chain occurs.
func (c *ChainArbitrator) SubscribeChannelEvents(
chanPoint wire.OutPoint) (*ChainEventSubscription, error) {
// First, we'll attempt to look up the active watcher for this channel.
// If we can't find it, then we'll return an error back to the caller.
watcher, ok := c.activeWatchers[chanPoint]
if !ok {
return nil, fmt.Errorf("unable to find watcher for: %v",
chanPoint)
}
// With the watcher located, we'll request for it to create a new chain
// event subscription client.
return watcher.SubscribeChannelEvents(), nil
}
// TODO(roasbeef): arbitration reports
// * types: contested, waiting for success conf, etc