contractcourt: integrate notifications of the chainWatcher with each channel arb

In this commit, we modify the construction of the channel arbitrator to
accept a pointer to an event stream from the chain watcher that’s been
assigned to that channel. As a result, we no longer need a fresh
unilateral close signal, as the one we get from the chain watcher will
*always* be up to date.

For each active channel, we’ll now create a chainWatcher instance that
will be around until the channel is fully closed on chain.
This commit is contained in:
Olaoluwa Osuntokun 2018-01-18 14:00:35 -08:00
parent 0e14ac2063
commit 63f7bf4e65
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21
2 changed files with 74 additions and 32 deletions

@ -130,6 +130,10 @@ type ChainArbitrator struct {
// open, and not fully resolved. // open, and not fully resolved.
activeChannels map[wire.OutPoint]*ChannelArbitrator activeChannels map[wire.OutPoint]*ChannelArbitrator
// activeWatchers is a map of all the active chainWatchers for channels
// that are still considered open.
activeWatchers map[wire.OutPoint]*chainWatcher
// cfg is the config struct for the arbitrator that contains all // cfg is the config struct for the arbitrator that contains all
// methods and interface it needs to operate. // methods and interface it needs to operate.
cfg ChainArbitratorConfig cfg ChainArbitratorConfig
@ -151,6 +155,7 @@ func NewChainArbitrator(cfg ChainArbitratorConfig,
return &ChainArbitrator{ return &ChainArbitrator{
cfg: cfg, cfg: cfg,
activeChannels: make(map[wire.OutPoint]*ChannelArbitrator), activeChannels: make(map[wire.OutPoint]*ChannelArbitrator),
activeWatchers: make(map[wire.OutPoint]*chainWatcher),
chanSource: db, chanSource: db,
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@ -159,7 +164,7 @@ func NewChainArbitrator(cfg ChainArbitratorConfig,
// newActiveChannelArbitrator creates a new instance of an active channel // newActiveChannelArbitrator creates a new instance of an active channel
// arbitrator given the state of the target channel. // arbitrator given the state of the target channel.
func newActiveChannelArbitrator(channel *channeldb.OpenChannel, func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
c *ChainArbitrator) (*ChannelArbitrator, error) { c *ChainArbitrator, chanEvents *ChainEventSubscription) (*ChannelArbitrator, error) {
log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)", log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)",
channel.FundingOutpoint) channel.FundingOutpoint)
@ -208,12 +213,11 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
} }
chanMachine, err := lnwallet.NewLightningChannel( chanMachine, err := lnwallet.NewLightningChannel(
c.cfg.Signer, nil, c.cfg.PreimageDB, channel) c.cfg.Signer, c.cfg.PreimageDB, channel)
if err != nil { if err != nil {
return nil, err return nil, err
} }
chanMachine.Stop() chanMachine.Stop()
chanMachine.CancelObserver()
if err := c.cfg.MarkLinkInactive(chanPoint); err != nil { if err := c.cfg.MarkLinkInactive(chanPoint); err != nil {
log.Errorf("unable to mark link inactive: %v", err) log.Errorf("unable to mark link inactive: %v", err)
@ -228,6 +232,7 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
return channel.CloseChannel(summary) return channel.CloseChannel(summary)
}, },
ChainArbitratorConfig: c.cfg, ChainArbitratorConfig: c.cfg,
ChainEvents: chanEvents,
} }
// The final component needed is an arbitrator log that the arbitrator // The final component needed is an arbitrator log that the arbitrator
@ -307,7 +312,19 @@ func (c *ChainArbitrator) Start() error {
// For each open channel, we'll configure then launch a corresponding // For each open channel, we'll configure then launch a corresponding
// ChannelArbitrator. // ChannelArbitrator.
for _, channel := range openChannels { for _, channel := range openChannels {
channelArb, err := newActiveChannelArbitrator(channel, c) // First, we'll create an active chainWatcher for this channel
// to ensure that we detect any relevant on chain events.
chainWatcher, err := newChainWatcher(
channel, c.cfg.Notifier, c.cfg.PreimageDB, c.cfg.Signer,
)
if err != nil {
return err
}
c.activeWatchers[channel.FundingOutpoint] = chainWatcher
channelArb, err := newActiveChannelArbitrator(
channel, c, chainWatcher.SubscribeChannelEvents(),
)
if err != nil { if err != nil {
return err return err
} }
@ -344,6 +361,7 @@ func (c *ChainArbitrator) Start() error {
ShortChanID: closeChanInfo.ShortChanID, ShortChanID: closeChanInfo.ShortChanID,
BlockEpochs: blockEpoch, BlockEpochs: blockEpoch,
ChainArbitratorConfig: c.cfg, ChainArbitratorConfig: c.cfg,
ChainEvents: &ChainEventSubscription{},
} }
chanLog, err := newBoltArbitratorLog( chanLog, err := newBoltArbitratorLog(
c.chanSource.DB, arbCfg, c.cfg.ChainHash, chanPoint, c.chanSource.DB, arbCfg, c.cfg.ChainHash, chanPoint,
@ -363,8 +381,13 @@ func (c *ChainArbitrator) Start() error {
) )
} }
// Finally, we'll launch all the goroutines for each arbitrator so they // Finally, we'll launch all the goroutines for each watcher and
// can carry out their duties. // arbitrator so they can carry out their duties.
for _, watcher := range c.activeWatchers {
if err := watcher.Start(); err != nil {
return err
}
}
for _, arbitrator := range c.activeChannels { for _, arbitrator := range c.activeChannels {
if err := arbitrator.Start(); err != nil { if err := arbitrator.Start(); err != nil {
return err return err
@ -390,8 +413,18 @@ func (c *ChainArbitrator) Stop() error {
c.Lock() c.Lock()
arbitrators := c.activeChannels arbitrators := c.activeChannels
watchers := c.activeWatchers
c.Unlock() c.Unlock()
for chanPoint, watcher := range watchers {
log.Tracef("Attempting to stop ChainWatcher(%v)",
chanPoint)
if err := watcher.Stop(); err != nil {
log.Errorf("unable to stop watcher for "+
"ChannelPoint(%v): %v", chanPoint, err)
}
}
for chanPoint, arbitrator := range arbitrators { for chanPoint, arbitrator := range arbitrators {
log.Tracef("Attempting to stop ChannelArbitrator(%v)", log.Tracef("Attempting to stop ChannelArbitrator(%v)",
chanPoint) chanPoint)
@ -417,14 +450,6 @@ type ContractSignals struct {
// be sent over. // be sent over.
HtlcUpdates chan []channeldb.HTLC HtlcUpdates chan []channeldb.HTLC
// UniCloseSignal is a channel that allows the ChannelArbitrator for a
// particular channel to detect if the remote party has broadcast their
// version of the commitment transaction.
//
// TODO(roasbeef): eliminate and just roll into the struct itself, all
// watching
UniCloseSignal chan *lnwallet.UnilateralCloseSummary
// ShortChanID is the up to date short channel ID for a contract. This // ShortChanID is the up to date short channel ID for a contract. This
// can change either if when the contract was added it didn't yet have // can change either if when the contract was added it didn't yet have
// a stable identifier, or in the case of a reorg. // a stable identifier, or in the case of a reorg.
@ -452,17 +477,17 @@ func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint,
return nil return nil
} }
// forceCloseReq is a request sent from an outsde sub-system to the arbitrator // forceCloseReq is a request sent from an outside sub-system to the arbitrator
// that watches a particular channel to broadcast the commitnet transaction, // that watches a particular channel to broadcast the commitment transaction,
// and enter the resolution phase of the channel. // and enter the resolution phase of the channel.
type forceCloseReq struct { type forceCloseReq struct {
// errResp is a channel that will be sent upon either in the case of force // errResp is a channel that will be sent upon either in the case of
// close success (nil error), or in the case on an erro (nil-nil error) // force close success (nil error), or in the case on an error.
// //
// NOTE; This channel MUST be buffered. // NOTE; This channel MUST be buffered.
errResp chan error errResp chan error
// closeTx is a channel that carries the transaction which ultimatley // closeTx is a channel that carries the transaction which ultimately
// closed out the channel. // closed out the channel.
closeTx chan *wire.MsgTx closeTx chan *wire.MsgTx
} }
@ -497,7 +522,7 @@ func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.Msg
return nil, fmt.Errorf("ChainArbitrator shutting down") return nil, fmt.Errorf("ChainArbitrator shutting down")
} }
// We'll await two resposnes: the error response, and the transaction // We'll await two responses: the error response, and the transaction
// that closed out the channel. // that closed out the channel.
select { select {
case err := <-errChan: case err := <-errChan:
@ -518,11 +543,11 @@ func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.Msg
return closeTx, nil return closeTx, nil
} }
// RequestChannelArbitration sends the ChainArbitrator a message to create a // WatchNewChannel sends the ChainArbitrator a message to create a
// ChannelArbitrator tasked with watching over a new channel. Once a new // ChannelArbitrator tasked with watching over a new channel. Once a new
// channel has finished its final funding flow, it should be registered with // channel has finished its final funding flow, it should be registered with
// the ChainArbitrator so we can properly react to any on-chain events. // the ChainArbitrator so we can properly react to any on-chain events.
func (c *ChainArbitrator) RequestChannelArbitration(newChan *channeldb.OpenChannel) error { func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
@ -536,9 +561,22 @@ func (c *ChainArbitrator) RequestChannelArbitration(newChan *channeldb.OpenChann
return nil return nil
} }
// First, we'll create a new channel arbitrator instance using this new // First, also create an active chainWatcher for this channel to ensure
// that we detect any relevant on chain events.
chainWatcher, err := newChainWatcher(
newChan, c.cfg.Notifier, c.cfg.PreimageDB, c.cfg.Signer,
)
if err != nil {
return err
}
c.activeWatchers[newChan.FundingOutpoint] = chainWatcher
// We'll also create a new channel arbitrator instance using this new
// channel, and our internal state. // channel, and our internal state.
channelArb, err := newActiveChannelArbitrator(newChan, c) channelArb, err := newActiveChannelArbitrator(
newChan, c, chainWatcher.SubscribeChannelEvents(),
)
if err != nil { if err != nil {
return err return err
} }
@ -567,12 +605,14 @@ func (c *ChainArbitrator) ManuallyResolveChannel(chanPoint wire.OutPoint) error
} }
if err := channelArb.Stop(); err != nil { if err := channelArb.Stop(); err != nil {
if err := channelArb.Start(); err != nil {
return err return err
} }
delete(c.activeChannels, chanPoint) delete(c.activeChannels, chanPoint)
return channelArb.log.WipeHistory() return channelArb.log.WipeHistory()
return chainWatcher.Start()
} }
// SubscribeChannelSignals... // SubscribeChannelSignals...

@ -80,6 +80,11 @@ type ChannelArbitratorConfig struct {
// reclaim/redeem the funds in an HTLC sent to/from us. // reclaim/redeem the funds in an HTLC sent to/from us.
BlockEpochs *chainntnfs.BlockEpochEvent BlockEpochs *chainntnfs.BlockEpochEvent
// ChainEvents is an active subscription to the chain watcher for this
// channel to be notified of any on-chain activity related to this
// channel.
ChainEvents *ChainEventSubscription
// ForceCloseChan should force close the contract that this attendant // ForceCloseChan should force close the contract that this attendant
// is watching over. We'll use this when we decide that we need to go // is watching over. We'll use this when we decide that we need to go
// to chain. The returned summary contains all items needed to // to chain. The returned summary contains all items needed to
@ -164,10 +169,6 @@ type ChannelArbitrator struct {
// we're watching over will be sent. // we're watching over will be sent.
signalUpdates chan *signalUpdateMsg signalUpdates chan *signalUpdateMsg
// uniCloseSignal is a channel that will be sent upon if we detect that
// the remote party closes the channel on-chain.
uniCloseSignal <-chan *lnwallet.UnilateralCloseSummary
// htlcUpdates is a channel that is sent upon with new updates from the // htlcUpdates is a channel that is sent upon with new updates from the
// active channel. Each time a new commitment state is accepted, the // active channel. Each time a new commitment state is accepted, the
// set of HTLC's on the new state should be sent across this channel. // set of HTLC's on the new state should be sent across this channel.
@ -202,7 +203,6 @@ func NewChannelArbitrator(cfg ChannelArbitratorConfig,
return &ChannelArbitrator{ return &ChannelArbitrator{
log: log, log: log,
signalUpdates: make(chan *signalUpdateMsg), signalUpdates: make(chan *signalUpdateMsg),
uniCloseSignal: make(<-chan *lnwallet.UnilateralCloseSummary),
htlcUpdates: make(<-chan []channeldb.HTLC), htlcUpdates: make(<-chan []channeldb.HTLC),
resolutionSignal: make(chan struct{}), resolutionSignal: make(chan struct{}),
forceCloseReqs: make(chan *forceCloseReq), forceCloseReqs: make(chan *forceCloseReq),
@ -292,6 +292,8 @@ func (c *ChannelArbitrator) Stop() error {
log.Debugf("Stopping ChannelArbitrator(%v)", c.cfg.ChanPoint) log.Debugf("Stopping ChannelArbitrator(%v)", c.cfg.ChanPoint)
c.cfg.ChainEvents.Cancel()
for _, activeResolver := range c.activeResolvers { for _, activeResolver := range c.activeResolvers {
activeResolver.Stop() activeResolver.Stop()
} }
@ -368,7 +370,8 @@ func (c *ChannelArbitrator) stateStep(bestHeight uint32, bestHash *chainhash.Has
// chain, we'll check to see if we need to make any on-chain // chain, we'll check to see if we need to make any on-chain
// claims on behalf of the channel contract that we're // claims on behalf of the channel contract that we're
// arbitrating for. // arbitrating for.
chainActions := c.checkChainActions(uint32(bestHeight), trigger) chainActions := c.checkChainActions(uint32(bestHeight),
trigger)
// If there are no actions to be made, then we'll remain in the // If there are no actions to be made, then we'll remain in the
// default state. If this isn't a self initiated event (we're // default state. If this isn't a self initiated event (we're
@ -1320,7 +1323,6 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32,
// First, we'll update our set of signals. // First, we'll update our set of signals.
c.htlcUpdates = signalUpdate.newSignals.HtlcUpdates c.htlcUpdates = signalUpdate.newSignals.HtlcUpdates
c.uniCloseSignal = signalUpdate.newSignals.UniCloseSignal
c.cfg.ShortChanID = signalUpdate.newSignals.ShortChanID c.cfg.ShortChanID = signalUpdate.newSignals.ShortChanID
// Now that the signals have been updated, we'll now // Now that the signals have been updated, we'll now
@ -1347,7 +1349,7 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32,
// The remote party has broadcast the commitment on-chain. // The remote party has broadcast the commitment on-chain.
// We'll examine our state to determine if we need to act at // We'll examine our state to determine if we need to act at
// all. // all.
case uniClosure := <-c.uniCloseSignal: case uniClosure := <-c.cfg.ChainEvents.UnilateralClosure:
if c.state != StateDefault { if c.state != StateDefault {
continue continue
} }