Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
name: Elixir CI

on:
pull_request:
branches:
- main
- dev

jobs:
test:
name: Test (Elixir ${{ matrix.elixir }}, OTP ${{ matrix.otp }})
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
include:
- elixir: '1.18.4'
otp: '28.0'
version-type: 'strict'
- elixir: '1.12.0'
otp: '24.x'
version-type: 'loose'
steps:
- name: Checkout Repository
uses: actions/checkout@v4

- name: Setup Elixir
uses: erlef/setup-beam@v1
with:
elixir-version: ${{ matrix.elixir }}
otp-version: ${{ matrix.otp }}
version-type: ${{ matrix.version-type }}

- name: Install Dependencies
run: mix deps.get

- name: Run Tests
run: mix test
23 changes: 23 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,29 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [v1.1.0] - 2025-08-09

### Added

- Added a new Streaming API that processes data in chunks, reducing peak memory
usage when handling large datasets or network streams
- Introduced `Msgpack.encode_stream/2` to lazily encode a stream of Elixir
terms one by one
- Introduced `Msgpack.decode_stream/2` to lazily decode a stream of
MessagePack objects, capable of handling data that arrives in multiple
chunks
- Added CI workflow to run tests against supported Elixir versions

### Changed

- Updated minimum supported Elixir version to v1.12
- While the library may work with older versions, StreamData supports a
minimum of v1.12, so it would be missing the property tests

### Fixed

- Updated timestamp decoding to be backwards-compatible with Elixir v1.12

## [v1.0.2] - 2025-08-06

### Fixed
Expand Down
29 changes: 28 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ types.
limits to mitigate resource exhaustion from malformed or malicious payloads.
- **Telemetry Integration:** Emits standard `:telemetry` events for integration
with monitoring tools.
- **Streaming API:** Process large collections or continuous data streams with
low memory overhead using `Msgpack.encode_stream/2` and
`Msgpack.decode_stream/2`.

## Installation

Expand Down Expand Up @@ -50,6 +53,27 @@ iex> Msgpack.decode!(<<0xC1>>)
** (Msgpack.DecodeError) Unknown type prefix: 193. The byte `0xC1` is not a valid MessagePack type marker.
```

### Streaming Large Collections

For datasets that may not fit in memory, you can use the streaming API, which
processes one term at a time.

```elixir
# Create a lazy stream of terms to be encoded.
iex> terms = Stream.cycle([1, "elixir", true])

# The output is a lazy stream of encoded binaries.
iex> encoded_stream = Msgpack.encode_stream(terms)

# The stream is only consumed when you enumerate it.
iex> encoded_stream |> Stream.take(3) |> Enum.to_list()
[
{:ok, <<1>>},
{:ok, <<166, 101, 108, 105, 120, 105, 114>>},
{:ok, <<195>>}
]
```

## Full Documentation

For detailed information on all features, options, and functions, see the [full
Expand All @@ -62,7 +86,10 @@ This section explains how to setup the project locally for development.

### Dependencies

- Elixir `~> 1.7` (OTP 21+)
- Elixir `~> 1.12` (OTP 24+)
- See [Compatibility and
deprecations](https://hexdocs.pm/elixir/1.18.4/compatibility-and-deprecations.html)
for more information

### Get the Source

Expand Down
61 changes: 61 additions & 0 deletions lib/msgpack.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ defmodule Msgpack do

alias Msgpack.Encoder
alias Msgpack.Decoder
alias Msgpack.StreamEncoder
alias Msgpack.StreamDecoder
alias Msgpack.EncodeError
alias Msgpack.DecodeError

Expand Down Expand Up @@ -314,4 +316,63 @@ defmodule Msgpack do
raise DecodeError, reason: reason
end
end

@doc """
Encodes a stream of Elixir terms into a stream of MessagePack binaries.

Each term in the input enumerable is encoded individually. The output stream
will contain `{:ok, binary}` tuples for successful encodings or `{:error,
reason}` tuples for failures.

This function delegates to `Msgpack.StreamEncoder.encode/2`.

## Options

Accepts the same options as `Msgpack.encode/2`.

## Examples

```elixir
iex> terms = [1, "elixir", :world]
iex> Msgpack.encode_stream(terms, atoms: :string) |> Enum.to_list()
[
{:ok, <<1>>},
{:ok, <<166, 101, 108, 105, 120, 105, 114>>},
{:ok, <<165, 119, 111, 114, 108, 100>>}
]
```
"""
@spec encode_stream(Enumerable.t(), StreamEncoder.opts_t()) :: StreamEncoder.t()
def encode_stream(enumerable, opts \\ []) do
StreamEncoder.encode(enumerable, opts)
end

@doc """
Decodes a stream of MessagePack binaries into a stream of Elixir terms.

This function provides a streaming, lazy interface for decoding, making it
suitable for handling large datasets that do not fit into memory.

It delegates to `Msgpack.StreamDecoder.decode/2`.

For more detailed information on behavior, see the `Msgpack.StreamDecoder`
module documentation.

## Options

Accepts the same options as `Msgpack.decode/2`.

## Examples

```elixir
iex> objects = [1, "elixir", true]
iex> stream = Enum.map(objects, &Msgpack.encode!/1)
iex> Msgpack.decode_stream(stream) |> Enum.to_list()
[1, "elixir", true]
```
"""
@spec decode_stream(Enumerable.t(binary()), StreamDecoder.opts_t()) :: StreamDecoder.t()
def decode_stream(enumerable, opts \\ []) do
StreamDecoder.decode(enumerable, opts)
end
end
Loading