lnd.xprv/contractcourt/channel_arbitrator.go
Olaoluwa Osuntokun 5df6704a9c
contractcourt: make synchronous chain watcher notifications optional
In this commit, we modify the way that notifications are dispatched
within the chainWatcher. Before we would *always* wait for an ack back
before we started to clean up he database state. This would at times
lead to deadlocks. To remedy this, we now allow callers to decide if
they want notifications to be sync or not. The only current caller that
requires this is the breach arbiter.
2018-01-22 19:19:58 -08:00

1531 lines
50 KiB
Go

package contractcourt
import (
"fmt"
"sync"
"sync/atomic"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
)
const (
// broadcastRedeemMultiplier is the additional factor that we'll scale
// the normal broadcastDelta by when deciding whether or not to
// broadcast a commitment to claim an HTLC on-chain. We use a scaled
// value, as when redeeming we want to ensure that we have enough time
// to redeem the HTLC, well before it times out.
broadcastRedeemMultiplier = 2
)
// WitnessSubcription represents an intent to be notified once new witnesses
// are discovered by various active contract resolvers. A contract resolver may
// use this to be notified of when it can satisfy an incoming contract after we
// discover the witness for an outgoing contract.
type WitnessSubcription struct {
// WitnessUpdates is a channel that newly discovered witnesses will be
// sent over.
//
// TODO(roasbef): couple with WitnessType?
WitnessUpdates <-chan []byte
// CancelSubcription is a function closure that should be used by a
// client to cancel the subscription once they are no longer interested
// in receiving new updates.
CancelSubcription func()
}
// WitnessBeacon is a global beacon of witnesses. Contract resolvers will use
// this interface to lookup witnesses (preimages typically) of contracts
// they're trying to resolver, add new preimages they resolver, and finally
// receive new updates each new time a preimage is discovered.
//
// TODO(roasbeef): need to delete the pre-images once we've used them
// and have been sufficiently confirmed?
type WitnessBeacon interface {
// SubcribeUpdates returns a channel that will be sent upon *each* time
// a new preimage is discovered.
SubcribeUpdates() *WitnessSubcription
// LookupPreImage attempts to lookup a preimage in the global cache.
// True is returned for the second argument if the preimage is found.
LookupPreimage(payhash []byte) ([]byte, bool)
// AddPreImage adds a newly discovered preimage to the global cache.
AddPreimage(pre []byte) error
}
// ChannelArbitratorConfig contains all the functionality that the
// ChannelArbitrator needs in order to properly arbitrate any contract dispute
// on chain.
type ChannelArbitratorConfig struct {
// ChanPoint is the channel point that uniquely identifies this
// channel.
ChanPoint wire.OutPoint
// ShortChanID describes the exact location of the channel within the
// chain. We'll use this to address any messages that we need to send
// to the switch during contract resolution.
ShortChanID lnwire.ShortChannelID
// BlockEpochs is an active block epoch event stream backed by an
// active ChainNotifier instance. We will use new block notifications
// sent over this channel to decide when we should go on chain to
// reclaim/redeem the funds in an HTLC sent to/from us.
BlockEpochs *chainntnfs.BlockEpochEvent
// ChainEvents is an active subscription to the chain watcher for this
// channel to be notified of any on-chain activity related to this
// channel.
ChainEvents *ChainEventSubscription
// ForceCloseChan should force close the contract that this attendant
// is watching over. We'll use this when we decide that we need to go
// to chain. The returned summary contains all items needed to
// eventually resolve all outputs on chain.
ForceCloseChan func() (*lnwallet.ForceCloseSummary, error)
// CloseChannel is a function closure that marks a channel under watch
// as "closing". In this phase, we will no longer accept any updates to
// the channel as the commitment transaction has been broadcast, and
// possibly fully confirmed.
CloseChannel func(*channeldb.ChannelCloseSummary) error
// MarkChannelResolved is a function closure that serves to mark a
// channel as "fully resolved". A channel itself can be considered
// fully resolved once all active contracts have individually been
// fully resolved.
//
// TODO(roasbeef): need RPC's to combine for pendingchannels RPC
MarkChannelResolved func() error
ChainArbitratorConfig
}
// htlcSet represents the set of active HTLCs on a given commitment
// transaction.
type htlcSet struct {
// incomingHTLCs is a map of all incoming HTLCs on our commitment
// transaction. We may potentially go onchain to claim the funds sent
// to us within this set.
incomingHTLCs map[uint64]channeldb.HTLC
// outgoingHTLCs is a map of all outgoing HTLCs on our commitment
// transaction. We may potentially go onchain to reclaim the funds that
// are currently in limbo.
outgoingHTLCs map[uint64]channeldb.HTLC
}
// newHtlcSet constructs a new HTLC set from a slice of HTLC's.
func newHtlcSet(htlcs []channeldb.HTLC) htlcSet {
outHTLCs := make(map[uint64]channeldb.HTLC)
inHTLCs := make(map[uint64]channeldb.HTLC)
for _, htlc := range htlcs {
if htlc.Incoming {
inHTLCs[htlc.HtlcIndex] = htlc
continue
}
outHTLCs[htlc.HtlcIndex] = htlc
}
return htlcSet{
incomingHTLCs: inHTLCs,
outgoingHTLCs: outHTLCs,
}
}
// ChannelArbitrator is the on-chain arbitrator for a particular channel. The
// struct will keep in sync with the current set of HTLCs on the commitment
// transaction. The job of the attendant is to go on-chain to either settle or
// cancel an HTLC as necessary iff: an HTLC times out, or we known the
// pre-image to an HTLC, but it wasn't settled by the link off-chain. The
// ChannelArbitrator will factor in an expected confirmation delta when
// broadcasting to ensure that we avoid any possibility of race conditions, and
// sweep the output(s) without contest.
type ChannelArbitrator struct {
started int32
stopped int32
// log is a persistent log that the attendant will use to checkpoint
// its next action, and the state of any unresolved contracts.
log ArbitratorLog
// activeHTLCs is the set of active incoming/outgoing HTLC's on the
// commitment transaction.
activeHTLCs htlcSet
// cfg contains all the functionality that the ChannelArbitrator requires
// to do its duty.
cfg ChannelArbitratorConfig
// signalUpdates is a channel that any new live signals for the channel
// we're watching over will be sent.
signalUpdates chan *signalUpdateMsg
// htlcUpdates is a channel that is sent upon with new updates from the
// active channel. Each time a new commitment state is accepted, the
// set of HTLC's on the new state should be sent across this channel.
htlcUpdates <-chan []channeldb.HTLC
// activeResolvers is a slice of any active resolvers. This is used to
// be able to signal them for shutdown in the case that we shutdown.
activeResolvers []ContractResolver
// resolutionSignal is a channel that will be sent upon by contract
// resolvers once their contract has been fully resolved. With each
// send, we'll check to see if the contract is fully resolved.
resolutionSignal chan struct{}
// forceCloseReqs is a channel that requests to forcibly close the
// contract will be sent over.
forceCloseReqs chan *forceCloseReq
// state is the current state of the arbitrator. This state is examined
// upon start up to decide which actions to take.
state ArbitratorState
wg sync.WaitGroup
quit chan struct{}
}
// NewChannelArbitrator returns a new instance of a ChannelArbitrator backed by
// the passed config struct.
func NewChannelArbitrator(cfg ChannelArbitratorConfig,
startingHTLCs []channeldb.HTLC, log ArbitratorLog) *ChannelArbitrator {
return &ChannelArbitrator{
log: log,
signalUpdates: make(chan *signalUpdateMsg),
htlcUpdates: make(<-chan []channeldb.HTLC),
resolutionSignal: make(chan struct{}),
forceCloseReqs: make(chan *forceCloseReq),
activeHTLCs: newHtlcSet(startingHTLCs),
cfg: cfg,
quit: make(chan struct{}),
}
}
// Start starts all the goroutines that the ChannelArbitrator needs to operate.
func (c *ChannelArbitrator) Start() error {
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
return nil
}
var (
err error
unresolvedContracts []ContractResolver
)
log.Debugf("Starting ChannelArbitrator(%v), htlc_set=%v",
c.cfg.ChanPoint, newLogClosure(func() string {
return spew.Sdump(c.activeHTLCs)
}),
)
// First, we'll read our last state from disk, so our internal state
// machine can act accordingly.
c.state, err = c.log.CurrentState()
if err != nil {
return err
}
log.Infof("ChannelArbitrator(%v): starting state=%v", c.cfg.ChanPoint,
c.state)
bestHash, bestHeight, err := c.cfg.ChainIO.GetBestBlock()
if err != nil {
return err
}
// We'll now attempt to advance our state forward based on the current
// on-chain state, and our set of active contracts.
startingState := c.state
nextState, _, err := c.advanceState(
uint32(bestHeight), bestHash, chainTrigger, nil,
)
if err != nil {
return err
}
// If we start and ended at the awiting full resolution state, then
// we'll relaunch our set of unresolved contracts.
if startingState == StateWaitingFullResolution &&
nextState == StateWaitingFullResolution {
// We'll now query our log to see if there are any active
// unresolved contracts. If this is the case, then we'll
// relaunch all contract resolvers.
unresolvedContracts, err = c.log.FetchUnresolvedContracts()
if err != nil {
return err
}
log.Infof("ChannelArbitrator(%v): relaunching %v contract "+
"resolvers", c.cfg.ChanPoint, len(unresolvedContracts))
c.activeResolvers = unresolvedContracts
for _, contract := range unresolvedContracts {
c.wg.Add(1)
go c.resolveContract(contract)
}
}
// TODO(roasbeef): cancel if breached
c.wg.Add(1)
go c.channelAttendant(bestHeight, bestHash)
return nil
}
// Stop signals the ChannelArbitrator for a graceful shutdown.
func (c *ChannelArbitrator) Stop() error {
if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
return nil
}
log.Debugf("Stopping ChannelArbitrator(%v)", c.cfg.ChanPoint)
if c.cfg.ChainEvents.Cancel != nil {
go c.cfg.ChainEvents.Cancel()
}
for _, activeResolver := range c.activeResolvers {
activeResolver.Stop()
}
close(c.quit)
c.wg.Wait()
c.cfg.BlockEpochs.Cancel()
return nil
}
// transitionTrigger is an enum that denotes exactly *why* a state transition
// was initiated. This is useful as depending on the initial trigger, we may
// skip certain states as those actions are expected to have already taken
// place as a result of the external trigger.
type transitionTrigger uint8
const (
// chainTrigger is a transition trigger that has been attempted due to
// changing on-chain conditions such as a block which times out HTLC's
// being attached.
chainTrigger transitionTrigger = iota
// remotePeerTrigger is a transition trigger driven by actions of the
// remote peer.
remotePeerTrigger
// userTrigger is a transition trigger driven by user action. Examples
// of such a trigger include a user requesting a forgive closure of the
// channel.
userTrigger
)
// String returns a human readable string describing the passed
// transitionTrigger.
func (t transitionTrigger) String() string {
switch t {
case chainTrigger:
return "chainTrigger"
case remotePeerTrigger:
return "remotePeerTrigger"
case userTrigger:
return "userTrigger"
default:
return "unknown trigger"
}
}
// stateStep is a help method that examines our internal state, and attempts
// the appropriate state transition if necessary. The next state we transition
// to is returned, Additionally, if the next transition results in a commitment
// broadcast, the commitment transaction itself is returned.
func (c *ChannelArbitrator) stateStep(bestHeight uint32, bestHash *chainhash.Hash,
trigger transitionTrigger) (ArbitratorState, *wire.MsgTx, error) {
var (
nextState ArbitratorState
closeTx *wire.MsgTx
)
switch c.state {
// If we're in the default state, then we'll check our set of actions
// to see if while we were down, conditions have changed.
case StateDefault:
log.Debugf("ChannelArbitrator(%v): new block (height=%v, "+
"hash=%v) examining active HTLC's",
c.cfg.ChanPoint, bestHeight, bestHash)
// As a new block has been connected to the end of the main
// chain, we'll check to see if we need to make any on-chain
// claims on behalf of the channel contract that we're
// arbitrating for.
chainActions := c.checkChainActions(uint32(bestHeight),
trigger)
// If there are no actions to be made, then we'll remain in the
// default state. If this isn't a self initiated event (we're
// checking due to a chain update), then we'll exit now.
if len(chainActions) == 0 && trigger == chainTrigger {
log.Tracef("ChannelArbitrator(%v): no actions for "+
"chain trigger, terminating", c.cfg.ChanPoint)
return StateDefault, closeTx, nil
}
// Otherwise, we'll log that we checked the HTLC actions as the
// commitment transaction has already been broadcast.
log.Tracef("ChannelArbitrator(%v): logging chain_actions=%v",
c.cfg.ChanPoint,
newLogClosure(func() string {
return spew.Sdump(chainActions)
}))
if err := c.log.LogChainActions(chainActions); err != nil {
return StateError, closeTx, err
}
// Depending on the type of trigger, we'll either "tunnel"
// through to a farther state, or just proceed linearly to the
// next state.
switch trigger {
// If this is a chain trigger, then we'll go straight to the
// next state, as we still need to broadcast the commitment
// transaction.
case chainTrigger:
fallthrough
case userTrigger:
nextState = StateBroadcastCommit
// Otherwise, if this state advance was triggered by the remote
// peer, then we'll jump straight to the state where the
// contract has already been closed.
case remotePeerTrigger:
nextState = StateContractClosed
}
// If we're in this state, then we've decided to broadcast the
// commitment transaction. We enter this state either due to an outside
// sub-system, or because an on-chain action has been triggered.
case StateBroadcastCommit:
log.Infof("ChannelArbitrator(%v): force closing "+
"chan", c.cfg.ChanPoint)
// Now that we have all the actions decided for the set of
// HTLC's, we'll broadcast the commitment transaction, and
// signal the link to exit.
//
// TODO(roasbeef): need to report to switch that channel is
// inactive, should close link
closeSummary, err := c.cfg.ForceCloseChan()
if err != nil {
log.Errorf("ChannelArbitrator(%v): unable to "+
"force close: %v", c.cfg.ChanPoint, err)
return StateError, closeTx, err
}
closeTx = closeSummary.CloseTx
// With the close transaction in hand, broadcast the
// transaction to the network, thereby entering the post
// channel resolution state.
log.Infof("Broadcasting force close transaction, "+
"ChannelPoint(%v): %v", c.cfg.ChanPoint,
newLogClosure(func() string {
return spew.Sdump(closeTx)
}))
// At this point, we'll now broadcast the commitment
// transaction itself.
if err := c.cfg.PublishTx(closeTx); err != nil {
// TODO(roasbeef): need to check for errors (duplicate)
log.Errorf("ChannelArbitrator(%v): unable to broadcast "+
"close tx: %v", c.cfg.ChanPoint, err)
return StateError, closeTx, err
}
// As we've have broadcast the commitment transaction, we send
// out commitment output for incubation, but only if it wasn't
// trimmed. We'll need to wait for a CSV timeout before we can
// reclaim the funds.
if closeSummary.CommitResolution != nil {
log.Infof("ChannelArbitrator(%v): sending commit "+
"output for incubation", c.cfg.ChanPoint)
err = c.cfg.IncubateOutputs(
c.cfg.ChanPoint, closeSummary.CommitResolution,
nil, nil,
)
if err != nil {
// TODO(roasbeef): check for AlreadyExists errors
log.Errorf("unable to incubate commitment "+
"output: %v", err)
return StateError, closeTx, err
}
}
contractRes := ContractResolutions{
CommitHash: closeTx.TxHash(),
CommitResolution: closeSummary.CommitResolution,
HtlcResolutions: *closeSummary.HtlcResolutions,
}
// Now that the transaction has been broadcast, we can mark
// that it's has been closed to outside sub-systems.
err = c.markContractClosed(
closeTx, closeSummary.ChanSnapshot, &contractRes,
bestHeight,
)
if err != nil {
log.Errorf("unable to close contract: %v", err)
return StateError, closeTx, err
}
// With the channel force closed, we'll now log our
// resolutions, then advance our state forward.
log.Infof("ChannelArbitrator(%v): logging contract "+
"resolutions: commit=%v, num_htlcs=%v",
c.cfg.ChanPoint,
closeSummary.CommitResolution != nil,
len(closeSummary.HtlcResolutions.IncomingHTLCs)+
len(closeSummary.HtlcResolutions.OutgoingHTLCs))
err = c.log.LogContractResolutions(&contractRes)
if err != nil {
log.Errorf("unable to write resolutions: %v", err)
return StateError, closeTx, err
}
nextState = StateContractClosed
// If we're in this state, then the contract has been fully closed to
// outside sub-systems, so we'll process the prior set of on-chain
// contract actions and launch a set of resolvers.
case StateContractClosed:
// First, we'll fetch our chain actions, and both sets of
// resolutions so we can process them.
chainActions, err := c.log.FetchChainActions()
if err != nil {
log.Errorf("unable to fetch chain actions: %v", err)
return StateError, closeTx, err
}
contractResolutions, err := c.log.FetchContractResolutions()
if err != nil {
log.Errorf("unable to fetch contract resolutions: %v",
err)
return StateError, closeTx, err
}
// If the resolution is empty, then we're done here. We don't
// need to launch any resolvers, and can go straight to our
// final state.
if contractResolutions.IsEmpty() {
log.Infof("ChannelArbitrator(%v): contract "+
"resolutions empty, marking channel as fully resolved!",
c.cfg.ChanPoint)
nextState = StateFullyResolved
break
}
// Now that we know we'll need to act, we'll process the htlc
// actions, wen create the structures we need to resolve all
// outstanding contracts.
htlcResolvers, pktsToSend, err := c.prepContractResolutions(
chainActions, contractResolutions, uint32(bestHeight),
trigger,
)
if err != nil {
log.Errorf("ChannelArbitrator(%v): unable to "+
"resolve contracts: %v", c.cfg.ChanPoint, err)
return StateError, closeTx, err
}
log.Debugf("ChannelArbitrator(%v): sending resolution message=%v",
c.cfg.ChanPoint,
newLogClosure(func() string {
return spew.Sdump(pktsToSend)
}))
// With the commitment broadcast, we'll then send over all
// messages we can send immediately.
err = c.cfg.DeliverResolutionMsg(pktsToSend...)
if err != nil {
// TODO(roasbeef): make sure packet sends are idempotent
log.Errorf("unable to send pkts: %v", err)
return StateError, closeTx, err
}
log.Debugf("ChannelArbitrator(%v): inserting %v contract "+
"resolvers", c.cfg.ChanPoint, len(htlcResolvers))
err = c.log.InsertUnresolvedContracts(htlcResolvers...)
if err != nil {
return StateError, closeTx, err
}
// Finally, we'll launch all the required contract resolvers.
// Once they're all resolved, we're no longer needed.
c.activeResolvers = htlcResolvers
for _, contract := range htlcResolvers {
c.wg.Add(1)
go c.resolveContract(contract)
}
nextState = StateWaitingFullResolution
// This is our terminal state. We'll keep returning this state until
// all contracts are fully resolved.
case StateWaitingFullResolution:
log.Infof("ChannelArbitrator(%v): still awaiting contract "+
"resolution", c.cfg.ChanPoint)
nextState = StateWaitingFullResolution
// If we start as fully resolved, then we'll end as fully resolved.
case StateFullyResolved:
// To ensure that the state of the contract in persistent
// storage is properly reflected, we'll mark the contract as
// fully resolved now.
nextState = StateFullyResolved
log.Infof("ChannelPoint(%v) has been fully resolved "+
"on-chain at height=%v", c.cfg.ChanPoint, bestHeight)
return nextState, closeTx, c.cfg.MarkChannelResolved()
}
if err := c.log.CommitState(nextState); err != nil {
return StateError, nil, err
}
log.Tracef("ChannelArbitrator(%v): next_state=%v", c.cfg.ChanPoint,
nextState)
c.state = nextState
return nextState, closeTx, nil
}
// advanceState is the main driver of our state machine. This method is an
// iterative function which repeatedly attempts to advance the internal state
// of the channel arbitrator. The state will be advanced until we reach a
// redundant transition, meaning that the state transition is a noop. The final
// param is a callback that allows the caller to execute an arbitrary action
// after each state transition.
func (c *ChannelArbitrator) advanceState(currentHeight uint32,
bestHash *chainhash.Hash, trigger transitionTrigger,
stateCallback func(ArbitratorState) error) (ArbitratorState, *wire.MsgTx, error) {
var (
priorState ArbitratorState
forceCloseTx *wire.MsgTx
)
log.Tracef("ChannelArbitrator(%v): attempting state step with "+
"trigger=%v", c.cfg.ChanPoint, trigger)
// We'll continue to advance our state forward until the state we
// transition to is that same state that we started at.
for {
priorState = c.state
nextState, closeTx, err := c.stateStep(
currentHeight, bestHash, trigger,
)
if err != nil {
log.Errorf("unable to advance state: %v", err)
return priorState, nil, err
}
if forceCloseTx == nil && closeTx != nil {
forceCloseTx = closeTx
}
// If we have a state callback, then we'll attempt to execute
// it. If the callback doesn't execute successfully, then we'll
// exit early.
if stateCallback != nil {
if err := stateCallback(nextState); err != nil {
return nextState, closeTx, err
}
}
// Our termination transition is a noop transition. If we get
// our prior state back as the next state, then we'll
// terminate.
if nextState == priorState {
log.Tracef("ChannelArbitrator(%v): terminating at state=%v",
c.cfg.ChanPoint, nextState)
return nextState, forceCloseTx, nil
}
}
}
// ChainAction is an enum that that encompasses all possible on-chain actions
// we'll take for a set of HTLC's.
type ChainAction uint8
const (
// NoAction is the min chainAction type, indicating that no action
// needs to be taken for a given HTLC.
NoAction ChainAction = 0
// HtlcTimeoutAction indicates that the HTLC will timeout soon. As a
// result, we should get ready to sweep it on chain after the timeout.
HtlcTimeoutAction = 1
// HtlcClaimAction indicates that we should claim the HTLC on chain
// before its timeout period.
HtlcClaimAction = 2
// HtlcFailNowAction indicates that we should fail an outgoing HTLC
// immediately by cancelling it backwards as it has no corresponding
// output in our commitment transaction.
HtlcFailNowAction = 3
// HtlcOutgoingWatchAction indicates that we can't yet timeout this
// HTLC, but we had to go to chain on order to resolve an existing
// HTLC. In this case, we'll either: time it out once it expires, or
// will learn the pre-image if the remote party claims the output. In
// this case, well add the pre-image to our global store.
HtlcOutgoingWatchAction = 4
// HtlcIncomingWatchAction indicates that we don't yet have the
// pre-image to claim incoming HTLC, but we had to go to chain in order
// to resolve and existing HTLC. In this case, we'll either: let the
// other party time it out, or eventually learn of the pre-image, in
// which case we'll claim on chain.
HtlcIncomingWatchAction = 5
)
// String returns a human readable string describing a chain action.
func (c ChainAction) String() string {
switch c {
case NoAction:
return "NoAction"
case HtlcTimeoutAction:
return "HtlcTimeoutAction"
case HtlcClaimAction:
return "HtlcClaimAction"
case HtlcFailNowAction:
return "HtlcFailNowAction"
case HtlcOutgoingWatchAction:
return "HtlcOutgoingWatchAction"
case HtlcIncomingWatchAction:
return "HtlcIncomingWatchAction"
default:
return "<unknown action>"
}
}
// ChainActionMap is a map of a chain action, to the set of HTLC's that need to
// be acted upon for a given action type. The channel
type ChainActionMap map[ChainAction][]channeldb.HTLC
// shouldGoOnChain takes into account the absolute timeout of the HTLC, if the
// confirmation delta that we need is close, and returns a bool indicating if
// we should go on chain to claim. We do this rather than waiting up until the
// last minute as we want to ensure that when we *need* (HTLC is timed out) to
// sweep, the commitment is already confirmed.
func (c *ChannelArbitrator) shouldGoOnChain(htlcExpiry, broadcastDelta,
currentHeight uint32) bool {
// We'll calculate the broadcast cut off for this HTLC. This is the
// height that (based on our current fee estimation) we should
// broadcast in order to ensure the commitment transaction is confirmed
// before the HTLC fully expires.
broadcastCutOff := htlcExpiry - broadcastDelta
log.Tracef("ChannelArbitrator(%v): examining outgoing contract: "+
"expiry=%v, cutoff=%v, height=%v", c.cfg.ChanPoint, htlcExpiry,
broadcastCutOff, currentHeight)
// TODO(roasbeef): take into account default HTLC delta, don't need to
// broadcast immediately
// * can then batch with SINGLE | ANYONECANPAY
// We should on-chain for this HTLC, iff we're within out broadcast
// cutoff window.
return currentHeight >= broadcastCutOff
}
// checkChainActions is called for each new block connected to the end of the
// main chain. Given the new block height, this new method will examine all
// active HTLC's, and determine if we need to go on-chain to claim any of them.
// A map of action -> []htlc is returned, detailing what action (if any) should
// be performed for each HTLC. For timed out HTLC's, once the commitment has
// been sufficiently confirmed, the HTLC's should be canceled backwards. For
// redeemed HTLC's, we should send the pre-image back to the incoming link.
func (c *ChannelArbitrator) checkChainActions(height uint32,
trigger transitionTrigger) ChainActionMap {
// TODO(roasbeef): would need to lock channel? channel totem?
// * race condition if adding and we broadcast, etc
// * or would make each instance sync?
log.Debugf("ChannelArbitrator(%v): checking chain actions at "+
"height=%v", c.cfg.ChanPoint, height)
actionMap := make(ChainActionMap)
redeemCutoff := c.cfg.BroadcastDelta * broadcastRedeemMultiplier
// First, we'll make an initial pass over the set of incoming and
// outgoing HTLC's to decide if we need to go on chain at all.
haveChainActions := false
for _, htlc := range c.activeHTLCs.outgoingHTLCs {
// If any of our HTLC's triggered an on-chain action, then we
// can break early.
if haveChainActions {
break
}
// We'll need to go on-chain for an outgoing HTLC if it was
// never resolved downstream, and it's "close" to timing out.
haveChainActions = haveChainActions || c.shouldGoOnChain(
htlc.RefundTimeout, c.cfg.BroadcastDelta, height,
)
}
for _, htlc := range c.activeHTLCs.incomingHTLCs {
// If any of our HTLC's triggered an on-chain action, then we
// can break early.
if haveChainActions {
break
}
// We'll need to go on-chain to pull an incoming HTLC iff we
// know the pre-image and it's close to timing out. We need to
// ensure that we claim the funds that our rightfully ours
// on-chain.
if _, ok := c.cfg.PreimageDB.LookupPreimage(htlc.RHash[:]); !ok {
continue
}
haveChainActions = haveChainActions || c.shouldGoOnChain(
htlc.RefundTimeout, redeemCutoff, height,
)
}
// If we don't have any actions to make, then we'll return an empty
// action map. We only do this if this was a chain trigger though, as
// if we're going to broadcast the commitment (or the remote party) did
// we're *forced* to act on each HTLC.
if !haveChainActions && trigger == chainTrigger {
log.Tracef("ChannelArbitrator(%v): no actions to take at "+
"height=%v", c.cfg.ChanPoint, height)
return actionMap
}
// Now that we know we'll need to go on-chain, we'll examine all of our
// active outgoing HTLC's to see if we either need to: sweep them after
// a timeout (then cancel backwards), cancel them backwards
// immediately, or or watch them as they're still active contracts.
for _, htlc := range c.activeHTLCs.outgoingHTLCs {
switch {
// If the HTLC is dust, then we can cancel it backwards
// immediately as there's no matching contract to arbitrate
// on-chain. We know the HTLC is dust, if the OutputIndex
// negative.
case htlc.OutputIndex < 0:
log.Tracef("ChannelArbitrator(%v): immediately "+
"failing dust htlc=%x", c.cfg.ChanPoint,
htlc.RHash[:])
actionMap[HtlcFailNowAction] = append(
actionMap[HtlcFailNowAction], htlc,
)
// If we don't need to immediately act on this HTLC, then we'll
// mark it still "live". After we broadcast, we'll monitor it
// until the HTLC times out to see if we can also redeem it
// on-chain.
case !c.shouldGoOnChain(
htlc.RefundTimeout, c.cfg.BroadcastDelta, height,
):
// TODO(roasbeef): also need to be able to query
// circuit map to see if HTLC hasn't been fully
// resolved
//
// * can't fail incoming until if outgoing not yet
// failed
log.Tracef("ChannelArbitrator(%v): watching chain to "+
"decide action for outgoing htlc=%x",
c.cfg.ChanPoint, htlc.RHash[:])
actionMap[HtlcOutgoingWatchAction] = append(
actionMap[HtlcOutgoingWatchAction], htlc,
)
// Otherwise, we'll update our actionMap to mark that we need
// to sweep this HTLC on-chain
default:
log.Tracef("ChannelArbitrator(%v): going on-chain to "+
"timeout htlc=%x", c.cfg.ChanPoint, htlc.RHash[:])
actionMap[HtlcTimeoutAction] = append(
actionMap[HtlcTimeoutAction], htlc,
)
}
}
// Similarly, for each incoming HTLC, now that we need to go on-chain,
// we'll either: sweep it immediately if we know the pre-image, or
// observe the output on-chain if we don't In this last, case we'll
// either learn of it eventually from the outgoing HTLC, or the sender
// will timeout the HTLC.
for _, htlc := range c.activeHTLCs.incomingHTLCs {
payHash := htlc.RHash
// If we have the pre-image, then we should go on-chain to
// redeem the HTLC immediately.
if _, ok := c.cfg.PreimageDB.LookupPreimage(payHash[:]); ok {
log.Tracef("ChannelArbitrator(%v): preimage for "+
"htlc=%x is known!", c.cfg.ChanPoint, payHash[:])
actionMap[HtlcClaimAction] = append(
actionMap[HtlcClaimAction], htlc,
)
continue
}
log.Tracef("ChannelArbitrator(%v): watching chain to decide "+
"action for incoming htlc=%x", c.cfg.ChanPoint,
payHash[:])
// Otherwise, we don't yet have the pre-image, but should watch
// on-chain to see if either: the remote party times out the
// HTLC, or we learn of the pre-image.
actionMap[HtlcIncomingWatchAction] = append(
actionMap[HtlcIncomingWatchAction], htlc,
)
}
return actionMap
}
// prepContractResolutions is called either int he case that we decide we need
// to go to chain, or the remote party goes to chain. Given a set of actions we
// need to take for each HTLC, this method will return a set of contract
// resolvers that will resolve the contracts on-chain if needed, and also a set
// of packets to send to the htlcswitch in order to ensure all incoming HTLC's
// are properly resolved.
func (c *ChannelArbitrator) prepContractResolutions(htlcActions ChainActionMap,
contractResolutions *ContractResolutions, height uint32,
trigger transitionTrigger,
) ([]ContractResolver, []ResolutionMsg, error) {
// There may be a class of HTLC's which we can fail back immediately,
// for those we'll prepare a slice of packets to add to our outbox. Any
// packets we need to send, will be cancels.
var (
msgsToSend []ResolutionMsg
)
incomingResolutions := contractResolutions.HtlcResolutions.IncomingHTLCs
outgoingResolutions := contractResolutions.HtlcResolutions.OutgoingHTLCs
// We'll use these two maps to quickly look up an active HTLC with its
// matching HTLC resolution.
outResolutionMap := make(map[wire.OutPoint]lnwallet.OutgoingHtlcResolution)
inResolutionMap := make(map[wire.OutPoint]lnwallet.IncomingHtlcResolution)
for i := 0; i < len(incomingResolutions); i++ {
inRes := incomingResolutions[i]
// If we have a success transaction, then the htlc's outpoint
// is the transaction's only input. Otherwise, it's the claim
// point.
var htlcPoint wire.OutPoint
if inRes.SignedSuccessTx != nil {
htlcPoint = inRes.SignedSuccessTx.TxIn[0].PreviousOutPoint
} else {
htlcPoint = inRes.ClaimOutpoint
}
inResolutionMap[htlcPoint] = inRes
}
for i := 0; i < len(outgoingResolutions); i++ {
outRes := outgoingResolutions[i]
// If we have a timeout transaction, then the htlc's outpoint
// is the transaction's only input. Otherwise, it's the claim
// point.
var htlcPoint wire.OutPoint
if outRes.SignedTimeoutTx != nil {
htlcPoint = outRes.SignedTimeoutTx.TxIn[0].PreviousOutPoint
} else {
htlcPoint = outRes.ClaimOutpoint
}
outResolutionMap[htlcPoint] = outRes
}
// We'll create the resolver kit that we'll be cloning for each
// resolver so they each can do their duty.
resKit := ResolverKit{
ChannelArbitratorConfig: c.cfg,
Checkpoint: func(res ContractResolver) error {
return c.log.InsertUnresolvedContracts(res)
},
}
commitHash := contractResolutions.CommitHash
failureMsg := &lnwire.FailPermanentChannelFailure{}
// For each HTLC, we'll either act immediately, meaning we'll instantly
// fail the HTLC, or we'll act only once the transaction has been
// confirmed, in which case we'll need an HTLC resolver.
var htlcResolvers []ContractResolver
for htlcAction, htlcs := range htlcActions {
switch htlcAction {
// If we can fail an HTLC immediately (an outgoing HTLC with no
// contract), then we'll assemble an HTLC fail packet to send.
case HtlcFailNowAction:
for _, htlc := range htlcs {
failMsg := ResolutionMsg{
SourceChan: c.cfg.ShortChanID,
HtlcIndex: htlc.HtlcIndex,
Failure: failureMsg,
}
msgsToSend = append(msgsToSend, failMsg)
}
// If we can claim this HTLC, we'll create an HTLC resolver to
// claim the HTLC (second-level or directly), then add the pre
case HtlcClaimAction:
for _, htlc := range htlcs {
htlcOp := wire.OutPoint{
Hash: commitHash,
Index: uint32(htlc.OutputIndex),
}
resolution, ok := inResolutionMap[htlcOp]
if !ok {
// TODO(roasbeef): panic?
log.Errorf("ChannelArbitrator(%v) unable to find "+
"incoming resolution: %v",
c.cfg.ChanPoint, htlcOp)
continue
}
resKit.Quit = make(chan struct{})
resolver := &htlcSuccessResolver{
htlcResolution: resolution,
broadcastHeight: height,
payHash: htlc.RHash,
ResolverKit: resKit,
}
htlcResolvers = append(htlcResolvers, resolver)
}
// If we can timeout the HTLC directly, then we'll create the
// proper resolver to to so, who will then cancel the packet
// backwards.
case HtlcTimeoutAction:
for _, htlc := range htlcs {
htlcOp := wire.OutPoint{
Hash: commitHash,
Index: uint32(htlc.OutputIndex),
}
resolution, ok := outResolutionMap[htlcOp]
if !ok {
log.Errorf("ChannelArbitrator(%v) unable to find "+
"outgoing resolution: %v", c.cfg.ChanPoint, htlcOp)
continue
}
resKit.Quit = make(chan struct{})
resolver := &htlcTimeoutResolver{
htlcResolution: resolution,
broadcastHeight: height,
htlcIndex: htlc.HtlcIndex,
ResolverKit: resKit,
}
htlcResolvers = append(htlcResolvers, resolver)
}
// If this is an incoming HTLC, but we can't act yet, then
// we'll create an incoming resolver to redeem the HTLC if we
// learn of the pre-image, or let the remote party time out.
case HtlcIncomingWatchAction:
for _, htlc := range htlcs {
htlcOp := wire.OutPoint{
Hash: commitHash,
Index: uint32(htlc.OutputIndex),
}
// TODO(roasbeef): need to handle incoming dust...
// TODO(roasbeef): can't be negative!!!
resolution, ok := inResolutionMap[htlcOp]
if !ok {
log.Errorf("ChannelArbitrator(%v) unable to find "+
"incoming resolution: %v",
c.cfg.ChanPoint, htlcOp)
continue
}
resKit.Quit = make(chan struct{})
resolver := &htlcIncomingContestResolver{
htlcExpiry: htlc.RefundTimeout,
htlcSuccessResolver: htlcSuccessResolver{
htlcResolution: resolution,
broadcastHeight: height,
payHash: htlc.RHash,
ResolverKit: resKit,
},
}
htlcResolvers = append(htlcResolvers, resolver)
}
// Finally, if this is an outgoing HTLC we've sent, then we'll
// launch a resolver to watch for the pre-image (and settle
// backwards), or just timeout.
case HtlcOutgoingWatchAction:
for _, htlc := range htlcs {
htlcOp := wire.OutPoint{
Hash: commitHash,
Index: uint32(htlc.OutputIndex),
}
resolution, ok := outResolutionMap[htlcOp]
if !ok {
log.Errorf("ChannelArbitrator(%v) unable to find "+
"outgoing resolution: %v",
c.cfg.ChanPoint, htlcOp)
continue
}
resKit.Quit = make(chan struct{})
resolver := &htlcOutgoingContestResolver{
htlcTimeoutResolver{
htlcResolution: resolution,
broadcastHeight: height,
htlcIndex: htlc.HtlcIndex,
ResolverKit: resKit,
},
}
htlcResolvers = append(htlcResolvers, resolver)
}
}
}
// Finally, if this is was a unilateral closure, then we'll also create
// a resolver to sweep our commitment output (but only if it wasn't
// trimmed).
if contractResolutions.CommitResolution != nil {
resKit.Quit = make(chan struct{})
resolver := &commitSweepResolver{
commitResolution: *contractResolutions.CommitResolution,
broadcastHeight: height,
chanPoint: c.cfg.ChanPoint,
ResolverKit: resKit,
}
htlcResolvers = append(htlcResolvers, resolver)
}
return htlcResolvers, msgsToSend, nil
}
// resolveContract is a goroutien tasked with fully resolving an unresolved
// contract. Either the initial contract will be resolved after a single step,
// or the contract will itself create another contract to be resolved. In
// either case, one the contract has been fully resolved, we'll signal back to
// the main goroutine so it can properly keep track of the set of unresolved
// contracts.
//
// NOTE: This MUST be run as a goroutine.
func (c *ChannelArbitrator) resolveContract(currentContract ContractResolver) {
defer c.wg.Done()
log.Debugf("ChannelArbitrator(%v): attempting to resolve %T",
c.cfg.ChanPoint, currentContract)
// Until the contract is fully resolved, we'll continue to iteratively
// resolve the contract one step at a time.
for !currentContract.IsResolved() {
select {
// If we've been signalled to quit, then we'll exit early.
case <-c.quit:
default:
// Otherwise, we'll attempt to resolve the current
// contract.
nextContract, err := currentContract.Resolve()
if err != nil {
log.Errorf("ChannelArbitrator(%v): unable to "+
"progress resolver: %v", c.cfg.ChanPoint, err)
return
}
switch {
// If this contract produced another, then this means
// the current contract was only able to be partially
// resolved in this step. So we'll not a contract swap
// within our logs: the new contract will take the
// place of the old one.
case nextContract != nil:
log.Tracef("ChannelArbitrator(%v): swapping "+
"out contract %T for %T ",
c.cfg.ChanPoint, currentContract,
nextContract)
err := c.log.SwapContract(
currentContract, nextContract,
)
if err != nil {
log.Errorf("unable to add recurse "+
"contract: %v", err)
}
// As this contract produced another, we'll
// re-assign, so we can continue our resolution
// loop.
currentContract = nextContract
// If this contract is actually fully resolved, then
// we'll mark it as such within the database.
case currentContract.IsResolved():
log.Tracef("ChannelArbitrator(%v): marking "+
"contract %T fully resolved",
c.cfg.ChanPoint, currentContract)
err := c.log.ResolveContract(currentContract)
if err != nil {
log.Errorf("unable to resolve contract: %v",
err)
}
// Now that the contract has been resolved,
// well signal to the main goroutine.
select {
case c.resolutionSignal <- struct{}{}:
case <-c.quit:
return
}
}
}
}
}
// signalUpdateMsg is a struct that carries fresh signals to the
// ChannelArbitrator. We need to receive a message like this each time the
// channel becomes active, as it's internal state may change.
type signalUpdateMsg struct {
// newSignals is the set of new active signals to be sent to the
// arbitrator.
newSignals *ContractSignals
// doneChan is a channel that will be closed on the arbitrator has
// attached the new signals.
doneChan chan struct{}
}
// UpdateContractSignals updates the set of signals the ChannelArbitrator needs
// to receive from a channel in real-time in order to keep in sync with the
// latest state of the contract.
func (c *ChannelArbitrator) UpdateContractSignals(newSignals *ContractSignals) {
done := make(chan struct{})
select {
case c.signalUpdates <- &signalUpdateMsg{
newSignals: newSignals,
doneChan: done,
}:
case <-c.quit:
}
select {
case <-done:
case <-c.quit:
}
}
// channelAttendant is the primary goroutine that acts at the judicial
// arbitrator between our channel state, the remote channel peer, and the
// blockchain Our judge). This goroutine will ensure that we faithfully execute
// all clauses of our contract in the case that we need to go on-chain for a
// dispute. Currently, two such conditions warrant our intervention: when an
// outgoing HTLC is about to timeout, and when we know the pre-image for an
// incoming HTLC, but it hasn't yet been settled off-chain. In these cases,
// we'll: broadcast our commitment, cancel/settle any HTLC's backwards after
// sufficient confirmation, and finally send our set of outputs to the UTXO
// Nursery for incubation, and ultimate sweeping.
//
// NOTE: This MUST be run as a goroutine.
func (c *ChannelArbitrator) channelAttendant(bestHeight int32,
bestHash *chainhash.Hash) {
// TODO(roasbeef): tell top chain arb we're done
defer c.wg.Done()
for {
select {
// A new block has arrived, we'll examine all the active HTLC's
// to see if any of them have expired, and also update our
// track of the best current height.
case blockEpoch, ok := <-c.cfg.BlockEpochs.Epochs:
if !ok {
return
}
bestHeight = blockEpoch.Height
bestHash = blockEpoch.Hash
// If we're not in the default state, then we can
// ignore this signal as we're waiting for contract
// resolution.
if c.state != StateDefault {
continue
}
// Now that a new block has arrived, we'll attempt to
// advance our state forward.
nextState, _, err := c.advanceState(
uint32(bestHeight), bestHash, chainTrigger, nil,
)
if err != nil {
log.Errorf("unable to advance state: %v", err)
}
// If as a result of this trigger, the contract is
// fully resolved, then well exit.
if nextState == StateFullyResolved {
return
}
// A new signal update was just sent. This indicates that the
// channel under watch is now live, and may modify its internal
// state, so we'll get the most up to date signals to we can
// properly do our job.
case signalUpdate := <-c.signalUpdates:
log.Tracef("ChannelArbitrator(%v) got new signal "+
"update!", c.cfg.ChanPoint)
// First, we'll update our set of signals.
c.htlcUpdates = signalUpdate.newSignals.HtlcUpdates
c.cfg.ShortChanID = signalUpdate.newSignals.ShortChanID
// Now that the signals have been updated, we'll now
// close the done channel to signal to the caller we've
// registered the new contracts.
close(signalUpdate.doneChan)
// A new set of HTLC's has been added or removed from the
// commitment transaction. So we'll update our activeHTLCs map
// accordingly.
case newStateHTLCs := <-c.htlcUpdates:
// We'll wipe out our old set of HTLC's and instead
// monitor only the HTLC's that are still active on the
// current commitment state.
c.activeHTLCs = newHtlcSet(newStateHTLCs)
log.Tracef("ChannelArbitrator(%v): fresh set of "+
"htlcs=%v", c.cfg.ChanPoint,
newLogClosure(func() string {
return spew.Sdump(c.activeHTLCs)
}),
)
// We've cooperatively closed the channel, so we're no longer
// needed.
case <-c.cfg.ChainEvents.CooperativeClosure:
log.Infof("ChannelArbitrator(%v) closing due to co-op "+
"closure", c.cfg.ChanPoint)
return
// The remote party has broadcast the commitment on-chain.
// We'll examine our state to determine if we need to act at
// all.
case uniClosure := <-c.cfg.ChainEvents.UnilateralClosure:
if c.state != StateDefault {
continue
}
log.Infof("ChannelArbitrator(%v): remote party has "+
"closed channel out on-chain", c.cfg.ChanPoint)
// If we don't have a self output, and there are no
// active HTLC's, then we can immediately mark the
// contract as fully resolved and exit.
contractRes := &ContractResolutions{
CommitHash: *uniClosure.SpenderTxHash,
CommitResolution: uniClosure.CommitResolution,
HtlcResolutions: *uniClosure.HtlcResolutions,
}
if contractRes.IsEmpty() {
log.Infof("ChannelArbitrator(%v): contract "+
"resolutions empty, exiting", c.cfg.ChanPoint)
err := c.cfg.MarkChannelResolved()
if err != nil {
log.Errorf("unable to resolve "+
"contract: %v", err)
}
return
}
// TODO(roasbeef): modify signal to also detect
// cooperative closures?
// As we're now acting upon an event triggered by the
// broadcast of the remote commitment transaction,
// we'll swap out our active HTLC set with the set
// present on their commitment.
c.activeHTLCs = newHtlcSet(uniClosure.RemoteCommit.Htlcs)
// When processing a remote party initiated event,
// we'll skip the BroadcastCommit state, and transition
// directly to the ContractClosed state. As a result,
// we'll now manually log out set of resolutions.
stateCb := func(nextState ArbitratorState) error {
if nextState == StateContractClosed {
err := c.log.LogContractResolutions(
contractRes,
)
if err != nil {
return fmt.Errorf("unable to write "+
"resolutions: %v", err)
}
}
return nil
}
// We'll now advance our state machine until it reaches
// a terminal state.
_, _, err := c.advanceState(
uint32(bestHeight), bestHash,
remotePeerTrigger, stateCb,
)
if err != nil {
log.Errorf("unable to advance state: %v", err)
}
// A new contract has just been resolved, we'll now check our
// log to see if all contracts have been resolved. If so, then
// we can exit as the contract is fully resolved.
case <-c.resolutionSignal:
log.Infof("ChannelArbitrator(%v): a contract has been "+
"fully resolved!", c.cfg.ChanPoint)
numUnresolved, err := c.log.FetchUnresolvedContracts()
if err != nil {
log.Errorf("unable to query resolved "+
"contracts: %v", err)
}
// If we still have unresolved contracts, then we'll
// stay alive to oversee their resolution.
if len(numUnresolved) != 0 {
continue
}
log.Infof("ChannelArbitrator(%v): all contracts fully "+
"resolved, exiting", c.cfg.ChanPoint)
// Otherwise, our job is finished here, the contract is
// now fully resolved! We'll mark it as such, then exit
// ourselves.
if err := c.cfg.MarkChannelResolved(); err != nil {
log.Errorf("unable to mark contract "+
"resolved: %v", err)
}
return
// We've just received a request to forcibly close out the
// channel. We'll
case closeReq := <-c.forceCloseReqs:
if c.state != StateDefault {
continue
}
nextState, closeTx, err := c.advanceState(
uint32(bestHeight), bestHash, userTrigger, nil,
)
if err != nil {
log.Errorf("unable to advance state: %v", err)
}
select {
case closeReq.closeTx <- closeTx:
case <-c.quit:
return
}
select {
case closeReq.errResp <- err:
case <-c.quit:
return
}
// If we don't have anything further to do after
// advancing our state, then we'll exit.
if nextState == StateFullyResolved {
log.Infof("ChannelArbitrator(%v): all "+
"contracts resolved, exiting",
c.cfg.ChanPoint)
}
case <-c.quit:
return
}
}
}
// markContractClosed marks a contract as "pending closed". After this state,
// upon restart, we'll no longer watch for updates to the set of contracts as
// the channel cannot be updated any longer.
func (c *ChannelArbitrator) markContractClosed(closeTx *wire.MsgTx,
chanSnapshot channeldb.ChannelSnapshot,
contractResolution *ContractResolutions,
closeHeight uint32) error {
// TODO(roasbeef): also need height info?
closeInfo := &channeldb.ChannelCloseSummary{
ChanPoint: chanSnapshot.ChannelPoint,
ChainHash: chanSnapshot.ChainHash,
ClosingTXID: closeTx.TxHash(),
RemotePub: &chanSnapshot.RemoteIdentity,
Capacity: chanSnapshot.Capacity,
CloseType: channeldb.ForceClose,
IsPending: true,
ShortChanID: c.cfg.ShortChanID,
CloseHeight: closeHeight,
}
// If our commitment output isn't dust or we have active HTLC's on the
// commitment transaction, then we'll populate the balances on the
// close channel summary.
if contractResolution.CommitResolution != nil {
closeInfo.SettledBalance = chanSnapshot.LocalBalance.ToSatoshis()
closeInfo.TimeLockedBalance = chanSnapshot.LocalBalance.ToSatoshis()
}
for _, htlc := range contractResolution.HtlcResolutions.OutgoingHTLCs {
htlcValue := btcutil.Amount(htlc.SweepSignDesc.Output.Value)
closeInfo.TimeLockedBalance += htlcValue
}
return c.cfg.CloseChannel(closeInfo)
}