From 7b675446f0c27915d2972edc7b21d2602a3ac852 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 22 Jan 2018 17:03:34 -0800 Subject: [PATCH] breacharbiter: properly accept new incoming channels for watching --- breacharbiter.go | 57 ++++++++++++++++++++++++++++++++++-------- fundingmanager_test.go | 6 +++-- 2 files changed, 51 insertions(+), 12 deletions(-) diff --git a/breacharbiter.go b/breacharbiter.go index 5ee799f5..2d89a300 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "errors" "io" + "strings" "sync" "sync/atomic" @@ -115,7 +116,12 @@ type breachArbiter struct { // the breachArbiter that a channel has peacefully been closed. Once a // channel has been closed the arbiter no longer needs to watch for // breach closes. - settledContracts chan *wire.OutPoint + settledContracts chan wire.OutPoint + + // newContracts is a channel which is used by outside subsystems to + // notify the breachArbiter of a new contract (a channel) that should + // be watched. + newContracts chan wire.OutPoint quit chan struct{} wg sync.WaitGroup @@ -128,7 +134,8 @@ func newBreachArbiter(cfg *BreachConfig) *breachArbiter { cfg: cfg, breachObservers: make(map[wire.OutPoint]chan struct{}), breachedContracts: make(chan *retributionInfo), - settledContracts: make(chan *wire.OutPoint), + newContracts: make(chan wire.OutPoint), + settledContracts: make(chan wire.OutPoint), quit: make(chan struct{}), } } @@ -343,11 +350,44 @@ out: delete(b.breachObservers, breachInfo.chanPoint) + case chanPoint := <-b.newContracts: + // A new channel has just been opened within the + // daemon, so we launch a new breachObserver to handle + // the detection of attempted contract breaches. + settleSignal := make(chan struct{}) + + // If the contract is already being watched, then an + // additional send indicates we have a stale version of + // the contract. So we'll cancel active watcher + // goroutine to create a new instance with the latest + // contract reference. + if oldSignal, ok := b.breachObservers[chanPoint]; ok { + brarLog.Infof("ChannelPoint(%v) is now live, "+ + "abandoning state contract for live "+ + "version", chanPoint) + close(oldSignal) + } + + b.breachObservers[chanPoint] = settleSignal + + brarLog.Debugf("New contract detected, launching " + + "breachObserver") + + chainEvents, err := b.cfg.SubscribeChannelEvents(chanPoint) + if err != nil { + // TODO(roasbeef); panic? + brarLog.Errorf("unable to register for event "+ + "sub for chan_point=%v: %v", chanPoint, err) + } + + b.wg.Add(1) + go b.breachObserver(chainEvents, settleSignal) + case chanPoint := <-b.settledContracts: // A new channel has been closed either unilaterally or // cooperatively, as a result we no longer need a // breachObserver detected to the channel. - killSignal, ok := b.breachObservers[*chanPoint] + killSignal, ok := b.breachObservers[chanPoint] if !ok { brarLog.Errorf("Unable to find contract: %v", chanPoint) @@ -361,7 +401,7 @@ out: // for exit and also delete its state from our tracking // map. close(killSignal) - delete(b.breachObservers, *chanPoint) + delete(b.breachObservers, chanPoint) case <-b.quit: break out } @@ -533,8 +573,6 @@ func (b *breachArbiter) breachObserver( chanPoint := chainEvents.ChanPoint - // TODO(roasbeef): needs to get the signals from the arb!!! - brarLog.Debugf("Breach observer for ChannelPoint(%v) started ", chanPoint) @@ -553,7 +591,7 @@ func (b *breachArbiter) breachObserver( defer b.wg.Done() select { - case b.settledContracts <- &chanPoint: + case b.settledContracts <- chanPoint: case <-b.quit: } }() @@ -563,7 +601,6 @@ func (b *breachArbiter) breachObserver( // The channel has been closed by a normal means: force closing with // the latest commitment transaction. case <-chainEvents.UnilateralClosure: - // Launch a goroutine to cancel out this contract within the // breachArbiter's main goroutine. b.wg.Add(1) @@ -571,7 +608,7 @@ func (b *breachArbiter) breachObserver( defer b.wg.Done() select { - case b.settledContracts <- &chanPoint: + case b.settledContracts <- chanPoint: case <-b.quit: } }() @@ -614,7 +651,7 @@ func (b *breachArbiter) breachObserver( // the ack is successful, the close observer will mark the // channel as pending-closed in the channeldb. select { - case chainEvents.ProcessACK <- struct{}{}: + case chainEvents.ProcessACK <- err: // Bail if we failed to persist retribution info. if err != nil { return diff --git a/fundingmanager_test.go b/fundingmanager_test.go index c0eebfe6..cb1a1e76 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -124,7 +124,7 @@ type testNode struct { privKey *btcec.PrivateKey msgChan chan lnwire.Message announceChan chan lnwire.Message - arbiterChan chan *lnwallet.LightningChannel + arbiterChan chan wire.OutPoint publTxChan chan *wire.MsgTx fundingMgr *fundingManager peer *peer @@ -185,7 +185,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, sentMessages := make(chan lnwire.Message) sentAnnouncements := make(chan lnwire.Message) publTxChan := make(chan *wire.MsgTx, 1) - arbiterChan := make(chan *lnwallet.LightningChannel) + arbiterChan := make(chan wire.OutPoint) shutdownChan := make(chan struct{}) wc := &mockWalletController{ @@ -269,6 +269,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, RequiredRemoteDelay: func(amt btcutil.Amount) uint16 { return 4 }, + ArbiterChan: arbiterChan, WatchNewChannel: func(*channeldb.OpenChannel) error { return nil }, @@ -342,6 +343,7 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) { }, FindPeer: oldCfg.FindPeer, TempChanIDSeed: oldCfg.TempChanIDSeed, + ArbiterChan: alice.arbiterChan, FindChannel: oldCfg.FindChannel, }) if err != nil {