From 31aa7265b736a840c1b67300db3921889cd5af25 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 16 Jan 2018 19:52:22 -0800 Subject: [PATCH] contractcourt: add new ChainArbitrator struct as central coordinator of package MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In this commit, we add the ChainArbitrator struct. The ChainArbitrator is a special sub-system that will oversee the on-chain resolution of all active channels, and also channels that are in the pending close state. The ChainArbitrator maintains a set of ChannelArbitrators, one for each channel that hasn’t yet been fully resolved. Outside sub-systems should send new channels to the arbitrator once they’ve opened. Additionally, they can also trigger manual interventions to close out a channel on chain forcibly, or just to signal that a channel has been closed cooperatively. Finally, (for now) the ChainArbitrator should be notified once a fresh set of signals for a channel becomes available. The ChannelArbitrator for the channel will use these set of signals to be notified when an on-chain event happens. --- contractcourt/chain_arbitrator.go | 588 +++++++++++++++++++++++++ contractcourt/chain_arbitrator_test.go | 1 + 2 files changed, 589 insertions(+) create mode 100644 contractcourt/chain_arbitrator.go create mode 100644 contractcourt/chain_arbitrator_test.go diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go new file mode 100644 index 00000000..ad6a0692 --- /dev/null +++ b/contractcourt/chain_arbitrator.go @@ -0,0 +1,588 @@ +package contractcourt + +import ( + "fmt" + "sync" + "sync/atomic" + + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwallet" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcd/wire" +) + +// ResolutionMsg is a message sent by resolvers to outside sub-systems once an +// outgoing contract has been fully resolved. For multi-hop contracts, if we +// resolve the outgoing contract, we'll also need to ensure that the incoming +// contract is resolved as well. We package the items required to resolve the +// incoming contracts within this message. +type ResolutionMsg struct { + // SourceChan identifies the channel that this message is being sent + // from. This is the channel's short channel ID. + SourceChan lnwire.ShortChannelID + + // HtlcIndex is the index of the contract within the original + // commitment trace. + HtlcIndex uint64 + + // Failure will be non-nil if the incoming contract should be cancelled + // all together. This can happen if the outgoing contract was dust, if + // if the outgoing HTLC timed out. + Failure lnwire.FailureMessage + + // PreImage will be non-nil if the incoming contract can successfully + // be redeemed. This can happen if we learn of the preimage from the + // outgoing HTLC on-chain. + PreImage *[32]byte +} + +// ChainArbitratorConfig is a configuration struct that contains all the +// function closures and interface that required to arbitrate on-chain +// contracts for a particular chain. +type ChainArbitratorConfig struct { + // ChainHash is the chain that this arbitrator is to operate within. + ChainHash chainhash.Hash + + // BroadcastDelta is the delta that we'll use to decide when to + // broadcast our commitment transaction. This value should be set + // based on our current fee estimation of the commitment transaction. + // We use this to determine when we should broadcast instead of the + // just the HTLC timeout, as we want to ensure that the commitment + // transaction is already confirmed, by the time the HTLC expires. + BroadcastDelta uint32 + + // NewSweepAddr is a function that returns a new address under control + // by the wallet. We'll use this to sweep any no-delay outputs as a + // result of unilateral channel closes. + // + // NOTE: This SHOULD return a p2wkh script. + NewSweepAddr func() ([]byte, error) + + // PublishTx reliably broadcasts a transaction to the network. Once + // this function exits without an error, then they transaction MUST + // continually be rebroadcast if needed. + PublishTx func(*wire.MsgTx) error + + // DeliverResolutionMsg is a function that will append an outgoing + // message to the "out box" for a ChannelLink. This is used to cancel + // backwards any HTLC's that are either dust, we're timing out, or + // settling on-chain to the incoming link. + DeliverResolutionMsg func(...ResolutionMsg) error + + // MarkLinkInactive is a function closure that the ChainArbitrator will + // use to mark that active HTLC's shouldn't be attempt ted to be routed + // over a particular channel. This function will be called in that a + // ChannelArbitrator decides that it needs to go to chain in order to + // resolve contracts. + // + // TODO(Roasbeef): rename, routing based + MarkLinkInactive func(wire.OutPoint) error + + // IncubateOutput sends either a incoming HTLC, an outgoing HTLC, or + // both to the utxo nursery. Once this function returns, the nursery + // should have safely persisted the outputs to disk, and should start + // the process of incubation. This is used when a resolver wishes to + // pass off the output to the nursery as we're inly waiting on an + // absolute/relative item block. + IncubateOutputs func(wire.OutPoint, *lnwallet.CommitOutputResolution, + *lnwallet.OutgoingHtlcResolution, + *lnwallet.IncomingHtlcResolution) error + + // PreimageDB is a global store of all known pre-images. We'll use this + // to decide if we should broadcast a commitment transaction to claim + // an HTLC on-chain. + PreimageDB WitnessBeacon + + // Notifier is an instance of a chain notifier we'll use to watch for + // certain on-chain events. + Notifier chainntnfs.ChainNotifier + + // Signer is a signer backed by the active lnd node. This should be + // capable of producing a signature as specified by a valid + // SignDescriptor. + Signer lnwallet.Signer + + // FeeEstimator will be used to return fee estimates. + FeeEstimator lnwallet.FeeEstimator + + // ChainIO allows us to query the state of the current main chain. + ChainIO lnwallet.BlockChainIO +} + +// ChainArbitrator is a sub-system that oversees the on-chain resolution of all +// active, and channel that are in the "pending close" state. Within the +// contractcourt package, the ChainArbitrator manages a set of active +// ContractArbitrators. Each ContractArbitrators is responsible for watching +// the chain for any activity that affects the state of the channel, and also +// for monitoring each contract in order to determine if any on-chain activity is +// required. Outside sub-systems interact with the ChainArbitrator in order to +// forcibly exit a contract, update the set of live signals for each contract, +// and to receive reports on the state of contract resolution. +type ChainArbitrator struct { + started int32 + stopped int32 + + sync.Mutex + + // activeChannels is a map of all the active contracts that are still + // open, and not fully resolved. + activeChannels map[wire.OutPoint]*ChannelArbitrator + + // cfg is the config struct for the arbitrator that contains all + // methods and interface it needs to operate. + cfg ChainArbitratorConfig + + // chanSource will be used by the ChainArbitrator to fetch all the + // active channels that it must still watch over. + chanSource *channeldb.DB + + quit chan struct{} + + wg sync.WaitGroup +} + +// NewChainArbitrator returns a new instance of the ChainArbitrator using the +// passed config struct, and backing persistent database. +func NewChainArbitrator(cfg ChainArbitratorConfig, + db *channeldb.DB) *ChainArbitrator { + + return &ChainArbitrator{ + cfg: cfg, + activeChannels: make(map[wire.OutPoint]*ChannelArbitrator), + chanSource: db, + quit: make(chan struct{}), + } +} + +// newActiveChannelArbitrator creates a new instance of an active channel +// arbitrator given the state of the target channel. +func newActiveChannelArbitrator(channel *channeldb.OpenChannel, + c *ChainArbitrator) (*ChannelArbitrator, error) { + + log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)", + channel.FundingOutpoint) + + // We'll start by registering for a block epoch notifications so this + // channel can keep track of the current state of the main chain. + // + // TODO(roasbeef): fetch best height (or pass in) so can ensure block + // epoch delivers all the notifications to + // + // TODO(roasbeef): instead 1 block epoch that multi-plexes to the rest? + // * reduces the number of goroutines + blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn() + if err != nil { + return nil, err + } + + chanPoint := channel.FundingOutpoint + + // Next we'll create the matching configuration struct that contains + // all interfaces and methods the arbitrator needs to do its job. + arbCfg := ChannelArbitratorConfig{ + ChanPoint: chanPoint, + ShortChanID: channel.ShortChanID, + BlockEpochs: blockEpoch, + ForceCloseChan: func() (*lnwallet.ForceCloseSummary, error) { + // With the channels fetched, attempt to locate + // the target channel according to its channel + // point. + dbChannels, err := c.chanSource.FetchAllChannels() + if err != nil { + return nil, err + } + var channel *channeldb.OpenChannel + for _, dbChannel := range dbChannels { + if dbChannel.FundingOutpoint == chanPoint { + channel = dbChannel + break + } + } + + // If the channel cannot be located, then we + // exit with an error to the channel. + if channel == nil { + return nil, fmt.Errorf("unable to find channel") + } + + chanMachine, err := lnwallet.NewLightningChannel( + c.cfg.Signer, nil, c.cfg.PreimageDB, channel) + if err != nil { + return nil, err + } + chanMachine.Stop() + chanMachine.CancelObserver() + + if err := c.cfg.MarkLinkInactive(chanPoint); err != nil { + log.Errorf("unable to mark link inactive: %v", err) + } + + return chanMachine.ForceClose() + }, + CloseChannel: func(summary *channeldb.ChannelCloseSummary) error { + log.Tracef("ChannelArbitrator(%v): closing "+ + "channel", chanPoint) + + return channel.CloseChannel(summary) + }, + ChainArbitratorConfig: c.cfg, + } + + // The final component needed is an arbitrator log that the arbitrator + // will use to keep track of its internal state using a backed + // persistent log. + // + // TODO(roasbeef); abstraction leak... + // * rework: adaptor method to set log scope w/ factory func + chanLog, err := newBoltArbitratorLog( + c.chanSource.DB, arbCfg, c.cfg.ChainHash, chanPoint, + ) + if err != nil { + blockEpoch.Cancel() + return nil, err + } + + arbCfg.MarkChannelResolved = func() error { + return c.resolveContract(chanPoint, chanLog) + } + + return NewChannelArbitrator( + arbCfg, channel.LocalCommitment.Htlcs, chanLog, + ), nil +} + +// resolveContract marks a contract as fully resolved within the database. +// This is only to be done once all contracts which were live on the channel +// before hitting the chain have been resolved. +func (c *ChainArbitrator) resolveContract(chanPoint wire.OutPoint, + arbLog ArbitratorLog) error { + + log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint) + + // First, we'll we'll mark the channel as fully closed from the PoV of + // the channel source. + err := c.chanSource.MarkChanFullyClosed(&chanPoint) + if err != nil { + return err + } + + // Once this has been marked as resolved, we'll wipe the log that the + // channel arbitrator was using to store its persistent state. We do + // this after marking the channel resolved, as otherwise, the + // arbitrator would be re-created, and think it was starting from the + // default state. + if err := arbLog.WipeHistory(); err != nil { + return err + } + + c.Lock() + delete(c.activeChannels, chanPoint) + c.Unlock() + + return nil +} + +// Start launches all goroutines that the ChainArbitrator needs to operate. +func (c *ChainArbitrator) Start() error { + if !atomic.CompareAndSwapInt32(&c.started, 0, 1) { + return nil + } + + log.Tracef("Starting ChainArbitrator") + + // First, we'll fetch all the channels that are still open, in order to + // collect them within our set of active contracts. + openChannels, err := c.chanSource.FetchAllChannels() + if err != nil { + return err + } + + if len(openChannels) > 0 { + log.Infof("Creating ChannelArbitrators for %v active channels", + len(openChannels)) + } + + // For each open channel, we'll configure then launch a corresponding + // ChannelArbitrator. + for _, channel := range openChannels { + channelArb, err := newActiveChannelArbitrator(channel, c) + if err != nil { + return err + } + + c.activeChannels[channel.FundingOutpoint] = channelArb + } + + // In addition to the channels that we know to be open, we'll also + // launch arbitrators to finishing resolving any channels that are in + // the pending close state. + closingChannels, err := c.chanSource.FetchClosedChannels(true) + if err != nil { + return err + } + + if len(closingChannels) > 0 { + log.Infof("Creating ChannelArbitrators for %v closing channels", + len(closingChannels)) + } + + // Next, for each channel is the closing state, we'll launch a + // corresponding more restricted resolver. + for _, closeChanInfo := range closingChannels { + blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn() + if err != nil { + return err + } + + // We can leave off the CloseContract and ForceCloseChan + // methods as the channel is already closed at this point. + chanPoint := closeChanInfo.ChanPoint + arbCfg := ChannelArbitratorConfig{ + ChanPoint: chanPoint, + ShortChanID: closeChanInfo.ShortChanID, + BlockEpochs: blockEpoch, + ChainArbitratorConfig: c.cfg, + } + chanLog, err := newBoltArbitratorLog( + c.chanSource.DB, arbCfg, c.cfg.ChainHash, chanPoint, + ) + if err != nil { + return err + } + arbCfg.MarkChannelResolved = func() error { + return c.resolveContract(chanPoint, chanLog) + } + + // We can also leave off the set of HTLC's here as since the + // channel is already in the process of being full resolved, no + // new HTLC's we be added. + c.activeChannels[chanPoint] = NewChannelArbitrator( + arbCfg, nil, chanLog, + ) + } + + // Finally, we'll launch all the goroutines for each arbitrator so they + // can carry out their duties. + for _, arbitrator := range c.activeChannels { + if err := arbitrator.Start(); err != nil { + return err + } + } + + // TODO(roasbeef): eventually move all breach watching here + + return nil +} + +// Stop signals the ChainArbitrator to trigger a graceful shutdown. Any active +// channel arbitrators will be signalled to exit, and this method will block +// until they've all exited. +func (c *ChainArbitrator) Stop() error { + if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) { + return nil + } + + log.Infof("Stopping ChainArbitrator") + + close(c.quit) + + c.Lock() + arbitrators := c.activeChannels + c.Unlock() + + for chanPoint, arbitrator := range arbitrators { + log.Tracef("Attempting to stop ChannelArbitrator(%v)", + chanPoint) + + if err := arbitrator.Stop(); err != nil { + log.Errorf("unable to stop arbitrator for "+ + "ChannelPoint(%v): %v", chanPoint, err) + } + } + + c.wg.Wait() + + return nil +} + +// ContractSignals wraps the two signals that affect the state of a channel +// being watched by an arbitrator. The two signals we care about are: the +// channel has a new set of HTLC's, and the remote party has just broadcast +// their version of the commitment transaction. +type ContractSignals struct { + // HtlcUpdates is a channel that once we new commitment updates takes + // place, the later set of HTLC's on the commitment transaction should + // be sent over. + HtlcUpdates chan []channeldb.HTLC + + // UniCloseSignal is a channel that allows the ChannelArbitrator for a + // particular channel to detect if the remote party has broadcast their + // version of the commitment transaction. + // + // TODO(roasbeef): eliminate and just roll into the struct itself, all + // watching + UniCloseSignal chan *lnwallet.UnilateralCloseSummary + + // ShortChanID is the up to date short channel ID for a contract. This + // can change either if when the contract was added it didn't yet have + // a stable identifier, or in the case of a reorg. + ShortChanID lnwire.ShortChannelID +} + +// UpdateContractSignals sends a set of active, up to date contract signals to +// the ChannelArbitrator which is has been assigned to the channel infield by +// the passed channel point. +func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint, + signals *ContractSignals) error { + + log.Infof("Attempting to update ContractSignals for ChannelPoint(%v)", + chanPoint) + + c.Lock() + arbitrator, ok := c.activeChannels[chanPoint] + c.Unlock() + if !ok { + return fmt.Errorf("unable to find arbitrator") + } + + arbitrator.UpdateContractSignals(signals) + + return nil +} + +// forceCloseReq is a request sent from an outsde sub-system to the arbitrator +// that watches a particular channel to broadcast the commitnet transaction, +// and enter the resolution phase of the channel. +type forceCloseReq struct { + // errResp is a channel that will be sent upon either in the case of force + // close success (nil error), or in the case on an erro (nil-nil error) + // + // NOTE; This channel MUST be buffered. + errResp chan error + + // closeTx is a channel that carries the transaction which ultimatley + // closed out the channel. + closeTx chan *wire.MsgTx +} + +// ForceCloseContract attempts to force close the channel infield by the passed +// channel point. A force close will immediately terminate the contract, +// causing it to enter the resolution phase. If the force close was successful, +// then the force close transaction itself will be returned. +// +// TODO(roasbeef): just return the summary itself? +func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.MsgTx, error) { + c.Lock() + arbitrator, ok := c.activeChannels[chanPoint] + c.Unlock() + if !ok { + return nil, fmt.Errorf("unable to find arbitrator") + } + + log.Infof("Attempting to force close ChannelPoint(%v)", chanPoint) + + errChan := make(chan error, 1) + respChan := make(chan *wire.MsgTx, 1) + + // With the channel found, and the request crafted, we'll send over a + // force close request to the arbitrator that watches this channel. + select { + case arbitrator.forceCloseReqs <- &forceCloseReq{ + errResp: errChan, + closeTx: respChan, + }: + case <-c.quit: + return nil, fmt.Errorf("ChainArbitrator shutting down") + } + + // We'll await two resposnes: the error response, and the transaction + // that closed out the channel. + select { + case err := <-errChan: + if err != nil { + return nil, err + } + case <-c.quit: + return nil, fmt.Errorf("ChainArbitrator shutting down") + } + + var closeTx *wire.MsgTx + select { + case closeTx = <-respChan: + case <-c.quit: + return nil, fmt.Errorf("ChainArbitrator shutting down") + } + + return closeTx, nil +} + +// RequestChannelArbitration sends the ChainArbitrator a message to create a +// ChannelArbitrator tasked with watching over a new channel. Once a new +// channel has finished its final funding flow, it should be registered with +// the ChainArbitrator so we can properly react to any on-chain events. +func (c *ChainArbitrator) RequestChannelArbitration(newChan *channeldb.OpenChannel) error { + c.Lock() + defer c.Unlock() + + log.Infof("Creating new ChannelArbitrator for ChannelPoint(%v)", + newChan.FundingOutpoint) + + // If we're already watching this channel, then we'll ignore this + // request. + chanPoint := newChan.FundingOutpoint + if _, ok := c.activeChannels[chanPoint]; ok { + return nil + } + + // First, we'll create a new channel arbitrator instance using this new + // channel, and our internal state. + channelArb, err := newActiveChannelArbitrator(newChan, c) + if err != nil { + return err + } + + // With the arbitrator created, we'll add it to our set of active + // arbitrators, then launch it. + c.activeChannels[chanPoint] = channelArb + return channelArb.Start() +} + +// ManuallyResolveChannel is a method to be called by outside sub-systems if a +// channel becomes fully resolved, but due to manual intervention. An example +// of such a manual intervention includes a cooperative channel closuer. +// +// TODO(roasbeef): remove after arbs watch chain for all actions directly +func (c *ChainArbitrator) ManuallyResolveChannel(chanPoint wire.OutPoint) error { + c.Lock() + defer c.Unlock() + + log.Infof("Manually resolving ChannelArbitrator for ChannelPoint(%v)", + chanPoint) + + channelArb, ok := c.activeChannels[chanPoint] + if !ok { + return fmt.Errorf("unable to find arbitrator for: %v", chanPoint) + } + + if err := channelArb.Stop(); err != nil { + return err + } + + delete(c.activeChannels, chanPoint) + + return channelArb.log.WipeHistory() +} + +// SubscribeChannelSignals... +func (c *ChainArbitrator) SubscribeChannelSignals() error { + // TODO(roasbeef): gives signals of spends, breaches, etc + // * breach arg grabs initially + // * create new db method for *just* getting the outpoints for em all? + + return nil +} + +// TODO(roasbeef): arbitration reports +// * types: contested, waiting for success conf, etc diff --git a/contractcourt/chain_arbitrator_test.go b/contractcourt/chain_arbitrator_test.go new file mode 100644 index 00000000..236a3336 --- /dev/null +++ b/contractcourt/chain_arbitrator_test.go @@ -0,0 +1 @@ +package contractcourt