diff --git a/src/subscribers/common.c b/src/subscribers/common.c index 611d9421..50720fb0 100644 --- a/src/subscribers/common.c +++ b/src/subscribers/common.c @@ -287,7 +287,7 @@ void nchan_subscriber_timeout_ev_handler(ngx_event_t *ev) { #if FAKESHARD memstore_fakeprocess_push(sub->owner); #endif - sub->dequeue_after_response = 1; + //sub->dequeue_after_response = 1; //see https://github.com/slact/nchan/pull/591 sub->fn->respond_status(sub, NGX_HTTP_REQUEST_TIMEOUT, &NCHAN_HTTP_STATUS_408, NULL); #if FAKESHARD memstore_fakeprocess_pop(); diff --git a/src/subscribers/internal.c b/src/subscribers/internal.c index 8f4a8a21..2f0ccf05 100644 --- a/src/subscribers/internal.c +++ b/src/subscribers/internal.c @@ -212,7 +212,7 @@ static ngx_int_t internal_respond_status(subscriber_t *self, ngx_int_t status_co internal_subscriber_t *f = (internal_subscriber_t *)self; DBG("%p status %i", self, status_code); if(status_code == NGX_HTTP_GONE) { - self->dequeue_after_response = 1; + //self->dequeue_after_response = 1; //see https://github.com/slact/nchan/pull/591 } f->respond_status(status_code, (void *)status_line, f->privdata); reset_timer(f); diff --git a/src/subscribers/websocket.c b/src/subscribers/websocket.c index 3e395f48..34055686 100644 --- a/src/subscribers/websocket.c +++ b/src/subscribers/websocket.c @@ -671,10 +671,10 @@ static ngx_int_t websocket_publish(full_subscriber_t *fsub, ngx_buf_t *buf, int //move the msg pool d->pool = fsub->publisher.msg_pool; d->msgbuf = buf; + d->subrequest = NULL; fsub->publisher.msg_pool = NULL; if(fsub->publisher.intercept || fsub->publisher.upstream_request_url == NULL) { // don't need to send request upstream - d->subrequest = NULL; websocket_publish_continue(d); } else { @@ -1086,7 +1086,10 @@ static ngx_int_t websocket_perform_handshake(full_subscriber_t *fsub) { static void websocket_reading(ngx_http_request_t *r); -static ngx_buf_t *websocket_inflate_message(full_subscriber_t *fsub, ngx_buf_t *msgbuf, ngx_pool_t *pool) { +static ngx_buf_t *websocket_inflate_message(full_subscriber_t *fsub, ngx_buf_t *msgbuf, ngx_pool_t *pool, uint64_t max, int *result) { + +*result = 0; + #if (NGX_ZLIB) z_stream *strm; int rc; @@ -1115,7 +1118,7 @@ static ngx_buf_t *websocket_inflate_message(full_subscriber_t *fsub, ngx_buf_t * strm = fsub->deflate.zstream_in; - outbuf = nchan_inflate(strm, msgbuf, fsub->sub.request, pool); + outbuf = nchan_inflate(strm, msgbuf, fsub->sub.request, pool, max, result); return outbuf; #else return NULL; @@ -1304,6 +1307,8 @@ static void websocket_reading(ngx_http_request_t *r) { ngx_connection_t *c; ngx_buf_t *msgbuf, buf; //ngx_str_t msg_in_str; + ngx_http_core_loc_conf_t *clcf; + int result; retry: ctx = ngx_http_get_module_ctx(r, ngx_nchan_module); fsub = (full_subscriber_t *)ctx->sub; @@ -1468,8 +1473,15 @@ static void websocket_reading(ngx_http_request_t *r) { return websocket_reading_finalize(r); } - //TODO: check max websocket message length + clcf = ngx_http_get_module_loc_conf(ctx->sub->request, ngx_http_core_module); + if(frame->payload == NULL) { + if(clcf->client_max_body_size && (uint64_t)clcf->client_max_body_size < frame->payload_len) { + websocket_send_close_frame_cstr(fsub, CLOSE_POLICY_VIOLATION, "Message too large."); + ws_destroy_msgpool(fsub); + fsub->publisher.msg_pool = NULL; + return websocket_reading_finalize(r); + } if(ws_get_msgpool(fsub) == NULL) { ERR("failed to get msgpool"); websocket_send_close_frame(fsub, CLOSE_INTERNAL_SERVER_ERROR, NULL); @@ -1486,6 +1498,13 @@ static void websocket_reading(ngx_http_request_t *r) { } set_buffer(&buf, frame->payload, frame->last, frame->payload_len); + if(clcf->client_max_body_size && clcf->client_max_body_size < ngx_buf_size(&buf)) { + websocket_send_close_frame_cstr(fsub, CLOSE_POLICY_VIOLATION, "Message too large."); + ws_destroy_msgpool(fsub); + fsub->publisher.msg_pool = NULL; + return websocket_reading_finalize(r); + } + if (frame->payload_len > 0 && (rc = ws_recv(c, rev, &buf, frame->payload_len)) != NGX_OK) { DBG("ws_recv NOT OK when receiving payload, but that's ok"); @@ -1513,7 +1532,12 @@ static void websocket_reading(ngx_http_request_t *r) { //inflate message if needed if(fsub->deflate.enabled && frame->rsv1) { - if((msgbuf = websocket_inflate_message(fsub, msgbuf, ws_get_msgpool(fsub))) == NULL) { + if((msgbuf = websocket_inflate_message(fsub, msgbuf, ws_get_msgpool(fsub), (uint64_t)clcf->client_max_body_size, &result)) == NULL) { + if(result == -1) { + websocket_send_close_frame_cstr(fsub, CLOSE_POLICY_VIOLATION, "Message too large."); + ws_destroy_msgpool(fsub); + return websocket_reading_finalize(r); + } websocket_send_close_frame_cstr(fsub, CLOSE_INVALID_PAYLOAD, "Invalid permessage-deflate data"); ws_destroy_msgpool(fsub); return websocket_reading_finalize(r); @@ -1577,7 +1601,7 @@ static void websocket_reading(ngx_http_request_t *r) { static ngx_flag_t is_utf8(ngx_buf_t *buf) { - u_char *p; + u_char *p, *op; size_t n; u_char c, *last; @@ -1599,6 +1623,7 @@ static ngx_flag_t is_utf8(ngx_buf_t *buf) { } last = p + n; + op = p; for (len = 0; p < last; len++) { c = *p; @@ -1611,13 +1636,13 @@ static ngx_flag_t is_utf8(ngx_buf_t *buf) { if (ngx_utf8_decode(&p, last - p) > 0x10ffff) { /* invalid UTF-8 */ if(mmapped) { - munmap(p, n); + munmap(op, n); } return 0; } } if(mmapped) { - munmap(p, n); + munmap(op, n); } return 1; } diff --git a/src/util/nchan_fake_request.c b/src/util/nchan_fake_request.c index 62151de4..f17e3c72 100644 --- a/src/util/nchan_fake_request.c +++ b/src/util/nchan_fake_request.c @@ -374,6 +374,14 @@ static void fakerequest_cleanup_timer_handler(ngx_event_t *ev) { nchan_finalize_fake_request(d->r, NGX_OK); } +//see https://github.com/slact/nchan/pull/591 +typedef struct { + void *fsub; + ngx_pool_t *pool; + ngx_buf_t *msgbuf; + nchan_fakereq_subrequest_data_t *subrequest; +} ws_publish_data_stub_t; + nchan_fakereq_subrequest_data_t *nchan_requestmachine_request(nchan_requestmachine_t *rm, nchan_requestmachine_request_params_t *params) { nchan_fakereq_subrequest_data_t *d; ngx_pool_t *pool = params->pool; @@ -468,7 +476,7 @@ nchan_fakereq_subrequest_data_t *nchan_requestmachine_request(nchan_requestmachi fakebody_buf->last_buf = 1; fakebody_buf->last_in_chain = 1; fakebody_buf->flush = 1; - fakebody_buf->memory = 1; + //fakebody_buf->memory = 1; //why were file-cached requests (over 16KB) disabled? nchan_adjust_subrequest(sr, NGX_HTTP_POST, &POST_REQUEST_STRING, sr_body, sz); } @@ -482,6 +490,11 @@ nchan_fakereq_subrequest_data_t *nchan_requestmachine_request(nchan_requestmachi nchan_slist_append(&rm->request_queue, d); + //see https://github.com/slact/nchan/pull/591 + ws_publish_data_stub_t *pd; + pd = (ws_publish_data_stub_t*)(params->pd); + if(pd) pd->subrequest = d; + nchan_requestmachine_run(rm); return d; } diff --git a/src/util/nchan_output.c b/src/util/nchan_output.c index 1f620280..48973319 100644 --- a/src/util/nchan_output.c +++ b/src/util/nchan_output.c @@ -290,6 +290,11 @@ static ngx_int_t nchan_output_filter_generic(ngx_http_request_t *r, nchan_msg_t if(r->out == NULL) { if(ctx) { flush_all_the_reserved_things(ctx); + + //Fix crash when using X-Accel-Redirect with message forwarding + //See https://github.com/slact/nchan/pull/591 + //There may be a better way to do this... + ngx_http_set_ctx(r->main, NULL, ngx_http_charset_filter_module); } } diff --git a/src/util/nchan_output.h b/src/util/nchan_output.h index d465de82..07fcccc4 100644 --- a/src/util/nchan_output.h +++ b/src/util/nchan_output.h @@ -28,3 +28,5 @@ ngx_int_t nchan_msg_buf_open_fd_if_needed(ngx_buf_t *buf, ngx_file_t *file, ngx_ ngx_str_t *msgtag_to_str(nchan_msg_id_t *id); ngx_str_t *msgid_to_str(nchan_msg_id_t *id); size_t msgtag_to_strptr(nchan_msg_id_t *id, char *ch); + +extern ngx_module_t ngx_http_charset_filter_module; diff --git a/src/util/nchan_util.c b/src/util/nchan_util.c index f675381e..06fcc0a8 100644 --- a/src/util/nchan_util.c +++ b/src/util/nchan_util.c @@ -964,7 +964,7 @@ ngx_buf_t *nchan_common_deflate(ngx_buf_t *in, ngx_http_request_t *r, ngx_pool_t return out; } -ngx_buf_t *nchan_inflate(z_stream *stream, ngx_buf_t *in, ngx_http_request_t *r, ngx_pool_t *pool) { +ngx_buf_t *nchan_inflate(z_stream *stream, ngx_buf_t *in, ngx_http_request_t *r, ngx_pool_t *pool, uint64_t max, int *result) { ngx_str_t mm_instr = {0, NULL}; int mmapped = 0; ngx_temp_file_t *tf = NULL; @@ -975,6 +975,8 @@ ngx_buf_t *nchan_inflate(z_stream *stream, ngx_buf_t *in, ngx_http_request_t *r, unsigned have = 0; off_t written = 0; int trailer_appended = 0; + + *result = 0; //input if(ngx_buf_in_memory(in)) { @@ -1018,6 +1020,7 @@ ngx_buf_t *nchan_inflate(z_stream *stream, ngx_buf_t *in, ngx_http_request_t *r, } have = ZLIB_CHUNK - stream->avail_out; + if((uint64_t)have + written > max) break; if(stream->avail_out == 0 && tf == NULL) { //if we filled up the buffer, let's start dumping to a file. @@ -1033,6 +1036,11 @@ ngx_buf_t *nchan_inflate(z_stream *stream, ngx_buf_t *in, ngx_http_request_t *r, munmap(mm_instr.data, mm_instr.len); } + if((uint64_t)have + written > max) { + *result = -1; + deflateReset(deflate_zstream); + return NULL; + } if((out = ngx_palloc(pool, sizeof(*out))) == NULL) { nchan_log_request_error(r, "failed to allocate output buf for deflated message"); deflateReset(deflate_zstream); diff --git a/src/util/nchan_util.h b/src/util/nchan_util.h index 3dd2358a..5b132a41 100644 --- a/src/util/nchan_util.h +++ b/src/util/nchan_util.h @@ -70,7 +70,7 @@ ngx_int_t nchan_common_deflate_init(nchan_main_conf_t *mcf); ngx_buf_t *nchan_common_deflate(ngx_buf_t *in, ngx_http_request_t *r, ngx_pool_t *pool); ngx_int_t nchan_common_simple_deflate_raw_block(ngx_str_t *in, ngx_str_t *out); ngx_int_t nchan_common_simple_deflate(ngx_str_t *in, ngx_str_t *out); -ngx_buf_t *nchan_inflate(z_stream *stream, ngx_buf_t *in, ngx_http_request_t *r, ngx_pool_t *pool); +ngx_buf_t *nchan_inflate(z_stream *stream, ngx_buf_t *in, ngx_http_request_t *r, ngx_pool_t *pool, uint64_t max, int *result); uint64_t nchan_htonll(uint64_t value); uint64_t nchan_ntohll(uint64_t value);