Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions capio/common/requests.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,18 @@ constexpr const int CAPIO_REQUEST_FSTAT = 7;
constexpr const int CAPIO_REQUEST_GETDENTS = 8;
constexpr const int CAPIO_REQUEST_GETDENTS64 = 9;
constexpr const int CAPIO_REQUEST_HANDSHAKE_NAMED = 10;
constexpr const int CAPIO_REQUEST_HANDSHAKE_ANONYMOUS = 11;
constexpr const int CAPIO_REQUEST_MKDIR = 12;
constexpr const int CAPIO_REQUEST_OPEN = 13;
constexpr const int CAPIO_REQUEST_READ = 14;
constexpr const int CAPIO_REQUEST_RENAME = 15;
constexpr const int CAPIO_REQUEST_SEEK = 16;
constexpr const int CAPIO_REQUEST_SEEK_DATA = 17;
constexpr const int CAPIO_REQUEST_SEEK_END = 18;
constexpr const int CAPIO_REQUEST_SEEK_HOLE = 19;
constexpr const int CAPIO_REQUEST_STAT = 20;
constexpr const int CAPIO_REQUEST_UNLINK = 21;
constexpr const int CAPIO_REQUEST_WRITE = 22;
constexpr const int CAPIO_REQUEST_RMDIR = 23;
constexpr const int CAPIO_REQUEST_MKDIR = 11;
constexpr const int CAPIO_REQUEST_OPEN = 12;
constexpr const int CAPIO_REQUEST_READ = 13;
constexpr const int CAPIO_REQUEST_RENAME = 14;
constexpr const int CAPIO_REQUEST_SEEK = 15;
constexpr const int CAPIO_REQUEST_SEEK_DATA = 16;
constexpr const int CAPIO_REQUEST_SEEK_END = 17;
constexpr const int CAPIO_REQUEST_SEEK_HOLE = 18;
constexpr const int CAPIO_REQUEST_STAT = 19;
constexpr const int CAPIO_REQUEST_UNLINK = 20;
constexpr const int CAPIO_REQUEST_WRITE = 21;
constexpr const int CAPIO_REQUEST_RMDIR = 22;

constexpr const int CAPIO_NR_REQUESTS = 24;

Expand Down
7 changes: 0 additions & 7 deletions capio/posix/utils/requests.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,6 @@ inline off64_t getdents_request(const int fd, const off64_t count, bool is64bit,
return res;
}

inline void handshake_anonymous_request(const long tid, const long pid) {
START_LOG(capio_syscall(SYS_gettid), "call(tid=%ld, pid=%ld)", tid, pid);
char req[CAPIO_REQ_MAX_SIZE];
sprintf(req, "%04d %ld %ld", CAPIO_REQUEST_HANDSHAKE_ANONYMOUS, tid, pid);
buf_requests->write(req, CAPIO_REQ_MAX_SIZE);
}

inline void handshake_named_request(const long tid, const long pid, const std::string &app_name,
const bool wait = false) {
START_LOG(capio_syscall(SYS_gettid), "call(tid=%ld, pid=%ld, app_name=%s)", tid, pid,
Expand Down
4 changes: 2 additions & 2 deletions capio/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ set(ARGS_BUILD_UNITTESTS OFF CACHE INTERNAL "")
FetchContent_Declare(
capio_cl
GIT_REPOSITORY https://github.com/High-Performance-IO/CAPIO-CL.git
GIT_TAG v1.1.1
GIT_TAG v1.3.4
)

FetchContent_MakeAvailable(args capio_cl)
Expand Down Expand Up @@ -84,4 +84,4 @@ IF (ENABLE_COVERAGE)
ELSE (CMAKE_BUILD_TYPE STREQUAL "Debug")
message(WARNING "Code coverage is disabled in release mode.")
ENDIF (CMAKE_BUILD_TYPE STREQUAL "Debug")
ENDIF (ENABLE_COVERAGE)
ENDIF (ENABLE_COVERAGE)
108 changes: 29 additions & 79 deletions capio/server/capio_server.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@


#include <algorithm>
#include <args.hxx>
#include <array>
Expand All @@ -18,11 +20,13 @@
#include <unordered_set>
#include <vector>

#include "capiocl.hpp"
#include "capiocl/engine.h"
#include "capiocl/parser.h"
#include "utils/capiocl_adapter.hpp"

std::string workflow_name;

#include "client-manager/client_manager.hpp"
#include "client/manager.hpp"
#include "client/request.hpp"
#include "common/env.hpp"
#include "common/logger.hpp"
#include "common/requests.hpp"
Expand All @@ -34,6 +38,7 @@ std::string workflow_name;

ClientManager *client_manager;
StorageManager *storage_manager;
ClientRequestManager *request_manager;

int n_servers;
// name of the node
Expand All @@ -46,7 +51,6 @@ CSClientsRemotePendingNFilesMap_t clients_remote_pending_nfiles;

std::mutex nfiles_mutex;

#include "handlers.hpp"
#include "utils/location.hpp"
#include "utils/signals.hpp"

Expand All @@ -57,68 +61,8 @@ std::mutex nfiles_mutex;
* can only access it through a const reference. This prevents any modifications to the engine
* outside of those permitted by the capiocl::Engine class itself.
*/
capiocl::Engine *capio_cl_engine;
const capiocl::Engine &CapioCLEngine::get() { return *capio_cl_engine; }

static constexpr std::array<CSHandler_t, CAPIO_NR_REQUESTS> build_request_handlers_table() {
std::array<CSHandler_t, CAPIO_NR_REQUESTS> _request_handlers{0};

_request_handlers[CAPIO_REQUEST_ACCESS] = access_handler;
_request_handlers[CAPIO_REQUEST_CLONE] = clone_handler;
_request_handlers[CAPIO_REQUEST_CLOSE] = close_handler;
_request_handlers[CAPIO_REQUEST_CREATE] = create_handler;
_request_handlers[CAPIO_REQUEST_CREATE_EXCLUSIVE] = create_exclusive_handler;
_request_handlers[CAPIO_REQUEST_DUP] = dup_handler;
_request_handlers[CAPIO_REQUEST_EXIT_GROUP] = exit_group_handler;
_request_handlers[CAPIO_REQUEST_FSTAT] = fstat_handler;
_request_handlers[CAPIO_REQUEST_GETDENTS] = getdents_handler;
_request_handlers[CAPIO_REQUEST_GETDENTS64] = getdents_handler;
_request_handlers[CAPIO_REQUEST_HANDSHAKE_NAMED] = handshake_named_handler;
_request_handlers[CAPIO_REQUEST_HANDSHAKE_ANONYMOUS] = handshake_anonymous_handler;
_request_handlers[CAPIO_REQUEST_MKDIR] = mkdir_handler;
_request_handlers[CAPIO_REQUEST_OPEN] = open_handler;
_request_handlers[CAPIO_REQUEST_READ] = read_handler;
_request_handlers[CAPIO_REQUEST_RENAME] = rename_handler;
_request_handlers[CAPIO_REQUEST_RMDIR] = rmdir_handler;
_request_handlers[CAPIO_REQUEST_SEEK] = lseek_handler;
_request_handlers[CAPIO_REQUEST_SEEK_DATA] = seek_data_handler;
_request_handlers[CAPIO_REQUEST_SEEK_END] = seek_end_handler;
_request_handlers[CAPIO_REQUEST_SEEK_HOLE] = seek_hole_handler;
_request_handlers[CAPIO_REQUEST_STAT] = stat_handler;
_request_handlers[CAPIO_REQUEST_UNLINK] = unlink_handler;
_request_handlers[CAPIO_REQUEST_WRITE] = write_handler;

return _request_handlers;
}

[[noreturn]] void capio_server(Semaphore &internal_server_sem) {
static const std::array<CSHandler_t, CAPIO_NR_REQUESTS> request_handlers =
build_request_handlers_table();

START_LOG(gettid(), "call()");

MPI_Comm_size(MPI_COMM_WORLD, &n_servers);
setup_signal_handlers();
backend->handshake_servers();

storage_manager->addDirectory(getpid(), get_capio_dir());

internal_server_sem.unlock();

auto str = std::unique_ptr<char[]>(new char[CAPIO_REQ_MAX_SIZE]);
while (true) {
LOG(CAPIO_LOG_SERVER_REQUEST_START);
int code = client_manager->readNextRequest(str.get());
if (code < 0 || code > CAPIO_NR_REQUESTS) {
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << "Received invalid code: " << code
<< std::endl;

ERR_EXIT("Error: received invalid request code");
}
request_handlers[code](str.get());
LOG(CAPIO_LOG_SERVER_REQUEST_END);
}
}
capiocl::engine::Engine *capio_cl_engine;
const capiocl::engine::Engine &CapioCLEngine::get() { return *capio_cl_engine; }

int parseCLI(int argc, char **argv) {
Logger *log;
Expand Down Expand Up @@ -226,11 +170,10 @@ int parseCLI(int argc, char **argv) {
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "parsing config file: " << token
<< std::endl;

std::tie(workflow_name, capio_cl_engine) =
capiocl::Parser::parse(token, resolve_path, store_all_in_memory);
capio_cl_engine = capiocl::parser::Parser::parse(token, resolve_path, store_all_in_memory);
} else if (noConfigFile) {
workflow_name = std::string_view(get_capio_workflow_name());
capio_cl_engine = new capiocl::Engine();
capio_cl_engine = new capiocl::engine::Engine();
capio_cl_engine->setWorkflowName(get_capio_workflow_name());
if (store_all_in_memory) {
capio_cl_engine->setAllStoreInMemory();
}
Expand All @@ -239,7 +182,7 @@ int parseCLI(int argc, char **argv) {
<< std::endl
<< CAPIO_LOG_SERVER_CLI_LEVEL_WARNING
<< "Obtained from environment variable current workflow name: "
<< workflow_name.data() << std::endl;
<< capio_cl_engine->getWorkflowName() << std::endl;
} else {
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR
<< "Error: no config file provided. To skip config file use --no-config option!"
Expand Down Expand Up @@ -276,34 +219,41 @@ int parseCLI(int argc, char **argv) {
}
backend = select_backend(backend_name_str, argc, argv);

std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "server initialization completed!" << std::endl
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Completed parse of CLI args!" << std::endl
<< std::flush;
return 0;
}

int main(int argc, char **argv) {

Semaphore internal_server_sem(0);

std::cout << CAPIO_LOG_SERVER_BANNER;

setup_signal_handlers();

parseCLI(argc, argv);

START_LOG(gettid(), "call()");

open_files_location();

shm_canary = new CapioShmCanary(workflow_name);
shm_canary = new CapioShmCanary(capio_cl_engine->getWorkflowName());
storage_manager = new StorageManager();
client_manager = new ClientManager();
request_manager = new ClientRequestManager();

backend->handshake_servers();
storage_manager->addDirectory(getpid(), get_capio_dir());

std::thread server_thread(capio_server, std::ref(internal_server_sem));
LOG("capio_server thread started");
std::thread remote_listener_thread(capio_remote_listener, std::ref(internal_server_sem));
std::thread remote_listener_thread(capio_remote_listener);
LOG("capio_remote_listener thread started.");
server_thread.join();

request_manager->start();
remote_listener_thread.join();

delete backend;
delete storage_manager;
delete client_manager;
delete request_manager;

return 0;
}
54 changes: 54 additions & 0 deletions capio/server/include/client/request.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#ifndef CAPIO_REQUEST_HPP
#define CAPIO_REQUEST_HPP
#include "common/requests.hpp"
#include "utils/types.hpp"

class ClientRequestManager {

typedef void (*CSHandler_t)(const char *const);

struct MemHandlers {
static void access_handler(const char *const str);
static void clone_handler(const char *const str);
static void close_handler(const char *const str);
static void create_handler(const char *const str);
static void create_exclusive_handler(const char *const str);
static void dup_handler(const char *const str);
static void fstat_handler(const char *const str);
static void getdents_handler(const char *const str);
static void mkdir_handler(const char *const str);
static void open_handler(const char *const str);
static void read_handler(const char *const str);
static void rename_handler(const char *const str);
static void rmdir_handler(const char *const str);
static void lseek_handler(const char *const str);
static void seek_data_handler(const char *const str);
static void seek_end_handler(const char *const str);
static void seek_hole_handler(const char *const str);
static void stat_handler(const char *const str);
static void unlink_handler(const char *const str);
static void write_handler(const char *const str);
};

struct Handlers {
static void handshake_named_handler(const char *const str);
static void exit_group_handler(const char *const str);
};

struct ClientUtilities {
static void reply_stat(int tid, const std::filesystem::path &path);
static void handle_close(int tid, int fd);
static void handle_exit_group(int fd);
static void handle_seek_end(int tid, int fd);
};

const std::array<CSHandler_t, CAPIO_NR_REQUESTS> request_handlers;

static constexpr std::array<CSHandler_t, CAPIO_NR_REQUESTS> build_request_handlers_table();

public:
ClientRequestManager();
void start() const;
};

#endif // CAPIO_REQUEST_HPP
24 changes: 0 additions & 24 deletions capio/server/include/handlers.hpp

This file was deleted.

4 changes: 3 additions & 1 deletion capio/server/include/remote/backend.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#ifndef CAPIO_SERVER_REMOTE_BACKEND_HPP
#define CAPIO_SERVER_REMOTE_BACKEND_HPP
#include "common/logger.hpp"
#include <charconv>
#include <set>

#include "common/logger.hpp"

class RemoteRequest {
private:
Expand Down
1 change: 1 addition & 0 deletions capio/server/include/remote/backend/mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class MPIBackend : public Backend {
nodes.emplace(node_name);
rank_nodes_equivalence[std::to_string(rank)] = node_name;
rank_nodes_equivalence[node_name] = std::to_string(rank);
MPI_Comm_size(MPI_COMM_WORLD, &n_servers);
}

~MPIBackend() override {
Expand Down
2 changes: 1 addition & 1 deletion capio/server/include/remote/handlers/stat.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#ifndef CAPIO_SERVER_REMOTE_HANDLERS_STAT_HPP
#define CAPIO_SERVER_REMOTE_HANDLERS_STAT_HPP

#include "client-manager/client_manager.hpp"
#include "client/manager.hpp"
#include "remote/backend.hpp"
#include "remote/requests.hpp"
#include "storage/manager.hpp"
Expand Down
4 changes: 1 addition & 3 deletions capio/server/include/remote/listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,12 @@ inline Backend *select_backend(const std::string &backend_name, int argc, char *
return new MPIBackend(argc, argv);
}

[[noreturn]] void capio_remote_listener(Semaphore &internal_server_sem) {
[[noreturn]] void capio_remote_listener() {
static const std::array<CComsHandler_t, CAPIO_SERVER_NR_REQUEST> server_request_handlers =
build_server_request_handlers_table();

START_LOG(gettid(), "call()");

internal_server_sem.lock();

while (true) {
auto request = backend->read_next_request();

Expand Down
3 changes: 3 additions & 0 deletions capio/server/include/remote/requests.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
#define CAPIO_REMOTE_REQUESTS_HPP

#include "storage/manager.hpp"
#include "common/requests.hpp"
#include "utils/location.hpp"

extern StorageManager *storage_manager;
extern char *node_name;

inline void serve_remote_stat_request(const std::filesystem::path &path, int source_tid,
off64_t file_size, bool is_dir, const std::string &dest) {
Expand Down
1 change: 1 addition & 0 deletions capio/server/include/utils/capio_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <set>
#include <string_view>
#include <utility>
#include <vector>

#include <fcntl.h>
#include <sys/stat.h>
Expand Down
Loading
Loading