Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/async/async_post.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ let main port host () =
don't_wait_for (
Reader.read_one_chunk_at_a_time stdin ~handle_chunk:(fun bs ~pos:off ~len ->
Body.Writer.write_bigstring request_body bs ~off ~len;
Body.Writer.flush request_body (fun () -> ());
Body.Writer.flush request_body Fn.ignore;
return (`Consumed(len, `Need_unknown)))
>>| function
| `Eof_with_unconsumed_data s -> Body.Writer.write_string request_body s;
Expand Down
111 changes: 69 additions & 42 deletions lib/body.ml
Original file line number Diff line number Diff line change
Expand Up @@ -102,53 +102,74 @@ module Reader = struct
end

module Writer = struct
module Writer = Serialize.Writer

type encoding =
| Identity
| Chunked of { mutable written_final_chunk : bool }

type t =
{ faraday : Faraday.t
; encoding : encoding
; when_ready_to_write : unit -> unit
; buffered_bytes : int ref
{ faraday : Faraday.t
; writer : Writer.t
; encoding : encoding
; buffered_bytes : int ref
}

let of_faraday faraday ~encoding ~when_ready_to_write =
let of_faraday faraday writer ~encoding =
let encoding =
match encoding with
| `Fixed _ | `Close_delimited -> Identity
| `Chunked -> Chunked { written_final_chunk = false }
in
{ faraday
; encoding
; when_ready_to_write
; writer
; buffered_bytes = ref 0
}

let create buffer ~encoding ~when_ready_to_write =
of_faraday (Faraday.of_bigstring buffer) ~encoding ~when_ready_to_write
let create buffer writer ~encoding =
of_faraday (Faraday.of_bigstring buffer) writer ~encoding

let write_char t c =
Faraday.write_char t.faraday c
if not (Faraday.is_closed t.faraday) then
Faraday.write_char t.faraday c

let write_string t ?off ?len s =
Faraday.write_string ?off ?len t.faraday s
if not (Faraday.is_closed t.faraday) then
Faraday.write_string ?off ?len t.faraday s

let write_bigstring t ?off ?len b =
Faraday.write_bigstring ?off ?len t.faraday b
if not (Faraday.is_closed t.faraday) then
Faraday.write_bigstring ?off ?len t.faraday b

let schedule_bigstring t ?off ?len (b:Bigstringaf.t) =
Faraday.schedule_bigstring ?off ?len t.faraday b
if not (Faraday.is_closed t.faraday) then
Faraday.schedule_bigstring ?off ?len t.faraday b

let ready_to_write t = t.when_ready_to_write ()
let ready_to_write t = Writer.wakeup t.writer

let flush t kontinue =
Faraday.flush t.faraday kontinue;
ready_to_write t
if Writer.is_closed t.writer then
kontinue `Closed
else begin
Faraday.flush_with_reason t.faraday (fun reason ->
let result =
match reason with
| Nothing_pending | Shift -> `Written
| Drain -> `Closed
in
kontinue result);
ready_to_write t
end

let is_closed t =
Faraday.is_closed t.faraday

let close_and_drain t =
Faraday.close t.faraday;
(* Resolve all pending flushes *)
ignore (Faraday.drain t.faraday : int)

let close t =
Faraday.close t.faraday;
ready_to_write t;
Expand All @@ -166,33 +187,39 @@ module Writer = struct
in
faraday_has_output || additional_encoding_output

let transfer_to_writer t writer =
let transfer_to_writer t =
let faraday = t.faraday in
begin match Faraday.operation faraday with
| `Yield -> ()
| `Close ->
(match t.encoding with
| Identity -> ()
| Chunked ({ written_final_chunk } as chunked) ->
if not written_final_chunk then begin
chunked.written_final_chunk <- true;
Serialize.Writer.schedule_chunk writer [];
end);
Serialize.Writer.unyield writer;
| `Writev iovecs ->
let buffered = t.buffered_bytes in
begin match IOVec.shiftv iovecs !buffered with
| [] -> ()
| iovecs ->
let lengthv = IOVec.lengthv iovecs in
buffered := !buffered + lengthv;
begin match t.encoding with
| Identity -> Serialize.Writer.schedule_fixed writer iovecs
| Chunked _ -> Serialize.Writer.schedule_chunk writer iovecs
end;
Serialize.Writer.flush writer (fun () ->
Faraday.shift faraday lengthv;
buffered := !buffered - lengthv)
end
if Writer.is_closed t.writer then
close_and_drain t
else begin
match Faraday.operation faraday with
| `Yield -> ()
| `Close ->
(match t.encoding with
| Identity -> ()
| Chunked ({ written_final_chunk } as chunked) ->
if not written_final_chunk then begin
chunked.written_final_chunk <- true;
Serialize.Writer.schedule_chunk t.writer [];
end);
Serialize.Writer.unyield t.writer;
| `Writev iovecs ->
let buffered = t.buffered_bytes in
begin match IOVec.shiftv iovecs !buffered with
| [] -> ()
| iovecs ->
let lengthv = IOVec.lengthv iovecs in
buffered := !buffered + lengthv;
begin match t.encoding with
| Identity -> Serialize.Writer.schedule_fixed t.writer iovecs
| Chunked _ -> Serialize.Writer.schedule_chunk t.writer iovecs
end;
Serialize.Writer.flush t.writer (fun result ->
match result with
| `Closed -> close_and_drain t
| `Written ->
Faraday.shift faraday lengthv;
buffered := !buffered - lengthv)
end
end
end
6 changes: 3 additions & 3 deletions lib/client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ module Oneshot = struct
| `Error `Bad_request ->
failwith "Httpaf.Client_connection.request: invalid body length"
in
Body.Writer.create (Bigstringaf.create config.request_body_buffer_size)
~encoding ~when_ready_to_write:(fun () -> Writer.wakeup writer)
Body.Writer.create (Bigstringaf.create config.request_body_buffer_size) writer
~encoding
in
let t =
{ request
Expand All @@ -89,7 +89,7 @@ module Oneshot = struct

let flush_request_body t =
if Body.Writer.has_pending_output t.request_body
then Body.Writer.transfer_to_writer t.request_body t.writer
then Body.Writer.transfer_to_writer t.request_body
;;

let set_error_and_handle_without_shutdown t error =
Expand Down
14 changes: 8 additions & 6 deletions lib/httpaf.mli
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,11 @@ module Body : sig
modified until a subsequent call to {!flush} has successfully
completed. *)

val flush : t -> (unit -> unit) -> unit
(** [flush t f] makes all bytes in [t] available for writing to the awaiting
output channel. Once those bytes have reached that output channel, [f]
will be called.
val flush : t -> ([ `Written | `Closed ] -> unit) -> unit
(** [flush t f] makes all bytes in [t] available for writing to the awaiting output
channel. Once those bytes have reached that output channel, [f `Written] will be
called. If instead, the output channel is closed before all of those bytes are
successfully written, [f `Closed] will be called.

The type of the output channel is runtime-dependent, as are guarantees
about whether those packets have been queued for delivery or have
Expand All @@ -512,8 +513,9 @@ module Body : sig
to the output channel. *)

val is_closed : t -> bool
(** [is_closed t] is [true] if {!close} has been called on [t] and [false]
otherwise. A closed [t] may still have pending output. *)
(** [is_closed t] is [true] if {!close} has been called on [t], or if the attached
output channel is closed (e.g. because [report_write_result `Closed] has been
called). A closed [t] may still have pending output. *)
end

end
Expand Down
8 changes: 2 additions & 6 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,7 @@ let unsafe_respond_with_streaming ~flush_headers_immediately t response =
| `Error (`Bad_gateway | `Internal_server_error) ->
failwith "httpaf.Reqd.respond_with_streaming: invalid response body length"
in
let response_body =
Body.Writer.create t.response_body_buffer ~encoding ~when_ready_to_write:(fun () ->
Writer.wakeup t.writer)
in
let response_body = Body.Writer.create t.response_body_buffer t.writer ~encoding in
Writer.write_response t.writer response;
if t.persistent then
t.persistent <- Response.persistent_connection response;
Expand Down Expand Up @@ -256,6 +253,5 @@ let flush_request_body t =

let flush_response_body t =
match t.response_state with
| Streaming (_, response_body) ->
Body.Writer.transfer_to_writer response_body t.writer
| Streaming (_, response_body) -> Body.Writer.transfer_to_writer response_body
| _ -> ()
28 changes: 17 additions & 11 deletions lib/serialize.ml
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,18 @@ let schedule_bigstring_chunk t chunk =
module Writer = struct
type t =
{ buffer : Bigstringaf.t
(* The buffer that the encoder uses for buffered writes. Managed by the
* control module for the encoder. *)
(* The buffer that the encoder uses for buffered writes. Managed by the
* control module for the encoder. *)
; encoder : Faraday.t
(* The encoder that handles encoding for writes. Uses the [buffer]
* referenced above internally. *)
(* The encoder that handles encoding for writes. Uses the [buffer]
* referenced above internally. *)
; mutable drained_bytes : int
(* The number of bytes that were not written due to the output stream
* being closed before all buffered output could be written. Useful for
* detecting error cases. *)
(* The number of bytes that were not written due to the output stream
* being closed before all buffered output could be written. Useful for
* detecting error cases. *)
; mutable wakeup : Optional_thunk.t
(* The callback from the runtime to be invoked when output is ready to be
* flushed. *)
(* The callback from the runtime to be invoked when output is ready to be
* flushed. *)
}

let create ?(buffer_size=0x800) () =
Expand Down Expand Up @@ -158,13 +158,19 @@ module Writer = struct
;;

let flush t f =
flush t.encoder f
flush_with_reason t.encoder (fun reason ->
let result =
match reason with
| Nothing_pending | Shift -> `Written
| Drain -> `Closed
in
f result)

let unyield t =
(* This would be better implemented by a function that just takes the
encoder out of a yielded state if it's in that state. Requires a change
to the faraday library. *)
flush t (fun () -> ())
flush t (fun _result -> ())

let yield t =
Faraday.yield t.encoder
Expand Down
3 changes: 1 addition & 2 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,7 @@ let set_error_and_handle ?request t error =
| `Error (`Bad_gateway | `Internal_server_error) ->
failwith "httpaf.Server_connection.error_handler: invalid response body length"
in
Body.Writer.of_faraday (Writer.faraday writer) ~encoding
~when_ready_to_write:(fun () -> Writer.wakeup writer));
Body.Writer.of_faraday (Writer.faraday writer) writer ~encoding);
end

let report_exn t exn =
Expand Down
Loading