From e025648d9234dfbf883a8fc12ca91c772eb62a7d Mon Sep 17 00:00:00 2001 From: Coldwings Date: Wed, 28 Jan 2026 23:33:09 +0800 Subject: [PATCH] serve tool --- examples/http2_client.cpp | 103 +++++------- examples/http_client.cpp | 113 ++++++------- examples/http_server.cpp | 187 +++++++--------------- examples/sse_client.cpp | 172 +++++++------------- examples/sse_server.cpp | 200 ++++++++++------------- examples/websocket_client.cpp | 163 +++++++------------ examples/websocket_server.cpp | 163 ++++++++----------- include/elio/elio.hpp | 1 + include/elio/http/websocket_server.hpp | 10 +- include/elio/runtime/serve.hpp | 210 +++++++++++++++++++++++++ wiki/API-Reference.md | 86 ++++++++++ wiki/Signal-Handling.md | 22 ++- 12 files changed, 728 insertions(+), 702 deletions(-) create mode 100644 include/elio/runtime/serve.hpp diff --git a/examples/http2_client.cpp b/examples/http2_client.cpp index 2e31027..7ebc82e 100644 --- a/examples/http2_client.cpp +++ b/examples/http2_client.cpp @@ -1,6 +1,6 @@ /// @file http2_client.cpp /// @brief HTTP/2 Client Example -/// +/// /// This example demonstrates how to make HTTP/2 requests using Elio's /// HTTP/2 client with multiplexed streams over TLS. /// @@ -13,31 +13,23 @@ #include #include -#include -#include -#include +#include using namespace elio; using namespace elio::http; -using namespace elio::runtime; - -// Completion signaling -std::atomic g_done{false}; -std::mutex g_mutex; -std::condition_variable g_cv; /// Perform HTTP/2 requests demonstrating various features -coro::task run_client(const std::string& base_url) { +coro::task run_demo(const std::string& base_url) { // Create HTTP/2 client with custom config h2_client_config config; config.user_agent = "elio-http2-client-example/1.0"; config.max_concurrent_streams = 100; h2_client client(config); - + ELIO_LOG_INFO("=== HTTP/2 Client Example ==="); ELIO_LOG_INFO("Base URL: {}", base_url); - + // 1. Simple GET request ELIO_LOG_INFO("\n--- HTTP/2 GET Request ---"); { @@ -47,7 +39,7 @@ coro::task run_client(const std::string& base_url) { ELIO_LOG_INFO("Status: {}", static_cast(resp.get_status())); ELIO_LOG_INFO("Content-Type: {}", resp.content_type()); ELIO_LOG_INFO("Body length: {} bytes", resp.body().size()); - + // Print first 200 chars of body auto body = resp.body(); if (body.size() > 200) { @@ -59,7 +51,7 @@ coro::task run_client(const std::string& base_url) { ELIO_LOG_ERROR("HTTP/2 GET request failed: {}", strerror(errno)); } } - + // 2. Multiple requests using same connection (HTTP/2 multiplexing) ELIO_LOG_INFO("\n--- HTTP/2 Connection Multiplexing ---"); { @@ -68,7 +60,7 @@ coro::task run_client(const std::string& base_url) { ELIO_LOG_INFO("Request {}/3...", i); auto result = co_await client.get(base_url); if (result) { - ELIO_LOG_INFO(" Status: {} (body: {} bytes)", + ELIO_LOG_INFO(" Status: {} (body: {} bytes)", static_cast(result->get_status()), result->body().size()); } else { @@ -77,36 +69,30 @@ coro::task run_client(const std::string& base_url) { } ELIO_LOG_INFO("All requests used HTTP/2 multiplexed streams on single connection"); } - + ELIO_LOG_INFO("\n=== HTTP/2 Client Example Complete ==="); - - // Signal completion - { - std::lock_guard lock(g_mutex); - g_done = true; - } - g_cv.notify_all(); + co_return; } /// Simple one-off HTTP/2 request demonstration -coro::task simple_request(const std::string& url) { +coro::task simple_fetch(const std::string& url) { ELIO_LOG_INFO("Fetching via HTTP/2: {}", url); // Use convenience function for one-off requests auto result = co_await h2_get(url); - + if (result) { auto& resp = *result; ELIO_LOG_INFO("Status: {}", static_cast(resp.get_status())); ELIO_LOG_INFO("Content-Type: {}", resp.content_type()); ELIO_LOG_INFO("Content-Length: {}", resp.body().size()); - + // Print response headers ELIO_LOG_INFO("Response Headers:"); for (const auto& [name, value] : resp.get_headers()) { ELIO_LOG_INFO(" {}: {}", name, value); } - + // Print response body (truncated if too long) auto body = resp.body(); if (body.size() > 500) { @@ -118,25 +104,33 @@ coro::task simple_request(const std::string& url) { ELIO_LOG_ERROR("HTTP/2 request failed: {}", strerror(errno)); ELIO_LOG_INFO("Note: HTTP/2 requires HTTPS and server support for h2 ALPN."); } - - // Signal completion - { - std::lock_guard lock(g_mutex); - g_done = true; - } - g_cv.notify_all(); + co_return; } -int main(int argc, char* argv[]) { +/// Async main - uses ELIO_ASYNC_MAIN for automatic scheduler management +coro::task async_main(int argc, char* argv[]) { std::string url; bool full_demo = false; - + + ELIO_LOG_INFO("HTTP/2 Client Example"); + ELIO_LOG_INFO("Using nghttp2 library for HTTP/2 protocol support"); + // Parse arguments if (argc > 1) { std::string arg = argv[1]; if (arg == "--demo" || arg == "-d") { full_demo = true; url = "https://nghttp2.org/"; // nghttp2.org always supports HTTP/2 + } else if (arg == "--help" || arg == "-h") { + std::cout << "Usage: " << argv[0] << " [options] [url]\n" + << "\n" + << "Options:\n" + << " --demo, -d Run full feature demonstration\n" + << " --help, -h Show this help\n" + << "\n" + << "Default: Fetches https://nghttp2.org/\n" + << "Note: HTTP/2 requires HTTPS and server support for h2 ALPN.\n"; + co_return 0; } else { url = arg; } @@ -145,35 +139,16 @@ int main(int argc, char* argv[]) { full_demo = false; url = "https://nghttp2.org/"; } - - ELIO_LOG_INFO("HTTP/2 Client Example"); - ELIO_LOG_INFO("Using nghttp2 library for HTTP/2 protocol support"); - - // Create scheduler - scheduler sched(2); - sched.start(); - + // Run appropriate mode if (full_demo) { - auto task = run_client(url); - sched.spawn(task.release()); + co_await run_demo(url); } else { - auto task = simple_request(url); - sched.spawn(task.release()); - } - - // Wait for completion with timeout - { - std::unique_lock lock(g_mutex); - g_cv.wait_for(lock, std::chrono::seconds(60), [] { return g_done.load(); }); + co_await simple_fetch(url); } - - // Brief drain before shutdown - auto& ctx = io::default_io_context(); - for (int i = 0; i < 10 && ctx.has_pending(); ++i) { - ctx.poll(std::chrono::milliseconds(10)); - } - - sched.shutdown(); - return 0; + + co_return 0; } + +// Use ELIO_ASYNC_MAIN - handles scheduler creation, execution, and shutdown automatically +ELIO_ASYNC_MAIN(async_main) diff --git a/examples/http_client.cpp b/examples/http_client.cpp index ef5475b..9970ecc 100644 --- a/examples/http_client.cpp +++ b/examples/http_client.cpp @@ -1,6 +1,6 @@ /// @file http_client.cpp /// @brief HTTP Client Example -/// +/// /// This example demonstrates how to make HTTP requests using Elio's /// HTTP client with connection pooling and TLS support. /// @@ -11,21 +11,13 @@ #include #include -#include -#include -#include +#include using namespace elio; using namespace elio::http; -using namespace elio::runtime; - -// Completion signaling -std::atomic g_done{false}; -std::mutex g_mutex; -std::condition_variable g_cv; /// Perform multiple HTTP requests demonstrating various features -coro::task run_client(const std::string& base_url) { +coro::task run_demo(const std::string& base_url) { // Create client with custom config client_config config; config.user_agent = "elio-http-client-example/1.0"; @@ -33,10 +25,10 @@ coro::task run_client(const std::string& base_url) { config.max_redirects = 5; client c(config); - + ELIO_LOG_INFO("=== HTTP Client Example ==="); ELIO_LOG_INFO("Base URL: {}", base_url); - + // 1. Simple GET request ELIO_LOG_INFO("\n--- GET Request ---"); { @@ -46,7 +38,7 @@ coro::task run_client(const std::string& base_url) { ELIO_LOG_INFO("Status: {} {}", resp.status_code(), status_reason(resp.get_status())); ELIO_LOG_INFO("Content-Type: {}", resp.content_type()); ELIO_LOG_INFO("Body length: {} bytes", resp.body().size()); - + // Print first 200 chars of body auto body = resp.body(); if (body.size() > 200) { @@ -58,7 +50,7 @@ coro::task run_client(const std::string& base_url) { ELIO_LOG_ERROR("GET request failed: {}", strerror(errno)); } } - + // 2. POST request with JSON ELIO_LOG_INFO("\n--- POST Request (JSON) ---"); { @@ -67,7 +59,7 @@ coro::task run_client(const std::string& base_url) { if (result) { auto& resp = *result; ELIO_LOG_INFO("Status: {} {}", resp.status_code(), status_reason(resp.get_status())); - + auto body = resp.body(); if (body.size() > 300) { ELIO_LOG_INFO("Body (truncated): {}", body.substr(0, 300)); @@ -78,7 +70,7 @@ coro::task run_client(const std::string& base_url) { ELIO_LOG_ERROR("POST request failed: {}", strerror(errno)); } } - + // 3. POST with form data ELIO_LOG_INFO("\n--- POST Request (Form) ---"); { @@ -90,7 +82,7 @@ coro::task run_client(const std::string& base_url) { ELIO_LOG_ERROR("Form POST failed: {}", strerror(errno)); } } - + // 4. Custom headers ELIO_LOG_INFO("\n--- Custom Headers ---"); { @@ -98,12 +90,12 @@ coro::task run_client(const std::string& base_url) { req.set_host("httpbin.org"); req.set_header("X-Custom-Header", "elio-test-value"); req.set_header("Accept", "application/json"); - + url target; target.scheme = "https"; target.host = "httpbin.org"; target.path = "/headers"; - + auto result = co_await c.send(req, target); if (result) { ELIO_LOG_INFO("Status: {} {}", result->status_code(), status_reason(result->get_status())); @@ -117,7 +109,7 @@ coro::task run_client(const std::string& base_url) { ELIO_LOG_ERROR("Custom headers request failed: {}", strerror(errno)); } } - + // 5. Connection reuse (multiple requests to same host) ELIO_LOG_INFO("\n--- Connection Reuse (Keep-Alive) ---"); { @@ -132,7 +124,7 @@ coro::task run_client(const std::string& base_url) { } ELIO_LOG_INFO("All requests used keep-alive connection pooling"); } - + // 6. Redirect following ELIO_LOG_INFO("\n--- Redirect Following ---"); { @@ -144,7 +136,7 @@ coro::task run_client(const std::string& base_url) { ELIO_LOG_ERROR("Redirect request failed: {}", strerror(errno)); } } - + // 7. Status codes ELIO_LOG_INFO("\n--- Various Status Codes ---"); { @@ -152,14 +144,14 @@ coro::task run_client(const std::string& base_url) { for (int code : codes) { auto result = co_await c.get(base_url + "/status/" + std::to_string(code)); if (result) { - ELIO_LOG_INFO("Requested {}: got {} {}", + ELIO_LOG_INFO("Requested {}: got {} {}", code, result->status_code(), status_reason(result->get_status())); } else { ELIO_LOG_ERROR("Request for status {} failed", code); } } } - + // 8. HEAD request ELIO_LOG_INFO("\n--- HEAD Request ---"); { @@ -172,30 +164,24 @@ coro::task run_client(const std::string& base_url) { ELIO_LOG_ERROR("HEAD request failed: {}", strerror(errno)); } } - + ELIO_LOG_INFO("\n=== HTTP Client Example Complete ==="); - - // Signal completion - { - std::lock_guard lock(g_mutex); - g_done = true; - } - g_cv.notify_all(); + co_return; } /// Simple one-off request demonstration -coro::task simple_request(const std::string& url) { +coro::task simple_fetch(const std::string& url) { ELIO_LOG_INFO("Fetching: {}", url); // Use convenience function for one-off requests auto result = co_await http::get(url); - + if (result) { auto& resp = *result; ELIO_LOG_INFO("Status: {} {}", resp.status_code(), status_reason(resp.get_status())); ELIO_LOG_INFO("Content-Type: {}", resp.content_type()); ELIO_LOG_INFO("Content-Length: {}", resp.body().size()); - + // Print response body (truncated if too long) auto body = resp.body(); if (body.size() > 500) { @@ -206,25 +192,30 @@ coro::task simple_request(const std::string& url) { } else { ELIO_LOG_ERROR("Request failed: {}", strerror(errno)); } - - // Signal completion - { - std::lock_guard lock(g_mutex); - g_done = true; - } - g_cv.notify_all(); + co_return; } -int main(int argc, char* argv[]) { +/// Async main - uses ELIO_ASYNC_MAIN for automatic scheduler management +coro::task async_main(int argc, char* argv[]) { std::string url; bool full_demo = false; - + // Parse arguments if (argc > 1) { std::string arg = argv[1]; if (arg == "--demo" || arg == "-d") { full_demo = true; url = "https://httpbin.org"; + } else if (arg == "--help" || arg == "-h") { + std::cout << "Usage: " << argv[0] << " [options] [url]\n" + << "\n" + << "Options:\n" + << " --demo, -d Run full feature demonstration\n" + << " --help, -h Show this help\n" + << "\n" + << "If url is provided, fetches that URL.\n" + << "If no arguments, runs full demo with httpbin.org.\n"; + co_return 0; } else { url = arg; } @@ -233,32 +224,16 @@ int main(int argc, char* argv[]) { full_demo = true; url = "https://httpbin.org"; } - - // Create scheduler - scheduler sched(2); - sched.start(); - + // Run appropriate mode if (full_demo) { - auto task = run_client(url); - sched.spawn(task.release()); + co_await run_demo(url); } else { - auto task = simple_request(url); - sched.spawn(task.release()); - } - - // Wait for completion with timeout - { - std::unique_lock lock(g_mutex); - g_cv.wait_for(lock, std::chrono::seconds(60), [] { return g_done.load(); }); + co_await simple_fetch(url); } - - // Brief drain before shutdown - auto& ctx = io::default_io_context(); - for (int i = 0; i < 10 && ctx.has_pending(); ++i) { - ctx.poll(std::chrono::milliseconds(10)); - } - - sched.shutdown(); - return 0; + + co_return 0; } + +// Use ELIO_ASYNC_MAIN - handles scheduler creation, execution, and shutdown automatically +ELIO_ASYNC_MAIN(async_main) diff --git a/examples/http_server.cpp b/examples/http_server.cpp index a51320e..954f74b 100644 --- a/examples/http_server.cpp +++ b/examples/http_server.cpp @@ -1,6 +1,6 @@ /// @file http_server.cpp /// @brief HTTP Server Example -/// +/// /// This example demonstrates how to build an HTTP server using Elio's /// HTTP module with router-based request handling. /// @@ -21,32 +21,10 @@ #include #include -#include #include using namespace elio; using namespace elio::http; -using namespace elio::runtime; -using namespace elio::signal; - -// Global flag for graceful shutdown -std::atomic g_running{true}; - -/// Signal handler coroutine - waits for SIGINT/SIGTERM -coro::task signal_handler_task() { - signal_set sigs{SIGINT, SIGTERM}; - signal_fd sigfd(sigs); - - ELIO_LOG_DEBUG("Signal handler started, waiting for SIGINT/SIGTERM..."); - - auto info = co_await sigfd.wait(); - if (info) { - ELIO_LOG_INFO("Received signal: {} - initiating shutdown", info->full_name()); - } - - g_running = false; - co_return; -} // Simple in-memory data store for demo std::vector todos; @@ -83,24 +61,24 @@ coro::task hello_handler([[maybe_unused]] context& ctx) { // Handler: POST /api/echo coro::task echo_handler(context& ctx) { auto& req = ctx.req(); - std::string body_str(req.body()); - - std::string json = R"({"echo": ")" + body_str + R"(", "content_type": ")" + - std::string(req.content_type()) + R"("})"; + std::string body(req.body()); + + std::string json = R"({"echo": ")" + body + R"(", "length": )" + + std::to_string(body.size()) + "}"; co_return response::json(json); } // Handler: GET /api/todos coro::task list_todos_handler([[maybe_unused]] context& ctx) { std::lock_guard lock(todos_mutex); - + std::string json = "["; for (size_t i = 0; i < todos.size(); ++i) { if (i > 0) json += ","; json += R"({"id": )" + std::to_string(i) + R"(, "text": ")" + todos[i] + R"("})"; } json += "]"; - + co_return response::json(json); } @@ -108,84 +86,78 @@ coro::task list_todos_handler([[maybe_unused]] context& ctx) { coro::task add_todo_handler(context& ctx) { auto& req = ctx.req(); std::string text(req.body()); - + + if (text.empty()) { + co_return response::bad_request("Todo text is required"); + } + size_t id; { std::lock_guard lock(todos_mutex); id = todos.size(); todos.push_back(text); } - + std::string json = R"({"id": )" + std::to_string(id) + R"(, "text": ")" + text + R"("})"; - - response resp(status::created, json, mime::application_json); - co_return resp; + co_return response(status::created, json, mime::application_json); } // Handler: GET /api/todos/:id coro::task get_todo_handler(context& ctx) { - auto id_str = ctx.param("id"); - size_t id = 0; - auto [ptr, ec] = std::from_chars(id_str.data(), id_str.data() + id_str.size(), id); - - if (ec != std::errc{}) { - co_return response::bad_request(R"({"error": "Invalid ID"})"); - } - + std::string id_str(ctx.param("id")); + size_t id = std::stoul(id_str); + std::lock_guard lock(todos_mutex); if (id >= todos.size()) { - co_return response::not_found(R"({"error": "Todo not found"})"); + co_return response::not_found(); } - - std::string json = R"({"id": )" + std::to_string(id) + R"(, "text": ")" + todos[id] + R"("})"; + + std::string json = R"({"id": )" + std::to_string(id) + + R"(, "text": ")" + todos[id] + R"("})"; co_return response::json(json); } // Handler: DELETE /api/todos/:id coro::task delete_todo_handler(context& ctx) { - auto id_str = ctx.param("id"); - size_t id = 0; - auto [ptr, ec] = std::from_chars(id_str.data(), id_str.data() + id_str.size(), id); - - if (ec != std::errc{}) { - co_return response::bad_request(R"({"error": "Invalid ID"})"); - } - + std::string id_str(ctx.param("id")); + size_t id = std::stoul(id_str); + std::lock_guard lock(todos_mutex); if (id >= todos.size()) { - co_return response::not_found(R"({"error": "Todo not found"})"); + co_return response::not_found(); } - + todos.erase(todos.begin() + static_cast(id)); - - co_return response(status::no_content); + co_return response::json(R"({"deleted": true})"); } // Handler: GET /api/info -coro::task info_handler(context& ctx) { +coro::task info_handler([[maybe_unused]] context& ctx) { std::string json = R"({ "server": "Elio HTTP Server", - "version": "1.0.0", - "client": ")" + std::string(ctx.client_addr()) + R"(" + "version": ")" + std::string(elio::version()) + R"(", + "features": ["async", "coroutines", "http/1.1", "keep-alive"] })"; co_return response::json(json); } -// Handler: GET /api/users/:id +// Handler with path parameter: GET /api/users/:id coro::task user_handler(context& ctx) { - auto id = ctx.param("id"); - std::string json = R"({"user_id": ")" + std::string(id) + R"("})"; + std::string_view id = ctx.param("id"); + std::string json = R"({"user_id": ")" + std::string(id) + R"(", "name": "User )" + + std::string(id) + R"("})"; co_return response::json(json); } -int main(int argc, char* argv[]) { +/// Async main - uses ELIO_ASYNC_MAIN with elio::serve() for clean server lifecycle +coro::task async_main(int argc, char* argv[]) { uint16_t port = 8080; bool use_https = false; std::string cert_file, key_file; std::string bind_address; bool ipv4_only = false; bool ipv6_only = false; - + // Parse arguments for (int i = 1; i < argc; ++i) { std::string arg = argv[i]; @@ -208,16 +180,16 @@ int main(int argc, char* argv[]) { << " --https cert key Enable HTTPS\n" << " -h, --help Show this help\n" << "Default: HTTP on port 8080 (dual-stack)\n"; - return 0; + co_return 0; } else if (arg[0] != '-') { port = static_cast(std::stoi(arg)); } } - + // Determine bind address net::socket_address bind_addr; net::tcp_options opts; - + if (!bind_address.empty()) { bind_addr = net::socket_address(bind_address, port); } else if (ipv4_only) { @@ -226,14 +198,10 @@ int main(int argc, char* argv[]) { bind_addr = net::socket_address(net::ipv6_address(port)); opts.ipv6_only = ipv6_only; } - - // Block signals BEFORE creating scheduler threads - signal_set sigs{SIGINT, SIGTERM}; - sigs.block_all_threads(); - + // Create router router r; - + // Register routes r.get("/", index_handler); r.get("/api/hello", hello_handler); @@ -244,70 +212,37 @@ int main(int argc, char* argv[]) { r.del("/api/todos/:id", delete_todo_handler); r.get("/api/info", info_handler); r.get("/api/users/:id", user_handler); - + // Create server server srv(std::move(r)); - + // Set custom 404 handler srv.set_not_found_handler([](context& ctx) -> coro::task { - std::string json = R"({"error": "Not Found", "path": ")" + + std::string json = R"({"error": "Not Found", "path": ")" + std::string(ctx.req().path()) + R"("})"; - auto resp = response(status::not_found, json, mime::application_json); - co_return resp; + co_return response(status::not_found, json, mime::application_json); }); - - // Create scheduler - scheduler sched(4); - sched.start(); - - // Spawn signal handler coroutine - auto sig_handler = signal_handler_task(); - sched.spawn(sig_handler.release()); - - // Start server + + ELIO_LOG_INFO("Press Ctrl+C to stop"); + + // Start server and wait for shutdown signal + // elio::serve() handles signal waiting and graceful shutdown automatically if (use_https) { try { auto tls_ctx = tls::tls_context::make_server(cert_file, key_file); - - auto server_task = srv.listen_tls( - bind_addr, - tls_ctx, - opts - ); - sched.spawn(server_task.release()); - - ELIO_LOG_INFO("HTTPS server started on {}", bind_addr.to_string()); + ELIO_LOG_INFO("Starting HTTPS server on {}", bind_addr.to_string()); + co_await elio::serve(srv, srv.listen_tls(bind_addr, tls_ctx, opts)); } catch (const std::exception& e) { ELIO_LOG_ERROR("Failed to start HTTPS server: {}", e.what()); - return 1; + co_return 1; } } else { - auto server_task = srv.listen( - bind_addr, - opts - ); - sched.spawn(server_task.release()); - - ELIO_LOG_INFO("HTTP server started on {}", bind_addr.to_string()); - } - - ELIO_LOG_INFO("Press Ctrl+C to stop"); - - // Wait for shutdown - while (g_running) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - - srv.stop(); - - // Brief drain before shutdown - auto& ctx = io::default_io_context(); - for (int i = 0; i < 10 && ctx.has_pending(); ++i) { - ctx.poll(std::chrono::milliseconds(10)); + ELIO_LOG_INFO("Starting HTTP server on {}", bind_addr.to_string()); + co_await elio::serve(srv, srv.listen(bind_addr, opts)); } - - sched.shutdown(); - - ELIO_LOG_INFO("Server stopped"); - return 0; + + co_return 0; } + +// Use ELIO_ASYNC_MAIN - handles scheduler creation, execution, and shutdown automatically +ELIO_ASYNC_MAIN(async_main) diff --git a/examples/sse_client.cpp b/examples/sse_client.cpp index 8163b59..c88ce80 100644 --- a/examples/sse_client.cpp +++ b/examples/sse_client.cpp @@ -17,19 +17,11 @@ #include #include -#include -#include -#include #include using namespace elio; using namespace elio::http::sse; -// Completion signaling -std::atomic g_done{false}; -std::mutex g_mutex; -std::condition_variable g_cv; - /// Listen to SSE events coro::task listen_events(const std::string& url) { ELIO_LOG_INFO("=== SSE Client Demo ==="); @@ -43,26 +35,21 @@ coro::task listen_events(const std::string& url) { config.verify_certificate = false; // Allow self-signed certs for testing sse_client client(config); - + // Connect bool connected = co_await client.connect(url); if (!connected) { ELIO_LOG_ERROR("Failed to connect to {}", url); - { - std::lock_guard lock(g_mutex); - g_done = true; - } - g_cv.notify_all(); co_return; } - + ELIO_LOG_INFO("Connected! Listening for events..."); ELIO_LOG_INFO("Press Ctrl+C to stop\n"); - + // Receive events int event_count = 0; int max_events = 20; // Limit for demo - + while (client.is_connected() && event_count < max_events) { auto evt = co_await client.receive(); if (!evt) { @@ -73,35 +60,29 @@ coro::task listen_events(const std::string& url) { ELIO_LOG_INFO("Connection closed"); break; } - + ++event_count; - + // Display event std::string type = evt->type.empty() ? "message" : evt->type; std::string id = evt->id.empty() ? "(none)" : evt->id; - + ELIO_LOG_INFO("[Event #{:3}] type={} id={}", event_count, type, id); ELIO_LOG_INFO(" data: {}", evt->data); - + // Track last event ID if (!evt->id.empty()) { ELIO_LOG_DEBUG(" Last-Event-ID is now: {}", client.last_event_id()); } } - + // Close connection ELIO_LOG_INFO("\nClosing connection..."); co_await client.close(); - + ELIO_LOG_INFO("Received {} events total", event_count); ELIO_LOG_INFO("=== Demo Complete ==="); - - // Signal completion - { - std::lock_guard lock(g_mutex); - g_done = true; - } - g_cv.notify_all(); + co_return; } /// Simple connection test @@ -111,17 +92,12 @@ coro::task simple_test(const std::string& url) { auto client_opt = co_await sse_connect(url); if (!client_opt) { ELIO_LOG_ERROR("Failed to connect"); - { - std::lock_guard lock(g_mutex); - g_done = true; - } - g_cv.notify_all(); co_return; } - + auto& client = *client_opt; ELIO_LOG_INFO("Connected! Waiting for events..."); - + // Receive a few events for (int i = 0; i < 5; ++i) { auto evt = co_await client.receive(); @@ -129,19 +105,14 @@ coro::task simple_test(const std::string& url) { ELIO_LOG_INFO("Connection closed"); break; } - - ELIO_LOG_INFO("Event {}: {} ({})", i + 1, evt->data, + + ELIO_LOG_INFO("Event {}: {} ({})", i + 1, evt->data, evt->type.empty() ? "message" : evt->type); } - + co_await client.close(); ELIO_LOG_INFO("Test complete"); - - { - std::lock_guard lock(g_mutex); - g_done = true; - } - g_cv.notify_all(); + co_return; } /// Test reconnection behavior @@ -154,23 +125,18 @@ coro::task reconnect_test(const std::string& url) { config.max_reconnect_attempts = 3; sse_client client(config); - + if (!co_await client.connect(url)) { ELIO_LOG_ERROR("Initial connection failed"); - { - std::lock_guard lock(g_mutex); - g_done = true; - } - g_cv.notify_all(); co_return; } - + ELIO_LOG_INFO("Connected! Listening for events (reconnection enabled)..."); - + int event_count = 0; while (event_count < 30) { // Receive up to 30 events auto evt = co_await client.receive(); - + if (!evt) { if (client.state() == client_state::reconnecting) { ELIO_LOG_INFO("Connection lost, reconnecting..."); @@ -179,49 +145,41 @@ coro::task reconnect_test(const std::string& url) { ELIO_LOG_INFO("Connection permanently closed"); break; } - + ++event_count; - ELIO_LOG_INFO("[{}] {}: {}", event_count, - evt->type.empty() ? "message" : evt->type, + ELIO_LOG_INFO("[{}] {}: {}", event_count, + evt->type.empty() ? "message" : evt->type, evt->data); } - + co_await client.close(); ELIO_LOG_INFO("Reconnection test complete ({} events received)", event_count); - - { - std::lock_guard lock(g_mutex); - g_done = true; - } - g_cv.notify_all(); -} - -void print_usage(const char* program) { - std::cout << "Usage: " << program << " [options] [url]\n" - << "\n" - << "Options:\n" - << " --demo Run feature demonstration (default)\n" - << " --simple Run simple connection test\n" - << " --reconnect Test reconnection behavior\n" - << " --help Show this help\n" - << "\n" - << "Default URL: http://localhost:8080/events\n" - << "\n" - << "Examples:\n" - << " " << program << " http://localhost:8080/events\n" - << " " << program << " --simple http://localhost:3000/sse\n"; + co_return; } -int main(int argc, char* argv[]) { +/// Async main - uses ELIO_ASYNC_MAIN for automatic scheduler management +coro::task async_main(int argc, char* argv[]) { std::string url = "http://localhost:8080/events"; enum class Mode { demo, simple, reconnect } mode = Mode::demo; - + // Parse arguments for (int i = 1; i < argc; ++i) { std::string arg = argv[i]; if (arg == "--help" || arg == "-h") { - print_usage(argv[0]); - return 0; + std::cout << "Usage: " << argv[0] << " [options] [url]\n" + << "\n" + << "Options:\n" + << " --demo, -d Run feature demonstration (default)\n" + << " --simple, -s Run simple connection test\n" + << " --reconnect, -r Test reconnection behavior\n" + << " --help, -h Show this help\n" + << "\n" + << "Default URL: http://localhost:8080/events\n" + << "\n" + << "Examples:\n" + << " " << argv[0] << " http://localhost:8080/events\n" + << " " << argv[0] << " --simple http://localhost:3000/sse\n"; + co_return 0; } else if (arg == "--demo" || arg == "-d") { mode = Mode::demo; } else if (arg == "--simple" || arg == "-s") { @@ -232,42 +190,22 @@ int main(int argc, char* argv[]) { url = arg; } } - - // Create scheduler - runtime::scheduler sched(2); - sched.start(); - + // Run client based on mode switch (mode) { - case Mode::demo: { - auto task = listen_events(url); - sched.spawn(task.release()); + case Mode::demo: + co_await listen_events(url); break; - } - case Mode::simple: { - auto task = simple_test(url); - sched.spawn(task.release()); + case Mode::simple: + co_await simple_test(url); break; - } - case Mode::reconnect: { - auto task = reconnect_test(url); - sched.spawn(task.release()); + case Mode::reconnect: + co_await reconnect_test(url); break; - } } - - // Wait for completion with timeout - { - std::unique_lock lock(g_mutex); - g_cv.wait_for(lock, std::chrono::seconds(120), [] { return g_done.load(); }); - } - - // Brief drain before shutdown - auto& ctx = io::default_io_context(); - for (int i = 0; i < 10 && ctx.has_pending(); ++i) { - ctx.poll(std::chrono::milliseconds(10)); - } - - sched.shutdown(); - return 0; + + co_return 0; } + +// Use ELIO_ASYNC_MAIN - handles scheduler creation, execution, and shutdown automatically +ELIO_ASYNC_MAIN(async_main) diff --git a/examples/sse_server.cpp b/examples/sse_server.cpp index 21b1e26..70723cf 100644 --- a/examples/sse_server.cpp +++ b/examples/sse_server.cpp @@ -24,46 +24,29 @@ using namespace elio; using namespace elio::http; using namespace elio::http::sse; -using namespace elio::signal; - -// Global flag for graceful shutdown -std::atomic g_running{true}; - -/// Signal handler coroutine - waits for SIGINT/SIGTERM -coro::task signal_handler_task() { - signal_set sigs{SIGINT, SIGTERM}; - signal_fd sigfd(sigs); - - ELIO_LOG_DEBUG("Signal handler started, waiting for SIGINT/SIGTERM..."); - - auto info = co_await sigfd.wait(); - if (info) { - ELIO_LOG_INFO("Received signal: {} - initiating shutdown", info->full_name()); - } - - g_running = false; - co_return; -} // Global counter for events std::atomic g_event_counter{0}; +// Running flag for SSE streams +std::atomic g_sse_active{true}; + /// SSE event stream handler - sends periodic events coro::task event_stream(net::tcp_stream& stream) { // Create SSE connection sse_connection conn(&stream); - + ELIO_LOG_INFO("SSE client connected"); - + // Send initial retry interval co_await conn.send_retry(3000); // 3 seconds - + // Send events until client disconnects or server stops uint64_t local_counter = 0; - while (conn.is_active() && g_running) { + while (conn.is_active() && g_sse_active) { ++local_counter; uint64_t event_id = ++g_event_counter; - + // Send different event types switch (local_counter % 4) { case 0: { @@ -75,13 +58,13 @@ coro::task event_stream(net::tcp_stream& stream) { co_await conn.send(evt); break; } - + case 1: { // Typed event (e.g., for different data streams) event evt = event::full( std::to_string(event_id), "heartbeat", - R"({"alive":true,"timestamp":)" + + R"({"alive":true,"timestamp":)" + std::to_string(std::chrono::system_clock::now() .time_since_epoch().count()) + "}", -1 @@ -89,27 +72,27 @@ coro::task event_stream(net::tcp_stream& stream) { co_await conn.send(evt); break; } - + case 2: { // JSON data event - std::string json = R"({"type":"update","count":)" + - std::to_string(local_counter) + + std::string json = R"({"type":"update","count":)" + + std::to_string(local_counter) + R"(,"id":")" + std::to_string(event_id) + R"("})"; co_await conn.send_event("data", json); break; } - + case 3: { // Keep-alive comment (doesn't trigger client event) co_await conn.send_comment("keep-alive"); break; } } - + // Wait before sending next event co_await time::sleep_for(std::chrono::seconds(1)); } - + ELIO_LOG_INFO("SSE client disconnected"); } @@ -118,8 +101,8 @@ class sse_http_server { public: explicit sse_http_server(router r, server_config config = {}) : router_(std::move(r)), config_(config) {} - - coro::task listen(const net::ipv4_address& addr) { + + coro::task listen(const net::socket_address& addr) { auto* sched = runtime::scheduler::current(); if (!sched) { ELIO_LOG_ERROR("SSE server must be started from within a scheduler context"); @@ -131,13 +114,13 @@ class sse_http_server { ELIO_LOG_ERROR("Failed to bind SSE server: {}", strerror(errno)); co_return; } - + ELIO_LOG_INFO("SSE server listening on {}", addr.to_string()); - + auto& listener = *listener_result; running_ = true; - - while (running_ && g_running) { + + while (running_) { auto stream_result = co_await listener.accept(); if (!stream_result) { if (running_) { @@ -145,53 +128,56 @@ class sse_http_server { } continue; } - + auto handler = handle_connection(std::move(*stream_result)); sched->spawn(handler.release()); } } - - void stop() { running_ = false; } - + + void stop() { + running_ = false; + g_sse_active = false; + } + private: coro::task handle_connection(net::tcp_stream stream) { std::vector buffer(config_.read_buffer_size); request_parser parser; - + // Read HTTP request while (!parser.is_complete() && !parser.has_error()) { auto result = co_await stream.read(buffer.data(), buffer.size()); if (result.result <= 0) co_return; - + auto [parse_result, consumed] = parser.parse( std::string_view(buffer.data(), result.result)); if (parse_result == parse_result::error) co_return; } - + if (parser.has_error()) co_return; - + auto req = request::from_parser(parser); - + // Check if this is an SSE request if (req.path() == "/events" || req.path() == "/sse") { // Send SSE headers - std::string headers = + std::string headers = "HTTP/1.1 200 OK\r\n" "Content-Type: text/event-stream\r\n" "Cache-Control: no-cache\r\n" "Connection: keep-alive\r\n" "Access-Control-Allow-Origin: *\r\n" "\r\n"; - + auto write_result = co_await stream.write(headers.data(), headers.size()); if (write_result.result <= 0) co_return; - + // Get Last-Event-ID if present auto last_id = req.header("Last-Event-ID"); if (!last_id.empty()) { ELIO_LOG_INFO("Client reconnecting with Last-Event-ID: {}", last_id); } - + // Handle SSE stream co_await event_stream(stream); } else { @@ -199,11 +185,11 @@ class sse_http_server { auto peer = stream.peer_address(); std::string client_addr = peer ? peer->to_string() : "unknown"; context ctx(std::move(req), client_addr); - + std::unordered_map params; - auto* route = router_.find_route(ctx.req().get_method(), + auto* route = router_.find_route(ctx.req().get_method(), ctx.req().path(), params); - + response resp; if (route) { for (const auto& [name, value] : params) { @@ -217,12 +203,12 @@ class sse_http_server { } else { resp = response::not_found(); } - + auto data = resp.serialize(); co_await stream.write(data.data(), data.size()); } } - + router router_; server_config config_; std::atomic running_{false}; @@ -247,20 +233,20 @@ coro::task index_handler([[maybe_unused]] context& ctx) {

Elio Server-Sent Events Test

- +
Disconnected
- +

Events:

- +