diff --git a/cloudinary_cli/modules/clone.py b/cloudinary_cli/modules/clone.py index 44bc856..f7d97f0 100644 --- a/cloudinary_cli/modules/clone.py +++ b/cloudinary_cli/modules/clone.py @@ -1,9 +1,11 @@ from click import command, option, style, argument from cloudinary_cli.utils.utils import normalize_list_params, print_help_and_exit import cloudinary +from cloudinary_cli.utils.clone.metadata import clone_metadata from cloudinary.auth_token import _digest from cloudinary_cli.utils.utils import run_tasks_concurrently from cloudinary_cli.utils.api_utils import upload_file +from cloudinary_cli.utils.json_utils import print_json from cloudinary_cli.utils.config_utils import get_cloudinary_config, config_to_dict from cloudinary_cli.defaults import logger from cloudinary_cli.core.search import execute_single_request, handle_auto_pagination @@ -23,7 +25,7 @@ Format: cld clone `` can be a CLOUDINARY_URL or a saved config (see `config` command) Example 1 (Copy all assets including tags and context using CLOUDINARY URL): - cld clone cloudinary://:@ -fi tags,context + cld clone cloudinary://:@ -fi tags,context,metadata Example 2 (Copy all assets with a specific tag via a search expression using a saved config): cld clone -se "tags:" """) @@ -36,7 +38,7 @@ help="Specify the number of concurrent network threads.") @option("-fi", "--fields", multiple=True, help=("Specify whether to copy tags and/or context. " - "Valid options: `tags,context`.")) + "Valid options: `tags,context,metadata`.")) @option("-se", "--search_exp", default="", help="Define a search expression to filter the assets to clone.") @option("--async", "async_", is_flag=True, default=False, @@ -54,7 +56,12 @@ def clone(target, force, overwrite, concurrent_workers, fields, target_config, auth_token = _validate_clone_inputs(target) if not target_config: return False - + if 'metadata' in normalize_list_params(fields): + metadata_clone = clone_metadata(target_config, force) + if not metadata_clone: + return False + else: + logger.info(style(f"The metadata process from {cloudinary.config().cloud_name} to {target_config.cloud_name} is now done. We will now proceed with cloning the assets.", fg="green")) source_assets = search_assets(search_exp, force) if not source_assets: return False @@ -92,6 +99,7 @@ def _validate_clone_inputs(target): "as source environment.") return None, None + auth_token = cloudinary.config().auth_token if auth_token: # It is important to validate auth_token if provided as this prevents @@ -119,7 +127,6 @@ def _prepare_upload_list(source_assets, target_config, overwrite, async_, upload_list.append((asset_url, {**updated_options})) return upload_list - def search_assets(search_exp, force): search_exp = _normalize_search_expression(search_exp) if not search_exp: @@ -127,7 +134,7 @@ def search_assets(search_exp, force): search = cloudinary.search.Search().expression(search_exp) search.fields(['tags', 'context', 'access_control', - 'secure_url', 'display_name', 'format']) + 'secure_url', 'display_name', 'metadata', 'format']) search.max_results(DEFAULT_MAX_RESULTS) res = execute_single_request(search, fields_to_keep="") @@ -135,7 +142,6 @@ def search_assets(search_exp, force): return res - def _normalize_search_expression(search_exp): """ Ensures the search expression has a valid 'type' filter. diff --git a/cloudinary_cli/utils/api_utils.py b/cloudinary_cli/utils/api_utils.py index 8927b43..0c79889 100644 --- a/cloudinary_cli/utils/api_utils.py +++ b/cloudinary_cli/utils/api_utils.py @@ -366,3 +366,5 @@ def handle_auto_pagination(res, func, args, kwargs, force, filter_fields): all_results.pop(cursor_field, None) return all_results + + diff --git a/cloudinary_cli/utils/clone/metadata.py b/cloudinary_cli/utils/clone/metadata.py new file mode 100644 index 0000000..ac26722 --- /dev/null +++ b/cloudinary_cli/utils/clone/metadata.py @@ -0,0 +1,144 @@ +import cloudinary +from cloudinary_cli.utils.config_utils import config_to_dict +from cloudinary_cli.utils.api_utils import call_api +from cloudinary_cli.defaults import logger +from cloudinary_cli.utils.utils import confirm_action, compare_dicts +from click import style + +METADATA_FIELDS = "fields" +METADATA_RULES = "rules" +COMPARE_KEY_FIELDS = "external_id" +COMPARE_KEY_RULES = "name" +METADATA_TYPE_SINGULAR = { + "fields": "field", + "rules": "rule" +} +METADATA_API_METHODS = { + "fields": cloudinary.api.add_metadata_field, + "rules": cloudinary.api.add_metadata_rule +} + +def clone_metadata(config, force): + """Clone metadata fields and rules from source to target.""" + target_config = config_to_dict(config) + + # Clone fields (required) + fields_result = _clone_metadata_type(METADATA_FIELDS, COMPARE_KEY_FIELDS, target_config, force) + if fields_result is False: + return False + + # Clone rules (optional) + rules_result = _clone_metadata_type(METADATA_RULES, COMPARE_KEY_RULES, target_config, force) + if rules_result is False: + return False + + return True + +def _clone_metadata_type(item_type, compare_key, target_config, force): + """ + Generic function to clone a metadata type (fields or rules). + + :param item_type: 'fields' or 'rules' + :param compare_key: Key to use for comparison ('external_id' or 'name') + :param target_config: Target configuration dict + :param force: Skip confirmation if True + :return: True on success, False on failure, None if nothing to clone + """ + source_cloud = cloudinary.config().cloud_name + target_cloud = target_config['cloud_name'] + + # List source items + logger.info(style(f"Listing metadata {item_type} in `{source_cloud}`.", fg="blue")) + source_items = list_metadata_items(item_type) + + if not source_items: + logger.info(style(f"No metadata {item_type} found in `{source_cloud}`.", fg="yellow")) + return False + + logger.info(style(f"{len(source_items)} metadata {item_type} found in `{source_cloud}`.", fg="green")) + + # List target items + logger.info(style(f"Listing metadata {item_type} in `{target_cloud}`.", fg="blue")) + target_items = list_metadata_items(item_type, **target_config) + logger.info(style(f"{len(target_items)} metadata {item_type} found in `{target_cloud}`.", fg="green")) + + # Compare and sync + source_map, only_in_source, common = compare_dicts(source_items, target_items, compare_key) + return sync_metadata_items(source_map, only_in_source, common, item_type, force, **target_config) + +def list_metadata_items(item_type, **options): + """ + List metadata fields or rules. + + :param item_type: Either 'fields' or 'rules' + :param options: Cloudinary API options (cloud_name, api_key, etc.) + :return: List of metadata items + """ + api_method = getattr(cloudinary.api, f'list_metadata_{item_type}') + res = api_method(**options) + return res.get(f'metadata_{item_type}', []) + +def sync_metadata_items(source_metadata_items, only_in_source_items, common_items, item_type, force, **options): + source_cloud = cloudinary.config().cloud_name + target_cloud = options['cloud_name'] + succeeded = [] + failed = [] + + + if not only_in_source_items: + logger.info(style( + f"All metadata {item_type} from `{source_cloud}` already exist in `{target_cloud}`. " + f"No metadata {item_type} cloning needed.", + fg="yellow" + )) + return True + + logger.info(style( + f"Metadata {item_type} {only_in_source_items} will be cloned from `{source_cloud}` to `{target_cloud}`.", + fg="yellow" + )) + + if common_items: + logger.info(style( + f"Metadata {item_type} {list(common_items)} exist in both clouds and will be skipped.", + fg="yellow" + )) + if not force: + if not confirm_action( + f"Based on the analysis above, \n" + f"The module will now copy the metadata {item_type} from {cloudinary.config().cloud_name} to {dict(options)['cloud_name']}.\n" + f"Continue? (y/N)"): + logger.info("Stopping.") + return False + else: + logger.info("Continuing. You may use the -F " + "flag to skip confirmation.") + + add_method = METADATA_API_METHODS.get(item_type) + singular = METADATA_TYPE_SINGULAR.get(item_type) + + for key_field in only_in_source_items: + try: + add_method(source_metadata_items[key_field], **options) + succeeded.append(key_field) + logger.info(style(f"Successfully created metadata {singular} `{key_field}` in `{target_cloud}`", fg="green")) + except Exception as e: + failed.append((key_field, str(e))) + logger.error(style( + f"Failed to create metadata {singular} `{key_field}` in `{target_cloud}`: {e}", + fg="red" + )) + + # Summary + if failed: + logger.warning(style( + f"Cloned {len(succeeded)}/{len(only_in_source_items)} {item_type} successfully. " + f"{len(failed)} failed.", + fg="yellow" + )) + return False # Or consider partial success handling + + if succeeded: + logger.info(style(f"Successfully cloned {len(succeeded)} metadata {item_type}.", fg="green")) + + return True \ No newline at end of file diff --git a/cloudinary_cli/utils/utils.py b/cloudinary_cli/utils/utils.py index e0c69a4..3e77955 100644 --- a/cloudinary_cli/utils/utils.py +++ b/cloudinary_cli/utils/utils.py @@ -401,3 +401,27 @@ def split_opt(opt): if opt[1:2] == first: return opt[:2], opt[2:] return first, opt[1:] + + +def compare_dicts(dict1, dict2, compare_key):# + """ + Diff between two dictionaries. + + This function is used to compare two dictionaries and return the keys that are only in the first dictionary, + the keys that are only in the second dictionary, and the keys that are in both dictionaries. + The compare_key is a unique key to compare the dictionaries by. + For Phase 3 - add deep diff between two lists of dictionaries. + Example for phase 3: compare metadata fields and their datasource + diffs = {} + for k in set(dict1.keys()).union(dict2.keys()): + if dict1.get(k) != dict2.get(k): + diffs[k] = {"json_source": dict1.get(k), "json_target": dict2.get(k)} + """ + list_dict1 = {item[compare_key]: item for item in dict1} + list_dict2 = {item[compare_key]: item for item in dict2} + + only_in_dict1 = list(list_dict1.keys() - list_dict2.keys()) + #only_in_dict2 = list(list_dict2.keys() - list_dict1.keys()) not needed for now + common = list_dict1.keys() & list_dict2.keys() + + return list_dict1, only_in_dict1, common \ No newline at end of file diff --git a/test/test_modules/test_cli_clone.py b/test/test_modules/test_cli_clone.py index a4843b1..9af3bc7 100644 --- a/test/test_modules/test_cli_clone.py +++ b/test/test_modules/test_cli_clone.py @@ -8,7 +8,10 @@ import cloudinary_cli.modules clone_module = sys.modules['cloudinary_cli.modules.clone'] +import cloudinary_cli.utils.clone.metadata as clone_metadata_utils + from cloudinary_cli.defaults import logger +from types import SimpleNamespace class TestCLIClone(unittest.TestCase): @@ -29,6 +32,143 @@ def setUp(self): } ] } + self.mock_target_config = { + 'cloud_name': 'target-cloud', + 'api_key': 'target-key', + 'api_secret': 'target-secret' + } + self.mock_source_config = { + 'cloud_name': 'source-cloud', + 'api_key': 'source-key', + 'api_secret': 'source-secret' + } + self.mock_fields_result = [ + { + 'external_id': 'test_field', + 'type': 'string', + 'label': 'Test Field', + 'mandatory': False + } + ] + self.mock_rules_result = [ + { + 'name': 'test_rule', + 'condition': 'if', + 'metadata_field': { + 'external_id': 'test_field' + }, + 'results': [{ + 'value': 'test_value', + 'apply_to': ['metadata_field_external_id'] + }] + } + ] + + @patch.object(clone_metadata_utils, 'list_metadata_items') + def test_list_metadata_fields(self, mock_metadata_fields): + """Test listing metadata fields""" + mock_metadata_fields.return_value = { + 'metadata_fields': self.mock_fields_result + } + + result = clone_metadata_utils.list_metadata_items('fields') + + mock_metadata_fields.assert_called_once() + self.assertEqual(result, mock_metadata_fields.return_value) + + @patch.object(clone_metadata_utils, 'list_metadata_items') + def test_list_metadata_rules(self, mock_metadata_rules): + """Test listing metadata fields""" + mock_metadata_rules.return_value = { + 'metadata_rules': self.mock_rules_result + } + + result = clone_metadata_utils.list_metadata_items('rules') + + mock_metadata_rules.assert_called_once_with('rules') + self.assertEqual(result, mock_metadata_rules.return_value) + + @patch.object(clone_metadata_utils, 'sync_metadata_items') + @patch.object(clone_metadata_utils, 'list_metadata_items') + @patch.object(clone_metadata_utils, 'compare_dicts') + @patch('cloudinary.config') + def test_clone_metadata_type_fields_success(self, mock_config, mock_compare, mock_list, mock_sync): + """Test _clone_metadata_type for fields""" + mock_config.return_value.cloud_name = 'source-cloud' + mock_list.return_value = [ + {'external_id': 'field1', 'type': 'string'} + ] + mock_compare.return_value = ( + {'field1': {'external_id': 'field1'}}, # source_map + ['field1'], # only_in_source + set() # common + ) + mock_sync.return_value = True + + result = clone_metadata_utils._clone_metadata_type( + 'fields', 'external_id', self.mock_target_config, False + ) + + self.assertTrue(result) + mock_list.assert_any_call('fields') + mock_list.assert_any_call('fields', **self.mock_target_config) + mock_compare.assert_called_once() + mock_sync.assert_called_once() + + @patch.object(clone_metadata_utils, 'sync_metadata_items') + @patch.object(clone_metadata_utils, 'list_metadata_items') + @patch.object(clone_metadata_utils, 'compare_dicts') + @patch('cloudinary.config') + def test_clone_metadata_type_rules_success(self, mock_config, mock_compare, mock_list, mock_sync): + """Test _clone_metadata_type for rules""" + mock_config.return_value.cloud_name = 'source-cloud' + mock_list.return_value = [ + {'name': 'rule1', 'condition': 'if'} + ] + mock_compare.return_value = ( + {'rule1': {'name': 'rule1'}}, + ['rule1'], + set() + ) + mock_sync.return_value = True + + result = clone_metadata_utils._clone_metadata_type( + 'rules', 'name', self.mock_target_config, False + ) + + self.assertTrue(result) + mock_list.assert_any_call('rules') + mock_list.assert_any_call('rules', **self.mock_target_config) + mock_compare.assert_called_once() + mock_sync.assert_called_once() + + @patch.object(clone_metadata_utils, 'list_metadata_items') + @patch('cloudinary.config') + def test_clone_metadata_fields_no_source_items(self, mock_config, mock_list): + """Test _clone_metadata_type for fields when source has no items""" + mock_config.return_value.cloud_name = 'source-cloud' + mock_list.return_value = [] + + result = clone_metadata_utils._clone_metadata_type( + 'fields', 'external_id', self.mock_target_config, False + ) + + self.assertFalse(result) + mock_list.assert_called_once_with('fields') + + @patch.object(clone_metadata_utils, 'list_metadata_items') + @patch('cloudinary.config') + def test_clone_metadata_rules_no_source_items(self, mock_config, mock_list): + """Test _clone_metadata_type for rules when source has no items""" + mock_config.return_value.cloud_name = 'source-cloud' + mock_list.return_value = [] + + result = clone_metadata_utils._clone_metadata_type( + 'rules', 'name', self.mock_target_config, False + ) + + self.assertFalse(result) + mock_list.assert_called_once_with('rules') @patch.object(clone_module, 'handle_auto_pagination') @patch.object(clone_module, 'execute_single_request') @@ -41,7 +181,6 @@ def test_search_assets_default_expression(self, mock_search_class, mock_execute, mock_pagination.return_value = self.mock_search_result result = clone_module.search_assets(force=True, search_exp="") - # Verify default search expression is used mock_search.expression.assert_called_with("type:upload OR type:private OR type:authenticated") self.assertEqual(result, self.mock_search_result)