diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index ad6a0692..75ab5c10 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -130,6 +130,10 @@ type ChainArbitrator struct { // open, and not fully resolved. activeChannels map[wire.OutPoint]*ChannelArbitrator + // activeWatchers is a map of all the active chainWatchers for channels + // that are still considered open. + activeWatchers map[wire.OutPoint]*chainWatcher + // cfg is the config struct for the arbitrator that contains all // methods and interface it needs to operate. cfg ChainArbitratorConfig @@ -151,6 +155,7 @@ func NewChainArbitrator(cfg ChainArbitratorConfig, return &ChainArbitrator{ cfg: cfg, activeChannels: make(map[wire.OutPoint]*ChannelArbitrator), + activeWatchers: make(map[wire.OutPoint]*chainWatcher), chanSource: db, quit: make(chan struct{}), } @@ -159,7 +164,7 @@ func NewChainArbitrator(cfg ChainArbitratorConfig, // newActiveChannelArbitrator creates a new instance of an active channel // arbitrator given the state of the target channel. func newActiveChannelArbitrator(channel *channeldb.OpenChannel, - c *ChainArbitrator) (*ChannelArbitrator, error) { + c *ChainArbitrator, chanEvents *ChainEventSubscription) (*ChannelArbitrator, error) { log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)", channel.FundingOutpoint) @@ -208,12 +213,11 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel, } chanMachine, err := lnwallet.NewLightningChannel( - c.cfg.Signer, nil, c.cfg.PreimageDB, channel) + c.cfg.Signer, c.cfg.PreimageDB, channel) if err != nil { return nil, err } chanMachine.Stop() - chanMachine.CancelObserver() if err := c.cfg.MarkLinkInactive(chanPoint); err != nil { log.Errorf("unable to mark link inactive: %v", err) @@ -228,6 +232,7 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel, return channel.CloseChannel(summary) }, ChainArbitratorConfig: c.cfg, + ChainEvents: chanEvents, } // The final component needed is an arbitrator log that the arbitrator @@ -307,7 +312,19 @@ func (c *ChainArbitrator) Start() error { // For each open channel, we'll configure then launch a corresponding // ChannelArbitrator. for _, channel := range openChannels { - channelArb, err := newActiveChannelArbitrator(channel, c) + // First, we'll create an active chainWatcher for this channel + // to ensure that we detect any relevant on chain events. + chainWatcher, err := newChainWatcher( + channel, c.cfg.Notifier, c.cfg.PreimageDB, c.cfg.Signer, + ) + if err != nil { + return err + } + + c.activeWatchers[channel.FundingOutpoint] = chainWatcher + channelArb, err := newActiveChannelArbitrator( + channel, c, chainWatcher.SubscribeChannelEvents(), + ) if err != nil { return err } @@ -344,6 +361,7 @@ func (c *ChainArbitrator) Start() error { ShortChanID: closeChanInfo.ShortChanID, BlockEpochs: blockEpoch, ChainArbitratorConfig: c.cfg, + ChainEvents: &ChainEventSubscription{}, } chanLog, err := newBoltArbitratorLog( c.chanSource.DB, arbCfg, c.cfg.ChainHash, chanPoint, @@ -363,8 +381,13 @@ func (c *ChainArbitrator) Start() error { ) } - // Finally, we'll launch all the goroutines for each arbitrator so they - // can carry out their duties. + // Finally, we'll launch all the goroutines for each watcher and + // arbitrator so they can carry out their duties. + for _, watcher := range c.activeWatchers { + if err := watcher.Start(); err != nil { + return err + } + } for _, arbitrator := range c.activeChannels { if err := arbitrator.Start(); err != nil { return err @@ -390,8 +413,18 @@ func (c *ChainArbitrator) Stop() error { c.Lock() arbitrators := c.activeChannels + watchers := c.activeWatchers c.Unlock() + for chanPoint, watcher := range watchers { + log.Tracef("Attempting to stop ChainWatcher(%v)", + chanPoint) + + if err := watcher.Stop(); err != nil { + log.Errorf("unable to stop watcher for "+ + "ChannelPoint(%v): %v", chanPoint, err) + } + } for chanPoint, arbitrator := range arbitrators { log.Tracef("Attempting to stop ChannelArbitrator(%v)", chanPoint) @@ -417,14 +450,6 @@ type ContractSignals struct { // be sent over. HtlcUpdates chan []channeldb.HTLC - // UniCloseSignal is a channel that allows the ChannelArbitrator for a - // particular channel to detect if the remote party has broadcast their - // version of the commitment transaction. - // - // TODO(roasbeef): eliminate and just roll into the struct itself, all - // watching - UniCloseSignal chan *lnwallet.UnilateralCloseSummary - // ShortChanID is the up to date short channel ID for a contract. This // can change either if when the contract was added it didn't yet have // a stable identifier, or in the case of a reorg. @@ -452,17 +477,17 @@ func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint, return nil } -// forceCloseReq is a request sent from an outsde sub-system to the arbitrator -// that watches a particular channel to broadcast the commitnet transaction, +// forceCloseReq is a request sent from an outside sub-system to the arbitrator +// that watches a particular channel to broadcast the commitment transaction, // and enter the resolution phase of the channel. type forceCloseReq struct { - // errResp is a channel that will be sent upon either in the case of force - // close success (nil error), or in the case on an erro (nil-nil error) + // errResp is a channel that will be sent upon either in the case of + // force close success (nil error), or in the case on an error. // // NOTE; This channel MUST be buffered. errResp chan error - // closeTx is a channel that carries the transaction which ultimatley + // closeTx is a channel that carries the transaction which ultimately // closed out the channel. closeTx chan *wire.MsgTx } @@ -497,7 +522,7 @@ func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.Msg return nil, fmt.Errorf("ChainArbitrator shutting down") } - // We'll await two resposnes: the error response, and the transaction + // We'll await two responses: the error response, and the transaction // that closed out the channel. select { case err := <-errChan: @@ -518,11 +543,11 @@ func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.Msg return closeTx, nil } -// RequestChannelArbitration sends the ChainArbitrator a message to create a +// WatchNewChannel sends the ChainArbitrator a message to create a // ChannelArbitrator tasked with watching over a new channel. Once a new // channel has finished its final funding flow, it should be registered with // the ChainArbitrator so we can properly react to any on-chain events. -func (c *ChainArbitrator) RequestChannelArbitration(newChan *channeldb.OpenChannel) error { +func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error { c.Lock() defer c.Unlock() @@ -536,9 +561,22 @@ func (c *ChainArbitrator) RequestChannelArbitration(newChan *channeldb.OpenChann return nil } - // First, we'll create a new channel arbitrator instance using this new + // First, also create an active chainWatcher for this channel to ensure + // that we detect any relevant on chain events. + chainWatcher, err := newChainWatcher( + newChan, c.cfg.Notifier, c.cfg.PreimageDB, c.cfg.Signer, + ) + if err != nil { + return err + } + + c.activeWatchers[newChan.FundingOutpoint] = chainWatcher + + // We'll also create a new channel arbitrator instance using this new // channel, and our internal state. - channelArb, err := newActiveChannelArbitrator(newChan, c) + channelArb, err := newActiveChannelArbitrator( + newChan, c, chainWatcher.SubscribeChannelEvents(), + ) if err != nil { return err } @@ -567,12 +605,14 @@ func (c *ChainArbitrator) ManuallyResolveChannel(chanPoint wire.OutPoint) error } if err := channelArb.Stop(); err != nil { + if err := channelArb.Start(); err != nil { return err } delete(c.activeChannels, chanPoint) return channelArb.log.WipeHistory() + return chainWatcher.Start() } // SubscribeChannelSignals... diff --git a/contractcourt/channel_arbitrator.go b/contractcourt/channel_arbitrator.go index bad3f028..5959041e 100644 --- a/contractcourt/channel_arbitrator.go +++ b/contractcourt/channel_arbitrator.go @@ -80,6 +80,11 @@ type ChannelArbitratorConfig struct { // reclaim/redeem the funds in an HTLC sent to/from us. BlockEpochs *chainntnfs.BlockEpochEvent + // ChainEvents is an active subscription to the chain watcher for this + // channel to be notified of any on-chain activity related to this + // channel. + ChainEvents *ChainEventSubscription + // ForceCloseChan should force close the contract that this attendant // is watching over. We'll use this when we decide that we need to go // to chain. The returned summary contains all items needed to @@ -164,10 +169,6 @@ type ChannelArbitrator struct { // we're watching over will be sent. signalUpdates chan *signalUpdateMsg - // uniCloseSignal is a channel that will be sent upon if we detect that - // the remote party closes the channel on-chain. - uniCloseSignal <-chan *lnwallet.UnilateralCloseSummary - // htlcUpdates is a channel that is sent upon with new updates from the // active channel. Each time a new commitment state is accepted, the // set of HTLC's on the new state should be sent across this channel. @@ -202,7 +203,6 @@ func NewChannelArbitrator(cfg ChannelArbitratorConfig, return &ChannelArbitrator{ log: log, signalUpdates: make(chan *signalUpdateMsg), - uniCloseSignal: make(<-chan *lnwallet.UnilateralCloseSummary), htlcUpdates: make(<-chan []channeldb.HTLC), resolutionSignal: make(chan struct{}), forceCloseReqs: make(chan *forceCloseReq), @@ -292,6 +292,8 @@ func (c *ChannelArbitrator) Stop() error { log.Debugf("Stopping ChannelArbitrator(%v)", c.cfg.ChanPoint) + c.cfg.ChainEvents.Cancel() + for _, activeResolver := range c.activeResolvers { activeResolver.Stop() } @@ -368,7 +370,8 @@ func (c *ChannelArbitrator) stateStep(bestHeight uint32, bestHash *chainhash.Has // chain, we'll check to see if we need to make any on-chain // claims on behalf of the channel contract that we're // arbitrating for. - chainActions := c.checkChainActions(uint32(bestHeight), trigger) + chainActions := c.checkChainActions(uint32(bestHeight), + trigger) // If there are no actions to be made, then we'll remain in the // default state. If this isn't a self initiated event (we're @@ -1320,7 +1323,6 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32, // First, we'll update our set of signals. c.htlcUpdates = signalUpdate.newSignals.HtlcUpdates - c.uniCloseSignal = signalUpdate.newSignals.UniCloseSignal c.cfg.ShortChanID = signalUpdate.newSignals.ShortChanID // Now that the signals have been updated, we'll now @@ -1347,7 +1349,7 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32, // The remote party has broadcast the commitment on-chain. // We'll examine our state to determine if we need to act at // all. - case uniClosure := <-c.uniCloseSignal: + case uniClosure := <-c.cfg.ChainEvents.UnilateralClosure: if c.state != StateDefault { continue }