diff --git a/CMakeLists.txt b/CMakeLists.txt index c638b59..4695585 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -58,6 +58,10 @@ set(WITH_FUZZER OFF CACHE BOOL "Disable fuzzing tools" FORCE) add_subdirectory(third_party/rocksdb) # Add storage engine add_subdirectory(src/storage_engine) +if (CMAKE_SYSTEM_NAME STREQUAL "Linux") +target_compile_options(rocksdb PRIVATE -frtti) +target_compile_options(libstorage PRIVATE -frtti) +endif() set(LIBRARY_SOURCES ${CMAKE_SOURCE_DIR}/src/Cursor.cpp ${CMAKE_SOURCE_DIR}/src/Database.cpp ${CMAKE_SOURCE_DIR}/src/Collection.cpp ${CMAKE_SOURCE_DIR}/src/Document.cpp) diff --git a/src/Collection.cpp b/src/Collection.cpp index b846561..a46307f 100644 --- a/src/Collection.cpp +++ b/src/Collection.cpp @@ -450,6 +450,7 @@ Status Collection::deleteIfIndexFieldExists(const Document& doc, const std::stri return engine_->remove(getIndexCfName(index), key.str()); } +#if 0 Status Collection::importFromJsonFile(const std::string& filePath) { try { @@ -511,6 +512,91 @@ Status Collection::importFromJsonFile(const std::string& filePath) { return Status::InternalError("JSON import error: " + std::string(e.what())); } } +#else +Status Collection::importFromJsonFile(const std::string& filePath) { + try { + std::ifstream file(filePath, std::ios::binary); + if (!file.is_open()) { + return Status::IOError("Could not open file: " + filePath); + } + + int successCount = 0; + int failureCount = 0; + + // Create parser in a scope to ensure cleanup + { + MemoryEfficientJsonParser parser(file); + + std::string objectStr; + while (!(objectStr = parser.extractNextObject()).empty()) { + try { + // Parse in a nested scope for immediate cleanup + { + json item = json::parse(objectStr); + + // Clear immediately after parsing + objectStr.clear(); + objectStr.shrink_to_fit(); + + if (!item.is_object()) { + failureCount++; + continue; + } + + std::string docId; + if (item.contains("_id")) { + docId = item["_id"].get(); + } + else { + docId = "doc_" + 
std::to_string(successCount + failureCount); + } + + // Create document in nested scope + { + Document doc(docId, item); + Status status = createDocument(doc); + if (status.ok()) { + successCount++; + } + else { + failureCount++; + std::cerr << "Failed to import document " << docId << ": " + << status.message() << std::endl; + } + } // doc destroyed here + } // item destroyed here + + } + catch (const json::parse_error& e) { + failureCount++; + objectStr.clear(); + objectStr.shrink_to_fit(); + continue; + } + } + + if (!parser.isArrayStarted()) { + return Status::NotSupported("File must contain a JSON array of objects: " + filePath); + } + } // parser destroyed here, buffer memory freed + + if (successCount == 0 && failureCount == 0) { + return Status::NotSupported("No valid JSON objects found in array: " + filePath); + } + + std::cout << "JSON Array Import Summary:\n" + << "File: " << filePath << "\n" + << "Successfully imported: " << successCount << " documents\n" + << "Failed to import: " << failureCount << " documents" << std::endl; + + return Status::OK(); + + } + catch (const std::exception& e) { + return Status::InternalError("JSON import error: " + std::string(e.what())); + } +} +#endif Status Collection::exportAllToJsonAsync(const std::string& exportPath) { ExportTask task(engine_, name_, exportPath); diff --git a/src/Collection.h b/src/Collection.h index fe511b9..e2be3bd 100644 --- a/src/Collection.h +++ b/src/Collection.h @@ -10,16 +10,6 @@ extern "C" uintptr_t __cdecl _beginthreadex(void*, unsigned int, unsigned int(__stdcall*)(void*), void*, unsigned int, unsigned int*); #endif -#ifdef MAKE_UNIQUE -#include - -namespace std { - template - std::unique_ptr make_unique(Args&&... 
// Streaming reader for input that holds one top-level JSON array of objects.
//
// The stream is consumed through a fixed 16KB window; each call to
// extractNextObject() returns the raw text of the next `{...}` element so the
// caller can parse one object at a time instead of loading the whole file.
// Only brace depth and string/escape state are tracked here - the returned
// text is NOT validated as JSON; that is left to the caller's parser.
//
// The parser borrows the stream: it must outlive the parser, and nothing else
// should read from it while the parser is in use.
class MemoryEfficientJsonParser {
private:
    std::istream& file;          // Borrowed input stream (any istream, e.g. std::ifstream)
    std::vector<char> buffer;    // Fixed-size read window
    size_t bufferPos;            // Index of the next unread byte in buffer
    size_t validSize;            // Number of bytes in buffer that hold stream data
    static constexpr size_t BUFFER_SIZE = 8192; // 8KB chunks
    bool arrayStarted;           // Opening '[' has been consumed
    bool arrayEnded;             // A ']' was seen at array level

    // std::isspace has undefined behaviour for negative values, which plain
    // char produces for bytes >= 0x80 (UTF-8 text, BOM); go through unsigned char.
    static bool isSpace(char c) {
        return std::isspace(static_cast<unsigned char>(c)) != 0;
    }

public:
    explicit MemoryEfficientJsonParser(std::istream& f)
        : file(f), buffer(BUFFER_SIZE * 2), // Fixed 16KB buffer
          bufferPos(0), validSize(0), arrayStarted(false), arrayEnded(false) {
    }

    // Reads more data from the stream into the window. Unread bytes are first
    // moved to the front so the free space is contiguous.
    // Returns true if at least one new byte was read; false at end of stream,
    // on a read that yields nothing, or when the window has no free space.
    bool fillBuffer() {
        if (file.eof()) return false;

        if (bufferPos >= validSize) {
            // Everything was consumed: start over with an empty window.
            bufferPos = 0;
            validSize = 0;
        }
        else if (bufferPos > 0) {
            // Keep the unread tail; memmove because the ranges may overlap.
            const size_t remaining = validSize - bufferPos;
            std::memmove(buffer.data(), buffer.data() + bufferPos, remaining);
            validSize = remaining;
            bufferPos = 0;
        }

        const size_t spaceAvailable = buffer.size() - validSize;
        if (spaceAvailable == 0) return false;

        file.read(buffer.data() + validSize, static_cast<std::streamsize>(spaceAvailable));
        const std::streamsize bytesRead = file.gcount();
        if (bytesRead <= 0) return false;

        validSize += static_cast<size_t>(bytesRead);
        return true;
    }

    // Advances past whitespace, refilling the window as often as needed, so a
    // whitespace run longer than the window cannot be mistaken for end of
    // input. On return bufferPos is at a non-whitespace byte, or
    // bufferPos >= validSize when the stream is exhausted.
    void skipWhitespace() {
        while (true) {
            while (bufferPos < validSize && isSpace(buffer[bufferPos])) {
                ++bufferPos;
            }
            if (bufferPos < validSize) return; // found a significant byte
            if (!fillBuffer()) return;         // nothing left to read
        }
    }

    // Returns the raw text of the next object in the array, or "" when there
    // is none: the array ended, the stream ended, the input does not start
    // with '[', the next element is not an object, or the object is truncated.
    // Use isArrayStarted()/isArrayEnded() to tell these cases apart.
    std::string extractNextObject() {
        if (arrayEnded) return "";

        skipWhitespace();
        if (bufferPos >= validSize) return ""; // end of stream

        const char ch = buffer[bufferPos];

        if (!arrayStarted && ch == '[') {
            // Array start
            arrayStarted = true;
            ++bufferPos;
            skipWhitespace();

            // Empty array
            if (bufferPos < validSize && buffer[bufferPos] == ']') {
                arrayEnded = true;
                return "";
            }
        }
        else if (ch == ']') {
            // Array end
            arrayEnded = true;
            return "";
        }
        else if (ch == ',') {
            // Element separator
            ++bufferPos;
            skipWhitespace();
        }

        // Anything before the opening '[' means this is not a JSON array.
        if (!arrayStarted) {
            return "";
        }

        return extractJsonObject();
    }

    bool isArrayStarted() const { return arrayStarted; }
    bool isArrayEnded() const { return arrayEnded; }

private:
    // Copies one `{...}` object starting at the current position into a
    // string. The object may span several window refills: whenever the window
    // runs out, the part scanned so far is appended to the result and the
    // window is reloaded from its start. Braces inside string literals
    // (including after backslash escapes) are ignored.
    // Returns "" if the current byte is not '{' or the stream ends before the
    // matching '}'.
    std::string extractJsonObject() {
        skipWhitespace();

        if (bufferPos >= validSize || buffer[bufferPos] != '{') {
            return ""; // Not a valid object start
        }

        std::string result;
        result.reserve(512); // Start with reasonable size

        size_t objectStart = bufferPos; // first byte of the not-yet-copied segment
        int braceCount = 0;
        bool inString = false;
        bool escaped = false;

        while (true) {
            if (bufferPos >= validSize) {
                // Window exhausted mid-object: bank what we have, then reload.
                result.append(buffer.data() + objectStart, validSize - objectStart);

                bufferPos = 0;
                validSize = 0;
                objectStart = 0;

                if (!fillBuffer()) {
                    return ""; // Stream ended inside the object
                }
            }

            const char ch = buffer[bufferPos];

            if (inString) {
                if (escaped) {
                    escaped = false; // this byte was escaped; it has no meaning here
                }
                else if (ch == '\\') {
                    escaped = true;
                }
                else if (ch == '"') {
                    inString = false;
                }
            }
            else if (ch == '"') {
                inString = true;
            }
            else if (ch == '{') {
                ++braceCount;
            }
            else if (ch == '}') {
                if (--braceCount == 0) {
                    ++bufferPos; // Include closing brace
                    result.append(buffer.data() + objectStart, bufferPos - objectStart);
                    return result;
                }
            }

            ++bufferPos;
        }
    }
};
importFile) { Collection* collection = getCollection(collectionName); if (!collection) { diff --git a/src/Database.h b/src/Database.h index 4227a89..cad701f 100644 --- a/src/Database.h +++ b/src/Database.h @@ -9,7 +9,7 @@ namespace anudb { class Database { public: Database(const std::string& dbPath) : engine_(dbPath) {} - Status open(); + Status open(bool walTracker = false); Status close(); Status createCollection(const std::string& name); Status dropCollection(const std::string& name); @@ -18,6 +18,7 @@ namespace anudb { Status readAllDocuments(const std::string& collectionName, std::vector& docIds); Status exportAllToJsonAsync(const std::string& collectionName, const std::string& exportPath); Status importFromJsonFile(const std::string& collectionName, const std::string& importFile); + void registerCallback(WalOperationCallback callback); Collection* getCollection(const std::string& name); std::vector getCollectionNames() const; diff --git a/src/main.cpp b/src/main.cpp index 783c051..a7e8cc9 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -9,676 +9,726 @@ using json = nlohmann::json; // Helper function to print document void printDocument(const Document& doc) { - std::cout << "Document ID: " << doc.id() << "\nContent:\n" << doc.data().dump(4) << "\n" << std::endl; + std::cout << "Document ID: " << doc.id() << "\nContent:\n" << doc.data().dump(4) << "\n" << std::endl; } // Helper function to execute a query and print results void executeQuery(Collection* collection, const json& query, const std::string& queryName) { - std::vector docIds; - std::cout << "\n===== Executing " << queryName << " =====\n"; - - docIds = collection->findDocument(query); - - std::cout << "Found " << docIds.size() << " document(s)" << std::endl; - for (const std::string& docId : docIds) { - Document doc; - Status status = collection->readDocument(docId, doc); - if (status.ok()) { - printDocument(doc); - } - else { - std::cerr << "Failed to read document " << docId << ": " << 
#include <mutex>

// Serializes console output of WalOperationHandler, which may be invoked from
// more than one thread.
std::mutex console_mutex;

// Returns the current local time formatted as "YYYY-MM-DD HH:MM:SS".
// Uses the re-entrant localtime variants because std::localtime returns a
// pointer to shared static storage and is not safe to call concurrently.
// If the time cannot be converted or formatted, a zeroed timestamp of the
// same width is returned so log columns stay aligned.
std::string GetTimestamp() {
    static const char kFallback[] = "0000-00-00 00:00:00";

    const std::time_t now_time_t =
        std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

    std::tm tm_info{};
#if defined(_WIN32)
    if (localtime_s(&tm_info, &now_time_t) != 0) {
        return kFallback;
    }
#else
    if (localtime_r(&now_time_t, &tm_info) == nullptr) {
        return kFallback;
    }
#endif

    char time_buffer[20]; // "YYYY-MM-DD HH:MM:SS" + NUL
    if (std::strftime(time_buffer, sizeof(time_buffer), "%Y-%m-%d %H:%M:%S", &tm_info) == 0) {
        return kFallback;
    }
    return time_buffer;
}

// WAL operation handler: prints one line per operation to stdout in the form
//   [<timestamp>] [WAL] <operation> | CF: <cf_name> | Key: <key>[ | Value: <value>]
// with operation, column family and key left-aligned in fixed-width columns.
//
// @param operation  Operation label to print (padded to 10 columns).
// @param cf_name    Column family name (padded to 15 columns).
// @param key        Affected key (padded to 20 columns).
// @param value      Associated value; printed in full when non-empty. Entries
//                   whose value contains "__index__" are not logged at all.
void WalOperationHandler(const std::string& operation,
    const std::string& cf_name,
    const std::string& key,
    const std::string& value) {
    // Cheap filter first: no need to take the lock for entries we drop.
    if (value.find("__index__") != std::string::npos) {
        return;
    }

    // Format into a private stream so the lock is held only for the final
    // write and std::cout's own formatting flags are left untouched.
    std::ostringstream line;
    line << "[" << GetTimestamp() << "] [WAL] ";
    line << std::left << std::setw(10) << operation;
    line << " | CF: " << std::setw(15) << cf_name;
    line << " | Key: " << std::setw(20) << key;
    if (!value.empty()) {
        line << " | Value: " << value;
    }

    std::lock_guard<std::mutex> lock(console_mutex);
    std::cout << line.str() << std::endl;
}
successfully." << std::endl; - - // Create products collection - status = db.createCollection("products"); - if (!status.ok()) { - // Check if the collection already exists - if so, continue; otherwise, return - if (status.message().find("already exists") != std::string::npos) { - std::cout << "Collection 'products' already exists, continuing..." << std::endl; - } - else { - std::cerr << "Failed to create collection: " << status.message() << std::endl; - return 1; - } - } - else { - std::cout << "Collection 'products' created successfully." << std::endl; - } - - // Get the collection - Collection* products = db.getCollection("products"); - if (!products) { - std::cerr << "Failed to get collection." << std::endl; - return 1; - } - - // Create sample product documents with different structures - // Electronics product - complex nested structure - json product1 = { - {"name", "Laptop"}, - {"price", 1299.99}, - {"stock", 45}, - {"category", "Electronics"}, - {"rating", 4.7}, - {"brand", "TechMaster"}, - {"specs", { - {"processor", "i9"}, - {"ram", "32GB"}, - {"storage", "1TB SSD"} - }}, - {"tags", {"laptop", "gaming", "high-performance"}}, - {"dimensions", { - {"length", 35.8}, - {"width", 24.7}, - {"height", 1.9} - }}, - {"available", true} - }; - - // Smartphone product - array of objects structure - json product2 = { - {"name", "Smartphone"}, - {"price", 799.99}, - {"stock", 160}, - {"category", "Electronics"}, - {"rating", 4.5}, - {"brand", "MobiTech"}, - {"colors", {"Black", "Silver", "Blue"}}, - {"features", { - {"camera", "48MP"}, - {"display", "AMOLED"}, - {"battery", "5000mAh"} - }}, - {"reviews", { - { - {"user", "user123"}, - {"rating", 5}, - {"comment", "Great phone!"} - }, - { - {"user", "tech_reviewer"}, - {"rating", 4}, - {"comment", "Good performance but battery drains quickly"} - } - }}, - {"available", true} - }; - - // Book product - simple flat structure - json product3 = { - {"name", "Programming in C++"}, - {"price", 49.99}, - {"stock", 75}, - 
{"category", "Books"}, - {"rating", 4.2}, - {"author", "John Smith"}, - {"publisher", "Tech Books Inc"}, - {"pages", 450}, - {"isbn", "978-3-16-148410-0"}, - {"published_date", "2023-03-15"}, - {"available", true} - }; - - // Food product - another structure - json product4 = { - {"name", "Organic Coffee"}, - {"price", 15.99}, - {"stock", 200}, - {"category", "Food"}, - {"rating", 4.8}, - {"brand", "BeanMaster"}, - {"weight", "500g"}, - {"origin", "Colombia"}, - {"expiry_date", "2025-06-30"}, - {"nutritional_info", { - {"calories", 0}, - {"fat", "0g"}, - {"caffeine", "95mg per serving"} - }}, - {"available", false} - }; - - // Create document objects - Document doc1("prod001", product1); - Document doc2("prod002", product2); - Document doc3("prod003", product3); - Document doc4("prod004", product4); - - // Insert documents - std::vector documents = { doc1, doc2, doc3, doc4 }; - - for (Document& doc : documents) { - status = products->createDocument(doc); - if (!status.ok()) { - if (status.message().find("already exists") != std::string::npos) { - std::cout << "Document " << doc.id() << " already exists, updating instead..." << std::endl; - - // If document exists, update it instead - json updateDoc = { - {"$set", doc.data()} - }; - - status = products->updateDocument(doc.id(), updateDoc); - if (!status.ok()) { - std::cerr << "Failed to update existing document " << doc.id() << ": " << status.message() << std::endl; - } - else { - std::cout << "Document " << doc.id() << " updated." << std::endl; - } - } - else { - std::cerr << "Failed to create document " << doc.id() << ": " << status.message() << std::endl; - } - } - else { - std::cout << "Document " << doc.id() << " created successfully." 
<< std::endl; - } - } - - // Create indexes - std::cout << "\n===== Creating Indexes =====\n"; - for (const auto& field : { "price", "stock", "category", "rating", "available", "name" }) { - status = products->createIndex(field); - if (!status.ok()) { - if (status.message().find("already exists") != std::string::npos) { - std::cout << "Index on '" << field << "' already exists." << std::endl; - } - else { - std::cerr << "Failed to create index on " << field << ": " << status.message() << std::endl; - } - } - else { - std::cout << "Index on '" << field << "' created successfully." << std::endl; - } - } - - // ===== Query Examples ===== - - // 1. OrderBy queries - std::cout << "\n===== OrderBy Queries =====\n"; - - json orderByPriceAsc = { - {"$orderBy", { - {"price", "asc"} - }} - }; - executeQuery(products, orderByPriceAsc, "Order By Price (Ascending)"); - - json orderByRatingDesc = { - {"$orderBy", { - {"rating", "desc"} - }} - }; - executeQuery(products, orderByRatingDesc, "Order By Rating (Descending)"); - - // 2. Equality queries - std::cout << "\n===== Equality Queries =====\n"; - - json eqCategory = { - {"$eq", { - {"category", "Electronics"} - }} - }; - executeQuery(products, eqCategory, "Equal Category: Electronics"); - - json eqAvailable = { - {"$eq", { - {"available", true} - }} - }; - executeQuery(products, eqAvailable, "Equal Available: true"); - - // 3. Greater than queries - std::cout << "\n===== Greater Than Queries =====\n"; - - json gtPrice = { - {"$gt", { - {"price", 50.0} - }} - }; - executeQuery(products, gtPrice, "Price > 50.0"); - - json gtRating = { - {"$gt", { - {"rating", 4.5} - }} - }; - executeQuery(products, gtRating, "Rating > 4.5"); - - // 4. 
Less than queries - std::cout << "\n===== Less Than Queries =====\n"; - - json ltStock = { - {"$lt", { - {"stock", 100} - }} - }; - executeQuery(products, ltStock, "Stock < 100"); - - json ltPrice = { - {"$lt", { - {"price", 500.0} - }} - }; - executeQuery(products, ltPrice, "Price < 500.0"); - - // 5. AND queries - std::cout << "\n===== AND Queries =====\n"; - - json andPriceRating = { - {"$and", { - {{"$gt", {{"price", 100.0}}}}, - {{"$lt", {{"price", 1000.0}}}} - }} - }; - executeQuery(products, andPriceRating, "100 < Price < 1000"); - - json andCategoryAvailable = { - {"$and", { - {{"$eq", {{"category", "Electronics"}}}}, - {{"$eq", {{"available", true}}}} - }} - }; - executeQuery(products, andCategoryAvailable, "Category = Electronics AND Available = true"); - - // 6. OR queries - std::cout << "\n===== OR Queries =====\n"; - - json orCategories = { - {"$or", { - {{"$eq", {{"category", "Books"}}}}, - {{"$eq", {{"category", "Food"}}}} - }} - }; - executeQuery(products, orCategories, "Category = Books OR Category = Food"); - - json orRatingStock = { - {"$or", { - {{"$gt", {{"rating", 4.7}}}}, - {{"$gt", {{"stock", 150}}}} - }} - }; - executeQuery(products, orRatingStock, "Rating > 4.7 OR Stock > 150"); - - // ===== Update Examples ===== - std::cout << "\n===== Update Operations =====\n"; - - // Example 1: $set - Top-level fields - std::cout << "\n----- $set Operation: Top-level Fields -----\n"; - - // Show original document - Document updateDoc1; - status = products->readDocument("prod001", updateDoc1); - if (status.ok()) { - std::cout << "Original document:" << std::endl; - printDocument(updateDoc1); - } - - json setTopLevel = { - {"$set", { - {"price", 1399.99}, - {"stock", 50}, - {"rating", 4.8}, - {"promotion", "Summer Sale"} // Add new field - }} - }; - - status = products->updateDocument("prod001", setTopLevel); - if (!status.ok()) { - std::cerr << "Failed to update document: " << status.message() << std::endl; - } - else { - std::cout << "Document updated 
with $set operator (top-level fields)" << std::endl; - status = products->readDocument("prod001", updateDoc1); - if (status.ok()) { - printDocument(updateDoc1); - } - } - - // Example 2: $set - Nested fields - std::cout << "\n----- $set Operation: Nested Fields -----\n"; - - json setNested = { - {"$set", { - {"specs.processor", "i9-12900K"}, - {"specs.ram", "64GB"}, - {"specs.storage", "2TB SSD"}, - }} - }; - - status = products->updateDocument("prod001", setNested); - if (!status.ok()) { - std::cerr << "Failed to update document: " << status.message() << std::endl; - } - else { - std::cout << "Document updated with $set operator (nested fields)" << std::endl; - status = products->readDocument("prod001", updateDoc1); - if (status.ok()) { - printDocument(updateDoc1); - } - } - - // Example 3: $unset - Top-level fields - std::cout << "\n----- $unset Operation: Top-level Fields -----\n"; - - json unsetTopLevel = { - {"$unset", { - {"promotion", ""}, - {"available", ""} - }} - }; - - status = products->updateDocument("prod001", unsetTopLevel); - if (!status.ok()) { - std::cerr << "Failed to update document: " << status.message() << std::endl; - } - else { - std::cout << "Document updated with $unset operator (top-level fields)" << std::endl; - status = products->readDocument("prod001", updateDoc1); - if (status.ok()) { - printDocument(updateDoc1); - } - } - - // Example 4: $unset - Nested fields - std::cout << "\n----- $unset Operation: Nested Fields -----\n"; - - json unsetNested = { - {"$unset", { - {"specs.storage", ""}, - {"dimensions.height", ""} - }} - }; - - status = products->updateDocument("prod001", unsetNested); - if (!status.ok()) { - std::cerr << "Failed to update document: " << status.message() << std::endl; - } - else { - std::cout << "Document updated with $unset operator (nested fields)" << std::endl; - status = products->readDocument("prod001", updateDoc1); - if (status.ok()) { - printDocument(updateDoc1); - } - } - - // Example 5: $push - Top-level 
array field - std::cout << "\n----- $push Operation: Top-level Array Fields -----\n"; - - Document updateDoc2; - status = products->readDocument("prod001", updateDoc2); - if (status.ok()) { - std::cout << "Before $push operation:" << std::endl; - printDocument(updateDoc2); - } - - json pushTopLevel = { - {"$push", { - {"tags", "limited-edition"}, - {"specs", {{"storage", "2 GB"}} } - }} - }; - - status = products->updateDocument("prod001", pushTopLevel, true); - if (!status.ok()) { - std::cerr << "Failed to update document: " << status.message() << std::endl; - } - else { - std::cout << "Document updated with $push operator" << std::endl; - status = products->readDocument("prod001", updateDoc2); - if (status.ok()) { - printDocument(updateDoc2); - } - } - - // Example 6: $pull - Top-level array field - std::cout << "\n----- $pull Operation: Top-level Array Fields -----\n"; - - json pullTopLevel = { - {"$pull", { - {"tags", "limited-edition"} - }} - }; - - status = products->updateDocument("prod001", pullTopLevel); - if (!status.ok()) { - std::cerr << "Failed to update document: " << status.message() << std::endl; - } - else { - std::cout << "Document updated with $pull operator" << std::endl; - status = products->readDocument("prod001", updateDoc2); - if (status.ok()) { - printDocument(updateDoc2); - } - } - - // Example 7: Updating smartphone product with reviews - std::cout << "\n----- Updating Smartphone Document -----\n"; - - Document smartphoneDoc; - status = products->readDocument("prod002", smartphoneDoc); - if (status.ok()) { - std::cout << "Original smartphone document:" << std::endl; - printDocument(smartphoneDoc); - } - - // Setting new element is not allowed - json setSmartphone = { - {"$set", { - {"features.waterproof", "IP68"}, - {"price", 849.99} - }} - }; - - status = products->updateDocument("prod002", setSmartphone); - if (!status.ok()) { - std::cerr << "Failed to update smartphone document: " << status.message() << std::endl; - } - else { - 
std::cout << "Smartphone document updated with new features" << std::endl; - status = products->readDocument("prod002", smartphoneDoc); - if (status.ok()) { - printDocument(smartphoneDoc); - } - } - - // Add a new review - json newReview = { - {"user", "mobile_fan"}, - {"rating", 5}, - {"comment", "Best smartphone I've ever owned!"} - }; - - json pushReview = { - {"$push", { - {"reviews", newReview} - }} - }; - - status = products->updateDocument("prod002", pushReview, true); - if (!status.ok()) { - std::cerr << "Failed to add review: " << status.message() << std::endl; - } - else { - std::cout << "Review added to smartphone document" << std::endl; - status = products->readDocument("prod002", smartphoneDoc); - if (status.ok()) { - printDocument(smartphoneDoc); - } - } - - // Example 8: Updating book product - std::cout << "\n----- Updating Book Document -----\n"; - - Document bookDoc; - status = products->readDocument("prod003", bookDoc); - if (status.ok()) { - std::cout << "Original book document:" << std::endl; - printDocument(bookDoc); - } - - // Update book information - json updateBook = { - {"$set", { - {"edition", "Second Edition"}, - {"price", 39.99}, - {"stock", 100} - }} - }; - - status = products->updateDocument("prod003", updateBook); - if (!status.ok()) { - std::cerr << "Failed to update book document: " << status.message() << std::endl; - } - else { - std::cout << "Book document updated" << std::endl; - status = products->readDocument("prod003", bookDoc); - if (status.ok()) { - printDocument(bookDoc); - } - } - - // Example 9: Updating food product - std::cout << "\n----- Updating Food Document -----\n"; - - Document foodDoc; - status = products->readDocument("prod004", foodDoc); - if (status.ok()) { - std::cout << "Original food document:" << std::endl; - printDocument(foodDoc); - } - - // Update food availability and add certifications - json updateFood = { - {"$set", { - {"available", true}, - {"certifications", {"Organic", "Fair Trade", 
"Rainforest Alliance"}} - }} - }; - - status = products->updateDocument("prod004", updateFood); - if (!status.ok()) { - std::cerr << "Failed to update food document: " << status.message() << std::endl; - } - else { - std::cout << "Food document updated" << std::endl; - status = products->readDocument("prod004", foodDoc); - if (status.ok()) { - printDocument(foodDoc); - } - } - - // Export all documents - std::cout << "\n===== Exporting Documents =====\n"; - status = db.exportAllToJsonAsync("products", "./product_export/"); - if (!status.ok()) { - std::cerr << "Failed to export documents: " << status.message() << std::endl; - } - else { - std::cout << "Documents exported successfully to ./product_export/" << std::endl; - } - products->waitForExportOperation(); - - status = db.createCollection("products_import"); - if (!status.ok()) { - // Check if the collection already exists - if so, continue; otherwise, return - if (status.message().find("already exists") != std::string::npos) { - std::cout << "Collection 'products' already exists, continuing..." << std::endl; - } - else { - std::cerr << "Failed to create collection: " << status.message() << std::endl; - return 1; - } - } - else { - std::cout << "Collection 'products_import' created successfully." 
<< std::endl; - } - - status = db.importFromJsonFile("products_import", "./product_export/products.json"); - if (!status.ok()) { - std::cerr << "Failed to import documents: " << status.message() << std::endl; - } - else { - std::cout << "Documents imported successfully to products_import collection" << std::endl; - } - - std::vector docIds; - status = db.readAllDocuments("products_import", docIds); - if (!status.ok()) { - std::cerr << "Failed to import documents: " << status.message() << std::endl; - } - else { - std::cout << "Documents imported successfully to products_import collection" << std::endl; - } - - for (Document doc : docIds) { - std::cout << doc.data().dump(4) << std::endl; - } - // Clean up - Delete documents - std::cout << "\n===== Cleanup Operations =====\n"; - - // Delete a document - std::string docIdToDelete = "prod004"; - status = products->deleteDocument(docIdToDelete); - if (!status.ok()) { - std::cerr << "Failed to delete document " << docIdToDelete << ": " << status.message() << std::endl; - } - else { - std::cout << "Document " << docIdToDelete << " deleted successfully." << std::endl; - } - - // Delete indexes - std::vector indexesToDelete = { "stock", "rating", "available" }; - for (const auto& indexName : indexesToDelete) { - status = products->deleteIndex(indexName); - if (!status.ok()) { - std::cerr << "Failed to delete index on " << indexName << ": " << status.message() << std::endl; - } - else { - std::cout << "Index on " << indexName << " deleted successfully." 
<< std::endl; - } - } - - // List remaining collections - std::cout << "\n===== Collections in Database =====\n"; - for (const auto& name : db.getCollectionNames()) { - std::cout << "- " << name << std::endl; - } - - // Drop collection - std::string collectionToDrop = "products"; - std::cout << "\nDropping collection '" << collectionToDrop << "'" << std::endl; - status = db.dropCollection(collectionToDrop); - if (!status.ok()) { - std::cerr << "Failed to drop collection: " << status.message() << std::endl; - } - else { - std::cout << "Collection dropped successfully." << std::endl; - } - - // Close database - status = db.close(); - if (!status.ok()) { - std::cerr << "Failed to close database: " << status.message() << std::endl; - return 1; - } - - std::cout << "\nDatabase closed successfully." << std::endl; - std::cout << "C++ version: " << __cplusplus << std::endl; - - return 0; -} \ No newline at end of file + bool walTracker = true; + // Database initialization + Database db("./product_db"); + Status status = db.open(walTracker); + if (!status.ok()) { + std::cerr << "Failed to open database: " << status.message() << std::endl; + return 1; + } + std::cout << "Database opened successfully." << std::endl; + + if (walTracker) { + db.registerCallback(WalOperationHandler); + } + + // Create products collection + status = db.createCollection("products"); + if (!status.ok()) { + // Check if the collection already exists - if so, continue; otherwise, return + if (status.message().find("already exists") != std::string::npos) { + std::cout << "Collection 'products' already exists, continuing..." << std::endl; + } + else { + std::cerr << "Failed to create collection: " << status.message() << std::endl; + return 1; + } + } + else { + std::cout << "Collection 'products' created successfully." << std::endl; + } + + // Get the collection + Collection* products = db.getCollection("products"); + if (!products) { + std::cerr << "Failed to get collection." 
<< std::endl; + return 1; + } + + // Create sample product documents with different structures + // Electronics product - complex nested structure + json product1 = { + {"name", "Laptop"}, + {"price", 1299.99}, + {"stock", 45}, + {"category", "Electronics"}, + {"rating", 4.7}, + {"brand", "TechMaster"}, + {"specs", { + {"processor", "i9"}, + {"ram", "32GB"}, + {"storage", "1TB SSD"} + }}, + {"tags", {"laptop", "gaming", "high-performance"}}, + {"dimensions", { + {"length", 35.8}, + {"width", 24.7}, + {"height", 1.9} + }}, + {"available", true} + }; + + // Smartphone product - array of objects structure + json product2 = { + {"name", "Smartphone"}, + {"price", 799.99}, + {"stock", 160}, + {"category", "Electronics"}, + {"rating", 4.5}, + {"brand", "MobiTech"}, + {"colors", {"Black", "Silver", "Blue"}}, + {"features", { + {"camera", "48MP"}, + {"display", "AMOLED"}, + {"battery", "5000mAh"} + }}, + {"reviews", { + { + {"user", "user123"}, + {"rating", 5}, + {"comment", "Great phone!"} + }, + { + {"user", "tech_reviewer"}, + {"rating", 4}, + {"comment", "Good performance but battery drains quickly"} + } + }}, + {"available", true} + }; + + // Book product - simple flat structure + json product3 = { + {"name", "Programming in C++"}, + {"price", 49.99}, + {"stock", 75}, + {"category", "Books"}, + {"rating", 4.2}, + {"author", "John Smith"}, + {"publisher", "Tech Books Inc"}, + {"pages", 450}, + {"isbn", "978-3-16-148410-0"}, + {"published_date", "2023-03-15"}, + {"available", true} + }; + + // Food product - another structure + json product4 = { + {"name", "Organic Coffee"}, + {"price", 15.99}, + {"stock", 200}, + {"category", "Food"}, + {"rating", 4.8}, + {"brand", "BeanMaster"}, + {"weight", "500g"}, + {"origin", "Colombia"}, + {"expiry_date", "2025-06-30"}, + {"nutritional_info", { + {"calories", 0}, + {"fat", "0g"}, + {"caffeine", "95mg per serving"} + }}, + {"available", false} + }; + + // Create document objects + Document doc1("prod001", product1); + Document 
doc2("prod002", product2); + Document doc3("prod003", product3); + Document doc4("prod004", product4); + + // Insert documents + std::vector documents = { doc1, doc2, doc3, doc4 }; + + for (Document& doc : documents) { + status = products->createDocument(doc); + if (!status.ok()) { + if (status.message().find("already exists") != std::string::npos) { + std::cout << "Document " << doc.id() << " already exists, updating instead..." << std::endl; + + // If document exists, update it instead + json updateDoc = { + {"$set", doc.data()} + }; + + status = products->updateDocument(doc.id(), updateDoc); + if (!status.ok()) { + std::cerr << "Failed to update existing document " << doc.id() << ": " << status.message() << std::endl; + } + else { + std::cout << "Document " << doc.id() << " updated." << std::endl; + } + } + else { + std::cerr << "Failed to create document " << doc.id() << ": " << status.message() << std::endl; + } + } + else { + std::cout << "Document " << doc.id() << " created successfully." << std::endl; + } + } + + // Create indexes + std::cout << "\n===== Creating Indexes =====\n"; + for (const auto& field : { "price", "stock", "category", "rating", "available", "name" }) { + status = products->createIndex(field); + if (!status.ok()) { + if (status.message().find("already exists") != std::string::npos) { + std::cout << "Index on '" << field << "' already exists." << std::endl; + } + else { + std::cerr << "Failed to create index on " << field << ": " << status.message() << std::endl; + } + } + else { + std::cout << "Index on '" << field << "' created successfully." << std::endl; + } + } + + // ===== Query Examples ===== + + // 1. 
OrderBy queries + std::cout << "\n===== OrderBy Queries =====\n"; + + json orderByPriceAsc = { + {"$orderBy", { + {"price", "asc"} + }} + }; + executeQuery(products, orderByPriceAsc, "Order By Price (Ascending)"); + + json orderByRatingDesc = { + {"$orderBy", { + {"rating", "desc"} + }} + }; + executeQuery(products, orderByRatingDesc, "Order By Rating (Descending)"); + + // 2. Equality queries + std::cout << "\n===== Equality Queries =====\n"; + + json eqCategory = { + {"$eq", { + {"category", "Electronics"} + }} + }; + executeQuery(products, eqCategory, "Equal Category: Electronics"); + + json eqAvailable = { + {"$eq", { + {"available", true} + }} + }; + executeQuery(products, eqAvailable, "Equal Available: true"); + + // 3. Greater than queries + std::cout << "\n===== Greater Than Queries =====\n"; + + json gtPrice = { + {"$gt", { + {"price", 50.0} + }} + }; + executeQuery(products, gtPrice, "Price > 50.0"); + + json gtRating = { + {"$gt", { + {"rating", 4.5} + }} + }; + executeQuery(products, gtRating, "Rating > 4.5"); + + // 4. Less than queries + std::cout << "\n===== Less Than Queries =====\n"; + + json ltStock = { + {"$lt", { + {"stock", 100} + }} + }; + executeQuery(products, ltStock, "Stock < 100"); + + json ltPrice = { + {"$lt", { + {"price", 500.0} + }} + }; + executeQuery(products, ltPrice, "Price < 500.0"); + + // 5. AND queries + std::cout << "\n===== AND Queries =====\n"; + + json andPriceRating = { + {"$and", { + {{"$gt", {{"price", 100.0}}}}, + {{"$lt", {{"price", 1000.0}}}} + }} + }; + executeQuery(products, andPriceRating, "100 < Price < 1000"); + + json andCategoryAvailable = { + {"$and", { + {{"$eq", {{"category", "Electronics"}}}}, + {{"$eq", {{"available", true}}}} + }} + }; + executeQuery(products, andCategoryAvailable, "Category = Electronics AND Available = true"); + + // 6. 
OR queries + std::cout << "\n===== OR Queries =====\n"; + + json orCategories = { + {"$or", { + {{"$eq", {{"category", "Books"}}}}, + {{"$eq", {{"category", "Food"}}}} + }} + }; + executeQuery(products, orCategories, "Category = Books OR Category = Food"); + + json orRatingStock = { + {"$or", { + {{"$gt", {{"rating", 4.7}}}}, + {{"$gt", {{"stock", 150}}}} + }} + }; + executeQuery(products, orRatingStock, "Rating > 4.7 OR Stock > 150"); + + // ===== Update Examples ===== + std::cout << "\n===== Update Operations =====\n"; + + // Example 1: $set - Top-level fields + std::cout << "\n----- $set Operation: Top-level Fields -----\n"; + + // Show original document + Document updateDoc1; + status = products->readDocument("prod001", updateDoc1); + if (status.ok()) { + std::cout << "Original document:" << std::endl; + printDocument(updateDoc1); + } + + json setTopLevel = { + {"$set", { + {"price", 1399.99}, + {"stock", 50}, + {"rating", 4.8}, + {"promotion", "Summer Sale"} // Add new field + }} + }; + + status = products->updateDocument("prod001", setTopLevel); + if (!status.ok()) { + std::cerr << "Failed to update document: " << status.message() << std::endl; + } + else { + std::cout << "Document updated with $set operator (top-level fields)" << std::endl; + status = products->readDocument("prod001", updateDoc1); + if (status.ok()) { + printDocument(updateDoc1); + } + } + + // Example 2: $set - Nested fields + std::cout << "\n----- $set Operation: Nested Fields -----\n"; + + json setNested = { + {"$set", { + {"specs.processor", "i9-12900K"}, + {"specs.ram", "64GB"}, + {"specs.storage", "2TB SSD"}, + }} + }; + + status = products->updateDocument("prod001", setNested); + if (!status.ok()) { + std::cerr << "Failed to update document: " << status.message() << std::endl; + } + else { + std::cout << "Document updated with $set operator (nested fields)" << std::endl; + status = products->readDocument("prod001", updateDoc1); + if (status.ok()) { + printDocument(updateDoc1); + } + } 
+ + // Example 3: $unset - Top-level fields + std::cout << "\n----- $unset Operation: Top-level Fields -----\n"; + + json unsetTopLevel = { + {"$unset", { + {"promotion", ""}, + {"available", ""} + }} + }; + + status = products->updateDocument("prod001", unsetTopLevel); + if (!status.ok()) { + std::cerr << "Failed to update document: " << status.message() << std::endl; + } + else { + std::cout << "Document updated with $unset operator (top-level fields)" << std::endl; + status = products->readDocument("prod001", updateDoc1); + if (status.ok()) { + printDocument(updateDoc1); + } + } + + // Example 4: $unset - Nested fields + std::cout << "\n----- $unset Operation: Nested Fields -----\n"; + + json unsetNested = { + {"$unset", { + {"specs.storage", ""}, + {"dimensions.height", ""} + }} + }; + + status = products->updateDocument("prod001", unsetNested); + if (!status.ok()) { + std::cerr << "Failed to update document: " << status.message() << std::endl; + } + else { + std::cout << "Document updated with $unset operator (nested fields)" << std::endl; + status = products->readDocument("prod001", updateDoc1); + if (status.ok()) { + printDocument(updateDoc1); + } + } + + // Example 5: $push - Top-level array field + std::cout << "\n----- $push Operation: Top-level Array Fields -----\n"; + + Document updateDoc2; + status = products->readDocument("prod001", updateDoc2); + if (status.ok()) { + std::cout << "Before $push operation:" << std::endl; + printDocument(updateDoc2); + } + + json pushTopLevel = { + {"$push", { + {"tags", "limited-edition"}, + {"specs", {{"storage", "2 GB"}} } + }} + }; + + status = products->updateDocument("prod001", pushTopLevel, true); + if (!status.ok()) { + std::cerr << "Failed to update document: " << status.message() << std::endl; + } + else { + std::cout << "Document updated with $push operator" << std::endl; + status = products->readDocument("prod001", updateDoc2); + if (status.ok()) { + printDocument(updateDoc2); + } + } + + // Example 6: $pull 
- Top-level array field + std::cout << "\n----- $pull Operation: Top-level Array Fields -----\n"; + + json pullTopLevel = { + {"$pull", { + {"tags", "limited-edition"} + }} + }; + + status = products->updateDocument("prod001", pullTopLevel); + if (!status.ok()) { + std::cerr << "Failed to update document: " << status.message() << std::endl; + } + else { + std::cout << "Document updated with $pull operator" << std::endl; + status = products->readDocument("prod001", updateDoc2); + if (status.ok()) { + printDocument(updateDoc2); + } + } + + // Example 7: Updating smartphone product with reviews + std::cout << "\n----- Updating Smartphone Document -----\n"; + + Document smartphoneDoc; + status = products->readDocument("prod002", smartphoneDoc); + if (status.ok()) { + std::cout << "Original smartphone document:" << std::endl; + printDocument(smartphoneDoc); + } + + // Setting new element is not allowed + json setSmartphone = { + {"$set", { + {"features.waterproof", "IP68"}, + {"price", 849.99} + }} + }; + + status = products->updateDocument("prod002", setSmartphone); + if (!status.ok()) { + std::cerr << "Failed to update smartphone document: " << status.message() << std::endl; + } + else { + std::cout << "Smartphone document updated with new features" << std::endl; + status = products->readDocument("prod002", smartphoneDoc); + if (status.ok()) { + printDocument(smartphoneDoc); + } + } + + // Add a new review + json newReview = { + {"user", "mobile_fan"}, + {"rating", 5}, + {"comment", "Best smartphone I've ever owned!"} + }; + + json pushReview = { + {"$push", { + {"reviews", newReview} + }} + }; + + status = products->updateDocument("prod002", pushReview, true); + if (!status.ok()) { + std::cerr << "Failed to add review: " << status.message() << std::endl; + } + else { + std::cout << "Review added to smartphone document" << std::endl; + status = products->readDocument("prod002", smartphoneDoc); + if (status.ok()) { + printDocument(smartphoneDoc); + } + } + + // Example 
8: Updating book product + std::cout << "\n----- Updating Book Document -----\n"; + + Document bookDoc; + status = products->readDocument("prod003", bookDoc); + if (status.ok()) { + std::cout << "Original book document:" << std::endl; + printDocument(bookDoc); + } + + // Update book information + json updateBook = { + {"$set", { + {"edition", "Second Edition"}, + {"price", 39.99}, + {"stock", 100} + }} + }; + + status = products->updateDocument("prod003", updateBook); + if (!status.ok()) { + std::cerr << "Failed to update book document: " << status.message() << std::endl; + } + else { + std::cout << "Book document updated" << std::endl; + status = products->readDocument("prod003", bookDoc); + if (status.ok()) { + printDocument(bookDoc); + } + } + + // Example 9: Updating food product + std::cout << "\n----- Updating Food Document -----\n"; + + Document foodDoc; + status = products->readDocument("prod004", foodDoc); + if (status.ok()) { + std::cout << "Original food document:" << std::endl; + printDocument(foodDoc); + } + + // Update food availability and add certifications + json updateFood = { + {"$set", { + {"available", true}, + {"certifications", {"Organic", "Fair Trade", "Rainforest Alliance"}} + }} + }; + + status = products->updateDocument("prod004", updateFood); + if (!status.ok()) { + std::cerr << "Failed to update food document: " << status.message() << std::endl; + } + else { + std::cout << "Food document updated" << std::endl; + status = products->readDocument("prod004", foodDoc); + if (status.ok()) { + printDocument(foodDoc); + } + } + + // Export all documents + std::cout << "\n===== Exporting Documents =====\n"; + status = db.exportAllToJsonAsync("products", "./product_export/"); + if (!status.ok()) { + std::cerr << "Failed to export documents: " << status.message() << std::endl; + } + else { + std::cout << "Documents exported successfully to ./product_export/" << std::endl; + } + products->waitForExportOperation(); + + status = 
db.createCollection("products_import"); + if (!status.ok()) { + // Check if the collection already exists - if so, continue; otherwise, return + if (status.message().find("already exists") != std::string::npos) { + std::cout << "Collection 'products' already exists, continuing..." << std::endl; + } + else { + std::cerr << "Failed to create collection: " << status.message() << std::endl; + return 1; + } + } + else { + std::cout << "Collection 'products_import' created successfully." << std::endl; + } + + status = db.importFromJsonFile("products_import", "./product_export/products.json"); + if (!status.ok()) { + std::cerr << "Failed to import documents: " << status.message() << std::endl; + } + else { + std::cout << "Documents imported successfully to products_import collection" << std::endl; + } + + std::vector docIds; + status = db.readAllDocuments("products_import", docIds); + if (!status.ok()) { + std::cerr << "Failed to import documents: " << status.message() << std::endl; + } + else { + std::cout << "Documents imported successfully to products_import collection" << std::endl; + } + + for (Document doc : docIds) { + std::cout << doc.data().dump(4) << std::endl; + } + // Clean up - Delete documents + std::cout << "\n===== Cleanup Operations =====\n"; + + // Delete a document + std::string docIdToDelete = "prod004"; + status = products->deleteDocument(docIdToDelete); + if (!status.ok()) { + std::cerr << "Failed to delete document " << docIdToDelete << ": " << status.message() << std::endl; + } + else { + std::cout << "Document " << docIdToDelete << " deleted successfully." 
<< std::endl; + } + + // Delete indexes + std::vector indexesToDelete = { "stock", "rating", "available" }; + for (const auto& indexName : indexesToDelete) { + status = products->deleteIndex(indexName); + if (!status.ok()) { + std::cerr << "Failed to delete index on " << indexName << ": " << status.message() << std::endl; + } + else { + std::cout << "Index on " << indexName << " deleted successfully." << std::endl; + } + } + + // List remaining collections + std::cout << "\n===== Collections in Database =====\n"; + for (const auto& name : db.getCollectionNames()) { + std::cout << "- " << name << std::endl; + } + + // Drop collection + std::string collectionToDrop = "products"; + std::cout << "\nDropping collection '" << collectionToDrop << "'" << std::endl; + status = db.dropCollection(collectionToDrop); + if (!status.ok()) { + std::cerr << "Failed to drop collection: " << status.message() << std::endl; + } + else { + std::cout << "Collection dropped successfully." << std::endl; + } + + collectionToDrop = "products_import"; + std::cout << "\nDropping collection '" << collectionToDrop << "'" << std::endl; + status = db.dropCollection(collectionToDrop); + if (!status.ok()) { + std::cerr << "Failed to drop collection: " << status.message() << std::endl; + } + else { + std::cout << "Collection dropped successfully." << std::endl; + } + // Close database + status = db.close(); + if (!status.ok()) { + std::cerr << "Failed to close database: " << status.message() << std::endl; + return 1; + } + std::cout << "\nDatabase closed successfully." 
<< std::endl; + std::cout << "C++ version: " << __cplusplus << std::endl; + + return 0; +} diff --git a/src/storage_engine/CMakeLists.txt b/src/storage_engine/CMakeLists.txt index 35fa29d..4717593 100644 --- a/src/storage_engine/CMakeLists.txt +++ b/src/storage_engine/CMakeLists.txt @@ -1,6 +1,8 @@ set(storage_engine_SRCS StorageEngine.h StorageEngine.cpp + WalTracker.h + WalTracker.cpp ${CMAKE_SOURCE_DIR}/third_party/rocksdb/include ) diff --git a/src/storage_engine/StorageEngine.cpp b/src/storage_engine/StorageEngine.cpp index bd991cb..cfedeaa 100644 --- a/src/storage_engine/StorageEngine.cpp +++ b/src/storage_engine/StorageEngine.cpp @@ -2,7 +2,7 @@ using namespace anudb; -Status StorageEngine::open() { +Status StorageEngine::open(bool walTracker) { rocksdb::Options options; RocksDBOptimizer::EmbeddedConfig config; @@ -66,7 +66,7 @@ Status StorageEngine::open() { for (const auto& cf : columnFamilies) { columnFamilyDescriptors.emplace_back(cf, rocksdb::ColumnFamilyOptions()); } - + wal_tracker_ = walTracker; // Open the database with column families std::vector handles; rocksdb::DB* dbRaw; @@ -85,9 +85,18 @@ Status StorageEngine::open() { if (db_) db_->DestroyColumnFamilyHandle(h); }); } + if (wal_tracker_) { + // Create column family ID to name mapping for WAL tracker + for (auto* handle : handles) { + cf_id_to_name_[handle->GetID()] = handle->GetName(); + } + + // Create WAL tracker + waltracker_ = new WalTracker(db_, cf_id_to_name_); + waltracker_->StartTracking(); + } // Print estimated memory usage size_t estimated_mem = RocksDBOptimizer::estimateMemoryUsage(config); - //std::cout << "Estimated memory usage by storage engine: " << (estimated_mem >> 20) << "MB\n"; return Status::OK(); } @@ -113,6 +122,16 @@ Status StorageEngine::close() { // Clear the ownership vector which will destroy all handles properly ownedHandles_.clear(); + if (wal_tracker_) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + // stop waltracker + 
waltracker_->StopTracking(); + if (waltracker_) { + delete waltracker_; + waltracker_ = NULL; + } + } + // Close and reset the DB if (db_) { rocksdb::Status s = db_->Close(); @@ -128,6 +147,10 @@ Status StorageEngine::close() { return Status::OK(); } +void StorageEngine::registerCallback(WalOperationCallback callback) { + waltracker_->RegisterCallback(callback); +} + Status StorageEngine::createCollection(const std::string& name) { // Check if collection already exists if (columnFamilies_.find(name) != columnFamilies_.end()) { @@ -142,6 +165,22 @@ Status StorageEngine::createCollection(const std::string& name) { return Status::IOError(s.ToString()); } + if (wal_tracker_ /* && handle->GetName().find(index_delimiter_) == std::string::npos*/) { + waltracker_->UpdateColumnFamilyMap(handle->GetID(), handle->GetName()); + // Wrap the event string inside a "data" key + std::string event_string = "CREATE_CF:" + handle->GetName(); + json wrapped = { + {"data", event_string} + }; + // Serialize to MessagePack + std::vector msgpack_data = json::to_msgpack(wrapped); + // Insert into RocksDB + rocksdb::WriteBatch batch; + batch.Put("__meta__event", rocksdb::Slice(reinterpret_cast(msgpack_data.data()), msgpack_data.size())); + // Write to DB + db_->Write(rocksdb::WriteOptions(), &batch); + } + // Store the handle columnFamilies_[name] = handle; ownedHandles_.emplace_back(handle, [this](rocksdb::ColumnFamilyHandle* h) { @@ -164,6 +203,23 @@ Status StorageEngine::dropCollection(const std::string& name) { if (!s.ok()) { return Status::IOError(s.ToString()); } + + if (wal_tracker_ /*&& handle->GetName().find(index_delimiter_) == std::string::npos*/) { + waltracker_->DeleteColumnFamilyMap(handle->GetID(), handle->GetName()); + // Wrap the event string inside a "data" key + std::string event_string = "DELETE_CF:" + handle->GetName(); + json wrapped = { + {"data", event_string} + }; + // Serialize to MessagePack + std::vector msgpack_data = json::to_msgpack(wrapped); + // Insert into 
RocksDB + rocksdb::WriteBatch batch; + batch.Put("__meta__event", rocksdb::Slice(reinterpret_cast(msgpack_data.data()), msgpack_data.size())); + // Write to DB + db_->Write(rocksdb::WriteOptions(), &batch); + } + // Remove from our map columnFamilies_.erase(it); return Status::OK(); diff --git a/src/storage_engine/StorageEngine.h b/src/storage_engine/StorageEngine.h index bf15346..39c5970 100644 --- a/src/storage_engine/StorageEngine.h +++ b/src/storage_engine/StorageEngine.h @@ -4,6 +4,7 @@ #include "json.hpp" #include "Status.h" +#include "WalTracker.h" #include "rocksdb/db.h" #include "rocksdb/table.h" @@ -32,7 +33,7 @@ namespace anudb { class StorageEngine { public: StorageEngine(const std::string& dbPath) : dbPath_(dbPath), index_delimiter_("__index__"){} - Status open(); + Status open(bool WalTracker = false); Status close(); Status createCollection(const std::string& name); @@ -52,6 +53,7 @@ namespace anudb { Status fetchDocIdsForLesser(const std::string& collection, const std::string& prefix, std::vector& docIds) const; Status fetchDocIdsByOrder(const std::string& collection, const std::string& key, std::vector& docIds) const; rocksdb::DB* getDB(); + void registerCallback(WalOperationCallback callback); virtual ~StorageEngine(); // Decode a Big-Endian encoded integer from a RocksDB key static int64_t DecodeIntKey(const std::string& encoded) { @@ -73,6 +75,9 @@ namespace anudb { std::unordered_map columnFamilies_; std::vector>> ownedHandles_; std::string index_delimiter_; + bool wal_tracker_; + WalTracker* waltracker_; + std::unordered_map cf_id_to_name_; //mutable std::mutex db_mutex_; }; diff --git a/src/storage_engine/WalTracker.cpp b/src/storage_engine/WalTracker.cpp new file mode 100644 index 0000000..4132485 --- /dev/null +++ b/src/storage_engine/WalTracker.cpp @@ -0,0 +1,180 @@ +// WalTracker.cpp +#include "WalTracker.h" + +using namespace anudb; +// Constructor +WalTracker::WalTracker(rocksdb::DB* db, + const std::unordered_map& cf_map) + : db_(db), 
+ cf_id_to_name_(cf_map), + should_stop_(false), + is_tracking_(false), + current_sequence_(0) { + // Initialize column family mapping if empty + if (cf_id_to_name_.empty()) { + // Default column family always has ID 0 + cf_id_to_name_[0] = "default"; + + // Try to get more column families from DB path + std::vector cf_names; + rocksdb::Status s = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(), + db_->GetName(), + &cf_names); + } +} + +void WalTracker::UpdateColumnFamilyMap(uint32_t id, const std::string& name) { + if (name.find("__index__") != std::string::npos) { + return; + } + cf_id_to_name_[id] = name; +} + +void WalTracker::DeleteColumnFamilyMap(uint32_t id, const std::string& name) { + if (name.find("__index__") != std::string::npos) { + return; + } + cf_id_to_name_.erase(id); +} + +// Destructor +WalTracker::~WalTracker() { + StopTracking(); +} + +// Start tracking WAL operations +void WalTracker::StartTracking() { + if (is_tracking_) { + return; // Already tracking + } + + should_stop_ = false; + is_tracking_ = true; + + // Create and start tracking thread + tracking_thread_ = std::make_unique(&WalTracker::TrackingThread, this); +} + +// Stop tracking WAL operations +void WalTracker::StopTracking() { + if (!is_tracking_) { + return; // Not tracking + } + + // Signal thread to stop and wait for it + should_stop_ = true; + if (tracking_thread_ && tracking_thread_->joinable()) { + tracking_thread_->join(); + } + + is_tracking_ = false; +} + +// Register a callback for WAL operations +void WalTracker::RegisterCallback(WalOperationCallback callback) { + callback_ = callback; +} + +// Check if tracking is active +bool WalTracker::IsTracking() const { + return is_tracking_; +} + +// Get current sequence number +uint64_t WalTracker::GetCurrentSequence() const { + return current_sequence_; +} + +// Thread function for monitoring WAL +void WalTracker::TrackingThread() { + // Start from the current sequence number of the database + uint64_t last_sequence = 0; + 
+ // First read historical WAL entries + ReadWalLogs(last_sequence); + + // Then monitor for new entries + while (!should_stop_) { + uint64_t current_seq = db_->GetLatestSequenceNumber(); + current_sequence_ = current_seq; // Update current sequence + + if (last_sequence < current_seq) { + ReadWalLogs(last_sequence); + } + else { + // No new sequences, wait a bit + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } +} + +// Read WAL logs from a sequence number +bool WalTracker::ReadWalLogs(uint64_t& last_sequence) { + std::unique_ptr iter; + rocksdb::Status status = db_->GetUpdatesSince(last_sequence, &iter); + + if (!status.ok()) { + return false; + } + + bool has_new_entries = false; + + while (iter->Valid()) { + const auto& result = iter->GetBatch(); + last_sequence = result.sequence + 1; + + // Process WAL entries if callback is registered + if (callback_) { + WalLogHandler handler(cf_id_to_name_, callback_); + result.writeBatchPtr->Iterate(&handler); + } + + iter->Next(); + has_new_entries = true; + } + + return has_new_entries; +} + +// WalLogHandler implementation +WalTracker::WalLogHandler::WalLogHandler( + std::unordered_map& cf_map, + WalOperationCallback callback) + : cf_id_to_name(cf_map), callback(callback) {} + +rocksdb::Status WalTracker::WalLogHandler::PutCF( + uint32_t column_family_id, + const rocksdb::Slice& key, + const rocksdb::Slice& value) { + std::string cf_name = GetCFName(column_family_id); + if (cf_name == "unknown") { + return rocksdb::Status::OK(); + } + + if (callback) { + std::vector value_vec(value.data(), value.data() + value.size()); + callback("PUT", cf_name, key.ToString(), json::from_msgpack(value_vec)["data"].dump()); + } + + return rocksdb::Status::OK(); +} + +rocksdb::Status WalTracker::WalLogHandler::DeleteCF( + uint32_t column_family_id, + const rocksdb::Slice& key) { + std::string cf_name = GetCFName(column_family_id); + if (cf_name == "unknown") { + return rocksdb::Status::OK(); + } + + if (callback) { + 
callback("DELETE", cf_name, key.ToString(), ""); + } + + return rocksdb::Status::OK(); +} + +std::string WalTracker::WalLogHandler::GetCFName(uint32_t id) const { + auto it = cf_id_to_name.find(id); + return it != cf_id_to_name.end() ? it->second : "unknown"; +} diff --git a/src/storage_engine/WalTracker.h b/src/storage_engine/WalTracker.h new file mode 100644 index 0000000..d97f9ab --- /dev/null +++ b/src/storage_engine/WalTracker.h @@ -0,0 +1,108 @@ +// WalTracker.h +#ifndef WAL_TRACKER_H +#define WAL_TRACKER_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef _WIN32 +#include +#pragma comment(lib, "ws2_32.lib") +extern "C" uintptr_t __cdecl _beginthreadex(void*, unsigned int, + unsigned int(__stdcall*)(void*), void*, unsigned int, unsigned int*); +#endif +#include "json.hpp" +using json = nlohmann::json; +#ifdef MAKE_UNIQUE +#include + +namespace std { + template + std::unique_ptr make_unique(Args&&... args) { + return std::unique_ptr(new T(std::forward(args)...)); + } +} +#endif +namespace anudb { + // Callback type for WAL operations + using WalOperationCallback = std::function; + + class WalTracker { + public: + // Constructor - takes a RocksDB instance and optional CF mapping + WalTracker(rocksdb::DB* db, + const std::unordered_map& cf_map); + + // Destructor - stops tracking thread + ~WalTracker(); + + // Start tracking WAL operations + void StartTracking(); + + // Stop tracking WAL operations + void StopTracking(); + + // Register a callback for WAL operations + void RegisterCallback(WalOperationCallback callback); + + // Check if tracking is active + bool IsTracking() const; + + // Get current sequence number + uint64_t GetCurrentSequence() const; + + // Update column family mapping + void UpdateColumnFamilyMap(uint32_t id, const std::string& name); + + // Delete id from column family mapping + void DeleteColumnFamilyMap(uint32_t id, const std::string& name); + + private: + // Handler 
for WAL operations + class WalLogHandler : public rocksdb::WriteBatch::Handler { + public: + WalLogHandler(std::unordered_map& cf_map, + WalOperationCallback callback); + + rocksdb::Status PutCF(uint32_t column_family_id, + const rocksdb::Slice& key, + const rocksdb::Slice& value) override; + + rocksdb::Status DeleteCF(uint32_t column_family_id, + const rocksdb::Slice& key) override; + + private: + std::unordered_map& cf_id_to_name; + WalOperationCallback callback; + + std::string GetCFName(uint32_t id) const; + }; + + // Thread function for monitoring WAL + void TrackingThread(); + + // Read WAL logs from a sequence number + bool ReadWalLogs(uint64_t& last_sequence); + + // Member variables + rocksdb::DB* db_; + std::unordered_map cf_id_to_name_; + std::unique_ptr tracking_thread_; + std::atomic should_stop_; + std::atomic is_tracking_; + WalOperationCallback callback_; + uint64_t current_sequence_; + }; +}; +#endif // WAL_TRACKER_H