From 5bce93b1c0b7698af3f9f5b9b0bedc1008e06f98 Mon Sep 17 00:00:00 2001 From: Fennecs <116326048+StrongFennecs@users.noreply.github.com> Date: Tue, 2 Dec 2025 14:28:21 +1100 Subject: [PATCH 1/6] chore: elixir v1.19 compatible pattern matching --- .tool-versions | 4 +-- examples/scripting/client.exs | 7 ++-- lib/slipstream.ex | 10 +++--- lib/slipstream/callback.ex | 2 +- lib/slipstream/connection.ex | 2 +- lib/slipstream/connection/impl.ex | 4 +-- lib/slipstream/connection/pipeline.ex | 49 ++++++++++++++------------- lib/slipstream/connection/state.ex | 12 +++---- lib/slipstream/socket.ex | 19 ++++++----- lib/slipstream/telemetry_helper.ex | 11 +++--- mix.exs | 27 +++++++++------ 11 files changed, 80 insertions(+), 67 deletions(-) diff --git a/.tool-versions b/.tool-versions index 9c71c0d..078af8b 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ -elixir 1.17.3-otp-27 -erlang 27.1 +elixir 1.19.4-otp-28 +erlang 28.1 diff --git a/examples/scripting/client.exs b/examples/scripting/client.exs index 0e7bc17..027848c 100644 --- a/examples/scripting/client.exs +++ b/examples/scripting/client.exs @@ -1,18 +1,19 @@ import Slipstream import Slipstream.Socket -socket = connect!(uri: "ws://localhost:4000/socket/websocket") |> await_connect! +socket = + connect!(uri: "ws://localhost:4000/socket/websocket") |> await_connect!() topic = "rooms:lobby" socket = join(socket, topic, %{"fizz" => "buzz"}) |> await_join!(topic) _ref = push!(socket, topic, "quicksand", %{"a" => "b"}) -{:ok, %{"pong" => "pong"}} = push!(socket, topic, "ping", %{}) |> await_reply! +{:ok, %{"pong" => "pong"}} = push!(socket, topic, "ping", %{}) |> await_reply!() push!(socket, topic, "push to me", %{}) await_message!(^topic, "foo", _payload) socket = leave(socket, topic) |> await_leave!(topic) -socket |> disconnect() |> await_disconnect! +socket |> disconnect() |> await_disconnect!() diff --git a/lib/slipstream.ex b/lib/slipstream.ex index 1a3a680..bc416fa 100644 --- a/lib/slipstream.ex +++ b/lib/slipstream.ex @@ -802,14 +802,14 @@ defmodule Slipstream do {:ok, Socket.t()} | {:error, NimbleOptions.ValidationError.t()} @spec connect(socket :: Socket.t(), opts :: Keyword.t()) :: {:ok, Socket.t()} | {:error, NimbleOptions.ValidationError.t()} - def connect(socket \\ new_socket(), opts) do + def connect(%Socket{} = socket \\ new_socket(), opts) do case Slipstream.Configuration.validate(opts) do {:ok, config} -> socket = TelemetryHelper.begin_connect(socket, config) route_command %Commands.OpenConnection{config: config, socket: socket} - {:ok, %Socket{socket | channel_config: config}} + {:ok, %{socket | channel_config: config}} {:error, reason} -> {:error, reason} @@ -826,13 +826,13 @@ defmodule Slipstream do @doc since: "0.1.0" @spec connect!(opts :: Keyword.t()) :: Socket.t() @spec connect!(socket :: Socket.t(), opts :: Keyword.t()) :: Socket.t() - def connect!(socket \\ new_socket(), opts) do + def connect!(%Socket{} = socket \\ new_socket(), opts) do config = Slipstream.Configuration.validate!(opts) - socket = TelemetryHelper.begin_connect(socket, config) + %Socket{} = socket = TelemetryHelper.begin_connect(socket, config) route_command %Commands.OpenConnection{config: config, socket: socket} - %Socket{socket | channel_config: config} + %{socket | channel_config: config} end @doc """ diff --git a/lib/slipstream/callback.ex b/lib/slipstream/callback.ex index 052d26c..aa86635 100644 --- a/lib/slipstream/callback.ex +++ b/lib/slipstream/callback.ex @@ -58,7 +58,7 @@ defmodule Slipstream.Callback do # note that `args` needs to be a compile-time list for this to work arity = length(args) + 1 - unless {name, arity} in @known_callbacks do + if {name, arity} not in @known_callbacks do raise CompileError, file: __CALLER__.file, line: __CALLER__.line, diff --git a/lib/slipstream/connection.ex b/lib/slipstream/connection.ex index 14769dc..fe3d250 100644 --- a/lib/slipstream/connection.ex +++ b/lib/slipstream/connection.ex @@ -22,7 +22,7 @@ defmodule Slipstream.Connection do metadata = Telemetry.begin(initial_state) - state = %State{initial_state | metadata: metadata} + %State{} = state = %{initial_state | metadata: metadata} {:ok, state, {:continue, :connect}} end diff --git a/lib/slipstream/connection/impl.ex b/lib/slipstream/connection/impl.ex index 99be94e..a6c00d1 100644 --- a/lib/slipstream/connection/impl.ex +++ b/lib/slipstream/connection/impl.ex @@ -48,7 +48,7 @@ defmodule Slipstream.Connection.Impl do # --- - def push_message(frame, state) when is_tuple(frame) do + def push_message(frame, %State{} = state) when is_tuple(frame) do case Mint.WebSocket.encode(state.websocket, frame) do {:ok, websocket, data} -> case Mint.WebSocket.stream_request_body( @@ -57,7 +57,7 @@ defmodule Slipstream.Connection.Impl do data ) do {:ok, conn} -> - {:ok, %State{state | conn: conn, websocket: websocket}} + {:ok, %{state | conn: conn, websocket: websocket}} # coveralls-ignore-start {:error, conn, reason} -> diff --git a/lib/slipstream/connection/pipeline.ex b/lib/slipstream/connection/pipeline.ex index e89a5a9..5ade97e 100644 --- a/lib/slipstream/connection/pipeline.ex +++ b/lib/slipstream/connection/pipeline.ex @@ -52,7 +52,7 @@ defmodule Slipstream.Connection.Pipeline do defp decode_message( %{ raw_message: {:DOWN, ref, :process, _pid, reason}, - state: %{client_ref: ref} + state: %State{client_ref: ref} } = p ) do put_message(p, event(%Events.ParentProcessExited{reason: reason})) @@ -64,7 +64,7 @@ defmodule Slipstream.Connection.Pipeline do {:status, ref, status}, {:headers, ref, headers} | _maybe_done ], - state: %{request_ref: ref} = state + state: %State{request_ref: ref} = state } = p ) do case Mint.WebSocket.new(state.conn, ref, status, headers) do @@ -77,7 +77,7 @@ defmodule Slipstream.Connection.Pipeline do response_headers: headers }) ) - |> put_state(%State{p.state | conn: conn, websocket: websocket}) + |> put_state(%{p.state | conn: conn, websocket: websocket}) {:error, _conn, reason} -> failure_info = %{ @@ -140,7 +140,7 @@ defmodule Slipstream.Connection.Pipeline do case Mint.WebSocket.stream(p.state.conn, p.raw_message) do {:ok, conn, messages} -> put_in(p.raw_message, messages) - |> put_state(%State{p.state | conn: conn}) + |> put_state(%{p.state | conn: conn}) |> decode_message() {:error, conn, %Mint.TransportError{reason: :closed}, _} -> @@ -173,11 +173,11 @@ defmodule Slipstream.Connection.Pipeline do @spec handle_message(t()) :: t() defp handle_message( - %{message: :connect, state: %{config: config} = state} = p + %{message: :connect, state: %State{config: config} = state} = p ) do with {:ok, conn} <- Impl.http_connect(config), {:ok, conn, ref} <- Impl.websocket_upgrade(conn, config) do - put_state(p, %State{state | conn: conn, request_ref: ref}) + put_state(p, %{state | conn: conn, request_ref: ref}) else # coveralls-ignore-start {:error, conn, reason} -> @@ -273,7 +273,7 @@ defmodule Slipstream.Connection.Pipeline do %{message: command(%Commands.JoinTopic{} = cmd), state: state} = p ) do {ref, state} = State.next_ref(state) - state = %State{state | joins: Map.put(state.joins, cmd.topic, ref)} + %State{} = state = %{state | joins: Map.put(state.joins, cmd.topic, ref)} p |> put_state(state) @@ -290,7 +290,7 @@ defmodule Slipstream.Connection.Pipeline do %{message: command(%Commands.LeaveTopic{} = cmd), state: state} = p ) do {ref, state} = State.next_ref(state) - state = %State{state | leaves: Map.put(state.leaves, cmd.topic, ref)} + %State{} = state = %{state | leaves: Map.put(state.leaves, cmd.topic, ref)} p |> put_state(state) @@ -353,7 +353,7 @@ defmodule Slipstream.Connection.Pipeline do defp handle_message(%{message: event(%type{} = event), state: state} = p) when type in [Events.TopicJoinFailed, Events.TopicJoinClosed] do - state = %State{state | joins: Map.delete(state.joins, event.topic)} + %State{} = state = %{state | joins: Map.delete(state.joins, event.topic)} route_event state, event @@ -364,7 +364,7 @@ defmodule Slipstream.Connection.Pipeline do %{message: event(%Events.TopicLeaveAccepted{} = event), state: state} = p ) do - state = %State{state | leaves: Map.delete(state.leaves, event.topic)} + %State{} = state = %{state | leaves: Map.delete(state.leaves, event.topic)} put_state(p, state) end @@ -391,8 +391,9 @@ defmodule Slipstream.Connection.Pipeline do tref end - state = - %State{state | status: :connected, heartbeat_timer: timer} + %State{} = + state = + %{state | status: :connected, heartbeat_timer: timer} |> State.reset_heartbeat() route_event state, event @@ -410,7 +411,7 @@ defmodule Slipstream.Connection.Pipeline do :timer.cancel(state.heartbeat_timer) end - state = %State{state | status: :terminating} + %State{} = state = %{state | status: :terminating} route_event state, event @@ -450,15 +451,15 @@ defmodule Slipstream.Connection.Pipeline do defp default_return(p), do: p @spec build_events(t()) :: t() - defp build_events(%{events: []} = p), do: p + defp build_events(%__MODULE__{events: []} = p), do: p - defp build_events(%{events: events} = p) do + defp build_events(%__MODULE__{events: events} = p) do built_events = Enum.map(events, fn %{type: type, attrs: attrs} -> build_event(type, attrs) end) - %__MODULE__{p | built_events: built_events} + %{p | built_events: built_events} end defp emit_events(%{built_events: []} = p), do: p @@ -482,13 +483,13 @@ defmodule Slipstream.Connection.Pipeline do # --- token API @spec put_state(t(), State.t()) :: t() - def put_state(p, state) do - %__MODULE__{p | state: state} + def put_state(%__MODULE__{} = p, state) do + %{p | state: state} end @spec put_message(t(), term()) :: t() - def put_message(p, message) do - %__MODULE__{p | message: message} + def put_message(%__MODULE__{} = p, message) do + %{p | message: message} end @doc """ @@ -498,8 +499,8 @@ defmodule Slipstream.Connection.Pipeline do build the event in the `build_events/1` phase of the pipeline """ @spec put_event(t(), atom(), Keyword.t() | map()) :: t() - def put_event(p, event, attrs \\ %{}) do - %__MODULE__{ + def put_event(%__MODULE__{} = p, event, attrs \\ %{}) do + %{ p | events: p.events ++ [%{type: event, attrs: Enum.into(attrs, %{})}] } @@ -511,8 +512,8 @@ defmodule Slipstream.Connection.Pipeline do This value will be given to the GenServer callback that invoked """ @spec put_return(t(), term()) :: t() - def put_return(p, return) do - %__MODULE__{p | return: return} + def put_return(%__MODULE__{} = p, return) do + %{p | return: return} end def push_message(p, message) do diff --git a/lib/slipstream/connection/state.ex b/lib/slipstream/connection/state.ex index 0d45e07..7d7b140 100644 --- a/lib/slipstream/connection/state.ex +++ b/lib/slipstream/connection/state.ex @@ -61,18 +61,18 @@ defmodule Slipstream.Connection.State do Refs are simply strings of incrementing integers. """ - def next_ref(state) do + def next_ref(%__MODULE__{} = state) do ref = state.current_ref + 1 {to_string(ref), - %__MODULE__{state | current_ref: ref, current_ref_str: to_string(ref)}} + %{state | current_ref: ref, current_ref_str: to_string(ref)}} end # coveralls-ignore-start - def next_heartbeat_ref(state) do + def next_heartbeat_ref(%__MODULE__{} = state) do {ref, state} = next_ref(state) - %__MODULE__{state | heartbeat_ref: ref} + %{state | heartbeat_ref: ref} end # coveralls-ignore-stop @@ -85,8 +85,8 @@ defmodule Slipstream.Connection.State do in state is nil, that means we have not received a reply to our heartbeat request and that the server is potentially stuck or otherwise not responding. """ - def reset_heartbeat(state) do - %__MODULE__{state | heartbeat_ref: nil} + def reset_heartbeat(%__MODULE{} = state) do + %{state | heartbeat_ref: nil} end @doc """ diff --git a/lib/slipstream/socket.ex b/lib/slipstream/socket.ex index 0b86231..532a6af 100644 --- a/lib/slipstream/socket.ex +++ b/lib/slipstream/socket.ex @@ -255,10 +255,10 @@ defmodule Slipstream.Socket do @spec apply_event(t(), struct()) :: t() def apply_event(socket, event) - def apply_event(socket, %Events.ChannelConnected{} = event) do + def apply_event(%__MODULE__{} = socket, %Events.ChannelConnected{} = event) do socket = TelemetryHelper.conclude_connect(socket, event) - %__MODULE__{ + %{ socket | channel_pid: event.pid, channel_config: event.config || socket.channel_config, @@ -266,14 +266,17 @@ defmodule Slipstream.Socket do } end - def apply_event(socket, %Events.TopicJoinSucceeded{topic: topic} = event) do + def apply_event( + %__MODULE__{} = socket, + %Events.TopicJoinSucceeded{topic: topic} = event + ) do socket |> TelemetryHelper.conclude_join(event) |> put_in([Access.key(:joins), topic, Access.key(:status)], :joined) |> put_in([Access.key(:joins), topic, Access.key(:rejoin_counter)], 0) end - def apply_event(socket, %event{topic: topic}) + def apply_event(%__MODULE__{} = socket, %event{topic: topic}) when event in [ Events.TopicLeft, Events.TopicJoinFailed, @@ -282,13 +285,13 @@ defmodule Slipstream.Socket do put_in(socket, [Access.key(:joins), topic, Access.key(:status)], :closed) end - def apply_event(socket, %Events.ChannelClosed{}) do - %__MODULE__{ + def apply_event(%__MODULE__{} = socket, %Events.ChannelClosed{}) do + %{ socket | channel_pid: nil, joins: - Enum.into(socket.joins, %{}, fn {topic, join} -> - {topic, %Join{join | status: :closed}} + Enum.into(socket.joins, %{}, fn {topic, %Join{} = join} -> + {topic, %{join | status: :closed}} end) } end diff --git a/lib/slipstream/telemetry_helper.ex b/lib/slipstream/telemetry_helper.ex index 18e4d34..605955e 100644 --- a/lib/slipstream/telemetry_helper.ex +++ b/lib/slipstream/telemetry_helper.ex @@ -49,7 +49,10 @@ defmodule Slipstream.TelemetryHelper do """ @doc since: "0.4.0" @spec conclude_connect(Socket.t(), Events.ChannelConnected.t()) :: Socket.t() - def conclude_connect(%{metadata: %{connect: start_metadata}} = socket, event) + def conclude_connect( + %Socket{metadata: %{connect: start_metadata}} = socket, + event + ) when is_map(start_metadata) and map_size(start_metadata) > 0 do metadata = start_metadata @@ -67,7 +70,7 @@ defmodule Slipstream.TelemetryHelper do metadata ) - %Socket{socket | metadata: Map.delete(socket.metadata, :connect)} + %{socket | metadata: Map.delete(socket.metadata, :connect)} end # technically speaking this case doesn't make any sense... you need to connect @@ -186,11 +189,11 @@ defmodule Slipstream.TelemetryHelper do return_value end - defp clean_socket(socket) do + defp clean_socket(%Slipstream.Socket{} = socket) do # Clear metadata from the socket. The socket contains the metadata and # the metadata contains the socket so if we repeatedly "begin" operations # like connects or joins, we cause sharp memory growth in the client and # connection processes. - %Slipstream.Socket{socket | metadata: %{}} + %{socket | metadata: %{}} end end diff --git a/mix.exs b/mix.exs index 6c2d622..eff3649 100644 --- a/mix.exs +++ b/mix.exs @@ -22,17 +22,6 @@ defmodule Slipstream.MixProject do elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, deps: deps(), - preferred_cli_env: [ - credo: :test, - coveralls: :test, - "coveralls.html": :test, - "coveralls.github": :test, - inch: :dev, - bless: :test, - test: :test, - dialyzer: :test, - docs: :docs - ], test_coverage: [tool: ExCoveralls], package: package(), description: description(), @@ -46,6 +35,22 @@ defmodule Slipstream.MixProject do ] end + def cli do + [ + preferred_envs: [ + credo: :test, + coveralls: :test, + "coveralls.html": :test, + "coveralls.github": :test, + inch: :dev, + bless: :test, + test: :test, + dialyzer: :test, + docs: :docs + ] + ] + end + defp elixirc_paths(env) when env in [:test, :dev], do: ["lib", "test/support", "test/fixtures"] From 4b47857d277a4912b8ff8ca92cccf5227f4a7d7c Mon Sep 17 00:00:00 2001 From: Fennecs <116326048+StrongFennecs@users.noreply.github.com> Date: Tue, 2 Dec 2025 14:33:33 +1100 Subject: [PATCH 2/6] fix: test anon function --- lib/slipstream/connection/state.ex | 2 +- mix.lock | 4 ++-- test/slipstream/examples/rejoin_on_reconnect_test.exs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/slipstream/connection/state.ex b/lib/slipstream/connection/state.ex index 7d7b140..a505b4b 100644 --- a/lib/slipstream/connection/state.ex +++ b/lib/slipstream/connection/state.ex @@ -85,7 +85,7 @@ defmodule Slipstream.Connection.State do in state is nil, that means we have not received a reply to our heartbeat request and that the server is potentially stuck or otherwise not responding. """ - def reset_heartbeat(%__MODULE{} = state) do + def reset_heartbeat(%__MODULE__{} = state) do %{state | heartbeat_ref: nil} end diff --git a/mix.lock b/mix.lock index 718a5dd..2694ad2 100644 --- a/mix.lock +++ b/mix.lock @@ -4,13 +4,13 @@ "cowboy": {:hex, :cowboy, "2.14.2", "4008be1df6ade45e4f2a4e9e2d22b36d0b5aba4e20b0a0d7049e28d124e34847", [:make, :rebar3], [{:cowlib, ">= 2.16.0 and < 3.0.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, ">= 1.8.0 and < 3.0.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "569081da046e7b41b5df36aa359be71a0c8874e5b9cff6f747073fc57baf1ab9"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, "cowlib": {:hex, :cowlib, "2.16.0", "54592074ebbbb92ee4746c8a8846e5605052f29309d3a873468d76cdf932076f", [:make, :rebar3], [], "hexpm", "7f478d80d66b747344f0ea7708c187645cfcc08b11aa424632f78e25bf05db51"}, - "credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"}, + "credo": {:hex, :credo, "1.7.14", "c7e75216cea8d978ba8c60ed9dede4cc79a1c99a266c34b3600dd2c33b96bc92", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "12a97d6bb98c277e4fb1dff45aaf5c137287416009d214fb46e68147bd9e0203"}, "dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"}, "earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"}, "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, "ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"}, "excoveralls": {:hex, :excoveralls, "0.18.3", "bca47a24d69a3179951f51f1db6d3ed63bca9017f476fe520eb78602d45f7756", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "746f404fcd09d5029f1b211739afb8fb8575d775b21f6a3908e7ce3e640724c6"}, - "file_system": {:hex, :file_system, "1.0.1", "79e8ceaddb0416f8b8cd02a0127bdbababe7bf4a23d2a395b983c1f8b3f73edd", [:mix], [], "hexpm", "4414d1f38863ddf9120720cd976fce5bdde8e91d8283353f0e31850fa89feb9e"}, + "file_system": {:hex, :file_system, "1.1.1", "31864f4685b0148f25bd3fbef2b1228457c0c89024ad67f7a81a3ffbc0bbad3a", [:mix], [], "hexpm", "7a15ff97dfe526aeefb090a7a9d3d03aa907e100e262a0f8f7746b78f8f87a5d"}, "hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"}, "inch_ex": {:hex, :inch_ex, "2.1.0-rc.1", "7642a8902c0d2ed5d9b5754b2fc88fedf630500d630fc03db7caca2e92dedb36", [:mix], [{:bunt, "~> 0.2", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "4ceee988760f9382d1c1d0b93ea5875727f6071693e89a0a3c49c456ef1be75d"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, diff --git a/test/slipstream/examples/rejoin_on_reconnect_test.exs b/test/slipstream/examples/rejoin_on_reconnect_test.exs index ece273f..74954bb 100644 --- a/test/slipstream/examples/rejoin_on_reconnect_test.exs +++ b/test/slipstream/examples/rejoin_on_reconnect_test.exs @@ -13,7 +13,7 @@ defmodule Slipstream.RejoinOnReconnectTest do setup c do accept_connect(c.client) - Enum.each(c.topics, &c.client.join/1) + Enum.each(c.topics, fn topic -> c.client.join(topic) end) assert_join "foo", %{}, :ok assert_join "bar", %{}, :ok From a62d3a33bd69ddd89979bea96bf053834af326b2 Mon Sep 17 00:00:00 2001 From: Fennecs <116326048+StrongFennecs@users.noreply.github.com> Date: Tue, 2 Dec 2025 14:35:55 +1100 Subject: [PATCH 3/6] chore: revert to default tools --- .tool-versions | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.tool-versions b/.tool-versions index 078af8b..9c71c0d 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ -elixir 1.19.4-otp-28 -erlang 28.1 +elixir 1.17.3-otp-27 +erlang 27.1 From 93bb15d547df8d5542246778114b7ca72697e5eb Mon Sep 17 00:00:00 2001 From: Fennecs <116326048+StrongFennecs@users.noreply.github.com> Date: Tue, 2 Dec 2025 14:55:07 +1100 Subject: [PATCH 4/6] chore: revert to default tools --- .tool-versions | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.tool-versions b/.tool-versions index 9c71c0d..6b09b2f 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ -elixir 1.17.3-otp-27 +elixir 1.17.3 erlang 27.1 From 20542fae30ba154f702e55a439b0a4d13e4658a0 Mon Sep 17 00:00:00 2001 From: Fennecs <116326048+StrongFennecs@users.noreply.github.com> Date: Tue, 2 Dec 2025 14:56:08 +1100 Subject: [PATCH 5/6] chore: revert to default tools --- .tool-versions | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.tool-versions b/.tool-versions index 6b09b2f..9c71c0d 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ -elixir 1.17.3 +elixir 1.17.3-otp-27 erlang 27.1 From e612aab4beb7f3ed8e5b349d6fd3491be866a8b6 Mon Sep 17 00:00:00 2001 From: Fennecs <116326048+StrongFennecs@users.noreply.github.com> Date: Fri, 5 Dec 2025 11:16:29 +1100 Subject: [PATCH 6/6] feat: rm binary oops ;; --- lib/slipstream.ex | 4 +- lib/slipstream/connection.ex | 5 +-- lib/slipstream/connection/pipeline.ex | 58 ++++++++++++++++++--------- 3 files changed, 44 insertions(+), 23 deletions(-) diff --git a/lib/slipstream.ex b/lib/slipstream.ex index bc416fa..114c449 100644 --- a/lib/slipstream.ex +++ b/lib/slipstream.ex @@ -828,7 +828,7 @@ defmodule Slipstream do @spec connect!(socket :: Socket.t(), opts :: Keyword.t()) :: Socket.t() def connect!(%Socket{} = socket \\ new_socket(), opts) do config = Slipstream.Configuration.validate!(opts) - %Socket{} = socket = TelemetryHelper.begin_connect(socket, config) + socket = TelemetryHelper.begin_connect(socket, config) route_command %Commands.OpenConnection{config: config, socket: socket} @@ -869,7 +869,7 @@ defmodule Slipstream do @doc since: "0.1.0" @spec reconnect(socket :: Socket.t()) :: {:ok, Socket.t()} | {:error, :no_config | :not_connected} - def reconnect(socket) do + def reconnect(%Socket{} = socket) do with false <- Socket.connected?(socket), %Slipstream.Configuration{} = config <- socket.channel_config, {time, socket} <- Socket.next_reconnect_time(socket) do diff --git a/lib/slipstream/connection.ex b/lib/slipstream/connection.ex index fe3d250..003bc33 100644 --- a/lib/slipstream/connection.ex +++ b/lib/slipstream/connection.ex @@ -18,11 +18,10 @@ defmodule Slipstream.Connection do socket: %Slipstream.Socket{socket_pid: socket_pid}, config: config }) do - initial_state = State.new(config, socket_pid) + %State{} = initial_state = State.new(config, socket_pid) metadata = Telemetry.begin(initial_state) - - %State{} = state = %{initial_state | metadata: metadata} + state = %{initial_state | metadata: metadata} {:ok, state, {:continue, :connect}} end diff --git a/lib/slipstream/connection/pipeline.ex b/lib/slipstream/connection/pipeline.ex index 5ade97e..0dd8870 100644 --- a/lib/slipstream/connection/pipeline.ex +++ b/lib/slipstream/connection/pipeline.ex @@ -270,10 +270,13 @@ defmodule Slipstream.Connection.Pipeline do end defp handle_message( - %{message: command(%Commands.JoinTopic{} = cmd), state: state} = p + %{ + message: command(%Commands.JoinTopic{} = cmd), + state: %State{} = state + } = p ) do {ref, state} = State.next_ref(state) - %State{} = state = %{state | joins: Map.put(state.joins, cmd.topic, ref)} + state = %{state | joins: Map.put(state.joins, cmd.topic, ref)} p |> put_state(state) @@ -287,10 +290,13 @@ defmodule Slipstream.Connection.Pipeline do end defp handle_message( - %{message: command(%Commands.LeaveTopic{} = cmd), state: state} = p + %{ + message: command(%Commands.LeaveTopic{} = cmd), + state: %State{} = state + } = p ) do {ref, state} = State.next_ref(state) - %State{} = state = %{state | leaves: Map.put(state.leaves, cmd.topic, ref)} + state = %{state | leaves: Map.put(state.leaves, cmd.topic, ref)} p |> put_state(state) @@ -304,7 +310,10 @@ defmodule Slipstream.Connection.Pipeline do end defp handle_message( - %{message: command(%Commands.CloseConnection{}), state: state} = p + %{ + message: command(%Commands.CloseConnection{}), + state: %State{} = state + } = p ) do Mint.HTTP.close(state.conn) @@ -322,7 +331,7 @@ defmodule Slipstream.Connection.Pipeline do defp handle_message( %{ message: event(%Events.ParentProcessExited{reason: reason}), - state: state + state: %State{} = state } = p ) do Mint.HTTP.close(state.conn) @@ -333,7 +342,7 @@ defmodule Slipstream.Connection.Pipeline do defp handle_message( %{ message: event(%Events.ChannelConnectFailed{} = event), - state: state + state: %State{} = state } = p ) do Mint.HTTP.close(state.conn) @@ -351,9 +360,11 @@ defmodule Slipstream.Connection.Pipeline do defp handle_message(%{message: event(%Events.PongReceived{})} = p), do: p # coveralls-ignore-stop - defp handle_message(%{message: event(%type{} = event), state: state} = p) + defp handle_message( + %{message: event(%type{} = event), state: %State{} = state} = p + ) when type in [Events.TopicJoinFailed, Events.TopicJoinClosed] do - %State{} = state = %{state | joins: Map.delete(state.joins, event.topic)} + state = %{state | joins: Map.delete(state.joins, event.topic)} route_event state, event @@ -361,16 +372,22 @@ defmodule Slipstream.Connection.Pipeline do end defp handle_message( - %{message: event(%Events.TopicLeaveAccepted{} = event), state: state} = + %{ + message: event(%Events.TopicLeaveAccepted{} = event), + state: %State{} = state + } = p ) do - %State{} = state = %{state | leaves: Map.delete(state.leaves, event.topic)} + state = %{state | leaves: Map.delete(state.leaves, event.topic)} put_state(p, state) end defp handle_message( - %{message: event(%Events.HeartbeatAcknowledged{}), state: state} = p + %{ + message: event(%Events.HeartbeatAcknowledged{}), + state: %State{} = state + } = p ) do # coveralls-ignore-start put_state(p, State.reset_heartbeat(state)) @@ -378,7 +395,10 @@ defmodule Slipstream.Connection.Pipeline do end defp handle_message( - %{message: event(%Events.ChannelConnected{} = event), state: state} = p + %{ + message: event(%Events.ChannelConnected{} = event), + state: %State{} = state + } = p ) do timer = if state.config.heartbeat_interval_msec != 0 do @@ -391,8 +411,7 @@ defmodule Slipstream.Connection.Pipeline do tref end - %State{} = - state = + state = %{state | status: :connected, heartbeat_timer: timer} |> State.reset_heartbeat() @@ -403,7 +422,10 @@ defmodule Slipstream.Connection.Pipeline do # coveralls-ignore-start defp handle_message( - %{message: event(%Events.ChannelClosed{} = event), state: state} = p + %{ + message: event(%Events.ChannelClosed{} = event), + state: %State{} = state + } = p ) do Mint.HTTP.close(state.conn) @@ -411,7 +433,7 @@ defmodule Slipstream.Connection.Pipeline do :timer.cancel(state.heartbeat_timer) end - %State{} = state = %{state | status: :terminating} + state = %{state | status: :terminating} route_event state, event @@ -483,7 +505,7 @@ defmodule Slipstream.Connection.Pipeline do # --- token API @spec put_state(t(), State.t()) :: t() - def put_state(%__MODULE__{} = p, state) do + def put_state(%__MODULE__{} = p, %State{} = state) do %{p | state: state} end