Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,9 @@ base_cron = [
# Every day at 1am
{"0 1 * * *", Plausible.Workers.CleanInvitations},
# Every 2 hours
{"0 */2 * * *", Plausible.Workers.ExpireDomainChangeTransitions}
{"0 */2 * * *", Plausible.Workers.ExpireDomainChangeTransitions},
# Daily at midnight
{"0 0 * * *", Plausible.Workers.LocationsSync}
]

cloud_cron = [
Expand Down Expand Up @@ -626,7 +628,9 @@ base_queues = [
analytics_exports: 1,
notify_exported_analytics: 1,
domain_change_transition: 1,
check_accept_traffic_until: 1
check_accept_traffic_until: 1,
clickhouse_clean_sites: 1,
locations_sync: 1
]

cloud_queues = [
Expand Down
2 changes: 1 addition & 1 deletion lib/mix/tasks/clean_clickhouse.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Mix.Tasks.CleanClickhouse do
def run(_) do
%{rows: rows} = IngestRepo.query!("show tables")
tables = Enum.map(rows, fn [table] -> table end)
to_truncate = tables -- ["schema_migrations"]
to_truncate = tables -- ["schema_migrations", "location_data", "location_data_dict"]

Enum.each(to_truncate, fn table ->
IngestRepo.query!("truncate #{table}")
Expand Down
16 changes: 16 additions & 0 deletions lib/plausible/clickhouse_location_data.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule Plausible.ClickhouseLocationData do
  @moduledoc """
  Schema for storing location id <-> translation mappings in ClickHouse.

  Indirectly read via dictionary `location_data_dict` in ALIAS columns in
  `events_v2`, `sessions_v2` and `imported_locations` table.
  """
  use Ecto.Schema

  @primary_key false
  schema "location_data" do
    # Kind of location the row describes: "country", "subdivision" or "city"
    field :type, Ch, type: "LowCardinality(String)"
    # Identifier within its kind, always stored as a string
    # (country alpha-2 code, subdivision code or stringified city id)
    field :id, :string
    # Human-readable location name
    field :name, :string
  end
end
140 changes: 140 additions & 0 deletions lib/plausible/data_migration/locations_sync.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
defmodule Plausible.DataMigration.LocationsSync do
  @moduledoc """
  ClickHouse data migration that stores location names in ClickHouse.

  Only needs to run when `Location.version()` changes: either as a migration or in cron.

  Steps performed by `run/0`:

    1. Truncates the existing `location_data` table (if it exists)
    2. Creates the table (if needed)
    3. Inserts fresh data taken from the `Location` module
    4. (Re-)creates the dictionary used to read location data from the table
    5. Creates ALIAS columns in `events_v2`, `sessions_v2` and `imported_locations`
       to make reading location names easy
    6. Updates the table comment of `location_data` to record the last version synced

  Note that the dictionary is large enough to cache the whole dataset in memory,
  making lookups fast.

  This migration is intended to be idempotent and rerunnable - if run multiple times,
  it should always leave things in the same state as if run once.

  SQL files available at: priv/data_migrations/LocationsSync/sql
  """
  alias Plausible.ClickhouseLocationData

  use Plausible.DataMigration, dir: "LocationsSync", repo: Plausible.IngestRepo

  # {location type stored in `location_data`, name of the ALIAS column exposing its name}
  @location_kinds [
    {"country", "country_name"},
    {"subdivision", "region_name"},
    {"city", "city_name"}
  ]

  # Per table: the columns holding the country, subdivision and city ids,
  # listed in the same order as @location_kinds.
  @table_inputs [
    {"events_v2", ["country_code", "subdivision1_code", "city_geoname_id"]},
    {"sessions_v2", ["country_code", "subdivision1_code", "city_geoname_id"]},
    {"imported_locations", ["country", "region", "city"]}
  ]

  # One entry per ALIAS column to create: table by table, country -> subdivision -> city.
  @columns Enum.flat_map(@table_inputs, fn {table, input_columns} ->
             @location_kinds
             |> Enum.zip(input_columns)
             |> Enum.map(fn {{type, column_name}, input_column} ->
               %{
                 table: table,
                 column_name: column_name,
                 type: type,
                 input_column: input_column
               }
             end)
           end)

  # Returns true when the version stored in the `location_data` table comment differs
  # from `Location.version()`, or when that comment could not be read as a single value.
  def out_of_date?() do
    stale?(run_sql("get-location-data-table-comment"))
  end

  # Rebuilds the table, dictionary and ALIAS columns, then records the synced version.
  # Every SQL step is asserted to return `{:ok, _}`; the result of the final step is returned.
  def run() do
    cluster? = Plausible.MigrationUtils.clustered_table?("sessions_v2")

    {:ok, _} = run_sql("truncate-location-data-table", cluster?: cluster?)
    {:ok, _} = run_sql("create-location-data-table", cluster?: cluster?)

    @repo.insert_all(ClickhouseLocationData, location_rows())

    {:ok, _} =
      run_sql("update-location-data-dictionary",
        cluster?: cluster?,
        dictionary_connection_params: Plausible.MigrationUtils.dictionary_connection_params()
      )

    Enum.each(@columns, fn %{
                             table: table,
                             column_name: column_name,
                             type: type,
                             input_column: input_column
                           } ->
      {:ok, _} =
        run_sql("add-alias-column",
          cluster?: cluster?,
          table: table,
          column_name: column_name,
          type: type,
          input_column: input_column
        )
    end)

    {:ok, _} =
      run_sql("update-location-data-table-comment",
        cluster?: cluster?,
        version: Location.version()
      )
  end

  defp stale?({:ok, %{rows: [[stored_version]]}}), do: stored_version != Location.version()
  defp stale?(_other), do: true

  # All rows to insert into `location_data`: countries, then subdivisions, then cities.
  defp location_rows() do
    countries = Enum.map(Location.Country.all(), &country_row/1)
    subdivisions = Enum.map(Location.Subdivision.all(), &subdivision_row/1)
    cities = Enum.map(Location.City.all(), &city_row/1)

    countries ++ subdivisions ++ cities
  end

  defp country_row(%Location.Country{alpha_2: alpha_2, name: name}),
    do: %{type: "country", id: alpha_2, name: name}

  defp subdivision_row(%Location.Subdivision{code: code, name: name}),
    do: %{type: "subdivision", id: code, name: name}

  # City ids are integers in `Location`; the table stores every id as a string.
  defp city_row(%Location.City{id: id, name: name}),
    do: %{type: "city", id: Integer.to_string(id), name: name}
end
19 changes: 5 additions & 14 deletions lib/plausible/data_migration/populate_event_session_columns.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ defmodule Plausible.DataMigration.PopulateEventSessionColumns do
run_sql("create-sessions-dictionary",
cluster?: cluster?,
dictionary_connection_params:
Keyword.get(opts, :dictionary_connection_string, dictionary_connection_params()),
Keyword.get(
opts,
:dictionary_connection_string,
Plausible.MigrationUtils.dictionary_connection_params()
),
dictionary_config: dictionary_config(opts)
)

Expand Down Expand Up @@ -136,19 +140,6 @@ defmodule Plausible.DataMigration.PopulateEventSessionColumns do
|> Map.merge(Keyword.get(opts, :dictionary_config, %{}))
end

# See https://clickhouse.com/docs/en/sql-reference/dictionaries#clickhouse for context
defp dictionary_connection_params() do
Plausible.IngestRepo.config()
|> Enum.map(fn
{:database, database} -> "DB '#{database}'"
{:username, username} -> "USER '#{username}'"
{:password, password} -> "PASSWORD '#{password}'"
_ -> nil
end)
|> Enum.reject(&is_nil/1)
|> Enum.join(" ")
end

defp get_partitions(opts) do
[min_partition, max_partition] = Keyword.get(opts, :partition_range, ["0", "999999"])

Expand Down
13 changes: 13 additions & 0 deletions lib/plausible/migration_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,17 @@ defmodule Plausible.MigrationUtils do
{:ok, _} -> true
end
end

# See https://clickhouse.com/docs/en/sql-reference/dictionaries#clickhouse for context
#
# Builds the space-separated `DB '...' USER '...' PASSWORD '...'` fragment that is
# spliced into a dictionary's `SOURCE(CLICKHOUSE(...))` clause, using whichever of
# `:database`, `:username` and `:password` are present in the IngestRepo config
# (in config order). Other config keys are ignored.
def dictionary_connection_params() do
  Plausible.IngestRepo.config()
  |> Enum.flat_map(fn
    {:database, database} -> ["DB '#{escape_sql_string(database)}'"]
    {:username, username} -> ["USER '#{escape_sql_string(username)}'"]
    {:password, password} -> ["PASSWORD '#{escape_sql_string(password)}'"]
    _ -> []
  end)
  |> Enum.join(" ")
end

# Escapes backslashes and single quotes so the value stays a single, well-formed
# single-quoted SQL string literal (e.g. a password containing `'`).
defp escape_sql_string(value) do
  value
  |> to_string()
  |> String.replace("\\", "\\\\")
  |> String.replace("'", "\\'")
end
end
3 changes: 3 additions & 0 deletions lib/plausible/stats/filters/filters.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ defmodule Plausible.Stats.Filters do
:country,
:region,
:city,
:country_name,
:region_name,
:city_name,
:entry_page,
:exit_page,
:entry_page_hostname,
Expand Down
3 changes: 3 additions & 0 deletions lib/plausible/stats/imported/base.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ defmodule Plausible.Stats.Imported.Base do
"visit:country" => "imported_locations",
"visit:region" => "imported_locations",
"visit:city" => "imported_locations",
"visit:country_name" => "imported_locations",
"visit:region_name" => "imported_locations",
"visit:city_name" => "imported_locations",
"visit:device" => "imported_devices",
"visit:browser" => "imported_browsers",
"visit:browser_version" => "imported_browsers",
Expand Down
4 changes: 4 additions & 0 deletions lib/plausible/stats/imported/sql/builder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ defmodule Plausible.Stats.Imported.SQL.Builder do
defp filter_group_values(q, "visit:region"), do: where(q, [i], i.region != "")
defp filter_group_values(q, "visit:city"), do: where(q, [i], i.city != 0 and not is_nil(i.city))

# Location *name* dimensions: exclude imported rows whose name column holds the
# placeholder value checked below.
# NOTE(review): `country_name` is presumably the ALIAS column that resolves a country
# code to its name through the `location_data_dict` dictionary. If so, comparing it
# with the code-like value "ZZ" may never exclude anything; confirm whether this
# should be `!= ""` like the region/city name clauses.
defp filter_group_values(q, "visit:country_name"), do: where(q, [i], i.country_name != "ZZ")
defp filter_group_values(q, "visit:region_name"), do: where(q, [i], i.region_name != "")
defp filter_group_values(q, "visit:city_name"), do: where(q, [i], i.city_name != "")

defp filter_group_values(q, _dimension), do: q

def select_joined_dimensions(q, query) do
Expand Down
9 changes: 9 additions & 0 deletions lib/plausible/stats/sql/expression.ex
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,15 @@ defmodule Plausible.Stats.SQL.Expression do
def select_dimension(q, key, "visit:city", _table, _query),
do: select_merge_as(q, [t], %{key => t.city})

# Location *name* dimensions: select the `country_name` / `region_name` / `city_name`
# column of the queried table under `key`. The table and query arguments are unused.
# NOTE(review): presumably these are the dictionary-backed ALIAS columns created by the
# LocationsSync data migration - confirm they exist on every table queried this way.
def select_dimension(q, key, "visit:country_name", _table, _query),
do: select_merge_as(q, [t], %{key => t.country_name})

def select_dimension(q, key, "visit:region_name", _table, _query),
do: select_merge_as(q, [t], %{key => t.region_name})

def select_dimension(q, key, "visit:city_name", _table, _query),
do: select_merge_as(q, [t], %{key => t.city_name})

def event_metric(:pageviews) do
wrap_alias([e], %{
pageviews:
Expand Down
15 changes: 15 additions & 0 deletions lib/workers/locations_sync.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule Plausible.Workers.LocationsSync do
  @moduledoc false

  use Plausible.Repo
  use Oban.Worker, queue: :locations_sync

  alias Plausible.DataMigration

  # Re-runs the locations data migration when it reports being out of date.
  # Always returns :ok; a failing sync raises instead of returning an error tuple.
  @impl Oban.Worker
  def perform(_job) do
    stale? = DataMigration.LocationsSync.out_of_date?()

    if stale?, do: DataMigration.LocationsSync.run()

    :ok
  end
end
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
"joken": {:hex, :joken, "2.6.0", "b9dd9b6d52e3e6fcb6c65e151ad38bf4bc286382b5b6f97079c47ade6b1bcc6a", [:mix], [{:jose, "~> 1.11.5", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm", "5a95b05a71cd0b54abd35378aeb1d487a23a52c324fa7efdffc512b655b5aaa7"},
"jose": {:hex, :jose, "1.11.6", "613fda82552128aa6fb804682e3a616f4bc15565a048dabd05b1ebd5827ed965", [:mix, :rebar3], [], "hexpm", "6275cb75504f9c1e60eeacb771adfeee4905a9e182103aa59b53fed651ff9738"},
"kaffy": {:hex, :kaffy, "0.10.2", "72e807c525323bd0cbc3ac0c127b7bde61caffdc576fb6554964d3fe6a2a6100", [:mix], [{:ecto, "~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.6", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0.2", [hex: :phoenix_view, repo: "hexpm", optional: false]}], "hexpm", "651cad5f3bcc91510a671c13c7a273b8b8195fdf2d809208708baecbb77300bf"},
"location": {:git, "https://github.com/plausible/location.git", "eddd52590f2423cd677d9206787468e1fb018668", []},
"location": {:git, "https://github.com/plausible/location.git", "a89bf79985c3c3d0830477ae587001156a646ce8", []},
"locus": {:hex, :locus, "2.3.6", "c9f53fd5df872fca66a54dc0aa2f8b2d3640388e56a0c39a741be0df6d8854bf", [:rebar3], [{:tls_certificate_check, "~> 1.9", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}], "hexpm", "6087aa9a69673e7011837fb4b3d7f756560adde76892c32f5f93904ee30064e2"},
"mail": {:hex, :mail, "0.3.1", "cb0a14e4ed8904e4e5a08214e686ccf6f9099346885db17d8c309381f865cc5c", [:mix], [], "hexpm", "1db701e89865c1d5fa296b2b57b1cd587587cca8d8a1a22892b35ef5a8e352a6"},
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE <%= @table %>
<%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
ADD COLUMN IF NOT EXISTS
<%= @column_name %> String
ALIAS dictGet('location_data_dict', 'name', tuple('<%= @type %>', <%= @input_column %>))
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
CREATE TABLE IF NOT EXISTS location_data <%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
(
    `type` LowCardinality(String),
    `id` String,
    `name` String
)
<%= if @cluster? do %>
ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/tables/{shard}/plausible_prod/location_data', '{replica}')
<% else %>
ENGINE = MergeTree()
<% end %>
ORDER BY (type, id)
SETTINGS index_granularity = 128
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select comment from system.tables where database = currentDatabase() and table = 'location_data'
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
TRUNCATE TABLE IF EXISTS location_data <%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE OR REPLACE DICTIONARY location_data_dict
<%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
(
`type` String,
`id` String,
`name` String
)
PRIMARY KEY type, id
SOURCE(CLICKHOUSE(TABLE location_data <%= @dictionary_connection_params %>))
LIFETIME(0)
LAYOUT(complex_key_cache(size_in_cells 500000))
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE location_data
<%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
MODIFY COMMENT '<%= @version %>'
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Plausible.IngestRepo.Migrations.PopulateLocationData do
  use Ecto.Migration

  # Loads location data (if needed) and runs the locations sync data migration.
  def up do
    ensure_locations_loaded()

    Plausible.DataMigration.LocationsSync.run()
  end

  def down do
    raise "Irreversible"
  end

  # `Location.load_all/0` raising ArgumentError is treated as "already loaded"
  # and ignored; any other exception propagates.
  defp ensure_locations_loaded do
    Location.load_all()
  rescue
    # Already loaded
    ArgumentError -> nil
  end
end
Loading