diff --git a/CMakeLists.txt b/CMakeLists.txt index 3d1564f..89445bc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -89,11 +89,13 @@ add_library(elcache src/cache/memory_cache.cpp src/cache/disk_cache.cpp src/cache/cache_coordinator.cpp + src/cache/sparse_cache.cpp src/storage/chunk_store.cpp src/storage/disk_store.cpp src/network/node.cpp src/network/gossip.cpp src/network/cluster.cpp + src/network/unix_server.cpp src/protocol/protocol.cpp src/protocol/codec.cpp src/http/http_api.cpp diff --git a/docs/API.md b/docs/API.md index cc0cfa8..2632314 100644 --- a/docs/API.md +++ b/docs/API.md @@ -189,6 +189,257 @@ OPTIONS /cache/{key} --- +## Sparse/Parallel Write Operations + +The sparse write API allows uploading large files in parts, potentially from multiple clients in parallel. This is useful for: + +- Files larger than available memory +- Parallel uploads from multiple sources +- Resumable uploads +- Assembling data from distributed producers + +### Workflow + +1. **Create** a sparse entry with the total size +2. **Write** ranges at specific offsets (can be parallel from multiple clients) +3. **Finalize** to commit to cache (validates all bytes were written) + +### Create Sparse Entry + +Create a new sparse entry that will be filled with range writes. + +``` +POST /sparse/{key} +``` + +**Path Parameters:** +- `key` - Cache key (max 8KB) + +**Headers:** +| Header | Type | Required | Description | +|--------|------|----------|-------------| +| `X-ElCache-Size` | integer | Yes | Total size in bytes | +| `X-ElCache-TTL` | integer | No | Time-to-live in seconds (applied on finalize) | + +**Response:** +- `201 Created` - Sparse entry created +- `400 Bad Request` - Missing or invalid X-ElCache-Size +- `409 Conflict` - Sparse entry already exists for this key + +**Example:** + +```bash +# Create a 10MB sparse entry +curl -X POST http://localhost:8080/sparse/myfile \ + -H "X-ElCache-Size: 10485760" +``` + +--- + +### Write Range + +Write data at a specific byte offset within a sparse entry. + +``` +PATCH /sparse/{key} +PUT /sparse/{key} +``` + +**Path Parameters:** +- `key` - Cache key + +**Headers:** +| Header | Type | Description | +|--------|------|-------------| +| `Content-Range` | string | Byte range: `bytes start-end/total` | + +**Query Parameters:** +| Parameter | Type | Description | +|-----------|------|-------------| +| `offset` | integer | Alternative to Content-Range header | + +**Request Body:** Raw binary data for this range + +**Response:** +- `202 Accepted` - Range written successfully +- `400 Bad Request` - Invalid range (exceeds total size) +- `404 Not Found` - Sparse entry doesn't exist + +**Response Headers:** +| Header | Description | +|--------|-------------| +| `X-ElCache-Completion` | Percentage of bytes written (e.g., "75%") | + +**Examples:** + +```bash +# Write using Content-Range header +curl -X PATCH http://localhost:8080/sparse/myfile \ + -H "Content-Range: bytes 0-1048575/10485760" \ + --data-binary @chunk1.bin + +# Write using offset query parameter +curl -X PATCH "http://localhost:8080/sparse/myfile?offset=1048576" \ + --data-binary @chunk2.bin + +# Write using PUT (alternative to PATCH) +curl -X PUT "http://localhost:8080/sparse/myfile?offset=2097152" \ + --data-binary @chunk3.bin +``` + +--- + +### Get Sparse Entry Status + +Get the current status of a sparse entry. 
+
+```
+GET /sparse/{key}
+```
+
+**Path Parameters:**
+- `key` - Cache key
+
+**Response:**
+- `200 OK` - Status returned
+- `404 Not Found` - Sparse entry doesn't exist
+
+**Response Body:**
+```json
+{
+  "key": "myfile",
+  "total_size": 10485760,
+  "completion_percent": 75.5,
+  "complete": false
+}
+```
+
+**Example:**
+
+```bash
+curl http://localhost:8080/sparse/myfile
+```
+
+---
+
+### Finalize Sparse Entry
+
+Finalize a sparse entry, validating all bytes have been written and moving it to the main cache.
+
+```
+POST /sparse/{key}/finalize
+```
+
+**Path Parameters:**
+- `key` - Cache key
+
+**Headers:**
+| Header | Type | Description |
+|--------|------|-------------|
+| `X-ElCache-TTL` | integer | Time-to-live in seconds (optional) |
+
+**Response:**
+- `201 Created` - Entry finalized and available in cache
+- `404 Not Found` - Sparse entry doesn't exist
+- `409 Conflict` - Not all byte ranges have been written
+
+**Example:**
+
+```bash
+curl -X POST http://localhost:8080/sparse/myfile/finalize
+```
+
+---
+
+### Abort Sparse Write
+
+Delete a sparse entry without finalizing (abort the upload).
+
+```
+DELETE /sparse/{key}
+```
+
+**Path Parameters:**
+- `key` - Cache key
+
+**Response:**
+- `204 No Content` - Sparse entry deleted
+
+**Example:**
+
+```bash
+curl -X DELETE http://localhost:8080/sparse/myfile
+```
+
+---
+
+### Complete Sparse Write Example
+
+```bash
+#!/bin/bash
+KEY="largefile"
+TOTAL_SIZE=10485760   # 10MB
+CHUNK_SIZE=2097152    # 2MB
+
+# Step 1: Create sparse entry
+echo "Creating sparse entry..."
+curl -s -X POST "http://localhost:8080/sparse/$KEY" \
+  -H "X-ElCache-Size: $TOTAL_SIZE"
+
+# Step 2: Upload chunks in parallel
+echo "Uploading chunks in parallel..."
+for i in 0 1 2 3 4; do
+  OFFSET=$((i * CHUNK_SIZE))
+  (
+    dd if=/dev/urandom bs=$CHUNK_SIZE count=1 2>/dev/null | \
+      curl -s -X PATCH "http://localhost:8080/sparse/$KEY?offset=$OFFSET" \
+        --data-binary @-
+  ) &
+done
+wait
+
+# Step 3: Check status
+echo "Checking status..."
+curl -s "http://localhost:8080/sparse/$KEY"
+
+# Step 4: Finalize
+echo "Finalizing..."
+curl -s -X POST "http://localhost:8080/sparse/$KEY/finalize"
+
+# Step 5: Verify data is accessible
+echo "Verifying..."
+curl -s -I "http://localhost:8080/cache/$KEY" | grep -E "(X-ElCache|Content-Length)"
+```
+
+### Parallel Upload from Multiple Clients
+
+Multiple clients can write different ranges simultaneously. The entry must be created first; this example assumes a 1MB total size split into 256KB quarters:
+
+```bash
+# Any client creates the 1MB sparse entry
+curl -X POST http://localhost:8080/sparse/shared-file \
+  -H "X-ElCache-Size: 1048576"
+
+# Client 1 (writes first quarter)
+curl -X PATCH "http://localhost:8080/sparse/shared-file?offset=0" \
+  --data-binary @part1.bin &
+
+# Client 2 (writes second quarter)
+curl -X PATCH "http://localhost:8080/sparse/shared-file?offset=262144" \
+  --data-binary @part2.bin &
+
+# Client 3 (writes third quarter)
+curl -X PATCH "http://localhost:8080/sparse/shared-file?offset=524288" \
+  --data-binary @part3.bin &
+
+# Client 4 (writes fourth quarter)
+curl -X PATCH "http://localhost:8080/sparse/shared-file?offset=786432" \
+  --data-binary @part4.bin &
+
+wait
+
+# Any client can finalize
+curl -X POST "http://localhost:8080/sparse/shared-file/finalize"
+```
+
+---
+
 ## Admin Endpoints
 
 ### Health Check
@@ -331,7 +582,11 @@ All error responses include a plain text body with the error message.
 |-------|-------|
 | Maximum key size | 8 KB |
 | Maximum value size | 20 TB |
+| Maximum single PUT body | 100 MB |
 | Chunk size | 4 MB |
+| Sparse write chunk size | Unlimited (within total size) |
+
+**Note:** For values larger than 100 MB, use the sparse write API, which allows uploading in smaller chunks.
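+
+For example, a 1 GiB value exceeds the 100 MB PUT cap but can be uploaded as sixteen 64 MiB sparse writes. A rough sketch (hypothetical file `big.iso`; `stat -c%s` is GNU coreutils, use `stat -f%z` on macOS):
+
+```bash
+split -b 67108864 big.iso part_     # 64 MiB pieces: part_aa, part_ab, ...
+curl -X POST "http://localhost:8080/sparse/big.iso" \
+  -H "X-ElCache-Size: $(stat -c%s big.iso)"
+OFFSET=0
+for p in part_*; do                 # lexicographic order matches split order
+  curl -s -X PATCH "http://localhost:8080/sparse/big.iso?offset=$OFFSET" \
+    --data-binary @"$p" > /dev/null
+  OFFSET=$((OFFSET + $(stat -c%s "$p")))
+done
+curl -X POST "http://localhost:8080/sparse/big.iso/finalize"
+rm -f part_*
+```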
---

@@ -360,6 +615,25 @@ curl -H "Range: bytes=1048576-2097151" \
      http://localhost:8080/cache/bigfile.bin > part2.bin
 ```
 
+### Upload a large file using sparse writes
+
+```bash
+# Create 100MB sparse entry
+curl -X POST http://localhost:8080/sparse/bigfile.bin \
+  -H "X-ElCache-Size: 104857600"
+
+# Upload in 10MB chunks (can be parallelized)
+for i in $(seq 0 9); do
+  OFFSET=$((i * 10485760))
+  dd if=bigfile.bin bs=10485760 skip=$i count=1 2>/dev/null | \
+    curl -X PATCH "http://localhost:8080/sparse/bigfile.bin?offset=$OFFSET" \
+      --data-binary @-
+done
+
+# Finalize
+curl -X POST http://localhost:8080/sparse/bigfile.bin/finalize
+```
+
 ### Monitor cache performance
 
 ```bash
diff --git a/docs/SDK.md b/docs/SDK.md
index c888894..6667208 100644
--- a/docs/SDK.md
+++ b/docs/SDK.md
@@ -7,6 +7,7 @@ The ElCache C SDK provides high-performance cache access with zero external depe
 - **Zero dependencies**: Only requires POSIX APIs
 - **Two transport modes**: Unix sockets (reliable) and shared memory (fastest)
 - **Partial read support**: Efficiently read byte ranges of large values
+- **Position-based writes**: Write large values in chunks with parallel support
 - **Streaming API**: Handle values larger than memory
 - **Thread safety**: Each client instance is single-threaded; create one per thread
 
@@ -81,6 +82,34 @@ int main() {
 }
 ```
 
+## Quick Reference: Large File Upload
+
+For values larger than 256KB (the default buffer size), use position-based writes:
+
+```c
+#include <string.h>
+#include <elcache/elcache_sdk.h>
+
+int upload_large_file(elcache_client_t* client,
+                      const char* key,
+                      const void* data,
+                      size_t total_size) {
+    // 1. Create sparse entry
+    elcache_client_create_sparse(client, key, strlen(key), total_size, NULL);
+
+    // 2. Write in chunks (can be parallel from multiple threads)
+    size_t chunk_size = 200 * 1024;  // 200KB chunks
+    for (size_t offset = 0; offset < total_size; offset += chunk_size) {
+        size_t len = (offset + chunk_size > total_size)
+                     ? (total_size - offset) : chunk_size;
+        elcache_client_write_range(client, key, strlen(key),
+                                   offset, (char*)data + offset, len);
+    }
+
+    // 3. Finalize
+    return elcache_client_finalize(client, key, strlen(key));
+}
+```
+
 ## API Reference
 
 ### Client Lifecycle
@@ -316,6 +345,239 @@ err = elcache_client_read_range(client, "bigfile", 7,
 
 ---
 
+### Position-Based Writes (Sparse/Parallel)
+
+The position-based write API allows you to write large values in parts, potentially from multiple threads in parallel. This is useful for:
+
+- Writing files larger than the SDK buffer size (default 256KB)
+- Parallel uploads from multiple sources
+- Resumable uploads
+- Assembling data from multiple producers
+
+#### Workflow
+
+1. **Create** a sparse entry with total size
+2. **Write** ranges at specific offsets (can be parallel)
+3. **Finalize** to commit to cache (validates all bytes written)
+
+#### elcache_client_create_sparse
+
+```c
+elcache_error_t elcache_client_create_sparse(
+    elcache_client_t* client,
+    const void* key,
+    size_t key_len,
+    uint64_t total_size,
+    const elcache_put_options_t* options  // NULL for defaults
+);
+```
+
+Create a sparse entry that will be filled with position-based writes.
+
+**Parameters:**
+- `client` - Client instance
+- `key` - Cache key
+- `key_len` - Key length in bytes
+- `total_size` - Total size of the final value in bytes
+- `options` - Optional put options (TTL, flags, etc.)
+
+**Returns:**
+- `ELCACHE_OK` - Entry created
+- `ELCACHE_ERR_INVALID_ARG` - Invalid parameters
+- `ELCACHE_ERR_INTERNAL` - Entry already exists
+
+---
+
+#### elcache_client_write_range
+
+```c
+elcache_error_t elcache_client_write_range(
+    elcache_client_t* client,
+    const void* key,
+    size_t key_len,
+    uint64_t offset,
+    const void* data,
+    size_t data_len
+);
+```
+
+Write data at a specific offset within a sparse entry.
+
+**Parameters:**
+- `client` - Client instance
+- `key` - Cache key
+- `key_len` - Key length in bytes
+- `offset` - Byte offset where data should be written
+- `data` - Data buffer
+- `data_len` - Length of data to write
+
+**Returns:**
+- `ELCACHE_OK` - Data written successfully
+- `ELCACHE_ERR_NOT_FOUND` - Sparse entry doesn't exist
+- `ELCACHE_ERR_INVALID_ARG` - Range exceeds total size
+- `ELCACHE_ERR_VALUE_TOO_LARGE` - Data too large for buffer
+
+**Note:** Multiple `write_range` calls can overlap or be called in parallel from different threads/connections. Each byte position only needs to be written once.
+
+---
+
+#### elcache_client_finalize
+
+```c
+elcache_error_t elcache_client_finalize(
+    elcache_client_t* client,
+    const void* key,
+    size_t key_len
+);
+```
+
+Finalize a sparse entry, validating all bytes have been written and moving it to the main cache.
+
+**Parameters:**
+- `client` - Client instance
+- `key` - Cache key
+- `key_len` - Key length in bytes
+
+**Returns:**
+- `ELCACHE_OK` - Entry finalized and available in cache
+- `ELCACHE_ERR_NOT_FOUND` - Sparse entry doesn't exist
+- `ELCACHE_ERR_PARTIAL` - Not all byte ranges have been written
+
+---
+
+#### Example: Writing a 1MB file in chunks
+
+```c
+#include <stdint.h>
+#include <string.h>
+#include <elcache/elcache_sdk.h>
+
+int upload_large_file(elcache_client_t* client,
+                      const char* key,
+                      const uint8_t* data,
+                      size_t total_size) {
+    // Step 1: Create sparse entry
+    elcache_error_t err = elcache_client_create_sparse(
+        client, key, strlen(key), total_size, NULL);
+    if (err != ELCACHE_OK) {
+        return -1;
+    }
+
+    // Step 2: Write in 200KB chunks
+    const size_t chunk_size = 200 * 1024;
+    for (size_t offset = 0; offset < total_size; offset += chunk_size) {
+        size_t len = (offset + chunk_size > total_size)
+                     ? 
(total_size - offset)
+                     : chunk_size;
+
+        err = elcache_client_write_range(
+            client, key, strlen(key), offset, data + offset, len);
+        if (err != ELCACHE_OK) {
+            return -1;
+        }
+    }
+
+    // Step 3: Finalize
+    err = elcache_client_finalize(client, key, strlen(key));
+    if (err != ELCACHE_OK) {
+        return -1;
+    }
+
+    return 0;
+}
+```
+
+#### Example: Parallel writes from multiple threads
+
+```c
+#include <pthread.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+#include <elcache/elcache_sdk.h>
+
+typedef struct {
+    const char* socket_path;
+    const char* key;
+    const uint8_t* data;
+    size_t offset;
+    size_t length;
+    int success;
+} write_task_t;
+
+void* write_worker(void* arg) {
+    write_task_t* task = (write_task_t*)arg;
+
+    // Each thread creates its own client connection
+    elcache_client_t* client = elcache_client_create();
+    elcache_client_connect_unix(client, task->socket_path);
+
+    // Write this thread's chunk
+    elcache_error_t err = elcache_client_write_range(
+        client, task->key, strlen(task->key),
+        task->offset, task->data + task->offset, task->length);
+
+    task->success = (err == ELCACHE_OK);
+
+    elcache_client_destroy(client);
+    return NULL;
+}
+
+int parallel_upload(const char* socket_path,
+                    const char* key,
+                    const uint8_t* data,
+                    size_t total_size,
+                    int num_threads) {
+    // Create sparse entry first
+    elcache_client_t* client = elcache_client_create();
+    elcache_client_connect_unix(client, socket_path);
+
+    elcache_error_t err = elcache_client_create_sparse(
+        client, key, strlen(key), total_size, NULL);
+    if (err != ELCACHE_OK) {
+        elcache_client_destroy(client);
+        return -1;
+    }
+
+    // Spawn threads for parallel writes
+    pthread_t* threads = malloc(num_threads * sizeof(pthread_t));
+    write_task_t* tasks = malloc(num_threads * sizeof(write_task_t));
+
+    size_t chunk_size = total_size / num_threads;
+    for (int i = 0; i < num_threads; i++) {
+        tasks[i].socket_path = socket_path;
+        tasks[i].key = key;
+        tasks[i].data = data;
+        tasks[i].offset = i * chunk_size;
+        tasks[i].length = (i == num_threads - 1)
+                          ? (total_size - tasks[i].offset)
+                          : chunk_size;
+        tasks[i].success = 0;
+
+        pthread_create(&threads[i], NULL, write_worker, &tasks[i]);
+    }
+
+    // Wait for all threads
+    int all_success = 1;
+    for (int i = 0; i < num_threads; i++) {
+        pthread_join(threads[i], NULL);
+        if (!tasks[i].success) all_success = 0;
+    }
+
+    free(threads);
+    free(tasks);
+
+    if (!all_success) {
+        elcache_client_destroy(client);
+        return -1;
+    }
+
+    // Finalize
+    err = elcache_client_finalize(client, key, strlen(key));
+    elcache_client_destroy(client);
+
+    return (err == ELCACHE_OK) ? 0 : -1;
+}
+```
+
+---
+
 ### Streaming API
 
 For values too large to fit in memory:
 
@@ -588,3 +850,7 @@ void* worker_thread(void* arg) {
 | Maximum value size | 20 TB |
 | Default timeout | 30 seconds |
 | Default buffer size | 256 KB |
+| Maximum single PUT size | Buffer size (default 256 KB) |
+| Sparse write chunk size | Buffer size (default 256 KB) |
+
+**Note:** For values larger than the buffer size, use the position-based write API (`create_sparse`, `write_range`, `finalize`), which allows uploading in chunks.
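+
+Reads of large values have the same buffer-size constraint in reverse. A minimal sketch of chunked reads, assuming the ranged `elcache_client_get` options (`ELCACHE_GET_OPTIONS_INIT`, `offset`, `length`) used in `examples/sdk_example.c`:
+
+```c
+#include <stdint.h>
+#include <string.h>
+#include <elcache/elcache_sdk.h>
+
+/* Read a large value into `out` in buffer-sized pieces via ranged gets. */
+int download_large_value(elcache_client_t* client, const char* key,
+                         uint8_t* out, size_t total_size) {
+    const size_t chunk_size = 200 * 1024;  /* stay under the 256 KB buffer */
+    for (size_t offset = 0; offset < total_size; offset += chunk_size) {
+        size_t want = (offset + chunk_size > total_size)
+                      ? (total_size - offset) : chunk_size;
+        size_t got = 0;
+        elcache_get_options_t opts = ELCACHE_GET_OPTIONS_INIT;
+        opts.offset = offset;
+        opts.length = want;
+        elcache_error_t err = elcache_client_get(client, key, strlen(key),
+                                                 out + offset, want, &got, &opts);
+        if (err != ELCACHE_OK && err != ELCACHE_ERR_PARTIAL) {
+            return -1;
+        }
+    }
+    return 0;
+}
+```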
diff --git a/examples/sdk_example.c b/examples/sdk_example.c
index 096d8d5..48e32fe 100644
--- a/examples/sdk_example.c
+++ b/examples/sdk_example.c
@@ -130,6 +130,8 @@ int main(int argc, char* argv[]) {
         if (err == ELCACHE_OK || err == ELCACHE_ERR_PARTIAL) {
             printf("Read %zu bytes from offset 500\n", len);
         }
+    } else {
+        fprintf(stderr, "1MB put failed: %s (default buffer is 256KB)\n", elcache_error_string(err));
     }
 
     free(large_value);
@@ -147,6 +149,76 @@ int main(int argc, char* argv[]) {
     elcache_client_exists(client, key, strlen(key), &exists);
     printf("Key exists after delete: %s\n", exists ? "yes" : "no");
 
+    /* Example: Position-based writes for large values
+     * This demonstrates writing a large value in parts, which can be done
+     * from multiple threads in parallel.
+     */
+    printf("\n=== Position-based Write Example ===\n");
+
+    const char* sparse_key = "large_file";
+    size_t total_size = 1024 * 1024;  /* 1MB total */
+    size_t chunk_size = 200 * 1024;   /* 200KB chunks (leave room for protocol overhead) */
+
+    printf("Creating sparse entry for %zu bytes...\n", total_size);
+    err = elcache_client_create_sparse(client, sparse_key, strlen(sparse_key),
+                                       total_size, NULL);
+    if (err != ELCACHE_OK) {
+        fprintf(stderr, "create_sparse failed: %s\n", elcache_error_string(err));
+    } else {
+        printf("Sparse entry created!\n");
+
+        /* Write in chunks (in real use, these could be parallel from different threads) */
+        char* chunk_data = malloc(chunk_size);
+        if (chunk_data) {
+            int success = 1;
+            for (size_t offset = 0; offset < total_size && success; offset += chunk_size) {
+                size_t write_len = (offset + chunk_size > total_size) ?
+                                   (total_size - offset) : chunk_size;
+
+                /* Fill chunk with pattern based on offset */
+                memset(chunk_data, (char)('A' + (offset / chunk_size)), write_len);
+
+                printf("Writing %zu bytes at offset %zu...\n", write_len, offset);
+                err = elcache_client_write_range(client, sparse_key, strlen(sparse_key),
+                                                 offset, chunk_data, write_len);
+                if (err != ELCACHE_OK) {
+                    fprintf(stderr, "write_range failed: %s\n", elcache_error_string(err));
+                    success = 0;
+                }
+            }
+            free(chunk_data);
+
+            if (success) {
+                printf("All chunks written, finalizing...\n");
+                err = elcache_client_finalize(client, sparse_key, strlen(sparse_key));
+                if (err == ELCACHE_OK) {
+                    printf("Entry finalized successfully!\n");
+
+                    /* Verify we can read it back */
+                    printf("Reading back first 100 bytes...\n");
+                    char verify_buf[100];
+                    size_t read_len;
+                    elcache_get_options_t opts = ELCACHE_GET_OPTIONS_INIT;
+                    opts.offset = 0;
+                    opts.length = 100;
+                    err = elcache_client_get(client, sparse_key, strlen(sparse_key),
+                                             verify_buf, sizeof(verify_buf), &read_len, &opts);
+                    if (err == ELCACHE_OK) {
+                        printf("Read %zu bytes, first char: '%c' (expected 'A')\n",
+                               read_len, verify_buf[0]);
+                    }
+
+                    /* Clean up */
+                    elcache_client_delete(client, sparse_key, strlen(sparse_key));
+                } else if (err == ELCACHE_ERR_PARTIAL) {
+                    fprintf(stderr, "Finalize failed: not all ranges written\n");
+                } else {
+                    fprintf(stderr, "Finalize failed: %s\n", elcache_error_string(err));
+                }
+            }
+        }
+    }
+
     /* Print stats */
     printf("\n");
     print_stats(client);
diff --git a/include/elcache/elcache.hpp b/include/elcache/elcache.hpp
index ca474a8..81560d1 100644
--- a/include/elcache/elcache.hpp
+++ b/include/elcache/elcache.hpp
@@ -13,6 +13,7 @@
 #include "cluster.hpp"
 #include "protocol.hpp"
 #include "http_api.hpp"
+#include "unix_server.hpp"
 #include "metrics.hpp"
 #include <memory>
 #include <atomic>
@@ -35,6 +36,7 @@ class ElCacheServer {
   CacheCoordinator& cache() { return 
*coordinator_; }
   Cluster* cluster() { return cluster_.get(); }
   HttpServer* http_server() { return http_server_.get(); }
+  UnixSocketServer* unix_server() { return unix_server_.get(); }
   MetricsCollector& metrics() { return *metrics_collector_; }
   elio::io::io_context& io_context() { return io_ctx_; }
 
@@ -49,6 +51,7 @@
   std::unique_ptr<MetricsCollector> metrics_collector_;
   std::unique_ptr<HttpHandler> http_handler_;
   std::unique_ptr<HttpServer> http_server_;
+  std::unique_ptr<UnixSocketServer> unix_server_;
   std::atomic<bool> running_{false};
 };
diff --git a/include/elcache/elcache_sdk.h b/include/elcache/elcache_sdk.h
index 7770c3a..7c45c9d 100644
--- a/include/elcache/elcache_sdk.h
+++ b/include/elcache/elcache_sdk.h
@@ -257,6 +257,95 @@ elcache_error_t elcache_client_read_range(
     size_t* bytes_read
 );
 
+/*
+ * Position-based write operations (for parallel large value writes)
+ *
+ * These functions allow writing large values in parts from multiple
+ * threads or clients in parallel:
+ *
+ * 1. Call create_sparse() to allocate space with known total size
+ * 2. Call write_range() from multiple threads to write different parts
+ * 3. Call finalize() when all parts are written to mark entry complete
+ *
+ * Example:
+ *   // Thread 1: Create sparse entry
+ *   elcache_client_create_sparse(client, key, key_len, total_size, NULL);
+ *
+ *   // Thread 2 & 3: Write parts in parallel
+ *   elcache_client_write_range(client2, key, key_len, 0, data1, len1);
+ *   elcache_client_write_range(client3, key, key_len, len1, data2, len2);
+ *
+ *   // After all writes complete:
+ *   elcache_client_finalize(client, key, key_len);
+ */
+
+/* Create a sparse entry for parallel writes
+ *
+ * Allocates space for a value of known total_size but doesn't write data.
+ * The entry remains incomplete until finalize() is called.
+ *
+ * Parameters:
+ *   key, key_len: The cache key
+ *   total_size:   Total size of the value to be written
+ *   options:      Optional put options (TTL, flags, etc.)
+ *
+ * Returns:
+ *   ELCACHE_OK on success
+ *   ELCACHE_ERR_INVALID_ARG if arguments invalid
+ *   ELCACHE_ERR_CONNECTION if not connected
+ */
+elcache_error_t elcache_client_create_sparse(
+    elcache_client_t* client,
+    const void* key,
+    size_t key_len,
+    uint64_t total_size,
+    const elcache_put_options_t* options
+);
+
+/* Write data at a specific offset within a sparse entry
+ *
+ * Can be called from multiple threads/clients in parallel to write
+ * different parts of the same key. Each write_range call is atomic.
+ *
+ * Parameters:
+ *   key, key_len: The cache key (must have been created with create_sparse)
+ *   offset:       Byte offset within the value to write at
+ *   data:         Data to write
+ *   data_len:     Length of data to write
+ *
+ * Returns:
+ *   ELCACHE_OK on success
+ *   ELCACHE_ERR_NOT_FOUND if key doesn't exist or wasn't created as sparse
+ *   ELCACHE_ERR_INVALID_ARG if offset+data_len exceeds total_size
+ */
+elcache_error_t elcache_client_write_range(
+    elcache_client_t* client,
+    const void* key,
+    size_t key_len,
+    uint64_t offset,
+    const void* data,
+    size_t data_len
+);
+
+/* Finalize a sparse entry after all parts are written
+ *
+ * Validates that the entry is complete and marks it as readable.
+ * After finalization, the entry can be read normally with get().
+ *
+ * Parameters:
+ *   key, key_len: The cache key
+ *
+ * Returns:
+ *   ELCACHE_OK on success
+ *   ELCACHE_ERR_NOT_FOUND if key doesn't exist
+ *   ELCACHE_ERR_PARTIAL if not all ranges have been written
+ */
+elcache_error_t elcache_client_finalize(
+    elcache_client_t* client,
+    const void* key,
+    size_t key_len
+);
+
 /*
  * Streaming operations (for very large values)
  */
diff --git a/include/elcache/http_api.hpp b/include/elcache/http_api.hpp
index 82f93a3..acf1298 100644
--- a/include/elcache/http_api.hpp
+++ b/include/elcache/http_api.hpp
@@ -96,6 +96,12 @@ class HttpHandler {
   elio::coro::task<HttpResponse> handle_head(const HttpRequest& req);
   elio::coro::task<HttpResponse> handle_options(const HttpRequest& req);
 
+  // Sparse/partial write handlers
+  elio::coro::task<HttpResponse> handle_sparse_create(const HttpRequest& req);
+  elio::coro::task<HttpResponse> handle_sparse_write(const HttpRequest& req);
+  elio::coro::task<HttpResponse> handle_sparse_finalize(const HttpRequest& req);
+  elio::coro::task<HttpResponse> handle_sparse_status(const HttpRequest& req);
+
   // Stats/admin endpoints
   elio::coro::task<HttpResponse> handle_stats(const HttpRequest& req);
   elio::coro::task<HttpResponse> handle_health(const HttpRequest& req);
@@ -112,6 +118,10 @@
   elio::coro::task<elio::http::response> elio_health_handler(elio::http::context& ctx);
   elio::coro::task<elio::http::response> elio_cluster_handler(elio::http::context& ctx);
   elio::coro::task<elio::http::response> elio_metrics_handler(elio::http::context& ctx);
+  elio::coro::task<elio::http::response> elio_sparse_create_handler(elio::http::context& ctx);
+  elio::coro::task<elio::http::response> elio_sparse_write_handler(elio::http::context& ctx);
+  elio::coro::task<elio::http::response> elio_sparse_finalize_handler(elio::http::context& ctx);
+  elio::coro::task<elio::http::response> elio_sparse_status_handler(elio::http::context& ctx);
 };
 
 // HTTP server using Elio's async server
@@ -147,6 +157,13 @@ HTTP API Design:
   GET    /cache/{key}?range=0-1023  - Partial read (alternative to Range header)
 
+  # Sparse/Parallel Write API
+  POST   /sparse/{key}            - Create sparse entry (X-ElCache-Size header required)
+  PATCH  /sparse/{key}            - Write range (Content-Range header required)
+  POST   /sparse/{key}/finalize   - Finalize sparse entry
+  GET    /sparse/{key}            - Get sparse entry status
+  DELETE /sparse/{key}            - Abort sparse write
+
   GET    /stats                   - Cache statistics
   GET    /health                  - Health check
   GET    /cluster                 - Cluster info
@@ -158,6 +175,7 @@ HTTP API Design:
   X-ElCache-No-Memory: true   - Skip memory cache
   X-ElCache-No-Disk: true     - Skip disk cache
   X-ElCache-No-Cluster: true  - Don't replicate
+  X-ElCache-Size: 1048576     - Total size for sparse entry creation
 
 Response Headers:
   X-ElCache-Hit: memory|disk|cluster|miss
diff --git a/include/elcache/sparse_cache.hpp b/include/elcache/sparse_cache.hpp
new file mode 100644
index 0000000..42e52d2
--- /dev/null
+++ b/include/elcache/sparse_cache.hpp
@@ -0,0 +1,135 @@
+#pragma once
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <mutex>
+#include <shared_mutex>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+namespace elcache {
+
+// Sparse entry for position-based writes
+// Tracks which byte ranges have been written
+struct SparseEntry {
+    uint64_t total_size;
+    std::vector<uint8_t> data;
+    std::vector<bool> written;  // Track which byte ranges are written
+    bool finalized;
+
+    explicit SparseEntry(uint64_t size)
+        : total_size(size)
+        , data(size, 0)
+        , written(size, false)
+        , finalized(false)
+    {}
+
+    bool write_range(uint64_t offset, const uint8_t* buf, size_t len) {
+        if (offset + len > total_size) return false;
+        std::memcpy(data.data() + offset, buf, len);
+        for (size_t i = 0; i < len; i++) {
+            written[offset + i] = true;
+        }
+        return true;
+    }
+
+    bool is_complete() const {
+        for (bool w : written) {
+            if (!w) return false;
+        }
+        return true;
+    }
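+
+    // Note: completion is tracked per byte, so is_complete() and
+    // completion_percent() scan total_size flags on each call; for very
+    // large entries an interval-based tracker would be cheaper.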
+
+    // Get percentage of bytes written
+    double completion_percent() const {
+        if (total_size == 0) return 100.0;
+        size_t count = 0;
+        for (bool w : written) {
+            if (w) count++;
+        }
+        return static_cast<double>(count) / total_size * 100.0;
+    }
+};
+
+// Thread-safe sparse cache for tracking incomplete entries
+class SparseCache {
+public:
+    // Create a new sparse entry
+    // Returns false if entry already exists
+    bool create(const std::string& key, uint64_t total_size) {
+        std::unique_lock lock(mutex_);
+        if (entries_.find(key) != entries_.end()) {
+            return false;  // Already exists
+        }
+        entries_.emplace(key, std::make_unique<SparseEntry>(total_size));
+        return true;
+    }
+
+    // Write data at a specific offset
+    // Returns false if entry doesn't exist or range is invalid
+    bool write_range(const std::string& key, uint64_t offset, const uint8_t* data, size_t len) {
+        std::unique_lock lock(mutex_);
+        auto it = entries_.find(key);
+        if (it == entries_.end()) return false;
+        return it->second->write_range(offset, data, len);
+    }
+
+    // Finalize a sparse entry
+    // Returns: 0 = success (data moved to output), 1 = not found, 2 = incomplete
+    int finalize(const std::string& key, std::vector<uint8_t>& out_data) {
+        std::unique_lock lock(mutex_);
+        auto it = entries_.find(key);
+        if (it == entries_.end()) return 1;        // Not found
+        if (!it->second->is_complete()) return 2;  // Not complete
+
+        // Move data out
+        out_data = std::move(it->second->data);
+        entries_.erase(it);
+        return 0;  // Success
+    }
+
+    // Check if a sparse entry exists
+    bool exists(const std::string& key) const {
+        std::shared_lock lock(mutex_);
+        return entries_.find(key) != entries_.end();
+    }
+
+    // Get total size of a sparse entry
+    uint64_t get_total_size(const std::string& key) const {
+        std::shared_lock lock(mutex_);
+        auto it = entries_.find(key);
+        if (it == entries_.end()) return 0;
+        return it->second->total_size;
+    }
+
+    // Get completion percentage
+    double get_completion(const std::string& key) const {
+        std::shared_lock lock(mutex_);
+        auto it = entries_.find(key);
+        if (it == entries_.end()) return 0.0;
+        return it->second->completion_percent();
+    }
+
+    // Remove a sparse entry without finalizing (abort)
+    bool remove(const std::string& key) {
+        std::unique_lock lock(mutex_);
+        return entries_.erase(key) > 0;
+    }
+
+    // Get count of active sparse entries
+    size_t count() const {
+        std::shared_lock lock(mutex_);
+        return entries_.size();
+    }
+
+private:
+    mutable std::shared_mutex mutex_;
+    std::unordered_map<std::string, std::unique_ptr<SparseEntry>> entries_;
+};
+
+// Global sparse cache instance
+SparseCache& global_sparse_cache();
+
+} // namespace elcache
diff --git a/include/elcache/unix_server.hpp b/include/elcache/unix_server.hpp
new file mode 100644
index 0000000..7332e6c
--- /dev/null
+++ b/include/elcache/unix_server.hpp
@@ -0,0 +1,94 @@
+#pragma once
+
+#include "types.hpp"
+#include "cache_coordinator.hpp"
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <string>
+
+namespace elcache {
+
+// Protocol constants matching SDK
+constexpr uint32_t PROTO_MAGIC = 0x454C4341;  // "ELCA"
+constexpr uint16_t PROTO_VERSION = 1;
+constexpr size_t HEADER_SIZE = 24;
+
+// Message types
+enum class MessageType : uint16_t {
+    Get = 0x0100,
+    GetResponse = 0x0101,
+    Put = 0x0102,
+    PutResponse = 0x0103,
+    Delete = 0x0104,
+    DeleteResponse = 0x0105,
+    Check = 0x0106,
+    CheckResponse = 0x0107,
+    // Sparse/partial write operations for parallel writes
+    CreateSparse = 0x0108,
+    CreateSparseResponse = 0x0109,
+    WriteRange = 0x010A,
+    WriteRangeResponse = 0x010B,
+    Finalize = 0x010C,
+    
FinalizeResponse = 0x010D,
+};
+
+// Unix Socket Server for SDK clients
+class UnixSocketServer {
+public:
+    UnixSocketServer(const std::string& socket_path, CacheCoordinator& cache,
+                     elio::io::io_context& ctx);
+    ~UnixSocketServer();
+
+    // Start listening and accepting connections
+    elio::coro::task<Status> start(elio::runtime::scheduler& sched);
+
+    // Stop the server
+    void stop();
+
+    // Get socket path
+    const std::string& socket_path() const { return socket_path_; }
+
+private:
+    // Thread-based handlers using simple sync cache
+    void handle_client_thread(int fd);
+    bool process_request_thread(int fd);
+    void handle_get_thread(int fd, const uint8_t* payload, size_t len, uint32_t request_id);
+    void handle_put_thread(int fd, const uint8_t* payload, size_t len, uint32_t request_id);
+    void handle_delete_thread(int fd, const uint8_t* payload, size_t len, uint32_t request_id);
+    void handle_create_sparse_thread(int fd, const uint8_t* payload, size_t len, uint32_t request_id);
+    void handle_write_range_thread(int fd, const uint8_t* payload, size_t len, uint32_t request_id);
+    void handle_finalize_thread(int fd, const uint8_t* payload, size_t len, uint32_t request_id);
+    bool send_response_sync(int fd, MessageType type, uint32_t request_id, const uint8_t* payload, size_t len);
+
+    // Legacy coroutine-based handlers (kept for API compatibility)
+    elio::coro::task<void> handle_client_sync(int fd, elio::runtime::scheduler& sched);
+    elio::coro::task<bool> process_request_sync(int fd, elio::runtime::scheduler& sched);
+    elio::coro::task<void> handle_get_sync(int fd, const uint8_t* payload, size_t len, uint32_t request_id);
+    elio::coro::task<void> handle_put_sync(int fd, const uint8_t* payload, size_t len, uint32_t request_id);
+    elio::coro::task<void> handle_delete_sync(int fd, const uint8_t* payload, size_t len, uint32_t request_id);
+
+    elio::coro::task<void> handle_client(elio::net::uds_stream stream);
+    elio::coro::task<bool> process_request(elio::net::uds_stream& stream);
+    elio::coro::task<void> handle_get(elio::net::uds_stream& stream, const uint8_t* payload, size_t len, uint32_t request_id);
+    elio::coro::task<void> handle_put(elio::net::uds_stream& stream, const uint8_t* payload, size_t len, uint32_t request_id);
+    elio::coro::task<void> handle_delete(elio::net::uds_stream& stream, const uint8_t* payload, size_t len, uint32_t request_id);
+    elio::coro::task<void> handle_create_sparse(elio::net::uds_stream& stream, const uint8_t* payload, size_t len, uint32_t request_id);
+    elio::coro::task<void> handle_write_range(elio::net::uds_stream& stream, const uint8_t* payload, size_t len, uint32_t request_id);
+    elio::coro::task<void> handle_finalize(elio::net::uds_stream& stream, const uint8_t* payload, size_t len, uint32_t request_id);
+    elio::coro::task<bool> send_response(elio::net::uds_stream& stream, MessageType type, uint32_t request_id, const uint8_t* payload, size_t len);
+
+    // Async accept loop using Elio's uds_listener
+    elio::coro::task<void> accept_loop_async(elio::runtime::scheduler& sched);
+
+    std::string socket_path_;
+    CacheCoordinator& cache_;
+    elio::io::io_context& io_ctx_;
+    std::atomic<bool> running_{false};
+    std::unique_ptr<elio::net::uds_listener> listener_;
+};
+
+} // namespace elcache
diff --git a/src/cache/cache_coordinator.cpp b/src/cache/cache_coordinator.cpp
index d87272e..90da3e0 100644
--- a/src/cache/cache_coordinator.cpp
+++ b/src/cache/cache_coordinator.cpp
@@ -104,6 +104,7 @@ elio::coro::task<Status> CacheCoordinator::put(
     const WriteOptions& opts) {
 
   Status status = Status::make_ok();
+  bool wrote_to_storage = false;
 
   // Write to memory cache
   if (memory_ && !opts.no_memory) {
@@ -111,6 
+112,7 @@ elio::coro::task<Status> CacheCoordinator::put(
     if (!status) {
       co_return status;
     }
+    wrote_to_storage = true;
   }
 
   // Write to disk cache
@@ -119,6 +121,7 @@
     if (!status) {
       co_return status;
     }
+    wrote_to_storage = true;
   }
 
   // Replicate to cluster
@@ -138,6 +141,11 @@
     }
   }
 
+  // Return error if no storage was available
+  if (!wrote_to_storage) {
+    co_return Status::error(ErrorCode::InternalError, "No storage available");
+  }
+
   co_return Status::make_ok();
 }
diff --git a/src/cache/disk_cache.cpp b/src/cache/disk_cache.cpp
index 5369dd6..361f9de 100644
--- a/src/cache/disk_cache.cpp
+++ b/src/cache/disk_cache.cpp
@@ -110,6 +110,9 @@ elio::coro::task<Status> DiskStore::write_file(
     co_return Status::error(ErrorCode::DiskError, "Failed to open file for writing");
   }
 
+  // Ensure file is readable regardless of umask
+  fchmod(fd, 0644);
+
   // Pre-allocate if supported
 #ifdef __linux__
   if (config_.use_fallocate) {
diff --git a/src/cache/sparse_cache.cpp b/src/cache/sparse_cache.cpp
new file mode 100644
index 0000000..23d064c
--- /dev/null
+++ b/src/cache/sparse_cache.cpp
@@ -0,0 +1,11 @@
+#include "elcache/sparse_cache.hpp"
+
+namespace elcache {
+
+// Global sparse cache instance - thread-safe singleton
+SparseCache& global_sparse_cache() {
+    static SparseCache instance;
+    return instance;
+}
+
+} // namespace elcache
diff --git a/src/http/http_api.cpp b/src/http/http_api.cpp
index 21fcffc..fcc681c 100644
--- a/src/http/http_api.cpp
+++ b/src/http/http_api.cpp
@@ -1,6 +1,7 @@
 #include "elcache/http_api.hpp"
 #include "elcache/cache_coordinator.hpp"
 #include "elcache/cluster.hpp"
+#include "elcache/sparse_cache.hpp"
 #include <sstream>
 #include <string>
 #include <vector>
@@ -512,6 +513,184 @@ elio::coro::task<HttpResponse> HttpHandler::handle_metrics(const HttpRequest& re
   co_return resp;
 }
 
+// Sparse/Partial Write Handlers
+
+elio::coro::task<HttpResponse> HttpHandler::handle_sparse_create(const HttpRequest& req) {
+    // Extract key from path: /sparse/{key}
+    std::string key = req.path.substr(8);  // Remove "/sparse/"
+    if (key.empty()) {
+        co_return HttpResponse::bad_request("Missing key");
+    }
+
+    // Get total size from header
+    auto size_header = req.header("X-ElCache-Size");
+    if (size_header.empty()) {
+        co_return HttpResponse::bad_request("X-ElCache-Size header required");
+    }
+
+    uint64_t total_size;
+    try {
+        total_size = std::stoull(size_header);
+    } catch (...) 
{
+        co_return HttpResponse::bad_request("Invalid X-ElCache-Size value");
+    }
+
+    if (total_size == 0) {
+        co_return HttpResponse::bad_request("Size must be greater than 0");
+    }
+
+    // Create sparse entry
+    bool created = global_sparse_cache().create(key, total_size);
+
+    if (!created) {
+        HttpResponse resp;
+        resp.status_code = 409;  // Conflict
+        resp.status_text = "Conflict";
+        std::string msg = "Sparse entry already exists";
+        resp.body = ByteBuffer(msg.begin(), msg.end());
+        resp.set_content_type("text/plain");
+        co_return resp;
+    }
+
+    HttpResponse resp = HttpResponse::created();
+    resp.headers["X-ElCache-Size"] = std::to_string(total_size);
+    co_return resp;
+}
+
+elio::coro::task<HttpResponse> HttpHandler::handle_sparse_write(const HttpRequest& req) {
+    // Extract key from path: /sparse/{key}
+    std::string key = req.path.substr(8);  // Remove "/sparse/"
+    if (key.empty()) {
+        co_return HttpResponse::bad_request("Missing key");
+    }
+
+    // Parse Content-Range header: "bytes start-end/total" or query param ?offset=
+    uint64_t offset = 0;
+
+    auto content_range = req.header("Content-Range");
+    if (!content_range.empty()) {
+        // Parse "bytes start-end/total"
+        auto bytes_pos = content_range.find("bytes ");
+        if (bytes_pos != std::string::npos) {
+            auto range_part = content_range.substr(bytes_pos + 6);
+            auto dash_pos = range_part.find('-');
+            if (dash_pos != std::string::npos) {
+                try {
+                    offset = std::stoull(range_part.substr(0, dash_pos));
+                } catch (...) {
+                    co_return HttpResponse::bad_request("Invalid Content-Range");
+                }
+            }
+        }
+    } else {
+        // Try query parameter ?offset=
+        auto offset_pos = req.query.find("offset=");
+        if (offset_pos != std::string::npos) {
+            try {
+                offset = std::stoull(req.query.substr(offset_pos + 7));
+            } catch (...) {
+                co_return HttpResponse::bad_request("Invalid offset parameter");
+            }
+        }
+    }
+
+    if (req.body.empty()) {
+        co_return HttpResponse::bad_request("Request body is empty");
+    }
+
+    // Write to sparse entry
+    bool success = global_sparse_cache().write_range(key, offset, req.body.data(), req.body.size());
+
+    if (!success) {
+        if (!global_sparse_cache().exists(key)) {
+            co_return HttpResponse::not_found("Sparse entry not found");
+        }
+        co_return HttpResponse::bad_request("Invalid range (offset + size exceeds total)");
+    }
+
+    HttpResponse resp;
+    resp.status_code = 202;  // Accepted
+    resp.status_text = "Accepted";
+    resp.headers["X-ElCache-Completion"] = std::to_string(global_sparse_cache().get_completion(key)) + "%";
+    co_return resp;
+}
+
+elio::coro::task<HttpResponse> HttpHandler::handle_sparse_finalize(const HttpRequest& req) {
+    // Extract key from path: /sparse/{key}/finalize
+    std::string path = req.path;
+    auto finalize_pos = path.rfind("/finalize");
+    if (finalize_pos == std::string::npos) {
+        co_return HttpResponse::bad_request("Invalid path");
+    }
+
+    std::string key = path.substr(8, finalize_pos - 8);  // Remove "/sparse/" and "/finalize"
+    if (key.empty()) {
+        co_return HttpResponse::bad_request("Missing key");
+    }
+
+    // Finalize sparse entry
+    std::vector<uint8_t> data;
+    int result = global_sparse_cache().finalize(key, data);
+
+    if (result == 1) {
+        co_return HttpResponse::not_found("Sparse entry not found");
+    }
+
+    if (result == 2) {
+        HttpResponse resp;
+        resp.status_code = 409;  // Conflict
+        resp.status_text = "Conflict";
+        std::string msg = "Not all ranges written";
+        resp.body = ByteBuffer(msg.begin(), msg.end());
+        resp.set_content_type("text/plain");
+        resp.headers["X-ElCache-Completion"] = std::to_string(global_sparse_cache().get_completion(key)) + "%";
+        
co_return resp;
+    }
+
+    // Store in main cache
+    WriteOptions opts;
+    auto ttl_header = req.header("X-ElCache-TTL");
+    if (!ttl_header.empty()) {
+        try {
+            opts.ttl = std::chrono::seconds(std::stoll(ttl_header));
+        } catch (...) {
+            co_return HttpResponse::bad_request("Invalid X-ElCache-TTL value");
+        }
+    }
+
+    auto status = co_await cache_.put(CacheKey(key), ByteBuffer(data.begin(), data.end()), opts);
+
+    if (!status) {
+        co_return HttpResponse::internal_error("Failed to store finalized data: " + status.message());
+    }
+
+    co_return HttpResponse::created();
+}
+
+elio::coro::task<HttpResponse> HttpHandler::handle_sparse_status(const HttpRequest& req) {
+    // Extract key from path: /sparse/{key}
+    std::string key = req.path.substr(8);  // Remove "/sparse/"
+    if (key.empty()) {
+        co_return HttpResponse::bad_request("Missing key");
+    }
+
+    if (!global_sparse_cache().exists(key)) {
+        co_return HttpResponse::not_found("Sparse entry not found");
+    }
+
+    uint64_t total_size = global_sparse_cache().get_total_size(key);
+    double completion = global_sparse_cache().get_completion(key);
+
+    std::ostringstream json;
+    json << "{\n";
+    json << "  \"key\": \"" << key << "\",\n";
+    json << "  \"total_size\": " << total_size << ",\n";
+    json << "  \"completion_percent\": " << completion << ",\n";
+    json << "  \"complete\": " << (completion >= 100.0 ? "true" : "false") << "\n";
+    json << "}\n";
+
+    auto body_str = json.str();
+    HttpResponse resp = HttpResponse::ok(ByteBuffer(body_str.begin(), body_str.end()));
+    resp.set_content_type("application/json");
+    co_return resp;
+}
+
 // Elio handler adapters - wrap internal handlers for Elio's router
 
 elio::coro::task<elio::http::response> HttpHandler::elio_get_handler(elio::http::context& ctx) {
@@ -568,6 +747,30 @@ elio::coro::task<elio::http::response> HttpHandler::elio_metrics_handler(elio::h
   co_return resp.to_elio_response();
 }
 
+elio::coro::task<elio::http::response> HttpHandler::elio_sparse_create_handler(elio::http::context& ctx) {
+  auto req = HttpRequest::from_context(ctx);
+  auto resp = co_await handle_sparse_create(req);
+  co_return resp.to_elio_response();
+}
+
+elio::coro::task<elio::http::response> HttpHandler::elio_sparse_write_handler(elio::http::context& ctx) {
+  auto req = HttpRequest::from_context(ctx);
+  auto resp = co_await handle_sparse_write(req);
+  co_return resp.to_elio_response();
+}
+
+elio::coro::task<elio::http::response> HttpHandler::elio_sparse_finalize_handler(elio::http::context& ctx) {
+  auto req = HttpRequest::from_context(ctx);
+  auto resp = co_await handle_sparse_finalize(req);
+  co_return resp.to_elio_response();
+}
+
+elio::coro::task<elio::http::response> HttpHandler::elio_sparse_status_handler(elio::http::context& ctx) {
+  auto req = HttpRequest::from_context(ctx);
+  auto resp = co_await handle_sparse_status(req);
+  co_return resp.to_elio_response();
+}
+
 elio::http::router HttpHandler::build_router() {
   elio::http::router router;
 
@@ -617,6 +820,40 @@ elio::http::router HttpHandler::build_router() {
     return elio_cluster_handler(ctx);
   });
 
+  // Sparse/Partial write endpoints
+  // POST /sparse/{key} - Create sparse entry
+  router.post("/sparse/:key", [this](elio::http::context& ctx) {
+    return elio_sparse_create_handler(ctx);
+  });
+
+  // PATCH /sparse/{key} - Write range to sparse entry
+  router.add_route(elio::http::method::PATCH, "/sparse/:key", [this](elio::http::context& ctx) {
+    return elio_sparse_write_handler(ctx);
+  });
+
+  // PUT /sparse/{key} - Alternative to PATCH for range write
+  router.put("/sparse/:key", [this](elio::http::context& ctx) {
+    return elio_sparse_write_handler(ctx);
+  });
+
+  // POST /sparse/{key}/finalize - Finalize sparse entry
+  router.post("/sparse/:key/finalize", [this](elio::http::context& ctx) {
+    return elio_sparse_finalize_handler(ctx);
+  });
+
+  // GET /sparse/{key} - Get sparse entry status
+  router.get("/sparse/:key", [this](elio::http::context& ctx) {
+    return elio_sparse_status_handler(ctx);
+  });
+
+  // DELETE /sparse/{key} - Abort sparse write (use sync handler)
+  router.del("/sparse/:key", [](elio::http::context& ctx) -> elio::http::response {
+    auto path = std::string(ctx.req().path());
+    std::string key = path.substr(8);  // Remove "/sparse/"
+    global_sparse_cache().remove(key);
+    return elio::http::response(elio::http::status::no_content);
+  });
+
   return router;
 }
diff --git a/src/main.cpp b/src/main.cpp
index 1dd1cfd..8c49e7a 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -21,6 +21,7 @@ void print_usage(const char* prog) {
             << "Options:\n"
             << "  -c, --config <file>     Configuration file path\n"
             << "  -p, --port <port>       HTTP port (default: 8080)\n"
+            << "  -u, --unix <path>       Unix socket path (default: /var/run/elcache/elcache.sock)\n"
            << "  -m, --memory <mb>       Memory cache size in MB (default: 1024)\n"
             << "  -d, --disk <dir>        Disk cache directory\n"
             << "  -D, --disk-size <gb>    Disk cache size in GB (default: 100)\n"
@@ -72,6 +73,9 @@ int main(int argc, char* argv[]) {
     else if ((arg == "-s" || arg == "--seed") && i + 1 < argc) {
       config.cluster.seed_nodes.push_back(argv[++i]);
     }
+    else if ((arg == "-u" || arg == "--unix") && i + 1 < argc) {
+      config.network.unix_socket_path = argv[++i];
+    }
     else {
       std::cerr << "Unknown option: " << arg << "\n";
       print_usage(argv[0]);
@@ -108,6 +112,7 @@ int main(int argc, char* argv[]) {
               << " GB at " << config.disk.path << "\n";
   }
   std::cout << "  HTTP port: " << config.network.http_port << "\n";
+  std::cout << "  Unix socket: " << config.network.unix_socket_path << "\n";
   std::cout << "  Cluster port: " << config.network.cluster_port << "\n";
 
   if (!config.cluster.seed_nodes.empty()) {
@@ -217,6 +222,10 @@ ElCacheServer::ElCacheServer(const Config& config)
   http_handler_ = std::make_unique<HttpHandler>(*coordinator_);
   http_handler_->set_metrics_collector(metrics_collector_.get());
   http_server_ = std::make_unique<HttpServer>(config.network, *http_handler_);
+
+  // Setup Unix socket server for SDK clients
+  unix_server_ = std::make_unique<UnixSocketServer>(
+      config.network.unix_socket_path, *coordinator_, io_ctx_);
 }
 
 ElCacheServer::~ElCacheServer() = default;
@@ -241,12 +250,20 @@ elio::coro::task<Status> ElCacheServer::start(elio::runtime::scheduler& sched) {
     co_return status;
   }
 
+  // Start Unix socket server for SDK clients
+  auto unix_task = unix_server_->start(sched);
+  sched.spawn(unix_task.release());
+
   co_return Status::make_ok();
 }
 
 elio::coro::task<void> ElCacheServer::stop() {
   running_ = false;
 
+  if (unix_server_) {
+    unix_server_->stop();
+  }
+
   co_await http_server_->stop();
 
   if (cluster_) {
diff --git a/src/network/unix_server.cpp b/src/network/unix_server.cpp
new file mode 100644
index 0000000..16735fd
--- /dev/null
+++ b/src/network/unix_server.cpp
@@ -0,0 +1,931 @@
+#include "elcache/unix_server.hpp"
+#include "elcache/sparse_cache.hpp"
+#include <cstring>
+#include <ctime>
+#include <filesystem>
+#include <iostream>
+#include <shared_mutex>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+#include <fcntl.h>
+#include <poll.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+namespace elcache {
+
+namespace {
+
+// Debug helper to check fd info
+void debug_fd_info(int fd, const char* label) {
+    struct stat st;
+    if (fstat(fd, &st) == 0) {
+        const char* type = "unknown";
+        if (S_ISSOCK(st.st_mode)) type = "socket";
+        else if (S_ISREG(st.st_mode)) type = "regular file";
+        else if (S_ISFIFO(st.st_mode)) type = "fifo";
+        else if (S_ISCHR(st.st_mode)) type = "char device";
+
+        int flags = fcntl(fd, F_GETFL);
+        bool nonblock = (flags & O_NONBLOCK) != 0;
+
+        std::cerr << "[UDS DEBUG] " << label << ": fd=" << 
fd
+                  << " type=" << type
+                  << " nonblock=" << (nonblock ? "yes" : "no")
+                  << " flags=0x" << std::hex << flags << std::dec << "\n";
+    } else {
+        std::cerr << "[UDS DEBUG] " << label << ": fd=" << fd
+                  << " fstat failed: " << strerror(errno) << "\n";
+    }
+}
+
+// Simple synchronous in-memory cache for Unix socket server
+// Uses standard mutexes instead of async ones
+class SyncCache {
+public:
+    void put(const std::string& key, const std::vector<uint8_t>& value) {
+        std::unique_lock lock(mutex_);
+        data_[key] = value;
+    }
+
+    bool get(const std::string& key, std::vector<uint8_t>& value) {
+        std::shared_lock lock(mutex_);
+        auto it = data_.find(key);
+        if (it == data_.end()) return false;
+        value = it->second;
+        return true;
+    }
+
+    bool remove(const std::string& key) {
+        std::unique_lock lock(mutex_);
+        return data_.erase(key) > 0;
+    }
+
+    bool exists(const std::string& key) {
+        std::shared_lock lock(mutex_);
+        return data_.find(key) != data_.end();
+    }
+
+private:
+    std::shared_mutex mutex_;
+    std::unordered_map<std::string, std::vector<uint8_t>> data_;
+};
+
+// Global sync cache instance for Unix socket server
+static SyncCache g_sync_cache;
+
+// Little-endian encoding/decoding helpers
+inline void encode_u16(uint8_t* buf, uint16_t v) {
+    buf[0] = v & 0xFF;
+    buf[1] = (v >> 8) & 0xFF;
+}
+
+inline void encode_u32(uint8_t* buf, uint32_t v) {
+    buf[0] = v & 0xFF;
+    buf[1] = (v >> 8) & 0xFF;
+    buf[2] = (v >> 16) & 0xFF;
+    buf[3] = (v >> 24) & 0xFF;
+}
+
+inline void encode_u64(uint8_t* buf, uint64_t v) {
+    for (int i = 0; i < 8; i++) {
+        buf[i] = (v >> (i * 8)) & 0xFF;
+    }
+}
+
+inline uint16_t decode_u16(const uint8_t* buf) {
+    return buf[0] | (static_cast<uint16_t>(buf[1]) << 8);
+}
+
+inline uint32_t decode_u32(const uint8_t* buf) {
+    return buf[0] | (static_cast<uint32_t>(buf[1]) << 8) |
+           (static_cast<uint32_t>(buf[2]) << 16) | (static_cast<uint32_t>(buf[3]) << 24);
+}
+
+inline uint64_t decode_u64(const uint8_t* buf) {
+    uint64_t v = 0;
+    for (int i = 0; i < 8; i++) {
+        v |= static_cast<uint64_t>(buf[i]) << (i * 8);
+    }
+    return v;
+}
+
+// Encode message header
+void encode_header(uint8_t* buf, MessageType type, uint32_t payload_len, uint32_t request_id) {
+    encode_u32(buf, PROTO_MAGIC);
+    encode_u16(buf + 4, PROTO_VERSION);
+    encode_u16(buf + 6, static_cast<uint16_t>(type));
+    encode_u32(buf + 8, payload_len);
+    encode_u32(buf + 12, request_id);
+    encode_u64(buf + 16, static_cast<uint64_t>(time(nullptr)) * 1000);
+}
+
+// Blocking read helper
+bool read_all(int fd, void* buf, size_t len) {
+    uint8_t* ptr = static_cast<uint8_t*>(buf);
+    size_t remaining = len;
+    while (remaining > 0) {
+        ssize_t n = ::recv(fd, ptr, remaining, 0);
+        if (n <= 0) return false;
+        ptr += n;
+        remaining -= n;
+    }
+    return true;
+}
+
+// Blocking write helper
+bool write_all(int fd, const void* buf, size_t len) {
+    const uint8_t* ptr = static_cast<const uint8_t*>(buf);
+    size_t remaining = len;
+    while (remaining > 0) {
+        ssize_t n = ::send(fd, ptr, remaining, 0);
+        if (n <= 0) return false;
+        ptr += n;
+        remaining -= n;
+    }
+    return true;
+}
+
+} // anonymous namespace
+
+UnixSocketServer::UnixSocketServer(const std::string& socket_path,
+                                   CacheCoordinator& cache,
+                                   elio::io::io_context& ctx)
+    : socket_path_(socket_path)
+    , cache_(cache)
+    , io_ctx_(ctx)
+{}
+
+UnixSocketServer::~UnixSocketServer() {
+    stop();
+}
+
+elio::coro::task<Status> UnixSocketServer::start(elio::runtime::scheduler& sched) {
+    std::cerr << "[UDS DEBUG] UnixSocketServer::start() called for " << socket_path_ << std::endl;
+    running_ = true;
+
+    // Ensure parent directory exists
+    auto parent = std::filesystem::path(socket_path_).parent_path();
+    if (!parent.empty()) {
+        
std::error_code ec;
+        std::filesystem::create_directories(parent, ec);
+        if (ec) {
+            co_return Status::error(ErrorCode::DiskError,
+                                    "Failed to create socket directory: " + ec.message());
+        }
+    }
+
+    // Try using Elio's native async UDS first
+    elio::net::unix_address addr(socket_path_);
+    auto listener_opt = elio::net::uds_listener::bind(addr, io_ctx_);
+
+    if (listener_opt) {
+        std::cout << "[ElCache] Unix socket (async) listening on " << socket_path_ << std::endl;
+        std::cerr << "[UDS DEBUG] Async listener fd=" << listener_opt->fd() << std::endl;
+
+        // Store listener and run async accept loop
+        listener_ = std::make_unique<elio::net::uds_listener>(std::move(*listener_opt));
+        debug_fd_info(listener_->fd(), "listener socket");
+
+        // Spawn async accept loop
+        auto accept_task = accept_loop_async(sched);
+        sched.spawn(accept_task.release());
+
+        co_return Status::make_ok();
+    }
+
+    // Fall back to thread-based approach
+    std::cerr << "[UDS DEBUG] Failed to bind async listener: " << strerror(errno)
+              << ", falling back to thread-based" << std::endl;
+
+    // Unlink existing socket
+    ::unlink(socket_path_.c_str());
+
+    // Create socket directly
+    int listen_fd = socket(AF_UNIX, SOCK_STREAM, 0);
+    if (listen_fd < 0) {
+        co_return Status::error(ErrorCode::NetworkError,
+                                "Failed to create Unix socket: " + std::string(strerror(errno)));
+    }
+
+    debug_fd_info(listen_fd, "thread-based listener");
+
+    // Bind
+    struct sockaddr_un saddr;
+    memset(&saddr, 0, sizeof(saddr));
+    saddr.sun_family = AF_UNIX;
+    strncpy(saddr.sun_path, socket_path_.c_str(), sizeof(saddr.sun_path) - 1);
+
+    if (::bind(listen_fd, (struct sockaddr*)&saddr, sizeof(saddr)) < 0) {
+        ::close(listen_fd);
+        co_return Status::error(ErrorCode::NetworkError,
+                                "Failed to bind Unix socket: " + std::string(strerror(errno)));
+    }
+
+    // Listen
+    if (::listen(listen_fd, 128) < 0) {
+        ::close(listen_fd);
+        co_return Status::error(ErrorCode::NetworkError,
+                                "Failed to listen on Unix socket: " + std::string(strerror(errno)));
+    }
+
+    std::cout << "[ElCache] Unix socket (threaded) listening on " << socket_path_ << std::endl;
+
+    // Accept loop in separate thread
+    std::thread accept_thread([this, listen_fd]() {
+        while (running_) {
+            struct pollfd pfd;
+            pfd.fd = listen_fd;
+            pfd.events = POLLIN;
+
+            int ret = poll(&pfd, 1, 100);
+            if (ret <= 0) continue;
+
+            struct sockaddr_un client_addr;
+            socklen_t addr_len = sizeof(client_addr);
+            int client_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &addr_len);
+            if (client_fd < 0) continue;
+
+            debug_fd_info(client_fd, "accepted client (threaded)");
+
+            // Handle client in a separate thread
+            std::thread client_thread([this, client_fd]() {
+                handle_client_thread(client_fd);
+            });
+            client_thread.detach();
+        }
+
+        ::close(listen_fd);
+        ::unlink(socket_path_.c_str());
+    });
+    accept_thread.detach();
+
+    co_return Status::make_ok();
+}
+
+elio::coro::task<void> UnixSocketServer::accept_loop_async(elio::runtime::scheduler& sched) {
+    while (running_ && listener_) {
+        auto client_opt = co_await listener_->accept();
+
+        if (!client_opt) {
+            std::cerr << "[UDS DEBUG] accept failed: " << strerror(errno) << "\n";
+            continue;
+        }
+
+        int client_fd = client_opt->fd();
+        debug_fd_info(client_fd, "accepted client (async)");
+
+        // Spawn handler for this client
+        auto handler = handle_client(std::move(*client_opt));
+        sched.spawn(handler.release());
+    }
+}
+
+void UnixSocketServer::stop() {
+    running_ = false;
+    if (listener_) {
+        listener_->close();
+        listener_.reset();
+    }
+}
+
+void UnixSocketServer::handle_client_thread(int fd) {
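+    // One detached thread per client connection: requests are read and
+    // answered sequentially with blocking I/O until the peer disconnects
+    // or the server stops.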
+    while (running_) {
+        if (!process_request_thread(fd)) break;
+    }
+    ::close(fd);
+}
+
+bool UnixSocketServer::process_request_thread(int fd) {
+    uint8_t header[HEADER_SIZE];
+    if (!read_all(fd, header, HEADER_SIZE)) {
+        return false;
+    }
+
+    uint32_t magic = decode_u32(header);
+    if (magic != PROTO_MAGIC) {
+        return false;
+    }
+
+    uint16_t msg_type = decode_u16(header + 6);
+    uint32_t payload_len = decode_u32(header + 8);
+    uint32_t request_id = decode_u32(header + 12);
+
+    std::vector<uint8_t> payload(payload_len);
+    if (payload_len > 0 && !read_all(fd, payload.data(), payload_len)) {
+        return false;
+    }
+
+    switch (static_cast<MessageType>(msg_type)) {
+        case MessageType::Get:
+            handle_get_thread(fd, payload.data(), payload.size(), request_id);
+            break;
+        case MessageType::Put:
+            handle_put_thread(fd, payload.data(), payload.size(), request_id);
+            break;
+        case MessageType::Delete:
+            handle_delete_thread(fd, payload.data(), payload.size(), request_id);
+            break;
+        case MessageType::CreateSparse:
+            handle_create_sparse_thread(fd, payload.data(), payload.size(), request_id);
+            break;
+        case MessageType::WriteRange:
+            handle_write_range_thread(fd, payload.data(), payload.size(), request_id);
+            break;
+        case MessageType::Finalize:
+            handle_finalize_thread(fd, payload.data(), payload.size(), request_id);
+            break;
+        default:
+            break;
+    }
+
+    return true;
+}
+
+void UnixSocketServer::handle_get_thread(
+    int fd,
+    const uint8_t* payload, size_t len,
+    uint32_t request_id)
+{
+    if (len < 4) return;
+
+    uint32_t key_len = decode_u32(payload);
+    if (len < 4 + key_len + 2) return;
+
+    std::string key(reinterpret_cast<const char*>(payload + 4), key_len);
+    size_t pos = 4 + key_len;
+
+    // Parse metadata_only flag
+    pos++;  // Skip metadata_only for now
+
+    // Parse range
+    uint8_t has_range = payload[pos++];
+    uint64_t offset = 0;
+    uint64_t length = 0;
+    if (has_range && len >= pos + 16) {
+        offset = decode_u64(payload + pos);
+        pos += 8;
+        length = decode_u64(payload + pos);
+        pos += 8;
+    }
+
+    // Use sync cache
+    std::vector<uint8_t> value;
+    bool found = g_sync_cache.get(key, value);
+
+    // Apply range if specified
+    std::vector<uint8_t> result_data;
+    if (found && has_range) {
+        if (offset < value.size()) {
+            size_t available = value.size() - offset;
+            size_t to_copy = (length > 0 && length < available) ? length : available;
+            result_data.assign(value.begin() + offset, value.begin() + offset + to_copy);
+        }
+    } else if (found) {
+        result_data = std::move(value);
+    }
+
+    // Build response
+    std::vector<uint8_t> response;
+    response.reserve(64 + result_data.size());
+
+    uint32_t result_code = found ? 
0 : 1;  // 0=Hit, 1=Miss
+
+    response.resize(4);
+    encode_u32(response.data(), result_code);
+
+    // No metadata
+    response.push_back(0);
+
+    // Data length and data
+    size_t data_pos = response.size();
+    response.resize(data_pos + 4 + result_data.size());
+    encode_u32(response.data() + data_pos, static_cast<uint32_t>(result_data.size()));
+    if (!result_data.empty()) {
+        std::memcpy(response.data() + data_pos + 4, result_data.data(), result_data.size());
+    }
+
+    send_response_sync(fd, MessageType::GetResponse, request_id,
+                       response.data(), response.size());
+}
+
+void UnixSocketServer::handle_put_thread(
+    int fd,
+    const uint8_t* payload, size_t len,
+    uint32_t request_id)
+{
+    if (len < 8) return;
+
+    uint32_t key_len = decode_u32(payload);
+    if (len < 4 + key_len + 4) return;
+
+    std::string key(reinterpret_cast<const char*>(payload + 4), key_len);
+    size_t pos = 4 + key_len;
+
+    uint32_t value_len = decode_u32(payload + pos);
+    pos += 4;
+    if (len < pos + value_len) return;
+
+    std::vector<uint8_t> value(payload + pos, payload + pos + value_len);
+
+    // Use sync cache
+    g_sync_cache.put(key, value);
+
+    // Build response - success
+    uint8_t response[4];
+    encode_u32(response, 0);  // 0 = success
+
+    send_response_sync(fd, MessageType::PutResponse, request_id,
+                       response, sizeof(response));
+}
+
+void UnixSocketServer::handle_delete_thread(
+    int fd,
+    const uint8_t* payload, size_t len,
+    uint32_t request_id)
+{
+    if (len < 4) return;
+
+    uint32_t key_len = decode_u32(payload);
+    if (len < 4 + key_len) return;
+
+    std::string key(reinterpret_cast<const char*>(payload + 4), key_len);
+
+    // Use sync cache
+    g_sync_cache.remove(key);
+
+    // Build response - success
+    uint8_t response[4];
+    encode_u32(response, 0);
+
+    send_response_sync(fd, MessageType::DeleteResponse, request_id,
+                       response, sizeof(response));
+}
+
+void UnixSocketServer::handle_create_sparse_thread(
+    int fd,
+    const uint8_t* payload, size_t len,
+    uint32_t request_id)
+{
+    if (len < 4) return;
+
+    uint32_t key_len = decode_u32(payload);
+    if (len < 4 + key_len + 8) return;
+
+    std::string key(reinterpret_cast<const char*>(payload + 4), key_len);
+    size_t pos = 4 + key_len;
+
+    uint64_t total_size = decode_u64(payload + pos);
+
+    bool created = global_sparse_cache().create(key, total_size);
+
+    uint8_t response[4];
+    encode_u32(response, created ? 
+
+void UnixSocketServer::handle_write_range_thread(
+    int fd,
+    const uint8_t* payload, size_t len,
+    uint32_t request_id)
+{
+    if (len < 4) return;
+
+    uint32_t key_len = decode_u32(payload);
+    if (len < 4 + key_len + 8 + 4) return;
+
+    std::string key(reinterpret_cast<const char*>(payload + 4), key_len);
+    size_t pos = 4 + key_len;
+
+    uint64_t offset = decode_u64(payload + pos);
+    pos += 8;
+
+    uint32_t data_len = decode_u32(payload + pos);
+    pos += 4;
+
+    if (len < pos + data_len) return;
+
+    bool success = global_sparse_cache().write_range(key, offset, payload + pos, data_len);
+
+    uint8_t response[4];
+    if (!success) {
+        if (!global_sparse_cache().exists(key)) {
+            encode_u32(response, 1);
+        } else {
+            encode_u32(response, 2);
+        }
+    } else {
+        encode_u32(response, 0);
+    }
+
+    send_response_sync(fd, MessageType::WriteRangeResponse, request_id,
+                       response, sizeof(response));
+}
+
+void UnixSocketServer::handle_finalize_thread(
+    int fd,
+    const uint8_t* payload, size_t len,
+    uint32_t request_id)
+{
+    if (len < 4) return;
+
+    uint32_t key_len = decode_u32(payload);
+    if (len < 4 + key_len) return;
+
+    std::string key(reinterpret_cast<const char*>(payload + 4), key_len);
+
+    std::vector<uint8_t> data;
+    int result = global_sparse_cache().finalize(key, data);
+
+    // If successful, store in sync cache
+    if (result == 0) {
+        g_sync_cache.put(key, data);
+    }
+
+    uint8_t response[4];
+    encode_u32(response, result);
+
+    send_response_sync(fd, MessageType::FinalizeResponse, request_id,
+                       response, sizeof(response));
+}
+
+bool UnixSocketServer::send_response_sync(
+    int fd,
+    MessageType type, uint32_t request_id,
+    const uint8_t* payload, size_t len)
+{
+    std::vector<uint8_t> msg(HEADER_SIZE + len);
+    encode_header(msg.data(), type, static_cast<uint32_t>(len), request_id);
+    if (len > 0) {
+        std::memcpy(msg.data() + HEADER_SIZE, payload, len);
+    }
+
+    return write_all(fd, msg.data(), msg.size());
+}
+
+// Async client handler using Elio's uds_stream
+elio::coro::task<void> UnixSocketServer::handle_client(elio::net::uds_stream stream) {
+    int fd = stream.fd();
+    std::cerr << "[UDS DEBUG] handle_client starting for fd=" << fd << "\n";
+
+    while (running_) {
+        bool ok = co_await process_request(stream);
+        if (!ok) {
+            std::cerr << "[UDS DEBUG] process_request returned false for fd=" << fd << "\n";
+            break;
+        }
+    }
+
+    std::cerr << "[UDS DEBUG] handle_client ending for fd=" << fd << "\n";
+    co_await stream.close();
+}
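+
+// Frame header layout shared by both the threaded and async paths (field
+// offsets taken from the decode calls below; bytes 4..5 are never read here
+// and are assumed reserved):
+//   [0..3]  magic = PROTO_MAGIC    [6..7]   msg_type
+//   [8..11] payload_len            [12..15] request_id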
+
+elio::coro::task<bool> UnixSocketServer::process_request(elio::net::uds_stream& stream) {
+    uint8_t header[HEADER_SIZE];
+    int fd = stream.fd();
+
+    std::cerr << "[UDS DEBUG] process_request starting read on fd=" << fd << "\n";
+
+    // Read header
+    size_t total_read = 0;
+    while (total_read < HEADER_SIZE) {
+        std::cerr << "[UDS DEBUG] calling stream.read(), total_read=" << total_read << "\n";
+        auto result = co_await stream.read(header + total_read, HEADER_SIZE - total_read);
+        std::cerr << "[UDS DEBUG] stream.read() returned: result=" << result.result
+                  << " flags=" << result.flags << "\n";
+        if (result.result <= 0) {
+            std::cerr << "[UDS DEBUG] read header failed: result=" << result.result
+                      << " errno=" << errno << " (" << strerror(errno) << ")\n";
+            co_return false;
+        }
+        total_read += result.result;
+    }
+
+    uint32_t magic = decode_u32(header);
+    if (magic != PROTO_MAGIC) {
+        std::cerr << "[UDS DEBUG] bad magic: 0x" << std::hex << magic << std::dec << "\n";
+        co_return false;
+    }
+
+    uint16_t msg_type = decode_u16(header + 6);
+    uint32_t payload_len = decode_u32(header + 8);
+    uint32_t request_id = decode_u32(header + 12);
+
+    std::cerr << "[UDS DEBUG] header parsed: msg_type=0x" << std::hex << msg_type
+              << std::dec << " payload_len=" << payload_len
+              << " request_id=" << request_id << "\n";
+
+    std::vector<uint8_t> payload(payload_len);
+    if (payload_len > 0) {
+        std::cerr << "[UDS DEBUG] reading payload of " << payload_len << " bytes\n";
+        total_read = 0;
+        while (total_read < payload_len) {
+            auto result = co_await stream.read(payload.data() + total_read, payload_len - total_read);
+            if (result.result <= 0) {
+                std::cerr << "[UDS DEBUG] read payload failed: result=" << result.result << "\n";
+                co_return false;
+            }
+            total_read += result.result;
+            std::cerr << "[UDS DEBUG] payload read progress: " << total_read << "/" << payload_len << "\n";
+        }
+    }
+
+    switch (static_cast<MessageType>(msg_type)) {
+    case MessageType::Get:
+        co_await handle_get(stream, payload.data(), payload.size(), request_id);
+        break;
+    case MessageType::Put:
+        co_await handle_put(stream, payload.data(), payload.size(), request_id);
+        break;
+    case MessageType::Delete:
+        co_await handle_delete(stream, payload.data(), payload.size(), request_id);
+        break;
+    case MessageType::CreateSparse:
+        co_await handle_create_sparse(stream, payload.data(), payload.size(), request_id);
+        break;
+    case MessageType::WriteRange:
+        co_await handle_write_range(stream, payload.data(), payload.size(), request_id);
+        break;
+    case MessageType::Finalize:
+        co_await handle_finalize(stream, payload.data(), payload.size(), request_id);
+        break;
+    default:
+        std::cerr << "[UDS DEBUG] unknown message type: 0x" << std::hex << msg_type << std::dec << "\n";
+        break;
+    }
+
+    co_return true;
+}
+
+elio::coro::task<void> UnixSocketServer::handle_get(
+    elio::net::uds_stream& stream,
+    const uint8_t* payload, size_t len,
+    uint32_t request_id)
+{
+    if (len < 4) co_return;
+
+    uint32_t key_len = decode_u32(payload);
+    if (len < 4 + key_len + 2) co_return;
+
+    std::string key(reinterpret_cast<const char*>(payload + 4), key_len);
+    size_t pos = 4 + key_len;
+
+    // Parse metadata_only flag
+    // uint8_t metadata_only = payload[pos++];
+    pos++; // Skip metadata_only for now
+
+    // Parse range
+    uint8_t has_range = payload[pos++];
+    uint64_t offset = 0;
+    uint64_t length = 0;
+    if (has_range && len >= pos + 16) {
+        offset = decode_u64(payload + pos);
+        pos += 8;
+        length = decode_u64(payload + pos);
+        pos += 8;
+    }
+
+    // Use sync cache (thread-safe)
+    std::vector<uint8_t> value;
+    bool found = g_sync_cache.get(key, value);
+
+    // Apply range if specified
+    std::vector<uint8_t> result_data;
+    if (found && has_range) {
+        if (offset < value.size()) {
+            size_t available = value.size() - offset;
+            size_t to_copy = (length > 0 && length < available) ? length : available;
+            result_data.assign(value.begin() + offset, value.begin() + offset + to_copy);
+        }
+        // If offset >= size, result_data stays empty
+    } else if (found) {
+        result_data = std::move(value);
+    }
+
+    // Build response
+    std::vector<uint8_t> response;
+    response.reserve(64 + result_data.size());
+
+    uint32_t result_code = found ? 0 : 1;
+    response.resize(4);
+    encode_u32(response.data(), result_code);
+
+    // No metadata
+    response.push_back(0);
+
+    // Data length and data
+    size_t data_pos = response.size();
+    response.resize(data_pos + 4 + result_data.size());
+    encode_u32(response.data() + data_pos, static_cast<uint32_t>(result_data.size()));
+    if (!result_data.empty()) {
+        std::memcpy(response.data() + data_pos + 4, result_data.data(), result_data.size());
+    }
+
+    co_await send_response(stream, MessageType::GetResponse, request_id,
+                           response.data(), response.size());
+}
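+
+// Range semantics in handle_get above: an offset at or past the end of the
+// value yields an empty (but still "hit") response, and length == 0 -- or a
+// length past the end -- means "read to the end of the value".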
+
+elio::coro::task<void> UnixSocketServer::handle_put(
+    elio::net::uds_stream& stream,
+    const uint8_t* payload, size_t len,
+    uint32_t request_id)
+{
+    if (len < 8) co_return;
+
+    uint32_t key_len = decode_u32(payload);
+    if (len < 4 + key_len + 4) co_return;
+
+    std::string key(reinterpret_cast<const char*>(payload + 4), key_len);
+    size_t pos = 4 + key_len;
+
+    uint32_t value_len = decode_u32(payload + pos);
+    pos += 4;
+    if (len < pos + value_len) co_return;
+
+    std::vector<uint8_t> value(payload + pos, payload + pos + value_len);
+
+    // Use sync cache
+    g_sync_cache.put(key, value);
+
+    // Build response - success
+    uint8_t response[4];
+    encode_u32(response, 0);
+
+    co_await send_response(stream, MessageType::PutResponse, request_id,
+                           response, sizeof(response));
+}
+
+elio::coro::task<void> UnixSocketServer::handle_delete(
+    elio::net::uds_stream& stream,
+    const uint8_t* payload, size_t len,
+    uint32_t request_id)
+{
+    if (len < 4) co_return;
+
+    uint32_t key_len = decode_u32(payload);
+    if (len < 4 + key_len) co_return;
+
+    std::string key(reinterpret_cast<const char*>(payload + 4), key_len);
+
+    // Use sync cache
+    g_sync_cache.remove(key);
+
+    // Build response - success
+    uint8_t response[4];
+    encode_u32(response, 0);
+
+    co_await send_response(stream, MessageType::DeleteResponse, request_id,
+                           response, sizeof(response));
+}
+
+elio::coro::task<void> UnixSocketServer::handle_create_sparse(
+    elio::net::uds_stream& stream,
+    const uint8_t* payload, size_t len,
+    uint32_t request_id)
+{
+    std::cerr << "[UDS DEBUG] handle_create_sparse: len=" << len << "\n";
+
+    // Parse: key_len(4) + key + total_size(8) + has_ttl(1) + [ttl(8)] + flags(4)
+    if (len < 4) {
+        std::cerr << "[UDS DEBUG] handle_create_sparse: len < 4, returning\n";
+        co_return;
+    }
+
+    uint32_t key_len = decode_u32(payload);
+    std::cerr << "[UDS DEBUG] handle_create_sparse: key_len=" << key_len << "\n";
+
+    if (len < 4 + key_len + 8) {
+        std::cerr << "[UDS DEBUG] handle_create_sparse: len < 4+key_len+8, returning\n";
+        co_return;
+    }
+
+    std::string key(reinterpret_cast<const char*>(payload + 4), key_len);
+    size_t pos = 4 + key_len;
+
+    uint64_t total_size = decode_u64(payload + pos);
+    std::cerr << "[UDS DEBUG] handle_create_sparse: key='" << key << "' total_size=" << total_size << "\n";
+    pos += 8;
+
+    // Skip TTL and flags for now (just for creating the entry)
+
+    // Create sparse entry
+    std::cerr << "[UDS DEBUG] handle_create_sparse: calling global_sparse_cache().create\n";
+    bool created = global_sparse_cache().create(key, total_size);
+    std::cerr << "[UDS DEBUG] handle_create_sparse: created=" << created << "\n";
+
+    // Build response
+    uint8_t response[4];
+    encode_u32(response, created ? 0 : 1); // 0 = success, 1 = already exists
+
+    std::cerr << "[UDS DEBUG] handle_create_sparse: sending response\n";
+    co_await send_response(stream, MessageType::CreateSparseResponse, request_id,
+                           response, sizeof(response));
+    std::cerr << "[UDS DEBUG] handle_create_sparse: response sent\n";
+}
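+
+// Result codes for the sparse handlers (handle_create_sparse above,
+// handle_write_range and handle_finalize below), as encoded in their
+// response-building code:
+//   CreateSparse: 0 = created, 1 = already exists
+//   WriteRange  : 0 = success, 1 = not found, 2 = invalid range
+//   Finalize    : 0 = success, 1 = not found, 2 = incomplete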
+
+elio::coro::task<void> UnixSocketServer::handle_write_range(
+    elio::net::uds_stream& stream,
+    const uint8_t* payload, size_t len,
+    uint32_t request_id)
+{
+    // Parse: key_len(4) + key + offset(8) + data_len(4) + data
+    if (len < 4) co_return;
+
+    uint32_t key_len = decode_u32(payload);
+    if (len < 4 + key_len + 8 + 4) co_return;
+
+    std::string key(reinterpret_cast<const char*>(payload + 4), key_len);
+    size_t pos = 4 + key_len;
+
+    uint64_t offset = decode_u64(payload + pos);
+    pos += 8;
+
+    uint32_t data_len = decode_u32(payload + pos);
+    pos += 4;
+
+    if (len < pos + data_len) co_return;
+
+    // Write to sparse entry
+    bool success = global_sparse_cache().write_range(key, offset, payload + pos, data_len);
+
+    // Build response: 0 = success, 1 = not found, 2 = invalid range
+    uint8_t response[4];
+    if (!success) {
+        if (!global_sparse_cache().exists(key)) {
+            encode_u32(response, 1); // Not found
+        } else {
+            encode_u32(response, 2); // Invalid range
+        }
+    } else {
+        encode_u32(response, 0); // Success
+    }
+
+    co_await send_response(stream, MessageType::WriteRangeResponse, request_id,
+                           response, sizeof(response));
+}
+
+elio::coro::task<void> UnixSocketServer::handle_finalize(
+    elio::net::uds_stream& stream,
+    const uint8_t* payload, size_t len,
+    uint32_t request_id)
+{
+    // Parse: key_len(4) + key
+    if (len < 4) co_return;
+
+    uint32_t key_len = decode_u32(payload);
+    if (len < 4 + key_len) co_return;
+
+    std::string key(reinterpret_cast<const char*>(payload + 4), key_len);
+
+    // Finalize sparse entry
+    std::vector<uint8_t> data;
+    int result = global_sparse_cache().finalize(key, data);
+
+    // If successful, store in sync cache
+    if (result == 0) {
+        g_sync_cache.put(key, data);
+    }
+
+    // Build response: 0 = success, 1 = not found, 2 = incomplete
+    uint8_t response[4];
+    encode_u32(response, result);
+
+    co_await send_response(stream, MessageType::FinalizeResponse, request_id,
+                           response, sizeof(response));
+}
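+
+// Every reply reuses the request framing: send_response below writes a
+// HEADER_SIZE header carrying the *Response message type plus the original
+// request_id, then the payload, so a client pipelining requests on one
+// connection can match replies by id.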
+
+elio::coro::task<bool> UnixSocketServer::send_response(
+    elio::net::uds_stream& stream,
+    MessageType type, uint32_t request_id,
+    const uint8_t* payload, size_t len)
+{
+    std::vector<uint8_t> msg(HEADER_SIZE + len);
+    encode_header(msg.data(), type, static_cast<uint32_t>(len), request_id);
+    if (len > 0) {
+        std::memcpy(msg.data() + HEADER_SIZE, payload, len);
+    }
+
+    size_t total_written = 0;
+    while (total_written < msg.size()) {
+        auto result = co_await stream.write(msg.data() + total_written, msg.size() - total_written);
+        if (result.result <= 0) {
+            std::cerr << "[UDS DEBUG] write failed: result=" << result.result << "\n";
+            co_return false;
+        }
+        total_written += result.result;
+    }
+
+    co_return true;
+}
+
+// Legacy methods - no longer needed
+elio::coro::task<void> UnixSocketServer::handle_client_sync(int, elio::runtime::scheduler&) { co_return; }
+elio::coro::task<bool> UnixSocketServer::process_request_sync(int, elio::runtime::scheduler&) { co_return false; }
+elio::coro::task<void> UnixSocketServer::handle_get_sync(int, const uint8_t*, size_t, uint32_t) { co_return; }
+elio::coro::task<void> UnixSocketServer::handle_put_sync(int, const uint8_t*, size_t, uint32_t) { co_return; }
+elio::coro::task<void> UnixSocketServer::handle_delete_sync(int, const uint8_t*, size_t, uint32_t) { co_return; }
+
+} // namespace elcache
diff --git a/src/sdk/elcache_client.c b/src/sdk/elcache_client.c
index 31645ab..269aa20 100644
--- a/src/sdk/elcache_client.c
+++ b/src/sdk/elcache_client.c
@@ -30,6 +30,13 @@
 #define MSG_DELETE_RESPONSE 0x0105
 #define MSG_CHECK 0x0106
 #define MSG_CHECK_RESPONSE 0x0107
+/* Sparse/partial write operations */
+#define MSG_CREATE_SPARSE 0x0108
+#define MSG_CREATE_SPARSE_RESPONSE 0x0109
+#define MSG_WRITE_RANGE 0x010A
+#define MSG_WRITE_RANGE_RESPONSE 0x010B
+#define MSG_FINALIZE 0x010C
+#define MSG_FINALIZE_RESPONSE 0x010D
 
 /* Default buffer sizes */
 #define DEFAULT_RECV_BUFFER (256 * 1024)
@@ -761,6 +768,234 @@ void elcache_free(void* ptr) {
     free(ptr);
 }
 
+/*
+ * Position-based write operations
+ */
+
+elcache_error_t elcache_client_create_sparse(elcache_client_t* client,
+                                             const void* key, size_t key_len,
+                                             uint64_t total_size,
+                                             const elcache_put_options_t* options) {
+    if (!client || !key) {
+        return ELCACHE_ERR_INVALID_ARG;
+    }
+
+    if (key_len > 8192) {
+        set_error(client, "Key too large");
+        return ELCACHE_ERR_KEY_TOO_LARGE;
+    }
+
+    if (!elcache_client_is_connected(client)) {
+        set_error(client, "Not connected");
+        return ELCACHE_ERR_CONNECTION;
+    }
+
+    /* Build request: key_len(4) + key + total_size(8) + has_ttl(1) + [ttl(8)] + flags(4) */
+    uint8_t* payload = client->send_buffer + ELCACHE_HEADER_SIZE;
+    size_t payload_len = 0;
+
+    /* Key */
+    encode_u32(payload + payload_len, (uint32_t)key_len);
+    payload_len += 4;
+    memcpy(payload + payload_len, key, key_len);
+    payload_len += key_len;
+
+    /* Total size */
+    encode_u64(payload + payload_len, total_size);
+    payload_len += 8;
+
+    /* TTL */
+    int has_ttl = options && options->ttl_seconds > 0;
+    payload[payload_len++] = has_ttl ? 1 : 0;
+    if (has_ttl) {
+        encode_u64(payload + payload_len, (uint64_t)options->ttl_seconds);
+        payload_len += 8;
+    }
+
+    /* Flags */
+    uint32_t flags = options ? options->flags : 0;
+    encode_u32(payload + payload_len, flags);
+    payload_len += 4;
+
+    /* Encode header and send */
+    uint32_t request_id = client->next_request_id++;
+    encode_header(client->send_buffer, MSG_CREATE_SPARSE, (uint32_t)payload_len, request_id);
+
+    elcache_error_t err = send_all(client, client->send_buffer, ELCACHE_HEADER_SIZE + payload_len);
+    if (err != ELCACHE_OK) {
+        return err;
+    }
+
+    /* Receive response */
+    err = recv_all(client, client->recv_buffer, ELCACHE_HEADER_SIZE);
+    if (err != ELCACHE_OK) {
+        return err;
+    }
+
+    uint32_t response_len = decode_u32(client->recv_buffer + 8);
+    err = recv_all(client, client->recv_buffer + ELCACHE_HEADER_SIZE, response_len);
+    if (err != ELCACHE_OK) {
+        return err;
+    }
+
+    /* Parse response */
+    uint8_t* resp = client->recv_buffer + ELCACHE_HEADER_SIZE;
+    uint32_t code = decode_u32(resp);
+
+    if (code != 0) {
+        set_error(client, "Create sparse failed");
+        return ELCACHE_ERR_INTERNAL;
+    }
+
+    return ELCACHE_OK;
+}
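+
+/*
+ * Usage sketch for the sparse SDK calls (illustrative only -- error handling
+ * omitted; `client` is assumed to be already connected and `buf` to hold the
+ * full 4096-byte value):
+ *
+ *     elcache_client_create_sparse(client, "mykey", 5, 4096, NULL);
+ *     elcache_client_write_range(client, "mykey", 5, 0,    buf,        2048);
+ *     elcache_client_write_range(client, "mykey", 5, 2048, buf + 2048, 2048);
+ *     elcache_client_finalize(client, "mykey", 5);
+ */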
+
+elcache_error_t elcache_client_write_range(elcache_client_t* client,
+                                           const void* key, size_t key_len,
+                                           uint64_t offset,
+                                           const void* data, size_t data_len) {
+    if (!client || !key || !data) {
+        return ELCACHE_ERR_INVALID_ARG;
+    }
+
+    if (key_len > 8192) {
+        set_error(client, "Key too large");
+        return ELCACHE_ERR_KEY_TOO_LARGE;
+    }
+
+    if (!elcache_client_is_connected(client)) {
+        set_error(client, "Not connected");
+        return ELCACHE_ERR_CONNECTION;
+    }
+
+    /* Check if payload fits in buffer */
+    size_t needed = ELCACHE_HEADER_SIZE + 4 + key_len + 8 + 4 + data_len;
+    if (needed > client->send_buffer_size) {
+        set_error(client, "Data too large for buffer");
+        return ELCACHE_ERR_VALUE_TOO_LARGE;
+    }
+
+    /* Build request: key_len(4) + key + offset(8) + data_len(4) + data */
+    uint8_t* payload = client->send_buffer + ELCACHE_HEADER_SIZE;
+    size_t payload_len = 0;
+
+    /* Key */
+    encode_u32(payload + payload_len, (uint32_t)key_len);
+    payload_len += 4;
+    memcpy(payload + payload_len, key, key_len);
+    payload_len += key_len;
+
+    /* Offset */
+    encode_u64(payload + payload_len, offset);
+    payload_len += 8;
+
+    /* Data */
+    encode_u32(payload + payload_len, (uint32_t)data_len);
+    payload_len += 4;
+    memcpy(payload + payload_len, data, data_len);
+    payload_len += data_len;
+
+    /* Encode header and send */
+    uint32_t request_id = client->next_request_id++;
+    encode_header(client->send_buffer, MSG_WRITE_RANGE, (uint32_t)payload_len, request_id);
+
+    elcache_error_t err = send_all(client, client->send_buffer, ELCACHE_HEADER_SIZE + payload_len);
+    if (err != ELCACHE_OK) {
+        return err;
+    }
+
+    /* Receive response */
+    err = recv_all(client, client->recv_buffer, ELCACHE_HEADER_SIZE);
+    if (err != ELCACHE_OK) {
+        return err;
+    }
+
+    uint32_t response_len = decode_u32(client->recv_buffer + 8);
+    err = recv_all(client, client->recv_buffer + ELCACHE_HEADER_SIZE, response_len);
+    if (err != ELCACHE_OK) {
+        return err;
+    }
+
+    /* Parse response */
+    uint8_t* resp = client->recv_buffer + ELCACHE_HEADER_SIZE;
+    uint32_t code = decode_u32(resp);
+
+    if (code == 1) {
+        set_error(client, "Key not found or not sparse");
+        return ELCACHE_ERR_NOT_FOUND;
+    } else if (code == 2) {
+        set_error(client, "Invalid range");
+        return ELCACHE_ERR_INVALID_ARG;
+    } else if (code != 0) {
+        set_error(client, "Write range failed");
+        return ELCACHE_ERR_INTERNAL;
+    }
+
+    /* Update stats */
+    client->bytes_written += data_len;
+
+    return ELCACHE_OK;
+}
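+
+/*
+ * Note: elcache_client_write_range() requires the whole range to fit in
+ * send_buffer (it returns ELCACHE_ERR_VALUE_TOO_LARGE otherwise), so callers
+ * uploading a large value should loop over fixed-size pieces, e.g. (sketch
+ * only; CHUNK is whatever fits the configured send buffer):
+ *
+ *     for (uint64_t off = 0; off < total; off += CHUNK) {
+ *         size_t n = (total - off < CHUNK) ? (size_t)(total - off) : CHUNK;
+ *         err = elcache_client_write_range(client, key, key_len, off, buf + off, n);
+ *         if (err != ELCACHE_OK) break;
+ *     }
+ */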
+
+elcache_error_t elcache_client_finalize(elcache_client_t* client,
+                                        const void* key, size_t key_len) {
+    if (!client || !key) {
+        return ELCACHE_ERR_INVALID_ARG;
+    }
+
+    if (!elcache_client_is_connected(client)) {
+        set_error(client, "Not connected");
+        return ELCACHE_ERR_CONNECTION;
+    }
+
+    /* Build request: key_len(4) + key */
+    uint8_t* payload = client->send_buffer + ELCACHE_HEADER_SIZE;
+    size_t payload_len = 0;
+
+    encode_u32(payload + payload_len, (uint32_t)key_len);
+    payload_len += 4;
+    memcpy(payload + payload_len, key, key_len);
+    payload_len += key_len;
+
+    /* Encode header and send */
+    uint32_t request_id = client->next_request_id++;
+    encode_header(client->send_buffer, MSG_FINALIZE, (uint32_t)payload_len, request_id);
+
+    elcache_error_t err = send_all(client, client->send_buffer, ELCACHE_HEADER_SIZE + payload_len);
+    if (err != ELCACHE_OK) {
+        return err;
+    }
+
+    /* Receive response */
+    err = recv_all(client, client->recv_buffer, ELCACHE_HEADER_SIZE);
+    if (err != ELCACHE_OK) {
+        return err;
+    }
+
+    uint32_t response_len = decode_u32(client->recv_buffer + 8);
+    err = recv_all(client, client->recv_buffer + ELCACHE_HEADER_SIZE, response_len);
+    if (err != ELCACHE_OK) {
+        return err;
+    }
+
+    /* Parse response */
+    uint8_t* resp = client->recv_buffer + ELCACHE_HEADER_SIZE;
+    uint32_t code = decode_u32(resp);
+
+    if (code == 1) {
+        set_error(client, "Key not found");
+        return ELCACHE_ERR_NOT_FOUND;
+    } else if (code == 2) {
+        set_error(client, "Not all ranges written");
+        return ELCACHE_ERR_PARTIAL;
+    } else if (code != 0) {
+        set_error(client, "Finalize failed");
+        return ELCACHE_ERR_INTERNAL;
+    }
+
+    return ELCACHE_OK;
+}
+
 /* Streaming operations stubs */
 struct elcache_write_stream {