Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 52 additions & 31 deletions lib/rate_tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -94,24 +94,17 @@ defmodule Instruments.RateTracker do
"""
@spec dump_rates() :: [{{String.t(), Keyword.t()}, non_neg_integer()}]
def dump_rates() do
table_count = :erlang.system_info(:schedulers)

1..table_count
|> Enum.flat_map(fn scheduler_id ->
scheduler_id
|> table_name()
|> :ets.tab2list()
end)
|> aggregate_stats()
|> Enum.filter(fn
{_key, 0} -> false
{_key, _rate} -> true
end)
|> Enum.to_list()
GenServer.call(__MODULE__, :dump_rates)
end

## GenServer callbacks

def handle_call(:dump_rates, _from, %__MODULE__{} = state) do
time_since_report = time() - state.last_update_time
rates = do_dump_rates(state, time_since_report)
{:reply, rates, state}
end

def handle_cast({:subscribe, callback}, %__MODULE__{} = state) do
state = %__MODULE__{state | callbacks: [callback | state.callbacks]}

Expand All @@ -125,8 +118,9 @@ defmodule Instruments.RateTracker do

# Extraordinarily unlikely to be zero, but if it is for some reason, we'll just skip this
# and let the next report get it
if threshold != nil and time_since_report > 0 do
do_report(state, time_since_report, threshold)
if time_since_report > 0 do
counts = dump_and_clear_counts(state)
do_report(state, counts, time_since_report, threshold)
end

schedule_report()
Expand All @@ -135,6 +129,28 @@ defmodule Instruments.RateTracker do

## Private

defp do_dump_rates(_state, 0) do
[]
end

defp do_dump_rates(state, time_since_report) do
1..state.table_count
|> Enum.flat_map(fn scheduler_id ->
scheduler_id
|> table_name()
|> :ets.tab2list()
end)
|> aggregate_stats()
|> Enum.filter(fn
{_key, 0} -> false
{_key, _rate} -> true
end)
|> Enum.map(fn {key, count} ->
{key, count / time_since_report}
end)
|> Enum.to_list()
end

defp get_table_key(name, []) do
{name, []}
end
Expand Down Expand Up @@ -173,14 +189,24 @@ defmodule Instruments.RateTracker do
table_name(:erlang.system_info(:scheduler_id))
end

defp aggregate_stats(table_data) do
Enum.reduce(table_data, %{}, fn {key, val}, acc ->
Map.update(acc, key, val, &(&1 + val))
defp do_report(%__MODULE__{} = _state, _aggregated_counts, _time_since_report, nil) do
nil
end

defp do_report(%__MODULE__{} = state, aggregated_counts, time_since_report, threshold) do
Enum.each(aggregated_counts, fn {key, num_tracked} ->
# Sampling correction is technically approximate (we don't know if Statix or another underlying lib will report this differently)
tracked_per_second = num_tracked / time_since_report * sample_rate_for_key(key)

if tracked_per_second > threshold do
Enum.each(state.callbacks, fn callback -> callback.(key, tracked_per_second) end)
end
end)
end

defp do_report(%__MODULE__{} = state, time_since_report, threshold) do
dump_and_clear_data = fn scheduler_id ->
defp dump_and_clear_counts(%__MODULE__{} = state) do
1..state.table_count
|> Enum.flat_map(fn scheduler_id ->
table_name = table_name(scheduler_id)
table_data = :ets.tab2list(table_name)

Expand All @@ -189,18 +215,13 @@ defmodule Instruments.RateTracker do
end)

table_data
end

1..state.table_count
|> Enum.flat_map(dump_and_clear_data)
end)
|> aggregate_stats()
|> Enum.each(fn {key, num_tracked} ->
# Sampling correction is technically approximate (we don't know if Statix or another underlying lib will report this differently)
tracked_per_second = num_tracked / time_since_report * sample_rate_for_key(key)
end

if tracked_per_second > threshold do
Enum.each(state.callbacks, fn callback -> callback.(key, tracked_per_second) end)
end
defp aggregate_stats(table_data) do
Enum.reduce(table_data, %{}, fn {key, val}, acc ->
Map.update(acc, key, val, &(&1 + val))
end)
end

Expand Down
Loading