diff --git a/gqstat b/gqstat
new file mode 100755
index 0000000..96ff356
--- /dev/null
+++ b/gqstat
@@ -0,0 +1,288 @@
+#!/g/data3/hh5/public/apps/nci_scripts/python-analysis3
+
+import os
+import grp
+import argparse
+import json
+
+from qtools import job_data_from_pbsnodes, nqstat, format_size
+from typing import List, Dict, Any
+from datetime import timedelta
+
+def truncate(s: str, l: int) -> str:
+    """Truncate s to l characters, marking any truncation with a '*'"""
+    return s[:l-1] + '*' if len(s) > l else s
+
+def str_to_timedelta_seconds(s: str) -> float:
+    """Convert an 'HH:MM:SS' string to a number of seconds"""
+    t = [int(i) for i in s.split(':')]
+    return timedelta(hours=t[0], minutes=t[1], seconds=t[2]).total_seconds()
+
+def get_all_jobs(groups: List[str], get_all=False) -> Dict[str, Dict[str, Any]]:
+    """
+    Grab every shred of information we can
+    """
+    ### Get all the info we can out of nqstat
+    all_jobs = {}
+    for group in groups:
+        all_jobs.update(nqstat(group))
+
+    ### Fall back to 'pbsnodes -a' for everything else if requested
+    if get_all:
+        all_jobs.update({k: v for k, v in job_data_from_pbsnodes().items() if k not in all_jobs})
+
+    return all_jobs
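+### Editor's sketch (illustrative only, doctest-style) of how the helpers
+### above behave; the job name below is made up:
+###
+###     >>> truncate('gadi-pbs-supervisor', 10)
+###     'gadi-pbs-*'
+###     >>> str_to_timedelta_seconds('12:30:00')
+###     45000.0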
+
+description = """
+Prints job details for some or all projects you're a member of.
+
+Returns the following columns for each job in table and wide_table modes:
+
+    Job ID:       PBS job ID
+    Username:     NCI username of the job owner
+    Proj:         NCI project the job was submitted under
+    Queue:        Queue the job was submitted to
+    Jobname:      Name of the job
+    cpu%:         Percentage of time the CPUs have been active
+    mem%:         Percentage of the memory request used
+    TSK:          Number of CPUs requested
+    Req'd Memory: Memory requested
+    Req'd Time:   Walltime requested
+    S:            Current state - 'Q' queued, 'R' running, 'H' held, 'E' finished
+    Elap Time:    Walltime elapsed
+
+This is designed to emulate PBS 'qstat -s' output, though with some more
+useful details.
+
+If 'mem%' is below 80%, make sure you're not requesting too much memory
+(4GB per CPU or less is fine).
+
+If 'cpu%' is below 80% and you're requesting more than one CPU, make sure
+your job is making proper use of parallelisation.
+
+The cpu% and mem% columns are only visible in table mode on sufficiently
+wide terminals (more than 92 columns). These columns are always visible in
+wide_table formatting mode.
+
+The --all option gathers as much information as is available to general
+users. It cannot show queued jobs, nor can it show username, project or
+request details for jobs belonging to projects that you are not a member of.
+"""
+
+def format_table(jobs: Dict[str, Dict[str, Any]], print_comment: bool = False, wide: bool = False) -> None:
+
+    try:
+        wide_mode = wide or (os.get_terminal_size().columns > 92)
+    except OSError:
+        ### Not attached to a terminal (e.g. output is being piped)
+        wide_mode = wide
+
+    d = create_out_dict(jobs, print_comment)
+
+    if wide_mode:
+        header = '''
+                                                                        Req'd  Req'd   Elap
+Job ID             Username Proj Queue    Jobname      cpu%   mem%  TSK Memory Time  S Time
+------------------ -------- ---- -------- ---------- ------ ------ ---- ------ ----- - ----'''
+    else:
+        header = '''
+                                                          Req'd  Req'd   Elap
+Job ID             Username Proj Queue    Jobname     TSK  Memory Time  S Time
+------------------ -------- ---- -------- ---------- ---- ------ ----- - ----'''
+    print(header)
+    for jobid, job in sorted(d.items()):
+        j = truncate(jobid, 18)
+        user = job['user'] if job['user'] else '???'
+        proj = job['project'] if job['project'] else '???'
+
+        queue = truncate(job['queue'], 8)
+        try:
+            jobname = truncate(job['name'], 10)
+        except TypeError:
+            jobname = '???'
+
+        ### create_out_dict() fills missing fields with None
+        tasks = job['ncpus'] if job['ncpus'] else 0
+
+        try:
+            mem_req = format_size(job['mem_request'], suffix='', dp=0)
+        except TypeError:
+            mem_req = '???'
+
+        try:
+            timereq = f"{int(job['walltime_request']//3600):02d}:{int((job['walltime_request']//60)%60):02d}"
+        except TypeError:
+            timereq = '???'
+
+        try:
+            timeelapsed = f"{int(job['walltime']//3600):02d}:{int((job['walltime']//60)%60):02d}"
+        except TypeError:
+            timeelapsed = '???' if job['state'] == 'R' else '-'
+
+        if wide_mode:
+            try:
+                cpupc = f"{100*job['cpu_pct']:.2f}"
+            except TypeError:
+                cpupc = '???' if job['state'] == 'R' else '-'
+            try:
+                mempc = f"{100*job['mem_pct']:.2f}"
+            except TypeError:
+                mempc = '???' if job['state'] == 'R' else '-'
+
+            print(f"{j:<19}{user:<9}{proj:<5}{queue:<9}{jobname:<11}{cpupc:>6} {mempc:>6} {tasks:4} {mem_req:>6} {timereq:5} {job['state']} {timeelapsed:5}")
+        else:
+            print(f"{j:<19}{user:<9}{proj:<5}{queue:<9}{jobname:<11}{tasks:4} {mem_req:>6} {timereq:5} {job['state']} {timeelapsed:5}")
+
+        if print_comment:
+            if 'comment' in job:
+                print("   " + job['comment'])
+            else:
+                print("   -")
+
+def format_wide_table(out_jobs: Dict[str, Dict[str, Any]], comment: bool) -> None:
+    format_table(out_jobs, comment, wide=True)
+
+def create_out_dict(out_jobs: Dict[str, Dict[str, Any]], comment: bool = False) -> Dict[str, Dict[str, Any]]:
+    d = {}
+
+    ### Map our output field names onto the qstat/nqstat attribute names
+    key_translate = {'user':             'Job_Owner',
+                     'project':          'project',
+                     'name':             'Job_Name',
+                     'queue':            'queue',
+                     'ncpus':            'Resource_List.ncpus',
+                     'mem_request':      'Resource_List.mem',
+                     'mem_used':         'resources_used.mem',
+                     'walltime':         'resources_used.walltime',
+                     'walltime_request': 'Resource_List.walltime',
+                     'state':            'job_state'
+                     }
+
+    for jobid, job in sorted(out_jobs.items()):
+        d[jobid] = {}
+        for k, v in key_translate.items():
+            try:
+                d[jobid][k] = job[v]
+            except KeyError:
+                d[jobid][k] = None
+
+        ### Fix the items that need fixing, and derive cpu% and mem%
+        try:
+            d[jobid]['user'] = d[jobid]['user'].split('@')[0]
+        except (KeyError, AttributeError):
+            pass
+        try:
+            d[jobid]['ncpus'] = int(d[jobid]['ncpus'])
+        except (KeyError, TypeError):
+            pass
+        try:
+            d[jobid]['walltime'] = str_to_timedelta_seconds(d[jobid]['walltime'])
+        except (KeyError, AttributeError):
+            pass
+        try:
+            d[jobid]['walltime_request'] = str_to_timedelta_seconds(d[jobid]['walltime_request'])
+        except (KeyError, AttributeError):
+            pass
+        try:
+            d[jobid]['mem_pct'] = job['resources_used.mem'] / job['Resource_List.mem']
+        except (KeyError, TypeError, ZeroDivisionError):
+            d[jobid]['mem_pct'] = None
+        try:
+            d[jobid]['cpu_pct'] = str_to_timedelta_seconds(job['resources_used.cput']) / str_to_timedelta_seconds(job['resources_used.walltime']) / int(job['Resource_List.ncpus'])
+        except (KeyError, AttributeError, TypeError):
+            d[jobid]['cpu_pct'] = None
+        except ZeroDivisionError:
+            d[jobid]['cpu_pct'] = 0.0
+
+        if comment:
+            try:
+                d[jobid]['comment'] = out_jobs[jobid]['comment']
+            except KeyError:
+                d[jobid]['comment'] = None
+
+    return d
+
+def format_json(out_jobs: Dict[str, Dict[str, Any]], comment: bool) -> None:
+    d = create_out_dict(out_jobs, comment)
+    print(json.dumps(d, indent=4))
+
+def format_csv(out_jobs: Dict[str, Dict[str, Any]], comment: bool) -> None:
+    d = create_out_dict(out_jobs, comment)
+    if not d:
+        return
+    ### All records share the same keys, so use the first for the header
+    print('jobid,' + ','.join(list(d.values())[0]))
+    for jobid, job in sorted(d.items()):
+        print(f"{jobid},{','.join([str(i) if i is not None else '' for i in job.values()])}")
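+### Editor's sketch (all values invented) of one normalised record produced
+### by create_out_dict() and serialised by format_json()/format_csv():
+###
+###     "1234567.gadi-pbs": {
+###         "user": "abc123",              # 'Job_Owner' with the host stripped
+###         "project": "a12",
+###         "name": "run_model",
+###         "queue": "normal-exec",
+###         "ncpus": 48,
+###         "mem_request": 204800000000,   # bytes, via decode_bytes()
+###         "mem_used": 150000000000,
+###         "walltime": 3600.0,            # seconds
+###         "walltime_request": 7200.0,
+###         "state": "R",
+###         "mem_pct": 0.73,               # fraction; printed as a percentage
+###         "cpu_pct": 0.95
+###     }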
+
+def main():
+    """
+    This version of qstat uses a few different sources of information to try
+    to gather data on arbitrary jobids that may or may not belong to the user
+    invoking this utility
+    """
+    parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawDescriptionHelpFormatter)
+    parser.add_argument('--format', '-f', help='Output format', choices=['table', 'csv', 'json', 'wide_table'], default='table')
+    parser.add_argument('--project', '-P', help='Show all jobs in a project')
+    parser.add_argument('--comment', '-c', help='Show PBS queue comment', action='store_true')
+    parser.add_argument('--all', '-a', help='Gather information on all PBS jobs, not just for projects you are a member of', action='store_true')
+    parser.add_argument('jobids', metavar='jobid', type=str, nargs='*', help='List of jobids')
+    args = parser.parse_args()
+
+    groups = [grp.getgrgid(g).gr_name for g in os.getgroups()]
+
+    out_jobs = {}
+
+    if not args.jobids and not args.project:
+        out_jobs = get_all_jobs(groups, args.all)
+
+    if args.project:
+        if args.project not in groups:
+            exit("Cannot get jobs from projects you don't belong to")
+        ### Use the nqstat service - this is faster than qstat anyway
+        out_jobs = nqstat(args.project)
+
+    if args.jobids:
+        jobids = [j if j.endswith('.gadi-pbs') else j + '.gadi-pbs' for j in args.jobids]
+        ### First, try to grab the job data from nqstat
+        all_jobs = {}
+        for group in groups:
+            all_jobs.update(nqstat(group))
+            ### Make as few API calls as possible, bail as soon as we have
+            ### all the requested jobids
+            if all(j in all_jobs for j in jobids):
+                break
+
+        pbsnodes_jobs = {}
+        if not all(j in all_jobs for j in jobids):
+            ### We're looking for a jobid that doesn't belong to one of our
+            ### projects - grab the jobs from pbsnodes
+            pbsnodes_jobs = job_data_from_pbsnodes(jobids)
+
+        for jobid in jobids:
+            try:
+                out_jobs[jobid] = all_jobs[jobid]
+                continue
+            except KeyError:
+                pass
+            try:
+                out_jobs[jobid] = pbsnodes_jobs[jobid]
+            except KeyError:
+                print(f"Warning: {jobid} does not exist")
+
+    ### Dispatch to format_table/format_csv/format_json/format_wide_table
+    globals()[f"format_{args.format}"](out_jobs, args.comment)
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/qtools.py b/qtools.py
index 43097be..2b7a611 100644
--- a/qtools.py
+++ b/qtools.py
@@ -15,17 +15,53 @@
 # limitations under the License.
 
-import typing as T
-import pandas
 import re
 import subprocess
 import json
+import sys
+import pymunge
+import requests
+
+### Hopefully speed up initialisation by importing only what's needed
+from pandas import isnull
+from math import floor, pow, log
+
+from typing import List, Dict, Union
+
+def format_size(val: float, suffix: str = "iB", base: float = 1024.0, dp: int = 2) -> str:
+    """Format a byte count as a human-readable string"""
+    prefixes = ("", "K", "M", "G", "T", "P", "E", "Z", "Y")
+    multiplier = 1.0
+    if val < 0:
+        multiplier = -1.0
+        val = -val
+    i = 0
+    if val < base:
+        ### No binary prefix, so 'iB' becomes plain 'B'
+        if suffix == 'iB':
+            suffix = 'B'
+    else:
+        i = int(floor(log(val, base)))
+    p = pow(base, i)
+    s = round(val / p, dp)
+    return f"{multiplier*s:.{dp}f}{prefixes[i]}{suffix}"
+
+def node_to_queue(node: str) -> str:
+    """Map a Gadi node name prefix to the PBS execution queue it serves"""
+    if node.startswith('gadi-cpu-clx-'):  return 'normal-exec'
+    if node.startswith('gadi-dm-'):       return 'copyq-exec'
+    if node.startswith('gadi-hmem-clx-'): return 'hugemem-exec'
+    if node.startswith('gadi-gpu-v100-'): return 'gpuvolta-exec'
+    if node.startswith('gadi-hmem-bdw-'): return 'hugemembw-exec'
+    if node.startswith('gadi-mmem-clx-'): return 'megamem-exec'
+    if node.startswith('gadi-mmem-bdw-'): return 'megamembw-exec'
+    if node.startswith('gadi-cpu-bdw-'):  return 'normalbw-exec'
+    if node.startswith('gadi-cpu-skl'):   return 'normalsl-exec'
+    if node.startswith('gadi-dgx-a100'):  return 'dgxa100-exec'
+    if node.startswith('gadi-analysis-'): return 'analysis-exec'
+    return '???'
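+### Editor's sketch (illustrative, doctest-style) of the two helpers above:
+###
+###     >>> format_size(1048576)
+###     '1.00MiB'
+###     >>> format_size(512)
+###     '512.00B'
+###     >>> format_size(204800000000, suffix='', dp=0)
+###     '191G'
+###     >>> node_to_queue('gadi-cpu-clx-0001')
+###     'normal-exec'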
 def decode_bytes(s):
     """
     Convert a formatted size to number of bytes
     """
-    if pandas.isnull(s):
+    if isnull(s):
         return s
 
     scales = {
@@ -63,7 +99,7 @@
     return json.loads(''.join(lines))
 
 
-def qstat(jobids: T.List[str], show_finished: bool=False):
+def qstat(jobids: List[str], show_finished: bool=False):
     """
     Returns the information from qstat
     """
@@ -83,4 +119,127 @@
 
     return jobs
 
-
+nqstat_size_keys = ('resources_used.mem', 'resources_used.vmem', 'resources_used.jobfs',
+                    'Resource_List.mem', 'Resource_List.jobfs')
+
+def nqstat(project: str) -> Dict[str, Dict[str, Union[str, int]]]:
+    """Query the nqstat web service for all of a project's jobs"""
+
+    url = 'http://gadi-pbs-01.gadi.nci.org.au:8812/qstat'
+    ### The service authenticates requests with a MUNGE credential
+    headers = {'Authorization': f"MUNGE {pymunge.encode().decode(sys.getdefaultencoding())}"}
+    params = {'project': project}
+
+    response = requests.get(url, params=params, headers=headers, timeout=120.0)
+    response.raise_for_status()
+
+    if response.status_code == 200:
+        ### The service outputs a list - transform it so it looks like
+        ### qstat json output
+        d = {}
+        for job in response.json()['qstat']:
+            d[job['Job_ID']] = {k: v for k, v in job.items() if k not in ['Job_ID', 'Submit_arguments']}
+            d[job['Job_ID']]['Submit_arguments'] = job['Submit_arguments'].replace('','').replace('',' ')
+            for k in nqstat_size_keys:
+                if k in d[job['Job_ID']]:
+                    d[job['Job_ID']][k] = decode_bytes(d[job['Job_ID']][k])
+        return d
+    else:
+        return {}
+
+def pbsnodes(as_json: bool = False) -> Dict[str, Dict[str, str]]:
+    """Run 'pbsnodes -a' and parse the output into a per-node dict"""
+
+    cmd = ["pbsnodes", "-a"]
+    if as_json:
+        cmd.extend(["-F", "json"])
+
+    try:
+        p = subprocess.run(cmd, text=True, check=True, capture_output=True)
+    except subprocess.CalledProcessError:
+        exit("Unable to call pbsnodes")
+
+    if as_json:
+        d = json.loads(p.stdout)["nodes"]
+    else:
+        d = {}
+        lines = [l.strip() for l in p.stdout.split('\n')]
+        innodeblock = False
+        for l in lines:
+            if '=' not in l:
+                if innodeblock:
+                    ### Found the end of the node block
+                    innodeblock = False
+                elif l.startswith('gadi-'):
+                    ### Found the start of a new node block
+                    innodeblock = True
+                    node = l
+                    d[node] = {}
+                continue
+            if innodeblock:
+                line = l.split(' ')
+                if line[0] == 'jobs':
+                    ### 'jobs' entries look like 'jobid/cpu-index'
+                    d[node]['jobs'] = [job.split('/')[0] for job in line[2:]]
+                elif line[0].startswith('resources_available') or line[0].startswith('resources_assigned'):
+                    dname, dkey = line[0].split('.')
+                    try:
+                        d[node][dname].update({dkey: ' '.join(line[2:])})
+                    except KeyError:
+                        d[node][dname] = {dkey: ' '.join(line[2:])}
+                else:
+                    d[node][line[0]] = ' '.join(line[2:])
+
+    return d
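+### Editor's sketch (values invented) of one parsed pbsnodes() record:
+###
+###     "gadi-cpu-clx-0001": {
+###         "jobs": ["1234567.gadi-pbs", "1234567.gadi-pbs", "7654321.gadi-pbs"],
+###         "resources_available": {"mem": "190gb", "ncpus": "48", ...},
+###         "resources_assigned": {"mem": "100gb", "ncpus": "3"},
+###         "state": "job-busy",
+###         ...
+###     }
+###
+### 'jobs' keeps one entry per assigned CPU, which is why
+### job_data_from_pbsnodes() below counts duplicates to recover ncpus.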
+
+def job_data_from_pbsnodes(jobids: List[str] = []) -> Dict[str, Dict[str, Union[str, int]]]:
+    """Reconstruct per-job information from 'pbsnodes -a' output"""
+
+    nodes = {}
+    for i, node in pbsnodes().items():
+        if 'jobs' in node:
+            nodes[i] = []
+            ### Deduplicate the per-cpu job list, preserving order
+            tmp = list(dict.fromkeys(node['jobs']))
+            for j in tmp:
+                ### (jobid, cpus used on this node, even share of the node's assigned memory)
+                nodes[i].append((j, node['jobs'].count(j), decode_bytes(node['resources_assigned']['mem']) // len(tmp)))
+
+    ### Create dict objects that sort of match nqstat output
+    if jobids:
+        data = dict.fromkeys([j if j.endswith('.gadi-pbs') else j + '.gadi-pbs' for j in jobids])
+    else:
+        data = dict.fromkeys([j[0] for n in nodes.values() for j in n])
+    for d in data:
+        data[d] = {'Resource_List.ncpus': 0, 'resources_used.mem': 0, 'exec_host': ''}
+
+    for host, node in nodes.items():
+        if jobids:
+            for jobid in data.keys():
+                for i in node:
+                    if i[0] == jobid:
+                        data[jobid]['Resource_List.ncpus'] += i[1]
+                        data[jobid]['resources_used.mem'] += i[2]
+                        data[jobid]['queue'] = node_to_queue(host)
+                        ### Anything found this way must be running
+                        data[jobid]['job_state'] = 'R'
+                        if data[jobid]['exec_host']:
+                            data[jobid]['exec_host'] = data[jobid]['exec_host'] + '+' + host
+                        else:
+                            data[jobid]['exec_host'] = host
+        else:
+            for i in node:
+                data[i[0]]['Resource_List.ncpus'] += i[1]
+                data[i[0]]['resources_used.mem'] += i[2]
+                data[i[0]]['queue'] = node_to_queue(host)
+                data[i[0]]['job_state'] = 'R'
+                ### Create an exec_host string that, when split on '+', gives
+                ### a list corresponding to the nodes, just like nqstat does
+                if data[i[0]]['exec_host']:
+                    data[i[0]]['exec_host'] = data[i[0]]['exec_host'] + '+' + host
+                else:
+                    data[i[0]]['exec_host'] = host
+
+    if jobids:
+        ### Clear out dict keys for non-existent jobs
+        for k, v in data.copy().items():
+            if v['Resource_List.ncpus'] == 0 and v['resources_used.mem'] == 0:
+                del data[k]
+
+    return data
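
Example invocations of the new script (editor's sketch; the project code and
job ID below are made up, the flags are those defined in main() above):

    gqstat                 # jobs in every project you're a member of
    gqstat -P a12 -f csv   # all of project a12's jobs, as CSV
    gqstat --all -c        # all visible jobs, with PBS comments
    gqstat 12345678        # one job; '.gadi-pbs' is appended if missing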