diff --git a/CHANGELOG.md b/CHANGELOG.md index d3b32ad..f0ef737 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added a new Streaming API that processes data in chunks, reducing peak memory + usage when handling large datasets or network streams + - Introduced `Msgpack.encode_stream/2` to lazily encode a stream of Elixir + terms one by one + - Introduced `Msgpack.decode_stream/2` to lazily decode a stream of + MessagePack objects, capable of handling data that arrives in multiple + chunks - Added CI workflow to run tests against supported Elixir versions ### Changed diff --git a/README.md b/README.md index 617bf21..ff314ea 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,9 @@ types. limits to mitigate resource exhaustion from malformed or malicious payloads. - **Telemetry Integration:** Emits standard `:telemetry` events for integration with monitoring tools. +- **Streaming API:** Process large collections or continuous data streams with + low memory overhead using `Msgpack.encode_stream/2` and + `Msgpack.decode_stream/2`. ## Installation @@ -50,6 +53,27 @@ iex> Msgpack.decode!(<<0xC1>>) ** (Msgpack.DecodeError) Unknown type prefix: 193. The byte `0xC1` is not a valid MessagePack type marker. ``` +### Streaming Large Collections + +For datasets that may not fit in memory, you can use the streaming API, which +processes one term at a time. + +```elixir +# Create a lazy stream of terms to be encoded. +iex> terms = Stream.cycle([1, "elixir", true]) + +# The output is a lazy stream of encoded binaries. +iex> encoded_stream = Msgpack.encode_stream(terms) + +# The stream is only consumed when you enumerate it. 
+iex> encoded_stream |> Stream.take(3) |> Enum.to_list() +[ + {:ok, <<1>>}, + {:ok, <<166, 101, 108, 105, 120, 105, 114>>}, + {:ok, <<195>>} +] +``` + ## Full Documentation For detailed information on all features, options, and functions, see the [full diff --git a/lib/msgpack.ex b/lib/msgpack.ex index 6d0c706..1898d3a 100644 --- a/lib/msgpack.ex +++ b/lib/msgpack.ex @@ -49,6 +49,8 @@ defmodule Msgpack do alias Msgpack.Encoder alias Msgpack.Decoder + alias Msgpack.StreamEncoder + alias Msgpack.StreamDecoder alias Msgpack.EncodeError alias Msgpack.DecodeError @@ -314,4 +316,63 @@ defmodule Msgpack do raise DecodeError, reason: reason end end + + @doc """ + Encodes a stream of Elixir terms into a stream of MessagePack binaries. + + Each term in the input enumerable is encoded individually. The output stream + will contain `{:ok, binary}` tuples for successful encodings or `{:error, + reason}` tuples for failures. + + This function delegates to `Msgpack.StreamEncoder.encode/2`. + + ## Options + + Accepts the same options as `Msgpack.encode/2`. + + ## Examples + + ```elixir + iex> terms = [1, "elixir", :world] + iex> Msgpack.encode_stream(terms, atoms: :string) |> Enum.to_list() + [ + {:ok, <<1>>}, + {:ok, <<166, 101, 108, 105, 120, 105, 114>>}, + {:ok, <<165, 119, 111, 114, 108, 100>>} + ] + ``` + """ + @spec encode_stream(Enumerable.t(), StreamEncoder.opts_t()) :: StreamEncoder.t() + def encode_stream(enumerable, opts \\ []) do + StreamEncoder.encode(enumerable, opts) + end + + @doc """ + Decodes a stream of MessagePack binaries into a stream of Elixir terms. + + This function provides a streaming, lazy interface for decoding, making it + suitable for handling large datasets that do not fit into memory. + + It delegates to `Msgpack.StreamDecoder.decode/2`. + + For more detailed information on behavior, see the `Msgpack.StreamDecoder` + module documentation. + + ## Options + + Accepts the same options as `Msgpack.decode/2`. 
+ + ## Examples + + ```elixir + iex> objects = [1, "elixir", true] + iex> stream = Enum.map(objects, &Msgpack.encode!/1) + iex> Msgpack.decode_stream(stream) |> Enum.to_list() + [1, "elixir", true] + ``` + """ + @spec decode_stream(Enumerable.t(binary()), StreamDecoder.opts_t()) :: StreamDecoder.t() + def decode_stream(enumerable, opts \\ []) do + StreamDecoder.decode(enumerable, opts) + end end diff --git a/lib/msgpack/decoder.ex b/lib/msgpack/decoder.ex index c76e037..438533b 100644 --- a/lib/msgpack/decoder.ex +++ b/lib/msgpack/decoder.ex @@ -3,21 +3,14 @@ defmodule Msgpack.Decoder do Handles the logic of decoding a MessagePack binary into an Elixir term. """ - @default_max_depth 100 - @default_max_byte_size 10_000_000 # 10MB - - # The number of gregorian seconds from year 0 to the Unix epoch. This is a constant. - @epoch_offset :calendar.datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}) + alias Msgpack.Decoder.Internal @spec decode(binary(), keyword()) :: {:ok, term()} | {:error, term()} def decode(binary, opts \\ []) do - merged_opts = - opts - |> Keyword.put_new(:max_depth, @default_max_depth) - |> Keyword.put_new(:max_byte_size, @default_max_byte_size) + merged_opts = Keyword.merge(default_opts(), opts) try do - case do_decode(binary, merged_opts) do + case Internal.decode(binary, merged_opts) do {:ok, {term, <<>>}} -> {:ok, term} @@ -33,241 +26,13 @@ defmodule Msgpack.Decoder do end end - # ==== Nil ==== - defp do_decode(<<0xC0, rest::binary>>, _opts), do: {:ok, {nil, rest}} - - # ==== Boolean ==== - defp do_decode(<<0xC3, rest::binary>>, _opts), do: {:ok, {true, rest}} - defp do_decode(<<0xC2, rest::binary>>, _opts), do: {:ok, {false, rest}} - - # ==== Integers ==== - # ==== Positive Fixint ==== - defp do_decode(<>, _opts) when int < 128 do - {:ok, {int, rest}} - end - - # ==== Negative Fixint ==== - defp do_decode(<>, _opts) when int >= -32 and int < 0 do - {:ok, {int, rest}} - end - - # ==== Unsigned Integers ==== - defp do_decode(<<0xCC, int::8, 
rest::binary>>, _opts), do: {:ok, {int, rest}} - defp do_decode(<<0xCD, int::16, rest::binary>>, _opts), do: {:ok, {int, rest}} - defp do_decode(<<0xCE, int::32, rest::binary>>, _opts), do: {:ok, {int, rest}} - defp do_decode(<<0xCF, int::64, rest::binary>>, _opts), do: {:ok, {int, rest}} - - # ==== Signed Integers ==== - defp do_decode(<<0xD0, int::signed-8, rest::binary>>, _opts), do: {:ok, {int, rest}} - defp do_decode(<<0xD1, int::signed-16, rest::binary>>, _opts), do: {:ok, {int, rest}} - defp do_decode(<<0xD2, int::signed-32, rest::binary>>, _opts), do: {:ok, {int, rest}} - defp do_decode(<<0xD3, int::signed-64, rest::binary>>, _opts), do: {:ok, {int, rest}} - - # ==== Floats ==== - defp do_decode(<<0xCA, float::float-32, rest::binary>>, _opts), do: {:ok, {float, rest}} - defp do_decode(<<0xCB, float::float-64, rest::binary>>, _opts), do: {:ok, {float, rest}} - - # ==== Strings ==== - defp do_decode(<>, opts) when prefix >= 0xA0 and prefix <= 0xBF do - size = prefix - 0xA0 - decode_string(rest, size, opts) - end - - defp do_decode(<<0xD9, size::8, rest::binary>>, opts), do: decode_string(rest, size, opts) - defp do_decode(<<0xDA, size::16, rest::binary>>, opts), do: decode_string(rest, size, opts) - defp do_decode(<<0xDB, size::32, rest::binary>>, opts), do: decode_string(rest, size, opts) - - # ==== Raw Binary ==== - defp do_decode(<<0xC4, size::8, rest::binary>>, opts), do: decode_binary(rest, size, opts) - defp do_decode(<<0xC5, size::16, rest::binary>>, opts), do: decode_binary(rest, size, opts) - defp do_decode(<<0xC6, size::32, rest::binary>>, opts), do: decode_binary(rest, size, opts) - - # ==== Arrays ==== - defp do_decode(<>, opts) when prefix >= 0x90 and prefix <= 0x9F do - size = prefix - 0x90 - decode_array(rest, size, opts) - end - - defp do_decode(<<0xDC, size::16, rest::binary>>, opts), do: decode_array(rest, size, opts) - defp do_decode(<<0xDD, size::32, rest::binary>>, opts), do: decode_array(rest, size, opts) - - # ==== Maps ==== - defp 
do_decode(<>, opts) when prefix >= 0x80 and prefix <= 0x8F do - size = prefix - 0x80 - decode_map(rest, size, opts) - end - - defp do_decode(<<0xDE, size::16, rest::binary>>, opts), do: decode_map(rest, size, opts) - defp do_decode(<<0xDF, size::32, rest::binary>>, opts), do: decode_map(rest, size, opts) - - # ==== Extensions & Timestamps ==== - # ==== Fixext ==== - defp do_decode(<<0xD4, type::signed-8, data::binary-size(1), rest::binary>>, opts), - do: decode_ext(type, data, rest, opts) - - defp do_decode(<<0xD5, type::signed-8, data::binary-size(2), rest::binary>>, opts), - do: decode_ext(type, data, rest, opts) - - defp do_decode(<<0xD6, type::signed-8, data::binary-size(4), rest::binary>>, opts), - do: decode_ext(type, data, rest, opts) - - defp do_decode(<<0xD7, type::signed-8, data::binary-size(8), rest::binary>>, opts), - do: decode_ext(type, data, rest, opts) - - defp do_decode(<<0xD8, type::signed-8, data::binary-size(16), rest::binary>>, opts), - do: decode_ext(type, data, rest, opts) - - # ==== Ext ==== - defp do_decode(<<0xC7, len::8, type::signed-8, data::binary-size(len), rest::binary>>, opts), - do: decode_ext(type, data, rest, opts) - - defp do_decode(<<0xC8, len::16, type::signed-8, data::binary-size(len), rest::binary>>, opts), - do: decode_ext(type, data, rest, opts) - - defp do_decode(<<0xC9, len::32, type::signed-8, data::binary-size(len), rest::binary>>, opts), - do: decode_ext(type, data, rest, opts) - - # ==== Unknown types ==== - defp do_decode(<>, _opts) do - {:error, {:unknown_prefix, prefix}} - end - - defp do_decode(<<>>, _opts) do - {:error, :unexpected_eof} - end - - # ==== Helpers ==== - defp decode_string(binary, size, opts) do - if max_size = opts[:max_byte_size], do: check_byte_size(size, max_size) - - case binary do - <> -> - {:ok, {string, rest}} - - _ -> - {:error, :unexpected_eof} - end - end - - defp decode_binary(binary, size, opts) do - if max_size = opts[:max_byte_size], do: check_byte_size(size, max_size) - - case binary 
do - <> -> - {:ok, {bin, rest}} - - _ -> - {:error, :unexpected_eof} - end - end - - defp decode_array(binary, size, opts) do - depth = opts[:depth] || 0 - - check_depth(depth, opts[:max_depth]) - check_byte_size(size, opts[:max_byte_size]) - - new_opts = Keyword.put(opts, :depth, depth + 1) - - decode_many(binary, size, [], new_opts) - end - - defp decode_map(binary, size, opts) do - depth = opts[:depth] || 0 - - check_depth(depth, opts[:max_depth]) - check_byte_size(size * 2, opts[:max_byte_size]) - - new_opts = Keyword.put(opts, :depth, depth + 1) - - with {:ok, {kv_pairs, rest}} <- decode_many(binary, size * 2, [], new_opts) do - map = - Enum.chunk_every(kv_pairs, 2) - |> Enum.map(&List.to_tuple/1) - |> Enum.into(%{}) - - {:ok, {map, rest}} - end - end - - # Recursively decodes `count` terms from the binary - defp decode_many(binary, 0, acc, _opts) do - {:ok, {Enum.reverse(acc), binary}} - end - - defp decode_many(binary, count, acc, opts) do - case do_decode(binary, opts) do - {:ok, {term, rest}} -> - decode_many(rest, count - 1, [term | acc], opts) - - {:error, reason} -> - {:error, reason} - end - end - - defp decode_ext(-1, data, rest, _opts) do - {:ok, {decode_timestamp(data), rest}} - end - - defp decode_ext(type, data, rest, _opts) do - {:ok, {%Msgpack.Ext{type: type, data: data}, rest}} - end - - # timestamp 32: 4 bytes (32-bit unsigned integer seconds) - defp decode_timestamp(<>) do - gregorian_seconds = unix_seconds + @epoch_offset - erlang_datetime = :calendar.gregorian_seconds_to_datetime(gregorian_seconds) - NaiveDateTime.from_erl!(erlang_datetime) - end - - # timestamp 64: 8 bytes (30-bit nanoseconds + 34-bit seconds) - defp decode_timestamp(<>) do - nanoseconds = :erlang.bsr(data, 34) - - if nanoseconds > 999_999_999 do - throw({:error, :invalid_timestamp}) - else - unix_seconds = :erlang.band(data, 0x00000003_FFFFFFFF) - gregorian_seconds = unix_seconds + @epoch_offset - erlang_datetime = 
:calendar.gregorian_seconds_to_datetime(gregorian_seconds) - base_datetime = NaiveDateTime.from_erl!(erlang_datetime) - - if nanoseconds > 0 do - microseconds = div(nanoseconds, 1000) - %{base_datetime | microsecond: {microseconds, 6}} - else - base_datetime - end - end - end - - # timestamp 96: 12 bytes (32-bit nanoseconds + 64-bit seconds) - defp decode_timestamp(<>) do - if nanoseconds > 999_999_999 do - throw({:error, :invalid_timestamp}) - else - gregorian_seconds = unix_seconds + @epoch_offset - erlang_datetime = :calendar.gregorian_seconds_to_datetime(gregorian_seconds) - base_datetime = NaiveDateTime.from_erl!(erlang_datetime) - - if nanoseconds > 0 do - microseconds = div(nanoseconds, 1000) - %{base_datetime | microsecond: {microseconds, 6}} - else - base_datetime - end - end - end - - defp check_byte_size(size, max_size) when size > max_size do - throw({:error, {:max_byte_size_exceeded, max_size}}) - end - - defp check_byte_size(_size, _max_size), do: :ok - - defp check_depth(depth, max_depth) when depth >= max_depth do - throw({:error, {:max_depth_reached, max_depth}}) + @doc """ + Returns a keyword list of the default options for the decoder. + """ + def default_opts() do + [ + max_depth: 100, + max_byte_size: 10_000_000 # 10MB + ] end - - defp check_depth(_depth, _max_depth), do: :ok end diff --git a/lib/msgpack/decoder/internal.ex b/lib/msgpack/decoder/internal.ex new file mode 100644 index 0000000..b43bc71 --- /dev/null +++ b/lib/msgpack/decoder/internal.ex @@ -0,0 +1,244 @@ +defmodule Msgpack.Decoder.Internal do + @moduledoc false + + # The number of gregorian seconds from year 0 to the Unix epoch. This is a constant. 
+ @epoch_offset :calendar.datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}) + + # ==== Nil ==== + def decode(<<0xC0, rest::binary>>, _opts), do: {:ok, {nil, rest}} + + # ==== Boolean ==== + def decode(<<0xC3, rest::binary>>, _opts), do: {:ok, {true, rest}} + def decode(<<0xC2, rest::binary>>, _opts), do: {:ok, {false, rest}} + + # ==== Integers ==== + # ==== Positive Fixint ==== + def decode(<>, _opts) when int < 128 do + {:ok, {int, rest}} + end + + # ==== Negative Fixint ==== + def decode(<>, _opts) when int >= -32 and int < 0 do + {:ok, {int, rest}} + end + + # ==== Unsigned Integers ==== + def decode(<<0xCC, int::8, rest::binary>>, _opts), do: {:ok, {int, rest}} + def decode(<<0xCD, int::16, rest::binary>>, _opts), do: {:ok, {int, rest}} + def decode(<<0xCE, int::32, rest::binary>>, _opts), do: {:ok, {int, rest}} + def decode(<<0xCF, int::64, rest::binary>>, _opts), do: {:ok, {int, rest}} + + # ==== Signed Integers ==== + def decode(<<0xD0, int::signed-8, rest::binary>>, _opts), do: {:ok, {int, rest}} + def decode(<<0xD1, int::signed-16, rest::binary>>, _opts), do: {:ok, {int, rest}} + def decode(<<0xD2, int::signed-32, rest::binary>>, _opts), do: {:ok, {int, rest}} + def decode(<<0xD3, int::signed-64, rest::binary>>, _opts), do: {:ok, {int, rest}} + + # ==== Floats ==== + def decode(<<0xCA, float::float-32, rest::binary>>, _opts), do: {:ok, {float, rest}} + def decode(<<0xCB, float::float-64, rest::binary>>, _opts), do: {:ok, {float, rest}} + + # ==== Strings ==== + def decode(<>, opts) when prefix >= 0xA0 and prefix <= 0xBF do + size = prefix - 0xA0 + decode_string(rest, size, opts) + end + + def decode(<<0xD9, size::8, rest::binary>>, opts), do: decode_string(rest, size, opts) + def decode(<<0xDA, size::16, rest::binary>>, opts), do: decode_string(rest, size, opts) + def decode(<<0xDB, size::32, rest::binary>>, opts), do: decode_string(rest, size, opts) + + # ==== Raw Binary ==== + def decode(<<0xC4, size::8, rest::binary>>, opts), do: 
decode_binary(rest, size, opts) + def decode(<<0xC5, size::16, rest::binary>>, opts), do: decode_binary(rest, size, opts) + def decode(<<0xC6, size::32, rest::binary>>, opts), do: decode_binary(rest, size, opts) + + # ==== Arrays ==== + def decode(<>, opts) when prefix >= 0x90 and prefix <= 0x9F do + size = prefix - 0x90 + decode_array(rest, size, opts) + end + + def decode(<<0xDC, size::16, rest::binary>>, opts), do: decode_array(rest, size, opts) + def decode(<<0xDD, size::32, rest::binary>>, opts), do: decode_array(rest, size, opts) + + # ==== Maps ==== + def decode(<>, opts) when prefix >= 0x80 and prefix <= 0x8F do + size = prefix - 0x80 + decode_map(rest, size, opts) + end + + def decode(<<0xDE, size::16, rest::binary>>, opts), do: decode_map(rest, size, opts) + def decode(<<0xDF, size::32, rest::binary>>, opts), do: decode_map(rest, size, opts) + + # ==== Extensions & Timestamps ==== + # ==== Fixext ==== + def decode(<<0xD4, type::signed-8, data::binary-size(1), rest::binary>>, opts), + do: decode_ext(type, data, rest, opts) + + def decode(<<0xD5, type::signed-8, data::binary-size(2), rest::binary>>, opts), + do: decode_ext(type, data, rest, opts) + + def decode(<<0xD6, type::signed-8, data::binary-size(4), rest::binary>>, opts), + do: decode_ext(type, data, rest, opts) + + def decode(<<0xD7, type::signed-8, data::binary-size(8), rest::binary>>, opts), + do: decode_ext(type, data, rest, opts) + + def decode(<<0xD8, type::signed-8, data::binary-size(16), rest::binary>>, opts), + do: decode_ext(type, data, rest, opts) + + # ==== Ext ==== + def decode(<<0xC7, len::8, type::signed-8, data::binary-size(len), rest::binary>>, opts), + do: decode_ext(type, data, rest, opts) + + def decode(<<0xC8, len::16, type::signed-8, data::binary-size(len), rest::binary>>, opts), + do: decode_ext(type, data, rest, opts) + + def decode(<<0xC9, len::32, type::signed-8, data::binary-size(len), rest::binary>>, opts), + do: decode_ext(type, data, rest, opts) + + # ==== Unknown types 
==== + def decode(<>, _opts) do + {:error, {:unknown_prefix, prefix}} + end + + def decode(<<>>, _opts) do + {:error, :unexpected_eof} + end + + # ==== Helpers ==== + def decode_string(binary, size, opts) do + if max_size = opts[:max_byte_size], do: check_byte_size(size, max_size) + + case binary do + <> -> + {:ok, {string, rest}} + + _ -> + {:error, :unexpected_eof} + end + end + + def decode_binary(binary, size, opts) do + if max_size = opts[:max_byte_size], do: check_byte_size(size, max_size) + + case binary do + <> -> + {:ok, {bin, rest}} + + _ -> + {:error, :unexpected_eof} + end + end + + def decode_array(binary, size, opts) do + depth = opts[:depth] || 0 + + check_depth(depth, opts[:max_depth]) + check_byte_size(size, opts[:max_byte_size]) + + new_opts = Keyword.put(opts, :depth, depth + 1) + + decode_many(binary, size, [], new_opts) + end + + def decode_map(binary, size, opts) do + depth = opts[:depth] || 0 + + check_depth(depth, opts[:max_depth]) + check_byte_size(size * 2, opts[:max_byte_size]) + + new_opts = Keyword.put(opts, :depth, depth + 1) + + with {:ok, {kv_pairs, rest}} <- decode_many(binary, size * 2, [], new_opts) do + map = + Enum.chunk_every(kv_pairs, 2) + |> Enum.map(&List.to_tuple/1) + |> Enum.into(%{}) + + {:ok, {map, rest}} + end + end + + # Recursively decodes `count` terms from the binary + def decode_many(binary, 0, acc, _opts) do + {:ok, {Enum.reverse(acc), binary}} + end + + def decode_many(binary, count, acc, opts) do + case decode(binary, opts) do + {:ok, {term, rest}} -> + decode_many(rest, count - 1, [term | acc], opts) + + {:error, reason} -> + {:error, reason} + end + end + + def decode_ext(-1, data, rest, _opts) do + {:ok, {decode_timestamp(data), rest}} + end + + def decode_ext(type, data, rest, _opts) do + {:ok, {%Msgpack.Ext{type: type, data: data}, rest}} + end + + # timestamp 32: 4 bytes (32-bit unsigned integer seconds) + def decode_timestamp(<>) do + gregorian_seconds = unix_seconds + @epoch_offset + erlang_datetime = 
:calendar.gregorian_seconds_to_datetime(gregorian_seconds) + NaiveDateTime.from_erl!(erlang_datetime) + end + + # timestamp 64: 8 bytes (30-bit nanoseconds + 34-bit seconds) + def decode_timestamp(<>) do + nanoseconds = :erlang.bsr(data, 34) + + if nanoseconds > 999_999_999 do + throw({:error, :invalid_timestamp}) + else + unix_seconds = :erlang.band(data, 0x00000003_FFFFFFFF) + gregorian_seconds = unix_seconds + @epoch_offset + erlang_datetime = :calendar.gregorian_seconds_to_datetime(gregorian_seconds) + base_datetime = NaiveDateTime.from_erl!(erlang_datetime) + + if nanoseconds > 0 do + microseconds = div(nanoseconds, 1000) + %{base_datetime | microsecond: {microseconds, 6}} + else + base_datetime + end + end + end + + # timestamp 96: 12 bytes (32-bit nanoseconds + 64-bit seconds) + def decode_timestamp(<>) do + if nanoseconds > 999_999_999 do + throw({:error, :invalid_timestamp}) + else + gregorian_seconds = unix_seconds + @epoch_offset + erlang_datetime = :calendar.gregorian_seconds_to_datetime(gregorian_seconds) + base_datetime = NaiveDateTime.from_erl!(erlang_datetime) + + if nanoseconds > 0 do + microseconds = div(nanoseconds, 1000) + %{base_datetime | microsecond: {microseconds, 6}} + else + base_datetime + end + end + end + + def check_byte_size(size, max_size) when size > max_size do + throw({:error, {:max_byte_size_exceeded, max_size}}) + end + + def check_byte_size(_size, _max_size), do: :ok + + def check_depth(depth, max_depth) when depth >= max_depth do + throw({:error, {:max_depth_reached, max_depth}}) + end + + def check_depth(_depth, _max_depth), do: :ok +end diff --git a/lib/msgpack/encoder.ex b/lib/msgpack/encoder.ex index 963e7b1..4d7a1a8 100644 --- a/lib/msgpack/encoder.ex +++ b/lib/msgpack/encoder.ex @@ -4,8 +4,19 @@ defmodule Msgpack.Encoder do """ @spec encode(term(), keyword()) :: {:ok, iodata()} | {:error, term()} - def encode(term, opts) do - do_encode(term, opts) + def encode(term, opts \\ []) do + merged_opts = 
Keyword.merge(default_opts(), opts)
+    do_encode(term, merged_opts)
+  end
+
+  @doc """
+  Returns a keyword list of the default options for the encoder.
+  """
+  def default_opts() do
+    [
+      atoms: :string,
+      string_validation: true
+    ]
   end
 
   # ==== Nil ====
diff --git a/lib/msgpack/stream_decoder.ex b/lib/msgpack/stream_decoder.ex
new file mode 100644
index 0000000..0e92dcc
--- /dev/null
+++ b/lib/msgpack/stream_decoder.ex
@@ -0,0 +1,151 @@
+defmodule Msgpack.StreamDecoder do
+  @moduledoc """
+  Decodes a stream of MessagePack binaries into a stream of Elixir terms.
+
+  This module is designed to handle large sequences of MessagePack objects that
+  arrive in chunks, such as from a network socket or a large file.
+
+  It incrementally parses the incoming binaries and emits complete Elixir terms
+  as they are decoded.
+
+  ## Capabilities
+
+    * **Buffering:** The module internally buffers data, allowing a single
+      MessagePack object to be split across multiple chunks in the input stream.
+    * **Error Handling:** If the stream finishes while an object is only
+      partially decoded, the last element emitted by the stream will be the tuple
+      `{:error, :unexpected_eof}`. Input that can never become valid (the
+      never-used `0xC1` marker, a violated `:max_depth` or `:max_byte_size`
+      limit, or an out-of-range timestamp) ends the stream early with
+      `{:error, reason}` as the last element.
+
+  This module can be used together with `Msgpack.StreamEncoder` to create a lazy
+  serialization and deserialization pipeline.
+  """
+
+  alias Msgpack.Decoder
+  alias Msgpack.Decoder.Internal
+
+  @typedoc "A stream that yields decoded Elixir terms."
+  @type t :: Enumerable.t()
+
+  @typedoc "Options passed to the decoder."
+  @type opts_t :: keyword()
+
+  @doc """
+  Lazily decodes an enumerable of MessagePack binaries into a stream of Elixir
+  terms.
+
+  ## Parameters
+
+    * `enumerable`: An `Enumerable` that yields chunks of a MessagePack binary
+      stream (e.g., `File.stream/3` or a list of binaries).
+    * `opts`: A keyword list of options passed to the underlying decoder.
+
+  ## Return Value
+
+  Returns a lazy `Stream` that emits Elixir terms as they are decoded.
+
+  If the input stream ends with incomplete data, the last item emitted will be
+  an error tuple `{:error, :unexpected_eof}`.
+
+  If the data is malformed or exceeds a configured limit, the stream emits
+  `{:error, reason}` as its last item and stops consuming the input.
+
+  ## Options
+
+  This function accepts the same options as `Msgpack.decode/2`, which are
+  applied to the decoding of each object in the stream:
+
+    * `:max_depth`: Sets a limit on the nesting level of arrays and maps.
+      Defaults to `100`.
+    * `:max_byte_size`: Sets a limit on the declared byte size of any single
+      string, binary, array, or map.
+      Defaults to `10_000_000` (10MB).
+
+  ## Examples
+
+  ### Standard Usage
+
+  ```elixir
+  iex> objects = [1, "elixir", true]
+  iex> stream = Enum.map(objects, &Msgpack.encode!/1)
+  iex> Msgpack.StreamDecoder.decode(stream) |> Enum.to_list()
+  [1, "elixir", true]
+  ```
+
+  ### Handling Incomplete Streams
+
+  ```elixir
+  iex> incomplete_stream = [<<0x91>>] # Array header + no elements
+  iex> Msgpack.StreamDecoder.decode(incomplete_stream) |> Enum.to_list()
+  [{:error, :unexpected_eof}]
+  ```
+
+  ### Handling Malformed Data
+
+  ```elixir
+  iex> Msgpack.StreamDecoder.decode([<<1, 0xC1, 2>>]) |> Enum.to_list()
+  [1, {:error, {:unknown_prefix, 193}}]
+  ```
+  """
+  @spec decode(Enumerable.t(binary()), opts_t()) :: t()
+  def decode(enumerable, opts \\ []) do
+    merged_opts = Keyword.merge(Decoder.default_opts(), opts)
+
+    enumerable
+    |> Stream.concat([:eof])
+    |> Stream.transform({<<>>, merged_opts}, &transform_chunk/2)
+  end
+
+  @typep state :: {binary(), opts_t() | nil} | :halted
+
+  # Reducer for `Stream.transform/3`. After a fatal error has been emitted the
+  # state is `:halted`, and the next element (there is always `:eof`) stops it.
+  @spec transform_chunk(binary() | :eof, state()) :: {list(), state()} | {:halt, state()}
+  defp transform_chunk(_chunk, :halted), do: {:halt, :halted}
+  defp transform_chunk(:eof, {<<>>, _opts}), do: {[], {<<>>, nil}}
+  defp transform_chunk(:eof, {buffer, _opts}), do: {[{:error, :unexpected_eof}], {buffer, nil}}
+
+  defp transform_chunk(chunk, {buffer, opts}) do
+    case do_transform(buffer <> chunk, opts, []) do
+      {:cont, terms, leftover} -> {terms, {leftover, opts}}
+      {:halt, terms} -> {terms, :halted}
+    end
+  end
+
+  # Decodes as many complete objects as `buffer` holds. Returns
+  # `{:cont, terms, leftover}` when more input is needed, or `{:halt, terms}`
+  # when `terms` ends with a fatal `{:error, reason}`.
+  @spec do_transform(binary(), opts_t(), list()) ::
+          {:cont, list(), binary()} | {:halt, list()}
+  defp do_transform(<<>>, _opts, acc), do: {:cont, Enum.reverse(acc), <<>>}
+
+  defp do_transform(buffer, opts, acc) do
+    case decode_one(buffer, opts) do
+      {:ok, {term, rest}} ->
+        do_transform(rest, opts, [term | acc])
+
+      {:fatal, reason} ->
+        {:halt, Enum.reverse([{:error, reason} | acc])}
+
+      {:error, _reason} ->
+        # Anything else may just be a truncated object: keep the bytes and wait.
+        {:cont, Enum.reverse(acc), buffer}
+    end
+  end
+
+  # The internal decoder throws on limit violations and invalid timestamps, and
+  # `0xC1` is never a valid type marker; more input cannot cure any of these.
+  # Other `:unknown_prefix` errors also occur for a cut-off fixed-width header.
+  @spec decode_one(binary(), opts_t()) ::
+          {:ok, {term(), binary()}} | {:error, term()} | {:fatal, term()}
+  defp decode_one(buffer, opts) do
+    case Internal.decode(buffer, opts) do
+      {:error, {:unknown_prefix, 0xC1} = reason} -> {:fatal, reason}
+      result -> result
+    end
+  catch
+    :throw, {:error, reason} -> {:fatal, reason}
+  end
+end
diff --git a/lib/msgpack/stream_encoder.ex b/lib/msgpack/stream_encoder.ex
new file mode 100644
index 0000000..e419d10
--- /dev/null
+++ b/lib/msgpack/stream_encoder.ex
@@ -0,0 +1,73 @@
+defmodule Msgpack.StreamEncoder do
+  @moduledoc """
+  Lazily encodes a stream of Elixir terms into a stream of MessagePack binaries.
+
+  This module is the counterpart to `Msgpack.StreamDecoder`. It processes an
+  enumerable item by item, making it memory-efficient for encoding large
+  collections or infinite streams without loading the entire dataset into
+  memory.
+
+  Each item in the output stream is a result tuple, either `{:ok, binary}` for
+  a successful encoding or `{:error, reason}` if an individual term could
+  not be encoded.
+  """
+
+  @typedoc "A stream that yields encoded MessagePack objects."
+  @type t :: Enumerable.t(result_t())
+
+  @typedoc "An individual result from the encoding stream."
+  @type result_t :: {:ok, binary()} | {:error, any()}
+
+  @typedoc "Options passed to the encoder."
+  @type opts_t :: keyword()
+
+  @doc """
+  Lazily encodes an enumerable of Elixir terms into a stream of result tuples.
+
+  ## Parameters
+
+    * `enumerable`: An `Enumerable` that yields Elixir terms to be encoded.
+    * `opts`: A keyword list of options passed to the underlying encoder for each term.
+
+  ## Return Value
+
+  Returns a lazy `Stream` that emits result tuples. For each term in the
+  input enumerable, the stream will contain either:
+    * `{:ok, binary}` - On successful encoding.
+    * `{:error, reason}` - If the term cannot be encoded.
+
+  ## Options
+
+  This function accepts the same options as `Msgpack.encode/2`. See the
+  documentation for `Msgpack.encode/2` for a full list.
+
+  ## Examples
+
+  ### Standard Usage
+
+  ```elixir
+  iex> terms = [1, "elixir"]
+  iex> Msgpack.StreamEncoder.encode(terms) |> Enum.to_list()
+  [
+    {:ok, <<1>>},
+    {:ok, <<166, 101, 108, 105, 120, 105, 114>>}
+  ]
+  ```
+
+  ### Handling Unencodable Terms
+
+  ```elixir
+  iex> terms = [1, :elixir, 4]
+  iex> Msgpack.StreamEncoder.encode(terms, atoms: :error) |> Enum.to_list()
+  [
+    {:ok, <<1>>},
+    {:error, {:unsupported_atom, :elixir}},
+    {:ok, <<4>>}
+  ]
+  ```
+  """
+  @spec encode(Enumerable.t(), opts_t()) :: t()
+  def encode(enumerable, opts \\ []) do
+    Stream.map(enumerable, &Msgpack.encode(&1, opts))
+  end
+end
diff --git a/test/msgpack_test.exs b/test/msgpack_test.exs
index d8e336a..59dbd70 100644
--- a/test/msgpack_test.exs
+++ b/test/msgpack_test.exs
@@ -323,6 +323,39 @@ defmodule MsgpackTest do
     end
   end
 
+  describe "Streaming" do
+    test "provides a lossless round trip for streaming encode and decode" do
+      terms = [1, "elixir", true, %{"key" => "value"}]
+
+      result =
+        terms
+        |> Msgpack.encode_stream()
+        |> Stream.map(fn {:ok, binary} -> binary end)
+        |> Msgpack.decode_stream()
+        |> Enum.to_list()
+
+      assert result == terms
+    end
+
+    test "encode_stream handles unencodable terms gracefully" do
+      # `atoms: :error` makes `:elixir` unencodable (the default encodes atoms as strings).
+      terms = [1, :elixir, 4]
+
+      expected = [
+        {:ok, <<1>>},
+        {:error, {:unsupported_atom, :elixir}},
+        {:ok, <<4>>}
+      ]
+
+      result =
+        terms
+        |> Msgpack.encode_stream(atoms: :error)
+        |> Enum.to_list()
+
+      assert result == expected
+    end
+  end
+
   # ==== Helpers ====
 
   defp assert_encode(input, expected_binary) do
diff --git a/test/stream_decoder_test.exs b/test/stream_decoder_test.exs
new file mode 100644
index 0000000..4a33717
--- /dev/null
+++ b/test/stream_decoder_test.exs
@@ -0,0 +1,36 @@
+defmodule Msgpack.StreamDecoderTest do
+  use ExUnit.Case, async: true
+
+  alias Msgpack.StreamDecoder
+
+  test "decodes a stream of complete objects" do
+    terms = [1, "elixir", true, %{"a" => 1}]
+    input_stream = Enum.map(terms, &Msgpack.encode!/1)
+
+    result =
StreamDecoder.decode(input_stream) |> Enum.to_list() + + assert result == terms + end + + test "decodes a stream where objects cross chunk boundaries" do + terms = [123, "elixir", true, %{"a" => 1}] + single_binary = Enum.map_join(terms, &Msgpack.encode!/1) + <> = single_binary + input_stream = [chunk1, chunk2] + + result = StreamDecoder.decode(input_stream) |> Enum.to_list() + + assert result == terms + end + + test "returns an error when the stream ends with incomplete data" do + binary = Msgpack.encode!("a guaranteed incomplete string") + incomplete_binary = :binary.part(binary, 0, byte_size(binary) - 1) + input_stream = [incomplete_binary] + expected_result = [{:error, :unexpected_eof}] + + result = StreamDecoder.decode(input_stream) |> Enum.to_list() + + assert result == expected_result + end +end diff --git a/test/stream_encoder_test.exs b/test/stream_encoder_test.exs new file mode 100644 index 0000000..9dd77ea --- /dev/null +++ b/test/stream_encoder_test.exs @@ -0,0 +1,42 @@ +defmodule Msgpack.StreamEncoderTest do + use ExUnit.Case, async: true + + alias Msgpack.StreamEncoder + + test "encodes a stream of valid terms into ok-tuples" do + terms = [1, "elixir", true] + + expected_result = [ + {:ok, Msgpack.encode!(1)}, + {:ok, Msgpack.encode!("elixir")}, + {:ok, Msgpack.encode!(true)} + ] + + result = StreamEncoder.encode(terms) |> Enum.to_list() + + assert result == expected_result + end + + test "handles unencodable terms by emitting an error tuple" do + terms = [1, :water, 3] + opts = [atoms: :error] + + expected_result = [ + {:ok, Msgpack.encode!(1)}, + {:error, {:unsupported_atom, :water}}, + {:ok, Msgpack.encode!(3)} + ] + + result = StreamEncoder.encode(terms, opts) |> Enum.to_list() + + assert result == expected_result + end + + test "returns an empty list when given an empty stream" do + terms = [] + + result = StreamEncoder.encode(terms) |> Enum.to_list() + + assert result == [] + end +end diff --git a/test/streaming_round_trip_test.exs 
b/test/streaming_round_trip_test.exs new file mode 100644 index 0000000..efb01f0 --- /dev/null +++ b/test/streaming_round_trip_test.exs @@ -0,0 +1,54 @@ +defmodule Msgpack.StreamingRoundTripTest do + use ExUnit.Case, async: true + use ExUnitProperties + + alias Msgpack + alias Msgpack.StreamEncoder + alias Msgpack.StreamDecoder + + property "a round trip through the streaming API is lossless" do + check all(terms <- encodable_terms(), chunk_size <- StreamData.integer(1..20)) do + binaries = + StreamEncoder.encode(terms) + |> Stream.map(fn {:ok, binary} -> binary end) + + chunked_stream = + binaries + |> Enum.to_list() + |> IO.iodata_to_binary() + |> chunk_binary(chunk_size) + + decoded_terms = + StreamDecoder.decode(chunked_stream) + |> Enum.to_list() + + assert decoded_terms == terms + end + end + + defp encodable_terms() do + StreamData.list_of(encodable_term()) + end + + defp encodable_term do + StreamData.one_of([ + StreamData.integer(-1_000_000..1_000_000), + StreamData.string(:utf8, max_length: 50), + StreamData.boolean(), + StreamData.binary(max_length: 50), + StreamData.constant(nil) + ]) + end + + defp chunk_binary(binary, chunk_size) do + Stream.unfold(binary, fn + <<>> -> + nil + + remaining -> + size = min(chunk_size, byte_size(remaining)) + <> = remaining + {chunk, rest} + end) + end +end