diff --git a/kubemarine/core/annotations.py b/kubemarine/core/annotations.py
index 6f634b3fe..864577a2e 100644
--- a/kubemarine/core/annotations.py
+++ b/kubemarine/core/annotations.py
@@ -13,7 +13,12 @@
 # limitations under the License.
 
 from typing import Callable, TypeVar
-from typing_extensions import ParamSpec
+try:
+    # Prefer stdlib ParamSpec on modern Python to avoid compatibility issues
+    # with typing_extensions on newer interpreters (e.g. Python 3.13).
+    from typing import ParamSpec  # type: ignore[attr-defined]
+except ImportError:  # pragma: no cover - fallback for older Python versions
+    from typing_extensions import ParamSpec  # type: ignore[no-redef]
 
 from kubemarine.core.group import NodeGroup
 
diff --git a/kubemarine/core/cluster.py b/kubemarine/core/cluster.py
index fa852726d..1827a80ad 100755
--- a/kubemarine/core/cluster.py
+++ b/kubemarine/core/cluster.py
@@ -489,10 +489,12 @@ def get_node_name(self, host: _AnyConnectionTypes) -> str:
 
     def make_group_from_nodes(self, node_names: List[str]) -> NodeGroup:
         ips = self.get_addresses_from_node_names(node_names)
-        return self.make_group(ips)
+        group = self.make_group(ips)
+        return self._apply_cli_nodes_filter(group)
 
     def make_group_from_roles(self, roles: Sequence[str]) -> NodeGroup:
-        return self.nodes['all'].having_roles(roles)
+        group = self.nodes['all'].having_roles(roles)
+        return self._apply_cli_nodes_filter(group)
 
     def get_new_nodes(self) -> NodeGroup:
         return self.nodes['all'].exclude_group(self.previous_nodes['all'])
@@ -524,6 +526,59 @@ def create_group_from_groups_nodes_names(self, groups_names: List[str], nodes_na
 
         return common_group
 
+    def _get_cli_nodes_filter_selectors(self) -> Optional[dict]:
+        """
+        Returns structured nodes filter selectors parsed from CLI, if any.
+        """
+        args: dict = self.context.get('execution_arguments', {})
+        expr = args.get('nodes')
+        if not expr:
+            return None
+
+        cached = self.context.get('_cli_nodes_filter_selectors')
+        if cached is not None:
+            return cached
+
+        try:
+            selectors = utils.parse_nodes_filter_expr(expr)
+        except ValueError as exc:
+            # Fail fast with a clear message while preserving existing error handling flow.
+            raise Exception(f"Failed to parse --nodes argument: {exc}") from exc
+
+        self.context['_cli_nodes_filter_selectors'] = selectors
+        return selectors or None
+
+    def _apply_cli_nodes_filter(self, group: NodeGroup) -> NodeGroup:
+        """
+        Apply CLI-provided --nodes filter to the given group, if any filter is set.
+        """
+        selectors = self._get_cli_nodes_filter_selectors()
+        if not selectors or group.is_empty():
+            return group
+
+        labels_selector: Optional[Dict[str, str]] = selectors.get('labels')  # type: ignore[assignment]
+        roles_selector: Optional[Sequence[str]] = selectors.get('roles')  # type: ignore[assignment]
+
+        def match(node: NodeConfig) -> bool:
+            if labels_selector:
+                node_labels = node.get('labels') or {}
+                if not isinstance(node_labels, dict):
+                    return False
+                for key, expected_value in labels_selector.items():
+                    if node_labels.get(key) != expected_value:
+                        return False
+
+            if roles_selector:
+                node_roles = node.get('roles') or []
+                if not isinstance(node_roles, list):
+                    return False
+                if not set(roles_selector).intersection(node_roles):
+                    return False
+
+            return True
+
+        return group.new_group(match)
+
     def schedule_cumulative_point(self, point_method: Callable) -> None:
         self._check_within_flow()
 
diff --git a/kubemarine/core/flow.py b/kubemarine/core/flow.py
index b4ddbaa65..96033023b 100755
--- a/kubemarine/core/flow.py
+++ b/kubemarine/core/flow.py
@@ -434,6 +434,12 @@ def new_tasks_flow_parser(cli_help: str, tasks: dict = None) -> argparse.Argumen
                         action='store_true',
                         help='prevent tasks to be executed')
 
+    parser.add_argument('--nodes',
+                        default='',
+                        help='limit tasks execution to nodes selected by attributes, '
+                             'for example: --nodes \"labels=region=infra\" or '
+                             '\"labels=region=infra;zone=dc1,roles=worker\"')
+
     parser.add_argument('--tasks',
                         default='',
                         help='define comma-separated tasks to be executed')
diff --git a/kubemarine/core/utils.py b/kubemarine/core/utils.py
index 44a306ab0..1e4c9d919 100755
--- a/kubemarine/core/utils.py
+++ b/kubemarine/core/utils.py
@@ -623,6 +623,65 @@ def parse_aligned_table(table_text: str) -> List[Dict[str, str]]:
     return data
 
 
+def parse_nodes_filter_expr(expr: str) -> Dict[str, object]:
+    """
+    Parse nodes filter expression provided from CLI into structured selectors.
+
+    Supported syntax (comma separated parts):
+    - labels=key1=value1;key2=value2
+    - roles=role1;role2
+
+    Examples:
+        labels=region=infra
+        labels=region=infra;zone=dc1,roles=worker
+    """
+    selectors: Dict[str, object] = {}
+
+    expr = expr.strip()
+    if not expr:
+        return selectors
+
+    parts = [p.strip() for p in expr.split(',') if p.strip()]
+    for part in parts:
+        if '=' not in part:
+            raise ValueError(f"Invalid nodes filter fragment {part!r}: expected key=value")
+
+        key, rest = part.split('=', 1)
+        key = key.strip()
+        rest = rest.strip()
+        if not rest:
+            raise ValueError(f"Invalid nodes filter fragment {part!r}: empty value")
+
+        if key in ('role', 'roles'):
+            roles = [r.strip() for r in rest.split(';') if r.strip()]
+            if not roles:
+                raise ValueError(f"Invalid roles selector in nodes filter fragment {part!r}")
+            existing = selectors.setdefault('roles', [])
+            if not isinstance(existing, list):
+                raise ValueError("Internal error: roles selector has unexpected type")
+            existing.extend(roles)
+        elif key == 'labels':
+            labels = selectors.setdefault('labels', {})
+            if not isinstance(labels, dict):
+                raise ValueError("Internal error: labels selector has unexpected type")
+
+            label_parts = [r.strip() for r in rest.split(';') if r.strip()]
+            for label_part in label_parts:
+                if '=' not in label_part:
+                    raise ValueError(
+                        f"Invalid labels selector in nodes filter fragment {part!r}: expected label=value")
+                l_key, l_val = label_part.split('=', 1)
+                l_key = l_key.strip()
+                l_val = l_val.strip()
+                if not l_key:
+                    raise ValueError(f"Invalid labels selector in nodes filter fragment {part!r}: empty label key")
+                labels[l_key] = l_val
+        else:
+            raise ValueError(f"Unsupported nodes filter key {key!r}, supported: 'labels', 'roles'")
+
+    return selectors
+
+
 class ClusterStorage:
     """
     File preservation:
diff --git a/kubemarine/kubernetes/__init__.py b/kubemarine/kubernetes/__init__.py
index e52ed387c..93b0dd10b 100644
--- a/kubemarine/kubernetes/__init__.py
+++ b/kubemarine/kubernetes/__init__.py
@@ -114,7 +114,9 @@ def verify_roles(cluster: KubernetesCluster) -> None:
 
     if cluster.context['initial_procedure'] == 'do':
         control_plane_roles = ['control-plane', 'master']
-        if cluster.make_group_from_roles(control_plane_roles).is_empty():
+        # Use the base nodes group directly so that CLI runtime filters (e.g. --nodes)
+        # do not interfere with control plane presence validation.
+        if cluster.nodes['all'].having_roles(control_plane_roles).is_empty():
             raise KME("KME0004")
 
 
@@ -1113,20 +1115,57 @@ def get_group_for_upgrade(cluster: KubernetesCluster) -> NodeGroup:
         return upgrade_group
 
     version = cluster.inventory["services"]["kubeadm"]["kubernetesVersion"]
-    if cluster.procedure_inventory.get('upgrade_nodes'):
-        nodes_for_upgrade = []
-        for node in cluster.procedure_inventory['upgrade_nodes']:
-            if isinstance(node, str):
-                node_name = node
-            else:
-                node_name = node['name']
-            nodes_for_upgrade.append(node_name)
-            cluster.log.verbose("Node \"%s\" manually scheduled for upgrade." % node_name)
-        cluster.nodes['control-plane'].get_first_member().sudo('rm -f /etc/kubernetes/nodes-k8s-versions.txt', warn=True)
+    procedure_upgrade_nodes = cluster.procedure_inventory.get('upgrade_nodes')
+    if procedure_upgrade_nodes:
+        if isinstance(procedure_upgrade_nodes, list):
+            nodes_for_upgrade = []
+            for node in procedure_upgrade_nodes:
+                if isinstance(node, str):
+                    node_name = node
+                else:
+                    node_name = node['name']
+                nodes_for_upgrade.append(node_name)
+                cluster.log.verbose("Node \"%s\" manually scheduled for upgrade." % node_name)
+            # If nodes are specified explicitly, cached nodes versions file is no longer relevant.
+            cluster.nodes['control-plane'].get_first_member().sudo(
+                'rm -f /etc/kubernetes/nodes-k8s-versions.txt', warn=True)
+            upgrade_group = cluster.make_group_from_nodes(nodes_for_upgrade)
+        elif isinstance(procedure_upgrade_nodes, dict):
+            selectors = procedure_upgrade_nodes
+            labels_selector = selectors.get('labels') or {}
+            roles_selector = selectors.get('roles') or []
+
+            group = cluster.nodes['all']
+            if roles_selector:
+                group = cluster.make_group_from_roles(roles_selector)
+
+            if labels_selector:
+                def match_labels(node: dict) -> bool:
+                    node_labels = node.get('labels') or {}
+                    if not isinstance(node_labels, dict):
+                        return False
+                    for key, expected_value in labels_selector.items():
+                        if node_labels.get(key) != expected_value:
+                            return False
+                    return True
+
+                group = group.new_group(match_labels)
+
+            if group.is_empty():
+                raise Exception("No nodes match upgrade_nodes selector in procedure inventory.")
+
+            upgrade_group = group
+            for node_name in upgrade_group.get_nodes_names():
+                cluster.log.verbose("Node \"%s\" scheduled for upgrade by selector." % node_name)
+
+            cluster.nodes['control-plane'].get_first_member().sudo(
+                'rm -f /etc/kubernetes/nodes-k8s-versions.txt', warn=True)
+        else:
+            raise Exception("Unsupported 'upgrade_nodes' format in procedure inventory.")
     else:
         nodes_for_upgrade = autodetect_non_upgraded_nodes(cluster, version)
+        upgrade_group = cluster.make_group_from_nodes(nodes_for_upgrade)
 
-    upgrade_group = cluster.make_group_from_nodes(nodes_for_upgrade)
     cluster.context['upgrade_group'] = upgrade_group
     return upgrade_group
 
diff --git a/kubemarine/resources/schemas/upgrade.json b/kubemarine/resources/schemas/upgrade.json
index fe3e9a1d0..032870252 100644
--- a/kubemarine/resources/schemas/upgrade.json
+++ b/kubemarine/resources/schemas/upgrade.json
@@ -7,23 +7,45 @@
       "description": "List of new versions through which to upgrade the cluster to the last specified version"
     },
     "upgrade_nodes": {
-      "type": "array",
-      "description": "Manually specify certain nodes that need to be upgraded. Each item can be either a node name referring to the cluster.yaml, or full node specification.",
-      "items": {
-        "oneOf": [
-          {"$ref": "definitions/common/node_ref.json#/definitions/Name"},
-          {
-            "type": "object",
-            "allOf": [{"$ref": "definitions/node.json#/definitions/Properties"}],
-            "required": ["name"],
-            "propertyNames": {
-              "$ref": "definitions/node.json#/definitions/PropertyNames"
+      "description": "Manually specify certain nodes that need to be upgraded. Each item can be either a node name referring to the cluster.yaml, full node specification, or a selector object with labels/roles.",
+      "oneOf": [
+        {
+          "type": "array",
+          "items": {
+            "oneOf": [
+              {"$ref": "definitions/common/node_ref.json#/definitions/Name"},
+              {
+                "type": "object",
+                "allOf": [{"$ref": "definitions/node.json#/definitions/Properties"}],
+                "required": ["name"],
+                "propertyNames": {
+                  "$ref": "definitions/node.json#/definitions/PropertyNames"
+                }
+              }
+            ]
+          },
+          "minItems": 1,
+          "uniqueItems": true
+        },
+        {
+          "type": "object",
+          "description": "Selectors to choose nodes for upgrade by attributes instead of explicit names.",
+          "properties": {
+            "labels": {
+              "type": "object",
+              "description": "Node labels that must be matched. All specified labels must be present on the node.",
+              "additionalProperties": {
+                "type": ["string", "boolean", "integer"]
+              }
+            },
+            "roles": {
+              "$ref": "definitions/common/node_ref.json#/definitions/Roles"
            }
-          }
-        ]
-      },
-      "minItems": 1,
-      "uniqueItems": true
+          },
+          "additionalProperties": false,
+          "minProperties": 1
+        }
+      ]
     },
     "disable-eviction": {"$ref": "definitions/procedures.json#/definitions/DisableEviction"},
     "prepull_group_size": {"$ref": "definitions/procedures.json#/definitions/PrepullGroupSize"},
diff --git a/test/unit/core/test_nodes_filter.py b/test/unit/core/test_nodes_filter.py
new file mode 100644
index 000000000..d68371c64
--- /dev/null
+++ b/test/unit/core/test_nodes_filter.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python3
+# Copyright 2021-2022 NetCracker Technology Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from kubemarine import demo
+from kubemarine.core import utils, group as core_group
+from kubemarine.kubernetes import get_group_for_upgrade
+
+
+class ParseNodesFilterExpr(unittest.TestCase):
+    def test_empty(self):
+        self.assertEqual({}, utils.parse_nodes_filter_expr(''))
+        self.assertEqual({}, utils.parse_nodes_filter_expr(' '))
+
+    def test_labels_only_single(self):
+        selectors = utils.parse_nodes_filter_expr('labels=region=infra')
+        self.assertEqual({'labels': {'region': 'infra'}}, selectors)
+
+    def test_labels_multiple(self):
+        selectors = utils.parse_nodes_filter_expr('labels=region=infra;zone=dc1')
+        self.assertEqual({'labels': {'region': 'infra', 'zone': 'dc1'}}, selectors)
+
+    def test_roles_only(self):
+        selectors = utils.parse_nodes_filter_expr('roles=worker')
+        self.assertEqual({'roles': ['worker']}, selectors)
+
+    def test_roles_multiple(self):
+        selectors = utils.parse_nodes_filter_expr('roles=worker;control-plane')
+        self.assertEqual({'roles': ['worker', 'control-plane']}, selectors)
+
+    def test_labels_and_roles(self):
+        selectors = utils.parse_nodes_filter_expr('labels=region=infra;zone=dc1,roles=worker')
+        self.assertEqual({'labels': {'region': 'infra', 'zone': 'dc1'}, 'roles': ['worker']}, selectors)
+
+
+class CliNodesFilterApplication(unittest.TestCase):
+    def setUp(self) -> None:
+        # Full-HA inventory; label the worker nodes so the filter can distinguish them.
+        self.inventory = demo.generate_inventory(**demo.FULLHA)
+        for i, node in enumerate(self.inventory['nodes']):
+            if 'worker' in node['roles']:
+                node.setdefault('labels', {})
+                node['labels']['region'] = 'infra' if i % 2 == 0 else 'edge'
+
+    def test_make_group_from_roles_filtered_by_labels(self):
+        context = demo.create_silent_context(['--tasks', 'deploy', '--nodes', 'labels=region=infra'])
+        cluster = demo.new_cluster(self.inventory, context=context)
+
+        group = cluster.make_group_from_roles(['worker'])
+        worker_names = set(group.get_nodes_names())
+
+        expected = {node['name'] for node in self.inventory['nodes']
+                    if 'worker' in node['roles'] and node.get('labels', {}).get('region') == 'infra'}
+        self.assertEqual(expected, worker_names)
+
+
+class UpgradeNodesSelector(unittest.TestCase):
+    def setUp(self) -> None:
+        self.inventory = demo.generate_inventory(**demo.FULLHA)
+        # Mark only a subset of nodes with a specific label.
+        for node in self.inventory['nodes']:
+            if 'worker' in node['roles']:
+                node.setdefault('labels', {})
+                node['labels']['region'] = 'infra'
+
+    def test_upgrade_nodes_by_labels_selector(self):
+        procedure_inventory = {
+            # Use a version that is present in the compatibility map and higher than the default one
+            # from the demo inventory, so that upgrade prechecks pass.
+            'upgrade_plan': ['v1.34.1'],
+            'upgrade_nodes': {
+                'labels': {
+                    'region': 'infra',
+                },
+            },
+        }
+        context = demo.create_silent_context(['procedure.yaml', '--tasks', 'kubernetes'], procedure='upgrade')
+        # Emulate the first upgrade step as the real upgrade procedure does.
+        context['upgrade_step'] = 0
+        # Use demo.new_cluster to get a cluster with fully mocked nodes context.
+        cluster = demo.new_cluster(self.inventory, procedure_inventory=procedure_inventory, context=context)
+
+        # Avoid hitting the real sudo call for removing cached versions file on the control plane by
+        # patching NodeGroup.sudo just for this test.
+        original_sudo = core_group.NodeGroup.sudo
+
+        def patched_sudo(self, command: str, *args, **kwargs):  # type: ignore[override]
+            if command == 'rm -f /etc/kubernetes/nodes-k8s-versions.txt':
+                return None
+            return original_sudo(self, command, *args, **kwargs)
+
+        core_group.NodeGroup.sudo = patched_sudo  # type: ignore[assignment]
+        try:
+            group = get_group_for_upgrade(cluster)
+        finally:
+            core_group.NodeGroup.sudo = original_sudo  # type: ignore[assignment]
+        selected_names = set(group.get_nodes_names())
+        expected = {node['name'] for node in self.inventory['nodes']
+                    if 'worker' in node['roles'] and node.get('labels', {}).get('region') == 'infra'}
+        self.assertEqual(expected, selected_names)
+
+
+if __name__ == '__main__':
+    unittest.main()
+
+