Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion lib/cloud_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ defmodule CloudCache do

@default_adapter CloudCache.Adapters.S3
@default_name __MODULE__

def start_link(caches, opts \\ []) do
Supervisor.start_link(__MODULE__, caches, Keyword.put_new(opts, :name, @default_name))
end
Expand Down Expand Up @@ -53,6 +52,10 @@ defmodule CloudCache do

# Non-Multipart Upload API

def list_buckets(opts \\ []) do
adapter(opts).list_buckets(opts)
end

def head_object(bucket, object, opts \\ []) do
adapter(opts).head_object(bucket, object, opts)
end
Expand All @@ -61,6 +64,10 @@ defmodule CloudCache do
adapter(opts).pre_sign(bucket, object, opts)
end

def get_object(bucket, object, opts \\ []) do
adapter(opts).get_object(bucket, object, opts)
end

def put_object(bucket, object, body, opts \\ []) do
adapter(opts).put_object(bucket, object, body, opts)
end
Expand Down
11 changes: 11 additions & 0 deletions lib/cloud_cache/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ defmodule CloudCache.Adapter do

@callback supervisor_children(opts :: options()) :: list()

@callback list_buckets(opts :: options()) :: {:ok, term()} | {:error, term()}

@callback head_object(
bucket :: bucket(),
object :: object(),
Expand Down Expand Up @@ -93,6 +95,11 @@ defmodule CloudCache.Adapter do
opts :: options()
) :: {:ok, term()} | {:error, term()}

@callback list_multipart_uploads(
bucket :: bucket(),
opts :: options()
) :: {:ok, term()} | {:error, term()}

@callback copy_object_multipart(
dest_bucket :: bucket(),
dest_object :: object(),
Expand Down Expand Up @@ -164,6 +171,10 @@ defmodule CloudCache.Adapter do
adapter.list_parts(bucket, object, upload_id, opts)
end

def list_multipart_uploads(adapter, bucket, opts \\ []) do
adapter.list_multipart_uploads(bucket, opts)
end

def complete_multipart_upload(adapter, bucket, object, upload_id, parts, opts \\ []) do
adapter.complete_multipart_upload(bucket, object, upload_id, parts, opts)
end
Expand Down
225 changes: 151 additions & 74 deletions lib/cloud_cache/adapters/s3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,113 +20,148 @@ defmodule CloudCache.Adapters.S3 do
@behaviour CloudCache.Adapter

@logger_prefix "CloudCache.Adapters.S3"
@one_minute_seconds 60

@mix_env_test Mix.env() === :test
@localhost_scheme "http://"
@localhost_host "s3.localhost.localstack.cloud"
@localhost_port 4566

@one_minute_seconds 60
@http_client CloudCache.Adapters.S3.HTTP
@region "us-west-1"
@sandbox_scheme "http://"
@sandbox_host "s3.localhost.localstack.cloud"
@sandbox_port 4566
@default_retry_options [
max_attempts: if(@mix_env_test, do: 1, else: 10),
max_attempts: if(Mix.env() === :test, do: 1, else: 10),
base_backoff_in_ms: 10,
max_backoff_in_ms: 10_000
]
@default_options [
s3: [
sandbox_enabled: @mix_env_test,
sandbox: [
scheme: @sandbox_scheme,
host: @sandbox_host,
port: @sandbox_port
],
http_client: @http_client,
region: @region,
access_key_id: if(@mix_env_test, do: "test", else: "<ACCESS_KEY_ID>"),
secret_access_key: if(@mix_env_test, do: "test", else: "<SECRET_ACCESS_KEY>"),
retries: @default_retry_options
]
]

@s3_config_keys [
:port,
:scheme,
:host,
:http_client,
:access_key_id,
:secret_access_key,
:region,
:json_codec,
:retries,
:normalize_path,
:require_imds_v2
@default_s3_options [
sandbox_enabled: Mix.env() === :test,
local_stack_enabled: Mix.env() === :test,
http_client: CloudCache.Adapters.S3.HTTP,
region: "us-west-1",
retries: @default_retry_options
]

@default_options [s3: @default_s3_options]

# 64 MiB (67_108_864 bytes)
@sixty_four_mib 64 * 1_024 * 1_024

@doc """
Returns the S3 configuration as a map.

CloudCache.Adapters.S3.config()
### Examples

iex> CloudCache.Adapters.S3.config()
"""
def config(opts \\ []) do
opts =
Keyword.merge(@default_options, opts, fn
_k, v1, v2 when is_list(v2) -> Keyword.merge(v1, v2)
_k, v1, v2 when is_map(v2) -> Map.merge(v1, v2)
_, _v1, v2 -> v2
end)
s3_opts = Keyword.merge(@default_s3_options, opts[:s3] || [])

sandbox_opts =
if @mix_env_test do
sandbox_opts = opts[:sandbox] || []

case sandbox_opts[:endpoint] do
nil ->
[
scheme: uri_scheme(sandbox_opts[:scheme] || @sandbox_scheme),
host: sandbox_opts[:host] || @sandbox_host,
port: sandbox_opts[:port] || @sandbox_port
]

uri ->
uri = URI.parse(uri)
scheme = uri_scheme(uri.scheme || @sandbox_scheme)
host = uri.host || @sandbox_host
port = uri.port || @sandbox_port

[
scheme: scheme,
host: host,
port: port
]
end
else
[]
end
s3_endpoint_opts = s3_endpoint_options(s3_opts)

overrides =
:ex_aws
|> Application.get_all_env()
|> Keyword.merge(opts[:s3] || [])
s3_opts
|> Keyword.update(
:retries,
@default_retry_options,
&Keyword.merge(@default_retry_options, &1)
)
|> then(&Keyword.merge(sandbox_opts, &1))
|> Keyword.take(@s3_config_keys)
|> Keyword.merge(s3_endpoint_opts)
|> Keyword.take([
:access_key_id,
:host,
:http_client,
:json_codec,
:normalize_path,
:port,
:region,
:require_imds_v2,
:retries,
:secret_access_key,
:scheme
])

ExAws.Config.new(:s3, overrides)
end

defp s3_endpoint_options(opts) do
if opts[:local_stack_enabled] do
[
scheme: @localhost_scheme,
host: @localhost_host,
port: @localhost_port,
access_key_id: "test",
secret_access_key: "test"
]
else
creds = aws_credentials_opts(opts)

base =
case opts[:endpoint] do
nil ->
[]

endpoint ->
uri = URI.parse(endpoint)

[
scheme: uri_scheme(uri.scheme),
host: uri.host,
port: uri.port
]
end

Keyword.merge(base, creds)
end
end

defp aws_credentials_opts(opts) do
profile = Keyword.get(opts, :profile, "cloud_cache")

[
access_key_id: [
{:awscli, profile, 30},
:instance_role,
{:system, "AWS_ACCESS_KEY_ID"},
"<AWS_ACCESS_KEY_ID>"
],
secret_access_key: [
{:awscli, profile, 30},
:instance_role,
{:system, "AWS_SECRET_ACCESS_KEY"},
"<AWS_SECRET_ACCESS_KEY>"
]
]
end

defp uri_scheme("https" <> _), do: "https://"
defp uri_scheme("http" <> _), do: "http://"
defp uri_scheme(_), do: "https://"

@impl true
def list_buckets(opts \\ []) do
opts = Keyword.merge(@default_options, opts)

sandbox? = opts[:s3][:sandbox_enabled] === true

if not sandbox? or sandbox_disabled?() do
case opts
|> Keyword.take([:host, :port, :region, :scheme, :headers, :timeout])
|> S3.list_buckets()
|> perform(opts) do
{:ok, %{body: body}} ->
{:ok, body.buckets}

{:error, %{status: status} = response} when status in 400..499 ->
{:error, ErrorMessage.not_found("buckets not found", %{response: response})}

{:error, reason} ->
{:error,
ErrorMessage.service_unavailable("service temporarily unavailable", %{reason: reason})}
end
else
sandbox_list_buckets_response(opts)
end
end

@impl true
def head_object(bucket, object, opts \\ []) do
opts = Keyword.merge(@default_options, opts)
Expand Down Expand Up @@ -450,6 +485,32 @@ defmodule CloudCache.Adapters.S3 do
end
end

@impl true
@doc """
...
"""
def list_multipart_uploads(bucket, opts \\ []) do
opts = Keyword.merge(@default_options, opts)

sandbox? = opts[:s3][:sandbox_enabled] === true

if not sandbox? or sandbox_disabled?() do
case bucket |> S3.list_multipart_uploads(opts) |> perform(opts) do
{:ok, %{body: %{uploads: uploads}}} ->
{:ok, uploads}

{:error, reason} ->
{:error,
ErrorMessage.service_unavailable("service temporarily unavailable", %{
bucket: bucket,
reason: reason
})}
end
else
sandbox_list_multipart_uploads_response(bucket, opts)
end
end

@impl true
@doc """
...
Expand Down Expand Up @@ -935,6 +996,10 @@ defmodule CloudCache.Adapters.S3 do
if Mix.env() === :test do
defdelegate sandbox_disabled?, to: CloudCache.Adapters.S3.Testing.S3Sandbox

defdelegate sandbox_list_buckets_response(opts),
to: CloudCache.Adapters.S3.Testing.S3Sandbox,
as: :list_buckets_response

defdelegate sandbox_head_object_response(bucket, object, opts),
to: CloudCache.Adapters.S3.Testing.S3Sandbox,
as: :head_object_response
Expand Down Expand Up @@ -984,6 +1049,10 @@ defmodule CloudCache.Adapters.S3 do
to: CloudCache.Adapters.S3.Testing.S3Sandbox,
as: :pre_sign_part_response

defdelegate sandbox_list_multipart_uploads_response(bucket, opts),
to: CloudCache.Adapters.S3.Testing.S3Sandbox,
as: :list_multipart_uploads_response

defdelegate sandbox_copy_object_multipart_response(
dest_bucket,
dest_object,
Expand Down Expand Up @@ -1039,6 +1108,14 @@ defmodule CloudCache.Adapters.S3 do
else
defp sandbox_disabled?, do: true

defp sandbox_list_buckets_response(opts) do
raise """
Cannot use #{inspect(__MODULE__)}.list_buckets/1 outside of test.

options: #{inspect(opts)}
"""
end

defp sandbox_head_object_response(bucket, object, opts) do
raise """
Cannot use #{inspect(__MODULE__)}.head_object/3 outside of test.
Expand Down
Loading