diff --git a/channeldb/forwarding_log.go b/channeldb/forwarding_log.go index a52848dd..d1216dc4 100644 --- a/channeldb/forwarding_log.go +++ b/channeldb/forwarding_log.go @@ -6,6 +6,7 @@ import ( "sort" "time" + "github.com/btcsuite/btcwallet/walletdb" "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/lnwire" ) @@ -104,10 +105,9 @@ func decodeForwardingEvent(r io.Reader, f *ForwardingEvent) error { func (f *ForwardingLog) AddForwardingEvents(events []ForwardingEvent) error { // Before we create the database transaction, we'll ensure that the set // of forwarding events are properly sorted according to their - // timestamp. - sort.Slice(events, func(i, j int) bool { - return events[i].Timestamp.Before(events[j].Timestamp) - }) + // timestamp and that no duplicate timestamps exist to avoid collisions + // in the key we are going to store the events under. + makeUniqueTimestamps(events) var timestamp [8]byte @@ -124,22 +124,7 @@ func (f *ForwardingLog) AddForwardingEvents(events []ForwardingEvent) error { // With the bucket obtained, we can now begin to write out the // series of events. for _, event := range events { - var eventBytes [forwardingEventSize]byte - eventBuf := bytes.NewBuffer(eventBytes[0:0:forwardingEventSize]) - - // First, we'll serialize this timestamp into our - // timestamp buffer. - byteOrder.PutUint64( - timestamp[:], uint64(event.Timestamp.UnixNano()), - ) - - // With the key encoded, we'll then encode the event - // into our buffer, then write it out to disk. - err := encodeForwardingEvent(eventBuf, &event) - if err != nil { - return err - } - err = logBucket.Put(timestamp[:], eventBuf.Bytes()) + err := storeEvent(logBucket, event, timestamp[:]) if err != nil { return err } @@ -149,6 +134,55 @@ func (f *ForwardingLog) AddForwardingEvents(events []ForwardingEvent) error { }) } +// storeEvent tries to store a forwarding event into the given bucket by trying +// to avoid collisions. If a key for the event timestamp already exists in the +// database, the timestamp is incremented in nanosecond intervals until a "free" +// slot is found. +func storeEvent(bucket walletdb.ReadWriteBucket, event ForwardingEvent, + timestampScratchSpace []byte) error { + + // First, we'll serialize this timestamp into our + // timestamp buffer. + byteOrder.PutUint64( + timestampScratchSpace, uint64(event.Timestamp.UnixNano()), + ) + + // Next we'll loop until we find a "free" slot in the bucket to store + // the event under. This should almost never happen unless we're running + // on a system that has a very bad system clock that doesn't properly + // resolve to nanosecond scale. We try up to 100 times (which would come + // to a maximum shift of 0.1 microsecond which is acceptable for most + // use cases). If we don't find a free slot, we just give up and let + // the collision happen. Something must be wrong with the data in that + // case, even on a very fast machine forwarding payments _will_ take a + // few microseconds at least so we should find a nanosecond slot + // somewhere. + const maxTries = 100 + tries := 0 + for tries < maxTries { + val := bucket.Get(timestampScratchSpace) + if val == nil { + break + } + + // Collision, try the next nanosecond timestamp. + nextNano := event.Timestamp.UnixNano() + 1 + event.Timestamp = time.Unix(0, nextNano) + byteOrder.PutUint64(timestampScratchSpace, uint64(nextNano)) + tries++ + } + + // With the key encoded, we'll then encode the event + // into our buffer, then write it out to disk. + var eventBytes [forwardingEventSize]byte + eventBuf := bytes.NewBuffer(eventBytes[0:0:forwardingEventSize]) + err := encodeForwardingEvent(eventBuf, &event) + if err != nil { + return err + } + return bucket.Put(timestampScratchSpace, eventBuf.Bytes()) +} + // ForwardingEventQuery represents a query to the forwarding log payment // circuit time series database. The query allows a caller to retrieve all // records for a particular time slice, offset in that time slice, limiting the @@ -272,3 +306,34 @@ func (f *ForwardingLog) Query(q ForwardingEventQuery) (ForwardingLogTimeSlice, e return resp, nil } + +// makeUniqueTimestamps takes a slice of forwarding events, sorts it by the +// event timestamps and then makes sure there are no duplicates in the +// timestamps. If duplicates are found, some of the timestamps are increased on +// the nanosecond scale until only unique values remain. This is a fix to +// address the problem that in some environments (looking at you, Windows) the +// system clock has such a bad resolution that two serial invocations of +// time.Now() might return the same timestamp, even if some time has elapsed +// between the calls. +func makeUniqueTimestamps(events []ForwardingEvent) { + sort.Slice(events, func(i, j int) bool { + return events[i].Timestamp.Before(events[j].Timestamp) + }) + + // Now that we know the events are sorted by timestamp, we can go + // through the list and fix all duplicates until only unique values + // remain. + for outer := 0; outer < len(events)-1; outer++ { + current := events[outer].Timestamp.UnixNano() + next := events[outer+1].Timestamp.UnixNano() + + // We initially sorted the slice. So if the current is now + // greater or equal to the next one, it's either because it's a + // duplicate or because we increased the current in the last + // iteration. + if current >= next { + next = current + 1 + events[outer+1].Timestamp = time.Unix(0, next) + } + } +} diff --git a/channeldb/forwarding_log_test.go b/channeldb/forwarding_log_test.go index 07dfc902..cd21f12e 100644 --- a/channeldb/forwarding_log_test.go +++ b/channeldb/forwarding_log_test.go @@ -4,11 +4,11 @@ import ( "math/rand" "reflect" "testing" + "time" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/lnwire" - - "time" + "github.com/stretchr/testify/assert" ) // TestForwardingLogBasicStorageAndQuery tests that we're able to store and @@ -20,10 +20,11 @@ func TestForwardingLogBasicStorageAndQuery(t *testing.T) { // forwarding event log that we'll be using for the duration of the // test. db, cleanUp, err := MakeTestDB() - defer cleanUp() if err != nil { t.Fatalf("unable to make test db: %v", err) } + defer cleanUp() + log := ForwardingLog{ db: db, } @@ -92,10 +93,11 @@ func TestForwardingLogQueryOptions(t *testing.T) { // forwarding event log that we'll be using for the duration of the // test. db, cleanUp, err := MakeTestDB() - defer cleanUp() if err != nil { t.Fatalf("unable to make test db: %v", err) } + defer cleanUp() + log := ForwardingLog{ db: db, } @@ -197,10 +199,11 @@ func TestForwardingLogQueryLimit(t *testing.T) { // forwarding event log that we'll be using for the duration of the // test. db, cleanUp, err := MakeTestDB() - defer cleanUp() if err != nil { t.Fatalf("unable to make test db: %v", err) } + defer cleanUp() + log := ForwardingLog{ db: db, } @@ -263,3 +266,118 @@ func TestForwardingLogQueryLimit(t *testing.T) { timeSlice.LastIndexOffset) } } + +// TestForwardingLogMakeUniqueTimestamps makes sure the function that creates +// unique timestamps does it job correctly. +func TestForwardingLogMakeUniqueTimestamps(t *testing.T) { + t.Parallel() + + // Create a list of events where some of the timestamps collide. We + // expect no existing timestamp to be overwritten, instead the "gaps" + // between them should be filled. + inputSlice := []ForwardingEvent{ + {Timestamp: time.Unix(0, 1001)}, + {Timestamp: time.Unix(0, 2001)}, + {Timestamp: time.Unix(0, 1001)}, + {Timestamp: time.Unix(0, 1002)}, + {Timestamp: time.Unix(0, 1004)}, + {Timestamp: time.Unix(0, 1004)}, + {Timestamp: time.Unix(0, 1007)}, + {Timestamp: time.Unix(0, 1001)}, + } + expectedSlice := []ForwardingEvent{ + {Timestamp: time.Unix(0, 1001)}, + {Timestamp: time.Unix(0, 1002)}, + {Timestamp: time.Unix(0, 1003)}, + {Timestamp: time.Unix(0, 1004)}, + {Timestamp: time.Unix(0, 1005)}, + {Timestamp: time.Unix(0, 1006)}, + {Timestamp: time.Unix(0, 1007)}, + {Timestamp: time.Unix(0, 2001)}, + } + + makeUniqueTimestamps(inputSlice) + + for idx, in := range inputSlice { + expect := expectedSlice[idx] + assert.Equal( + t, expect.Timestamp.UnixNano(), in.Timestamp.UnixNano(), + ) + } +} + +// TestForwardingLogStoreEvent makes sure forwarding events are stored without +// colliding on duplicate timestamps. +func TestForwardingLogStoreEvent(t *testing.T) { + t.Parallel() + + // First, we'll set up a test database, and use that to instantiate the + // forwarding event log that we'll be using for the duration of the + // test. + db, cleanUp, err := MakeTestDB() + if err != nil { + t.Fatalf("unable to make test db: %v", err) + } + defer cleanUp() + + log := ForwardingLog{ + db: db, + } + + // We'll create 20 random events, with each event having a timestamp + // with just one nanosecond apart. + numEvents := 20 + events := make([]ForwardingEvent, numEvents) + ts := time.Now().UnixNano() + for i := 0; i < numEvents; i++ { + events[i] = ForwardingEvent{ + Timestamp: time.Unix(0, ts+int64(i)), + IncomingChanID: lnwire.NewShortChanIDFromInt(uint64(rand.Int63())), + OutgoingChanID: lnwire.NewShortChanIDFromInt(uint64(rand.Int63())), + AmtIn: lnwire.MilliSatoshi(rand.Int63()), + AmtOut: lnwire.MilliSatoshi(rand.Int63()), + } + } + + // Now that all of our events are constructed, we'll add them to the + // database in a batched manner. + if err := log.AddForwardingEvents(events); err != nil { + t.Fatalf("unable to add events: %v", err) + } + + // Because timestamps are de-duplicated when adding them in a single + // batch before they even hit the DB, we add the same events again but + // in a new batch. They now have to be de-duplicated on the DB level. + if err := log.AddForwardingEvents(events); err != nil { + t.Fatalf("unable to add second batch of events: %v", err) + } + + // With all of our events added, we should be able to query for all + // events with a range of just 40 nanoseconds (2 times 20 events, all + // spaced one nanosecond apart). + eventQuery := ForwardingEventQuery{ + StartTime: time.Unix(0, ts), + EndTime: time.Unix(0, ts+int64(numEvents*2)), + IndexOffset: 0, + NumMaxEvents: uint32(numEvents * 3), + } + timeSlice, err := log.Query(eventQuery) + if err != nil { + t.Fatalf("unable to query for events: %v", err) + } + + // We should get exactly 40 events back. + if len(timeSlice.ForwardingEvents) != numEvents*2 { + t.Fatalf("wrong number of events: expected %v, got %v", + numEvents*2, len(timeSlice.ForwardingEvents)) + } + + // The timestamps should be spaced out evenly and in order. + for i := 0; i < numEvents*2; i++ { + eventTs := timeSlice.ForwardingEvents[i].Timestamp.UnixNano() + if eventTs != ts+int64(i) { + t.Fatalf("unexpected timestamp of event %d: expected "+ + "%d, got %d", i, ts+int64(i), eventTs) + } + } +}