breacharbiter: utilize new channel on-chain event stream to watch for breaches

In this commit, we modify the breach arbiter to no longer require
holding a channel object directly in order to receive new notifications
about possible breaches. Instead, we’ll contact the chain arbiter to
request a new channel event subscription.

As a result of the new architecture, we no longer need to receive a
handoff once the new channel comes online, as the chainWatcher will
always be active and watching the channel until it’s been closed.
This commit is contained in:
Olaoluwa Osuntokun 2018-01-18 14:06:38 -08:00
parent defa1bc3e3
commit a0cc1d1b2d
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21
2 changed files with 43 additions and 110 deletions

@ -12,6 +12,7 @@ import (
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/roasbeef/btcd/blockchain" "github.com/roasbeef/btcd/blockchain"
@ -45,14 +46,6 @@ type BreachConfig struct {
// a close type to be included in the channel close summary. // a close type to be included in the channel close summary.
CloseLink func(*wire.OutPoint, htlcswitch.ChannelCloseType) CloseLink func(*wire.OutPoint, htlcswitch.ChannelCloseType)
// UpdateCloseSignal allows the breach arbiter to notify the
// ChainArbitrator that a set of new signals for the unilateral closing
// of a channel is now available. This ensures that ifa channel hasn't
// had any updates since it was live, then we're still able to act on
// on-chain events.
UpdateCloseSignal func(*wire.OutPoint,
chan *lnwallet.UnilateralCloseSummary) error
// DB provides access to the user's channels, allowing the breach // DB provides access to the user's channels, allowing the breach
// arbiter to determine the current state of a user's channels, and how // arbiter to determine the current state of a user's channels, and how
// it should respond to channel closure. // it should respond to channel closure.
@ -74,6 +67,11 @@ type BreachConfig struct {
// transaction to the network. // transaction to the network.
PublishTransaction func(*wire.MsgTx) error PublishTransaction func(*wire.MsgTx) error
// SubscribeChannelEvents is a function closure that allows goroutines
// within the breachArbiter to be notified of potential on-chain events
// related to the channels they're watching.
SubscribeChannelEvents func(wire.OutPoint) (*contractcourt.ChainEventSubscription, error)
// Signer is used by the breach arbiter to generate sweep transactions, // Signer is used by the breach arbiter to generate sweep transactions,
// which move coins from previously open channels back to the user's // which move coins from previously open channels back to the user's
// wallet. // wallet.
@ -113,11 +111,6 @@ type breachArbiter struct {
// use this to communicate with the main contractObserver goroutine. // use this to communicate with the main contractObserver goroutine.
breachedContracts chan *retributionInfo breachedContracts chan *retributionInfo
// newContracts is a channel which is used by outside subsystems to
// notify the breachArbiter of a new contract (a channel) that should
// be watched.
newContracts chan *lnwallet.LightningChannel
// settledContracts is a channel by outside subsystems to notify // settledContracts is a channel by outside subsystems to notify
// the breachArbiter that a channel has peacefully been closed. Once a // the breachArbiter that a channel has peacefully been closed. Once a
// channel has been closed the arbiter no longer needs to watch for // channel has been closed the arbiter no longer needs to watch for
@ -132,11 +125,9 @@ type breachArbiter struct {
// its dependent objects. // its dependent objects.
func newBreachArbiter(cfg *BreachConfig) *breachArbiter { func newBreachArbiter(cfg *BreachConfig) *breachArbiter {
return &breachArbiter{ return &breachArbiter{
cfg: cfg, cfg: cfg,
breachObservers: make(map[wire.OutPoint]chan struct{}), breachObservers: make(map[wire.OutPoint]chan struct{}),
breachedContracts: make(chan *retributionInfo), breachedContracts: make(chan *retributionInfo),
newContracts: make(chan *lnwallet.LightningChannel),
settledContracts: make(chan *wire.OutPoint), settledContracts: make(chan *wire.OutPoint),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@ -215,27 +206,26 @@ func (b *breachArbiter) Start() error {
// retribution store, we skip it to avoid creating a breach observer. // retribution store, we skip it to avoid creating a breach observer.
// Resolving breached channels will be handled later by spawning an // Resolving breached channels will be handled later by spawning an
// exactRetribution task for each. // exactRetribution task for each.
channelsToWatch := make([]*lnwallet.LightningChannel, 0, nActive) channelsToWatch := make([]*contractcourt.ChainEventSubscription, 0, nActive)
for _, chanState := range activeChannels { for _, chanState := range activeChannels {
// If this channel was previously breached, we skip it here to // If this channel was previously breached, we skip it here to
// avoid creating a breach observer, as we can go straight to // avoid creating a breach observer, as we can go straight to
// the task of exacting retribution. // the task of exacting retribution.
if _, ok := breachRetInfos[chanState.FundingOutpoint]; ok { chanPoint := chanState.FundingOutpoint
if _, ok := breachRetInfos[chanPoint]; ok {
continue continue
} }
// Initialize active channel from persisted channel state. // For each active channels, we'll request a chain event
channel, err := lnwallet.NewLightningChannel(nil, // subscription form the system that's overseeing the channel.
b.cfg.Notifier, nil, chanState) chainEvents, err := b.cfg.SubscribeChannelEvents(chanPoint)
if err != nil { if err != nil {
brarLog.Errorf("unable to load channel from "+
"disk: %v", err)
return err return err
} }
// Finally, add this channel to breach arbiter's list of // Finally, add this channel event stream to breach arbiter's
// channels to watch. // list of channels to watch.
channelsToWatch = append(channelsToWatch, channel) channelsToWatch = append(channelsToWatch, chainEvents)
} }
// Spawn the exactRetribution tasks to monitor and resolve any breaches // Spawn the exactRetribution tasks to monitor and resolve any breaches
@ -297,34 +287,24 @@ func (b *breachArbiter) IsBreached(chanPoint *wire.OutPoint) (bool, error) {
// channel into the daemon's wallet. // channel into the daemon's wallet.
// //
// NOTE: This MUST be run as a goroutine. // NOTE: This MUST be run as a goroutine.
func (b *breachArbiter) contractObserver( func (b *breachArbiter) contractObserver(channelEvents []*contractcourt.ChainEventSubscription) {
activeChannels []*lnwallet.LightningChannel) {
defer b.wg.Done() defer b.wg.Done()
brarLog.Infof("Starting contract observer with %v active channels", brarLog.Infof("Starting contract observer with %v active channels",
len(activeChannels)) len(channelEvents))
// For each active channel found within the database, we launch a // For each active channel found within the database, we launch a
// detected breachObserver goroutine for that channel and also track // detected breachObserver goroutine for that channel and also track
// the new goroutine within the breachObservers map so we can cancel it // the new goroutine within the breachObservers map so we can cancel it
// later if necessary. // later if necessary.
for _, channel := range activeChannels { for _, channelEvent := range channelEvents {
settleSignal := make(chan struct{}) settleSignal := make(chan struct{})
chanPoint := channel.ChanPoint chanPoint := channelEvent.ChanPoint
b.breachObservers[*chanPoint] = settleSignal b.breachObservers[chanPoint] = settleSignal
// Before we'll launch our breach observe, we'll send this
// latest set of contract signals to the ChainArbitrator.
//
// TODO(roasbeef): just move now?
err := b.cfg.UpdateCloseSignal(chanPoint, channel.UnilateralClose)
if err != nil {
brarLog.Errorf("unable to update close signals: %v", err)
}
b.wg.Add(1) b.wg.Add(1)
go b.breachObserver(channel, settleSignal) go b.breachObserver(channelEvent, settleSignal)
} }
// TODO(roasbeef): need to ensure currentHeight passed in doesn't // TODO(roasbeef): need to ensure currentHeight passed in doesn't
@ -363,37 +343,6 @@ out:
delete(b.breachObservers, breachInfo.chanPoint) delete(b.breachObservers, breachInfo.chanPoint)
case contract := <-b.newContracts:
// A new channel has just been opened within the
// daemon, so we launch a new breachObserver to handle
// the detection of attempted contract breaches.
settleSignal := make(chan struct{})
chanPoint := contract.ChanPoint
// If the contract is already being watched, then an
// additional send indicates we have a stale version of
// the contract. So we'll cancel active watcher
// goroutine to create a new instance with the latest
// contract reference.
if oldSignal, ok := b.breachObservers[*chanPoint]; ok {
brarLog.Infof("ChannelPoint(%v) is now live, "+
"abandoning state contract for live "+
"version", chanPoint)
close(oldSignal)
}
b.breachObservers[*chanPoint] = settleSignal
brarLog.Debugf("New contract detected, launching " +
"breachObserver")
b.wg.Add(1)
go b.breachObserver(contract, settleSignal)
// TODO(roasbeef): add doneChan to signal to peer
// continue * peer send over to us on
// loadActiveChanenls, sync until we're aware so no
// state transitions
case chanPoint := <-b.settledContracts: case chanPoint := <-b.settledContracts:
// A new channel has been closed either unilaterally or // A new channel has been closed either unilaterally or
// cooperatively, as a result we no longer need a // cooperatively, as a result we no longer need a
@ -573,12 +522,18 @@ func (b *breachArbiter) exactRetribution(
// generated due to the breach of channel contract. The funds will be swept // generated due to the breach of channel contract. The funds will be swept
// only after the breaching transaction receives a necessary number of // only after the breaching transaction receives a necessary number of
// confirmations. // confirmations.
func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, func (b *breachArbiter) breachObserver(
chainEvents *contractcourt.ChainEventSubscription,
settleSignal chan struct{}) { settleSignal chan struct{}) {
defer b.wg.Done() defer func() {
b.wg.Done()
chainEvents.Cancel()
}()
chanPoint := contract.ChanPoint chanPoint := chainEvents.ChanPoint
// TODO(roasbeef): needs to get the signals from the arb!!!
brarLog.Debugf("Breach observer for ChannelPoint(%v) started ", brarLog.Debugf("Breach observer for ChannelPoint(%v) started ",
chanPoint) chanPoint)
@ -587,13 +542,11 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
// A read from this channel indicates that the contract has been // A read from this channel indicates that the contract has been
// settled cooperatively so we exit as our duties are no longer needed. // settled cooperatively so we exit as our duties are no longer needed.
case <-settleSignal: case <-settleSignal:
contract.CancelObserver()
contract.Stop()
return return
// The channel has been closed by a normal means: force closing with // The channel has been closed by a normal means: force closing with
// the latest commitment transaction. // the latest commitment transaction.
case <-contract.UnilateralCloseSignal: case <-chainEvents.UnilateralClosure:
// Launch a goroutine to cancel out this contract within the // Launch a goroutine to cancel out this contract within the
// breachArbiter's main goroutine. // breachArbiter's main goroutine.
@ -602,18 +555,17 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
defer b.wg.Done() defer b.wg.Done()
select { select {
case b.settledContracts <- chanPoint: case b.settledContracts <- &chanPoint:
case <-b.quit: case <-b.quit:
} }
}() }()
b.cfg.CloseLink(chanPoint, htlcswitch.CloseBreach) b.cfg.CloseLink(&chanPoint, htlcswitch.CloseBreach)
contract.Stop()
// A read from this channel indicates that a channel breach has been // A read from this channel indicates that a channel breach has been
// detected! So we notify the main coordination goroutine with the // detected! So we notify the main coordination goroutine with the
// information needed to bring the counterparty to justice. // information needed to bring the counterparty to justice.
case breachInfo := <-contract.ContractBreach: case breachInfo := <-chainEvents.ContractBreach:
brarLog.Warnf("REVOKED STATE #%v FOR ChannelPoint(%v) "+ brarLog.Warnf("REVOKED STATE #%v FOR ChannelPoint(%v) "+
"broadcast, REMOTE PEER IS DOING SOMETHING "+ "broadcast, REMOTE PEER IS DOING SOMETHING "+
"SKETCHY!!!", breachInfo.RevokedStateNum, "SKETCHY!!!", breachInfo.RevokedStateNum,
@ -623,7 +575,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
// breached in order to ensure any incoming or outgoing // breached in order to ensure any incoming or outgoing
// multi-hop HTLCs aren't sent over this link, nor any other // multi-hop HTLCs aren't sent over this link, nor any other
// links associated with this peer. // links associated with this peer.
b.cfg.CloseLink(chanPoint, htlcswitch.CloseBreach) b.cfg.CloseLink(&chanPoint, htlcswitch.CloseBreach)
// TODO(roasbeef): need to handle case of remote broadcast // TODO(roasbeef): need to handle case of remote broadcast
// mid-local initiated state-transition, possible // mid-local initiated state-transition, possible
@ -632,7 +584,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
// Using the breach information provided by the wallet and the // Using the breach information provided by the wallet and the
// channel snapshot, construct the retribution information that // channel snapshot, construct the retribution information that
// will be persisted to disk. // will be persisted to disk.
retInfo := newRetributionInfo(chanPoint, breachInfo) retInfo := newRetributionInfo(&chanPoint, breachInfo)
// Persist the pending retribution state to disk. // Persist the pending retribution state to disk.
err := b.cfg.Store.Add(retInfo) err := b.cfg.Store.Add(retInfo)
@ -646,21 +598,13 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
// the ack is successful, the close observer will mark the // the ack is successful, the close observer will mark the
// channel as pending-closed in the channeldb. // channel as pending-closed in the channeldb.
select { select {
case breachInfo.Err <- err: case chainEvents.ProcessACK <- struct{}{}:
// Bail if we failed to persist retribution info. // Bail if we failed to persist retribution info.
if err != nil { if err != nil {
return return
} }
case <-contract.ObserverQuit():
// If the close observer has already exited, it will
// never read the acknowledgment, so we exit.
return
case <-b.quit: case <-b.quit:
// Cancel the close observer if the breach arbiter is
// shutting down, dropping the acknowledgment.
contract.CancelObserver()
return return
} }
@ -672,8 +616,6 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
} }
case <-b.quit: case <-b.quit:
contract.Stop()
contract.CancelObserver()
return return
} }
} }

@ -409,20 +409,11 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
GenSweepScript: func() ([]byte, error) { GenSweepScript: func() ([]byte, error) {
return newSweepPkScript(cc.wallet) return newSweepPkScript(cc.wallet)
}, },
Notifier: cc.chainNotifier, Notifier: cc.chainNotifier,
PublishTransaction: cc.wallet.PublishTransaction, PublishTransaction: cc.wallet.PublishTransaction,
Signer: cc.wallet.Cfg.Signer, SubscribeChannelEvents: s.chainArb.SubscribeChannelEvents,
Store: newRetributionStore(chanDB), Signer: cc.wallet.Cfg.Signer,
UpdateCloseSignal: func(op *wire.OutPoint, Store: newRetributionStore(chanDB),
ucs chan *lnwallet.UnilateralCloseSummary) error {
signals := &contractcourt.ContractSignals{
HtlcUpdates: make(chan []channeldb.HTLC),
UniCloseSignal: ucs,
}
return s.chainArb.UpdateContractSignals(*op, signals)
},
}) })
// Create the connection manager which will be responsible for // Create the connection manager which will be responsible for