From b84af8a91e998ddba1e4039549a2a3ad47fddc6e Mon Sep 17 00:00:00 2001 From: Humberto Yusta Date: Thu, 5 Feb 2026 23:28:23 +0100 Subject: [PATCH 1/3] guidelines and better validation, including fields of record add/edit validation --- .../integrations/fi_crm_automations.py | 71 ++++++++++--------- 1 file changed, 38 insertions(+), 33 deletions(-) diff --git a/flexus_client_kit/integrations/fi_crm_automations.py b/flexus_client_kit/integrations/fi_crm_automations.py index 8475f2ed..b7ef37fc 100644 --- a/flexus_client_kit/integrations/fi_crm_automations.py +++ b/flexus_client_kit/integrations/fi_crm_automations.py @@ -242,6 +242,7 @@ ## Important Notes +- Before creating automations, check `erp_table_meta(table_name="...")` for available fields and required fields - Actions execute in sequence - Failed actions are logged but don't stop subsequent actions - Triggers fire IMMEDIATELY when the event happens. Time-based filters check conditions at that moment, they don't delay execution @@ -575,10 +576,26 @@ def get_automation_warnings(automation_config: Dict[str, Any]) -> List[str]: return warnings +_VALID_AUTOMATION_FIELDS = {"enabled", "triggers", "actions"} +_VALID_TRIGGER_FIELDS = {"type", "table", "operations", "filters"} + +# (required, optional) per action type -- "type" is always implicitly valid +_ACTION_SCHEMAS = { + "post_task_into_bot_inbox": ({"title"}, {"details", "provenance", "fexp_name", "comingup_ts"}), + "create_erp_record": ({"table", "fields"}, set()), + "update_erp_record": ({"table", "record_id", "fields"}, set()), + "delete_erp_record": ({"table", "record_id"}, set()), + "move_deal_stage": ({"contact_id", "pipeline_id", "from_stages", "to_stage_id"}, set()), +} + + def validate_automation_config(automation_config: Dict[str, Any], available_erp_tables: List[str] = []) -> Optional[str]: if not isinstance(automation_config, dict): return "❌ automation_config must be a dict" + if unknown := set(automation_config.keys()) - _VALID_AUTOMATION_FIELDS: + return f"❌ Unknown automation fields: {', '.join(sorted(unknown))}. Valid: {', '.join(sorted(_VALID_AUTOMATION_FIELDS))}" + triggers = automation_config.get("triggers") if not triggers: return "❌ Missing required field 'triggers' (must be a non-empty list)" @@ -588,6 +605,8 @@ def validate_automation_config(automation_config: Dict[str, Any], available_erp_ for i, trigger in enumerate(triggers): if not isinstance(trigger, dict): return f"❌ triggers[{i}] must be a dict" + if unknown := set(trigger.keys()) - _VALID_TRIGGER_FIELDS: + return f"❌ triggers[{i}] has unknown fields: {', '.join(sorted(unknown))}. Valid: {', '.join(sorted(_VALID_TRIGGER_FIELDS))}" if trigger.get("type") != "erp_table": return f"❌ triggers[{i}].type must be 'erp_table' (got {trigger.get('type')})" if "table" not in trigger: @@ -609,38 +628,24 @@ def validate_automation_config(automation_config: Dict[str, Any], available_erp_ if not isinstance(action, dict): return f"❌ actions[{i}] must be a dict" action_type = action.get("type") - if action_type == "post_task_into_bot_inbox": - if "title" not in action: - return f"❌ actions[{i}] (post_task_into_bot_inbox) missing required field 'title'" - elif action_type == "create_erp_record": - if "table" not in action: - return f"❌ actions[{i}] (create_erp_record) missing required field 'table'" - if "fields" not in action: - return f"❌ actions[{i}] (create_erp_record) missing required field 'fields'" - elif action_type == "update_erp_record": - if "table" not in action: - return f"❌ actions[{i}] (update_erp_record) missing required field 'table'" - if "record_id" not in action: - return f"❌ actions[{i}] (update_erp_record) missing required field 'record_id'" - if "fields" not in action: - return f"❌ actions[{i}] (update_erp_record) missing required field 'fields'" - elif action_type == "delete_erp_record": - if "table" not in action: - return f"❌ actions[{i}] (delete_erp_record) missing required field 'table'" - if "record_id" not in action: - return f"❌ actions[{i}] (delete_erp_record) missing required field 'record_id'" - elif action_type == "move_deal_stage": - if "contact_id" not in action: - return f"❌ actions[{i}] (move_deal_stage) missing required field 'contact_id'" - if "pipeline_id" not in action: - return f"❌ actions[{i}] (move_deal_stage) missing required field 'pipeline_id'" - if "from_stages" not in action: - return f"❌ actions[{i}] (move_deal_stage) missing required field 'from_stages'" - if not isinstance(action.get("from_stages"), list): - return f"❌ actions[{i}] (move_deal_stage) 'from_stages' must be an array of stage IDs" - if "to_stage_id" not in action: - return f"❌ actions[{i}] (move_deal_stage) missing required field 'to_stage_id'" - else: - return f"❌ actions[{i}].type must be 'post_task_into_bot_inbox', 'create_erp_record', 'update_erp_record', 'delete_erp_record', or 'move_deal_stage' (got {action_type})" + schema = _ACTION_SCHEMAS.get(action_type) + if not schema: + return f"❌ actions[{i}].type must be one of: {', '.join(sorted(_ACTION_SCHEMAS))} (got {action_type})" + required, optional = schema + valid_fields = required | optional | {"type"} + if unknown := set(action.keys()) - valid_fields: + return f"❌ actions[{i}] ({action_type}) has unknown fields: {', '.join(sorted(unknown))}. Valid: {', '.join(sorted(valid_fields))}" + if missing := required - set(action.keys()): + return f"❌ actions[{i}] ({action_type}) missing required fields: {', '.join(sorted(missing))}" + if action_type == "move_deal_stage" and not isinstance(action.get("from_stages"), list): + return f"❌ actions[{i}] (move_deal_stage) 'from_stages' must be an array of stage IDs" + if action_type in ("create_erp_record", "update_erp_record") and action.get("table") in erp_schema.ERP_TABLE_TO_SCHEMA: + cls = erp_schema.ERP_TABLE_TO_SCHEMA[action["table"]] + action_fields = set(action.get("fields", {}).keys()) + if bad := action_fields - set(cls.__dataclass_fields__.keys()): + return f"❌ actions[{i}] ({action_type}) unknown columns for table '{action['table']}': {', '.join(sorted(bad))}" + if action_type == "create_erp_record": + if missing := set(erp_schema.get_required_fields(cls)) - action_fields: + return f"❌ actions[{i}] ({action_type}) missing required columns for table '{action['table']}': {', '.join(sorted(missing))}" return None From e2ec5c97e7695cf52cddd612f076e6df3c84ba39 Mon Sep 17 00:00:00 2001 From: Humberto Yusta Date: Thu, 5 Feb 2026 23:28:35 +0100 Subject: [PATCH 2/3] prompt awareness --- flexus_simple_bots/vix/vix_prompts.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/flexus_simple_bots/vix/vix_prompts.py b/flexus_simple_bots/vix/vix_prompts.py index 51652def..4a87f2c4 100644 --- a/flexus_simple_bots/vix/vix_prompts.py +++ b/flexus_simple_bots/vix/vix_prompts.py @@ -997,6 +997,16 @@ - Direct and professional, friendly but efficient - Always looking to help grow the contact base and nurture leads +## Experts + +You are Vix (marketing expert) - CRM, pipeline, automations, setup. + +Other experts: +- **sales** - live C.L.O.S.E.R. conversations with prospects +- **nurturing** - internal, for automated templated tasks + +If user wants to test sales flow or roleplay as customer → suggest starting a new chat with the **sales** expert. + Responsibilities: - Configure company info, products, and sales strategy - Monitor CRM contacts and tasks From b23a193892679779d7678cb6d9e2d185d587a53e Mon Sep 17 00:00:00 2001 From: Humberto Yusta Date: Thu, 5 Feb 2026 23:30:06 +0100 Subject: [PATCH 3/3] erp changes as dict and recently fired dedup to avoid unwanted loops or repeated automation execution --- flexus_client_kit/ckit_bot_exec.py | 6 ++-- .../integrations/fi_crm_automations.py | 33 ++++++++++++------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/flexus_client_kit/ckit_bot_exec.py b/flexus_client_kit/ckit_bot_exec.py index e22cfcc7..c96e5f74 100644 --- a/flexus_client_kit/ckit_bot_exec.py +++ b/flexus_client_kit/ckit_bot_exec.py @@ -79,7 +79,7 @@ def __init__(self, fclient: ckit_client.FlexusClient, p: ckit_bot_query.FPersona self._parked_threads: Dict[str, ckit_ask_model.FThreadOutput] = {} self._parked_tasks: Dict[str, ckit_kanban.FPersonaKanbanTaskOutput] = {} self._parked_toolcalls: List[ckit_cloudtool.FCloudtoolCall] = [] - self._parked_erp_changes: List[tuple[str, str, Optional[Dict[str, Any]], Optional[Dict[str, Any]]]] = [] + self._parked_erp_changes: Dict[tuple, tuple[str, str, Optional[Dict[str, Any]], Optional[Dict[str, Any]]]] = {} self._parked_emessages: Dict[str, ckit_bot_query.FExternalMessageOutput] = {} self._parked_anything_new = asyncio.Event() # These fields are designed for direct access: @@ -161,7 +161,7 @@ async def unpark_collected_events(self, sleep_if_no_work: float, turn_tool_calls except Exception as e: logger.error("%s error in on_updated_task handler: %s\n%s", self.persona.persona_id, type(e).__name__, e, exc_info=e) - erp_changes = list(self._parked_erp_changes) + erp_changes = list(self._parked_erp_changes.values()) self._parked_erp_changes.clear() for table_name, action, new_record_dict, old_record_dict in erp_changes: did_anything = True @@ -528,7 +528,7 @@ async def subscribe_and_produce_callbacks( new_record = upd.news_payload_erp_record_new old_record = upd.news_payload_erp_record_old for bot in bc.bots_running.values(): - bot.instance_rcx._parked_erp_changes.append((table_name, upd.news_action, new_record, old_record)) + bot.instance_rcx._parked_erp_changes[(table_name, upd.news_payload_id)] = (table_name, upd.news_action, new_record, old_record) bot.instance_rcx._parked_anything_new.set() elif upd.news_about == "flexus_persona_external_message": diff --git a/flexus_client_kit/integrations/fi_crm_automations.py b/flexus_client_kit/integrations/fi_crm_automations.py index b7ef37fc..0f1b919b 100644 --- a/flexus_client_kit/integrations/fi_crm_automations.py +++ b/flexus_client_kit/integrations/fi_crm_automations.py @@ -1,3 +1,4 @@ +import collections import json import logging import re @@ -265,6 +266,7 @@ def __init__( self.rcx = rcx self.get_setup = get_setup_func self.available_erp_tables = available_erp_tables or [] + self._recently_fired = collections.OrderedDict() # (auto_name, record_id) -> timestamp self._setup_automation_handlers() def _load_automations(self) -> Dict[str, Any]: @@ -399,17 +401,16 @@ def _setup_automation_handlers(self): for t in cfg.get("triggers", []) if t.get("type") == "erp_table" and t.get("table")}) def make_handler(table_name): + pk_field = erp_schema.get_pkey_field(erp_schema.ERP_TABLE_TO_SCHEMA[table_name]) async def handler(operation: str, new_record: Any, old_record: Any): - automations_dict = self._load_automations() - if automations_dict: - await execute_automations_for_erp_event( - self.rcx, - table_name, - operation, - new_record, - old_record, - automations_dict, - ) + if not (automations_dict := self._load_automations()): + return + if not (rid := ckit_erp.dataclass_or_dict_to_dict(new_record or old_record).get(pk_field)): + return + await execute_automations_for_erp_event( + self.rcx, table_name, operation, new_record, old_record, + automations_dict, self._recently_fired, rid, + ) return handler for t in tables: @@ -423,7 +424,14 @@ async def execute_automations_for_erp_event( new_record: Optional[Any], old_record: Optional[Any], automations_dict: Dict[str, Any], + recently_fired: collections.OrderedDict, + record_id: str, ) -> None: + cutoff = time.time() - 60 + while recently_fired and next(iter(recently_fired.values())) < cutoff: + recently_fired.popitem(last=False) + + rec = ckit_erp.dataclass_or_dict_to_dict(old_record if operation.upper() == "DELETE" else new_record) if (new_record or old_record) else {} for auto_name, auto_config in automations_dict.items(): if not auto_config.get("enabled", True): continue @@ -433,10 +441,12 @@ async def execute_automations_for_erp_event( if operation.upper() not in [op.upper() for op in trigger.get("operations", [])]: continue if trigger_filters := trigger.get("filters", []): - rec = ckit_erp.dataclass_or_dict_to_dict(old_record if operation.upper() == "DELETE" else new_record) if not ckit_erp.check_record_matches_filters(rec, trigger_filters): logger.debug(f"Automation '{auto_name}' filtered out for {table_name}.{operation}") continue + if (auto_name, record_id) in recently_fired: + logger.debug(f"Automation '{auto_name}' skipped for {record_id}: recently fired") + continue ctx = {"trigger": { "type": "erp_table", "table": table_name, "operation": operation, @@ -444,6 +454,7 @@ async def execute_automations_for_erp_event( "old_record": ckit_erp.dataclass_or_dict_to_dict(old_record) if old_record else None, }} await _execute_actions(rcx, auto_config.get("actions", []), ctx) + recently_fired[(auto_name, record_id)] = time.time() logger.info(f"Automation '{auto_name}' executed for {table_name}.{operation}")