diff --git a/contractcourt/chain_watcher.go b/contractcourt/chain_watcher.go index fe4f3290..cc8515ce 100644 --- a/contractcourt/chain_watcher.go +++ b/contractcourt/chain_watcher.go @@ -66,27 +66,14 @@ type ChainEventSubscription struct { Cancel func() } -// chainWatcher is a system that's assigned to every active channel. The duty -// of this system is to watch the chain for spends of the channels chan point. -// If a spend is detected then with chain watcher will notify all subscribers -// that the channel has been closed, and also give them the materials necessary -// to sweep the funds of the channel on chain eventually. -type chainWatcher struct { - started int32 - stopped int32 - - quit chan struct{} - wg sync.WaitGroup - +// chainWatcherConfig encapsulates all the necessary functions and interfaces +// needed to watch and act on on-chain events for a particular channel. +type chainWatcherConfig struct { // chanState is a snapshot of the persistent state of the channel that // we're watching. In the event of an on-chain event, we'll query the // database to ensure that we act using the most up to date state. chanState *channeldb.OpenChannel - // stateHintObfuscator is a 48-bit state hint that's used to obfuscate - // the current state number on the commitment transactions. - stateHintObfuscator [lnwallet.StateHintSize]byte - // notifier is a reference to the channel notifier that we'll use to be // notified of output spends and when transactions are confirmed. notifier chainntnfs.ChainNotifier @@ -101,6 +88,34 @@ type chainWatcher struct { // machine. signer lnwallet.Signer + // markChanClosed is a method that will be called by the watcher if it + // detects that a cooperative closure transaction has successfully been + // confirmed. + markChanClosed func() error + + // isOurAddr is a function that returns true if the passed address is + // known to us. + isOurAddr func(btcutil.Address) bool +} + +// chainWatcher is a system that's assigned to every active channel. The duty +// of this system is to watch the chain for spends of the channels chan point. +// If a spend is detected then with chain watcher will notify all subscribers +// that the channel has been closed, and also give them the materials necessary +// to sweep the funds of the channel on chain eventually. +type chainWatcher struct { + started int32 + stopped int32 + + quit chan struct{} + wg sync.WaitGroup + + cfg chainWatcherConfig + + // stateHintObfuscator is a 48-bit state hint that's used to obfuscate + // the current state number on the commitment transactions. + stateHintObfuscator [lnwallet.StateHintSize]byte + // All the fields below are protected by this mutex. sync.Mutex @@ -117,30 +132,18 @@ type chainWatcher struct { // We'll use this map to keep track of all possible channel closures to // ensure out db state is correct in the end. possibleCloses map[chainhash.Hash]*channeldb.ChannelCloseSummary - - // markChanClosed is a method that will be called by the watcher if it - // detects that a cooperative closure transaction has successfully been - // confirmed. - markChanClosed func() error - - // isOurAddr is a function that returns true if the passed address is - // known to us. - isOurAddr func(btcutil.Address) bool } // newChainWatcher returns a new instance of a chainWatcher for a channel given // the chan point to watch, and also a notifier instance that will allow us to // detect on chain events. -func newChainWatcher(chanState *channeldb.OpenChannel, - notifier chainntnfs.ChainNotifier, pCache WitnessBeacon, - signer lnwallet.Signer, isOurAddr func(btcutil.Address) bool, - markChanClosed func() error) (*chainWatcher, error) { - +func newChainWatcher(cfg chainWatcherConfig) (*chainWatcher, error) { // In order to be able to detect the nature of a potential channel // closure we'll need to reconstruct the state hint bytes used to // obfuscate the commitment state number encoded in the lock time and // sequence fields. var stateHint [lnwallet.StateHintSize]byte + chanState := cfg.chanState if chanState.IsInitiator { stateHint = lnwallet.DeriveStateHintObfuscator( chanState.LocalChanCfg.PaymentBasePoint.PubKey, @@ -154,15 +157,10 @@ func newChainWatcher(chanState *channeldb.OpenChannel, } return &chainWatcher{ - chanState: chanState, + cfg: cfg, stateHintObfuscator: stateHint, - notifier: notifier, - pCache: pCache, - markChanClosed: markChanClosed, - signer: signer, quit: make(chan struct{}), clientSubscriptions: make(map[uint64]*ChainEventSubscription), - isOurAddr: isOurAddr, possibleCloses: make(map[chainhash.Hash]*channeldb.ChannelCloseSummary), }, nil } @@ -174,22 +172,23 @@ func (c *chainWatcher) Start() error { return nil } + chanState := c.cfg.chanState log.Debugf("Starting chain watcher for ChannelPoint(%v)", - c.chanState.FundingOutpoint) + chanState.FundingOutpoint) // First, we'll register for a notification to be dispatched if the // funding output is spent. - fundingOut := &c.chanState.FundingOutpoint + fundingOut := &chanState.FundingOutpoint // As a height hint, we'll try to use the opening height, but if the // channel isn't yet open, then we'll use the height it was broadcast // at. - heightHint := c.chanState.ShortChanID.BlockHeight + heightHint := chanState.ShortChanID.BlockHeight if heightHint == 0 { - heightHint = c.chanState.FundingBroadcastHeight + heightHint = chanState.FundingBroadcastHeight } - spendNtfn, err := c.notifier.RegisterSpendNtfn( + spendNtfn, err := c.cfg.notifier.RegisterSpendNtfn( fundingOut, heightHint, false, ) if err != nil { @@ -233,10 +232,10 @@ func (c *chainWatcher) SubscribeChannelEvents(syncDispatch bool) *ChainEventSubs c.Unlock() log.Debugf("New ChainEventSubscription(id=%v) for ChannelPoint(%v)", - clientID, c.chanState.FundingOutpoint) + clientID, c.cfg.chanState.FundingOutpoint) sub := &ChainEventSubscription{ - ChanPoint: c.chanState.FundingOutpoint, + ChanPoint: c.cfg.chanState.FundingOutpoint, RemoteUnilateralClosure: make(chan *lnwallet.UnilateralCloseSummary, 1), LocalUnilateralClosure: make(chan *LocalUnilateralCloseInfo, 1), CooperativeClosure: make(chan struct{}, 1), @@ -293,7 +292,7 @@ func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) { defer c.wg.Done() log.Infof("Close observer for ChannelPoint(%v) active", - c.chanState.FundingOutpoint) + c.cfg.chanState.FundingOutpoint) select { // We've detected a spend of the channel onchain! Depending on @@ -314,10 +313,10 @@ func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) { // prior revoked state...!!! commitTxBroadcast := commitSpend.SpendingTx - localCommit, remoteCommit, err := c.chanState.LatestCommitments() + localCommit, remoteCommit, err := c.cfg.chanState.LatestCommitments() if err != nil { log.Errorf("Unable to fetch channel state for "+ - "chan_point=%v", c.chanState.FundingOutpoint) + "chan_point=%v", c.cfg.chanState.FundingOutpoint) return } @@ -326,10 +325,10 @@ func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) { // channel state object that we have. // // TODO(roasbeef): mutation is bad mkay - _, err = c.chanState.RemoteRevocationStore() + _, err = c.cfg.chanState.RemoteRevocationStore() if err != nil { log.Errorf("Unable to fetch revocation state for "+ - "chan_point=%v", c.chanState.FundingOutpoint) + "chan_point=%v", c.cfg.chanState.FundingOutpoint) return } @@ -346,7 +345,7 @@ func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) { ); err != nil { log.Errorf("unable to handle local"+ "close for chan_point=%v: %v", - c.chanState.FundingOutpoint, err) + c.cfg.chanState.FundingOutpoint, err) } return } @@ -365,7 +364,7 @@ func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) { } log.Warnf("Unprompted commitment broadcast for "+ - "ChannelPoint(%v) ", c.chanState.FundingOutpoint) + "ChannelPoint(%v) ", c.cfg.chanState.FundingOutpoint) // Decode the state hint encoded within the commitment // transaction to determine if this is a revoked state @@ -395,7 +394,7 @@ func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) { ); err != nil { log.Errorf("unable to handle remote "+ "close for chan_point=%v: %v", - c.chanState.FundingOutpoint, err) + c.cfg.chanState.FundingOutpoint, err) } // If the state number broadcast is lower than the @@ -412,7 +411,7 @@ func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) { ); err != nil { log.Errorf("unable to handle channel "+ "breach for chan_point=%v: %v", - c.chanState.FundingOutpoint, err) + c.cfg.chanState.FundingOutpoint, err) } } @@ -442,7 +441,7 @@ func (c *chainWatcher) toSelfAmount(tx *wire.MsgTx) btcutil.Amount { } for _, addr := range addrs { - if c.isOurAddr(addr) { + if c.cfg.isOurAddr(addr) { selfAmt += btcutil.Amount(txOut.Value) } } @@ -460,7 +459,7 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet broadcastTx := commitSpend.SpendingTx log.Infof("Cooperative closure for ChannelPoint(%v): %v", - c.chanState.FundingOutpoint, spew.Sdump(broadcastTx)) + c.cfg.chanState.FundingOutpoint, spew.Sdump(broadcastTx)) // If the input *is* final, then we'll check to see which output is // ours. @@ -469,18 +468,18 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet // Once this is known, we'll mark the state as pending close in the // database. closeSummary := &channeldb.ChannelCloseSummary{ - ChanPoint: c.chanState.FundingOutpoint, - ChainHash: c.chanState.ChainHash, + ChanPoint: c.cfg.chanState.FundingOutpoint, + ChainHash: c.cfg.chanState.ChainHash, ClosingTXID: *commitSpend.SpenderTxHash, - RemotePub: c.chanState.IdentityPub, - Capacity: c.chanState.Capacity, + RemotePub: c.cfg.chanState.IdentityPub, + Capacity: c.cfg.chanState.Capacity, CloseHeight: uint32(commitSpend.SpendingHeight), SettledBalance: localAmt, CloseType: channeldb.CooperativeClose, - ShortChanID: c.chanState.ShortChanID, + ShortChanID: c.cfg.chanState.ShortChanID, IsPending: true, } - err := c.chanState.CloseChannel(closeSummary) + err := c.cfg.chanState.CloseChannel(closeSummary) if err != nil && err != channeldb.ErrNoActiveChannels && err != channeldb.ErrNoChanDBExists { return fmt.Errorf("unable to close chan state: %v", err) @@ -489,7 +488,7 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet // Finally, we'll launch a goroutine to mark the channel as fully // closed once the transaction confirmed. go func() { - confNtfn, err := c.notifier.RegisterConfirmationsNtfn( + confNtfn, err := c.cfg.notifier.RegisterConfirmationsNtfn( commitSpend.SpenderTxHash, 1, uint32(commitSpend.SpendingHeight), ) @@ -500,7 +499,7 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet log.Infof("closeObserver: waiting for txid=%v to close "+ "ChannelPoint(%v) on chain", commitSpend.SpenderTxHash, - c.chanState.FundingOutpoint) + c.cfg.chanState.FundingOutpoint) select { case confInfo, ok := <-confNtfn.Confirmed: @@ -510,10 +509,11 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet } log.Infof("closeObserver: ChannelPoint(%v) is fully "+ - "closed, at height: %v", c.chanState.FundingOutpoint, + "closed, at height: %v", + c.cfg.chanState.FundingOutpoint, confInfo.BlockHeight) - err := c.markChanClosed() + err := c.cfg.markChanClosed() if err != nil { log.Errorf("unable to mark chan fully "+ "closed: %v", err) @@ -546,11 +546,11 @@ func (c *chainWatcher) dispatchLocalForceClose( localCommit channeldb.ChannelCommitment) error { log.Infof("Local unilateral close of ChannelPoint(%v) "+ - "detected", c.chanState.FundingOutpoint) + "detected", c.cfg.chanState.FundingOutpoint) forceClose, err := lnwallet.NewLocalForceCloseSummary( - c.chanState, c.signer, c.pCache, commitSpend.SpendingTx, - localCommit, + c.cfg.chanState, c.cfg.signer, c.cfg.pCache, + commitSpend.SpendingTx, localCommit, ) if err != nil { return err @@ -568,7 +568,7 @@ func (c *chainWatcher) dispatchLocalForceClose( Capacity: chanSnapshot.Capacity, CloseType: channeldb.LocalForceClose, IsPending: true, - ShortChanID: c.chanState.ShortChanID, + ShortChanID: c.cfg.chanState.ShortChanID, CloseHeight: uint32(commitSpend.SpendingHeight), } @@ -583,7 +583,7 @@ func (c *chainWatcher) dispatchLocalForceClose( htlcValue := btcutil.Amount(htlc.SweepSignDesc.Output.Value) closeSummary.TimeLockedBalance += htlcValue } - err = c.chanState.CloseChannel(closeSummary) + err = c.cfg.chanState.CloseChannel(closeSummary) if err != nil { return fmt.Errorf("unable to delete channel state: %v", err) } @@ -614,13 +614,13 @@ func (c *chainWatcher) dispatchRemoteForceClose(commitSpend *chainntnfs.SpendDet remoteCommit channeldb.ChannelCommitment) error { log.Infof("Unilateral close of ChannelPoint(%v) "+ - "detected", c.chanState.FundingOutpoint) + "detected", c.cfg.chanState.FundingOutpoint) // First, we'll create a closure summary that contains all the // materials required to let each subscriber sweep the funds in the // channel on-chain. - uniClose, err := lnwallet.NewUnilateralCloseSummary(c.chanState, - c.signer, c.pCache, commitSpend, remoteCommit, + uniClose, err := lnwallet.NewUnilateralCloseSummary(c.cfg.chanState, + c.cfg.signer, c.cfg.pCache, commitSpend, remoteCommit, ) if err != nil { return err @@ -629,7 +629,7 @@ func (c *chainWatcher) dispatchRemoteForceClose(commitSpend *chainntnfs.SpendDet // As we've detected that the channel has been closed, immediately // delete the state from disk, creating a close summary for future // usage by related sub-systems. - err = c.chanState.CloseChannel(&uniClose.ChannelCloseSummary) + err = c.cfg.chanState.CloseChannel(&uniClose.ChannelCloseSummary) if err != nil { return fmt.Errorf("unable to delete channel state: %v", err) } @@ -665,9 +665,9 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail log.Warnf("Remote peer has breached the channel contract for "+ "ChannelPoint(%v). Revoked state #%v was broadcast!!!", - c.chanState.FundingOutpoint, broadcastStateNum) + c.cfg.chanState.FundingOutpoint, broadcastStateNum) - if err := c.chanState.MarkBorked(); err != nil { + if err := c.cfg.chanState.MarkBorked(); err != nil { return fmt.Errorf("unable to mark channel as borked: %v", err) } @@ -681,7 +681,7 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail // // TODO(roasbeef): move to same package retribution, err := lnwallet.NewBreachRetribution( - c.chanState, broadcastStateNum, commitTxBroadcast, + c.cfg.chanState, broadcastStateNum, commitTxBroadcast, spendHeight, ) if err != nil { @@ -742,24 +742,24 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail // TODO(roasbeef): instead mark we got all the monies? settledBalance := remoteCommit.LocalBalance.ToSatoshis() closeSummary := channeldb.ChannelCloseSummary{ - ChanPoint: c.chanState.FundingOutpoint, - ChainHash: c.chanState.ChainHash, + ChanPoint: c.cfg.chanState.FundingOutpoint, + ChainHash: c.cfg.chanState.ChainHash, ClosingTXID: *spendEvent.SpenderTxHash, CloseHeight: spendHeight, - RemotePub: c.chanState.IdentityPub, - Capacity: c.chanState.Capacity, + RemotePub: c.cfg.chanState.IdentityPub, + Capacity: c.cfg.chanState.Capacity, SettledBalance: settledBalance, CloseType: channeldb.BreachClose, IsPending: true, - ShortChanID: c.chanState.ShortChanID, + ShortChanID: c.cfg.chanState.ShortChanID, } - if err := c.chanState.CloseChannel(&closeSummary); err != nil { + if err := c.cfg.chanState.CloseChannel(&closeSummary); err != nil { return err } log.Infof("Breached channel=%v marked pending-closed", - c.chanState.FundingOutpoint) + c.cfg.chanState.FundingOutpoint) return nil } @@ -827,7 +827,7 @@ func (c *CooperativeCloseCtx) LogPotentialClose(potentialClose *channeldb.Channe // potential close gets confirmed, we'll cancel out all other launched // goroutines. go func() { - confNtfn, err := c.watcher.notifier.RegisterConfirmationsNtfn( + confNtfn, err := c.watcher.cfg.notifier.RegisterConfirmationsNtfn( &potentialClose.ClosingTXID, 1, uint32(potentialClose.CloseHeight), ) @@ -838,7 +838,7 @@ func (c *CooperativeCloseCtx) LogPotentialClose(potentialClose *channeldb.Channe log.Infof("closeCtx: waiting for txid=%v to close "+ "ChannelPoint(%v) on chain", potentialClose.ClosingTXID, - c.watcher.chanState.FundingOutpoint) + c.watcher.cfg.chanState.FundingOutpoint) select { case confInfo, ok := <-confNtfn.Confirmed: @@ -848,7 +848,7 @@ func (c *CooperativeCloseCtx) LogPotentialClose(potentialClose *channeldb.Channe } log.Infof("closeCtx: ChannelPoint(%v) is fully closed, at "+ - "height: %v", c.watcher.chanState.FundingOutpoint, + "height: %v", c.watcher.cfg.chanState.FundingOutpoint, confInfo.BlockHeight) close(c.watchCancel) @@ -862,14 +862,14 @@ func (c *CooperativeCloseCtx) LogPotentialClose(potentialClose *channeldb.Channe } c.watcher.Unlock() - err := c.watcher.chanState.CloseChannel(potentialClose) + err := c.watcher.cfg.chanState.CloseChannel(potentialClose) if err != nil { log.Warnf("closeCtx: unable to update latest "+ "close for ChannelPoint(%v): %v", - c.watcher.chanState.FundingOutpoint, err) + c.watcher.cfg.chanState.FundingOutpoint, err) } - err = c.watcher.markChanClosed() + err = c.watcher.cfg.markChanClosed() if err != nil { log.Errorf("closeCtx: unable to mark chan fully "+ "closed: %v", err) @@ -879,7 +879,7 @@ func (c *CooperativeCloseCtx) LogPotentialClose(potentialClose *channeldb.Channe case <-c.watchCancel: log.Debugf("Exiting watch for close of txid=%v for "+ "ChannelPoint(%v)", potentialClose.ClosingTXID, - c.watcher.chanState.FundingOutpoint) + c.watcher.cfg.chanState.FundingOutpoint) case <-c.watcher.quit: return @@ -892,11 +892,11 @@ func (c *CooperativeCloseCtx) LogPotentialClose(potentialClose *channeldb.Channe // pending closed in the database, then launch a goroutine to mark the channel // fully closed upon confirmation. func (c *CooperativeCloseCtx) Finalize(preferredClose *channeldb.ChannelCloseSummary) error { - chanPoint := c.watcher.chanState.FundingOutpoint + chanPoint := c.watcher.cfg.chanState.FundingOutpoint log.Infof("Finalizing chan close for ChannelPoint(%v)", chanPoint) - err := c.watcher.chanState.CloseChannel(preferredClose) + err := c.watcher.cfg.chanState.CloseChannel(preferredClose) if err != nil { log.Errorf("closeCtx: unable to close ChannelPoint(%v): %v", chanPoint, err)