# Source code for ulid.ulid

"""
This module provides immutable ULID objects (class ULID) according to the
`ULID spec <https://github.com/ulid/spec>`_ and the functions *generate()*
to generate ulids according to the specifications, *encode()* to transform a
given integer to the canonical string representation of a ULID, and *decode()* to take
a canonically encoded string and break it down into its timestamp and randomness components.
The module also provides a monotonic sort order guarantee for ULIDs via the Monotonic
class and its associated generate() function.

.. module:: ulid
    :platform: Unix, Windows
.. moduleauthor:: Manikandan Sundararajan <tsmanikandan@protonmail.com>

"""

import os
import sys
import time
import secrets
from typing import Any, Tuple
from datetime import datetime, timezone


__author__ = "Manikandan Sundararajan <tsmanikandan@protonmail.com>"

class ULID:
    """Generate, encode and decode ULIDs as described in the
    `ULID spec <https://github.com/ulid/spec>`_.

    ULIDs have 128-bit compatibility with UUID, are lexicographically
    sortable, case insensitive and URL safe.

    :param seed: Optional fixed time, in seconds since the Unix epoch, used by
        :meth:`generate` instead of the current time (it is multiplied by 1000
        to obtain milliseconds). ``None`` means "use the current UTC time".
    """

    # Width, in bits, of each component inside the encoded bit string.
    # The timestamp itself is at most 48 bits wide (see MAX_EPOCH_TIME); it is
    # zero-padded to 50 bits so that 50 + 80 = 130 bits split evenly into
    # twenty-six 5-bit Base32 symbols.
    _time = 50
    _randomness = 80

    # 32 symbol notation (Crockford's Base32: no I, L, O or U)
    crockford_base = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"

    # Symbol -> 5-bit value. Lowercase letters map to the same value as their
    # uppercase form so that decoding is case insensitive.
    _decode_table = {symbol: value for value, symbol in enumerate(crockford_base)}
    _decode_table.update(
        {symbol.lower(): value for symbol, value in list(_decode_table.items())}
    )

    # 2^48 - 1. Upto the year 10889 A.D
    MAX_EPOCH_TIME = 281474976710655

    def __init__(self, seed=None):
        self.seed_time = seed

    def generate(self) -> str:
        """
        Generate a 26 character ULID string encoded in Crockford's Base32.

        No monotonicity guarantee is given for ULIDs generated within the
        same millisecond.

        :returns: Canonically encoded ULID string
        :rtype: str
        :raises: ValueError if the (seeded) timestamp is negative or larger
            than ``MAX_EPOCH_TIME``

        >>> from ulid import ULID
        >>> ulid = ULID()
        >>> ulid.generate()
        '01BX5ZZKBKACTAV9WEVGEMMVRZ'
        """
        if self.seed_time is None:
            timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
        else:
            # int() so that a float seed (e.g. time.time()) can be formatted
            # as binary; for integer seeds this is a no-op.
            timestamp_ms = int(self.seed_time * 1000)

        # A timestamp outside the 48-bit range cannot be represented and would
        # otherwise yield a malformed string.
        if not 0 <= timestamp_ms <= self.MAX_EPOCH_TIME:
            raise ValueError(
                "The timestamp has to be between 0 and {}".format(self.MAX_EPOCH_TIME)
            )

        epoch_bits = format(timestamp_ms, f"0{self._time}b")
        rand_num_bits = format(
            secrets.randbits(self._randomness), f"0{self._randomness}b"
        )
        return self._from_bits_to_ulidstr(epoch_bits + rand_num_bits)

    def encode(self, i: int) -> str:
        """
        Convert a given integer into the canonical ULID string format.

        :param i: The integer to convert
        :type i: int
        :returns: Canonically encoded ULID string
        :rtype: str
        :raises: TypeError, ValueError

        >>> from ulid import ULID
        >>> ulid = ULID()
        >>> ulid.encode(340282366920938463463374607431768167)
        '00864KEJY6MZQSVCHD1SB08637'
        """
        if not isinstance(i, int):
            raise TypeError("The input has to be an integer")
        if i < 0:
            raise ValueError("The input has to be a positive value")
        if i >= (1 << 128):
            raise ValueError("The input value is larger than 128 bits")

        ulid_bits = format(i, f"0{self._time + self._randomness}b")
        return self._from_bits_to_ulidstr(ulid_bits)

    def encode_timestamp(self, t: int) -> str:
        """
        Convert a given unix timestamp into the canonical ULID string format.

        Only the 10 character time component is produced.

        :param t: The unix timestamp to convert (at most ``MAX_EPOCH_TIME``)
        :type t: int
        :returns: Canonically encoded timestamp string
        :rtype: str
        :raises: TypeError, ValueError

        >>> from ulid import ULID
        >>> ulid = ULID()
        >>> ulid.encode_timestamp(281474976710655)
        '7ZZZZZZZZZ'
        """
        if not isinstance(t, int):
            raise TypeError("The timestamp has to be an integer")
        if t < 0:
            raise ValueError("The timestamp has to be a positive value")
        if t > self.MAX_EPOCH_TIME:
            raise ValueError(
                "Cannot encode time larger than - {}".format(self.MAX_EPOCH_TIME)
            )

        return self._from_bits_to_ulidstr(format(t, f"0{self._time}b"))

    def decode(self, s: str) -> Tuple[int, int]:
        """
        Given a properly formed ULID, return a tuple containing the timestamp
        and the randomness component as ints.

        Upper and lower case letters are treated as equal.

        :param s: ULID string to decode (exactly 26 characters)
        :type s: str
        :returns: (timestamp, randomness)
        :rtype: tuple(int, int)
        :raises: TypeError, ValueError

        >>> from ulid import ULID
        >>> ulid = ULID()
        >>> ulid.decode('01BX5ZZKBKACTAV9WEVGEMMVRY')
        (1508808576371, 392928161897179156999966)
        """
        if not isinstance(s, str):
            raise TypeError("The input value has to be a string")
        # Anything other than 26 symbols would shift the boundary between the
        # time and randomness bits, so shorter strings are rejected as well.
        if len(s) != 26:
            raise ValueError("The string has to be 26 characters in length")

        # Accumulate the 5-bit value of every symbol into one 130-bit integer
        ulid_value = 0
        for c in s:
            pos = self._decode_table.get(c)
            if pos is None:
                raise ValueError(
                    "Invalid character: {} found in ulid string".format(c)
                )
            ulid_value = (ulid_value << 5) | pos

        # The high bits hold the timestamp, the low 80 bits the randomness
        epoch_time_in_ms = ulid_value >> self._randomness
        if epoch_time_in_ms > self.MAX_EPOCH_TIME:
            raise ValueError(
                "Cannot encode time larger than - {}".format(self.MAX_EPOCH_TIME)
            )
        random_component = ulid_value & ((1 << self._randomness) - 1)

        return (epoch_time_in_ms, random_component)

    def _from_bits_to_ulidstr(self, ulid_bits: str) -> str:
        """Map every 5 characters of the bit string *ulid_bits* to one Base32 symbol."""
        return "".join(
            self.crockford_base[int(ulid_bits[i : i + 5], base=2)]
            for i in range(0, len(ulid_bits), 5)
        )

    def pretty_print(self, s: str) -> None:
        """
        Print the given ULID string in a binary layout.

        :param s: ULID string to print; it is decoded with :meth:`decode`
        :type s: str
        :raises: TypeError, ValueError (propagated from :meth:`decode`)

        >>> from ulid import ULID
        >>> ulid = ULID()
        >>> ulid.pretty_print(ulid_str)

        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
        |                      32_bit_uint_time_high                    |
        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
        |     16_bit_uint_time_low      |       16_bit_uint_random      |
        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
        |                       32_bit_uint_random                      |
        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
        |                       32_bit_uint_random                      |
        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
        """
        interval = "+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+"

        (timestamp, rand) = self.decode(s)
        # The timestamp is shown with its real 48-bit width, not the 50-bit
        # padded width used for encoding.
        time_bits = format(timestamp, f"0{48}b")
        rand_bits = format(rand, f"0{self._randomness}b")

        print("\n")
        print(interval)
        print("|" + " " * 16 + time_bits[:32] + " " * 15 + "|")
        print(interval)
        print(
            "|" + " " * 8 + time_bits[32:] + " " * 7
            + "|" + " " * 8 + rand_bits[0:16] + " " * 7 + "|"
        )
        print(interval)
        print("|" + " " * 16 + rand_bits[16:48] + " " * 15 + "|")
        print(interval)
        print("|" + " " * 16 + rand_bits[48:] + " " * 15 + "|")
        print(interval)
class Monotonic(ULID):
    """Extension of the base ULID class that adds a monotonic sort order.

    When :meth:`generate` is called more than once within the same
    millisecond (or the clock has moved backwards), the previous timestamp is
    reused and the previous random component is incremented by one, so every
    ULID produced by one instance sorts after the ones produced before it.

    :param seed: Optional fixed time, in seconds since the Unix epoch, used
        instead of the current time (it is multiplied by 1000 to obtain
        milliseconds). ``None`` means "use the current UTC time".
    """

    def __init__(self, seed=None):
        super().__init__(seed)
        # Timestamp (ms) and random component of the last generated ULID;
        # None until the first ULID has been generated.
        self.__prev_timestamp_ms = None
        self.__prev_randomness = None

    def generate(self) -> str:
        """
        Generate a 26 character ULID string encoded in Crockford's Base32.

        :returns: Canonically encoded ULID string
        :rtype: str
        :raises: ValueError if the random component overflows while
            incrementing, or if the timestamp is negative or larger than
            ``MAX_EPOCH_TIME``

        >>> from ulid import Monotonic
        >>> ulid = Monotonic()
        >>> ulid.generate()
        '01BX5ZZKBKACTAV9WEVGEMMVRZ'
        """
        if self.seed_time is None:
            curr_timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
        else:
            curr_timestamp_ms = int(self.seed_time * 1000)

        if not 0 <= curr_timestamp_ms <= self.MAX_EPOCH_TIME:
            raise ValueError(
                "The timestamp has to be between 0 and {}".format(self.MAX_EPOCH_TIME)
            )

        prev_timestamp_ms = self.__prev_timestamp_ms
        # Compare whole millisecond timestamps. (timedelta.microseconds only
        # holds the sub-second part of a difference, so it cannot be used to
        # decide whether two calls fell into the same millisecond.)
        if prev_timestamp_ms is not None and curr_timestamp_ms <= prev_timestamp_ms:
            # Same millisecond, or the clock went backwards: keep the previous
            # timestamp and increment the previous random component.
            randomness = self.__prev_randomness + 1
            if randomness >= (1 << self._randomness):
                # Random component overflow
                raise ValueError("The random component has overflowed")
            timestamp_ms = prev_timestamp_ms
        else:
            # First call, or a later millisecond: fresh randomness
            timestamp_ms = curr_timestamp_ms
            randomness = secrets.randbits(self._randomness)

        # State is only updated once a ULID can actually be produced
        self.__prev_timestamp_ms = timestamp_ms
        self.__prev_randomness = randomness

        epoch_bits = format(timestamp_ms, f"0{self._time}b")
        rand_num_bits = format(randomness, f"0{self._randomness}b")
        return self._from_bits_to_ulidstr(epoch_bits + rand_num_bits)