breacharbiter+channeldb: resolves rebase conflicts
This commit is contained in:
parent
ce411f8e22
commit
4cdce1fc0a
@ -40,9 +40,11 @@ var retributionBucket = []byte("ret")
|
|||||||
// TODO(roasbeef): closures in config for subsystem pointers to decouple?
|
// TODO(roasbeef): closures in config for subsystem pointers to decouple?
|
||||||
type breachArbiter struct {
|
type breachArbiter struct {
|
||||||
wallet *lnwallet.LightningWallet
|
wallet *lnwallet.LightningWallet
|
||||||
notifier chainntnfs.ChainNotifier
|
|
||||||
htlcSwitch *htlcSwitch
|
|
||||||
db *channeldb.DB
|
db *channeldb.DB
|
||||||
|
notifier chainntnfs.ChainNotifier
|
||||||
|
htlcSwitch *htlcswitch.Switch
|
||||||
|
chainIO lnwallet.BlockChainIO
|
||||||
|
estimator lnwallet.FeeEstimator
|
||||||
retributionStore *retributionStore
|
retributionStore *retributionStore
|
||||||
|
|
||||||
// breachObservers is a map which tracks all the active breach
|
// breachObservers is a map which tracks all the active breach
|
||||||
@ -106,13 +108,20 @@ func (b *breachArbiter) Start() error {
|
|||||||
|
|
||||||
brarLog.Tracef("Starting breach arbiter")
|
brarLog.Tracef("Starting breach arbiter")
|
||||||
|
|
||||||
|
// TODO(roasbeef): instead use closure height of channel
|
||||||
|
_, currentHeight, err := b.chainIO.GetBestBlock()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// We load any pending retributions from the database. For each retribution
|
// We load any pending retributions from the database. For each retribution
|
||||||
// we need to restart the retribution procedure to claim our just reward.
|
// we need to restart the retribution procedure to claim our just reward.
|
||||||
err := b.retributionStore.ForAll(func(ret *retribution) error {
|
err = b.retributionStore.ForAll(func(ret *retribution) error {
|
||||||
// Register for a notification when the breach transaction is confirmed
|
// Register for a notification when the breach transaction is confirmed
|
||||||
// on chain.
|
// on chain.
|
||||||
breachTXID := &ret.commitHash
|
breachTXID := &ret.commitHash
|
||||||
confChan, err := b.notifier.RegisterConfirmationsNtfn(breachTXID, 1)
|
confChan, err := b.notifier.RegisterConfirmationsNtfn(breachTXID, 1,
|
||||||
|
uint32(currentHeight))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
brarLog.Errorf("unable to register for conf updates for txid: "+
|
brarLog.Errorf("unable to register for conf updates for txid: "+
|
||||||
"%v, err: %v", breachTXID, err)
|
"%v, err: %v", breachTXID, err)
|
||||||
@ -162,12 +171,6 @@ func (b *breachArbiter) Start() error {
|
|||||||
b.wg.Add(1)
|
b.wg.Add(1)
|
||||||
go b.contractObserver(channelsToWatch)
|
go b.contractObserver(channelsToWatch)
|
||||||
|
|
||||||
// TODO(roasbeef): instead use closure height of channel
|
|
||||||
_, currentHeight, err := b.chainIO.GetBestBlock()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Additionally, we'll also want to retrieve any pending close or force
|
// Additionally, we'll also want to retrieve any pending close or force
|
||||||
// close transactions to we can properly mark them as resolved in the
|
// close transactions to we can properly mark them as resolved in the
|
||||||
// database.
|
// database.
|
||||||
@ -534,7 +537,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
|
|||||||
//
|
//
|
||||||
// NOTE: This MUST be run as a goroutine.
|
// NOTE: This MUST be run as a goroutine.
|
||||||
func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
|
func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
|
||||||
breachInfo *retributionInfo) {
|
breachInfo *retribution) {
|
||||||
|
|
||||||
defer b.wg.Done()
|
defer b.wg.Done()
|
||||||
|
|
||||||
@ -612,14 +615,14 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
|
|||||||
revokedFunds, totalFunds)
|
revokedFunds, totalFunds)
|
||||||
|
|
||||||
// With the channel closed, mark it in the database as such.
|
// With the channel closed, mark it in the database as such.
|
||||||
err := b.db.MarkChanFullyClosed(&ret.chanPoint)
|
err := b.db.MarkChanFullyClosed(&breachInfo.chanPoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
brarLog.Errorf("unable to mark chan as closed: %v", err)
|
brarLog.Errorf("unable to mark chan as closed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Justice has been carried out; we can safely delete the retribution
|
// Justice has been carried out; we can safely delete the retribution
|
||||||
// info from the database.
|
// info from the database.
|
||||||
err = b.retributionStore.Remove(&ret.chanPoint)
|
err = b.retributionStore.Remove(&breachInfo.chanPoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
brarLog.Errorf("unable to remove retribution from the db: %v", err)
|
brarLog.Errorf("unable to remove retribution from the db: %v", err)
|
||||||
}
|
}
|
||||||
@ -788,6 +791,8 @@ type retribution struct {
|
|||||||
selfOutput *breachedOutput
|
selfOutput *breachedOutput
|
||||||
revokedOutput *breachedOutput
|
revokedOutput *breachedOutput
|
||||||
htlcOutputs []*breachedOutput
|
htlcOutputs []*breachedOutput
|
||||||
|
|
||||||
|
doneChan chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// retributionStore handles persistence of retribution states to disk and is
|
// retributionStore handles persistence of retribution states to disk and is
|
||||||
|
@ -380,7 +380,7 @@ func (c *OpenChannel) fullSync(tx *bolt.Tx) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err := lnwire.WriteOutPoint(&b, c.ChanID); err != nil {
|
if err := writeOutpoint(&b, c.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if chanIndexBucket.Get(b.Bytes()) == nil {
|
if chanIndexBucket.Get(b.Bytes()) == nil {
|
||||||
@ -871,7 +871,7 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err := lnwire.WriteOutPoint(&b, c.ChanID); err != nil {
|
if err := writeOutpoint(&b, c.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -982,7 +982,7 @@ func serializeChannelCloseSummary(w io.Writer, cs *ChannelCloseSummary) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := lnwire.WriteOutPoint(w, &cs.ChanPoint); err != nil {
|
if err := writeOutpoint(w, &cs.ChanPoint); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if _, err := w.Write(cs.ClosingTXID[:]); err != nil {
|
if _, err := w.Write(cs.ClosingTXID[:]); err != nil {
|
||||||
@ -1037,7 +1037,7 @@ func deserializeCloseChannelSummary(r io.Reader) (*ChannelCloseSummary, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := lnwire.ReadOutPoint(r, &c.ChanPoint); err != nil {
|
if err := readOutpoint(r, &c.ChanPoint); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if _, err := io.ReadFull(r, c.ClosingTXID[:]); err != nil {
|
if _, err := io.ReadFull(r, c.ClosingTXID[:]); err != nil {
|
||||||
@ -1245,7 +1245,7 @@ func putChanCapacity(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
|||||||
scratch3 := make([]byte, 8)
|
scratch3 := make([]byte, 8)
|
||||||
|
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil {
|
if err := writeOutpoint(&b, channel.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1290,7 +1290,7 @@ func deleteChanCapacity(openChanBucket *bolt.Bucket, chanID []byte) error {
|
|||||||
func fetchChanCapacity(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
func fetchChanCapacity(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
||||||
// A byte slice re-used to compute each key prefix below.
|
// A byte slice re-used to compute each key prefix below.
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil {
|
if err := writeOutpoint(&b, channel.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1317,7 +1317,7 @@ func putChanFeePerKw(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
|||||||
byteOrder.PutUint64(scratch, uint64(channel.FeePerKw))
|
byteOrder.PutUint64(scratch, uint64(channel.FeePerKw))
|
||||||
|
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil {
|
if err := writeOutpoint(&b, channel.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1337,7 +1337,7 @@ func deleteChanMinFeePerKw(openChanBucket *bolt.Bucket, chanID []byte) error {
|
|||||||
|
|
||||||
func fetchChanMinFeePerKw(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
func fetchChanMinFeePerKw(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil {
|
if err := writeOutpoint(&b, channel.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1356,7 +1356,7 @@ func putChanNumUpdates(openChanBucket *bolt.Bucket, channel *OpenChannel) error
|
|||||||
byteOrder.PutUint64(scratch, channel.NumUpdates)
|
byteOrder.PutUint64(scratch, channel.NumUpdates)
|
||||||
|
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil {
|
if err := writeOutpoint(&b, channel.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1376,7 +1376,7 @@ func deleteChanNumUpdates(openChanBucket *bolt.Bucket, chanID []byte) error {
|
|||||||
|
|
||||||
func fetchChanNumUpdates(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
func fetchChanNumUpdates(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil {
|
if err := writeOutpoint(&b, channel.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1395,7 +1395,7 @@ func putChanAmountsTransferred(openChanBucket *bolt.Bucket, channel *OpenChannel
|
|||||||
scratch2 := make([]byte, 8)
|
scratch2 := make([]byte, 8)
|
||||||
|
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil {
|
if err := writeOutpoint(&b, channel.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1428,7 +1428,7 @@ func deleteChanAmountsTransferred(openChanBucket *bolt.Bucket, chanID []byte) er
|
|||||||
|
|
||||||
func fetchChanAmountsTransferred(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
func fetchChanAmountsTransferred(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil {
|
if err := writeOutpoint(&b, channel.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1450,7 +1450,7 @@ func putChanIsPending(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
|||||||
scratch := make([]byte, 2)
|
scratch := make([]byte, 2)
|
||||||
|
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil {
|
if err := writeOutpoint(&b, channel.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1476,7 +1476,7 @@ func deleteChanIsPending(openChanBucket *bolt.Bucket, chanID []byte) error {
|
|||||||
|
|
||||||
func fetchChanIsPending(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
func fetchChanIsPending(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil {
|
if err := writeOutpoint(&b, channel.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1541,7 +1541,7 @@ func deleteChanConfInfo(openChanBucket *bolt.Bucket, chanID []byte) error {
|
|||||||
func putChannelIDs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
func putChannelIDs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
||||||
// TODO(roasbeef): just pass in chanID everywhere for puts
|
// TODO(roasbeef): just pass in chanID everywhere for puts
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil {
|
if err := writeOutpoint(&b, channel.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1568,7 +1568,7 @@ func fetchChannelIDs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
|||||||
b bytes.Buffer
|
b bytes.Buffer
|
||||||
)
|
)
|
||||||
|
|
||||||
if err = lnwire.WriteOutPoint(&b, channel.ChanID); err != nil {
|
if err = writeOutpoint(&b, channel.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1604,7 +1604,7 @@ func putChanCommitFee(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
|||||||
|
|
||||||
func fetchChanCommitFee(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
func fetchChanCommitFee(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err := lnwire.WriteOutpoint(&b, &channel.FundingOutpoint); err != nil {
|
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1628,7 +1628,7 @@ func deleteChanCommitFee(openChanBucket *bolt.Bucket, chanID []byte) error {
|
|||||||
|
|
||||||
func putChanCommitTxns(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
func putChanCommitTxns(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
||||||
var bc bytes.Buffer
|
var bc bytes.Buffer
|
||||||
if err := lnwire.WriteOutpoint(&bc, &channel.FundingOutpoint); err != nil {
|
if err := writeOutpoint(&bc, &channel.FundingOutpoint); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
txnsKey := make([]byte, len(commitTxnsKey)+bc.Len())
|
txnsKey := make([]byte, len(commitTxnsKey)+bc.Len())
|
||||||
@ -1658,7 +1658,7 @@ func deleteChanCommitTxns(nodeChanBucket *bolt.Bucket, chanID []byte) error {
|
|||||||
func fetchChanCommitTxns(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
func fetchChanCommitTxns(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
||||||
var bc bytes.Buffer
|
var bc bytes.Buffer
|
||||||
var err error
|
var err error
|
||||||
if err = lnwire.WriteOutPoint(&bc, channel.ChanID); err != nil {
|
if err = writeOutpoint(&bc, channel.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
txnsKey := make([]byte, len(commitTxnsKey)+bc.Len())
|
txnsKey := make([]byte, len(commitTxnsKey)+bc.Len())
|
||||||
@ -1745,7 +1745,7 @@ func putChanConfigs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
|||||||
|
|
||||||
func fetchChanConfigs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
func fetchChanConfigs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
||||||
var bc bytes.Buffer
|
var bc bytes.Buffer
|
||||||
if err := lnwire.WriteOutPoint(&bc, channel.ChanID); err != nil {
|
if err := writeOutpoint(&bc, channel.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
configKey := make([]byte, len(chanConfigPrefix)+len(bc.Bytes()))
|
configKey := make([]byte, len(chanConfigPrefix)+len(bc.Bytes()))
|
||||||
@ -1885,7 +1885,7 @@ func deleteChanFundingInfo(nodeChanBucket *bolt.Bucket, chanID []byte) error {
|
|||||||
|
|
||||||
func fetchChanFundingInfo(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
func fetchChanFundingInfo(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil {
|
if err := writeOutpoint(&b, channel.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
fundTxnKey := make([]byte, len(fundingTxnKey)+b.Len())
|
fundTxnKey := make([]byte, len(fundingTxnKey)+b.Len())
|
||||||
@ -1971,7 +1971,7 @@ func deleteChanRevocationState(nodeChanBucket *bolt.Bucket, chanID []byte) error
|
|||||||
|
|
||||||
func fetchChanRevocationState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
func fetchChanRevocationState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err := lnwire.WriteOutPoint(&b, channel.ChanID); err != nil {
|
if err := writeOutpoint(&b, channel.ChanID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
preimageKey := make([]byte, len(revocationStateKey)+b.Len())
|
preimageKey := make([]byte, len(revocationStateKey)+b.Len())
|
||||||
@ -2316,6 +2316,38 @@ func wipeChannelLogEntries(log *bolt.Bucket, o *wire.OutPoint) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func writeOutpoint(w io.Writer, o *wire.OutPoint) error {
|
||||||
|
// TODO(roasbeef): make all scratch buffers on the stack
|
||||||
|
scratch := make([]byte, 4)
|
||||||
|
|
||||||
|
// TODO(roasbeef): write raw 32 bytes instead of wasting the extra
|
||||||
|
// byte.
|
||||||
|
if err := wire.WriteVarBytes(w, 0, o.Hash[:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
byteOrder.PutUint32(scratch, o.Index)
|
||||||
|
_, err := w.Write(scratch)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func readOutpoint(r io.Reader, o *wire.OutPoint) error {
|
||||||
|
scratch := make([]byte, 4)
|
||||||
|
|
||||||
|
txid, err := wire.ReadVarBytes(r, 0, 32, "prevout")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
copy(o.Hash[:], txid)
|
||||||
|
|
||||||
|
if _, err := r.Read(scratch); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
o.Index = byteOrder.Uint32(scratch)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func writeBool(w io.Writer, b bool) error {
|
func writeBool(w io.Writer, b bool) error {
|
||||||
boolByte := byte(0x01)
|
boolByte := byte(0x01)
|
||||||
if !b {
|
if !b {
|
||||||
|
Loading…
Reference in New Issue
Block a user