Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ set(WITH_FUZZER OFF CACHE BOOL "Disable fuzzing tools" FORCE)
add_subdirectory(third_party/rocksdb)
# Add storage engine
add_subdirectory(src/storage_engine)
# On Linux, build RocksDB and the storage-engine target with RTTI enabled.
# NOTE(review): presumably needed because dynamic_cast/typeid is used across
# these targets while their default flags disable RTTI (-fno-rtti) — confirm
# against the RocksDB build options before relying on this.
if (CMAKE_SYSTEM_NAME STREQUAL "Linux")
target_compile_options(rocksdb PRIVATE -frtti)
target_compile_options(libstorage PRIVATE -frtti)
endif()

set(LIBRARY_SOURCES ${CMAKE_SOURCE_DIR}/src/Cursor.cpp ${CMAKE_SOURCE_DIR}/src/Database.cpp ${CMAKE_SOURCE_DIR}/src/Collection.cpp ${CMAKE_SOURCE_DIR}/src/Document.cpp)

Expand Down
86 changes: 86 additions & 0 deletions src/Collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ Status Collection::deleteIfIndexFieldExists(const Document& doc, const std::stri
return engine_->remove(getIndexCfName(index), key.str());
}

#if 0
Status Collection::importFromJsonFile(const std::string& filePath) {
try
{
Expand Down Expand Up @@ -511,6 +512,91 @@ Status Collection::importFromJsonFile(const std::string& filePath) {
return Status::InternalError("JSON import error: " + std::string(e.what()));
}
}
#else
// Import documents from a JSON file whose top level is a single array of
// objects. Each array element becomes one document; elements that fail to
// parse or store are counted and skipped instead of aborting the import.
//
// @param filePath path of the JSON file to read
// @return Status::OK on completion (possibly with per-document failures),
//         IOError if the file cannot be opened, NotSupported if the file is
//         not a JSON array or contains no objects, InternalError otherwise.
Status Collection::importFromJsonFile(const std::string& filePath) {
    try {
        std::ifstream file(filePath, std::ios::binary);
        if (!file.is_open()) {
            return Status::IOError("Could not open file: " + filePath);
        }

        int successCount = 0;
        int failureCount = 0;

        // Scope the parser so its internal buffer is released before the
        // summary is printed.
        {
            MemoryEfficientJsonParser parser(file);

            std::string objectStr;
            while (!(objectStr = parser.extractNextObject()).empty()) {
                try {
                    json item = json::parse(objectStr);

                    // Release the raw text as soon as it has been parsed.
                    objectStr.clear();
                    objectStr.shrink_to_fit();

                    if (!item.is_object()) {
                        failureCount++;
                        continue;
                    }

                    // Use the document's own "_id" only when it is a string.
                    // BUGFIX: previously a non-string "_id" made
                    // get<std::string>() throw json::type_error, which escaped
                    // the per-item handler below (it caught only parse_error)
                    // and aborted the entire import.
                    std::string docId;
                    if (item.contains("_id") && item["_id"].is_string()) {
                        docId = item["_id"].get<std::string>();
                    }
                    else {
                        docId = "doc_" + std::to_string(successCount + failureCount);
                    }

                    Document doc(docId, item);
                    Status status = createDocument(doc);
                    if (status.ok()) {
                        successCount++;
                    }
                    else {
                        failureCount++;
                        std::cerr << "Failed to import document " << docId << ": "
                                  << status.message() << std::endl;
                    }
                }
                // Catch the whole nlohmann exception hierarchy (parse_error,
                // type_error, out_of_range, ...) so one malformed element
                // cannot abort the remaining ones.
                catch (const json::exception&) {
                    failureCount++;
                    objectStr.clear();
                    objectStr.shrink_to_fit();
                    continue;
                }
            }

            if (!parser.isArrayStarted()) {
                return Status::NotSupported("File must contain a JSON array of objects: " + filePath);
            }
        } // parser destroyed here, buffer memory freed

        if (successCount == 0 && failureCount == 0) {
            return Status::NotSupported("No valid JSON objects found in array: " + filePath);
        }

        std::cout << "JSON Array Import Summary:\n"
                  << "File: " << filePath << "\n"
                  << "Successfully imported: " << successCount << " documents\n"
                  << "Failed to import: " << failureCount << " documents" << std::endl;

        return Status::OK();
    }
    catch (const std::exception& e) {
        return Status::InternalError("JSON import error: " + std::string(e.what()));
    }
}
#endif

Status Collection::exportAllToJsonAsync(const std::string& exportPath) {
ExportTask task(engine_, name_, exportPath);
Expand Down
213 changes: 201 additions & 12 deletions src/Collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,6 @@
extern "C" uintptr_t __cdecl _beginthreadex(void*, unsigned int,
unsigned int(__stdcall*)(void*), void*, unsigned int, unsigned int*);
#endif
#ifdef MAKE_UNIQUE
#include <memory>

namespace std {
template <typename T, typename... Args>
std::unique_ptr<T> make_unique(Args&&... args) {
return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}
}
#endif

namespace anudb {
// Collection class representing a MongoDB-like collection
Expand Down Expand Up @@ -55,13 +45,13 @@ namespace anudb {
Status readAllDocuments(std::vector<Document>& docIds, uint64_t limit = 10);

// Read all documents from the collection
Status exportAllToJsonAsync(const std::string &exportPath);
Status exportAllToJsonAsync(const std::string& exportPath);

// Import documents into the collection from a JSON array file
Status importFromJsonFile(const std::string& filePath);

// Update document from the collection whose Id is matching
Status updateDocument(const std::string& id, const json &update, bool upsert = false);
Status updateDocument(const std::string& id, const json& update, bool upsert = false);

// Find documents in the collection matching the given filter options
std::vector<std::string> findDocument(const json& filterOption);
Expand Down Expand Up @@ -117,5 +107,204 @@ namespace anudb {
std::string collection_name_;
std::string output_path_;
};

// Streaming extractor for the top-level objects of a JSON array.
//
// Reads the file in fixed-size chunks through a 16KB sliding window instead
// of loading it fully into memory, and yields each top-level "{...}" element
// as a string. A small state machine tracks strings and escape sequences so
// braces inside string literals do not confuse the brace matching.
class MemoryEfficientJsonParser {
private:
	std::ifstream& file;        // source stream; must outlive the parser
	std::vector<char> buffer;   // sliding window over the file contents
	size_t bufferPos;           // next unread index in buffer
	size_t validSize;           // number of valid bytes currently in buffer
	static const size_t BUFFER_SIZE = 8192; // 8KB chunks
	bool arrayStarted;          // saw the opening '['
	bool arrayEnded;            // saw the closing ']'

public:
	MemoryEfficientJsonParser(std::ifstream& f)
		: file(f), bufferPos(0), validSize(0), arrayStarted(false), arrayEnded(false) {
		buffer.resize(BUFFER_SIZE * 2); // fixed 16KB window
	}

	~MemoryEfficientJsonParser() {
		// Explicitly release the window memory.
		buffer.clear();
		buffer.shrink_to_fit();
	}

	// Refill the window: compact the unread tail to the front, then read new
	// bytes into the remaining space. Returns true if any bytes were read.
	bool fillBuffer() {
		if (file.eof()) return false;

		// Move remaining unread data to the beginning of the window.
		if (bufferPos > 0 && bufferPos < validSize) {
			size_t remaining = validSize - bufferPos;
			std::memmove(&buffer[0], &buffer[bufferPos], remaining);
			validSize = remaining;
			bufferPos = 0;
		}
		else if (bufferPos >= validSize) {
			validSize = 0;
			bufferPos = 0;
		}

		// Read new data into the available space.
		size_t spaceAvailable = buffer.size() - validSize;
		if (spaceAvailable > 0) {
			file.read(&buffer[validSize], spaceAvailable);
			std::streamsize bytesRead = file.gcount();
			if (bytesRead > 0) {
				validSize += bytesRead;
				return true;
			}
		}
		return false;
	}

	// Advance bufferPos past whitespace, refilling near the window's end.
	void skipWhitespace() {
		// BUGFIX: std::isspace has undefined behavior for negative char
		// values; cast through unsigned char before calling it.
		while (bufferPos < validSize && std::isspace(static_cast<unsigned char>(buffer[bufferPos]))) {
			bufferPos++;
		}

		// Refill when within 10 bytes of the end of valid data.
		// BUGFIX: written as "bufferPos + 10 >= validSize" because the old
		// "bufferPos >= validSize - 10" underflowed size_t whenever fewer
		// than 10 bytes were buffered, silently skipping the refill.
		if (bufferPos + 10 >= validSize && !file.eof()) {
			fillBuffer();
			while (bufferPos < validSize && std::isspace(static_cast<unsigned char>(buffer[bufferPos]))) {
				bufferPos++;
			}
		}
	}

	// Return the next top-level object of the array as text, or "" when the
	// array has ended, the input is exhausted, or the input is not an array.
	std::string extractNextObject() {
		if (arrayEnded) return "";

		skipWhitespace();

		// Ensure at least one byte is available.
		if (bufferPos >= validSize) {
			if (!fillBuffer()) return "";
			skipWhitespace();
		}

		if (bufferPos >= validSize) return "";

		char ch = buffer[bufferPos];

		if (!arrayStarted && ch == '[') {
			// Opening bracket of the top-level array.
			arrayStarted = true;
			bufferPos++;
			skipWhitespace();

			// Empty array: "[]".
			if (bufferPos < validSize && buffer[bufferPos] == ']') {
				arrayEnded = true;
				return "";
			}
		}
		else if (ch == ']') {
			// Closing bracket: no more elements.
			arrayEnded = true;
			return "";
		}
		else if (ch == ',') {
			// Separator between elements.
			bufferPos++;
			skipWhitespace();
		}

		// Anything before '[' means the input is not a JSON array.
		if (!arrayStarted) {
			return "";
		}

		return extractJsonObject();
	}

	bool isArrayStarted() const { return arrayStarted; }
	bool isArrayEnded() const { return arrayEnded; }

private:
	// Extract one balanced "{...}" span, appending window segments to the
	// result incrementally so objects larger than the window still work.
	std::string extractJsonObject() {
		skipWhitespace();

		if (bufferPos >= validSize || buffer[bufferPos] != '{') {
			return ""; // not positioned on an object start
		}

		std::string result;
		result.reserve(512); // reasonable starting capacity

		size_t objectStart = bufferPos;
		int braceCount = 0;
		bool inString = false;
		bool escaped = false;

		while (true) {
			// Window exhausted: flush the scanned segment, then refill.
			if (bufferPos >= validSize) {
				if (objectStart < validSize) {
					result.append(&buffer[objectStart], validSize - objectStart);
				}

				bufferPos = 0;
				validSize = 0;
				objectStart = 0;

				if (!fillBuffer()) {
					break; // end of file before object closed
				}
			}

			char ch = buffer[bufferPos];

			if (!inString) {
				if (ch == '"') {
					inString = true;
				}
				else if (ch == '{') {
					braceCount++;
				}
				else if (ch == '}') {
					braceCount--;
					if (braceCount == 0) {
						bufferPos++; // include the closing brace

						result.append(&buffer[objectStart], bufferPos - objectStart);
						result.shrink_to_fit();
						return result;
					}
				}
			}
			else {
				// Inside a string literal: honour escape sequences so an
				// escaped quote or brace does not end the string/object.
				if (escaped) {
					escaped = false;
				}
				else if (ch == '\\') {
					escaped = true;
				}
				else if (ch == '"') {
					inString = false;
				}
			}

			bufferPos++;
		}

		// Truncated input: no complete object.
		return "";
	}
};
}
#endif // COLLECTION_H
8 changes: 6 additions & 2 deletions src/Database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

using namespace anudb;

Status Database::open() {
Status Database::open(bool walTracker) {
isDbOpen_ = true;
return engine_.open();
return engine_.open(walTracker);
}

Status Database::close() {
Expand Down Expand Up @@ -77,6 +77,10 @@ Status Database::exportAllToJsonAsync(const std::string& collectionName, const s
return collection->exportAllToJsonAsync(exportPath);
}

// Register a WalOperationCallback by forwarding it to the underlying storage
// engine; this class does not invoke the callback itself — when and how it
// fires is determined by engine_. NOTE(review): the callback is taken and
// forwarded by value — confirm copies are acceptable for this callable type.
void Database::registerCallback(WalOperationCallback callback) {
engine_.registerCallback(callback);
}

Status Database::importFromJsonFile(const std::string& collectionName, const std::string& importFile) {
Collection* collection = getCollection(collectionName);
if (!collection) {
Expand Down
3 changes: 2 additions & 1 deletion src/Database.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace anudb {
class Database {
public:
Database(const std::string& dbPath) : engine_(dbPath) {}
Status open();
Status open(bool walTracker = false);
Status close();
Status createCollection(const std::string& name);
Status dropCollection(const std::string& name);
Expand All @@ -18,6 +18,7 @@ namespace anudb {
Status readAllDocuments(const std::string& collectionName, std::vector<Document>& docIds);
Status exportAllToJsonAsync(const std::string& collectionName, const std::string& exportPath);
Status importFromJsonFile(const std::string& collectionName, const std::string& importFile);
void registerCallback(WalOperationCallback callback);

Collection* getCollection(const std::string& name);
std::vector<std::string> getCollectionNames() const;
Expand Down
Loading