diff --git a/ext/h2o/build.zig b/ext/h2o/build.zig index 0149ef8bf4..1cba2fa23a 100644 --- a/ext/h2o/build.zig +++ b/ext/h2o/build.zig @@ -30,6 +30,11 @@ pub fn build(b: *std.Build) !void { .optimize = optimize, }); + const wslay = b.dependency("wslay", .{ + .target = target, + .optimize = optimize, + }); + const h2o_c = b.dependency("h2o", .{ .target = target, .optimize = optimize, @@ -345,6 +350,7 @@ pub fn build(b: *std.Build) !void { h2o.linkLibrary(libyrmcds); h2o.linkLibrary(picohttpparser); h2o.linkLibrary(picotls); + h2o.linkLibrary(wslay.artifact("wslay")); // h2o.linkLibrary(ssl_conservatory); h2o.linkLibC(); @@ -354,6 +360,7 @@ pub fn build(b: *std.Build) !void { h2o.addIncludePath(h2o_c.path("include")); h2o.addIncludePath(h2o_c.path("include/h2o")); h2o.addIncludePath(h2o_c.path("include/h2o/socket")); + h2o.addIncludePath(wslay.path("lib/includes")); h2o.addCSourceFiles(.{ .root = h2o_c.path("lib"), @@ -425,6 +432,7 @@ pub fn build(b: *std.Build) !void { "http2/http2_debug_state.c", "http2/scheduler.c", "http2/stream.c", + "websocket.c", "tunnel.c", }, .flags = &.{ @@ -435,6 +443,7 @@ pub fn build(b: *std.Build) !void { "-pthread", "-DH2O_USE_LIBUV", "-DH2O_USE_PICOTLS", + "-DWSLAY_VERSION=\\\"1.1.1\\\"", if (t.os.tag == .linux) "-D_GNU_SOURCE" else "", }, }); diff --git a/ext/h2o/build.zig.zon b/ext/h2o/build.zig.zon index 76ab140c9d..5fc0143bf0 100644 --- a/ext/h2o/build.zig.zon +++ b/ext/h2o/build.zig.zon @@ -23,6 +23,9 @@ .url = "https://github.com/DLTcollab/sse2neon/archive/refs/tags/v1.5.1.tar.gz", .hash = "N-V-__8AAPihCgDO1A74r0pFZaoafeZ41DR435smSDwGLbLb", }, + .wslay = .{ + .path = "../wslay", + }, .patches = .{ .path = "./patches", }, diff --git a/ext/wslay/build.zig b/ext/wslay/build.zig new file mode 100644 index 0000000000..91dd3b746d --- /dev/null +++ b/ext/wslay/build.zig @@ -0,0 +1,60 @@ +const std = @import("std"); + +const wslay_version = "1.1.1"; + +pub fn build(b: *std.Build) !void { + const target = b.standardTargetOptions(.{}); + const optimize = b.standardOptimizeOption(.{}); + const t = target.result; + + const wslay_src = b.dependency("wslay", .{ + .target = target, + .optimize = optimize, + }); + + const lib = b.addStaticLibrary(.{ + .name = "wslay", + .target = target, + .optimize = optimize, + }); + + lib.linkLibC(); + + lib.addIncludePath(wslay_src.path("lib")); + lib.addIncludePath(wslay_src.path("lib/includes")); + + var flags = std.ArrayList([]const u8).init(b.allocator); + defer flags.deinit(); + + try flags.append("-fno-sanitize=all"); + try flags.append(b.fmt("-DWSLAY_VERSION=\\\"{s}\\\"", .{wslay_version})); + + if (t.os.tag == .windows) { + try flags.append("-DHAVE_WINSOCK2_H=1"); + } else { + try flags.append("-DHAVE_ARPA_INET_H=1"); + try flags.append("-DHAVE_NETINET_IN_H=1"); + } + + if (t.cpu.arch.endian() == .big) { + try flags.append("-DWORDS_BIGENDIAN=1"); + } + + lib.addCSourceFiles(.{ + .root = wslay_src.path("lib"), + .files = &.{ + "wslay_event.c", + "wslay_frame.c", + "wslay_net.c", + "wslay_queue.c", + "wslay_stack.c", + }, + .flags = flags.items, + }); + + lib.installHeadersDirectory(wslay_src.path("lib/includes"), "", .{ + .include_extensions = &.{".h"}, + }); + + b.installArtifact(lib); +} diff --git a/ext/wslay/build.zig.zon b/ext/wslay/build.zig.zon new file mode 100644 index 0000000000..47f8b12a05 --- /dev/null +++ b/ext/wslay/build.zig.zon @@ -0,0 +1,13 @@ +.{ + .name = "wslay", + .version = "0.0.1", + .dependencies = .{ + .wslay = .{ + .url = "https://github.com/tatsuhiro-t/wslay/archive/refs/tags/release-1.1.1.tar.gz", + .hash = "N-V-__8AADdLBADNETOk9pvYZ-k7F8a08-U43YCrYSlxu621", + }, + }, + .paths = .{ + "", + }, +} diff --git a/flake.lock b/flake.lock index 10f88e0e43..ba1acb0ec2 100644 --- a/flake.lock +++ b/flake.lock @@ -1,12 +1,30 @@ { "nodes": { + "flake-utils": { + "inputs": { + "systems": "systems" + }, + "locked": { + "lastModified": 1731533236, + "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, "nixpkgs": { "locked": { - "lastModified": 1712849433, - "narHash": "sha256-flQtf/ZPJgkLY/So3Fd+dGilw2DKIsiwgMEn7BbBHL0=", + "lastModified": 1760256791, + "narHash": "sha256-uTpzDHRASEDeFUuToWSQ46Re8beXyG9dx4W36FQa0/c=", "owner": "nixos", "repo": "nixpkgs", - "rev": "f173d0881eff3b21ebb29a2ef8bedbc106c86ea5", + "rev": "832e3b6db48508ae436c2c7bfc0cf914eac6938e", "type": "github" }, "original": { @@ -16,31 +34,26 @@ "type": "github" } }, - "parts": { + "root": { "inputs": { - "nixpkgs-lib": [ - "nixpkgs" - ] - }, + "flake-utils": "flake-utils", + "nixpkgs": "nixpkgs" + } + }, + "systems": { "locked": { - "lastModified": 1712014858, - "narHash": "sha256-sB4SWl2lX95bExY2gMFG5HIzvva5AVMJd4Igm+GpZNw=", - "owner": "hercules-ci", - "repo": "flake-parts", - "rev": "9126214d0a59633752a136528f5f3b9aa8565b7d", + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", "type": "github" }, "original": { - "owner": "hercules-ci", - "repo": "flake-parts", + "owner": "nix-systems", + "repo": "default", "type": "github" } - }, - "root": { - "inputs": { - "nixpkgs": "nixpkgs", - "parts": "parts" - } } }, "root": "root", diff --git a/flake.nix b/flake.nix index 05237289b3..c54f20adad 100644 --- a/flake.nix +++ b/flake.nix @@ -1,127 +1,30 @@ { - description = "Vere build devshell"; + description = "default devshell"; inputs = { nixpkgs.url = "github:nixos/nixpkgs/nixpkgs-unstable"; - parts = { - url = "github:hercules-ci/flake-parts"; - inputs.nixpkgs-lib.follows = "nixpkgs"; - }; + flake-utils.url = "github:numtide/flake-utils"; }; - outputs = inputs@{ self, nixpkgs, parts }: parts.lib.mkFlake { inherit inputs; } (let - # map systems to the musl-cross-make target names; only - # x86_64-linux is tested though - toolchainTargets = { - "x86_64-linux" = "x86_64-linux-musl"; - "aarch64-linux" = "aarch64-linux-musl"; - }; - # map dep urls to hashes - toolchainDeps = { - "https://ftpmirror.gnu.org/gnu/gcc/gcc-9.4.0/gcc-9.4.0.tar.xz" = "13l3p6g2krilaawbapmn9zmmrh3zdwc36mfr3msxfy038hps6pf9"; - "https://ftpmirror.gnu.org/gnu/binutils/binutils-2.33.1.tar.xz" = "1grcf8jaw3i0bk6f9xfzxw3qfgmn6fgkr108isdkbh1y3hnzqrmb"; - "https://musl.libc.org/releases/musl-1.2.3.tar.gz" = "196lrzw0qy5axiz9p5ay50q2mls8hbfckr4rw0klc7jjc9h0nnvx"; - "https://ftpmirror.gnu.org/gnu/gmp/gmp-6.1.2.tar.bz2" = "1clg7pbpk6qwxj5b2mw0pghzawp2qlm3jf9gdd8i6fl6yh2bnxaj"; - "https://ftpmirror.gnu.org/gnu/mpc/mpc-1.1.0.tar.gz" = "0biwnhjm3rx3hc0rfpvyniky4lpzsvdcwhmcn7f0h4iw2hwcb1b9"; - "https://ftpmirror.gnu.org/gnu/mpfr/mpfr-4.0.2.tar.bz2" = "1k1s4p56272bggvyrxfn3zdycr4wy7h5ipac70cr03lys013ypn0"; - "http://ftp.barfooze.de/pub/sabotage/tarballs//linux-headers-4.19.88-1.tar.xz" = "04r8k4ckqbklx9sfm07cr7vfw5ra4cic0rzanm9dfh0crxncfnwr"; - "http://git.savannah.gnu.org/gitweb/?p=config.git;a=blob_plain;f=config.sub;hb=3d5db9ebe860" = "75d5d255a2a273b6e651f82eecfabf6cbcd8eaeae70e86b417384c8f4a58d8d3"; - }; - in { - systems = builtins.attrNames toolchainTargets; - - flake = {}; - - perSystem = { pkgs, system, ... }: let - target = toolchainTargets.${system}; - - # in order to make the toolchain derivation pure, spoof `wget` - # to use FODs for the dependencies that would otherwise be - # downloaded - pseudoWget = let - c = pkgs.coreutils; - in pkgs.writeScriptBin "wget" '' - #!${pkgs.stdenv.shell} - - declare -A targets - ${builtins.concatStringsSep "\n" - (builtins.map (url: - "targets[${nixpkgs.lib.escapeShellArg url}]=" + (nixpkgs.lib.escapeShellArg (pkgs.fetchurl { - inherit url; - sha256 = builtins.getAttr url toolchainDeps; - }))) - (builtins.attrNames toolchainDeps))} - - # -c -O target url - target="$3" - url="$4" - preFetched=${"$" + "{targets[$url]}"} - if [[ -z "$preFetched" ]]; then - ${c}/bin/echo 1>&2 "$target ($url) is new or changed, nix-prefetch-url it and add or update the url and hash in toolchainDeps in flake.nix" - exit 1 - fi - ${c}/bin/cp --reflink=auto "$preFetched" "$target" - ''; - - toolchain = let - # keep in sync with `_musl_cross_make_version` in - # bazel/toolchain/BUILD.bazel - version = "fe915821b652a7fa37b34a596f47d8e20bc72338"; - - # `-g -O2` are the defaults (there's no way to add cflags via - # config.mak), `-Wno-format-security` is the interesting - # addition (without which gcc build fails in several places - # due to `-Werror=format-security` being on for some reason) - commonCFLAGS = "-g -O2 -Wno-format-security"; - in pkgs.stdenv.mkDerivation { - name = "musl-cross-make"; - inherit system version; - src = pkgs.fetchFromGitHub { - owner = "richfelker"; - repo = "musl-cross-make"; - rev = version; - sha256 = "sha256-FthfhZ+qGf2nWLICvjaO8fiP5+PYU7PqFCbPwXmJFes="; + outputs = { + self, + nixpkgs, + flake-utils, + ... + }: + flake-utils.lib.eachDefaultSystem + ( + system: let + pkgs = nixpkgs.legacyPackages.${system}; + in { + devShell = pkgs.mkShell { + buildInputs = with pkgs; [ + clang-tools + zig_0_14 + zls + llvm_19 + ]; }; - - nativeBuildInputs = [ - pseudoWget - ]; - - enableParallelBuilding = true; - configurePhase = '' - cat > config.mak < +#include +#include + +#define U3_WS_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" /* u3_csat: client connection state. */ @@ -25,6 +34,16 @@ u3_hbod* dob_u; // entry of body queue } u3_cres; +typedef enum { + u3_cws_pending = 0, + u3_cws_open = 1, + u3_cws_closing = 2, + u3_cws_closed = 3 +} u3_cwsat; + +typedef struct _u3_cttp u3_cttp; +typedef struct _u3_cws u3_cws; + /* u3_creq: outgoing http request. */ typedef struct _u3_creq { // client request @@ -45,17 +64,59 @@ u3_hbod* bur_u; // entry of send queue h2o_iovec_t* vec_u; // send-buffer array u3_cres* res_u; // nascent response + u3_cws* wsu_u; // websocket session (optional) struct _u3_creq* nex_u; // next in list struct _u3_creq* pre_u; // previous in list struct _u3_cttp* ctp_u; // cttp backpointer } u3_creq; +struct _u3_cws { + c3_l wid_l; // websocket id + u3_cwsat sat_e; // websocket state + c3_o sec; // secure (wss) + c3_c* hot_c; // host string (nullable) + c3_w ipf_w; // ipv4 (numeric) + c3_c* ipf_c; // ipv4 string + c3_s por_s; // port (numeric) + c3_c* por_c; // port string + c3_c* url_c; // request url/path + c3_c key_c[29]; // sec-websocket-key + u3_cttp* ctp_u; // backpointer + u3_creq* ceq_u; // pending handshake request + h2o_socket_t* sok_u; // underlying socket + wslay_event_context_ptr wsl_w; // wslay context + struct wslay_event_callbacks wcb_u; // wslay callbacks + c3_y* out_y; // pending write buffer + struct _u3_cws* nex_u; // next in list + struct _u3_cws* pre_u; // prev in list +}; + +static void _cttp_ws_close(u3_cws* cws_u, c3_o send_event); +static void _cttp_ws_proceed(u3_cws* cws_u); +static c3_o _cttp_ws_start(u3_cttp* ctp_u, c3_l wid_l, u3_atom url); +static c3_o _cttp_ws_send_message(u3_cws* cws_u, u3_noun msg); +static void _cttp_ws_queue_close(u3_cws* cws_u); +static void _cttp_ws_fail_handshake(u3_creq* ceq_u, const c3_c* err_c); +static void _cttp_ws_generate_key(u3_cws* cws_u); +static void _cttp_ws_compute_accept(const c3_c* key_c, c3_c out_c[29]); +static void _cttp_ws_read_cb(h2o_socket_t* sock_u, const c3_c* err_c); +static void _cttp_ws_write_cb(h2o_socket_t* sock_u, const c3_c* err_c); +static ssize_t _cttp_ws_recv_cb(wslay_event_context_ptr ctx, uint8_t* buf_y, size_t len_w, int flags, void* ves_p); +static ssize_t _cttp_ws_send_cb(wslay_event_context_ptr ctx, const uint8_t* dat_y, size_t len_w, int flags, void* ves_p); +static void _cttp_ws_msg_cb(wslay_event_context_ptr ctx, const struct wslay_event_on_msg_recv_arg* arg, void* ves_p); +static int _cttp_ws_genmask_cb(wslay_event_context_ptr ctx, uint8_t* buf_y, size_t len_w, void* ves_p); +static c3_c* _cttp_ws_origin(u3_atom nor); + +static void _cttp_creq_start(u3_creq* ceq_u); +static u3_atom _cttp_ws_normalize_url(u3_atom url); + /* u3_cttp: http client. */ typedef struct _u3_cttp { u3_auto car_u; // driver c3_l sev_l; // instance number u3_creq* ceq_u; // request list + u3_cws* cws_u; // websocket sessions uv_async_t nop_u; // unused handle (async close) h2o_timeout_t tim_u; // request timeout h2o_http1client_ctx_t // @@ -216,6 +277,218 @@ _cttp_bods_to_vec(u3_hbod* bod_u, c3_w* tot_w) return vec_u; } +/* websocket session bookkeeping helpers +*/ +static void +_cttp_ws_link(u3_cttp* ctp_u, u3_cws* cws_u) +{ + cws_u->ctp_u = ctp_u; + cws_u->nex_u = ctp_u->cws_u; + cws_u->pre_u = 0; + + if ( 0 != ctp_u->cws_u ) { + ctp_u->cws_u->pre_u = cws_u; + } + + ctp_u->cws_u = cws_u; +} + +static void +_cttp_ws_unlink(u3_cws* cws_u) +{ + u3_cttp* ctp_u = cws_u->ctp_u; + + if ( cws_u->pre_u ) { + cws_u->pre_u->nex_u = cws_u->nex_u; + if ( 0 != cws_u->nex_u ) { + cws_u->nex_u->pre_u = cws_u->pre_u; + } + } + else if ( ctp_u->cws_u == cws_u ) { + ctp_u->cws_u = cws_u->nex_u; + if ( 0 != cws_u->nex_u ) { + cws_u->nex_u->pre_u = 0; + } + } +} + +static u3_cws* +_cttp_ws_find(u3_cttp* ctp_u, c3_l wid_l) +{ + u3_cws* cws_u = ctp_u->cws_u; + + while ( cws_u ) { + if ( wid_l == cws_u->wid_l ) { + return cws_u; + } + cws_u = cws_u->nex_u; + } + + return 0; +} + +static void +_cttp_ws_plan_event(u3_cws* cws_u, u3_noun event) +{ + u3_cttp* ctp_u = cws_u->ctp_u; + c3_l wid_l = cws_u->wid_l; + u3_noun typ = u3h(event); + if ( c3y == u3a_is_cat(typ) ) { + c3_c* nam_c = u3r_string(u3k(typ)); + u3l_log("cttp: ws plan wid=%u typ=%s", wid_l, nam_c); + c3_free(nam_c); + } + else { + u3l_log("cttp: ws plan wid=%u", wid_l); + } + u3_noun wir = u3nt(u3i_string("http-client"), + u3dc("scot", c3__uv, ctp_u->sev_l), + u3_nul); + u3_noun pay = u3nc(u3i_chub((c3_d)cws_u->wid_l), event); + u3_noun cad = u3nc(u3i_string("websocket-event"), pay); + + u3_auto_plan(&ctp_u->car_u, u3_ovum_init(0, c3__i, wir, cad)); +} + +static void +_cttp_base64_encode(c3_c* dst_c, const uint8_t* src_y, size_t len_w) +{ + static const char tab_c[] = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + size_t i_w = 0; + size_t o_w = 0; + + while ( len_w >= 3 ) { + dst_c[o_w++] = tab_c[(src_y[i_w] >> 2) & 0x3F]; + dst_c[o_w++] = tab_c[((src_y[i_w] & 0x3) << 4) | (src_y[i_w + 1] >> 4)]; + dst_c[o_w++] = tab_c[((src_y[i_w + 1] & 0xF) << 2) | (src_y[i_w + 2] >> 6)]; + dst_c[o_w++] = tab_c[src_y[i_w + 2] & 0x3F]; + i_w += 3; + len_w -= 3; + } + + if ( len_w > 0 ) { + dst_c[o_w++] = tab_c[(src_y[i_w] >> 2) & 0x3F]; + if ( 1 == len_w ) { + dst_c[o_w++] = tab_c[(src_y[i_w] & 0x3) << 4]; + dst_c[o_w++] = '='; + dst_c[o_w++] = '='; + } + else { // len_w == 2 + dst_c[o_w++] = tab_c[((src_y[i_w] & 0x3) << 4) | (src_y[i_w + 1] >> 4)]; + dst_c[o_w++] = tab_c[(src_y[i_w + 1] & 0xF) << 2]; + dst_c[o_w++] = '='; + } + } + + dst_c[o_w] = 0; +} + +static void +_cttp_ws_generate_key(u3_cws* cws_u) +{ + c3_w rad_w[16]; + c3_y raw_y[16]; + + c3_rand(rad_w); + memcpy(raw_y, rad_w, sizeof(raw_y)); + + _cttp_base64_encode(cws_u->key_c, raw_y, sizeof(raw_y)); +} + +static void +_cttp_ws_compute_accept(const c3_c* key_c, c3_c out_c[29]) +{ + c3_y sha_y[20]; + c3_c buf_c[24 + sizeof(U3_WS_GUID)]; + + memcpy(buf_c, key_c, 24); + memcpy(buf_c + 24, U3_WS_GUID, sizeof(U3_WS_GUID) - 1); + + SHA1((const unsigned char*)buf_c, 24 + (sizeof(U3_WS_GUID) - 1), sha_y); + _cttp_base64_encode(out_c, sha_y, sizeof(sha_y)); +} + +static c3_o +_cttp_header_is(const h2o_header_t* hed_u, const c3_c* name_c) +{ + size_t len = strlen(name_c); + + if ( hed_u->name->len != len ) { + return c3n; + } + + for ( size_t i = 0; i < len; i++ ) { + if ( tolower((unsigned char)hed_u->name->base[i]) != tolower((unsigned char)name_c[i]) ) { + return c3n; + } + } + + return c3y; +} + +static h2o_iovec_t* +_cttp_find_header(h2o_header_t* hed_u, size_t hed_t, const c3_c* name_c) +{ + for ( size_t i = 0; i < hed_t; i++ ) { + if ( c3y == _cttp_header_is(&hed_u[i], name_c) ) { + return &hed_u[i].value; + } + } + + return 0; +} + +static c3_c* +_cttp_ws_origin(u3_atom nor) +{ + c3_c* url_c = u3r_string(nor); + c3_c* scheme_c = strstr(url_c, "://"); + c3_c* start_c = scheme_c ? scheme_c + 3 : url_c; + c3_c* end_c = strchr(start_c, '/'); + size_t len_w = end_c ? (size_t)(end_c - url_c) : strlen(url_c); + + c3_c* ori_c = c3_malloc(len_w + 1); + memcpy(ori_c, url_c, len_w); + ori_c[len_w] = 0; + + c3_free(url_c); + return ori_c; +} + +static u3_atom +_cttp_ws_normalize_url(u3_atom url) +{ + c3_c* url_c = u3r_string(url); + size_t len_w = strlen(url_c); + u3_atom ret; + + if ( len_w >= 5 && 0 == strncmp(url_c, "ws://", 5) ) { + size_t new_len = len_w + 2; + c3_c* rew_c = c3_malloc(new_len + 1); + memcpy(rew_c, "http://", 7); + memcpy(rew_c + 7, url_c + 5, len_w - 5); + rew_c[new_len] = '\0'; + ret = u3i_string(rew_c); + c3_free(rew_c); + } + else if ( len_w >= 6 && 0 == strncmp(url_c, "wss://", 6) ) { + size_t new_len = len_w + 2; + c3_c* rew_c = c3_malloc(new_len + 1); + memcpy(rew_c, "https://", 8); + memcpy(rew_c + 8, url_c + 6, len_w - 6); + rew_c[new_len] = '\0'; + ret = u3i_string(rew_c); + c3_free(rew_c); + } + else { + ret = u3k(url); + } + + c3_free(url_c); + return ret; +} + // XX deduplicate with _http_heds_free /* _cttp_heds_free(): free header linked list */ @@ -256,6 +529,14 @@ _cttp_hed_new(u3_atom nam, u3_atom val) return hed_u; } +static void +_cttp_hed_push(u3_hhed** list_u, const c3_c* nam_c, const c3_c* val_c) +{ + u3_hhed* hed_u = _cttp_hed_new(u3i_string(nam_c), u3i_string(val_c)); + hed_u->nex_u = *list_u; + *list_u = hed_u; +} + // XX deduplicate with _http_heds_from_noun /* _cttp_heds_from_noun(): convert (list (pair @t @t)) to u3_hhed */ @@ -516,6 +797,7 @@ _cttp_creq_unlink(u3_creq* ceq_u) static void _cttp_creq_free(u3_creq* ceq_u) { + ceq_u->wsu_u = 0; _cttp_creq_unlink(ceq_u); _cttp_heds_free(ceq_u->hed_u); @@ -693,11 +975,467 @@ _cttp_creq_fire(u3_creq* ceq_u) } } +static void +_cttp_ws_close(u3_cws* cws_u, c3_o send_event) +{ + if ( !cws_u ) { + return; + } + + if ( u3_cws_closed != cws_u->sat_e ) { + u3l_log("cttp: ws close wid=%u send=%c", cws_u->wid_l, (c3y==send_event?'y':'n')); + cws_u->sat_e = u3_cws_closed; + + if ( c3y == send_event ) { + _cttp_ws_plan_event(cws_u, u3nc(u3i_string("disconnect"), u3_nul)); + send_event = c3n; + } + + if ( cws_u->wsl_w ) { + wslay_event_context_free(cws_u->wsl_w); + cws_u->wsl_w = 0; + } + + if ( cws_u->sok_u ) { + h2o_socket_t* sok_u = cws_u->sok_u; + cws_u->sok_u = 0; + h2o_socket_close(sok_u); + } + } + else if ( c3y == send_event ) { + _cttp_ws_plan_event(cws_u, u3nc(u3i_string("disconnect"), u3_nul)); + send_event = c3n; + } + + if ( cws_u->out_y ) { + c3_free(cws_u->out_y); + cws_u->out_y = 0; + } + + _cttp_ws_unlink(cws_u); + cws_u->ctp_u = 0; + + c3_free(cws_u->hot_c); + c3_free(cws_u->ipf_c); + c3_free(cws_u->por_c); + c3_free(cws_u->url_c); + + c3_free(cws_u); +} + +static ssize_t +_cttp_ws_recv_cb(wslay_event_context_ptr ctx, + uint8_t* buf_y, + size_t len_w, + int flags, + void* ves_p) +{ + u3_cws* cws_u = ves_p; + + u3l_log("cttp: ws recv_cb wid=%u want=%zu have=%zu", + cws_u->wid_l, + len_w, + (size_t)cws_u->sok_u->input->size); + + if ( 0 == cws_u->sok_u->input->size ) { + wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK); + u3l_log("cttp: ws recv_cb wid=%u empty", cws_u->wid_l); + return -1; + } + + if ( cws_u->sok_u->input->size < len_w ) { + len_w = cws_u->sok_u->input->size; + } + + memcpy(buf_y, cws_u->sok_u->input->bytes, len_w); + h2o_buffer_consume(&cws_u->sok_u->input, len_w); + + return (ssize_t)len_w; +} + +static ssize_t +_cttp_ws_send_cb(wslay_event_context_ptr ctx, + const uint8_t* dat_y, + size_t len_w, + int flags, + void* ves_p) +{ + u3_cws* cws_u = ves_p; + + u3l_log("cttp: ws send_cb wid=%u len=%zu writing=%c", + cws_u->wid_l, + len_w, + h2o_socket_is_writing(cws_u->sok_u) ? 'y' : 'n'); + + if ( h2o_socket_is_writing(cws_u->sok_u) ) { + wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK); + u3l_log("cttp: ws send_cb wid=%u busy", cws_u->wid_l); + return -1; + } + + if ( 0 == len_w ) { + return 0; + } + + cws_u->out_y = c3_malloc(len_w); + memcpy(cws_u->out_y, dat_y, len_w); + + h2o_iovec_t buf_u = h2o_iovec_init((char*)cws_u->out_y, len_w); + h2o_socket_write(cws_u->sok_u, &buf_u, 1, _cttp_ws_write_cb); + + return (ssize_t)len_w; +} + +static void +_cttp_ws_msg_cb(wslay_event_context_ptr ctx, + const struct wslay_event_on_msg_recv_arg* arg, + void* ves_p) +{ + u3_cws* cws_u = ves_p; + + if ( 0 == arg ) { + u3l_log("cttp: ws msg close wid=%u", cws_u->wid_l); + _cttp_ws_close(cws_u, c3y); + return; + } + + if ( WSLAY_CONNECTION_CLOSE == arg->opcode ) { + u3l_log("cttp: ws msg opcode close wid=%u", cws_u->wid_l); + _cttp_ws_close(cws_u, c3y); + return; + } + + u3l_log("cttp: ws msg opcode=%u len=%zu", arg->opcode, arg->msg_length); + + u3_noun payload; + if ( 0 == arg->msg_length ) { + payload = u3_nul; + } + else { + u3_noun octs = u3nc(u3i_chub((c3_d)arg->msg_length), + u3i_bytes(arg->msg_length, (const c3_y*)arg->msg)); + payload = u3nc(u3_nul, octs); + } + + u3_noun event = u3nc(u3i_string("message"), + u3nc(u3i_chub((c3_d)arg->opcode), payload)); + + _cttp_ws_plan_event(cws_u, event); +} + +static int +_cttp_ws_genmask_cb(wslay_event_context_ptr ctx, + uint8_t* buf_y, + size_t len_w, + void* ves_p) +{ + (void)ctx; + (void)ves_p; + + size_t off_w = 0; + + while ( off_w < len_w ) { + c3_w rad_w[16]; + size_t rem_w = len_w - off_w; + size_t chc_w = c3_min(rem_w, sizeof(rad_w)); + + c3_rand(rad_w); + memcpy(buf_y + off_w, (const uint8_t*)rad_w, chc_w); + off_w += chc_w; + } + + return 0; +} + +static void +_cttp_ws_proceed(u3_cws* cws_u) +{ + if ( 0 == cws_u->sok_u ) { + return; + } + + while ( 1 ) { + c3_o handled = c3n; + + if ( !h2o_socket_is_writing(cws_u->sok_u) && + wslay_event_want_write(cws_u->wsl_w) ) + { + int sas_i = wslay_event_send(cws_u->wsl_w); + + if ( 0 == sas_i ) { + handled = c3y; + } + else { + u3l_log("cttp: ws send err wid=%u code=%d want-read=%c want-write=%c", + cws_u->wid_l, + sas_i, + wslay_event_want_read(cws_u->wsl_w) ? 'y' : 'n', + wslay_event_want_write(cws_u->wsl_w) ? 'y' : 'n'); + + if ( WSLAY_ERR_WOULDBLOCK != sas_i ) { + // any other error is fatal to the websocket session + _cttp_ws_close(cws_u, c3y); + return; + } + } + } + + if ( cws_u->sok_u->input->size && wslay_event_want_read(cws_u->wsl_w) ) { + int ras_i = wslay_event_recv(cws_u->wsl_w); + + if ( 0 == ras_i ) { + handled = c3y; + } + else { + u3l_log("cttp: ws recv err wid=%u code=%d want-read=%c want-write=%c input=%zu", + cws_u->wid_l, + ras_i, + wslay_event_want_read(cws_u->wsl_w) ? 'y' : 'n', + wslay_event_want_write(cws_u->wsl_w) ? 'y' : 'n', + (size_t)cws_u->sok_u->input->size); + + if ( WSLAY_ERR_WOULDBLOCK != ras_i ) { + // wslay reports anything else as a hard protocol failure + _cttp_ws_close(cws_u, c3y); + return; + } + } + } + + if ( c3n == handled ) { + break; + } + } + + if ( wslay_event_want_read(cws_u->wsl_w) ) { + u3l_log("cttp: ws proceed wid=%u want-read", cws_u->wid_l); + h2o_socket_read_start(cws_u->sok_u, _cttp_ws_read_cb); + } + else if ( h2o_socket_is_writing(cws_u->sok_u) || + wslay_event_want_write(cws_u->wsl_w) ) + { + u3l_log("cttp: ws proceed wid=%u write-pending", cws_u->wid_l); + h2o_socket_read_stop(cws_u->sok_u); + } + else { + u3l_log("cttp: ws proceed wid=%u idle", cws_u->wid_l); + h2o_socket_read_start(cws_u->sok_u, _cttp_ws_read_cb); + } +} + +static void +_cttp_ws_read_cb(h2o_socket_t* sok_u, const c3_c* err_c) +{ + u3_cws* cws_u = sok_u->data; + + if ( err_c ) { + u3l_log("cttp: ws read err %s", err_c); + _cttp_ws_close(cws_u, c3y); + return; + } + + u3l_log("cttp: ws read wid=%u size=%zu", cws_u->wid_l, sok_u->input->size); + + _cttp_ws_proceed(cws_u); +} + +static void +_cttp_ws_write_cb(h2o_socket_t* sok_u, const c3_c* err_c) +{ + u3_cws* cws_u = sok_u->data; + + if ( cws_u->out_y ) { + c3_free(cws_u->out_y); + cws_u->out_y = 0; + } + + if ( err_c ) { + u3l_log("cttp: ws write err %s", err_c); + _cttp_ws_close(cws_u, c3y); + return; + } + + u3l_log("cttp: ws write done wid=%u", cws_u->wid_l); + _cttp_ws_proceed(cws_u); +} + +static void +_cttp_ws_queue_close(u3_cws* cws_u) +{ + if ( (u3_cws_open != cws_u->sat_e) || (0 == cws_u->wsl_w) ) { + u3l_log("cttp: ws queue close wid=%u direct", cws_u->wid_l); + _cttp_ws_close(cws_u, c3y); + return; + } + + if ( u3_cws_closing == cws_u->sat_e ) { + u3l_log("cttp: ws queue close wid=%u already", cws_u->wid_l); + return; + } + + u3l_log("cttp: ws queue close wid=%u normal", cws_u->wid_l); + cws_u->sat_e = u3_cws_closing; + wslay_event_queue_close(cws_u->wsl_w, WSLAY_CODE_NORMAL_CLOSURE, NULL, 0); + _cttp_ws_proceed(cws_u); +} + +static c3_o +_cttp_ws_send_message(u3_cws* cws_u, u3_noun msg) +{ + if ( (u3_cws_open != cws_u->sat_e) || (0 == cws_u->wsl_w) ) { + u3z(msg); + return c3n; + } + + u3_noun opcode = u3h(msg); + u3_noun body = u3t(msg); + c3_w opc_w; + + if ( c3n == u3r_safe_word(opcode, &opc_w) ) { + u3l_log("cttp: websocket invalid opcode"); + u3z(msg); + return c3n; + } + + c3_y* buf_y = 0; + size_t len_w = 0; + + if ( u3_nul != body ) { + if ( u3_nul != u3h(body) ) { + u3l_log("cttp: websocket body malformed"); + u3z(msg); + return c3n; + } + + u3_noun oct = u3t(body); + c3_d len_d = u3r_chub(0, u3h(oct)); + if ( len_d > SIZE_MAX ) { + u3l_log("cttp: websocket body too large"); + u3z(msg); + return c3n; + } + + len_w = (size_t)len_d; + if ( len_w ) { + buf_y = c3_malloc(len_w); + u3r_bytes(0, len_w, buf_y, u3t(oct)); + } + } + + struct wslay_event_msg out = { + .opcode = (uint8_t)(opc_w & 0xFF), + .msg = buf_y, + .msg_length = len_w + }; + + if ( 0 != wslay_event_queue_msg(cws_u->wsl_w, &out) ) { + if ( buf_y ) { + c3_free(buf_y); + } + u3l_log("cttp: websocket queue failed"); + u3z(msg); + return c3n; + } + + if ( buf_y ) { + c3_free(buf_y); + } + + u3z(msg); + _cttp_ws_proceed(cws_u); + return c3y; +} + +static void +_cttp_ws_fail_handshake(u3_creq* ceq_u, const c3_c* err_c) +{ + u3_cws* cws_u = ceq_u->wsu_u; + + if ( 0 == cws_u ) { + return; + } + + ceq_u->wsu_u = 0; + + if ( err_c ) { + u3l_log("cttp: websocket handshake failed (%s)", err_c); + } + else { + u3l_log("cttp: websocket handshake failed (unknown)"); + } + + _cttp_ws_plan_event(cws_u, u3nc(u3i_string("reject"), u3_nul)); + _cttp_ws_close(cws_u, c3n); +} + +static c3_o +_cttp_ws_start(u3_cttp* ctp_u, c3_l wid_l, u3_atom url) +{ + u3l_log("cttp: ws start wid=%u", wid_l); + u3_cws* cws_u = c3_calloc(sizeof(*cws_u)); + + cws_u->wid_l = wid_l; + cws_u->sat_e = u3_cws_pending; + + _cttp_ws_generate_key(cws_u); + _cttp_ws_link(ctp_u, cws_u); + + u3_atom nor = _cttp_ws_normalize_url(url); + + u3_noun hed = u3_nul; + hed = u3nc(u3nc(u3i_string("Sec-WebSocket-Key"), + u3i_string(cws_u->key_c)), + hed); + hed = u3nc(u3nc(u3i_string("Sec-WebSocket-Version"), + u3i_string("13")), + hed); + hed = u3nc(u3nc(u3i_string("Connection"), + u3i_string("Upgrade")), + hed); + hed = u3nc(u3nc(u3i_string("Upgrade"), + u3i_string("websocket")), + hed); + { + c3_c* ori_c = _cttp_ws_origin(nor); + hed = u3nc(u3nc(u3i_string("Origin"), u3i_string(ori_c)), hed); + c3_free(ori_c); + } + + u3_noun hes = u3nq(u3i_string("GET"), u3k(nor), hed, u3_nul); + + u3_creq* ceq_u = _cttp_creq_new(ctp_u, wid_l, hes); + + if ( 0 == ceq_u ) { + _cttp_ws_plan_event(cws_u, u3nc(u3i_string("reject"), u3_nul)); + _cttp_ws_close(cws_u, c3n); + u3z(nor); + u3z(url); + return c3y; + } + + ceq_u->wsu_u = cws_u; + cws_u->ceq_u = ceq_u; + cws_u->sec = ceq_u->sec; + + _cttp_creq_start(ceq_u); + + u3z(nor); + u3z(url); + return c3y; +} + + /* _cttp_creq_quit(): cancel a u3_creq */ static void _cttp_creq_quit(u3_creq* ceq_u) { + if ( ceq_u->wsu_u ) { + _cttp_ws_fail_handshake(ceq_u, "cancel"); + _cttp_creq_free(ceq_u); + return; + } + if ( u3_csat_addr == ceq_u->sat_e ) { ceq_u->sat_e = u3_csat_quit; return; // wait to be called again on address resolution @@ -732,6 +1470,12 @@ _cttp_http_client_receive(u3_creq* ceq_u, c3_w sas_w, u3_noun mes, u3_noun uct) static void _cttp_creq_fail(u3_creq* ceq_u, const c3_c* err_c) { + if ( ceq_u->wsu_u ) { + _cttp_ws_fail_handshake(ceq_u, err_c ? err_c : "handshake failed"); + _cttp_creq_free(ceq_u); + return; + } + // XX anything other than a 504? c3_w cod_w = 504; @@ -747,6 +1491,12 @@ _cttp_creq_fail(u3_creq* ceq_u, const c3_c* err_c) static void _cttp_creq_respond(u3_creq* ceq_u) { + if ( ceq_u->wsu_u ) { + _cttp_ws_fail_handshake(ceq_u, "handshake rejected"); + _cttp_creq_free(ceq_u); + return; + } + u3_cres* res_u = ceq_u->res_u; _cttp_http_client_receive(ceq_u, res_u->sas_w, res_u->hed, @@ -764,6 +1514,12 @@ _cttp_creq_on_body(h2o_http1client_t* cli_u, const c3_c* err_c) { u3_creq* ceq_u = (u3_creq *)cli_u->data; + if ( ceq_u->wsu_u ) { + _cttp_ws_fail_handshake(ceq_u, err_c ? err_c : "unexpected body"); + _cttp_creq_free(ceq_u); + return -1; + } + if ( 0 != err_c && h2o_http1client_error_is_eos != err_c ) { _cttp_creq_fail(ceq_u, err_c); return -1; @@ -795,6 +1551,84 @@ _cttp_creq_on_head(h2o_http1client_t* cli_u, const c3_c* err_c, c3_i ver_i, { u3_creq* ceq_u = (u3_creq *)cli_u->data; + if ( ceq_u->wsu_u ) { + u3_cws* cws_u = ceq_u->wsu_u; + + if ( 0 != err_c && h2o_http1client_error_is_eos != err_c ) { + _cttp_ws_fail_handshake(ceq_u, err_c); + _cttp_creq_free(ceq_u); + return 0; + } + + if ( 101 != sas_i ) { + _cttp_ws_fail_handshake(ceq_u, "status"); + _cttp_creq_free(ceq_u); + return 0; + } + + for ( size_t i = 0; i < hed_t; i++ ) { + h2o_header_t* hdr = &hed_u[i]; + u3l_log("cttp: ws hdr %.*s: %.*s", + (int)hdr->name->len, hdr->name->base, + (int)hdr->value.len, hdr->value.base); + } + + h2o_iovec_t* acc_u = _cttp_find_header(hed_u, hed_t, "sec-websocket-accept"); + c3_c exp_c[29]; + + if ( 0 == acc_u ) { + _cttp_ws_fail_handshake(ceq_u, "missing accept"); + _cttp_creq_free(ceq_u); + return 0; + } + + _cttp_ws_compute_accept(cws_u->key_c, exp_c); + + if ( acc_u->len != 28 || 0 != memcmp(acc_u->base, exp_c, 28) ) { + _cttp_ws_fail_handshake(ceq_u, "bad accept"); + _cttp_creq_free(ceq_u); + return 0; + } + + h2o_socket_t* sok_u = h2o_http1client_steal_socket(cli_u); + + if ( 0 == sok_u ) { + _cttp_ws_fail_handshake(ceq_u, "steal failure"); + _cttp_creq_free(ceq_u); + return 0; + } + + ceq_u->wsu_u = 0; + ceq_u->cli_u = 0; + cws_u->ceq_u = 0; + + if ( sok_u->input && sok_u->input->size ) { + u3l_log("cttp: ws draining leftover handshake wid=%u size=%zu", + cws_u->wid_l, + (size_t)sok_u->input->size); + h2o_buffer_consume(&sok_u->input, sok_u->input->size); + } + + _cttp_creq_free(ceq_u); + + memset(&cws_u->wcb_u, 0, sizeof(cws_u->wcb_u)); + cws_u->wcb_u.recv_callback = _cttp_ws_recv_cb; + cws_u->wcb_u.send_callback = _cttp_ws_send_cb; + cws_u->wcb_u.genmask_callback = _cttp_ws_genmask_cb; + cws_u->wcb_u.on_msg_recv_callback = _cttp_ws_msg_cb; + + wslay_event_context_client_init(&cws_u->wsl_w, &cws_u->wcb_u, cws_u); + + cws_u->sok_u = sok_u; + sok_u->data = cws_u; + + cws_u->sat_e = u3_cws_open; + + _cttp_ws_plan_event(cws_u, u3nc(u3i_string("accept"), u3_nul)); + _cttp_ws_proceed(cws_u); + return 0; + } + if ( 0 != err_c && h2o_http1client_error_is_eos != err_c ) { _cttp_creq_fail(ceq_u, err_c); return 0; @@ -1031,6 +1865,62 @@ _cttp_ef_http_client(u3_cttp* ctp_u, u3_noun tag, u3_noun dat) ret_o = c3y; } } + else if ( c3y == u3r_sing_c("websocket-handshake", tag) ) { + u3_noun wid, url; + c3_l wid_l; + + if ( (c3n == u3r_cell(dat, &wid, &url)) + || (c3n == u3r_safe_word(wid, &wid_l)) ) + { + u3l_log("cttp: strange websocket-handshake"); + ret_o = c3n; + } + else { + ret_o = _cttp_ws_start(ctp_u, wid_l, u3k(url)); + } + } + else if ( c3y == u3r_sing_c("websocket-response", tag) ) { + u3_noun wid, evt; + c3_l wid_l; + + if ( (c3n == u3r_cell(dat, &wid, &evt)) + || (c3n == u3r_safe_word(wid, &wid_l)) ) + { + u3l_log("cttp: strange websocket-response"); + ret_o = c3n; + } + else { + u3_cws* cws_u = _cttp_ws_find(ctp_u, wid_l); + + u3_noun typ = u3h(evt); + + if ( 0 == cws_u ) { + if ( c3y == u3r_sing_c("message", typ) ) { + u3l_log("cttp: unknown websocket id %u", wid_l); + ret_o = c3n; + } + else { + ret_o = c3y; + } + } + else { + if ( c3y == u3r_sing_c("message", typ) ) { + ret_o = _cttp_ws_send_message(cws_u, u3k(u3t(evt))); + } + else if ( c3y == u3r_sing_c("disconnect", typ) ) { + _cttp_ws_queue_close(cws_u); + ret_o = c3y; + } + else if ( c3y == u3r_sing_c("accept", typ) ) { + ret_o = c3y; + } + else { + u3l_log("cttp: unexpected websocket response"); + ret_o = c3n; + } + } + } + } else { u3l_log("cttp: strange effect (unknown type)"); ret_o = c3n; @@ -1103,6 +1993,16 @@ _cttp_io_exit(u3_auto* car_u) // uv_close((uv_handle_t*)&ctp_u->nop_u, _cttp_io_exit_cb); + { + u3_cws* cws_u = ctp_u->cws_u; + + while ( cws_u ) { + u3_cws* nex_u = cws_u->nex_u; + _cttp_ws_close(cws_u, c3n); + cws_u = nex_u; + } + } + // cancel requests // { @@ -1137,6 +2037,7 @@ u3_cttp_io_init(u3_pier* pir_u) // h2o_timeout_init(u3L, &ctp_u->tim_u, 300 * 1000); ctp_u->ctx_u.io_timeout = &ctp_u->tim_u; + ctp_u->ctx_u.websocket_timeout = &ctp_u->tim_u; // link to initialized tls ctx // diff --git a/pkg/vere/io/http.c b/pkg/vere/io/http.c index 5d8ca11904..90c01e1a29 100644 --- a/pkg/vere/io/http.c +++ b/pkg/vere/io/http.c @@ -2,7 +2,11 @@ #include "vere.h" +#include +#include + #include "h2o.h" +#include "h2o/websocket.h" #include "noun.h" #include "openssl/err.h" #include "openssl/ssl.h" @@ -25,6 +29,8 @@ typedef struct _u3_h2o_serv { u3_rsat_ripe = 3 // responded } u3_rsat; +typedef struct _u3_hws u3_hws; + /* u3_hreq: incoming http request. */ typedef struct _u3_hreq { @@ -34,6 +40,7 @@ typedef struct _u3_h2o_serv { uv_timer_t* tim_u; // timeout void* gen_u; // response generator struct _u3_preq* peq_u; // scry-backed (rsat_peek only) + u3_hws* wsu_u; // websocket session (optional) struct _u3_hcon* hon_u; // connection backlink struct _u3_hreq* nex_u; // next in connection's list struct _u3_hreq* pre_u; // prev in connection's list @@ -110,8 +117,31 @@ typedef struct _u3_httd { SSL_CTX* tls_u; // server SSL_CTX* u3p(u3h_root) sax_p; // url->scry cache u3p(u3h_root) nax_p; // scry->noun cache + u3_hws* web_u; // websocket sessions + c3_w wid_l; // next websocket id } u3_httd; +typedef enum { + u3_hws_pending = 0, + u3_hws_open = 1, + u3_hws_closing = 2, + u3_hws_closed = 3, +} u3_hwsat; + +struct _u3_hws { + h2o_websocket_conn_t* woc_u; // h2o websocket connection + u3_httd* htd_u; // device backpointer + u3_hreq* req_u; // pending request (handshake) + u3_hws* nex_u; // next in list + u3_hws* pre_u; // prev in list + u3_hwsat sat_e; // websocket state + c3_w wid_l; // websocket id + c3_l sev_l; // server id + c3_l coq_l; // connection id + c3_l seq_l; // request id + c3_c key_c[29]; // sec-websocket-key buffer +}; + static u3_weak _http_rec_to_httq(h2o_req_t* rec_u); static u3_hreq* _http_req_prepare(h2o_req_t* rec_u, u3_hreq* (*new_f)(u3_hcon*, h2o_req_t*)); static void _http_serv_free(u3_http* htp_u); @@ -122,6 +152,15 @@ static void _http_start_respond(u3_hreq* req_u, u3_noun headers, u3_noun data, u3_noun complete); +static void _http_ws_handshake(u3_hreq* req_u, u3_noun req, const c3_c* client_key); +static void _http_ws_message_cb(h2o_websocket_conn_t *conn, const struct wslay_event_on_msg_recv_arg *arg); +static void _http_ws_link(u3_httd* htd_u, u3_hws* web_u); +static void _http_ws_unlink(u3_hws* web_u); +static u3_hws* _http_ws_find(u3_httd* htd_u, c3_w wid_l); +static void _http_ws_disconnect(u3_hws* web_u); +static void _http_ws_plan_event(u3_hws* web_u, u3_noun event); +static void _http_ws_close_all(u3_httd* htd_u); +static void _http_ws_detach_request(u3_hreq* req_u); static const c3_i TCP_BACKLOG = 16; static const c3_w HEARTBEAT_TIMEOUT = 20 * 1000; @@ -147,8 +186,7 @@ _http_vec_to_meth(h2o_iovec_t vec_u) ( 0 == strncmp(vec_u.base, "DELETE", vec_u.len) ) ? u3i_string("DELETE") : ( 0 == strncmp(vec_u.base, "OPTIONS", vec_u.len) ) ? u3i_string("OPTIONS") : ( 0 == strncmp(vec_u.base, "TRACE", vec_u.len) ) ? u3i_string("TRACE") : - // TODO ?? - // ( 0 == strncmp(vec_u.base, "PATCH", vec_u.len) ) ? c3__patc : + ( 0 == strncmp(vec_u.base, "PATCH", vec_u.len) ) ? u3i_string("PATCH") : u3_none; } @@ -327,66 +365,69 @@ _http_heds_from_noun(u3_noun hed) /* _http_req_is_auth(): returns c3y if rec_u contains a valid auth cookie */ static c3_o -_http_req_is_auth(u3_hfig* fig_u, h2o_req_t* rec_u) +_http_cookie_has_token(u3_hfig* fig_u, h2o_iovec_t coo_u) { - // try to find a cookie header - // - h2o_iovec_t coo_u = {NULL, 0}; - { - //TODO http2 allows the client to put multiple 'cookie' headers, - // runtime should support that once eyre does too. - ssize_t hin_i = h2o_find_header_by_str(&rec_u->headers, "cookie", 6, -1); - if ( hin_i != -1 ) { - coo_u = rec_u->headers.entries[hin_i].value; + if ( NULL == coo_u.base || 0 == coo_u.len ) { + return c3n; + } + + c3_c* key_c = fig_u->key_c; + c3_c val_c[128]; + c3_y val_y = 0; + size_t i_i = 0; + size_t j_i = 0; + + while ( i_i < coo_u.len ) { + if ( ('\0' == key_c[j_i]) && ('=' == coo_u.base[i_i]) ) { + i_i++; + while ( i_i < coo_u.len + && ';' != coo_u.base[i_i] + && val_y < sizeof(val_c) ) + { + val_c[val_y++] = coo_u.base[i_i++]; + } + break; } + else if ( coo_u.base[i_i] == key_c[j_i] ) { + j_i++; + } + else { + j_i = 0; + } + i_i++; } - // if there is no cookie header, it can't possibly be authenticated - // - if ( NULL == coo_u.base ) { + if ( 0 == val_y ) { return c3n; } - // if there is a cookie, see if it contains a valid auth token - // - else { - c3_c* key_c = fig_u->key_c; - c3_c val_c[128]; - c3_y val_y = 0; - size_t i_i = 0; - size_t j_i = 0; - // step through the cookie string - // - while (i_i < coo_u.len) { - // if we found our key, read the value - // - if (key_c[j_i] == '\0' && coo_u.base[i_i] == '=') { - i_i++; - while ( i_i < coo_u.len - && coo_u.base[i_i] != ';' - && val_y < sizeof(val_c) ) { - val_c[val_y] = coo_u.base[i_i]; - val_y++; - i_i++; - } - break; - } - // keep reading the key as long as it matches - // - else if (coo_u.base[i_i] == key_c[j_i]) { - j_i++; - } - else { - j_i = 0; - } - i_i++; - } + u3_noun tok = u3i_bytes(val_y, (const c3_y*)val_c); + c3_o aut = u3kdi_has(u3k(fig_u->ses), tok); + u3_assert( (c3y == aut) || (c3n == aut) ); + u3z(tok); + return aut; +} - u3_noun aut = u3kdi_has(u3k(fig_u->ses), u3i_bytes(val_y, (c3_y*)val_c)); - u3_assert(c3y == aut || c3n == aut); +static c3_o +_http_req_is_auth(u3_hfig* fig_u, h2o_req_t* rec_u) +{ + ssize_t idx_i = -1; - return aut; + while ( 1 ) { + idx_i = h2o_find_header_by_str(&rec_u->headers, + H2O_STRLIT("cookie"), + idx_i); + if ( -1 == idx_i ) { + break; + } + + h2o_iovec_t coo_u = rec_u->headers.entries[idx_i].value; + if ( c3y == _http_cookie_has_token(fig_u, coo_u) ) { + return c3y; + } } + + return c3n; } /* _http_req_find(): find http request in connection by sequence. @@ -511,6 +552,122 @@ _http_req_kill(u3_hreq* req_u) u3_auto_plan(&htd_u->car_u, u3_ovum_init(0, c3__e, wir, cad)); } +/* _http_ws_link(): link websocket session to device list. +*/ +static void +_http_ws_link(u3_httd* htd_u, u3_hws* web_u) +{ + web_u->htd_u = htd_u; + web_u->nex_u = htd_u->web_u; + web_u->pre_u = 0; + + if ( 0 != web_u->nex_u ) { + web_u->nex_u->pre_u = web_u; + } + htd_u->web_u = web_u; +} + +/* _http_ws_unlink(): unlink websocket session from device list. +*/ +static void +_http_ws_unlink(u3_hws* web_u) +{ + if ( 0 != web_u->pre_u ) { + web_u->pre_u->nex_u = web_u->nex_u; + + if ( 0 != web_u->nex_u ) { + web_u->nex_u->pre_u = web_u->pre_u; + } + } + else if ( web_u->htd_u->web_u == web_u ) { + web_u->htd_u->web_u = web_u->nex_u; + + if ( 0 != web_u->nex_u ) { + web_u->nex_u->pre_u = 0; + } + } + + web_u->nex_u = 0; + web_u->pre_u = 0; +} + +/* _http_ws_find(): locate websocket session by id. +*/ +static u3_hws* +_http_ws_find(u3_httd* htd_u, c3_w wid_l) +{ + u3_hws* web_u = htd_u->web_u; + + while ( 0 != web_u ) { + if ( wid_l == web_u->wid_l ) { + return web_u; + } + web_u = web_u->nex_u; + } + + return 0; +} + +/* _http_ws_to_duct(): rebuild duct for websocket session. +*/ +static u3_noun +_http_ws_to_duct(u3_hws* web_u) +{ + return u3nc(u3i_string("http-server"), + u3nq(u3dc("scot", c3__uv, web_u->sev_l), + u3dc("scot", c3__ud, web_u->coq_l), + u3dc("scot", c3__ud, web_u->seq_l), + u3_nul)); +} + +/* _http_ws_plan_event(): queue websocket event for eyre. +*/ +static void +_http_ws_plan_event(u3_hws* web_u, u3_noun event) +{ + u3_noun wir = _http_ws_to_duct(web_u); + u3_noun pay = u3nc(u3i_chub((c3_d)web_u->wid_l), event); + u3_noun cad = u3nc(u3i_string("websocket-event"), pay); + + // u3l_log("http: ws emit wid=%u event=%s", web_u->wid_l, u3r_string(u3h(cad))); + + u3_auto_plan(&web_u->htd_u->car_u, u3_ovum_init(0, c3__e, wir, cad)); +} + +/* _http_ws_close_all(): close all active websocket sessions. +*/ +static void +_http_ws_close_all(u3_httd* htd_u) +{ + u3_hws* web_u = htd_u->web_u; + + while ( 0 != web_u ) { + u3_hws* nex_u = web_u->nex_u; + _http_ws_disconnect(web_u); + web_u = nex_u; + } +} + +/* _http_ws_detach_request(): release websocket session tied to request (if pending). +*/ +static void +_http_ws_detach_request(u3_hreq* req_u) +{ + if ( 0 == req_u->wsu_u ) { + return; + } + + u3_hws* web_u = req_u->wsu_u; + req_u->wsu_u = 0; + + if ( 0 != web_u->req_u && web_u->req_u == req_u ) { + web_u->req_u = 0; + web_u->sat_e = u3_hws_closed; + _http_ws_unlink(web_u); + c3_free(web_u); + } +} + typedef struct _u3_hgen { h2o_generator_t neg_u; // response callbacks c3_o red; // ready to send @@ -530,6 +687,10 @@ typedef struct _u3_hgen { static void _http_req_close(u3_hreq* req_u) { + if ( 0 != req_u->wsu_u ) { + _http_ws_detach_request(req_u); + } + // client canceled request before response // if ( u3_rsat_plan == req_u->sat_e ) { @@ -581,6 +742,10 @@ _http_req_timer_cb(uv_timer_t* tim_u) c3_c* msg_c = "gateway timeout"; h2o_send_error_generic(req_u->rec_u, 504, msg_c, msg_c, 0); + + if ( 0 != req_u->wsu_u ) { + _http_ws_detach_request(req_u); + } } break; case u3_rsat_peek: { @@ -1004,6 +1169,292 @@ _http_req_dispatch(u3_hreq* req_u, u3_noun req) } } +/* _http_ws_handshake(): handle websocket handshake request. +*/ +static void +_http_ws_handshake(u3_hreq* req_u, u3_noun req, const c3_c* client_key) +{ + // capture the owning HTTP server/device so we can reuse shared state + // + u3_http* htp_u = req_u->hon_u->htp_u; + u3_httd* htd_u = htp_u->htd_u; + + // websocket ids start at 1; initialize the counter lazily + // + if ( 0 == htd_u->wid_l ) { + htd_u->wid_l = 1; + } + + // allocate a websocket session, mark it pending, and record the + // identifiers needed to upgrade/respond later (server/connection/request) + // + u3_hws* web_u = c3_calloc(sizeof(*web_u)); + web_u->sat_e = u3_hws_pending; + web_u->wid_l = htd_u->wid_l++; + web_u->sev_l = htp_u->sev_l; + web_u->coq_l = req_u->hon_u->coq_l; + web_u->seq_l = req_u->seq_l; + web_u->req_u = req_u; + web_u->woc_u = 0; + + // stash the Sec-WebSocket-Key so h2o can finalize the upgrade later + // + ssize_t key_index = h2o_find_header_by_str(&req_u->rec_u->headers, + H2O_STRLIT("sec-websocket-key"), -1); + size_t key_len = ( -1 != key_index ) + ? req_u->rec_u->headers.entries[key_index].value.len + : 0; + if ( key_len >= sizeof(web_u->key_c) ) { + key_len = sizeof(web_u->key_c) - 1; + } + memcpy(web_u->key_c, client_key, key_len); + web_u->key_c[key_len] = 0; + + // link the session into the device list and note the pending plan on req + // + _http_ws_link(htd_u, web_u); + req_u->wsu_u = web_u; + req_u->sat_e = u3_rsat_plan; + + // build the `%http-server` duct: ["http-server" uv-id conn-id req-id] + // + u3_noun wir = _http_req_to_duct(req_u); + // peer IPv4 as `[c3__ipv4 (atom ip-address)]` + // + c3_w ipf_w = req_u->hon_u->ipf_w; + u3_noun adr = u3nc(c3__ipv4, u3i_words(1, &ipf_w)); + // + u3_noun dat = u3nt(htp_u->sec, adr, u3k(req)); + // + u3_noun pay = u3nc(u3i_chub((c3_d)web_u->wid_l), dat); + // final ovum: ["websocket-handshake" pay], i.e. [%websocket-handshake websocket-id=@ secure=? =address:eyre =request:http] + // + u3_noun cad = u3nc(u3i_string("websocket-handshake"), pay); + + // enqueue the ovum for %eyre and drop the temporary request noun + // + u3_auto_plan(&htd_u->car_u, u3_ovum_init(0, c3__e, wir, cad)); + + u3z(req); +} + +/* _http_ws_accept(): accept websocket handshake. +*/ +static void +_http_ws_accept(u3_hws* web_u) +{ + if ( 0 == web_u || u3_hws_pending != web_u->sat_e ) { + return; + } + + u3_hreq* req_u = web_u->req_u; + if ( 0 == req_u ) { + return; + } + + if ( 0 != req_u->tim_u ) { + uv_timer_stop(req_u->tim_u); + } + + req_u->sat_e = u3_rsat_ripe; + + // u3l_log("http: ws accept wid=%u req=%p rec=%p", web_u->wid_l, (void*)req_u, (void*)req_u->rec_u); + + // guard against h2o freeing the request while we upgrade; if that happens, + // _http_req_close() will no longer tear down the websocket session. + web_u->req_u = 0; + + web_u->woc_u = h2o_upgrade_to_websocket(req_u->rec_u, + web_u->key_c, + web_u, + _http_ws_message_cb); + + if ( 0 == web_u->woc_u ) { + c3_c* msg_c = "websocket upgrade failed"; + h2o_send_error_generic(req_u->rec_u, 500, msg_c, msg_c, 0); + web_u->req_u = req_u; + req_u->wsu_u = 0; + web_u->sat_e = u3_hws_closed; + _http_ws_unlink(web_u); + c3_free(web_u); + return; + } + + web_u->sat_e = u3_hws_open; + req_u->wsu_u = web_u; + + // u3l_log("http: ws proceed scheduled wid=%u conn=%p", web_u->wid_l, (void*)web_u->woc_u); + + /* h2o will invoke h2o_websocket_proceed() once the HTTP upgrade completes + * (see on_complete in lib/websocket.c). Calling it here would dereference a + * null sock before the upgrade is finalized. + */ +} + +/* _http_ws_reject(): reject websocket handshake. +*/ +static void +_http_ws_reject(u3_hws* web_u) +{ + if ( 0 == web_u || u3_hws_pending != web_u->sat_e ) { + return; + } + + web_u->sat_e = u3_hws_closed; + + if ( 0 != web_u->req_u ) { + u3_hreq* req_u = web_u->req_u; + if ( 0 != req_u->tim_u ) { + uv_timer_stop(req_u->tim_u); + } + req_u->sat_e = u3_rsat_ripe; + req_u->wsu_u = 0; + + c3_c* msg_c = "websocket rejected"; + h2o_send_error_generic(req_u->rec_u, 403, msg_c, msg_c, 0); + } + + _http_ws_unlink(web_u); + c3_free(web_u); +} + +/* _http_ws_disconnect(): close websocket connection. +*/ +static void +_http_ws_disconnect(u3_hws* web_u) +{ + if ( 0 == web_u ) { + return; + } + + if ( u3_hws_pending == web_u->sat_e ) { + _http_ws_reject(web_u); + return; + } + + if ( (u3_hws_open == web_u->sat_e) && (0 != web_u->woc_u) ) { + web_u->sat_e = u3_hws_closing; + h2o_websocket_close(web_u->woc_u); + } +} + +/* _http_ws_send_message(): serialize and send websocket message to client. +*/ +static c3_o +_http_ws_send_message(u3_hws* web_u, u3_noun msg) +{ + if ( 0 == web_u || u3_hws_open != web_u->sat_e || 0 == web_u->woc_u ) { + u3z(msg); + return c3n; + } + + u3_noun opcode = u3h(msg); + u3_noun body = u3t(msg); + + c3_w opc_w; + if ( c3n == u3r_safe_word(opcode, &opc_w) ) { + u3l_log("http: bad websocket opcode"); + u3z(msg); + return c3n; + } + + c3_y* buf_y = 0; + size_t len_w = 0; + + if ( u3_nul != body ) { + if ( u3_nul != u3h(body) ) { + u3l_log("http: malformed websocket message body"); + u3z(msg); + return c3n; + } + + u3_noun oct = u3t(body); + c3_d len_d = u3r_chub(0, u3h(oct)); + if ( len_d > SIZE_MAX ) { + u3l_log("http: websocket message too large"); + u3z(msg); + return c3n; + } + len_w = (size_t)len_d; + + if ( 0 != len_w ) { + buf_y = c3_malloc(len_w); + u3r_bytes(0, len_w, buf_y, u3t(oct)); + } + } + + struct wslay_event_msg out = { + .opcode = (uint8_t)(opc_w & 0xFF), + .msg = buf_y, + .msg_length = len_w, + }; + + int sas_i = wslay_event_queue_msg(web_u->woc_u->ws_ctx, &out); + if ( 0 != sas_i ) { + u3l_log("http: websocket queue failed (%d)", sas_i); + if ( buf_y ) { + c3_free(buf_y); + } + u3z(msg); + return c3n; + } + + if ( buf_y ) { + c3_free(buf_y); + } + + h2o_websocket_proceed(web_u->woc_u); + u3z(msg); + return c3y; +} + +/* _http_ws_message_cb(): websocket receive / close callback. +*/ +static void +_http_ws_message_cb(h2o_websocket_conn_t *conn, + const struct wslay_event_on_msg_recv_arg *arg) +{ + u3_hws* web_u = (u3_hws*)conn->data; + + if ( 0 == web_u ) { + return; + } + + if ( NULL == arg ) { + if ( u3_hws_closed != web_u->sat_e ) { + web_u->sat_e = u3_hws_closed; + _http_ws_plan_event(web_u, u3nc(u3i_string("disconnect"), u3_nul)); + } + + _http_ws_unlink(web_u); + c3_free(web_u); + return; + } + + if ( WSLAY_CONNECTION_CLOSE == arg->opcode ) { + if ( u3_hws_closed != web_u->sat_e ) { + web_u->sat_e = u3_hws_closed; + _http_ws_plan_event(web_u, u3nc(u3i_string("disconnect"), u3_nul)); + } + return; + } + + u3_noun payload; + if ( 0 == arg->msg_length ) { + payload = u3_nul; + } + else { + u3_noun octs = u3nc(u3i_chub((c3_d)arg->msg_length), + u3i_bytes(arg->msg_length, (const c3_y*)arg->msg)); + payload = u3nc(u3_nul, octs); + } + + u3_noun event = u3nc(u3i_string("message"), + u3nc(u3i_chub((c3_d)arg->opcode), payload)); + + _http_ws_plan_event(web_u, event); +} + /* _http_cache_respond(): respond with a simple-payload:http */ static void @@ -1389,6 +1840,10 @@ _http_cancel_respond(u3_hreq* req_u) c3_c* msg_c = "hosed"; h2o_send_error_generic(req_u->rec_u, 500, msg_c, msg_c, 0); + + if ( 0 != req_u->wsu_u ) { + _http_ws_detach_request(req_u); + } } break; case u3_rsat_ripe: { @@ -1549,8 +2004,29 @@ _http_rec_accept(h2o_handler_t* han_u, h2o_req_t* rec_u) } else { u3_hreq* req_u = _http_req_prepare(rec_u, _http_req_new); - if ( c3n == _http_req_cache(req_u) ) { - _http_req_dispatch(req_u, req); + const c3_c* client_key = 0; + c3_i ws_rc = h2o_is_websocket_handshake(rec_u, &client_key); + + if ( (0 == ws_rc) && (0 != client_key) ) { + _http_ws_handshake(req_u, req, client_key); + } + else { + if ( ws_rc < 0 ) { + if ( 0 != req_u->tim_u ) { + uv_timer_stop(req_u->tim_u); + } + req_u->sat_e = u3_rsat_ripe; + u3l_log("http: invalid websocket handshake"); + c3_c* msg_c = "bad websocket handshake"; + h2o_send_error_generic(rec_u, 400, msg_c, msg_c, 0); + u3z(req); + } + else if ( c3n == _http_req_cache(req_u) ) { + _http_req_dispatch(req_u, req); + } + else { + u3z(req); + } } } @@ -2460,6 +2936,8 @@ _http_form_free(u3_httd* htd_u) return; } + _http_ws_close_all(htd_u); + if ( 0 != for_u->key_u.base ) { c3_free(for_u->key_u.base); } @@ -2570,6 +3048,7 @@ _http_ef_http_server(u3_httd* htd_u, u3_noun dat) { u3_hreq* req_u; + u3_hws* web_u; // sets server configuration // @@ -2589,15 +3068,57 @@ _http_ef_http_server(u3_httd* htd_u, } // responds to an open request // + else if ( c3y == u3r_sing_c("websocket-response", tag) ) { + u3_noun wid = u3k(u3h(dat)); + u3_noun res = u3k(u3t(dat)); + + c3_w wid_w; + if ( c3n == u3r_safe_word(wid, &wid_w) ) { + u3l_log("http: invalid websocket id"); + } + else if ( 0 == (web_u = _http_ws_find(htd_u, wid_w)) ) { + u3l_log("http: unknown websocket id %u", wid_w); + } + else { + u3_noun typ = u3h(res); + + if ( c3y == u3r_sing_c("accept", typ) ) { + _http_ws_accept(web_u); + } + else if ( c3y == u3r_sing_c("reject", typ) ) { + _http_ws_reject(web_u); + } + else if ( c3y == u3r_sing_c("disconnect", typ) ) { + _http_ws_disconnect(web_u); + } + else if ( c3y == u3r_sing_c("message", typ) ) { + _http_ws_send_message(web_u, u3k(u3t(res))); + } + else { + u3l_log("http: unexpected websocket response"); + } + } + + u3z(wid); + u3z(res); + } else if ( 0 != (req_u = _http_search_req(htd_u, sev_l, coq_l, seq_l)) ) { if ( c3y == u3r_sing_c("response", tag) ) { u3_noun response = dat; - if ( c3y == u3r_sing_c("start", u3h(response)) ) { - // Separate the %start message into its components. - // - u3_noun response_header, data, complete; - u3_noun status, headers; + if ( 0 != req_u->wsu_u && (u3C.wag_w & u3o_verbose) ) { + u3l_log("http: ws pending got http response sev=%u coq=%u seq=%u wid=%u", + (c3_w)sev_l, + (c3_w)coq_l, + (c3_w)seq_l, + req_u->wsu_u->wid_l); + } + + if ( c3y == u3r_sing_c("start", u3h(response)) ) { + // Separate the %start message into its components. + // + u3_noun response_header, data, complete; + u3_noun status, headers; u3x_trel(u3t(response), &response_header, &data, &complete); u3x_cell(response_header, &status, &headers); @@ -2790,6 +3311,8 @@ _http_io_exit(u3_auto* car_u) { u3_httd* htd_u = (u3_httd*)car_u; + _http_ws_close_all(htd_u); + u3h_free(htd_u->sax_p); u3h_free(htd_u->nax_p); @@ -2882,6 +3405,8 @@ u3_http_io_init(u3_pier* pir_u) u3_httd* htd_u = c3_calloc(sizeof(*htd_u)); htd_u->sax_p = u3h_new(); htd_u->nax_p = u3h_new_cache(512); + htd_u->web_u = 0; + htd_u->wid_l = 1; { u3_noun key = u3dt("cat", 3,