Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 54 additions & 27 deletions ruva-core/src/snowflake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,40 @@ pub struct NumericalUniqueIdGenerator {
pub datacenter_id: i32,
pub machine_id: i32,

/// Sequence number consists of 42 + 12 bits. 42 bits for timestamp and 12 bits for sequence number.
sequence_num: AtomicI64,
/// timpstamp(42bits) + sequence number(12bits).
ts_seq: AtomicI64,
}

struct TimeStampSeq {
ts: i64,
seq: i64,
}
impl TimeStampSeq {
fn to_i64(&self) -> i64 {
// combine the timestamp and sequence number.
// It's basically the reverse of the `current_ts_seq` function.
self.ts << 12 | self.seq
}

fn next_ts_seq(&self, current_ts: i64, epoch: SystemTime) -> Self {
const MAX_SEQUENCE: i64 = 4095;
let (ts, seq) = if self.ts == current_ts {
// If timestamp hasn't changed, increment sequence
let next_seq = self.seq.wrapping_add(1) & MAX_SEQUENCE;

// If sequence overflows, move to next millisecond
if next_seq == 0 {
let next_ts = race_next_milli(current_ts, epoch);
(next_ts, 0)
} else {
(current_ts, next_seq)
}
} else {
(current_ts, 0)
};

TimeStampSeq { ts, seq }
}
}

#[derive(Debug)]
Expand All @@ -48,6 +80,15 @@ impl NumericalUniqueIdGenerator {
Self::with_epoch(datacenter_id, machine_id, UNIX_EPOCH)
}

fn ts_seq(&self) -> TimeStampSeq {
const MASKING: i64 = 0xFFF;
let ts_seq = self.ts_seq.load(Ordering::Relaxed);
TimeStampSeq {
ts: ts_seq >> 12,
seq: ts_seq & MASKING,
}
}

/// Constructs a new `NumericalUniqueIdGenerator` using the specified epoch.
///
/// # Examples
Expand All @@ -68,7 +109,7 @@ impl NumericalUniqueIdGenerator {
epoch,
datacenter_id,
machine_id,
sequence_num: AtomicI64::new(timestamp << 12),
ts_seq: AtomicI64::new(timestamp << 12),
}
}

Expand All @@ -77,8 +118,8 @@ impl NumericalUniqueIdGenerator {
/// datacenter id takes 5 bits in the second place so left shift 17
/// machine id takes 5 bits in the third place so left shift 12
/// sequence number comes last.
fn get_snowflake(&self, seq_num: i16, timestamp: i64) -> i64 {
timestamp << 22 | ((self.datacenter_id as i64) << 17) | ((self.machine_id as i64) << 12) | (seq_num as i64)
fn get_snowflake(&self, ts_seq: TimeStampSeq) -> i64 {
ts_seq.ts << 22 | ((self.datacenter_id as i64) << 17) | ((self.machine_id as i64) << 12) | (ts_seq.seq)
}

/// The basic guarantee time punctuality.
Expand All @@ -97,35 +138,21 @@ impl NumericalUniqueIdGenerator {
/// id_generator.generate();
/// ```
pub fn generate(&self) -> i64 {
const MAX_SEQUENCE: i64 = 4095;

loop {
let timestamp = current_time_in_milli(self.epoch);
let current = self.sequence_num.load(Ordering::Relaxed);
let last_ts = current >> 12;
let last_seq = current & 0xFFF;
let last_ts_seq = self.ts_seq();
let current_ts = current_time_in_milli(self.epoch);

if last_ts > timestamp {
//This condition handles clock drift or system time adjustments where the clock moves backwards.
//It occurs in scenarios like: NTP adjustments, Daylight saving, Manual system clock adjustment. VM time sync
if last_ts_seq.ts > current_ts {
spin_loop();
continue;
}

let (new_ts, new_seq) = if last_ts == timestamp {
let next_seq = (last_seq.wrapping_add(1) & MAX_SEQUENCE) as i16;
if next_seq == 0 {
let next_ts = race_next_milli(timestamp, self.epoch);
(next_ts, 0)
} else {
(timestamp, next_seq)
}
} else {
(timestamp, 0)
};
let new_ts_seq = last_ts_seq.next_ts_seq(current_ts, self.epoch);

let new_id = new_ts << 12 | new_seq as i64;
match self.sequence_num.compare_exchange(current, new_id, Ordering::Relaxed, Ordering::Relaxed) {
Ok(_) => return self.get_snowflake(new_seq, new_ts),
Err(_) => continue,
if let Ok(_) = self.ts_seq.compare_exchange(last_ts_seq.to_i64(), new_ts_seq.to_i64(), Ordering::Relaxed, Ordering::Relaxed) {
return self.get_snowflake(new_ts_seq);
}
}
}
Expand Down
Loading