Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2c6f9ed
rebase commit on new master
Vdeub-cloudinary Jul 3, 2025
5fbc26a
add tests
Vdeub-cloudinary Jul 3, 2025
d791f85
fix tests
Vdeub-cloudinary Jul 3, 2025
c6019bf
fix tests due to wrong mock data
Vdeub-cloudinary Jul 3, 2025
f25738a
fix tests due to wrong mock data + api missing method
Vdeub-cloudinary Jul 3, 2025
4c068ea
fix tests due to wrong definition
Vdeub-cloudinary Jul 3, 2025
d0f67ea
fix list first
Vdeub-cloudinary Jul 3, 2025
e77f822
fix patch
Vdeub-cloudinary Jul 3, 2025
c0a1d64
fix assertion error for list
Vdeub-cloudinary Jul 3, 2025
0e51e47
fix compare create
Vdeub-cloudinary Jul 3, 2025
03d053d
add listing after creation test
Vdeub-cloudinary Jul 3, 2025
36e19c3
fix test list after creation
Vdeub-cloudinary Jul 3, 2025
b7486b1
fix tests
Vdeub-cloudinary Jul 10, 2025
820c889
Merge branch 'master' into devx-16946-smd-for-clone
Vdeub-cloudinary Jul 16, 2025
3e3d67c
fix declarations due to conflict resolution mistake
Vdeub-cloudinary Jul 16, 2025
a2cf5a1
fix fields declaration for smd
Vdeub-cloudinary Jul 16, 2025
2b6fd28
improve scripts based on discussions
Vdeub-cloudinary Jul 21, 2025
23febe2
tentatively add user input mock
Vdeub-cloudinary Jul 21, 2025
b074d0e
tentatively add user input mock - 2
Vdeub-cloudinary Jul 21, 2025
17981a8
tentatively add user input mock - 3
Vdeub-cloudinary Jul 21, 2025
fe0233e
tentatively add user input mock - 4
Vdeub-cloudinary Jul 21, 2025
a626d18
tentatively add user input mock - 5
Vdeub-cloudinary Jul 21, 2025
30c8f6b
revamp code to make it more reusable and improve tests
Vdeub-cloudinary Dec 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions cloudinary_cli/modules/clone.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from click import command, option, style, argument
from cloudinary_cli.utils.utils import normalize_list_params, print_help_and_exit
import cloudinary
from cloudinary_cli.utils.clone.metadata import clone_metadata
from cloudinary.auth_token import _digest
from cloudinary_cli.utils.utils import run_tasks_concurrently
from cloudinary_cli.utils.api_utils import upload_file
from cloudinary_cli.utils.json_utils import print_json
from cloudinary_cli.utils.config_utils import get_cloudinary_config, config_to_dict
from cloudinary_cli.defaults import logger
from cloudinary_cli.core.search import execute_single_request, handle_auto_pagination
Expand All @@ -23,7 +25,7 @@
Format: cld clone <target_environment> <command options>
`<target_environment>` can be a CLOUDINARY_URL or a saved config (see `config` command)
Example 1 (Copy all assets including tags and context using CLOUDINARY URL):
cld clone cloudinary://<api_key>:<api_secret>@<cloudname> -fi tags,context
cld clone cloudinary://<api_key>:<api_secret>@<cloudname> -fi tags,context,metadata
Example 2 (Copy all assets with a specific tag via a search expression using a saved config):
cld clone <config_name> -se "tags:<tag_name>"
""")
Expand All @@ -36,7 +38,7 @@
help="Specify the number of concurrent network threads.")
@option("-fi", "--fields", multiple=True,
help=("Specify whether to copy tags and/or context. "
"Valid options: `tags,context`."))
"Valid options: `tags,context,metadata`."))
@option("-se", "--search_exp", default="",
help="Define a search expression to filter the assets to clone.")
@option("--async", "async_", is_flag=True, default=False,
Expand All @@ -54,7 +56,12 @@ def clone(target, force, overwrite, concurrent_workers, fields,
target_config, auth_token = _validate_clone_inputs(target)
if not target_config:
return False

if 'metadata' in normalize_list_params(fields):
metadata_clone = clone_metadata(target_config, force)
if not metadata_clone:
return False
else:
logger.info(style(f"The metadata process from {cloudinary.config().cloud_name} to {target_config.cloud_name} is now done. We will now proceed with cloning the assets.", fg="green"))
source_assets = search_assets(search_exp, force)
if not source_assets:
return False
Expand Down Expand Up @@ -92,6 +99,7 @@ def _validate_clone_inputs(target):
"as source environment.")
return None, None


auth_token = cloudinary.config().auth_token
if auth_token:
# It is important to validate auth_token if provided as this prevents
Expand Down Expand Up @@ -119,23 +127,21 @@ def _prepare_upload_list(source_assets, target_config, overwrite, async_,
upload_list.append((asset_url, {**updated_options}))
return upload_list


def search_assets(search_exp, force):
search_exp = _normalize_search_expression(search_exp)
if not search_exp:
return False

search = cloudinary.search.Search().expression(search_exp)
search.fields(['tags', 'context', 'access_control',
'secure_url', 'display_name', 'format'])
'secure_url', 'display_name', 'metadata', 'format'])
search.max_results(DEFAULT_MAX_RESULTS)

res = execute_single_request(search, fields_to_keep="")
res = handle_auto_pagination(res, search, force, fields_to_keep="")

return res


def _normalize_search_expression(search_exp):
"""
Ensures the search expression has a valid 'type' filter.
Expand Down
2 changes: 2 additions & 0 deletions cloudinary_cli/utils/api_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,5 @@ def handle_auto_pagination(res, func, args, kwargs, force, filter_fields):
all_results.pop(cursor_field, None)

return all_results


144 changes: 144 additions & 0 deletions cloudinary_cli/utils/clone/metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import cloudinary
from cloudinary_cli.utils.config_utils import config_to_dict
from cloudinary_cli.utils.api_utils import call_api
from cloudinary_cli.defaults import logger
from cloudinary_cli.utils.utils import confirm_action, compare_dicts
from click import style

METADATA_FIELDS = "fields"
METADATA_RULES = "rules"
COMPARE_KEY_FIELDS = "external_id"
COMPARE_KEY_RULES = "name"
METADATA_TYPE_SINGULAR = {
"fields": "field",
"rules": "rule"
}
METADATA_API_METHODS = {
"fields": cloudinary.api.add_metadata_field,
"rules": cloudinary.api.add_metadata_rule
}

def clone_metadata(config, force):
"""Clone metadata fields and rules from source to target."""
target_config = config_to_dict(config)

# Clone fields (required)
fields_result = _clone_metadata_type(METADATA_FIELDS, COMPARE_KEY_FIELDS, target_config, force)
if fields_result is False:
return False

# Clone rules (optional)
rules_result = _clone_metadata_type(METADATA_RULES, COMPARE_KEY_RULES, target_config, force)
if rules_result is False:
return False

return True

def _clone_metadata_type(item_type, compare_key, target_config, force):
"""
Generic function to clone a metadata type (fields or rules).

:param item_type: 'fields' or 'rules'
:param compare_key: Key to use for comparison ('external_id' or 'name')
:param target_config: Target configuration dict
:param force: Skip confirmation if True
:return: True on success, False on failure, None if nothing to clone
"""
source_cloud = cloudinary.config().cloud_name
target_cloud = target_config['cloud_name']

# List source items
logger.info(style(f"Listing metadata {item_type} in `{source_cloud}`.", fg="blue"))
source_items = list_metadata_items(item_type)

if not source_items:
logger.info(style(f"No metadata {item_type} found in `{source_cloud}`.", fg="yellow"))
return False

logger.info(style(f"{len(source_items)} metadata {item_type} found in `{source_cloud}`.", fg="green"))

# List target items
logger.info(style(f"Listing metadata {item_type} in `{target_cloud}`.", fg="blue"))
target_items = list_metadata_items(item_type, **target_config)
logger.info(style(f"{len(target_items)} metadata {item_type} found in `{target_cloud}`.", fg="green"))

# Compare and sync
source_map, only_in_source, common = compare_dicts(source_items, target_items, compare_key)
return sync_metadata_items(source_map, only_in_source, common, item_type, force, **target_config)

def list_metadata_items(item_type, **options):
"""
List metadata fields or rules.

:param item_type: Either 'fields' or 'rules'
:param options: Cloudinary API options (cloud_name, api_key, etc.)
:return: List of metadata items
"""
api_method = getattr(cloudinary.api, f'list_metadata_{item_type}')
res = api_method(**options)
return res.get(f'metadata_{item_type}', [])

def sync_metadata_items(source_metadata_items, only_in_source_items, common_items, item_type, force, **options):
source_cloud = cloudinary.config().cloud_name
target_cloud = options['cloud_name']
succeeded = []
failed = []


if not only_in_source_items:
logger.info(style(
f"All metadata {item_type} from `{source_cloud}` already exist in `{target_cloud}`. "
f"No metadata {item_type} cloning needed.",
fg="yellow"
))
return True

logger.info(style(
f"Metadata {item_type} {only_in_source_items} will be cloned from `{source_cloud}` to `{target_cloud}`.",
fg="yellow"
))

if common_items:
logger.info(style(
f"Metadata {item_type} {list(common_items)} exist in both clouds and will be skipped.",
fg="yellow"
))
if not force:
if not confirm_action(
f"Based on the analysis above, \n"
f"The module will now copy the metadata {item_type} from {cloudinary.config().cloud_name} to {dict(options)['cloud_name']}.\n"
f"Continue? (y/N)"):
logger.info("Stopping.")
return False
else:
logger.info("Continuing. You may use the -F "
"flag to skip confirmation.")

add_method = METADATA_API_METHODS.get(item_type)
singular = METADATA_TYPE_SINGULAR.get(item_type)

for key_field in only_in_source_items:
try:
add_method(source_metadata_items[key_field], **options)
succeeded.append(key_field)
logger.info(style(f"Successfully created metadata {singular} `{key_field}` in `{target_cloud}`", fg="green"))
except Exception as e:
failed.append((key_field, str(e)))
logger.error(style(
f"Failed to create metadata {singular} `{key_field}` in `{target_cloud}`: {e}",
fg="red"
))

# Summary
if failed:
logger.warning(style(
f"Cloned {len(succeeded)}/{len(only_in_source_items)} {item_type} successfully. "
f"{len(failed)} failed.",
fg="yellow"
))
return False # Or consider partial success handling

if succeeded:
logger.info(style(f"Successfully cloned {len(succeeded)} metadata {item_type}.", fg="green"))

return True
24 changes: 24 additions & 0 deletions cloudinary_cli/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,27 @@ def split_opt(opt):
if opt[1:2] == first:
return opt[:2], opt[2:]
return first, opt[1:]


def compare_dicts(dict1, dict2, compare_key):#
"""
Diff between two dictionaries.

This function is used to compare two dictionaries and return the keys that are only in the first dictionary,
the keys that are only in the second dictionary, and the keys that are in both dictionaries.
The compare_key is a unique key to compare the dictionaries by.
For Phase 3 - add deep diff between two lists of dictionaries.
Example for phase 3: compare metadata fields and their datasource
diffs = {}
for k in set(dict1.keys()).union(dict2.keys()):
if dict1.get(k) != dict2.get(k):
diffs[k] = {"json_source": dict1.get(k), "json_target": dict2.get(k)}
"""
list_dict1 = {item[compare_key]: item for item in dict1}
list_dict2 = {item[compare_key]: item for item in dict2}

only_in_dict1 = list(list_dict1.keys() - list_dict2.keys())
#only_in_dict2 = list(list_dict2.keys() - list_dict1.keys()) not needed for now
common = list_dict1.keys() & list_dict2.keys()

return list_dict1, only_in_dict1, common
Loading