From 37bcd5a37a2c7b7c7463081c7171ee385e18d967 Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Tue, 17 Oct 2023 11:02:31 -0700 Subject: [PATCH 01/10] Debugging db locking issue --- flask_ades_wpst/ades_base.py | 42 +++++++++---- flask_ades_wpst/flask_wpst.py | 54 +++++++++-------- flask_ades_wpst/sqlite_connector.py | 93 +++++++++++++++++++---------- 3 files changed, 123 insertions(+), 66 deletions(-) diff --git a/flask_ades_wpst/ades_base.py b/flask_ades_wpst/ades_base.py index 089b380..6675094 100644 --- a/flask_ades_wpst/ades_base.py +++ b/flask_ades_wpst/ades_base.py @@ -45,11 +45,13 @@ def __init__(self, app_config): # set of job inputs to append to process deploy requests and job execute requests # { "job_parameter_name": "ENVIRONMENT_VARIABLE_WITH_JOB_PARAMETER_VALUE" } - self._job_config_inputs = {"jobs_data_sns_topic_arn": "JOBS_DATA_SNS_TOPIC_ARN", - "dapa_api": "DAPA_API", - "client_id": "CLIENT_ID", - "staging_bucket": "STAGING_BUCKET"} - + self._job_config_inputs = { + "jobs_data_sns_topic_arn": "JOBS_DATA_SNS_TOPIC_ARN", + "dapa_api": "DAPA_API", + "client_id": "CLIENT_ID", + "staging_bucket": "STAGING_BUCKET", + } + def proc_dict(self, proc): return { "id": proc[0], @@ -76,7 +78,10 @@ def get_proc(self, proc_id): TODO: sqlite_get_proc vulnerable to sql injeciton through proc_id """ saved_proc = sqlite_get_proc(proc_id) - return self.proc_dict(saved_proc) + proc_dict = None + if saved_proc: + proc_dict = self.proc_dict(saved_proc) + return proc_dict def deploy_proc(self, req_proc): """ @@ -108,9 +113,14 @@ def deploy_proc(self, req_proc): # add unity-sps workflow step inputs to process inputs backend_req_proc = copy.deepcopy(req_proc) - backend_req_proc["processDescription"]["process"]["inputs"] += [{"id": key} for key in self._job_config_inputs.keys()] + backend_req_proc["processDescription"]["process"]["inputs"] += [ + {"id": key} for key in self._job_config_inputs.keys() + ] try: + if self._ades.get_proc(proc_id): + raise ValueError(f"Process ({proc_id}) is already deployed.") + self._ades.deploy_proc(backend_req_proc) sqlite_deploy_proc(req_proc) except Exception as ex: @@ -174,23 +184,31 @@ def exec_job(self, proc_id, job_params): # job_id = f"{proc_id}-{hashlib.sha1((json.dumps(job_inputs, sort_keys=True) + now).encode()).hexdigest()}" # TODO: relying on backend for job id means we need to pass the job publisher to backend impl code for submit notification - # job notifications should originate from this base layer once + # job notifications should originate from this base layer once # add input values from environment variables - job_params["inputs"] += [{"id": key, "data": os.getenv(value)} for key, value in self._job_config_inputs.items()] + job_params["inputs"] += [ + {"id": key, "data": os.getenv(value)} + for key, value in self._job_config_inputs.items() + ] job_spec = { "proc_id": proc_id, # "process": self.get_proc(proc_id), "inputs": job_params, - "job_publisher": self._job_publisher + "job_publisher": self._job_publisher, } ades_resp = self._ades.exec_job(job_spec) # ades_resp will return platform specific information that should be # kept in the database with the job ID record sqlite_exec_job(proc_id, ades_resp["job_id"], ades_resp["inputs"], ades_resp) - return {"code": 201, "location": "{}/processes/{}/jobs/{}".format(self.host, proc_id, ades_resp["job_id"])} - + return { + "code": 201, + "location": "{}/processes/{}/jobs/{}".format( + self.host, proc_id, ades_resp["job_id"] + ), + } + def dismiss_job(self, proc_id, job_id): """ Stop / Revoke Job diff --git a/flask_ades_wpst/flask_wpst.py b/flask_ades_wpst/flask_wpst.py index a50c5bd..cbd504d 100644 --- a/flask_ades_wpst/flask_wpst.py +++ b/flask_ades_wpst/flask_wpst.py @@ -49,36 +49,42 @@ def root(): @app.route("/processes", methods=["GET", "POST"]) def processes(): resp_dict = {} - status_code = 200 - ades_base = ADES_Base(app.config) - if request.method == "GET": - # Retrieve available processes - # Get list of all available algorithms - proc_list = ades_base.get_procs() - resp_dict = {"processes": proc_list} - elif request.method == "POST": - # Deploy a process - # Register a new algorithm - req_vals = request.get_json() - proc_info = ades_base.deploy_proc(req_vals) - resp_dict = {"deploymentResult": {"processSummary": proc_info}} - status_code = 201 + try: + status_code = 200 + ades_base = ADES_Base(app.config) + if request.method == "GET": + # Retrieve available processes + # Get list of all available algorithms + proc_list = ades_base.get_procs() + resp_dict = {"processes": proc_list} + elif request.method == "POST": + # Deploy a process + # Register a new algorithm + req_vals = request.get_json() + proc_info = ades_base.deploy_proc(req_vals) + resp_dict = {"deploymentResult": {"processSummary": proc_info}} + status_code = 201 + except: + status_code = 500 return resp_dict, status_code, {"ContentType": "application/json"} @app.route("/processes/", methods=["GET", "DELETE"]) def processes_id(procID): resp_dict = {} - status_code = 200 - ades_base = ADES_Base(app.config) - if request.method == "GET": - # Retrieve a process description - # Get a full description of the algorithm - resp_dict = {"process": ades_base.get_proc(procID)} - elif request.method == "DELETE": - # Undeploy a process - # Delete the algorithm - resp_dict = {"undeploymentResult": ades_base.undeploy_proc(procID)} + try: + status_code = 200 + ades_base = ADES_Base(app.config) + if request.method == "GET": + # Retrieve a process description + # Get a full description of the algorithm + resp_dict = {"process": ades_base.get_proc(procID)} + elif request.method == "DELETE": + # Undeploy a process + # Delete the algorithm + resp_dict = {"undeploymentResult": ades_base.undeploy_proc(procID)} + except: + status_code = 500 return resp_dict, status_code, {"ContentType": "application/json"} diff --git a/flask_ades_wpst/sqlite_connector.py b/flask_ades_wpst/sqlite_connector.py index 0e0381e..99871cb 100644 --- a/flask_ades_wpst/sqlite_connector.py +++ b/flask_ades_wpst/sqlite_connector.py @@ -9,7 +9,7 @@ def create_connection(db_file): - """ create a database connection to the SQLite database + """create a database connection to the SQLite database specified by db_file :param db_file: database file :return: Connection object or None @@ -24,7 +24,7 @@ def create_connection(db_file): def create_table(conn, create_table_sql): - """ create a table from the create_table_sql statement + """create a table from the create_table_sql statement :param conn: Connection object :param create_table_sql: a CREATE TABLE statement :return: @@ -68,8 +68,10 @@ def wrapper_sqlite_db(*args, **kwargs): create_table(conn, sql_create_procs_table) create_table(conn, sql_create_jobs_table) return func(*args, **kwargs) + return wrapper_sqlite_db + @sqlite_db def sqlite_get_procs(): conn = create_connection(db_name) @@ -84,9 +86,17 @@ def sqlite_get_proc(proc_id): conn = create_connection(db_name) cur = conn.cursor() sql_str = """SELECT * FROM processes - WHERE id = \"{}\"""".format(proc_id) + WHERE id = \"{}\"""".format( + proc_id + ) cur.execute(sql_str) - return cur.fetchall()[0] + + result = cur.fetchall() + proc = None + if result: + proc = result[0] + return proc + @sqlite_db def sqlite_deploy_proc(proc_spec): @@ -95,43 +105,53 @@ def sqlite_deploy_proc(proc_spec): proc_desc2 = proc_desc["process"] conn = create_connection(db_name) cur = conn.cursor() - sql_str = """INSERT INTO processes(id, title, abstract, keywords, - owsContextURL, inputs, outputs, processVersion, + sql_str = """INSERT INTO processes(id, title, abstract, keywords, + owsContextURL, inputs, outputs, processVersion, jobControlOptions, outputTransmission, immediateDeployment, executionUnit) VALUES(?, ?, ?, ? ,?, ?, ?, ?, ?, ?, ?, ?);""" - cur.execute(sql_str, [f"{proc_desc2['id']}:{proc_desc['processVersion']}", proc_desc2["title"], - proc_desc2["abstract"], - ','.join(proc_desc2["keywords"]), - proc_desc2["owsContext"]["offering"]["content"]["href"], - json.dumps(proc_desc2["inputs"]), - json.dumps(proc_desc2["outputs"]), - proc_desc["processVersion"], - ','.join(proc_desc["jobControlOptions"]), - ','.join(proc_desc["outputTransmission"]), - int(proc_spec["immediateDeployment"]), - ','.join([d["href"] - for d in proc_spec["executionUnit"]])]) + cur.execute( + sql_str, + [ + f"{proc_desc2['id']}:{proc_desc['processVersion']}", + proc_desc2["title"], + proc_desc2["abstract"], + ",".join(proc_desc2["keywords"]), + proc_desc2["owsContext"]["offering"]["content"]["href"], + json.dumps(proc_desc2["inputs"]), + json.dumps(proc_desc2["outputs"]), + proc_desc["processVersion"], + ",".join(proc_desc["jobControlOptions"]), + ",".join(proc_desc["outputTransmission"]), + int(proc_spec["immediateDeployment"]), + ",".join([d["href"] for d in proc_spec["executionUnit"]]), + ], + ) conn.commit() return sqlite_get_proc(f"{proc_desc2['id']}:{proc_desc['processVersion']}") + @sqlite_db def sqlite_undeploy_proc(proc_id): proc_desc = sqlite_get_proc(proc_id) conn = create_connection(db_name) cur = conn.cursor() sql_str = """DELETE FROM processes - WHERE id = \"{}\"""".format(proc_id) + WHERE id = \"{}\"""".format( + proc_id + ) cur.execute(sql_str) conn.commit() return proc_desc + def sqlite_get_headers(cur, tname): - sql_str = "SELECT name FROM PRAGMA_TABLE_INFO(\"{}\");".format(tname) + sql_str = 'SELECT name FROM PRAGMA_TABLE_INFO("{}");'.format(tname) cur.execute(sql_str) col_headers = [t[0] for t in cur.fetchall()] return col_headers + @sqlite_db def sqlite_get_jobs(proc_id=None): conn = create_connection(db_name) @@ -145,12 +165,15 @@ def sqlite_get_jobs(proc_id=None): job_dicts = [dict(zip(col_headers, job)) for job in job_list] return job_dicts + @sqlite_db def sqlite_get_job(job_id): conn = create_connection(db_name) cur = conn.cursor() sql_str = """SELECT * FROM jobs - WHERE jobID = \"{}\"""".format(job_id) + WHERE jobID = \"{}\"""".format( + job_id + ) job = cur.execute(sql_str).fetchall()[0] col_headers = sqlite_get_headers(cur, "jobs") job_dict = {} @@ -162,17 +185,27 @@ def sqlite_get_job(job_id): job_dict[col] = job[i] return job_dict + @sqlite_db def sqlite_exec_job(proc_id, job_id, job_spec, backend_info): conn = create_connection(db_name) cur = conn.cursor() - cur.execute("""INSERT INTO jobs(jobID, procID, inputs, backend_info, status, timestamp) - VALUES(?, ?, ?, ?, ?, ?)""", [ - job_id, proc_id, json.dumps(job_spec), json.dumps(backend_info), - "accepted", datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")]) + cur.execute( + """INSERT INTO jobs(jobID, procID, inputs, backend_info, status, timestamp) + VALUES(?, ?, ?, ?, ?, ?)""", + [ + job_id, + proc_id, + json.dumps(job_spec), + json.dumps(backend_info), + "accepted", + datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), + ], + ) conn.commit() return sqlite_get_job(job_id) + @sqlite_db def sqlite_update_job_status(job_id, status): conn = create_connection(db_name) @@ -180,14 +213,14 @@ def sqlite_update_job_status(job_id, status): sql_str = """UPDATE jobs SET status = \"{}\", timestamp = \"{}\" - WHERE jobID = \"{}\"""".\ - format(status, - datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), - job_id) + WHERE jobID = \"{}\"""".format( + status, datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), job_id + ) cur.execute(sql_str) conn.commit() return sqlite_get_job(job_id) + @sqlite_db def sqlite_dismiss_job(job_id): - return sqlite_update_job_status(job_id, "dismissed") \ No newline at end of file + return sqlite_update_job_status(job_id, "dismissed") From 986119bd5274f2d84ff90d83dc10652cb0ea4e5d Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Tue, 17 Oct 2023 12:43:16 -0700 Subject: [PATCH 02/10] Debug requirements --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index f983ecf..109b06c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ Flask==2.0.2 +Werkzeug==2.2.2 requests==2.29.0 urllib3==1.26.15 pyyaml==5.4.1 From f86a05f380874eb1443fdc89c6f778cf5731ade3 Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Tue, 17 Oct 2023 15:03:42 -0700 Subject: [PATCH 03/10] Debugging db locking issue --- flask_ades_wpst/ades_base.py | 2 +- flask_ades_wpst/flask_wpst.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/flask_ades_wpst/ades_base.py b/flask_ades_wpst/ades_base.py index 6675094..c4ec8e1 100644 --- a/flask_ades_wpst/ades_base.py +++ b/flask_ades_wpst/ades_base.py @@ -78,7 +78,7 @@ def get_proc(self, proc_id): TODO: sqlite_get_proc vulnerable to sql injeciton through proc_id """ saved_proc = sqlite_get_proc(proc_id) - proc_dict = None + proc_dict = {} if saved_proc: proc_dict = self.proc_dict(saved_proc) return proc_dict diff --git a/flask_ades_wpst/flask_wpst.py b/flask_ades_wpst/flask_wpst.py index cbd504d..970a2cc 100644 --- a/flask_ades_wpst/flask_wpst.py +++ b/flask_ades_wpst/flask_wpst.py @@ -78,7 +78,8 @@ def processes_id(procID): if request.method == "GET": # Retrieve a process description # Get a full description of the algorithm - resp_dict = {"process": ades_base.get_proc(procID)} + proc_dict = ades_base.get_proc(procID) + resp_dict = {"process": proc_dict} elif request.method == "DELETE": # Undeploy a process # Delete the algorithm From 253839e5c5528063dc2f704480bece667a593eca Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Tue, 17 Oct 2023 15:31:02 -0700 Subject: [PATCH 04/10] Debugging db locking issue --- flask_ades_wpst/ades_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flask_ades_wpst/ades_base.py b/flask_ades_wpst/ades_base.py index c4ec8e1..09b1833 100644 --- a/flask_ades_wpst/ades_base.py +++ b/flask_ades_wpst/ades_base.py @@ -118,7 +118,7 @@ def deploy_proc(self, req_proc): ] try: - if self._ades.get_proc(proc_id): + if self.get_proc(proc_id): raise ValueError(f"Process ({proc_id}) is already deployed.") self._ades.deploy_proc(backend_req_proc) From 0d9c9ad00dc25bac13bf49d0f1f2808eae00c679 Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Tue, 17 Oct 2023 16:22:43 -0700 Subject: [PATCH 05/10] Debugging db locking issue --- flask_ades_wpst/ades_base.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flask_ades_wpst/ades_base.py b/flask_ades_wpst/ades_base.py index 09b1833..74d4300 100644 --- a/flask_ades_wpst/ades_base.py +++ b/flask_ades_wpst/ades_base.py @@ -118,11 +118,13 @@ def deploy_proc(self, req_proc): ] try: + print(self.get_proc(proc_id)) if self.get_proc(proc_id): + print(f"Process ({proc_id}) is already deployed.") raise ValueError(f"Process ({proc_id}) is already deployed.") - self._ades.deploy_proc(backend_req_proc) - sqlite_deploy_proc(req_proc) + # self._ades.deploy_proc(backend_req_proc) + # sqlite_deploy_proc(req_proc) except Exception as ex: print( f"Failed to create ADES required files for process deployment. {ex.message}" From 7067e17e6d32ecd22f6a5389447193cfccd2bfd8 Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Tue, 17 Oct 2023 16:37:07 -0700 Subject: [PATCH 06/10] Debugging db locking issue --- flask_ades_wpst/ades_base.py | 9 ++------- flask_ades_wpst/flask_wpst.py | 5 +++++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/flask_ades_wpst/ades_base.py b/flask_ades_wpst/ades_base.py index 74d4300..b0c753b 100644 --- a/flask_ades_wpst/ades_base.py +++ b/flask_ades_wpst/ades_base.py @@ -118,13 +118,8 @@ def deploy_proc(self, req_proc): ] try: - print(self.get_proc(proc_id)) - if self.get_proc(proc_id): - print(f"Process ({proc_id}) is already deployed.") - raise ValueError(f"Process ({proc_id}) is already deployed.") - - # self._ades.deploy_proc(backend_req_proc) - # sqlite_deploy_proc(req_proc) + self._ades.deploy_proc(backend_req_proc) + sqlite_deploy_proc(req_proc) except Exception as ex: print( f"Failed to create ADES required files for process deployment. {ex.message}" diff --git a/flask_ades_wpst/flask_wpst.py b/flask_ades_wpst/flask_wpst.py index 970a2cc..343203a 100644 --- a/flask_ades_wpst/flask_wpst.py +++ b/flask_ades_wpst/flask_wpst.py @@ -60,6 +60,11 @@ def processes(): elif request.method == "POST": # Deploy a process # Register a new algorithm + proc_id = req_vals["processDescription"]["process"]["id"] + print(ades_base.get_proc(proc_id)) + if ades_base.get_proc(proc_id): + print(f"Process ({proc_id}) is already deployed.") + raise ValueError(f"Process ({proc_id}) is already deployed.") req_vals = request.get_json() proc_info = ades_base.deploy_proc(req_vals) resp_dict = {"deploymentResult": {"processSummary": proc_info}} From 9c69eac69090ff13b347733bb17ad5999d944f08 Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Tue, 17 Oct 2023 17:04:16 -0700 Subject: [PATCH 07/10] Debugging db locking issue --- flask_ades_wpst/flask_wpst.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flask_ades_wpst/flask_wpst.py b/flask_ades_wpst/flask_wpst.py index 343203a..ba3c659 100644 --- a/flask_ades_wpst/flask_wpst.py +++ b/flask_ades_wpst/flask_wpst.py @@ -60,12 +60,12 @@ def processes(): elif request.method == "POST": # Deploy a process # Register a new algorithm + req_vals = request.get_json() proc_id = req_vals["processDescription"]["process"]["id"] print(ades_base.get_proc(proc_id)) if ades_base.get_proc(proc_id): print(f"Process ({proc_id}) is already deployed.") raise ValueError(f"Process ({proc_id}) is already deployed.") - req_vals = request.get_json() proc_info = ades_base.deploy_proc(req_vals) resp_dict = {"deploymentResult": {"processSummary": proc_info}} status_code = 201 From 5bf143053e325bc1b62425cc11262e2b508e33ef Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Tue, 17 Oct 2023 17:14:49 -0700 Subject: [PATCH 08/10] Debugging db locking issue --- flask_ades_wpst/flask_wpst.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/flask_ades_wpst/flask_wpst.py b/flask_ades_wpst/flask_wpst.py index ba3c659..400f622 100644 --- a/flask_ades_wpst/flask_wpst.py +++ b/flask_ades_wpst/flask_wpst.py @@ -61,11 +61,11 @@ def processes(): # Deploy a process # Register a new algorithm req_vals = request.get_json() - proc_id = req_vals["processDescription"]["process"]["id"] - print(ades_base.get_proc(proc_id)) - if ades_base.get_proc(proc_id): - print(f"Process ({proc_id}) is already deployed.") - raise ValueError(f"Process ({proc_id}) is already deployed.") + # proc_id = req_vals["processDescription"]["process"]["id"] + # print(ades_base.get_proc(proc_id)) + # if ades_base.get_proc(proc_id): + # print(f"Process ({proc_id}) is already deployed.") + # raise ValueError(f"Process ({proc_id}) is already deployed.") proc_info = ades_base.deploy_proc(req_vals) resp_dict = {"deploymentResult": {"processSummary": proc_info}} status_code = 201 From cdc82958c909c179ca9c92db14eb83a07c8db6d2 Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Tue, 17 Oct 2023 17:20:58 -0700 Subject: [PATCH 09/10] Debugging db locking issue --- flask_ades_wpst/flask_wpst.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/flask_ades_wpst/flask_wpst.py b/flask_ades_wpst/flask_wpst.py index 400f622..ba3c659 100644 --- a/flask_ades_wpst/flask_wpst.py +++ b/flask_ades_wpst/flask_wpst.py @@ -61,11 +61,11 @@ def processes(): # Deploy a process # Register a new algorithm req_vals = request.get_json() - # proc_id = req_vals["processDescription"]["process"]["id"] - # print(ades_base.get_proc(proc_id)) - # if ades_base.get_proc(proc_id): - # print(f"Process ({proc_id}) is already deployed.") - # raise ValueError(f"Process ({proc_id}) is already deployed.") + proc_id = req_vals["processDescription"]["process"]["id"] + print(ades_base.get_proc(proc_id)) + if ades_base.get_proc(proc_id): + print(f"Process ({proc_id}) is already deployed.") + raise ValueError(f"Process ({proc_id}) is already deployed.") proc_info = ades_base.deploy_proc(req_vals) resp_dict = {"deploymentResult": {"processSummary": proc_info}} status_code = 201 From 676ba7edd3eb1527c822a1fc3cb95e58a0c6c06f Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Wed, 18 Oct 2023 08:47:30 -0700 Subject: [PATCH 10/10] Debugging db locking issue --- flask_ades_wpst/flask_wpst.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/flask_ades_wpst/flask_wpst.py b/flask_ades_wpst/flask_wpst.py index ba3c659..85ca766 100644 --- a/flask_ades_wpst/flask_wpst.py +++ b/flask_ades_wpst/flask_wpst.py @@ -62,9 +62,7 @@ def processes(): # Register a new algorithm req_vals = request.get_json() proc_id = req_vals["processDescription"]["process"]["id"] - print(ades_base.get_proc(proc_id)) if ades_base.get_proc(proc_id): - print(f"Process ({proc_id}) is already deployed.") raise ValueError(f"Process ({proc_id}) is already deployed.") proc_info = ades_base.deploy_proc(req_vals) resp_dict = {"deploymentResult": {"processSummary": proc_info}}