Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions examples/scripting/client.exs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import Slipstream
import Slipstream.Socket

socket = connect!(uri: "ws://localhost:4000/socket/websocket") |> await_connect!
socket =
connect!(uri: "ws://localhost:4000/socket/websocket") |> await_connect!()

topic = "rooms:lobby"
socket = join(socket, topic, %{"fizz" => "buzz"}) |> await_join!(topic)

_ref = push!(socket, topic, "quicksand", %{"a" => "b"})

{:ok, %{"pong" => "pong"}} = push!(socket, topic, "ping", %{}) |> await_reply!
{:ok, %{"pong" => "pong"}} = push!(socket, topic, "ping", %{}) |> await_reply!()

push!(socket, topic, "push to me", %{})

await_message!(^topic, "foo", _payload)

socket = leave(socket, topic) |> await_leave!(topic)
socket |> disconnect() |> await_disconnect!
socket |> disconnect() |> await_disconnect!()
10 changes: 5 additions & 5 deletions lib/slipstream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -802,14 +802,14 @@ defmodule Slipstream do
{:ok, Socket.t()} | {:error, NimbleOptions.ValidationError.t()}
@spec connect(socket :: Socket.t(), opts :: Keyword.t()) ::
{:ok, Socket.t()} | {:error, NimbleOptions.ValidationError.t()}
def connect(socket \\ new_socket(), opts) do
def connect(%Socket{} = socket \\ new_socket(), opts) do
case Slipstream.Configuration.validate(opts) do
{:ok, config} ->
socket = TelemetryHelper.begin_connect(socket, config)

route_command %Commands.OpenConnection{config: config, socket: socket}

{:ok, %Socket{socket | channel_config: config}}
{:ok, %{socket | channel_config: config}}

{:error, reason} ->
{:error, reason}
Expand All @@ -826,13 +826,13 @@ defmodule Slipstream do
@doc since: "0.1.0"
@spec connect!(opts :: Keyword.t()) :: Socket.t()
@spec connect!(socket :: Socket.t(), opts :: Keyword.t()) :: Socket.t()
def connect!(socket \\ new_socket(), opts) do
def connect!(%Socket{} = socket \\ new_socket(), opts) do
config = Slipstream.Configuration.validate!(opts)
socket = TelemetryHelper.begin_connect(socket, config)

route_command %Commands.OpenConnection{config: config, socket: socket}

%Socket{socket | channel_config: config}
%{socket | channel_config: config}
end

@doc """
Expand Down Expand Up @@ -869,7 +869,7 @@ defmodule Slipstream do
@doc since: "0.1.0"
@spec reconnect(socket :: Socket.t()) ::
{:ok, Socket.t()} | {:error, :no_config | :not_connected}
def reconnect(socket) do
def reconnect(%Socket{} = socket) do
with false <- Socket.connected?(socket),
%Slipstream.Configuration{} = config <- socket.channel_config,
{time, socket} <- Socket.next_reconnect_time(socket) do
Expand Down
2 changes: 1 addition & 1 deletion lib/slipstream/callback.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ defmodule Slipstream.Callback do
# note that `args` needs to be a compile-time list for this to work
arity = length(args) + 1

unless {name, arity} in @known_callbacks do
if {name, arity} not in @known_callbacks do
raise CompileError,
file: __CALLER__.file,
line: __CALLER__.line,
Expand Down
5 changes: 2 additions & 3 deletions lib/slipstream/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ defmodule Slipstream.Connection do
socket: %Slipstream.Socket{socket_pid: socket_pid},
config: config
}) do
initial_state = State.new(config, socket_pid)
%State{} = initial_state = State.new(config, socket_pid)

metadata = Telemetry.begin(initial_state)

state = %State{initial_state | metadata: metadata}
state = %{initial_state | metadata: metadata}

{:ok, state, {:continue, :connect}}
end
Expand Down
4 changes: 2 additions & 2 deletions lib/slipstream/connection/impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ defmodule Slipstream.Connection.Impl do

# ---

def push_message(frame, state) when is_tuple(frame) do
def push_message(frame, %State{} = state) when is_tuple(frame) do
case Mint.WebSocket.encode(state.websocket, frame) do
{:ok, websocket, data} ->
case Mint.WebSocket.stream_request_body(
Expand All @@ -57,7 +57,7 @@ defmodule Slipstream.Connection.Impl do
data
) do
{:ok, conn} ->
{:ok, %State{state | conn: conn, websocket: websocket}}
{:ok, %{state | conn: conn, websocket: websocket}}

# coveralls-ignore-start
{:error, conn, reason} ->
Expand Down
89 changes: 56 additions & 33 deletions lib/slipstream/connection/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Slipstream.Connection.Pipeline do
defp decode_message(
%{
raw_message: {:DOWN, ref, :process, _pid, reason},
state: %{client_ref: ref}
state: %State{client_ref: ref}
} = p
) do
put_message(p, event(%Events.ParentProcessExited{reason: reason}))
Expand All @@ -64,7 +64,7 @@ defmodule Slipstream.Connection.Pipeline do
{:status, ref, status},
{:headers, ref, headers} | _maybe_done
],
state: %{request_ref: ref} = state
state: %State{request_ref: ref} = state
} = p
) do
case Mint.WebSocket.new(state.conn, ref, status, headers) do
Expand All @@ -77,7 +77,7 @@ defmodule Slipstream.Connection.Pipeline do
response_headers: headers
})
)
|> put_state(%State{p.state | conn: conn, websocket: websocket})
|> put_state(%{p.state | conn: conn, websocket: websocket})

{:error, _conn, reason} ->
failure_info = %{
Expand Down Expand Up @@ -140,7 +140,7 @@ defmodule Slipstream.Connection.Pipeline do
case Mint.WebSocket.stream(p.state.conn, p.raw_message) do
{:ok, conn, messages} ->
put_in(p.raw_message, messages)
|> put_state(%State{p.state | conn: conn})
|> put_state(%{p.state | conn: conn})
|> decode_message()

{:error, conn, %Mint.TransportError{reason: :closed}, _} ->
Expand Down Expand Up @@ -173,11 +173,11 @@ defmodule Slipstream.Connection.Pipeline do

@spec handle_message(t()) :: t()
defp handle_message(
%{message: :connect, state: %{config: config} = state} = p
%{message: :connect, state: %State{config: config} = state} = p
) do
with {:ok, conn} <- Impl.http_connect(config),
{:ok, conn, ref} <- Impl.websocket_upgrade(conn, config) do
put_state(p, %State{state | conn: conn, request_ref: ref})
put_state(p, %{state | conn: conn, request_ref: ref})
else
# coveralls-ignore-start
{:error, conn, reason} ->
Expand Down Expand Up @@ -270,10 +270,13 @@ defmodule Slipstream.Connection.Pipeline do
end

defp handle_message(
%{message: command(%Commands.JoinTopic{} = cmd), state: state} = p
%{
message: command(%Commands.JoinTopic{} = cmd),
state: %State{} = state
} = p
) do
{ref, state} = State.next_ref(state)
state = %State{state | joins: Map.put(state.joins, cmd.topic, ref)}
state = %{state | joins: Map.put(state.joins, cmd.topic, ref)}

p
|> put_state(state)
Expand All @@ -287,10 +290,13 @@ defmodule Slipstream.Connection.Pipeline do
end

defp handle_message(
%{message: command(%Commands.LeaveTopic{} = cmd), state: state} = p
%{
message: command(%Commands.LeaveTopic{} = cmd),
state: %State{} = state
} = p
) do
{ref, state} = State.next_ref(state)
state = %State{state | leaves: Map.put(state.leaves, cmd.topic, ref)}
state = %{state | leaves: Map.put(state.leaves, cmd.topic, ref)}

p
|> put_state(state)
Expand All @@ -304,7 +310,10 @@ defmodule Slipstream.Connection.Pipeline do
end

defp handle_message(
%{message: command(%Commands.CloseConnection{}), state: state} = p
%{
message: command(%Commands.CloseConnection{}),
state: %State{} = state
} = p
) do
Mint.HTTP.close(state.conn)

Expand All @@ -322,7 +331,7 @@ defmodule Slipstream.Connection.Pipeline do
defp handle_message(
%{
message: event(%Events.ParentProcessExited{reason: reason}),
state: state
state: %State{} = state
} = p
) do
Mint.HTTP.close(state.conn)
Expand All @@ -333,7 +342,7 @@ defmodule Slipstream.Connection.Pipeline do
defp handle_message(
%{
message: event(%Events.ChannelConnectFailed{} = event),
state: state
state: %State{} = state
} = p
) do
Mint.HTTP.close(state.conn)
Expand All @@ -351,34 +360,45 @@ defmodule Slipstream.Connection.Pipeline do
defp handle_message(%{message: event(%Events.PongReceived{})} = p), do: p
# coveralls-ignore-stop

defp handle_message(%{message: event(%type{} = event), state: state} = p)
defp handle_message(
%{message: event(%type{} = event), state: %State{} = state} = p
)
when type in [Events.TopicJoinFailed, Events.TopicJoinClosed] do
state = %State{state | joins: Map.delete(state.joins, event.topic)}
state = %{state | joins: Map.delete(state.joins, event.topic)}

route_event state, event

put_state(p, state)
end

defp handle_message(
%{message: event(%Events.TopicLeaveAccepted{} = event), state: state} =
%{
message: event(%Events.TopicLeaveAccepted{} = event),
state: %State{} = state
} =
p
) do
state = %State{state | leaves: Map.delete(state.leaves, event.topic)}
state = %{state | leaves: Map.delete(state.leaves, event.topic)}

put_state(p, state)
end

defp handle_message(
%{message: event(%Events.HeartbeatAcknowledged{}), state: state} = p
%{
message: event(%Events.HeartbeatAcknowledged{}),
state: %State{} = state
} = p
) do
# coveralls-ignore-start
put_state(p, State.reset_heartbeat(state))
# coveralls-ignore-stop
end

defp handle_message(
%{message: event(%Events.ChannelConnected{} = event), state: state} = p
%{
message: event(%Events.ChannelConnected{} = event),
state: %State{} = state
} = p
) do
timer =
if state.config.heartbeat_interval_msec != 0 do
Expand All @@ -392,7 +412,7 @@ defmodule Slipstream.Connection.Pipeline do
end

state =
%State{state | status: :connected, heartbeat_timer: timer}
%{state | status: :connected, heartbeat_timer: timer}
|> State.reset_heartbeat()

route_event state, event
Expand All @@ -402,15 +422,18 @@ defmodule Slipstream.Connection.Pipeline do

# coveralls-ignore-start
defp handle_message(
%{message: event(%Events.ChannelClosed{} = event), state: state} = p
%{
message: event(%Events.ChannelClosed{} = event),
state: %State{} = state
} = p
) do
Mint.HTTP.close(state.conn)

if match?({:interval, ref} when is_reference(ref), state.heartbeat_timer) do
:timer.cancel(state.heartbeat_timer)
end

state = %State{state | status: :terminating}
state = %{state | status: :terminating}

route_event state, event

Expand Down Expand Up @@ -450,15 +473,15 @@ defmodule Slipstream.Connection.Pipeline do
defp default_return(p), do: p

@spec build_events(t()) :: t()
defp build_events(%{events: []} = p), do: p
defp build_events(%__MODULE__{events: []} = p), do: p

defp build_events(%{events: events} = p) do
defp build_events(%__MODULE__{events: events} = p) do
built_events =
Enum.map(events, fn %{type: type, attrs: attrs} ->
build_event(type, attrs)
end)

%__MODULE__{p | built_events: built_events}
%{p | built_events: built_events}
end

defp emit_events(%{built_events: []} = p), do: p
Expand All @@ -482,13 +505,13 @@ defmodule Slipstream.Connection.Pipeline do
# --- token API

@spec put_state(t(), State.t()) :: t()
def put_state(p, state) do
%__MODULE__{p | state: state}
def put_state(%__MODULE__{} = p, %State{} = state) do
%{p | state: state}
end

@spec put_message(t(), term()) :: t()
def put_message(p, message) do
%__MODULE__{p | message: message}
def put_message(%__MODULE__{} = p, message) do
%{p | message: message}
end

@doc """
Expand All @@ -498,8 +521,8 @@ defmodule Slipstream.Connection.Pipeline do
build the event in the `build_events/1` phase of the pipeline
"""
@spec put_event(t(), atom(), Keyword.t() | map()) :: t()
def put_event(p, event, attrs \\ %{}) do
%__MODULE__{
def put_event(%__MODULE__{} = p, event, attrs \\ %{}) do
%{
p
| events: p.events ++ [%{type: event, attrs: Enum.into(attrs, %{})}]
}
Expand All @@ -511,8 +534,8 @@ defmodule Slipstream.Connection.Pipeline do
This value will be given to the GenServer callback that invoked
"""
@spec put_return(t(), term()) :: t()
def put_return(p, return) do
%__MODULE__{p | return: return}
def put_return(%__MODULE__{} = p, return) do
%{p | return: return}
end

def push_message(p, message) do
Expand Down
12 changes: 6 additions & 6 deletions lib/slipstream/connection/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,18 @@ defmodule Slipstream.Connection.State do

Refs are simply strings of incrementing integers.
"""
def next_ref(state) do
def next_ref(%__MODULE__{} = state) do
ref = state.current_ref + 1

{to_string(ref),
%__MODULE__{state | current_ref: ref, current_ref_str: to_string(ref)}}
%{state | current_ref: ref, current_ref_str: to_string(ref)}}
end

# coveralls-ignore-start
def next_heartbeat_ref(state) do
def next_heartbeat_ref(%__MODULE__{} = state) do
{ref, state} = next_ref(state)

%__MODULE__{state | heartbeat_ref: ref}
%{state | heartbeat_ref: ref}
end

# coveralls-ignore-stop
Expand All @@ -85,8 +85,8 @@ defmodule Slipstream.Connection.State do
in state is nil, that means we have not received a reply to our heartbeat
request and that the server is potentially stuck or otherwise not responding.
"""
def reset_heartbeat(state) do
%__MODULE__{state | heartbeat_ref: nil}
def reset_heartbeat(%__MODULE__{} = state) do
%{state | heartbeat_ref: nil}
end

@doc """
Expand Down
Loading