2296 lines
73 KiB
Go
2296 lines
73 KiB
Go
package contractcourt
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/btcsuite/btcd/wire"
|
|
"github.com/btcsuite/btcutil"
|
|
"github.com/davecgh/go-spew/spew"
|
|
"github.com/lightningnetwork/lnd/chainntnfs"
|
|
"github.com/lightningnetwork/lnd/channeldb"
|
|
"github.com/lightningnetwork/lnd/lntypes"
|
|
"github.com/lightningnetwork/lnd/lnwallet"
|
|
"github.com/lightningnetwork/lnd/lnwire"
|
|
)
|
|
|
|
var (
|
|
// errAlreadyForceClosed is an error returned when we attempt to force
|
|
// close a channel that's already in the process of doing so.
|
|
errAlreadyForceClosed = errors.New("channel is already in the " +
|
|
"process of being force closed")
|
|
)
|
|
|
|
// WitnessSubscription represents an intent to be notified once new witnesses
|
|
// are discovered by various active contract resolvers. A contract resolver may
|
|
// use this to be notified of when it can satisfy an incoming contract after we
|
|
// discover the witness for an outgoing contract.
|
|
type WitnessSubscription struct {
|
|
// WitnessUpdates is a channel that newly discovered witnesses will be
|
|
// sent over.
|
|
//
|
|
// TODO(roasbeef): couple with WitnessType?
|
|
WitnessUpdates <-chan lntypes.Preimage
|
|
|
|
// CancelSubscription is a function closure that should be used by a
|
|
// client to cancel the subscription once they are no longer interested
|
|
// in receiving new updates.
|
|
CancelSubscription func()
|
|
}
|
|
|
|
// WitnessBeacon is a global beacon of witnesses. Contract resolvers will use
|
|
// this interface to lookup witnesses (preimages typically) of contracts
|
|
// they're trying to resolve, add new preimages they resolve, and finally
|
|
// receive new updates each new time a preimage is discovered.
|
|
//
|
|
// TODO(roasbeef): need to delete the pre-images once we've used them
|
|
// and have been sufficiently confirmed?
|
|
type WitnessBeacon interface {
|
|
// SubscribeUpdates returns a channel that will be sent upon *each* time
|
|
// a new preimage is discovered.
|
|
SubscribeUpdates() *WitnessSubscription
|
|
|
|
// LookupPreImage attempts to lookup a preimage in the global cache.
|
|
// True is returned for the second argument if the preimage is found.
|
|
LookupPreimage(payhash lntypes.Hash) (lntypes.Preimage, bool)
|
|
|
|
// AddPreimages adds a batch of newly discovered preimages to the global
|
|
// cache, and also signals any subscribers of the newly discovered
|
|
// witness.
|
|
AddPreimages(preimages ...lntypes.Preimage) error
|
|
}
|
|
|
|
// ChannelArbitratorConfig contains all the functionality that the
|
|
// ChannelArbitrator needs in order to properly arbitrate any contract dispute
|
|
// on chain.
|
|
type ChannelArbitratorConfig struct {
|
|
// ChanPoint is the channel point that uniquely identifies this
|
|
// channel.
|
|
ChanPoint wire.OutPoint
|
|
|
|
// ShortChanID describes the exact location of the channel within the
|
|
// chain. We'll use this to address any messages that we need to send
|
|
// to the switch during contract resolution.
|
|
ShortChanID lnwire.ShortChannelID
|
|
|
|
// BlockEpochs is an active block epoch event stream backed by an
|
|
// active ChainNotifier instance. We will use new block notifications
|
|
// sent over this channel to decide when we should go on chain to
|
|
// reclaim/redeem the funds in an HTLC sent to/from us.
|
|
BlockEpochs *chainntnfs.BlockEpochEvent
|
|
|
|
// ChainEvents is an active subscription to the chain watcher for this
|
|
// channel to be notified of any on-chain activity related to this
|
|
// channel.
|
|
ChainEvents *ChainEventSubscription
|
|
|
|
// ForceCloseChan should force close the contract that this attendant
|
|
// is watching over. We'll use this when we decide that we need to go
|
|
// to chain. It should in addition tell the switch to remove the
|
|
// corresponding link, such that we won't accept any new updates. The
|
|
// returned summary contains all items needed to eventually resolve all
|
|
// outputs on chain.
|
|
ForceCloseChan func() (*lnwallet.LocalForceCloseSummary, error)
|
|
|
|
// MarkCommitmentBroadcasted should mark the channel as the commitment
|
|
// being broadcast, and we are waiting for the commitment to confirm.
|
|
MarkCommitmentBroadcasted func() error
|
|
|
|
// MarkChannelClosed marks the channel closed in the database, with the
|
|
// passed close summary. After this method successfully returns we can
|
|
// no longer expect to receive chain events for this channel, and must
|
|
// be able to recover from a failure without getting the close event
|
|
// again.
|
|
MarkChannelClosed func(*channeldb.ChannelCloseSummary) error
|
|
|
|
// IsPendingClose is a boolean indicating whether the channel is marked
|
|
// as pending close in the database.
|
|
IsPendingClose bool
|
|
|
|
// ClosingHeight is the height at which the channel was closed. Note
|
|
// that this value is only valid if IsPendingClose is true.
|
|
ClosingHeight uint32
|
|
|
|
// CloseType is the type of the close event in case IsPendingClose is
|
|
// true. Otherwise this value is unset.
|
|
CloseType channeldb.ClosureType
|
|
|
|
// MarkChannelResolved is a function closure that serves to mark a
|
|
// channel as "fully resolved". A channel itself can be considered
|
|
// fully resolved once all active contracts have individually been
|
|
// fully resolved.
|
|
//
|
|
// TODO(roasbeef): need RPC's to combine for pendingchannels RPC
|
|
MarkChannelResolved func() error
|
|
|
|
ChainArbitratorConfig
|
|
}
|
|
|
|
// ContractReport provides a summary of a commitment tx output.
|
|
type ContractReport struct {
|
|
// Outpoint is the final output that will be swept back to the wallet.
|
|
Outpoint wire.OutPoint
|
|
|
|
// Incoming indicates whether the htlc was incoming to this channel.
|
|
Incoming bool
|
|
|
|
// Amount is the final value that will be swept in back to the wallet.
|
|
Amount btcutil.Amount
|
|
|
|
// MaturityHeight is the absolute block height that this output will
|
|
// mature at.
|
|
MaturityHeight uint32
|
|
|
|
// Stage indicates whether the htlc is in the CLTV-timeout stage (1) or
|
|
// the CSV-delay stage (2). A stage 1 htlc's maturity height will be set
|
|
// to its expiry height, while a stage 2 htlc's maturity height will be
|
|
// set to its confirmation height plus the maturity requirement.
|
|
Stage uint32
|
|
|
|
// LimboBalance is the total number of frozen coins within this
|
|
// contract.
|
|
LimboBalance btcutil.Amount
|
|
|
|
// RecoveredBalance is the total value that has been successfully swept
|
|
// back to the user's wallet.
|
|
RecoveredBalance btcutil.Amount
|
|
}
|
|
|
|
// htlcSet represents the set of active HTLCs on a given commitment
|
|
// transaction.
|
|
type htlcSet struct {
|
|
// incomingHTLCs is a map of all incoming HTLCs on the target
|
|
// commitment transaction. We may potentially go onchain to claim the
|
|
// funds sent to us within this set.
|
|
incomingHTLCs map[uint64]channeldb.HTLC
|
|
|
|
// outgoingHTLCs is a map of all outgoing HTLCs on the target
|
|
// commitment transaction. We may potentially go onchain to reclaim the
|
|
// funds that are currently in limbo.
|
|
outgoingHTLCs map[uint64]channeldb.HTLC
|
|
}
|
|
|
|
// newHtlcSet constructs a new HTLC set from a slice of HTLC's.
|
|
func newHtlcSet(htlcs []channeldb.HTLC) htlcSet {
|
|
outHTLCs := make(map[uint64]channeldb.HTLC)
|
|
inHTLCs := make(map[uint64]channeldb.HTLC)
|
|
for _, htlc := range htlcs {
|
|
if htlc.Incoming {
|
|
inHTLCs[htlc.HtlcIndex] = htlc
|
|
continue
|
|
}
|
|
|
|
outHTLCs[htlc.HtlcIndex] = htlc
|
|
}
|
|
|
|
return htlcSet{
|
|
incomingHTLCs: inHTLCs,
|
|
outgoingHTLCs: outHTLCs,
|
|
}
|
|
}
|
|
|
|
// HtlcSetKey is a two-tuple that uniquely identifies a set of HTLCs on a
|
|
// commitment transaction.
|
|
type HtlcSetKey struct {
|
|
// IsRemote denotes if the HTLCs are on the remote commitment
|
|
// transaction.
|
|
IsRemote bool
|
|
|
|
// IsPending denotes if the commitment transaction that HTLCS are on
|
|
// are pending (the higher of two unrevoked commitments).
|
|
IsPending bool
|
|
}
|
|
|
|
var (
|
|
// LocalHtlcSet is the HtlcSetKey used for local commitments.
|
|
LocalHtlcSet = HtlcSetKey{IsRemote: false, IsPending: false}
|
|
|
|
// RemoteHtlcSet is the HtlcSetKey used for remote commitments.
|
|
RemoteHtlcSet = HtlcSetKey{IsRemote: true, IsPending: false}
|
|
|
|
// RemotePendingHtlcSet is the HtlcSetKey used for dangling remote
|
|
// commitment transactions.
|
|
RemotePendingHtlcSet = HtlcSetKey{IsRemote: true, IsPending: true}
|
|
)
|
|
|
|
// String returns a human readable string describing the target HtlcSetKey.
|
|
func (h HtlcSetKey) String() string {
|
|
switch h {
|
|
case LocalHtlcSet:
|
|
return "LocalHtlcSet"
|
|
case RemoteHtlcSet:
|
|
return "RemoteHtlcSet"
|
|
case RemotePendingHtlcSet:
|
|
return "RemotePendingHtlcSet"
|
|
default:
|
|
return "unknown HtlcSetKey"
|
|
}
|
|
}
|
|
|
|
// ChannelArbitrator is the on-chain arbitrator for a particular channel. The
|
|
// struct will keep in sync with the current set of HTLCs on the commitment
|
|
// transaction. The job of the attendant is to go on-chain to either settle or
|
|
// cancel an HTLC as necessary iff: an HTLC times out, or we known the
|
|
// pre-image to an HTLC, but it wasn't settled by the link off-chain. The
|
|
// ChannelArbitrator will factor in an expected confirmation delta when
|
|
// broadcasting to ensure that we avoid any possibility of race conditions, and
|
|
// sweep the output(s) without contest.
|
|
type ChannelArbitrator struct {
|
|
started int32 // To be used atomically.
|
|
stopped int32 // To be used atomically.
|
|
|
|
// log is a persistent log that the attendant will use to checkpoint
|
|
// its next action, and the state of any unresolved contracts.
|
|
log ArbitratorLog
|
|
|
|
// activeHTLCs is the set of active incoming/outgoing HTLC's on all
|
|
// currently valid commitment transactions.
|
|
activeHTLCs map[HtlcSetKey]htlcSet
|
|
|
|
// cfg contains all the functionality that the ChannelArbitrator requires
|
|
// to do its duty.
|
|
cfg ChannelArbitratorConfig
|
|
|
|
// signalUpdates is a channel that any new live signals for the channel
|
|
// we're watching over will be sent.
|
|
signalUpdates chan *signalUpdateMsg
|
|
|
|
// htlcUpdates is a channel that is sent upon with new updates from the
|
|
// active channel. Each time a new commitment state is accepted, the
|
|
// set of HTLC's on the new state should be sent across this channel.
|
|
htlcUpdates <-chan *ContractUpdate
|
|
|
|
// activeResolvers is a slice of any active resolvers. This is used to
|
|
// be able to signal them for shutdown in the case that we shutdown.
|
|
activeResolvers []ContractResolver
|
|
|
|
// activeResolversLock prevents simultaneous read and write to the
|
|
// resolvers slice.
|
|
activeResolversLock sync.RWMutex
|
|
|
|
// resolutionSignal is a channel that will be sent upon by contract
|
|
// resolvers once their contract has been fully resolved. With each
|
|
// send, we'll check to see if the contract is fully resolved.
|
|
resolutionSignal chan struct{}
|
|
|
|
// forceCloseReqs is a channel that requests to forcibly close the
|
|
// contract will be sent over.
|
|
forceCloseReqs chan *forceCloseReq
|
|
|
|
// state is the current state of the arbitrator. This state is examined
|
|
// upon start up to decide which actions to take.
|
|
state ArbitratorState
|
|
|
|
wg sync.WaitGroup
|
|
quit chan struct{}
|
|
}
|
|
|
|
// NewChannelArbitrator returns a new instance of a ChannelArbitrator backed by
|
|
// the passed config struct.
|
|
func NewChannelArbitrator(cfg ChannelArbitratorConfig,
|
|
htlcSets map[HtlcSetKey]htlcSet, log ArbitratorLog) *ChannelArbitrator {
|
|
|
|
return &ChannelArbitrator{
|
|
log: log,
|
|
signalUpdates: make(chan *signalUpdateMsg),
|
|
htlcUpdates: make(<-chan *ContractUpdate),
|
|
resolutionSignal: make(chan struct{}),
|
|
forceCloseReqs: make(chan *forceCloseReq),
|
|
activeHTLCs: htlcSets,
|
|
cfg: cfg,
|
|
quit: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Start starts all the goroutines that the ChannelArbitrator needs to operate.
|
|
func (c *ChannelArbitrator) Start() error {
|
|
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
|
|
return nil
|
|
}
|
|
|
|
var (
|
|
err error
|
|
)
|
|
|
|
log.Debugf("Starting ChannelArbitrator(%v), htlc_set=%v",
|
|
c.cfg.ChanPoint, newLogClosure(func() string {
|
|
return spew.Sdump(c.activeHTLCs)
|
|
}),
|
|
)
|
|
|
|
// First, we'll read our last state from disk, so our internal state
|
|
// machine can act accordingly.
|
|
c.state, err = c.log.CurrentState()
|
|
if err != nil {
|
|
c.cfg.BlockEpochs.Cancel()
|
|
return err
|
|
}
|
|
|
|
log.Infof("ChannelArbitrator(%v): starting state=%v", c.cfg.ChanPoint,
|
|
c.state)
|
|
|
|
_, bestHeight, err := c.cfg.ChainIO.GetBestBlock()
|
|
if err != nil {
|
|
c.cfg.BlockEpochs.Cancel()
|
|
return err
|
|
}
|
|
|
|
// If the channel has been marked pending close in the database, and we
|
|
// haven't transitioned the state machine to StateContractClosed (or a
|
|
// succeeding state), then a state transition most likely failed. We'll
|
|
// try to recover from this by manually advancing the state by setting
|
|
// the corresponding close trigger.
|
|
trigger := chainTrigger
|
|
triggerHeight := uint32(bestHeight)
|
|
if c.cfg.IsPendingClose {
|
|
switch c.state {
|
|
case StateDefault:
|
|
fallthrough
|
|
case StateBroadcastCommit:
|
|
fallthrough
|
|
case StateCommitmentBroadcasted:
|
|
switch c.cfg.CloseType {
|
|
|
|
case channeldb.CooperativeClose:
|
|
trigger = coopCloseTrigger
|
|
|
|
case channeldb.LocalForceClose:
|
|
trigger = localCloseTrigger
|
|
|
|
case channeldb.RemoteForceClose:
|
|
trigger = remoteCloseTrigger
|
|
}
|
|
triggerHeight = c.cfg.ClosingHeight
|
|
|
|
log.Warnf("ChannelArbitrator(%v): detected stalled "+
|
|
"state=%v for closed channel, using "+
|
|
"trigger=%v", c.cfg.ChanPoint, c.state, trigger)
|
|
}
|
|
}
|
|
|
|
// Next we'll fetch our confirmed commitment set. This will only exist
|
|
// if the channel has been closed out on chain for modern nodes. For
|
|
// older nodes, this won't be found at all, and will rely on the
|
|
// existing written chain actions. Additionally, if this channel hasn't
|
|
// logged any actions in the log, then this field won't be present.
|
|
commitSet, err := c.log.FetchConfirmedCommitSet()
|
|
if err != nil && err != errNoCommitSet && err != errScopeBucketNoExist {
|
|
return err
|
|
}
|
|
|
|
// We'll now attempt to advance our state forward based on the current
|
|
// on-chain state, and our set of active contracts.
|
|
startingState := c.state
|
|
nextState, _, err := c.advanceState(
|
|
triggerHeight, trigger, commitSet,
|
|
)
|
|
if err != nil {
|
|
switch err {
|
|
|
|
// If we detect that we tried to fetch resolutions, but failed,
|
|
// this channel was marked closed in the database before
|
|
// resolutions successfully written. In this case there is not
|
|
// much we can do, so we don't return the error.
|
|
case errScopeBucketNoExist:
|
|
fallthrough
|
|
case errNoResolutions:
|
|
log.Warnf("ChannelArbitrator(%v): detected closed"+
|
|
"channel with no contract resolutions written.",
|
|
c.cfg.ChanPoint)
|
|
|
|
default:
|
|
c.cfg.BlockEpochs.Cancel()
|
|
return err
|
|
}
|
|
}
|
|
|
|
// If we start and ended at the awaiting full resolution state, then
|
|
// we'll relaunch our set of unresolved contracts.
|
|
if startingState == StateWaitingFullResolution &&
|
|
nextState == StateWaitingFullResolution {
|
|
|
|
if err := c.relaunchResolvers(); err != nil {
|
|
c.cfg.BlockEpochs.Cancel()
|
|
return err
|
|
}
|
|
}
|
|
|
|
// TODO(roasbeef): cancel if breached
|
|
|
|
c.wg.Add(1)
|
|
go c.channelAttendant(bestHeight)
|
|
return nil
|
|
}
|
|
|
|
// relauchResolvers relaunches the set of resolvers for unresolved contracts in
|
|
// order to provide them with information that's not immediately available upon
|
|
// starting the ChannelArbitrator. This information should ideally be stored in
|
|
// the database, so this only serves as a intermediate work-around to prevent a
|
|
// migration.
|
|
func (c *ChannelArbitrator) relaunchResolvers() error {
|
|
// We'll now query our log to see if there are any active
|
|
// unresolved contracts. If this is the case, then we'll
|
|
// relaunch all contract resolvers.
|
|
unresolvedContracts, err := c.log.FetchUnresolvedContracts()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Retrieve the commitment tx hash from the log.
|
|
contractResolutions, err := c.log.FetchContractResolutions()
|
|
if err != nil {
|
|
log.Errorf("unable to fetch contract resolutions: %v",
|
|
err)
|
|
return err
|
|
}
|
|
commitHash := contractResolutions.CommitHash
|
|
|
|
// Reconstruct the htlc outpoints and data from the chain action log.
|
|
// The purpose of the constructed htlc map is to supplement to
|
|
// resolvers restored from database with extra data. Ideally this data
|
|
// is stored as part of the resolver in the log. This is a workaround
|
|
// to prevent a db migration. We use all available htlc sets here in
|
|
// order to ensure we have complete coverage.
|
|
htlcMap := make(map[wire.OutPoint]*channeldb.HTLC)
|
|
for _, htlcs := range c.activeHTLCs {
|
|
for _, htlc := range htlcs.incomingHTLCs {
|
|
htlc := htlc
|
|
outpoint := wire.OutPoint{
|
|
Hash: commitHash,
|
|
Index: uint32(htlc.OutputIndex),
|
|
}
|
|
htlcMap[outpoint] = &htlc
|
|
}
|
|
|
|
for _, htlc := range htlcs.outgoingHTLCs {
|
|
htlc := htlc
|
|
outpoint := wire.OutPoint{
|
|
Hash: commitHash,
|
|
Index: uint32(htlc.OutputIndex),
|
|
}
|
|
htlcMap[outpoint] = &htlc
|
|
}
|
|
}
|
|
|
|
log.Infof("ChannelArbitrator(%v): relaunching %v contract "+
|
|
"resolvers", c.cfg.ChanPoint, len(unresolvedContracts))
|
|
|
|
for _, resolver := range unresolvedContracts {
|
|
c.supplementResolver(resolver, htlcMap)
|
|
}
|
|
|
|
c.launchResolvers(unresolvedContracts)
|
|
|
|
return nil
|
|
}
|
|
|
|
// supplementResolver takes a resolver as it is restored from the log and fills
|
|
// in missing data from the htlcMap.
|
|
func (c *ChannelArbitrator) supplementResolver(resolver ContractResolver,
|
|
htlcMap map[wire.OutPoint]*channeldb.HTLC) error {
|
|
|
|
switch r := resolver.(type) {
|
|
|
|
case *htlcSuccessResolver:
|
|
return c.supplementSuccessResolver(r, htlcMap)
|
|
|
|
case *htlcIncomingContestResolver:
|
|
return c.supplementIncomingContestResolver(r, htlcMap)
|
|
|
|
case *htlcTimeoutResolver:
|
|
return c.supplementTimeoutResolver(r, htlcMap)
|
|
|
|
case *htlcOutgoingContestResolver:
|
|
return c.supplementTimeoutResolver(
|
|
&r.htlcTimeoutResolver, htlcMap,
|
|
)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// supplementSuccessResolver takes a htlcIncomingContestResolver as it is
|
|
// restored from the log and fills in missing data from the htlcMap.
|
|
func (c *ChannelArbitrator) supplementIncomingContestResolver(
|
|
r *htlcIncomingContestResolver,
|
|
htlcMap map[wire.OutPoint]*channeldb.HTLC) error {
|
|
|
|
res := r.htlcResolution
|
|
htlcPoint := res.HtlcPoint()
|
|
htlc, ok := htlcMap[htlcPoint]
|
|
if !ok {
|
|
return errors.New(
|
|
"htlc for incoming contest resolver unavailable",
|
|
)
|
|
}
|
|
|
|
r.htlcAmt = htlc.Amt
|
|
r.circuitKey = channeldb.CircuitKey{
|
|
ChanID: c.cfg.ShortChanID,
|
|
HtlcID: htlc.HtlcIndex,
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// supplementSuccessResolver takes a htlcSuccessResolver as it is restored from
|
|
// the log and fills in missing data from the htlcMap.
|
|
func (c *ChannelArbitrator) supplementSuccessResolver(r *htlcSuccessResolver,
|
|
htlcMap map[wire.OutPoint]*channeldb.HTLC) error {
|
|
|
|
res := r.htlcResolution
|
|
htlcPoint := res.HtlcPoint()
|
|
htlc, ok := htlcMap[htlcPoint]
|
|
if !ok {
|
|
return errors.New(
|
|
"htlc for success resolver unavailable",
|
|
)
|
|
}
|
|
r.htlcAmt = htlc.Amt
|
|
return nil
|
|
}
|
|
|
|
// supplementTimeoutResolver takes a htlcSuccessResolver as it is restored from
|
|
// the log and fills in missing data from the htlcMap.
|
|
func (c *ChannelArbitrator) supplementTimeoutResolver(r *htlcTimeoutResolver,
|
|
htlcMap map[wire.OutPoint]*channeldb.HTLC) error {
|
|
|
|
res := r.htlcResolution
|
|
htlcPoint := res.HtlcPoint()
|
|
htlc, ok := htlcMap[htlcPoint]
|
|
if !ok {
|
|
return errors.New(
|
|
"htlc for timeout resolver unavailable",
|
|
)
|
|
}
|
|
r.htlcAmt = htlc.Amt
|
|
return nil
|
|
}
|
|
|
|
// Report returns htlc reports for the active resolvers.
|
|
func (c *ChannelArbitrator) Report() []*ContractReport {
|
|
c.activeResolversLock.RLock()
|
|
defer c.activeResolversLock.RUnlock()
|
|
|
|
var reports []*ContractReport
|
|
for _, resolver := range c.activeResolvers {
|
|
r, ok := resolver.(reportingContractResolver)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
if r.IsResolved() {
|
|
continue
|
|
}
|
|
|
|
report := r.report()
|
|
if report == nil {
|
|
continue
|
|
}
|
|
|
|
reports = append(reports, report)
|
|
}
|
|
|
|
return reports
|
|
}
|
|
|
|
// Stop signals the ChannelArbitrator for a graceful shutdown.
|
|
func (c *ChannelArbitrator) Stop() error {
|
|
if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
|
|
return nil
|
|
}
|
|
|
|
log.Debugf("Stopping ChannelArbitrator(%v)", c.cfg.ChanPoint)
|
|
|
|
if c.cfg.ChainEvents.Cancel != nil {
|
|
go c.cfg.ChainEvents.Cancel()
|
|
}
|
|
|
|
c.activeResolversLock.RLock()
|
|
for _, activeResolver := range c.activeResolvers {
|
|
activeResolver.Stop()
|
|
}
|
|
c.activeResolversLock.RUnlock()
|
|
|
|
close(c.quit)
|
|
c.wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
// transitionTrigger is an enum that denotes exactly *why* a state transition
|
|
// was initiated. This is useful as depending on the initial trigger, we may
|
|
// skip certain states as those actions are expected to have already taken
|
|
// place as a result of the external trigger.
|
|
type transitionTrigger uint8
|
|
|
|
const (
|
|
// chainTrigger is a transition trigger that has been attempted due to
|
|
// changing on-chain conditions such as a block which times out HTLC's
|
|
// being attached.
|
|
chainTrigger transitionTrigger = iota
|
|
|
|
// userTrigger is a transition trigger driven by user action. Examples
|
|
// of such a trigger include a user requesting a force closure of the
|
|
// channel.
|
|
userTrigger
|
|
|
|
// remoteCloseTrigger is a transition trigger driven by the remote
|
|
// peer's commitment being confirmed.
|
|
remoteCloseTrigger
|
|
|
|
// localCloseTrigger is a transition trigger driven by our commitment
|
|
// being confirmed.
|
|
localCloseTrigger
|
|
|
|
// coopCloseTrigger is a transition trigger driven by a cooperative
|
|
// close transaction being confirmed.
|
|
coopCloseTrigger
|
|
)
|
|
|
|
// String returns a human readable string describing the passed
|
|
// transitionTrigger.
|
|
func (t transitionTrigger) String() string {
|
|
switch t {
|
|
case chainTrigger:
|
|
return "chainTrigger"
|
|
|
|
case remoteCloseTrigger:
|
|
return "remoteCloseTrigger"
|
|
|
|
case userTrigger:
|
|
return "userTrigger"
|
|
|
|
case localCloseTrigger:
|
|
return "localCloseTrigger"
|
|
|
|
case coopCloseTrigger:
|
|
return "coopCloseTrigger"
|
|
|
|
default:
|
|
return "unknown trigger"
|
|
}
|
|
}
|
|
|
|
// stateStep is a help method that examines our internal state, and attempts
|
|
// the appropriate state transition if necessary. The next state we transition
|
|
// to is returned, Additionally, if the next transition results in a commitment
|
|
// broadcast, the commitment transaction itself is returned.
|
|
func (c *ChannelArbitrator) stateStep(
|
|
triggerHeight uint32, trigger transitionTrigger,
|
|
confCommitSet *CommitSet) (ArbitratorState, *wire.MsgTx, error) {
|
|
|
|
var (
|
|
nextState ArbitratorState
|
|
closeTx *wire.MsgTx
|
|
)
|
|
switch c.state {
|
|
|
|
// If we're in the default state, then we'll check our set of actions
|
|
// to see if while we were down, conditions have changed.
|
|
case StateDefault:
|
|
log.Debugf("ChannelArbitrator(%v): new block (height=%v) "+
|
|
"examining active HTLC's", c.cfg.ChanPoint,
|
|
triggerHeight)
|
|
|
|
// As a new block has been connected to the end of the main
|
|
// chain, we'll check to see if we need to make any on-chain
|
|
// claims on behalf of the channel contract that we're
|
|
// arbitrating for. If a commitment has confirmed, then we'll
|
|
// use the set snapshot from the chain, otherwise we'll use our
|
|
// current set.
|
|
var htlcs map[HtlcSetKey]htlcSet
|
|
if confCommitSet != nil {
|
|
htlcs = confCommitSet.toActiveHTLCSets()
|
|
} else {
|
|
htlcs = c.activeHTLCs
|
|
}
|
|
chainActions, err := c.checkLocalChainActions(
|
|
triggerHeight, trigger, htlcs, false,
|
|
)
|
|
if err != nil {
|
|
return StateDefault, nil, err
|
|
}
|
|
|
|
// If there are no actions to be made, then we'll remain in the
|
|
// default state. If this isn't a self initiated event (we're
|
|
// checking due to a chain update), then we'll exit now.
|
|
if len(chainActions) == 0 && trigger == chainTrigger {
|
|
log.Tracef("ChannelArbitrator(%v): no actions for "+
|
|
"chain trigger, terminating", c.cfg.ChanPoint)
|
|
|
|
return StateDefault, closeTx, nil
|
|
}
|
|
|
|
// Otherwise, we'll log that we checked the HTLC actions as the
|
|
// commitment transaction has already been broadcast.
|
|
log.Tracef("ChannelArbitrator(%v): logging chain_actions=%v",
|
|
c.cfg.ChanPoint,
|
|
newLogClosure(func() string {
|
|
return spew.Sdump(chainActions)
|
|
}))
|
|
|
|
// Depending on the type of trigger, we'll either "tunnel"
|
|
// through to a farther state, or just proceed linearly to the
|
|
// next state.
|
|
switch trigger {
|
|
|
|
// If this is a chain trigger, then we'll go straight to the
|
|
// next state, as we still need to broadcast the commitment
|
|
// transaction.
|
|
case chainTrigger:
|
|
fallthrough
|
|
case userTrigger:
|
|
nextState = StateBroadcastCommit
|
|
|
|
// If the trigger is a cooperative close being confirmed, then
|
|
// we can go straight to StateFullyResolved, as there won't be
|
|
// any contracts to resolve.
|
|
case coopCloseTrigger:
|
|
nextState = StateFullyResolved
|
|
|
|
// Otherwise, if this state advance was triggered by a
|
|
// commitment being confirmed on chain, then we'll jump
|
|
// straight to the state where the contract has already been
|
|
// closed, and we will inspect the set of unresolved contracts.
|
|
case localCloseTrigger:
|
|
log.Errorf("ChannelArbitrator(%v): unexpected local "+
|
|
"commitment confirmed while in StateDefault",
|
|
c.cfg.ChanPoint)
|
|
fallthrough
|
|
case remoteCloseTrigger:
|
|
nextState = StateContractClosed
|
|
}
|
|
|
|
// If we're in this state, then we've decided to broadcast the
|
|
// commitment transaction. We enter this state either due to an outside
|
|
// sub-system, or because an on-chain action has been triggered.
|
|
case StateBroadcastCommit:
|
|
// Under normal operation, we can only enter
|
|
// StateBroadcastCommit via a user or chain trigger. On restart,
|
|
// this state may be reexecuted after closing the channel, but
|
|
// failing to commit to StateContractClosed or
|
|
// StateFullyResolved. In that case, one of the three close
|
|
// triggers will be presented, signifying that we should skip
|
|
// rebroadcasting, and go straight to resolving the on-chain
|
|
// contract or marking the channel resolved.
|
|
switch trigger {
|
|
case localCloseTrigger, remoteCloseTrigger:
|
|
log.Infof("ChannelArbitrator(%v): detected %s "+
|
|
"close after closing channel, fast-forwarding "+
|
|
"to %s to resolve contract",
|
|
c.cfg.ChanPoint, trigger, StateContractClosed)
|
|
return StateContractClosed, closeTx, nil
|
|
|
|
case coopCloseTrigger:
|
|
log.Infof("ChannelArbitrator(%v): detected %s "+
|
|
"close after closing channel, fast-forwarding "+
|
|
"to %s to resolve contract",
|
|
c.cfg.ChanPoint, trigger, StateFullyResolved)
|
|
return StateFullyResolved, closeTx, nil
|
|
}
|
|
|
|
log.Infof("ChannelArbitrator(%v): force closing "+
|
|
"chan", c.cfg.ChanPoint)
|
|
|
|
// Now that we have all the actions decided for the set of
|
|
// HTLC's, we'll broadcast the commitment transaction, and
|
|
// signal the link to exit.
|
|
|
|
// We'll tell the switch that it should remove the link for
|
|
// this channel, in addition to fetching the force close
|
|
// summary needed to close this channel on chain.
|
|
closeSummary, err := c.cfg.ForceCloseChan()
|
|
if err != nil {
|
|
log.Errorf("ChannelArbitrator(%v): unable to "+
|
|
"force close: %v", c.cfg.ChanPoint, err)
|
|
return StateError, closeTx, err
|
|
}
|
|
closeTx = closeSummary.CloseTx
|
|
|
|
// With the close transaction in hand, broadcast the
|
|
// transaction to the network, thereby entering the post
|
|
// channel resolution state.
|
|
log.Infof("Broadcasting force close transaction, "+
|
|
"ChannelPoint(%v): %v", c.cfg.ChanPoint,
|
|
newLogClosure(func() string {
|
|
return spew.Sdump(closeTx)
|
|
}))
|
|
|
|
// At this point, we'll now broadcast the commitment
|
|
// transaction itself.
|
|
if err := c.cfg.PublishTx(closeTx); err != nil {
|
|
log.Errorf("ChannelArbitrator(%v): unable to broadcast "+
|
|
"close tx: %v", c.cfg.ChanPoint, err)
|
|
if err != lnwallet.ErrDoubleSpend {
|
|
return StateError, closeTx, err
|
|
}
|
|
}
|
|
|
|
if err := c.cfg.MarkCommitmentBroadcasted(); err != nil {
|
|
log.Errorf("ChannelArbitrator(%v): unable to "+
|
|
"mark commitment broadcasted: %v",
|
|
c.cfg.ChanPoint, err)
|
|
}
|
|
|
|
// We go to the StateCommitmentBroadcasted state, where we'll
|
|
// be waiting for the commitment to be confirmed.
|
|
nextState = StateCommitmentBroadcasted
|
|
|
|
// In this state we have broadcasted our own commitment, and will need
|
|
// to wait for a commitment (not necessarily the one we broadcasted!)
|
|
// to be confirmed.
|
|
case StateCommitmentBroadcasted:
|
|
switch trigger {
|
|
// We are waiting for a commitment to be confirmed, so any
|
|
// other trigger will be ignored.
|
|
case chainTrigger, userTrigger:
|
|
log.Infof("ChannelArbitrator(%v): noop trigger %v",
|
|
c.cfg.ChanPoint, trigger)
|
|
nextState = StateCommitmentBroadcasted
|
|
|
|
// If this state advance was triggered by any of the
|
|
// commitments being confirmed, then we'll jump to the state
|
|
// where the contract has been closed.
|
|
case localCloseTrigger, remoteCloseTrigger:
|
|
log.Infof("ChannelArbitrator(%v): trigger %v, "+
|
|
" going to StateContractClosed",
|
|
c.cfg.ChanPoint, trigger)
|
|
nextState = StateContractClosed
|
|
|
|
case coopCloseTrigger:
|
|
log.Infof("ChannelArbitrator(%v): trigger %v, "+
|
|
" going to StateFullyResolved",
|
|
c.cfg.ChanPoint, trigger)
|
|
nextState = StateFullyResolved
|
|
}
|
|
|
|
// If we're in this state, then the contract has been fully closed to
|
|
// outside sub-systems, so we'll process the prior set of on-chain
|
|
// contract actions and launch a set of resolvers.
|
|
case StateContractClosed:
|
|
// First, we'll fetch our chain actions, and both sets of
|
|
// resolutions so we can process them.
|
|
contractResolutions, err := c.log.FetchContractResolutions()
|
|
if err != nil {
|
|
log.Errorf("unable to fetch contract resolutions: %v",
|
|
err)
|
|
return StateError, closeTx, err
|
|
}
|
|
|
|
// If the resolution is empty, and we have no HTLCs at all to
|
|
// tend to, then we're done here. We don't need to launch any
|
|
// resolvers, and can go straight to our final state.
|
|
if contractResolutions.IsEmpty() && confCommitSet.IsEmpty() {
|
|
log.Infof("ChannelArbitrator(%v): contract "+
|
|
"resolutions empty, marking channel as fully resolved!",
|
|
c.cfg.ChanPoint)
|
|
nextState = StateFullyResolved
|
|
break
|
|
}
|
|
|
|
// If we've have broadcast the commitment transaction, we send
|
|
// our commitment output for incubation, but only if it wasn't
|
|
// trimmed. We'll need to wait for a CSV timeout before we can
|
|
// reclaim the funds.
|
|
commitRes := contractResolutions.CommitResolution
|
|
if commitRes != nil && commitRes.MaturityDelay > 0 {
|
|
log.Infof("ChannelArbitrator(%v): sending commit "+
|
|
"output for incubation", c.cfg.ChanPoint)
|
|
|
|
err = c.cfg.IncubateOutputs(
|
|
c.cfg.ChanPoint, commitRes,
|
|
nil, nil, triggerHeight,
|
|
)
|
|
if err != nil {
|
|
// TODO(roasbeef): check for AlreadyExists errors
|
|
log.Errorf("unable to incubate commitment "+
|
|
"output: %v", err)
|
|
return StateError, closeTx, err
|
|
}
|
|
}
|
|
|
|
// Now that we know we'll need to act, we'll process the htlc
|
|
// actions, wen create the structures we need to resolve all
|
|
// outstanding contracts.
|
|
htlcResolvers, pktsToSend, err := c.prepContractResolutions(
|
|
contractResolutions, triggerHeight, trigger,
|
|
confCommitSet,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("ChannelArbitrator(%v): unable to "+
|
|
"resolve contracts: %v", c.cfg.ChanPoint, err)
|
|
return StateError, closeTx, err
|
|
}
|
|
|
|
log.Debugf("ChannelArbitrator(%v): sending resolution message=%v",
|
|
c.cfg.ChanPoint,
|
|
newLogClosure(func() string {
|
|
return spew.Sdump(pktsToSend)
|
|
}))
|
|
|
|
// With the commitment broadcast, we'll then send over all
|
|
// messages we can send immediately.
|
|
if len(pktsToSend) != 0 {
|
|
err := c.cfg.DeliverResolutionMsg(pktsToSend...)
|
|
if err != nil {
|
|
// TODO(roasbeef): make sure packet sends are
|
|
// idempotent
|
|
log.Errorf("unable to send pkts: %v", err)
|
|
return StateError, closeTx, err
|
|
}
|
|
}
|
|
|
|
log.Debugf("ChannelArbitrator(%v): inserting %v contract "+
|
|
"resolvers", c.cfg.ChanPoint, len(htlcResolvers))
|
|
|
|
err = c.log.InsertUnresolvedContracts(htlcResolvers...)
|
|
if err != nil {
|
|
return StateError, closeTx, err
|
|
}
|
|
|
|
// Finally, we'll launch all the required contract resolvers.
|
|
// Once they're all resolved, we're no longer needed.
|
|
c.launchResolvers(htlcResolvers)
|
|
|
|
nextState = StateWaitingFullResolution
|
|
|
|
// This is our terminal state. We'll keep returning this state until
|
|
// all contracts are fully resolved.
|
|
case StateWaitingFullResolution:
|
|
log.Infof("ChannelArbitrator(%v): still awaiting contract "+
|
|
"resolution", c.cfg.ChanPoint)
|
|
|
|
numUnresolved, err := c.log.FetchUnresolvedContracts()
|
|
if err != nil {
|
|
return StateError, closeTx, err
|
|
}
|
|
|
|
// If we still have unresolved contracts, then we'll stay alive
|
|
// to oversee their resolution.
|
|
if len(numUnresolved) != 0 {
|
|
nextState = StateWaitingFullResolution
|
|
break
|
|
}
|
|
|
|
nextState = StateFullyResolved
|
|
|
|
// If we start as fully resolved, then we'll end as fully resolved.
|
|
case StateFullyResolved:
|
|
// To ensure that the state of the contract in persistent
|
|
// storage is properly reflected, we'll mark the contract as
|
|
// fully resolved now.
|
|
nextState = StateFullyResolved
|
|
|
|
log.Infof("ChannelPoint(%v) has been fully resolved "+
|
|
"on-chain at height=%v", c.cfg.ChanPoint, triggerHeight)
|
|
|
|
if err := c.cfg.MarkChannelResolved(); err != nil {
|
|
log.Errorf("unable to mark channel resolved: %v", err)
|
|
return StateError, closeTx, err
|
|
}
|
|
}
|
|
|
|
log.Tracef("ChannelArbitrator(%v): next_state=%v", c.cfg.ChanPoint,
|
|
nextState)
|
|
|
|
return nextState, closeTx, nil
|
|
}
|
|
|
|
// launchResolvers updates the activeResolvers list and starts the resolvers.
|
|
func (c *ChannelArbitrator) launchResolvers(resolvers []ContractResolver) {
|
|
c.activeResolversLock.Lock()
|
|
defer c.activeResolversLock.Unlock()
|
|
|
|
c.activeResolvers = resolvers
|
|
for _, contract := range resolvers {
|
|
c.wg.Add(1)
|
|
go c.resolveContract(contract)
|
|
}
|
|
}
|
|
|
|
// advanceState is the main driver of our state machine. This method is an
|
|
// iterative function which repeatedly attempts to advance the internal state
|
|
// of the channel arbitrator. The state will be advanced until we reach a
|
|
// redundant transition, meaning that the state transition is a noop. The final
|
|
// param is a callback that allows the caller to execute an arbitrary action
|
|
// after each state transition.
|
|
func (c *ChannelArbitrator) advanceState(
|
|
triggerHeight uint32, trigger transitionTrigger,
|
|
confCommitSet *CommitSet) (ArbitratorState, *wire.MsgTx, error) {
|
|
|
|
var (
|
|
priorState ArbitratorState
|
|
forceCloseTx *wire.MsgTx
|
|
)
|
|
|
|
// We'll continue to advance our state forward until the state we
|
|
// transition to is that same state that we started at.
|
|
for {
|
|
priorState = c.state
|
|
log.Tracef("ChannelArbitrator(%v): attempting state step with "+
|
|
"trigger=%v from state=%v", c.cfg.ChanPoint, trigger,
|
|
priorState)
|
|
|
|
nextState, closeTx, err := c.stateStep(
|
|
triggerHeight, trigger, confCommitSet,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("ChannelArbitrator(%v): unable to advance "+
|
|
"state: %v", c.cfg.ChanPoint, err)
|
|
return priorState, nil, err
|
|
}
|
|
|
|
if forceCloseTx == nil && closeTx != nil {
|
|
forceCloseTx = closeTx
|
|
}
|
|
|
|
// Our termination transition is a noop transition. If we get
|
|
// our prior state back as the next state, then we'll
|
|
// terminate.
|
|
if nextState == priorState {
|
|
log.Tracef("ChannelArbitrator(%v): terminating at "+
|
|
"state=%v", c.cfg.ChanPoint, nextState)
|
|
return nextState, forceCloseTx, nil
|
|
}
|
|
|
|
// As the prior state was successfully executed, we can now
|
|
// commit the next state. This ensures that we will re-execute
|
|
// the prior state if anything fails.
|
|
if err := c.log.CommitState(nextState); err != nil {
|
|
log.Errorf("ChannelArbitrator(%v): unable to commit "+
|
|
"next state(%v): %v", c.cfg.ChanPoint,
|
|
nextState, err)
|
|
return priorState, nil, err
|
|
}
|
|
c.state = nextState
|
|
}
|
|
}
|
|
|
|
// ChainAction is an enum that encompasses all possible on-chain actions
|
|
// we'll take for a set of HTLC's.
|
|
type ChainAction uint8
|
|
|
|
const (
|
|
// NoAction is the min chainAction type, indicating that no action
|
|
// needs to be taken for a given HTLC.
|
|
NoAction ChainAction = 0
|
|
|
|
// HtlcTimeoutAction indicates that the HTLC will timeout soon. As a
|
|
// result, we should get ready to sweep it on chain after the timeout.
|
|
HtlcTimeoutAction = 1
|
|
|
|
// HtlcClaimAction indicates that we should claim the HTLC on chain
|
|
// before its timeout period.
|
|
HtlcClaimAction = 2
|
|
|
|
// HtlcFailNowAction indicates that we should fail an outgoing HTLC
|
|
// immediately by cancelling it backwards as it has no corresponding
|
|
// output in our commitment transaction.
|
|
HtlcFailNowAction = 3
|
|
|
|
// HtlcOutgoingWatchAction indicates that we can't yet timeout this
|
|
// HTLC, but we had to go to chain on order to resolve an existing
|
|
// HTLC. In this case, we'll either: time it out once it expires, or
|
|
// will learn the pre-image if the remote party claims the output. In
|
|
// this case, well add the pre-image to our global store.
|
|
HtlcOutgoingWatchAction = 4
|
|
|
|
// HtlcIncomingWatchAction indicates that we don't yet have the
|
|
// pre-image to claim incoming HTLC, but we had to go to chain in order
|
|
// to resolve and existing HTLC. In this case, we'll either: let the
|
|
// other party time it out, or eventually learn of the pre-image, in
|
|
// which case we'll claim on chain.
|
|
HtlcIncomingWatchAction = 5
|
|
)
|
|
|
|
// String returns a human readable string describing a chain action.
|
|
func (c ChainAction) String() string {
|
|
switch c {
|
|
case NoAction:
|
|
return "NoAction"
|
|
|
|
case HtlcTimeoutAction:
|
|
return "HtlcTimeoutAction"
|
|
|
|
case HtlcClaimAction:
|
|
return "HtlcClaimAction"
|
|
|
|
case HtlcFailNowAction:
|
|
return "HtlcFailNowAction"
|
|
|
|
case HtlcOutgoingWatchAction:
|
|
return "HtlcOutgoingWatchAction"
|
|
|
|
case HtlcIncomingWatchAction:
|
|
return "HtlcIncomingWatchAction"
|
|
|
|
default:
|
|
return "<unknown action>"
|
|
}
|
|
}
|
|
|
|
// ChainActionMap is a map of a chain action, to the set of HTLC's that need to
|
|
// be acted upon for a given action type. The channel
|
|
type ChainActionMap map[ChainAction][]channeldb.HTLC
|
|
|
|
// Merge merges the passed chain actions with the target chain action map.
|
|
func (c ChainActionMap) Merge(actions ChainActionMap) {
|
|
for chainAction, htlcs := range actions {
|
|
c[chainAction] = append(c[chainAction], htlcs...)
|
|
}
|
|
}
|
|
|
|
// shouldGoOnChain takes into account the absolute timeout of the HTLC, if the
|
|
// confirmation delta that we need is close, and returns a bool indicating if
|
|
// we should go on chain to claim. We do this rather than waiting up until the
|
|
// last minute as we want to ensure that when we *need* (HTLC is timed out) to
|
|
// sweep, the commitment is already confirmed.
|
|
func (c *ChannelArbitrator) shouldGoOnChain(htlcExpiry, broadcastDelta,
|
|
currentHeight uint32) bool {
|
|
|
|
// We'll calculate the broadcast cut off for this HTLC. This is the
|
|
// height that (based on our current fee estimation) we should
|
|
// broadcast in order to ensure the commitment transaction is confirmed
|
|
// before the HTLC fully expires.
|
|
broadcastCutOff := htlcExpiry - broadcastDelta
|
|
|
|
log.Tracef("ChannelArbitrator(%v): examining outgoing contract: "+
|
|
"expiry=%v, cutoff=%v, height=%v", c.cfg.ChanPoint, htlcExpiry,
|
|
broadcastCutOff, currentHeight)
|
|
|
|
// TODO(roasbeef): take into account default HTLC delta, don't need to
|
|
// broadcast immediately
|
|
// * can then batch with SINGLE | ANYONECANPAY
|
|
|
|
// We should on-chain for this HTLC, iff we're within out broadcast
|
|
// cutoff window.
|
|
return currentHeight >= broadcastCutOff
|
|
}
|
|
|
|
// checkCommitChainActions is called for each new block connected to the end of
|
|
// the main chain. Given the new block height, this new method will examine all
|
|
// active HTLC's, and determine if we need to go on-chain to claim any of them.
|
|
// A map of action -> []htlc is returned, detailing what action (if any) should
|
|
// be performed for each HTLC. For timed out HTLC's, once the commitment has
|
|
// been sufficiently confirmed, the HTLC's should be canceled backwards. For
|
|
// redeemed HTLC's, we should send the pre-image back to the incoming link.
|
|
func (c *ChannelArbitrator) checkCommitChainActions(height uint32,
|
|
trigger transitionTrigger, htlcs htlcSet) (ChainActionMap, error) {
|
|
|
|
// TODO(roasbeef): would need to lock channel? channel totem?
|
|
// * race condition if adding and we broadcast, etc
|
|
// * or would make each instance sync?
|
|
|
|
log.Debugf("ChannelArbitrator(%v): checking chain actions at "+
|
|
"height=%v", c.cfg.ChanPoint, height)
|
|
|
|
actionMap := make(ChainActionMap)
|
|
|
|
// First, we'll make an initial pass over the set of incoming and
|
|
// outgoing HTLC's to decide if we need to go on chain at all.
|
|
haveChainActions := false
|
|
for _, htlc := range htlcs.outgoingHTLCs {
|
|
// We'll need to go on-chain for an outgoing HTLC if it was
|
|
// never resolved downstream, and it's "close" to timing out.
|
|
toChain := c.shouldGoOnChain(
|
|
htlc.RefundTimeout, c.cfg.OutgoingBroadcastDelta,
|
|
height,
|
|
)
|
|
|
|
if toChain {
|
|
log.Debugf("ChannelArbitrator(%v): go to chain for "+
|
|
"outgoing htlc %x: timeout=%v, "+
|
|
"blocks_until_expiry=%v, broadcast_delta=%v",
|
|
c.cfg.ChanPoint, htlc.RHash[:],
|
|
htlc.RefundTimeout, htlc.RefundTimeout-height,
|
|
c.cfg.OutgoingBroadcastDelta,
|
|
)
|
|
}
|
|
|
|
haveChainActions = haveChainActions || toChain
|
|
}
|
|
|
|
for _, htlc := range htlcs.incomingHTLCs {
|
|
// We'll need to go on-chain to pull an incoming HTLC iff we
|
|
// know the pre-image and it's close to timing out. We need to
|
|
// ensure that we claim the funds that our rightfully ours
|
|
// on-chain.
|
|
preimageAvailable, err := c.isPreimageAvailable(htlc.RHash)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !preimageAvailable {
|
|
continue
|
|
}
|
|
|
|
toChain := c.shouldGoOnChain(
|
|
htlc.RefundTimeout, c.cfg.IncomingBroadcastDelta,
|
|
height,
|
|
)
|
|
|
|
if toChain {
|
|
log.Debugf("ChannelArbitrator(%v): go to chain for "+
|
|
"incoming htlc %x: timeout=%v, "+
|
|
"blocks_until_expiry=%v, broadcast_delta=%v",
|
|
c.cfg.ChanPoint, htlc.RHash[:],
|
|
htlc.RefundTimeout, htlc.RefundTimeout-height,
|
|
c.cfg.IncomingBroadcastDelta,
|
|
)
|
|
}
|
|
|
|
haveChainActions = haveChainActions || toChain
|
|
}
|
|
|
|
// If we don't have any actions to make, then we'll return an empty
|
|
// action map. We only do this if this was a chain trigger though, as
|
|
// if we're going to broadcast the commitment (or the remote party did)
|
|
// we're *forced* to act on each HTLC.
|
|
if !haveChainActions && trigger == chainTrigger {
|
|
log.Tracef("ChannelArbitrator(%v): no actions to take at "+
|
|
"height=%v", c.cfg.ChanPoint, height)
|
|
return actionMap, nil
|
|
}
|
|
|
|
// Now that we know we'll need to go on-chain, we'll examine all of our
|
|
// active outgoing HTLC's to see if we either need to: sweep them after
|
|
// a timeout (then cancel backwards), cancel them backwards
|
|
// immediately, or watch them as they're still active contracts.
|
|
for _, htlc := range htlcs.outgoingHTLCs {
|
|
switch {
|
|
// If the HTLC is dust, then we can cancel it backwards
|
|
// immediately as there's no matching contract to arbitrate
|
|
// on-chain. We know the HTLC is dust, if the OutputIndex
|
|
// negative.
|
|
case htlc.OutputIndex < 0:
|
|
log.Tracef("ChannelArbitrator(%v): immediately "+
|
|
"failing dust htlc=%x", c.cfg.ChanPoint,
|
|
htlc.RHash[:])
|
|
|
|
actionMap[HtlcFailNowAction] = append(
|
|
actionMap[HtlcFailNowAction], htlc,
|
|
)
|
|
|
|
// If we don't need to immediately act on this HTLC, then we'll
|
|
// mark it still "live". After we broadcast, we'll monitor it
|
|
// until the HTLC times out to see if we can also redeem it
|
|
// on-chain.
|
|
case !c.shouldGoOnChain(
|
|
htlc.RefundTimeout, c.cfg.OutgoingBroadcastDelta,
|
|
height,
|
|
):
|
|
// TODO(roasbeef): also need to be able to query
|
|
// circuit map to see if HTLC hasn't been fully
|
|
// resolved
|
|
//
|
|
// * can't fail incoming until if outgoing not yet
|
|
// failed
|
|
|
|
log.Tracef("ChannelArbitrator(%v): watching chain to "+
|
|
"decide action for outgoing htlc=%x",
|
|
c.cfg.ChanPoint, htlc.RHash[:])
|
|
|
|
actionMap[HtlcOutgoingWatchAction] = append(
|
|
actionMap[HtlcOutgoingWatchAction], htlc,
|
|
)
|
|
|
|
// Otherwise, we'll update our actionMap to mark that we need
|
|
// to sweep this HTLC on-chain
|
|
default:
|
|
log.Tracef("ChannelArbitrator(%v): going on-chain to "+
|
|
"timeout htlc=%x", c.cfg.ChanPoint, htlc.RHash[:])
|
|
|
|
actionMap[HtlcTimeoutAction] = append(
|
|
actionMap[HtlcTimeoutAction], htlc,
|
|
)
|
|
}
|
|
}
|
|
|
|
// Similarly, for each incoming HTLC, now that we need to go on-chain,
|
|
// we'll either: sweep it immediately if we know the pre-image, or
|
|
// observe the output on-chain if we don't In this last, case we'll
|
|
// either learn of it eventually from the outgoing HTLC, or the sender
|
|
// will timeout the HTLC.
|
|
for _, htlc := range htlcs.incomingHTLCs {
|
|
log.Tracef("ChannelArbitrator(%v): watching chain to decide "+
|
|
"action for incoming htlc=%x", c.cfg.ChanPoint,
|
|
htlc.RHash[:])
|
|
|
|
actionMap[HtlcIncomingWatchAction] = append(
|
|
actionMap[HtlcIncomingWatchAction], htlc,
|
|
)
|
|
}
|
|
|
|
return actionMap, nil
|
|
}
|
|
|
|
// isPreimageAvailable returns whether the hash preimage is available in either
|
|
// the preimage cache or the invoice database.
|
|
func (c *ChannelArbitrator) isPreimageAvailable(hash lntypes.Hash) (bool,
|
|
error) {
|
|
|
|
// Start by checking the preimage cache for preimages of
|
|
// forwarded HTLCs.
|
|
_, preimageAvailable := c.cfg.PreimageDB.LookupPreimage(
|
|
hash,
|
|
)
|
|
if preimageAvailable {
|
|
return true, nil
|
|
}
|
|
|
|
// Then check if we have an invoice that can be settled by this HTLC.
|
|
//
|
|
// TODO(joostjager): Check that there are still more blocks remaining
|
|
// than the invoice cltv delta. We don't want to go to chain only to
|
|
// have the incoming contest resolver decide that we don't want to
|
|
// settle this invoice.
|
|
invoice, err := c.cfg.Registry.LookupInvoice(hash)
|
|
switch err {
|
|
case nil:
|
|
case channeldb.ErrInvoiceNotFound, channeldb.ErrNoInvoicesCreated:
|
|
return false, nil
|
|
default:
|
|
return false, err
|
|
}
|
|
|
|
preimageAvailable = invoice.Terms.PaymentPreimage !=
|
|
channeldb.UnknownPreimage
|
|
|
|
return preimageAvailable, nil
|
|
}
|
|
|
|
// checkLocalChainActions is similar to checkCommitChainActions, but it also
|
|
// examines the set of HTLCs on the remote party's commitment. This allows us
|
|
// to ensure we're able to satisfy the HTLC timeout constraints for incoming vs
|
|
// outgoing HTLCs.
|
|
func (c *ChannelArbitrator) checkLocalChainActions(
|
|
height uint32, trigger transitionTrigger,
|
|
activeHTLCs map[HtlcSetKey]htlcSet,
|
|
commitsConfirmed bool) (ChainActionMap, error) {
|
|
|
|
// First, we'll check our local chain actions as normal. This will only
|
|
// examine HTLCs on our local commitment (timeout or settle).
|
|
localCommitActions, err := c.checkCommitChainActions(
|
|
height, trigger, activeHTLCs[LocalHtlcSet],
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Next, we'll examine the remote commitment (and maybe a dangling one)
|
|
// to see if the set difference of our HTLCs is non-empty. If so, then
|
|
// we may need to cancel back some HTLCs if we decide go to chain.
|
|
remoteDanglingActions := c.checkRemoteDanglingActions(
|
|
height, activeHTLCs, commitsConfirmed,
|
|
)
|
|
|
|
// Finally, we'll merge the two set of chain actions.
|
|
localCommitActions.Merge(remoteDanglingActions)
|
|
|
|
return localCommitActions, nil
|
|
}
|
|
|
|
// checkRemoteDanglingActions examines the set of remote commitments for any
|
|
// HTLCs that are close to timing out. If we find any, then we'll return a set
|
|
// of chain actions for HTLCs that are on our commitment, but not theirs to
|
|
// cancel immediately.
|
|
func (c *ChannelArbitrator) checkRemoteDanglingActions(
|
|
height uint32, activeHTLCs map[HtlcSetKey]htlcSet,
|
|
commitsConfirmed bool) ChainActionMap {
|
|
|
|
var (
|
|
pendingRemoteHTLCs []channeldb.HTLC
|
|
localHTLCs = make(map[uint64]struct{})
|
|
remoteHTLCs = make(map[uint64]channeldb.HTLC)
|
|
actionMap = make(ChainActionMap)
|
|
)
|
|
|
|
// First, we'll construct two sets of the outgoing HTLCs: those on our
|
|
// local commitment, and those that are on the remote commitment(s).
|
|
for htlcSetKey, htlcs := range activeHTLCs {
|
|
if htlcSetKey.IsRemote {
|
|
for _, htlc := range htlcs.outgoingHTLCs {
|
|
remoteHTLCs[htlc.HtlcIndex] = htlc
|
|
}
|
|
} else {
|
|
for _, htlc := range htlcs.outgoingHTLCs {
|
|
localHTLCs[htlc.HtlcIndex] = struct{}{}
|
|
}
|
|
}
|
|
}
|
|
|
|
// With both sets constructed, we'll now compute the set difference of
|
|
// our two sets of HTLCs. This'll give us the HTLCs that exist on the
|
|
// remote commitment transaction, but not on ours.
|
|
for htlcIndex, htlc := range remoteHTLCs {
|
|
if _, ok := localHTLCs[htlcIndex]; ok {
|
|
continue
|
|
}
|
|
|
|
pendingRemoteHTLCs = append(pendingRemoteHTLCs, htlc)
|
|
}
|
|
|
|
// Finally, we'll examine all the pending remote HTLCs for those that
|
|
// have expired. If we find any, then we'll recommend that they be
|
|
// failed now so we can free up the incoming HTLC.
|
|
for _, htlc := range pendingRemoteHTLCs {
|
|
// We'll now check if we need to go to chain in order to cancel
|
|
// the incoming HTLC.
|
|
goToChain := c.shouldGoOnChain(
|
|
htlc.RefundTimeout, c.cfg.OutgoingBroadcastDelta,
|
|
height,
|
|
)
|
|
|
|
// If we don't need to go to chain, and no commitments have
|
|
// been confirmed, then we can move on. Otherwise, if
|
|
// commitments have been confirmed, then we need to cancel back
|
|
// *all* of the pending remote HTLCS.
|
|
if !goToChain && !commitsConfirmed {
|
|
continue
|
|
}
|
|
|
|
log.Tracef("ChannelArbitrator(%v): immediately failing "+
|
|
"htlc=%x from remote commitment",
|
|
c.cfg.ChanPoint, htlc.RHash[:])
|
|
|
|
actionMap[HtlcFailNowAction] = append(
|
|
actionMap[HtlcFailNowAction], htlc,
|
|
)
|
|
}
|
|
|
|
return actionMap
|
|
}
|
|
|
|
// checkRemoteChainActions examines the two possible remote commitment chains
|
|
// and returns the set of chain actions we need to carry out if the remote
|
|
// commitment (non pending) confirms. The pendingConf indicates if the pending
|
|
// remote commitment confirmed. This is similar to checkCommitChainActions, but
|
|
// we'll immediately fail any HTLCs on the pending remote commit, but not the
|
|
// remote commit (or the other way around).
|
|
func (c *ChannelArbitrator) checkRemoteChainActions(
|
|
height uint32, trigger transitionTrigger,
|
|
activeHTLCs map[HtlcSetKey]htlcSet,
|
|
pendingConf bool) (ChainActionMap, error) {
|
|
|
|
// First, we'll examine all the normal chain actions on the remote
|
|
// commitment that confirmed.
|
|
confHTLCs := activeHTLCs[RemoteHtlcSet]
|
|
if pendingConf {
|
|
confHTLCs = activeHTLCs[RemotePendingHtlcSet]
|
|
}
|
|
remoteCommitActions, err := c.checkCommitChainActions(
|
|
height, trigger, confHTLCs,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// With this actions computed, we'll now check the diff of the HTLCs on
|
|
// the commitments, and cancel back any that are on the pending but not
|
|
// the non-pending.
|
|
remoteDiffActions := c.checkRemoteDiffActions(
|
|
height, activeHTLCs, pendingConf,
|
|
)
|
|
|
|
// Finally, we'll merge all the chain actions and the final set of
|
|
// chain actions.
|
|
remoteCommitActions.Merge(remoteDiffActions)
|
|
return remoteCommitActions, nil
|
|
}
|
|
|
|
// checkRemoteDiffActions checks the set difference of the HTLCs on the remote
|
|
// confirmed commit and remote dangling commit for HTLCS that we need to cancel
|
|
// back. If we find any HTLCs on the remote pending but not the remote, then
|
|
// we'll mark them to be failed immediately.
|
|
func (c *ChannelArbitrator) checkRemoteDiffActions(height uint32,
|
|
activeHTLCs map[HtlcSetKey]htlcSet,
|
|
pendingConf bool) ChainActionMap {
|
|
|
|
// First, we'll partition the HTLCs into those that are present on the
|
|
// confirmed commitment, and those on the dangling commitment.
|
|
confHTLCs := activeHTLCs[RemoteHtlcSet]
|
|
danglingHTLCs := activeHTLCs[RemotePendingHtlcSet]
|
|
if pendingConf {
|
|
confHTLCs = activeHTLCs[RemotePendingHtlcSet]
|
|
danglingHTLCs = activeHTLCs[RemoteHtlcSet]
|
|
}
|
|
|
|
// Next, we'll create a set of all the HTLCs confirmed commitment.
|
|
remoteHtlcs := make(map[uint64]struct{})
|
|
for _, htlc := range confHTLCs.outgoingHTLCs {
|
|
remoteHtlcs[htlc.HtlcIndex] = struct{}{}
|
|
}
|
|
|
|
// With the remote HTLCs assembled, we'll mark any HTLCs only on the
|
|
// remote dangling commitment to be failed asap.
|
|
actionMap := make(ChainActionMap)
|
|
for _, htlc := range danglingHTLCs.outgoingHTLCs {
|
|
if _, ok := remoteHtlcs[htlc.HtlcIndex]; ok {
|
|
continue
|
|
}
|
|
|
|
actionMap[HtlcFailNowAction] = append(
|
|
actionMap[HtlcFailNowAction], htlc,
|
|
)
|
|
|
|
log.Tracef("ChannelArbitrator(%v): immediately failing "+
|
|
"htlc=%x from remote commitment",
|
|
c.cfg.ChanPoint, htlc.RHash[:])
|
|
}
|
|
|
|
return actionMap
|
|
}
|
|
|
|
// constructChainActions returns the set of actions that should be taken for
|
|
// confirmed HTLCs at the specified height. Our actions will depend on the set
|
|
// of HTLCs that were active across all channels at the time of channel
|
|
// closure.
|
|
func (c *ChannelArbitrator) constructChainActions(confCommitSet *CommitSet,
|
|
height uint32, trigger transitionTrigger) (ChainActionMap, error) {
|
|
|
|
// If we've reached this point and have not confirmed commitment set,
|
|
// then this is an older node that had a pending close channel before
|
|
// the CommitSet was introduced. In this case, we'll just return the
|
|
// existing ChainActionMap they had on disk.
|
|
if confCommitSet == nil {
|
|
return c.log.FetchChainActions()
|
|
}
|
|
|
|
// Otherwise we have the full commitment set written to disk, and can
|
|
// proceed as normal.
|
|
htlcSets := confCommitSet.toActiveHTLCSets()
|
|
switch *confCommitSet.ConfCommitKey {
|
|
|
|
// If the local commitment transaction confirmed, then we'll examine
|
|
// that as well as their commitments to the set of chain actions.
|
|
case LocalHtlcSet:
|
|
return c.checkLocalChainActions(
|
|
height, trigger, htlcSets, true,
|
|
)
|
|
|
|
// If the remote commitment confirmed, then we'll grab all the chain
|
|
// actions for the remote commit, and check the pending commit for any
|
|
// HTLCS we need to handle immediately (dust).
|
|
case RemoteHtlcSet:
|
|
return c.checkRemoteChainActions(
|
|
height, trigger, htlcSets, false,
|
|
)
|
|
|
|
// Otherwise, the remote pending commitment confirmed, so we'll examine
|
|
// the HTLCs on that unrevoked dangling commitment.
|
|
case RemotePendingHtlcSet:
|
|
return c.checkRemoteChainActions(
|
|
height, trigger, htlcSets, true,
|
|
)
|
|
}
|
|
|
|
return nil, fmt.Errorf("unable to locate chain actions")
|
|
}
|
|
|
|
// prepContractResolutions is called either int he case that we decide we need
|
|
// to go to chain, or the remote party goes to chain. Given a set of actions we
|
|
// need to take for each HTLC, this method will return a set of contract
|
|
// resolvers that will resolve the contracts on-chain if needed, and also a set
|
|
// of packets to send to the htlcswitch in order to ensure all incoming HTLC's
|
|
// are properly resolved.
|
|
func (c *ChannelArbitrator) prepContractResolutions(
|
|
contractResolutions *ContractResolutions, height uint32,
|
|
trigger transitionTrigger,
|
|
confCommitSet *CommitSet) ([]ContractResolver, []ResolutionMsg, error) {
|
|
|
|
// First, we'll reconstruct a fresh set of chain actions as the set of
|
|
// actions we need to act on may differ based on if it was our
|
|
// commitment, or they're commitment that hit the chain.
|
|
htlcActions, err := c.constructChainActions(
|
|
confCommitSet, height, trigger,
|
|
)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// There may be a class of HTLC's which we can fail back immediately,
|
|
// for those we'll prepare a slice of packets to add to our outbox. Any
|
|
// packets we need to send, will be cancels.
|
|
var (
|
|
msgsToSend []ResolutionMsg
|
|
)
|
|
|
|
incomingResolutions := contractResolutions.HtlcResolutions.IncomingHTLCs
|
|
outgoingResolutions := contractResolutions.HtlcResolutions.OutgoingHTLCs
|
|
|
|
// We'll use these two maps to quickly look up an active HTLC with its
|
|
// matching HTLC resolution.
|
|
outResolutionMap := make(map[wire.OutPoint]lnwallet.OutgoingHtlcResolution)
|
|
inResolutionMap := make(map[wire.OutPoint]lnwallet.IncomingHtlcResolution)
|
|
for i := 0; i < len(incomingResolutions); i++ {
|
|
inRes := incomingResolutions[i]
|
|
inResolutionMap[inRes.HtlcPoint()] = inRes
|
|
}
|
|
for i := 0; i < len(outgoingResolutions); i++ {
|
|
outRes := outgoingResolutions[i]
|
|
outResolutionMap[outRes.HtlcPoint()] = outRes
|
|
}
|
|
|
|
// We'll create the resolver kit that we'll be cloning for each
|
|
// resolver so they each can do their duty.
|
|
resKit := ResolverKit{
|
|
ChannelArbitratorConfig: c.cfg,
|
|
Checkpoint: func(res ContractResolver) error {
|
|
return c.log.InsertUnresolvedContracts(res)
|
|
},
|
|
}
|
|
|
|
commitHash := contractResolutions.CommitHash
|
|
failureMsg := &lnwire.FailPermanentChannelFailure{}
|
|
|
|
// For each HTLC, we'll either act immediately, meaning we'll instantly
|
|
// fail the HTLC, or we'll act only once the transaction has been
|
|
// confirmed, in which case we'll need an HTLC resolver.
|
|
var htlcResolvers []ContractResolver
|
|
for htlcAction, htlcs := range htlcActions {
|
|
switch htlcAction {
|
|
|
|
// If we can fail an HTLC immediately (an outgoing HTLC with no
|
|
// contract), then we'll assemble an HTLC fail packet to send.
|
|
case HtlcFailNowAction:
|
|
for _, htlc := range htlcs {
|
|
failMsg := ResolutionMsg{
|
|
SourceChan: c.cfg.ShortChanID,
|
|
HtlcIndex: htlc.HtlcIndex,
|
|
Failure: failureMsg,
|
|
}
|
|
|
|
msgsToSend = append(msgsToSend, failMsg)
|
|
}
|
|
|
|
// If we can claim this HTLC, we'll create an HTLC resolver to
|
|
// claim the HTLC (second-level or directly), then add the pre
|
|
case HtlcClaimAction:
|
|
for _, htlc := range htlcs {
|
|
htlcOp := wire.OutPoint{
|
|
Hash: commitHash,
|
|
Index: uint32(htlc.OutputIndex),
|
|
}
|
|
|
|
resolution, ok := inResolutionMap[htlcOp]
|
|
if !ok {
|
|
// TODO(roasbeef): panic?
|
|
log.Errorf("ChannelArbitrator(%v) unable to find "+
|
|
"incoming resolution: %v",
|
|
c.cfg.ChanPoint, htlcOp)
|
|
continue
|
|
}
|
|
|
|
resKit.Quit = make(chan struct{})
|
|
resolver := &htlcSuccessResolver{
|
|
htlcResolution: resolution,
|
|
broadcastHeight: height,
|
|
payHash: htlc.RHash,
|
|
htlcAmt: htlc.Amt,
|
|
ResolverKit: resKit,
|
|
}
|
|
htlcResolvers = append(htlcResolvers, resolver)
|
|
}
|
|
|
|
// If we can timeout the HTLC directly, then we'll create the
|
|
// proper resolver to do so, who will then cancel the packet
|
|
// backwards.
|
|
case HtlcTimeoutAction:
|
|
for _, htlc := range htlcs {
|
|
htlcOp := wire.OutPoint{
|
|
Hash: commitHash,
|
|
Index: uint32(htlc.OutputIndex),
|
|
}
|
|
|
|
resolution, ok := outResolutionMap[htlcOp]
|
|
if !ok {
|
|
log.Errorf("ChannelArbitrator(%v) unable to find "+
|
|
"outgoing resolution: %v", c.cfg.ChanPoint, htlcOp)
|
|
continue
|
|
}
|
|
|
|
resKit.Quit = make(chan struct{})
|
|
resolver := &htlcTimeoutResolver{
|
|
htlcResolution: resolution,
|
|
broadcastHeight: height,
|
|
htlcIndex: htlc.HtlcIndex,
|
|
htlcAmt: htlc.Amt,
|
|
ResolverKit: resKit,
|
|
}
|
|
htlcResolvers = append(htlcResolvers, resolver)
|
|
}
|
|
|
|
// If this is an incoming HTLC, but we can't act yet, then
|
|
// we'll create an incoming resolver to redeem the HTLC if we
|
|
// learn of the pre-image, or let the remote party time out.
|
|
case HtlcIncomingWatchAction:
|
|
for _, htlc := range htlcs {
|
|
htlcOp := wire.OutPoint{
|
|
Hash: commitHash,
|
|
Index: uint32(htlc.OutputIndex),
|
|
}
|
|
|
|
// TODO(roasbeef): need to handle incoming dust...
|
|
|
|
// TODO(roasbeef): can't be negative!!!
|
|
resolution, ok := inResolutionMap[htlcOp]
|
|
if !ok {
|
|
log.Errorf("ChannelArbitrator(%v) unable to find "+
|
|
"incoming resolution: %v",
|
|
c.cfg.ChanPoint, htlcOp)
|
|
continue
|
|
}
|
|
|
|
circuitKey := channeldb.CircuitKey{
|
|
HtlcID: htlc.HtlcIndex,
|
|
ChanID: c.cfg.ShortChanID,
|
|
}
|
|
|
|
resKit.Quit = make(chan struct{})
|
|
resolver := &htlcIncomingContestResolver{
|
|
htlcExpiry: htlc.RefundTimeout,
|
|
circuitKey: circuitKey,
|
|
htlcSuccessResolver: htlcSuccessResolver{
|
|
htlcResolution: resolution,
|
|
broadcastHeight: height,
|
|
payHash: htlc.RHash,
|
|
htlcAmt: htlc.Amt,
|
|
ResolverKit: resKit,
|
|
},
|
|
}
|
|
htlcResolvers = append(htlcResolvers, resolver)
|
|
}
|
|
|
|
// Finally, if this is an outgoing HTLC we've sent, then we'll
|
|
// launch a resolver to watch for the pre-image (and settle
|
|
// backwards), or just timeout.
|
|
case HtlcOutgoingWatchAction:
|
|
for _, htlc := range htlcs {
|
|
htlcOp := wire.OutPoint{
|
|
Hash: commitHash,
|
|
Index: uint32(htlc.OutputIndex),
|
|
}
|
|
|
|
resolution, ok := outResolutionMap[htlcOp]
|
|
if !ok {
|
|
log.Errorf("ChannelArbitrator(%v) unable to find "+
|
|
"outgoing resolution: %v",
|
|
c.cfg.ChanPoint, htlcOp)
|
|
continue
|
|
}
|
|
|
|
resKit.Quit = make(chan struct{})
|
|
resolver := &htlcOutgoingContestResolver{
|
|
htlcTimeoutResolver: htlcTimeoutResolver{
|
|
htlcResolution: resolution,
|
|
broadcastHeight: height,
|
|
htlcIndex: htlc.HtlcIndex,
|
|
htlcAmt: htlc.Amt,
|
|
ResolverKit: resKit,
|
|
},
|
|
}
|
|
htlcResolvers = append(htlcResolvers, resolver)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Finally, if this is was a unilateral closure, then we'll also create
|
|
// a resolver to sweep our commitment output (but only if it wasn't
|
|
// trimmed).
|
|
if contractResolutions.CommitResolution != nil {
|
|
resKit.Quit = make(chan struct{})
|
|
resolver := &commitSweepResolver{
|
|
commitResolution: *contractResolutions.CommitResolution,
|
|
broadcastHeight: height,
|
|
chanPoint: c.cfg.ChanPoint,
|
|
ResolverKit: resKit,
|
|
}
|
|
|
|
htlcResolvers = append(htlcResolvers, resolver)
|
|
}
|
|
|
|
return htlcResolvers, msgsToSend, nil
|
|
}
|
|
|
|
// replaceResolver replaces a in the list of active resolvers. If the resolver
|
|
// to be replaced is not found, it returns an error.
|
|
func (c *ChannelArbitrator) replaceResolver(oldResolver,
|
|
newResolver ContractResolver) error {
|
|
|
|
c.activeResolversLock.Lock()
|
|
defer c.activeResolversLock.Unlock()
|
|
|
|
oldKey := oldResolver.ResolverKey()
|
|
for i, r := range c.activeResolvers {
|
|
if bytes.Equal(r.ResolverKey(), oldKey) {
|
|
c.activeResolvers[i] = newResolver
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return errors.New("resolver to be replaced not found")
|
|
}
|
|
|
|
// resolveContract is a goroutine tasked with fully resolving an unresolved
|
|
// contract. Either the initial contract will be resolved after a single step,
|
|
// or the contract will itself create another contract to be resolved. In
|
|
// either case, one the contract has been fully resolved, we'll signal back to
|
|
// the main goroutine so it can properly keep track of the set of unresolved
|
|
// contracts.
|
|
//
|
|
// NOTE: This MUST be run as a goroutine.
|
|
func (c *ChannelArbitrator) resolveContract(currentContract ContractResolver) {
|
|
defer c.wg.Done()
|
|
|
|
log.Debugf("ChannelArbitrator(%v): attempting to resolve %T",
|
|
c.cfg.ChanPoint, currentContract)
|
|
|
|
// Until the contract is fully resolved, we'll continue to iteratively
|
|
// resolve the contract one step at a time.
|
|
for !currentContract.IsResolved() {
|
|
log.Debugf("ChannelArbitrator(%v): contract %T not yet resolved",
|
|
c.cfg.ChanPoint, currentContract)
|
|
|
|
select {
|
|
|
|
// If we've been signalled to quit, then we'll exit early.
|
|
case <-c.quit:
|
|
return
|
|
|
|
default:
|
|
// Otherwise, we'll attempt to resolve the current
|
|
// contract.
|
|
nextContract, err := currentContract.Resolve()
|
|
if err != nil {
|
|
if err == errResolverShuttingDown {
|
|
return
|
|
}
|
|
|
|
log.Errorf("ChannelArbitrator(%v): unable to "+
|
|
"progress resolver: %v",
|
|
c.cfg.ChanPoint, err)
|
|
return
|
|
}
|
|
|
|
switch {
|
|
// If this contract produced another, then this means
|
|
// the current contract was only able to be partially
|
|
// resolved in this step. So we'll not a contract swap
|
|
// within our logs: the new contract will take the
|
|
// place of the old one.
|
|
case nextContract != nil:
|
|
log.Debugf("ChannelArbitrator(%v): swapping "+
|
|
"out contract %T for %T ",
|
|
c.cfg.ChanPoint, currentContract,
|
|
nextContract)
|
|
|
|
// Swap contract in log.
|
|
err := c.log.SwapContract(
|
|
currentContract, nextContract,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("unable to add recurse "+
|
|
"contract: %v", err)
|
|
}
|
|
|
|
// Swap contract in resolvers list. This is to
|
|
// make sure that reports are queried from the
|
|
// new resolver.
|
|
err = c.replaceResolver(
|
|
currentContract, nextContract,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("unable to replace "+
|
|
"contract: %v", err)
|
|
}
|
|
|
|
// As this contract produced another, we'll
|
|
// re-assign, so we can continue our resolution
|
|
// loop.
|
|
currentContract = nextContract
|
|
|
|
// If this contract is actually fully resolved, then
|
|
// we'll mark it as such within the database.
|
|
case currentContract.IsResolved():
|
|
log.Debugf("ChannelArbitrator(%v): marking "+
|
|
"contract %T fully resolved",
|
|
c.cfg.ChanPoint, currentContract)
|
|
|
|
err := c.log.ResolveContract(currentContract)
|
|
if err != nil {
|
|
log.Errorf("unable to resolve contract: %v",
|
|
err)
|
|
}
|
|
|
|
// Now that the contract has been resolved,
|
|
// well signal to the main goroutine.
|
|
select {
|
|
case c.resolutionSignal <- struct{}{}:
|
|
case <-c.quit:
|
|
return
|
|
}
|
|
}
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
// signalUpdateMsg is a struct that carries fresh signals to the
|
|
// ChannelArbitrator. We need to receive a message like this each time the
|
|
// channel becomes active, as it's internal state may change.
|
|
type signalUpdateMsg struct {
|
|
// newSignals is the set of new active signals to be sent to the
|
|
// arbitrator.
|
|
newSignals *ContractSignals
|
|
|
|
// doneChan is a channel that will be closed on the arbitrator has
|
|
// attached the new signals.
|
|
doneChan chan struct{}
|
|
}
|
|
|
|
// UpdateContractSignals updates the set of signals the ChannelArbitrator needs
|
|
// to receive from a channel in real-time in order to keep in sync with the
|
|
// latest state of the contract.
|
|
func (c *ChannelArbitrator) UpdateContractSignals(newSignals *ContractSignals) {
|
|
done := make(chan struct{})
|
|
|
|
select {
|
|
case c.signalUpdates <- &signalUpdateMsg{
|
|
newSignals: newSignals,
|
|
doneChan: done,
|
|
}:
|
|
case <-c.quit:
|
|
}
|
|
|
|
select {
|
|
case <-done:
|
|
case <-c.quit:
|
|
}
|
|
}
|
|
|
|
// channelAttendant is the primary goroutine that acts at the judicial
|
|
// arbitrator between our channel state, the remote channel peer, and the
|
|
// blockchain Our judge). This goroutine will ensure that we faithfully execute
|
|
// all clauses of our contract in the case that we need to go on-chain for a
|
|
// dispute. Currently, two such conditions warrant our intervention: when an
|
|
// outgoing HTLC is about to timeout, and when we know the pre-image for an
|
|
// incoming HTLC, but it hasn't yet been settled off-chain. In these cases,
|
|
// we'll: broadcast our commitment, cancel/settle any HTLC's backwards after
|
|
// sufficient confirmation, and finally send our set of outputs to the UTXO
|
|
// Nursery for incubation, and ultimate sweeping.
|
|
//
|
|
// NOTE: This MUST be run as a goroutine.
|
|
func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
|
|
|
|
// TODO(roasbeef): tell top chain arb we're done
|
|
defer func() {
|
|
c.cfg.BlockEpochs.Cancel()
|
|
c.wg.Done()
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
|
|
// A new block has arrived, we'll examine all the active HTLC's
|
|
// to see if any of them have expired, and also update our
|
|
// track of the best current height.
|
|
case blockEpoch, ok := <-c.cfg.BlockEpochs.Epochs:
|
|
if !ok {
|
|
return
|
|
}
|
|
bestHeight = blockEpoch.Height
|
|
|
|
// If we're not in the default state, then we can
|
|
// ignore this signal as we're waiting for contract
|
|
// resolution.
|
|
if c.state != StateDefault {
|
|
continue
|
|
}
|
|
|
|
// Now that a new block has arrived, we'll attempt to
|
|
// advance our state forward.
|
|
nextState, _, err := c.advanceState(
|
|
uint32(bestHeight), chainTrigger, nil,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("unable to advance state: %v", err)
|
|
}
|
|
|
|
// If as a result of this trigger, the contract is
|
|
// fully resolved, then well exit.
|
|
if nextState == StateFullyResolved {
|
|
return
|
|
}
|
|
|
|
// A new signal update was just sent. This indicates that the
|
|
// channel under watch is now live, and may modify its internal
|
|
// state, so we'll get the most up to date signals to we can
|
|
// properly do our job.
|
|
case signalUpdate := <-c.signalUpdates:
|
|
log.Tracef("ChannelArbitrator(%v) got new signal "+
|
|
"update!", c.cfg.ChanPoint)
|
|
|
|
// First, we'll update our set of signals.
|
|
c.htlcUpdates = signalUpdate.newSignals.HtlcUpdates
|
|
c.cfg.ShortChanID = signalUpdate.newSignals.ShortChanID
|
|
|
|
// Now that the signals have been updated, we'll now
|
|
// close the done channel to signal to the caller we've
|
|
// registered the new contracts.
|
|
close(signalUpdate.doneChan)
|
|
|
|
// A new set of HTLC's has been added or removed from the
|
|
// commitment transaction. So we'll update our activeHTLCs map
|
|
// accordingly.
|
|
case htlcUpdate := <-c.htlcUpdates:
|
|
// We'll wipe out our old set of HTLC's for each
|
|
// htlcSetKey type included in this update in order to
|
|
// only monitor the HTLCs that are still active on this
|
|
// target commitment.
|
|
c.activeHTLCs[htlcUpdate.HtlcKey] = newHtlcSet(
|
|
htlcUpdate.Htlcs,
|
|
)
|
|
|
|
log.Tracef("ChannelArbitrator(%v): fresh set of htlcs=%v",
|
|
c.cfg.ChanPoint,
|
|
newLogClosure(func() string {
|
|
return spew.Sdump(htlcUpdate)
|
|
}),
|
|
)
|
|
|
|
// We've cooperatively closed the channel, so we're no longer
|
|
// needed. We'll mark the channel as resolved and exit.
|
|
case closeInfo := <-c.cfg.ChainEvents.CooperativeClosure:
|
|
log.Infof("ChannelArbitrator(%v) marking channel "+
|
|
"cooperatively closed", c.cfg.ChanPoint)
|
|
|
|
err := c.cfg.MarkChannelClosed(
|
|
closeInfo.ChannelCloseSummary,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("unable to mark channel closed: "+
|
|
"%v", err)
|
|
return
|
|
}
|
|
|
|
// We'll now advance our state machine until it reaches
|
|
// a terminal state, and the channel is marked resolved.
|
|
_, _, err = c.advanceState(
|
|
closeInfo.CloseHeight, coopCloseTrigger, nil,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("unable to advance state: %v", err)
|
|
return
|
|
}
|
|
|
|
// We have broadcasted our commitment, and it is now confirmed
|
|
// on-chain.
|
|
case closeInfo := <-c.cfg.ChainEvents.LocalUnilateralClosure:
|
|
log.Infof("ChannelArbitrator(%v): local on-chain "+
|
|
"channel close", c.cfg.ChanPoint)
|
|
|
|
if c.state != StateCommitmentBroadcasted {
|
|
log.Errorf("ChannelArbitrator(%v): unexpected "+
|
|
"local on-chain channel close",
|
|
c.cfg.ChanPoint)
|
|
}
|
|
closeTx := closeInfo.CloseTx
|
|
|
|
contractRes := &ContractResolutions{
|
|
CommitHash: closeTx.TxHash(),
|
|
CommitResolution: closeInfo.CommitResolution,
|
|
HtlcResolutions: *closeInfo.HtlcResolutions,
|
|
}
|
|
|
|
// When processing a unilateral close event, we'll
|
|
// transition to the ContractClosed state. We'll log
|
|
// out the set of resolutions such that they are
|
|
// available to fetch in that state, we'll also write
|
|
// the commit set so we can reconstruct our chain
|
|
// actions on restart.
|
|
err := c.log.LogContractResolutions(contractRes)
|
|
if err != nil {
|
|
log.Errorf("unable to write resolutions: %v",
|
|
err)
|
|
return
|
|
}
|
|
err = c.log.InsertConfirmedCommitSet(
|
|
&closeInfo.CommitSet,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("unable to write commit set: %v", err)
|
|
return
|
|
}
|
|
|
|
// After the set of resolutions are successfully
|
|
// logged, we can safely close the channel. After this
|
|
// succeeds we won't be getting chain events anymore,
|
|
// so we must make sure we can recover on restart after
|
|
// it is marked closed. If the next state transition
|
|
// fails, we'll start up in the prior state again, and
|
|
// we won't be longer getting chain events. In this
|
|
// case we must manually re-trigger the state
|
|
// transition into StateContractClosed based on the
|
|
// close status of the channel.
|
|
err = c.cfg.MarkChannelClosed(
|
|
closeInfo.ChannelCloseSummary,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("unable to mark "+
|
|
"channel closed: %v", err)
|
|
return
|
|
}
|
|
|
|
// We'll now advance our state machine until it reaches
|
|
// a terminal state.
|
|
_, _, err = c.advanceState(
|
|
uint32(closeInfo.SpendingHeight),
|
|
localCloseTrigger, &closeInfo.CommitSet,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("unable to advance state: %v", err)
|
|
}
|
|
|
|
// The remote party has broadcast the commitment on-chain.
|
|
// We'll examine our state to determine if we need to act at
|
|
// all.
|
|
case uniClosure := <-c.cfg.ChainEvents.RemoteUnilateralClosure:
|
|
log.Infof("ChannelArbitrator(%v): remote party has "+
|
|
"closed channel out on-chain", c.cfg.ChanPoint)
|
|
|
|
// If we don't have a self output, and there are no
|
|
// active HTLC's, then we can immediately mark the
|
|
// contract as fully resolved and exit.
|
|
contractRes := &ContractResolutions{
|
|
CommitHash: *uniClosure.SpenderTxHash,
|
|
CommitResolution: uniClosure.CommitResolution,
|
|
HtlcResolutions: *uniClosure.HtlcResolutions,
|
|
}
|
|
|
|
// When processing a unilateral close event, we'll
|
|
// transition to the ContractClosed state. We'll log
|
|
// out the set of resolutions such that they are
|
|
// available to fetch in that state, we'll also write
|
|
// the commit set so we can reconstruct our chain
|
|
// actions on restart.
|
|
err := c.log.LogContractResolutions(contractRes)
|
|
if err != nil {
|
|
log.Errorf("unable to write resolutions: %v",
|
|
err)
|
|
return
|
|
}
|
|
err = c.log.InsertConfirmedCommitSet(
|
|
&uniClosure.CommitSet,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("unable to write commit set: %v", err)
|
|
return
|
|
}
|
|
|
|
// After the set of resolutions are successfully
|
|
// logged, we can safely close the channel. After this
|
|
// succeeds we won't be getting chain events anymore,
|
|
// so we must make sure we can recover on restart after
|
|
// it is marked closed. If the next state transition
|
|
// fails, we'll start up in the prior state again, and
|
|
// we won't be longer getting chain events. In this
|
|
// case we must manually re-trigger the state
|
|
// transition into StateContractClosed based on the
|
|
// close status of the channel.
|
|
closeSummary := &uniClosure.ChannelCloseSummary
|
|
err = c.cfg.MarkChannelClosed(closeSummary)
|
|
if err != nil {
|
|
log.Errorf("unable to mark channel closed: %v",
|
|
err)
|
|
return
|
|
}
|
|
|
|
// We'll now advance our state machine until it reaches
|
|
// a terminal state.
|
|
_, _, err = c.advanceState(
|
|
uint32(uniClosure.SpendingHeight),
|
|
remoteCloseTrigger, &uniClosure.CommitSet,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("unable to advance state: %v", err)
|
|
}
|
|
|
|
// A new contract has just been resolved, we'll now check our
|
|
// log to see if all contracts have been resolved. If so, then
|
|
// we can exit as the contract is fully resolved.
|
|
case <-c.resolutionSignal:
|
|
log.Infof("ChannelArbitrator(%v): a contract has been "+
|
|
"fully resolved!", c.cfg.ChanPoint)
|
|
|
|
nextState, _, err := c.advanceState(
|
|
uint32(bestHeight), chainTrigger, nil,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("unable to advance state: %v", err)
|
|
}
|
|
|
|
// If we don't have anything further to do after
|
|
// advancing our state, then we'll exit.
|
|
if nextState == StateFullyResolved {
|
|
log.Infof("ChannelArbitrator(%v): all "+
|
|
"contracts fully resolved, exiting",
|
|
c.cfg.ChanPoint)
|
|
|
|
return
|
|
}
|
|
|
|
// We've just received a request to forcibly close out the
|
|
// channel. We'll
|
|
case closeReq := <-c.forceCloseReqs:
|
|
if c.state != StateDefault {
|
|
select {
|
|
case closeReq.closeTx <- nil:
|
|
case <-c.quit:
|
|
}
|
|
|
|
select {
|
|
case closeReq.errResp <- errAlreadyForceClosed:
|
|
case <-c.quit:
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
nextState, closeTx, err := c.advanceState(
|
|
uint32(bestHeight), userTrigger, nil,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("unable to advance state: %v", err)
|
|
}
|
|
|
|
select {
|
|
case closeReq.closeTx <- closeTx:
|
|
case <-c.quit:
|
|
return
|
|
}
|
|
|
|
select {
|
|
case closeReq.errResp <- err:
|
|
case <-c.quit:
|
|
return
|
|
}
|
|
|
|
// If we don't have anything further to do after
|
|
// advancing our state, then we'll exit.
|
|
if nextState == StateFullyResolved {
|
|
log.Infof("ChannelArbitrator(%v): all "+
|
|
"contracts resolved, exiting",
|
|
c.cfg.ChanPoint)
|
|
return
|
|
}
|
|
|
|
case <-c.quit:
|
|
return
|
|
}
|
|
}
|
|
}
|