Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions tests/test_ulid.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,21 @@ def test_same_millisecond_monotonic_sorting() -> None:
assert_sorted(ulids)


def test_z_real_time_monotonic_sorting() -> None:
"""ULIDs generated in rapid succession must be monotonically increasing.

This catches the bug where ``from_timestamp()`` samples the clock twice,
potentially crossing a millisecond boundary between the timestamp capture
and the randomness generation, which could produce a fresh (smaller) random
value instead of incrementing the previous one.
"""
ulids = [ULID() for _ in range(5000)]
assert_sorted(ulids)


@freeze_time()
def test_same_millisecond_overflow() -> None:
ULID.provider.prev_timestamp = ULID.provider.timestamp()
ULID.provider.prev_randomness = constants.MAX_RANDOMNESS
with pytest.raises(ValueError, match="Randomness within same millisecond exhausted"):
ULID()
Expand Down
10 changes: 6 additions & 4 deletions ulid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ def timestamp(self, value: float | None = None) -> int:
raise ValueError("Value exceeds maximum possible timestamp")
return value

def randomness(self) -> bytes:
def randomness(self, current_timestamp: int | None = None) -> bytes:
with self.lock:
current_timestamp = self.timestamp()
if current_timestamp is None:
current_timestamp = self.timestamp()
if current_timestamp == self.prev_timestamp:
if self.prev_randomness == constants.MAX_RANDOMNESS:
raise ValueError("Randomness within same millisecond exhausted")
Expand Down Expand Up @@ -148,8 +149,9 @@ def from_timestamp(cls, value: float) -> Self:
>>> ULID.from_timestamp(time.time())
ULID(01E75QWN5HKQ0JAVX9FG1K4YP4)
"""
timestamp = int.to_bytes(cls.provider.timestamp(value), constants.TIMESTAMP_LEN, "big")
randomness = cls.provider.randomness()
timestamp_value = cls.provider.timestamp(value)
timestamp = int.to_bytes(timestamp_value, constants.TIMESTAMP_LEN, "big")
randomness = cls.provider.randomness(timestamp_value)
return cls.from_bytes(timestamp + randomness)

@classmethod
Expand Down