breacharbiter: properly accept new incoming channels for watching

This commit is contained in:
Olaoluwa Osuntokun 2018-01-22 17:03:34 -08:00
parent d368bce1da
commit 7b675446f0
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21
2 changed files with 51 additions and 12 deletions

@ -5,6 +5,7 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"io" "io"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -115,7 +116,12 @@ type breachArbiter struct {
// the breachArbiter that a channel has peacefully been closed. Once a // the breachArbiter that a channel has peacefully been closed. Once a
// channel has been closed the arbiter no longer needs to watch for // channel has been closed the arbiter no longer needs to watch for
// breach closes. // breach closes.
settledContracts chan *wire.OutPoint settledContracts chan wire.OutPoint
// newContracts is a channel which is used by outside subsystems to
// notify the breachArbiter of a new contract (a channel) that should
// be watched.
newContracts chan wire.OutPoint
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@ -128,7 +134,8 @@ func newBreachArbiter(cfg *BreachConfig) *breachArbiter {
cfg: cfg, cfg: cfg,
breachObservers: make(map[wire.OutPoint]chan struct{}), breachObservers: make(map[wire.OutPoint]chan struct{}),
breachedContracts: make(chan *retributionInfo), breachedContracts: make(chan *retributionInfo),
settledContracts: make(chan *wire.OutPoint), newContracts: make(chan wire.OutPoint),
settledContracts: make(chan wire.OutPoint),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
} }
@ -343,11 +350,44 @@ out:
delete(b.breachObservers, breachInfo.chanPoint) delete(b.breachObservers, breachInfo.chanPoint)
case chanPoint := <-b.newContracts:
// A new channel has just been opened within the
// daemon, so we launch a new breachObserver to handle
// the detection of attempted contract breaches.
settleSignal := make(chan struct{})
// If the contract is already being watched, then an
// additional send indicates we have a stale version of
// the contract. So we'll cancel active watcher
// goroutine to create a new instance with the latest
// contract reference.
if oldSignal, ok := b.breachObservers[chanPoint]; ok {
brarLog.Infof("ChannelPoint(%v) is now live, "+
"abandoning state contract for live "+
"version", chanPoint)
close(oldSignal)
}
b.breachObservers[chanPoint] = settleSignal
brarLog.Debugf("New contract detected, launching " +
"breachObserver")
chainEvents, err := b.cfg.SubscribeChannelEvents(chanPoint)
if err != nil {
// TODO(roasbeef); panic?
brarLog.Errorf("unable to register for event "+
"sub for chan_point=%v: %v", chanPoint, err)
}
b.wg.Add(1)
go b.breachObserver(chainEvents, settleSignal)
case chanPoint := <-b.settledContracts: case chanPoint := <-b.settledContracts:
// A new channel has been closed either unilaterally or // A new channel has been closed either unilaterally or
// cooperatively, as a result we no longer need a // cooperatively, as a result we no longer need a
// breachObserver detected to the channel. // breachObserver detected to the channel.
killSignal, ok := b.breachObservers[*chanPoint] killSignal, ok := b.breachObservers[chanPoint]
if !ok { if !ok {
brarLog.Errorf("Unable to find contract: %v", brarLog.Errorf("Unable to find contract: %v",
chanPoint) chanPoint)
@ -361,7 +401,7 @@ out:
// for exit and also delete its state from our tracking // for exit and also delete its state from our tracking
// map. // map.
close(killSignal) close(killSignal)
delete(b.breachObservers, *chanPoint) delete(b.breachObservers, chanPoint)
case <-b.quit: case <-b.quit:
break out break out
} }
@ -533,8 +573,6 @@ func (b *breachArbiter) breachObserver(
chanPoint := chainEvents.ChanPoint chanPoint := chainEvents.ChanPoint
// TODO(roasbeef): needs to get the signals from the arb!!!
brarLog.Debugf("Breach observer for ChannelPoint(%v) started ", brarLog.Debugf("Breach observer for ChannelPoint(%v) started ",
chanPoint) chanPoint)
@ -553,7 +591,7 @@ func (b *breachArbiter) breachObserver(
defer b.wg.Done() defer b.wg.Done()
select { select {
case b.settledContracts <- &chanPoint: case b.settledContracts <- chanPoint:
case <-b.quit: case <-b.quit:
} }
}() }()
@ -563,7 +601,6 @@ func (b *breachArbiter) breachObserver(
// The channel has been closed by a normal means: force closing with // The channel has been closed by a normal means: force closing with
// the latest commitment transaction. // the latest commitment transaction.
case <-chainEvents.UnilateralClosure: case <-chainEvents.UnilateralClosure:
// Launch a goroutine to cancel out this contract within the // Launch a goroutine to cancel out this contract within the
// breachArbiter's main goroutine. // breachArbiter's main goroutine.
b.wg.Add(1) b.wg.Add(1)
@ -571,7 +608,7 @@ func (b *breachArbiter) breachObserver(
defer b.wg.Done() defer b.wg.Done()
select { select {
case b.settledContracts <- &chanPoint: case b.settledContracts <- chanPoint:
case <-b.quit: case <-b.quit:
} }
}() }()
@ -614,7 +651,7 @@ func (b *breachArbiter) breachObserver(
// the ack is successful, the close observer will mark the // the ack is successful, the close observer will mark the
// channel as pending-closed in the channeldb. // channel as pending-closed in the channeldb.
select { select {
case chainEvents.ProcessACK <- struct{}{}: case chainEvents.ProcessACK <- err:
// Bail if we failed to persist retribution info. // Bail if we failed to persist retribution info.
if err != nil { if err != nil {
return return

@ -124,7 +124,7 @@ type testNode struct {
privKey *btcec.PrivateKey privKey *btcec.PrivateKey
msgChan chan lnwire.Message msgChan chan lnwire.Message
announceChan chan lnwire.Message announceChan chan lnwire.Message
arbiterChan chan *lnwallet.LightningChannel arbiterChan chan wire.OutPoint
publTxChan chan *wire.MsgTx publTxChan chan *wire.MsgTx
fundingMgr *fundingManager fundingMgr *fundingManager
peer *peer peer *peer
@ -185,7 +185,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
sentMessages := make(chan lnwire.Message) sentMessages := make(chan lnwire.Message)
sentAnnouncements := make(chan lnwire.Message) sentAnnouncements := make(chan lnwire.Message)
publTxChan := make(chan *wire.MsgTx, 1) publTxChan := make(chan *wire.MsgTx, 1)
arbiterChan := make(chan *lnwallet.LightningChannel) arbiterChan := make(chan wire.OutPoint)
shutdownChan := make(chan struct{}) shutdownChan := make(chan struct{})
wc := &mockWalletController{ wc := &mockWalletController{
@ -269,6 +269,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
RequiredRemoteDelay: func(amt btcutil.Amount) uint16 { RequiredRemoteDelay: func(amt btcutil.Amount) uint16 {
return 4 return 4
}, },
ArbiterChan: arbiterChan,
WatchNewChannel: func(*channeldb.OpenChannel) error { WatchNewChannel: func(*channeldb.OpenChannel) error {
return nil return nil
}, },
@ -342,6 +343,7 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
}, },
FindPeer: oldCfg.FindPeer, FindPeer: oldCfg.FindPeer,
TempChanIDSeed: oldCfg.TempChanIDSeed, TempChanIDSeed: oldCfg.TempChanIDSeed,
ArbiterChan: alice.arbiterChan,
FindChannel: oldCfg.FindChannel, FindChannel: oldCfg.FindChannel,
}) })
if err != nil { if err != nil {