Merge pull request #3198 from halseth/foreach-non-modify
Avoid modifying bucket during ForEach loop, and Cursor traversal
This commit is contained in:
commit
e65b8e3805
@ -2087,10 +2087,6 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary) error {
|
|||||||
// information stored within the revocation log.
|
// information stored within the revocation log.
|
||||||
logBucket := chanBucket.Bucket(revocationLogBucket)
|
logBucket := chanBucket.Bucket(revocationLogBucket)
|
||||||
if logBucket != nil {
|
if logBucket != nil {
|
||||||
err := wipeChannelLogEntries(logBucket)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
err = chanBucket.DeleteBucket(revocationLogBucket)
|
err = chanBucket.DeleteBucket(revocationLogBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -2713,16 +2709,3 @@ func fetchChannelLogEntry(log *bbolt.Bucket,
|
|||||||
commitReader := bytes.NewReader(commitBytes)
|
commitReader := bytes.NewReader(commitBytes)
|
||||||
return deserializeChanCommit(commitReader)
|
return deserializeChanCommit(commitReader)
|
||||||
}
|
}
|
||||||
|
|
||||||
func wipeChannelLogEntries(log *bbolt.Bucket) error {
|
|
||||||
// TODO(roasbeef): comment
|
|
||||||
|
|
||||||
logCursor := log.Cursor()
|
|
||||||
for k, _ := logCursor.First(); k != nil; k, _ = logCursor.Next() {
|
|
||||||
if err := logCursor.Delete(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
@ -1061,6 +1061,9 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf
|
|||||||
|
|
||||||
// Scan from chanIDStart to chanIDEnd, deleting every
|
// Scan from chanIDStart to chanIDEnd, deleting every
|
||||||
// found edge.
|
// found edge.
|
||||||
|
// NOTE: we must delete the edges after the cursor loop, since
|
||||||
|
// modifying the bucket while traversing is not safe.
|
||||||
|
var keys [][]byte
|
||||||
cursor := edgeIndex.Cursor()
|
cursor := edgeIndex.Cursor()
|
||||||
for k, v := cursor.Seek(chanIDStart[:]); k != nil &&
|
for k, v := cursor.Seek(chanIDStart[:]); k != nil &&
|
||||||
bytes.Compare(k, chanIDEnd[:]) <= 0; k, v = cursor.Next() {
|
bytes.Compare(k, chanIDEnd[:]) <= 0; k, v = cursor.Next() {
|
||||||
@ -1070,6 +1073,12 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
keys = append(keys, k)
|
||||||
|
removedChans = append(removedChans, &edgeInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, k := range keys {
|
||||||
err = delChannelEdge(
|
err = delChannelEdge(
|
||||||
edges, edgeIndex, chanIndex, zombieIndex, nodes,
|
edges, edgeIndex, chanIndex, zombieIndex, nodes,
|
||||||
k, false,
|
k, false,
|
||||||
@ -1077,8 +1086,6 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf
|
|||||||
if err != nil && err != ErrEdgeNotFound {
|
if err != nil && err != ErrEdgeNotFound {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
removedChans = append(removedChans, &edgeInfo)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete all the entries in the prune log having a height
|
// Delete all the entries in the prune log having a height
|
||||||
@ -1099,10 +1106,18 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf
|
|||||||
var pruneKeyEnd [4]byte
|
var pruneKeyEnd [4]byte
|
||||||
byteOrder.PutUint32(pruneKeyEnd[:], math.MaxUint32)
|
byteOrder.PutUint32(pruneKeyEnd[:], math.MaxUint32)
|
||||||
|
|
||||||
|
// To avoid modifying the bucket while traversing, we delete
|
||||||
|
// the keys in a second loop.
|
||||||
|
var pruneKeys [][]byte
|
||||||
pruneCursor := pruneBucket.Cursor()
|
pruneCursor := pruneBucket.Cursor()
|
||||||
for k, _ := pruneCursor.Seek(pruneKeyStart[:]); k != nil &&
|
for k, _ := pruneCursor.Seek(pruneKeyStart[:]); k != nil &&
|
||||||
bytes.Compare(k, pruneKeyEnd[:]) <= 0; k, _ = pruneCursor.Next() {
|
bytes.Compare(k, pruneKeyEnd[:]) <= 0; k, _ = pruneCursor.Next() {
|
||||||
if err := pruneCursor.Delete(); err != nil {
|
|
||||||
|
pruneKeys = append(pruneKeys, k)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, k := range pruneKeys {
|
||||||
|
if err := pruneBucket.Delete(k); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -146,6 +146,11 @@ func migrateInvoiceTimeSeries(tx *bbolt.Tx) error {
|
|||||||
// Now that we have all the buckets we need, we'll run through each
|
// Now that we have all the buckets we need, we'll run through each
|
||||||
// invoice in the database, and update it to reflect the new format
|
// invoice in the database, and update it to reflect the new format
|
||||||
// expected post migration.
|
// expected post migration.
|
||||||
|
// NOTE: we store the converted invoices and put them back into the
|
||||||
|
// database after the loop, since modifying the bucket within the
|
||||||
|
// ForEach loop is not safe.
|
||||||
|
var invoicesKeys [][]byte
|
||||||
|
var invoicesValues [][]byte
|
||||||
err = invoices.ForEach(func(invoiceNum, invoiceBytes []byte) error {
|
err = invoices.ForEach(func(invoiceNum, invoiceBytes []byte) error {
|
||||||
// If this is a sub bucket, then we'll skip it.
|
// If this is a sub bucket, then we'll skip it.
|
||||||
if invoiceBytes == nil {
|
if invoiceBytes == nil {
|
||||||
@ -226,12 +231,25 @@ func migrateInvoiceTimeSeries(tx *bbolt.Tx) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return invoices.Put(invoiceNum, b.Bytes())
|
// Save the key and value pending update for after the ForEach
|
||||||
|
// is done.
|
||||||
|
invoicesKeys = append(invoicesKeys, invoiceNum)
|
||||||
|
invoicesValues = append(invoicesValues, b.Bytes())
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Now put the converted invoices into the DB.
|
||||||
|
for i := range invoicesKeys {
|
||||||
|
key := invoicesKeys[i]
|
||||||
|
value := invoicesValues[i]
|
||||||
|
if err := invoices.Put(key, value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
log.Infof("Migration to invoice time series index complete!")
|
log.Infof("Migration to invoice time series index complete!")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -249,6 +267,10 @@ func migrateInvoiceTimeSeriesOutgoingPayments(tx *bbolt.Tx) error {
|
|||||||
|
|
||||||
log.Infof("Migrating invoice database to new outgoing payment format")
|
log.Infof("Migrating invoice database to new outgoing payment format")
|
||||||
|
|
||||||
|
// We store the keys and values we want to modify since it is not safe
|
||||||
|
// to modify them directly within the ForEach loop.
|
||||||
|
var paymentKeys [][]byte
|
||||||
|
var paymentValues [][]byte
|
||||||
err := payBucket.ForEach(func(payID, paymentBytes []byte) error {
|
err := payBucket.ForEach(func(payID, paymentBytes []byte) error {
|
||||||
log.Tracef("Migrating payment %x", payID[:])
|
log.Tracef("Migrating payment %x", payID[:])
|
||||||
|
|
||||||
@ -290,17 +312,25 @@ func migrateInvoiceTimeSeriesOutgoingPayments(tx *bbolt.Tx) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Now that we know the modifications was successful, we'll
|
// Now that we know the modifications was successful, we'll
|
||||||
// write it back to disk in the new format.
|
// store it to our slice of keys and values, and write it back
|
||||||
if err := payBucket.Put(payID, paymentCopy); err != nil {
|
// to disk in the new format after the ForEach loop is over.
|
||||||
return err
|
paymentKeys = append(paymentKeys, payID)
|
||||||
}
|
paymentValues = append(paymentValues, paymentCopy)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Finally store the updated payments to the bucket.
|
||||||
|
for i := range paymentKeys {
|
||||||
|
key := paymentKeys[i]
|
||||||
|
value := paymentValues[i]
|
||||||
|
if err := payBucket.Put(key, value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
log.Infof("Migration to outgoing payment invoices complete!")
|
log.Infof("Migration to outgoing payment invoices complete!")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -587,6 +617,12 @@ func migrateOptionalChannelCloseSummaryFields(tx *bbolt.Tx) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Migrating to new closed channel format...")
|
log.Info("Migrating to new closed channel format...")
|
||||||
|
|
||||||
|
// We store the converted keys and values and put them back into the
|
||||||
|
// database after the loop, since modifying the bucket within the
|
||||||
|
// ForEach loop is not safe.
|
||||||
|
var closedChansKeys [][]byte
|
||||||
|
var closedChansValues [][]byte
|
||||||
err := closedChanBucket.ForEach(func(chanID, summary []byte) error {
|
err := closedChanBucket.ForEach(func(chanID, summary []byte) error {
|
||||||
r := bytes.NewReader(summary)
|
r := bytes.NewReader(summary)
|
||||||
|
|
||||||
@ -603,12 +639,26 @@ func migrateOptionalChannelCloseSummaryFields(tx *bbolt.Tx) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return closedChanBucket.Put(chanID, b.Bytes())
|
// Now that we know the modifications was successful, we'll
|
||||||
|
// Store the key and value to our slices, and write it back to
|
||||||
|
// disk in the new format after the ForEach loop is over.
|
||||||
|
closedChansKeys = append(closedChansKeys, chanID)
|
||||||
|
closedChansValues = append(closedChansValues, b.Bytes())
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to update closed channels: %v", err)
|
return fmt.Errorf("unable to update closed channels: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Now put the new format back into the DB.
|
||||||
|
for i := range closedChansKeys {
|
||||||
|
key := closedChansKeys[i]
|
||||||
|
value := closedChansValues[i]
|
||||||
|
if err := closedChanBucket.Put(key, value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
log.Info("Migration to new closed channel format complete!")
|
log.Info("Migration to new closed channel format complete!")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -853,20 +853,9 @@ func (b *boltArbitratorLog) WipeHistory() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Next, we'll delete any lingering contract state within the
|
// Next, we'll delete any lingering contract state within the
|
||||||
// contracts bucket, and the bucket itself once we're done
|
// contracts bucket by removing the bucket itself.
|
||||||
// clearing it out.
|
err = scopeBucket.DeleteBucket(contractsBucketKey)
|
||||||
contractBucket, err := scopeBucket.CreateBucketIfNotExists(
|
if err != nil && err != bbolt.ErrBucketNotFound {
|
||||||
contractsBucketKey,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := contractBucket.ForEach(func(resKey, _ []byte) error {
|
|
||||||
return contractBucket.Delete(resKey)
|
|
||||||
}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := scopeBucket.DeleteBucket(contractsBucketKey); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -876,20 +865,10 @@ func (b *boltArbitratorLog) WipeHistory() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Before we delta the enclosing bucket itself, we'll delta any
|
// We'll delete any chain actions that are still stored by
|
||||||
// chain actions that are still stored.
|
// removing the enclosing bucket.
|
||||||
actionsBucket, err := scopeBucket.CreateBucketIfNotExists(
|
err = scopeBucket.DeleteBucket(actionsBucketKey)
|
||||||
actionsBucketKey,
|
if err != nil && err != bbolt.ErrBucketNotFound {
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := actionsBucket.ForEach(func(resKey, _ []byte) error {
|
|
||||||
return actionsBucket.Delete(resKey)
|
|
||||||
}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := scopeBucket.DeleteBucket(actionsBucketKey); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user