peer+rpcserver+breacharbiter: usel latest ChainNotifier API

This commit is contained in:
Olaoluwa Osuntokun 2017-05-10 17:27:05 -07:00
parent 77cf7ed085
commit d47f004fbd
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
3 changed files with 99 additions and 44 deletions

@ -26,6 +26,7 @@ type breachArbiter struct {
wallet *lnwallet.LightningWallet
db *channeldb.DB
notifier chainntnfs.ChainNotifier
chainIO lnwallet.BlockChainIO
htlcSwitch *htlcSwitch
// breachObservers is a map which tracks all the active breach
@ -62,12 +63,14 @@ type breachArbiter struct {
// newBreachArbiter creates a new instance of a breachArbiter initialized with
// its dependent objects.
func newBreachArbiter(wallet *lnwallet.LightningWallet, db *channeldb.DB,
notifier chainntnfs.ChainNotifier, h *htlcSwitch) *breachArbiter {
notifier chainntnfs.ChainNotifier, h *htlcSwitch,
chain lnwallet.BlockChainIO) *breachArbiter {
return &breachArbiter{
wallet: wallet,
db: db,
notifier: notifier,
chainIO: chain,
htlcSwitch: h,
breachObservers: make(map[wire.OutPoint]chan struct{}),
@ -120,6 +123,12 @@ func (b *breachArbiter) Start() error {
b.wg.Add(1)
go b.contractObserver(channelsToWatch)
// TODO(roasbeef): instead use closure height of channel
_, currentHeight, err := b.chainIO.GetBestBlock()
if err != nil {
return err
}
// Additionally, we'll also want to retrieve any pending close or force
// close transactions to we can properly mark them as resolved in the
// database.
@ -142,7 +151,9 @@ func (b *breachArbiter) Start() error {
chanPoint := &pendingClose.ChanPoint
closeTXID := &pendingClose.ClosingTXID
confNtfn, err := b.notifier.RegisterConfirmationsNtfn(closeTXID, 1)
confNtfn, err := b.notifier.RegisterConfirmationsNtfn(
closeTXID, 1, uint32(currentHeight),
)
if err != nil {
return err
}
@ -208,17 +219,27 @@ func (b *breachArbiter) contractObserver(activeChannels []*lnwallet.LightningCha
go b.breachObserver(channel, settleSignal)
}
// TODO(roasbeef): need to ensure currentHeight passed in doesn't
// result in lost notification
out:
for {
select {
case breachInfo := <-b.breachedContracts:
_, currentHeight, err := b.chainIO.GetBestBlock()
if err != nil {
brarLog.Errorf("unable to get best height: %v", err)
}
// A new channel contract has just been breached! We
// first register for a notification to be dispatched
// once the breach transaction (the revoked commitment
// transaction) has been confirmed in the chain to
// ensure we're not dealing with a moving target.
breachTXID := &breachInfo.commitHash
confChan, err := b.notifier.RegisterConfirmationsNtfn(breachTXID, 1)
confChan, err := b.notifier.RegisterConfirmationsNtfn(
breachTXID, 1, uint32(currentHeight),
)
if err != nil {
brarLog.Errorf("unable to register for conf for txid: %v",
breachTXID)
@ -336,6 +357,12 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
return spew.Sdump(justiceTx)
}))
_, currentHeight, err := b.chainIO.GetBestBlock()
if err != nil {
brarLog.Errorf("unable to get current height: %v", err)
return
}
// Finally, broadcast the transaction, finalizing the channels'
// retribution against the cheating counterparty.
if err := b.wallet.PublishTransaction(justiceTx); err != nil {
@ -349,7 +376,8 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
// notify the caller that initiated the retribution workflow that the
// deed has been done.
justiceTXID := justiceTx.TxHash()
confChan, err = b.notifier.RegisterConfirmationsNtfn(&justiceTXID, 1)
confChan, err = b.notifier.RegisterConfirmationsNtfn(&justiceTXID, 1,
uint32(currentHeight))
if err != nil {
brarLog.Errorf("unable to register for conf for txid: %v",
justiceTXID)
@ -427,14 +455,16 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
//
// TODO(roasbeef): also notify utxoNursery, might've had
// outbound HTLC's in flight
go waitForChanToClose(b.notifier, nil, chanPoint, closeInfo.SpenderTxHash, func() {
brarLog.Infof("Force closed ChannelPoint(%v) is "+
"fully closed, updating DB", chanPoint)
go waitForChanToClose(uint32(closeInfo.SpendingHeight), b.notifier,
nil, chanPoint, closeInfo.SpenderTxHash, func() {
if err := b.db.MarkChanFullyClosed(chanPoint); err != nil {
brarLog.Errorf("unable to mark chan as closed: %v", err)
}
})
brarLog.Infof("Force closed ChannelPoint(%v) is "+
"fully closed, updating DB", chanPoint)
if err := b.db.MarkChanFullyClosed(chanPoint); err != nil {
brarLog.Errorf("unable to mark chan as closed: %v", err)
}
})
// A read from this channel indicates that a channel breach has been
// detected! So we notify the main coordination goroutine with the

63
peer.go

@ -921,30 +921,39 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
},
}
_, bestHeight, err := p.server.bio.GetBestBlock()
if err != nil {
req.err <- err
return
}
// Finally, launch a goroutine which will request to be notified by the
// ChainNotifier once the closure transaction obtains a single
// confirmation.
notifier := p.server.chainNotifier
go waitForChanToClose(notifier, req.err, req.chanPoint, closingTxid, func() {
// First, we'll mark the database as being fully closed so
// we'll no longer watch for its ultimate closure upon startup.
err := p.server.chanDB.MarkChanFullyClosed(req.chanPoint)
if err != nil {
req.err <- err
return
}
go waitForChanToClose(uint32(bestHeight), notifier, req.err,
req.chanPoint, closingTxid, func() {
// Respond to the local subsystem which requested the channel
// closure.
req.updates <- &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ChanClose{
ChanClose: &lnrpc.ChannelCloseUpdate{
ClosingTxid: closingTxid[:],
Success: true,
// First, we'll mark the database as being fully closed
// so we'll no longer watch for its ultimate closure
// upon startup.
err := p.server.chanDB.MarkChanFullyClosed(req.chanPoint)
if err != nil {
req.err <- err
return
}
// Respond to the local subsystem which requested the
// channel closure.
req.updates <- &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ChanClose{
ChanClose: &lnrpc.ChannelCloseUpdate{
ClosingTxid: closingTxid[:],
Success: true,
},
},
},
}
})
}
})
}
// handleRemoteClose completes a request for cooperative channel closure
@ -978,6 +987,15 @@ func (p *peer) handleRemoteClose(req *lnwire.CloseRequest) {
newLogClosure(func() string {
return spew.Sdump(closeTx)
}))
if err != nil {
peerLog.Errorf("unable to get current height: %v", err)
return
}
_, bestHeight, err := p.server.bio.GetBestBlock()
if err != nil {
peerLog.Errorf("unable to get best height: %v", err)
}
// Finally, broadcast the closure transaction, to the network.
err = p.server.lnwallet.PublishTransaction(closeTx)
@ -1022,8 +1040,8 @@ func (p *peer) handleRemoteClose(req *lnwire.CloseRequest) {
// confirmation of the closing transaction, and mark the channel as
// such within the database (once it's confirmed").
notifier := p.server.chainNotifier
go waitForChanToClose(notifier, nil, chanPoint, &closeTxid,
func() {
go waitForChanToClose(uint32(bestHeight), notifier, nil, chanPoint,
&closeTxid, func() {
// Now that the closing transaction has been confirmed,
// we'll mark the database as being fully closed so now
// that we no longer watch for its ultimate closure
@ -1042,12 +1060,13 @@ func (p *peer) handleRemoteClose(req *lnwire.CloseRequest) {
// following actions: the channel point will be sent over the settleChan, and
// finally the callback will be executed. If any error is encountered within
// the function, then it will be sent over the errChan.
func waitForChanToClose(notifier chainntnfs.ChainNotifier,
func waitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier,
errChan chan error, chanPoint *wire.OutPoint,
closingTxID *chainhash.Hash, cb func()) {
// TODO(roasbeef): add param for num needed confs
confNtfn, err := notifier.RegisterConfirmationsNtfn(closingTxID, 1)
confNtfn, err := notifier.RegisterConfirmationsNtfn(closingTxID, 1,
bestHeight)
if err != nil && errChan != nil {
errChan <- err
return

@ -502,6 +502,11 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
return err
}
_, bestHeight, err := r.server.bio.GetBestBlock()
if err != nil {
return err
}
// As we're force closing this channel, as a precaution, we'll
// ensure that the switch doesn't continue to see this channel
// as eligible for forwarding HTLC's. If the peer is online,
@ -537,18 +542,19 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
errChan = make(chan error, 1)
notifier := r.server.chainNotifier
go waitForChanToClose(notifier, errChan, chanPoint, closingTxid, func() {
// Respond to the local subsystem which requested the
// channel closure.
updateChan <- &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ChanClose{
ChanClose: &lnrpc.ChannelCloseUpdate{
ClosingTxid: closingTxid[:],
Success: true,
go waitForChanToClose(uint32(bestHeight), notifier, errChan, chanPoint,
closingTxid, func() {
// Respond to the local subsystem which
// requested the channel closure.
updateChan <- &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ChanClose{
ChanClose: &lnrpc.ChannelCloseUpdate{
ClosingTxid: closingTxid[:],
Success: true,
},
},
},
}
})
}
})
// TODO(roasbeef): utxo nursery marks as fully closed