Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions code/hdb/hdbstandard.q
Original file line number Diff line number Diff line change
@@ -1,7 +1,28 @@
// reload function
reload:{
if[.z.w in key .hdb.reloadcalls;
.hdb.reloadcalls[.z.w]:1b;
$[not all .hdb.reloadcalls;
{.lg.o[`reload;"reload call received from handle ", string[.z.w], "; reload calls pending from handles ", ", "sv string where not .hdb.reloadcalls]; :(::)}[];
.lg.o[`reload;"reload call received from handle ", string[.z.w], "; no more reload calls pending"];
Comment on lines +5 to +7
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think less duplication is needed here

Suggested change
$[not all .hdb.reloadcalls;
{.lg.o[`reload;"reload call received from handle ", string[.z.w], "; reload calls pending from handles ", ", "sv string where not .hdb.reloadcalls]; :(::)}[];
.lg.o[`reload;"reload call received from handle ", string[.z.w], "; no more reload calls pending"];
.lg.o[`reload;"reload call received from handle ", string[.z.w], $[a:not all .hdb.reloadcalls;"; reload calls pending from handles ", ", "sv string where not .hdb.reloadcalls;""]];
// return early if there are still pending calls
if[a;:(::)];

or something like this

]
]
.lg.o[`reload;"reloading HDB"];
@[`.hdb.reloadcalls;key .hdb.reloadcalls;:;0b];
system"l ."}

// Get the relevant HDB attributes
.proc.getattributes:{`date`tables!(@[value;`date;`date$()];tables[])}

\d .hdb

// dictionary of handles to reload
reloadcalls:()!();

// function to add handle to reloadcalls dictionary
po:{[h] if[.proc.proctype in .hdb.connectedProcs;reloadcalls[h]:0b]};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.proc.proctype is the current process's proctype i.e. here it will always be hdb - I don't think this is right

.z.po:{[f;x] @[f;x;()];.hdb.po x} @[value;`.z.po;{{}}];

// function to remove handle from reloadcalls dictionary
pc:{[h] reloadcalls _: h; if[(all .hdb.reloadcalls) & count .hdb.reloadcalls;reload[]]};
.z.pc:{[f;x] @[f;x;()];.hdb.pc x} @[value;`.z.pc;{{}}];
28 changes: 24 additions & 4 deletions code/processes/gateway.q
Original file line number Diff line number Diff line change
Expand Up @@ -550,13 +550,23 @@ reloadstart:{
};

reloadend:{
if[.z.w in key .gw.reloadcalls;
.gw.reloadcalls[.z.w]:1b;
$[not all .gw.reloadcalls;
{.lg.o[`reload;"reload call received from handle ", string[.z.w], "; reload calls pending from handles ", ", "sv string where not .gw.reloadcalls]; :(::)}[];
.lg.o[`reload;"reload call received from handle ", string[.z.w], "; no more reload calls pending"];
]
]

.lg.o[`reload;"reload end called"];
/- set eod variable to false
.gw.seteod[0b];
@[`.gw.reloadcalls; key .gw.reloadcalls;:;0b];
/- retry connections - get updated attributes from servers and refresh servers tables
setattributes .' flip value flip select procname,proctype,@[;(`.proc.getattributes;`);()!()] each w from .servers.SERVERS where .dotz.liveh[w];
/- flush any async queries held during reload phase
.gw.runnextquery[];}
.gw.runnextquery[];
}

setattributes:{ [prcnme;prctyp;att]
/- get relevant atrributes
Expand Down Expand Up @@ -587,6 +597,18 @@ if[@[value;`.timer.enabled;0b];
f@connectiontab
}@[value;`.servers.connectcustom;{{[x]}}]

\d .gwreload

// dictionary of handles to reload
reloadcalls:()!();

// function to add handle to reloadcalls dictionary
po:{[h] if[.proc.proctype in .gw.connectedProcs;reloadcalls[h]:0b]};
.z.po:{[f;x] @[f;x;()];.gwreload.po x} @[value;`.z.po;{{}}];

// function to remove handle from reloadcalls dictionary
pc:{[h] reloadcalls _: h; if[(all .gwreload.reloadcalls) & count .gwreload.reloadcalls;reload[]]};
.z.pc:{[f;x] @[f;x;()];.gwreload.pc x} @[value;`.z.pc;{{}}];

/

Expand Down Expand Up @@ -635,6 +657,4 @@ neg[h](`.gw.asyncexec;"`$last .z.x";enlist[`tables]!enlist enlist`logmsgXXX);h[]
neg[h](`.gw.asyncexec;"`$last .z.x";`tables`servertype!(enlist`data;`rdb`hdb));h[]
neg[h](`.gw.asyncexecjpt;(`.q.system;"sleep 10");enlist[`servertype]!enlist`rdb`hdb;raze;();0D00:00:03);h[]
h(`.gw.syncexec;"`$last .z.x";enlist[`tables]!enlist enlist`logmsgXXX)
h(`.gw.syncexec;"`$last .z.x";`tables`servertype!(enlist`data;`rdb`hdb))


h(`.gw.syncexec;"`$last .z.x";`tables`servertype!(enlist`data;`rdb`hdb))
20 changes: 20 additions & 0 deletions code/processes/rdb.q
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ endofday:{[date;processdata]
};

reload:{[date]
if[.z.w in key .rdb.reloadcalls;
.rdb.reloadcalls[.z.w]:1b;
$[not all .rdb.reloadcalls;
{.lg.o[`reload;"reload call received from handle ", string[.z.w], "; reload calls pending from handles ", ", "sv string where not .rdb.reloadcalls]; :(::)}[];
.lg.o[`reload;"reload call received from handle ", string[.z.w], "; no more reload calls pending"];
]
]

.lg.o[`reload;"reload command has been called remotely"];
/-get all attributes from all tables before they are wiped
/-get a list of pairs (tablename;columnname!attributes)
Expand All @@ -131,7 +139,19 @@ reload:{[date]
/-restore original timeout back to rdb
restoretimeout[];
.lg.o[`reload;"Finished reloading RDB"];
@[`.rdb.reloadcalls; key .rdb.reloadcalls;:;0b];
};

// dictionary of handles to reload
reloadcalls:()!();

// function to add handle to reloadcalls dictionary
po:{[h] if[.proc.proctype in .rdb.connectedProcs;reloadcalls[h]:0b]};
.z.po:{[f;x] @[f;x;()];.rdb.po x} @[value;`.z.po;{{}}];

// function to remove handle from reloadcalls dictionary
pc:{[h] reloadcalls _: h; if[(all .rdb.reloadcalls) & count .rdb.reloadcalls;reload[]]};
.z.pc:{[f;x] @[f;x;()];.rdb.pc x} @[value;`.z.pc;{{}}];

/-drop date from rdbpartition
rmdtfromgetpar:{[date]
Expand Down
3 changes: 3 additions & 0 deletions config/settings/gateway.q
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ querykeeptime:0D00:30 // the time to keep queries in the
errorprefix:"error: " // the prefix for clients to look for in error strings
clearinactivetime:0D01:00 // the time to keep inactive handle data

// list of process types connected for sync reload
connectedProcs:()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't use camel case anywhere else in TorQ, so let's not introduce it here please


\d .kxdash
enabled:0b // Functionality for parsing and handling kx dashboard queries - disabled by default

Expand Down
2 changes: 2 additions & 0 deletions config/settings/hdb.q
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ loadprocesscode:1b // whether to load the process specific code def
CONNECTIONS:() // list of connections to make at start up
STARTUP:1b // create connections

// list of processes connected for sync reload
.hdb.connectedProcs:()
1 change: 1 addition & 0 deletions config/settings/rdb.q
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ gc:1b //if true .Q.gc will be called after
sortcsv:hsym first .proc.getconfigfile["sort.csv"] //location of csv file
reloadenabled:0b //if true, the RDB will not save when .u.end is called but
//will clear it's data using reload function (called by the WDB)

parvaluesrc:`log //where to source the rdb partition value, can be log (from tp log file name),
//tab (from the the first value in the time column of the table that is subscribed for)
//anything else will return a null date which is will be filled by pardefault
Expand Down
18 changes: 18 additions & 0 deletions tests/syncreload/connections.csv
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think more clear comments are needed in this file - it's not at all clear why numbers of connections should be what they are etc., so it's very unclear what is actually being tested here

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
action,ms,bytes,lang,code,repeat,minver,comment
beforeany,0,0,q,system "sleep 5",1,,"wait for all procs to go up"
before,0,0,q,hdbHandle:hopen`::41804:admin:admin,1,,"Get handle to the HDB"
before,0,0,q,rdb1Handle:hopen`::41802:admin:admin,1,,"Get handle to RDB1"
before,0,0,q,rdb2Handle:hopen`::41803:admin:admin,1,,"Get handle to RDB2"
before,0,0,q,wdbHandle:hopen`::41805:admin:admin,1,,"Get handle to WDB"
before,0,0,q,gwHandle:hopen`::41806:admin:admin,1,,"Get handle to gateway"
Comment on lines +3 to +7
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hardcoded ports in unit tests isn't ideal - a future developer should be able to run all unit tests in TorQ easily, without having to faff about with configuring base ports or worrying about port collisions etc.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having some trouble resolving this issue. I've tried getting the baseport from the system variable and then joining it with sv, but this gives `::/41800/:admin:admin. I'm not sure where to go from here.

Copy link
Member

@jonathonmcmurray jonathonmcmurray Jul 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So couple of thoughts -

  1. It should be fairly simple to do via string manipulation without using sv e.g.
`$"::",getenv[`BASEPORT],":admin:admin"
  1. TorQ has functions for getting connections to other processes based on proctype/procname without manually constructing connection strings etc. - if we use those, we shouldn't have to worry about any of this

true,0,0,q,2~count hdbHandle(value;`.hdb.reloadcalls),1,,"check number of connections to the HDB"
true,0,0,q,1~count rdb1Handle(value;`.rdb.reloadcalls),1,,"check number of connections to RDB1"
true,0,0,q,1~count rdb2Handle(value;`.rdb.reloadcalls),1,,"check number of connections to RDB12"
true,0,0,q,1~count gwHandle(value;`.gwreload.reloadcalls),1,,"check number of connections to the gateway"
run,0,0,q,kill9proc["rdb1"],1,,"shut down rdb1 process"
run,0,0,q,system "sleep 5",1,,"to ensure rdb1 shuts down"
true,0,0,q,2~count hdbHandle(value;`.hdb.reloadcalls),1,,"recheck number of connections to the HDB"
true,0,0,q,1~count rdb2Handle(value;`.rdb.reloadcalls),1,,"recheck number of connections to RDB2"
run,0,0,q,kill9proc["rdb2"],1,,"shut down rdb2 process"
run,0,0,q,system "sleep 5",1,,"to ensure rdb2 shuts down"
true,0,0,q,1~count hdbHandle(value;`.hdb.reloadcalls),1,,"recheck number of connections to the HDB"
17 changes: 17 additions & 0 deletions tests/syncreload/eod.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
action,ms,bytes,lang,code,repeat,minver,comment
beforeany,0,0,q,system "sleep 5",1,,"wait for all procs to go up"
before,0,0,q,hdbHandle:hopen`::41804:admin:admin,1,,"Get handle to the HDB"
before,0,0,q,rdb1Handle:hopen`::41802:admin:admin,1,,"Get handle to RDB1"
before,0,0,q,rdb2Handle:hopen`::41803:admin:admin,1,,"Get handle to RDB2"
before,0,0,q,wdbHandle:hopen`::41805:admin:admin,1,,"Get handle to WDB"
before,0,0,q,gwHandle:hopen`::41806:admin:admin,1,,"Get handle to gateway"
true,0,0,q,0=all rdb1Handle(value;`.rdb.reloadcalls),1,,"check rdb1 reloadcalls dictionary"
true,0,0,q,0=all rdb2Handle(value;`.rdb.reloadcalls),1,,"check rdb2 reloadcalls dictionary"
true,0,0,q,0=all gwHandle(value;`.gwreload.reloadcalls),1,,"check gateway reloadcalls dictionary"
true,0,0,q,0=all hdbHandle(value;`.hdb.reloadcalls),1,,"check hdb reloadcalls dictionary"
run,0,0,q,wdbHandle(`.u.end;.z.d)
run,0,0,q,system "sleep 5",1,,"wait to allow wdb endofday process to finish running"
true,0,0,q,1=all rdb1Handle(value;`.rdb.reloadcalls),1,,"check rdb1 reloadcalls dictionary has updated"
true,0,0,q,0=all rdb2Handle(value;`.rdb.reloadcalls),1,,"check rdb2 reloadcalls dictionary has not updated"
true,0,0,q,0=all gwHandle(value;`.gwreload.reloadcalls),1,,"check gateway reloadcalls dictionary has updated"
true,0,0,q,1=sum hdbHandle(value;`.hdb.reloadcalls),1,,"check hdb reloadcalls dictionary has updated"
8 changes: 8 additions & 0 deletions tests/syncreload/process.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
host,port,proctype,procname,U,localtime,g,T,w,load,startwithall,extras,qcmd
localhost,{KDBBASEPORT}+1,discovery,discovery1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,0,,,${KDBCODE}/processes/discovery.q,1,,
localhost,{KDBBASEPORT},segmentedtickerplant,stp1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,0,,,${KDBCODE}/processes/segmentedtickerplant.q,1,-schemafile ${TORQAPPHOME}/database.q -tplogdir ${KDBTPLOG},
localhost,{KDBBASEPORT}+2,rdb,rdb1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,1,180,,${KDBCODE}/processes/rdb.q,1,,
localhost,{KDBBASEPORT}+3,rdb,rdb2,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,1,180,,${KDBCODE}/processes/rdb.q,1,,
localhost,{KDBBASEPORT}+4,hdb,hdb1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,1,60,4000,${KDBHDB},1,,
localhost,{KDBBASEPORT}+5,wdb,wdb1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,1,,,${KDBCODE}/processes/wdb.q,1,,
localhost,{KDBBASEPORT}+6,gateway,gateway1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,1,,4000,${KDBCODE}/processes/gateway.q,1,,
2 changes: 2 additions & 0 deletions tests/syncreload/rdb2.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.rdb.hdbtypes:enlist`hdb
.servers.CONNECTIONS:`gateway`hdb
23 changes: 23 additions & 0 deletions tests/syncreload/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/bash

# Handle command-line arguments
source $KDBTESTS/flagparse.sh

# Path to test directory
testpath=/home/creid/devTorQ/bin/syncreload

# Start procs
bin/torq.sh start discovery1 stp1 hdb1 rdb1 rdb2 wdb1 gateway1 -csv ${testpath}/process.csv

# Start test proc
/usr/bin/rlwrap q ${TORQHOME}/torq.q \
-proctype test -procname test1 \
-test ${testpath} \
-load ${KDBTESTS}/helperfunctions.q ${testpath}/settings.q \
-testresults ${KDBTESTS}/syncreload/results/ \
-procfile ${testpath}/process.csv \
-runtime $run \
$debug $stop $write $quiet

# Shut down procs
bin/torq.sh stop discovery1 stp1 hdb1 rdb1 rdb2 wdb1 gateway1 -csv ${testpath}/process.csv
6 changes: 6 additions & 0 deletions tests/syncreload/settings.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// IPC connection parameters
.servers.CONNECTIONS:`rdb`segmentedtickerplant;
.servers.USERPASS:`admin:admin;

// Paths to process CSV and test STP log directory
processcsv:getenv[`KDBTESTS],"/syncreload/connections/process.csv";