Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions flask_ades_wpst/ades_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ def __init__(self, app_config):

# set of job inputs to append to process deploy requests and job execute requests
# { "job_parameter_name": "ENVIRONMENT_VARIABLE_WITH_JOB_PARAMETER_VALUE" }
self._job_config_inputs = {"jobs_data_sns_topic_arn": "JOBS_DATA_SNS_TOPIC_ARN",
"dapa_api": "DAPA_API",
"client_id": "CLIENT_ID",
"staging_bucket": "STAGING_BUCKET"}

self._job_config_inputs = {
"jobs_data_sns_topic_arn": "JOBS_DATA_SNS_TOPIC_ARN",
"dapa_api": "DAPA_API",
"client_id": "CLIENT_ID",
"staging_bucket": "STAGING_BUCKET",
}

def proc_dict(self, proc):
return {
"id": proc[0],
Expand All @@ -76,7 +78,10 @@ def get_proc(self, proc_id):
TODO: sqlite_get_proc vulnerable to sql injeciton through proc_id
"""
saved_proc = sqlite_get_proc(proc_id)
return self.proc_dict(saved_proc)
proc_dict = {}
if saved_proc:
proc_dict = self.proc_dict(saved_proc)
return proc_dict

def deploy_proc(self, req_proc):
"""
Expand Down Expand Up @@ -108,7 +113,9 @@ def deploy_proc(self, req_proc):

# add unity-sps workflow step inputs to process inputs
backend_req_proc = copy.deepcopy(req_proc)
backend_req_proc["processDescription"]["process"]["inputs"] += [{"id": key} for key in self._job_config_inputs.keys()]
backend_req_proc["processDescription"]["process"]["inputs"] += [
{"id": key} for key in self._job_config_inputs.keys()
]

try:
self._ades.deploy_proc(backend_req_proc)
Expand Down Expand Up @@ -174,23 +181,31 @@ def exec_job(self, proc_id, job_params):
# job_id = f"{proc_id}-{hashlib.sha1((json.dumps(job_inputs, sort_keys=True) + now).encode()).hexdigest()}"

# TODO: relying on backend for job id means we need to pass the job publisher to backend impl code for submit notification
# job notifications should originate from this base layer once
# job notifications should originate from this base layer once

# add input values from environment variables
job_params["inputs"] += [{"id": key, "data": os.getenv(value)} for key, value in self._job_config_inputs.items()]
job_params["inputs"] += [
{"id": key, "data": os.getenv(value)}
for key, value in self._job_config_inputs.items()
]
job_spec = {
"proc_id": proc_id,
# "process": self.get_proc(proc_id),
"inputs": job_params,
"job_publisher": self._job_publisher
"job_publisher": self._job_publisher,
}
ades_resp = self._ades.exec_job(job_spec)

# ades_resp will return platform specific information that should be
# kept in the database with the job ID record
sqlite_exec_job(proc_id, ades_resp["job_id"], ades_resp["inputs"], ades_resp)
return {"code": 201, "location": "{}/processes/{}/jobs/{}".format(self.host, proc_id, ades_resp["job_id"])}

return {
"code": 201,
"location": "{}/processes/{}/jobs/{}".format(
self.host, proc_id, ades_resp["job_id"]
),
}

def dismiss_job(self, proc_id, job_id):
"""
Stop / Revoke Job
Expand Down
58 changes: 34 additions & 24 deletions flask_ades_wpst/flask_wpst.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,36 +49,46 @@ def root():
@app.route("/processes", methods=["GET", "POST"])
def processes():
resp_dict = {}
status_code = 200
ades_base = ADES_Base(app.config)
if request.method == "GET":
# Retrieve available processes
# Get list of all available algorithms
proc_list = ades_base.get_procs()
resp_dict = {"processes": proc_list}
elif request.method == "POST":
# Deploy a process
# Register a new algorithm
req_vals = request.get_json()
proc_info = ades_base.deploy_proc(req_vals)
resp_dict = {"deploymentResult": {"processSummary": proc_info}}
status_code = 201
try:
status_code = 200
ades_base = ADES_Base(app.config)
if request.method == "GET":
# Retrieve available processes
# Get list of all available algorithms
proc_list = ades_base.get_procs()
resp_dict = {"processes": proc_list}
elif request.method == "POST":
# Deploy a process
# Register a new algorithm
req_vals = request.get_json()
proc_id = req_vals["processDescription"]["process"]["id"]
if ades_base.get_proc(proc_id):
raise ValueError(f"Process ({proc_id}) is already deployed.")
proc_info = ades_base.deploy_proc(req_vals)
resp_dict = {"deploymentResult": {"processSummary": proc_info}}
status_code = 201
except:
status_code = 500
return resp_dict, status_code, {"ContentType": "application/json"}


@app.route("/processes/<procID>", methods=["GET", "DELETE"])
def processes_id(procID):
resp_dict = {}
status_code = 200
ades_base = ADES_Base(app.config)
if request.method == "GET":
# Retrieve a process description
# Get a full description of the algorithm
resp_dict = {"process": ades_base.get_proc(procID)}
elif request.method == "DELETE":
# Undeploy a process
# Delete the algorithm
resp_dict = {"undeploymentResult": ades_base.undeploy_proc(procID)}
try:
status_code = 200
ades_base = ADES_Base(app.config)
if request.method == "GET":
# Retrieve a process description
# Get a full description of the algorithm
proc_dict = ades_base.get_proc(procID)
resp_dict = {"process": proc_dict}
elif request.method == "DELETE":
# Undeploy a process
# Delete the algorithm
resp_dict = {"undeploymentResult": ades_base.undeploy_proc(procID)}
except:
status_code = 500
return resp_dict, status_code, {"ContentType": "application/json"}


Expand Down
93 changes: 63 additions & 30 deletions flask_ades_wpst/sqlite_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


def create_connection(db_file):
""" create a database connection to the SQLite database
"""create a database connection to the SQLite database
specified by db_file
:param db_file: database file
:return: Connection object or None
Expand All @@ -24,7 +24,7 @@ def create_connection(db_file):


def create_table(conn, create_table_sql):
""" create a table from the create_table_sql statement
"""create a table from the create_table_sql statement
:param conn: Connection object
:param create_table_sql: a CREATE TABLE statement
:return:
Expand Down Expand Up @@ -68,8 +68,10 @@ def wrapper_sqlite_db(*args, **kwargs):
create_table(conn, sql_create_procs_table)
create_table(conn, sql_create_jobs_table)
return func(*args, **kwargs)

return wrapper_sqlite_db


@sqlite_db
def sqlite_get_procs():
conn = create_connection(db_name)
Expand All @@ -84,9 +86,17 @@ def sqlite_get_proc(proc_id):
conn = create_connection(db_name)
cur = conn.cursor()
sql_str = """SELECT * FROM processes
WHERE id = \"{}\"""".format(proc_id)
WHERE id = \"{}\"""".format(
proc_id
)
cur.execute(sql_str)
return cur.fetchall()[0]

result = cur.fetchall()
proc = None
if result:
proc = result[0]
return proc


@sqlite_db
def sqlite_deploy_proc(proc_spec):
Expand All @@ -95,43 +105,53 @@ def sqlite_deploy_proc(proc_spec):
proc_desc2 = proc_desc["process"]
conn = create_connection(db_name)
cur = conn.cursor()
sql_str = """INSERT INTO processes(id, title, abstract, keywords,
owsContextURL, inputs, outputs, processVersion,
sql_str = """INSERT INTO processes(id, title, abstract, keywords,
owsContextURL, inputs, outputs, processVersion,
jobControlOptions, outputTransmission,
immediateDeployment, executionUnit)
VALUES(?, ?, ?, ? ,?, ?, ?, ?, ?, ?, ?, ?);"""
cur.execute(sql_str, [f"{proc_desc2['id']}:{proc_desc['processVersion']}", proc_desc2["title"],
proc_desc2["abstract"],
','.join(proc_desc2["keywords"]),
proc_desc2["owsContext"]["offering"]["content"]["href"],
json.dumps(proc_desc2["inputs"]),
json.dumps(proc_desc2["outputs"]),
proc_desc["processVersion"],
','.join(proc_desc["jobControlOptions"]),
','.join(proc_desc["outputTransmission"]),
int(proc_spec["immediateDeployment"]),
','.join([d["href"]
for d in proc_spec["executionUnit"]])])
cur.execute(
sql_str,
[
f"{proc_desc2['id']}:{proc_desc['processVersion']}",
proc_desc2["title"],
proc_desc2["abstract"],
",".join(proc_desc2["keywords"]),
proc_desc2["owsContext"]["offering"]["content"]["href"],
json.dumps(proc_desc2["inputs"]),
json.dumps(proc_desc2["outputs"]),
proc_desc["processVersion"],
",".join(proc_desc["jobControlOptions"]),
",".join(proc_desc["outputTransmission"]),
int(proc_spec["immediateDeployment"]),
",".join([d["href"] for d in proc_spec["executionUnit"]]),
],
)
conn.commit()
return sqlite_get_proc(f"{proc_desc2['id']}:{proc_desc['processVersion']}")


@sqlite_db
def sqlite_undeploy_proc(proc_id):
proc_desc = sqlite_get_proc(proc_id)
conn = create_connection(db_name)
cur = conn.cursor()
sql_str = """DELETE FROM processes
WHERE id = \"{}\"""".format(proc_id)
WHERE id = \"{}\"""".format(
proc_id
)
cur.execute(sql_str)
conn.commit()
return proc_desc


def sqlite_get_headers(cur, tname):
sql_str = "SELECT name FROM PRAGMA_TABLE_INFO(\"{}\");".format(tname)
sql_str = 'SELECT name FROM PRAGMA_TABLE_INFO("{}");'.format(tname)
cur.execute(sql_str)
col_headers = [t[0] for t in cur.fetchall()]
return col_headers


@sqlite_db
def sqlite_get_jobs(proc_id=None):
conn = create_connection(db_name)
Expand All @@ -145,12 +165,15 @@ def sqlite_get_jobs(proc_id=None):
job_dicts = [dict(zip(col_headers, job)) for job in job_list]
return job_dicts


@sqlite_db
def sqlite_get_job(job_id):
conn = create_connection(db_name)
cur = conn.cursor()
sql_str = """SELECT * FROM jobs
WHERE jobID = \"{}\"""".format(job_id)
WHERE jobID = \"{}\"""".format(
job_id
)
job = cur.execute(sql_str).fetchall()[0]
col_headers = sqlite_get_headers(cur, "jobs")
job_dict = {}
Expand All @@ -162,32 +185,42 @@ def sqlite_get_job(job_id):
job_dict[col] = job[i]
return job_dict


@sqlite_db
def sqlite_exec_job(proc_id, job_id, job_spec, backend_info):
conn = create_connection(db_name)
cur = conn.cursor()
cur.execute("""INSERT INTO jobs(jobID, procID, inputs, backend_info, status, timestamp)
VALUES(?, ?, ?, ?, ?, ?)""", [
job_id, proc_id, json.dumps(job_spec), json.dumps(backend_info),
"accepted", datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")])
cur.execute(
"""INSERT INTO jobs(jobID, procID, inputs, backend_info, status, timestamp)
VALUES(?, ?, ?, ?, ?, ?)""",
[
job_id,
proc_id,
json.dumps(job_spec),
json.dumps(backend_info),
"accepted",
datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
],
)
conn.commit()
return sqlite_get_job(job_id)


@sqlite_db
def sqlite_update_job_status(job_id, status):
conn = create_connection(db_name)
cur = conn.cursor()
sql_str = """UPDATE jobs
SET status = \"{}\",
timestamp = \"{}\"
WHERE jobID = \"{}\"""".\
format(status,
datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
job_id)
WHERE jobID = \"{}\"""".format(
status, datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), job_id
)
cur.execute(sql_str)
conn.commit()
return sqlite_get_job(job_id)


@sqlite_db
def sqlite_dismiss_job(job_id):
return sqlite_update_job_status(job_id, "dismissed")
return sqlite_update_job_status(job_id, "dismissed")
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Flask==2.0.2
Werkzeug==2.2.2
requests==2.29.0
urllib3==1.26.15
pyyaml==5.4.1
Expand Down