Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions flexus_client_kit/ckit_bot_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def __init__(self, fclient: ckit_client.FlexusClient, p: ckit_bot_query.FPersona
self._parked_threads: Dict[str, ckit_ask_model.FThreadOutput] = {}
self._parked_tasks: Dict[str, ckit_kanban.FPersonaKanbanTaskOutput] = {}
self._parked_toolcalls: List[ckit_cloudtool.FCloudtoolCall] = []
self._parked_erp_changes: List[tuple[str, str, Optional[Dict[str, Any]], Optional[Dict[str, Any]]]] = []
self._parked_erp_changes: Dict[tuple, tuple[str, str, Optional[Dict[str, Any]], Optional[Dict[str, Any]]]] = {}
self._parked_emessages: Dict[str, ckit_bot_query.FExternalMessageOutput] = {}
self._parked_anything_new = asyncio.Event()
# These fields are designed for direct access:
Expand Down Expand Up @@ -161,7 +161,7 @@ async def unpark_collected_events(self, sleep_if_no_work: float, turn_tool_calls
except Exception as e:
logger.error("%s error in on_updated_task handler: %s\n%s", self.persona.persona_id, type(e).__name__, e, exc_info=e)

erp_changes = list(self._parked_erp_changes)
erp_changes = list(self._parked_erp_changes.values())
self._parked_erp_changes.clear()
for table_name, action, new_record_dict, old_record_dict in erp_changes:
did_anything = True
Expand Down Expand Up @@ -528,7 +528,7 @@ async def subscribe_and_produce_callbacks(
new_record = upd.news_payload_erp_record_new
old_record = upd.news_payload_erp_record_old
for bot in bc.bots_running.values():
bot.instance_rcx._parked_erp_changes.append((table_name, upd.news_action, new_record, old_record))
bot.instance_rcx._parked_erp_changes[(table_name, upd.news_payload_id)] = (table_name, upd.news_action, new_record, old_record)
bot.instance_rcx._parked_anything_new.set()

elif upd.news_about == "flexus_persona_external_message":
Expand Down
104 changes: 60 additions & 44 deletions flexus_client_kit/integrations/fi_crm_automations.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import collections
import json
import logging
import re
Expand Down Expand Up @@ -242,6 +243,7 @@

## Important Notes

- Before creating automations, check `erp_table_meta(table_name="...")` for available fields and required fields
- Actions execute in sequence
- Failed actions are logged but don't stop subsequent actions
- Triggers fire IMMEDIATELY when the event happens. Time-based filters check conditions at that moment, they don't delay execution
Expand All @@ -264,6 +266,7 @@ def __init__(
self.rcx = rcx
self.get_setup = get_setup_func
self.available_erp_tables = available_erp_tables or []
self._recently_fired = collections.OrderedDict() # (auto_name, record_id) -> timestamp
self._setup_automation_handlers()

def _load_automations(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -398,17 +401,16 @@ def _setup_automation_handlers(self):
for t in cfg.get("triggers", []) if t.get("type") == "erp_table" and t.get("table")})

def make_handler(table_name):
pk_field = erp_schema.get_pkey_field(erp_schema.ERP_TABLE_TO_SCHEMA[table_name])
async def handler(operation: str, new_record: Any, old_record: Any):
automations_dict = self._load_automations()
if automations_dict:
await execute_automations_for_erp_event(
self.rcx,
table_name,
operation,
new_record,
old_record,
automations_dict,
)
if not (automations_dict := self._load_automations()):
return
if not (rid := ckit_erp.dataclass_or_dict_to_dict(new_record or old_record).get(pk_field)):
return
await execute_automations_for_erp_event(
self.rcx, table_name, operation, new_record, old_record,
automations_dict, self._recently_fired, rid,
)
return handler

for t in tables:
Expand All @@ -422,7 +424,14 @@ async def execute_automations_for_erp_event(
new_record: Optional[Any],
old_record: Optional[Any],
automations_dict: Dict[str, Any],
recently_fired: collections.OrderedDict,
record_id: str,
) -> None:
cutoff = time.time() - 60
while recently_fired and next(iter(recently_fired.values())) < cutoff:
recently_fired.popitem(last=False)

rec = ckit_erp.dataclass_or_dict_to_dict(old_record if operation.upper() == "DELETE" else new_record) if (new_record or old_record) else {}
for auto_name, auto_config in automations_dict.items():
if not auto_config.get("enabled", True):
continue
Expand All @@ -432,17 +441,20 @@ async def execute_automations_for_erp_event(
if operation.upper() not in [op.upper() for op in trigger.get("operations", [])]:
continue
if trigger_filters := trigger.get("filters", []):
rec = ckit_erp.dataclass_or_dict_to_dict(old_record if operation.upper() == "DELETE" else new_record)
if not ckit_erp.check_record_matches_filters(rec, trigger_filters):
logger.debug(f"Automation '{auto_name}' filtered out for {table_name}.{operation}")
continue
if (auto_name, record_id) in recently_fired:
logger.debug(f"Automation '{auto_name}' skipped for {record_id}: recently fired")
continue

ctx = {"trigger": {
"type": "erp_table", "table": table_name, "operation": operation,
"new_record": ckit_erp.dataclass_or_dict_to_dict(new_record) if new_record else None,
"old_record": ckit_erp.dataclass_or_dict_to_dict(old_record) if old_record else None,
}}
await _execute_actions(rcx, auto_config.get("actions", []), ctx)
recently_fired[(auto_name, record_id)] = time.time()
logger.info(f"Automation '{auto_name}' executed for {table_name}.{operation}")


Expand Down Expand Up @@ -575,10 +587,26 @@ def get_automation_warnings(automation_config: Dict[str, Any]) -> List[str]:
return warnings


_VALID_AUTOMATION_FIELDS = {"enabled", "triggers", "actions"}
_VALID_TRIGGER_FIELDS = {"type", "table", "operations", "filters"}

# (required, optional) per action type -- "type" is always implicitly valid
_ACTION_SCHEMAS = {
"post_task_into_bot_inbox": ({"title"}, {"details", "provenance", "fexp_name", "comingup_ts"}),
"create_erp_record": ({"table", "fields"}, set()),
"update_erp_record": ({"table", "record_id", "fields"}, set()),
"delete_erp_record": ({"table", "record_id"}, set()),
"move_deal_stage": ({"contact_id", "pipeline_id", "from_stages", "to_stage_id"}, set()),
}


def validate_automation_config(automation_config: Dict[str, Any], available_erp_tables: List[str] = []) -> Optional[str]:
if not isinstance(automation_config, dict):
return "❌ automation_config must be a dict"

if unknown := set(automation_config.keys()) - _VALID_AUTOMATION_FIELDS:
return f"❌ Unknown automation fields: {', '.join(sorted(unknown))}. Valid: {', '.join(sorted(_VALID_AUTOMATION_FIELDS))}"

triggers = automation_config.get("triggers")
if not triggers:
return "❌ Missing required field 'triggers' (must be a non-empty list)"
Expand All @@ -588,6 +616,8 @@ def validate_automation_config(automation_config: Dict[str, Any], available_erp_
for i, trigger in enumerate(triggers):
if not isinstance(trigger, dict):
return f"❌ triggers[{i}] must be a dict"
if unknown := set(trigger.keys()) - _VALID_TRIGGER_FIELDS:
return f"❌ triggers[{i}] has unknown fields: {', '.join(sorted(unknown))}. Valid: {', '.join(sorted(_VALID_TRIGGER_FIELDS))}"
if trigger.get("type") != "erp_table":
return f"❌ triggers[{i}].type must be 'erp_table' (got {trigger.get('type')})"
if "table" not in trigger:
Expand All @@ -609,38 +639,24 @@ def validate_automation_config(automation_config: Dict[str, Any], available_erp_
if not isinstance(action, dict):
return f"❌ actions[{i}] must be a dict"
action_type = action.get("type")
if action_type == "post_task_into_bot_inbox":
if "title" not in action:
return f"❌ actions[{i}] (post_task_into_bot_inbox) missing required field 'title'"
elif action_type == "create_erp_record":
if "table" not in action:
return f"❌ actions[{i}] (create_erp_record) missing required field 'table'"
if "fields" not in action:
return f"❌ actions[{i}] (create_erp_record) missing required field 'fields'"
elif action_type == "update_erp_record":
if "table" not in action:
return f"❌ actions[{i}] (update_erp_record) missing required field 'table'"
if "record_id" not in action:
return f"❌ actions[{i}] (update_erp_record) missing required field 'record_id'"
if "fields" not in action:
return f"❌ actions[{i}] (update_erp_record) missing required field 'fields'"
elif action_type == "delete_erp_record":
if "table" not in action:
return f"❌ actions[{i}] (delete_erp_record) missing required field 'table'"
if "record_id" not in action:
return f"❌ actions[{i}] (delete_erp_record) missing required field 'record_id'"
elif action_type == "move_deal_stage":
if "contact_id" not in action:
return f"❌ actions[{i}] (move_deal_stage) missing required field 'contact_id'"
if "pipeline_id" not in action:
return f"❌ actions[{i}] (move_deal_stage) missing required field 'pipeline_id'"
if "from_stages" not in action:
return f"❌ actions[{i}] (move_deal_stage) missing required field 'from_stages'"
if not isinstance(action.get("from_stages"), list):
return f"❌ actions[{i}] (move_deal_stage) 'from_stages' must be an array of stage IDs"
if "to_stage_id" not in action:
return f"❌ actions[{i}] (move_deal_stage) missing required field 'to_stage_id'"
else:
return f"❌ actions[{i}].type must be 'post_task_into_bot_inbox', 'create_erp_record', 'update_erp_record', 'delete_erp_record', or 'move_deal_stage' (got {action_type})"
schema = _ACTION_SCHEMAS.get(action_type)
if not schema:
return f"❌ actions[{i}].type must be one of: {', '.join(sorted(_ACTION_SCHEMAS))} (got {action_type})"
required, optional = schema
valid_fields = required | optional | {"type"}
if unknown := set(action.keys()) - valid_fields:
return f"❌ actions[{i}] ({action_type}) has unknown fields: {', '.join(sorted(unknown))}. Valid: {', '.join(sorted(valid_fields))}"
if missing := required - set(action.keys()):
return f"❌ actions[{i}] ({action_type}) missing required fields: {', '.join(sorted(missing))}"
if action_type == "move_deal_stage" and not isinstance(action.get("from_stages"), list):
return f"❌ actions[{i}] (move_deal_stage) 'from_stages' must be an array of stage IDs"
if action_type in ("create_erp_record", "update_erp_record") and action.get("table") in erp_schema.ERP_TABLE_TO_SCHEMA:
cls = erp_schema.ERP_TABLE_TO_SCHEMA[action["table"]]
action_fields = set(action.get("fields", {}).keys())
if bad := action_fields - set(cls.__dataclass_fields__.keys()):
return f"❌ actions[{i}] ({action_type}) unknown columns for table '{action['table']}': {', '.join(sorted(bad))}"
if action_type == "create_erp_record":
if missing := set(erp_schema.get_required_fields(cls)) - action_fields:
return f"❌ actions[{i}] ({action_type}) missing required columns for table '{action['table']}': {', '.join(sorted(missing))}"

return None
10 changes: 10 additions & 0 deletions flexus_simple_bots/vix/vix_prompts.py
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,16 @@
- Direct and professional, friendly but efficient
- Always looking to help grow the contact base and nurture leads

## Experts

You are Vix (marketing expert) - CRM, pipeline, automations, setup.

Other experts:
- **sales** - live C.L.O.S.E.R. conversations with prospects
- **nurturing** - internal, for automated templated tasks

If user wants to test sales flow or roleplay as customer → suggest starting a new chat with the **sales** expert.

Responsibilities:
- Configure company info, products, and sales strategy
- Monitor CRM contacts and tasks
Expand Down