lnd.xprv/contractcourt/chain_arbitrator.go

589 lines
19 KiB
Go
Raw Normal View History

package contractcourt
import (
"fmt"
"sync"
"sync/atomic"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/wire"
)
// ResolutionMsg is a message sent by resolvers to outside sub-systems once an
// outgoing contract has been fully resolved. For multi-hop contracts, if we
// resolve the outgoing contract, we'll also need to ensure that the incoming
// contract is resolved as well. We package the items required to resolve the
// incoming contracts within this message.
type ResolutionMsg struct {
// SourceChan identifies the channel that this message is being sent
// from. This is the channel's short channel ID.
SourceChan lnwire.ShortChannelID
// HtlcIndex is the index of the contract within the original
// commitment trace.
HtlcIndex uint64
// Failure will be non-nil if the incoming contract should be cancelled
// all together. This can happen if the outgoing contract was dust, if
// if the outgoing HTLC timed out.
Failure lnwire.FailureMessage
// PreImage will be non-nil if the incoming contract can successfully
// be redeemed. This can happen if we learn of the preimage from the
// outgoing HTLC on-chain.
PreImage *[32]byte
}
// ChainArbitratorConfig is a configuration struct that contains all the
// function closures and interface that required to arbitrate on-chain
// contracts for a particular chain.
type ChainArbitratorConfig struct {
// ChainHash is the chain that this arbitrator is to operate within.
ChainHash chainhash.Hash
// BroadcastDelta is the delta that we'll use to decide when to
// broadcast our commitment transaction. This value should be set
// based on our current fee estimation of the commitment transaction.
// We use this to determine when we should broadcast instead of the
// just the HTLC timeout, as we want to ensure that the commitment
// transaction is already confirmed, by the time the HTLC expires.
BroadcastDelta uint32
// NewSweepAddr is a function that returns a new address under control
// by the wallet. We'll use this to sweep any no-delay outputs as a
// result of unilateral channel closes.
//
// NOTE: This SHOULD return a p2wkh script.
NewSweepAddr func() ([]byte, error)
// PublishTx reliably broadcasts a transaction to the network. Once
// this function exits without an error, then they transaction MUST
// continually be rebroadcast if needed.
PublishTx func(*wire.MsgTx) error
// DeliverResolutionMsg is a function that will append an outgoing
// message to the "out box" for a ChannelLink. This is used to cancel
// backwards any HTLC's that are either dust, we're timing out, or
// settling on-chain to the incoming link.
DeliverResolutionMsg func(...ResolutionMsg) error
// MarkLinkInactive is a function closure that the ChainArbitrator will
// use to mark that active HTLC's shouldn't be attempt ted to be routed
// over a particular channel. This function will be called in that a
// ChannelArbitrator decides that it needs to go to chain in order to
// resolve contracts.
//
// TODO(Roasbeef): rename, routing based
MarkLinkInactive func(wire.OutPoint) error
// IncubateOutput sends either a incoming HTLC, an outgoing HTLC, or
// both to the utxo nursery. Once this function returns, the nursery
// should have safely persisted the outputs to disk, and should start
// the process of incubation. This is used when a resolver wishes to
// pass off the output to the nursery as we're inly waiting on an
// absolute/relative item block.
IncubateOutputs func(wire.OutPoint, *lnwallet.CommitOutputResolution,
*lnwallet.OutgoingHtlcResolution,
*lnwallet.IncomingHtlcResolution) error
// PreimageDB is a global store of all known pre-images. We'll use this
// to decide if we should broadcast a commitment transaction to claim
// an HTLC on-chain.
PreimageDB WitnessBeacon
// Notifier is an instance of a chain notifier we'll use to watch for
// certain on-chain events.
Notifier chainntnfs.ChainNotifier
// Signer is a signer backed by the active lnd node. This should be
// capable of producing a signature as specified by a valid
// SignDescriptor.
Signer lnwallet.Signer
// FeeEstimator will be used to return fee estimates.
FeeEstimator lnwallet.FeeEstimator
// ChainIO allows us to query the state of the current main chain.
ChainIO lnwallet.BlockChainIO
}
// ChainArbitrator is a sub-system that oversees the on-chain resolution of all
// active, and channel that are in the "pending close" state. Within the
// contractcourt package, the ChainArbitrator manages a set of active
// ContractArbitrators. Each ContractArbitrators is responsible for watching
// the chain for any activity that affects the state of the channel, and also
// for monitoring each contract in order to determine if any on-chain activity is
// required. Outside sub-systems interact with the ChainArbitrator in order to
// forcibly exit a contract, update the set of live signals for each contract,
// and to receive reports on the state of contract resolution.
type ChainArbitrator struct {
started int32
stopped int32
sync.Mutex
// activeChannels is a map of all the active contracts that are still
// open, and not fully resolved.
activeChannels map[wire.OutPoint]*ChannelArbitrator
// cfg is the config struct for the arbitrator that contains all
// methods and interface it needs to operate.
cfg ChainArbitratorConfig
// chanSource will be used by the ChainArbitrator to fetch all the
// active channels that it must still watch over.
chanSource *channeldb.DB
quit chan struct{}
wg sync.WaitGroup
}
// NewChainArbitrator returns a new instance of the ChainArbitrator using the
// passed config struct, and backing persistent database.
func NewChainArbitrator(cfg ChainArbitratorConfig,
db *channeldb.DB) *ChainArbitrator {
return &ChainArbitrator{
cfg: cfg,
activeChannels: make(map[wire.OutPoint]*ChannelArbitrator),
chanSource: db,
quit: make(chan struct{}),
}
}
// newActiveChannelArbitrator creates a new instance of an active channel
// arbitrator given the state of the target channel.
func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
c *ChainArbitrator) (*ChannelArbitrator, error) {
log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)",
channel.FundingOutpoint)
// We'll start by registering for a block epoch notifications so this
// channel can keep track of the current state of the main chain.
//
// TODO(roasbeef): fetch best height (or pass in) so can ensure block
// epoch delivers all the notifications to
//
// TODO(roasbeef): instead 1 block epoch that multi-plexes to the rest?
// * reduces the number of goroutines
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn()
if err != nil {
return nil, err
}
chanPoint := channel.FundingOutpoint
// Next we'll create the matching configuration struct that contains
// all interfaces and methods the arbitrator needs to do its job.
arbCfg := ChannelArbitratorConfig{
ChanPoint: chanPoint,
ShortChanID: channel.ShortChanID,
BlockEpochs: blockEpoch,
ForceCloseChan: func() (*lnwallet.ForceCloseSummary, error) {
// With the channels fetched, attempt to locate
// the target channel according to its channel
// point.
dbChannels, err := c.chanSource.FetchAllChannels()
if err != nil {
return nil, err
}
var channel *channeldb.OpenChannel
for _, dbChannel := range dbChannels {
if dbChannel.FundingOutpoint == chanPoint {
channel = dbChannel
break
}
}
// If the channel cannot be located, then we
// exit with an error to the channel.
if channel == nil {
return nil, fmt.Errorf("unable to find channel")
}
chanMachine, err := lnwallet.NewLightningChannel(
c.cfg.Signer, nil, c.cfg.PreimageDB, channel)
if err != nil {
return nil, err
}
chanMachine.Stop()
chanMachine.CancelObserver()
if err := c.cfg.MarkLinkInactive(chanPoint); err != nil {
log.Errorf("unable to mark link inactive: %v", err)
}
return chanMachine.ForceClose()
},
CloseChannel: func(summary *channeldb.ChannelCloseSummary) error {
log.Tracef("ChannelArbitrator(%v): closing "+
"channel", chanPoint)
return channel.CloseChannel(summary)
},
ChainArbitratorConfig: c.cfg,
}
// The final component needed is an arbitrator log that the arbitrator
// will use to keep track of its internal state using a backed
// persistent log.
//
// TODO(roasbeef); abstraction leak...
// * rework: adaptor method to set log scope w/ factory func
chanLog, err := newBoltArbitratorLog(
c.chanSource.DB, arbCfg, c.cfg.ChainHash, chanPoint,
)
if err != nil {
blockEpoch.Cancel()
return nil, err
}
arbCfg.MarkChannelResolved = func() error {
return c.resolveContract(chanPoint, chanLog)
}
return NewChannelArbitrator(
arbCfg, channel.LocalCommitment.Htlcs, chanLog,
), nil
}
// resolveContract marks a contract as fully resolved within the database.
// This is only to be done once all contracts which were live on the channel
// before hitting the chain have been resolved.
func (c *ChainArbitrator) resolveContract(chanPoint wire.OutPoint,
arbLog ArbitratorLog) error {
log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint)
// First, we'll we'll mark the channel as fully closed from the PoV of
// the channel source.
err := c.chanSource.MarkChanFullyClosed(&chanPoint)
if err != nil {
return err
}
// Once this has been marked as resolved, we'll wipe the log that the
// channel arbitrator was using to store its persistent state. We do
// this after marking the channel resolved, as otherwise, the
// arbitrator would be re-created, and think it was starting from the
// default state.
if err := arbLog.WipeHistory(); err != nil {
return err
}
c.Lock()
delete(c.activeChannels, chanPoint)
c.Unlock()
return nil
}
// Start launches all goroutines that the ChainArbitrator needs to operate.
func (c *ChainArbitrator) Start() error {
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
return nil
}
log.Tracef("Starting ChainArbitrator")
// First, we'll fetch all the channels that are still open, in order to
// collect them within our set of active contracts.
openChannels, err := c.chanSource.FetchAllChannels()
if err != nil {
return err
}
if len(openChannels) > 0 {
log.Infof("Creating ChannelArbitrators for %v active channels",
len(openChannels))
}
// For each open channel, we'll configure then launch a corresponding
// ChannelArbitrator.
for _, channel := range openChannels {
channelArb, err := newActiveChannelArbitrator(channel, c)
if err != nil {
return err
}
c.activeChannels[channel.FundingOutpoint] = channelArb
}
// In addition to the channels that we know to be open, we'll also
// launch arbitrators to finishing resolving any channels that are in
// the pending close state.
closingChannels, err := c.chanSource.FetchClosedChannels(true)
if err != nil {
return err
}
if len(closingChannels) > 0 {
log.Infof("Creating ChannelArbitrators for %v closing channels",
len(closingChannels))
}
// Next, for each channel is the closing state, we'll launch a
// corresponding more restricted resolver.
for _, closeChanInfo := range closingChannels {
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn()
if err != nil {
return err
}
// We can leave off the CloseContract and ForceCloseChan
// methods as the channel is already closed at this point.
chanPoint := closeChanInfo.ChanPoint
arbCfg := ChannelArbitratorConfig{
ChanPoint: chanPoint,
ShortChanID: closeChanInfo.ShortChanID,
BlockEpochs: blockEpoch,
ChainArbitratorConfig: c.cfg,
}
chanLog, err := newBoltArbitratorLog(
c.chanSource.DB, arbCfg, c.cfg.ChainHash, chanPoint,
)
if err != nil {
return err
}
arbCfg.MarkChannelResolved = func() error {
return c.resolveContract(chanPoint, chanLog)
}
// We can also leave off the set of HTLC's here as since the
// channel is already in the process of being full resolved, no
// new HTLC's we be added.
c.activeChannels[chanPoint] = NewChannelArbitrator(
arbCfg, nil, chanLog,
)
}
// Finally, we'll launch all the goroutines for each arbitrator so they
// can carry out their duties.
for _, arbitrator := range c.activeChannels {
if err := arbitrator.Start(); err != nil {
return err
}
}
// TODO(roasbeef): eventually move all breach watching here
return nil
}
// Stop signals the ChainArbitrator to trigger a graceful shutdown. Any active
// channel arbitrators will be signalled to exit, and this method will block
// until they've all exited.
func (c *ChainArbitrator) Stop() error {
if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
return nil
}
log.Infof("Stopping ChainArbitrator")
close(c.quit)
c.Lock()
arbitrators := c.activeChannels
c.Unlock()
for chanPoint, arbitrator := range arbitrators {
log.Tracef("Attempting to stop ChannelArbitrator(%v)",
chanPoint)
if err := arbitrator.Stop(); err != nil {
log.Errorf("unable to stop arbitrator for "+
"ChannelPoint(%v): %v", chanPoint, err)
}
}
c.wg.Wait()
return nil
}
// ContractSignals wraps the two signals that affect the state of a channel
// being watched by an arbitrator. The two signals we care about are: the
// channel has a new set of HTLC's, and the remote party has just broadcast
// their version of the commitment transaction.
type ContractSignals struct {
// HtlcUpdates is a channel that once we new commitment updates takes
// place, the later set of HTLC's on the commitment transaction should
// be sent over.
HtlcUpdates chan []channeldb.HTLC
// UniCloseSignal is a channel that allows the ChannelArbitrator for a
// particular channel to detect if the remote party has broadcast their
// version of the commitment transaction.
//
// TODO(roasbeef): eliminate and just roll into the struct itself, all
// watching
UniCloseSignal chan *lnwallet.UnilateralCloseSummary
// ShortChanID is the up to date short channel ID for a contract. This
// can change either if when the contract was added it didn't yet have
// a stable identifier, or in the case of a reorg.
ShortChanID lnwire.ShortChannelID
}
// UpdateContractSignals sends a set of active, up to date contract signals to
// the ChannelArbitrator which is has been assigned to the channel infield by
// the passed channel point.
func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint,
signals *ContractSignals) error {
log.Infof("Attempting to update ContractSignals for ChannelPoint(%v)",
chanPoint)
c.Lock()
arbitrator, ok := c.activeChannels[chanPoint]
c.Unlock()
if !ok {
return fmt.Errorf("unable to find arbitrator")
}
arbitrator.UpdateContractSignals(signals)
return nil
}
// forceCloseReq is a request sent from an outsde sub-system to the arbitrator
// that watches a particular channel to broadcast the commitnet transaction,
// and enter the resolution phase of the channel.
type forceCloseReq struct {
// errResp is a channel that will be sent upon either in the case of force
// close success (nil error), or in the case on an erro (nil-nil error)
//
// NOTE; This channel MUST be buffered.
errResp chan error
// closeTx is a channel that carries the transaction which ultimatley
// closed out the channel.
closeTx chan *wire.MsgTx
}
// ForceCloseContract attempts to force close the channel infield by the passed
// channel point. A force close will immediately terminate the contract,
// causing it to enter the resolution phase. If the force close was successful,
// then the force close transaction itself will be returned.
//
// TODO(roasbeef): just return the summary itself?
func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.MsgTx, error) {
c.Lock()
arbitrator, ok := c.activeChannels[chanPoint]
c.Unlock()
if !ok {
return nil, fmt.Errorf("unable to find arbitrator")
}
log.Infof("Attempting to force close ChannelPoint(%v)", chanPoint)
errChan := make(chan error, 1)
respChan := make(chan *wire.MsgTx, 1)
// With the channel found, and the request crafted, we'll send over a
// force close request to the arbitrator that watches this channel.
select {
case arbitrator.forceCloseReqs <- &forceCloseReq{
errResp: errChan,
closeTx: respChan,
}:
case <-c.quit:
return nil, fmt.Errorf("ChainArbitrator shutting down")
}
// We'll await two resposnes: the error response, and the transaction
// that closed out the channel.
select {
case err := <-errChan:
if err != nil {
return nil, err
}
case <-c.quit:
return nil, fmt.Errorf("ChainArbitrator shutting down")
}
var closeTx *wire.MsgTx
select {
case closeTx = <-respChan:
case <-c.quit:
return nil, fmt.Errorf("ChainArbitrator shutting down")
}
return closeTx, nil
}
// RequestChannelArbitration sends the ChainArbitrator a message to create a
// ChannelArbitrator tasked with watching over a new channel. Once a new
// channel has finished its final funding flow, it should be registered with
// the ChainArbitrator so we can properly react to any on-chain events.
func (c *ChainArbitrator) RequestChannelArbitration(newChan *channeldb.OpenChannel) error {
c.Lock()
defer c.Unlock()
log.Infof("Creating new ChannelArbitrator for ChannelPoint(%v)",
newChan.FundingOutpoint)
// If we're already watching this channel, then we'll ignore this
// request.
chanPoint := newChan.FundingOutpoint
if _, ok := c.activeChannels[chanPoint]; ok {
return nil
}
// First, we'll create a new channel arbitrator instance using this new
// channel, and our internal state.
channelArb, err := newActiveChannelArbitrator(newChan, c)
if err != nil {
return err
}
// With the arbitrator created, we'll add it to our set of active
// arbitrators, then launch it.
c.activeChannels[chanPoint] = channelArb
return channelArb.Start()
}
// ManuallyResolveChannel is a method to be called by outside sub-systems if a
// channel becomes fully resolved, but due to manual intervention. An example
// of such a manual intervention includes a cooperative channel closuer.
//
// TODO(roasbeef): remove after arbs watch chain for all actions directly
func (c *ChainArbitrator) ManuallyResolveChannel(chanPoint wire.OutPoint) error {
c.Lock()
defer c.Unlock()
log.Infof("Manually resolving ChannelArbitrator for ChannelPoint(%v)",
chanPoint)
channelArb, ok := c.activeChannels[chanPoint]
if !ok {
return fmt.Errorf("unable to find arbitrator for: %v", chanPoint)
}
if err := channelArb.Stop(); err != nil {
return err
}
delete(c.activeChannels, chanPoint)
return channelArb.log.WipeHistory()
}
// SubscribeChannelSignals...
func (c *ChainArbitrator) SubscribeChannelSignals() error {
// TODO(roasbeef): gives signals of spends, breaches, etc
// * breach arg grabs initially
// * create new db method for *just* getting the outpoints for em all?
return nil
}
// TODO(roasbeef): arbitration reports
// * types: contested, waiting for success conf, etc