diff --git a/forloop_modules/function_handlers/database_handlers.py b/forloop_modules/function_handlers/database_handlers.py index a0f32a1..5a02822 100644 --- a/forloop_modules/function_handlers/database_handlers.py +++ b/forloop_modules/function_handlers/database_handlers.py @@ -137,6 +137,57 @@ def get_db_table_from_db(table_name: str, db_name: str) -> Union[DbTable, None]: return db_table + +def get_db_connection_by_db_name(db_name: str, icon_type: str) -> Optional[dbc.DbConnection]: + """ + Resolve and connect to a project database by name using the common pattern: + - look up DB in project DBs + - decrypt stored password using Redis + RSA + - build DbConnection + - test connectivity and show standard popup on failure + + Returns a ready-to-use DbConnection or None if the connection cannot be established. + """ + project_databases = ncrb.get_all_databases_by_project_uid() + db_dict = filter_database_by_name_from_all_project_databases( + project_databases=project_databases, + db_name=db_name, + ) + + if db_dict is None: + # User selects from stored DBs so this shouldn't happen. If this is raised, there is an issue in code probably. 
+ raise Exception(f'{icon_type}: No DB named {db_name} found in project DBs.') + + redis_key = create_redis_key_for_project_db_private_key(project_uid=aet.project_uid) + private_key_base64 = kv_redis.get(redis_key) + + if private_key_base64 is None: + return None + + private_key = convert_base64_private_key_to_rsa_private_key(private_key_base64=private_key_base64) + + encrypted_password = db_dict["password"] + decrypted_password = decrypt_text(text=encrypted_password, private_key=private_key) + + db_dict["password"] = decrypted_password + + db_details = dbc.create_db_details_from_database_dict(db_dict=db_dict) + db_connection = dbc.DbConnection(db_details=db_details) + + # Safeguard against uninitialized or failed connections + try: + is_connected = db_connection.test_database_connection() + except Exception as e: + flog.error(f"{icon_type}: DB connection failed – {e.__class__.__name__}: {e}") + ncrb.new_popup([500, 400], "RaiseNotConnectedPopup") + return None + + if not is_connected: + ncrb.new_popup([500, 400], "RaiseNotConnectedPopup") + return None + + return db_connection + def generate_sql_condition(cols_to_be_selected, dbtable_name, column_name, value, operator, limit, dataset=None): """ Generates an SQL query string with optional filtering and limit. 
@@ -226,11 +277,10 @@ def __init__(self): def _init_docs(self): self.docs = Docs(description=self.__doc__) self.docs.add_parameter_table_row( - title="DB table name", - name="db_table_name", - description="Database table on which the query is executed.", - typ="Comboentry", - example=['employees', 'salaries_table'] + title="Database", + name="db_name", + description="A name of the database on which the query is executed.", + typ="Combobox", ) self.docs.add_parameter_table_row( title="Query", @@ -250,75 +300,109 @@ def _init_docs(self): ) def make_form_dict_list(self, *args, options=None, node_detail_form=None): - db_tables = get_connected_db_table_names() + databases = options.get("databases", []) if options is not None else [] + database_names = [database["database_name"] for database in databases] fdl = FormDictList(docs=self.docs) fdl.label(self.fn_name) - fdl.label("DB table name") - fdl.comboentry(name="db_table_name", text="", options=db_tables, row=1) + fdl.label("Database") + fdl.combobox(name="db_name", options=database_names, row=1) fdl.label("Query") fdl.entry(name="query", text="", input_types=["str"], row=2) - fdl.label("Specify new variable name with SELECT query") fdl.label("New variable") - fdl.entry(name="new_var_name", text="", category="new_var", input_types=["str"], row=4) + fdl.entry(name="new_var_name", text="", category="new_var", input_types=["str"], row=3) fdl.button(function=self.execute, function_args=node_detail_form, text="Execute", focused=True) return fdl def execute(self, node_detail_form): - db_table_name = node_detail_form.get_chosen_value_by_name("db_table_name", variable_handler) - db_table_name = parse_comboentry_input(input_value=db_table_name) + db_name = node_detail_form.get_chosen_value_by_name("db_name", variable_handler) + db_name = parse_comboentry_input(input_value=db_name) query = node_detail_form.get_chosen_value_by_name("query", variable_handler) new_var_name = node_detail_form.get_chosen_value_by_name("new_var_name", 
variable_handler) - new_var_name = variable_handler._set_up_unique_varname(new_var_name) + # Normalize query to safely inspect its first word + normalized_query = query if isinstance(query, str) else "" + first_word = normalized_query.split(" ")[0].lower().strip() if normalized_query.strip() else "" + + # If this is a SELECT query, behave like DBSelectHandler: + # - do NOT create a separate DataFrame node + # - attach / refresh `shown_dataframe` on THIS node + # - store the DataFrame under `new_var_name` + # - mark THIS node as the last active dataframe node + if first_word == "select": + if not db_name: + return - self.direct_execute(db_table_name, query, new_var_name) + # Ensure we have a unique dataframe var name and attach a `shown_dataframe` field + new_var_name = self.update_node_fields_with_shown_dataframe( + node_detail_form, + new_var_name, + ) - def direct_execute(self, db_table_name, query, new_var_name): - if not db_table_name: - return + db_connection = get_db_connection_by_db_name(db_name=db_name, icon_type=self.icon_type) + if db_connection is None: + return + + db_instance = db_connection.db_instance + df_new = self._get_df_from_query(db_instance, query) + + df_new = validate_input_data_types(df_new) + variable_handler.new_variable(new_var_name, df_new) + + # Mirror DBSelectHandler – mark this DBQuery node as the last active dataframe node + ncrb.update_last_active_dataframe_node_uid(node_detail_form.node_uid) + else: + # Non-SELECT queries retain the original behaviour (just execute the query) + self.direct_execute(db_name, query) - matching_dbtables = get_name_matching_db_tables(db_table_name) - - if len(matching_dbtables) == 1: - dbtable = matching_dbtables[0] - db_instance = dbtable.db1 - - with db_instance.connect_to_db(): - flog.info(f"Qeury: {query}") - flog.info(f'FIRST Query word altered: {query.split(" ")[0].lower().strip()}') - if query.split(" ")[0].lower().strip() == "select": - # Hotfix for the df Image to show a proper label - # TODO 
Daniel/Tomas: Refactor out - new_var_name = variable_handler._set_up_unique_varname(new_var_name) - fields = self.generate_shown_dataframe_option_field(new_var_name) - - response = ncrb.new_node(pos=[500, 300], typ="DataFrame", fields=fields) - if response.status_code in [200, 201]: - result = json.loads(response.content.decode('utf-8')) - node_uid = result["uid"] + def _get_df_from_query(self, db_instance: DBInstance, query: str) -> pd.DataFrame: + """ + Execute an arbitrary SELECT query on the given DB instance and return the result as a DataFrame. - try: - rows = dbtable.select(query) - except Exception as e: - flog.error(f"DBTABLE SELECT ERROR {e}") + Uses a BigQuery-specific path when necessary and falls back to a generic cursor-based + implementation for SQL databases. + """ + if type(db_instance) is dh.BigQueryDb: + try: + df_new = db_instance.select_to_df(query) + except Exception as e: + flog.error(f"DBQUERY BIGQUERY SELECT ERROR {e}") + df_new = pd.DataFrame() + return df_new - df_new = pd.DataFrame(rows,columns=dbtable.columns) - flog.info(f"DF: {df_new}") + rows = [] + columns = None - df_new = validate_input_data_types(df_new) - variable_handler.new_variable(new_var_name, df_new) + with db_instance.connect_to_db(): + try: + # Use the underlying cursor API to preserve column ordering + db_instance.cursor.execute(query) + rows = db_instance.cursor.fetchall() + if getattr(db_instance.cursor, "description", None): + columns = [desc[0] for desc in db_instance.cursor.description] + except Exception as e: + flog.error(f"DBQUERY EXECUTE SELECT ERROR {e}") - ncrb.update_last_active_dataframe_node_uid(node_uid) - else: - raise HTTPException(status_code=response.status_code, detail="Error requesting new node from api") - else: - try: - dbtable.db1.execute(query) - except Exception as e: - flog.error(f"DBTABLE EXECUTE ERROR {e}") + try: + df_new = pd.DataFrame(rows, columns=columns) + except Exception as e: + flog.error(f"DBQUERY DATAFRAME CONVERSION ERROR {e}") + 
df_new = pd.DataFrame() + + return df_new + + def direct_execute(self, db_name, query): + if not db_name: + return + + db_connection = get_db_connection_by_db_name(db_name=db_name, icon_type=self.icon_type) + if db_connection is None: + return + + with db_connection.db_instance.connect_to_db(): + db_connection.db_instance.execute(query=query) class MySQLQueryHandler(AbstractFunctionHandler): def __init__(self): @@ -352,40 +436,12 @@ def execute(self, node_detail_form): self.direct_execute(db_name, query) def direct_execute(self, db_name, query): - project_databases = ncrb.get_all_databases_by_project_uid() - db_dict = filter_database_by_name_from_all_project_databases(project_databases=project_databases, db_name=db_name) - - if db_dict is None: - # User selects from stored DBs so this shouldn't happen. If this is raised, these is an issue in code probably. - raise Exception(f'{self.icon_type}: No DB named {db_name} found in project DBs.') - - redis_key = create_redis_key_for_project_db_private_key(project_uid=aet.project_uid) - private_key_base64 = kv_redis.get(redis_key) - - if private_key_base64 is not None: - private_key = convert_base64_private_key_to_rsa_private_key(private_key_base64=private_key_base64) - - encrypted_password = db_dict["password"] - decrypted_password = decrypt_text(text=encrypted_password, private_key=private_key) - - db_dict["password"] = decrypted_password - - db_details = dbc.create_db_details_from_database_dict(db_dict=db_dict) - db_connection = dbc.DbConnection(db_details=db_details) - # Safeguard against uninitialized or failed connections - try: - is_connected = db_connection.test_database_connection() - except Exception as e: - flog.error(f"{self.icon_type}: DB connection failed – {e.__class__.__name__}: {e}") - ncrb.new_popup([500, 400], "RaiseNotConnectedPopup") - return - if not is_connected: - ncrb.new_popup([500, 400], "RaiseNotConnectedPopup") - return + db_connection = get_db_connection_by_db_name(db_name=db_name, 
icon_type=self.icon_type) + if db_connection is None: + return - if is_connected: - with db_connection.db_instance.connect_to_db(): - db_connection.db_instance.execute(query=query) + with db_connection.db_instance.connect_to_db(): + db_connection.db_instance.execute(query=query) class DBSelectHandler(AbstractFunctionHandler): """ @@ -506,8 +562,30 @@ def execute(self, node_detail_form): where_value = node_detail_form.get_chosen_value_by_name("where_value", variable_handler) limit = node_detail_form.get_chosen_value_by_name("limit", variable_handler) new_var_name = node_detail_form.get_chosen_value_by_name("new_var_name", variable_handler) - - self.direct_execute(db_name, db_table_name, select, where_column_name, where_operator, where_value, limit, new_var_name) + + # Mirror LoadExcelHandler pattern: + # 1) ensure we have a unique dataframe var name and attach a `shown_dataframe` field + new_var_name = self.update_node_fields_with_shown_dataframe( + node_detail_form, + new_var_name, + ) + + # 2) execute the DB select and store the dataframe into that variable + self.direct_execute( + db_name, + db_table_name, + select, + where_column_name, + where_operator, + where_value, + limit, + new_var_name, + ) + + # 3) mark this DBSelect node as the last active dataframe node + # so `/last_active_dataframe_node_uid` and `/get_last_active_df` + # can find and display its dataframe. 
+ ncrb.update_last_active_dataframe_node_uid(node_detail_form.node_uid) # FIXME: Deprecated - causes issues on cloud (API crash) # fields = self.generate_shown_dataframe_option_field(new_var_name) @@ -538,49 +616,21 @@ def direct_execute(self, db_name, db_table_name, select, where_column_name, wher if isinstance(limit, str): limit = limit.strip() if isinstance(new_var_name, str): new_var_name = new_var_name.strip() - project_databases = ncrb.get_all_databases_by_project_uid() - db_dict = filter_database_by_name_from_all_project_databases(project_databases=project_databases, db_name=db_name) - if not new_var_name: new_var_name = f"{db_table_name}_data" - if db_dict is None: - # User selects from stored DBs so this shouldn't happen. If this is raised, these is an issue in code probably. - raise Exception(f'{self.icon_type}: No DB named {db_name} found in project DBs.') - - redis_key = create_redis_key_for_project_db_private_key(project_uid=aet.project_uid) - private_key_base64 = kv_redis.get(redis_key) - - if private_key_base64 is not None: - private_key = convert_base64_private_key_to_rsa_private_key(private_key_base64=private_key_base64) - - encrypted_password = db_dict["password"] - decrypted_password = decrypt_text(text=encrypted_password, private_key=private_key) - - db_dict["password"] = decrypted_password - - db_details = dbc.create_db_details_from_database_dict(db_dict=db_dict) - db_connection = dbc.DbConnection(db_details=db_details) - # Safeguard against uninitialized or failed connections - try: - is_connected = db_connection.test_database_connection() - except Exception as e: - flog.error(f"{self.icon_type}: DB connection failed – {e.__class__.__name__}: {e}") - ncrb.new_popup([500, 400], "RaiseNotConnectedPopup") - return - if not is_connected: - ncrb.new_popup([500, 400], "RaiseNotConnectedPopup") - return + db_connection = get_db_connection_by_db_name(db_name=db_name, icon_type=self.icon_type) + if db_connection is None: + return + + db_table = 
db_connection.get_db_table(db_table_name) + if db_table is None: + return - if is_connected: - db_table = db_connection.get_db_table(db_table_name) - if db_table is None: - return - - df_new = self._get_df(select, db_table_name, db_connection.db_instance, db_table, where_column_name, where_operator, - where_value, limit) - df_new = validate_input_data_types(df_new) - variable_handler.new_variable(new_var_name, df_new) + df_new = self._get_df(select, db_table_name, db_connection.db_instance, db_table, where_column_name, where_operator, + where_value, limit) + df_new = validate_input_data_types(df_new) + variable_handler.new_variable(new_var_name, df_new) def select(self, db_instance, dbtable, query, cols_to_be_selected): if type(db_instance) is dh.MongoDb: @@ -1196,9 +1246,9 @@ def make_form_dict_list(self, *args, options=None, node_detail_form=None): fdl.label("Table name") fdl.entry(name="new_table_name", text="", input_types=["str"], required=True, row=2) fdl.label("Columns") - fdl.entry(name="columns", text="", input_types=["list"], required=True, row=3) + fdl.entry(name="columns", text="", input_types=["list; Example: [\"id\", \"name\"]"], required=True, row=3) fdl.label("Types") - fdl.entry(name="types", text="", input_types=["list"], required=True, row=4) + fdl.entry(name="types", text="", input_types=["list; Example: [\"int\", \"varchar(255)\"]"], required=True, row=4) fdl.button(function=self.execute, function_args=node_detail_form, text="Execute", focused=True) return fdl @@ -1209,9 +1259,55 @@ def execute(self, node_detail_form): columns = node_detail_form.get_chosen_value_by_name("columns", variable_handler) types = node_detail_form.get_chosen_value_by_name("types", variable_handler) + # Parse string representations of lists into actual lists + columns = self._parse_list_input(columns, "columns") + types = self._parse_list_input(types, "types") + + # Validate that columns and types have matching lengths + if len(columns) != len(types): + raise 
ValueError( + f"Columns and types lists must have the same length. " + f"Got {len(columns)} columns but {len(types)} types. " + f"Columns: {columns}, Types: {types}" + ) + self.direct_execute(db_name, new_table_name, columns, types) + + def _parse_list_input(self, value, field_name): + """Parse list input that may come as a string representation or actual list.""" + if isinstance(value, list): + return value + + if isinstance(value, str): + try: + parsed = ast.literal_eval(value) + if isinstance(parsed, list): + return parsed + else: + raise ValueError(f"{field_name} must be a list, got {type(parsed).__name__}") + except (ValueError, SyntaxError) as e: + raise ValueError( + f"Could not parse {field_name} as a list. " + f"Expected format: ['item1', 'item2', ...]. " + f"Got: {value}. Error: {e}" + ) + + raise ValueError(f"{field_name} must be a list or string representation of a list, got {type(value).__name__}") def direct_execute(self, db_name, new_table_name, columns, types): + # Validate inputs + if not isinstance(columns, list) or len(columns) == 0: + raise ValueError(f"Columns must be a non-empty list, got {type(columns).__name__}") + + if not isinstance(types, list) or len(types) == 0: + raise ValueError(f"Types must be a non-empty list, got {type(types).__name__}") + + if len(columns) != len(types): + raise ValueError( + f"Columns and types lists must have the same length. " + f"Got {len(columns)} columns but {len(types)} types." 
+ ) + project_databases = ncrb.get_all_databases_by_project_uid() db_dict = filter_database_by_name_from_all_project_databases(project_databases=project_databases, db_name=db_name) diff --git a/forloop_modules/utils/definitions.py b/forloop_modules/utils/definitions.py index 9ef7539..be83a1d 100644 --- a/forloop_modules/utils/definitions.py +++ b/forloop_modules/utils/definitions.py @@ -17,7 +17,7 @@ ] DB_ICONS = [ - "DBSelect", "DBInsert", "DBDelete", "DBUpdate", "AnalyzeDbTable", "MySQLQuery", "CreateDbTable" + "DBSelect", "DBInsert", "DBDelete", "DBUpdate", "AnalyzeDbTable", "MySQLQuery", "CreateDbTable", "DBQuery", ] SCRIPT_ICONS = ["RunPythonScript", "RunJupyterScript"]