Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions code/common/checkinputs.q
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,5 @@ checkpostback:{[dict;parameter]
checktimeout:{[dict;parameter]
checktype[-16h;dict;parameter];
:dict};

checkprocs:{[dict;parameter]:.checkinputs.checktype[-11 11h;dict;parameter];};
2 changes: 1 addition & 1 deletion code/dataaccess/dataaccessutils.q
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ readtableproperties:{[tablepropertiepath]
.lg.o[`readtableproperties;"loading table properties"];
table:`tablename`proctype xkey readcsv[tablepropertiepath;"ssssstsss"]; //read in table from file
alltable:?[table;enlist(in;`proctype;enlist`all`);0b;()]; //find any instance of the use "all" or blank for proctype
table:table,![alltable;();0b;(enlist`proctype)!enlist(enlist `hdb)],![alltable;();0b;(enlist`proctype)!enlist(enlist `rdb)]; //join rdb and hdb entries for any "all" or blank entries
table:table,raze{![x;();0b;(enlist`proctype)!enlist(enlist y)]}[alltable]each`rdb`hdb`idb; //join rdb, idb and hdb entries for any "all" or blank entries
table:![table;enlist(in;`proctype;enlist`all`);0b;`symbol$()]; //remove "all" or blank entries from table
table:?[table;$[.proc.proctype=`gateway;();enlist(=;`proctype;`.proc.proctype)];0b;()];
table:update .eodtime.datatimezone ^ datatimezone, .eodtime.rolltimeoffset ^ rolltimeoffset,.eodtime.rolltimezone^rolltimezone from table;
Expand Down
45 changes: 12 additions & 33 deletions code/gateway/dataaccess.q
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ partdict:{[input]
tabname:input[`tablename];
// Remove duplicate servertypes from the gw.servers
servers:select from .gw.servers where i=(first;i)fby servertype;
// Only target specified procs if defined
if[`procs in key input;servers:select from servers where servertype in ((),input`procs)];
// extract the procs which have the table defined
servers:select from servers where {[x;tabname]tabname in @[x;`tables]}[;tabname] each attributes;
// Create a dictionary of the attributes against servertypes
Expand All @@ -162,9 +164,6 @@ partdict:{[input]
procdict:@[procdict;key procdict;{[x;tabname]if[99h=type x;:x[tabname]];:x}[;tabname]];
// returns the dictionary as min date/ max date
procdict:asc @[procdict;key procdict;{:(min x; max x)}];
// prevents overlap if more than one process contains a specified date
if[1<count procdict;
procdict:{:$[y~`date$();x;$[within[x 0;(min y;max y)];(1+max[y];x 1);x]]}':[procdict]];
:procdict;
};

Expand All @@ -173,36 +172,16 @@ partdict:{[input]
adjustqueries:{[options;part]
// if only one process then no need to adjust
if[2>count p:options`procs;:options];
// get the tablename
tabname:options[`tablename];
// remove duplicate servertypes from the gw.servers
servers:select from .gw.servers where i=(first;i)fby servertype;
// extract the procs which have the table defined
servers:select from servers where {[x;tabname]tabname in @[x;`tables]}[;tabname] each attributes;
// create a dictionary of the attributes against servertypes
procdict:exec servertype!attributes[;`partition] from servers;
// if the response is a dictionary index into the tablename
procdict:@[procdict;key procdict;{[x;tabname]if[99h=type x;:x[tabname]];:x}[;tabname]];
// create list of all available partitions
possparts:raze value procdict;

//group partitions to relevant process
partitions:group key[part]where each{within[y;]each value x}[part]'[possparts];
partitions:possparts{(min x;max x)}'[partitions];
partitions:`timestamp$partitions;

// adjust the times to account for period end time when int partitioned
c:first[partitions`hdb],-1+ first[partitions`rdb];
d:first[partitions`rdb],options `endtime;
partitions:@[@[partitions;`hdb;:;c];`rdb;:;d];

// if start/end time not a date, then adjust dates parameter for the correct types
if[not a:-12h~tp:type start:options`starttime;
// converts partitions dictionary to timestamps/datetimes
partitions:$[-15h~tp;"z"$;]{(0D+x 0;x[1]+1D-1)}'[partitions];
// convert first and last timestamp to start and end time
partitions:@[partitions;f;:;(start;partitions[f:first key partitions;1])];
partitions:@[partitions;l;:;(partitions[l:last key partitions;0];options`endtime)]];

// ensure st/et are timestamps; if date adjust endtime
options:@[@[options;`starttime;:;st:"p"$options`starttime];`endtime;:;et:$[-14h~type et:options`endtime;-1+1D+et;"p"$et]];
// create dict of datetime coverage for each process
partitions:{"p"$x[0],x[1]+1D-1}each part;
// amend query datetimes on hdb
if[`hdb in key partitions;partitions:@[partitions;`hdb;:;(st;min(et;partitions[`hdb;1]))]];
// amend query datetimes on rdb/idb
if[`idb in key partitions;partitions:@[partitions;`idb;:;(max(st;partitions[`idb;0]);et)]];
if[`rdb in key partitions;partitions:@[partitions;`rdb;:;(max(st;partitions[`rdb;0]);et)]];

// adjust map reducable aggregations to get correct components
if[(1<count partitions)&`aggregations in key options;
Expand Down
4 changes: 3 additions & 1 deletion code/processes/idb.q
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ init:{[]
.lg.o[`init;"registering IDBs on WDB process..."];
/-send sync message to WDB to register the existing IDBs.
@[w;(`.servers.registerfromdiscovery;`idb;0b);{.lg.e[`connection;"Failed to register IDB with WDB."];'x}];
/-dataaccess initialisation must be done after wdb loaded.
if[`dataaccess in key .proc.params;.dataaccess.init[]];
.lg.o[`init; "Initialisation of the IDB is done."];
}

Expand All @@ -101,7 +103,7 @@ init:{[]
reload:.idb.intradayreload;

/-Get the relevant IDB attributes
.proc.getattributes:{`partition`tables!(.idb.currentpartition;tables[])};
.proc.getattributes:{`partition`tables!(enlist .idb.currentpartition;tables[])};

.idb.init[];

Expand Down