Skip to content
/ cc Public

Concurrent Core for Go — a lightweight, high-performance toolkit designed for critical paths where latency and allocation matter.

License

Notifications You must be signed in to change notification settings

llxisdsh/cc

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

62 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Concurrent Core

Go Reference

Concurrent Core for Go — a lightweight, high-performance toolkit designed for critical paths where latency and allocation matter.

Installation

go get github.com/llxisdsh/cc

Core Components

🚀 Concurrent Maps

State-of-the-art concurrent map implementations, streamlined from llxisdsh/pb.

Component Description Ideal Use Case
Map Lock-free reads, fine-grained write locking. Drop-in sync.Map replacement. General purpose, mixed R/W workloads.
FlatMap Seqlock-based, open-addressing with inline storage. Read-heavy, cache-sensitive scenarios.

Note: These components retain the core high-performance logic of llxisdsh/pb but are packaged here for lightweight integration. For comprehensive benchmarks and advanced architectural details, please refer to the upstream repository.

⚡ Processor Local

  • PLocal[T]: Processor-local storage. Shards data by P (GOMAXPROCS) to minimize lock contention. Ideal for high-throughput counters or temporary buffers.

🧵 Execution Patterns

Tools to manage task execution and flow.

  • WorkerPool: Robust, high-performance worker pool with zero-allocation on happy path.
  • OnceGroup[K, V]: Coalesces duplicate requests (singleflight). ~20× faster than singleflight with panic propagation.

🔒 Synchronization Primitives

Atomic, low-overhead coordination tools built on runtime semaphores.

Primitive Metaphor Behavior Key Usage
Latch One-time Door Starts closed. Once Open(), stays open forever. Initialization, Shutdown signal.
Gate Manual Door Open()/Close()/Pulse(). Supports broadcast wakeups. Pausing/Resuming, Cond-like signals.
Rally Meeting Point Meet(n) waits until n parties arrive, then releases all. CyclicBarrier, MapReduce stages.
Phaser Dynamic Barrier Dynamic party registration with split-phase Arrive()/AwaitAdvance(). Java-style Phaser, Pipeline stages.
Epoch Milestone WaitAtLeast(n) blocks until counter reaches n. No thundering herd. Phase coordination, Version gates.
Barter Exchanger Two goroutines swap values at a sync point. Producer-Consumer handoff.
RWLock Read-Write Lock Spin-based R/W lock, writer-preferred. Low-latency, writer-priority.
TicketLock Ticket Queue FIFO spin-lock with ticket algorithm. Fair mutex, Latency-sensitive paths.
BitLock Bit Lock Spins on a specific bit mask. Fine-grained, memory-constrained locks.
SeqLock Sequence Lock Optimistic reads with version counting. Tear-free snapshots, Read-heavy.
FairSemaphore FIFO Queue Strict FIFO ordering for permit acquisition. Anti-starvation scenarios.
TicketLockGroup Keyed Lock Per-key locking with auto-cleanup. User/Resource isolation.
RWLockGroup Keyed R/W Lock Per-key R/W locking with auto-cleanup. Config/Data partitioning.
WaitGroup Reusable WG Supports TryWait() & Waiters(). Reusable immediately. Batch processing.

Design Philosophy: Minimal footprint, direct runtime_semacquire integration. Most primitives are zero-alloc on hot paths.

🛠️ Concurrency Helpers

Generic helpers to add Timeout and Context cancellation support to any blocking operation, plus tools for periodic and parallel execution.

// Wait: Add Context cancellation to a blocking function
err := cc.Wait(ctx, func() {
    wg.Wait()
})

// WaitTimeout: Add Timeout to a blocking function
if err := cc.WaitTimeout(time.Second, wg.Wait); err != nil {
    // timed out
}

// Do: Execute a function that returns error, with Context support
err := cc.Do(ctx, func() error {
    return complexOp()
})

// Repeat: Run action periodically until Context cancelled or error
cc.Repeat(ctx, 5*time.Second, func(ctx context.Context) error {
    return reloadConfig()
})

// Parallel: Execute N tasks concurrently (fail-fast on error)
err := cc.Parallel(ctx, 10, func(ctx context.Context, i int) error {
    return processItem(i)
})

Quick Start

Concurrent Map

package main

import "github.com/llxisdsh/cc"

func main() {
    // 1. Standard Map (Lock-free reads, sync.Map compatible)
    var m cc.Map[string, int]
    m.Store("foo", 1)

    // 2. FlatMap (Seqlock-based, inline storage)
    fm := cc.NewFlatMap[string, int](cc.WithCapacity(1000))
    fm.Store("bar", 2)

    // 3. Compute (Atomic Read-Modify-Write)
    // Safe, lock-free coordination for complex state changes
    m.Compute("foo", func(e *cc.MapEntry[string, int]) {
        if e.Loaded() {
            // Atomically increment if exists
            e.Update(e.Value() + 1)
        } else {
            // Initialize if missing
            e.Update(1)
        }
    })

    // 4. Rebuild (Atomic transaction)
    // Safe, Multiple operations as single atomic transaction
    m.Rebuild(func(r *cc.MapRebuild[string, int]) {
        r.Store("new", 1)
        r.Delete("old")
        r.Compute("counter", func(e *cc.MapEntry[string, int]) {
            e.Update(e.Value() + 1)
        })
    })
}

PLocal (Processor-Local Storage)

// 1. Scalable Counter (PLocalCounter)
var c cc.PLocalCounter
// High throughput: Writes are sharded by P, avoiding global lock contention
c.Add(1)         // Scalable: No global lock
sum := c.Value() // Aggregates across all Ps

// 2. Generic PLocal
var p cc.PLocal[*bytes.Buffer]
// Run fn pinned to current P with local shard
p.With(func(buf **bytes.Buffer) {
    if *buf == nil { *buf = new(bytes.Buffer) }
    (*buf).WriteString("data")
})

WorkerPool

// Create a pool with 10 workers and queue size of 100
wp := cc.NewWorkerPool(10, 100)

// Optional: Handle panics from workers
wp.OnPanic = func(r any) {
    log.Printf("Worker panicked: %v", r)
}

// Submit non-blocking task (blocks if queue full)
wp.Submit(func() {
    process()
})

// Wait for all tasks to complete without closing
wp.Wait()

// Graceful shutdown
wp.Close()

OnceGroup

var g cc.OnceGroup[string, string]
// Coalesce duplicate requests
val, err, shared := g.Do("key", func() (string, error) {
    return "expensive-op", nil
})

Primitives Gallery

1. Coordination

// Latch: One-shot signal (e.g., init finished)
var l cc.Latch
go func() { l.Open() }()
l.Wait() // Blocks until Open()

// Gate: Reusable stop/go barrier
var g cc.Gate
g.Open()   // All waiters pass
g.Close()  // Future waiters block
g.Pulse()  // Wake current waiters only, remain closed

// Rally: Cyclic barrier for N parties
var r cc.Rally
r.Meet(3)  // Blocks until 3 goroutines arrive

// WaitGroup: Reusable
var wg cc.WaitGroup
wg.Go(func() { /* work */ })
// Add timeout support via cc.WaitTimeout
err := cc.WaitTimeout(time.Second, wg.Wait)

2. Advanced Locking

// RWLock: Writer-preferred R/W lock (avoids writer starvation)
var rw cc.RWLock
rw.Lock() // Higher priority than RLock

// TicketLock: Fair mutex (FIFO), no starvation
var mu cc.TicketLock
mu.Lock()
defer mu.Unlock()

// BitLock: Memory-efficient lock using a single bit in uint64
var state uint64
const lockBit = 1 << 63
cc.BitLockUint64(&state, lockBit) // Spins until bit 63 is 0, then sets it
cc.BitUnlockUint64(&state, lockBit)

// SeqLock: Tear-free snapshots for read-heavy small data
var sl cc.SeqLock
var slot cc.SeqLockSlot[string]
cc.SeqLockWrite(&sl, &slot, "data") // Writer
val := cc.SeqLockRead(&sl, &slot)   // Reader (optimistic, no blocking)

3. Keyed Locks (Auto-cleanup)

// Lock by key (string, int, etc.) without memory leaks
var locks cc.TicketLockGroup[string]

locks.Lock("user:123")
// Critical section for user:123
locks.Unlock("user:123")

4. Specialized

// Phaser: Dynamic barrier (Java-style)
p := cc.NewPhaser()
p.Register()
phase := p.ArriveAndAwaitAdvance()

// Epoch: Wait for counter to reach target (e.g., version waits)
var e cc.Epoch
e.WaitAtLeast(5) // Blocks until e.Add() reaches 5

// Barter: Exchanger for 2 goroutines
b := cc.NewBarter[string]()
// G1: b.Exchange("ping") -> returns "pong"
// G2: b.Exchange("pong") -> returns "ping"

About

Concurrent Core for Go — a lightweight, high-performance toolkit designed for critical paths where latency and allocation matter.

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published