Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
120 commits
Select commit Hold shift + click to select a range
c1fb4fc
Create trackqueries.q file with table schema
AlexCullenAquaQ Nov 2, 2022
9a4f356
Rename column headings
AlexCullenAquaQ Nov 2, 2022
442c4a2
Add success column to query logging table
AlexCullenAquaQ Nov 7, 2022
ac39c90
Add query logging functions and redefine .zs
AlexCullenAquaQ Nov 9, 2022
8471fb5
Change .z functions to fit with other handler scripts
AlexCullenAquaQ Nov 10, 2022
c746347
Added enable and ignore functions
acreehay Nov 11, 2022
ed6c9b8
Change runtime to a long in schema
AlexCullenAquaQ Nov 11, 2022
1694b68
fixed bug in logqueryerror function
acreehay Nov 11, 2022
37bd434
Merge remote-tracking branch 'refs/remotes/origin/torq_qlm' into torq…
acreehay Nov 11, 2022
18f36d7
fixed bug in logqueryerror function
acreehay Nov 11, 2022
cf6599b
Add save down functionality
AlexCullenAquaQ Nov 14, 2022
01d639a
Add timers to execute periodic save down
AlexCullenAquaQ Nov 15, 2022
669aeb1
disabled query tracking
acreehay Nov 15, 2022
19c4a28
added ignore user function and ignore user list
acreehay Nov 15, 2022
6184e54
Added function for ignoring users and/or queries
acreehay Nov 18, 2022
a32e4db
Replaced relative path to absolute in savedown function
acreehay Nov 18, 2022
155cce8
Add queryfeed process file
AlexCullenAquaQ Nov 29, 2022
ef38603
Add query process config files
AlexCullenAquaQ Nov 29, 2022
efb9e7e
Add qtp code directory
AlexCullenAquaQ Nov 29, 2022
162608d
Add usage log reload
AlexCullenAquaQ Dec 1, 2022
58f220c
Remove typo
AlexCullenAquaQ Dec 1, 2022
f7dab35
Add enabling functionality for log reload
AlexCullenAquaQ Dec 6, 2022
14fa853
Add log reload config variable
AlexCullenAquaQ Dec 6, 2022
e054bc5
Add queryrdb config file
AlexCullenAquaQ Dec 6, 2022
ccec51b
Add queryhdb config file
AlexCullenAquaQ Dec 6, 2022
1a3fe70
Add querysort csv
AlexCullenAquaQ Dec 6, 2022
c100c7d
Add querytrack enabling to logusage script
AlexCullenAquaQ Dec 6, 2022
f8e2c59
Addition of Query Gateway
David-Mc-Nally Dec 7, 2022
3c66894
Modified connections made by queryfeed
acreehay Dec 9, 2022
28503ae
Added config csv to enable proc query-tracking
acreehay Dec 9, 2022
d98f3ac
Config listing procs for queryfeed to make connections
acreehay Dec 9, 2022
d7e8c07
Query tracking enabled and usage published to queryfeed
acreehay Dec 9, 2022
0acca9f
Add Query Logging Management documentation
AlexCullenAquaQ Dec 9, 2022
4161e8d
Move doc files to correct directory
AlexCullenAquaQ Dec 9, 2022
ca9c5a7
added password files for new procs
acreehay Dec 12, 2022
59a1bdc
Merge branch 'torq_qlm' of https://github.com/AquaQAnalytics/TorQ int…
acreehay Dec 12, 2022
aca69cd
Change process type qtp to querytp
AlexCullenAquaQ Dec 13, 2022
1661c80
Change runtime column to timespan
AlexCullenAquaQ Dec 16, 2022
b22278c
Add column breakdown to docs
AlexCullenAquaQ Dec 21, 2022
b335f51
Add setup information to docs
AlexCullenAquaQ Dec 22, 2022
fcb92c1
Add query gateway functions script
AlexCullenAquaQ Dec 22, 2022
bd3775e
Typo fix in Query gateway functions script
AlexCullenAquaQ Dec 22, 2022
762e024
Add query gateway functions in functional form
AlexCullenAquaQ Dec 23, 2022
3dd3908
Add missing proc name change
mchbrn-q Jan 20, 2023
e65c2f5
Add GetDateRange fn
mchbrn-q Jan 20, 2023
04d78a8
Fix to handle .z args
mchbrn-q Jan 24, 2023
d20f026
Merge query count fns from torq_qlm_test
mchbrn-q Jan 26, 2023
1ec4d1b
Delete config/filealerterprocessed/ directory
mchbrn-q Jan 26, 2023
d464604
Add total query count fn
mchbrn-q Jan 26, 2023
9c94173
Merge branch 'torq_qlm' of github.com:AquaQAnalytics/TorQ into torq_qlm
mchbrn-q Jan 26, 2023
965c28d
Ignore config/filealerterprocessed/
mchbrn-q Jan 26, 2023
4eab35e
Add peak usage fn
mchbrn-q Jan 26, 2023
bc8a9a8
Rename peak usage col names
mchbrn-q Jan 26, 2023
b12d1ab
Fix typo
mchbrn-q Jan 26, 2023
b75991d
Fix peak usage query
mchbrn-q Jan 26, 2023
651d1d4
Add peak usage function
mchbrn-q Jan 27, 2023
c076962
Fix formatting
mchbrn-q Jan 27, 2023
60abde6
Fix type error in update stmt
mchbrn-q Jan 27, 2023
f9ef13b
Add longest running query fn
mchbrn-q Jan 30, 2023
e41b5d3
Add longest running heatmap fn
mchbrn-q Feb 1, 2023
8ba256a
Remove assignment
mchbrn-q Feb 1, 2023
00092ab
Update select stmt with Hamza's fix
mchbrn-q Feb 3, 2023
be45878
Remove old cmd parser
mchbrn-q Feb 3, 2023
63742c0
Remove redundant comment
mchbrn-q Feb 3, 2023
e539a53
Remove redundant selects
mchbrn-q Feb 3, 2023
bcd28e0
Add column names
mchbrn-q Feb 3, 2023
779244e
Add time to longest running query fn
mchbrn-q Feb 3, 2023
6fdbcbe
Add ignore clients list
mchbrn-q Feb 6, 2023
cf4d2f4
Add GetClients fn
mchbrn-q Feb 6, 2023
8f255d0
Add dynamic client tracking
mchbrn-q Feb 6, 2023
2578530
Fix typo
mchbrn-q Feb 6, 2023
ba1b2e3
Update name and value of ignoreclients
mchbrn-q Feb 6, 2023
c790f20
Add gateway to querygateway's connections
mchbrn-q Feb 6, 2023
0cd6486
Update GetUsers fn to use normal gateway
mchbrn-q Feb 6, 2023
6d4b84e
Add querygateway to ignoreusers
mchbrn-q Feb 6, 2023
32b1f13
Update sync queries to deferred sync
mchbrn-q Feb 7, 2023
b16a4cf
Fix deferred sync queries
mchbrn-q Feb 7, 2023
44ead6c
removed redundant config file
acreehay Feb 8, 2023
fa356c6
Add date getter for grafana
mchbrn-q Feb 8, 2023
2cf385f
Remove redundant fn
mchbrn-q Feb 8, 2023
f9195f2
Add QueryErrorPercentage fn
Feb 8, 2023
34d1060
Merge branch 'torq_qlm' of github.com:AquaQAnalytics/TorQ into torq_qlm
Feb 8, 2023
a0a7cff
added new hdbname to queryrdb config for hdb reload
acreehay Feb 9, 2023
79cd3a0
Add LongQuery and NumberOfUsers fn
Feb 9, 2023
ca18228
Merge branch 'torq_qlm' of github.com:AquaQAnalytics/TorQ into torq_qlm
Feb 9, 2023
5889106
Fix lexical scope issue
mchbrn-q Feb 10, 2023
2f94429
Add QueryErrorPercentageHistorical fn
Feb 13, 2023
ef15caa
added queryhdb reload function
acreehay Feb 14, 2023
64aecc6
Add Historical fns
Feb 14, 2023
2df2ea5
Filter by process
Feb 15, 2023
ef40422
extend users to ignore for query analytics
acreehay Feb 16, 2023
ae33013
modified method of finding users across multiple procs
acreehay Feb 16, 2023
d565476
Functions call getUsersRDB/HDB
Feb 16, 2023
0bbbd7d
update functions for grafana
Feb 22, 2023
6285ef6
Dashboard improvements
Feb 24, 2023
c1e2072
Update PeakUsage fn to handle null data
mchbrn-q Mar 1, 2023
4c41629
Clean up CmdParse
mchbrn-q Mar 2, 2023
59d4f4b
Remove duplicates by restricting analytics to completed queries
mchbrn-q Mar 2, 2023
18b5c05
Fix typo
mchbrn-q Mar 2, 2023
b8b707a
Fix typo
mchbrn-q Mar 2, 2023
9f20891
Overhaul proc picker functionality
mchbrn-q Mar 3, 2023
eff5575
Expand ParseCmd
mchbrn-q Mar 3, 2023
11da840
Begin extending proc picker logic to other fns
mchbrn-q Mar 3, 2023
9f0763a
Fix typos
mchbrn-q Mar 3, 2023
7af4c0d
Complete proc picker implementation
mchbrn-q Mar 7, 2023
4ed74e6
Fix wrong paramaters
mchbrn-q Mar 7, 2023
0e92e7c
Add GetHandle fn
mchbrn-q Mar 7, 2023
2867e4c
Error trap CmdParse
mchbrn-q Mar 7, 2023
afafe92
Fix GetUsers functions
mchbrn-q Mar 7, 2023
788c189
Add discovery and stp to .usage.ignoreusers
mchbrn-q Mar 7, 2023
03f1efa
Fix get user fn
mchbrn-q Mar 7, 2023
c20bf0b
Remove old GetUsers fn
mchbrn-q Mar 7, 2023
a536b6e
Fix error trap
mchbrn-q Mar 7, 2023
fb141a7
Start adding user picker logic for realtime fns
mchbrn-q Mar 7, 2023
d8618fb
Add data normalisation to queryfeed
mchbrn-q Mar 22, 2023
ad3283c
remove querygatewayfuncs.q
May 24, 2023
1b4a2a0
Updating QueryManagement.md with PR comment changes
mattdeshAQ May 25, 2023
4a87126
update logusage.q with PR changes
May 30, 2023
dce824d
changing querytrack from csv to txt file type
mattdeshAQ May 30, 2023
7cf8b28
Merge branch 'torq_qlm' of github.com:AquaQAnalytics/TorQ into torq_qlm
mattdeshAQ May 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.DS_Store
config/filealerterprocessed/
10 changes: 10 additions & 0 deletions code/handlers/logusage.q
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ logtodisk:@[value;`logtodisk;1b] // whether to log to disk or not
logtomemory:@[value;`logtomemory;1b] // write query logs to memory
ignore:@[value;`ignore;1b] // check the ignore list for functions to ignore
ignorelist:@[value;`ignorelist;(`upd;"upd")] // the list of functions to ignore
allowedusers:@[value;`allowedusers;()]
ignoreusers:@[value;`ignoreusers;()] // clients to ignore for query logging
flushinterval:@[value;`flushinterval;0D00:30:00] // default value for how often to flush the in-memory logs
flushtime:@[value;`flushtime;0D03] // default value for how long to persist the in-memory logs
suppressalias:@[value;`suppressalias;0b] // whether to suppress the log file alias creation
logtimestamp:@[value;`logtimestamp;{[x] {[].proc.cd[]}}] // function to generate the log file timestamp suffix
logroll:@[value;`logroll;1b] // whether to automatically roll the log file
LEVEL:@[value;`LEVEL;3] // Log level
querytrack:@[value;`querytrack;0b] // whether query tracking is enabled by default

// enable query tracking for proc if procname included in csv config file
// NOTE(review): the file read is querytrack.txt (the PR renamed it from csv) — comment above is stale.
// NOTE(review): this line unconditionally overwrites the configurable default read above,
// so a user-supplied `querytrack setting is ignored — confirm this is intended
querytrack:$[.proc.procname in "S"$read0 hsym `$(getenv `KDBCONFIG),"/querytrack.txt";1b;0b]

id:@[value;`id;0j]
nextid:{:id+::1}
Expand All @@ -38,8 +44,12 @@ logh:@[value;`logh;0]
// log one usage record x: append to the on-disk log and/or the in-memory
// .usage.usage table, publish it when query tracking is on, then call the
// user extension hook ext
write:{
if[logtodisk;@[neg logh;format x;()]]; // async write of formatted record; errors swallowed by the trap
if[logtomemory; `.usage.usage upsert x]; // keep record in the in-memory usage table
if[querytrack; .ps.publish[`.usage.usage;x]]; // publish record for queryfeed subscribers
ext[x]} // extension hook (no-op by default, see ext below)

// custom sub function to add usage table to subscribable tables if query tracking enabled
// NOTE(review): every call prepends `.usage.usage to .stpps.t, so repeated subscriptions
// will accumulate duplicate entries — consider distinct or a one-time registration; verify
querysub:{if[querytrack; .stpps.t:`.usage.usage,.stpps.t]; .u.sub[x;y]}

// extension function to extend the logging e.g. publish the log message
ext:{[x]}

Expand Down
1 change: 1 addition & 0 deletions code/handlers/order.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ ldap.q
writeaccess.q
controlaccess.q
permissions.q
trackqueries.q
trackclients.q
logusage.q
apidetails.q
Expand Down
49 changes: 49 additions & 0 deletions code/processes/queryfeed.q
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

everything in this script is defined in root namespace, usually in TorQ we put most code in namespaces & keep root namespace reasonably clean

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// queryfeed proc script - subs to .usage.usage tables and publishes to query tickerplant

// add connections to all procs for query tracking to be enabled
.servers.CONNECTIONS:.servers.CONNECTIONS,exec distinct proctype from (" SS ";enlist csv) 0: hsym `$getenv `TORQPROCESSES where procname in subprocs;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think subprocs should have a default value in this script, similar to other TorQ scripts (I know it is defined in config, but typically we also have a default to fall back on in the main script itself)


us:@[value;`us;([]querytime:`timestamp$();id:`long$();runtime:`timespan$();zcmd:`symbol$();proctype:`symbol$();procname:`symbol$;status:`char$();ip:`int$();user:`symbol$();handle:`int$();cmd:();mem:();sz:`long$();error:())];

upd:{[t;x] x[2]:`timespan$x[2]; if [t in `.usage.usage; `us insert x]};

// open connections to all configured processes
.servers.startup[];
// subscribe to .usage.usage on each connected process named in subprocs, via the
// custom .usage.querysub entry point defined by logusage.q on the target process
start_sub:{[subprocs]
// handles of all matching registered servers; (), guards the single-handle case
hds:(),exec w from .servers.SERVERS where procname in subprocs;
{
.lg.o[`startsub;"subscribing to ", string first exec procname from .servers.SERVERS where w=x];
x(`.usage.querysub;`.usage.usage;`);
.lg.o[`completesub;"subscribed"];
}each hds;
};

// subscribe immediately on load (subprocs comes from this process's config)
start_sub[subprocs];

// parse an on-disk usage log file back into a table shaped like us.
// The raw log stores symbols with a leading backtick and the a/w columns
// (presumably .z.a/.z.w — confirm) with a trailing type character, so both
// decorations are stripped here before casting.
readlog:{[file]

// Remove leading backtick from symbol columns, convert a and w columns back to integers
update zcmd:`$1 _' string zcmd, procname:`$1 _' string procname, proctype:`$1 _' string proctype, u:`$1 _' string u,
a:"I"$-1 _' a, w:"I"$-1 _' w from
// Read in file
// NOTE(review): column names come from (cols `us) and types from "PJJSSSC*S***JS" —
// these must be kept in sync with the us schema if either changes
@[{update "J"$'" " vs' mem from flip (cols `us)!("PJJSSSC*S***JS";"|")0:x};hsym`$file;{'"failed to read log file : ",x}]};

queryfeed:{
// normalise cmd data for gateway users
usnorm:update cmd:-2#'";" vs' cmd from us where user=`gateway;
usnorm:update cmd:first each cmd from usnorm where (first each cmd)~'(last each cmd);

h(".u.upd";`usage;value flip select from usnorm);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we stick to the same coding convention everywhere?

h("functionname";args)
or
h(`functionname;args)

us::0#us;
};

// reload today's on-disk usage log for each subscribed proc and publish the
// contents to the query tickerplant via .u.upd
flushreload:{
.lg.o[`flushreload1;"fr1"]; // NOTE(review): leftover debug message — replace with something descriptive
procnames:exec distinct procname from .servers.SERVERS where proctype in subprocs;
// NOTE(review): readlog output carries the cols of `us (runtime, not timer) — confirm the
// `timer column referenced here exists; also confirm .z.d stringifies into the path as intended
{h(".u.upd";`usage;value flip update timer:`timespan$1000*timer from readlog[raze string (getenv `KDBLOG),"/usage_",(raze x),"_",.z.d,".log"])} each string each procnames;
};

// wait for the query tickerplant dependency before continuing (see
// .servers.startupdepcycles for the retry semantics of the 10 and 0W arguments)
.servers.startupdepcycles[`querytp;10;0W];
// handle to any available query tickerplant, used by queryfeed/flushreload
h:.servers.gethandlebytype[`querytp;`any];

// one-off reload of today's usage logs 10s after startup, if enabled in config
if[reloadenabled;.timer.once[.proc.cp[]+0D00:00:10.000;(`flushreload;`);"Flush reload"]];
// publish buffered usage records to the query tickerplant every 200ms
.timer.repeat[.proc.cp[];0Wp;0D00:00:00.200;(`queryfeed;`);"Publish Query Feed"];
640 changes: 640 additions & 0 deletions code/processes/querygateway.q

Large diffs are not rendered by default.

42 changes: 42 additions & 0 deletions code/querygateway/daqrest.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
\d .dataaccess

// .gw.formatresponse:{[status;sync;result]$[not[status]and sync;'result;result]}};

//Gets the json and converts to input dict before executing .dataaccess.getdata on the input
// NOTE(review): this mutates the global .gw.formatresponse on every call — confirm that
// overriding the gateway-wide response formatter per-request is intended
qrest:{
// Set the response type: signal errors to sync callers, else wrap in a status/result dict
.gw.formatresponse:{[status;sync;result] $[sync and not status; 'result; `status`result!(status;result)]};
// Run the function
:getdata jsontodict x};
// Converts json payload to .dataaccess input dictionary
jsontodict:{
// convert the input to a dictionary
dict:.j.k x;
k:key dict;
// Change the Type of `tabname`instruments`grouping to chars
dict:@[dict;`tablename`instruments`grouping`columns inter k;{`$x}];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be refactored

q)`a`b!("Hello";"World")
a| "Hello"
b| "World"
q)d:`a`b!("Hello";"World")
q)@[d;`a`b;`$]
a| Hello
b| World
q)@[d;`a;`$]
a| `Hello
b| "World"
Suggested change
dict:@[dict;`tablename`instruments`grouping`columns inter k;{`$x}];
dict:@[dict;`tablename`instruments`grouping`columns inter k;`$];

// Change the Type of `start/end time to timestamps (altering T -> D and - -> . if applicable)
dict:@[dict;`starttime`endtime inter k;{x:ssr[x;"T";"D"];x:ssr[x;"-";"."];value x}];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can potentially use the following parsing (but please check with a concrete example of what you get from the JSON file)

q)"P"$"2023-05-30T12:25:26.633"
2023.05.30D12:25:26.633000000

// retrieve aggregations
if[`aggregations in k;dict[`aggregations]:value dict[`aggregations]];
// Convert timebar
if[`timebar in k;dict[`timebar]:@[value dict[`timebar];1+til 2;{:`$x}]];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be refactored

Suggested change
if[`timebar in k;dict[`timebar]:@[value dict[`timebar];1+til 2;{:`$x}]];
if[`timebar in k;dict[`timebar]:@[value dict[`timebar];1+til 2;`$]];

// Convert the filters key
if [`filters in k;dict[`filters]:filterskey dict`filters];
//output
:dict};

quotefinder:{y[2#where y>x]}

filterskey:{[filtersstrings]
likelist:ss[filtersstrings;"like"];
if[0=count likelist;value filtersstrings];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what we are trying to achieve here. Can you please elaborate

// Get the location of all the apostrophes
apostlist:ss[filtersstrings;"'"];
// Get the location of all the likes
swaplist:raze {y[2#where y>x]}[;apostlist] each likelist;
// Swap the ' to "
filtersstrings:@[filtersstrings;swaplist;:;"\""];
// Convert the string to a dict
:value filtersstrings
};
211 changes: 211 additions & 0 deletions code/querygateway/dataaccess.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
\d .dataaccess

forceservers:0b;

// dictionary containing aggregate functions needed to calculate map-reducable
// values over multiple processes: ` marks aggregations that reduce with
// themselves (e.g. max of maxes); a lambda maps the aggregation's column
// argument(s) to the component (agg;column) pairs each process must compute
aggadjust:(!). flip(
(`avg; {flip(`sum`count;2#x)});
(`cor; {flip(`wsum`count`sum`sum`sumsq`sumsq;@[x;(enlist(0;1);0;0;1;0;1)])});
(`count; `);
(`cov; {flip(`wsum`count`sum`sum;@[x;(enlist(0;1);0;0;1)])});
(`dev; {flip(`sumsq`count`sum;3#x)});
(`distinct;`);
(`first; `);
(`last; `);
(`max; `);
(`min; `);
(`prd; `);
(`sum; `);
(`var; {flip(`sumsq`count`sum;3#x)});
(`wavg; {flip(`wsum`sum;(enlist(x 0;x 1);x 0))});
(`wsum; {enlist(`wsum;enlist(x 0;x 1))}));

// stringify symbol(s), upper-casing the first character of each; handles both
// a symbol list (type 11h) and a single symbol atom
camel:{capfirst:{@[x;0;upper]};$[11h~type x;capfirst each string x;capfirst string x]};
// function that creates aggregation where X(X1,X2)=X(X(X1),X(X2)) where X is
// the aggregation and X1 and X2 are non overlapping subsets of a list;
// returns a single-entry dict of output column name to (aggfn;column) parse tree,
// e.g. absagg["max";"Px"] -> (enlist`maxPx)!enlist(max;`maxPx)
absagg:{enlist[`$x,y]!enlist(value x;`$x,y)};

// functions to build the parse-tree fragments used by the mapaggregate
// dictionary below to recombine avg, cov and var from per-process components
avgf:{(%;(sum;`$"sum",x);scx y)};
covf:{(-;(%;swsum[x;y];scx x);(*;avgf[x;x];avgf[y;x]))};
varf:{(-;(%;(sum;`$"sumsq",y);scx x);(xexp;avgf[y;x];2))};
// functions to sum counts and wsums in mapaggregate dictionary
scx:{(sum;`$"count",x)};
swsum:{(sum;`$"wsum",x,y)}

// dictionary containing the functions needed to aggregate results together for
// map reducable aggregations: each entry maps a column-name argument to a dict
// of output column name to the parse tree that recombines the per-process
// component columns (sumX, countX, wsumXY, ...) produced via aggadjust
mapaggregate:(!). flip(
(`avg; {enlist[`$"avg",x]!enlist(%;(sum;`$"sum",x);scx x)});
(`cor; {enlist[`$"cor",x,w]!enlist(%;covf[x;w];(*;(sqrt;varf[x;x]);(sqrt;varf[(x:x 0);w:x 1])))});
(`count; {enlist[`$"count",x]!enlist scx x});
(`cov; {enlist[`$"cov",x,w]!enlist covf[x:x 0;w:x 1]});
(`dev; {enlist[`$"dev",x]!enlist(sqrt;varf[x;x])});
(`first; {enlist[`$"first",x]!enlist(*:;`$"first",x)});
(`last; {absagg["last";x]});
(`max; {absagg["max";x]});
(`min; {absagg["min";x]});
(`prd; {absagg["prd";x]});
(`sum; {absagg["sum";x]});
(`var; {enlist[`$"var",x]!enlist varf[x;x]});
(`wavg; {enlist[`$"wavg",x,w]!enlist(%;swsum[x:x 0;w:x 1];(sum;`$"sum",x))});
(`wsum; {enlist[`$"wsum",x,w]!enlist swsum[x:x 0;w:x 1]}));

// map an (`asc|`desc;column) ordering pair to the matching (xasc|xdesc;column) pair
go:{$[`asc=first x;(xasc;x 1);(xdesc;x 1)]};

// Full generality dataaccess function in the gateway: validates the request
// dict o, routes it to the processes holding the relevant data, splits the
// query per process, and executes it (a)synchronously, joining partial
// results with autojoin
getdata:{[o]
// Input checking in the gateway; log the request and report errors via .requests
reqno:.requests.initlogger[o];
o:@[.checkinputs.checkinputs;o;.requests.error[reqno]];
// Get the Procs holding data for the requested table/time range when not supplied
if[not `procs in key o;o[`procs]:attributesrouting[o;partdict[o]]];
// Get Default process behavior
default:`timeout`postback`sublist`getquery`queryoptimisation`postprocessing!(0Wn;();0W;0b;1b;{:x;});
// Use upserting logic to determine behaviour (user-supplied keys override defaults)
options:default,o;
if[`ordering in key o;options[`ordering]: go each options`ordering];
// split time range per process so no date is queried on more than one process
o:adjustqueries[o;partdict o];
// map-reduce is used only when every aggregation is adjustable and there is no date grouping
options[`mapreduce]:0b;
gr:$[`grouping in key options;options`grouping;`];
if[`aggregations in key options;
if[all key[options`aggregations]in key aggadjust;
options[`mapreduce]:not`date in gr]];
// Execute the queries; getquery returns the built query text instead of the data
if[options`getquery;
$[.gw.call .z.w;
:.gw.syncexec[(`.dataaccess.buildquery;o);options[`procs]];
:.gw.asyncexec[(`.dataaccess.buildquery;o);options[`procs]]]];
:$[.gw.call .z.w;
// if sync
.gw.syncexecjt[(`getdata;o);options[`procs];autojoin[options];options[`timeout]];
// if async
.gw.asyncexecjpt[(`getdata;o);options[`procs];autojoin[options];options[`postback];options[`timeout]]];
};


// choose the join function that combines the per-process result list into one
// table: unwrap a singleton, raze plain results, or re-aggregate map-reduced ones
autojoin:{[options]
$[1=count options`procs;
// single process: nothing to combine, just unwrap the one result
first;
not options`mapreduce;
// no map-reduce adjustment needed: raze plus sublist/postprocessing
razeresults[options;];
// otherwise recombine partial aggregations
mapreduceres[options;]]
};

// flatten the per-process result list into one table and hand it to
// processres for postprocessing and sublist truncation
razeresults:{[options;res]processres[options;raze res]};

// run the user postprocessing hook over the joined results, then truncate to
// the requested sublist size (0W means no limit)
processres:{[options;res]
processed:(options`postprocessing)res;
n:options`sublist;
$[0W<>n;n sublist processed;processed]
};

// reduce per-process partial results (keyed or unkeyed tables) to one table by
// re-applying the map-reduce form of each requested aggregation
mapreduceres:{[options;res]
// raze the result sets together, unkeying keyed tables before joining
res:$[all 99h=type each res;
(){x,0!y}/res;
(),/res];
// flatten the aggregations dict into (agg;column) pairs
aggs:options`aggregations;
aggs:flip(key[aggs]where count each value aggs;raze aggs);
// distinct may be present as only agg, so apply distinct again
if[all`distinct=first each aggs;:?[res;();1b;()]];
// collecting the appropriate grouping argument for map-reduce aggs
// (timebar column, grouping columns, both, or none -> 0b for no grouping)
gr:$[all`grouping`timebar in key options;
a!a:options[`timebar;2],options`grouping;
`grouping in key options;
a!a:(),options`grouping;
`timebar in key options;
a!a:(),options[`timebar;2];
0b];
// functional select: aggs by gr from res, with mapaggregate supplying the
// parse trees that recombine the per-process component columns
res:?[res;();gr;raze{mapaggregate[x 0;camel x 1]}'[aggs]];
//apply sublist and postprocessing to map reduced results
processres[options;res]
};


// Dynamic routing: return the servertypes whose partition (min;max) date range
// overlaps the requested start/end window; signals an error if none do.
// (fix: corrected "incorect" typo in the user-facing error message)
attributesrouting:{[options;procdict]
// Get the requested window as dates
timespan:`date$options[`starttime`endtime];
// true for each proc whose date range overlaps the requested window
procdict:{[x;timespan] (all x within timespan) or any timespan within x}[;timespan] each procdict;
// Only return the servertypes holding relevant dates
types:(key procdict) where value procdict;
// If the dates are out of scope of all processes then error
if[0=count types;
'`$"gateway error - no info found for that table name and time range. Either table does not exist; attributes are incorrect in .gw.servers on gateway, or the date range is outside the ones present"
];
:types;
};

// Generates a dictionary of servertype!(mindate;maxdate) for the processes
// that hold the requested table, with overlapping date ranges de-duplicated
partdict:{[input]
tabname:input[`tablename];
// Remove duplicate servertypes from the gw.servers
servers:select from .gw.servers where i=(first;i)fby servertype;
// extract the procs which have the table defined
servers:select from servers where {[x;tabname]tabname in @[x;`tables]}[;tabname] each attributes;
// Create a dictionary of the attributes against servertypes
// (indexes each attributes dict at its first key — presumably the date list; verify)
procdict:(exec servertype from servers)!(exec attributes from servers)@'(key each exec attributes from servers)[;0];
// If the response is a dictionary index into the tablename
procdict:@[procdict;key procdict;{[x;tabname]if[99h=type x;:x[tabname]];:x}[;tabname]];
// returns the dictionary as min date/ max date
procdict:asc @[procdict;key procdict;{:(min x; max x)}];
// prevents overlap if more than one process contains a specified date:
// each-prior bumps a range's start past the previous range's max
if[1<count procdict;
procdict:{:$[y~`date$();x;$[within[x 0;(min y;max y)];(1+max[y];x 1);x]]}':[procdict]];
:procdict;
};

// function to adjust the queries being sent to processes to prevent overlap of
// time clause and data being queried on more than one process; returns a dict
// of proc to a per-proc copy of options with starttime/endtime narrowed
adjustqueries:{[options;part]
// if only one process then no need to adjust
if[2>count p:options`procs;:options];
// get the date casting where relevant (a is true when starttime is already a date)
st:$[a:-14h~tp:type start:options`starttime;start;`date$start];
et:$[a;options`endtime;`date$options`endtime];
// get the dates that are required by each process
dates:group key[part]where each{within[y;]each value x}[part]'[l:st+til 1+et-st];
dates:l{(min x;max x)}'[dates];
// if start/end time not a date, then adjust dates parameter for the
// correct types
if[not a;
// converts dates dictionary to timestamps/datetimes (-15h is datetime)
dates:$[-15h~tp;{"z"$x};::]{(0D+x 0;x[1]+1D-1)}'[dates];
// convert first and last timestamp to the caller's exact start and end time
dates:@[dates;f;:;(start;dates[f:first key dates;1])];
dates:@[dates;l;:;(dates[l:last key dates;0];options`endtime)]];
// adjust map reducable aggregations to get correct components
if[(1<count dates)&`aggregations in key options;
if[all key[o:options`aggregations]in key aggadjust;
aggs:mapreduce[o;$[`grouping in key options;options`grouping;`]];
options:@[options;`aggregations;:;aggs]]];
// create a dictionary of procs and different queries
:{@[@[x;`starttime;:;y 0];`endtime;:;y 1]}[options]'[dates];
};

// function to grab the correct component aggregations needed for aggregating
// over multiple processes; returns a dict of aggregation to column list
mapreduce:{[aggs;gr]
// if there is a date grouping any aggregation is allowed — no adjustment needed
if[`date in gr;:aggs];
// format aggregations into a paired (agg;column) list
aggs:flip(key[aggs]where count each value aggs;raze aggs);
// if aggregations are not map-reducable and there is no date grouping,
// then error
if[not all aggs[;0]in key aggadjust;
'`$"to perform non-map reducable aggregations automatically over multiple processes there must be a date grouping"];
// aggregations are map reducable (with potential non-date groupings):
// ` in aggadjust means the agg reduces with itself, else expand to components
aggs:distinct raze{$[`~a:.dataaccess.aggadjust x 0;enlist x;a x 1]}'[aggs];
:first'[aggs]!last'[aggs];
};
Loading