7 changes: 6 additions & 1 deletion kubemarine/core/annotations.py
@@ -13,7 +13,12 @@
# limitations under the License.
from typing import Callable, TypeVar

from typing_extensions import ParamSpec
try:
# Prefer stdlib ParamSpec on modern Python to avoid compatibility issues
# with typing_extensions on newer interpreters (e.g. Python 3.13).
from typing import ParamSpec # type: ignore[attr-defined]
except ImportError: # pragma: no cover - fallback for older Python versions
from typing_extensions import ParamSpec # type: ignore[no-redef]

from kubemarine.core.group import NodeGroup

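A quick aside for reviewers (not part of the diff): the point of the try/except import above is that ParamSpec lets decorated functions keep their full signatures for type checkers. A minimal, self-contained sketch with a hypothetical decorator name:

from typing import Callable, TypeVar
try:
    from typing import ParamSpec  # stdlib since Python 3.10
except ImportError:
    from typing_extensions import ParamSpec  # fallback for older interpreters

P = ParamSpec('P')
RT = TypeVar('RT')

def log_call(fn: Callable[P, RT]) -> Callable[P, RT]:
    # The wrapper exposes the same parameters as fn to type checkers.
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> RT:
        return fn(*args, **kwargs)
    return wrapper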
59 changes: 57 additions & 2 deletions kubemarine/core/cluster.py
@@ -489,10 +489,12 @@ def get_node_name(self, host: _AnyConnectionTypes) -> str:

def make_group_from_nodes(self, node_names: List[str]) -> NodeGroup:
ips = self.get_addresses_from_node_names(node_names)
return self.make_group(ips)
group = self.make_group(ips)
return self._apply_cli_nodes_filter(group)

def make_group_from_roles(self, roles: Sequence[str]) -> NodeGroup:
return self.nodes['all'].having_roles(roles)
group = self.nodes['all'].having_roles(roles)
return self._apply_cli_nodes_filter(group)

def get_new_nodes(self) -> NodeGroup:
return self.nodes['all'].exclude_group(self.previous_nodes['all'])
@@ -524,6 +526,59 @@ def create_group_from_groups_nodes_names(self, groups_names: List[str], nodes_na

return common_group

def _get_cli_nodes_filter_selectors(self) -> Optional[dict]:
"""
Returns structured nodes filter selectors parsed from CLI, if any.
"""
args: dict = self.context.get('execution_arguments', {})
expr = args.get('nodes')
if not expr:
return None

cached = self.context.get('_cli_nodes_filter_selectors')
if cached is not None:
return cached

try:
selectors = utils.parse_nodes_filter_expr(expr)
except ValueError as exc:
# Fail fast with a clear message while preserving existing error handling flow.
raise Exception(f"Failed to parse --nodes argument: {exc}") from exc

self.context['_cli_nodes_filter_selectors'] = selectors
return selectors or None

def _apply_cli_nodes_filter(self, group: NodeGroup) -> NodeGroup:
"""
Apply CLI-provided --nodes filter to the given group, if any filter is set.
"""
selectors = self._get_cli_nodes_filter_selectors()
if not selectors or group.is_empty():
return group

labels_selector: Optional[Dict[str, str]] = selectors.get('labels') # type: ignore[assignment]
roles_selector: Optional[Sequence[str]] = selectors.get('roles') # type: ignore[assignment]

def match(node: NodeConfig) -> bool:
if labels_selector:
node_labels = node.get('labels') or {}
if not isinstance(node_labels, dict):
return False
for key, expected_value in labels_selector.items():
if node_labels.get(key) != expected_value:
return False

if roles_selector:
node_roles = node.get('roles') or []
if not isinstance(node_roles, list):
return False
if not set(roles_selector).intersection(node_roles):
return False

return True

return group.new_group(match)

def schedule_cumulative_point(self, point_method: Callable) -> None:
self._check_within_flow()

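Reviewer note, not part of the diff: the matching semantics implemented by _apply_cli_nodes_filter are "all requested labels must match exactly, and at least one requested role must be present". A self-contained sketch with a purely illustrative node dict:

selectors = {'labels': {'region': 'infra', 'zone': 'dc1'}, 'roles': ['worker']}
node = {'name': 'node-1', 'roles': ['worker', 'infra'],
        'labels': {'region': 'infra', 'zone': 'dc1'}}

labels_ok = all(node.get('labels', {}).get(k) == v
                for k, v in selectors['labels'].items())               # every label must match
roles_ok = bool(set(selectors['roles']) & set(node.get('roles', [])))  # any role overlaps
print(labels_ok and roles_ok)  # True -> the node would be kept by the CLI filter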
6 changes: 6 additions & 0 deletions kubemarine/core/flow.py
@@ -434,6 +434,12 @@ def new_tasks_flow_parser(cli_help: str, tasks: dict = None) -> argparse.Argumen
action='store_true',
help='prevent tasks to be executed')

parser.add_argument('--nodes',
default='',
help='limit task execution to nodes selected by attributes, '
'for example: --nodes \"labels=region=infra\" or '
'\"labels=region=infra;zone=dc1,roles=worker\"')

parser.add_argument('--tasks',
default='',
help='define comma-separated tasks to be executed')
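Assuming the new flag flows into execution_arguments['nodes'] as cluster.py reads it, typical invocations would look like the following (the procedure and task names are placeholders, not taken from this diff):

kubemarine install --nodes "labels=region=infra"
kubemarine upgrade --tasks "kubernetes" --nodes "labels=region=infra;zone=dc1,roles=worker"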
59 changes: 59 additions & 0 deletions kubemarine/core/utils.py
@@ -623,6 +623,65 @@ def parse_aligned_table(table_text: str) -> List[Dict[str, str]]:
return data


def parse_nodes_filter_expr(expr: str) -> Dict[str, object]:
"""
Parse nodes filter expression provided from CLI into structured selectors.

Supported syntax (comma separated parts):
- labels=key1=value1;key2=value2
- roles=role1;role2

Examples:
labels=region=infra
labels=region=infra;zone=dc1,roles=worker
"""
selectors: Dict[str, object] = {}

expr = expr.strip()
if not expr:
return selectors

parts = [p.strip() for p in expr.split(',') if p.strip()]
for part in parts:
if '=' not in part:
raise ValueError(f"Invalid nodes filter fragment {part!r}: expected key=value")

key, rest = part.split('=', 1)
key = key.strip()
rest = rest.strip()
if not rest:
raise ValueError(f"Invalid nodes filter fragment {part!r}: empty value")

if key in ('role', 'roles'):
roles = [r.strip() for r in rest.split(';') if r.strip()]
if not roles:
raise ValueError(f"Invalid roles selector in nodes filter fragment {part!r}")
existing = selectors.setdefault('roles', [])
if not isinstance(existing, list):
raise ValueError("Internal error: roles selector has unexpected type")
existing.extend(roles)
elif key == 'labels':
labels = selectors.setdefault('labels', {})
if not isinstance(labels, dict):
raise ValueError("Internal error: labels selector has unexpected type")

label_parts = [r.strip() for r in rest.split(';') if r.strip()]
for label_part in label_parts:
if '=' not in label_part:
raise ValueError(
f"Invalid labels selector in nodes filter fragment {part!r}: expected label=value")
l_key, l_val = label_part.split('=', 1)
l_key = l_key.strip()
l_val = l_val.strip()
if not l_key:
raise ValueError(f"Invalid labels selector in nodes filter fragment {part!r}: empty label key")
labels[l_key] = l_val
else:
raise ValueError(f"Unsupported nodes filter key {key!r}, supported: 'labels', 'roles'")

return selectors


class ClusterStorage:
"""
File preservation:
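For clarity, the expected return values for the docstring examples above, assuming the helper is importable as kubemarine.core.utils.parse_nodes_filter_expr:

from kubemarine.core import utils

print(utils.parse_nodes_filter_expr("labels=region=infra"))
# {'labels': {'region': 'infra'}}
print(utils.parse_nodes_filter_expr("labels=region=infra;zone=dc1,roles=worker"))
# {'labels': {'region': 'infra', 'zone': 'dc1'}, 'roles': ['worker']}
print(utils.parse_nodes_filter_expr(""))
# {}  (an empty expression yields no selectors)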
63 changes: 51 additions & 12 deletions kubemarine/kubernetes/__init__.py
@@ -114,7 +114,9 @@ def verify_roles(cluster: KubernetesCluster) -> None:
if cluster.context['initial_procedure'] == 'do':
control_plane_roles = ['control-plane', 'master']

if cluster.make_group_from_roles(control_plane_roles).is_empty():
# Use the base nodes group directly so that CLI runtime filters (e.g. --nodes)
# do not interfere with control plane presence validation.
if cluster.nodes['all'].having_roles(control_plane_roles).is_empty():
raise KME("KME0004")


@@ -1113,20 +1115,57 @@ def get_group_for_upgrade(cluster: KubernetesCluster) -> NodeGroup:
return upgrade_group

version = cluster.inventory["services"]["kubeadm"]["kubernetesVersion"]
if cluster.procedure_inventory.get('upgrade_nodes'):
nodes_for_upgrade = []
for node in cluster.procedure_inventory['upgrade_nodes']:
if isinstance(node, str):
node_name = node
else:
node_name = node['name']
nodes_for_upgrade.append(node_name)
cluster.log.verbose("Node \"%s\" manually scheduled for upgrade." % node_name)
cluster.nodes['control-plane'].get_first_member().sudo('rm -f /etc/kubernetes/nodes-k8s-versions.txt', warn=True)
procedure_upgrade_nodes = cluster.procedure_inventory.get('upgrade_nodes')
if procedure_upgrade_nodes:
if isinstance(procedure_upgrade_nodes, list):
nodes_for_upgrade = []
for node in procedure_upgrade_nodes:
if isinstance(node, str):
node_name = node
else:
node_name = node['name']
nodes_for_upgrade.append(node_name)
cluster.log.verbose("Node \"%s\" manually scheduled for upgrade." % node_name)
# If nodes are specified explicitly, cached nodes versions file is no longer relevant.
cluster.nodes['control-plane'].get_first_member().sudo(
'rm -f /etc/kubernetes/nodes-k8s-versions.txt', warn=True)
upgrade_group = cluster.make_group_from_nodes(nodes_for_upgrade)
elif isinstance(procedure_upgrade_nodes, dict):
selectors = procedure_upgrade_nodes
labels_selector = selectors.get('labels') or {}
roles_selector = selectors.get('roles') or []

group = cluster.nodes['all']
if roles_selector:
group = cluster.make_group_from_roles(roles_selector)

if labels_selector:
def match_labels(node: dict) -> bool:
node_labels = node.get('labels') or {}
if not isinstance(node_labels, dict):
return False
for key, expected_value in labels_selector.items():
if node_labels.get(key) != expected_value:
return False
return True

group = group.new_group(match_labels)

if group.is_empty():
raise Exception("No nodes match upgrade_nodes selector in procedure inventory.")

upgrade_group = group
for node_name in upgrade_group.get_nodes_names():
cluster.log.verbose("Node \"%s\" scheduled for upgrade by selector." % node_name)

cluster.nodes['control-plane'].get_first_member().sudo(
'rm -f /etc/kubernetes/nodes-k8s-versions.txt', warn=True)
else:
raise Exception("Unsupported 'upgrade_nodes' format in procedure inventory.")
else:
nodes_for_upgrade = autodetect_non_upgraded_nodes(cluster, version)
upgrade_group = cluster.make_group_from_nodes(nodes_for_upgrade)

upgrade_group = cluster.make_group_from_nodes(nodes_for_upgrade)
cluster.context['upgrade_group'] = upgrade_group

return upgrade_group
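To summarize the branching added to get_group_for_upgrade, both procedure_inventory shapes below would be accepted, and anything else now raises an explicit error; the node names, labels and roles are placeholders:

# Explicit list form (pre-existing behaviour): node names or full node specifications.
upgrade_nodes = ['worker-1', {'name': 'worker-2'}]

# New selector form: all labels must match, roles match by intersection.
upgrade_nodes = {
    'labels': {'region': 'infra'},
    'roles': ['worker'],
}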
54 changes: 38 additions & 16 deletions kubemarine/resources/schemas/upgrade.json
@@ -7,23 +7,45 @@
"description": "List of new versions through which to upgrade the cluster to the last specified version"
},
"upgrade_nodes": {
"type": "array",
"description": "Manually specify certain nodes that need to be upgraded. Each item can be either a node name referring to the cluster.yaml, or full node specification.",
"items": {
"oneOf": [
{"$ref": "definitions/common/node_ref.json#/definitions/Name"},
{
"type": "object",
"allOf": [{"$ref": "definitions/node.json#/definitions/Properties"}],
"required": ["name"],
"propertyNames": {
"$ref": "definitions/node.json#/definitions/PropertyNames"
"description": "Manually specify certain nodes that need to be upgraded. Each item can be either a node name referring to the cluster.yaml, full node specification, or a selector object with labels/roles.",
"oneOf": [
{
"type": "array",
"items": {
"oneOf": [
{"$ref": "definitions/common/node_ref.json#/definitions/Name"},
{
"type": "object",
"allOf": [{"$ref": "definitions/node.json#/definitions/Properties"}],
"required": ["name"],
"propertyNames": {
"$ref": "definitions/node.json#/definitions/PropertyNames"
}
}
]
},
"minItems": 1,
"uniqueItems": true
},
{
"type": "object",
"description": "Selectors to choose nodes for upgrade by attributes instead of explicit names.",
"properties": {
"labels": {
"type": "object",
"description": "Node labels that must be matched. All specified labels must be present on the node.",
"additionalProperties": {
"type": ["string", "boolean", "integer"]
}
},
"roles": {
"$ref": "definitions/common/node_ref.json#/definitions/Roles"
}
}
]
},
"minItems": 1,
"uniqueItems": true
},
"additionalProperties": false,
"minProperties": 1
}
]
},
"disable-eviction": {"$ref": "definitions/procedures.json#/definitions/DisableEviction"},
"prepull_group_size": {"$ref": "definitions/procedures.json#/definitions/PrepullGroupSize"},
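An illustrative procedure inventory fragment for each upgrade_nodes form the updated schema accepts (labels, roles and node names are placeholders; a real file would contain only one of the two forms):

# Form 1: explicit node list (unchanged)
upgrade_nodes:
  - worker-1
  - name: worker-2

# Form 2: selector object (new)
upgrade_nodes:
  labels:
    region: infra
  roles:
    - worker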