From 4ce186dd13d8c726af47acac0e74b0be1ade5992 Mon Sep 17 00:00:00 2001 From: Pavol Date: Thu, 14 Aug 2025 11:04:32 +0200 Subject: [PATCH 1/7] fix(client): align REST paths to plural via RESOURCE_PATH_MAP --- .../queries/node_context_requests_backend.py | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/forloop_modules/queries/node_context_requests_backend.py b/forloop_modules/queries/node_context_requests_backend.py index 41bd9cb..3feb4cd 100644 --- a/forloop_modules/queries/node_context_requests_backend.py +++ b/forloop_modules/queries/node_context_requests_backend.py @@ -57,6 +57,21 @@ "initial_variables": ["get_all", "get", "new", "delete"], } +# Single source of truth for resource paths in URLs (plural) +RESOURCE_PATH_MAP = { + "databases": "databases", + "dbtables": "dbtables", + "files": "files", + "scripts": "scripts", + "datasets": "datasets", + "edges": "edges", + "variables": "variables", + "popups": "popups", + "nodes": "nodes", + "pipelines": "pipelines", + "initial_variables": "initial_variables", +} + DB_API_BODY_TEMPLATE = { "projects": APIProject, "triggers": APITrigger, @@ -190,7 +205,6 @@ def new( resource_url = f"{BASE_API}/{resource_name}" response = http_client.post(resource_url, json=payload) - return response new.__signature__ = Signature(params) @@ -236,7 +250,6 @@ def update( resource_url = f"{BASE_API}/{resource_name}/{uid}" response = http_client.put(resource_url, json=payload) - return response update.__signature__ = Signature( @@ -249,24 +262,25 @@ def update( for resource_name, actions in RESOURCES.items(): resource_name_singular = resource_name[:-1] + resource_path = RESOURCE_PATH_MAP.get(resource_name, resource_name) for action in actions: function_name = None if action == 'get_all': - fn = get_all_factory(resource_name) + fn = get_all_factory(resource_path) function_name = f'{action}_{resource_name}' elif action == 'get': - fn = get_factory(resource_name_singular) + fn = get_factory(resource_path) function_name = f'{action}_{resource_name_singular}_by_uid' elif action == 'new': model = DB_API_BODY_TEMPLATE[resource_name] - fn = new_factory(resource_name, model) + fn = new_factory(resource_path, model) function_name = f'{action}_{resource_name_singular}' elif action == 'delete': - fn = delete_factory(resource_name_singular) + fn = delete_factory(resource_path) function_name = f'{action}_{resource_name_singular}_by_uid' elif action == 'update': model = DB_API_BODY_TEMPLATE[resource_name] - fn = update_factory(resource_name_singular, model) + fn = update_factory(resource_path, model) function_name = f'{action}_{resource_name_singular}_by_uid' else: raise Exception('Unknown action') From d984782684322ad1c32dfca4f91af15d27bb2a00 Mon Sep 17 00:00:00 2001 From: Pavol Date: Thu, 14 Aug 2025 11:37:25 +0200 Subject: [PATCH 2/7] Revert "fix(client): align REST paths to plural via RESOURCE_PATH_MAP" This reverts commit 4ce186dd13d8c726af47acac0e74b0be1ade5992. --- .../queries/node_context_requests_backend.py | 28 +++++-------------- 1 file changed, 7 insertions(+), 21 deletions(-) diff --git a/forloop_modules/queries/node_context_requests_backend.py b/forloop_modules/queries/node_context_requests_backend.py index 3feb4cd..41bd9cb 100644 --- a/forloop_modules/queries/node_context_requests_backend.py +++ b/forloop_modules/queries/node_context_requests_backend.py @@ -57,21 +57,6 @@ "initial_variables": ["get_all", "get", "new", "delete"], } -# Single source of truth for resource paths in URLs (plural) -RESOURCE_PATH_MAP = { - "databases": "databases", - "dbtables": "dbtables", - "files": "files", - "scripts": "scripts", - "datasets": "datasets", - "edges": "edges", - "variables": "variables", - "popups": "popups", - "nodes": "nodes", - "pipelines": "pipelines", - "initial_variables": "initial_variables", -} - DB_API_BODY_TEMPLATE = { "projects": APIProject, "triggers": APITrigger, @@ -205,6 +190,7 @@ def new( resource_url = f"{BASE_API}/{resource_name}" response = http_client.post(resource_url, json=payload) + return response new.__signature__ = Signature(params) @@ -250,6 +236,7 @@ def update( resource_url = f"{BASE_API}/{resource_name}/{uid}" response = http_client.put(resource_url, json=payload) + return response update.__signature__ = Signature( @@ -262,25 +249,24 @@ def update( for resource_name, actions in RESOURCES.items(): resource_name_singular = resource_name[:-1] - resource_path = RESOURCE_PATH_MAP.get(resource_name, resource_name) for action in actions: function_name = None if action == 'get_all': - fn = get_all_factory(resource_path) + fn = get_all_factory(resource_name) function_name = f'{action}_{resource_name}' elif action == 'get': - fn = get_factory(resource_path) + fn = get_factory(resource_name_singular) function_name = f'{action}_{resource_name_singular}_by_uid' elif action == 'new': model = DB_API_BODY_TEMPLATE[resource_name] - fn = new_factory(resource_path, model) + fn = new_factory(resource_name, model) function_name = f'{action}_{resource_name_singular}' elif action == 'delete': - fn = delete_factory(resource_path) + fn = delete_factory(resource_name_singular) function_name = f'{action}_{resource_name_singular}_by_uid' elif action == 'update': model = DB_API_BODY_TEMPLATE[resource_name] - fn = update_factory(resource_path, model) + fn = update_factory(resource_name_singular, model) function_name = f'{action}_{resource_name_singular}_by_uid' else: raise Exception('Unknown action') From f26a31b3e25b4df537e60fb2336341bd306e2072 Mon Sep 17 00:00:00 2001 From: Pavol Date: Sat, 16 Aug 2025 13:56:51 +0200 Subject: [PATCH 3/7] fix - create new variable code to nocode --- .../function_handlers/variable_handlers.py | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/forloop_modules/function_handlers/variable_handlers.py b/forloop_modules/function_handlers/variable_handlers.py index a01ca10..c54f8e0 100644 --- a/forloop_modules/function_handlers/variable_handlers.py +++ b/forloop_modules/function_handlers/variable_handlers.py @@ -61,6 +61,30 @@ def make_form_dict_list(self, *args, node_detail_form=None): return fdl + def make_flpl_node_dict(self, line_dict: dict) -> dict: + """ + Creates a NewVariable node dict from parsed code line_dict. + + For assignment like 'a = 1', line_dict contains: + - new_var: "a" + - arguments: ["1"] + - function: None + """ + node = {"type": self.icon_type, "params": {}} + + # Extract variable name from new_var + var_name = line_dict.get("new_var") or "" + + # Extract value from arguments (first argument is the assigned value) + args = line_dict.get("arguments") or [] + var_value = args[0] if len(args) > 0 else "" + + # Set the node parameters in the expected format + node["params"]["variable_name"] = {"variable": None, "value": var_name} + node["params"]["variable_value"] = {"variable": None, "value": var_value} + + return node + def execute(self, node_detail_form): variable_name = node_detail_form.get_chosen_value_by_name("variable_name", variable_handler) variable_value = node_detail_form.get_chosen_value_by_name("variable_value", variable_handler) From 87af0558fe2946ba1080b9763bcabfda5dd0ba26 Mon Sep 17 00:00:00 2001 From: Pavol Date: Wed, 17 Sep 2025 11:34:46 +0200 Subject: [PATCH 4/7] edit - if conditon behaviour in desktop app change checkbox instead of toggle --- .../control_flow_handlers.py | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/forloop_modules/function_handlers/control_flow_handlers.py b/forloop_modules/function_handlers/control_flow_handlers.py index d9df06a..c1ab60b 100644 --- a/forloop_modules/function_handlers/control_flow_handlers.py +++ b/forloop_modules/function_handlers/control_flow_handlers.py @@ -257,24 +257,25 @@ def make_form_dict_list(self, *args, node_detail_form=None): fdl.label("Value 2") fdl.entry(name="value_q",text="",row=3) - fdl.label("Toggle channel") - fdl.button(function=self.toggle_channel, function_args=args, text="Toggle",row=4, focused=True) + fdl.label("Active channel") + fdl.checkbox(name="active_channel", bool_value=True, row=4) - fdl.label("Active channel: True") + #fdl.label(f"Current channel: ", row=5) return fdl - def toggle_channel(self, args): #Refactor to backend - image = args[0] - elements = image.item_detail_form.elements - channel_str = elements[-1].text.split("Active channel: ")[1] - channel = ast.literal_eval(channel_str) - channel = not channel # toggle - elements[-1].text = "Active channel: " + str(channel) - print(args) + # def toggle_channel(self, args): #Refactor to backend + # image = args[0] + # elements = image.item_detail_form.elements + # channel_str = elements[-1].text.split("Active channel: ")[1] + # channel = ast.literal_eval(channel_str) + # channel = not channel # toggle + # elements[-1].text = "Active channel: " + str(channel) - def direct_execute(self, value_p, operator, value_q): + # print(args) + + def direct_execute(self, value_p, operator, value_q, **kwargs): # def __new__(cls, value_p, operator, value_q, *args, **kwargs): try: if isinstance(value_p, list) and operator == 'isempty': From 3cf6c26cc499307a7dd6907de12c939c09172bce Mon Sep 17 00:00:00 2001 From: Pavol Date: Wed, 17 Sep 2025 11:47:13 +0200 Subject: [PATCH 5/7] feat - enable the print code to pipeline function --- forloop_modules/function_handlers/variable_handlers.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/forloop_modules/function_handlers/variable_handlers.py b/forloop_modules/function_handlers/variable_handlers.py index c54f8e0..5772d76 100644 --- a/forloop_modules/function_handlers/variable_handlers.py +++ b/forloop_modules/function_handlers/variable_handlers.py @@ -1161,16 +1161,17 @@ class PrintVariableHandler(AbstractFunctionHandler): """ def __init__(self): - self.is_disabled = True # FIXME: No FE prepared for the user --> for the user it seems broken + super().__init__() + self.is_disabled = False # FIXME: No FE prepared for the user --> for the user it seems broken self.icon_type = "PrintVariable" self.fn_name = "Print Variable" - + self.code_import_patterns = ["print"] # Enable recognition from Python code + self.type_category = ntcm.categories.variable self.docs_category = DocsCategories.control self._init_docs() - super().__init__() - + def _init_docs(self): self.docs = Docs(description=self.__doc__) self.docs.add_parameter_table_row(title="Variable name", name="variable_name", From cec10f06d75ba7f795437b9011cb2f6849e91863 Mon Sep 17 00:00:00 2001 From: Pavol Date: Wed, 17 Sep 2025 12:11:24 +0200 Subject: [PATCH 6/7] fix - complex run function for running in LEGB rule --- .../function_handlers/mapping_handlers.py | 429 ++++++++++++++---- 1 file changed, 347 insertions(+), 82 deletions(-) diff --git a/forloop_modules/function_handlers/mapping_handlers.py b/forloop_modules/function_handlers/mapping_handlers.py index 4692d74..54cf5bb 100644 --- a/forloop_modules/function_handlers/mapping_handlers.py +++ b/forloop_modules/function_handlers/mapping_handlers.py @@ -1,5 +1,10 @@ import inspect import ast +import re +import types +import threading +import sys +import hashlib import forloop_modules.flog as flog import forloop_modules.queries.node_context_requests_backend as ncrb @@ -77,15 +82,122 @@ def define_function_variable(code, function_name, imports=[]): fce.exec_code(code, globals(), d) func = d[function_name] defined_functions_dict[function_name] = {"function": func, "imports": imports, "code": code} - - variable_handler.new_variable(function_name, func, additional_params={"code": code}) + + address = hex(id(func)) + variable_handler.new_variable(f"{function_name}_address", address) + #variable_handler.update_data_in_variable_explorer(glc) flog.warning(f'New function defined: "{function_name}"') else: flog.error("Function name can't be of type None.") + +def _coerce_literal(v): + """Coerce a string to a Python literal when safe; otherwise return as-is.""" + if isinstance(v, str): + try: + return ast.literal_eval(v) + except Exception: + return v + return v + +def _get_meta_for_function(func_name: str): + """ + Return the stored meta for func_name. If not in defined_functions_dict, + try to get code from variable_handler additional_params. Return (imports, code). + """ + meta = defined_functions_dict.get(func_name) + if meta and "code" in meta: + imports = list(set(meta.get("imports", []))) + code = meta["code"] + return imports, code + + # Fallback: look in variable_handler additional_params (DefineFunctionHandler stores 'code') + var = getattr(variable_handler, "variables", {}).get(func_name) + if var and isinstance(getattr(var, "additional_params", None), dict): + code = var.additional_params.get("code") + if code: + return [], code + + raise SoftPipelineError( + f"No stored code found for function '{func_name}'. " + "Make sure it was created via DefineFunctionHandler so its code is available." + ) + +def push_pipeline_global(name, value): + """Mirror a variable update into the pipeline exec env when in pipeline mode.""" + try: + rf = mapping_handlers_dict['RunFunction'] + if getattr(rf, "_env_mode", None) == "pipeline" and rf._pipeline_env: + rf._pipeline_env.env[name] = value + except Exception as e: + flog.warning(f"push_pipeline_global failed for {name}: {e}") + ### +class _ExecEnvManager: + """ + Shared module-like environment for exec() so functions see globals via LEGB. + - Persists across calls (like a real module). + - Thread-safe exec. + - Lets you sync in globals from variable_handler and/or any dict (e.g., globals()). + - Caches code by hash to avoid redefining when source didn't change. + """ + def __init__(self, name: str = "_forloop_runtime"): + self._lock = threading.RLock() + self._module = types.ModuleType(name) + self.env = self._module.__dict__ + + # make it module-like + self.env["__builtins__"] = __builtins__ + self.env["__name__"] = name + self.env["__package__"] = None + sys.modules[name] = self._module + + # func_name -> sha256(total_code) + self._code_hash: dict[str, str] = {} + + def _should_copy(self, name: str, val) -> bool: + if not name or name.startswith("_"): + return False + if inspect.isfunction(val) or inspect.ismethod(val): + return False + if isinstance(val, types.ModuleType): + return False + if isinstance(val, type): + return False + return True + + def sync_from_variable_handler(self, overwrite: bool = False): + try: + for v in variable_handler.variables.values(): + name, val = v.name, v.value + if val is None: + continue + if not self._should_copy(name, val): + continue + if overwrite or name not in self.env: + self.env[name] = val + except Exception as e: + flog.warning(f"[ExecEnv] variable_handler sync failed: {e}") + + def sync_from_dict(self, d: dict, overwrite: bool = False): + for name, val in d.items(): + if not self._should_copy(name, val): + continue + if overwrite or name not in self.env: + self.env[name] = val + + def define_or_reuse(self, func_name: str, total_code: str): + """Exec total_code if it's new/changed; otherwise reuse existing def.""" + h = hashlib.sha256(total_code.encode("utf-8")).hexdigest() + with self._lock: + if self._code_hash.get(func_name) != h or func_name not in self.env: + exec(total_code, self.env, self.env) + self._code_hash[func_name] = h + return self.env.get(func_name) + + class ApplyMappingHandler(AbstractFunctionHandler): def __init__(self): @@ -281,7 +393,8 @@ def direct_execute(self, code_label): imports = find_imports_in_custom_code(code) - code = init_line + "\n" + 4 * " " + "\n ".join(imports) + "\n\n" + code + indented_imports = "\n".join(" " + line for line in imports) + code = f"{init_line}\n{indented_imports}\n\n{code}" define_function_variable(code, func_name, imports) @@ -369,13 +482,18 @@ def export_imports(self, node_detail_form): class RunFunctionHandler(AbstractFunctionHandler): def __init__(self): - self.is_disabled = True + self.is_disabled = False self.icon_type = 'RunFunction' self.fn_name = 'Run Function' self.type_category = ntcm.categories.mapping self.docs_category = DocsCategories.control + # env management + self._exec_env = _ExecEnvManager() + self._env_mode = "fresh" # "fresh" (click-run) | "pipeline" (RunJob) + self._pipeline_env: _ExecEnvManager | None = None + def make_form_dict_list(self, *args, node_detail_form=None): fdl = FormDictList() @@ -388,124 +506,271 @@ def make_form_dict_list(self, *args, node_detail_form=None): return fdl - def execute(self, node_detail_form): - selected_function = node_detail_form.get_chosen_value_by_name("selected_function", variable_handler) - new_var_name = node_detail_form.get_chosen_value_by_name("new_var_name", variable_handler) + # Switch to fresh-per-call behaviour (click-run) + def use_fresh_env(self): + self._env_mode = "fresh" + self._pipeline_env = None - func_args = inspect.getfullargspec(selected_function).args + # Switch to persistent env for the whole job (RunJob) + def use_pipeline_env(self, env: _ExecEnvManager): + self._env_mode = "pipeline" + self._pipeline_env = env - args_list = [] - for arg in func_args: - arg_value = node_detail_form.get_chosen_value_by_name(arg, variable_handler) - args_list.append(arg_value) + def _parse_function_call_string(self, s): + """ + Accepts: "square(5)", "square(5, y=10)", "square", " square ( x ) " + Returns: (name, pos_args_as_strings, kwargs_as_strings_dict), or (None, None, None) on failure. + """ + s = s.strip() + try: + node = ast.parse(s, mode='eval').body + except Exception as e: + flog.error(f"[RunFunction] _parse_function_call_string parse error for {s!r}: {e}") + return None, None, None - self.direct_execute(selected_function, new_var_name, args_list) + # example: square(5, y=10) + if isinstance(node, ast.Call) and isinstance(node.func, ast.Name): + name = node.func.id + pos = [ast.unparse(a) for a in node.args] + kwargs = {kw.arg: ast.unparse(kw.value) for kw in node.keywords if kw.arg} + return name, pos, kwargs - def execute_with_params(self, params, node_detail_form=None): + # plain name: square + if isinstance(node, ast.Name): + return node.id, [], {} - selected_function = params["selected_function"] - new_var_name = params["new_var_name"] + flog.warning(f"[RunFunction] _parse_function_call_string unsupported AST: {ast.dump(node)}") + return None, None, None - func_args = inspect.getfullargspec(selected_function).args + def _resolve_selected_function(self, selected_function_raw): + # Log selection type at info level without dumping full content - args_list = [] - for arg in func_args: - arg_value = node_detail_form.get_chosen_value_by_name(arg, variable_handler) - args_list.append(arg_value) + if callable(selected_function_raw): + return selected_function_raw, [], {} - self.direct_execute(selected_function, new_var_name, args_list) + if isinstance(selected_function_raw, str): + name, inline_pos, inline_kwargs = self._parse_function_call_string(selected_function_raw) + if not name: + msg = f'Could not parse function selector: {selected_function_raw!r}' + flog.error(msg) + raise SoftPipelineError(msg) - def direct_execute(self, selected_function, new_var_name): - - # HACK: Disable the execution of the node with some feedback for a user until we implement security checks - raise SoftPipelineError("Execution of this node is temporarily disabled.") + # Try variable_handler + try: + var_obj = getattr(variable_handler, "variables", {}).get(name) + except Exception: + var_obj = None + + if var_obj is not None: + fn = getattr(var_obj, "value", None) + if callable(fn): + return fn, inline_pos, inline_kwargs + + # Try registry + meta = defined_functions_dict.get(name) + if meta: + fn = meta.get("function") + if callable(fn): + flog.info("[RunFunction] Resolved callable from defined_functions_dict.") + return fn, inline_pos, inline_kwargs + + # Not found + avail = sorted( + list(defined_functions_dict.keys()) + + [k for k, v in getattr(variable_handler, "variables", {}).items() if callable(getattr(v, "value", None))] + ) + msg = (f'Selected function "{selected_function_raw}" not found as a callable.\n' + f'Available functions: {avail}') + flog.error(msg) + raise SoftPipelineError(msg) + + msg = f"Unsupported function selector type: {type(selected_function_raw).__name__}" + flog.error(msg) + raise SoftPipelineError(msg) - args = [] - - imports = defined_functions_dict[selected_function.__name__]["imports"] - imports = list(set(imports)) + def execute(self, node_detail_form): - code = defined_functions_dict[selected_function.__name__]["code"] - variables_code = self.find_variables_used_in_function(code, args) - - total_code = "\n".join(imports) + "\n" + variables_code + code - fce.exec_code(total_code, globals(), locals()) + selected_function_raw = node_detail_form.get_chosen_value_by_name("selected_function", variable_handler) + new_var_name = node_detail_form.get_chosen_value_by_name("new_var_name", variable_handler) - return_value = None + selected_function, inline_pos, inline_kwargs = self._resolve_selected_function(selected_function_raw) - for i, arg in enumerate(args): - try: - args[i] = ast.literal_eval(arg) - except Exception as e: - flog.warning(e) - pass + # Inspect signature + try: + full = inspect.getfullargspec(selected_function) + func_args = full.args + kwonly = full.kwonlyargs or [] + except Exception as e: + flog.error(f"[RunFunction] getfullargspec failed: {e}") + raise + # If user typed inline args (e.g., "square(5)") – use those. + if inline_pos or inline_kwargs: + args_list = inline_pos[:] # strings + kwargs_dict = inline_kwargs.copy() + else: + args_list = [] + for arg in func_args: + val = node_detail_form.get_chosen_value_by_name(arg, variable_handler) + args_list.append(val) + + kwargs_dict = {} + for k in kwonly: + val = node_detail_form.get_chosen_value_by_name(k, variable_handler) + kwargs_dict[k] = val + + + self.direct_execute(selected_function, new_var_name, args_list=args_list, kwargs_dict=kwargs_dict) + + def direct_execute(self, selected_function, new_var_name, args_list=None, kwargs_dict=None): + if args_list is None: + args_list = [] + if kwargs_dict is None: + kwargs_dict = {} + + # Allow string selector too + if not callable(selected_function): + resolved_fn, inline_pos, inline_kwargs = self._resolve_selected_function(selected_function) + selected_function = resolved_fn + if not args_list: + args_list = inline_pos or [] + if not kwargs_dict: + kwargs_dict = inline_kwargs or {} + + func_name = selected_function.__name__ + imports, code = _get_meta_for_function(func_name) + + if self._env_mode == "fresh": + # CLICK-RUN: re-run from scratch each time (inject referenced globals) + variables_code = self.find_variables_used_in_function( + code, + list(args_list) + list(kwargs_dict.values()), + func_name=func_name, + ) + total_code = (("\n".join(imports) + "\n") if imports else "") + variables_code + code + env = {"__builtins__": __builtins__, "__name__": "__main__", "__package__": None} + fce.exec_code(total_code, env, env) + func = env.get(func_name) + + elif self._env_mode == "pipeline": + # RUNJOB: single shared module env across nodes (NO per-node injection) + if not self._pipeline_env: + raise SoftPipelineError("Pipeline env not initialized. Call use_pipeline_env() before RunJob.") + total_code = (("\n".join(imports) + "\n") if imports else "") + code + func = self._pipeline_env.define_or_reuse(func_name, total_code) - if args: - return_value = selected_function(*args) else: - return_value = selected_function() + raise SoftPipelineError(f"Unknown env mode: {self._env_mode}") + + if func is None: + raise SoftPipelineError("Function not found after exec. Check stored code/imports.") + + coerced_pos = [_coerce_literal(a) for a in args_list] + coerced_kwargs = {k: _coerce_literal(v) for k, v in kwargs_dict.items()} + return_value = func(*coerced_pos, **coerced_kwargs) if return_value is not None: if "," in new_var_name: - variables = new_var_name.split(",") - for i, variable in enumerate(variables): - variable = variable.strip() - variable_handler.new_variable(variable, return_value[i]) + names = [n.strip() for n in new_var_name.split(",")] + for i, n in enumerate(names): + variable_handler.new_variable(n, return_value[i]) else: variable_handler.new_variable(new_var_name, return_value) - #variable_handler.update_data_in_variable_explorer(glc) - def find_variables_used_in_function(self, function_code: str, args_list: list) -> str: - variables_code = "" - for variable in variable_handler.variables.values(): - if not inspect.isfunction(variable.value): - if variable.name in args_list or variable.name in function_code: - print( - f"VAR NAME = {variable.name}, VAR VALUE = {variable.value}, VAR TYPE = {type(variable.value)}") - if type(variable.value) == str: - variables_code += f'{variable.name} = "{variable.value}"\n' - else: - variables_code += f'{variable.name} = {variable.value}\n' - return variables_code + def find_variables_used_in_function(self, function_code: str, args_list: list, func_name: str | None = None) -> str: + """ + Emit Python assignments for variables that the function body actually references, + while avoiding shadowing the function name and ensuring strings are safely repr()'d. + """ + lines = [] + + for v in variable_handler.variables.values(): + name = v.name + val = v.value + + # 1) Never shadow the function we are (re)defining + if func_name and name == func_name: + continue + + # 2) Skip functions (we don't inline callables) + if inspect.isfunction(val): + continue + + # 3) If value looks like raw source code of a function, skip (defense) + if isinstance(val, str) and val.lstrip().startswith(("def ", "lambda ")): + continue + + # 4) Only inject when the identifier actually appears in the function code + if not re.search(rf"\b{re.escape(name)}\b", function_code): + continue + + # 5) Serialize safely + try: + if isinstance(val, str): + lines.append(f"{name} = {val!r}") # repr -> escapes newlines + else: + lines.append(f"{name} = {repr(val)}") + except Exception as e: + flog.warning(f"Skipping var {name}: not serializable ({e})") + continue + + return ("\n".join(lines) + ("\n" if lines else "")) - def export_code(self, node_detail_form): - selected_function = node_detail_form.get_chosen_value_by_name("selected_function", variable_handler) + + def export_code(self, node_detail_form): + raw = node_detail_form.get_chosen_value_by_name("selected_function", variable_handler) new_var_name = node_detail_form.get_chosen_value_by_name("new_var_name", variable_handler) - function_code = defined_functions_dict[selected_function.__name__]["code"] - func_args = inspect.getfullargspec(selected_function).args - arg_values = {} - for arg in func_args: - arg_values[arg] = node_detail_form.get_chosen_value_by_name(arg, variable_handler) + if callable(raw): + fn = raw + func_name = raw.__name__ + else: + name, _pos, _kwargs = self._parse_function_call_string(raw) + if not name: + raise SoftPipelineError("Could not parse function selector for export.") + func_name = name + # Try to resolve callable for arg introspection; optional for export + var_obj = getattr(variable_handler, "variables", {}).get(name) + if var_obj and callable(getattr(var_obj, "value", None)): + fn = var_obj.value + else: + meta = defined_functions_dict.get(name) + fn = meta.get("function") if meta else None - args_code = "" - for arg_name, arg_value in arg_values.items(): - args_code += f'{arg_name} = {arg_value},' + imports, function_code = _get_meta_for_function(func_name) - args_code = args_code[:-1] # removing the last ',' + func_args = inspect.getfullargspec(fn).args if callable(fn) else [] + arg_values = {arg: node_detail_form.get_chosen_value_by_name(arg, variable_handler) for arg in func_args} - args_list = list(arg_values.values()) - variables_code = self.find_variables_used_in_function(function_code, args_list) + args_code = ",".join(f"{k} = {v}" for k, v in arg_values.items()) + variables_code = self.find_variables_used_in_function(function_code, list(arg_values.values()), func_name=func_name) + call_code = f"{func_name}({args_code})" if new_var_name and not new_var_name.isspace(): - code = variables_code + "\n" + f"{new_var_name} = {selected_function.__name__}({args_code})" + return variables_code + "\n" + f"{new_var_name} = {call_code}" else: - code = variables_code + "\n" + f"{selected_function.__name__}({args_code})" - - return code + return variables_code + "\n" + call_code def export_imports(self, node_detail_form): + raw = node_detail_form.get_chosen_value_by_name("selected_function", variable_handler) + if callable(raw): + name = raw.__name__ + else: + name, _pos, _kwargs = self._parse_function_call_string(raw) + if not name: + return [] - selected_function = node_detail_form.get_chosen_value_by_name("selected_function", variable_handler) - code = defined_functions_dict[selected_function.__name__]["code"] - imports = find_imports_in_custom_code(code) + try: + _imports, code = _get_meta_for_function(name) + except SoftPipelineError: + return [] + imports = find_imports_in_custom_code(code) return imports def rebuild_icon_item_detail_form(self, image, last_loaded_function): From b86ca232e2841eb76054a56c8a392e56ef744ca1 Mon Sep 17 00:00:00 2001 From: Pavol Date: Wed, 17 Sep 2025 18:12:16 +0200 Subject: [PATCH 7/7] fix - run pipeline job --- .../function_handlers/mapping_handlers.py | 172 +++++++++++++++--- 1 file changed, 151 insertions(+), 21 deletions(-) diff --git a/forloop_modules/function_handlers/mapping_handlers.py b/forloop_modules/function_handlers/mapping_handlers.py index 54cf5bb..b8351e5 100644 --- a/forloop_modules/function_handlers/mapping_handlers.py +++ b/forloop_modules/function_handlers/mapping_handlers.py @@ -83,8 +83,14 @@ def define_function_variable(code, function_name, imports=[]): func = d[function_name] defined_functions_dict[function_name] = {"function": func, "imports": imports, "code": code} - address = hex(id(func)) - variable_handler.new_variable(f"{function_name}_address", address) + address = hex(id(func)) + # Try to create variable in variable explorer, but don't fail if server is not available + try: + variable_handler.new_variable(f"{function_name}_address", address) + except Exception as e: + # If server is not available, just log a warning and continue + flog.warning(f'Could not create variable in variable explorer: {e}') + flog.warning(f'Function "{function_name}" defined in memory only.') #variable_handler.update_data_in_variable_explorer(glc) flog.warning(f'New function defined: "{function_name}"') @@ -327,12 +333,15 @@ def export_imports(self, node_detail_form): class DefineFunctionHandler(AbstractFunctionHandler): def __init__(self): - self.is_disabled = True # FIXME: Needs refactor - better UX, check functionality, solve how to store functions + self.is_disabled = False # Enable for pipeline mode support self.icon_type = 'DefineFunction' self.fn_name = 'Define Function' self.type_category = ntcm.categories.mapping self.docs_category = DocsCategories.control + + # Pipeline mode support + self._pipeline_env: _ExecEnvManager | None = None def make_form_dict_list(self, *args, node_detail_form=None): # TODO: show name, args, arg types if any, return @@ -379,6 +388,30 @@ def execute_with_params(self, params): self.direct_execute(code_label) + def make_flpl_node_dict(self, line_dict: dict) -> dict: + """ + Creates a DefineFunction node dict from parsed code line_dict. + + For function definition like 'def square(x): return x * x', line_dict contains: + - arguments: ["def square(x):\n return x * x\n"] + - function: None + - new_var: None + """ + node = {"type": self.icon_type, "params": {}} + + # Extract function code from arguments + args = line_dict.get("arguments") or [] + code_label = args[0] if len(args) > 0 else "" + + # Set the node parameters in the expected format + node["params"]["code_label"] = {"variable": None, "value": code_label} + + return node + + def use_pipeline_env(self, env: _ExecEnvManager): + """Set the pipeline environment for function definition.""" + self._pipeline_env = env + def direct_execute(self, code_label): func_name = get_function_name_from_code(code_label) @@ -396,7 +429,14 @@ def direct_execute(self, code_label): indented_imports = "\n".join(" " + line for line in imports) code = f"{init_line}\n{indented_imports}\n\n{code}" + # Always define function in global scope for variable explorer define_function_variable(code, func_name, imports) + + # If pipeline environment is available, also define function there + if self._pipeline_env: + # Define function in pipeline environment + self._pipeline_env.define_or_reuse(func_name, code) + flog.info(f"[DefineFunction] Function '{func_name}' defined in pipeline environment.") def export_code(self, node_detail_form): @@ -506,6 +546,30 @@ def make_form_dict_list(self, *args, node_detail_form=None): return fdl + def make_flpl_node_dict(self, line_dict: dict) -> dict: + """ + Creates a RunFunction node dict from parsed code line_dict. + + For assignment like 'square_res = square(5)', line_dict contains: + - new_var: "square_res" + - function: "square(5)" (reconstructed call string) + - arguments: ["5"] + - instance_var: None + """ + node = {"type": self.icon_type, "params": {}} + + # Extract function call string from function field + function_call = line_dict.get("function") or "" + + # Extract new variable name from new_var field + new_var_name = line_dict.get("new_var") or "" + + # Set the node parameters in the expected format + node["params"]["selected_function"] = {"variable": None, "value": function_call} + node["params"]["new_var_name"] = {"variable": None, "value": new_var_name} + + return node + # Switch to fresh-per-call behaviour (click-run) def use_fresh_env(self): self._env_mode = "fresh" @@ -574,11 +638,23 @@ def _resolve_selected_function(self, selected_function_raw): flog.info("[RunFunction] Resolved callable from defined_functions_dict.") return fn, inline_pos, inline_kwargs + # Try pipeline environment (in pipeline mode) + if self._env_mode == "pipeline" and self._pipeline_env: + fn = self._pipeline_env.env.get(name) + if callable(fn): + flog.info("[RunFunction] Resolved callable from pipeline environment.") + return fn, inline_pos, inline_kwargs + # Not found avail = sorted( list(defined_functions_dict.keys()) + [k for k, v in getattr(variable_handler, "variables", {}).items() if callable(getattr(v, "value", None))] ) + if self._env_mode == "pipeline" and self._pipeline_env: + pipeline_functions = [k for k, v in self._pipeline_env.env.items() if callable(v)] + avail.extend(pipeline_functions) + avail = sorted(set(avail)) + msg = (f'Selected function "{selected_function_raw}" not found as a callable.\n' f'Available functions: {avail}') flog.error(msg) @@ -609,6 +685,7 @@ def execute(self, node_detail_form): args_list = inline_pos[:] # strings kwargs_dict = inline_kwargs.copy() else: + # Get arguments from form (manual input mode) args_list = [] for arg in func_args: val = node_detail_form.get_chosen_value_by_name(arg, variable_handler) @@ -617,7 +694,9 @@ def execute(self, node_detail_form): kwargs_dict = {} for k in kwonly: val = node_detail_form.get_chosen_value_by_name(k, variable_handler) - kwargs_dict[k] = val + # Only add to kwargs_dict if the value is not None/empty + if val is not None and val != "": + kwargs_dict[k] = val self.direct_execute(selected_function, new_var_name, args_list=args_list, kwargs_dict=kwargs_dict) @@ -638,26 +717,35 @@ def direct_execute(self, selected_function, new_var_name, args_list=None, kwargs kwargs_dict = inline_kwargs or {} func_name = selected_function.__name__ - imports, code = _get_meta_for_function(func_name) - + if self._env_mode == "fresh": - # CLICK-RUN: re-run from scratch each time (inject referenced globals) - variables_code = self.find_variables_used_in_function( - code, - list(args_list) + list(kwargs_dict.values()), - func_name=func_name, - ) - total_code = (("\n".join(imports) + "\n") if imports else "") + variables_code + code - env = {"__builtins__": __builtins__, "__name__": "__main__", "__package__": None} - fce.exec_code(total_code, env, env) - func = env.get(func_name) + # Check if we have a direct function object or need to resolve from stored code + try: + # Try to get stored code for the function + imports, code = _get_meta_for_function(func_name) + variables_code = self.find_variables_used_in_function( + code, + list(args_list) + list(kwargs_dict.values()), + func_name=func_name, + ) + total_code = (("\n".join(imports) + "\n") if imports else "") + variables_code + code + env = {"__builtins__": __builtins__, "__name__": "__main__", "__package__": None} + + # Include global variables in the execution environment + env.update(globals()) + + fce.exec_code(total_code, env, env) + func = env.get(func_name) + except SoftPipelineError: + # No stored code found, use the function object directly + func = selected_function elif self._env_mode == "pipeline": - # RUNJOB: single shared module env across nodes (NO per-node injection) + # RUNJOB: single shared module env across nodes (function already defined) if not self._pipeline_env: raise SoftPipelineError("Pipeline env not initialized. Call use_pipeline_env() before RunJob.") - total_code = (("\n".join(imports) + "\n") if imports else "") + code - func = self._pipeline_env.define_or_reuse(func_name, total_code) + # Function is already defined in the pipeline environment, just get it + func = self._pipeline_env.env.get(func_name) else: raise SoftPipelineError(f"Unknown env mode: {self._env_mode}") @@ -665,8 +753,50 @@ def direct_execute(self, selected_function, new_var_name, args_list=None, kwargs if func is None: raise SoftPipelineError("Function not found after exec. Check stored code/imports.") - coerced_pos = [_coerce_literal(a) for a in args_list] - coerced_kwargs = {k: _coerce_literal(v) for k, v in kwargs_dict.items()} + # Evaluate arguments based on environment mode + if self._env_mode == "pipeline": + # Evaluate arguments in the pipeline environment + coerced_pos = [] + for a in args_list: + try: + # Try to evaluate the argument in the pipeline environment + evaluated_arg = eval(a, self._pipeline_env.env, self._pipeline_env.env) + coerced_pos.append(evaluated_arg) + except Exception: + # Fall back to literal coercion if evaluation fails + coerced_pos.append(_coerce_literal(a)) + + coerced_kwargs = {} + for k, v in kwargs_dict.items(): + try: + # Try to evaluate the argument in the pipeline environment + evaluated_arg = eval(v, self._pipeline_env.env, self._pipeline_env.env) + coerced_kwargs[k] = evaluated_arg + except Exception: + # Fall back to literal coercion if evaluation fails + coerced_kwargs[k] = _coerce_literal(v) + else: + # Fresh mode: evaluate arguments in the fresh environment context + coerced_pos = [] + for a in args_list: + try: + # Try to evaluate the argument in the fresh environment + evaluated_arg = eval(a, env, env) + coerced_pos.append(evaluated_arg) + except Exception: + # Fall back to literal coercion if evaluation fails + coerced_pos.append(_coerce_literal(a)) + + coerced_kwargs = {} + for k, v in kwargs_dict.items(): + try: + # Try to evaluate the argument in the fresh environment + evaluated_arg = eval(v, env, env) + coerced_kwargs[k] = evaluated_arg + except Exception: + # Fall back to literal coercion if evaluation fails + coerced_kwargs[k] = _coerce_literal(v) + return_value = func(*coerced_pos, **coerced_kwargs) if return_value is not None: