From 2fe6457d417c0aa77c147032271a396d1de9dd69 Mon Sep 17 00:00:00 2001 From: hash-anu Date: Mon, 21 Apr 2025 13:15:00 +0000 Subject: [PATCH 01/13] Adding sync main for basic SQL converter from rocksdb logs --- CMakeLists.txt | 2 + sync/AnuDBSyncServer.cpp | 413 +++++++++++++++++++++++++++++++++++++++ sync/CMakeLists.txt | 11 ++ 3 files changed, 426 insertions(+) create mode 100644 sync/AnuDBSyncServer.cpp create mode 100644 sync/CMakeLists.txt diff --git a/CMakeLists.txt b/CMakeLists.txt index c638b59..2aea656 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -100,3 +100,5 @@ set(ENABLE_OPENSSL OFF CACHE BOOL "Disable OpenSSL" FORCE) include_directories(${CMAKE_SOURCE_DIR}/third_party/nanomq/nanomq/include) # Add mqtt file add_subdirectory(mqtt) +# Add sync +add_subdirectory(sync) diff --git a/sync/AnuDBSyncServer.cpp b/sync/AnuDBSyncServer.cpp new file mode 100644 index 0000000..2f34efb --- /dev/null +++ b/sync/AnuDBSyncServer.cpp @@ -0,0 +1,413 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// Custom RocksDB event listener to detect log updates +class LogUpdateListener : public rocksdb::EventListener { +private: + std::function updateCallback; + +public: + LogUpdateListener(std::function callback) : updateCallback(callback) {} + + void OnFlushBegin(rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override { + updateCallback(); + } + + void OnFlushCompleted(rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override { + updateCallback(); + } + + void OnBackgroundError(rocksdb::BackgroundErrorReason reason, rocksdb::Status* status) override { + std::cerr << "RocksDB background error: " << status->ToString() << std::endl; + } +}; + +class AnuDBSQLConverter { +private: + std::string dbPath; + uint64_t lastSequence; + std::ofstream logFile; + std::atomic running; + std::mutex mtx; + std::condition_variable cv; + bool hasNewLogs; + bool processAll; + + // Convert RocksDB operation to SQL query + std::string operationToSQL(const std::string& opType, const std::string& key, + const std::string& value, uint32_t cfId, + const std::map& cfNameMap) { + // Get column family name + std::string cfName = "default"; + auto it = cfNameMap.find(cfId); + if (it != cfNameMap.end()) { + cfName = it->second; + } + + // Document ID from key + std::string docId = key; + + // Escape single quotes in values for SQL + std::string escapedValue = value; + size_t pos = 0; + while ((pos = escapedValue.find("'", pos)) != std::string::npos) { + escapedValue.replace(pos, 1, "''"); + pos += 2; + } + + if (opType == "PUT") { + return "INSERT INTO " + cfName + " (id, data) VALUES ('" + + docId + "', '" + escapedValue + "') ON DUPLICATE KEY UPDATE data=VALUES(data);"; + } + else if (opType == "DELETE") { + return "DELETE FROM " + cfName + " WHERE id = '" + docId + "';"; + } + else if (opType == "CREATE_CF") { + return "CREATE TABLE " + key + " (id VARCHAR(255) PRIMARY KEY, data TEXT);"; + } + else if (opType == "DROP_CF") { + return "DROP TABLE " + key + ";"; + } + + return "-- Unknown operation: " + opType; + } + + // Load last sequence from state file + void loadState() { + if (processAll) { + // Force starting from sequence 0 if processing all logs + lastSequence = 0; + std::cout << "Processing all logs from sequence 0" << std::endl; + return; + } + + std::ifstream stateFile(dbPath + "/sql_converter_state"); + if (stateFile) { + stateFile >> lastSequence; + stateFile.close(); + std::cout << "Loaded last processed sequence: " << lastSequence << std::endl; + } else { + lastSequence = 0; + std::cout << "No previous state found. Starting from sequence 0" << std::endl; + } + } + + // Save last sequence to state file + void saveState() { + std::ofstream stateFile(dbPath + "/sql_converter_state"); + if (stateFile) { + stateFile << lastSequence; + stateFile.close(); + } else { + std::cerr << "Failed to save state" << std::endl; + } + } + +public: + AnuDBSQLConverter(const std::string& path, const std::string& outputFile = "", bool processAllLogs = false) + : dbPath(path), lastSequence(0), running(false), hasNewLogs(false), processAll(processAllLogs) { + + // Open log file if specified + if (!outputFile.empty()) { + logFile.open(outputFile, std::ios::app); + if (!logFile) { + std::cerr << "Failed to open log file: " << outputFile << std::endl; + } + } + + // Load previous state or start from 0 if processing all logs + loadState(); + } + + ~AnuDBSQLConverter() { + // Stop any running threads + stop(); + + // Close log file + if (logFile.is_open()) { + logFile.close(); + } + + // Save state + saveState(); + } + + // Notify that there are new logs to process + void notifyNewLogs() { + std::lock_guard lock(mtx); + hasNewLogs = true; + cv.notify_one(); + } + + // Process logs and convert to SQL + void processLogs() { + rocksdb::Options options; + options.create_if_missing = false; + + // Add listener for log updates + options.listeners.push_back(std::make_shared( + [this]() { this->notifyNewLogs(); } + )); + + // Get list of column families + std::vector cfNames; + rocksdb::Status s = rocksdb::DB::ListColumnFamilies(options, dbPath, &cfNames); + if (!s.ok()) { + std::cerr << "Failed to list column families: " << s.ToString() << std::endl; + return; + } + + // Open DB with all column families + std::vector cfDescriptors; + for (const auto& name : cfNames) { + cfDescriptors.push_back(rocksdb::ColumnFamilyDescriptor(name, rocksdb::ColumnFamilyOptions())); + } + + std::vector cfHandles; + rocksdb::DB* dbRaw; + + // Open in read-only mode, but with listener for updates + s = rocksdb::DB::OpenForReadOnly(options, dbPath, cfDescriptors, &cfHandles, &dbRaw); + if (!s.ok()) { + std::cerr << "Failed to open RocksDB: " << s.ToString() << std::endl; + return; + } + + std::shared_ptr db(dbRaw); + + // Build column family ID to name map + std::map cfNameMap; + for (size_t i = 0; i < cfHandles.size(); i++) { + cfNameMap[cfHandles[i]->GetID()] = cfNames[i]; + delete cfHandles[i]; + } + + // Process existing logs first (force notification) + notifyNewLogs(); + + std::cout << "RocksDB SQL converter started. Monitoring for log updates..." << std::endl; + + while (running) { + // Wait for notification of new logs or timeout (to periodically check anyway) + { + std::unique_lock lock(mtx); + cv.wait_for(lock, std::chrono::seconds(5), [this]() { return hasNewLogs || !running; }); + + if (!running) break; + hasNewLogs = false; + } + + // Get transaction logs since last processed sequence + std::unique_ptr iter; + rocksdb::Status status = db->GetUpdatesSince(lastSequence, &iter); + + if (!status.ok()) { + if (status.IsNotFound()) { + // WAL files might have been deleted, start from current + std::cout << "WAL files not found, resetting sequence number" << std::endl; + uint64_t currentSeq = db->GetLatestSequenceNumber(); + lastSequence = currentSeq; + saveState(); + continue; + } + + std::cerr << "Failed to get transaction logs: " << status.ToString() << std::endl; + continue; + } + + int queryCount = 0; + bool foundLogs = false; + + while (iter->Valid()) { + foundLogs = true; + rocksdb::BatchResult batch = iter->GetBatch(); + lastSequence = batch.sequence; + + class OperationExtractor : public rocksdb::WriteBatch::Handler { + private: + AnuDBSQLConverter* converter; + const std::map& cfMap; + std::vector sqlQueries; + + public: + OperationExtractor(AnuDBSQLConverter* conv, const std::map& cfNameMap) + : converter(conv), cfMap(cfNameMap) {} + + const std::vector& getSqlQueries() const { + return sqlQueries; + } + + rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key, + const rocksdb::Slice& value) override { + std::string sql = converter->operationToSQL("PUT", key.ToString(), value.ToString(), + column_family_id, cfMap); + sqlQueries.push_back(sql); + return rocksdb::Status::OK(); + } + + rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override { + std::string sql = converter->operationToSQL("DELETE", key.ToString(), "", + column_family_id, cfMap); + sqlQueries.push_back(sql); + return rocksdb::Status::OK(); + } + + void LogData(const rocksdb::Slice& blob) override { + std::string data = blob.ToString(); + + size_t createPos = data.find("CREATE_CF:"); + if (createPos != std::string::npos) { + std::string cfName = data.substr(createPos + 10); + sqlQueries.push_back(converter->operationToSQL("CREATE_CF", cfName, "", 0, cfMap)); + return; + } + + size_t dropPos = data.find("DROP_CF:"); + if (dropPos != std::string::npos) { + std::string cfName = data.substr(dropPos + 8); + sqlQueries.push_back(converter->operationToSQL("DROP_CF", cfName, "", 0, cfMap)); + return; + } + } + + // Other required handler methods + rocksdb::Status DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice& begin_key, + const rocksdb::Slice& end_key) override { + return rocksdb::Status::OK(); + } + + rocksdb::Status MergeCF(uint32_t column_family_id, const rocksdb::Slice& key, + const rocksdb::Slice& value) override { + return rocksdb::Status::OK(); + } + + rocksdb::Status PutBlobIndexCF(uint32_t column_family_id, const rocksdb::Slice& key, + const rocksdb::Slice& value) override { + return rocksdb::Status::OK(); + } + }; + + // Process the write batch + OperationExtractor extractor(this, cfNameMap); + status = batch.writeBatchPtr->Iterate(&extractor); + + if (!status.ok()) { + std::cerr << "Error iterating batch: " << status.ToString() << std::endl; + break; + } + + // Output SQL queries + for (const auto& sql : extractor.getSqlQueries()) { + queryCount++; + + // Use timestamp for each SQL entry + auto now = std::chrono::system_clock::now(); + std::time_t now_time = std::chrono::system_clock::to_time_t(now); + char timestamp[100]; + std::strftime(timestamp, sizeof(timestamp), "%Y-%m-%d %H:%M:%S", std::localtime(&now_time)); + + std::string output = std::string(timestamp) + " | " + sql; + + // Log to file if open + if (logFile.is_open()) { + logFile << output << std::endl; + logFile.flush(); // Ensure it's written immediately + } else { + // Otherwise print to stdout + std::cout << output << std::endl; + } + } + + iter->Next(); + } + + if (queryCount > 0) { + std::cout << "Processed " << queryCount << " SQL queries. Latest sequence: " << lastSequence << std::endl; + saveState(); // Save state after processing batch + } else if (!foundLogs && processAll) { + // If we were processing all logs and found none, switch to monitoring mode + processAll = false; + std::cout << "Finished processing existing logs. Now monitoring for new changes." << std::endl; + } + } + } + + // Start monitoring in a separate thread + void start() { + if (running) return; + + running = true; + std::thread([this]() { + this->processLogs(); + }).detach(); + } + + // Stop monitoring + void stop() { + running = false; + cv.notify_all(); + } + + // Get the latest sequence number + uint64_t getLastSequence() const { + return lastSequence; + } +}; + +int main(int argc, char* argv[]) { + if (argc < 2) { + std::cerr << "Usage: " << argv[0] << " [output_file] [--all]" << std::endl; + std::cerr << " db_path: Path to RocksDB database" << std::endl; + std::cerr << " output_file: Optional file to write SQL queries (default: stdout)" << std::endl; + std::cerr << " --all: Process all logs from the beginning, ignoring previous state" << std::endl; + return 1; + } + + std::string dbPath = argv[1]; + std::string outputFile = ""; + bool processAll = false; + + // Parse arguments + for (int i = 2; i < argc; i++) { + std::string arg = argv[i]; + if (arg == "--all") { + processAll = true; + } else { + outputFile = arg; + } + } + + try { + AnuDBSQLConverter converter(dbPath, outputFile, processAll); + + // Start monitoring in real-time + converter.start(); + + std::cout << "SQL converter running for " << dbPath << std::endl; + std::cout << "Press Ctrl+C to stop" << std::endl; + + // Keep main thread alive + while (true) { + std::this_thread::sleep_for(std::chrono::seconds(60)); + } + + } catch (const std::exception& e) { + std::cerr << "Error: " << e.what() << std::endl; + return 1; + } + + return 0; +} diff --git a/sync/CMakeLists.txt b/sync/CMakeLists.txt new file mode 100644 index 0000000..3ddda1b --- /dev/null +++ b/sync/CMakeLists.txt @@ -0,0 +1,11 @@ +add_executable(AnuDBSyncServer AnuDBSyncServer.cpp) + +# Link against your project library +target_link_libraries(AnuDBSyncServer PRIVATE + libanu +) + +# Add include directories +target_include_directories(AnuDBSyncServer PRIVATE + ${CMAKE_SOURCE_DIR}/src ${CMAKE_SOURCE_DIR}/third_party/json ${CMAKE_SOURCE_DIR}/third_party/nanomq/nanomq/include ${CMAKE_SOURCE_DIR}/nng/include/nng/ +) From dc61771f5fcb215fcf77e59b75a0c0e219881067 Mon Sep 17 00:00:00 2001 From: Hash Anu Date: Wed, 23 Apr 2025 22:18:58 +0530 Subject: [PATCH 02/13] Adding WAL tracker class --- sync/AnuDBSyncServer.cpp | 1106 +++++++++++++++++++++++++------------- 1 file changed, 732 insertions(+), 374 deletions(-) diff --git a/sync/AnuDBSyncServer.cpp b/sync/AnuDBSyncServer.cpp index 2f34efb..3de198e 100644 --- a/sync/AnuDBSyncServer.cpp +++ b/sync/AnuDBSyncServer.cpp @@ -1,413 +1,771 @@ -#include +// WalTracker.h +#ifndef WAL_TRACKER_H +#define WAL_TRACKER_H + +#include +#include +#include #include -#include -#include #include -#include -#include #include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#ifdef _WIN32 +#include +#pragma comment(lib, "ws2_32.lib") +extern "C" uintptr_t __cdecl _beginthreadex(void*, unsigned int, + unsigned int(__stdcall*)(void*), void*, unsigned int, unsigned int*); +#endif +// Callback type for WAL operations +using WalOperationCallback = std::function; -// Custom RocksDB event listener to detect log updates -class LogUpdateListener : public rocksdb::EventListener { -private: - std::function updateCallback; - +class WalTracker { public: - LogUpdateListener(std::function callback) : updateCallback(callback) {} - - void OnFlushBegin(rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override { - updateCallback(); + // Constructor - takes a RocksDB instance and optional CF mapping + WalTracker(rocksdb::DB* db, + const std::unordered_map& cf_map = {}); + + // Destructor - stops tracking thread + ~WalTracker(); + + // Start tracking WAL operations + void StartTracking(); + + // Stop tracking WAL operations + void StopTracking(); + + // Register a callback for WAL operations + void RegisterCallback(WalOperationCallback callback); + + // Check if tracking is active + bool IsTracking() const; + + // Get current sequence number + uint64_t GetCurrentSequence() const; + +private: + // Handler for WAL operations + class WalLogHandler : public rocksdb::WriteBatch::Handler { + public: + WalLogHandler(std::unordered_map& cf_map, + WalOperationCallback callback); + + rocksdb::Status PutCF(uint32_t column_family_id, + const rocksdb::Slice& key, + const rocksdb::Slice& value) override; + + rocksdb::Status DeleteCF(uint32_t column_family_id, + const rocksdb::Slice& key) override; + + void LogData(const rocksdb::Slice& blob) override; + + private: + std::unordered_map& cf_id_to_name; + WalOperationCallback callback; + + std::string GetCFName(uint32_t id) const; + }; + + // Thread function for monitoring WAL + void TrackingThread(); + + // Read WAL logs from a sequence number + bool ReadWalLogs(uint64_t& last_sequence); + + // Update column family mapping + void UpdateColumnFamilyMap(); + + // Member variables + rocksdb::DB* db_; + std::unordered_map cf_id_to_name_; + std::unique_ptr tracking_thread_; + std::atomic should_stop_; + std::atomic is_tracking_; + WalOperationCallback callback_; + uint64_t current_sequence_; +}; + +#endif // WAL_TRACKER_H + +// WalTracker.cpp +//#include "WalTracker.h" +#include +#include +#include + +// Constructor +WalTracker::WalTracker(rocksdb::DB* db, + const std::unordered_map& cf_map) + : db_(db), + cf_id_to_name_(cf_map), + should_stop_(false), + is_tracking_(false), + current_sequence_(0) { + // Initialize column family mapping if empty + if (cf_id_to_name_.empty()) { + // Default column family always has ID 0 + cf_id_to_name_[0] = "default"; + + // Try to get more column families from DB path + std::vector cf_names; + rocksdb::Status s = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(), + db_->GetName(), + &cf_names); } - - void OnFlushCompleted(rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override { - updateCallback(); +} + +// Destructor +WalTracker::~WalTracker() { + StopTracking(); +} + +// Start tracking WAL operations +void WalTracker::StartTracking() { + if (is_tracking_) { + return; // Already tracking } - - void OnBackgroundError(rocksdb::BackgroundErrorReason reason, rocksdb::Status* status) override { - std::cerr << "RocksDB background error: " << status->ToString() << std::endl; + + should_stop_ = false; + is_tracking_ = true; + + // Create and start tracking thread + tracking_thread_ = std::make_unique(&WalTracker::TrackingThread, this); +} + +// Stop tracking WAL operations +void WalTracker::StopTracking() { + if (!is_tracking_) { + return; // Not tracking } -}; -class AnuDBSQLConverter { -private: - std::string dbPath; - uint64_t lastSequence; - std::ofstream logFile; - std::atomic running; - std::mutex mtx; - std::condition_variable cv; - bool hasNewLogs; - bool processAll; - - // Convert RocksDB operation to SQL query - std::string operationToSQL(const std::string& opType, const std::string& key, - const std::string& value, uint32_t cfId, - const std::map& cfNameMap) { - // Get column family name - std::string cfName = "default"; - auto it = cfNameMap.find(cfId); - if (it != cfNameMap.end()) { - cfName = it->second; - } - - // Document ID from key - std::string docId = key; - - // Escape single quotes in values for SQL - std::string escapedValue = value; - size_t pos = 0; - while ((pos = escapedValue.find("'", pos)) != std::string::npos) { - escapedValue.replace(pos, 1, "''"); - pos += 2; - } - - if (opType == "PUT") { - return "INSERT INTO " + cfName + " (id, data) VALUES ('" + - docId + "', '" + escapedValue + "') ON DUPLICATE KEY UPDATE data=VALUES(data);"; - } - else if (opType == "DELETE") { - return "DELETE FROM " + cfName + " WHERE id = '" + docId + "';"; - } - else if (opType == "CREATE_CF") { - return "CREATE TABLE " + key + " (id VARCHAR(255) PRIMARY KEY, data TEXT);"; - } - else if (opType == "DROP_CF") { - return "DROP TABLE " + key + ";"; - } - - return "-- Unknown operation: " + opType; + // Signal thread to stop and wait for it + should_stop_ = true; + if (tracking_thread_ && tracking_thread_->joinable()) { + tracking_thread_->join(); } - - // Load last sequence from state file - void loadState() { - if (processAll) { - // Force starting from sequence 0 if processing all logs - lastSequence = 0; - std::cout << "Processing all logs from sequence 0" << std::endl; - return; + + is_tracking_ = false; +} + +// Register a callback for WAL operations +void WalTracker::RegisterCallback(WalOperationCallback callback) { + callback_ = callback; +} + +// Check if tracking is active +bool WalTracker::IsTracking() const { + return is_tracking_; +} + +// Get current sequence number +uint64_t WalTracker::GetCurrentSequence() const { + return current_sequence_; +} + +// Thread function for monitoring WAL +void WalTracker::TrackingThread() { + // Start from the current sequence number of the database + uint64_t last_sequence = 0; + + // First read historical WAL entries + ReadWalLogs(last_sequence); + + // Then monitor for new entries + while (!should_stop_) { + uint64_t current_seq = db_->GetLatestSequenceNumber(); + current_sequence_ = current_seq; // Update current sequence + + if (last_sequence < current_seq) { + ReadWalLogs(last_sequence); } - - std::ifstream stateFile(dbPath + "/sql_converter_state"); - if (stateFile) { - stateFile >> lastSequence; - stateFile.close(); - std::cout << "Loaded last processed sequence: " << lastSequence << std::endl; - } else { - lastSequence = 0; - std::cout << "No previous state found. Starting from sequence 0" << std::endl; + else { + // No new sequences, wait a bit + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } - - // Save last sequence to state file - void saveState() { - std::ofstream stateFile(dbPath + "/sql_converter_state"); - if (stateFile) { - stateFile << lastSequence; - stateFile.close(); - } else { - std::cerr << "Failed to save state" << std::endl; +} + +// Read WAL logs from a sequence number +bool WalTracker::ReadWalLogs(uint64_t& last_sequence) { + std::unique_ptr iter; + rocksdb::Status status = db_->GetUpdatesSince(last_sequence, &iter); + + if (!status.ok()) { + return false; + } + + bool has_new_entries = false; + + while (iter->Valid()) { + const auto& result = iter->GetBatch(); + last_sequence = result.sequence + 1; + + // Process WAL entries if callback is registered + if (callback_) { + WalLogHandler handler(cf_id_to_name_, callback_); + result.writeBatchPtr->Iterate(&handler); } + + iter->Next(); + has_new_entries = true; } -public: - AnuDBSQLConverter(const std::string& path, const std::string& outputFile = "", bool processAllLogs = false) - : dbPath(path), lastSequence(0), running(false), hasNewLogs(false), processAll(processAllLogs) { - - // Open log file if specified - if (!outputFile.empty()) { - logFile.open(outputFile, std::ios::app); - if (!logFile) { - std::cerr << "Failed to open log file: " << outputFile << std::endl; + return has_new_entries; +} + +// Update column family mapping +void WalTracker::UpdateColumnFamilyMap() { + // For older RocksDB versions that don't have GetColumnFamilyHandles() + // We need to maintain our own list of column families + + // Default column family always has ID 0 + cf_id_to_name_[0] = "default"; + + // We rely on WAL LogData entries to update our mapping + // when new column families are created +} + +// WalLogHandler implementation +WalTracker::WalLogHandler::WalLogHandler( + std::unordered_map& cf_map, + WalOperationCallback callback) + : cf_id_to_name(cf_map), callback(callback) {} + +rocksdb::Status WalTracker::WalLogHandler::PutCF( + uint32_t column_family_id, + const rocksdb::Slice& key, + const rocksdb::Slice& value) { + std::string cf_name = GetCFName(column_family_id); + + if (callback) { + callback("PUT", cf_name, key.ToString(), value.ToString()); + } + + return rocksdb::Status::OK(); +} + +rocksdb::Status WalTracker::WalLogHandler::DeleteCF( + uint32_t column_family_id, + const rocksdb::Slice& key) { + std::string cf_name = GetCFName(column_family_id); + + if (callback) { + callback("DELETE", cf_name, key.ToString(), ""); + } + + return rocksdb::Status::OK(); +} + +void WalTracker::WalLogHandler::LogData(const rocksdb::Slice& blob) { + std::string data = blob.ToString(); + + if (data.find("rocksdb.ColumnFamilyAdd") != std::string::npos) { + std::regex rgx("name=([^,]+),id=(\\d+)"); + std::smatch match; + if (std::regex_search(data, match, rgx) && match.size() == 3) { + std::string cf_name = match[1]; + uint32_t cf_id = static_cast(std::stoul(match[2])); + cf_id_to_name[cf_id] = cf_name; + + if (callback) { + callback("CREATE_CF", cf_name, std::to_string(cf_id), ""); } } - - // Load previous state or start from 0 if processing all logs - loadState(); - } - - ~AnuDBSQLConverter() { - // Stop any running threads - stop(); - - // Close log file - if (logFile.is_open()) { - logFile.close(); - } - - // Save state - saveState(); } - - // Notify that there are new logs to process - void notifyNewLogs() { - std::lock_guard lock(mtx); - hasNewLogs = true; - cv.notify_one(); - } - - // Process logs and convert to SQL - void processLogs() { - rocksdb::Options options; - options.create_if_missing = false; - - // Add listener for log updates - options.listeners.push_back(std::make_shared( - [this]() { this->notifyNewLogs(); } - )); - - // Get list of column families - std::vector cfNames; - rocksdb::Status s = rocksdb::DB::ListColumnFamilies(options, dbPath, &cfNames); - if (!s.ok()) { - std::cerr << "Failed to list column families: " << s.ToString() << std::endl; - return; - } - - // Open DB with all column families - std::vector cfDescriptors; - for (const auto& name : cfNames) { - cfDescriptors.push_back(rocksdb::ColumnFamilyDescriptor(name, rocksdb::ColumnFamilyOptions())); - } - - std::vector cfHandles; - rocksdb::DB* dbRaw; - - // Open in read-only mode, but with listener for updates - s = rocksdb::DB::OpenForReadOnly(options, dbPath, cfDescriptors, &cfHandles, &dbRaw); - if (!s.ok()) { - std::cerr << "Failed to open RocksDB: " << s.ToString() << std::endl; - return; - } - - std::shared_ptr db(dbRaw); - - // Build column family ID to name map - std::map cfNameMap; - for (size_t i = 0; i < cfHandles.size(); i++) { - cfNameMap[cfHandles[i]->GetID()] = cfNames[i]; - delete cfHandles[i]; +} + +std::string WalTracker::WalLogHandler::GetCFName(uint32_t id) const { + auto it = cf_id_to_name.find(id); + return it != cf_id_to_name.end() ? it->second : "unknown"; +} + +// DatabaseOperations.h +#ifndef DATABASE_OPERATIONS_H +#define DATABASE_OPERATIONS_H + +#include +#include +#include +#include +#include +#include +#include +#include + +class DatabaseOperations { +public: + // Constructor + DatabaseOperations(rocksdb::DB* db, + const std::vector& cf_handles); + + // Destructor + ~DatabaseOperations(); + + // Start random operations + void StartRandomOperations(int operations_per_second = 10); + + // Stop random operations + void StopRandomOperations(); + + // Create a new column family + rocksdb::ColumnFamilyHandle* CreateNewColumnFamily(const std::string& name); + + // Register callback for operation tracking (for logging purposes) + using OperationCallback = std::function; + void RegisterCallback(OperationCallback callback); + +private: + // Thread function for random operations + void OperationsThread(); + + // Generate random string + std::string GenerateRandomString(int length); + + // Perform random put operation + void RandomPut(); + + // Perform random delete operation + void RandomDelete(); + + // Perform random get operation + void RandomGet(); + + // Perform random scan operation + void RandomScan(); + + // Member variables + rocksdb::DB* db_; + std::vector cf_handles_; + std::vector generated_keys_; + std::unique_ptr operations_thread_; + std::atomic should_stop_; + std::atomic is_running_; + int ops_per_second_; + std::mt19937 random_generator_; + OperationCallback callback_; +}; + +#endif // DATABASE_OPERATIONS_H + +// DatabaseOperations.cpp +//#include "DatabaseOperations.h" +#include +#include +#include + +// Constructor +DatabaseOperations::DatabaseOperations(rocksdb::DB* db, + const std::vector& cf_handles) + : db_(db), + cf_handles_(cf_handles), + should_stop_(false), + is_running_(false), + ops_per_second_(10) { + // Initialize random generator + std::random_device rd; + random_generator_.seed(rd()); +} + +// Destructor +DatabaseOperations::~DatabaseOperations() { + StopRandomOperations(); +} + +// Start random operations +void DatabaseOperations::StartRandomOperations(int operations_per_second) { + if (is_running_) { + return; // Already running + } + + ops_per_second_ = operations_per_second; + should_stop_ = false; + is_running_ = true; + + // Create and start operations thread + operations_thread_ = std::make_unique(&DatabaseOperations::OperationsThread, this); +} + +// Stop random operations +void DatabaseOperations::StopRandomOperations() { + if (!is_running_) { + return; // Not running + } + + // Signal thread to stop and wait for it + should_stop_ = true; + if (operations_thread_ && operations_thread_->joinable()) { + operations_thread_->join(); + } + + is_running_ = false; +} + +// Create a new column family +rocksdb::ColumnFamilyHandle* DatabaseOperations::CreateNewColumnFamily(const std::string& name) { + rocksdb::ColumnFamilyHandle* cf_handle = nullptr; + rocksdb::Status status = db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), name, &cf_handle); + + if (status.ok() && cf_handle != nullptr) { + cf_handles_.push_back(cf_handle); + + if (callback_) { + callback_("CREATE_CF_MANUAL", name, std::to_string(cf_handle->GetID()), ""); } + } + else { + std::cerr << "Error creating column family: " << status.ToString() << std::endl; + } - // Process existing logs first (force notification) - notifyNewLogs(); - - std::cout << "RocksDB SQL converter started. Monitoring for log updates..." << std::endl; - - while (running) { - // Wait for notification of new logs or timeout (to periodically check anyway) - { - std::unique_lock lock(mtx); - cv.wait_for(lock, std::chrono::seconds(5), [this]() { return hasNewLogs || !running; }); - - if (!running) break; - hasNewLogs = false; - } - - // Get transaction logs since last processed sequence - std::unique_ptr iter; - rocksdb::Status status = db->GetUpdatesSince(lastSequence, &iter); - - if (!status.ok()) { - if (status.IsNotFound()) { - // WAL files might have been deleted, start from current - std::cout << "WAL files not found, resetting sequence number" << std::endl; - uint64_t currentSeq = db->GetLatestSequenceNumber(); - lastSequence = currentSeq; - saveState(); - continue; - } - - std::cerr << "Failed to get transaction logs: " << status.ToString() << std::endl; - continue; - } + return cf_handle; +} - int queryCount = 0; - bool foundLogs = false; - - while (iter->Valid()) { - foundLogs = true; - rocksdb::BatchResult batch = iter->GetBatch(); - lastSequence = batch.sequence; - - class OperationExtractor : public rocksdb::WriteBatch::Handler { - private: - AnuDBSQLConverter* converter; - const std::map& cfMap; - std::vector sqlQueries; - - public: - OperationExtractor(AnuDBSQLConverter* conv, const std::map& cfNameMap) - : converter(conv), cfMap(cfNameMap) {} - - const std::vector& getSqlQueries() const { - return sqlQueries; - } - - rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key, - const rocksdb::Slice& value) override { - std::string sql = converter->operationToSQL("PUT", key.ToString(), value.ToString(), - column_family_id, cfMap); - sqlQueries.push_back(sql); - return rocksdb::Status::OK(); - } - - rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override { - std::string sql = converter->operationToSQL("DELETE", key.ToString(), "", - column_family_id, cfMap); - sqlQueries.push_back(sql); - return rocksdb::Status::OK(); - } - - void LogData(const rocksdb::Slice& blob) override { - std::string data = blob.ToString(); - - size_t createPos = data.find("CREATE_CF:"); - if (createPos != std::string::npos) { - std::string cfName = data.substr(createPos + 10); - sqlQueries.push_back(converter->operationToSQL("CREATE_CF", cfName, "", 0, cfMap)); - return; - } - - size_t dropPos = data.find("DROP_CF:"); - if (dropPos != std::string::npos) { - std::string cfName = data.substr(dropPos + 8); - sqlQueries.push_back(converter->operationToSQL("DROP_CF", cfName, "", 0, cfMap)); - return; - } - } - - // Other required handler methods - rocksdb::Status DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice& begin_key, - const rocksdb::Slice& end_key) override { - return rocksdb::Status::OK(); - } - - rocksdb::Status MergeCF(uint32_t column_family_id, const rocksdb::Slice& key, - const rocksdb::Slice& value) override { - return rocksdb::Status::OK(); - } - - rocksdb::Status PutBlobIndexCF(uint32_t column_family_id, const rocksdb::Slice& key, - const rocksdb::Slice& value) override { - return rocksdb::Status::OK(); - } - }; - - // Process the write batch - OperationExtractor extractor(this, cfNameMap); - status = batch.writeBatchPtr->Iterate(&extractor); - - if (!status.ok()) { - std::cerr << "Error iterating batch: " << status.ToString() << std::endl; - break; - } - - // Output SQL queries - for (const auto& sql : extractor.getSqlQueries()) { - queryCount++; - - // Use timestamp for each SQL entry - auto now = std::chrono::system_clock::now(); - std::time_t now_time = std::chrono::system_clock::to_time_t(now); - char timestamp[100]; - std::strftime(timestamp, sizeof(timestamp), "%Y-%m-%d %H:%M:%S", std::localtime(&now_time)); - - std::string output = std::string(timestamp) + " | " + sql; - - // Log to file if open - if (logFile.is_open()) { - logFile << output << std::endl; - logFile.flush(); // Ensure it's written immediately - } else { - // Otherwise print to stdout - std::cout << output << std::endl; - } - } - - iter->Next(); - } - - if (queryCount > 0) { - std::cout << "Processed " << queryCount << " SQL queries. Latest sequence: " << lastSequence << std::endl; - saveState(); // Save state after processing batch - } else if (!foundLogs && processAll) { - // If we were processing all logs and found none, switch to monitoring mode - processAll = false; - std::cout << "Finished processing existing logs. Now monitoring for new changes." << std::endl; - } +// Register callback +void DatabaseOperations::RegisterCallback(OperationCallback callback) { + callback_ = callback; +} + +// Thread function for random operations +void DatabaseOperations::OperationsThread() { + // Calculate sleep time between operations + int sleep_ms = 1000 / ops_per_second_; + + while (!should_stop_) { + // Select a random operation type + std::uniform_int_distribution<> op_dist(0, 1); + int operation = op_dist(random_generator_); + + switch (operation) { + case 0: + RandomPut(); + break; + case 1: + RandomDelete(); + break; + case 2: + //RandomGet(); + break; + case 3: + //RandomScan(); + break; } + + // Sleep between operations + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); + } +} + +// Generate random string +std::string DatabaseOperations::GenerateRandomString(int length) { + static const char alphanum[] = + "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + std::uniform_int_distribution<> dis(0, sizeof(alphanum) - 2); + + std::string result; + result.reserve(length); + for (int i = 0; i < length; ++i) { + result += alphanum[dis(random_generator_)]; } + return result; +} - // Start monitoring in a separate thread - void start() { - if (running) return; - - running = true; - std::thread([this]() { - this->processLogs(); - }).detach(); +// Perform random put operation +void DatabaseOperations::RandomPut() { + if (cf_handles_.empty()) { + return; } - // Stop monitoring - void stop() { - running = false; - cv.notify_all(); + // Select a random column family + std::uniform_int_distribution<> cf_dist(0, cf_handles_.size() - 1); + int cf_index = cf_dist(random_generator_); + rocksdb::ColumnFamilyHandle* cf = cf_handles_[cf_index]; + + // Create key and value + std::string key = "key_" + GenerateRandomString(8); + std::string value = "value_" + GenerateRandomString(16); + + // Store key for potential future delete/get operations + generated_keys_.push_back(key); + if (generated_keys_.size() > 1000) { + generated_keys_.erase(generated_keys_.begin()); } - // Get the latest sequence number - uint64_t getLastSequence() const { - return lastSequence; + // Write to database + rocksdb::WriteOptions write_options; + write_options.disableWAL = false; // Ensure WAL is enabled + rocksdb::Status status = db_->Put(write_options, cf, key, value); + + if (status.ok() && callback_) { + callback_("PUT_MANUAL", cf->GetName(), key, value); } -}; +} -int main(int argc, char* argv[]) { - if (argc < 2) { - std::cerr << "Usage: " << argv[0] << " [output_file] [--all]" << std::endl; - std::cerr << " db_path: Path to RocksDB database" << std::endl; - std::cerr << " output_file: Optional file to write SQL queries (default: stdout)" << std::endl; - std::cerr << " --all: Process all logs from the beginning, ignoring previous state" << std::endl; - return 1; +// Perform random delete operation +void DatabaseOperations::RandomDelete() { + if (cf_handles_.empty() || generated_keys_.empty()) { + return; } - - std::string dbPath = argv[1]; - std::string outputFile = ""; - bool processAll = false; - - // Parse arguments - for (int i = 2; i < argc; i++) { - std::string arg = argv[i]; - if (arg == "--all") { - processAll = true; - } else { - outputFile = arg; + + // Select a random column family + std::uniform_int_distribution<> cf_dist(0, cf_handles_.size() - 1); + int cf_index = cf_dist(random_generator_); + rocksdb::ColumnFamilyHandle* cf = cf_handles_[cf_index]; + + // Select a random key to delete + std::uniform_int_distribution<> key_dist(0, generated_keys_.size() - 1); + int key_index = key_dist(random_generator_); + std::string key = generated_keys_[key_index]; + + // Remove key from generated keys + generated_keys_.erase(generated_keys_.begin() + key_index); + + // Delete from database + rocksdb::WriteOptions write_options; + rocksdb::Status status = db_->Delete(write_options, cf, key); + + if (status.ok() && callback_) { + callback_("DELETE_MANUAL", cf->GetName(), key, ""); + } +} + +// Perform random get operation +void DatabaseOperations::RandomGet() { + if (cf_handles_.empty() || generated_keys_.empty()) { + return; + } + + // Select a random column family + std::uniform_int_distribution<> cf_dist(0, cf_handles_.size() - 1); + int cf_index = cf_dist(random_generator_); + rocksdb::ColumnFamilyHandle* cf = cf_handles_[cf_index]; + + // Select a random key to get + std::uniform_int_distribution<> key_dist(0, generated_keys_.size() - 1); + int key_index = key_dist(random_generator_); + std::string key = generated_keys_[key_index]; + + // Get from database + std::string value; + rocksdb::ReadOptions read_options; + rocksdb::Status status = db_->Get(read_options, cf, key, &value); + + if (status.ok() && callback_) { + callback_("GET", cf->GetName(), key, value); + } +} + +// Perform random scan operation +void DatabaseOperations::RandomScan() { + if (cf_handles_.empty()) { + return; + } + + // Select a random column family + std::uniform_int_distribution<> cf_dist(0, cf_handles_.size() - 1); + int cf_index = cf_dist(random_generator_); + rocksdb::ColumnFamilyHandle* cf = cf_handles_[cf_index]; + + // Create iterator and scan some items + rocksdb::ReadOptions read_options; + std::unique_ptr it(db_->NewIterator(read_options, cf)); + + // Start from a random position or beginning + std::uniform_int_distribution<> start_dist(0, 1); + bool random_start = start_dist(random_generator_) == 1; + + if (random_start && !generated_keys_.empty()) { + std::uniform_int_distribution<> key_dist(0, generated_keys_.size() - 1); + int key_index = key_dist(random_generator_); + std::string start_key = generated_keys_[key_index]; + it->Seek(start_key); + } + else { + it->SeekToFirst(); + } + + // Scan a few items + std::uniform_int_distribution<> count_dist(1, 10); + int items_to_scan = count_dist(random_generator_); + int items_scanned = 0; + + if (callback_) { + callback_("SCAN_START", cf->GetName(), "", ""); + } + + for (; it->Valid() && items_scanned < items_to_scan; it->Next()) { + std::string key = it->key().ToString(); + std::string value = it->value().ToString(); + + if (callback_) { + callback_("SCAN_ITEM", cf->GetName(), key, value); + } + + items_scanned++; + } + + if (callback_) { + callback_("SCAN_END", cf->GetName(), "", ""); + } +} + +// main.cpp - Usage example +//#include "WalTracker.h" +//#include "DatabaseOperations.h" +#include +#include +#include +#include +#include +#include + +// Mutex for thread-safe console output +std::mutex console_mutex; + +// Function to get current timestamp as string +std::string GetTimestamp() { + auto now = std::chrono::system_clock::now(); + auto time_t_now = std::chrono::system_clock::to_time_t(now); + std::stringstream ss; + ss << std::put_time(std::localtime(&time_t_now), "%Y-%m-%d %H:%M:%S"); + return ss.str(); +} + +// WAL operation handler +void WalOperationHandler(const std::string& operation, + const std::string& cf_name, + const std::string& key, + const std::string& value) { + std::lock_guard lock(console_mutex); + + std::cout << "[" << GetTimestamp() << "] [WAL] "; + std::cout << std::left << std::setw(10) << operation; + std::cout << " | CF: " << std::setw(15) << cf_name; + std::cout << " | Key: " << std::setw(20) << key; + + if (!value.empty()) { + // Truncate value if it's too long + std::string display_value = value; + if (display_value.length() > 30) { + display_value = display_value.substr(0, 27) + "..."; } + std::cout << " | Value: " << display_value; } + + std::cout << std::endl; +} + +// DB operation handler +void DbOperationHandler(const std::string& operation, + const std::string& cf_name, + const std::string& key, + const std::string& value) { + std::lock_guard lock(console_mutex); - try { - AnuDBSQLConverter converter(dbPath, outputFile, processAll); - - // Start monitoring in real-time - converter.start(); - - std::cout << "SQL converter running for " << dbPath << std::endl; - std::cout << "Press Ctrl+C to stop" << std::endl; - - // Keep main thread alive - while (true) { - std::this_thread::sleep_for(std::chrono::seconds(60)); + std::cout << "[" << GetTimestamp() << "] [DB] "; + std::cout << std::left << std::setw(12) << operation; + std::cout << " | CF: " << std::setw(15) << cf_name; + std::cout << " | Key: " << std::setw(20) << key; + + if (!value.empty() && operation != "SCAN_START" && operation != "SCAN_END") { + // Truncate value if it's too long + std::string display_value = value; + if (display_value.length() > 30) { + display_value = display_value.substr(0, 27) + "..."; } - - } catch (const std::exception& e) { - std::cerr << "Error: " << e.what() << std::endl; + std::cout << " | Value: " << display_value; + } + + std::cout << std::endl; +} + +int main(int argc, char** argv) { + if (argc != 2) { + std::cerr << "Usage: " << argv[0] << " " << std::endl; return 1; } - + + std::string db_path = argv[1]; + + // Set up RocksDB options + rocksdb::Options options; + options.create_if_missing = true; + options.create_missing_column_families = true; + + // Define initial column families + std::vector cf_names = { "default", "users", "products", "orders" }; + std::vector cf_descriptors; + + for (const auto& name : cf_names) { + rocksdb::ColumnFamilyOptions cf_options; + cf_descriptors.emplace_back(name, cf_options); + } + + // Open database + std::vector cf_handles; + rocksdb::DB* db_raw = nullptr; + rocksdb::Status status = rocksdb::DB::Open(options, db_path, cf_descriptors, &cf_handles, &db_raw); + + if (!status.ok()) { + std::cerr << "Error opening database: " << status.ToString() << std::endl; + return 1; + } + + // Use smart pointer for automatic cleanup + std::unique_ptr db(db_raw); + + // Create column family ID to name mapping for WAL tracker + std::unordered_map cf_id_to_name; + for (auto* handle : cf_handles) { + cf_id_to_name[handle->GetID()] = handle->GetName(); + std::cout << "Column Family: " << handle->GetName() << " (ID: " << handle->GetID() << ")" << std::endl; + } + + // Create WAL tracker + WalTracker wal_tracker(db.get(), cf_id_to_name); + wal_tracker.RegisterCallback(WalOperationHandler); + + // Create database operations manager + DatabaseOperations db_ops(db.get(), cf_handles); + db_ops.RegisterCallback(DbOperationHandler); + + // Start WAL tracking + std::cout << "\nStarting WAL tracking..." << std::endl; + wal_tracker.StartTracking(); + + // Start random database operations + std::cout << "Starting random database operations (5 ops/sec)..." << std::endl; + db_ops.StartRandomOperations(5); // 5 operations per second + + // Create a new column family after 5 seconds + std::cout << "\nWill create a new column family after 5 seconds..." << std::endl; + std::this_thread::sleep_for(std::chrono::seconds(5)); + + rocksdb::ColumnFamilyHandle* analytics_cf = db_ops.CreateNewColumnFamily("analytics"); + if (analytics_cf != nullptr) { + std::cout << "Created new column family: analytics (ID: " << analytics_cf->GetID() << ")" << std::endl; + } + + // Wait for user input to stop + std::cout << "\nPress Enter to stop..." << std::endl; + std::cin.get(); + + // Stop operations + db_ops.StopRandomOperations(); + wal_tracker.StopTracking(); + + std::cout << "Random operations and WAL tracking stopped." << std::endl; + + // Clean up column family handles + for (auto* handle : cf_handles) { + db->DestroyColumnFamilyHandle(handle); + } + return 0; -} +} \ No newline at end of file From d844250feb6d3f0c38bcc569bed78b12dafa61e5 Mon Sep 17 00:00:00 2001 From: Hash Anu Date: Fri, 25 Apr 2025 00:01:57 +0530 Subject: [PATCH 03/13] Sync server --- sync/AnuDBSyncServer.cpp | 47 ++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/sync/AnuDBSyncServer.cpp b/sync/AnuDBSyncServer.cpp index 3de198e..19f3b87 100644 --- a/sync/AnuDBSyncServer.cpp +++ b/sync/AnuDBSyncServer.cpp @@ -46,6 +46,9 @@ class WalTracker { // Get current sequence number uint64_t GetCurrentSequence() const; + // Update column family mapping + void UpdateColumnFamilyMap(uint32_t id, const std::string& name); + private: // Handler for WAL operations class WalLogHandler : public rocksdb::WriteBatch::Handler { @@ -60,8 +63,6 @@ class WalTracker { rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override; - void LogData(const rocksdb::Slice& blob) override; - private: std::unordered_map& cf_id_to_name; WalOperationCallback callback; @@ -117,6 +118,11 @@ WalTracker::WalTracker(rocksdb::DB* db, } } +void WalTracker::UpdateColumnFamilyMap(uint32_t id, const std::string& name) { + callback_("CREATE_CF_MANUAL", name, std::to_string(id), ""); + cf_id_to_name_[id] = name; +} + // Destructor WalTracker::~WalTracker() { StopTracking(); @@ -259,24 +265,6 @@ rocksdb::Status WalTracker::WalLogHandler::DeleteCF( return rocksdb::Status::OK(); } -void WalTracker::WalLogHandler::LogData(const rocksdb::Slice& blob) { - std::string data = blob.ToString(); - - if (data.find("rocksdb.ColumnFamilyAdd") != std::string::npos) { - std::regex rgx("name=([^,]+),id=(\\d+)"); - std::smatch match; - if (std::regex_search(data, match, rgx) && match.size() == 3) { - std::string cf_name = match[1]; - uint32_t cf_id = static_cast(std::stoul(match[2])); - cf_id_to_name[cf_id] = cf_name; - - if (callback) { - callback("CREATE_CF", cf_name, std::to_string(cf_id), ""); - } - } - } -} - std::string WalTracker::WalLogHandler::GetCFName(uint32_t id) const { auto it = cf_id_to_name.find(id); return it != cf_id_to_name.end() ? it->second : "unknown"; @@ -299,7 +287,7 @@ class DatabaseOperations { public: // Constructor DatabaseOperations(rocksdb::DB* db, - const std::vector& cf_handles); + const std::vector& cf_handles,WalTracker* wal_tracker); // Destructor ~DatabaseOperations(); @@ -349,6 +337,7 @@ class DatabaseOperations { int ops_per_second_; std::mt19937 random_generator_; OperationCallback callback_; + WalTracker* waltracker_; }; #endif // DATABASE_OPERATIONS_H @@ -361,12 +350,14 @@ class DatabaseOperations { // Constructor DatabaseOperations::DatabaseOperations(rocksdb::DB* db, - const std::vector& cf_handles) + const std::vector& cf_handles, + WalTracker* wal_tracker) : db_(db), cf_handles_(cf_handles), should_stop_(false), is_running_(false), - ops_per_second_(10) { + ops_per_second_(10), + waltracker_(wal_tracker){ // Initialize random generator std::random_device rd; random_generator_.seed(rd()); @@ -414,6 +405,10 @@ rocksdb::ColumnFamilyHandle* DatabaseOperations::CreateNewColumnFamily(const std if (status.ok() && cf_handle != nullptr) { cf_handles_.push_back(cf_handle); + if (waltracker_ != NULL) { + waltracker_->UpdateColumnFamilyMap(cf_handle->GetID(), cf_handle->GetName()); + } + if (callback_) { callback_("CREATE_CF_MANUAL", name, std::to_string(cf_handle->GetID()), ""); } @@ -667,12 +662,12 @@ void DbOperationHandler(const std::string& operation, const std::string& key, const std::string& value) { std::lock_guard lock(console_mutex); - +#if 0 std::cout << "[" << GetTimestamp() << "] [DB] "; std::cout << std::left << std::setw(12) << operation; std::cout << " | CF: " << std::setw(15) << cf_name; std::cout << " | Key: " << std::setw(20) << key; - +#endif if (!value.empty() && operation != "SCAN_START" && operation != "SCAN_END") { // Truncate value if it's too long std::string display_value = value; @@ -732,7 +727,7 @@ int main(int argc, char** argv) { wal_tracker.RegisterCallback(WalOperationHandler); // Create database operations manager - DatabaseOperations db_ops(db.get(), cf_handles); + DatabaseOperations db_ops(db.get(), cf_handles, &wal_tracker); db_ops.RegisterCallback(DbOperationHandler); // Start WAL tracking From 61cc3aeb39560adf69b95fcbc385fb33a9865b69 Mon Sep 17 00:00:00 2001 From: Hash Anu Date: Sat, 26 Apr 2025 22:09:01 +0530 Subject: [PATCH 04/13] Moving waltracker class to correct location and adding its usage in main class --- CMakeLists.txt | 2 - src/Database.cpp | 8 +- src/Database.h | 3 +- src/main.cpp | 43 +- src/storage_engine/CMakeLists.txt | 2 + src/storage_engine/StorageEngine.cpp | 37 +- src/storage_engine/StorageEngine.h | 7 +- src/storage_engine/WalTracker.cpp | 181 +++++++ src/storage_engine/WalTracker.h | 97 ++++ sync/AnuDBSyncServer.cpp | 766 --------------------------- sync/CMakeLists.txt | 11 - 11 files changed, 370 insertions(+), 787 deletions(-) create mode 100644 src/storage_engine/WalTracker.cpp create mode 100644 src/storage_engine/WalTracker.h delete mode 100644 sync/AnuDBSyncServer.cpp delete mode 100644 sync/CMakeLists.txt diff --git a/CMakeLists.txt b/CMakeLists.txt index 2aea656..c638b59 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -100,5 +100,3 @@ set(ENABLE_OPENSSL OFF CACHE BOOL "Disable OpenSSL" FORCE) include_directories(${CMAKE_SOURCE_DIR}/third_party/nanomq/nanomq/include) # Add mqtt file add_subdirectory(mqtt) -# Add sync -add_subdirectory(sync) diff --git a/src/Database.cpp b/src/Database.cpp index 99c2a9d..93b3ae1 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -2,9 +2,9 @@ using namespace anudb; -Status Database::open() { +Status Database::open(bool walTracker) { isDbOpen_ = true; - return engine_.open(); + return engine_.open(walTracker); } Status Database::close() { @@ -77,6 +77,10 @@ Status Database::exportAllToJsonAsync(const std::string& collectionName, const s return collection->exportAllToJsonAsync(exportPath); } +void Database::registerCallback(WalOperationCallback callback) { + engine_.registerCallback(callback); +} + Status Database::importFromJsonFile(const std::string& collectionName, const std::string& importFile) { Collection* collection = getCollection(collectionName); if (!collection) { diff --git a/src/Database.h b/src/Database.h index 4227a89..cad701f 100644 --- a/src/Database.h +++ b/src/Database.h @@ -9,7 +9,7 @@ namespace anudb { class Database { public: Database(const std::string& dbPath) : engine_(dbPath) {} - Status open(); + Status open(bool walTracker = false); Status close(); Status createCollection(const std::string& name); Status dropCollection(const std::string& name); @@ -18,6 +18,7 @@ namespace anudb { Status readAllDocuments(const std::string& collectionName, std::vector& docIds); Status exportAllToJsonAsync(const std::string& collectionName, const std::string& exportPath); Status importFromJsonFile(const std::string& collectionName, const std::string& importFile); + void registerCallback(WalOperationCallback callback); Collection* getCollection(const std::string& name); std::vector getCollectionNames() const; diff --git a/src/main.cpp b/src/main.cpp index 783c051..93bc55d 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -32,17 +32,56 @@ void executeQuery(Collection* collection, const json& query, const std::string& } } +#include +// Mutex for thread-safe console output +std::mutex console_mutex; +// Function to get current timestamp as string +std::string GetTimestamp() { + auto now = std::chrono::system_clock::now(); + auto time_t_now = std::chrono::system_clock::to_time_t(now); + std::stringstream ss; + ss << std::put_time(std::localtime(&time_t_now), "%Y-%m-%d %H:%M:%S"); + return ss.str(); +} +// WAL operation handler +void WalOperationHandler(const std::string& operation, + const std::string& cf_name, + const std::string& key, + const std::string& value) { + std::lock_guard lock(console_mutex); + + std::cout << "[" << GetTimestamp() << "] [WAL] "; + std::cout << std::left << std::setw(10) << operation; + std::cout << " | CF: " << std::setw(15) << cf_name; + std::cout << " | Key: " << std::setw(20) << key; + + if (!value.empty()) { + // Truncate value if it's too long + std::string display_value = value; + if (display_value.length() > 30) { + display_value = display_value.substr(0, 27) + "..."; + } + std::cout << " | Value: " << display_value; + } + + std::cout << std::endl; +} + int main() { + bool walTracker = false; // Database initialization Database db("./product_db"); - Status status = db.open(); - + Status status = db.open(walTracker); if (!status.ok()) { std::cerr << "Failed to open database: " << status.message() << std::endl; return 1; } std::cout << "Database opened successfully." << std::endl; + if (walTracker) { + db.registerCallback(WalOperationHandler); + } + // Create products collection status = db.createCollection("products"); if (!status.ok()) { diff --git a/src/storage_engine/CMakeLists.txt b/src/storage_engine/CMakeLists.txt index 35fa29d..4717593 100644 --- a/src/storage_engine/CMakeLists.txt +++ b/src/storage_engine/CMakeLists.txt @@ -1,6 +1,8 @@ set(storage_engine_SRCS StorageEngine.h StorageEngine.cpp + WalTracker.h + WalTracker.cpp ${CMAKE_SOURCE_DIR}/third_party/rocksdb/include ) diff --git a/src/storage_engine/StorageEngine.cpp b/src/storage_engine/StorageEngine.cpp index ee9d090..e6ded11 100644 --- a/src/storage_engine/StorageEngine.cpp +++ b/src/storage_engine/StorageEngine.cpp @@ -2,7 +2,7 @@ using namespace anudb; -Status StorageEngine::open() { +Status StorageEngine::open(bool walTracker) { rocksdb::Options options; RocksDBOptimizer::EmbeddedConfig config; @@ -31,7 +31,7 @@ Status StorageEngine::open() { for (const auto& cf : columnFamilies) { columnFamilyDescriptors.emplace_back(cf, rocksdb::ColumnFamilyOptions()); } - + wal_tracker_ = walTracker; // Open the database with column families std::vector handles; rocksdb::DB* dbRaw; @@ -50,6 +50,17 @@ Status StorageEngine::open() { if (db_) db_->DestroyColumnFamilyHandle(h); }); } + if (wal_tracker_) { + // Create column family ID to name mapping for WAL tracker + for (auto* handle : handles) { + cf_id_to_name_[handle->GetID()] = handle->GetName(); + std::cout << "Column Family: " << handle->GetName() << " (ID: " << handle->GetID() << ")" << std::endl; + } + + // Create WAL tracker + waltracker_ = new WalTracker(db_, cf_id_to_name_); + waltracker_->StartTracking(); + } // Print estimated memory usage size_t estimated_mem = RocksDBOptimizer::estimateMemoryUsage(config); //std::cout << "Estimated memory usage by storage engine: " << (estimated_mem >> 20) << "MB\n"; @@ -76,6 +87,15 @@ Status StorageEngine::close() { // Clear the ownership vector which will destroy all handles properly ownedHandles_.clear(); + if (wal_tracker_) { + // stop waltracker + waltracker_->StopTracking(); + if (waltracker_) { + delete waltracker_; + waltracker_ = NULL; + } + } + // Close and reset the DB if (db_) { rocksdb::Status s = db_->Close(); @@ -91,6 +111,10 @@ Status StorageEngine::close() { return Status::OK(); } +void StorageEngine::registerCallback(WalOperationCallback callback) { + waltracker_->RegisterCallback(callback); +} + Status StorageEngine::createCollection(const std::string& name) { // Check if collection already exists if (columnFamilies_.find(name) != columnFamilies_.end()) { @@ -105,6 +129,10 @@ Status StorageEngine::createCollection(const std::string& name) { return Status::IOError(s.ToString()); } + if (wal_tracker_) { + waltracker_->UpdateColumnFamilyMap(handle->GetID(), handle->GetName()); + } + // Store the handle columnFamilies_[name] = handle; ownedHandles_.emplace_back(handle, [this](rocksdb::ColumnFamilyHandle* h) { @@ -127,6 +155,11 @@ Status StorageEngine::dropCollection(const std::string& name) { if (!s.ok()) { return Status::IOError(s.ToString()); } + + if (wal_tracker_) { + waltracker_->DeleteColumnFamilyMap(handle->GetID(), handle->GetName()); + } + // Remove from our map columnFamilies_.erase(it); return Status::OK(); diff --git a/src/storage_engine/StorageEngine.h b/src/storage_engine/StorageEngine.h index 27528c7..879080e 100644 --- a/src/storage_engine/StorageEngine.h +++ b/src/storage_engine/StorageEngine.h @@ -4,6 +4,7 @@ #include "json.hpp" #include "Status.h" +#include "WalTracker.h" #include "rocksdb/db.h" #include "rocksdb/table.h" @@ -32,7 +33,7 @@ namespace anudb { class StorageEngine { public: StorageEngine(const std::string& dbPath) : dbPath_(dbPath), index_delimiter_("__index__"){} - Status open(); + Status open(bool WalTracker = false); Status close(); Status createCollection(const std::string& name); @@ -52,6 +53,7 @@ namespace anudb { Status fetchDocIdsForLesser(const std::string& collection, const std::string& prefix, std::vector& docIds) const; Status fetchDocIdsByOrder(const std::string& collection, const std::string& key, std::vector& docIds) const; rocksdb::DB* getDB(); + void registerCallback(WalOperationCallback callback); virtual ~StorageEngine(); // Decode a Big-Endian encoded integer from a RocksDB key static int64_t DecodeIntKey(const std::string& encoded) { @@ -73,6 +75,9 @@ namespace anudb { std::unordered_map columnFamilies_; std::vector>> ownedHandles_; std::string index_delimiter_; + bool wal_tracker_; + WalTracker* waltracker_; + std::unordered_map cf_id_to_name_; //mutable std::mutex db_mutex_; }; diff --git a/src/storage_engine/WalTracker.cpp b/src/storage_engine/WalTracker.cpp new file mode 100644 index 0000000..897c1f0 --- /dev/null +++ b/src/storage_engine/WalTracker.cpp @@ -0,0 +1,181 @@ +// WalTracker.cpp +#include "WalTracker.h" + +using namespace anudb; +// Constructor +WalTracker::WalTracker(rocksdb::DB* db, + const std::unordered_map& cf_map) + : db_(db), + cf_id_to_name_(cf_map), + should_stop_(false), + is_tracking_(false), + current_sequence_(0) { + // Initialize column family mapping if empty + if (cf_id_to_name_.empty()) { + // Default column family always has ID 0 + cf_id_to_name_[0] = "default"; + + // Try to get more column families from DB path + std::vector cf_names; + rocksdb::Status s = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(), + db_->GetName(), + &cf_names); + } +} + +void WalTracker::UpdateColumnFamilyMap(uint32_t id, const std::string& name) { + if (name.find("__index__") != std::string::npos) { + return; + } + callback_("CREATE_CF_MANUAL", name, std::to_string(id), ""); + cf_id_to_name_[id] = name; +} + +void WalTracker::DeleteColumnFamilyMap(uint32_t id, const std::string& name) { + if (name.find("__index__") != std::string::npos) { + return; + } + callback_("DELETE_CF_MANUAL", name, std::to_string(id), ""); + cf_id_to_name_.erase(id); +} + +// Destructor +WalTracker::~WalTracker() { + StopTracking(); +} + +// Start tracking WAL operations +void WalTracker::StartTracking() { + if (is_tracking_) { + return; // Already tracking + } + + should_stop_ = false; + is_tracking_ = true; + + // Create and start tracking thread + tracking_thread_ = std::make_unique(&WalTracker::TrackingThread, this); +} + +// Stop tracking WAL operations +void WalTracker::StopTracking() { + if (!is_tracking_) { + return; // Not tracking + } + + // Signal thread to stop and wait for it + should_stop_ = true; + if (tracking_thread_ && tracking_thread_->joinable()) { + tracking_thread_->join(); + } + + is_tracking_ = false; +} + +// Register a callback for WAL operations +void WalTracker::RegisterCallback(WalOperationCallback callback) { + callback_ = callback; +} + +// Check if tracking is active +bool WalTracker::IsTracking() const { + return is_tracking_; +} + +// Get current sequence number +uint64_t WalTracker::GetCurrentSequence() const { + return current_sequence_; +} + +// Thread function for monitoring WAL +void WalTracker::TrackingThread() { + // Start from the current sequence number of the database + uint64_t last_sequence = 0; + + // First read historical WAL entries + ReadWalLogs(last_sequence); + + // Then monitor for new entries + while (!should_stop_) { + uint64_t current_seq = db_->GetLatestSequenceNumber(); + current_sequence_ = current_seq; // Update current sequence + + if (last_sequence < current_seq) { + ReadWalLogs(last_sequence); + } + else { + // No new sequences, wait a bit + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } +} + +// Read WAL logs from a sequence number +bool WalTracker::ReadWalLogs(uint64_t& last_sequence) { + std::unique_ptr iter; + rocksdb::Status status = db_->GetUpdatesSince(last_sequence, &iter); + + if (!status.ok()) { + return false; + } + + bool has_new_entries = false; + + while (iter->Valid()) { + const auto& result = iter->GetBatch(); + last_sequence = result.sequence + 1; + + // Process WAL entries if callback is registered + if (callback_) { + WalLogHandler handler(cf_id_to_name_, callback_); + result.writeBatchPtr->Iterate(&handler); + } + + iter->Next(); + has_new_entries = true; + } + + return has_new_entries; +} + +// WalLogHandler implementation +WalTracker::WalLogHandler::WalLogHandler( + std::unordered_map& cf_map, + WalOperationCallback callback) + : cf_id_to_name(cf_map), callback(callback) {} + +rocksdb::Status WalTracker::WalLogHandler::PutCF( + uint32_t column_family_id, + const rocksdb::Slice& key, + const rocksdb::Slice& value) { + std::string cf_name = GetCFName(column_family_id); + if (cf_name == "unknown") { + return rocksdb::Status::OK(); + } + + if (callback) { + callback("PUT", cf_name, key.ToString(), value.ToString()); + } + + return rocksdb::Status::OK(); +} + +rocksdb::Status WalTracker::WalLogHandler::DeleteCF( + uint32_t column_family_id, + const rocksdb::Slice& key) { + std::string cf_name = GetCFName(column_family_id); + if (cf_name == "unknown") { + return rocksdb::Status::OK(); + } + + if (callback) { + callback("DELETE", cf_name, key.ToString(), ""); + } + + return rocksdb::Status::OK(); +} + +std::string WalTracker::WalLogHandler::GetCFName(uint32_t id) const { + auto it = cf_id_to_name.find(id); + return it != cf_id_to_name.end() ? it->second : "unknown"; +} diff --git a/src/storage_engine/WalTracker.h b/src/storage_engine/WalTracker.h new file mode 100644 index 0000000..2fd1712 --- /dev/null +++ b/src/storage_engine/WalTracker.h @@ -0,0 +1,97 @@ +// WalTracker.h +#ifndef WAL_TRACKER_H +#define WAL_TRACKER_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef _WIN32 +#include +#pragma comment(lib, "ws2_32.lib") +extern "C" uintptr_t __cdecl _beginthreadex(void*, unsigned int, + unsigned int(__stdcall*)(void*), void*, unsigned int, unsigned int*); +#endif + +namespace anudb { + // Callback type for WAL operations + using WalOperationCallback = std::function; + + class WalTracker { + public: + // Constructor - takes a RocksDB instance and optional CF mapping + WalTracker(rocksdb::DB* db, + const std::unordered_map& cf_map = {}); + + // Destructor - stops tracking thread + ~WalTracker(); + + // Start tracking WAL operations + void StartTracking(); + + // Stop tracking WAL operations + void StopTracking(); + + // Register a callback for WAL operations + void RegisterCallback(WalOperationCallback callback); + + // Check if tracking is active + bool IsTracking() const; + + // Get current sequence number + uint64_t GetCurrentSequence() const; + + // Update column family mapping + void UpdateColumnFamilyMap(uint32_t id, const std::string& name); + + // Delete id from column family mapping + void DeleteColumnFamilyMap(uint32_t id, const std::string& name); + + private: + // Handler for WAL operations + class WalLogHandler : public rocksdb::WriteBatch::Handler { + public: + WalLogHandler(std::unordered_map& cf_map, + WalOperationCallback callback); + + rocksdb::Status PutCF(uint32_t column_family_id, + const rocksdb::Slice& key, + const rocksdb::Slice& value) override; + + rocksdb::Status DeleteCF(uint32_t column_family_id, + const rocksdb::Slice& key) override; + + private: + std::unordered_map& cf_id_to_name; + WalOperationCallback callback; + + std::string GetCFName(uint32_t id) const; + }; + + // Thread function for monitoring WAL + void TrackingThread(); + + // Read WAL logs from a sequence number + bool ReadWalLogs(uint64_t& last_sequence); + + // Member variables + rocksdb::DB* db_; + std::unordered_map cf_id_to_name_; + std::unique_ptr tracking_thread_; + std::atomic should_stop_; + std::atomic is_tracking_; + WalOperationCallback callback_; + uint64_t current_sequence_; + }; +}; +#endif // WAL_TRACKER_H \ No newline at end of file diff --git a/sync/AnuDBSyncServer.cpp b/sync/AnuDBSyncServer.cpp deleted file mode 100644 index 19f3b87..0000000 --- a/sync/AnuDBSyncServer.cpp +++ /dev/null @@ -1,766 +0,0 @@ -// WalTracker.h -#ifndef WAL_TRACKER_H -#define WAL_TRACKER_H - -#include -#include -#include -#include -#include -#include -#include -#include -#ifdef _WIN32 -#include -#pragma comment(lib, "ws2_32.lib") -extern "C" uintptr_t __cdecl _beginthreadex(void*, unsigned int, - unsigned int(__stdcall*)(void*), void*, unsigned int, unsigned int*); -#endif -// Callback type for WAL operations -using WalOperationCallback = std::function; - -class WalTracker { -public: - // Constructor - takes a RocksDB instance and optional CF mapping - WalTracker(rocksdb::DB* db, - const std::unordered_map& cf_map = {}); - - // Destructor - stops tracking thread - ~WalTracker(); - - // Start tracking WAL operations - void StartTracking(); - - // Stop tracking WAL operations - void StopTracking(); - - // Register a callback for WAL operations - void RegisterCallback(WalOperationCallback callback); - - // Check if tracking is active - bool IsTracking() const; - - // Get current sequence number - uint64_t GetCurrentSequence() const; - - // Update column family mapping - void UpdateColumnFamilyMap(uint32_t id, const std::string& name); - -private: - // Handler for WAL operations - class WalLogHandler : public rocksdb::WriteBatch::Handler { - public: - WalLogHandler(std::unordered_map& cf_map, - WalOperationCallback callback); - - rocksdb::Status PutCF(uint32_t column_family_id, - const rocksdb::Slice& key, - const rocksdb::Slice& value) override; - - rocksdb::Status DeleteCF(uint32_t column_family_id, - const rocksdb::Slice& key) override; - - private: - std::unordered_map& cf_id_to_name; - WalOperationCallback callback; - - std::string GetCFName(uint32_t id) const; - }; - - // Thread function for monitoring WAL - void TrackingThread(); - - // Read WAL logs from a sequence number - bool ReadWalLogs(uint64_t& last_sequence); - - // Update column family mapping - void UpdateColumnFamilyMap(); - - // Member variables - rocksdb::DB* db_; - std::unordered_map cf_id_to_name_; - std::unique_ptr tracking_thread_; - std::atomic should_stop_; - std::atomic is_tracking_; - WalOperationCallback callback_; - uint64_t current_sequence_; -}; - -#endif // WAL_TRACKER_H - -// WalTracker.cpp -//#include "WalTracker.h" -#include -#include -#include - -// Constructor -WalTracker::WalTracker(rocksdb::DB* db, - const std::unordered_map& cf_map) - : db_(db), - cf_id_to_name_(cf_map), - should_stop_(false), - is_tracking_(false), - current_sequence_(0) { - // Initialize column family mapping if empty - if (cf_id_to_name_.empty()) { - // Default column family always has ID 0 - cf_id_to_name_[0] = "default"; - - // Try to get more column families from DB path - std::vector cf_names; - rocksdb::Status s = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(), - db_->GetName(), - &cf_names); - } -} - -void WalTracker::UpdateColumnFamilyMap(uint32_t id, const std::string& name) { - callback_("CREATE_CF_MANUAL", name, std::to_string(id), ""); - cf_id_to_name_[id] = name; -} - -// Destructor -WalTracker::~WalTracker() { - StopTracking(); -} - -// Start tracking WAL operations -void WalTracker::StartTracking() { - if (is_tracking_) { - return; // Already tracking - } - - should_stop_ = false; - is_tracking_ = true; - - // Create and start tracking thread - tracking_thread_ = std::make_unique(&WalTracker::TrackingThread, this); -} - -// Stop tracking WAL operations -void WalTracker::StopTracking() { - if (!is_tracking_) { - return; // Not tracking - } - - // Signal thread to stop and wait for it - should_stop_ = true; - if (tracking_thread_ && tracking_thread_->joinable()) { - tracking_thread_->join(); - } - - is_tracking_ = false; -} - -// Register a callback for WAL operations -void WalTracker::RegisterCallback(WalOperationCallback callback) { - callback_ = callback; -} - -// Check if tracking is active -bool WalTracker::IsTracking() const { - return is_tracking_; -} - -// Get current sequence number -uint64_t WalTracker::GetCurrentSequence() const { - return current_sequence_; -} - -// Thread function for monitoring WAL -void WalTracker::TrackingThread() { - // Start from the current sequence number of the database - uint64_t last_sequence = 0; - - // First read historical WAL entries - ReadWalLogs(last_sequence); - - // Then monitor for new entries - while (!should_stop_) { - uint64_t current_seq = db_->GetLatestSequenceNumber(); - current_sequence_ = current_seq; // Update current sequence - - if (last_sequence < current_seq) { - ReadWalLogs(last_sequence); - } - else { - // No new sequences, wait a bit - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - } -} - -// Read WAL logs from a sequence number -bool WalTracker::ReadWalLogs(uint64_t& last_sequence) { - std::unique_ptr iter; - rocksdb::Status status = db_->GetUpdatesSince(last_sequence, &iter); - - if (!status.ok()) { - return false; - } - - bool has_new_entries = false; - - while (iter->Valid()) { - const auto& result = iter->GetBatch(); - last_sequence = result.sequence + 1; - - // Process WAL entries if callback is registered - if (callback_) { - WalLogHandler handler(cf_id_to_name_, callback_); - result.writeBatchPtr->Iterate(&handler); - } - - iter->Next(); - has_new_entries = true; - } - - return has_new_entries; -} - -// Update column family mapping -void WalTracker::UpdateColumnFamilyMap() { - // For older RocksDB versions that don't have GetColumnFamilyHandles() - // We need to maintain our own list of column families - - // Default column family always has ID 0 - cf_id_to_name_[0] = "default"; - - // We rely on WAL LogData entries to update our mapping - // when new column families are created -} - -// WalLogHandler implementation -WalTracker::WalLogHandler::WalLogHandler( - std::unordered_map& cf_map, - WalOperationCallback callback) - : cf_id_to_name(cf_map), callback(callback) {} - -rocksdb::Status WalTracker::WalLogHandler::PutCF( - uint32_t column_family_id, - const rocksdb::Slice& key, - const rocksdb::Slice& value) { - std::string cf_name = GetCFName(column_family_id); - - if (callback) { - callback("PUT", cf_name, key.ToString(), value.ToString()); - } - - return rocksdb::Status::OK(); -} - -rocksdb::Status WalTracker::WalLogHandler::DeleteCF( - uint32_t column_family_id, - const rocksdb::Slice& key) { - std::string cf_name = GetCFName(column_family_id); - - if (callback) { - callback("DELETE", cf_name, key.ToString(), ""); - } - - return rocksdb::Status::OK(); -} - -std::string WalTracker::WalLogHandler::GetCFName(uint32_t id) const { - auto it = cf_id_to_name.find(id); - return it != cf_id_to_name.end() ? it->second : "unknown"; -} - -// DatabaseOperations.h -#ifndef DATABASE_OPERATIONS_H -#define DATABASE_OPERATIONS_H - -#include -#include -#include -#include -#include -#include -#include -#include - -class DatabaseOperations { -public: - // Constructor - DatabaseOperations(rocksdb::DB* db, - const std::vector& cf_handles,WalTracker* wal_tracker); - - // Destructor - ~DatabaseOperations(); - - // Start random operations - void StartRandomOperations(int operations_per_second = 10); - - // Stop random operations - void StopRandomOperations(); - - // Create a new column family - rocksdb::ColumnFamilyHandle* CreateNewColumnFamily(const std::string& name); - - // Register callback for operation tracking (for logging purposes) - using OperationCallback = std::function; - void RegisterCallback(OperationCallback callback); - -private: - // Thread function for random operations - void OperationsThread(); - - // Generate random string - std::string GenerateRandomString(int length); - - // Perform random put operation - void RandomPut(); - - // Perform random delete operation - void RandomDelete(); - - // Perform random get operation - void RandomGet(); - - // Perform random scan operation - void RandomScan(); - - // Member variables - rocksdb::DB* db_; - std::vector cf_handles_; - std::vector generated_keys_; - std::unique_ptr operations_thread_; - std::atomic should_stop_; - std::atomic is_running_; - int ops_per_second_; - std::mt19937 random_generator_; - OperationCallback callback_; - WalTracker* waltracker_; -}; - -#endif // DATABASE_OPERATIONS_H - -// DatabaseOperations.cpp -//#include "DatabaseOperations.h" -#include -#include -#include - -// Constructor -DatabaseOperations::DatabaseOperations(rocksdb::DB* db, - const std::vector& cf_handles, - WalTracker* wal_tracker) - : db_(db), - cf_handles_(cf_handles), - should_stop_(false), - is_running_(false), - ops_per_second_(10), - waltracker_(wal_tracker){ - // Initialize random generator - std::random_device rd; - random_generator_.seed(rd()); -} - -// Destructor -DatabaseOperations::~DatabaseOperations() { - StopRandomOperations(); -} - -// Start random operations -void DatabaseOperations::StartRandomOperations(int operations_per_second) { - if (is_running_) { - return; // Already running - } - - ops_per_second_ = operations_per_second; - should_stop_ = false; - is_running_ = true; - - // Create and start operations thread - operations_thread_ = std::make_unique(&DatabaseOperations::OperationsThread, this); -} - -// Stop random operations -void DatabaseOperations::StopRandomOperations() { - if (!is_running_) { - return; // Not running - } - - // Signal thread to stop and wait for it - should_stop_ = true; - if (operations_thread_ && operations_thread_->joinable()) { - operations_thread_->join(); - } - - is_running_ = false; -} - -// Create a new column family -rocksdb::ColumnFamilyHandle* DatabaseOperations::CreateNewColumnFamily(const std::string& name) { - rocksdb::ColumnFamilyHandle* cf_handle = nullptr; - rocksdb::Status status = db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), name, &cf_handle); - - if (status.ok() && cf_handle != nullptr) { - cf_handles_.push_back(cf_handle); - - if (waltracker_ != NULL) { - waltracker_->UpdateColumnFamilyMap(cf_handle->GetID(), cf_handle->GetName()); - } - - if (callback_) { - callback_("CREATE_CF_MANUAL", name, std::to_string(cf_handle->GetID()), ""); - } - } - else { - std::cerr << "Error creating column family: " << status.ToString() << std::endl; - } - - return cf_handle; -} - -// Register callback -void DatabaseOperations::RegisterCallback(OperationCallback callback) { - callback_ = callback; -} - -// Thread function for random operations -void DatabaseOperations::OperationsThread() { - // Calculate sleep time between operations - int sleep_ms = 1000 / ops_per_second_; - - while (!should_stop_) { - // Select a random operation type - std::uniform_int_distribution<> op_dist(0, 1); - int operation = op_dist(random_generator_); - - switch (operation) { - case 0: - RandomPut(); - break; - case 1: - RandomDelete(); - break; - case 2: - //RandomGet(); - break; - case 3: - //RandomScan(); - break; - } - - // Sleep between operations - std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); - } -} - -// Generate random string -std::string DatabaseOperations::GenerateRandomString(int length) { - static const char alphanum[] = - "0123456789" - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "abcdefghijklmnopqrstuvwxyz"; - std::uniform_int_distribution<> dis(0, sizeof(alphanum) - 2); - - std::string result; - result.reserve(length); - for (int i = 0; i < length; ++i) { - result += alphanum[dis(random_generator_)]; - } - return result; -} - -// Perform random put operation -void DatabaseOperations::RandomPut() { - if (cf_handles_.empty()) { - return; - } - - // Select a random column family - std::uniform_int_distribution<> cf_dist(0, cf_handles_.size() - 1); - int cf_index = cf_dist(random_generator_); - rocksdb::ColumnFamilyHandle* cf = cf_handles_[cf_index]; - - // Create key and value - std::string key = "key_" + GenerateRandomString(8); - std::string value = "value_" + GenerateRandomString(16); - - // Store key for potential future delete/get operations - generated_keys_.push_back(key); - if (generated_keys_.size() > 1000) { - generated_keys_.erase(generated_keys_.begin()); - } - - // Write to database - rocksdb::WriteOptions write_options; - write_options.disableWAL = false; // Ensure WAL is enabled - rocksdb::Status status = db_->Put(write_options, cf, key, value); - - if (status.ok() && callback_) { - callback_("PUT_MANUAL", cf->GetName(), key, value); - } -} - -// Perform random delete operation -void DatabaseOperations::RandomDelete() { - if (cf_handles_.empty() || generated_keys_.empty()) { - return; - } - - // Select a random column family - std::uniform_int_distribution<> cf_dist(0, cf_handles_.size() - 1); - int cf_index = cf_dist(random_generator_); - rocksdb::ColumnFamilyHandle* cf = cf_handles_[cf_index]; - - // Select a random key to delete - std::uniform_int_distribution<> key_dist(0, generated_keys_.size() - 1); - int key_index = key_dist(random_generator_); - std::string key = generated_keys_[key_index]; - - // Remove key from generated keys - generated_keys_.erase(generated_keys_.begin() + key_index); - - // Delete from database - rocksdb::WriteOptions write_options; - rocksdb::Status status = db_->Delete(write_options, cf, key); - - if (status.ok() && callback_) { - callback_("DELETE_MANUAL", cf->GetName(), key, ""); - } -} - -// Perform random get operation -void DatabaseOperations::RandomGet() { - if (cf_handles_.empty() || generated_keys_.empty()) { - return; - } - - // Select a random column family - std::uniform_int_distribution<> cf_dist(0, cf_handles_.size() - 1); - int cf_index = cf_dist(random_generator_); - rocksdb::ColumnFamilyHandle* cf = cf_handles_[cf_index]; - - // Select a random key to get - std::uniform_int_distribution<> key_dist(0, generated_keys_.size() - 1); - int key_index = key_dist(random_generator_); - std::string key = generated_keys_[key_index]; - - // Get from database - std::string value; - rocksdb::ReadOptions read_options; - rocksdb::Status status = db_->Get(read_options, cf, key, &value); - - if (status.ok() && callback_) { - callback_("GET", cf->GetName(), key, value); - } -} - -// Perform random scan operation -void DatabaseOperations::RandomScan() { - if (cf_handles_.empty()) { - return; - } - - // Select a random column family - std::uniform_int_distribution<> cf_dist(0, cf_handles_.size() - 1); - int cf_index = cf_dist(random_generator_); - rocksdb::ColumnFamilyHandle* cf = cf_handles_[cf_index]; - - // Create iterator and scan some items - rocksdb::ReadOptions read_options; - std::unique_ptr it(db_->NewIterator(read_options, cf)); - - // Start from a random position or beginning - std::uniform_int_distribution<> start_dist(0, 1); - bool random_start = start_dist(random_generator_) == 1; - - if (random_start && !generated_keys_.empty()) { - std::uniform_int_distribution<> key_dist(0, generated_keys_.size() - 1); - int key_index = key_dist(random_generator_); - std::string start_key = generated_keys_[key_index]; - it->Seek(start_key); - } - else { - it->SeekToFirst(); - } - - // Scan a few items - std::uniform_int_distribution<> count_dist(1, 10); - int items_to_scan = count_dist(random_generator_); - int items_scanned = 0; - - if (callback_) { - callback_("SCAN_START", cf->GetName(), "", ""); - } - - for (; it->Valid() && items_scanned < items_to_scan; it->Next()) { - std::string key = it->key().ToString(); - std::string value = it->value().ToString(); - - if (callback_) { - callback_("SCAN_ITEM", cf->GetName(), key, value); - } - - items_scanned++; - } - - if (callback_) { - callback_("SCAN_END", cf->GetName(), "", ""); - } -} - -// main.cpp - Usage example -//#include "WalTracker.h" -//#include "DatabaseOperations.h" -#include -#include -#include -#include -#include -#include - -// Mutex for thread-safe console output -std::mutex console_mutex; - -// Function to get current timestamp as string -std::string GetTimestamp() { - auto now = std::chrono::system_clock::now(); - auto time_t_now = std::chrono::system_clock::to_time_t(now); - std::stringstream ss; - ss << std::put_time(std::localtime(&time_t_now), "%Y-%m-%d %H:%M:%S"); - return ss.str(); -} - -// WAL operation handler -void WalOperationHandler(const std::string& operation, - const std::string& cf_name, - const std::string& key, - const std::string& value) { - std::lock_guard lock(console_mutex); - - std::cout << "[" << GetTimestamp() << "] [WAL] "; - std::cout << std::left << std::setw(10) << operation; - std::cout << " | CF: " << std::setw(15) << cf_name; - std::cout << " | Key: " << std::setw(20) << key; - - if (!value.empty()) { - // Truncate value if it's too long - std::string display_value = value; - if (display_value.length() > 30) { - display_value = display_value.substr(0, 27) + "..."; - } - std::cout << " | Value: " << display_value; - } - - std::cout << std::endl; -} - -// DB operation handler -void DbOperationHandler(const std::string& operation, - const std::string& cf_name, - const std::string& key, - const std::string& value) { - std::lock_guard lock(console_mutex); -#if 0 - std::cout << "[" << GetTimestamp() << "] [DB] "; - std::cout << std::left << std::setw(12) << operation; - std::cout << " | CF: " << std::setw(15) << cf_name; - std::cout << " | Key: " << std::setw(20) << key; -#endif - if (!value.empty() && operation != "SCAN_START" && operation != "SCAN_END") { - // Truncate value if it's too long - std::string display_value = value; - if (display_value.length() > 30) { - display_value = display_value.substr(0, 27) + "..."; - } - std::cout << " | Value: " << display_value; - } - - std::cout << std::endl; -} - -int main(int argc, char** argv) { - if (argc != 2) { - std::cerr << "Usage: " << argv[0] << " " << std::endl; - return 1; - } - - std::string db_path = argv[1]; - - // Set up RocksDB options - rocksdb::Options options; - options.create_if_missing = true; - options.create_missing_column_families = true; - - // Define initial column families - std::vector cf_names = { "default", "users", "products", "orders" }; - std::vector cf_descriptors; - - for (const auto& name : cf_names) { - rocksdb::ColumnFamilyOptions cf_options; - cf_descriptors.emplace_back(name, cf_options); - } - - // Open database - std::vector cf_handles; - rocksdb::DB* db_raw = nullptr; - rocksdb::Status status = rocksdb::DB::Open(options, db_path, cf_descriptors, &cf_handles, &db_raw); - - if (!status.ok()) { - std::cerr << "Error opening database: " << status.ToString() << std::endl; - return 1; - } - - // Use smart pointer for automatic cleanup - std::unique_ptr db(db_raw); - - // Create column family ID to name mapping for WAL tracker - std::unordered_map cf_id_to_name; - for (auto* handle : cf_handles) { - cf_id_to_name[handle->GetID()] = handle->GetName(); - std::cout << "Column Family: " << handle->GetName() << " (ID: " << handle->GetID() << ")" << std::endl; - } - - // Create WAL tracker - WalTracker wal_tracker(db.get(), cf_id_to_name); - wal_tracker.RegisterCallback(WalOperationHandler); - - // Create database operations manager - DatabaseOperations db_ops(db.get(), cf_handles, &wal_tracker); - db_ops.RegisterCallback(DbOperationHandler); - - // Start WAL tracking - std::cout << "\nStarting WAL tracking..." << std::endl; - wal_tracker.StartTracking(); - - // Start random database operations - std::cout << "Starting random database operations (5 ops/sec)..." << std::endl; - db_ops.StartRandomOperations(5); // 5 operations per second - - // Create a new column family after 5 seconds - std::cout << "\nWill create a new column family after 5 seconds..." << std::endl; - std::this_thread::sleep_for(std::chrono::seconds(5)); - - rocksdb::ColumnFamilyHandle* analytics_cf = db_ops.CreateNewColumnFamily("analytics"); - if (analytics_cf != nullptr) { - std::cout << "Created new column family: analytics (ID: " << analytics_cf->GetID() << ")" << std::endl; - } - - // Wait for user input to stop - std::cout << "\nPress Enter to stop..." << std::endl; - std::cin.get(); - - // Stop operations - db_ops.StopRandomOperations(); - wal_tracker.StopTracking(); - - std::cout << "Random operations and WAL tracking stopped." << std::endl; - - // Clean up column family handles - for (auto* handle : cf_handles) { - db->DestroyColumnFamilyHandle(handle); - } - - return 0; -} \ No newline at end of file diff --git a/sync/CMakeLists.txt b/sync/CMakeLists.txt deleted file mode 100644 index 3ddda1b..0000000 --- a/sync/CMakeLists.txt +++ /dev/null @@ -1,11 +0,0 @@ -add_executable(AnuDBSyncServer AnuDBSyncServer.cpp) - -# Link against your project library -target_link_libraries(AnuDBSyncServer PRIVATE - libanu -) - -# Add include directories -target_include_directories(AnuDBSyncServer PRIVATE - ${CMAKE_SOURCE_DIR}/src ${CMAKE_SOURCE_DIR}/third_party/json ${CMAKE_SOURCE_DIR}/third_party/nanomq/nanomq/include ${CMAKE_SOURCE_DIR}/nng/include/nng/ -) From a8c78bca7bc92adde373eb768acf1f971f80d437 Mon Sep 17 00:00:00 2001 From: Hash Anu Date: Sat, 26 Apr 2025 22:15:16 +0530 Subject: [PATCH 05/13] Build fix --- src/Collection.h | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/Collection.h b/src/Collection.h index fe511b9..3b1b95d 100644 --- a/src/Collection.h +++ b/src/Collection.h @@ -10,18 +10,18 @@ extern "C" uintptr_t __cdecl _beginthreadex(void*, unsigned int, unsigned int(__stdcall*)(void*), void*, unsigned int, unsigned int*); #endif + +namespace anudb { #ifdef MAKE_UNIQUE #include -namespace std { - template - std::unique_ptr make_unique(Args&&... args) { - return std::unique_ptr(new T(std::forward(args)...)); - } -} + namespace std { + template + std::unique_ptr make_unique(Args&&... args) { + return std::unique_ptr(new T(std::forward(args)...)); + } + } #endif - -namespace anudb { // Collection class representing a MongoDB-like collection class Collection { public: From 449d219773149da60b451ddb20b055c2678b295e Mon Sep 17 00:00:00 2001 From: Hash Anu Date: Sat, 26 Apr 2025 22:19:35 +0530 Subject: [PATCH 06/13] Build fix for linux --- src/storage_engine/WalTracker.h | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/storage_engine/WalTracker.h b/src/storage_engine/WalTracker.h index 2fd1712..bb9361c 100644 --- a/src/storage_engine/WalTracker.h +++ b/src/storage_engine/WalTracker.h @@ -21,6 +21,16 @@ extern "C" uintptr_t __cdecl _beginthreadex(void*, unsigned int, #endif namespace anudb { +#ifdef MAKE_UNIQUE +#include + + namespace std { + template + std::unique_ptr make_unique(Args&&... args) { + return std::unique_ptr(new T(std::forward(args)...)); + } + } +#endif // Callback type for WAL operations using WalOperationCallback = std::function Date: Sat, 26 Apr 2025 22:27:36 +0530 Subject: [PATCH 07/13] Adding make_unique fun --- src/Collection.h | 13 +++++++------ src/storage_engine/WalTracker.h | 13 ++++++------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/Collection.h b/src/Collection.h index 3b1b95d..bbf252a 100644 --- a/src/Collection.h +++ b/src/Collection.h @@ -11,17 +11,18 @@ extern "C" uintptr_t __cdecl _beginthreadex(void*, unsigned int, unsigned int(__stdcall*)(void*), void*, unsigned int, unsigned int*); #endif -namespace anudb { #ifdef MAKE_UNIQUE #include - namespace std { - template - std::unique_ptr make_unique(Args&&... args) { - return std::unique_ptr(new T(std::forward(args)...)); - } +namespace std { + template + std::unique_ptr make_unique(Args&&... args) { + return std::unique_ptr(new T(std::forward(args)...)); } +} #endif + +namespace anudb { // Collection class representing a MongoDB-like collection class Collection { public: diff --git a/src/storage_engine/WalTracker.h b/src/storage_engine/WalTracker.h index bb9361c..1094382 100644 --- a/src/storage_engine/WalTracker.h +++ b/src/storage_engine/WalTracker.h @@ -19,18 +19,17 @@ extern "C" uintptr_t __cdecl _beginthreadex(void*, unsigned int, unsigned int(__stdcall*)(void*), void*, unsigned int, unsigned int*); #endif - -namespace anudb { #ifdef MAKE_UNIQUE #include - namespace std { - template - std::unique_ptr make_unique(Args&&... args) { - return std::unique_ptr(new T(std::forward(args)...)); - } +namespace std { + template + std::unique_ptr make_unique(Args&&... args) { + return std::unique_ptr(new T(std::forward(args)...)); } +} #endif +namespace anudb { // Callback type for WAL operations using WalOperationCallback = std::function Date: Sun, 27 Apr 2025 17:05:11 +0530 Subject: [PATCH 08/13] Showing real value of wal logs --- src/main.cpp | 6 ++---- src/storage_engine/WalTracker.cpp | 3 ++- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 93bc55d..331d866 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -58,9 +58,7 @@ void WalOperationHandler(const std::string& operation, if (!value.empty()) { // Truncate value if it's too long std::string display_value = value; - if (display_value.length() > 30) { - display_value = display_value.substr(0, 27) + "..."; - } + std::cout << " | Value: " << display_value; } @@ -68,7 +66,7 @@ void WalOperationHandler(const std::string& operation, } int main() { - bool walTracker = false; + bool walTracker = true; // Database initialization Database db("./product_db"); Status status = db.open(walTracker); diff --git a/src/storage_engine/WalTracker.cpp b/src/storage_engine/WalTracker.cpp index 897c1f0..7e34dda 100644 --- a/src/storage_engine/WalTracker.cpp +++ b/src/storage_engine/WalTracker.cpp @@ -154,7 +154,8 @@ rocksdb::Status WalTracker::WalLogHandler::PutCF( } if (callback) { - callback("PUT", cf_name, key.ToString(), value.ToString()); + std::vector value_vec(value.data(), value.data() + value.size()); + callback("PUT", cf_name, key.ToString(), json::from_msgpack(value_vec)["data"].dump()); } return rocksdb::Status::OK(); From 56c0b20eee9cbaa0441d9e58ed76f75099dec50a Mon Sep 17 00:00:00 2001 From: hash-anu Date: Sun, 27 Apr 2025 11:39:55 +0000 Subject: [PATCH 09/13] Build fix for linux --- CMakeLists.txt | 4 ++++ src/Collection.h | 11 ----------- src/main.cpp | 2 +- src/storage_engine/WalTracker.h | 4 +++- 4 files changed, 8 insertions(+), 13 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index c638b59..4695585 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -58,6 +58,10 @@ set(WITH_FUZZER OFF CACHE BOOL "Disable fuzzing tools" FORCE) add_subdirectory(third_party/rocksdb) # Add storage engine add_subdirectory(src/storage_engine) +if (CMAKE_SYSTEM_NAME STREQUAL "Linux") +target_compile_options(rocksdb PRIVATE -frtti) +target_compile_options(libstorage PRIVATE -frtti) +endif() set(LIBRARY_SOURCES ${CMAKE_SOURCE_DIR}/src/Cursor.cpp ${CMAKE_SOURCE_DIR}/src/Database.cpp ${CMAKE_SOURCE_DIR}/src/Collection.cpp ${CMAKE_SOURCE_DIR}/src/Document.cpp) diff --git a/src/Collection.h b/src/Collection.h index bbf252a..f75b75d 100644 --- a/src/Collection.h +++ b/src/Collection.h @@ -11,17 +11,6 @@ extern "C" uintptr_t __cdecl _beginthreadex(void*, unsigned int, unsigned int(__stdcall*)(void*), void*, unsigned int, unsigned int*); #endif -#ifdef MAKE_UNIQUE -#include - -namespace std { - template - std::unique_ptr make_unique(Args&&... args) { - return std::unique_ptr(new T(std::forward(args)...)); - } -} -#endif - namespace anudb { // Collection class representing a MongoDB-like collection class Collection { diff --git a/src/main.cpp b/src/main.cpp index 331d866..28d1bd4 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -718,4 +718,4 @@ int main() { std::cout << "C++ version: " << __cplusplus << std::endl; return 0; -} \ No newline at end of file +} diff --git a/src/storage_engine/WalTracker.h b/src/storage_engine/WalTracker.h index 1094382..4ab7cd9 100644 --- a/src/storage_engine/WalTracker.h +++ b/src/storage_engine/WalTracker.h @@ -19,6 +19,8 @@ extern "C" uintptr_t __cdecl _beginthreadex(void*, unsigned int, unsigned int(__stdcall*)(void*), void*, unsigned int, unsigned int*); #endif +#include "json.hpp" +using json = nlohmann::json; #ifdef MAKE_UNIQUE #include @@ -103,4 +105,4 @@ namespace anudb { uint64_t current_sequence_; }; }; -#endif // WAL_TRACKER_H \ No newline at end of file +#endif // WAL_TRACKER_H From b3f39cdbf5489ce9be0392b141efff3c80d9b87f Mon Sep 17 00:00:00 2001 From: hash-anu Date: Mon, 12 May 2025 17:11:25 +0000 Subject: [PATCH 10/13] Making WalTracker to ARM compatible --- src/main.cpp | 18 +++++++++++------- src/storage_engine/WalTracker.h | 2 +- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 28d1bd4..813277e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -35,14 +35,18 @@ void executeQuery(Collection* collection, const json& query, const std::string& #include // Mutex for thread-safe console output std::mutex console_mutex; -// Function to get current timestamp as string +// Get current time as formatted string std::string GetTimestamp() { - auto now = std::chrono::system_clock::now(); - auto time_t_now = std::chrono::system_clock::to_time_t(now); - std::stringstream ss; - ss << std::put_time(std::localtime(&time_t_now), "%Y-%m-%d %H:%M:%S"); - return ss.str(); -} + std::stringstream ss; +auto now = std::chrono::system_clock::now(); +std::time_t now_time_t = std::chrono::system_clock::to_time_t(now); +std::tm* tm_info = std::localtime(&now_time_t); + +char time_buffer[20]; // "YYYY-MM-DD HH:MM:SS" +std::strftime(time_buffer, sizeof(time_buffer), "%Y-%m-%d %H:%M:%S", tm_info); +ss << time_buffer; + return ss.str(); + } // WAL operation handler void WalOperationHandler(const std::string& operation, const std::string& cf_name, diff --git a/src/storage_engine/WalTracker.h b/src/storage_engine/WalTracker.h index 4ab7cd9..d97f9ab 100644 --- a/src/storage_engine/WalTracker.h +++ b/src/storage_engine/WalTracker.h @@ -42,7 +42,7 @@ namespace anudb { public: // Constructor - takes a RocksDB instance and optional CF mapping WalTracker(rocksdb::DB* db, - const std::unordered_map& cf_map = {}); + const std::unordered_map& cf_map); // Destructor - stops tracking thread ~WalTracker(); From ac1441b01f693254cd80c5c71d68c16177629b05 Mon Sep 17 00:00:00 2001 From: Hash Anu Date: Sun, 22 Jun 2025 15:15:19 +0530 Subject: [PATCH 11/13] Indentation --- src/main.cpp | 1394 +++++++++++++++++++++++++------------------------- 1 file changed, 697 insertions(+), 697 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 813277e..fc7dbf9 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -9,27 +9,27 @@ using json = nlohmann::json; // Helper function to print document void printDocument(const Document& doc) { - std::cout << "Document ID: " << doc.id() << "\nContent:\n" << doc.data().dump(4) << "\n" << std::endl; + std::cout << "Document ID: " << doc.id() << "\nContent:\n" << doc.data().dump(4) << "\n" << std::endl; } // Helper function to execute a query and print results void executeQuery(Collection* collection, const json& query, const std::string& queryName) { - std::vector docIds; - std::cout << "\n===== Executing " << queryName << " =====\n"; - - docIds = collection->findDocument(query); - - std::cout << "Found " << docIds.size() << " document(s)" << std::endl; - for (const std::string& docId : docIds) { - Document doc; - Status status = collection->readDocument(docId, doc); - if (status.ok()) { - printDocument(doc); - } - else { - std::cerr << "Failed to read document " << docId << ": " << status.message() << std::endl; - } - } + std::vector docIds; + std::cout << "\n===== Executing " << queryName << " =====\n"; + + docIds = collection->findDocument(query); + + std::cout << "Found " << docIds.size() << " document(s)" << std::endl; + for (const std::string& docId : docIds) { + Document doc; + Status status = collection->readDocument(docId, doc); + if (status.ok()) { + printDocument(doc); + } + else { + std::cerr << "Failed to read document " << docId << ": " << status.message() << std::endl; + } + } } #include @@ -37,689 +37,689 @@ void executeQuery(Collection* collection, const json& query, const std::string& std::mutex console_mutex; // Get current time as formatted string std::string GetTimestamp() { - std::stringstream ss; -auto now = std::chrono::system_clock::now(); -std::time_t now_time_t = std::chrono::system_clock::to_time_t(now); -std::tm* tm_info = std::localtime(&now_time_t); - -char time_buffer[20]; // "YYYY-MM-DD HH:MM:SS" -std::strftime(time_buffer, sizeof(time_buffer), "%Y-%m-%d %H:%M:%S", tm_info); -ss << time_buffer; - return ss.str(); - } + std::stringstream ss; + auto now = std::chrono::system_clock::now(); + std::time_t now_time_t = std::chrono::system_clock::to_time_t(now); + std::tm* tm_info = std::localtime(&now_time_t); + + char time_buffer[20]; // "YYYY-MM-DD HH:MM:SS" + std::strftime(time_buffer, sizeof(time_buffer), "%Y-%m-%d %H:%M:%S", tm_info); + ss << time_buffer; + return ss.str(); +} // WAL operation handler void WalOperationHandler(const std::string& operation, - const std::string& cf_name, - const std::string& key, - const std::string& value) { - std::lock_guard lock(console_mutex); - - std::cout << "[" << GetTimestamp() << "] [WAL] "; - std::cout << std::left << std::setw(10) << operation; - std::cout << " | CF: " << std::setw(15) << cf_name; - std::cout << " | Key: " << std::setw(20) << key; - - if (!value.empty()) { - // Truncate value if it's too long - std::string display_value = value; - - std::cout << " | Value: " << display_value; - } - - std::cout << std::endl; + const std::string& cf_name, + const std::string& key, + const std::string& value) { + std::lock_guard lock(console_mutex); + + std::cout << "[" << GetTimestamp() << "] [WAL] "; + std::cout << std::left << std::setw(10) << operation; + std::cout << " | CF: " << std::setw(15) << cf_name; + std::cout << " | Key: " << std::setw(20) << key; + + if (!value.empty()) { + // Truncate value if it's too long + std::string display_value = value; + + std::cout << " | Value: " << display_value; + } + + std::cout << std::endl; } int main() { - bool walTracker = true; - // Database initialization - Database db("./product_db"); - Status status = db.open(walTracker); - if (!status.ok()) { - std::cerr << "Failed to open database: " << status.message() << std::endl; - return 1; - } - std::cout << "Database opened successfully." << std::endl; - - if (walTracker) { - db.registerCallback(WalOperationHandler); - } - - // Create products collection - status = db.createCollection("products"); - if (!status.ok()) { - // Check if the collection already exists - if so, continue; otherwise, return - if (status.message().find("already exists") != std::string::npos) { - std::cout << "Collection 'products' already exists, continuing..." << std::endl; - } - else { - std::cerr << "Failed to create collection: " << status.message() << std::endl; - return 1; - } - } - else { - std::cout << "Collection 'products' created successfully." << std::endl; - } - - // Get the collection - Collection* products = db.getCollection("products"); - if (!products) { - std::cerr << "Failed to get collection." << std::endl; - return 1; - } - - // Create sample product documents with different structures - // Electronics product - complex nested structure - json product1 = { - {"name", "Laptop"}, - {"price", 1299.99}, - {"stock", 45}, - {"category", "Electronics"}, - {"rating", 4.7}, - {"brand", "TechMaster"}, - {"specs", { - {"processor", "i9"}, - {"ram", "32GB"}, - {"storage", "1TB SSD"} - }}, - {"tags", {"laptop", "gaming", "high-performance"}}, - {"dimensions", { - {"length", 35.8}, - {"width", 24.7}, - {"height", 1.9} - }}, - {"available", true} - }; - - // Smartphone product - array of objects structure - json product2 = { - {"name", "Smartphone"}, - {"price", 799.99}, - {"stock", 160}, - {"category", "Electronics"}, - {"rating", 4.5}, - {"brand", "MobiTech"}, - {"colors", {"Black", "Silver", "Blue"}}, - {"features", { - {"camera", "48MP"}, - {"display", "AMOLED"}, - {"battery", "5000mAh"} - }}, - {"reviews", { - { - {"user", "user123"}, - {"rating", 5}, - {"comment", "Great phone!"} - }, - { - {"user", "tech_reviewer"}, - {"rating", 4}, - {"comment", "Good performance but battery drains quickly"} - } - }}, - {"available", true} - }; - - // Book product - simple flat structure - json product3 = { - {"name", "Programming in C++"}, - {"price", 49.99}, - {"stock", 75}, - {"category", "Books"}, - {"rating", 4.2}, - {"author", "John Smith"}, - {"publisher", "Tech Books Inc"}, - {"pages", 450}, - {"isbn", "978-3-16-148410-0"}, - {"published_date", "2023-03-15"}, - {"available", true} - }; - - // Food product - another structure - json product4 = { - {"name", "Organic Coffee"}, - {"price", 15.99}, - {"stock", 200}, - {"category", "Food"}, - {"rating", 4.8}, - {"brand", "BeanMaster"}, - {"weight", "500g"}, - {"origin", "Colombia"}, - {"expiry_date", "2025-06-30"}, - {"nutritional_info", { - {"calories", 0}, - {"fat", "0g"}, - {"caffeine", "95mg per serving"} - }}, - {"available", false} - }; - - // Create document objects - Document doc1("prod001", product1); - Document doc2("prod002", product2); - Document doc3("prod003", product3); - Document doc4("prod004", product4); - - // Insert documents - std::vector documents = { doc1, doc2, doc3, doc4 }; - - for (Document& doc : documents) { - status = products->createDocument(doc); - if (!status.ok()) { - if (status.message().find("already exists") != std::string::npos) { - std::cout << "Document " << doc.id() << " already exists, updating instead..." << std::endl; - - // If document exists, update it instead - json updateDoc = { - {"$set", doc.data()} - }; - - status = products->updateDocument(doc.id(), updateDoc); - if (!status.ok()) { - std::cerr << "Failed to update existing document " << doc.id() << ": " << status.message() << std::endl; - } - else { - std::cout << "Document " << doc.id() << " updated." << std::endl; - } - } - else { - std::cerr << "Failed to create document " << doc.id() << ": " << status.message() << std::endl; - } - } - else { - std::cout << "Document " << doc.id() << " created successfully." << std::endl; - } - } - - // Create indexes - std::cout << "\n===== Creating Indexes =====\n"; - for (const auto& field : { "price", "stock", "category", "rating", "available", "name" }) { - status = products->createIndex(field); - if (!status.ok()) { - if (status.message().find("already exists") != std::string::npos) { - std::cout << "Index on '" << field << "' already exists." << std::endl; - } - else { - std::cerr << "Failed to create index on " << field << ": " << status.message() << std::endl; - } - } - else { - std::cout << "Index on '" << field << "' created successfully." << std::endl; - } - } - - // ===== Query Examples ===== - - // 1. OrderBy queries - std::cout << "\n===== OrderBy Queries =====\n"; - - json orderByPriceAsc = { - {"$orderBy", { - {"price", "asc"} - }} - }; - executeQuery(products, orderByPriceAsc, "Order By Price (Ascending)"); - - json orderByRatingDesc = { - {"$orderBy", { - {"rating", "desc"} - }} - }; - executeQuery(products, orderByRatingDesc, "Order By Rating (Descending)"); - - // 2. Equality queries - std::cout << "\n===== Equality Queries =====\n"; - - json eqCategory = { - {"$eq", { - {"category", "Electronics"} - }} - }; - executeQuery(products, eqCategory, "Equal Category: Electronics"); - - json eqAvailable = { - {"$eq", { - {"available", true} - }} - }; - executeQuery(products, eqAvailable, "Equal Available: true"); - - // 3. Greater than queries - std::cout << "\n===== Greater Than Queries =====\n"; - - json gtPrice = { - {"$gt", { - {"price", 50.0} - }} - }; - executeQuery(products, gtPrice, "Price > 50.0"); - - json gtRating = { - {"$gt", { - {"rating", 4.5} - }} - }; - executeQuery(products, gtRating, "Rating > 4.5"); - - // 4. Less than queries - std::cout << "\n===== Less Than Queries =====\n"; - - json ltStock = { - {"$lt", { - {"stock", 100} - }} - }; - executeQuery(products, ltStock, "Stock < 100"); - - json ltPrice = { - {"$lt", { - {"price", 500.0} - }} - }; - executeQuery(products, ltPrice, "Price < 500.0"); - - // 5. AND queries - std::cout << "\n===== AND Queries =====\n"; - - json andPriceRating = { - {"$and", { - {{"$gt", {{"price", 100.0}}}}, - {{"$lt", {{"price", 1000.0}}}} - }} - }; - executeQuery(products, andPriceRating, "100 < Price < 1000"); - - json andCategoryAvailable = { - {"$and", { - {{"$eq", {{"category", "Electronics"}}}}, - {{"$eq", {{"available", true}}}} - }} - }; - executeQuery(products, andCategoryAvailable, "Category = Electronics AND Available = true"); - - // 6. OR queries - std::cout << "\n===== OR Queries =====\n"; - - json orCategories = { - {"$or", { - {{"$eq", {{"category", "Books"}}}}, - {{"$eq", {{"category", "Food"}}}} - }} - }; - executeQuery(products, orCategories, "Category = Books OR Category = Food"); - - json orRatingStock = { - {"$or", { - {{"$gt", {{"rating", 4.7}}}}, - {{"$gt", {{"stock", 150}}}} - }} - }; - executeQuery(products, orRatingStock, "Rating > 4.7 OR Stock > 150"); - - // ===== Update Examples ===== - std::cout << "\n===== Update Operations =====\n"; - - // Example 1: $set - Top-level fields - std::cout << "\n----- $set Operation: Top-level Fields -----\n"; - - // Show original document - Document updateDoc1; - status = products->readDocument("prod001", updateDoc1); - if (status.ok()) { - std::cout << "Original document:" << std::endl; - printDocument(updateDoc1); - } - - json setTopLevel = { - {"$set", { - {"price", 1399.99}, - {"stock", 50}, - {"rating", 4.8}, - {"promotion", "Summer Sale"} // Add new field - }} - }; - - status = products->updateDocument("prod001", setTopLevel); - if (!status.ok()) { - std::cerr << "Failed to update document: " << status.message() << std::endl; - } - else { - std::cout << "Document updated with $set operator (top-level fields)" << std::endl; - status = products->readDocument("prod001", updateDoc1); - if (status.ok()) { - printDocument(updateDoc1); - } - } - - // Example 2: $set - Nested fields - std::cout << "\n----- $set Operation: Nested Fields -----\n"; - - json setNested = { - {"$set", { - {"specs.processor", "i9-12900K"}, - {"specs.ram", "64GB"}, - {"specs.storage", "2TB SSD"}, - }} - }; - - status = products->updateDocument("prod001", setNested); - if (!status.ok()) { - std::cerr << "Failed to update document: " << status.message() << std::endl; - } - else { - std::cout << "Document updated with $set operator (nested fields)" << std::endl; - status = products->readDocument("prod001", updateDoc1); - if (status.ok()) { - printDocument(updateDoc1); - } - } - - // Example 3: $unset - Top-level fields - std::cout << "\n----- $unset Operation: Top-level Fields -----\n"; - - json unsetTopLevel = { - {"$unset", { - {"promotion", ""}, - {"available", ""} - }} - }; - - status = products->updateDocument("prod001", unsetTopLevel); - if (!status.ok()) { - std::cerr << "Failed to update document: " << status.message() << std::endl; - } - else { - std::cout << "Document updated with $unset operator (top-level fields)" << std::endl; - status = products->readDocument("prod001", updateDoc1); - if (status.ok()) { - printDocument(updateDoc1); - } - } - - // Example 4: $unset - Nested fields - std::cout << "\n----- $unset Operation: Nested Fields -----\n"; - - json unsetNested = { - {"$unset", { - {"specs.storage", ""}, - {"dimensions.height", ""} - }} - }; - - status = products->updateDocument("prod001", unsetNested); - if (!status.ok()) { - std::cerr << "Failed to update document: " << status.message() << std::endl; - } - else { - std::cout << "Document updated with $unset operator (nested fields)" << std::endl; - status = products->readDocument("prod001", updateDoc1); - if (status.ok()) { - printDocument(updateDoc1); - } - } - - // Example 5: $push - Top-level array field - std::cout << "\n----- $push Operation: Top-level Array Fields -----\n"; - - Document updateDoc2; - status = products->readDocument("prod001", updateDoc2); - if (status.ok()) { - std::cout << "Before $push operation:" << std::endl; - printDocument(updateDoc2); - } - - json pushTopLevel = { - {"$push", { - {"tags", "limited-edition"}, - {"specs", {{"storage", "2 GB"}} } - }} - }; - - status = products->updateDocument("prod001", pushTopLevel, true); - if (!status.ok()) { - std::cerr << "Failed to update document: " << status.message() << std::endl; - } - else { - std::cout << "Document updated with $push operator" << std::endl; - status = products->readDocument("prod001", updateDoc2); - if (status.ok()) { - printDocument(updateDoc2); - } - } - - // Example 6: $pull - Top-level array field - std::cout << "\n----- $pull Operation: Top-level Array Fields -----\n"; - - json pullTopLevel = { - {"$pull", { - {"tags", "limited-edition"} - }} - }; - - status = products->updateDocument("prod001", pullTopLevel); - if (!status.ok()) { - std::cerr << "Failed to update document: " << status.message() << std::endl; - } - else { - std::cout << "Document updated with $pull operator" << std::endl; - status = products->readDocument("prod001", updateDoc2); - if (status.ok()) { - printDocument(updateDoc2); - } - } - - // Example 7: Updating smartphone product with reviews - std::cout << "\n----- Updating Smartphone Document -----\n"; - - Document smartphoneDoc; - status = products->readDocument("prod002", smartphoneDoc); - if (status.ok()) { - std::cout << "Original smartphone document:" << std::endl; - printDocument(smartphoneDoc); - } - - // Setting new element is not allowed - json setSmartphone = { - {"$set", { - {"features.waterproof", "IP68"}, - {"price", 849.99} - }} - }; - - status = products->updateDocument("prod002", setSmartphone); - if (!status.ok()) { - std::cerr << "Failed to update smartphone document: " << status.message() << std::endl; - } - else { - std::cout << "Smartphone document updated with new features" << std::endl; - status = products->readDocument("prod002", smartphoneDoc); - if (status.ok()) { - printDocument(smartphoneDoc); - } - } - - // Add a new review - json newReview = { - {"user", "mobile_fan"}, - {"rating", 5}, - {"comment", "Best smartphone I've ever owned!"} - }; - - json pushReview = { - {"$push", { - {"reviews", newReview} - }} - }; - - status = products->updateDocument("prod002", pushReview, true); - if (!status.ok()) { - std::cerr << "Failed to add review: " << status.message() << std::endl; - } - else { - std::cout << "Review added to smartphone document" << std::endl; - status = products->readDocument("prod002", smartphoneDoc); - if (status.ok()) { - printDocument(smartphoneDoc); - } - } - - // Example 8: Updating book product - std::cout << "\n----- Updating Book Document -----\n"; - - Document bookDoc; - status = products->readDocument("prod003", bookDoc); - if (status.ok()) { - std::cout << "Original book document:" << std::endl; - printDocument(bookDoc); - } - - // Update book information - json updateBook = { - {"$set", { - {"edition", "Second Edition"}, - {"price", 39.99}, - {"stock", 100} - }} - }; - - status = products->updateDocument("prod003", updateBook); - if (!status.ok()) { - std::cerr << "Failed to update book document: " << status.message() << std::endl; - } - else { - std::cout << "Book document updated" << std::endl; - status = products->readDocument("prod003", bookDoc); - if (status.ok()) { - printDocument(bookDoc); - } - } - - // Example 9: Updating food product - std::cout << "\n----- Updating Food Document -----\n"; - - Document foodDoc; - status = products->readDocument("prod004", foodDoc); - if (status.ok()) { - std::cout << "Original food document:" << std::endl; - printDocument(foodDoc); - } - - // Update food availability and add certifications - json updateFood = { - {"$set", { - {"available", true}, - {"certifications", {"Organic", "Fair Trade", "Rainforest Alliance"}} - }} - }; - - status = products->updateDocument("prod004", updateFood); - if (!status.ok()) { - std::cerr << "Failed to update food document: " << status.message() << std::endl; - } - else { - std::cout << "Food document updated" << std::endl; - status = products->readDocument("prod004", foodDoc); - if (status.ok()) { - printDocument(foodDoc); - } - } - - // Export all documents - std::cout << "\n===== Exporting Documents =====\n"; - status = db.exportAllToJsonAsync("products", "./product_export/"); - if (!status.ok()) { - std::cerr << "Failed to export documents: " << status.message() << std::endl; - } - else { - std::cout << "Documents exported successfully to ./product_export/" << std::endl; - } - products->waitForExportOperation(); - - status = db.createCollection("products_import"); - if (!status.ok()) { - // Check if the collection already exists - if so, continue; otherwise, return - if (status.message().find("already exists") != std::string::npos) { - std::cout << "Collection 'products' already exists, continuing..." << std::endl; - } - else { - std::cerr << "Failed to create collection: " << status.message() << std::endl; - return 1; - } - } - else { - std::cout << "Collection 'products_import' created successfully." << std::endl; - } - - status = db.importFromJsonFile("products_import", "./product_export/products.json"); - if (!status.ok()) { - std::cerr << "Failed to import documents: " << status.message() << std::endl; - } - else { - std::cout << "Documents imported successfully to products_import collection" << std::endl; - } - - std::vector docIds; - status = db.readAllDocuments("products_import", docIds); - if (!status.ok()) { - std::cerr << "Failed to import documents: " << status.message() << std::endl; - } - else { - std::cout << "Documents imported successfully to products_import collection" << std::endl; - } - - for (Document doc : docIds) { - std::cout << doc.data().dump(4) << std::endl; - } - // Clean up - Delete documents - std::cout << "\n===== Cleanup Operations =====\n"; - - // Delete a document - std::string docIdToDelete = "prod004"; - status = products->deleteDocument(docIdToDelete); - if (!status.ok()) { - std::cerr << "Failed to delete document " << docIdToDelete << ": " << status.message() << std::endl; - } - else { - std::cout << "Document " << docIdToDelete << " deleted successfully." << std::endl; - } - - // Delete indexes - std::vector indexesToDelete = { "stock", "rating", "available" }; - for (const auto& indexName : indexesToDelete) { - status = products->deleteIndex(indexName); - if (!status.ok()) { - std::cerr << "Failed to delete index on " << indexName << ": " << status.message() << std::endl; - } - else { - std::cout << "Index on " << indexName << " deleted successfully." << std::endl; - } - } - - // List remaining collections - std::cout << "\n===== Collections in Database =====\n"; - for (const auto& name : db.getCollectionNames()) { - std::cout << "- " << name << std::endl; - } - - // Drop collection - std::string collectionToDrop = "products"; - std::cout << "\nDropping collection '" << collectionToDrop << "'" << std::endl; - status = db.dropCollection(collectionToDrop); - if (!status.ok()) { - std::cerr << "Failed to drop collection: " << status.message() << std::endl; - } - else { - std::cout << "Collection dropped successfully." << std::endl; - } - - // Close database - status = db.close(); - if (!status.ok()) { - std::cerr << "Failed to close database: " << status.message() << std::endl; - return 1; - } - - std::cout << "\nDatabase closed successfully." << std::endl; - std::cout << "C++ version: " << __cplusplus << std::endl; - - return 0; + bool walTracker = true; + // Database initialization + Database db("./product_db"); + Status status = db.open(walTracker); + if (!status.ok()) { + std::cerr << "Failed to open database: " << status.message() << std::endl; + return 1; + } + std::cout << "Database opened successfully." << std::endl; + + if (walTracker) { + db.registerCallback(WalOperationHandler); + } + + // Create products collection + status = db.createCollection("products"); + if (!status.ok()) { + // Check if the collection already exists - if so, continue; otherwise, return + if (status.message().find("already exists") != std::string::npos) { + std::cout << "Collection 'products' already exists, continuing..." << std::endl; + } + else { + std::cerr << "Failed to create collection: " << status.message() << std::endl; + return 1; + } + } + else { + std::cout << "Collection 'products' created successfully." << std::endl; + } + + // Get the collection + Collection* products = db.getCollection("products"); + if (!products) { + std::cerr << "Failed to get collection." << std::endl; + return 1; + } + + // Create sample product documents with different structures + // Electronics product - complex nested structure + json product1 = { + {"name", "Laptop"}, + {"price", 1299.99}, + {"stock", 45}, + {"category", "Electronics"}, + {"rating", 4.7}, + {"brand", "TechMaster"}, + {"specs", { + {"processor", "i9"}, + {"ram", "32GB"}, + {"storage", "1TB SSD"} + }}, + {"tags", {"laptop", "gaming", "high-performance"}}, + {"dimensions", { + {"length", 35.8}, + {"width", 24.7}, + {"height", 1.9} + }}, + {"available", true} + }; + + // Smartphone product - array of objects structure + json product2 = { + {"name", "Smartphone"}, + {"price", 799.99}, + {"stock", 160}, + {"category", "Electronics"}, + {"rating", 4.5}, + {"brand", "MobiTech"}, + {"colors", {"Black", "Silver", "Blue"}}, + {"features", { + {"camera", "48MP"}, + {"display", "AMOLED"}, + {"battery", "5000mAh"} + }}, + {"reviews", { + { + {"user", "user123"}, + {"rating", 5}, + {"comment", "Great phone!"} + }, + { + {"user", "tech_reviewer"}, + {"rating", 4}, + {"comment", "Good performance but battery drains quickly"} + } + }}, + {"available", true} + }; + + // Book product - simple flat structure + json product3 = { + {"name", "Programming in C++"}, + {"price", 49.99}, + {"stock", 75}, + {"category", "Books"}, + {"rating", 4.2}, + {"author", "John Smith"}, + {"publisher", "Tech Books Inc"}, + {"pages", 450}, + {"isbn", "978-3-16-148410-0"}, + {"published_date", "2023-03-15"}, + {"available", true} + }; + + // Food product - another structure + json product4 = { + {"name", "Organic Coffee"}, + {"price", 15.99}, + {"stock", 200}, + {"category", "Food"}, + {"rating", 4.8}, + {"brand", "BeanMaster"}, + {"weight", "500g"}, + {"origin", "Colombia"}, + {"expiry_date", "2025-06-30"}, + {"nutritional_info", { + {"calories", 0}, + {"fat", "0g"}, + {"caffeine", "95mg per serving"} + }}, + {"available", false} + }; + + // Create document objects + Document doc1("prod001", product1); + Document doc2("prod002", product2); + Document doc3("prod003", product3); + Document doc4("prod004", product4); + + // Insert documents + std::vector documents = { doc1, doc2, doc3, doc4 }; + + for (Document& doc : documents) { + status = products->createDocument(doc); + if (!status.ok()) { + if (status.message().find("already exists") != std::string::npos) { + std::cout << "Document " << doc.id() << " already exists, updating instead..." << std::endl; + + // If document exists, update it instead + json updateDoc = { + {"$set", doc.data()} + }; + + status = products->updateDocument(doc.id(), updateDoc); + if (!status.ok()) { + std::cerr << "Failed to update existing document " << doc.id() << ": " << status.message() << std::endl; + } + else { + std::cout << "Document " << doc.id() << " updated." << std::endl; + } + } + else { + std::cerr << "Failed to create document " << doc.id() << ": " << status.message() << std::endl; + } + } + else { + std::cout << "Document " << doc.id() << " created successfully." << std::endl; + } + } + + // Create indexes + std::cout << "\n===== Creating Indexes =====\n"; + for (const auto& field : { "price", "stock", "category", "rating", "available", "name" }) { + status = products->createIndex(field); + if (!status.ok()) { + if (status.message().find("already exists") != std::string::npos) { + std::cout << "Index on '" << field << "' already exists." << std::endl; + } + else { + std::cerr << "Failed to create index on " << field << ": " << status.message() << std::endl; + } + } + else { + std::cout << "Index on '" << field << "' created successfully." << std::endl; + } + } + + // ===== Query Examples ===== + + // 1. OrderBy queries + std::cout << "\n===== OrderBy Queries =====\n"; + + json orderByPriceAsc = { + {"$orderBy", { + {"price", "asc"} + }} + }; + executeQuery(products, orderByPriceAsc, "Order By Price (Ascending)"); + + json orderByRatingDesc = { + {"$orderBy", { + {"rating", "desc"} + }} + }; + executeQuery(products, orderByRatingDesc, "Order By Rating (Descending)"); + + // 2. Equality queries + std::cout << "\n===== Equality Queries =====\n"; + + json eqCategory = { + {"$eq", { + {"category", "Electronics"} + }} + }; + executeQuery(products, eqCategory, "Equal Category: Electronics"); + + json eqAvailable = { + {"$eq", { + {"available", true} + }} + }; + executeQuery(products, eqAvailable, "Equal Available: true"); + + // 3. Greater than queries + std::cout << "\n===== Greater Than Queries =====\n"; + + json gtPrice = { + {"$gt", { + {"price", 50.0} + }} + }; + executeQuery(products, gtPrice, "Price > 50.0"); + + json gtRating = { + {"$gt", { + {"rating", 4.5} + }} + }; + executeQuery(products, gtRating, "Rating > 4.5"); + + // 4. Less than queries + std::cout << "\n===== Less Than Queries =====\n"; + + json ltStock = { + {"$lt", { + {"stock", 100} + }} + }; + executeQuery(products, ltStock, "Stock < 100"); + + json ltPrice = { + {"$lt", { + {"price", 500.0} + }} + }; + executeQuery(products, ltPrice, "Price < 500.0"); + + // 5. AND queries + std::cout << "\n===== AND Queries =====\n"; + + json andPriceRating = { + {"$and", { + {{"$gt", {{"price", 100.0}}}}, + {{"$lt", {{"price", 1000.0}}}} + }} + }; + executeQuery(products, andPriceRating, "100 < Price < 1000"); + + json andCategoryAvailable = { + {"$and", { + {{"$eq", {{"category", "Electronics"}}}}, + {{"$eq", {{"available", true}}}} + }} + }; + executeQuery(products, andCategoryAvailable, "Category = Electronics AND Available = true"); + + // 6. OR queries + std::cout << "\n===== OR Queries =====\n"; + + json orCategories = { + {"$or", { + {{"$eq", {{"category", "Books"}}}}, + {{"$eq", {{"category", "Food"}}}} + }} + }; + executeQuery(products, orCategories, "Category = Books OR Category = Food"); + + json orRatingStock = { + {"$or", { + {{"$gt", {{"rating", 4.7}}}}, + {{"$gt", {{"stock", 150}}}} + }} + }; + executeQuery(products, orRatingStock, "Rating > 4.7 OR Stock > 150"); + + // ===== Update Examples ===== + std::cout << "\n===== Update Operations =====\n"; + + // Example 1: $set - Top-level fields + std::cout << "\n----- $set Operation: Top-level Fields -----\n"; + + // Show original document + Document updateDoc1; + status = products->readDocument("prod001", updateDoc1); + if (status.ok()) { + std::cout << "Original document:" << std::endl; + printDocument(updateDoc1); + } + + json setTopLevel = { + {"$set", { + {"price", 1399.99}, + {"stock", 50}, + {"rating", 4.8}, + {"promotion", "Summer Sale"} // Add new field + }} + }; + + status = products->updateDocument("prod001", setTopLevel); + if (!status.ok()) { + std::cerr << "Failed to update document: " << status.message() << std::endl; + } + else { + std::cout << "Document updated with $set operator (top-level fields)" << std::endl; + status = products->readDocument("prod001", updateDoc1); + if (status.ok()) { + printDocument(updateDoc1); + } + } + + // Example 2: $set - Nested fields + std::cout << "\n----- $set Operation: Nested Fields -----\n"; + + json setNested = { + {"$set", { + {"specs.processor", "i9-12900K"}, + {"specs.ram", "64GB"}, + {"specs.storage", "2TB SSD"}, + }} + }; + + status = products->updateDocument("prod001", setNested); + if (!status.ok()) { + std::cerr << "Failed to update document: " << status.message() << std::endl; + } + else { + std::cout << "Document updated with $set operator (nested fields)" << std::endl; + status = products->readDocument("prod001", updateDoc1); + if (status.ok()) { + printDocument(updateDoc1); + } + } + + // Example 3: $unset - Top-level fields + std::cout << "\n----- $unset Operation: Top-level Fields -----\n"; + + json unsetTopLevel = { + {"$unset", { + {"promotion", ""}, + {"available", ""} + }} + }; + + status = products->updateDocument("prod001", unsetTopLevel); + if (!status.ok()) { + std::cerr << "Failed to update document: " << status.message() << std::endl; + } + else { + std::cout << "Document updated with $unset operator (top-level fields)" << std::endl; + status = products->readDocument("prod001", updateDoc1); + if (status.ok()) { + printDocument(updateDoc1); + } + } + + // Example 4: $unset - Nested fields + std::cout << "\n----- $unset Operation: Nested Fields -----\n"; + + json unsetNested = { + {"$unset", { + {"specs.storage", ""}, + {"dimensions.height", ""} + }} + }; + + status = products->updateDocument("prod001", unsetNested); + if (!status.ok()) { + std::cerr << "Failed to update document: " << status.message() << std::endl; + } + else { + std::cout << "Document updated with $unset operator (nested fields)" << std::endl; + status = products->readDocument("prod001", updateDoc1); + if (status.ok()) { + printDocument(updateDoc1); + } + } + + // Example 5: $push - Top-level array field + std::cout << "\n----- $push Operation: Top-level Array Fields -----\n"; + + Document updateDoc2; + status = products->readDocument("prod001", updateDoc2); + if (status.ok()) { + std::cout << "Before $push operation:" << std::endl; + printDocument(updateDoc2); + } + + json pushTopLevel = { + {"$push", { + {"tags", "limited-edition"}, + {"specs", {{"storage", "2 GB"}} } + }} + }; + + status = products->updateDocument("prod001", pushTopLevel, true); + if (!status.ok()) { + std::cerr << "Failed to update document: " << status.message() << std::endl; + } + else { + std::cout << "Document updated with $push operator" << std::endl; + status = products->readDocument("prod001", updateDoc2); + if (status.ok()) { + printDocument(updateDoc2); + } + } + + // Example 6: $pull - Top-level array field + std::cout << "\n----- $pull Operation: Top-level Array Fields -----\n"; + + json pullTopLevel = { + {"$pull", { + {"tags", "limited-edition"} + }} + }; + + status = products->updateDocument("prod001", pullTopLevel); + if (!status.ok()) { + std::cerr << "Failed to update document: " << status.message() << std::endl; + } + else { + std::cout << "Document updated with $pull operator" << std::endl; + status = products->readDocument("prod001", updateDoc2); + if (status.ok()) { + printDocument(updateDoc2); + } + } + + // Example 7: Updating smartphone product with reviews + std::cout << "\n----- Updating Smartphone Document -----\n"; + + Document smartphoneDoc; + status = products->readDocument("prod002", smartphoneDoc); + if (status.ok()) { + std::cout << "Original smartphone document:" << std::endl; + printDocument(smartphoneDoc); + } + + // Setting new element is not allowed + json setSmartphone = { + {"$set", { + {"features.waterproof", "IP68"}, + {"price", 849.99} + }} + }; + + status = products->updateDocument("prod002", setSmartphone); + if (!status.ok()) { + std::cerr << "Failed to update smartphone document: " << status.message() << std::endl; + } + else { + std::cout << "Smartphone document updated with new features" << std::endl; + status = products->readDocument("prod002", smartphoneDoc); + if (status.ok()) { + printDocument(smartphoneDoc); + } + } + + // Add a new review + json newReview = { + {"user", "mobile_fan"}, + {"rating", 5}, + {"comment", "Best smartphone I've ever owned!"} + }; + + json pushReview = { + {"$push", { + {"reviews", newReview} + }} + }; + + status = products->updateDocument("prod002", pushReview, true); + if (!status.ok()) { + std::cerr << "Failed to add review: " << status.message() << std::endl; + } + else { + std::cout << "Review added to smartphone document" << std::endl; + status = products->readDocument("prod002", smartphoneDoc); + if (status.ok()) { + printDocument(smartphoneDoc); + } + } + + // Example 8: Updating book product + std::cout << "\n----- Updating Book Document -----\n"; + + Document bookDoc; + status = products->readDocument("prod003", bookDoc); + if (status.ok()) { + std::cout << "Original book document:" << std::endl; + printDocument(bookDoc); + } + + // Update book information + json updateBook = { + {"$set", { + {"edition", "Second Edition"}, + {"price", 39.99}, + {"stock", 100} + }} + }; + + status = products->updateDocument("prod003", updateBook); + if (!status.ok()) { + std::cerr << "Failed to update book document: " << status.message() << std::endl; + } + else { + std::cout << "Book document updated" << std::endl; + status = products->readDocument("prod003", bookDoc); + if (status.ok()) { + printDocument(bookDoc); + } + } + + // Example 9: Updating food product + std::cout << "\n----- Updating Food Document -----\n"; + + Document foodDoc; + status = products->readDocument("prod004", foodDoc); + if (status.ok()) { + std::cout << "Original food document:" << std::endl; + printDocument(foodDoc); + } + + // Update food availability and add certifications + json updateFood = { + {"$set", { + {"available", true}, + {"certifications", {"Organic", "Fair Trade", "Rainforest Alliance"}} + }} + }; + + status = products->updateDocument("prod004", updateFood); + if (!status.ok()) { + std::cerr << "Failed to update food document: " << status.message() << std::endl; + } + else { + std::cout << "Food document updated" << std::endl; + status = products->readDocument("prod004", foodDoc); + if (status.ok()) { + printDocument(foodDoc); + } + } + + // Export all documents + std::cout << "\n===== Exporting Documents =====\n"; + status = db.exportAllToJsonAsync("products", "./product_export/"); + if (!status.ok()) { + std::cerr << "Failed to export documents: " << status.message() << std::endl; + } + else { + std::cout << "Documents exported successfully to ./product_export/" << std::endl; + } + products->waitForExportOperation(); + + status = db.createCollection("products_import"); + if (!status.ok()) { + // Check if the collection already exists - if so, continue; otherwise, return + if (status.message().find("already exists") != std::string::npos) { + std::cout << "Collection 'products' already exists, continuing..." << std::endl; + } + else { + std::cerr << "Failed to create collection: " << status.message() << std::endl; + return 1; + } + } + else { + std::cout << "Collection 'products_import' created successfully." << std::endl; + } + + status = db.importFromJsonFile("products_import", "./product_export/products.json"); + if (!status.ok()) { + std::cerr << "Failed to import documents: " << status.message() << std::endl; + } + else { + std::cout << "Documents imported successfully to products_import collection" << std::endl; + } + + std::vector docIds; + status = db.readAllDocuments("products_import", docIds); + if (!status.ok()) { + std::cerr << "Failed to import documents: " << status.message() << std::endl; + } + else { + std::cout << "Documents imported successfully to products_import collection" << std::endl; + } + + for (Document doc : docIds) { + std::cout << doc.data().dump(4) << std::endl; + } + // Clean up - Delete documents + std::cout << "\n===== Cleanup Operations =====\n"; + + // Delete a document + std::string docIdToDelete = "prod004"; + status = products->deleteDocument(docIdToDelete); + if (!status.ok()) { + std::cerr << "Failed to delete document " << docIdToDelete << ": " << status.message() << std::endl; + } + else { + std::cout << "Document " << docIdToDelete << " deleted successfully." << std::endl; + } + + // Delete indexes + std::vector indexesToDelete = { "stock", "rating", "available" }; + for (const auto& indexName : indexesToDelete) { + status = products->deleteIndex(indexName); + if (!status.ok()) { + std::cerr << "Failed to delete index on " << indexName << ": " << status.message() << std::endl; + } + else { + std::cout << "Index on " << indexName << " deleted successfully." << std::endl; + } + } + + // List remaining collections + std::cout << "\n===== Collections in Database =====\n"; + for (const auto& name : db.getCollectionNames()) { + std::cout << "- " << name << std::endl; + } + + // Drop collection + std::string collectionToDrop = "products"; + std::cout << "\nDropping collection '" << collectionToDrop << "'" << std::endl; + status = db.dropCollection(collectionToDrop); + if (!status.ok()) { + std::cerr << "Failed to drop collection: " << status.message() << std::endl; + } + else { + std::cout << "Collection dropped successfully." << std::endl; + } + + // Close database + status = db.close(); + if (!status.ok()) { + std::cerr << "Failed to close database: " << status.message() << std::endl; + return 1; + } + + std::cout << "\nDatabase closed successfully." << std::endl; + std::cout << "C++ version: " << __cplusplus << std::endl; + + return 0; } From a4f4c630eeac377604c9a3ad500f190ca084d122 Mon Sep 17 00:00:00 2001 From: Hash Anu Date: Sun, 22 Jun 2025 18:10:00 +0530 Subject: [PATCH 12/13] Log create/delete cf --- src/main.cpp | 15 +++++++++++--- src/storage_engine/StorageEngine.cpp | 29 ++++++++++++++++++++++++++-- src/storage_engine/WalTracker.cpp | 2 -- 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index fc7dbf9..a7e8cc9 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -53,7 +53,9 @@ void WalOperationHandler(const std::string& operation, const std::string& key, const std::string& value) { std::lock_guard lock(console_mutex); - + if (value.find("__index__") != std::string::npos) { + return; // or continue; depending on context + } std::cout << "[" << GetTimestamp() << "] [WAL] "; std::cout << std::left << std::setw(10) << operation; std::cout << " | CF: " << std::setw(15) << cf_name; @@ -62,7 +64,6 @@ void WalOperationHandler(const std::string& operation, if (!value.empty()) { // Truncate value if it's too long std::string display_value = value; - std::cout << " | Value: " << display_value; } @@ -711,13 +712,21 @@ int main() { std::cout << "Collection dropped successfully." << std::endl; } + collectionToDrop = "products_import"; + std::cout << "\nDropping collection '" << collectionToDrop << "'" << std::endl; + status = db.dropCollection(collectionToDrop); + if (!status.ok()) { + std::cerr << "Failed to drop collection: " << status.message() << std::endl; + } + else { + std::cout << "Collection dropped successfully." << std::endl; + } // Close database status = db.close(); if (!status.ok()) { std::cerr << "Failed to close database: " << status.message() << std::endl; return 1; } - std::cout << "\nDatabase closed successfully." << std::endl; std::cout << "C++ version: " << __cplusplus << std::endl; diff --git a/src/storage_engine/StorageEngine.cpp b/src/storage_engine/StorageEngine.cpp index 6b1adca..404f535 100644 --- a/src/storage_engine/StorageEngine.cpp +++ b/src/storage_engine/StorageEngine.cpp @@ -125,6 +125,7 @@ Status StorageEngine::close() { ownedHandles_.clear(); if (wal_tracker_) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // stop waltracker waltracker_->StopTracking(); if (waltracker_) { @@ -166,8 +167,20 @@ Status StorageEngine::createCollection(const std::string& name) { return Status::IOError(s.ToString()); } - if (wal_tracker_) { + if (wal_tracker_ /* && handle->GetName().find(index_delimiter_) == std::string::npos*/) { waltracker_->UpdateColumnFamilyMap(handle->GetID(), handle->GetName()); + // Wrap the event string inside a "data" key + std::string event_string = "CREATE_CF:" + handle->GetName(); + json wrapped = { + {"data", event_string} + }; + // Serialize to MessagePack + std::vector msgpack_data = json::to_msgpack(wrapped); + // Insert into RocksDB + rocksdb::WriteBatch batch; + batch.Put("__meta__event", rocksdb::Slice(reinterpret_cast(msgpack_data.data()), msgpack_data.size())); + // Write to DB + db_->Write(rocksdb::WriteOptions(), &batch); } // Store the handle @@ -193,8 +206,20 @@ Status StorageEngine::dropCollection(const std::string& name) { return Status::IOError(s.ToString()); } - if (wal_tracker_) { + if (wal_tracker_ /*&& handle->GetName().find(index_delimiter_) == std::string::npos*/) { waltracker_->DeleteColumnFamilyMap(handle->GetID(), handle->GetName()); + // Wrap the event string inside a "data" key + std::string event_string = "DELETE_CF:" + handle->GetName(); + json wrapped = { + {"data", event_string} + }; + // Serialize to MessagePack + std::vector msgpack_data = json::to_msgpack(wrapped); + // Insert into RocksDB + rocksdb::WriteBatch batch; + batch.Put("__meta__event", rocksdb::Slice(reinterpret_cast(msgpack_data.data()), msgpack_data.size())); + // Write to DB + db_->Write(rocksdb::WriteOptions(), &batch); } // Remove from our map diff --git a/src/storage_engine/WalTracker.cpp b/src/storage_engine/WalTracker.cpp index 7e34dda..4132485 100644 --- a/src/storage_engine/WalTracker.cpp +++ b/src/storage_engine/WalTracker.cpp @@ -27,7 +27,6 @@ void WalTracker::UpdateColumnFamilyMap(uint32_t id, const std::string& name) { if (name.find("__index__") != std::string::npos) { return; } - callback_("CREATE_CF_MANUAL", name, std::to_string(id), ""); cf_id_to_name_[id] = name; } @@ -35,7 +34,6 @@ void WalTracker::DeleteColumnFamilyMap(uint32_t id, const std::string& name) { if (name.find("__index__") != std::string::npos) { return; } - callback_("DELETE_CF_MANUAL", name, std::to_string(id), ""); cf_id_to_name_.erase(id); } From 9b35acae1ade91a6faa16d10e5c2dd236df633d1 Mon Sep 17 00:00:00 2001 From: Hash Anu Date: Sun, 22 Jun 2025 23:12:19 +0530 Subject: [PATCH 13/13] Modifying import operation logic to load GBs of json --- src/Collection.cpp | 86 ++++++++++++ src/Collection.h | 203 ++++++++++++++++++++++++++- src/storage_engine/StorageEngine.cpp | 2 - 3 files changed, 287 insertions(+), 4 deletions(-) diff --git a/src/Collection.cpp b/src/Collection.cpp index b846561..a46307f 100644 --- a/src/Collection.cpp +++ b/src/Collection.cpp @@ -450,6 +450,7 @@ Status Collection::deleteIfIndexFieldExists(const Document& doc, const std::stri return engine_->remove(getIndexCfName(index), key.str()); } +#if 0 Status Collection::importFromJsonFile(const std::string& filePath) { try { @@ -511,6 +512,91 @@ Status Collection::importFromJsonFile(const std::string& filePath) { return Status::InternalError("JSON import error: " + std::string(e.what())); } } +#else +Status Collection::importFromJsonFile(const std::string& filePath) { + try { + std::ifstream file(filePath, std::ios::binary); + if (!file.is_open()) { + return Status::IOError("Could not open file: " + filePath); + } + + int successCount = 0; + int failureCount = 0; + + // Create parser in a scope to ensure cleanup + { + MemoryEfficientJsonParser parser(file); + + std::string objectStr; + while (!(objectStr = parser.extractNextObject()).empty()) { + try { + // Parse in a nested scope for immediate cleanup + { + json item = json::parse(objectStr); + + // Clear immediately after parsing + objectStr.clear(); + objectStr.shrink_to_fit(); + + if (!item.is_object()) { + failureCount++; + continue; + } + + std::string docId; + if (item.contains("_id")) { + docId = item["_id"].get(); + } + else { + docId = "doc_" + std::to_string(successCount + failureCount); + } + + // Create document in nested scope + { + Document doc(docId, item); + Status status = createDocument(doc); + if (status.ok()) { + successCount++; + } + else { + failureCount++; + std::cerr << "Failed to import document " << docId << ": " + << status.message() << std::endl; + } + } // doc destroyed here + } // item destroyed here + + } + catch (const json::parse_error& e) { + failureCount++; + objectStr.clear(); + objectStr.shrink_to_fit(); + continue; + } + } + + if (!parser.isArrayStarted()) { + return Status::NotSupported("File must contain a JSON array of objects: " + filePath); + } + } // parser destroyed here, buffer memory freed + + if (successCount == 0 && failureCount == 0) { + return Status::NotSupported("No valid JSON objects found in array: " + filePath); + } + + std::cout << "JSON Array Import Summary:\n" + << "File: " << filePath << "\n" + << "Successfully imported: " << successCount << " documents\n" + << "Failed to import: " << failureCount << " documents" << std::endl; + + return Status::OK(); + + } + catch (const std::exception& e) { + return Status::InternalError("JSON import error: " + std::string(e.what())); + } +} +#endif Status Collection::exportAllToJsonAsync(const std::string& exportPath) { ExportTask task(engine_, name_, exportPath); diff --git a/src/Collection.h b/src/Collection.h index f75b75d..e2be3bd 100644 --- a/src/Collection.h +++ b/src/Collection.h @@ -45,13 +45,13 @@ namespace anudb { Status readAllDocuments(std::vector& docIds, uint64_t limit = 10); // Read all documents from the collection - Status exportAllToJsonAsync(const std::string &exportPath); + Status exportAllToJsonAsync(const std::string& exportPath); // Read all documents from the collection Status importFromJsonFile(const std::string& filePath); // Update document from the collection whose Id is matching - Status updateDocument(const std::string& id, const json &update, bool upsert = false); + Status updateDocument(const std::string& id, const json& update, bool upsert = false); // find document from the collection whose filter option is matchin std::vector findDocument(const json& filterOption); @@ -107,5 +107,204 @@ namespace anudb { std::string collection_name_; std::string output_path_; }; + + class MemoryEfficientJsonParser { + private: + std::ifstream& file; + std::vector buffer; + size_t bufferPos; + size_t validSize; + static const size_t BUFFER_SIZE = 8192; // 8KB chunks + bool arrayStarted; + bool arrayEnded; + + public: + MemoryEfficientJsonParser(std::ifstream& f) + : file(f), bufferPos(0), validSize(0), arrayStarted(false), arrayEnded(false) { + buffer.resize(BUFFER_SIZE * 2); // Fixed 16KB buffer + } + + ~MemoryEfficientJsonParser() { + // Explicit cleanup + buffer.clear(); + buffer.shrink_to_fit(); + } + + // Fill buffer with new data, moving remaining data to beginning + bool fillBuffer() { + if (file.eof()) return false; + + // Move remaining data to beginning using efficient memmove + if (bufferPos > 0 && bufferPos < validSize) { + size_t remaining = validSize - bufferPos; + std::memmove(&buffer[0], &buffer[bufferPos], remaining); + validSize = remaining; + bufferPos = 0; + } + else if (bufferPos >= validSize) { + validSize = 0; + bufferPos = 0; + } + + // Read new data into available space + size_t spaceAvailable = buffer.size() - validSize; + if (spaceAvailable > 0) { + file.read(&buffer[validSize], spaceAvailable); + std::streamsize bytesRead = file.gcount(); + if (bytesRead > 0) { + validSize += bytesRead; + return true; + } + } + return false; + } + + // Skip whitespace characters + void skipWhitespace() { + while (bufferPos < validSize && std::isspace(buffer[bufferPos])) { + bufferPos++; + } + + // Refill buffer if we're near the end + if (bufferPos >= validSize - 10 && !file.eof()) { + fillBuffer(); + while (bufferPos < validSize && std::isspace(buffer[bufferPos])) { + bufferPos++; + } + } + } + + // Extract next JSON object from array + std::string extractNextObject() { + if (arrayEnded) return ""; + + skipWhitespace(); + + // Ensure we have data + if (bufferPos >= validSize) { + if (!fillBuffer()) return ""; + skipWhitespace(); + } + + if (bufferPos >= validSize) return ""; + + char ch = buffer[bufferPos]; + + // Handle array start + if (!arrayStarted && ch == '[') { + arrayStarted = true; + bufferPos++; + skipWhitespace(); + + // Check for empty array + if (bufferPos < validSize && buffer[bufferPos] == ']') { + arrayEnded = true; + return ""; + } + } + // Handle array end + else if (ch == ']') { + arrayEnded = true; + return ""; + } + // Handle comma separator + else if (ch == ',') { + bufferPos++; + skipWhitespace(); + } + + // Verify we're in an array + if (!arrayStarted) { + return ""; // Not a valid JSON array + } + + // Extract the JSON object + return extractJsonObject(); + } + + bool isArrayStarted() const { return arrayStarted; } + bool isArrayEnded() const { return arrayEnded; } + + private: + // Extract a single JSON object using incremental building + std::string extractJsonObject() { + skipWhitespace(); + + if (bufferPos >= validSize || buffer[bufferPos] != '{') { + return ""; // Not a valid object start + } + + // Use incremental result building to avoid large string copies + std::string result; + result.reserve(512); // Start with reasonable size + + size_t objectStart = bufferPos; + int braceCount = 0; + bool inString = false; + bool escaped = false; + + while (true) { + // Check if we need more data + if (bufferPos >= validSize) { + // Append current segment to result + if (objectStart < validSize) { + result.append(&buffer[objectStart], validSize - objectStart); + } + + // Reset buffer and read more data + bufferPos = 0; + validSize = 0; + objectStart = 0; + + if (!fillBuffer()) { + break; // End of file + } + } + + char ch = buffer[bufferPos]; + + // JSON parsing state machine + if (!inString) { + if (ch == '"') { + inString = true; + } + else if (ch == '{') { + braceCount++; + } + else if (ch == '}') { + braceCount--; + if (braceCount == 0) { + // Complete object found + bufferPos++; // Include closing brace + + // Append final segment + result.append(&buffer[objectStart], bufferPos - objectStart); + + // Optimize memory usage + result.shrink_to_fit(); + return result; + } + } + } + else { + // Inside string + if (escaped) { + escaped = false; + } + else if (ch == '\\') { + escaped = true; + } + else if (ch == '"') { + inString = false; + } + } + + bufferPos++; + } + + // Incomplete object (shouldn't happen with valid JSON) + return ""; + } + }; } #endif // COLLECTION_H diff --git a/src/storage_engine/StorageEngine.cpp b/src/storage_engine/StorageEngine.cpp index 404f535..cfedeaa 100644 --- a/src/storage_engine/StorageEngine.cpp +++ b/src/storage_engine/StorageEngine.cpp @@ -89,7 +89,6 @@ Status StorageEngine::open(bool walTracker) { // Create column family ID to name mapping for WAL tracker for (auto* handle : handles) { cf_id_to_name_[handle->GetID()] = handle->GetName(); - std::cout << "Column Family: " << handle->GetName() << " (ID: " << handle->GetID() << ")" << std::endl; } // Create WAL tracker @@ -98,7 +97,6 @@ Status StorageEngine::open(bool walTracker) { } // Print estimated memory usage size_t estimated_mem = RocksDBOptimizer::estimateMemoryUsage(config); - //std::cout << "Estimated memory usage by storage engine: " << (estimated_mem >> 20) << "MB\n"; return Status::OK(); }