From f2b7090e3e02bda6910e05dc23695c7b6ea39fb9 Mon Sep 17 00:00:00 2001 From: kun22kun Date: Sat, 30 May 2015 16:06:57 +0800 Subject: [PATCH 01/37] modify analyzer.py indent and add ssl to synchronizer --- synchronizer/sslMain.py | 54 ++++++ synchronizer/sslMover.py | 343 +++++++++++++++++++++++++++++++++++ synchronizer/ssl_move-submit | 71 ++++++++ 3 files changed, 468 insertions(+) create mode 100755 synchronizer/sslMain.py create mode 100755 synchronizer/sslMover.py create mode 100644 synchronizer/ssl_move-submit diff --git a/synchronizer/sslMain.py b/synchronizer/sslMain.py new file mode 100755 index 0000000..abe0de6 --- /dev/null +++ b/synchronizer/sslMain.py @@ -0,0 +1,54 @@ +#! /usr/bin/env python +import os,time +import sys,getopt +import TimedExec +from IDPLException import * +import sslMover + + +## ***************************** +## main routine +## ***************************** + +print("\n%s\n" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) +timeout = 120 +sslexe = "./sslMover.py" +log_path = "" +port = "" +config = "" + +def usage(): + print("sslMain.py -l -p -c ") + +if len(sys.argv) < 7: + usage() + sys.exit() + +try: + opts, args = getopt.getopt(sys.argv[1:], "hl:p:c:", ["help", "log_path=", "port=", "config="]) +except getopt.GetoptError: + usage() + sys.exit() + +for opt, arg in opts: + if opt in ("-h", "--help" ): + usage() + sys.exit() + elif opt in ("-l", "--log_path"): + log_path = arg + elif opt in ("-p", "--port"): + port = arg + elif opt in ("-c", "--config"): + config = arg + +resultcode,output,err=TimedExec.runTimedCmd(timeout,[sslexe, "-l", log_path, "-p", port, "-c", config]) +print ("".join(output)) +if resultcode < 0: + side = int(os.environ['_CONDOR_PROCNO']) + print ("Timeout! Result code %d" % resultcode) + if side == 0: + raise TimeOutException("client") + else: + raise TimeOutException("server") + +# vim: ts=4:sw=4 diff --git a/synchronizer/sslMover.py b/synchronizer/sslMover.py new file mode 100755 index 0000000..c20e6b4 --- /dev/null +++ b/synchronizer/sslMover.py @@ -0,0 +1,343 @@ +#!/usr/bin/env python +# encoding: UTF-8 + +import socket,ssl +import sys,os +import time +from thread import * +import re, getopt +import CondorTools +from xml.etree import ElementTree as ET + +chirp = CondorTools.CondorChirp() + +class FileReader: + + def __init__(self): + + self.fileUri = "" + self.offsetLast = 0 + self.offsetNew = 0 + self.offsetNow = 0 + self.timestampNew = "" + self.isHasChanged = False + + def read(self, fileUriNow): + with open(fileUriNow) as text: + return text.readlines() + + def getALine(self, fileLines): + return fileLines.pop() + + def match(self, reg, strToMatch, timestamp): + match = re.compile(reg).search(strToMatch) + + if match: + timestampNow = match.group()[1:(len(match.group()) - 1)].split(',')[3] + if not self.isHasChanged: + self.timestampNew = timestampNow + self.offsetNew = self.offsetNow + self.isHasChanged = True + return float(timestampNow) > timestamp + else: + if not self.isHasChanged: + self.offsetNow += 1 + return True + + def getNewFilePath(self, suffix): + return "%s.%d" % (self.fileUri, suffix), suffix + 1 + + + def chooseLinesInAFile(self, fileLines, reg, timestamp, strAdded, isTimeReached): + while len(fileLines): + line = self.getALine(fileLines) + if self.match(reg, line, timestamp): + strAdded.insert(0, line) + else: + isTimeReached = True + break + return strAdded, isTimeReached + + + def chooseLines(self, timestamp, offsetL, path): + self.fileUri = path + self.offsetLast = offsetL + reg = "'iperf.*'" + isTimeReached = False + strAdded = [] + fileUriNow = self.fileUri + suffix = 0 + if not os.path.exists(fileUriNow): + print("file not exist!") + sys.exit(0) + while (not isTimeReached and os.path.exists(fileUriNow)): + fileLines = self.read(fileUriNow) + strAdded, isTimeReached = self.chooseLinesInAFile(fileLines, reg, timestamp, strAdded, isTimeReached) + fileUriNow, suffix = self.getNewFilePath(suffix) + return ''.join(strAdded[self.offsetLast:]), self.timestampNew, str(self.offsetNew) + +class Server: + def __init__(self, path, port): + self.path = path + self.host = socket.getfqdn() + self.port = port + + def commuWithClient(self, conn): + timestamp = "" + timestampNew = "" + offset = "" + offsetNew = "" + while True: + data = conn.recv(64) + + if self.match(r"\d+(\.\d+)?,\d+", data): + timestamp, offset = data.split(',') + strAdded, timestampNew, offsetNew = FileReader().chooseLines(float(timestamp), int(offset), self.path) + dataToSend = "%sKUNSIGN%sKUNSIGN%s" % (strAdded, timestampNew, offsetNew) + conn.sendall(str(len(dataToSend))) + + elif self.match("KUNBEGIN", data): + conn.sendall(dataToSend) + + elif self.match("KUNSTOP", data): + print("value error") + raise + else: + break + + def match(self, reg, strToMatch): + return re.compile(reg).match(strToMatch) + + + + def changeFlag(self): + time.sleep(2) + chirp.setJobAttr("SSLServer","'%s %d'" % (self.host, int(self.port))) + + def serve(self): + """ create an x509 cert and an rsa private key """ + path = "/tmp/" + certpath = "%scert.pem" % path + keypath = "%skey.pem" % path + os.popen("echo '\n\n\n\n\n\n\n' | openssl req -newkey rsa:1024 -x509 -days 365 -nodes -out %s -keyout %s" % (certpath, keypath)) + + """ transfer SSL certificate to client via chirp""" + certStr = "" + with open(certpath) as cert: + for line in cert.readlines(): + certStr = "%s%s" % (certStr, line.strip("\n")) + chirp.setJobAttr("SSLCert", "'%s'" % certStr) + + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + sock.bind(("", int(self.port))) + except socket.error, msg: + print ("Bind failed. Error Code: %s Message %s" % (str(msg[0]), msg[1])) + sys.exit() + + + sock.listen(1) + + start_new_thread(self.changeFlag, ()) + conn, addr = sock.accept() + conn_ssl = ssl.wrap_socket(conn, server_side = True, certfile = certpath, keyfile = keypath) + try: + self.commuWithClient(conn_ssl) + except : + print ("value error!") + raise + finally: + sock.close() + print ("socket close now !") + chirp.setJobAttr("SSLServer", None) + chirp.setJobAttr("SSLCert", None) + sys.exit() + + +class XmlHandler: + def __init__(self, xmlfile): + self.xmlTree = self.readXml(xmlfile) + + def readXml(self, in_path): + if not os.path.exists(in_path): + print ("there is no such file: %s" % in_path) + sys.exit() + try: + tree = ET.parse(in_path) + except: + print ("tree parse error") + raise + return tree + + def getNodes(self, tree): + root = tree.getroot() + return root.getchildren() + + def findNode(self, nodes, tag): + for node in nodes: + if node.tag == tag: + return node + + def getTexts(self, nodes, tags): + texts = [] + for tag in tags: + texts.append(self.findNode(nodes, tag).text) + return texts + + def read(self): + nodes = self.getNodes(self.xmlTree) + path, timestamp, offset= self.getTexts(nodes, ["path", "timestamp", "offset"]) + return path, timestamp, offset + + def writeXml(self, node, text): + node.text = text + + def setTexts(self, texts, tags): + nodes = self.getNodes(self.xmlTree) + for text, tag in zip(texts, tags): + self.writeXml(self.findNode(nodes, tag), text) + + def write(self, newTimestamp, newOffset, xmlfile): + self.setTexts([newTimestamp, newOffset], ["timestamp", "offset"]) + self.xmlTree.write(xmlfile, encoding="utf-8") + + + +class Client: + + def __init__(self,config): + self.config = config + + def get_constant(self, prefix): + """Create a dictionary mapping socket module constants to their names.""" + return dict( (getattr(socket, n), n) + for n in dir(socket) if n.startswith(prefix) + ) + + def get_constants(self, sock): + families = self.get_constant('AF_') + types = self.get_constant('SOCK_') + protocols = self.get_constant('IPPROTO_') + + print 'Family :', families[sock.family] + print 'Type :', types[sock.type] + print 'Protocol:', protocols[sock.proto] + + def writeSSLCert(self, path, sslCert): + certBegin = sslCert[ : 27] + certEnd = sslCert[-25 : ] + #TODO if 27 > len -25 ? + certContent = sslCert[27 : -25] + certContentList = [] + for i in range(0, len(certContent), 64): + line = certContent[i : i + 64] + certContentList.append(line + "\n") + certContent = "".join(certContentList) + certDealt = "%s\n%s%s" % (certBegin, certContent, certEnd) + + with open(path, "w") as certfile: + certfile.write(certDealt) + + + + + def request(self): + """ Read xml file """ + try: + xmlHandler = XmlHandler(self.config) + except: + print "xml read error" + sys.exit() + path, timestamp, offset = xmlHandler.read() + + """ Get host and port from chirp """ + interval = 5 + maxtries = 12*3 + serverInfo = chirp.getJobAttrWait("SSLServer",None,interval, maxtries) + host,port = serverInfo.strip("'").split() + + """ Write the ssl certificate """ + certpath = "/tmp/cert.pem" + sslCert = chirp.getJobAttrWait("SSLCert", None, interval, maxtries).strip("'") + self.writeSSLCert(certpath, sslCert) + + """ Create a TCP/IP socket with SSL """ + sock = socket.create_connection((host, int(port))) + self.get_constants(sock) + sockSSL = ssl.wrap_socket(sock, ca_certs = certpath, cert_reqs = ssl.CERT_REQUIRED) + + + try: + """ Send data """ + message = "%s,%s" % (timestamp, offset) + sockSSL.sendall(message) + amount_received = 0 + rec = sockSSL.recv(64) + amount = int(rec) + sockSSL.sendall("KUNBEGIN") + + dataComp = "" + while amount_received < int(amount): + data = sockSSL.recv(min(4096, int(amount) - amount_received)) + dataComp += data + amount_received += len(data) + + strAdded, timestamp, offset = dataComp.split("KUNSIGN") + + if not amount_received < amount: + with open(path, "a") as output: + output.write(strAdded) + #if timestamp and offset: + #xmlHandler.write(timestamp, offset, self.config) + except: + sockSSL.sendall("KUNSTOP") + print "amount value error" + raise + + finally: + print 'closing socket' + sockSSL.close() + sock.close() + + +def usage(): + print("sslMain.py -l -p -c ") + + +def main(argv): + if len(sys.argv) < 6: + usage() + sys.exit() + + try: + opts, args = getopt.getopt(argv, "hl:p:c:", ["help", "log_path=", "port=", "config="]) + except getopt.GetoptError: + usage() + sys.exit() + + for opt, arg in opts: + if opt in ("-h", "--help" ): + usage() + sys.exit() + elif opt in ("-l", "--log_path"): + log_path = arg + elif opt in ("-p", "--port"): + port = arg + elif opt in ("-c", "--config"): + config = arg + + if int(os.environ['_CONDOR_PROCNO']) == 0: + client = Client(config) + client.request() + + else: + chirp.setJobAttr("SSLServer",None) + chirp.setJobAttr("SSLCert", None) + server = Server(log_path, port) + server.serve() + +if __name__ == '__main__': + main(sys.argv[1:]) + +# vim: ts=4:sw=4 diff --git a/synchronizer/ssl_move-submit b/synchronizer/ssl_move-submit new file mode 100644 index 0000000..c7d6682 --- /dev/null +++ b/synchronizer/ssl_move-submit @@ -0,0 +1,71 @@ +############ +# +# Parallel Job +# +############ + +universe = parallel +executable = sslMain.py + + +#SRC_HOST=mickey.buaa.edu.cn +SRC_HOST=JSI-iDPL01 +#DST_HOST=komatsu.chtc.wisc.edu +DST_HOST=JSI-iDPL02 +#log_path=/home/phil/placement/placement2.log +log_path=/tmp/testbykun/test.txt +port=8888 +config=/tmp/testbykun/client_config.xml +#config=/home/idpl/remoteLogs/komatsu/client_config.xml + +### Crondor Settings +# A promise that jobs will not run more often than this (in seconds) +# Required for the the job to run multiple times successfully. +#LEASE=1500 + +# A run is allowed to take this long (in seconds) to set up; otherwise +# that run is skipped +#cron_window=60 + +# Try to run jobs on this schedule +#cron_minute=55 +#cron_hour=11,23 +# +# Keep running the job +#on_exit_remove=false + +# Arguments are: +# 1. Sending host +# 2. File to send (on the sending host) +# 3. Receiving host +# 4. Location to write file (on the receiving host) +#arguments = $(SRC_HOST) $(SRC_PATH) $(DST_HOST) $(DST_PATH) $(LEASE) +arguments= -l $(log_path) -p $(port) -c $(config) + +## Enable Chirp ++WantIOProxy = true + +input = /dev/null +output = ./out/sslMain.out.$(Node) +error = ./err/sslMain.err.$(Node) +log = ./log/sslMain.log +getenv = true + +#+SrcPath = "$(SRC_PATH)" +#+DstHost = "$(DST_HOST)" +#+DstPath = "$(DST_PATH)" + ++ParallelShutdownPolicy = "WAIT_FOR_ALL" + +transfer_input_files = TimedExec.py,IDPLException.py,CondorTools.py,sslMover.py + +should_transfer_files = YES +when_to_transfer_output = ON_EXIT + +machine_count = 1 +requirements = (Machine == "$(SRC_HOST)") +queue + +machine_count = 1 +requirements = (Machine == "$(DST_HOST)") +queue From 958c7730ffc12b43380b452ba1a55e20d2e58a80 Mon Sep 17 00:00:00 2001 From: kun22kun Date: Sat, 30 May 2015 16:08:58 +0800 Subject: [PATCH 02/37] modify indent of analyzer --- analyzer/analyzer.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/analyzer/analyzer.py b/analyzer/analyzer.py index 1e5be88..472df71 100755 --- a/analyzer/analyzer.py +++ b/analyzer/analyzer.py @@ -14,9 +14,12 @@ def match(self, reg, strToMatch, result): def combi(self, strToCombi, tool): strArray = strToCombi.split(',')[1:] + print strArray if tool == "netcat": + print(strArray[len(strArray) - 1]) datasize = strArray[len(strArray) - 1] strArray[len(strArray) - 1] = str(float(datasize) / 1024) + print(strArray[len(strArray) - 1]) if (not self.deal(strArray)): return False, '' #print(strArray) @@ -39,6 +42,7 @@ def analyze(self, strToMatch, tools): reg = "'" + tool + ".*'" matchResult = self.match(reg, strToMatch, result) if matchResult[0]: + print(matchResult[1]) resultSet = self.combi(matchResult[1], tool) resultSet.append(tool) break From 671b0d2251e21ac274d3a7c643c71776912c1bb0 Mon Sep 17 00:00:00 2001 From: kun22kun Date: Sat, 30 May 2015 16:13:01 +0800 Subject: [PATCH 03/37] combine to into two python files --- synchronizer/fileReader.py | 86 --------------------------- synchronizer/server_client-submit | 73 ----------------------- synchronizer/socketClient.py | 96 ------------------------------- synchronizer/socketServer.py | 96 ------------------------------- synchronizer/transmission.py | 55 ------------------ synchronizer/xmlReader.py | 63 -------------------- 6 files changed, 469 deletions(-) delete mode 100755 synchronizer/fileReader.py delete mode 100644 synchronizer/server_client-submit delete mode 100755 synchronizer/socketClient.py delete mode 100755 synchronizer/socketServer.py delete mode 100755 synchronizer/transmission.py delete mode 100755 synchronizer/xmlReader.py diff --git a/synchronizer/fileReader.py b/synchronizer/fileReader.py deleted file mode 100755 index 3823bae..0000000 --- a/synchronizer/fileReader.py +++ /dev/null @@ -1,86 +0,0 @@ -#!/usr/bin/env python -# encoding: UTF-8 - -import re,os - -class FileReader: - - fileUri = "" - - offsetLast = 0 - offsetNew = 0 - offsetNow = 0 - timestampNew = "" - isHasChanged = False - - def read(self, fileUriNow): - with open(fileUriNow) as text: - #print("fileReader, file is " + fileUriNow) - return text.readlines() - - def getALine(self, fileLines): - return fileLines.pop() - - def match(self, reg, strToMatch, timestamp): - match = re.compile(reg).search(strToMatch) - - if match: - timestampNow = match.group()[1:(len(match.group()) - 1)].split(',')[3] - #print (timestampNow) - if not self.isHasChanged: - #print (self.offsetNow) - self.timestampNew = timestampNow - self.offsetNew = self.offsetNow - self.isHasChanged = True - return float(timestampNow) > timestamp - else: - if not self.isHasChanged: - #print("add!!!!") - self.offsetNow += 1 - #print(self.offsetNow) - return True - - def getNewFilePath(self, suffix): - return self.fileUri + '.' + str(suffix), suffix + 1 - - - def chooseLinesInAFile(self, fileLines, reg, timestamp, strAdded, isTimeReached): - while len(fileLines): - line = self.getALine(fileLines) - #print ("line: " + line) - if self.match(reg, line, timestamp): - strAdded.insert(0, line) - else: - isTimeReached = True - break - return strAdded, isTimeReached - - - def chooseLines(self, timestamp, offsetL, path): - #self.fileUri = "/home/kun/GraduationProject/ServerClient0.11/test.txt" - self.fileUri = path - print "file is: " + self.fileUri + " t is: " + str(timestamp) + " o is: " + str(offsetL) - self.offsetLast = offsetL - reg = "'iperf.*'" - isTimeReached = False - strAdded = [] - fileUriNow = self.fileUri - suffix = 0 - if not os.path.exists(fileUriNow): - print("file not exist!") - sys.exit(0) - while (not isTimeReached and os.path.exists(fileUriNow)): - fileLines = self.read(fileUriNow) - strAdded, isTimeReached = self.chooseLinesInAFile(fileLines, reg, timestamp, strAdded, isTimeReached) - fileUriNow, suffix = self.getNewFilePath(suffix) - print ("isTimeReached: " + str(isTimeReached) + " file is: " + fileUriNow + " suffix is: " + str(suffix) + " is: " + str(os.path.exists(fileUriNow))) - #print (''.join(strAdded)) - #print("Now offset is: " + str(self.offsetNew)) - return ''.join(strAdded[self.offsetLast:]), self.timestampNew, str(self.offsetNew) - -if __name__ == '__main__': - timestamp = 0 - fileReader = FileReader() - fileReader.chooseLines(timestamp) - - diff --git a/synchronizer/server_client-submit b/synchronizer/server_client-submit deleted file mode 100644 index d41ab45..0000000 --- a/synchronizer/server_client-submit +++ /dev/null @@ -1,73 +0,0 @@ -############ -# -# Parallel Job -# -############ - -universe = parallel -executable = transmission.py - - -SRC_HOST=mickey.buaa.edu.cn -#SRC_HOST=JSI-iDPL01 -#SRC_PATH=/home/phil/htcondor-8.2.2-4.x86_64.disk1.iso -DST_HOST=komatsu.chtc.wisc.edu -#DST_HOST=JSI-iDPL02 -#DST_PATH=htcondor-8.2.2-4.x86_64.disk1.iso -path=/home/phil/placement/placement2.log -#path=/tmp/testbykun/placement2.log -host= -port=5022 -#config=/tmp/testbykun/client_config.xml -config=/home/idpl/remoteLogs/komatsu/client_config.xml - -### Crondor Settings -# A promise that jobs will not run more often than this (in seconds) -# Required for the the job to run multiple times successfully. -#LEASE=1500 - -# A run is allowed to take this long (in seconds) to set up; otherwise -# that run is skipped -cron_window=60 - -# Try to run jobs on this schedule -cron_minute=0-59/30 -# -# Keep running the job -on_exit_remove=false - -# Arguments are: -# 1. Sending host -# 2. File to send (on the sending host) -# 3. Receiving host -# 4. Location to write file (on the receiving host) -#arguments = $(SRC_HOST) $(SRC_PATH) $(DST_HOST) $(DST_PATH) $(LEASE) -arguments= $(path) $(host) $(port) $(config) - -## Enable Chirp -+WantIOProxy = true - -input = /dev/null -output = ./out/transmission.out.$(Node) -error = ./err/transmission.err.$(Node) -log = ./log/transmission.log -getenv = true - -#+SrcPath = "$(SRC_PATH)" -#+DstHost = "$(DST_HOST)" -#+DstPath = "$(DST_PATH)" - -+ParallelShutdownPolicy = "WAIT_FOR_ALL" - -transfer_input_files = TimedExec.py,IDPLException.py,CondorTools.py,socketServer.py,socketClient.py,fileReader.py,xmlReader.py - -should_transfer_files = YES -when_to_transfer_output = ON_EXIT - -machine_count = 1 -requirements = (Machine == "$(SRC_HOST)") -queue - -machine_count = 1 -requirements = (Machine == "$(DST_HOST)") -queue diff --git a/synchronizer/socketClient.py b/synchronizer/socketClient.py deleted file mode 100755 index 9be87a6..0000000 --- a/synchronizer/socketClient.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/usr/bin/env python -# encoding: UTF-8 - -import socket -import sys,getopt -import xmlReader -import CondorTools - -chirp = CondorTools.CondorChirp() -class Client: - - def __init__(self,config): - self.config = config - print ("client init") - - def get_constant(self, prefix): - """Create a dictionary mapping socket module constants to their names.""" - return dict( (getattr(socket, n), n) - for n in dir(socket) if n.startswith(prefix) - ) - - def get_constants(self, sock): - families = self.get_constant('AF_') - types = self.get_constant('SOCK_') - protocols = self.get_constant('IPPROTO_') - - print >>sys.stderr, 'Family :', families[sock.family] - print >>sys.stderr, 'Type :', types[sock.type] - print >>sys.stderr, 'Protocol:', protocols[sock.proto] - print >>sys.stderr - - def demand(self): - - print ("client demand") - try: - xmlHandler = xmlReader.XmlHandler(self.config) - except: - print "xml read error" - sys.exit() - host, port, path, timestamp, offset = xmlHandler.read() - print "five vals:" - print host, port, path, timestamp, offset - - interval = 5 - maxtries = 12*3 - serverInfo = chirp.getJobAttrWait("SocketServer",None,interval, maxtries) - print "serverInfo is:" + serverInfo - #hostFromCondor,portFromCondor = serverInfo.strip("'").split() - #print hostFromCondor, portFromCondor - # Create a TCP/IP socket - sock = socket.create_connection((host, int(port))) - self.get_constants(sock) - - print host, port, path, timestamp, offset - - try: - # Send data - message = timestamp + ',' + offset - sock.sendall(message) - amount_received = 0 - rec = sock.recv(64) - print("rec is: " + rec) - amount = int(rec) - #print amount - sock.sendall("kunBegin") - - dataComp = "" - while amount_received < int(amount): - print (amount_received, amount) - data = sock.recv(min(4096, int(amount) - amount_received)) - dataComp += data - amount_received += len(data) - - strAdded, timestamp, offset = dataComp.split("KUNSIGN") - - print strAdded - if not amount_received < amount: - with open(path, "a") as output: - output.write(strAdded) - print "time is " + timestamp - if timestamp and offset: - xmlHandler.write(timestamp, offset, self.config) - except: - sock.sendall("kunStop") - print "amount value error" - - finally: - print >>sys.stderr, 'closing socket' - sock.close() - print("dadada") - -if __name__ == '__main__': - if len(sys.argv) < 2: - print ("client val num error!") - client = Client(sys.argv[1]) - client.demand() diff --git a/synchronizer/socketServer.py b/synchronizer/socketServer.py deleted file mode 100755 index ff3f299..0000000 --- a/synchronizer/socketServer.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/usr/bin/env python -# encoding: UTF-8 - -import socket -import sys -import time -from thread import * -import fileReader -import re, getopt -import CondorTools - -HOST = "" -PORT = 5022 -chirp = CondorTools.CondorChirp() - -class Server: - def __init__(self, path, host, port): - self.path = path - HOST = host - PORT = port - - def commuWithClient(self, conn): - #conn.send("welcome!\n") - timestamp = "" - timestampNew = "" - offset = "" - offsetNew = "" - while True: - data = conn.recv(64) - print ("data is: " + data + " type is: " + str(type(data))) - if self.match(r"\d+(\.\d+)?,\d+", data): - print("begin to read file!!") - timestamp, offset = data.split(',') - print "cat is: " + timestamp + " o is: " + offset + " p is: " + self.path - strAdded, timestampNew, offsetNew = fileReader.FileReader().chooseLines(float(timestamp), int(offset), self.path) - dataToSend = strAdded + "KUNSIGN" + timestampNew + "KUNSIGN" + offsetNew - print(len(strAdded)) - conn.sendall(str(len(dataToSend))) - elif self.match("kunBegin", data): - conn.sendall(dataToSend) - elif self.match("kunStop", data): - #print("value error") - raise - else: - print("Done!") - break - #conn.send("bye\n") - #conn.close() - - def match(self, reg, strToMatch): - return re.compile(reg).match(strToMatch) - - - - def changeFlag(self): - time.sleep(2) - chirp.setJobAttr("SocketServer","'%s %d'" % (HOST, PORT)) - print("ChangeFlag()") - - def serve(self): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - print ("Socket created") - try: - s.bind((HOST, PORT)) - except socket.error, msg: - print ("Bind failed. Error Code : " + str(msg[0]) + " Message " + msg[1]) - sys.exit() - - print ("Socket bind complete") - - s.listen(1) - print ("socket now listening") - - flagOfCondor = "" - start_new_thread(self.changeFlag, ()) - conn, addr = s.accept() - print ("Connected with " + addr[0] + ":" + str(addr[1])) - try: - self.commuWithClient(conn) - except : - print ("value error!") - raise - finally: - s.close() - print ("socket close now !") - chirp.setJobAttr("SocketServer",None) - print("exit") - sys.exit() - -if __name__ == '__main__': - chirp.setJobAttr("SocketServer",None) - if len(sys.argv) < 4: - print ("server val num error") - server = Server(sys.argv[1], sys.argv[2], sys.argv[3]) - server.serve() - diff --git a/synchronizer/transmission.py b/synchronizer/transmission.py deleted file mode 100755 index 09963a6..0000000 --- a/synchronizer/transmission.py +++ /dev/null @@ -1,55 +0,0 @@ -#! /usr/bin/env python -import os,time -import sys -import socketClient -import socketServer -import socket -import TimedExec -from IDPLException import * - - -def myprint(string): - print("*********************************************************\n" + string) - -## ***************************** -## main routine -## ***************************** - -print('************************' + time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())) + '************************\n') -serverTimeout = 120 -clientTimeout = 120 -server = "./socketServer.py" -client = "./socketClient.py" - -if len(sys.argv) < 3: - print ("val number error") -for string in sys.argv: - print string - -path = sys.argv[1] -host = "" -port = sys.argv[2] -config = sys.argv[3] - -print path, host, port, config -if int(os.environ['_CONDOR_PROCNO']) == 0: - print ("client start") - client = socketClient.Client(config) - client.demand() - #print socket.gethostname(), socket.gethostbyname(socket.gethostname()) - #resultcode,output,err=TimedExec.runTimedCmd(clientTimeout,[client, config]) - #print(output) - #if resultcode < 0: - #print("Timeout! Result code: %d" % resultcode) - #print(err) - #raise TimeOutException("client") -else: - print("server start") - server = socketServer.Server(path, host, port) - server.serve() - #resultcode,output,err=TimedExec.runTimedCmd(serverTimeout, [server, path, host, port]) - #print(output) - #if resultcode < 0: - #print("Result code: %d" % resultcode) - #print(err) - #raise TimeOutException("server") diff --git a/synchronizer/xmlReader.py b/synchronizer/xmlReader.py deleted file mode 100755 index f493e4d..0000000 --- a/synchronizer/xmlReader.py +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env python -# encoding: UTF-8 - -from xml.etree import ElementTree as ET -import os - -class XmlHandler: - def __init__(self, xmlfile): - print ("xmlHandler init: " + xmlfile) - self.xmlTree = self.readXml(xmlfile) - - def readXml(self, in_path): - if not os.path.exists(in_path): - print ("there is no such file: " + in_path) - sys.exit() - try: - tree = ET.parse(in_path) - except: - print ("tree parse error") - print ("return tree successfully") - return tree - - def getNodes(self, tree): - root = tree.getroot() - print ("return root successfully") - return root.getchildren() - - def findNode(self, nodes, tag): - for node in nodes: - if node.tag == tag: - return node - - def getTexts(self, nodes, tags): - texts = [] - for tag in tags: - texts.append(self.findNode(nodes, tag).text) - return texts - - def read(self): - nodes = self.getNodes(self.xmlTree) - host, port, path, timestamp, offset= self.getTexts(nodes, ["host", "port", "path", "timestamp", "offset"]) - return host, port, path, timestamp, offset - - def writeXml(self, node, text): - node.text = text - #print node.tag, node.text - - def setTexts(self, texts, tags): - nodes = self.getNodes(self.xmlTree) - for text, tag in zip(texts, tags): - self.writeXml(self.findNode(nodes, tag), text) - - def write(self, newTimestamp, newOffset, xmlfile): - #int "time is " + newTimestamp - self.setTexts([newTimestamp, newOffset], ["timestamp", "offset"]) - self.xmlTree.write(xmlfile, encoding="utf-8") - - -if __name__ == '__main__': - xmlHandler = XmlHandler("client_config.xml") - print xmlHandler.read() - #xmlHandler.write("newTimestamp", "newOffset", "client_config.xml") - print xmlHandler.read() From cdf3f5ee35542e8472326b9aeb77dc3f01f16f32 Mon Sep 17 00:00:00 2001 From: kun Date: Mon, 1 Jun 2015 11:08:10 +0800 Subject: [PATCH 04/37] code review --- synchronizer/sslMain.py | 1 + synchronizer/sslMover.py | 158 ++++++++++++++++++++++----------------- 2 files changed, 89 insertions(+), 70 deletions(-) diff --git a/synchronizer/sslMain.py b/synchronizer/sslMain.py index abe0de6..64a32f9 100755 --- a/synchronizer/sslMain.py +++ b/synchronizer/sslMain.py @@ -41,6 +41,7 @@ def usage(): elif opt in ("-c", "--config"): config = arg + resultcode,output,err=TimedExec.runTimedCmd(timeout,[sslexe, "-l", log_path, "-p", port, "-c", config]) print ("".join(output)) if resultcode < 0: diff --git a/synchronizer/sslMover.py b/synchronizer/sslMover.py index c20e6b4..7eb7caa 100755 --- a/synchronizer/sslMover.py +++ b/synchronizer/sslMover.py @@ -2,7 +2,7 @@ # encoding: UTF-8 import socket,ssl -import sys,os +import sys,os,traceback import time from thread import * import re, getopt @@ -12,53 +12,55 @@ chirp = CondorTools.CondorChirp() class FileReader: - + def __init__(self): - self.fileUri = "" self.offsetLast = 0 self.offsetNew = 0 self.offsetNow = 0 self.timestampNew = "" - self.isHasChanged = False - + self.hasChanged = False + def read(self, fileUriNow): with open(fileUriNow) as text: return text.readlines() - - def getALine(self, fileLines): - return fileLines.pop() def match(self, reg, strToMatch, timestamp): - match = re.compile(reg).search(strToMatch) - + """ identify if a line matches the rule to add. + if passes the regular expression test, + update timestampNew and offsetNew when first time passes + compare the timestamp, choose the line timestampNow > timestamp + else + add the line""" + + match = re.compile(reg).search(strToMatch) if match: timestampNow = match.group()[1:(len(match.group()) - 1)].split(',')[3] - if not self.isHasChanged: + if not self.hasChanged: self.timestampNew = timestampNow self.offsetNew = self.offsetNow - self.isHasChanged = True + self.hasChanged = True return float(timestampNow) > timestamp else: - if not self.isHasChanged: + if not self.hasChanged: self.offsetNow += 1 return True def getNewFilePath(self, suffix): + """ get the log rotated """ return "%s.%d" % (self.fileUri, suffix), suffix + 1 - def chooseLinesInAFile(self, fileLines, reg, timestamp, strAdded, isTimeReached): - while len(fileLines): - line = self.getALine(fileLines) + """ choose lines match the rules in a file """ + for line in fileLines[::-1]: if self.match(reg, line, timestamp): strAdded.insert(0, line) else: isTimeReached = True break return strAdded, isTimeReached - - + + """ get all lines in a transfer """ def chooseLines(self, timestamp, offsetL, path): self.fileUri = path self.offsetLast = offsetL @@ -67,15 +69,24 @@ def chooseLines(self, timestamp, offsetL, path): strAdded = [] fileUriNow = self.fileUri suffix = 0 + if not os.path.exists(fileUriNow): print("file not exist!") sys.exit(0) + + """ scan all logs including rotated if exist """ while (not isTimeReached and os.path.exists(fileUriNow)): fileLines = self.read(fileUriNow) strAdded, isTimeReached = self.chooseLinesInAFile(fileLines, reg, timestamp, strAdded, isTimeReached) fileUriNow, suffix = self.getNewFilePath(suffix) + return ''.join(strAdded[self.offsetLast:]), self.timestampNew, str(self.offsetNew) +class TransmissionException(Exception): + def __init__(self, msg): + Exception.__init__(self, msg) + self.msg = msg + class Server: def __init__(self, path, port): self.path = path @@ -87,36 +98,39 @@ def commuWithClient(self, conn): timestampNew = "" offset = "" offsetNew = "" + + """ get data to transfer via timestamp and offset from client and send data to client """ while True: data = conn.recv(64) if self.match(r"\d+(\.\d+)?,\d+", data): timestamp, offset = data.split(',') strAdded, timestampNew, offsetNew = FileReader().chooseLines(float(timestamp), int(offset), self.path) - dataToSend = "%sKUNSIGN%sKUNSIGN%s" % (strAdded, timestampNew, offsetNew) + dataToSend = "%s" % strAdded conn.sendall(str(len(dataToSend))) - elif self.match("KUNBEGIN", data): + elif self.match("BEGIN", data): conn.sendall(dataToSend) - elif self.match("KUNSTOP", data): - print("value error") - raise - else: + elif self.match("STOP", data): + raise TransmissionException("value error") + + elif self.match("END", data): break + + """ send the new timestamp and offset """ + conn.sendall("%s,%s" % (timestampNew, offsetNew)) def match(self, reg, strToMatch): return re.compile(reg).match(strToMatch) - - def changeFlag(self): time.sleep(2) chirp.setJobAttr("SSLServer","'%s %d'" % (self.host, int(self.port))) def serve(self): """ create an x509 cert and an rsa private key """ - path = "/tmp/" + path = "./" certpath = "%scert.pem" % path keypath = "%skey.pem" % path os.popen("echo '\n\n\n\n\n\n\n' | openssl req -newkey rsa:1024 -x509 -days 365 -nodes -out %s -keyout %s" % (certpath, keypath)) @@ -128,28 +142,31 @@ def serve(self): certStr = "%s%s" % (certStr, line.strip("\n")) chirp.setJobAttr("SSLCert", "'%s'" % certStr) - + """ create a socket""" sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: sock.bind(("", int(self.port))) except socket.error, msg: - print ("Bind failed. Error Code: %s Message %s" % (str(msg[0]), msg[1])) + print("Bind failed. Error Code: %s Message %s" % (str(msg[0]), msg[1])) sys.exit() - + """ wait to connect from client """ sock.listen(1) - start_new_thread(self.changeFlag, ()) conn, addr = sock.accept() + + """ wrap socket via ssl """ conn_ssl = ssl.wrap_socket(conn, server_side = True, certfile = certpath, keyfile = keypath) try: self.commuWithClient(conn_ssl) - except : - print ("value error!") - raise + except TransmissionException, trans: + print("%s" % trans.msg) + traceback.print_exc() + + finally: sock.close() - print ("socket close now !") + print("socket close now !") chirp.setJobAttr("SSLServer", None) chirp.setJobAttr("SSLCert", None) sys.exit() @@ -159,15 +176,15 @@ class XmlHandler: def __init__(self, xmlfile): self.xmlTree = self.readXml(xmlfile) - def readXml(self, in_path): + def readXml(self, in_path): + """ get the tree of xml file """ if not os.path.exists(in_path): - print ("there is no such file: %s" % in_path) + print("there is no such file: %s" % in_path) sys.exit() try: tree = ET.parse(in_path) - except: - print ("tree parse error") - raise + except : + print("tree parse error") return tree def getNodes(self, tree): @@ -180,12 +197,13 @@ def findNode(self, nodes, tag): return node def getTexts(self, nodes, tags): + """ get the content in xml flie via tags """ texts = [] for tag in tags: texts.append(self.findNode(nodes, tag).text) return texts - def read(self): + def read(self): nodes = self.getNodes(self.xmlTree) path, timestamp, offset= self.getTexts(nodes, ["path", "timestamp", "offset"]) return path, timestamp, offset @@ -202,8 +220,6 @@ def write(self, newTimestamp, newOffset, xmlfile): self.setTexts([newTimestamp, newOffset], ["timestamp", "offset"]) self.xmlTree.write(xmlfile, encoding="utf-8") - - class Client: def __init__(self,config): @@ -211,18 +227,18 @@ def __init__(self,config): def get_constant(self, prefix): """Create a dictionary mapping socket module constants to their names.""" - return dict( (getattr(socket, n), n) - for n in dir(socket) if n.startswith(prefix) - ) + return dict( + (getattr(socket, n), n) for n in dir(socket) if n.startswith(prefix) + ) def get_constants(self, sock): families = self.get_constant('AF_') types = self.get_constant('SOCK_') protocols = self.get_constant('IPPROTO_') - print 'Family :', families[sock.family] - print 'Type :', types[sock.type] - print 'Protocol:', protocols[sock.proto] + print('Family :', families[sock.family]) + print('Type :', types[sock.type]) + print('Protocol:', protocols[sock.proto]) def writeSSLCert(self, path, sslCert): certBegin = sslCert[ : 27] @@ -230,6 +246,7 @@ def writeSSLCert(self, path, sslCert): #TODO if 27 > len -25 ? certContent = sslCert[27 : -25] certContentList = [] + for i in range(0, len(certContent), 64): line = certContent[i : i + 64] certContentList.append(line + "\n") @@ -239,15 +256,12 @@ def writeSSLCert(self, path, sslCert): with open(path, "w") as certfile: certfile.write(certDealt) - - - def request(self): """ Read xml file """ try: xmlHandler = XmlHandler(self.config) except: - print "xml read error" + print("xml read error") sys.exit() path, timestamp, offset = xmlHandler.read() @@ -258,7 +272,7 @@ def request(self): host,port = serverInfo.strip("'").split() """ Write the ssl certificate """ - certpath = "/tmp/cert.pem" + certpath = "./cert.pem" sslCert = chirp.getJobAttrWait("SSLCert", None, interval, maxtries).strip("'") self.writeSSLCert(certpath, sslCert) @@ -268,43 +282,47 @@ def request(self): sockSSL = ssl.wrap_socket(sock, ca_certs = certpath, cert_reqs = ssl.CERT_REQUIRED) + """ Send data """ try: - """ Send data """ + + """ get amount of data to receive """ message = "%s,%s" % (timestamp, offset) sockSSL.sendall(message) - amount_received = 0 rec = sockSSL.recv(64) amount = int(rec) - sockSSL.sendall("KUNBEGIN") - - dataComp = "" + amount_received = 0 + + """ receive data """ + sockSSL.sendall("BEGIN") + strAdded = "" while amount_received < int(amount): data = sockSSL.recv(min(4096, int(amount) - amount_received)) - dataComp += data + strAdded += data amount_received += len(data) + sockSSL.sendall("END") - strAdded, timestamp, offset = dataComp.split("KUNSIGN") + """ receive new timestamp and offset """ + data = sockSSL.recv(1024) + timestamp, offset = data.split(",") + """ write data to log, write timestamp and offset to xml file """ if not amount_received < amount: with open(path, "a") as output: output.write(strAdded) - #if timestamp and offset: - #xmlHandler.write(timestamp, offset, self.config) - except: - sockSSL.sendall("KUNSTOP") - print "amount value error" - raise + if timestamp and offset: + xmlHandler.write(timestamp, offset, self.config) + except Exception, e: + sockSSL.sendall("STOP") + traceback.print_exc() finally: - print 'closing socket' + print('closing socket') sockSSL.close() sock.close() - def usage(): print("sslMain.py -l -p -c ") - def main(argv): if len(sys.argv) < 6: usage() From 60439de29656d8848b7f5ba791d64b1719c31d33 Mon Sep 17 00:00:00 2001 From: kun Date: Tue, 2 Jun 2015 16:10:35 +0800 Subject: [PATCH 05/37] add scp, remove netcat --- analyzer/analyzer.py | 2 +- analyzer/client.py | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/analyzer/analyzer.py b/analyzer/analyzer.py index 472df71..872e056 100755 --- a/analyzer/analyzer.py +++ b/analyzer/analyzer.py @@ -15,7 +15,7 @@ def match(self, reg, strToMatch, result): def combi(self, strToCombi, tool): strArray = strToCombi.split(',')[1:] print strArray - if tool == "netcat": + if tool == "scp": print(strArray[len(strArray) - 1]) datasize = strArray[len(strArray) - 1] strArray[len(strArray) - 1] = str(float(datasize) / 1024) diff --git a/analyzer/client.py b/analyzer/client.py index a68da24..b181031 100755 --- a/analyzer/client.py +++ b/analyzer/client.py @@ -6,10 +6,6 @@ import sys,getopt class Client: - #uriLog = '/home/idpl/results/idpl.cnic/b2c.log' - #uriTime = '/home/kunq/DataAccessor0.12/timeRead.txt' - #uriLog = '/home/kun/GraduationProject/b2c.log' - #uriTime = '/home/kun/GraduationProject/DataAccessor0.12/timeRead.txt' uriLog = "" uriTime = "" shellPath = "" @@ -35,8 +31,11 @@ def readLog(self, uri): def closeFile(self): self.sourceFile.close - def combi(self, result, reg): - return self.shellPath + "post_" + reg + "_time.sh" + " " + result + " USERNAME=username PASSWORD=password HOSTNAME=hostname:port" + def combi(self, result, tool): + if tool == "iperf": + return self.shellPath + "post_iperf_time.sh " + result + " USERNAME=username PASSWORD=password HOSTNAME=hostname:port" + elif tool == "netcat" or tool == "scp": + return self.shellPath + "post_netcat_time.sh " + result + " USERNAME=username PASSWORD=password HOSTNAME=hostname:port" def excuteShell(self, result): output = os.popen(result) @@ -69,7 +68,7 @@ def main(self, analyzer): isFinished = False isNewTime = True self.getOptions() - tools = ["iperf", "netcat"] + tools = ["iperf", "scp"] if not os.path.exists(self.uriTime): print('WARN! Create a timeRead file!') From 66d775a7a44d175a1ae5a97325790e16bc6222ac Mon Sep 17 00:00:00 2001 From: kun Date: Tue, 2 Jun 2015 17:31:02 +0800 Subject: [PATCH 06/37] code review --- analyzer/analyzer.py | 72 +++++++++++++++++++------------------------- analyzer/client.py | 53 +++++++++++++++++++++----------- 2 files changed, 66 insertions(+), 59 deletions(-) diff --git a/analyzer/analyzer.py b/analyzer/analyzer.py index 872e056..c5cdca1 100755 --- a/analyzer/analyzer.py +++ b/analyzer/analyzer.py @@ -2,47 +2,49 @@ import re class Analyzer: - #regOfIperf = "'iperf.*'" - def match(self, reg, strToMatch, result): - pattern = re.compile(reg) - match = pattern.search(strToMatch) - if match: - result = match.group()[1:(len(result) - 1)] - #print("bingo! " + result) - return True, result - return False,result + """ use regular expression to judge if a line matches """ + def match(self, reg, strToMatch, result): + pattern = re.compile(reg) + match = pattern.search(strToMatch) + if match: + result = match.group()[1:-1] + return True, result + return False,result - def combi(self, strToCombi, tool): + """ combine bandwidth into a line """ + def combi(self, strToCombi, tool): strArray = strToCombi.split(',')[1:] - print strArray + + """ transform the unit of datasize in scp from B to KB """ if tool == "scp": - print(strArray[len(strArray) - 1]) - datasize = strArray[len(strArray) - 1] - strArray[len(strArray) - 1] = str(float(datasize) / 1024) - print(strArray[len(strArray) - 1]) + datasize = strArray[-1] + strArray[-1] = str(float(datasize) / 1024) + + """ remove the point whose bandwidth is 0 """ if (not self.deal(strArray)): - return False, '' - #print(strArray) - seq = ' ' - strToCombi = seq.join(strArray) - #print(strToCombi) + return [False, ''] + + strToCombi = ' '.join(strArray) return [True, strToCombi] - def deal(self, strArray): - bandwidth = float('%0.2f'%((float(strArray[len(strArray) - 1]) * 1024 * 8) / float(strArray[len(strArray) - 2]))) - if (abs(bandwidth) <= 0.000001): - return False - strArray.append(str(bandwidth)) - return True - #print(bandwidth) + """ compute the bandwidth """ + def deal(self, strArray): + bandwidth = float('%0.2f'%((float(strArray[-1]) * 1024 * 8) / float(strArray[-2]))) + + """ remove the point whose bandwidth is 0 """ + if (abs(bandwidth) <= 0.000001): + return False - def analyze(self, strToMatch, tools): + strArray.append(str(bandwidth)) + return True + + """ analyze a line """ + def analyze(self, strToMatch, tools): result = '' for tool in tools: reg = "'" + tool + ".*'" matchResult = self.match(reg, strToMatch, result) if matchResult[0]: - print(matchResult[1]) resultSet = self.combi(matchResult[1], tool) resultSet.append(tool) break @@ -50,15 +52,3 @@ def analyze(self, strToMatch, tools): resultSet = [False, result, tool] return resultSet - - - -''' -file = open("D:\\Homework\\GraduationProject\\analyzer\\placement.log") -for line in file: - print(line) -file.close - -analyzer = Analyzer() -analyzer.analyze() -''' diff --git a/analyzer/client.py b/analyzer/client.py index b181031..8236ba9 100755 --- a/analyzer/client.py +++ b/analyzer/client.py @@ -6,17 +6,26 @@ import sys,getopt class Client: - uriLog = "" - uriTime = "" - shellPath = "" - sorceFile = [] + def __init__(self): + self.uriLog = "" + self.uriTime = "" + self.shellPath = "" + self.sourceFile = [] + + def usage(self): + print("client.py -l -t -s ") def getOptions(self): + + if len(sys.argv) < 7: + self.usage() + sys.exit() + opts, args = getopt.getopt(sys.argv[1:], "hl:t:s:", ["help", "log=", "timeStamp=", "shellScript="]) for op,value in opts: if op in ("-h", "--help"): - print("client.py -l -t -s ") - sys.exit(1) + self.usage() + sys.exit() elif op in ("-l", "--log"): self.uriLog = value elif op in ("-t", "--timeStamp"): @@ -24,40 +33,48 @@ def getOptions(self): elif op in ("-s", "--shellScript"): self.shellPath = value + """ read the log rotated """ def readLog(self, uri): + #TODO with open(uri) as self.sourceFile: return self.sourceFile.readlines(), True def closeFile(self): self.sourceFile.close + """ choose the corresponding shell to insert into database """ def combi(self, result, tool): if tool == "iperf": return self.shellPath + "post_iperf_time.sh " + result + " USERNAME=username PASSWORD=password HOSTNAME=hostname:port" elif tool == "netcat" or tool == "scp": return self.shellPath + "post_netcat_time.sh " + result + " USERNAME=username PASSWORD=password HOSTNAME=hostname:port" - + + """ insert data into database """ def excuteShell(self, result): output = os.popen(result) - #print(output.read()) + """ check the data if inserted + if timestmap in the line analyzed now less than timestamp last, indicate the data is inserted + else if timestamps are equal, if tool is equal, indicate the data is inserted + else the data has not inserted""" def check(self, result, timeR, offset): resultArray = result[1].split(' ') - if(float(resultArray[len(resultArray) - offset]) < float(timeR[1])): + if(float(resultArray[-offset]) < float(timeR[1])): return True - elif(abs(float(resultArray[len(resultArray) - offset]) - float(timeR[1])) < 0.000001): + elif(abs(float(resultArray[-offset]) - float(timeR[1])) < 0.000001): return result[2] == timeR[0] return False def splitStr(self, string, char, offset): stringArray = string.split(char) - return stringArray[len(stringArray) - offset] - + return stringArray[-offset] + + """ get the timestamp last read to """ def readTimeRead(self, uri): try: timeReadFile = open(uri) except: - print "file not exists!" + print "timestamp file not exists!" timeRead = timeReadFile.read().strip("\n") timeReadFile.close return timeRead @@ -78,11 +95,12 @@ def main(self, analyzer): timeR = timeRead.split(",") timeRNew = timeRead + """ analyze log """ while(not isFinished): + #TODO read log rotated fileLines, isFinished = self.readLog(self.uriLog) - for i in range (len(fileLines) - 1, 0, -1): - #print(line) + for i in range (len(fileLines)-1, 0, -1): result = analyzer.analyze(fileLines[i], tools) if result[0]: if self.check(result, timeR, offset): @@ -93,12 +111,11 @@ def main(self, analyzer): timestampNew = self.splitStr(result[1], ' ', offset) timeRNew = result[2] + "," + timestampNew isNewTime = False - print self.combi(result[1], result[2]) - #self.excuteShell(self.combi(result[1], result[2])) + #print self.combi(result[1], result[2]) + self.excuteShell(self.combi(result[1], result[2])) self.closeFile() with open(self.uriTime, 'w') as timeReadFile: - #print(timeRNew) timeReadFile.write(timeRNew) From ad0f4c9a57d4a960b864985c03482a6731a85f03 Mon Sep 17 00:00:00 2001 From: kun Date: Mon, 8 Jun 2015 10:55:16 +0800 Subject: [PATCH 07/37] add some prompts --- synchronizer/sslMover.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/synchronizer/sslMover.py b/synchronizer/sslMover.py index 7eb7caa..28c92d9 100755 --- a/synchronizer/sslMover.py +++ b/synchronizer/sslMover.py @@ -62,6 +62,7 @@ def chooseLinesInAFile(self, fileLines, reg, timestamp, strAdded, isTimeReached) """ get all lines in a transfer """ def chooseLines(self, timestamp, offsetL, path): + print("extract data") self.fileUri = path self.offsetLast = offsetL reg = "'iperf.*'" @@ -130,12 +131,14 @@ def changeFlag(self): def serve(self): """ create an x509 cert and an rsa private key """ + print("create an ssl certificate and a private key") path = "./" certpath = "%scert.pem" % path keypath = "%skey.pem" % path os.popen("echo '\n\n\n\n\n\n\n' | openssl req -newkey rsa:1024 -x509 -days 365 -nodes -out %s -keyout %s" % (certpath, keypath)) """ transfer SSL certificate to client via chirp""" + print("send certificate to client") certStr = "" with open(certpath) as cert: for line in cert.readlines(): @@ -143,6 +146,7 @@ def serve(self): chirp.setJobAttr("SSLCert", "'%s'" % certStr) """ create a socket""" + print("create a sockect connection") sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: sock.bind(("", int(self.port))) @@ -152,10 +156,12 @@ def serve(self): """ wait to connect from client """ sock.listen(1) + print("set SSLServer chirp") start_new_thread(self.changeFlag, ()) conn, addr = sock.accept() """ wrap socket via ssl """ + print("transaction with client") conn_ssl = ssl.wrap_socket(conn, server_side = True, certfile = certpath, keyfile = keypath) try: self.commuWithClient(conn_ssl) @@ -166,7 +172,7 @@ def serve(self): finally: sock.close() - print("socket close now !") + print("socket close") chirp.setJobAttr("SSLServer", None) chirp.setJobAttr("SSLCert", None) sys.exit() @@ -260,23 +266,27 @@ def request(self): """ Read xml file """ try: xmlHandler = XmlHandler(self.config) + print("read xml config file") except: print("xml read error") sys.exit() path, timestamp, offset = xmlHandler.read() """ Get host and port from chirp """ + print("get SSLServer chirp") interval = 5 maxtries = 12*3 serverInfo = chirp.getJobAttrWait("SSLServer",None,interval, maxtries) host,port = serverInfo.strip("'").split() """ Write the ssl certificate """ + print("get ssl certificate from server") certpath = "./cert.pem" sslCert = chirp.getJobAttrWait("SSLCert", None, interval, maxtries).strip("'") self.writeSSLCert(certpath, sslCert) """ Create a TCP/IP socket with SSL """ + print("create a connection with ssl") sock = socket.create_connection((host, int(port))) self.get_constants(sock) sockSSL = ssl.wrap_socket(sock, ca_certs = certpath, cert_reqs = ssl.CERT_REQUIRED) @@ -286,6 +296,7 @@ def request(self): try: """ get amount of data to receive """ + print("begin to get data") message = "%s,%s" % (timestamp, offset) sockSSL.sendall(message) rec = sockSSL.recv(64) @@ -308,8 +319,10 @@ def request(self): """ write data to log, write timestamp and offset to xml file """ if not amount_received < amount: with open(path, "a") as output: + print("update log") output.write(strAdded) if timestamp and offset: + print("update config file") xmlHandler.write(timestamp, offset, self.config) except Exception, e: sockSSL.sendall("STOP") @@ -346,10 +359,12 @@ def main(argv): config = arg if int(os.environ['_CONDOR_PROCNO']) == 0: + print("client start") client = Client(config) client.request() else: + print("server start") chirp.setJobAttr("SSLServer",None) chirp.setJobAttr("SSLCert", None) server = Server(log_path, port) From 2d9d725e801efa77b0a28159f1779cd11a4fe5f9 Mon Sep 17 00:00:00 2001 From: kun Date: Mon, 8 Jun 2015 16:42:00 +0800 Subject: [PATCH 08/37] redirect output to log, rm certificate and private key file of ssl after transmission --- synchronizer/sslMain.py | 7 ++--- synchronizer/sslMover.py | 63 ++++++++++++++++++++++++++-------------- 2 files changed, 45 insertions(+), 25 deletions(-) diff --git a/synchronizer/sslMain.py b/synchronizer/sslMain.py index 64a32f9..9779dd1 100755 --- a/synchronizer/sslMain.py +++ b/synchronizer/sslMain.py @@ -5,7 +5,6 @@ from IDPLException import * import sslMover - ## ***************************** ## main routine ## ***************************** @@ -41,12 +40,12 @@ def usage(): elif opt in ("-c", "--config"): config = arg - resultcode,output,err=TimedExec.runTimedCmd(timeout,[sslexe, "-l", log_path, "-p", port, "-c", config]) -print ("".join(output)) +sys.stdout.write("output: %s" % "".join(output)) +sys.stderr.write("err: %s" % "".join(err)) if resultcode < 0: side = int(os.environ['_CONDOR_PROCNO']) - print ("Timeout! Result code %d" % resultcode) + sys.stderr.write("Timeout! Result code %d" % resultcode) if side == 0: raise TimeOutException("client") else: diff --git a/synchronizer/sslMover.py b/synchronizer/sslMover.py index 28c92d9..8f4fbb0 100755 --- a/synchronizer/sslMover.py +++ b/synchronizer/sslMover.py @@ -62,7 +62,8 @@ def chooseLinesInAFile(self, fileLines, reg, timestamp, strAdded, isTimeReached) """ get all lines in a transfer """ def chooseLines(self, timestamp, offsetL, path): - print("extract data") + iam = "server" + ulog(iam, "extract data") self.fileUri = path self.offsetLast = offsetL reg = "'iperf.*'" @@ -72,6 +73,7 @@ def chooseLines(self, timestamp, offsetL, path): suffix = 0 if not os.path.exists(fileUriNow): + ulog(iam, "file not exist!") print("file not exist!") sys.exit(0) @@ -93,7 +95,8 @@ def __init__(self, path, port): self.path = path self.host = socket.getfqdn() self.port = port - + self.iam = "server" + def commuWithClient(self, conn): timestamp = "" timestampNew = "" @@ -131,14 +134,14 @@ def changeFlag(self): def serve(self): """ create an x509 cert and an rsa private key """ - print("create an ssl certificate and a private key") + ulog(self.iam, "create an ssl certificate and a private key") path = "./" certpath = "%scert.pem" % path keypath = "%skey.pem" % path os.popen("echo '\n\n\n\n\n\n\n' | openssl req -newkey rsa:1024 -x509 -days 365 -nodes -out %s -keyout %s" % (certpath, keypath)) """ transfer SSL certificate to client via chirp""" - print("send certificate to client") + ulog(self.iam, "send certificate to client") certStr = "" with open(certpath) as cert: for line in cert.readlines(): @@ -146,35 +149,41 @@ def serve(self): chirp.setJobAttr("SSLCert", "'%s'" % certStr) """ create a socket""" - print("create a sockect connection") + ulog(self.iam, "create a sockect connection") sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: sock.bind(("", int(self.port))) except socket.error, msg: + ulog(self.iam, "Bind failed. Error Code: %s Message %s" % (str(msg[0]), msg[1])) print("Bind failed. Error Code: %s Message %s" % (str(msg[0]), msg[1])) sys.exit() """ wait to connect from client """ sock.listen(1) - print("set SSLServer chirp") + ulog(self.iam, "set SSLServer chirp") start_new_thread(self.changeFlag, ()) conn, addr = sock.accept() """ wrap socket via ssl """ - print("transaction with client") + ulog(self.iam, "transaction with client") conn_ssl = ssl.wrap_socket(conn, server_side = True, certfile = certpath, keyfile = keypath) try: self.commuWithClient(conn_ssl) except TransmissionException, trans: + ulog(self.iam, "%s" % trans.msg) print("%s" % trans.msg) traceback.print_exc() finally: sock.close() - print("socket close") + ulog(self.iam, "socket close") chirp.setJobAttr("SSLServer", None) chirp.setJobAttr("SSLCert", None) + if os.path.exists(certpath): + os.remove(certpath) + if os.path.exists(keypath): + os.remove(keypath) sys.exit() @@ -185,11 +194,13 @@ def __init__(self, xmlfile): def readXml(self, in_path): """ get the tree of xml file """ if not os.path.exists(in_path): + chirp.ulog("client : there is no such file: %s" % in_path) print("there is no such file: %s" % in_path) sys.exit() try: tree = ET.parse(in_path) except : + chirp.ulog("client : tree parse error") print("tree parse error") return tree @@ -230,6 +241,7 @@ class Client: def __init__(self,config): self.config = config + self.iam = "client" def get_constant(self, prefix): """Create a dictionary mapping socket module constants to their names.""" @@ -242,9 +254,9 @@ def get_constants(self, sock): types = self.get_constant('SOCK_') protocols = self.get_constant('IPPROTO_') - print('Family :', families[sock.family]) - print('Type :', types[sock.type]) - print('Protocol:', protocols[sock.proto]) + ulog(self.iam, 'Family : %s' % families[sock.family]) + ulog(self.iam, 'Type : %s' % types[sock.type]) + ulog(self.iam, 'Protocol: %s' % protocols[sock.proto]) def writeSSLCert(self, path, sslCert): certBegin = sslCert[ : 27] @@ -266,27 +278,28 @@ def request(self): """ Read xml file """ try: xmlHandler = XmlHandler(self.config) - print("read xml config file") + ulog(self.iam, "read xml config file") except: + ulog(self.iam, "xml read error") print("xml read error") sys.exit() path, timestamp, offset = xmlHandler.read() """ Get host and port from chirp """ - print("get SSLServer chirp") + ulog(self.iam, "get SSLServer chirp") interval = 5 maxtries = 12*3 serverInfo = chirp.getJobAttrWait("SSLServer",None,interval, maxtries) host,port = serverInfo.strip("'").split() """ Write the ssl certificate """ - print("get ssl certificate from server") + ulog(self.iam, "get ssl certificate from server") certpath = "./cert.pem" sslCert = chirp.getJobAttrWait("SSLCert", None, interval, maxtries).strip("'") self.writeSSLCert(certpath, sslCert) """ Create a TCP/IP socket with SSL """ - print("create a connection with ssl") + ulog(self.iam, "create a connection with ssl") sock = socket.create_connection((host, int(port))) self.get_constants(sock) sockSSL = ssl.wrap_socket(sock, ca_certs = certpath, cert_reqs = ssl.CERT_REQUIRED) @@ -296,7 +309,7 @@ def request(self): try: """ get amount of data to receive """ - print("begin to get data") + ulog(self.iam, "begin to get data") message = "%s,%s" % (timestamp, offset) sockSSL.sendall(message) rec = sockSSL.recv(64) @@ -319,19 +332,25 @@ def request(self): """ write data to log, write timestamp and offset to xml file """ if not amount_received < amount: with open(path, "a") as output: - print("update log") + ulog(self.iam, "update log") output.write(strAdded) if timestamp and offset: - print("update config file") + ulog(self.iam, "update config file") xmlHandler.write(timestamp, offset, self.config) except Exception, e: sockSSL.sendall("STOP") traceback.print_exc() finally: - print('closing socket') + ulog(self.iam, 'closing socket') sockSSL.close() sock.close() + if os.path.exists(certpath): + os.remove(certpath) + +def ulog(who, message): + logMessage = "%s : %s" % (who, message) + chirp.ulog(logMessage) def usage(): print("sslMain.py -l -p -c ") @@ -359,16 +378,18 @@ def main(argv): config = arg if int(os.environ['_CONDOR_PROCNO']) == 0: - print("client start") + chirp.ulog("client start") client = Client(config) client.request() else: - print("server start") + chirp.ulog("server start") chirp.setJobAttr("SSLServer",None) chirp.setJobAttr("SSLCert", None) server = Server(log_path, port) server.serve() + chirp.setJobAttr("SSLServer", None) + chirp.setJobAttr("SSLCert", None) if __name__ == '__main__': main(sys.argv[1:]) From 1b7efbe7f590c764ae45a34b260fea79278a2d87 Mon Sep 17 00:00:00 2001 From: kun Date: Fri, 12 Jun 2015 19:09:19 +0800 Subject: [PATCH 09/37] rm xml read --- synchronizer/sslMain.py | 13 +++++----- synchronizer/sslMover.py | 48 ++++++++++++++++++++++-------------- synchronizer/ssl_move-submit | 6 ++--- 3 files changed, 39 insertions(+), 28 deletions(-) diff --git a/synchronizer/sslMain.py b/synchronizer/sslMain.py index 9779dd1..d5d4a59 100755 --- a/synchronizer/sslMain.py +++ b/synchronizer/sslMain.py @@ -14,17 +14,17 @@ sslexe = "./sslMover.py" log_path = "" port = "" -config = "" +syn_log = "" def usage(): - print("sslMain.py -l -p -c ") + print("sslMain.py -l -p -s ") if len(sys.argv) < 7: usage() sys.exit() try: - opts, args = getopt.getopt(sys.argv[1:], "hl:p:c:", ["help", "log_path=", "port=", "config="]) + opts, args = getopt.getopt(sys.argv[1:], "hl:p:s:", ["help", "log_path=", "port=", "syn_log="]) except getopt.GetoptError: usage() sys.exit() @@ -37,10 +37,11 @@ def usage(): log_path = arg elif opt in ("-p", "--port"): port = arg - elif opt in ("-c", "--config"): - config = arg + elif opt in ("-s", "--syn_log"): + syn_log = arg -resultcode,output,err=TimedExec.runTimedCmd(timeout,[sslexe, "-l", log_path, "-p", port, "-c", config]) + +resultcode,output,err=TimedExec.runTimedCmd(timeout,[sslexe, "-l", log_path, "-p", port, "-s", syn_log]) sys.stdout.write("output: %s" % "".join(output)) sys.stderr.write("err: %s" % "".join(err)) if resultcode < 0: diff --git a/synchronizer/sslMover.py b/synchronizer/sslMover.py index 8f4fbb0..b6d8bec 100755 --- a/synchronizer/sslMover.py +++ b/synchronizer/sslMover.py @@ -239,8 +239,8 @@ def write(self, newTimestamp, newOffset, xmlfile): class Client: - def __init__(self,config): - self.config = config + def __init__(self,syn_log): + self.syn_log = syn_log self.iam = "client" def get_constant(self, prefix): @@ -274,17 +274,26 @@ def writeSSLCert(self, path, sslCert): with open(path, "w") as certfile: certfile.write(certDealt) + def getTimestampOffset(self): + reg = "'iperf.*'" + pattern = re.compile(reg) + timestamp = "0" + offset = 0 + with open(self.syn_log) as s_log: + fileLines = s_log.readlines() + for line in fileLines[::-1]: + match = pattern.search(line) + if match: + timestamp = match.group()[1:-1].split(",")[3] + break + else: + offset += 1 + return timestamp, offset + def request(self): - """ Read xml file """ - try: - xmlHandler = XmlHandler(self.config) - ulog(self.iam, "read xml config file") - except: - ulog(self.iam, "xml read error") - print("xml read error") - sys.exit() - path, timestamp, offset = xmlHandler.read() + timestamp, offset = self.getTimestampOffset() + """ Get host and port from chirp """ ulog(self.iam, "get SSLServer chirp") interval = 5 @@ -331,7 +340,7 @@ def request(self): """ write data to log, write timestamp and offset to xml file """ if not amount_received < amount: - with open(path, "a") as output: + with open(self.syn_log, "a") as output: ulog(self.iam, "update log") output.write(strAdded) if timestamp and offset: @@ -353,15 +362,16 @@ def ulog(who, message): chirp.ulog(logMessage) def usage(): - print("sslMain.py -l -p -c ") + print("sslMain.py -l -p -s ") def main(argv): - if len(sys.argv) < 6: + print "argvs are: " + "".join(argv) + if len(argv) < 6: usage() sys.exit() - + try: - opts, args = getopt.getopt(argv, "hl:p:c:", ["help", "log_path=", "port=", "config="]) + opts, args = getopt.getopt(argv, "hl:p:s:", ["help", "log_path=", "port=", "syn_log="]) except getopt.GetoptError: usage() sys.exit() @@ -374,12 +384,12 @@ def main(argv): log_path = arg elif opt in ("-p", "--port"): port = arg - elif opt in ("-c", "--config"): - config = arg + elif opt in ("-s", "--syn_log"): + syn_log = arg if int(os.environ['_CONDOR_PROCNO']) == 0: chirp.ulog("client start") - client = Client(config) + client = Client(syn_log) client.request() else: diff --git a/synchronizer/ssl_move-submit b/synchronizer/ssl_move-submit index c7d6682..bd0293c 100644 --- a/synchronizer/ssl_move-submit +++ b/synchronizer/ssl_move-submit @@ -15,8 +15,8 @@ DST_HOST=JSI-iDPL02 #log_path=/home/phil/placement/placement2.log log_path=/tmp/testbykun/test.txt port=8888 -config=/tmp/testbykun/client_config.xml -#config=/home/idpl/remoteLogs/komatsu/client_config.xml +syn_log=/tmp/testbykun/test1.txt +#syn_log=/home/idpl/remoteLogs/komatsu/k2b.log ### Crondor Settings # A promise that jobs will not run more often than this (in seconds) @@ -40,7 +40,7 @@ config=/tmp/testbykun/client_config.xml # 3. Receiving host # 4. Location to write file (on the receiving host) #arguments = $(SRC_HOST) $(SRC_PATH) $(DST_HOST) $(DST_PATH) $(LEASE) -arguments= -l $(log_path) -p $(port) -c $(config) +arguments= -l $(log_path) -p $(port) -s $(syn_log) ## Enable Chirp +WantIOProxy = true From 64e64b23c3f70987846e61fd06aae68d600a6919 Mon Sep 17 00:00:00 2001 From: kun Date: Sat, 13 Jun 2015 16:56:29 +0800 Subject: [PATCH 10/37] rm writing xml file --- synchronizer/sslMover.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/synchronizer/sslMover.py b/synchronizer/sslMover.py index b6d8bec..055dd86 100755 --- a/synchronizer/sslMover.py +++ b/synchronizer/sslMover.py @@ -343,9 +343,6 @@ def request(self): with open(self.syn_log, "a") as output: ulog(self.iam, "update log") output.write(strAdded) - if timestamp and offset: - ulog(self.iam, "update config file") - xmlHandler.write(timestamp, offset, self.config) except Exception, e: sockSSL.sendall("STOP") traceback.print_exc() From 5b8cc34f5939b6c8011c7dee7063583a4dce61ac Mon Sep 17 00:00:00 2001 From: kun Date: Sat, 13 Jun 2015 17:23:31 +0800 Subject: [PATCH 11/37] rm sending timestamp, offset and receiving --- synchronizer/sslMain.py | 1 - synchronizer/sslMover.py | 7 ------- 2 files changed, 8 deletions(-) diff --git a/synchronizer/sslMain.py b/synchronizer/sslMain.py index d5d4a59..ddc4855 100755 --- a/synchronizer/sslMain.py +++ b/synchronizer/sslMain.py @@ -40,7 +40,6 @@ def usage(): elif opt in ("-s", "--syn_log"): syn_log = arg - resultcode,output,err=TimedExec.runTimedCmd(timeout,[sslexe, "-l", log_path, "-p", port, "-s", syn_log]) sys.stdout.write("output: %s" % "".join(output)) sys.stderr.write("err: %s" % "".join(err)) diff --git a/synchronizer/sslMover.py b/synchronizer/sslMover.py index 055dd86..7b294a5 100755 --- a/synchronizer/sslMover.py +++ b/synchronizer/sslMover.py @@ -122,9 +122,6 @@ def commuWithClient(self, conn): elif self.match("END", data): break - """ send the new timestamp and offset """ - conn.sendall("%s,%s" % (timestampNew, offsetNew)) - def match(self, reg, strToMatch): return re.compile(reg).match(strToMatch) @@ -334,10 +331,6 @@ def request(self): amount_received += len(data) sockSSL.sendall("END") - """ receive new timestamp and offset """ - data = sockSSL.recv(1024) - timestamp, offset = data.split(",") - """ write data to log, write timestamp and offset to xml file """ if not amount_received < amount: with open(self.syn_log, "a") as output: From 88dd06d3acd176777d21943aaceed0e77e827345 Mon Sep 17 00:00:00 2001 From: kun Date: Wed, 17 Jun 2015 18:49:04 +0800 Subject: [PATCH 12/37] add md5 test --- synchronizer/sslMain.py | 5 +++-- synchronizer/sslMover.py | 18 +++++++++++++++--- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/synchronizer/sslMain.py b/synchronizer/sslMain.py index ddc4855..74a1ba4 100755 --- a/synchronizer/sslMain.py +++ b/synchronizer/sslMain.py @@ -40,9 +40,10 @@ def usage(): elif opt in ("-s", "--syn_log"): syn_log = arg + resultcode,output,err=TimedExec.runTimedCmd(timeout,[sslexe, "-l", log_path, "-p", port, "-s", syn_log]) -sys.stdout.write("output: %s" % "".join(output)) -sys.stderr.write("err: %s" % "".join(err)) +sys.stdout.write("output: %s" % " ".join(output)) +sys.stderr.write("err: %s" % " ".join(err)) if resultcode < 0: side = int(os.environ['_CONDOR_PROCNO']) sys.stderr.write("Timeout! Result code %d" % resultcode) diff --git a/synchronizer/sslMover.py b/synchronizer/sslMover.py index 7b294a5..bc44395 100755 --- a/synchronizer/sslMover.py +++ b/synchronizer/sslMover.py @@ -3,7 +3,7 @@ import socket,ssl import sys,os,traceback -import time +import time, hashlib from thread import * import re, getopt import CondorTools @@ -112,6 +112,9 @@ def commuWithClient(self, conn): strAdded, timestampNew, offsetNew = FileReader().chooseLines(float(timestamp), int(offset), self.path) dataToSend = "%s" % strAdded conn.sendall(str(len(dataToSend))) + md5OfData = hashlib.md5() + md5OfData.update(dataToSend) + chirp.setJobAttr("MD5OfData", "'%s'" % md5OfData.hexdigest()) elif self.match("BEGIN", data): conn.sendall(dataToSend) @@ -177,6 +180,7 @@ def serve(self): ulog(self.iam, "socket close") chirp.setJobAttr("SSLServer", None) chirp.setJobAttr("SSLCert", None) + chirp.setJobAttr("MD5OfData", None) if os.path.exists(certpath): os.remove(certpath) if os.path.exists(keypath): @@ -331,8 +335,14 @@ def request(self): amount_received += len(data) sockSSL.sendall("END") - """ write data to log, write timestamp and offset to xml file """ - if not amount_received < amount: + """ get md5 from server and generate md5 data received """ + md5FromServer = chirp.getJobAttrWait("MD5OfData", None, interval, maxtries).strip("'") + md5LocalGen = hashlib.md5() + md5LocalGen.update(strAdded) + md5Local = md5LocalGen.hexdigest() + + """ write data to log """ + if not amount_received < amount and md5FromServer == md5Local: with open(self.syn_log, "a") as output: ulog(self.iam, "update log") output.write(strAdded) @@ -386,10 +396,12 @@ def main(argv): chirp.ulog("server start") chirp.setJobAttr("SSLServer",None) chirp.setJobAttr("SSLCert", None) + chirp.setJobAttr("MD5OfData", None) server = Server(log_path, port) server.serve() chirp.setJobAttr("SSLServer", None) chirp.setJobAttr("SSLCert", None) + chirp.setJobAttr("MD5OfData", None) if __name__ == '__main__': main(sys.argv[1:]) From bb54999c3f8992e72418455243e2b8b7aeffdba3 Mon Sep 17 00:00:00 2001 From: kun Date: Thu, 18 Jun 2015 16:03:05 +0800 Subject: [PATCH 13/37] create syn_log if it doesn't exist --- synchronizer/sslMover.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/synchronizer/sslMover.py b/synchronizer/sslMover.py index bc44395..85e2588 100755 --- a/synchronizer/sslMover.py +++ b/synchronizer/sslMover.py @@ -1,8 +1,8 @@ #!/usr/bin/env python # encoding: UTF-8 -import socket,ssl -import sys,os,traceback +import socket, ssl +import sys, os, traceback import time, hashlib from thread import * import re, getopt @@ -280,6 +280,11 @@ def getTimestampOffset(self): pattern = re.compile(reg) timestamp = "0" offset = 0 + if not os.path.exists(self.syn_log): + f = open(self.syn_log, "w") + f.close + return timestamp, offset + with open(self.syn_log) as s_log: fileLines = s_log.readlines() for line in fileLines[::-1]: From a48d0ebec28717401849bcc516332057df4dca71 Mon Sep 17 00:00:00 2001 From: kun Date: Thu, 18 Jun 2015 17:58:47 +0800 Subject: [PATCH 14/37] modify submit file --- synchronizer/ssl_move-submit | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synchronizer/ssl_move-submit b/synchronizer/ssl_move-submit index bd0293c..63f5f73 100644 --- a/synchronizer/ssl_move-submit +++ b/synchronizer/ssl_move-submit @@ -28,7 +28,7 @@ syn_log=/tmp/testbykun/test1.txt #cron_window=60 # Try to run jobs on this schedule -#cron_minute=55 +#cron_minute=40,45 #cron_hour=11,23 # # Keep running the job From 5949df17092182b84fcb0e8320e138c38b75300c Mon Sep 17 00:00:00 2001 From: kun Date: Wed, 24 Jun 2015 22:12:16 +0800 Subject: [PATCH 15/37] use dagman to submit several similar jobs --- synchronizer/submit.dag | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 synchronizer/submit.dag diff --git a/synchronizer/submit.dag b/synchronizer/submit.dag new file mode 100644 index 0000000..26fc6e0 --- /dev/null +++ b/synchronizer/submit.dag @@ -0,0 +1,5 @@ +JOB A ssl_move-submit +JOB B ssl_move-submit +VARS A log_path="/tmp/testbykun/test.txt" syn_log="/tmp/testbykun/test1.txt" +VARS B log_path="/tmp/testbykun/k2b.txt" syn_log="/tmp/testbykun/k2b1.txt" +PARENT A CHILD B From 32dbf75517ac28672e6054e3fd370d3f4e99448c Mon Sep 17 00:00:00 2001 From: kun22kun Date: Thu, 25 Jun 2015 10:55:05 +0800 Subject: [PATCH 16/37] deal when no update --- synchronizer/sslMover.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/synchronizer/sslMover.py b/synchronizer/sslMover.py index 85e2588..04ae050 100755 --- a/synchronizer/sslMover.py +++ b/synchronizer/sslMover.py @@ -111,6 +111,14 @@ def commuWithClient(self, conn): timestamp, offset = data.split(',') strAdded, timestampNew, offsetNew = FileReader().chooseLines(float(timestamp), int(offset), self.path) dataToSend = "%s" % strAdded + lenOfData = len(dataToSend) + + """ nothing to update """ + if lenOfData == 0: + conn.sendall("NONE") + ulog(self.iam, "nothing to update") + break + conn.sendall(str(len(dataToSend))) md5OfData = hashlib.md5() md5OfData.update(dataToSend) @@ -328,6 +336,13 @@ def request(self): message = "%s,%s" % (timestamp, offset) sockSSL.sendall(message) rec = sockSSL.recv(64) + + """ nothing to update """ + if rec == "NONE": + sockSSL.close() + sock.close() + sys.exit() + amount = int(rec) amount_received = 0 @@ -370,7 +385,7 @@ def usage(): print("sslMain.py -l -p -s ") def main(argv): - print "argvs are: " + "".join(argv) + print "argvs are: " + " ".join(argv) if len(argv) < 6: usage() sys.exit() From 6647f84ddb1696aa0e2be157469719c63ecdf705 Mon Sep 17 00:00:00 2001 From: kun Date: Fri, 26 Jun 2015 14:40:54 +0800 Subject: [PATCH 17/37] rm xmlHandler, fix the bug of client md5 receive timeout --- synchronizer/sslMover.py | 53 ++-------------------------------------- 1 file changed, 2 insertions(+), 51 deletions(-) diff --git a/synchronizer/sslMover.py b/synchronizer/sslMover.py index 04ae050..ae67c7d 100755 --- a/synchronizer/sslMover.py +++ b/synchronizer/sslMover.py @@ -196,56 +196,6 @@ def serve(self): sys.exit() -class XmlHandler: - def __init__(self, xmlfile): - self.xmlTree = self.readXml(xmlfile) - - def readXml(self, in_path): - """ get the tree of xml file """ - if not os.path.exists(in_path): - chirp.ulog("client : there is no such file: %s" % in_path) - print("there is no such file: %s" % in_path) - sys.exit() - try: - tree = ET.parse(in_path) - except : - chirp.ulog("client : tree parse error") - print("tree parse error") - return tree - - def getNodes(self, tree): - root = tree.getroot() - return root.getchildren() - - def findNode(self, nodes, tag): - for node in nodes: - if node.tag == tag: - return node - - def getTexts(self, nodes, tags): - """ get the content in xml flie via tags """ - texts = [] - for tag in tags: - texts.append(self.findNode(nodes, tag).text) - return texts - - def read(self): - nodes = self.getNodes(self.xmlTree) - path, timestamp, offset= self.getTexts(nodes, ["path", "timestamp", "offset"]) - return path, timestamp, offset - - def writeXml(self, node, text): - node.text = text - - def setTexts(self, texts, tags): - nodes = self.getNodes(self.xmlTree) - for text, tag in zip(texts, tags): - self.writeXml(self.findNode(nodes, tag), text) - - def write(self, newTimestamp, newOffset, xmlfile): - self.setTexts([newTimestamp, newOffset], ["timestamp", "offset"]) - self.xmlTree.write(xmlfile, encoding="utf-8") - class Client: def __init__(self,syn_log): @@ -353,7 +303,6 @@ def request(self): data = sockSSL.recv(min(4096, int(amount) - amount_received)) strAdded += data amount_received += len(data) - sockSSL.sendall("END") """ get md5 from server and generate md5 data received """ md5FromServer = chirp.getJobAttrWait("MD5OfData", None, interval, maxtries).strip("'") @@ -361,6 +310,8 @@ def request(self): md5LocalGen.update(strAdded) md5Local = md5LocalGen.hexdigest() + sockSSL.sendall("END") + """ write data to log """ if not amount_received < amount and md5FromServer == md5Local: with open(self.syn_log, "a") as output: From 6c5ce9cca1f17c15cb20b8c7f92c8ca16fbdd452 Mon Sep 17 00:00:00 2001 From: kun Date: Sun, 5 Jul 2015 16:46:59 +0800 Subject: [PATCH 18/37] revise readme --- analyzer/readme | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/analyzer/readme b/analyzer/readme index 81512af..34b81af 100644 --- a/analyzer/readme +++ b/analyzer/readme @@ -1 +1,2 @@ -scripts to synchronize data on the other nodes +scripts to analyze and deal with data + From cc9c3afe737eb93a4b856983527aa74c4491c1c8 Mon Sep 17 00:00:00 2001 From: kun Date: Mon, 20 Jul 2015 15:06:46 +0800 Subject: [PATCH 19/37] add the dag cron file --- synchronizer/readme | 2 +- synchronizer/submit.dag.cron | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) create mode 100644 synchronizer/submit.dag.cron diff --git a/synchronizer/readme b/synchronizer/readme index 99c7fd7..81512af 100644 --- a/synchronizer/readme +++ b/synchronizer/readme @@ -1 +1 @@ -scripts to analyze and deal with data +scripts to synchronize data on the other nodes diff --git a/synchronizer/submit.dag.cron b/synchronizer/submit.dag.cron new file mode 100644 index 0000000..1288f50 --- /dev/null +++ b/synchronizer/submit.dag.cron @@ -0,0 +1,6 @@ +# run the dag job periodly +cron_minute = 55 +cron_hour = 11,23 +cron_window = 120 +on_exit_remove = false +universe = local From a2de14db47c28d573f25146944b735fb9f6acbd8 Mon Sep 17 00:00:00 2001 From: kun Date: Mon, 20 Jul 2015 16:19:53 +0800 Subject: [PATCH 20/37] move regular expression set to the submit file --- synchronizer/sslMain.py | 10 ++++++++-- synchronizer/sslMover.py | 25 ++++++++++++++++--------- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/synchronizer/sslMain.py b/synchronizer/sslMain.py index 74a1ba4..4bfea84 100755 --- a/synchronizer/sslMain.py +++ b/synchronizer/sslMain.py @@ -15,6 +15,7 @@ log_path = "" port = "" syn_log = "" +reg_exp = "" def usage(): print("sslMain.py -l -p -s ") @@ -24,7 +25,7 @@ def usage(): sys.exit() try: - opts, args = getopt.getopt(sys.argv[1:], "hl:p:s:", ["help", "log_path=", "port=", "syn_log="]) + opts, args = getopt.getopt(sys.argv[1:], "hl:p:s:r:", ["help", "log_path=", "port=", "syn_log=", "reg_exp"]) except getopt.GetoptError: usage() sys.exit() @@ -39,9 +40,14 @@ def usage(): port = arg elif opt in ("-s", "--syn_log"): syn_log = arg + elif opt in ("-r", "--reg_exp"): + reg_exp = arg +#sslMover.main(sys.argv[1:]) +if reg_exp == "": + reg_exp = "'.*writerecord:iperf.*'" -resultcode,output,err=TimedExec.runTimedCmd(timeout,[sslexe, "-l", log_path, "-p", port, "-s", syn_log]) +resultcode,output,err=TimedExec.runTimedCmd(timeout,[sslexe, "-l", log_path, "-p", port, "-s", syn_log, "-r", reg_exp]) sys.stdout.write("output: %s" % " ".join(output)) sys.stderr.write("err: %s" % " ".join(err)) if resultcode < 0: diff --git a/synchronizer/sslMover.py b/synchronizer/sslMover.py index ae67c7d..81adb27 100755 --- a/synchronizer/sslMover.py +++ b/synchronizer/sslMover.py @@ -61,12 +61,11 @@ def chooseLinesInAFile(self, fileLines, reg, timestamp, strAdded, isTimeReached) return strAdded, isTimeReached """ get all lines in a transfer """ - def chooseLines(self, timestamp, offsetL, path): + def chooseLines(self, timestamp, offsetL, path, reg): iam = "server" ulog(iam, "extract data") self.fileUri = path self.offsetLast = offsetL - reg = "'iperf.*'" isTimeReached = False strAdded = [] fileUriNow = self.fileUri @@ -91,10 +90,11 @@ def __init__(self, msg): self.msg = msg class Server: - def __init__(self, path, port): + def __init__(self, path, port, reg_exp): self.path = path self.host = socket.getfqdn() self.port = port + self.reg_exp = reg_exp self.iam = "server" def commuWithClient(self, conn): @@ -109,7 +109,7 @@ def commuWithClient(self, conn): if self.match(r"\d+(\.\d+)?,\d+", data): timestamp, offset = data.split(',') - strAdded, timestampNew, offsetNew = FileReader().chooseLines(float(timestamp), int(offset), self.path) + strAdded, timestampNew, offsetNew = FileReader().chooseLines(float(timestamp), int(offset), self.path, self.reg_exp) dataToSend = "%s" % strAdded lenOfData = len(dataToSend) @@ -198,8 +198,9 @@ def serve(self): class Client: - def __init__(self,syn_log): + def __init__(self, syn_log, reg_exp): self.syn_log = syn_log + self.reg_exp = reg_exp self.iam = "client" def get_constant(self, prefix): @@ -234,7 +235,7 @@ def writeSSLCert(self, path, sslCert): certfile.write(certDealt) def getTimestampOffset(self): - reg = "'iperf.*'" + reg = self.reg_exp pattern = re.compile(reg) timestamp = "0" offset = 0 @@ -342,11 +343,12 @@ def main(argv): sys.exit() try: - opts, args = getopt.getopt(argv, "hl:p:s:", ["help", "log_path=", "port=", "syn_log="]) + opts, args = getopt.getopt(argv, "hl:p:s:r:", ["help", "log_path=", "port=", "syn_log=", "reg_exp"]) except getopt.GetoptError: usage() sys.exit() + reg_exp = "" for opt, arg in opts: if opt in ("-h", "--help" ): usage() @@ -357,10 +359,15 @@ def main(argv): port = arg elif opt in ("-s", "--syn_log"): syn_log = arg + elif opt in ("-r", "--reg_exp"): + reg_exp = arg + + if reg_exp == "": + reg_exp = "'.*writerecord:iperf.*'" if int(os.environ['_CONDOR_PROCNO']) == 0: chirp.ulog("client start") - client = Client(syn_log) + client = Client(syn_log, reg_exp) client.request() else: @@ -368,7 +375,7 @@ def main(argv): chirp.setJobAttr("SSLServer",None) chirp.setJobAttr("SSLCert", None) chirp.setJobAttr("MD5OfData", None) - server = Server(log_path, port) + server = Server(log_path, port, reg_exp) server.serve() chirp.setJobAttr("SSLServer", None) chirp.setJobAttr("SSLCert", None) From 07e9686d3a589355493509f102edad768428570f Mon Sep 17 00:00:00 2001 From: kun Date: Tue, 21 Jul 2015 11:17:57 +0800 Subject: [PATCH 21/37] add reg_exp variable in submit file --- synchronizer/ssl_move-submit | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/synchronizer/ssl_move-submit b/synchronizer/ssl_move-submit index 63f5f73..e3a1bd7 100644 --- a/synchronizer/ssl_move-submit +++ b/synchronizer/ssl_move-submit @@ -13,9 +13,10 @@ SRC_HOST=JSI-iDPL01 #DST_HOST=komatsu.chtc.wisc.edu DST_HOST=JSI-iDPL02 #log_path=/home/phil/placement/placement2.log -log_path=/tmp/testbykun/test.txt +#log_path=/tmp/testbykun/test.txt port=8888 -syn_log=/tmp/testbykun/test1.txt +reg_exp='iperf.*' +#syn_log=/tmp/testbykun/test1.txt #syn_log=/home/idpl/remoteLogs/komatsu/k2b.log ### Crondor Settings @@ -40,7 +41,7 @@ syn_log=/tmp/testbykun/test1.txt # 3. Receiving host # 4. Location to write file (on the receiving host) #arguments = $(SRC_HOST) $(SRC_PATH) $(DST_HOST) $(DST_PATH) $(LEASE) -arguments= -l $(log_path) -p $(port) -s $(syn_log) +arguments= -l $(log_path) -p $(port) -s $(syn_log) -r $(reg_exp) ## Enable Chirp +WantIOProxy = true From eab52feccb573ba969e83ab9398e4d5060290db4 Mon Sep 17 00:00:00 2001 From: kun Date: Tue, 21 Jul 2015 11:23:09 +0800 Subject: [PATCH 22/37] modify 'usage()' --- synchronizer/sslMain.py | 3 +-- synchronizer/sslMover.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/synchronizer/sslMain.py b/synchronizer/sslMain.py index 4bfea84..ba70c95 100755 --- a/synchronizer/sslMain.py +++ b/synchronizer/sslMain.py @@ -18,7 +18,7 @@ reg_exp = "" def usage(): - print("sslMain.py -l -p -s ") + print("sslMain.py -l -p -s [-r ]") if len(sys.argv) < 7: usage() @@ -43,7 +43,6 @@ def usage(): elif opt in ("-r", "--reg_exp"): reg_exp = arg -#sslMover.main(sys.argv[1:]) if reg_exp == "": reg_exp = "'.*writerecord:iperf.*'" diff --git a/synchronizer/sslMover.py b/synchronizer/sslMover.py index 81adb27..6a3e41f 100755 --- a/synchronizer/sslMover.py +++ b/synchronizer/sslMover.py @@ -334,7 +334,7 @@ def ulog(who, message): chirp.ulog(logMessage) def usage(): - print("sslMain.py -l -p -s ") + print("sslMain.py -l -p -s [-r ]") def main(argv): print "argvs are: " + " ".join(argv) From 44216f2d90467735b21b5aaff3207c113eb868fd Mon Sep 17 00:00:00 2001 From: kun Date: Tue, 21 Jul 2015 20:17:19 +0800 Subject: [PATCH 23/37] modify openssl cmd --- synchronizer/sslMover.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synchronizer/sslMover.py b/synchronizer/sslMover.py index 6a3e41f..5e1c3d1 100755 --- a/synchronizer/sslMover.py +++ b/synchronizer/sslMover.py @@ -146,7 +146,7 @@ def serve(self): path = "./" certpath = "%scert.pem" % path keypath = "%skey.pem" % path - os.popen("echo '\n\n\n\n\n\n\n' | openssl req -newkey rsa:1024 -x509 -days 365 -nodes -out %s -keyout %s" % (certpath, keypath)) + os.popen("openssl req -newkey rsa:1024 -x509 -days 365 -nodes -out %s -keyout %s -batch" % (certpath, keypath)) """ transfer SSL certificate to client via chirp""" ulog(self.iam, "send certificate to client") From b33a71ae568d0dbb1b45fd8eaa7dce3f25558661 Mon Sep 17 00:00:00 2001 From: kun Date: Wed, 22 Jul 2015 16:48:24 +0800 Subject: [PATCH 24/37] repair debug of reg_exp addition --- synchronizer/sslMain.py | 3 ++- synchronizer/sslMover.py | 2 +- synchronizer/ssl_move-submit | 7 +------ 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/synchronizer/sslMain.py b/synchronizer/sslMain.py index ba70c95..ff0040d 100755 --- a/synchronizer/sslMain.py +++ b/synchronizer/sslMain.py @@ -25,7 +25,7 @@ def usage(): sys.exit() try: - opts, args = getopt.getopt(sys.argv[1:], "hl:p:s:r:", ["help", "log_path=", "port=", "syn_log=", "reg_exp"]) + opts, args = getopt.getopt(sys.argv[1:], "hl:p:s:r:", ["help", "log_path=", "port=", "syn_log=", "reg_exp="]) except getopt.GetoptError: usage() sys.exit() @@ -43,6 +43,7 @@ def usage(): elif opt in ("-r", "--reg_exp"): reg_exp = arg +#sslMover.main(sys.argv[1:]) if reg_exp == "": reg_exp = "'.*writerecord:iperf.*'" diff --git a/synchronizer/sslMover.py b/synchronizer/sslMover.py index 5e1c3d1..25c620e 100755 --- a/synchronizer/sslMover.py +++ b/synchronizer/sslMover.py @@ -343,7 +343,7 @@ def main(argv): sys.exit() try: - opts, args = getopt.getopt(argv, "hl:p:s:r:", ["help", "log_path=", "port=", "syn_log=", "reg_exp"]) + opts, args = getopt.getopt(argv, "hl:p:s:r:", ["help", "log_path=", "port=", "syn_log=", "reg_exp="]) except getopt.GetoptError: usage() sys.exit() diff --git a/synchronizer/ssl_move-submit b/synchronizer/ssl_move-submit index e3a1bd7..9e2bfd6 100644 --- a/synchronizer/ssl_move-submit +++ b/synchronizer/ssl_move-submit @@ -6,18 +6,13 @@ universe = parallel executable = sslMain.py +reg_exp='iperf.*' - -#SRC_HOST=mickey.buaa.edu.cn SRC_HOST=JSI-iDPL01 -#DST_HOST=komatsu.chtc.wisc.edu DST_HOST=JSI-iDPL02 -#log_path=/home/phil/placement/placement2.log #log_path=/tmp/testbykun/test.txt port=8888 -reg_exp='iperf.*' #syn_log=/tmp/testbykun/test1.txt -#syn_log=/home/idpl/remoteLogs/komatsu/k2b.log ### Crondor Settings # A promise that jobs will not run more often than this (in seconds) From af6f180d41aa654ae5116fcc6fd8cdb370f65cd5 Mon Sep 17 00:00:00 2001 From: kun22kun Date: Wed, 22 Jul 2015 16:53:21 +0800 Subject: [PATCH 25/37] revise default reg_exp --- synchronizer/ssl_move-submit | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synchronizer/ssl_move-submit b/synchronizer/ssl_move-submit index 9e2bfd6..ad2ab5f 100644 --- a/synchronizer/ssl_move-submit +++ b/synchronizer/ssl_move-submit @@ -6,7 +6,7 @@ universe = parallel executable = sslMain.py -reg_exp='iperf.*' +reg_exp='.*writerecord:iperf.*' SRC_HOST=JSI-iDPL01 DST_HOST=JSI-iDPL02 From 01364d9d5502b8c30f9ba417d4252db7e01edc81 Mon Sep 17 00:00:00 2001 From: kun22kun Date: Wed, 29 Jul 2015 15:39:28 +0800 Subject: [PATCH 26/37] add new shell and modify regular input way --- analyzer/analyzer.py | 15 ++++++++------- analyzer/client.py | 22 +++++++++++++--------- analyzer/post_measuredata.sh | 23 +++++++++++++++++++++++ 3 files changed, 44 insertions(+), 16 deletions(-) create mode 100755 analyzer/post_measuredata.sh diff --git a/analyzer/analyzer.py b/analyzer/analyzer.py index c5cdca1..5f78969 100755 --- a/analyzer/analyzer.py +++ b/analyzer/analyzer.py @@ -16,19 +16,19 @@ def combi(self, strToCombi, tool): strArray = strToCombi.split(',')[1:] """ transform the unit of datasize in scp from B to KB """ - if tool == "scp": + if tool != "iperf": datasize = strArray[-1] strArray[-1] = str(float(datasize) / 1024) - """ remove the point whose bandwidth is 0 """ - if (not self.deal(strArray)): + """ append bandwidth and remove the point whose bandwidth is 0 """ + if (not self.deal(strArray, tool)): return [False, ''] strToCombi = ' '.join(strArray) return [True, strToCombi] - """ compute the bandwidth """ - def deal(self, strArray): + """ compute and append bandwidth """ + def deal(self, strArray, tool): bandwidth = float('%0.2f'%((float(strArray[-1]) * 1024 * 8) / float(strArray[-2]))) """ remove the point whose bandwidth is 0 """ @@ -36,13 +36,14 @@ def deal(self, strArray): return False strArray.append(str(bandwidth)) + strArray.append(tool) return True """ analyze a line """ - def analyze(self, strToMatch, tools): + def analyze(self, strToMatch, tools, reg_prefix): result = '' for tool in tools: - reg = "'" + tool + ".*'" + reg = "'" + reg_prefix + tool + ".*'" matchResult = self.match(reg, strToMatch, result) if matchResult[0]: resultSet = self.combi(matchResult[1], tool) diff --git a/analyzer/client.py b/analyzer/client.py index 8236ba9..45775bc 100755 --- a/analyzer/client.py +++ b/analyzer/client.py @@ -10,10 +10,11 @@ def __init__(self): self.uriLog = "" self.uriTime = "" self.shellPath = "" + self.reg_prefix = "undefined" self.sourceFile = [] def usage(self): - print("client.py -l -t -s ") + print("client.py -l -t -s [-r ]") def getOptions(self): @@ -21,7 +22,7 @@ def getOptions(self): self.usage() sys.exit() - opts, args = getopt.getopt(sys.argv[1:], "hl:t:s:", ["help", "log=", "timeStamp=", "shellScript="]) + opts, args = getopt.getopt(sys.argv[1:], "hl:t:s:r:", ["help", "log=", "timeStamp=", "shellScript=", "regular="]) for op,value in opts: if op in ("-h", "--help"): self.usage() @@ -32,6 +33,12 @@ def getOptions(self): self.uriTime = value elif op in ("-s", "--shellScript"): self.shellPath = value + elif op in ("-r", "--regular"): + self.reg_prefix = value + if self.reg_prefix == "undefined": + self.reg_prefix = ".*writerecord:" + elif self.reg_prefix == "NULL": + self.reg_prefix = "" """ read the log rotated """ def readLog(self, uri): @@ -44,10 +51,7 @@ def closeFile(self): """ choose the corresponding shell to insert into database """ def combi(self, result, tool): - if tool == "iperf": - return self.shellPath + "post_iperf_time.sh " + result + " USERNAME=username PASSWORD=password HOSTNAME=hostname:port" - elif tool == "netcat" or tool == "scp": - return self.shellPath + "post_netcat_time.sh " + result + " USERNAME=username PASSWORD=password HOSTNAME=hostname:port" + return self.shellPath + "post_measuredata.sh " + result + " USERNAME=username PASSWORD=password HOSTNAME=hostname:port" """ insert data into database """ def excuteShell(self, result): @@ -85,7 +89,7 @@ def main(self, analyzer): isFinished = False isNewTime = True self.getOptions() - tools = ["iperf", "scp"] + tools = ["iperf", "scp", "netcat"] if not os.path.exists(self.uriTime): print('WARN! Create a timeRead file!') @@ -100,8 +104,8 @@ def main(self, analyzer): #TODO read log rotated fileLines, isFinished = self.readLog(self.uriLog) - for i in range (len(fileLines)-1, 0, -1): - result = analyzer.analyze(fileLines[i], tools) + for line in range fileLines[::-1]: + result = analyzer.analyze(line, tools, self.reg_prefix) if result[0]: if self.check(result, timeR, offset): isFinished = True diff --git a/analyzer/post_measuredata.sh b/analyzer/post_measuredata.sh new file mode 100755 index 0000000..23efeb4 --- /dev/null +++ b/analyzer/post_measuredata.sh @@ -0,0 +1,23 @@ +#!/bin/sh + +USERNAME="idpl" +PASSWORD="idpl" +HOSTNAME="localhost:8000" +API_URI="/condor/measurementdata/" + +SENDER=$1 +RECEIVER=$2 +TIME_START=$3 +TIME_END=$4 +CHECKSUM_EQUAL=$5 +DURATION=$6 +DATA_SIZE=$7 +BANDWIDTH=$8 +MEASUREMENT=$9 + +shift 9 +export "$@" + +API_URL=http://$HOSTNAME$API_URI + +curl -u $USERNAME:$PASSWORD -H "Content-Type: application/json" -d "{\"source\": \"$SENDER\", \"destination\": \"$RECEIVER\", \"time_start\": $TIME_START, \"time_end\": $TIME_END, \"md5_equal\": $CHECKSUM_EQUAL, \"duration\": $DURATION, \"data_size\": $DATA_SIZE, \"bandwidth\": $BANDWIDTH, \"measurement\": \"$MEASUREMENT\"}" $API_URL From 5a2ad1a2568d5594bfeddd44f7020da21c9e09e5 Mon Sep 17 00:00:00 2001 From: kun22kun Date: Wed, 29 Jul 2015 16:02:04 +0800 Subject: [PATCH 27/37] datasize units of tools --- analyzer/readme | 3 +++ 1 file changed, 3 insertions(+) diff --git a/analyzer/readme b/analyzer/readme index 34b81af..558916a 100644 --- a/analyzer/readme +++ b/analyzer/readme @@ -1,2 +1,5 @@ scripts to analyze and deal with data +unit of datasize: +iperf KB +netcat, scp, fdt B From e783c45f01ce96b2c77617f58b879e70de834626 Mon Sep 17 00:00:00 2001 From: kun22kun Date: Wed, 29 Jul 2015 16:05:20 +0800 Subject: [PATCH 28/37] repair syntax error --- analyzer/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/analyzer/client.py b/analyzer/client.py index 45775bc..1e1191b 100755 --- a/analyzer/client.py +++ b/analyzer/client.py @@ -104,7 +104,7 @@ def main(self, analyzer): #TODO read log rotated fileLines, isFinished = self.readLog(self.uriLog) - for line in range fileLines[::-1]: + for line in fileLines[::-1]: result = analyzer.analyze(line, tools, self.reg_prefix) if result[0]: if self.check(result, timeR, offset): From 47c7c5ceb619f08d6c25b35a9b486d32f39afbce Mon Sep 17 00:00:00 2001 From: kun22kun Date: Wed, 29 Jul 2015 19:05:19 +0800 Subject: [PATCH 29/37] add schedule --- analyzer/schedule | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 analyzer/schedule diff --git a/analyzer/schedule b/analyzer/schedule new file mode 100644 index 0000000..81159ff --- /dev/null +++ b/analyzer/schedule @@ -0,0 +1,14 @@ +buaa 10/22:40:00 + buaa2wisc + buaa2cnic + buaa2ucsd +wisc 11/23:20:00 + wisc2buaa + wisc2cnic + wisc2ucsd +ucsd 0/12:20:00 + ucsd2buaa + ucsd2cnic + ucsd2wisc +cnic #TODO + From f6746e9f4b2e5635660dc4c91beb2bcefceddba2 Mon Sep 17 00:00:00 2001 From: kun22kun Date: Thu, 30 Jul 2015 11:20:57 +0800 Subject: [PATCH 30/37] add synchronization schedule --- synchronizer/schedule | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 synchronizer/schedule diff --git a/synchronizer/schedule b/synchronizer/schedule new file mode 100644 index 0000000..e7bf262 --- /dev/null +++ b/synchronizer/schedule @@ -0,0 +1,10 @@ +cnic + 11:45:00 + 23:45:00 +komatsu + 11:50:00 + 23:50:00 +murpa + 11:55:00 + 23:55:00 + From 4a5a61a6c8e26ccf453e397b3b315d40f1b14259 Mon Sep 17 00:00:00 2001 From: kun22kun Date: Thu, 30 Jul 2015 11:23:36 +0800 Subject: [PATCH 31/37] modify analysis schedule --- analyzer/schedule | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/analyzer/schedule b/analyzer/schedule index 81159ff..d9efc1d 100644 --- a/analyzer/schedule +++ b/analyzer/schedule @@ -1,12 +1,12 @@ -buaa 10/22:40:00 +buaa 00:05:00 12:05:00 buaa2wisc buaa2cnic buaa2ucsd -wisc 11/23:20:00 +wisc 00:05:00 12:05:00 wisc2buaa wisc2cnic wisc2ucsd -ucsd 0/12:20:00 +ucsd 00:05:00 12:05:00 ucsd2buaa ucsd2cnic ucsd2wisc From 79387a3ccf937b2730f5eb20ed146be49caf1c8d Mon Sep 17 00:00:00 2001 From: kun22kun Date: Thu, 30 Jul 2015 19:28:05 +0800 Subject: [PATCH 32/37] modify schedules of analysis and synchronization --- analyzer/schedule | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/analyzer/schedule b/analyzer/schedule index d9efc1d..0cbbefb 100644 --- a/analyzer/schedule +++ b/analyzer/schedule @@ -1,14 +1,25 @@ -buaa 00:05:00 12:05:00 - buaa2wisc - buaa2cnic - buaa2ucsd -wisc 00:05:00 12:05:00 - wisc2buaa - wisc2cnic - wisc2ucsd -ucsd 00:05:00 12:05:00 - ucsd2buaa - ucsd2cnic - ucsd2wisc -cnic #TODO +International + buaa 11:55:00 23:55:00 + buaa2wisc + buaa2cnic + buaa2ucsd + wisc 00:05:00 12:05:00 + wisc2buaa + wisc2cnic + wisc2ucsd + ucsd 00:05:00 12:05:00 + ucsd2buaa + ucsd2cnic + ucsd2wisc + cnic #TODO +Domestic + buaa 11:55:00 23:55:00 + buaa2cnic + buaa2yunnan + yunnan 00:05:00 12:05:00 + yunnan2buaa + yunnan2cnic + cnic 00:05:00 00:05:00 + cnic2buaa + cnic2yunnan From ea19b045d7197aafa096fc6157ca3c6c37a74dc4 Mon Sep 17 00:00:00 2001 From: kun22kun Date: Thu, 30 Jul 2015 20:14:30 +0800 Subject: [PATCH 33/37] add fdt and modifty schedule of analysis --- analyzer/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/analyzer/client.py b/analyzer/client.py index 1e1191b..bb7aab1 100755 --- a/analyzer/client.py +++ b/analyzer/client.py @@ -89,7 +89,7 @@ def main(self, analyzer): isFinished = False isNewTime = True self.getOptions() - tools = ["iperf", "scp", "netcat"] + tools = ["iperf", "scp", "netcat", "fdt"] if not os.path.exists(self.uriTime): print('WARN! Create a timeRead file!') From 96477ddf39e835d9fe0a0b0ae4124d7626b9eec8 Mon Sep 17 00:00:00 2001 From: kun22kun Date: Fri, 31 Jul 2015 14:32:18 +0800 Subject: [PATCH 34/37] modify schedule of synchronization --- synchronizer/schedule | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/synchronizer/schedule b/synchronizer/schedule index e7bf262..21b7dd3 100644 --- a/synchronizer/schedule +++ b/synchronizer/schedule @@ -1,10 +1,22 @@ -cnic - 11:45:00 - 23:45:00 -komatsu - 11:50:00 - 23:50:00 -murpa - 11:55:00 - 23:55:00 +International: + cnic + 11:45:00 + 23:45:00 + komatsu + 11:50:00 + 23:50:00 + murpa + 11:55:00 + 23:55:00 + +Domestic + cnic + 11:35:00 + 23:35:00 + yunnan + 11:55:00 + 23:55:00 + + + From 30ea3c9b0e34a971879c1aa92977bfa05b4d8eb3 Mon Sep 17 00:00:00 2001 From: kun22kun Date: Mon, 3 Aug 2015 16:58:41 +0800 Subject: [PATCH 35/37] modify the offset due to new format of post shell command which adds tool --- analyzer/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/analyzer/client.py b/analyzer/client.py index bb7aab1..b9110ae 100755 --- a/analyzer/client.py +++ b/analyzer/client.py @@ -85,7 +85,7 @@ def readTimeRead(self, uri): def main(self, analyzer): - offset = 6 + offset = 7 isFinished = False isNewTime = True self.getOptions() From 3a8906657a3c67d8604e62ca8377d008515b6c5f Mon Sep 17 00:00:00 2001 From: kun22kun Date: Tue, 4 Aug 2015 03:46:30 +0800 Subject: [PATCH 36/37] add schedule.sh --- analyzer/schedule.sh | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100755 analyzer/schedule.sh diff --git a/analyzer/schedule.sh b/analyzer/schedule.sh new file mode 100755 index 0000000..e443d52 --- /dev/null +++ b/analyzer/schedule.sh @@ -0,0 +1,35 @@ +#!/bin/sh +#buaa +buaa2ucsd="/home/zwzhang/placement4/buaa2ucsd/placement4.log /home/idpl/results/buaa2ucsd/timeRead /home/kunq/logs_analyzer/buaa/buaa2ucsd_out.log" +buaa2cnic="/home/zwzhang/placement4/buaa2cnic/placement4.log /home/idpl/results/buaa2cnic/timeRead /home/kunq/logs_analyzer/buaa/buaa2cnic_out.log" +buaa2wisc="/home/zwzhang/placement4/buaa2wisc/placement4.log /home/idpl/results/buaa2wisc/timeRead /home/kunq/logs_analyzer/buaa/buaa2wisc_out.log" + +#wisc +wisc2buaa="/home/idpl/results/wisc2buaa/placement4.log /home/idpl/results/wisc2buaa/timeRead /home/kunq/logs_analyzer/wisc/wisc2buaa_out.log" +wisc2ucsd="/home/idpl/results/wisc2ucsd/placement4.log /home/idpl/results/wisc2ucsd/timeRead /home/kunq/logs_analyzer/wisc/wisc2ucsd_out.log" +wisc2cnic="/home/idpl/results/wisc2cnic/placement4.log /home/idpl/results/wisc2cnic/timeRead /home/kunq/logs_analyzer/wisc/wisc2cnic_out.log" +wisc2calit2="/home/idpl/results/wisc2calit2/placement4.log /home/idpl/results/wisc2calit2/timeRead /home/kunq/logs_analyzer/wisc/wisc2calit2_out.log" + +#ucsd +ucsd2buaa="/home/idpl/results/ucsd2buaa/placement4.log /home/idpl/results/ucsd2buaa/timeRead /home/kunq/logs_analyzer/ucsd/ucsd2buaa_out.log" +ucsd2cnic="/home/idpl/results/ucsd2cnic/placement4.log /home/idpl/results/ucsd2cnic/timeRead /home/kunq/logs_analyzer/ucsd/ucsd2cnic_out.log" +ucsd2wisc="/home/idpl/results/ucsd2wisc/placement4.log /home/idpl/results/ucsd2wisc/timeRead /home/kunq/logs_analyzer/ucsd/ucsd2wisc_out.log" +physics2calit2="/home/idpl/results/physics2calit2/placement4.log /home/idpl/results/physics2calit2/timeRead /home/kunq/logs_analyzer/physics/physics2calit2_out.log" + +#calit2 +calit2physics="/home/idpl/results/calit2physics/placement4.log /home/idpl/results/calit2physics/timeRead /home/kunq/logs_analyzer/calit2/calit2physics_out.log" +calit2wisc="/home/idpl/results/calit2wisc/placement4.log /home/idpl/results/calit2wisc/timeRead /home/kunq/logs_analyzer/calit2/calit2wisc_out.log" + + + +PATH="/home/kunq/analyzer0728/" +ANALYZER_PATH="${PATH}client.py" +SHELL_PATH=${PATH} +ALL_PARAMS=(buaa2ucsd buaa2cnic buaa2wisc wisc2buaa wisc2ucsd wisc2cnic wisc2calit2 ucsd2buaa ucsd2cnic ucsd2wisc physics2calit2 calit2physics calit2wisc) + +for index in ${ALL_PARAMS[@]} +do + eval params=\$${index} + params_arr=($params) + ${ANALYZER_PATH} -l ${params_arr[0]} -t ${params_arr[1]} -s ${SHELL_PATH} >> ${params_arr[2]} 2>&1 +done From c26d3a92e97a190d93f33cde0324834d4fcea8a0 Mon Sep 17 00:00:00 2001 From: kun22kun Date: Tue, 4 Aug 2015 21:38:53 +0800 Subject: [PATCH 37/37] fix name conflict between sys var and my var --- analyzer/schedule.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/analyzer/schedule.sh b/analyzer/schedule.sh index e443d52..50d8bb4 100755 --- a/analyzer/schedule.sh +++ b/analyzer/schedule.sh @@ -22,9 +22,9 @@ calit2wisc="/home/idpl/results/calit2wisc/placement4.log /home/idpl/results/cali -PATH="/home/kunq/analyzer0728/" -ANALYZER_PATH="${PATH}client.py" -SHELL_PATH=${PATH} +WORKSPACE="/home/kunq/analyzer0728/" +ANALYZER_PATH="${WORKSPACE}client.py" +SHELL_PATH=${WORKSPACE} ALL_PARAMS=(buaa2ucsd buaa2cnic buaa2wisc wisc2buaa wisc2ucsd wisc2cnic wisc2calit2 ucsd2buaa ucsd2cnic ucsd2wisc physics2calit2 calit2physics calit2wisc) for index in ${ALL_PARAMS[@]}