From 5fd20759e6dd8b6683d3efff9ab06b113237e248 Mon Sep 17 00:00:00 2001 From: Andrew Egelhofer Date: Mon, 8 Jun 2020 13:06:21 -0700 Subject: [PATCH 1/8] Bump Confluent to 6.1.0-SNAPSHOT, Kafka to 6.1.0-SNAPSHOT --- core/pom.xml | 2 +- examples/pom.xml | 2 +- package/pom.xml | 2 +- pom.xml | 2 +- test/pom.xml | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 91f8d75e1f..85250dbef6 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -7,7 +7,7 @@ io.confluent rest-utils-parent - 6.0.0-SNAPSHOT + 6.1.0-SNAPSHOT rest-utils diff --git a/examples/pom.xml b/examples/pom.xml index db563d4e73..0724108cf1 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -8,7 +8,7 @@ rest-utils-parent io.confluent - 6.0.0-SNAPSHOT + 6.1.0-SNAPSHOT rest-utils-examples diff --git a/package/pom.xml b/package/pom.xml index fa8498c177..9fe549e51f 100644 --- a/package/pom.xml +++ b/package/pom.xml @@ -7,7 +7,7 @@ io.confluent rest-utils-parent - 6.0.0-SNAPSHOT + 6.1.0-SNAPSHOT rest-utils-package diff --git a/pom.xml b/pom.xml index df48b3c21d..ab6c7a3695 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ io.confluent common - 6.0.0-SNAPSHOT + 6.1.0-SNAPSHOT rest-utils-parent diff --git a/test/pom.xml b/test/pom.xml index 6357f4cfe1..fa08a40a10 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -8,7 +8,7 @@ io.confluent rest-utils-parent - 6.0.0-SNAPSHOT + 6.1.0-SNAPSHOT rest-utils-test From 23ffc8b6f0ea800ee038b1dd141e10f66383eb35 Mon Sep 17 00:00:00 2001 From: Konstantine Karantasis Date: Mon, 29 Jun 2020 10:42:36 -0700 Subject: [PATCH 2/8] MINOR: Upgrade jetty to 9.4.30.v20200611 and jersey to 2.31 (#184) Aligns versions with AK. See also: https://github.com/apache/kafka/pull/8893 Jetty is currently upgraded to a version higher than what Jersey depends on, because the renaming of Response#closeOutput to Response#completeOutput has been reversed in recent versions. --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index ab6c7a3695..132cafcd70 100644 --- a/pom.xml +++ b/pom.xml @@ -50,8 +50,8 @@ 1.1.1 4.5.3 http://packages.confluent.io/maven/ - 2.30 - 9.4.24.v20191120 + 2.31 + 9.4.30.v20200611 2.2.0 24.0-jre checkstyle/suppressions.xml From 8bbac4391f4598104c6d1b21b9bf404f393ef415 Mon Sep 17 00:00:00 2001 From: xiaodongdu Date: Thu, 23 Jul 2020 09:48:53 -0700 Subject: [PATCH 3/8] Fix build failure (#194) --- core/src/test/java/io/confluent/rest/ApiHeadersTest.java | 2 +- core/src/test/java/io/confluent/rest/SslTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/java/io/confluent/rest/ApiHeadersTest.java b/core/src/test/java/io/confluent/rest/ApiHeadersTest.java index ef5a0e13eb..e91adb8012 100644 --- a/core/src/test/java/io/confluent/rest/ApiHeadersTest.java +++ b/core/src/test/java/io/confluent/rest/ApiHeadersTest.java @@ -128,7 +128,7 @@ private static void createKeystoreWithCert(File file, String alias, Map certs) throws Exception { KeyPair keypair = TestSslUtils.generateKeyPair("RSA"); CertificateBuilder certificateBuilder = new CertificateBuilder(30, "SHA1withRSA"); - X509Certificate cCert = certificateBuilder.sanDnsName("localhost") + X509Certificate cCert = certificateBuilder.sanDnsNames("localhost") .generate("CN=mymachine.local, O=A client", keypair); TestSslUtils.createKeyStore(file.getPath(), new Password(SSL_PASSWORD), alias, keypair.getPrivate(), cCert); certs.put(alias, cCert); @@ -123,7 +123,7 @@ private void enableSslClientAuth(Properties props) { private void createWrongKeystoreWithCert(File file, String alias, Map certs) throws Exception { KeyPair keypair = TestSslUtils.generateKeyPair("RSA"); CertificateBuilder certificateBuilder = new CertificateBuilder(30, "SHA1withRSA"); - X509Certificate cCert = certificateBuilder.sanDnsName("fail") + X509Certificate cCert = certificateBuilder.sanDnsNames("fail") .generate("CN=mymachine.local, O=A client", keypair); TestSslUtils.createKeyStore(file.getPath(), new Password(SSL_PASSWORD), alias, keypair.getPrivate(), cCert); certs.put(alias, cCert); From 2803b6aa2b2bbd5f56cee1179300ab48869ec08d Mon Sep 17 00:00:00 2001 From: Manikumar Reddy Date: Tue, 1 Sep 2020 20:09:48 +0530 Subject: [PATCH 4/8] CIAM-261: Add Jetty ThreadPool Metrics (#205) --- .../io/confluent/rest/ApplicationServer.java | 40 ++++++- .../rest/TestCustomizeThreadPool.java | 110 ++++++++++++++---- 2 files changed, 127 insertions(+), 23 deletions(-) diff --git a/core/src/main/java/io/confluent/rest/ApplicationServer.java b/core/src/main/java/io/confluent/rest/ApplicationServer.java index f3fa0a4c1d..0ea98fcff2 100644 --- a/core/src/main/java/io/confluent/rest/ApplicationServer.java +++ b/core/src/main/java/io/confluent/rest/ApplicationServer.java @@ -16,6 +16,8 @@ package io.confluent.rest; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.config.ConfigException; @@ -46,6 +48,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -166,6 +169,33 @@ private void attachMetricsListener(Metrics metrics, Map tags) { } } + private void addJettyThreadPoolMetrics(Metrics metrics, Map tags) { + //add metric for jetty thread pool queue size + String requestQueueSizeName = "request-queue-size"; + String metricGroupName = "jetty-metrics"; + + MetricName requestQueueSizeMetricName = metrics.metricName(requestQueueSizeName, + metricGroupName, "The number of requests in the jetty thread pool queue.", tags); + Gauge queueSize = (config, now) -> getQueueSize(); + metrics.addMetric(requestQueueSizeMetricName, queueSize); + + //add metric for thread pool busy thread count + String busyThreadCountName = "busy-thread-count"; + MetricName busyThreadCountMetricName = metrics.metricName(busyThreadCountName, + metricGroupName, "jetty thread pool busy thread count.", + tags); + Gauge busyThreadCount = (config, now) -> getBusyThreads(); + metrics.addMetric(busyThreadCountMetricName, busyThreadCount); + + //add metric for thread pool usage + String threadPoolUsageName = "thread-pool-usage"; + final MetricName threadPoolUsageMetricName = metrics.metricName(threadPoolUsageName, + metricGroupName, " jetty thread pool usage.", + Collections.emptyMap()); + Gauge threadPoolUsage = (config, now) -> (getBusyThreads() / (double) getMaxThreads()); + metrics.addMetric(threadPoolUsageMetricName, threadPoolUsage); + } + private void finalizeHandlerCollection(HandlerCollection handlers, HandlerCollection wsHandlers) { /* DefaultHandler must come last eo ensure all contexts * have a chance to handle a request first */ @@ -193,6 +223,7 @@ protected final void doStart() throws Exception { HandlerCollection wsHandlers = new HandlerCollection(); for (Application app : applications.getApplications()) { attachMetricsListener(app.getMetrics(), app.getMetricsTags()); + addJettyThreadPoolMetrics(app.getMetrics(), app.getMetricsTags()); handlers.addHandler(app.configureHandler()); wsHandlers.addHandler(app.configureWebSocketHandler()); } @@ -400,6 +431,13 @@ public int getThreads() { return getThreadPool().getThreads(); } + /** + * @return number of busy threads in the pool. + */ + public int getBusyThreads() { + return ((QueuedThreadPool)getThreadPool()).getBusyThreads(); + } + /** * For unit testing. * @@ -410,8 +448,6 @@ public int getMaxThreads() { } /** - * For unit testing. - * * @return the size of the queue in the pool. */ public int getQueueSize() { diff --git a/core/src/test/java/io/confluent/rest/TestCustomizeThreadPool.java b/core/src/test/java/io/confluent/rest/TestCustomizeThreadPool.java index a51a577c16..07918e6609 100644 --- a/core/src/test/java/io/confluent/rest/TestCustomizeThreadPool.java +++ b/core/src/test/java/io/confluent/rest/TestCustomizeThreadPool.java @@ -20,8 +20,16 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.test.TestUtils; import org.junit.Test; + +import java.util.Map; +import java.util.Optional; import java.util.Properties; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -119,12 +127,78 @@ public void testQueueFull() throws Exception { } } + @Test + public void testJettyThreadPoolMetrics() throws Exception { + RestResource.latch = new CountDownLatch(1); + TestCustomizeThreadPoolApplication app = new TestCustomizeThreadPoolApplication(); + String uri = app.getUri(); + try { + app.start(); + assertEquals(0, getIntMetricValue(app.metrics, "request-queue-size")); + + //send 18 requests: queueSize (8) + threads (10) + int numThread = 18; + Thread[] threads = sendRequests(uri + "/custom/resource", numThread); + TestUtils.waitForCondition(() -> app.server.getQueueSize() == 8, "Queue is not full"); + assertEquals(8, getIntMetricValue(app.metrics, "request-queue-size")); + assertEquals(10, getIntMetricValue(app.metrics, "busy-thread-count")); + assertEquals(1.0, getDoubleMetricValue(app.metrics, "thread-pool-usage"), 0.0); + + RestResource.latch.countDown(); + for(int i = 0; i < numThread; i++) { + threads[i].join(); + } + + TestUtils.waitForCondition(() -> app.server.getQueueSize() == 0, "Queue is not empty"); + assertEquals(0, getIntMetricValue(app.metrics, "request-queue-size")); + assertTrue(getDoubleMetricValue(app.metrics, "thread-pool-usage") > 0); + assertTrue(getDoubleMetricValue(app.metrics, "thread-pool-usage") < 1); + } finally { + RestResource.latch = null; + app.stop(); + } + } + + public static int getIntMetricValue(Metrics metrics, String attribute) { + Map allMetrics = metrics.metrics(); + Optional metric = allMetrics.entrySet().stream().filter((m) -> { + return m.getKey().name().equals(attribute); + }).map(Map.Entry::getValue).findFirst(); + return metric.isPresent() ? (Integer) metric.get().metricValue() : -1; + } + + public static double getDoubleMetricValue(Metrics metrics, String attribute) { + Map allMetrics = metrics.metrics(); + Optional metric = allMetrics.entrySet().stream().filter((m) -> { + return m.getKey().name().equals(attribute); + }).map(Map.Entry::getValue).findFirst(); + return metric.isPresent() ? (Double) metric.get().metricValue() : -1; + } + /** * Simulate multiple HTTP clients sending HTTP requests same time. Each client will send one HTTP request. * The requests will be put in queue if the number of clients are more than the working threads. * */ @SuppressWarnings("SameParameterValue") private void makeConcurrentGetRequests(String uri, int numThread, TestCustomizeThreadPoolApplication app) throws Exception { + Thread[] threads = sendRequests(uri, numThread); + + long startingTime = System.currentTimeMillis(); + while(System.currentTimeMillis() - startingTime < 360*1000) { + log.info("Queue size {}, queue capacity {} ", app.getServer().getQueueSize(), app.getServer().getQueueCapacity()); + assertTrue("Number of jobs in queue is not more than capacity of queue ", app.getServer().getQueueSize() <= app.getServer().getQueueCapacity()); + Thread.sleep(2000); + if (app.getServer().getQueueSize() == 0) + break; + } + + for(int i = 0; i < numThread; i++) { + threads[i].join(); + } + log.info("End queue size {}, queue capacity {} ", app.getServer().getQueueSize(), app.getServer().getQueueCapacity()); + } + + private Thread[] sendRequests(final String uri, final int numThread) { Thread[] threads = new Thread[numThread]; for(int i = 0; i < numThread; i++) { threads[i] = new Thread() { @@ -134,7 +208,7 @@ public void run() { CloseableHttpResponse response = null; try { response = httpclient.execute(httpget); - HttpStatus.Code statusCode = HttpStatus.getCode(response.getStatusLine().getStatusCode()); + Code statusCode = HttpStatus.getCode(response.getStatusLine().getStatusCode()); log.info("Status code {}, reason {} ", statusCode, response.getStatusLine().getReasonPhrase()); assertThat(statusCode, is(Code.OK)); } catch (Exception e) { @@ -152,34 +226,28 @@ public void run() { threads[i].start(); } - - long startingTime = System.currentTimeMillis(); - while(System.currentTimeMillis() - startingTime < 360*1000) { - log.info("Queue size {}, queue capacity {} ", app.getServer().getQueueSize(), app.getServer().getQueueCapacity()); - assertTrue("Number of jobs in queue is not more than capacity of queue ", app.getServer().getQueueSize() <= app.getServer().getQueueCapacity()); - Thread.sleep(2000); - if (app.getServer().getQueueSize() == 0) - break; - } - - for(int i = 0; i < numThread; i++) { - threads[i].join(); - } - log.info("End queue size {}, queue capacity {} ", app.getServer().getQueueSize(), app.getServer().getQueueCapacity()); + return threads; } @Path("/custom") @Produces(MediaType.TEXT_PLAIN) public static class RestResource { + + static CountDownLatch latch = null; + @GET @Path("/resource") - public String get() { - synchronized(locker) { - try { - locker.wait(10000); - } catch (Exception e) { - log.info(e.getMessage()); + public String get() throws InterruptedException { + if (latch == null) { + synchronized(locker) { + try { + locker.wait(10000); + } catch (Exception e) { + log.info(e.getMessage()); + } } + } else { + latch.await(); } return "ThreadPool"; } From b275c66b9b3b80a5c431e0648b8d5c5ff9e61f11 Mon Sep 17 00:00:00 2001 From: Manikumar Reddy Date: Wed, 7 Oct 2020 15:54:58 +0530 Subject: [PATCH 5/8] MINOR: Ignore flaky testJettyThreadPoolMetrics test --- .../test/java/io/confluent/rest/TestCustomizeThreadPool.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/java/io/confluent/rest/TestCustomizeThreadPool.java b/core/src/test/java/io/confluent/rest/TestCustomizeThreadPool.java index 07918e6609..4e5cc8c498 100644 --- a/core/src/test/java/io/confluent/rest/TestCustomizeThreadPool.java +++ b/core/src/test/java/io/confluent/rest/TestCustomizeThreadPool.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.test.TestUtils; +import org.junit.Ignore; import org.junit.Test; import java.util.Map; @@ -128,6 +129,7 @@ public void testQueueFull() throws Exception { } @Test + @Ignore public void testJettyThreadPoolMetrics() throws Exception { RestResource.latch = new CountDownLatch(1); TestCustomizeThreadPoolApplication app = new TestCustomizeThreadPoolApplication(); From cd4aec9475c7f231b6c25d09374d2d12a38b22ee Mon Sep 17 00:00:00 2001 From: Matthew Wong Date: Tue, 13 Oct 2020 16:38:02 -0700 Subject: [PATCH 6/8] MINOR: Fix compilation error from createKeyStore (#207) confluentinc/kafka@7be8bd8#diff-10d99f28c40b6cd9640f95b1c763a7b2b4daaa0209249e1a724698f28a90df9bL118-L126 Removed the function definition of createKeyStore that tests in this repo were using. This PR fixes the compilation errors by conforming to the remaining definition of createKeyStore that required two Password args. --- core/src/test/java/io/confluent/rest/ApiHeadersTest.java | 2 +- core/src/test/java/io/confluent/rest/SslTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/java/io/confluent/rest/ApiHeadersTest.java b/core/src/test/java/io/confluent/rest/ApiHeadersTest.java index e91adb8012..54299c2182 100644 --- a/core/src/test/java/io/confluent/rest/ApiHeadersTest.java +++ b/core/src/test/java/io/confluent/rest/ApiHeadersTest.java @@ -130,7 +130,7 @@ private static void createKeystoreWithCert(File file, String alias, Map Date: Fri, 16 Oct 2020 17:55:33 -0700 Subject: [PATCH 7/8] ST-3461: Nano versioning (#203) --- Jenkinsfile | 6 +++++- core/pom.xml | 2 +- examples/pom.xml | 4 +++- package/pom.xml | 3 ++- pom.xml | 8 +++++--- test/pom.xml | 3 ++- 6 files changed, 18 insertions(+), 8 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index dd6a5e583a..fba315435a 100755 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -2,5 +2,9 @@ common { slackChannel = '#c3-alerts' - upstreamProjects = 'confluentinc/common' + downStreamRepos = ["schema-registry", "metadata-service", "kafka-rest", + "confluent-security-plugins", "ce-kafka-http-server", "secret-registry", + "confluent-cloud-plugins"] + nanoVersion = true } +//change \ No newline at end of file diff --git a/core/pom.xml b/core/pom.xml index 8599bd87d2..a320ebe77a 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -7,7 +7,7 @@ io.confluent rest-utils-parent - 6.1.0-SNAPSHOT + 6.1.0-0 rest-utils diff --git a/examples/pom.xml b/examples/pom.xml index f23348a339..01919419e3 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -8,7 +8,7 @@ rest-utils-parent io.confluent - 6.1.0-SNAPSHOT + 6.1.0-0 rest-utils-examples @@ -19,11 +19,13 @@ io.confluent rest-utils + ${io.confluent.rest-utils.version} io.confluent rest-utils-test + ${io.confluent.rest-utils.version} test diff --git a/package/pom.xml b/package/pom.xml index 9fe549e51f..cadaf48001 100644 --- a/package/pom.xml +++ b/package/pom.xml @@ -7,7 +7,7 @@ io.confluent rest-utils-parent - 6.1.0-SNAPSHOT + 6.1.0-0 rest-utils-package @@ -19,6 +19,7 @@ io.confluent rest-utils + ${io.confluent.rest-utils.version} diff --git a/pom.xml b/pom.xml index a718715f92..89590497b9 100644 --- a/pom.xml +++ b/pom.xml @@ -8,12 +8,13 @@ io.confluent common - 6.1.0-SNAPSHOT + [6.1.0-0, 6.1.1-0) rest-utils-parent pom rest-utils-parent + 6.1.0-0 Confluent, Inc. http://confluent.io @@ -54,6 +55,7 @@ 9.4.30.v20200611 2.2.0 checkstyle/suppressions.xml + 6.1.0-0 @@ -70,12 +72,12 @@ io.confluent rest-utils - ${project.version} + ${io.confluent.rest-utils.version} io.confluent rest-utils-test - ${project.version} + ${io.confluent.rest-utils.version} test diff --git a/test/pom.xml b/test/pom.xml index fa08a40a10..2dec383106 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -8,7 +8,7 @@ io.confluent rest-utils-parent - 6.1.0-SNAPSHOT + 6.1.0-0 rest-utils-test @@ -19,6 +19,7 @@ io.confluent rest-utils + ${io.confluent.rest-utils.version} junit From 9411c4c5b2d600b93bfa617e2f5474d7a57e4e43 Mon Sep 17 00:00:00 2001 From: AdiWehrli Date: Thu, 4 Feb 2021 06:11:44 +0100 Subject: [PATCH 8/8] Moved error logging into the if block An exception should only be logged, if the debug flag is set to true. JavaDoc also states: > Abstract exception mapper that checks the debug flag and generates an error message including the stack trace if it is enabled. --- .../io/confluent/rest/exceptions/DebuggableExceptionMapper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/io/confluent/rest/exceptions/DebuggableExceptionMapper.java b/core/src/main/java/io/confluent/rest/exceptions/DebuggableExceptionMapper.java index c865c7cd2d..003b115c08 100644 --- a/core/src/main/java/io/confluent/rest/exceptions/DebuggableExceptionMapper.java +++ b/core/src/main/java/io/confluent/rest/exceptions/DebuggableExceptionMapper.java @@ -59,9 +59,9 @@ public DebuggableExceptionMapper(RestConfig restConfig) { */ public Response.ResponseBuilder createResponse(Throwable exc, int errorCode, Response.Status status, String msg) { - log.error("Request Failed with exception " , exc); String readableMessage = msg; if (restConfig != null && restConfig.getBoolean(RestConfig.DEBUG_CONFIG)) { + log.error("Request Failed with exception " , exc); readableMessage += " " + exc.getClass().getName() + ": " + exc.getMessage(); try { ByteArrayOutputStream os = new ByteArrayOutputStream();