From 926770c54f8d07dbeee31ddd1efa390f6bd3293e Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Thu, 7 Aug 2025 22:23:08 +0100 Subject: [PATCH 01/22] create stream_decoder module --- lib/msgpack/stream_decoder.ex | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 lib/msgpack/stream_decoder.ex diff --git a/lib/msgpack/stream_decoder.ex b/lib/msgpack/stream_decoder.ex new file mode 100644 index 0000000..d0f1bb5 --- /dev/null +++ b/lib/msgpack/stream_decoder.ex @@ -0,0 +1,3 @@ +defmodule Msgpack.StreamDecoder do + +end From 1bf6855a60bb87f8ae71df456cf16e5b1f83e9b0 Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Thu, 7 Aug 2025 23:20:20 +0100 Subject: [PATCH 02/22] refactor decode to use default_options/0 instead of module attributes --- lib/msgpack/decoder.ex | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/lib/msgpack/decoder.ex b/lib/msgpack/decoder.ex index c76e037..a132809 100644 --- a/lib/msgpack/decoder.ex +++ b/lib/msgpack/decoder.ex @@ -3,18 +3,12 @@ defmodule Msgpack.Decoder do Handles the logic of decoding a MessagePack binary into an Elixir term. """ - @default_max_depth 100 - @default_max_byte_size 10_000_000 # 10MB - # The number of gregorian seconds from year 0 to the Unix epoch. This is a constant. @epoch_offset :calendar.datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}) @spec decode(binary(), keyword()) :: {:ok, term()} | {:error, term()} def decode(binary, opts \\ []) do - merged_opts = - opts - |> Keyword.put_new(:max_depth, @default_max_depth) - |> Keyword.put_new(:max_byte_size, @default_max_byte_size) + merged_opts = Keyword.merge(default_options(), opts) try do case do_decode(binary, merged_opts) do @@ -33,6 +27,16 @@ defmodule Msgpack.Decoder do end end + @doc """ + Returns a keyword list of the default options for the decoder. + """ + def default_options() do + [ + max_depth: 100, + max_byte_size: 10_000_000 # 10MB + ] + end + # ==== Nil ==== defp do_decode(<<0xC0, rest::binary>>, _opts), do: {:ok, {nil, rest}} From 42c1443cbc8e164af32f6f28a9eb8b139148746c Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Thu, 7 Aug 2025 23:46:25 +0100 Subject: [PATCH 03/22] move internal decoding logic into separate module --- lib/msgpack/decoder.ex | 243 +------------------------------- lib/msgpack/decoder/internal.ex | 242 +++++++++++++++++++++++++++++++ 2 files changed, 244 insertions(+), 241 deletions(-) create mode 100644 lib/msgpack/decoder/internal.ex diff --git a/lib/msgpack/decoder.ex b/lib/msgpack/decoder.ex index a132809..bfecf1e 100644 --- a/lib/msgpack/decoder.ex +++ b/lib/msgpack/decoder.ex @@ -3,15 +3,14 @@ defmodule Msgpack.Decoder do Handles the logic of decoding a MessagePack binary into an Elixir term. """ - # The number of gregorian seconds from year 0 to the Unix epoch. This is a constant. - @epoch_offset :calendar.datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}) + alias Msgpack.Decoder.Internal @spec decode(binary(), keyword()) :: {:ok, term()} | {:error, term()} def decode(binary, opts \\ []) do merged_opts = Keyword.merge(default_options(), opts) try do - case do_decode(binary, merged_opts) do + case Internal.decode(binary, merged_opts) do {:ok, {term, <<>>}} -> {:ok, term} @@ -36,242 +35,4 @@ defmodule Msgpack.Decoder do max_byte_size: 10_000_000 # 10MB ] end - - # ==== Nil ==== - defp do_decode(<<0xC0, rest::binary>>, _opts), do: {:ok, {nil, rest}} - - # ==== Boolean ==== - defp do_decode(<<0xC3, rest::binary>>, _opts), do: {:ok, {true, rest}} - defp do_decode(<<0xC2, rest::binary>>, _opts), do: {:ok, {false, rest}} - - # ==== Integers ==== - # ==== Positive Fixint ==== - defp do_decode(<>, _opts) when int < 128 do - {:ok, {int, rest}} - end - - # ==== Negative Fixint ==== - defp do_decode(<>, _opts) when int >= -32 and int < 0 do - {:ok, {int, rest}} - end - - # ==== Unsigned Integers ==== - defp do_decode(<<0xCC, int::8, rest::binary>>, _opts), do: {:ok, {int, rest}} - defp do_decode(<<0xCD, int::16, rest::binary>>, _opts), do: {:ok, {int, rest}} - defp do_decode(<<0xCE, int::32, rest::binary>>, _opts), do: {:ok, {int, rest}} - defp do_decode(<<0xCF, int::64, rest::binary>>, _opts), do: {:ok, {int, rest}} - - # ==== Signed Integers ==== - defp do_decode(<<0xD0, int::signed-8, rest::binary>>, _opts), do: {:ok, {int, rest}} - defp do_decode(<<0xD1, int::signed-16, rest::binary>>, _opts), do: {:ok, {int, rest}} - defp do_decode(<<0xD2, int::signed-32, rest::binary>>, _opts), do: {:ok, {int, rest}} - defp do_decode(<<0xD3, int::signed-64, rest::binary>>, _opts), do: {:ok, {int, rest}} - - # ==== Floats ==== - defp do_decode(<<0xCA, float::float-32, rest::binary>>, _opts), do: {:ok, {float, rest}} - defp do_decode(<<0xCB, float::float-64, rest::binary>>, _opts), do: {:ok, {float, rest}} - - # ==== Strings ==== - defp do_decode(<>, opts) when prefix >= 0xA0 and prefix <= 0xBF do - size = prefix - 0xA0 - decode_string(rest, size, opts) - end - - defp do_decode(<<0xD9, size::8, rest::binary>>, opts), do: decode_string(rest, size, opts) - defp do_decode(<<0xDA, size::16, rest::binary>>, opts), do: decode_string(rest, size, opts) - defp do_decode(<<0xDB, size::32, rest::binary>>, opts), do: decode_string(rest, size, opts) - - # ==== Raw Binary ==== - defp do_decode(<<0xC4, size::8, rest::binary>>, opts), do: decode_binary(rest, size, opts) - defp do_decode(<<0xC5, size::16, rest::binary>>, opts), do: decode_binary(rest, size, opts) - defp do_decode(<<0xC6, size::32, rest::binary>>, opts), do: decode_binary(rest, size, opts) - - # ==== Arrays ==== - defp do_decode(<>, opts) when prefix >= 0x90 and prefix <= 0x9F do - size = prefix - 0x90 - decode_array(rest, size, opts) - end - - defp do_decode(<<0xDC, size::16, rest::binary>>, opts), do: decode_array(rest, size, opts) - defp do_decode(<<0xDD, size::32, rest::binary>>, opts), do: decode_array(rest, size, opts) - - # ==== Maps ==== - defp do_decode(<>, opts) when prefix >= 0x80 and prefix <= 0x8F do - size = prefix - 0x80 - decode_map(rest, size, opts) - end - - defp do_decode(<<0xDE, size::16, rest::binary>>, opts), do: decode_map(rest, size, opts) - defp do_decode(<<0xDF, size::32, rest::binary>>, opts), do: decode_map(rest, size, opts) - - # ==== Extensions & Timestamps ==== - # ==== Fixext ==== - defp do_decode(<<0xD4, type::signed-8, data::binary-size(1), rest::binary>>, opts), - do: decode_ext(type, data, rest, opts) - - defp do_decode(<<0xD5, type::signed-8, data::binary-size(2), rest::binary>>, opts), - do: decode_ext(type, data, rest, opts) - - defp do_decode(<<0xD6, type::signed-8, data::binary-size(4), rest::binary>>, opts), - do: decode_ext(type, data, rest, opts) - - defp do_decode(<<0xD7, type::signed-8, data::binary-size(8), rest::binary>>, opts), - do: decode_ext(type, data, rest, opts) - - defp do_decode(<<0xD8, type::signed-8, data::binary-size(16), rest::binary>>, opts), - do: decode_ext(type, data, rest, opts) - - # ==== Ext ==== - defp do_decode(<<0xC7, len::8, type::signed-8, data::binary-size(len), rest::binary>>, opts), - do: decode_ext(type, data, rest, opts) - - defp do_decode(<<0xC8, len::16, type::signed-8, data::binary-size(len), rest::binary>>, opts), - do: decode_ext(type, data, rest, opts) - - defp do_decode(<<0xC9, len::32, type::signed-8, data::binary-size(len), rest::binary>>, opts), - do: decode_ext(type, data, rest, opts) - - # ==== Unknown types ==== - defp do_decode(<>, _opts) do - {:error, {:unknown_prefix, prefix}} - end - - defp do_decode(<<>>, _opts) do - {:error, :unexpected_eof} - end - - # ==== Helpers ==== - defp decode_string(binary, size, opts) do - if max_size = opts[:max_byte_size], do: check_byte_size(size, max_size) - - case binary do - <> -> - {:ok, {string, rest}} - - _ -> - {:error, :unexpected_eof} - end - end - - defp decode_binary(binary, size, opts) do - if max_size = opts[:max_byte_size], do: check_byte_size(size, max_size) - - case binary do - <> -> - {:ok, {bin, rest}} - - _ -> - {:error, :unexpected_eof} - end - end - - defp decode_array(binary, size, opts) do - depth = opts[:depth] || 0 - - check_depth(depth, opts[:max_depth]) - check_byte_size(size, opts[:max_byte_size]) - - new_opts = Keyword.put(opts, :depth, depth + 1) - - decode_many(binary, size, [], new_opts) - end - - defp decode_map(binary, size, opts) do - depth = opts[:depth] || 0 - - check_depth(depth, opts[:max_depth]) - check_byte_size(size * 2, opts[:max_byte_size]) - - new_opts = Keyword.put(opts, :depth, depth + 1) - - with {:ok, {kv_pairs, rest}} <- decode_many(binary, size * 2, [], new_opts) do - map = - Enum.chunk_every(kv_pairs, 2) - |> Enum.map(&List.to_tuple/1) - |> Enum.into(%{}) - - {:ok, {map, rest}} - end - end - - # Recursively decodes `count` terms from the binary - defp decode_many(binary, 0, acc, _opts) do - {:ok, {Enum.reverse(acc), binary}} - end - - defp decode_many(binary, count, acc, opts) do - case do_decode(binary, opts) do - {:ok, {term, rest}} -> - decode_many(rest, count - 1, [term | acc], opts) - - {:error, reason} -> - {:error, reason} - end - end - - defp decode_ext(-1, data, rest, _opts) do - {:ok, {decode_timestamp(data), rest}} - end - - defp decode_ext(type, data, rest, _opts) do - {:ok, {%Msgpack.Ext{type: type, data: data}, rest}} - end - - # timestamp 32: 4 bytes (32-bit unsigned integer seconds) - defp decode_timestamp(<>) do - gregorian_seconds = unix_seconds + @epoch_offset - erlang_datetime = :calendar.gregorian_seconds_to_datetime(gregorian_seconds) - NaiveDateTime.from_erl!(erlang_datetime) - end - - # timestamp 64: 8 bytes (30-bit nanoseconds + 34-bit seconds) - defp decode_timestamp(<>) do - nanoseconds = :erlang.bsr(data, 34) - - if nanoseconds > 999_999_999 do - throw({:error, :invalid_timestamp}) - else - unix_seconds = :erlang.band(data, 0x00000003_FFFFFFFF) - gregorian_seconds = unix_seconds + @epoch_offset - erlang_datetime = :calendar.gregorian_seconds_to_datetime(gregorian_seconds) - base_datetime = NaiveDateTime.from_erl!(erlang_datetime) - - if nanoseconds > 0 do - microseconds = div(nanoseconds, 1000) - %{base_datetime | microsecond: {microseconds, 6}} - else - base_datetime - end - end - end - - # timestamp 96: 12 bytes (32-bit nanoseconds + 64-bit seconds) - defp decode_timestamp(<>) do - if nanoseconds > 999_999_999 do - throw({:error, :invalid_timestamp}) - else - gregorian_seconds = unix_seconds + @epoch_offset - erlang_datetime = :calendar.gregorian_seconds_to_datetime(gregorian_seconds) - base_datetime = NaiveDateTime.from_erl!(erlang_datetime) - - if nanoseconds > 0 do - microseconds = div(nanoseconds, 1000) - %{base_datetime | microsecond: {microseconds, 6}} - else - base_datetime - end - end - end - - defp check_byte_size(size, max_size) when size > max_size do - throw({:error, {:max_byte_size_exceeded, max_size}}) - end - - defp check_byte_size(_size, _max_size), do: :ok - - defp check_depth(depth, max_depth) when depth >= max_depth do - throw({:error, {:max_depth_reached, max_depth}}) - end - - defp check_depth(_depth, _max_depth), do: :ok end diff --git a/lib/msgpack/decoder/internal.ex b/lib/msgpack/decoder/internal.ex new file mode 100644 index 0000000..90da7f9 --- /dev/null +++ b/lib/msgpack/decoder/internal.ex @@ -0,0 +1,242 @@ +defmodule Msgpack.Decoder.Internal do + @moduledoc false + + # The number of gregorian seconds from year 0 to the Unix epoch. This is a constant. + @epoch_offset :calendar.datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}) + + # ==== Nil ==== + def decode(<<0xC0, rest::binary>>, _opts), do: {:ok, {nil, rest}} + + # ==== Boolean ==== + def decode(<<0xC3, rest::binary>>, _opts), do: {:ok, {true, rest}} + def decode(<<0xC2, rest::binary>>, _opts), do: {:ok, {false, rest}} + + # ==== Integers ==== + # ==== Positive Fixint ==== + def decode(<>, _opts) when int < 128 do + {:ok, {int, rest}} + end + + # ==== Negative Fixint ==== + def decode(<>, _opts) when int >= -32 and int < 0 do + {:ok, {int, rest}} + end + + # ==== Unsigned Integers ==== + def decode(<<0xCC, int::8, rest::binary>>, _opts), do: {:ok, {int, rest}} + def decode(<<0xCD, int::16, rest::binary>>, _opts), do: {:ok, {int, rest}} + def decode(<<0xCE, int::32, rest::binary>>, _opts), do: {:ok, {int, rest}} + def decode(<<0xCF, int::64, rest::binary>>, _opts), do: {:ok, {int, rest}} + + # ==== Signed Integers ==== + def decode(<<0xD0, int::signed-8, rest::binary>>, _opts), do: {:ok, {int, rest}} + def decode(<<0xD1, int::signed-16, rest::binary>>, _opts), do: {:ok, {int, rest}} + def decode(<<0xD2, int::signed-32, rest::binary>>, _opts), do: {:ok, {int, rest}} + def decode(<<0xD3, int::signed-64, rest::binary>>, _opts), do: {:ok, {int, rest}} + + # ==== Floats ==== + def decode(<<0xCA, float::float-32, rest::binary>>, _opts), do: {:ok, {float, rest}} + def decode(<<0xCB, float::float-64, rest::binary>>, _opts), do: {:ok, {float, rest}} + + # ==== Strings ==== + def decode(<>, opts) when prefix >= 0xA0 and prefix <= 0xBF do + size = prefix - 0xA0 + decode_string(rest, size, opts) + end + + def decode(<<0xD9, size::8, rest::binary>>, opts), do: decode_string(rest, size, opts) + def decode(<<0xDA, size::16, rest::binary>>, opts), do: decode_string(rest, size, opts) + def decode(<<0xDB, size::32, rest::binary>>, opts), do: decode_string(rest, size, opts) + + # ==== Raw Binary ==== + def decode(<<0xC4, size::8, rest::binary>>, opts), do: decode_binary(rest, size, opts) + def decode(<<0xC5, size::16, rest::binary>>, opts), do: decode_binary(rest, size, opts) + def decode(<<0xC6, size::32, rest::binary>>, opts), do: decode_binary(rest, size, opts) + + # ==== Arrays ==== + def decode(<>, opts) when prefix >= 0x90 and prefix <= 0x9F do + size = prefix - 0x90 + decode_array(rest, size, opts) + end + + def decode(<<0xDC, size::16, rest::binary>>, opts), do: decode_array(rest, size, opts) + def decode(<<0xDD, size::32, rest::binary>>, opts), do: decode_array(rest, size, opts) + + # ==== Maps ==== + def decode(<>, opts) when prefix >= 0x80 and prefix <= 0x8F do + size = prefix - 0x80 + decode_map(rest, size, opts) + end + + def decode(<<0xDE, size::16, rest::binary>>, opts), do: decode_map(rest, size, opts) + def decode(<<0xDF, size::32, rest::binary>>, opts), do: decode_map(rest, size, opts) + + # ==== Extensions & Timestamps ==== + # ==== Fixext ==== + def decode(<<0xD4, type::signed-8, data::binary-size(1), rest::binary>>, opts), + do: decode_ext(type, data, rest, opts) + + def decode(<<0xD5, type::signed-8, data::binary-size(2), rest::binary>>, opts), + do: decode_ext(type, data, rest, opts) + + def decode(<<0xD6, type::signed-8, data::binary-size(4), rest::binary>>, opts), + do: decode_ext(type, data, rest, opts) + + def decode(<<0xD7, type::signed-8, data::binary-size(8), rest::binary>>, opts), + do: decode_ext(type, data, rest, opts) + + def decode(<<0xD8, type::signed-8, data::binary-size(16), rest::binary>>, opts), + do: decode_ext(type, data, rest, opts) + + # ==== Ext ==== + def decode(<<0xC7, len::8, type::signed-8, data::binary-size(len), rest::binary>>, opts), + do: decode_ext(type, data, rest, opts) + + def decode(<<0xC8, len::16, type::signed-8, data::binary-size(len), rest::binary>>, opts), + do: decode_ext(type, data, rest, opts) + + def decode(<<0xC9, len::32, type::signed-8, data::binary-size(len), rest::binary>>, opts), + do: decode_ext(type, data, rest, opts) + + # ==== Unknown types ==== + def decode(<>, _opts) do + {:error, {:unknown_prefix, prefix}} + end + + def decode(<<>>, _opts) do + {:error, :unexpected_eof} + end + + # ==== Helpers ==== + def decode_string(binary, size, opts) do + if max_size = opts[:max_byte_size], do: check_byte_size(size, max_size) + + case binary do + <> -> + {:ok, {string, rest}} + + _ -> + {:error, :unexpected_eof} + end + end + + def decode_binary(binary, size, opts) do + if max_size = opts[:max_byte_size], do: check_byte_size(size, max_size) + + case binary do + <> -> + {:ok, {bin, rest}} + + _ -> + {:error, :unexpected_eof} + end + end + + def decode_array(binary, size, opts) do + depth = opts[:depth] || 0 + + check_depth(depth, opts[:max_depth]) + check_byte_size(size, opts[:max_byte_size]) + + new_opts = Keyword.put(opts, :depth, depth + 1) + + decode_many(binary, size, [], new_opts) + end + + def decode_map(binary, size, opts) do + depth = opts[:depth] || 0 + + check_depth(depth, opts[:max_depth]) + check_byte_size(size * 2, opts[:max_byte_size]) + + new_opts = Keyword.put(opts, :depth, depth + 1) + + with {:ok, {kv_pairs, rest}} <- decode_many(binary, size * 2, [], new_opts) do + map = + Enum.chunk_every(kv_pairs, 2) + |> Enum.map(&List.to_tuple/1) + |> Enum.into(%{}) + + {:ok, {map, rest}} + end + end + + # Recursively decodes `count` terms from the binary + def decode_many(binary, 0, acc, _opts) do + {:ok, {Enum.reverse(acc), binary}} + end + + def decode_many(binary, count, acc, opts) do + case decode(binary, opts) do + {:ok, {term, rest}} -> + decode_many(rest, count - 1, [term | acc], opts) + + {:error, reason} -> + {:error, reason} + end + end + + def decode_ext(-1, data, rest, _opts) do + {:ok, {decode_timestamp(data), rest}} + end + + def decode_ext(type, data, rest, _opts) do + {:ok, {%Msgpack.Ext{type: type, data: data}, rest}} + end + + # timestamp 32: 4 bytes (32-bit unsigned integer seconds) + def decode_timestamp(<>) do + gregorian_seconds = unix_seconds + @epoch_offset + erlang_datetime = :calendar.gregorian_seconds_to_datetime(gregorian_seconds) + NaiveDateTime.from_erl!(erlang_datetime) + end + + # timestamp 64: 8 bytes (30-bit nanoseconds + 34-bit seconds) + def decode_timestamp(<>) do + nanoseconds = :erlang.bsr(data, 34) + + if nanoseconds > 999_999_999 do + throw({:error, :invalid_timestamp}) + else + unix_seconds = :erlang.band(data, 0x00000003_FFFFFFFF) + gregorian_seconds = unix_seconds + @epoch_offset + erlang_datetime = :calendar.gregorian_seconds_to_datetime(gregorian_seconds) + base_datetime = NaiveDateTime.from_erl!(erlang_datetime) + + if nanoseconds > 0 do + NaiveDateTime.add(base_datetime, nanoseconds, :nanosecond) + else + base_datetime + end + end + end + + # timestamp 96: 12 bytes (32-bit nanoseconds + 64-bit seconds) + def decode_timestamp(<>) do + if nanoseconds > 999_999_999 do + throw({:error, :invalid_timestamp}) + else + gregorian_seconds = unix_seconds + @epoch_offset + erlang_datetime = :calendar.gregorian_seconds_to_datetime(gregorian_seconds) + base_datetime = NaiveDateTime.from_erl!(erlang_datetime) + + if nanoseconds > 0 do + NaiveDateTime.add(base_datetime, nanoseconds, :nanosecond) + else + base_datetime + end + end + end + + def check_byte_size(size, max_size) when size > max_size do + throw({:error, {:max_byte_size_exceeded, max_size}}) + end + + def check_byte_size(_size, _max_size), do: :ok + + def check_depth(depth, max_depth) when depth >= max_depth do + throw({:error, {:max_depth_reached, max_depth}}) + end + + def check_depth(_depth, _max_depth), do: :ok +end From c3d8d02b6cd8ee95aeaa5817509d7b48d2f4118b Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Fri, 8 Aug 2025 00:30:20 +0100 Subject: [PATCH 04/22] implement StreamDecoder --- lib/msgpack/stream_decoder.ex | 42 +++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/lib/msgpack/stream_decoder.ex b/lib/msgpack/stream_decoder.ex index d0f1bb5..7164b1d 100644 --- a/lib/msgpack/stream_decoder.ex +++ b/lib/msgpack/stream_decoder.ex @@ -1,3 +1,45 @@ defmodule Msgpack.StreamDecoder do + alias Msgpack.Decoder + alias Msgpack.Decoder.Internal + @doc """ + Decodes a stream of MessagePack binaries into a stream of Elixir terms + """ + def decode(enumerable, opts \\ []) do + merged_opts = Keyword.merge(Decoder.default_options(), opts) + + initial_acc = %{ + buffer: <<>>, + opts: merged_opts + } + + Stream.transform(enumerable, initial_acc, &process_chunk/2) + end + + defp process_chunk(chunk, acc) do + buffer = acc.buffer <> chunk + opts = acc.opts + + {decoded_terms, leftover_buffer} = decode_from_buffer(buffer, opts, []) + + {decoded_terms, %{acc | buffer: leftover_buffer}} + end + + defp decode_from_buffer(buffer, opts, decoded_acc) do + try do + case Internal.decode(buffer, opts) do + {:ok, {term, rest}} -> + decode_from_buffer(rest, opts, [term | decoded_acc]) + + {:error, :unexpected_eof} -> + {Enum.reverse(decoded_acc), buffer} + + {:error, reason} -> + {Enum.reverse(decoded_acc, [{:error, reason}]), <<>>} + end + catch + {:error, reason} -> + {Enum.reverse(decoded_acc, [{:error, reason}]), <<>>} + end + end end From 465afb289acf2d56a5f8258673e2795589669f87 Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Fri, 8 Aug 2025 00:45:09 +0100 Subject: [PATCH 05/22] implement StreamEncoder --- lib/msgpack/stream_decoder.ex | 2 +- lib/msgpack/stream_encoder.ex | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) create mode 100644 lib/msgpack/stream_encoder.ex diff --git a/lib/msgpack/stream_decoder.ex b/lib/msgpack/stream_decoder.ex index 7164b1d..6f8f50a 100644 --- a/lib/msgpack/stream_decoder.ex +++ b/lib/msgpack/stream_decoder.ex @@ -3,7 +3,7 @@ defmodule Msgpack.StreamDecoder do alias Msgpack.Decoder.Internal @doc """ - Decodes a stream of MessagePack binaries into a stream of Elixir terms + Decodes a stream of MessagePack binaries into a stream of Elixir terms. """ def decode(enumerable, opts \\ []) do merged_opts = Keyword.merge(Decoder.default_options(), opts) diff --git a/lib/msgpack/stream_encoder.ex b/lib/msgpack/stream_encoder.ex new file mode 100644 index 0000000..9f5d2ac --- /dev/null +++ b/lib/msgpack/stream_encoder.ex @@ -0,0 +1,8 @@ +defmodule Msgpack.StreamEncoder do + @doc """ + Encodes a stream of Elixir terms into a stream of MessagePack binaries. + """ + def encode(enumerable, opts \\ []) do + Stream.map(enumerable, &Msgpack.encode(&1, opts)) + end +end From 61353488610e6d803d4b92894965e50b87dc4a9c Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Fri, 8 Aug 2025 00:48:43 +0100 Subject: [PATCH 06/22] add StreamEncoder and StreamDecoder to library api --- lib/msgpack.ex | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lib/msgpack.ex b/lib/msgpack.ex index 6d0c706..b667649 100644 --- a/lib/msgpack.ex +++ b/lib/msgpack.ex @@ -49,6 +49,8 @@ defmodule Msgpack do alias Msgpack.Encoder alias Msgpack.Decoder + alias Msgpack.StreamEncoder + alias Msgpack.StreamDecoder alias Msgpack.EncodeError alias Msgpack.DecodeError @@ -314,4 +316,12 @@ defmodule Msgpack do raise DecodeError, reason: reason end end + + def encode_stream(enumerable, opts \\ []) do + StreamEncoder.encode(enumerable, opts) + end + + def decode_stream(enumerable, opts \\ []) do + StreamDecoder.decode(enumerable, opts) + end end From d7fb1be95fcf3dda54ff76aee7d6f6e6fc7225f9 Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Fri, 8 Aug 2025 00:55:52 +0100 Subject: [PATCH 07/22] add happy path test for stream decoding complete objects --- test/stream_decoder_test.exs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 test/stream_decoder_test.exs diff --git a/test/stream_decoder_test.exs b/test/stream_decoder_test.exs new file mode 100644 index 0000000..ca15416 --- /dev/null +++ b/test/stream_decoder_test.exs @@ -0,0 +1,14 @@ +defmodule Msgpack.StreamDecoderTest do + use ExUnit.Case, async: true + + alias Msgpack.StreamDecoder + + test "decodes a stream of complete objects" do + terms = [1, "elixir", true, %{"a" => 1}] + input_stream = Enum.map(terms, &Msgpack.encode!/1) + + result = StreamDecoder.decode(input_stream) |> Enum.to_list() + + assert result == terms + end +end From 98fb0ac8e5fa89d89f21ee0ec532a7511bde7c2e Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Fri, 8 Aug 2025 01:03:32 +0100 Subject: [PATCH 08/22] add boundary-case test for stream decoder --- test/stream_decoder_test.exs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/test/stream_decoder_test.exs b/test/stream_decoder_test.exs index ca15416..3dc7246 100644 --- a/test/stream_decoder_test.exs +++ b/test/stream_decoder_test.exs @@ -11,4 +11,15 @@ defmodule Msgpack.StreamDecoderTest do assert result == terms end + + test "decodes a stream where objects cross chunk boundaries" do + terms = [123, "elixir", true, %{"a" => 1}] + single_binary = Enum.map(terms, &Msgpack.encode!/1) |> Enum.join() + <> = single_binary + input_stream = [chunk1, chunk2] + + result = StreamDecoder.decode(input_stream) |> Enum.to_list() + + assert result == terms + end end From 246b0b6ea2ad58e56a43f7f931c0267fe7c1576a Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Fri, 8 Aug 2025 02:00:18 +0100 Subject: [PATCH 09/22] refactor stream decoder to use Stream.unfold/2 for incomplete data test --- lib/msgpack/stream_decoder.ex | 37 ++++++++++++++++++----------------- test/stream_decoder_test.exs | 11 +++++++++++ 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/lib/msgpack/stream_decoder.ex b/lib/msgpack/stream_decoder.ex index 6f8f50a..9437b70 100644 --- a/lib/msgpack/stream_decoder.ex +++ b/lib/msgpack/stream_decoder.ex @@ -9,37 +9,38 @@ defmodule Msgpack.StreamDecoder do merged_opts = Keyword.merge(Decoder.default_options(), opts) initial_acc = %{ + enum: enumerable, buffer: <<>>, opts: merged_opts } - Stream.transform(enumerable, initial_acc, &process_chunk/2) + Stream.unfold(initial_acc, &decode_one_or_fetch/1) end - defp process_chunk(chunk, acc) do - buffer = acc.buffer <> chunk - opts = acc.opts - - {decoded_terms, leftover_buffer} = decode_from_buffer(buffer, opts, []) - - {decoded_terms, %{acc | buffer: leftover_buffer}} - end - - defp decode_from_buffer(buffer, opts, decoded_acc) do + defp decode_one_or_fetch(state) do try do - case Internal.decode(buffer, opts) do + case Internal.decode(state.buffer, state.opts) do {:ok, {term, rest}} -> - decode_from_buffer(rest, opts, [term | decoded_acc]) + {term, %{state | buffer: rest}} {:error, :unexpected_eof} -> - {Enum.reverse(decoded_acc), buffer} - - {:error, reason} -> - {Enum.reverse(decoded_acc, [{:error, reason}]), <<>>} + case Enum.fetch(state.enum, 0) do + {:ok, chunk} -> + new_buffer = state.buffer <> chunk + remaining_enum = Stream.drop(state.enum, 1) + decode_one_or_fetch(%{state | buffer: new_buffer, enum: remaining_enum}) + + :error -> + if byte_size(state.buffer) > 0 do + {{:error, :unexpected_eof}, %{state | buffer: <<>>}} + else + nil + end + end end catch {:error, reason} -> - {Enum.reverse(decoded_acc, [{:error, reason}]), <<>>} + {{:error, reason}, %{state | buffer: <<>>}} end end end diff --git a/test/stream_decoder_test.exs b/test/stream_decoder_test.exs index 3dc7246..dafa1fb 100644 --- a/test/stream_decoder_test.exs +++ b/test/stream_decoder_test.exs @@ -22,4 +22,15 @@ defmodule Msgpack.StreamDecoderTest do assert result == terms end + + test "returns an error when the stream ends with incomplete data" do + binary = Msgpack.encode!("returns an error when the stream ends with incomplete data") + incomplete_binary = :binary.part(binary, 0, 4) + input_stream = [incomplete_binary] + expected_result = [{:error, :unexpected_eof}] + + result = StreamDecoder.decode(input_stream) |> Enum.to_list() + + assert result == expected_result + end end From 6f33f7a7eaeff15dffa960be14f65f91b8889980 Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Fri, 8 Aug 2025 02:03:24 +0100 Subject: [PATCH 10/22] add test suite for stream encoder --- test/stream_encoder_test.exs | 42 ++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 test/stream_encoder_test.exs diff --git a/test/stream_encoder_test.exs b/test/stream_encoder_test.exs new file mode 100644 index 0000000..9dd77ea --- /dev/null +++ b/test/stream_encoder_test.exs @@ -0,0 +1,42 @@ +defmodule Msgpack.StreamEncoderTest do + use ExUnit.Case, async: true + + alias Msgpack.StreamEncoder + + test "encodes a stream of valid terms into ok-tuples" do + terms = [1, "elixir", true] + + expected_result = [ + {:ok, Msgpack.encode!(1)}, + {:ok, Msgpack.encode!("elixir")}, + {:ok, Msgpack.encode!(true)} + ] + + result = StreamEncoder.encode(terms) |> Enum.to_list() + + assert result == expected_result + end + + test "handles unencodable terms by emitting an error tuple" do + terms = [1, :water, 3] + opts = [atoms: :error] + + expected_result = [ + {:ok, Msgpack.encode!(1)}, + {:error, {:unsupported_atom, :water}}, + {:ok, Msgpack.encode!(3)} + ] + + result = StreamEncoder.encode(terms, opts) |> Enum.to_list() + + assert result == expected_result + end + + test "returns an empty list when given an empty stream" do + terms = [] + + result = StreamEncoder.encode(terms) |> Enum.to_list() + + assert result == [] + end +end From 3224531ab1aee9213edcb9a44cf5fbeb78eab40d Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Sat, 9 Aug 2025 02:58:56 +0100 Subject: [PATCH 11/22] overhaul StreamDecoder for robustness --- lib/msgpack/decoder.ex | 4 +- lib/msgpack/stream_decoder.ex | 62 +++++++++++++++--------------- test/stream_decoder_test.exs | 4 +- test/streaming_round_trip_test.exs | 54 ++++++++++++++++++++++++++ 4 files changed, 89 insertions(+), 35 deletions(-) create mode 100644 test/streaming_round_trip_test.exs diff --git a/lib/msgpack/decoder.ex b/lib/msgpack/decoder.ex index bfecf1e..438533b 100644 --- a/lib/msgpack/decoder.ex +++ b/lib/msgpack/decoder.ex @@ -7,7 +7,7 @@ defmodule Msgpack.Decoder do @spec decode(binary(), keyword()) :: {:ok, term()} | {:error, term()} def decode(binary, opts \\ []) do - merged_opts = Keyword.merge(default_options(), opts) + merged_opts = Keyword.merge(default_opts(), opts) try do case Internal.decode(binary, merged_opts) do @@ -29,7 +29,7 @@ defmodule Msgpack.Decoder do @doc """ Returns a keyword list of the default options for the decoder. """ - def default_options() do + def default_opts() do [ max_depth: 100, max_byte_size: 10_000_000 # 10MB diff --git a/lib/msgpack/stream_decoder.ex b/lib/msgpack/stream_decoder.ex index 9437b70..fc746ff 100644 --- a/lib/msgpack/stream_decoder.ex +++ b/lib/msgpack/stream_decoder.ex @@ -6,41 +6,41 @@ defmodule Msgpack.StreamDecoder do Decodes a stream of MessagePack binaries into a stream of Elixir terms. """ def decode(enumerable, opts \\ []) do - merged_opts = Keyword.merge(Decoder.default_options(), opts) + start_fun = fn -> + merged_opts = Keyword.merge(Decoder.default_opts(), opts) + {<<>>, merged_opts} + end + + stream_with_eof = Stream.concat(enumerable, [:eof]) + transform_fun = &transform_chunk/2 + + Stream.transform(stream_with_eof, start_fun.(), transform_fun) + end + + defp transform_chunk(:eof, {<<>>, _opts}) do + {[], {<<>>, nil}} + end - initial_acc = %{ - enum: enumerable, - buffer: <<>>, - opts: merged_opts - } + defp transform_chunk(:eof, {buffer, _opts}) do + {[{:error, :unexpected_eof}], {buffer, nil}} + end - Stream.unfold(initial_acc, &decode_one_or_fetch/1) + defp transform_chunk(chunk, {buffer, opts}) do + {decoded_terms, leftover_buffer} = do_transform(buffer <> chunk, opts, []) + {decoded_terms, {leftover_buffer, opts}} end - defp decode_one_or_fetch(state) do - try do - case Internal.decode(state.buffer, state.opts) do - {:ok, {term, rest}} -> - {term, %{state | buffer: rest}} - - {:error, :unexpected_eof} -> - case Enum.fetch(state.enum, 0) do - {:ok, chunk} -> - new_buffer = state.buffer <> chunk - remaining_enum = Stream.drop(state.enum, 1) - decode_one_or_fetch(%{state | buffer: new_buffer, enum: remaining_enum}) - - :error -> - if byte_size(state.buffer) > 0 do - {{:error, :unexpected_eof}, %{state | buffer: <<>>}} - else - nil - end - end - end - catch - {:error, reason} -> - {{:error, reason}, %{state | buffer: <<>>}} + defp do_transform(<<>>, _opts, acc) do + {Enum.reverse(acc), <<>>} + end + + defp do_transform(buffer, opts, acc) do + case Internal.decode(buffer, opts) do + {:ok, {term, rest}} -> + do_transform(rest, opts, [term | acc]) + + {:error, _reason} -> + {Enum.reverse(acc), buffer} end end end diff --git a/test/stream_decoder_test.exs b/test/stream_decoder_test.exs index dafa1fb..0532ace 100644 --- a/test/stream_decoder_test.exs +++ b/test/stream_decoder_test.exs @@ -24,8 +24,8 @@ defmodule Msgpack.StreamDecoderTest do end test "returns an error when the stream ends with incomplete data" do - binary = Msgpack.encode!("returns an error when the stream ends with incomplete data") - incomplete_binary = :binary.part(binary, 0, 4) + binary = Msgpack.encode!("a guaranteed incomplete string") + incomplete_binary = :binary.part(binary, 0, byte_size(binary) - 1) input_stream = [incomplete_binary] expected_result = [{:error, :unexpected_eof}] diff --git a/test/streaming_round_trip_test.exs b/test/streaming_round_trip_test.exs new file mode 100644 index 0000000..258db1a --- /dev/null +++ b/test/streaming_round_trip_test.exs @@ -0,0 +1,54 @@ +defmodule Msgpack.StreamingRoundTripTest do + use ExUnit.Case, async: true + use ExUnitProperties + + alias Msgpack + alias Msgpack.StreamEncoder + alias Msgpack.StreamDecoder + + property "a round trip through the streaming API is lossless" do + check all(terms <- list_of_encodable_terms(), chunk_size <- StreamData.integer(1..20)) do + binaries = + StreamEncoder.encode(terms) + |> Stream.map(fn {:ok, binary} -> binary end) + + chunked_stream = + binaries + |> Enum.to_list() + |> IO.iodata_to_binary() + |> chunk_binary(chunk_size) + + decoded_terms = + StreamDecoder.decode(chunked_stream) + |> Enum.to_list() + + assert decoded_terms == terms + end + end + + defp list_of_encodable_terms do + StreamData.list_of(encodable_term()) + end + + defp encodable_term do + StreamData.one_of([ + StreamData.integer(-1_000_000..1_000_000), + StreamData.string(:utf8, max_length: 50), + StreamData.boolean(), + StreamData.binary(max_length: 50), + StreamData.constant(nil) + ]) + end + + defp chunk_binary(binary, chunk_size) do + Stream.unfold(binary, fn + <<>> -> + nil + + remaining -> + size = min(chunk_size, byte_size(remaining)) + <> = remaining + {chunk, rest} + end) + end +end From 14ad7ba5409ab5c9be039539b60019843f4d244c Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Sat, 9 Aug 2025 03:20:41 +0100 Subject: [PATCH 12/22] add docs for StreamDecoder --- lib/msgpack/stream_decoder.ex | 67 ++++++++++++++++++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) diff --git a/lib/msgpack/stream_decoder.ex b/lib/msgpack/stream_decoder.ex index fc746ff..5113a84 100644 --- a/lib/msgpack/stream_decoder.ex +++ b/lib/msgpack/stream_decoder.ex @@ -1,9 +1,74 @@ defmodule Msgpack.StreamDecoder do + @moduledoc """ + Decodes a stream of MessagePack binaries into a stream of Elixir terms. + + This module is designed to handle large sequences of MessagePack objects that + arrive in chunks, such as from a network socket or a large file. + + It incrementally parses the incoming binaries and emits complete Elixir terms + as they are decoded. + + ## Capabilities + + * **Buffering:** The module internally buffers data, allowing a single + MessagePack object to be split across multiple chunks in the input stream. + * **Error Handling:** If the stream finishes while an object is only + partially decoded, the last element emitted by the stream will be the tuple + `{:error, :unexpected_eof}`. + + This module can be used together with `Msgpack.StreamEncoder` to create a lazy + serialization and deserialization pipeline. + """ + alias Msgpack.Decoder alias Msgpack.Decoder.Internal @doc """ - Decodes a stream of MessagePack binaries into a stream of Elixir terms. + Lazily decodes an enumerable of MessagePack binaries into a stream of Elixir + terms. + + ## Parameters + + * `enumerable`: An `Enumerable` that yields chunks of a MessagePack binary + stream (e.g., `File.stream/3` or a list of binaries). + * `opts`: A keyword list of options passed to the underlying decoder. + + ## Return Value + + Returns a lazy `Stream` that emits Elixir terms as they are decoded. + + If the input stream ends with incomplete data, the last item emitted will be + an error tuple `{:error, :unexpected_eof}`. + + ## Options + + This function accepts the same options as `Msgpack.decode/2`, which are + applied to the decoding of each object in the stream: + + * `:max_depth`: Sets a limit on the nesting level of arrays and maps. + Defaults to `100`. + * `:max_byte_size`: Sets a limit on the declared byte size of any single + string, binary, array, or map. + Defaults to `10_000_000` (10MB). + + ## Examples + + ### Standard Usage + + ```elixir + iex> objects = [1, "elixir", true] + iex> stream = Enum.map(objects, &Msgpack.encode!/1) + iex> Msgpack.StreamDecoder.decode(stream) |> Enum.to_list() + [1, "elixir", true] + ``` + + ### Handling Incomplete Streams + + ```elixir + iex> incomplete_stream = [<<0x91>>] # Array header + no elements + iex> Msgpack.StreamDecoder.decode(incomplete_stream) |> Enum.to_list() + [{:error, :unexpected_eof}] + ``` """ def decode(enumerable, opts \\ []) do start_fun = fn -> From b75bff57fa800b2bc5679ddacc6262729129a0c4 Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Sat, 9 Aug 2025 03:29:23 +0100 Subject: [PATCH 13/22] rename test helper --- test/streaming_round_trip_test.exs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/streaming_round_trip_test.exs b/test/streaming_round_trip_test.exs index 258db1a..efb01f0 100644 --- a/test/streaming_round_trip_test.exs +++ b/test/streaming_round_trip_test.exs @@ -7,7 +7,7 @@ defmodule Msgpack.StreamingRoundTripTest do alias Msgpack.StreamDecoder property "a round trip through the streaming API is lossless" do - check all(terms <- list_of_encodable_terms(), chunk_size <- StreamData.integer(1..20)) do + check all(terms <- encodable_terms(), chunk_size <- StreamData.integer(1..20)) do binaries = StreamEncoder.encode(terms) |> Stream.map(fn {:ok, binary} -> binary end) @@ -26,7 +26,7 @@ defmodule Msgpack.StreamingRoundTripTest do end end - defp list_of_encodable_terms do + defp encodable_terms() do StreamData.list_of(encodable_term()) end From 6e0c041960093f95288dbefee965b11d6c17bd51 Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Sat, 9 Aug 2025 03:42:49 +0100 Subject: [PATCH 14/22] add docs for StreamEncoder --- lib/msgpack/stream_encoder.ex | 57 ++++++++++++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/lib/msgpack/stream_encoder.ex b/lib/msgpack/stream_encoder.ex index 9f5d2ac..746deb1 100644 --- a/lib/msgpack/stream_encoder.ex +++ b/lib/msgpack/stream_encoder.ex @@ -1,6 +1,61 @@ defmodule Msgpack.StreamEncoder do + @moduledoc """ + Lazily encodes a stream of Elixir terms into a stream of MessagePack binaries. + + This module is the counterpart to `Msgpack.StreamDecoder`. It processes an + enumerable item by item, making it memory-efficient for encoding large + collections or infinite streams without loading the entire dataset into + memory. + + Each item in the output stream is a result tuple, either `{:ok, binary}` for + a successful encoding or `{:error, reason}` if an individual term could + not be encoded. + """ + @doc """ - Encodes a stream of Elixir terms into a stream of MessagePack binaries. + Lazily encodes an enumerable of Elixir terms into a stream of result tuples. + + ## Parameters + + * `enumerable`: An `Enumerable` that yields Elixir terms to be encoded. + * `opts`: A keyword list of options passed to the underlying encoder for each term. + + ## Return Value + + Returns a lazy `Stream` that emits result tuples. For each term in the + input enumerable, the stream will contain either: + * `{:ok, binary}` - On successful encoding. + * `{:error, reason}` - If the term cannot be encoded. + + ## Options + + This function accepts the same options as `Msgpack.encode/2`. See the + documentation for `Msgpack.encode/2` for a full list. + + ## Examples + + ### Standard Usage + + ```elixir + iex> terms = [1, "elixir"] + iex> Msgpack.StreamEncoder.encode(terms) |> Enum.to_list() + [ + {:ok, <<1>>}, + {:ok, <<166, 101, 108, 105, 120, 105, 114>>} + ] + ``` + + ### Handling Unencodable Terms + + ```elixir + iex> terms = [1, :elixir, 4] + iex> Msgpack.StreamEncoder.encode(terms, atoms: :error) |> Enum.to_list() + [ + {:ok, <<1>>}, + {:error, {:unsupported_atom, :elixir}}, + {:ok, <<4>>} + ] + ``` """ def encode(enumerable, opts \\ []) do Stream.map(enumerable, &Msgpack.encode(&1, opts)) From f9cfa34f7faac9bbf69735dfbfc6992013cac86b Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Sat, 9 Aug 2025 03:54:25 +0100 Subject: [PATCH 15/22] document streaming functions in library interface --- lib/msgpack.ex | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/lib/msgpack.ex b/lib/msgpack.ex index b667649..ec8598e 100644 --- a/lib/msgpack.ex +++ b/lib/msgpack.ex @@ -317,10 +317,59 @@ defmodule Msgpack do end end + @doc """ + Encodes a stream of Elixir terms into a stream of MessagePack binaries. + + Each term in the input enumerable is encoded individually. The output stream + will contain `{:ok, binary}` tuples for successful encodings or `{:error, + reason}` tuples for failures. + + This function delegates to `Msgpack.StreamEncoder.encode/2`. + + ## Options + + Accepts the same options as `Msgpack.encode/2`. + + ## Examples + + ```elixir + iex> terms = [1, "elixir", :world] + iex> Msgpack.encode_stream(terms, atoms: :string) |> Enum.to_list() + [ + {:ok, <<1>>}, + {:ok, <<166, 101, 108, 105, 120, 105, 114>>}, + {:ok, <<165, 119, 111, 114, 108, 100>>} + ] + ``` + """ def encode_stream(enumerable, opts \\ []) do StreamEncoder.encode(enumerable, opts) end + @doc """ + Decodes a stream of MessagePack binaries into a stream of Elixir terms. + + This function provides a streaming, lazy interface for decoding, making it + suitable for handling large datasets that do not fit into memory. + + It delegates to `Msgpack.StreamDecoder.decode/2`. + + For more detailed information on behavior, see the `Msgpack.StreamDecoder` + module documentation. + + ## Options + + Accepts the same options as `Msgpack.decode/2`. + + ## Examples + + ```elixir + iex> objects = [1, "elixir", true] + iex> stream = Enum.map(objects, &Msgpack.encode!/1) + iex> Msgpack.decode_stream(stream) |> Enum.to_list() + [1, "elixir", true] + ``` + """ def decode_stream(enumerable, opts \\ []) do StreamDecoder.decode(enumerable, opts) end From 54ee7cd346d50d4f5ce2ef29a082534e8adc41ba Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Sat, 9 Aug 2025 04:01:48 +0100 Subject: [PATCH 16/22] update readme --- README.md | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/README.md b/README.md index 617bf21..ff314ea 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,9 @@ types. limits to mitigate resource exhaustion from malformed or malicious payloads. - **Telemetry Integration:** Emits standard `:telemetry` events for integration with monitoring tools. +- **Streaming API:** Process large collections or continuous data streams with + low memory overhead using `Msgpack.encode_stream/2` and + `Msgpack.decode_stream/2`. ## Installation @@ -50,6 +53,27 @@ iex> Msgpack.decode!(<<0xC1>>) ** (Msgpack.DecodeError) Unknown type prefix: 193. The byte `0xC1` is not a valid MessagePack type marker. ``` +### Streaming Large Collections + +For datasets that may not fit in memory, you can use the streaming API, which +processes one term at a time. + +```elixir +# Create a lazy stream of terms to be encoded. +iex> terms = Stream.cycle([1, "elixir", true]) + +# The output is a lazy stream of encoded binaries. +iex> encoded_stream = Msgpack.encode_stream(terms) + +# The stream is only consumed when you enumerate it. +iex> encoded_stream |> Stream.take(3) |> Enum.to_list() +[ + {:ok, <<1>>}, + {:ok, <<166, 101, 108, 105, 120, 105, 114>>}, + {:ok, <<195>>} +] +``` + ## Full Documentation For detailed information on all features, options, and functions, see the [full From 133e691c87fcbcb27d4d331d0ab3b26c3b61dbba Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Sat, 9 Aug 2025 04:21:10 +0100 Subject: [PATCH 17/22] define types and specs --- lib/msgpack.ex | 2 ++ lib/msgpack/stream_decoder.ex | 11 +++++++++++ lib/msgpack/stream_encoder.ex | 10 ++++++++++ 3 files changed, 23 insertions(+) diff --git a/lib/msgpack.ex b/lib/msgpack.ex index ec8598e..1898d3a 100644 --- a/lib/msgpack.ex +++ b/lib/msgpack.ex @@ -342,6 +342,7 @@ defmodule Msgpack do ] ``` """ + @spec encode_stream(Enumerable.t(), StreamEncoder.opts_t()) :: StreamEncoder.t() def encode_stream(enumerable, opts \\ []) do StreamEncoder.encode(enumerable, opts) end @@ -370,6 +371,7 @@ defmodule Msgpack do [1, "elixir", true] ``` """ + @spec decode_stream(Enumerable.t(binary()), StreamDecoder.opts_t()) :: StreamDecoder.t() def decode_stream(enumerable, opts \\ []) do StreamDecoder.decode(enumerable, opts) end diff --git a/lib/msgpack/stream_decoder.ex b/lib/msgpack/stream_decoder.ex index 5113a84..0e92dcc 100644 --- a/lib/msgpack/stream_decoder.ex +++ b/lib/msgpack/stream_decoder.ex @@ -23,6 +23,12 @@ defmodule Msgpack.StreamDecoder do alias Msgpack.Decoder alias Msgpack.Decoder.Internal + @typedoc "A stream that yields decoded Elixir terms." + @type t :: Stream.t() + + @typedoc "Options passed to the decoder." + @type opts_t :: keyword() + @doc """ Lazily decodes an enumerable of MessagePack binaries into a stream of Elixir terms. @@ -70,6 +76,7 @@ defmodule Msgpack.StreamDecoder do [{:error, :unexpected_eof}] ``` """ + @spec decode(Enumerable.t(binary()), opts_t()) :: t() def decode(enumerable, opts \\ []) do start_fun = fn -> merged_opts = Keyword.merge(Decoder.default_opts(), opts) @@ -82,6 +89,8 @@ defmodule Msgpack.StreamDecoder do Stream.transform(stream_with_eof, start_fun.(), transform_fun) end + @doc false + @spec transform_chunk(binary() | :eof, {binary(), opts_t()}) :: {list(), {binary(), opts_t() | nil}} defp transform_chunk(:eof, {<<>>, _opts}) do {[], {<<>>, nil}} end @@ -95,6 +104,8 @@ defmodule Msgpack.StreamDecoder do {decoded_terms, {leftover_buffer, opts}} end + @doc false + @spec do_transform(binary(), opts_t(), list()) :: {list(), binary()} defp do_transform(<<>>, _opts, acc) do {Enum.reverse(acc), <<>>} end diff --git a/lib/msgpack/stream_encoder.ex b/lib/msgpack/stream_encoder.ex index 746deb1..e419d10 100644 --- a/lib/msgpack/stream_encoder.ex +++ b/lib/msgpack/stream_encoder.ex @@ -12,6 +12,15 @@ defmodule Msgpack.StreamEncoder do not be encoded. """ + @typedoc "A stream that yields encoded MessagePack objects." + @type t :: Stream.t(result_t()) + + @typedoc "An individual result from the encoding stream." + @type result_t :: {:ok, binary()} | {:error, any()} + + @typedoc "Options passed to the encoder." + @type opts_t :: keyword() + @doc """ Lazily encodes an enumerable of Elixir terms into a stream of result tuples. @@ -57,6 +66,7 @@ defmodule Msgpack.StreamEncoder do ] ``` """ + @spec encode(Enumerable.t(), opts_t()) :: t() def encode(enumerable, opts \\ []) do Stream.map(enumerable, &Msgpack.encode(&1, opts)) end From 3b419dcc89a6cf4e3f2812dbf21cd4086c04cc67 Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Sat, 9 Aug 2025 04:33:30 +0100 Subject: [PATCH 18/22] add default_opts/0 to encoder module --- lib/msgpack/encoder.ex | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/lib/msgpack/encoder.ex b/lib/msgpack/encoder.ex index 963e7b1..4d7a1a8 100644 --- a/lib/msgpack/encoder.ex +++ b/lib/msgpack/encoder.ex @@ -4,8 +4,19 @@ defmodule Msgpack.Encoder do """ @spec encode(term(), keyword()) :: {:ok, iodata()} | {:error, term()} - def encode(term, opts) do - do_encode(term, opts) + def encode(term, opts \\ []) do + merged_opts = Keyword.merge(default_opts(), opts) + do_encode(term, merged_opts) + end + + @doc """ + Returns a keyword list of the default options for the encoder. + """ + def default_opts() do + [ + atoms: :string, + string_validation: true + ] end # ==== Nil ==== From 1fafe37fd7b18802013365bbd64386c849735e00 Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Sat, 9 Aug 2025 13:30:45 +0100 Subject: [PATCH 19/22] add tests for public interface --- test/msgpack_test.exs | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/test/msgpack_test.exs b/test/msgpack_test.exs index d8e336a..59dbd70 100644 --- a/test/msgpack_test.exs +++ b/test/msgpack_test.exs @@ -323,6 +323,38 @@ defmodule MsgpackTest do end end + describe "Streaming" do + test "provides a lossless round trip for streaming encode and decode" do + terms = [1, "elixir", true, %{"key" => "value"}] + + result = + terms + |> Msgpack.encode_stream() + |> Stream.map(fn {:ok, binary} -> binary end) + |> Msgpack.decode_stream() + |> Enum.to_list() + + assert result == terms + end + + test "encode_stream handles unencodable terms gracefully" do + terms = [1, :elixir, 4] + + expected = [ + {:ok, <<1>>}, + {:ok, <<166, 101, 108, 105, 120, 105, 114>>}, + {:ok, <<4>>}, + ] + + result = + terms + |> Msgpack.encode_stream() + |> Enum.to_list() + + assert result == expected + end + end + # ==== Helpers ==== defp assert_encode(input, expected_binary) do From d64e7d1bc4afa03be9c060a7a3044e57c35d0363 Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Sat, 9 Aug 2025 18:03:34 +0100 Subject: [PATCH 20/22] update changelog --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d3b32ad..f0ef737 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added a new Streaming API that processes data in chunks, reducing peak memory + usage when handling large datasets or network streams + - Introduced `Msgpack.encode_stream/2` to lazily encode a stream of Elixir + terms one by one + - Introduced `Msgpack.decode_stream/2` to lazily decode a stream of + MessagePack objects, capable of handling data that arrives in multiple + chunks - Added CI workflow to run tests against supported Elixir versions ### Changed From 4564c6d6bd2d56d7ffd53597b2949682a7dc81c4 Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Sat, 9 Aug 2025 18:20:24 +0100 Subject: [PATCH 21/22] use map_join instead of map |> join --- test/stream_decoder_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/stream_decoder_test.exs b/test/stream_decoder_test.exs index 0532ace..4a33717 100644 --- a/test/stream_decoder_test.exs +++ b/test/stream_decoder_test.exs @@ -14,7 +14,7 @@ defmodule Msgpack.StreamDecoderTest do test "decodes a stream where objects cross chunk boundaries" do terms = [123, "elixir", true, %{"a" => 1}] - single_binary = Enum.map(terms, &Msgpack.encode!/1) |> Enum.join() + single_binary = Enum.map_join(terms, &Msgpack.encode!/1) <> = single_binary input_stream = [chunk1, chunk2] From 9cb9f3cf69a18f21bec25188eea4adf91806f7a4 Mon Sep 17 00:00:00 2001 From: Vandern Rodrigues Date: Sat, 9 Aug 2025 20:10:17 +0100 Subject: [PATCH 22/22] re-add code lost when fixing merge conflicts --- lib/msgpack/decoder/internal.ex | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/msgpack/decoder/internal.ex b/lib/msgpack/decoder/internal.ex index 90da7f9..b43bc71 100644 --- a/lib/msgpack/decoder/internal.ex +++ b/lib/msgpack/decoder/internal.ex @@ -204,7 +204,8 @@ defmodule Msgpack.Decoder.Internal do base_datetime = NaiveDateTime.from_erl!(erlang_datetime) if nanoseconds > 0 do - NaiveDateTime.add(base_datetime, nanoseconds, :nanosecond) + microseconds = div(nanoseconds, 1000) + %{base_datetime | microsecond: {microseconds, 6}} else base_datetime end @@ -221,7 +222,8 @@ defmodule Msgpack.Decoder.Internal do base_datetime = NaiveDateTime.from_erl!(erlang_datetime) if nanoseconds > 0 do - NaiveDateTime.add(base_datetime, nanoseconds, :nanosecond) + microseconds = div(nanoseconds, 1000) + %{base_datetime | microsecond: {microseconds, 6}} else base_datetime end