From 386b8746fe82468e0ff02d880880ce02a15cee97 Mon Sep 17 00:00:00 2001 From: sengjun0624 Date: Tue, 8 Apr 2025 10:24:21 +0900 Subject: [PATCH 01/10] :sparkles [WEEK14] java-nio --- week14/src-backend-java-nio | 1 + 1 file changed, 1 insertion(+) create mode 160000 week14/src-backend-java-nio diff --git a/week14/src-backend-java-nio b/week14/src-backend-java-nio new file mode 160000 index 0000000..2c612e5 --- /dev/null +++ b/week14/src-backend-java-nio @@ -0,0 +1 @@ +Subproject commit 2c612e5265fc1d4465b12d48c8b2bb95a10385dc From 170a12bf463a42e769886fc4b5f99227febd7b38 Mon Sep 17 00:00:00 2001 From: sengjun0624 Date: Tue, 8 Apr 2025 14:33:40 +0900 Subject: [PATCH 02/10] =?UTF-8?q?:sparkles=20[WEEK14]=20Selection=20?= =?UTF-8?q?=EC=9D=B4=EB=B2=A4=ED=8A=B8=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- week14/src-backend-java-nio | 1 - week14/src-backend-java-nio/.gitignore | 2 + week14/src-backend-java-nio/README.md | 1 + .../step01_blocking_socket/.gitignore | 38 +++ .../step01_blocking_socket/pom.xml | 37 +++ .../src/main/java/dev/oio/App.java | 13 + .../step01_thread_per_request/EchoClient.java | 31 +++ .../step01_thread_per_request/EchoServer.java | 67 +++++ .../oio/step02_thread_pooling/EchoClient.java | 37 +++ .../oio/step02_thread_pooling/EchoServer.java | 98 +++++++ .../src/test/java/dev/oio/AppTest.java | 38 +++ .../step02_non_blocking/.gitignore | 38 +++ .../step02_non_blocking/index.txt | 61 +++++ .../step02_non_blocking/pom.xml | 25 ++ .../src/main/java/dev/nio/App.java | 13 + .../nio/step01_java_nio/ChannelOverview.java | 34 +++ .../nio/step01_java_nio/SelectorOverview.java | 35 +++ .../SampleClient.java | 15 ++ .../SelectableChannelOverview.java | 96 +++++++ .../SampleClient.java | 15 ++ .../SelectableChannelOverview.java | 240 ++++++++++++++++++ .../src/test/java/dev/nio/AppTest.java | 38 +++ .../step03_netty/.gitignore | 38 +++ .../src-backend-java-nio/step03_netty/pom.xml | 44 ++++ .../src/main/java/dev/netty/App.java | 13 + .../step01_hello_netty/DiscardServer.java | 41 +++ .../DiscardServerHandler.java | 21 ++ .../netty/step01_hello_netty/EchoServer.java | 43 ++++ .../step01_hello_netty/EchoServerHandler.java | 37 +++ .../src/test/java/dev/netty/AppTest.java | 38 +++ 30 files changed, 1247 insertions(+), 1 deletion(-) delete mode 160000 week14/src-backend-java-nio create mode 100644 week14/src-backend-java-nio/.gitignore create mode 100644 week14/src-backend-java-nio/README.md create mode 100644 week14/src-backend-java-nio/step01_blocking_socket/.gitignore create mode 100644 week14/src-backend-java-nio/step01_blocking_socket/pom.xml create mode 100644 week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/App.java create mode 100644 week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/step01_thread_per_request/EchoClient.java create mode 100644 week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/step01_thread_per_request/EchoServer.java create mode 100644 week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/step02_thread_pooling/EchoClient.java create mode 100644 week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/step02_thread_pooling/EchoServer.java create mode 100644 week14/src-backend-java-nio/step01_blocking_socket/src/test/java/dev/oio/AppTest.java create mode 100644 week14/src-backend-java-nio/step02_non_blocking/.gitignore create mode 100644 week14/src-backend-java-nio/step02_non_blocking/index.txt create mode 100644 week14/src-backend-java-nio/step02_non_blocking/pom.xml create mode 100644 week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/App.java create mode 100644 week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step01_java_nio/ChannelOverview.java create mode 100644 week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step01_java_nio/SelectorOverview.java create mode 100644 week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step02_using_selectable_channel/SampleClient.java create mode 100644 week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step02_using_selectable_channel/SelectableChannelOverview.java create mode 100644 week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step03_adding_more_event/SampleClient.java create mode 100644 week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step03_adding_more_event/SelectableChannelOverview.java create mode 100644 week14/src-backend-java-nio/step02_non_blocking/src/test/java/dev/nio/AppTest.java create mode 100644 week14/src-backend-java-nio/step03_netty/.gitignore create mode 100644 week14/src-backend-java-nio/step03_netty/pom.xml create mode 100644 week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/App.java create mode 100644 week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/DiscardServer.java create mode 100644 week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/DiscardServerHandler.java create mode 100644 week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/EchoServer.java create mode 100644 week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/EchoServerHandler.java create mode 100644 week14/src-backend-java-nio/step03_netty/src/test/java/dev/netty/AppTest.java diff --git a/week14/src-backend-java-nio b/week14/src-backend-java-nio deleted file mode 160000 index 2c612e5..0000000 --- a/week14/src-backend-java-nio +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 2c612e5265fc1d4465b12d48c8b2bb95a10385dc diff --git a/week14/src-backend-java-nio/.gitignore b/week14/src-backend-java-nio/.gitignore new file mode 100644 index 0000000..8af3bb4 --- /dev/null +++ b/week14/src-backend-java-nio/.gitignore @@ -0,0 +1,2 @@ +*/target/ +*/.idea/ diff --git a/week14/src-backend-java-nio/README.md b/week14/src-backend-java-nio/README.md new file mode 100644 index 0000000..82d9ff5 --- /dev/null +++ b/week14/src-backend-java-nio/README.md @@ -0,0 +1 @@ +# src-backend-java-nio diff --git a/week14/src-backend-java-nio/step01_blocking_socket/.gitignore b/week14/src-backend-java-nio/step01_blocking_socket/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/week14/src-backend-java-nio/step01_blocking_socket/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/week14/src-backend-java-nio/step01_blocking_socket/pom.xml b/week14/src-backend-java-nio/step01_blocking_socket/pom.xml new file mode 100644 index 0000000..6d43a96 --- /dev/null +++ b/week14/src-backend-java-nio/step01_blocking_socket/pom.xml @@ -0,0 +1,37 @@ + + 4.0.0 + + dev.oio + step01_blocking_socket + 1.0-SNAPSHOT + jar + + step01_blocking_socket + http://maven.apache.org + + + UTF-8 + + + + + junit + junit + 3.8.1 + test + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + diff --git a/week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/App.java b/week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/App.java new file mode 100644 index 0000000..e28988f --- /dev/null +++ b/week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/App.java @@ -0,0 +1,13 @@ +package dev.oio; + +/** + * Hello world! + * + */ +public class App +{ + public static void main( String[] args ) + { + System.out.println( "Hello World!" ); + } +} diff --git a/week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/step01_thread_per_request/EchoClient.java b/week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/step01_thread_per_request/EchoClient.java new file mode 100644 index 0000000..d76be31 --- /dev/null +++ b/week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/step01_thread_per_request/EchoClient.java @@ -0,0 +1,31 @@ +package dev.oio.step01_thread_per_request; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.net.Socket; + +public class EchoClient { + public static void main(String[] args) { + String serverAddress = "localhost"; // 서버 주소 + int port = 12345; // 서버 포트 + + try (Socket socket = new Socket(serverAddress, port); + PrintWriter out = new PrintWriter(socket.getOutputStream(), true); + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))) { + + System.out.println("서버에 연결되었습니다."); + + String userInput; + while ((userInput = stdIn.readLine()) != null) { + out.println(userInput); // 서버로 메시지 전송 + System.out.println("서버로부터 수신: " + in.readLine()); // 서버로부터 에코 수신 + } + } catch (IOException e) { + e.printStackTrace(); + } + } +} + diff --git a/week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/step01_thread_per_request/EchoServer.java b/week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/step01_thread_per_request/EchoServer.java new file mode 100644 index 0000000..f592ebd --- /dev/null +++ b/week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/step01_thread_per_request/EchoServer.java @@ -0,0 +1,67 @@ +package dev.oio.step01_thread_per_request; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; + +public class EchoServer { + public static boolean isClientConnected = false; // 클라이언트 연결 상태 + + public static void main(String[] args) { + int port = 12345; // 서버 포트 + try (ServerSocket serverSocket = new ServerSocket(port)) { + System.out.println("서버가 시작되었습니다. 포트: " + port); + while (true) { + Socket clientSocket = serverSocket.accept(); + isClientConnected = true; // 클라이언트 연결 상태 설정 + System.out.println("클라이언트 연결: " + clientSocket.getInetAddress()); + + // 클라이언트와의 통신을 위한 스레드 생성 + new ClientHandler(clientSocket).start(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } +} + +class ClientHandler extends Thread { + private Socket clientSocket; + + public ClientHandler(Socket socket) { + this.clientSocket = socket; + } + + @Override + public void run() { + System.out.println("클라이언트 스레드 ID: " + Thread.currentThread().getId()); // 스레드 ID 출력 + + try (BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); + PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true)) { + + String inputLine; + while ((inputLine = in.readLine()) != null) { + System.out.println("클라이언트 스레드 ID: " + Thread.currentThread().getId() + " 클라이언트로부터 수신: " + inputLine); + out.println(inputLine); // 클라이언트에게 에코 + } + } catch (SocketException e) { + System.out.println("클라이언트가 연결을 종료했습니다."); + } catch (IOException e) { + e.printStackTrace(); + } finally { + try { + clientSocket.close(); + System.out.println("클라이언트 연결 종료: " + clientSocket.getInetAddress()); + EchoServer.isClientConnected = false; // 연결 종료 시 상태 변경 + } catch (IOException e) { + e.printStackTrace(); + } + } + } +} + + diff --git a/week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/step02_thread_pooling/EchoClient.java b/week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/step02_thread_pooling/EchoClient.java new file mode 100644 index 0000000..562470b --- /dev/null +++ b/week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/step02_thread_pooling/EchoClient.java @@ -0,0 +1,37 @@ +package dev.oio.step02_thread_pooling; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.net.Socket; +import java.net.UnknownHostException; + +public class EchoClient { + public static void main(String[] args) { + String serverAddress = "localhost"; // 서버 주소 + int port = 12345; // 서버 포트 + + try { + // 서버에 연결 시도 + Socket socket = new Socket(serverAddress, port); + System.out.println("서버에 연결되었습니다."); // 연결 성공 시 메시지 출력 + + try (PrintWriter out = new PrintWriter(socket.getOutputStream(), true); + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))) { + + String userInput; + while ((userInput = stdIn.readLine()) != null) { + out.println(userInput); // 서버로 메시지 전송 + System.out.println("서버로부터 수신: " + in.readLine()); // 서버로부터 에코 수신 + } + } + } catch (UnknownHostException e) { + System.out.println("서버를 찾을 수 없습니다: " + e.getMessage()); + } catch (IOException e) { + System.out.println("서버에 연결 대기 중입니다."); // 연결 실패 시 대기 중 메시지 출력 + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/step02_thread_pooling/EchoServer.java b/week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/step02_thread_pooling/EchoServer.java new file mode 100644 index 0000000..91ca23c --- /dev/null +++ b/week14/src-backend-java-nio/step01_blocking_socket/src/main/java/dev/oio/step02_thread_pooling/EchoServer.java @@ -0,0 +1,98 @@ +package dev.oio.step02_thread_pooling; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + +public class EchoServer { + private static final int MAX_THREADS = 2; // 최대 스레드 수 + private static final int MAX_QUEUE_SIZE = 5; // 대기 큐 크기 + private static ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(MAX_THREADS); // 스레드 풀 생성 + private static ArrayBlockingQueue waitingQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE); // 대기 큐 + + public static void main(String[] args) { + int port = 12345; // 서버 포트 + // 대기 클라이언트 처리 스레드 시작 + new Thread(EchoServer::handleWaitingClients).start(); + + try (ServerSocket serverSocket = new ServerSocket(port)) { + System.out.println("서버가 시작되었습니다. 포트: " + port); + while (true) { + // 실제 accept() 시스템 콜 호출 부분? -> PlainSocketImpl.socketAccept()의 accept0 + Socket clientSocket = serverSocket.accept(); + String clientAddress = clientSocket.getInetAddress().getHostAddress(); + int clientPort = clientSocket.getPort(); // 클라이언트 포트 번호 + System.out.println(clientAddress + ":" + clientPort + "클라이언트로부터 연결 시도"); + + // 현재 활성화된 스레드 수를 체크 + if (threadPool.getActiveCount() < MAX_THREADS) { + threadPool.execute(new ClientHandler(clientSocket)); + System.out.println(clientAddress + ":" + clientPort + " 클라이언트가 서버에 연결되었습니다"); + } else { + if (waitingQueue.offer(clientSocket)) { + System.out.println(clientAddress + ":" + clientPort + " 클라이언트가 서버에 연결 대기 중: "); + } else { + System.out.println("대기 큐가 가득 차서 연결을 거부합니다: " + clientAddress + ":" + clientPort); + clientSocket.close(); // 큐가 가득 차면 연결 종료 + } + } + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + private static void handleWaitingClients() { + while (true) { + try { + Socket clientSocket = waitingQueue.take(); // 대기 큐에서 클라이언트 소켓을 가져옴 + if (clientSocket != null) { + threadPool.execute(new ClientHandler(clientSocket)); + String clientAddress = clientSocket.getInetAddress().getHostAddress(); + int clientPort = clientSocket.getPort(); + // System.out.println(clientAddress + ":" + clientPort + " 클라이언트가 서버에 연결되었습니다"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + } +} + +class ClientHandler implements Runnable { + private Socket clientSocket; + + public ClientHandler(Socket socket) { + this.clientSocket = socket; + } + + @Override + public void run() { + System.out.println("클라이언트 스레드 ID: " + Thread.currentThread().getId()); // 스레드 ID 출력 + try (BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); + PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true)) { + + String inputLine; + while ((inputLine = in.readLine()) != null) { + System.out.println("클라이언트로부터 수신: " + inputLine); + out.println(inputLine); // 클라이언트에게 에코 + } + } catch (IOException e) { + System.out.println("클라이언트가 연결을 종료했습니다."); + } finally { + try { + clientSocket.close(); + System.out.println("클라이언트 연결 종료: " + clientSocket.getInetAddress().getHostAddress() + ":" + clientSocket.getPort()); + } catch (IOException e) { + e.printStackTrace(); + } + } + } +} diff --git a/week14/src-backend-java-nio/step01_blocking_socket/src/test/java/dev/oio/AppTest.java b/week14/src-backend-java-nio/step01_blocking_socket/src/test/java/dev/oio/AppTest.java new file mode 100644 index 0000000..55af013 --- /dev/null +++ b/week14/src-backend-java-nio/step01_blocking_socket/src/test/java/dev/oio/AppTest.java @@ -0,0 +1,38 @@ +package dev.oio; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +} diff --git a/week14/src-backend-java-nio/step02_non_blocking/.gitignore b/week14/src-backend-java-nio/step02_non_blocking/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/week14/src-backend-java-nio/step02_non_blocking/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/week14/src-backend-java-nio/step02_non_blocking/index.txt b/week14/src-backend-java-nio/step02_non_blocking/index.txt new file mode 100644 index 0000000..e00a42b --- /dev/null +++ b/week14/src-backend-java-nio/step02_non_blocking/index.txt @@ -0,0 +1,61 @@ +1. 네트워크 프로그램 구현 방식 + - 블로킹 소켓 기반의 구현 + Q. 활용 API? + -> 소켓과 인풋/아웃풋 스트림 + +2. 서버 - 클라이언트 간 데이터 송/수신 방식 + - 스트림 + - 버퍼 + Q. 버퍼란? + -> 임시 메모리를 활용하여 스트림보다 더 빠르고 재사용 가능한 클래스 + + Q. 버퍼를 활용하여 송수신을 보다 빠르게 처리하는 방법? + -> 버퍼가 적용된 BufferedReader + +3. 다중 사용자 접속 처리하기 + Q. 처음에 구현한 블로킹 방식의 네트워크 프로그램의 특징? + -> Java OIO(Old IO, Java 1.4 이전) API인 ServerSocket과 Socket을 활용, + 블로킹으로 동작, 1:1 연결 밖에 지원하지 않음 + + Q. 하나의 서버로 둘 이상의 클라이언트를 받는 방법? + -> 총 2가지 + 1. 멀티 프로세싱 + 2. 멀티 스레딩 + 2-1. 요청당 스레딩 모델 + Q. 요청당 스레드 모델의 장단점? + + 2-2. 스레드 풀링 모델 + Q. 스레드 풀링 모델의 장단점? + Q. 스레드 풀에 생성될 적정 스레드 수 계산 방법? + + Q. 스레드 풀링 모델의 단점 개선하기 위한 방법? + -> 스레드가 IO 연산을 수행하는 동안에도 쉬지 않고, 다른 작업을 처리할 수 있는 + 논블로킹 메커니즘을 활용 + + 논블로킹에서는 스레드가 IO 작업을 수행하는 메서드를 호출하는 즉시 반환되며, + 작업이 아직 완료되지 않아도 스레드가 차단되지 않음 + + 이를 통해 하나의 스레드만 가지고도 둘 이상의 다중 접속 사용자를 처리할 수 있음 + (OS 레벨의 메커니즘에서는 IO 멀티 플렉싱, 입출력 다중화라고 함) + 정리하면, 하나의 서버로 둘 이상의 사용자 접속을 받는 방법은 두 가지가 아닌 총 세 가지 + -> 1. 멀티 프로세싱 + 2. 멀티 스레딩 + 3. 멀티 플렉싱(Java 1.4~에서 등장한 Java NIO, New, Non-blocking) +4. 논블로킹 기반의 네트워크 프로그램을 구현하기 위해 필요한 몇 가지 API + Q. 채널이란? + -> 스트림 기반의 블로킹 IO를 사용했던 초기 Java OIO에서는 대량의 데이터를 효율적으로 처리하거나 + 고성능 네트워크 서버를 구축하기에는 한계가 있었기 때문에 등장한 API + Selector와 결합할 경우에는 논블로킹 메커니즘을 구현할 수 있음 + + Q. 채널의 특징? + -> 채널은 Buffer를 활용하여 데이터를 블록 단위로 처리, 스트림에 비해 속도가 빠르고 효율적임 + 단방향으로 통신하는 스트림과는 다르게(InputStream, OutputStream) + 채널은 양방향으로 통신 + + Q. 셀렉터란? + -> OS의 IO 멀티플렉싱 메커니즘을 지원하는 API, + 여러 개의 Channel 들을 단일 스레드를 통해 감시하여(select()) OIO에 비해 서버의 확장성 개선 + Selector는 하나의 스레드가 여러 개의 Non-blocking 채널들을 감시할 수 있도록 도와주는 멀티 플렉싱 툴 + 이벤트 기반으로 IO를 처리함 + 이 맥락에서 이벤트의 예시는? + ex. 클라이언트 - 서버 간의 연결, 데이터 송/수신, 과정에서 발생한 에러 등 \ No newline at end of file diff --git a/week14/src-backend-java-nio/step02_non_blocking/pom.xml b/week14/src-backend-java-nio/step02_non_blocking/pom.xml new file mode 100644 index 0000000..8326a0c --- /dev/null +++ b/week14/src-backend-java-nio/step02_non_blocking/pom.xml @@ -0,0 +1,25 @@ + + 4.0.0 + + dev.nio + step02_non_blocking + 1.0-SNAPSHOT + jar + + step02_non_blocking + http://maven.apache.org + + + UTF-8 + + + + + junit + junit + 3.8.1 + test + + + diff --git a/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/App.java b/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/App.java new file mode 100644 index 0000000..ebde45c --- /dev/null +++ b/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/App.java @@ -0,0 +1,13 @@ +package dev.nio; + +/** + * Hello world! + * + */ +public class App +{ + public static void main( String[] args ) + { + System.out.println( "Hello World!" ); + } +} diff --git a/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step01_java_nio/ChannelOverview.java b/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step01_java_nio/ChannelOverview.java new file mode 100644 index 0000000..af09db6 --- /dev/null +++ b/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step01_java_nio/ChannelOverview.java @@ -0,0 +1,34 @@ +package dev.nio.step01_java_nio; + +/** + * java.nio.channels.Channel + * + java.nio.channels 패키지(https://docs.oracle.com/javase/8/docs/api/java/nio/channels/package-summary.html) + * + * 파일 및 소켓과 같이 I/O 작업을 수행할 수 있는 엔티티에 대한 연결을 수행하는 역할인 Channel 클래스를 정의, + * 또한 다중화된(multiplexed) 논블로킹 I/O 작업을 위한 Selector 클래스를 정의함 + * + SelectableChannel + * A channel that can be multiplexed via a Selector. + * -> Selector를 통해 멀티플렉싱으로 동작하도록 하기 위해 필요한 채널 + * + * 이하 클래스들은 논블로킹이 가능하도록 Selector와 함께 사용될 수 있도록 상속받은 하위 클래스 + * -> XxxChannel 'extends AbstractSelectableChannel, SelectableChannel' + * + * DatagramChannel + * A selectable channel for datagram-oriented sockets. + * -> UDP 프로토콜 기반 통신 프로그램 구현 시 활용 + * + * FileChannel + * A channel for reading, writing, mapping, and manipulating a file. + * -> 파일 입출력 기반 통신 프로그램 구현 시 사용 + * + * ServerSocketChannel + * A selectable channel for stream-oriented listening sockets. + * -> TCP 프로토콜 기반 통신 프로그램 구현 시 사용 + */ +public class ChannelOverview { + public static void main(String[] args) { + + } +} diff --git a/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step01_java_nio/SelectorOverview.java b/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step01_java_nio/SelectorOverview.java new file mode 100644 index 0000000..b156b10 --- /dev/null +++ b/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step01_java_nio/SelectorOverview.java @@ -0,0 +1,35 @@ +package dev.nio.step01_java_nio; + +import java.io.IOException; +import java.nio.channels.Selector; + +/** + * java.nio.channels.Selector + * + Selector + * A multiplexor of SelectableChannel objects. + * -> 논블로킹 IO(멀티플렉서)를 관리하는 핵심 컴포넌트 + * + */ +public class SelectorOverview { + public static void main(String[] args) { + try { + Selector selector = Selector.open(); // Selector는 open()을 통해 생성할 수 있음 + System.out.println("selector = " + selector); // 각 OS 전용 셀렉터 구현체가 생성됨 + /* + windows, selector = sun.nio.ch.WEPollSelectorImpl@cac736f + Mac OS, selector = sun.nio.ch.KQueueSelectorImpl@452b3a41 + */ + + // Selector가 열려있는지 확인할 수 있는 메서드, 열려있을 경우 true 반환 + // close() 메서드를 호출하기 전까지는 셀렉터는 열린 채로 유지됨 + System.out.println("selector.isOpen() = " + selector.isOpen()); + + // 클라이언트로부터 유입 이벤트를 기다림 + selector.select(); // 사용자의 연결이 올 때까지 스레드는 해당 라인에서 블로킹됨(프로그램이 종료되지 않음) + + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step02_using_selectable_channel/SampleClient.java b/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step02_using_selectable_channel/SampleClient.java new file mode 100644 index 0000000..3713305 --- /dev/null +++ b/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step02_using_selectable_channel/SampleClient.java @@ -0,0 +1,15 @@ +package dev.nio.step02_using_selectable_channel; + +public class SampleClient { + /* + cmd에서 telnet 클라이언트를 통해 요청 테스트 + telnet localhost 5555와 같이 요청 수행 + + 둘 이상의 클라이언트를 테스트할 경우에는 + 새로운 터미널 실행 후 telnet~으로 동일하게 요청 수행 + + 한계점. + 현재 다중 클라이언트의 연결만 수용할 수 있도록 구현되어 있기 때문에 + 이후 클라이언트의 데이터 송/수신 처리는 아직 불가능 + */ +} diff --git a/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step02_using_selectable_channel/SelectableChannelOverview.java b/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step02_using_selectable_channel/SelectableChannelOverview.java new file mode 100644 index 0000000..8d637fb --- /dev/null +++ b/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step02_using_selectable_channel/SelectableChannelOverview.java @@ -0,0 +1,96 @@ +package dev.nio.step02_using_selectable_channel; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.*; +import java.util.Iterator; +import java.util.Set; + +/** + * SelectableChannel + * + * Selector는 둘 이상의 Channel을 하나의 스레드를 통해 감시(select())할 수 있음 + * + * Selector에 채널을 등록하기 위해서는 SelectableChannel을 상속받아야 하며, + * SelectableChannel을 상속받았으며, TCP 연결용 채널인 ServerSocketChannel을 활용함 + * + * 결과적으로 Selector와 ServerSocketChannel을 통해 하나의 스레드로 둘 이상의 다중 클라이언트의 접속을 처리할 수 있음 + */ +public class SelectableChannelOverview { + public static void main(String[] args) { + final int DEFAULT_PORT = 5555; + + try { + // Selector 생성 + Selector selector = Selector.open(); + + // TCP 통신을 위한 ServerSocketChannel을 생성 + ServerSocketChannel serverSocketChannel + = ServerSocketChannel.open(); + + // Selector와 ServerSocketChannel이 성공적으로 열렸는지 확인 + if (serverSocketChannel.isOpen() && selector.isOpen()) { + // 생성한 논블로킹 소켓인 ServerSocketChannel을 논블로킹 모드로 설정 + serverSocketChannel.configureBlocking(false); + + // 클라이언트의 연결을 대기할 포트번호 지정(생성) + InetSocketAddress inetSocketAddress + = new InetSocketAddress(DEFAULT_PORT); + + // 생성한 포트를 서버 소켓 채널에 바인딩 + serverSocketChannel.bind(inetSocketAddress); + // -> serverSocketChannel 객체가 지정된 포트로부터 클라이언트의 연결을 받을 수 있게 됨 + + // 현재 생성된 서버 소켓 채널(serverSocketChannel)을 Selector 객체에 등록 + // SelectionKey.OP_ACCEPT - 셀렉터가 감지할 이벤트들 중에서 서버가 클라이언트의 연결 요청 수락 + // 두 번째 인자는 채널에서 발생하는 이벤트들 중 셀렉터를 통해 확인하고자(알림받고자) 하는 이벤트의 종류를 전달할 때 사용 + serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); + + System.out.println("서버는 클라이언트의 접속 대기 중.."); + while (true) { + selector.select(); // 클라이언트의 유입 이벤트가 올 때까지 대기(기다림) + + // SelectionKey 목록 조회 + Set selectionKeys = selector.selectedKeys(); + + Iterator keys = selectionKeys.iterator(); + + while (keys.hasNext()) { + SelectionKey key = keys.next(); + + // 현재 키에 해당하는 채널에서 조회된 IO 이벤트의 종류가 새로운 소켓 커넥션 연결 요청인지 확인 + if (key.isAcceptable()) { // 현재 키에 담긴 이벤트가 새로운 소켓 연결을 수락할 수 있는지? + // 소켓 연결 작업 수행 로직 작성 부분 + acceptOperation(key, selector); + } + } + } + } + + } catch (IOException e) { + throw new RuntimeException(e); + } + + } + + /* + 키의 채널(여기서는 ServerSocketChannel)이 새 소켓 연결을 수락할 수 있는 경우, + 처리할 수행 로직이 담긴 메서드 + */ + private static void acceptOperation(SelectionKey key, Selector selector) throws IOException { + // 클라이언트의 연결 요청 이벤트가 발생한 채널은 항상 ServerSocketChannel이기 때문에 + // 이벤트가 발생한 채널을 ServerSocketChannel 타입으로 캐스팅 + ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); + +// ServerSocketChannel을 활용하여 클라이언트의 연결을 수락하고, 연결된 클라이언트 SocketChannel 객체를 가져옴 + SocketChannel clientSocketChannel + = serverSocketChannel.accept();// accept()는 연결된 클라이언트 소켓 객체를 반환함 + System.out.println("clientSocketChannel = " + clientSocketChannel); + + // 연결된 클라이언트 소켓 모드를 논블로킹 모드로 설정 + clientSocketChannel.configureBlocking(false); + System.out.println(clientSocketChannel.getRemoteAddress() + " 로부터 클라이언트가 연결됨"); + } +} + + diff --git a/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step03_adding_more_event/SampleClient.java b/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step03_adding_more_event/SampleClient.java new file mode 100644 index 0000000..c4b0d0c --- /dev/null +++ b/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step03_adding_more_event/SampleClient.java @@ -0,0 +1,15 @@ +package dev.nio.step03_adding_more_event; + +public class SampleClient { + /* + cmd에서 telnet 클라이언트를 통해 요청 테스트 + telnet localhost 5555와 같이 요청 수행 + + 둘 이상의 클라이언트를 테스트할 경우에는 + 새로운 터미널 실행 후 telnet~으로 동일하게 요청 수행 + + 한계점. + 현재 다중 클라이언트의 연결만 수용할 수 있도록 구현되어 있기 때문에 + 이후 클라이언트의 데이터 송/수신 처리는 아직 불가능 + */ +} diff --git a/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step03_adding_more_event/SelectableChannelOverview.java b/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step03_adding_more_event/SelectableChannelOverview.java new file mode 100644 index 0000000..72e0801 --- /dev/null +++ b/week14/src-backend-java-nio/step02_non_blocking/src/main/java/dev/nio/step03_adding_more_event/SelectableChannelOverview.java @@ -0,0 +1,240 @@ +package dev.nio.step03_adding_more_event; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.*; + +/** + * 현재 서버는 클라이언트의 연결 요청에 대한 이벤트만 처리할 수 있음 + * + * TODO: 따라서 추가적으로 클라이언트가 전송한 데이터를 읽을 수 있는 처리(Read) + * 읽은 데이터를 그대로 클라이언트에게 반환(Write)할 수 있는 처리 + */ +public class SelectableChannelOverview { + + /* + keepDataTrack + 클라이언트로부터 수신된 데이터를 추적, 관리하는 역할 + 각 클라이언트의 채널을 의미하는 SocketChannel을 key로 가지며, + 해당 클라이언트로부터 수신된 데이터를 저장하는 리스트(List)를 값으로 갖는 HashMap 객체 + 각 클라이언트가 보낸 메시지는 byte[] 형태로 리스트에 저장됨 + */ + private static Map> keepDataTrack = new HashMap<>(); + + /* + IO 작업 시 데이터의 임시 저장소 역할인 버퍼 생성 + */ + private static ByteBuffer buffer = ByteBuffer.allocate(2 * 1024); + + public static void main(String[] args) { + final int DEFAULT_PORT = 5555; + + try { + // Selector 생성 + Selector selector = Selector.open(); + + // TCP 통신을 위한 ServerSocketChannel을 생성 + ServerSocketChannel serverSocketChannel + = ServerSocketChannel.open(); + + // Selector와 ServerSocketChannel이 성공적으로 열렸는지 확인 + if (serverSocketChannel.isOpen() && selector.isOpen()) { + // 생성한 논블로킹 소켓인 ServerSocketChannel을 논블로킹 모드로 설정 + serverSocketChannel.configureBlocking(false); + + // 클라이언트의 연결을 대기할 포트번호 지정(생성) + InetSocketAddress inetSocketAddress + = new InetSocketAddress(DEFAULT_PORT); + + // 생성한 포트를 서버 소켓 채널에 바인딩 + serverSocketChannel.bind(inetSocketAddress); + // -> serverSocketChannel 객체가 지정된 포트로부터 클라이언트의 연결을 받을 수 있게 됨 + + // 현재 생성된 서버 소켓 채널(serverSocketChannel)을 Selector 객체에 등록 + // SelectionKey.OP_ACCEPT - 셀렉터가 감지할 이벤트들 중에서 서버가 클라이언트의 연결 요청 수락 + // 두 번째 인자는 채널에서 발생하는 이벤트들 중 셀렉터를 통해 확인하고자(알림받고자) 하는 이벤트의 종류를 전달할 때 사용 + serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); + + /* + SelectionKey + 채널이 Selector로 등록될 때마다 채널은 java.nio.channels.SelectionKey 클래스의 인스턴스를 통해 표현됨 + 이 인스턴스를 SelectionKey라고 함 + -> 서로 다른 채널에 속한 클라이언트들의 요청을 정렬하기 위해 셀렉터가 사용하는 헬퍼 객체 + + 각 헬퍼(key)는 단 하나의 클라이언트 서브 리퀘스트로 표현되며, + 클라이언트와 요청 유형(연결, 읽기, 쓰기 등)을 식별하는 정보가 들어있음 + + SelectionKey.OP_ACCEPT - 수락 가능, 연관된 클라이언트가 연결을 요청함 + SelectionKey.OP_CONNECT - 연결 가능, 서버가 연결을 수락함 + SelectionKey.OP_READ - 읽기 가능, 읽기 연산을 가리킴 + SelectionKey.OP_WRITE - 쓰기 가능, 쓰기 연산을 가리킴 + */ + + + System.out.println("서버는 클라이언트의 접속 대기 중.."); + while (true) { + selector.select(); // 클라이언트의 유입 이벤트가 올 때까지 대기(기다림) + + // SelectionKey 목록 조회 + Set selectionKeys = selector.selectedKeys(); + + Iterator keys = selectionKeys.iterator(); + + while (keys.hasNext()) { + SelectionKey key = keys.next(); + + // 같은 키가 반복해서 오는 것을 막기 위해 처리한 키는 제거, IO 이벤트가 발생한 채널에서 동일한 이벤트가 감지되는 것을 방지하기 위함 + keys.remove(); + + if (!key.isValid()) { // 키가 유효한지 확인, 키가 취소되거나 키의 채널이 닫혔거나 셀렉터가 닫혔다면 유효하지 않은 키 + continue; + } + + // 현재 키에 해당하는 채널에서 조회된 IO 이벤트의 종류가 새로운 소켓 커넥션 연결 요청인지 확인 + if (key.isAcceptable()) { // 현재 키에 담긴 이벤트가 새로운 소켓 연결을 수락할 수 있는지? + // 소켓 연결 작업 수행 로직 작성 부분 + acceptOperation(key, selector); + } else if (key.isReadable()) { // 조회된 IO 이벤트의 종류가 데이터 수신 요청인지 확인(키의 채널을 읽을 수 있는지 확인) + System.out.println("dd"); + readOperation(key); + } else if (key.isWritable()) { // 조회된 IO 이벤트의 종류가 데이터 쓰기 요청인지 확인(키의 채널에 쓸 수 있는지 확인) + writeOperation(key); + } + } + } + } + + } catch (IOException e) { + throw new RuntimeException(e); + } + + } + + /* + 키의 채널(여기서는 ServerSocketChannel)이 새 소켓 연결을 수락할 수 있는 경우, + 처리할 수행 로직이 담긴 메서드 + */ + private static void acceptOperation(SelectionKey key, Selector selector) throws IOException { + // 클라이언트의 연결 요청 이벤트가 발생한 채널은 항상 ServerSocketChannel이기 때문에 + // 이벤트가 발생한 채널을 ServerSocketChannel 타입으로 캐스팅 + ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); + + // ServerSocketChannel을 활용하여 클라이언트의 연결을 수락하고, 연결된 클라이언트 SocketChannel 객체를 가져옴 + SocketChannel clientSocketChannel + = serverSocketChannel.accept();// accept()는 연결된 클라이언트 소켓 객체를 반환함 + System.out.println("clientSocketChannel = " + clientSocketChannel); + + // 연결된 클라이언트 소켓 모드를 논블로킹 모드로 설정 + clientSocketChannel.configureBlocking(false); + System.out.println(clientSocketChannel.getRemoteAddress() + " 로부터 클라이언트가 연결됨"); + + // IO 처리를 위해 채널을 셀렉터에 등록 + keepDataTrack.put(clientSocketChannel, new ArrayList()); + // 클라이언트 채널에서 OP_READ가 발생하면 감지하여 처리할 수 있도록 + clientSocketChannel.register(selector, SelectionKey.OP_READ); + } + + /* + Key가 속한 채널의 데이터를 서버가 읽을 수 있는 경우 수행할 처리 로직 + → 클라이언트로부터 수신한 데이터를 읽어들임 + */ + private static void readOperation(SelectionKey key) { + try { + SocketChannel socketChannel = (SocketChannel) key.channel(); + + // 이전에 저장되었던 데이터를 지우고, 새로운 데이터를 받을 준비 + // 버퍼의 포지션을 0으로 설정, limit을 버퍼의 용량으로 설정하여 새로운 데이터를 쓸 수 있게 해줌 + buffer.clear(); + + // numRead - 클라이언트로부터 읽은 데이터의 바이트 길이를 나타내는 임시 변수 + int numRead = -1; + + try { + // 채널에 할당된 버퍼를 통해 바이트 값을 읽어들임, read()는 블로킹 방식으로 동작하기 때문에 클라이언트가 데이터를 보내기 전까지 호출한 스레드는 블로킹됨 + numRead = socketChannel.read(buffer); // 읽은 바이트 수를 반환, 읽을 수 있는 데이터가 없는 경우 -1을 반환(* -1은 클라이언트가 연결을 종료했음을 의미) + } + catch (IOException e) { + System.err.println("데이터 읽기 에러!"); + } + + if (numRead == -1) { // 클라이언트가 연결을 종료했을 경우, + keepDataTrack.remove(socketChannel); + System.out.println("클라이언트 연결(Connection) 종료 : " + + socketChannel.getRemoteAddress()); + socketChannel.close(); + key.cancel(); + return; + } + + // 클라이언트로부터 받은 데이터를 읽어서 클라이언트에게 그대로 응답해주는 처리 로직 + byte[] data = new byte[numRead]; // 읽은 데이터를 byte[] 타입 배열인 data 변수에 복사 + System.arraycopy(buffer.array(), 0, data, 0, numRead); + + System.out.println(new String(data, "UTF-8") + + " from " + socketChannel.getRemoteAddress()); // 문자열로 변환하여 콘솔에 출력 + + // 클라이언트에게 응답 처리 + doEchoJob(key, data); + + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /* + Key가 속한 채널에 서버가 데이터를 쓸 수 있는 경우 수행할 처리 로직 + → 클라이언트에게 데이터 응답 + */ + private static void writeOperation(SelectionKey key) throws IOException { + SocketChannel socketChannel = (SocketChannel) key.channel(); + List channelData = keepDataTrack.get(socketChannel); + + Iterator its = channelData.iterator(); // 특정 클라이언트 채널이 가진 데이터를 추출하기 위한 이터레이터 생성 + + while (its.hasNext()) { // 이터레이터를 순회하면서 데이터 추출 + byte[] it = its.next(); + its.remove(); + socketChannel.write(ByteBuffer.wrap(it)); + } + + // 관심 이벤트를 읽기 가능한 상태로 변경 + // 즉, 클라이언트에게 데이터를 전송한 후 서버는 다시 클라이언트로부터 수신할 데이터를 기다릴 준비가 되었음을 나타냄 + // 이 설정을 통해 서버는 클라이언트가 보낸 새로운 메시지를 읽기 위해 다시 준비됨 + key.interestOps(SelectionKey.OP_READ); + } + + + /* + 클라이언트로부터 수신한 데이터를 다시 그대로 클라이언트에게 응답하기 위한 처리 로직 + */ + private static void doEchoJob(SelectionKey key, byte[] data) { + // 특정 클라이언트에 해당하는 소켓채널을 불러옴 + SocketChannel socketChannel = (SocketChannel) key.channel(); + + // keepDataTrack에서 현재 클라이언트에 해당하는 채널과 관련된 데이터를 담고 있는 리스트를 가져옴 + List channelData + = keepDataTrack.get(socketChannel);// 특정 클라이언트가 가진 데이터 추출 + + channelData.add(data); // 클라이언트로부터 수신한 데이터를 byte[]에 추가 + + /* + SelectionKey가 감지해야할 IO이벤트의 관심사를 설정, + 여기서는 쓰기 가능한 상태(OP_WRITE)에 대한 관심사를 추가 + + 정리하면, 클라이언트로부터 수신한 데이터를 keepDataTrack에 추가한 후, + 서버가 클라이언트에게 데이터를 쓰기 위한 준비가 되었음을 나타냄 + + 따라서 이후 Selector가 SelectionKey에 대해 OP_WRITE 이벤트가 발생했을 때, + 클라이언트 소켓 채널에 데이터를 쓰기 위한 처리를 수행할 수 있게 됨 + */ + key.interestOps(SelectionKey.OP_WRITE); + + } +} + + diff --git a/week14/src-backend-java-nio/step02_non_blocking/src/test/java/dev/nio/AppTest.java b/week14/src-backend-java-nio/step02_non_blocking/src/test/java/dev/nio/AppTest.java new file mode 100644 index 0000000..48b1801 --- /dev/null +++ b/week14/src-backend-java-nio/step02_non_blocking/src/test/java/dev/nio/AppTest.java @@ -0,0 +1,38 @@ +package dev.nio; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +} diff --git a/week14/src-backend-java-nio/step03_netty/.gitignore b/week14/src-backend-java-nio/step03_netty/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/week14/src-backend-java-nio/step03_netty/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/week14/src-backend-java-nio/step03_netty/pom.xml b/week14/src-backend-java-nio/step03_netty/pom.xml new file mode 100644 index 0000000..909e856 --- /dev/null +++ b/week14/src-backend-java-nio/step03_netty/pom.xml @@ -0,0 +1,44 @@ + + 4.0.0 + + dev.netty + step03_netty + 1.0-SNAPSHOT + jar + + step03_netty + http://maven.apache.org + + + UTF-8 + + + + + junit + junit + 3.8.1 + test + + + io.netty + netty-all + 4.1.119.Final + + + + + org.slf4j + slf4j-api + 2.0.7 + + + + + ch.qos.logback + logback-classic + 1.4.8 + + + diff --git a/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/App.java b/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/App.java new file mode 100644 index 0000000..4e8edf0 --- /dev/null +++ b/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/App.java @@ -0,0 +1,13 @@ +package dev.netty; + +/** + * Hello world! + * + */ +public class App +{ + public static void main( String[] args ) + { + System.out.println( "Hello World!" ); + } +} diff --git a/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/DiscardServer.java b/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/DiscardServer.java new file mode 100644 index 0000000..1b5a763 --- /dev/null +++ b/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/DiscardServer.java @@ -0,0 +1,41 @@ +package dev.netty.step01_hello_netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.sctp.nio.NioSctpServerChannel; +import io.netty.channel.socket.SocketChannel; + +public class DiscardServer { + public static void main(String[] args) throws InterruptedException { + /* + worker와 boss를 구분 각각은 실제 작업(Data I/O)과 연결요청을 담당 + */ + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(1); + + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + + bootstrap.group(bossGroup, workerGroup) // event loop + .channel(NioSctpServerChannel.class) //socker mode (blocking or nonblocking) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + ChannelPipeline pipeline = socketChannel.pipeline(); + pipeline.addLast(new DiscardServerHandler()); + } + }); + + ChannelFuture future = bootstrap.bind(8888).sync(); + + future.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } +} diff --git a/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/DiscardServerHandler.java b/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/DiscardServerHandler.java new file mode 100644 index 0000000..76eee99 --- /dev/null +++ b/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/DiscardServerHandler.java @@ -0,0 +1,21 @@ +package dev.netty.step01_hello_netty; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.concurrent.EventExecutorGroup; + +public class DiscardServerHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + super.channelRead(ctx, msg); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + } +} + diff --git a/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/EchoServer.java b/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/EchoServer.java new file mode 100644 index 0000000..4bfd3cb --- /dev/null +++ b/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/EchoServer.java @@ -0,0 +1,43 @@ +package dev.netty.step01_hello_netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.sctp.nio.NioSctpServerChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; + +public class EchoServer { + public static void main(String[] args) { + /* + worker와 boss를 구분 각각은 실제 작업(Data I/O)과 연결요청을 담당 + */ + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(1); + + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) // event loop + .channel(NioServerSocketChannel.class) //socker mode (blocking or nonblocking) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + ChannelPipeline pipeline = socketChannel.pipeline(); + pipeline.addLast(new EchoServerHandler()); + } + }); + + ChannelFuture future = bootstrap.bind(8888).sync(); + + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } +} diff --git a/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/EchoServerHandler.java b/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/EchoServerHandler.java new file mode 100644 index 0000000..2be3c8c --- /dev/null +++ b/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/EchoServerHandler.java @@ -0,0 +1,37 @@ +package dev.netty.step01_hello_netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +import java.nio.charset.Charset; + +public class EchoServerHandler extends ChannelInboundHandlerAdapter { + + /* + 서버 입장에서 채널을 통해 읽기 이벤트가 발생했을 때, 처리할 로직을 작성하는 부분 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + // 채널을 통해 버퍼에서 읽은 메시지를 문자열 타입으로 변환 + String readMessage = ((ByteBuf) msg).toString(Charset.defaultCharset()); + + StringBuilder builder = new StringBuilder(); + builder.append("수신한 문자열 ["); + builder.append(readMessage); + builder.append("]"); + System.out.println(builder.toString()); + + ctx.write(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + // super.exceptionCaught(ctx, cause); + } +} diff --git a/week14/src-backend-java-nio/step03_netty/src/test/java/dev/netty/AppTest.java b/week14/src-backend-java-nio/step03_netty/src/test/java/dev/netty/AppTest.java new file mode 100644 index 0000000..cd60932 --- /dev/null +++ b/week14/src-backend-java-nio/step03_netty/src/test/java/dev/netty/AppTest.java @@ -0,0 +1,38 @@ +package dev.netty; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +} From 3494d35e87cf03d3a0a8754159e16d755026a52a Mon Sep 17 00:00:00 2001 From: sengjun0624 Date: Wed, 9 Apr 2025 09:17:22 +0900 Subject: [PATCH 03/10] =?UTF-8?q?:sparkles=20[WEEK14]=20Netty=20=EC=98=88?= =?UTF-8?q?=EC=A0=9C=20=EC=BD=94=EB=93=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../netty/step01_hello_netty/EchoServer.java | 46 +++++++++++++------ 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/EchoServer.java b/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/EchoServer.java index 4bfd3cb..b1af917 100644 --- a/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/EchoServer.java +++ b/week14/src-backend-java-nio/step03_netty/src/main/java/dev/netty/step01_hello_netty/EchoServer.java @@ -6,35 +6,53 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.sctp.nio.NioSctpServerChannel; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +/* + Echo: 응답하다 + 클라이언트의 요청을 받고 응답하는 서버 + */ public class EchoServer { - public static void main(String[] args) { - /* - worker와 boss를 구분 각각은 실제 작업(Data I/O)과 연결요청을 담당 - */ + public static void main(String[] args) throws InterruptedException { + // Boss - 주로 클라이언트 연결 요청 처리 담당 + // 1 -> 부모 이벤트 루프 스레드 그룹은 단일 스레드로 동작 EventLoopGroup bossGroup = new NioEventLoopGroup(1); - EventLoopGroup workerGroup = new NioEventLoopGroup(1); + + // Worker - 데이터 송수신 처리 담당 + EventLoopGroup workerGroup = new NioEventLoopGroup(); + // NioEventLoopGroup()을 인수 없이 생성 -> 사용할 스레드 수를 서버 애플리케이션이 동작하는 하드웨어 코어 수를 기준으로 지정 + // 일반적으로 스레드 수는 하드웨어가 가지고 있는 CPU 코어 수의 2배를 사용함 + // 만약 서버 애플리케이션이 동작하는 하드웨어가 4코어 CPU이고 하이퍼 스레딩을 지원할 경우 스레드는 16개가 생성됨 try { + // 서버 초기화용 부트스트랩 객체 생성 ServerBootstrap bootstrap = new ServerBootstrap(); - bootstrap.group(bossGroup, workerGroup) // event loop - .channel(NioServerSocketChannel.class) //socker mode (blocking or nonblocking) + // 부트스트랩 객체를 통해 각종 네트워크 옵션 설정 적용 + bootstrap.group(bossGroup, workerGroup) // 이벤트 루프 설정, 미리 생성한 EventRoopGroup을 인수로 전달 group(부모 스레드, 자식 스레드) + + // 부모 스레드 - 클라이언트의 연결 요청 수락 담당 + // 자식 스레드 - 연결된 소켓에 대한 IO 처리 담당 + + + .channel(NioServerSocketChannel.class) // 서버 소켓(부모 스레드)이 사용할 네트워크 입출력 모드 설정, NIOServer~ - 논블로킹 모드 + + // ChannelInitializer - 클라이언트로부터 연결된 채널이 초기화될 때 수행할 동작이 지정된 클래스 .childHandler(new ChannelInitializer() { + @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - ChannelPipeline pipeline = socketChannel.pipeline(); - pipeline.addLast(new EchoServerHandler()); + protected void initChannel(SocketChannel socketChannel) throws Exception { // 채널 초기화 시 수행할 동작 + ChannelPipeline pipeline = socketChannel.pipeline(); // 채널 파이프라인 객체 생성 + pipeline.addLast(new EchoServerHandler()); // 채널 파이프라인에 EchoServerHandler 등록 + // EchoServerHandler - 클라이언트의 연결이 생성되었을 때 수행할 데이터 처리 로직 담당 } }); + // ChannelFuture를 통해 비동기 메서드의 처리 결과를 확인 + // sync(): ChannelFuture 객체의 요청이 완료될 때까지 대기 ChannelFuture future = bootstrap.bind(8888).sync(); - future.channel().closeFuture().sync(); - } catch (InterruptedException e) { - throw new RuntimeException(e); + } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); From f6ee6058b6770faf3b90e4afbe201e12e429c6e8 Mon Sep 17 00:00:00 2001 From: sengjun0624 Date: Wed, 9 Apr 2025 09:43:01 +0900 Subject: [PATCH 04/10] =?UTF-8?q?:sparkles=20[WEEK14]=20Rabbit=20Mq=20?= =?UTF-8?q?=ED=94=84=EB=A1=9C=EC=A0=9D=ED=8A=B8=20init?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rabbit-mq/hello_rabbitmq_java/.gitignore | 38 ++++++++++++++++ week14/rabbit-mq/hello_rabbitmq_java/pom.xml | 45 +++++++++++++++++++ .../src/main/java/dev/mq/App.java | 13 ++++++ .../src/main/java/dev/mq/steo01/Receiver.java | 5 +++ .../src/main/java/dev/mq/steo01/Sender.java | 4 ++ .../src/test/java/dev/mq/AppTest.java | 38 ++++++++++++++++ 6 files changed, 143 insertions(+) create mode 100644 week14/rabbit-mq/hello_rabbitmq_java/.gitignore create mode 100644 week14/rabbit-mq/hello_rabbitmq_java/pom.xml create mode 100644 week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/App.java create mode 100644 week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Receiver.java create mode 100644 week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Sender.java create mode 100644 week14/rabbit-mq/hello_rabbitmq_java/src/test/java/dev/mq/AppTest.java diff --git a/week14/rabbit-mq/hello_rabbitmq_java/.gitignore b/week14/rabbit-mq/hello_rabbitmq_java/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/week14/rabbit-mq/hello_rabbitmq_java/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/week14/rabbit-mq/hello_rabbitmq_java/pom.xml b/week14/rabbit-mq/hello_rabbitmq_java/pom.xml new file mode 100644 index 0000000..c42dd63 --- /dev/null +++ b/week14/rabbit-mq/hello_rabbitmq_java/pom.xml @@ -0,0 +1,45 @@ + + 4.0.0 + + dev.mq + hello_rabbitmq_java + 1.0-SNAPSHOT + jar + + hello_rabbitmq_java + http://maven.apache.org + + + UTF-8 + + + + + junit + junit + 3.8.1 + test + + + com.rabbitmq + amqp-client + 5.18.0 + + + + + org.slf4j + slf4j-simple + 2.0.16 + test + + + + + org.slf4j + slf4j-api + 2.0.16 + + + diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/App.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/App.java new file mode 100644 index 0000000..75cf853 --- /dev/null +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/App.java @@ -0,0 +1,13 @@ +package dev.mq; + +/** + * Hello world! + * + */ +public class App +{ + public static void main( String[] args ) + { + System.out.println( "Hello World!" ); + } +} diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Receiver.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Receiver.java new file mode 100644 index 0000000..63671c8 --- /dev/null +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Receiver.java @@ -0,0 +1,5 @@ +package dev.mq.steo01; + +public class Receiver { + +} diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Sender.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Sender.java new file mode 100644 index 0000000..25f6ba3 --- /dev/null +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Sender.java @@ -0,0 +1,4 @@ +package dev.mq.steo01; + +public class Sender { +} diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/test/java/dev/mq/AppTest.java b/week14/rabbit-mq/hello_rabbitmq_java/src/test/java/dev/mq/AppTest.java new file mode 100644 index 0000000..c624fd1 --- /dev/null +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/test/java/dev/mq/AppTest.java @@ -0,0 +1,38 @@ +package dev.mq; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +} From 887c9dee3420dda20a54bd89cbc3e92635085c7e Mon Sep 17 00:00:00 2001 From: sengjun0624 Date: Wed, 9 Apr 2025 10:08:26 +0900 Subject: [PATCH 05/10] :sparkles [WEEK14] Rabbit Mq Publisher --- .../src/main/java/dev/mq/steo01/Receiver.java | 4 +++ .../src/main/java/dev/mq/steo01/Sender.java | 35 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Receiver.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Receiver.java index 63671c8..2be7eaf 100644 --- a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Receiver.java +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Receiver.java @@ -1,5 +1,9 @@ package dev.mq.steo01; +/** + * Receiver - 메시지 소비자 (Consumer)행 + * + */ public class Receiver { } diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Sender.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Sender.java index 25f6ba3..005bf07 100644 --- a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Sender.java +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Sender.java @@ -1,4 +1,39 @@ package dev.mq.steo01; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +/** + * Sender - 메시지 발행자 (Publisher) + * + * 퍼블리셔는 RabbitMq와 연결, 메시지를 전송하는 처리 수행 + * 컨슈머는 메시지 큐로부터 퍼블리셔가 적재한 메시지를 소비하는 처리 수행 + */ public class Sender { + private static final String QUEUE_NAME = "hello"; + + public static void main(String[] args) throws IOException, TimeoutException { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + + // RabbitMQ 서버(5672)와 커넥션 연결 수행 + Connection connection = factory.newConnection(); + // 채녈 생성 + Channel channel = connection.createChannel(); + + // 큐 생성 + channel.queueDeclare(QUEUE_NAME, false, false, false, null); + + String messgae = "Hell World!"; + + // Publish + channel + .basicPublish("", QUEUE_NAME, null, messgae.getBytes(StandardCharsets.UTF_8)); + System.out.println(" [Publisher] Sent = " + messgae); + } } From 77a7fddd5a43e942140b06bad26b5ce30226ea32 Mon Sep 17 00:00:00 2001 From: sengjun0624 Date: Wed, 9 Apr 2025 10:20:22 +0900 Subject: [PATCH 06/10] =?UTF-8?q?:sparkles=20[WEEK14]=20Basic=20=EB=B0=9C?= =?UTF-8?q?=ED=96=89/=EC=86=8C=EB=B9=84=20=ED=8C=A8=ED=84=B4=20=EA=B5=AC?= =?UTF-8?q?=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- week14/rabbit-mq/hello_rabbitmq_java/pom.xml | 12 +++++++ .../src/main/java/dev/mq/steo01/Receiver.java | 31 +++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/week14/rabbit-mq/hello_rabbitmq_java/pom.xml b/week14/rabbit-mq/hello_rabbitmq_java/pom.xml index c42dd63..05ece4d 100644 --- a/week14/rabbit-mq/hello_rabbitmq_java/pom.xml +++ b/week14/rabbit-mq/hello_rabbitmq_java/pom.xml @@ -42,4 +42,16 @@ 2.0.16 + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Receiver.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Receiver.java index 2be7eaf..7a5b7f3 100644 --- a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Receiver.java +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/steo01/Receiver.java @@ -1,9 +1,40 @@ package dev.mq.steo01; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; + /** * Receiver - 메시지 소비자 (Consumer)행 * */ public class Receiver { + private static final String QUEUE_NAME = "hello"; + + public static void main(String[] args) throws IOException, TimeoutException { + + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + + // RabbitMQ 서버(5672)와 커넥션 연결 수행 + Connection connection = factory.newConnection(); + // 채녈 생성 + Channel channel = connection.createChannel(); + + // 큐 생성 + channel.queueDeclare(QUEUE_NAME, false, false, false, null); + System.out.println(" [Receiver] Waiting ... "); + + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + String message = new String(delivery.getBody(), StandardCharsets.UTF_8); + System.out.println("[Receiver] 메시지를 받음: " + message); + }; + channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {}); + } } From 982f2e3772ec365beb451d1ac6c052bbcd967204 Mon Sep 17 00:00:00 2001 From: sengjun0624 Date: Wed, 9 Apr 2025 11:22:57 +0900 Subject: [PATCH 07/10] =?UTF-8?q?:sparkles=20[WEEK14]=20=EC=9E=91=EC=97=85?= =?UTF-8?q?=EC=8B=9C=EA=B0=84=EC=9D=B4=20=EA=B8=B4=20=EB=A9=94=EC=8B=9C?= =?UTF-8?q?=EC=A7=80=20=EC=B2=98=EB=A6=AC=20=EC=98=88=EC=A0=9C=20=EA=B5=AC?= =?UTF-8?q?=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/dev/mq/step02/NewTask.java | 47 +++++++++++++++ .../src/main/java/dev/mq/step02/Worker.java | 58 +++++++++++++++++++ 2 files changed, 105 insertions(+) create mode 100644 week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step02/NewTask.java create mode 100644 week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step02/Worker.java diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step02/NewTask.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step02/NewTask.java new file mode 100644 index 0000000..a7e7619 --- /dev/null +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step02/NewTask.java @@ -0,0 +1,47 @@ +package dev.mq.step02; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +/* + 작업(Task)을 작업 큐(Work queue)에 스케줄링하는 프로그램 + 문자열값이 어떤 복잡한 작업이라고 가정할 때, + ex. Hello...이라고 하면 + . 하나는 1초가 소요되는 작업이라고 가정 + + 따라서 Hello...는 3초가 걸리는 작업이라고 가정 + + 실행 인자값(args) 활용해서 Hello...과 같은 값 입력 + */ +public class NewTask { + + private static final String TASK_QUEUE_NAME = "task_queue"; + + public static void main(String[] args) throws IOException, TimeoutException { + // 서버 연결 + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + + // 큐 생성 + // TODO: durable=true로 변경 + channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); + + // 실행 옵션으로 메시지를 생성, ex. Hello... + String message = String.join(" ", args); + + // 메시지 적재 + channel.basicPublish("", TASK_QUEUE_NAME, + null, + message.getBytes("UTF-8")); + + System.out.println(" [Publisher] Sent " + message); + } + + } +} diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step02/Worker.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step02/Worker.java new file mode 100644 index 0000000..2d036cf --- /dev/null +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step02/Worker.java @@ -0,0 +1,58 @@ +package dev.mq.step02; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +/* + 작업 큐에서 꺼낸 작업의 메시지를 처리하는 역할 + */ +public class Worker { + + private static final String TASK_QUEUE_NAME = "task_queue"; + + public static void main(String[] args) throws IOException, TimeoutException { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + final Connection connection = factory.newConnection(); + final Channel channel = connection.createChannel(); + + channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); + System.out.println("[Consumer(Worker)] 메시지를 기다리는 중.."); + + channel.basicQos(1); + + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + // 메시지 추출 + String message = new String(delivery.getBody(), "UTF-8"); + + System.out.println(" [Consumer(Worker)] Received '" + message + "'"); + try { + doWork(message); // 받은 메시지 처리 작업 수행 + } finally { + System.out.println(" [Consumer(Worker)] Done"); + channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); + } + }; + + boolean autoAcknowledgement = false; + channel.basicConsume(TASK_QUEUE_NAME, autoAcknowledgement, deliverCallback, consumerTag -> { }); + } + + // Task 처리 메서드, Hello...일 경우 .하나당 1초씩 지연해서 처리해야함 + private static void doWork(String task) { + for (char ch : task.toCharArray()) { + if (ch == '.') { // .(dot) 한 개당 1초씩 지연 + try { + Thread.sleep(1000); + } catch (InterruptedException _ignored) { + Thread.currentThread().interrupt(); + } + } + } + } +} From 2a8fbed9fa4f6630b17341343f01fa92059be6e5 Mon Sep 17 00:00:00 2001 From: sengjun0624 Date: Wed, 9 Apr 2025 14:42:39 +0900 Subject: [PATCH 08/10] :sparkles [WEEK14] pub/sub --- .../src/main/java/dev/mq/step02/NewTask.java | 9 ++- .../src/main/java/dev/mq/step02/Worker.java | 8 ++- .../src/main/java/dev/mq/step03/EmitLogs.java | 45 ++++++++++++++ .../main/java/dev/mq/step03/ReceiveLogs.java | 34 ++++++++++ .../src/main/java/dev/mq/step03/Worker.java | 62 +++++++++++++++++++ 5 files changed, 151 insertions(+), 7 deletions(-) create mode 100644 week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step03/EmitLogs.java create mode 100644 week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step03/ReceiveLogs.java create mode 100644 week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step03/Worker.java diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step02/NewTask.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step02/NewTask.java index a7e7619..f9d14be 100644 --- a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step02/NewTask.java +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step02/NewTask.java @@ -3,6 +3,7 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.MessageProperties; import java.io.IOException; import java.util.concurrent.TimeoutException; @@ -29,17 +30,15 @@ public static void main(String[] args) throws IOException, TimeoutException { Channel channel = connection.createChannel()) { // 큐 생성 - // TODO: durable=true로 변경 - channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); - + boolean durable = true; + channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null); // 실행 옵션으로 메시지를 생성, ex. Hello... String message = String.join(" ", args); // 메시지 적재 channel.basicPublish("", TASK_QUEUE_NAME, - null, + MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); - System.out.println(" [Publisher] Sent " + message); } diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step02/Worker.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step02/Worker.java index 2d036cf..f193297 100644 --- a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step02/Worker.java +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step02/Worker.java @@ -21,7 +21,8 @@ public static void main(String[] args) throws IOException, TimeoutException { final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); - channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); + boolean durable = true; + channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null); System.out.println("[Consumer(Worker)] 메시지를 기다리는 중.."); channel.basicQos(1); @@ -40,14 +41,17 @@ public static void main(String[] args) throws IOException, TimeoutException { }; boolean autoAcknowledgement = false; - channel.basicConsume(TASK_QUEUE_NAME, autoAcknowledgement, deliverCallback, consumerTag -> { }); + channel.basicConsume(TASK_QUEUE_NAME, autoAcknowledgement, deliverCallback, consumerTag -> { + }); } // Task 처리 메서드, Hello...일 경우 .하나당 1초씩 지연해서 처리해야함 private static void doWork(String task) { + int cnt = 0; for (char ch : task.toCharArray()) { if (ch == '.') { // .(dot) 한 개당 1초씩 지연 try { + System.out.println(++cnt + "초 동안 작업 수행중"); Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step03/EmitLogs.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step03/EmitLogs.java new file mode 100644 index 0000000..e518a75 --- /dev/null +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step03/EmitLogs.java @@ -0,0 +1,45 @@ +package dev.mq.step03; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.MessageProperties; + +/* + 작업(Task)을 작업 큐(Work queue)에 스케줄링하는 프로그램 + 문자열값이 어떤 복잡한 작업이라고 가정할 때, + ex. Hello...이라고 하면 + . 하나는 1초가 소요되는 작업이라고 가정 + + 따라서 Hello...는 3초가 걸리는 작업이라고 가정 + + 실행 인자값(args) 활용해서 Hello...과 같은 값 입력 + */ +public class EmitLogs { + + + private static final String EXCHANGE_NAME = "logs"; + + public static void main(String[] args) throws IOException, TimeoutException { + // 서버 연결 + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + + String message = args.length < 1 ? "info: Hello World!" : + String.join(" ", args); + + + // Exchange 생성 + channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); + + channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); + System.out.println(" [x] Sent '" + message + "'"); + } + + } +} diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step03/ReceiveLogs.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step03/ReceiveLogs.java new file mode 100644 index 0000000..a23f312 --- /dev/null +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step03/ReceiveLogs.java @@ -0,0 +1,34 @@ +package dev.mq.step03; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; +import com.rabbitmq.client.MessageProperties; + +public class ReceiveLogs { + + private static final String EXCHANGE_NAME = "logs"; + + public static void main(String[] argv) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + Connection connection = factory.newConnection(); + Channel channel = connection.createChannel(); + + channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); + String queueName = channel.queueDeclare().getQueue(); + channel.queueBind(queueName, EXCHANGE_NAME, ""); + + System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); + + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + String message = new String(delivery.getBody(), "UTF-8"); + System.out.println(" [x] Received '" + message + "'"); + }; + channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); + } +} diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step03/Worker.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step03/Worker.java new file mode 100644 index 0000000..ba8c8e7 --- /dev/null +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step03/Worker.java @@ -0,0 +1,62 @@ +package dev.mq.step03; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; + +/* + 작업 큐에서 꺼낸 작업의 메시지를 처리하는 역할 + */ +public class Worker { + + private static final String TASK_QUEUE_NAME = "task_queue"; + + public static void main(String[] args) throws IOException, TimeoutException { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + final Connection connection = factory.newConnection(); + final Channel channel = connection.createChannel(); + + boolean durable = true; + channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null); + System.out.println("[Consumer(Worker)] 메시지를 기다리는 중.."); + + channel.basicQos(1); + + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + // 메시지 추출 + String message = new String(delivery.getBody(), "UTF-8"); + + System.out.println(" [Consumer(Worker)] Received '" + message + "'"); + try { + doWork(message); // 받은 메시지 처리 작업 수행 + } finally { + System.out.println(" [Consumer(Worker)] Done"); + channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); + } + }; + + boolean autoAcknowledgement = false; + channel.basicConsume(TASK_QUEUE_NAME, autoAcknowledgement, deliverCallback, consumerTag -> { + }); + } + + // Task 처리 메서드, Hello...일 경우 .하나당 1초씩 지연해서 처리해야함 + private static void doWork(String task) { + int cnt = 0; + for (char ch : task.toCharArray()) { + if (ch == '.') { // .(dot) 한 개당 1초씩 지연 + try { + System.out.println(++cnt + "초 동안 작업 수행중"); + Thread.sleep(1000); + } catch (InterruptedException _ignored) { + Thread.currentThread().interrupt(); + } + } + } + } +} From db989805762591ff78e4ab0c9cbde2e5d3891e52 Mon Sep 17 00:00:00 2001 From: sengjun0624 Date: Wed, 9 Apr 2025 15:04:19 +0900 Subject: [PATCH 09/10] =?UTF-8?q?:sparkles=20[WEEK14]=20Direct=20=EB=B0=A9?= =?UTF-8?q?=EC=8B=9D=EC=9D=98=20Exchange=20=EC=82=AC=EC=9A=A9=20-=20Routin?= =?UTF-8?q?g?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/dev/mq/step04/EmitLogDirect.java | 56 +++++++++++++++++++ .../java/dev/mq/step04/ReceiveLogsDirect.java | 42 ++++++++++++++ 2 files changed, 98 insertions(+) create mode 100644 week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/EmitLogDirect.java create mode 100644 week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/ReceiveLogsDirect.java diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/EmitLogDirect.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/EmitLogDirect.java new file mode 100644 index 0000000..095abb1 --- /dev/null +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/EmitLogDirect.java @@ -0,0 +1,56 @@ +package dev.mq.step04; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +/* + 작업(Task)을 작업 큐(Work queue)에 스케줄링하는 프로그램 + 문자열값이 어떤 복잡한 작업이라고 가정할 때, + ex. Hello...이라고 하면 + . 하나는 1초가 소요되는 작업이라고 가정 + + 따라서 Hello...는 3초가 걸리는 작업이라고 가정 + + 실행 인자값(args) 활용해서 Hello...과 같은 값 입력 + */ +public class EmitLogDirect { + + + + private static final String EXCHANGE_NAME = "direct_logs"; + + public static void main(String[] argv) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + // fan-out아닌 direct로 routing key로 사용 + channel.exchangeDeclare(EXCHANGE_NAME, "direct"); + + //argv로 주어지는 로그 레벨 가져오기 + String severity = getSeverity(argv); + //argv로 주어지는 메시지 가져오기 + String message = getMessage(argv); + + // 로그 레벨에 맞는 메시지 publish + channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); + System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); + } + } + private static String getSeverity(String[] strings) { + if (strings.length < 1) return "info"; // 기본값은 info + return strings[0]; // 첫 번째 인자를 로그 레벨로 간주 + } + + private static String getMessage(String[] strings) { + if (strings.length < 2) return "Hello World"; + return String.join(" ", Arrays.copyOfRange(strings, 1, strings.length)); + } + +} diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/ReceiveLogsDirect.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/ReceiveLogsDirect.java new file mode 100644 index 0000000..48a342e --- /dev/null +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/ReceiveLogsDirect.java @@ -0,0 +1,42 @@ +package dev.mq.step04; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; + +public class ReceiveLogsDirect { + + + private static final String EXCHANGE_NAME = "direct_logs"; + + public static void main(String[] argv) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + Connection connection = factory.newConnection(); + Channel channel = connection.createChannel(); + + channel.exchangeDeclare(EXCHANGE_NAME, "direct"); + String queueName = channel.queueDeclare().getQueue(); + + if (argv.length < 1) { + // ERROR routing key로 사용할 log-level이 주어지지 않음. + System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); + System.exit(1); + } + + // 주어지는 log-level들을 queue에 바인딩 키로 넣어줌. + for (String severity : argv) { + channel.queueBind(queueName, EXCHANGE_NAME, severity); + } + + System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); + + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + String message = new String(delivery.getBody(), "UTF-8"); + System.out.println(" [x] Received '" + + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); + }; + channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); + } +} From ef24e3751cad81f183c93c969a8008d1e6d1f87d Mon Sep 17 00:00:00 2001 From: sengjun0624 Date: Wed, 9 Apr 2025 15:21:48 +0900 Subject: [PATCH 10/10] =?UTF-8?q?:sparkles=20[WEEK14]=20Topic=20=EB=B0=A9?= =?UTF-8?q?=EC=8B=9D=EC=9D=98=20RabbitMq?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/dev/mq/step04/EmitLogDirect.java | 56 ------------------- .../main/java/dev/mq/step04/EmitLogTopic.java | 42 ++++++++++++++ ...eLogsDirect.java => ReceiveLogsTopic.java} | 18 +++--- 3 files changed, 51 insertions(+), 65 deletions(-) delete mode 100644 week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/EmitLogDirect.java create mode 100644 week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/EmitLogTopic.java rename week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/{ReceiveLogsDirect.java => ReceiveLogsTopic.java} (66%) diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/EmitLogDirect.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/EmitLogDirect.java deleted file mode 100644 index 095abb1..0000000 --- a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/EmitLogDirect.java +++ /dev/null @@ -1,56 +0,0 @@ -package dev.mq.step04; - -import java.io.IOException; -import java.util.Arrays; -import java.util.concurrent.TimeoutException; - -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; - -/* - 작업(Task)을 작업 큐(Work queue)에 스케줄링하는 프로그램 - 문자열값이 어떤 복잡한 작업이라고 가정할 때, - ex. Hello...이라고 하면 - . 하나는 1초가 소요되는 작업이라고 가정 - - 따라서 Hello...는 3초가 걸리는 작업이라고 가정 - - 실행 인자값(args) 활용해서 Hello...과 같은 값 입력 - */ -public class EmitLogDirect { - - - - private static final String EXCHANGE_NAME = "direct_logs"; - - public static void main(String[] argv) throws Exception { - ConnectionFactory factory = new ConnectionFactory(); - factory.setHost("localhost"); - - try (Connection connection = factory.newConnection(); - Channel channel = connection.createChannel()) { - // fan-out아닌 direct로 routing key로 사용 - channel.exchangeDeclare(EXCHANGE_NAME, "direct"); - - //argv로 주어지는 로그 레벨 가져오기 - String severity = getSeverity(argv); - //argv로 주어지는 메시지 가져오기 - String message = getMessage(argv); - - // 로그 레벨에 맞는 메시지 publish - channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); - System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); - } - } - private static String getSeverity(String[] strings) { - if (strings.length < 1) return "info"; // 기본값은 info - return strings[0]; // 첫 번째 인자를 로그 레벨로 간주 - } - - private static String getMessage(String[] strings) { - if (strings.length < 2) return "Hello World"; - return String.join(" ", Arrays.copyOfRange(strings, 1, strings.length)); - } - -} diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/EmitLogTopic.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/EmitLogTopic.java new file mode 100644 index 0000000..1c61a9e --- /dev/null +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/EmitLogTopic.java @@ -0,0 +1,42 @@ +package dev.mq.step04; + +import java.util.Arrays; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + + +public class EmitLogTopic { + + + + private static final String EXCHANGE_NAME = "topic_logs"; + + public static void main(String[] argv) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + + channel.exchangeDeclare(EXCHANGE_NAME, "topic"); + + String routingKey = getRouting(argv); + String message = getMessage(argv); + + channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); + System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); + } + } + + private static String getRouting(String[] strings) { + if (strings.length < 1) return "#"; // 기본값은 # + return strings[0]; // 첫 번째 인자를 Topic 간주 + } + + private static String getMessage(String[] strings) { + if (strings.length < 2) return "Hello World"; + return String.join(" ", Arrays.copyOfRange(strings, 1, strings.length)); + } + +} diff --git a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/ReceiveLogsDirect.java b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/ReceiveLogsTopic.java similarity index 66% rename from week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/ReceiveLogsDirect.java rename to week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/ReceiveLogsTopic.java index 48a342e..331378f 100644 --- a/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/ReceiveLogsDirect.java +++ b/week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/step04/ReceiveLogsTopic.java @@ -5,29 +5,29 @@ import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; -public class ReceiveLogsDirect { +public class ReceiveLogsTopic { - - private static final String EXCHANGE_NAME = "direct_logs"; + private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); + Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); - channel.exchangeDeclare(EXCHANGE_NAME, "direct"); + channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { - // ERROR routing key로 사용할 log-level이 주어지지 않음. - System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); + System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } - // 주어지는 log-level들을 queue에 바인딩 키로 넣어줌. - for (String severity : argv) { - channel.queueBind(queueName, EXCHANGE_NAME, severity); + // RoutingKey로 사용할 토픽을 Exchange에 붙여서 구독중인 토픽에 대한 메시지 수신하게 함. + // 조건에 맞지 않는 메시지는 자동 삭제 + for (String bindingKey : argv) { + channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C");