diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e43b0f9 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.DS_Store diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..f21b204 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,19 @@ +language: java +# sudo: required instructs Travis to use a "real VM" instead of a docker VM +sudo: required +jdk: +- oraclejdk9 + +before_install: +- sudo apt-get update -qq +- sudo apt-get install -qq ant-optional + +#notifications: + # email: + # on_success: always + # on_failure: always +install: +- sudo apt-get install ant +script: +- cd scripts +- ./travis_performance_test.sh diff --git a/scripts/MSocketClient.java b/scripts/MSocketClient.java new file mode 100644 index 0000000..46c2253 --- /dev/null +++ b/scripts/MSocketClient.java @@ -0,0 +1,100 @@ +import edu.umass.cs.msocket.FlowPath; +import edu.umass.cs.msocket.MSocket; +import edu.umass.cs.msocket.mobility.MobilityManagerClient; + +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.text.DecimalFormat; +import java.util.Arrays; + +public class MSocketClient implements Runnable { + + + private static final int LOCAL_PORT = 5556; + private static final String LOCALHOST = "127.0.0.1"; + private static DecimalFormat df = new DecimalFormat("0.00##"); + private static final int TOTAL_ROUND = 100; + private static int numBytes = 64000000; + + public static Long calc_median(Long[] input){ + Arrays.sort(input); + Long median; + if (input.length % 2 == 0) + median = ((long)input[input.length/2] + (long)input[input.length/2 - 1])/2; + else + median = (long) input[input.length/2]; + return median; + } + public static Long calc_avg(Long[] input){ + int len = input.length; + Long sum = 0L; + for(int i=0;i= 0) + totalRead += numRead; + + } while (totalRead < numSent); + + long elapsed = System.currentTimeMillis() - start; + transferTime[current_round] = elapsed; + current_round++; + } + + os.write(-1); + os.flush(); + + median_time = calc_avg(transferTime); + System.out.println(median_time); + ms.close(); + + + return; + + }catch(Exception e){ + e.printStackTrace(); + } + + + + } +} diff --git a/scripts/MSocketServer.java b/scripts/MSocketServer.java new file mode 100644 index 0000000..71e29f4 --- /dev/null +++ b/scripts/MSocketServer.java @@ -0,0 +1,108 @@ +import edu.umass.cs.msocket.MServerSocket; +import edu.umass.cs.msocket.MSocket; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Random; +import java.util.Scanner; + +public class MSocketServer implements Runnable{ + + private static final int LOCAL_PORT = 5556; + private static final String LOCALHOST = "127.0.0.1"; + private static MServerSocket mss = null; + static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss"); + + public MSocketServer(){ + + } + public void run(){ + String serverIPOrName = LOCALHOST; + int serverPort = LOCAL_PORT; + try{ + mss = new MServerSocket(serverPort, 0, InetAddress.getByName(serverIPOrName)); + MSocket msocket = mss.accept(); + RequestHandlingThread requestThread = new RequestHandlingThread(msocket); + requestThread.start(); + }catch(IOException e){ + e.printStackTrace(); + } + } + + private static class RequestHandlingThread extends Thread + { + private MSocket msocket; + + public RequestHandlingThread(MSocket msocket) + { + this.msocket = msocket; + } + + public void run() + { + int numRead = 0; + Scanner reader = new Scanner(System.in); + InputStream is = null; + OutputStream os = null; + try { + is = msocket.getInputStream(); + os = msocket.getOutputStream(); + } catch (IOException e) { + e.printStackTrace(); + } + if(is != null) { + + + while(numRead >= 0) { + long start = System.currentTimeMillis(); + + + byte[] numByteArr = new byte[8]; + try { + + is.read(numByteArr); + ByteBuffer wrapped = ByteBuffer.wrap(numByteArr); + numRead = wrapped.getInt(); + } catch (IOException e) { + e.printStackTrace(); + } + if (numRead > 0) { + + + byte[] b = new byte[numRead]; + new Random().nextBytes(b); + + try { + os.write(b); + os.flush(); + } catch (IOException e) { + e.printStackTrace(); + } + // reset + b = null; + numRead = 0; + long elapsed = System.currentTimeMillis() - start; + LocalDateTime now = LocalDateTime.now(); + + } + + } + + try { + msocket.close(); + System.exit(0); + } catch (IOException e) { + e.printStackTrace(); + } + + } + + + } + } +} diff --git a/scripts/TCPClient.java b/scripts/TCPClient.java new file mode 100644 index 0000000..9093587 --- /dev/null +++ b/scripts/TCPClient.java @@ -0,0 +1,105 @@ +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.text.DecimalFormat; +import java.io.*; +import java.net.*; +import java.lang.*; +import java.nio.*; +import java.util.Arrays; + + +public class TCPClient implements Runnable{ + + + private static final int LOCAL_PORT = 5455; + private static final String LOCALHOST = "127.0.0.1"; + private static final DecimalFormat df = new DecimalFormat("0.00##"); + private static final int TOTAL_ROUND = 100; + private static final int numBytes = 64000000; + + public TCPClient(){ + + } + public static Long calc_avg(Long[] input){ + int len = input.length; + Long sum = 0L; + for(int i=0;i= 0) + totalRead += numRead; + + } while (totalRead < numSent); + + long elapsed = System.currentTimeMillis() - start; + + + transferTime[current_round] = elapsed; + current_round++; + } + + os.write(-1); + os.flush(); + + median_time = calc_avg(transferTime); + System.out.println(median_time); + socket.close(); + + } catch (Exception e) { + e.printStackTrace(); + } + + + + + } + +} diff --git a/scripts/TCPServer.java b/scripts/TCPServer.java new file mode 100644 index 0000000..e3f282c --- /dev/null +++ b/scripts/TCPServer.java @@ -0,0 +1,116 @@ +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Random; +import java.io.*; +import java.net.*; +import java.lang.*; +import java.nio.*; + +public class TCPServer implements Runnable{ + + private static final int LOCAL_PORT = 5455; + private static final String LOCALHOST = "0.0.0.0"; + + private static ServerSocket server; + + static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss"); + + public TCPServer(){ + } + + public void run(){ + String serverIPOrName = LOCALHOST; + int serverPort = LOCAL_PORT; + try{ + server = new ServerSocket(serverPort); + }catch(IOException e){ + e.printStackTrace(); + } + + try{ + Socket socket = server.accept(); + + RequestHandlingThread requestThread = new RequestHandlingThread(socket); + requestThread.start(); + }catch(IOException e){ + e.printStackTrace(); + } + + + + } + private static class RequestHandlingThread extends Thread + { + private Socket socket; + + public RequestHandlingThread(Socket sock) + { + this.socket = sock; + } + + public void run() + { + int numRead = 0; + + InputStream is = null; + OutputStream os = null; + try { + is = socket.getInputStream(); + os = socket.getOutputStream(); + } catch (IOException e) { + e.printStackTrace(); + } + if(is != null) { + while(numRead >= 0) { + long start = System.currentTimeMillis(); + byte[] numByteArr = new byte[4]; + try { + is.read(numByteArr); + ByteBuffer wrapped = ByteBuffer.wrap(numByteArr); + numRead = wrapped.getInt(); + } catch (IOException e) { + e.printStackTrace(); + } + + // send random bytes + if (numRead > 0) { + + + byte[] b = new byte[numRead]; + new Random().nextBytes(b); + try { + os.write(b); + os.flush(); + } catch (IOException e) { + e.printStackTrace(); + } + // reset + numRead = 0; + long elapsed = System.currentTimeMillis() - start; + LocalDateTime now = LocalDateTime.now(); + + } + + } + + try { + socket.close(); + } catch (IOException e) { + e.printStackTrace(); + } + + } + + + } + } + + + +} diff --git a/scripts/check_performance_diff.py b/scripts/check_performance_diff.py new file mode 100644 index 0000000..f01ef7d --- /dev/null +++ b/scripts/check_performance_diff.py @@ -0,0 +1,13 @@ + + +if __name__ == "__main__": + f = open("mstime.txt",'r') + mstime = int(f.read()) + f = open("tcptime.txt",'r') + tcptime = int(f.read()) + overhead = ((mstime - tcptime)/tcptime) * 100 + # print("the overhead is :- " + str(overhead)) + if(overhead > 0 and overhead < 250): + print(0) + else: + print(1) diff --git a/scripts/runmsocketserverclient.java b/scripts/runmsocketserverclient.java new file mode 100644 index 0000000..667457f --- /dev/null +++ b/scripts/runmsocketserverclient.java @@ -0,0 +1,16 @@ +import java.io.*; + +public class runmsocketserverclient{ + public static void main(String[] args){ + MSocketServer server = new MSocketServer(); + Thread s = new Thread(server); + s.start(); + + + MSocketClient client = new MSocketClient(); + Thread c = new Thread(client); + c.start(); + + + } +} diff --git a/scripts/runtcpserverclient.java b/scripts/runtcpserverclient.java new file mode 100644 index 0000000..c0629cd --- /dev/null +++ b/scripts/runtcpserverclient.java @@ -0,0 +1,18 @@ +import java.io.*; + + +public class runtcpserverclient{ + + public static void main(String[] args){ + TCPServer server = new TCPServer(); + Thread s = new Thread(server); + s.start(); + + + TCPClient client = new TCPClient(); + Thread c = new Thread(client); + c.start(); + + + } +} diff --git a/scripts/travis_performance_test.sh b/scripts/travis_performance_test.sh new file mode 100755 index 0000000..21370e5 --- /dev/null +++ b/scripts/travis_performance_test.sh @@ -0,0 +1,21 @@ +cd .. && +ant jar && +cd scripts && +javac -cp ../jars/msocket-1.0.0.jar:. runmsocketserverclient.java +java -cp ../jars/msocket-1.0.0.jar:. runmsocketserverclient > mstime.txt +javac runtcpserverclient.java; +java runtcpserverclient > tcptime.txt +head mstime.txt +head tcptime.txt +difference=`python3 check_performance_diff.py` +echo $difference +rm mstime.txt tcptime.txt *.class +cd .. +ant clean +rm build.number +if [[ $difference -eq 0 ]] +then + exit 0 +else + exit 1 +fi diff --git a/src/edu/umass/cs/msocket/BackgroundWritingThread.java b/src/edu/umass/cs/msocket/BackgroundWritingThread.java index 2ba7e49..096f73e 100644 --- a/src/edu/umass/cs/msocket/BackgroundWritingThread.java +++ b/src/edu/umass/cs/msocket/BackgroundWritingThread.java @@ -515,7 +515,7 @@ else if (minOutBytesPath.getOutStandingBytes() - finishedPaths.get(i).getOutStan { if (minOutBytesPath.getSocketIdentifer() != socketList.get(i).getSocketIdentifer()) { - + MSocketLogger.getLogger().log(Level.FINE,"SocketID: {0} added to the unfinished path. Sent Bytes: {1}, Outstanding Bytes: {2}.", new Object[]{socketList.get(i).getSocketIdentifer(),socketList.get(i).getSentBytes(),socketList.get(i).getOutStandingBytes()}); unfinishedPaths.add(socketList.get(i)); } @@ -556,7 +556,7 @@ private void continousAckReads() catch (IOException e) { // e.printStackTrace(); - MSocketLogger.getLogger().fine(e.getMessage()); + MSocketLogger.getLogger().log(Level.FINE, e.getMessage()); } long newDataBaseSeqNum = cinfo.getDataBaseSeq(); diff --git a/src/edu/umass/cs/msocket/ConnectionInfo.java b/src/edu/umass/cs/msocket/ConnectionInfo.java index 0b9ba58..30db365 100644 --- a/src/edu/umass/cs/msocket/ConnectionInfo.java +++ b/src/edu/umass/cs/msocket/ConnectionInfo.java @@ -2508,7 +2508,8 @@ private void setupControlWrite(InetAddress ControllerAddress, long lfid, int mst SCToUse.write(buf); } - MSocketLogger.getLogger().fine("Sent IP:port " + ControllerPort + "; ackSeq = " + DataAckSeq); + // MSocketLogger.getLogger().fine("Sent IP:port " + ControllerPort + "; ackSeq = " + DataAckSeq); + MSocketLogger.getLogger().log(Level.FINE, "Sent IP:port {0}; ackSeq = {1}", new Object[]{ControllerPort,DataAckSeq}); } private SetupControlMessage setupControlRead(SocketChannel SCToUse) throws IOException @@ -2845,7 +2846,7 @@ public void blockOnInputStreamSelector() catch (Exception e) { //e.printStackTrace(); - MSocketLogger.getLogger().fine(e.getMessage()); + MSocketLogger.getLogger().log(Level.FINE, e.getMessage()); // if selector not open, then break if (!getInputStreamSelector().isOpen()) { @@ -2890,7 +2891,7 @@ public void blockOnOutputStreamSelector() } catch (Exception e) { - MSocketLogger.getLogger().fine(e.getMessage()); + MSocketLogger.getLogger().log(Level.FINE, e.getMessage()); } } @@ -2911,7 +2912,7 @@ public void blockOnOutputStreamSelector() catch (Exception e) { // e.printStackTrace(); - MSocketLogger.getLogger().fine(e.toString()); + MSocketLogger.getLogger().log(Level.FINE,e.toString()); // if not open then break from loop if (!getOutputStreamSelector().isOpen()) @@ -2926,7 +2927,7 @@ public void blockOnOutputStreamSelector() } else { - MSocketLogger.getLogger().fine(this.getServerOrClient() + "unblocked on the selector"); + // MSocketLogger.getLogger().fine(this.getServerOrClient() + "unblocked on the selector"); MSocketLogger.getLogger().log(Level.FINE, "{0} unblocked on the selector", this.getServerOrClient()); Set selectedKeys = getOutputStreamSelector().selectedKeys(); selectedKeys.clear(); diff --git a/src/edu/umass/cs/msocket/FlowIDToControllerMapping.java b/src/edu/umass/cs/msocket/FlowIDToControllerMapping.java index 6c588aa..94daf94 100644 --- a/src/edu/umass/cs/msocket/FlowIDToControllerMapping.java +++ b/src/edu/umass/cs/msocket/FlowIDToControllerMapping.java @@ -244,7 +244,7 @@ public void run() } catch (Exception e) { - MSocketLogger.getLogger().fine(e.toString()); + MSocketLogger.getLogger().log(Level.FINE,e.toString()); } MSocketLogger.getLogger().log(Level.FINE,"FlowIDToControllerMapping UDP recv thread exits"); diff --git a/src/edu/umass/cs/msocket/MSocketClient.java b/src/edu/umass/cs/msocket/MSocketClient.java index a70cc96..db4b12a 100644 --- a/src/edu/umass/cs/msocket/MSocketClient.java +++ b/src/edu/umass/cs/msocket/MSocketClient.java @@ -3,7 +3,7 @@ import edu.umass.cs.msocket.FlowPath; import edu.umass.cs.msocket.MSocket; import edu.umass.cs.msocket.mobility.MobilityManagerClient; - +import java.util.Arrays; import java.io.InputStream; import java.io.OutputStream; import java.net.InetAddress; @@ -13,78 +13,139 @@ public class MSocketClient { - private static final int LOCAL_PORT = 5555; + private static final int LOCAL_PORT = 6666; private static final String LOCALHOST = "127.0.0.1"; private static DecimalFormat df = new DecimalFormat("0.00##"); - private static final int TOTAL_ROUND = 2; - private static int numBytes = Integer.MAX_VALUE - 2; + private static final int TOTAL_ROUND = 500; + + private static int numBytes = 512000; + + + public static double calc_avg(Long[] input){ + int len = input.length; + double sum = 0; + for(int i=0;i= 0) totalRead += numRead; - } while (totalRead < numOfBytes); - b = null; - long elapsed = System.currentTimeMillis() - start; - System.out.println("[Latency:] " + elapsed + " ms"); - System.out.println("[Thruput:] " + df.format(numOfBytes/1000.0/elapsed ) + " MB/s"); - + } while (totalRead < numSent); + long read_time_elapsed = System.nanoTime() - read_time_start; + long app_level_elapsed = System.nanoTime() - app_level_start; + System.out.println("[Write:] " + write_time_elapsed + " ns"); + System.out.println("[Read:] " + read_time_elapsed/ 1000000 + " ms"); + System.out.println("[TransferTime:] " + app_level_elapsed + " nano seconds"); + + double n_bytes = numOfBytes; + n_bytes = n_bytes/1000000.0; + double app_level_elap = app_level_elapsed; + app_level_elap = app_level_elap / 1000000000.0; + System.out.println("[Thruput:] " + n_bytes/app_level_elap + " MB/s"); + transferTime[rd] = app_level_elapsed; rd++; } @@ -94,6 +155,8 @@ public static void main(String[] args) { ms.close(); System.out.println("Socket closed"); + System.out.println("this is the average transferTime for MSocket of " + Integer.toString(numRound) + " rounds : " + Double.toString(calc_avg(transferTime)/1000000) + " ms"); + System.out.println("this is the median transferTime for MSocket of " + Integer.toString(numRound) + " rounds : " + Double.toString(calc_median(transferTime)/1000000) + " ms"); MobilityManagerClient.shutdownMobilityManager(); } catch (Exception e) { e.printStackTrace(); diff --git a/src/edu/umass/cs/msocket/MSocketServer.java b/src/edu/umass/cs/msocket/MSocketServer.java index d63f211..1cb6dc7 100644 --- a/src/edu/umass/cs/msocket/MSocketServer.java +++ b/src/edu/umass/cs/msocket/MSocketServer.java @@ -1,5 +1,9 @@ package edu.umass.cs.msocket; +import edu.umass.cs.msocket.MServerSocket; +import edu.umass.cs.msocket.MSocket; + + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -9,11 +13,10 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Random; -import java.util.Scanner; public class MSocketServer { - private static final int LOCAL_PORT = 5555; + private static final int LOCAL_PORT = 6666; private static final String LOCALHOST = "127.0.0.1"; private static MServerSocket mss = null; @@ -59,7 +62,7 @@ public RequestHandlingThread(MSocket msocket) public void run() { int numRead = 0; - Scanner reader = new Scanner(System.in); + InputStream is = null; OutputStream os = null; try { @@ -70,42 +73,49 @@ public void run() } if(is != null) { // client sends 0 to close the socket - + long run_number = 0; while(numRead >= 0) { - long start = System.currentTimeMillis(); + System.out.println("Round Numebr : " + run_number); + long start = System.nanoTime(); // get number of bytes to send - byte[] numByteArr = new byte[8]; + byte[] numByteArr = new byte[4]; try { - System.out.println("waiting before read "); + long read_time_start = System.nanoTime(); is.read(numByteArr); + long read_time_elapsed = System.nanoTime() - read_time_start; + System.out.println("MSocket read from the client time is: " + read_time_elapsed / 1000000 + " ms"); ByteBuffer wrapped = ByteBuffer.wrap(numByteArr); numRead = wrapped.getInt(); } catch (IOException e) { e.printStackTrace(); } - System.out.println("received number of bytes"); + // send random bytes if (numRead > 0) { + System.out.println("Ready to send "+numRead+" bytes."); byte[] b = new byte[numRead]; new Random().nextBytes(b); - System.out.println("array initialized"); + try { - os.write(b); - os.flush(); + long write_time_start = System.nanoTime(); + os.write(b); + os.flush(); + long write_time_elapsed = System.nanoTime() - write_time_start; + System.out.println("MSocket Time to write to the socket is: " + write_time_elapsed/1000000 + " ms"); + } catch (IOException e) { e.printStackTrace(); } // reset - b = null; numRead = 0; - long elapsed = System.currentTimeMillis() - start; + long elapsed = System.nanoTime() - start; LocalDateTime now = LocalDateTime.now(); - System.out.println("[" + dtf.format(now) + "] Data sending finished. It takes " + elapsed / 1000.0 + " seconds"); + System.out.println("[" + dtf.format(now) + "] Data sending finished. App level write time " + elapsed/1000000 + " ms"); } - + run_number += 1; } try { diff --git a/src/edu/umass/cs/msocket/MWrappedInputStream.java b/src/edu/umass/cs/msocket/MWrappedInputStream.java index 96f3bb3..5ae1c6a 100644 --- a/src/edu/umass/cs/msocket/MWrappedInputStream.java +++ b/src/edu/umass/cs/msocket/MWrappedInputStream.java @@ -66,7 +66,7 @@ public synchronized int read(byte[] b, int offset, int length) throws IOExceptio if (cinfo.getMSocketState() == MSocketConstants.CLOSED) throw new IOException(" socket already closed"); - MSocketLogger.getLogger().fine(cinfo.getServerOrClient()+" app read called"); + MSocketLogger.getLogger().log(Level.FINE,"{0} app read called",cinfo.getServerOrClient()); int nread = 0; while(nread == 0) diff --git a/src/edu/umass/cs/msocket/MWrappedOutputStream.java b/src/edu/umass/cs/msocket/MWrappedOutputStream.java index 55b6b7b..64749e0 100644 --- a/src/edu/umass/cs/msocket/MWrappedOutputStream.java +++ b/src/edu/umass/cs/msocket/MWrappedOutputStream.java @@ -31,6 +31,7 @@ import edu.umass.cs.msocket.common.policies.MultipathWritingPolicy; import edu.umass.cs.msocket.common.policies.RTTBasedWritingPolicy; import edu.umass.cs.msocket.common.policies.RoundRobinWritingPolicy; +import edu.umass.cs.msocket.common.policies.ReMP; import edu.umass.cs.msocket.logger.MSocketLogger; import java.util.logging.Level; /** @@ -48,7 +49,7 @@ public class MWrappedOutputStream extends OutputStream public static final int WRITE_CHUNK_SIZE = 1000000; private ConnectionInfo cinfo = null; - private MultipathPolicy writePolicy = MultipathPolicy.MULTIPATH_POLICY_RTX_OPT; + private MultipathPolicy writePolicy = MultipathPolicy.MULTIPATH_POLICY_REMP; /** * @param out @@ -86,6 +87,12 @@ public class MWrappedOutputStream extends OutputStream cinfo.setMultipathWritingPolicy(multipathPolicy); break; } + case MULTIPATH_POLICY_REMP: + { + MultipathWritingPolicy multipathPolicy = new ReMP(cinfo); + cinfo.setMultipathWritingPolicy(multipathPolicy); + break; + } } } diff --git a/src/edu/umass/cs/msocket/MultipathPolicy.java b/src/edu/umass/cs/msocket/MultipathPolicy.java index c2a94ea..5eb5e9b 100644 --- a/src/edu/umass/cs/msocket/MultipathPolicy.java +++ b/src/edu/umass/cs/msocket/MultipathPolicy.java @@ -2,18 +2,18 @@ * Mobility First - Global Name Resolution Service (GNS) * Copyright (C) 2013 University of Massachusetts - Emmanuel Cecchet. * Contact: cecchet@cs.umass.edu - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and - * limitations under the License. + * limitations under the License. * * Initial developer(s): Emmanuel Cecchet. * Contributor(s): ______________________. @@ -23,7 +23,7 @@ /** * This class defines a MultipathPolicy - * + * * @author Emmanuel Cecchet * @version 1.0 */ @@ -56,5 +56,9 @@ public enum MultipathPolicy /** * Black box writing policy */ - MULTIPATH_POLICY_BLACKBOX -} \ No newline at end of file + MULTIPATH_POLICY_BLACKBOX, + /** + * Replicate data on all paths + */ + MULTIPATH_POLICY_REMP +} diff --git a/src/edu/umass/cs/msocket/TimerTaskClass.java b/src/edu/umass/cs/msocket/TimerTaskClass.java index e3482e5..aa35ba9 100644 --- a/src/edu/umass/cs/msocket/TimerTaskClass.java +++ b/src/edu/umass/cs/msocket/TimerTaskClass.java @@ -39,7 +39,7 @@ class TimerTaskClass /** * Proxy failure timeout in seconds (default is 15 seconds) */ - private static final int proxyFailureTimeout = MSocket.KEEP_ALIVE_FREQ*3; + private static final int proxyFailureTimeout = MSocket.KEEP_ALIVE_FREQ*300; /** * Creates a new TimerTaskClass object diff --git a/src/edu/umass/cs/msocket/common/policies/MultipathWritingPolicy.java b/src/edu/umass/cs/msocket/common/policies/MultipathWritingPolicy.java index 4921a3f..2bc9631 100644 --- a/src/edu/umass/cs/msocket/common/policies/MultipathWritingPolicy.java +++ b/src/edu/umass/cs/msocket/common/policies/MultipathWritingPolicy.java @@ -61,7 +61,7 @@ public abstract class MultipathWritingPolicy { protected void handleMigrationInMultiPath(SocketInfo Obj) throws IOException { - MSocketLogger.getLogger().fine("handleMigrationInMultiPath called"); + MSocketLogger.getLogger().log(Level.FINE, "handleMigrationInMultiPath called"); // if queue size is > 0 then it means that there is a non-blocking // write pending and it should be sent first, instead of migration data if ((Integer) Obj.queueOperations(SocketInfo.QUEUE_SIZE, null) > 0) diff --git a/src/edu/umass/cs/msocket/common/policies/ReMP.java b/src/edu/umass/cs/msocket/common/policies/ReMP.java new file mode 100644 index 0000000..8d4338a --- /dev/null +++ b/src/edu/umass/cs/msocket/common/policies/ReMP.java @@ -0,0 +1,404 @@ +/** + * Mobility First - Global Name Resolution Service (GNS) + * Copyright (C) 2013 University of Massachusetts - Emmanuel Cecchet. + * Contact: cecchet@cs.umass.edu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Initial developer(s): Emmanuel Cecchet. + * Contributor(s): ______________________. + */ + +package edu.umass.cs.msocket.common.policies; + +import java.io.IOException; +import java.util.Collection; +import java.util.Vector; +import java.util.logging.Level; +import edu.umass.cs.msocket.ConnectionInfo; +import edu.umass.cs.msocket.DataMessage; +import edu.umass.cs.msocket.MSocketConstants; +import edu.umass.cs.msocket.MWrappedOutputStream; +import edu.umass.cs.msocket.MultipathPolicy; +import edu.umass.cs.msocket.SocketInfo; +import edu.umass.cs.msocket.logger.MSocketLogger; + +public class ReMP extends MultipathWritingPolicy +{ + private int defaultPolicyInterfaceNum = 0; + + public ReMP(ConnectionInfo cinfo) + { + this.cinfo = cinfo; +// cinfo.startRetransmissionThread(); +// cinfo.startEmptyQueueThread(); + } + + @Override + public void writeAccordingToPolicy(byte[] b, int offset, int length, int MesgType) throws IOException + { + int threshold = 20000000; + int currpos = 0; + int remaining = length; + int tempDataSendSeqNum = cinfo.getDataSendSeq(); + + if(length <= threshold){ + cinfo.blockOnOutputStreamSelector(); + try + { + cinfo.multiSocketRead(); + } + catch (IOException ex) + { + ex.printStackTrace(); + } + + Vector socketList = new Vector(); + socketList.addAll((Collection) cinfo.getAllSocketInfo()); + for (int i = 0; i < socketList.size(); i++) + { + SocketInfo Obj = socketList.get(i); + if (Obj != null) + { + + while(currpos < length) + { + while (!Obj.acquireLock()); + Obj.byteInfoVectorOperations(SocketInfo.QUEUE_REMOVE, cinfo.getDataBaseSeq(), -1); + int tobesent = 0; + if (remaining < MWrappedOutputStream.WRITE_CHUNK_SIZE) + { + tobesent = remaining; + } + else + { + tobesent = MWrappedOutputStream.WRITE_CHUNK_SIZE; + } + try{ + + if (Obj.getneedToReqeustACK()) + { + handleMigrationInMultiPath(Obj); + Obj.releaseLock(); + continue; + } +// int arrayCopyOffset = offset + currpos; + DataMessage dm = new DataMessage(MesgType, tempDataSendSeqNum, cinfo.getDataAckSeq(), tobesent, 0, b, + 0); + byte[] writebuf = dm.getBytes(); + + if ((Integer) Obj.queueOperations(SocketInfo.QUEUE_SIZE, null) > 0) + { + cinfo.attemptSocketWrite(Obj); + Obj.releaseLock(); + continue; + } + else + { + Obj.queueOperations(SocketInfo.QUEUE_PUT, writebuf); + Obj.byteInfoVectorOperations(SocketInfo.QUEUE_PUT, tempDataSendSeqNum, tobesent); + cinfo.attemptSocketWrite(Obj); + } + Obj.updateSentBytes(tobesent); + currpos += tobesent; + remaining -= tobesent; + tempDataSendSeqNum += tobesent; + Obj.releaseLock(); + + }catch (IOException ex){ + MSocketLogger.getLogger().log(Level.FINE,"Write exception caused"); + Obj.setStatus(false); + Obj.setneedToReqeustACK(true); + + Obj.updateSentBytes(tobesent); + currpos += tobesent; + remaining -= tobesent; + tempDataSendSeqNum += tobesent; + Obj.releaseLock(); + } + } + + } + + } + + + + }else{ + + while (currpos < length) + { + // blocks on selector, until the channel is + // available to write + cinfo.blockOnOutputStreamSelector(); + // reads input stream for ACKs and stores data in input buffer + try + { + cinfo.multiSocketRead(); + } + catch (IOException ex) + { + ex.printStackTrace(); + } + + SocketInfo Obj = getNextSocketToWrite(); // randomly + // choosing the + // socket to send + // chunk + if (Obj != null) + { + while (!Obj.acquireLock()); + Obj.byteInfoVectorOperations(SocketInfo.QUEUE_REMOVE, cinfo.getDataBaseSeq(), -1); + int tobesent = 0; + if (remaining < MWrappedOutputStream.WRITE_CHUNK_SIZE) + { + tobesent = remaining; + } + else + { + tobesent = MWrappedOutputStream.WRITE_CHUNK_SIZE; + } + + try + { + if (Obj.getneedToReqeustACK()) + { + handleMigrationInMultiPath(Obj); + Obj.releaseLock(); + continue; + } + + // System.arraycopy(b, offset + currpos, buf, 0, tobesent); + int arrayCopyOffset = offset + currpos; + DataMessage dm = new DataMessage(MesgType, tempDataSendSeqNum, cinfo.getDataAckSeq(), tobesent, 0, b, + arrayCopyOffset); + byte[] writebuf = dm.getBytes(); + + // exception of write means that socket is undergoing migration, + // make it not active, and transfer same data chunk over another + // available socket. + // at receiving side, receiver will take care of redundantly + // received data + + + if ((Integer) Obj.queueOperations(SocketInfo.QUEUE_SIZE, null) > 0) + { + cinfo.attemptSocketWrite(Obj); + Obj.releaseLock(); + continue; + } + else + { + Obj.queueOperations(SocketInfo.QUEUE_PUT, writebuf); + Obj.byteInfoVectorOperations(SocketInfo.QUEUE_PUT, tempDataSendSeqNum, tobesent); + } + + cinfo.attemptSocketWrite(Obj); + //if (cinfo.getServerOrClient() == MSocketConstants.CLIENT) + { + + MSocketLogger.getLogger().log(Level.FINE,"Using socketID: {0}, Remote IP: {1}, for writing tempDataSendSeqNum {2}", new Object[]{Obj.getSocketIdentifer(),Obj.getSocket().getInetAddress(),tempDataSendSeqNum}); + } + + Obj.updateSentBytes(tobesent); + currpos += tobesent; + remaining -= tobesent; + tempDataSendSeqNum += tobesent; + Obj.releaseLock(); + + } + catch (IOException ex) + { + + MSocketLogger.getLogger().log(Level.FINE,"Write exception caused"); + Obj.setStatus(false); + Obj.setneedToReqeustACK(true); + + Obj.updateSentBytes(tobesent); + currpos += tobesent; + remaining -= tobesent; + tempDataSendSeqNum += tobesent; + Obj.releaseLock(); + } + } + else + { + // throw exception and block or wait in while loop to check for any + // available sockets + synchronized (cinfo.getSocketMonitor()) + { + while ((cinfo.getActiveSocket(MultipathPolicy.MULTIPATH_POLICY_RANDOM) == null) + && (cinfo.getMSocketState() == MSocketConstants.ACTIVE)) + { + try + { + cinfo.getSocketMonitor().wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + } + } + + if (cinfo.getMSocketState() == MSocketConstants.CLOSED) + { + throw new IOException(" socket already closed"); + + } + } + + } + } + + + Vector socketList = new Vector(); + socketList.addAll((Collection) cinfo.getAllSocketInfo()); + String print = ""; + for (int i = 0; i < socketList.size(); i++) + { + SocketInfo Obj = socketList.get(i); + if (Obj.getStatus()) + { + print += "socketID " + Obj.getSocketIdentifer() + " SentBytes " + Obj.getSentBytes() + " " + + " RecvdBytesOtherSide " + Obj.getRecvdBytesOtherSide() + " "; + } + } + + // MSocketLogger.getLogger().fine(print); + // need to empty the write queues here, can't return + // before that, otherwise it would desynchronize the output stream + //long emptyQueueStartTime = System.currentTimeMillis(); + cinfo.emptyTheWriteQueues(); + //long emptyQueueEndTime = System.currentTimeMillis(); + //System.out.println( "Write empty queue time "+(emptyQueueEndTime-emptyQueueStartTime) ); + } + + private Vector removeClosingFPsFromList() + { + Vector socketMapValues = new Vector(); + socketMapValues.addAll(cinfo.getAllSocketInfo()); + + Vector closingRemoved = new Vector(); + + for (int i=0;i socketMapValues = removeClosingFPsFromList(); + + int i = 0; + SocketInfo value = null; + SocketInfo retvalue = null; + int nummaxrun = 0; + + i = 0; + while (i < socketMapValues.size()) + { + + value = socketMapValues.get(i); + + if (value.getStatus()) // true means active + { + if (((Integer) value.queueOperations(SocketInfo.QUEUE_SIZE, null) == 0)) + { + } + else + { + while (!value.acquireLock()); + cinfo.attemptSocketWrite(value); + value.releaseLock(); + } + } + i++; + } + + do + { + i = 0; + while (i < socketMapValues.size()) + { + value = socketMapValues.get(i); + + if (value.getStatus()) // true means active + { + if (((Integer) value.queueOperations(SocketInfo.QUEUE_SIZE, null) == 0)) + { + if ((i >= defaultPolicyInterfaceNum)) + { + retvalue = value; + break; + } + else if (nummaxrun == 1) + { // rollover case + retvalue = value; + break; + } + } + } + i++; + } + nummaxrun++; + } + while ((retvalue == null) && (nummaxrun < 2)); // to rollover for + // the first one + + if (retvalue == null) + { // all paths have + // choked send buffers + nummaxrun = 0; + do + { + i = 0; + while (i < socketMapValues.size()) + { + value = socketMapValues.get(i); + + if (value.getStatus()) // true means active + { + if (nummaxrun == 1) + { + retvalue = value; + break; + } + else if ((i >= defaultPolicyInterfaceNum)) + { + retvalue = value; + break; + } + } + i++; + } + nummaxrun++; + } + while ((retvalue == null) && (nummaxrun < 2)); // to rollover + // for the first one + } + + int Size = socketMapValues.size(); + defaultPolicyInterfaceNum++; + defaultPolicyInterfaceNum = defaultPolicyInterfaceNum % Size; + + return retvalue; + } +}