channeldb: fix for Windows clock resolution
We use the event timestamp of a forwarding event as its primary storage key. On systems with low clock resolution this can lead to key collisions when several events carry identical timestamps. We fix this by shifting colliding timestamps at the nanosecond level until only unique values remain.
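To illustrate the underlying problem, here is a minimal standalone sketch (not part of this commit, all names hypothetical) that counts how often consecutive time.Now() readings collapse onto the same UnixNano value; on a system with coarse clock resolution, such as older Windows versions, the count is typically non-zero:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Take many wall-clock readings back to back and count how often two
	// consecutive readings map to the exact same nanosecond timestamp.
	duplicates := 0
	prev := time.Now().UnixNano()
	for i := 0; i < 100000; i++ {
		now := time.Now().UnixNano()
		if now == prev {
			duplicates++
		}
		prev = now
	}
	fmt.Printf("%d duplicate timestamps out of 100000 readings\n", duplicates)
}

Any non-zero count means two forwarding events recorded in quick succession could map to the same storage key.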
parent b21b2ebd6f
commit 97c73706b5
channeldb/forwarding_log.go
@@ -6,6 +6,7 @@ import (
 	"sort"
 	"time"
 
+	"github.com/btcsuite/btcwallet/walletdb"
 	"github.com/lightningnetwork/lnd/channeldb/kvdb"
 	"github.com/lightningnetwork/lnd/lnwire"
 )
@@ -104,10 +105,9 @@ func decodeForwardingEvent(r io.Reader, f *ForwardingEvent) error {
 func (f *ForwardingLog) AddForwardingEvents(events []ForwardingEvent) error {
 	// Before we create the database transaction, we'll ensure that the set
 	// of forwarding events are properly sorted according to their
-	// timestamp.
-	sort.Slice(events, func(i, j int) bool {
-		return events[i].Timestamp.Before(events[j].Timestamp)
-	})
+	// timestamp and that no duplicate timestamps exist to avoid collisions
+	// in the key we are going to store the events under.
+	makeUniqueTimestamps(events)
 
 	var timestamp [8]byte
 
@@ -124,22 +124,7 @@ func (f *ForwardingLog) AddForwardingEvents(events []ForwardingEvent) error {
 		// With the bucket obtained, we can now begin to write out the
 		// series of events.
 		for _, event := range events {
-			var eventBytes [forwardingEventSize]byte
-			eventBuf := bytes.NewBuffer(eventBytes[0:0:forwardingEventSize])
-
-			// First, we'll serialize this timestamp into our
-			// timestamp buffer.
-			byteOrder.PutUint64(
-				timestamp[:], uint64(event.Timestamp.UnixNano()),
-			)
-
-			// With the key encoded, we'll then encode the event
-			// into our buffer, then write it out to disk.
-			err := encodeForwardingEvent(eventBuf, &event)
-			if err != nil {
-				return err
-			}
-			err = logBucket.Put(timestamp[:], eventBuf.Bytes())
+			err := storeEvent(logBucket, event, timestamp[:])
 			if err != nil {
 				return err
 			}
@@ -149,6 +134,55 @@ func (f *ForwardingLog) AddForwardingEvents(events []ForwardingEvent) error {
 	})
 }
 
+// storeEvent tries to store a forwarding event into the given bucket by trying
+// to avoid collisions. If a key for the event timestamp already exists in the
+// database, the timestamp is incremented in nanosecond intervals until a "free"
+// slot is found.
+func storeEvent(bucket walletdb.ReadWriteBucket, event ForwardingEvent,
+	timestampScratchSpace []byte) error {
+
+	// First, we'll serialize this timestamp into our
+	// timestamp buffer.
+	byteOrder.PutUint64(
+		timestampScratchSpace, uint64(event.Timestamp.UnixNano()),
+	)
+
+	// Next we'll loop until we find a "free" slot in the bucket to store
+	// the event under. This should almost never happen unless we're running
+	// on a system that has a very bad system clock that doesn't properly
+	// resolve to nanosecond scale. We try up to 100 times (which would come
+	// to a maximum shift of 0.1 microsecond which is acceptable for most
+	// use cases). If we don't find a free slot, we just give up and let
+	// the collision happen. Something must be wrong with the data in that
+	// case, even on a very fast machine forwarding payments _will_ take a
+	// few microseconds at least so we should find a nanosecond slot
+	// somewhere.
+	const maxTries = 100
+	tries := 0
+	for tries < maxTries {
+		val := bucket.Get(timestampScratchSpace)
+		if val == nil {
+			break
+		}
+
+		// Collision, try the next nanosecond timestamp.
+		nextNano := event.Timestamp.UnixNano() + 1
+		event.Timestamp = time.Unix(0, nextNano)
+		byteOrder.PutUint64(timestampScratchSpace, uint64(nextNano))
+		tries++
+	}
+
+	// With the key encoded, we'll then encode the event
+	// into our buffer, then write it out to disk.
+	var eventBytes [forwardingEventSize]byte
+	eventBuf := bytes.NewBuffer(eventBytes[0:0:forwardingEventSize])
+	err := encodeForwardingEvent(eventBuf, &event)
+	if err != nil {
+		return err
+	}
+
+	return bucket.Put(timestampScratchSpace, eventBuf.Bytes())
+}
+
 // ForwardingEventQuery represents a query to the forwarding log payment
 // circuit time series database. The query allows a caller to retrieve all
 // records for a particular time slice, offset in that time slice, limiting the
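The retry logic in storeEvent is easiest to see in isolation. The following sketch (illustrative only, names hypothetical; a plain map stands in for the walletdb bucket) applies the same collision-avoidance idea:

package main

import (
	"fmt"
	"time"
)

// putUnique stores value under the timestamp's UnixNano key. On collision it
// bumps the key by one nanosecond, up to maxTries times, mirroring storeEvent.
func putUnique(store map[int64]string, ts time.Time, value string) int64 {
	const maxTries = 100
	key := ts.UnixNano()
	for tries := 0; tries < maxTries; tries++ {
		if _, exists := store[key]; !exists {
			break
		}
		// Collision, try the next nanosecond slot.
		key++
	}
	store[key] = value
	return key
}

func main() {
	store := make(map[int64]string)
	ts := time.Unix(0, 1000)
	fmt.Println(putUnique(store, ts, "first"))  // 1000
	fmt.Println(putUnique(store, ts, "second")) // 1001: shifted one nanosecond
}

As in the real function, once maxTries probes are exhausted the write proceeds anyway and overwrites the colliding entry rather than failing the whole batch.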
@@ -272,3 +306,34 @@ func (f *ForwardingLog) Query(q ForwardingEventQuery) (ForwardingLogTimeSlice, e
 
 	return resp, nil
 }
+
+// makeUniqueTimestamps takes a slice of forwarding events, sorts it by the
+// event timestamps and then makes sure there are no duplicates in the
+// timestamps. If duplicates are found, some of the timestamps are increased on
+// the nanosecond scale until only unique values remain. This is a fix to
+// address the problem that in some environments (looking at you, Windows) the
+// system clock has such a bad resolution that two serial invocations of
+// time.Now() might return the same timestamp, even if some time has elapsed
+// between the calls.
+func makeUniqueTimestamps(events []ForwardingEvent) {
+	sort.Slice(events, func(i, j int) bool {
+		return events[i].Timestamp.Before(events[j].Timestamp)
+	})
+
+	// Now that we know the events are sorted by timestamp, we can go
+	// through the list and fix all duplicates until only unique values
+	// remain.
+	for outer := 0; outer < len(events)-1; outer++ {
+		current := events[outer].Timestamp.UnixNano()
+		next := events[outer+1].Timestamp.UnixNano()
+
+		// We initially sorted the slice. So if the current is now
+		// greater or equal to the next one, it's either because it's a
+		// duplicate or because we increased the current in the last
+		// iteration.
+		if current >= next {
+			next = current + 1
+			events[outer+1].Timestamp = time.Unix(0, next)
+		}
+	}
+}
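A quick worked example of makeUniqueTimestamps: input timestamps [1001, 2001, 1001, 1002] first sort to [1001, 1001, 1002, 2001]; the duplicate 1001 is then bumped to 1002, which in turn pushes the following 1002 to 1003, yielding the unique sequence [1001, 1002, 1003, 2001]. Note that later events can be shifted even if their original timestamp was unique, which is why the loop compares with >= rather than ==.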
channeldb/forwarding_log_test.go
@@ -4,11 +4,11 @@ import (
 	"math/rand"
 	"reflect"
 	"testing"
+	"time"
 
 	"github.com/davecgh/go-spew/spew"
 	"github.com/lightningnetwork/lnd/lnwire"
-	"time"
+	"github.com/stretchr/testify/assert"
 )
 
 // TestForwardingLogBasicStorageAndQuery tests that we're able to store and
@@ -20,10 +20,11 @@ func TestForwardingLogBasicStorageAndQuery(t *testing.T) {
 	// forwarding event log that we'll be using for the duration of the
 	// test.
 	db, cleanUp, err := MakeTestDB()
-	defer cleanUp()
 	if err != nil {
 		t.Fatalf("unable to make test db: %v", err)
 	}
+	defer cleanUp()
+
 	log := ForwardingLog{
 		db: db,
 	}
@@ -92,10 +93,11 @@ func TestForwardingLogQueryOptions(t *testing.T) {
 	// forwarding event log that we'll be using for the duration of the
 	// test.
 	db, cleanUp, err := MakeTestDB()
-	defer cleanUp()
 	if err != nil {
 		t.Fatalf("unable to make test db: %v", err)
 	}
+	defer cleanUp()
+
 	log := ForwardingLog{
 		db: db,
 	}
@@ -197,10 +199,11 @@ func TestForwardingLogQueryLimit(t *testing.T) {
 	// forwarding event log that we'll be using for the duration of the
 	// test.
 	db, cleanUp, err := MakeTestDB()
-	defer cleanUp()
 	if err != nil {
 		t.Fatalf("unable to make test db: %v", err)
 	}
+	defer cleanUp()
+
 	log := ForwardingLog{
 		db: db,
 	}
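The test changes above also move defer cleanUp() below the error check, presumably because deferring a nil cleanup function (as returned when MakeTestDB fails) would panic and mask the original error; this rationale is inferred from the diff rather than stated in the commit message.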
@@ -263,3 +266,118 @@ func TestForwardingLogQueryLimit(t *testing.T) {
 			timeSlice.LastIndexOffset)
 	}
 }
+
+// TestForwardingLogMakeUniqueTimestamps makes sure the function that creates
+// unique timestamps does its job correctly.
+func TestForwardingLogMakeUniqueTimestamps(t *testing.T) {
+	t.Parallel()
+
+	// Create a list of events where some of the timestamps collide. We
+	// expect no existing timestamp to be overwritten, instead the "gaps"
+	// between them should be filled.
+	inputSlice := []ForwardingEvent{
+		{Timestamp: time.Unix(0, 1001)},
+		{Timestamp: time.Unix(0, 2001)},
+		{Timestamp: time.Unix(0, 1001)},
+		{Timestamp: time.Unix(0, 1002)},
+		{Timestamp: time.Unix(0, 1004)},
+		{Timestamp: time.Unix(0, 1004)},
+		{Timestamp: time.Unix(0, 1007)},
+		{Timestamp: time.Unix(0, 1001)},
+	}
+	expectedSlice := []ForwardingEvent{
+		{Timestamp: time.Unix(0, 1001)},
+		{Timestamp: time.Unix(0, 1002)},
+		{Timestamp: time.Unix(0, 1003)},
+		{Timestamp: time.Unix(0, 1004)},
+		{Timestamp: time.Unix(0, 1005)},
+		{Timestamp: time.Unix(0, 1006)},
+		{Timestamp: time.Unix(0, 1007)},
+		{Timestamp: time.Unix(0, 2001)},
+	}
+
+	makeUniqueTimestamps(inputSlice)
+
+	for idx, in := range inputSlice {
+		expect := expectedSlice[idx]
+		assert.Equal(
+			t, expect.Timestamp.UnixNano(), in.Timestamp.UnixNano(),
+		)
+	}
+}
+
+// TestForwardingLogStoreEvent makes sure forwarding events are stored without
+// colliding on duplicate timestamps.
+func TestForwardingLogStoreEvent(t *testing.T) {
+	t.Parallel()
+
+	// First, we'll set up a test database, and use that to instantiate the
+	// forwarding event log that we'll be using for the duration of the
+	// test.
+	db, cleanUp, err := MakeTestDB()
+	if err != nil {
+		t.Fatalf("unable to make test db: %v", err)
+	}
+	defer cleanUp()
+
+	log := ForwardingLog{
+		db: db,
+	}
+
+	// We'll create 20 random events, with each event's timestamp just one
+	// nanosecond apart.
+	numEvents := 20
+	events := make([]ForwardingEvent, numEvents)
+	ts := time.Now().UnixNano()
+	for i := 0; i < numEvents; i++ {
+		events[i] = ForwardingEvent{
+			Timestamp:      time.Unix(0, ts+int64(i)),
+			IncomingChanID: lnwire.NewShortChanIDFromInt(uint64(rand.Int63())),
+			OutgoingChanID: lnwire.NewShortChanIDFromInt(uint64(rand.Int63())),
+			AmtIn:          lnwire.MilliSatoshi(rand.Int63()),
+			AmtOut:         lnwire.MilliSatoshi(rand.Int63()),
+		}
+	}
+
+	// Now that all of our events are constructed, we'll add them to the
+	// database in a batched manner.
+	if err := log.AddForwardingEvents(events); err != nil {
+		t.Fatalf("unable to add events: %v", err)
+	}
+
+	// Because timestamps are de-duplicated when adding them in a single
+	// batch before they even hit the DB, we add the same events again but
+	// in a new batch. They now have to be de-duplicated on the DB level.
+	if err := log.AddForwardingEvents(events); err != nil {
+		t.Fatalf("unable to add second batch of events: %v", err)
+	}
+
+	// With all of our events added, we should be able to query for all
+	// events with a range of just 40 nanoseconds (2 times 20 events, all
+	// spaced one nanosecond apart).
+	eventQuery := ForwardingEventQuery{
+		StartTime:    time.Unix(0, ts),
+		EndTime:      time.Unix(0, ts+int64(numEvents*2)),
+		IndexOffset:  0,
+		NumMaxEvents: uint32(numEvents * 3),
+	}
+	timeSlice, err := log.Query(eventQuery)
+	if err != nil {
+		t.Fatalf("unable to query for events: %v", err)
+	}
+
+	// We should get exactly 40 events back.
+	if len(timeSlice.ForwardingEvents) != numEvents*2 {
+		t.Fatalf("wrong number of events: expected %v, got %v",
+			numEvents*2, len(timeSlice.ForwardingEvents))
+	}
+
+	// The timestamps should be spaced out evenly and in order.
+	for i := 0; i < numEvents*2; i++ {
+		eventTs := timeSlice.ForwardingEvents[i].Timestamp.UnixNano()
+		if eventTs != ts+int64(i) {
+			t.Fatalf("unexpected timestamp of event %d: expected "+
+				"%d, got %d", i, ts+int64(i), eventTs)
+		}
+	}
+}
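To run just the new tests (assuming the usual lnd repository layout, where this package lives in ./channeldb):

go test ./channeldb -run 'TestForwardingLog(MakeUniqueTimestamps|StoreEvent)' -v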