From 4cdce1fc0a24d43d92ff0ac76cb10cdd706bb6d1 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht <conner@mit.edu> Date: Tue, 25 Jul 2017 18:19:44 -0700 Subject: [PATCH] breacharbiter+channeldb: resolves rebase conflicts --- breacharbiter.go | 31 ++++++++++-------- channeldb/channel.go | 76 +++++++++++++++++++++++++++++++------------- 2 files changed, 72 insertions(+), 35 deletions(-) diff --git a/breacharbiter.go b/breacharbiter.go index 12572955..1f6a5f64 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -40,9 +40,11 @@ var retributionBucket = []byte("ret") // TODO(roasbeef): closures in config for subsystem pointers to decouple? type breachArbiter struct { wallet *lnwallet.LightningWallet - notifier chainntnfs.ChainNotifier - htlcSwitch *htlcSwitch db *channeldb.DB + notifier chainntnfs.ChainNotifier + htlcSwitch *htlcswitch.Switch + chainIO lnwallet.BlockChainIO + estimator lnwallet.FeeEstimator retributionStore *retributionStore // breachObservers is a map which tracks all the active breach @@ -106,13 +108,20 @@ func (b *breachArbiter) Start() error { brarLog.Tracef("Starting breach arbiter") + // TODO(roasbeef): instead use closure height of channel + _, currentHeight, err := b.chainIO.GetBestBlock() + if err != nil { + return err + } + // We load any pending retributions from the database. For each retribution // we need to restart the retribution procedure to claim our just reward. - err := b.retributionStore.ForAll(func(ret *retribution) error { + err = b.retributionStore.ForAll(func(ret *retribution) error { // Register for a notification when the breach transaction is confirmed // on chain. breachTXID := &ret.commitHash - confChan, err := b.notifier.RegisterConfirmationsNtfn(breachTXID, 1) + confChan, err := b.notifier.RegisterConfirmationsNtfn(breachTXID, 1, + uint32(currentHeight)) if err != nil { brarLog.Errorf("unable to register for conf updates for txid: "+ "%v, err: %v", breachTXID, err) @@ -162,12 +171,6 @@ func (b *breachArbiter) Start() error { b.wg.Add(1) go b.contractObserver(channelsToWatch) - // TODO(roasbeef): instead use closure height of channel - _, currentHeight, err := b.chainIO.GetBestBlock() - if err != nil { - return err - } - // Additionally, we'll also want to retrieve any pending close or force // close transactions to we can properly mark them as resolved in the // database. @@ -534,7 +537,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, // // NOTE: This MUST be run as a goroutine. func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent, - breachInfo *retributionInfo) { + breachInfo *retribution) { defer b.wg.Done() @@ -612,14 +615,14 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent, revokedFunds, totalFunds) // With the channel closed, mark it in the database as such. - err := b.db.MarkChanFullyClosed(&ret.chanPoint) + err := b.db.MarkChanFullyClosed(&breachInfo.chanPoint) if err != nil { brarLog.Errorf("unable to mark chan as closed: %v", err) } // Justice has been carried out; we can safely delete the retribution // info from the database. - err = b.retributionStore.Remove(&ret.chanPoint) + err = b.retributionStore.Remove(&breachInfo.chanPoint) if err != nil { brarLog.Errorf("unable to remove retribution from the db: %v", err) } @@ -788,6 +791,8 @@ type retribution struct { selfOutput *breachedOutput revokedOutput *breachedOutput htlcOutputs []*breachedOutput + + doneChan chan struct{} } // retributionStore handles persistence of retribution states to disk and is diff --git a/channeldb/channel.go b/channeldb/channel.go index b44375ca..47d63414 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -380,7 +380,7 @@ func (c *OpenChannel) fullSync(tx *bolt.Tx) error { return err } var b bytes.Buffer - if err := lnwire.WriteOutPoint(&b, c.ChanID); err != nil { + if err := writeOutpoint(&b, c.ChanID); err != nil { return err } if chanIndexBucket.Get(b.Bytes()) == nil { @@ -871,7 +871,7 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary) error { } var b bytes.Buffer - if err := lnwire.WriteOutPoint(&b, c.ChanID); err != nil { + if err := writeOutpoint(&b, c.ChanID); err != nil { return err } @@ -982,7 +982,7 @@ func serializeChannelCloseSummary(w io.Writer, cs *ChannelCloseSummary) error { return err } - if err := lnwire.WriteOutPoint(w, &cs.ChanPoint); err != nil { + if err := writeOutpoint(w, &cs.ChanPoint); err != nil { return err } if _, err := w.Write(cs.ClosingTXID[:]); err != nil { @@ -1037,7 +1037,7 @@ func deserializeCloseChannelSummary(r io.Reader) (*ChannelCloseSummary, error) { return nil, err } - if err := lnwire.ReadOutPoint(r, &c.ChanPoint); err != nil { + if err := readOutpoint(r, &c.ChanPoint); err != nil { return nil, err } if _, err := io.ReadFull(r, c.ClosingTXID[:]); err != nil { @@ -1245,7 +1245,7 @@ func putChanCapacity(openChanBucket *bolt.Bucket, channel *OpenChannel) error { scratch3 := make([]byte, 8) var b bytes.Buffer - if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil { + if err := writeOutpoint(&b, channel.ChanID); err != nil { return err } @@ -1290,7 +1290,7 @@ func deleteChanCapacity(openChanBucket *bolt.Bucket, chanID []byte) error { func fetchChanCapacity(openChanBucket *bolt.Bucket, channel *OpenChannel) error { // A byte slice re-used to compute each key prefix below. var b bytes.Buffer - if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil { + if err := writeOutpoint(&b, channel.ChanID); err != nil { return err } @@ -1317,7 +1317,7 @@ func putChanFeePerKw(openChanBucket *bolt.Bucket, channel *OpenChannel) error { byteOrder.PutUint64(scratch, uint64(channel.FeePerKw)) var b bytes.Buffer - if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil { + if err := writeOutpoint(&b, channel.ChanID); err != nil { return err } @@ -1337,7 +1337,7 @@ func deleteChanMinFeePerKw(openChanBucket *bolt.Bucket, chanID []byte) error { func fetchChanMinFeePerKw(openChanBucket *bolt.Bucket, channel *OpenChannel) error { var b bytes.Buffer - if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil { + if err := writeOutpoint(&b, channel.ChanID); err != nil { return err } @@ -1356,7 +1356,7 @@ func putChanNumUpdates(openChanBucket *bolt.Bucket, channel *OpenChannel) error byteOrder.PutUint64(scratch, channel.NumUpdates) var b bytes.Buffer - if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil { + if err := writeOutpoint(&b, channel.ChanID); err != nil { return err } @@ -1376,7 +1376,7 @@ func deleteChanNumUpdates(openChanBucket *bolt.Bucket, chanID []byte) error { func fetchChanNumUpdates(openChanBucket *bolt.Bucket, channel *OpenChannel) error { var b bytes.Buffer - if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil { + if err := writeOutpoint(&b, channel.ChanID); err != nil { return err } @@ -1395,7 +1395,7 @@ func putChanAmountsTransferred(openChanBucket *bolt.Bucket, channel *OpenChannel scratch2 := make([]byte, 8) var b bytes.Buffer - if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil { + if err := writeOutpoint(&b, channel.ChanID); err != nil { return err } @@ -1428,7 +1428,7 @@ func deleteChanAmountsTransferred(openChanBucket *bolt.Bucket, chanID []byte) er func fetchChanAmountsTransferred(openChanBucket *bolt.Bucket, channel *OpenChannel) error { var b bytes.Buffer - if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil { + if err := writeOutpoint(&b, channel.ChanID); err != nil { return err } @@ -1450,7 +1450,7 @@ func putChanIsPending(openChanBucket *bolt.Bucket, channel *OpenChannel) error { scratch := make([]byte, 2) var b bytes.Buffer - if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil { + if err := writeOutpoint(&b, channel.ChanID); err != nil { return err } @@ -1476,7 +1476,7 @@ func deleteChanIsPending(openChanBucket *bolt.Bucket, chanID []byte) error { func fetchChanIsPending(openChanBucket *bolt.Bucket, channel *OpenChannel) error { var b bytes.Buffer - if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil { + if err := writeOutpoint(&b, channel.ChanID); err != nil { return err } @@ -1541,7 +1541,7 @@ func deleteChanConfInfo(openChanBucket *bolt.Bucket, chanID []byte) error { func putChannelIDs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { // TODO(roasbeef): just pass in chanID everywhere for puts var b bytes.Buffer - if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil { + if err := writeOutpoint(&b, channel.ChanID); err != nil { return err } @@ -1568,7 +1568,7 @@ func fetchChannelIDs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { b bytes.Buffer ) - if err = lnwire.WriteOutPoint(&b, channel.ChanID); err != nil { + if err = writeOutpoint(&b, channel.ChanID); err != nil { return err } @@ -1604,7 +1604,7 @@ func putChanCommitFee(openChanBucket *bolt.Bucket, channel *OpenChannel) error { func fetchChanCommitFee(openChanBucket *bolt.Bucket, channel *OpenChannel) error { var b bytes.Buffer - if err := lnwire.WriteOutpoint(&b, &channel.FundingOutpoint); err != nil { + if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { return err } @@ -1628,7 +1628,7 @@ func deleteChanCommitFee(openChanBucket *bolt.Bucket, chanID []byte) error { func putChanCommitTxns(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { var bc bytes.Buffer - if err := lnwire.WriteOutpoint(&bc, &channel.FundingOutpoint); err != nil { + if err := writeOutpoint(&bc, &channel.FundingOutpoint); err != nil { return err } txnsKey := make([]byte, len(commitTxnsKey)+bc.Len()) @@ -1658,7 +1658,7 @@ func deleteChanCommitTxns(nodeChanBucket *bolt.Bucket, chanID []byte) error { func fetchChanCommitTxns(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { var bc bytes.Buffer var err error - if err = lnwire.WriteOutPoint(&bc, channel.ChanID); err != nil { + if err = writeOutpoint(&bc, channel.ChanID); err != nil { return err } txnsKey := make([]byte, len(commitTxnsKey)+bc.Len()) @@ -1745,7 +1745,7 @@ func putChanConfigs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { func fetchChanConfigs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { var bc bytes.Buffer - if err := lnwire.WriteOutPoint(&bc, channel.ChanID); err != nil { + if err := writeOutpoint(&bc, channel.ChanID); err != nil { return err } configKey := make([]byte, len(chanConfigPrefix)+len(bc.Bytes())) @@ -1885,7 +1885,7 @@ func deleteChanFundingInfo(nodeChanBucket *bolt.Bucket, chanID []byte) error { func fetchChanFundingInfo(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { var b bytes.Buffer - if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil { + if err := writeOutpoint(&b, channel.ChanID); err != nil { return err } fundTxnKey := make([]byte, len(fundingTxnKey)+b.Len()) @@ -1971,7 +1971,7 @@ func deleteChanRevocationState(nodeChanBucket *bolt.Bucket, chanID []byte) error func fetchChanRevocationState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { var b bytes.Buffer - if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil { + if err := writeOutpoint(&b, channel.ChanID); err != nil { return err } preimageKey := make([]byte, len(revocationStateKey)+b.Len()) @@ -2316,6 +2316,38 @@ func wipeChannelLogEntries(log *bolt.Bucket, o *wire.OutPoint) error { return nil } +func writeOutpoint(w io.Writer, o *wire.OutPoint) error { + // TODO(roasbeef): make all scratch buffers on the stack + scratch := make([]byte, 4) + + // TODO(roasbeef): write raw 32 bytes instead of wasting the extra + // byte. + if err := wire.WriteVarBytes(w, 0, o.Hash[:]); err != nil { + return err + } + + byteOrder.PutUint32(scratch, o.Index) + _, err := w.Write(scratch) + return err +} + +func readOutpoint(r io.Reader, o *wire.OutPoint) error { + scratch := make([]byte, 4) + + txid, err := wire.ReadVarBytes(r, 0, 32, "prevout") + if err != nil { + return err + } + copy(o.Hash[:], txid) + + if _, err := r.Read(scratch); err != nil { + return err + } + o.Index = byteOrder.Uint32(scratch) + + return nil +} + func writeBool(w io.Writer, b bool) error { boolByte := byte(0x01) if !b {