288 changes: 288 additions & 0 deletions gqstat
@@ -0,0 +1,288 @@
#!/g/data3/hh5/public/apps/nci_scripts/python-analysis3

import os
import grp
import argparse
import json

from qtools import job_data_from_pbsnodes, nqstat, format_size
from typing import List, Dict, Any
from datetime import timedelta

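# e.g. truncate('persistent_job', 10) -> 'persisten*'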
def truncate(s: str, l: int) -> str:
    return s[:l-1] + '*' if len(s) > l else s

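# e.g. str_to_timedelta_seconds('12:30:00') -> 45000.0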
def str_to_timedelta_seconds(s: str) -> float:
    t = [int(i) for i in s.split(':')]
    return timedelta(hours=t[0], minutes=t[1], seconds=t[2]).total_seconds()

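# Note: nqstat() and job_data_from_pbsnodes() (from the local qtools
# module) are assumed to return {jobid: {pbs_attribute: value}} mappings,
# so dict.update() below merges them with nqstat entries taking precedence.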
def get_all_jobs(groups: List[str], get_all=False) -> Dict[str, Dict[str, Any]]:
    """
    Grab every shred of information we can
    """
    ### Get all the info we can out of nqstat
    all_jobs = {}
    for group in groups:
        all_jobs.update(nqstat(group))

    ### Fall back to pbsnodes -a for everything else if requested
    if get_all:
        all_jobs.update({k: v for k, v in job_data_from_pbsnodes().items() if k not in all_jobs})

    return all_jobs

description = """
Prints job details for some or all projects you're a member of

Returns the following columns for each job in table and wide_table modes:

Job ID: PBS Jobid
Username: NCI username of job owner
Proj: NCI project the job was submitted under
queue: Queue the job was submitted to
Jobname: Name of the job
cpu%: Percent of time CPUs have been active
mem%: Percent of the memory request used
TSK: Number of cpus requested
Req'd Memory: Memory requested
Req'd Time: Walltime requested
state: Current state - 'Q' in queue, 'R' running, 'H' held, 'E' finished
Elap Time: Walltime elapsed

This is designed to emulate PBS 'qstat -s' output, though with some more useful
details.

If 'mem%' is below 80%, make sure you're not requesting too much memory (4GB
per CPU or less is fine)

If 'cpu%' is below 80% and you're requesting more than one CPU, make sure
your job is making proper use of parallelisation

The cpu% and mem% columns are only visible in table mode on sufficiently
wide terminals (>92 columns). These columns are always visible in wide_table
formatting mode.

The --all option gathers as much information as is available to general
users; it cannot show queued jobs, nor can it show username, project or
request details for jobs belonging to projects that you are not a member of.
"""

def format_table(jobs: Dict[str, Dict[str, Any]], print_comment: bool = False, wide: bool = False) -> None:

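    # os.get_terminal_size() raises OSError when stdout is not a terminal
    # (e.g. when output is piped), in which case only the explicit 'wide'
    # flag decides the layout.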
    try:
        wide_mode = wide or (os.get_terminal_size().columns > 92)
    except OSError:
        wide_mode = wide

    d = create_out_dict(jobs, print_comment)

    if wide_mode:
        header = '''
                                                                        Req'd  Req'd   Elap
Job ID             Username Proj Queue    Jobname      cpu%   mem%  TSK Memory Time  S Time
------------------ -------- ---- -------- ---------- ------ ------ ---- ------ ----- - ----'''
    else:
        header = '''
                                                          Req'd  Req'd   Elap
Job ID             Username Proj Queue    Jobname     TSK Memory Time  S Time
------------------ -------- ---- -------- ---------- ---- ------ ----- - ----'''
    print(header)
    for jobid, job in sorted(d.items()):
        j = truncate(jobid, 18)
        if job['user']:
            user = job['user']
        else:
            user = '???'

        if job['project']:
            proj = job['project']
        else:
            proj = '???'

        try:
            queue = truncate(job['queue'], 8)
        except TypeError:
            queue = '???'

        try:
            jobname = truncate(job['name'], 10)
        except TypeError:
            jobname = '???'

        if job['ncpus'] is not None:
            tasks = job['ncpus']
        else:
            tasks = 0

        try:
            mem_req = format_size(job['mem_request'], suffix='', dp=0)
        except TypeError:
            mem_req = '???'

        try:
            timereq = f"{int(job['walltime_request']//3600):02d}:{int((job['walltime_request']//60)%60):02d}"
        except TypeError:
            timereq = '???'

        try:
            timeelapsed = f"{int(job['walltime']//3600):02d}:{int((job['walltime']//60)%60):02d}"
        except TypeError:
            if job['state'] == 'R':
                timeelapsed = '???'
            else:
                timeelapsed = '-'

        if wide_mode:
            try:
                cpupc = f"{100*job['cpu_pct']:.2f}"
            except TypeError:
                if job['state'] == 'R':
                    cpupc = '???'
                else:
                    cpupc = '-'
            try:
                mempc = f"{100*job['mem_pct']:.2f}"
            except TypeError:
                if job['state'] == 'R':
                    mempc = '???'
                else:
                    mempc = '-'

            print(f"{j:<19}{user:<9}{proj:<5}{queue:<9}{jobname:<11}{cpupc:>6} {mempc:>6} {tasks:4} {mem_req:>6} {timereq:5} {job['state']} {timeelapsed:5}")
        else:
            print(f"{j:<19}{user:<9}{proj:<5}{queue:<9}{jobname:<11}{tasks:4} {mem_req:>6} {timereq:5} {job['state']} {timeelapsed:5}")

        if print_comment:
            if 'comment' in job:
                print(" " + job['comment'])
            else:
                print(" -")

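# The format_* functions are looked up by name in main() via
# globals()[f"format_{args.format}"], so each --format choice needs a
# matching format_<name> function at module level.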
def format_wide_table(out_jobs: Dict[str, Dict[str, Any]], comment: bool) -> None:
    format_table(out_jobs, comment, wide=True)

def create_out_dict(out_jobs: Dict[str, Dict[str, Any]], comment: bool = False) -> Dict[str, Dict[str, Any]]:
    d = {}

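    # Map our short output keys to the PBS attribute names in the job data
    # (qstat -f style dotted resource names)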
    key_translate = {'user': 'Job_Owner',
                     'project': 'project',
                     'name': 'Job_Name',
                     'queue': 'queue',
                     'ncpus': 'Resource_List.ncpus',
                     'mem_request': 'Resource_List.mem',
                     'mem_used': 'resources_used.mem',
                     'walltime': 'resources_used.walltime',
                     'walltime_request': 'Resource_List.walltime',
                     'state': 'job_state'
                     }

    for jobid, job in sorted(out_jobs.items()):
        d[jobid] = {}
        for k, v in key_translate.items():
            try:
                d[jobid][k] = job[v]
            except KeyError:
                d[jobid][k] = None
        ### Fix the items that need fixing, and derive
        ### cpu% and mem%
        try:
            d[jobid]['user'] = d[jobid]['user'].split('@')[0]
        except (KeyError, AttributeError):
            pass
        try:
            d[jobid]['ncpus'] = int(d[jobid]['ncpus'])
        except (KeyError, TypeError):
            pass
        try:
            d[jobid]['walltime'] = str_to_timedelta_seconds(d[jobid]['walltime'])
        except (KeyError, AttributeError):
            pass
        try:
            d[jobid]['walltime_request'] = str_to_timedelta_seconds(d[jobid]['walltime_request'])
        except (KeyError, AttributeError):
            pass
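        ### mem_pct is used/requested memory; cpu_pct is cpu time spread
        ### over walltime and requested cpus (1.0 means fully busy)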
        try:
            d[jobid]['mem_pct'] = job['resources_used.mem'] / job['Resource_List.mem']
        except KeyError:
            d[jobid]['mem_pct'] = None
        try:
            d[jobid]['cpu_pct'] = str_to_timedelta_seconds(job['resources_used.cput']) / str_to_timedelta_seconds(job['resources_used.walltime']) / int(job['Resource_List.ncpus'])
        except KeyError:
            d[jobid]['cpu_pct'] = None
        except ZeroDivisionError:
            d[jobid]['cpu_pct'] = 0.0

        if comment:
            try:
                d[jobid]['comment'] = out_jobs[jobid]['comment']
            except KeyError:
                d[jobid]['comment'] = None

    return d

def format_json(out_jobs: Dict[str, Dict[str, Any]], comment: bool) -> None:
    d = create_out_dict(out_jobs, comment)
    print(json.dumps(d, indent=4))

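# format_csv: the header row takes the first job's keys; rows rely on
# dicts preserving insertion order (Python 3.7+), so every row shares the
# column order of key_translate plus the derived fields.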
def format_csv(out_jobs: Dict[str, Dict[str, Any]], comment: bool) -> None:
    d = create_out_dict(out_jobs, comment)
    if not d:
        return
    print('jobid,' + ','.join(list(d.values())[0]))
    for jobid, job in sorted(d.items()):
        print(f"{jobid},{','.join([str(i) if i is not None else '' for i in job.values()])}")

def main():
    """
    This version of qstat uses a few different sources of information to try and gather data
    on an arbitrary jobid that may or may not belong to the user invoking this utility
    """
    parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--format', '-f', help='Output format', choices=['table', 'csv', 'json', 'wide_table'], default='table')
    parser.add_argument('--project', '-P', help='Show all jobs in a project')
    parser.add_argument('--comment', '-c', help='Show PBS queue comment', action='store_true')
    parser.add_argument('--all', '-a', help='Gather information on all PBS jobs, not just for projects you are a member of', action='store_true')
    parser.add_argument('jobids', metavar='<jobid>', type=str, nargs='*', help="List of jobids")
    args = parser.parse_args()

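    # Unix group membership doubles as the list of NCI projects the user
    # belongs to; nqstat() is queried per project below.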
    groups = [grp.getgrgid(g).gr_name for g in os.getgroups()]

    out_jobs = {}

    if not args.jobids and not args.project:
        out_jobs = get_all_jobs(groups, args.all)

    if args.project:
        if args.project not in groups:
            exit("Cannot get jobs from projects you don't belong to")
        ### Use the nqstat service, this is faster than qstat anyway
        out_jobs = nqstat(args.project)

    if args.jobids:
        jobids = [j if j.endswith('.gadi-pbs') else j + '.gadi-pbs' for j in args.jobids]
        ### First, try to grab the job data from nqstat
        all_jobs = {}
        for group in groups:
            all_jobs.update(nqstat(group))
            ### Make as few API calls as possible, bail as soon as we have
            ### all the requested jobids
            if all(j in all_jobs for j in jobids):
                break

        pbsnodes_jobs = {}
        if not all(j in all_jobs for j in jobids):
            ### We're looking for a jobid that doesn't belong to one of our
            ### projects - grab the jobs from pbsnodes
            pbsnodes_jobs = job_data_from_pbsnodes(jobids)

        for jobid in jobids:
            try:
                out_jobs[jobid] = all_jobs[jobid]
                continue
            except KeyError:
                pass
            try:
                out_jobs[jobid] = pbsnodes_jobs[jobid]
            except KeyError:
                print(f"Warning: {jobid} does not exist")

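    ### Dispatch to the matching format_<name> function (table, wide_table,
    ### csv or json)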
    globals()[f"format_{args.format}"](out_jobs, args.comment)

if __name__ == "__main__":
    main()