diff --git a/build.gradle b/build.gradle index 75a4354cc9439..4dbe7d7907c99 100644 --- a/build.gradle +++ b/build.gradle @@ -1453,6 +1453,8 @@ project(':connect:runtime') { testCompile libs.powermockEasymock testCompile project(':clients').sourceSets.test.output + testCompile project(':core') + testCompile project(':core').sourceSets.test.output testRuntime libs.slf4jlog4j } diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index c69f94d25bbce..8c98f8d447865 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -347,8 +347,6 @@ - - @@ -366,6 +364,18 @@ + + + + + + + + + + + + diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java index dd43c37cadda5..a6c6d98facacc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.Connect; +import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.WorkerConfigTransformer; import org.apache.kafka.connect.runtime.WorkerInfo; @@ -54,62 +55,26 @@ public class ConnectDistributed { private static final Logger log = LoggerFactory.getLogger(ConnectDistributed.class); + private final Time time = Time.SYSTEM; + private final long initStart = time.hiResClockMs(); + public static void main(String[] args) { + if (args.length < 1 || Arrays.asList(args).contains("--help")) { log.info("Usage: ConnectDistributed worker.properties"); Exit.exit(1); } try { - Time time = Time.SYSTEM; - log.info("Kafka Connect distributed worker initializing ..."); - long initStart = time.hiResClockMs(); WorkerInfo initInfo = new WorkerInfo(); initInfo.logAll(); String workerPropsFile = args[0]; Map workerProps = !workerPropsFile.isEmpty() ? - Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap(); - - log.info("Scanning for plugin classes. This might take a moment ..."); - Plugins plugins = new Plugins(workerProps); - plugins.compareAndSwapWithDelegatingLoader(); - DistributedConfig config = new DistributedConfig(workerProps); - - String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config); - log.debug("Kafka cluster ID: {}", kafkaClusterId); - - RestServer rest = new RestServer(config); - URI advertisedUrl = rest.advertisedUrl(); - String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); - - KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(); - offsetBackingStore.configure(config); - - Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore); - WorkerConfigTransformer configTransformer = worker.configTransformer(); - - Converter internalValueConverter = worker.getInternalValueConverter(); - StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter); - statusBackingStore.configure(config); - - ConfigBackingStore configBackingStore = new KafkaConfigBackingStore( - internalValueConverter, - config, - configTransformer); - - DistributedHerder herder = new DistributedHerder(config, time, worker, - kafkaClusterId, statusBackingStore, configBackingStore, - advertisedUrl.toString()); - final Connect connect = new Connect(herder, rest); - log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart); - try { - connect.start(); - } catch (Exception e) { - log.error("Failed to start Connect", e); - connect.stop(); - Exit.exit(3); - } + Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap(); + + ConnectDistributed connectDistributed = new ConnectDistributed(); + Connect connect = connectDistributed.startConnect(workerProps); // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request connect.awaitStop(); @@ -119,4 +84,55 @@ public static void main(String[] args) { Exit.exit(2); } } + + public Connect startConnect(Map workerProps) { + log.info("Scanning for plugin classes. This might take a moment ..."); + Plugins plugins = new Plugins(workerProps); + plugins.compareAndSwapWithDelegatingLoader(); + DistributedConfig config = new DistributedConfig(workerProps); + + String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config); + log.debug("Kafka cluster ID: {}", kafkaClusterId); + + RestServer rest = new RestServer(config); + HerderProvider provider = new HerderProvider(); + rest.start(provider, plugins); + + URI advertisedUrl = rest.advertisedUrl(); + String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); + + KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(); + offsetBackingStore.configure(config); + + Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore); + WorkerConfigTransformer configTransformer = worker.configTransformer(); + + Converter internalValueConverter = worker.getInternalValueConverter(); + StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter); + statusBackingStore.configure(config); + + ConfigBackingStore configBackingStore = new KafkaConfigBackingStore( + internalValueConverter, + config, + configTransformer); + + DistributedHerder herder = new DistributedHerder(config, time, worker, + kafkaClusterId, statusBackingStore, configBackingStore, + advertisedUrl.toString()); + + final Connect connect = new Connect(herder, rest); + log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart); + try { + connect.start(); + // herder has initialized now, and ready to be used by the RestServer. + provider.setHerder(herder); + } catch (Exception e) { + log.error("Failed to start Connect", e); + connect.stop(); + Exit.exit(3); + } + + return connect; + } + } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java index 846ed1a883570..965046cccf003 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java @@ -20,6 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.URI; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -50,7 +51,6 @@ public void start() { Runtime.getRuntime().addShutdownHook(shutdownHook); herder.start(); - rest.start(herder); log.info("Kafka Connect started"); } finally { @@ -82,6 +82,11 @@ public void awaitStop() { } } + // Visible for testing + public URI restUrl() { + return rest.serverUrl(); + } + private class ShutdownHook extends Thread { @Override public void run() { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java new file mode 100644 index 0000000000000..42c0925a704a2 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.connect.errors.ConnectException; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * A supplier for {@link Herder}s. + */ +public class HerderProvider { + + private final CountDownLatch initialized = new CountDownLatch(1); + volatile Herder herder = null; + + public HerderProvider() { + } + + /** + * Create a herder provider with a herder. + * @param herder the herder that will be supplied to threads waiting on this provider + */ + public HerderProvider(Herder herder) { + this.herder = herder; + initialized.countDown(); + } + + /** + * @return the contained herder. + * @throws ConnectException if a herder was not available within a duration of calling this method + */ + public Herder get() { + try { + // wait for herder to be initialized + if (!initialized.await(1, TimeUnit.MINUTES)) { + throw new ConnectException("Timed out waiting for herder to be initialized."); + } + } catch (InterruptedException e) { + throw new ConnectException("Interrupted while waiting for herder to be initialized.", e); + } + return herder; + } + + /** + * @param herder set a herder, and signal to all threads waiting on get(). + */ + public void setHerder(Herder herder) { + this.herder = herder; + initialized.countDown(); + } + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java index a0f7fdeea9fc8..ea93a72d5006e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java @@ -22,7 +22,7 @@ import org.apache.kafka.connect.health.ConnectorState; import org.apache.kafka.connect.health.ConnectorType; import org.apache.kafka.connect.health.TaskState; -import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.util.Callback; @@ -34,16 +34,16 @@ public class ConnectClusterStateImpl implements ConnectClusterState { - private Herder herder; + private HerderProvider herderProvider; - public ConnectClusterStateImpl(Herder herder) { - this.herder = herder; + public ConnectClusterStateImpl(HerderProvider herderProvider) { + this.herderProvider = herderProvider; } @Override public Collection connectors() { final Collection connectors = new ArrayList<>(); - herder.connectors(new Callback>() { + herderProvider.get().connectors(new Callback>() { @Override public void onCompletion(Throwable error, Collection result) { connectors.addAll(result); @@ -55,7 +55,7 @@ public void onCompletion(Throwable error, Collection result) { @Override public ConnectorHealth connectorHealth(String connName) { - ConnectorStateInfo state = herder.connectorStatus(connName); + ConnectorStateInfo state = herderProvider.get().connectorStatus(connName); ConnectorState connectorState = new ConnectorState( state.connector().state(), state.connector().workerId(), diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java index c0d83f291039e..5cc31cd5cca3c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java @@ -17,14 +17,14 @@ package org.apache.kafka.connect.runtime.rest; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; - import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.rest.ConnectRestExtensionContext; -import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper; import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource; import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource; @@ -50,6 +50,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.servlet.DispatcherType; +import javax.ws.rs.core.UriBuilder; import java.io.IOException; import java.net.URI; import java.util.ArrayList; @@ -60,9 +62,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -import javax.servlet.DispatcherType; -import javax.ws.rs.core.UriBuilder; - /** * Embedded server for the REST API that provides the control plane for Kafka Connect workers. */ @@ -161,20 +160,20 @@ public Connector createConnector(String listener) { return connector; } - public void start(Herder herder) { + public void start(HerderProvider herderProvider, Plugins plugins) { log.info("Starting REST server"); ResourceConfig resourceConfig = new ResourceConfig(); resourceConfig.register(new JacksonJsonProvider()); - resourceConfig.register(new RootResource(herder)); - resourceConfig.register(new ConnectorsResource(herder, config)); - resourceConfig.register(new ConnectorPluginsResource(herder)); + resourceConfig.register(new RootResource(herderProvider)); + resourceConfig.register(new ConnectorsResource(herderProvider, config)); + resourceConfig.register(new ConnectorPluginsResource(herderProvider)); resourceConfig.register(ConnectExceptionMapper.class); resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true); - registerRestExtensions(herder, resourceConfig); + registerRestExtensions(herderProvider, plugins, resourceConfig); ServletContainer servletContainer = new ServletContainer(resourceConfig); ServletHolder servletHolder = new ServletHolder(servletContainer); @@ -220,7 +219,9 @@ public void start(Herder herder) { log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl()); } - + public URI serverUrl() { + return jettyServer.getURI(); + } public void stop() { log.info("Stopping REST server"); @@ -264,7 +265,7 @@ else if (serverConnector != null && serverConnector.getHost() != null && serverC Integer advertisedPort = config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG); if (advertisedPort != null) builder.port(advertisedPort); - else if (serverConnector != null) + else if (serverConnector != null && serverConnector.getPort() > 0) builder.port(serverConnector.getPort()); log.info("Advertised URI: {}", builder.build()); @@ -302,15 +303,15 @@ ServerConnector findConnector(String protocol) { return null; } - void registerRestExtensions(Herder herder, ResourceConfig resourceConfig) { - connectRestExtensions = herder.plugins().newPlugins( + void registerRestExtensions(HerderProvider provider, Plugins plugins, ResourceConfig resourceConfig) { + connectRestExtensions = plugins.newPlugins( config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG), config, ConnectRestExtension.class); ConnectRestExtensionContext connectRestExtensionContext = new ConnectRestExtensionContextImpl( new ConnectRestConfigurable(resourceConfig), - new ConnectClusterStateImpl(herder) + new ConnectClusterStateImpl(provider) ); for (ConnectRestExtension connectRestExtension : connectRestExtensions) { connectRestExtension.register(connectRestExtensionContext); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java index 80192ca064489..6280473af964d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java @@ -90,7 +90,10 @@ public String trace() { } public static class ConnectorState extends AbstractState { - public ConnectorState(String state, String worker, String msg) { + @JsonCreator + public ConnectorState(@JsonProperty("state") String state, + @JsonProperty("worker_id") String worker, + @JsonProperty("msg") String msg) { super(state, worker, msg); } } @@ -98,7 +101,11 @@ public ConnectorState(String state, String worker, String msg) { public static class TaskState extends AbstractState implements Comparable { private final int id; - public TaskState(int id, String state, String worker, String msg) { + @JsonCreator + public TaskState(@JsonProperty("id") int id, + @JsonProperty("state") String state, + @JsonProperty("worker_id") String worker, + @JsonProperty("msg") String msg) { super(state, worker, msg); this.id = id; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java index 24eb93b8c0d5f..87f25b29cb51a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java @@ -18,7 +18,7 @@ import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.runtime.ConnectorConfig; -import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo; @@ -49,7 +49,7 @@ public class ConnectorPluginsResource { private static final String ALIAS_SUFFIX = "Connector"; - private final Herder herder; + private final HerderProvider herderProvider; private final List connectorPlugins; private static final List> CONNECTOR_EXCLUDES = Arrays.asList( @@ -58,8 +58,8 @@ public class ConnectorPluginsResource { SchemaSourceConnector.class ); - public ConnectorPluginsResource(Herder herder) { - this.herder = herder; + public ConnectorPluginsResource(HerderProvider herderProvider) { + this.herderProvider = herderProvider; this.connectorPlugins = new ArrayList<>(); } @@ -78,7 +78,7 @@ public ConfigInfos validateConfigs( ); } - return herder.validateConnectorConfig(connectorConfig); + return herderProvider.get().validateConnectorConfig(connectorConfig); } @GET @@ -90,7 +90,7 @@ public List listConnectorPlugins() { // TODO: improve once plugins are allowed to be added/removed during runtime. private synchronized List getConnectorPlugins() { if (connectorPlugins.isEmpty()) { - for (PluginDesc plugin : herder.plugins().connectors()) { + for (PluginDesc plugin : herderProvider.get().plugins().connectors()) { if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) { connectorPlugins.add(new ConnectorPluginInfo(plugin)); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index e9661046d31e9..29a8c39028ef7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException; import org.apache.kafka.connect.runtime.distributed.RequestTargetException; @@ -67,21 +68,25 @@ public class ConnectorsResource { // but currently a worker simply leaving the group can take this long as well. private static final long REQUEST_TIMEOUT_MS = 90 * 1000; - private final Herder herder; + private final HerderProvider herderProvider; private final WorkerConfig config; @javax.ws.rs.core.Context private ServletContext context; - public ConnectorsResource(Herder herder, WorkerConfig config) { - this.herder = herder; + public ConnectorsResource(HerderProvider herder, WorkerConfig config) { + this.herderProvider = herder; this.config = config; } + private Herder herder() { + return herderProvider.get(); + } + @GET @Path("/") public Collection listConnectors(final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback> cb = new FutureCallback<>(); - herder.connectors(cb); + herder().connectors(cb); return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference>() { }, forward); } @@ -99,7 +104,7 @@ public Response createConnector(final @QueryParam("forward") Boolean forward, checkAndPutConnectorConfigName(name, configs); FutureCallback> cb = new FutureCallback<>(); - herder.putConnectorConfig(name, configs, false, cb); + herder().putConnectorConfig(name, configs, false, cb); Herder.Created info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest, new TypeReference() { }, new CreatedConnectorInfoTranslator(), forward); @@ -112,7 +117,7 @@ public Response createConnector(final @QueryParam("forward") Boolean forward, public ConnectorInfo getConnector(final @PathParam("connector") String connector, final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback cb = new FutureCallback<>(); - herder.connectorInfo(connector, cb); + herder().connectorInfo(connector, cb); return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, forward); } @@ -121,14 +126,14 @@ public ConnectorInfo getConnector(final @PathParam("connector") String connector public Map getConnectorConfig(final @PathParam("connector") String connector, final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback> cb = new FutureCallback<>(); - herder.connectorConfig(connector, cb); + herder().connectorConfig(connector, cb); return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, forward); } @GET @Path("/{connector}/status") public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) throws Throwable { - return herder.connectorStatus(connector); + return herder().connectorStatus(connector); } @PUT @@ -139,7 +144,7 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto FutureCallback> cb = new FutureCallback<>(); checkAndPutConnectorConfigName(connector, connectorConfig); - herder.putConnectorConfig(connector, connectorConfig, true, cb); + herder().putConnectorConfig(connector, connectorConfig, true, cb); Herder.Created createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "PUT", connectorConfig, new TypeReference() { }, new CreatedConnectorInfoTranslator(), forward); Response.ResponseBuilder response; @@ -157,21 +162,21 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto public void restartConnector(final @PathParam("connector") String connector, final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback cb = new FutureCallback<>(); - herder.restartConnector(connector, cb); + herder().restartConnector(connector, cb); completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", null, forward); } @PUT @Path("/{connector}/pause") public Response pauseConnector(@PathParam("connector") String connector) { - herder.pauseConnector(connector); + herder().pauseConnector(connector); return Response.accepted().build(); } @PUT @Path("/{connector}/resume") public Response resumeConnector(@PathParam("connector") String connector) { - herder.resumeConnector(connector); + herder().resumeConnector(connector); return Response.accepted().build(); } @@ -180,7 +185,7 @@ public Response resumeConnector(@PathParam("connector") String connector) { public List getTaskConfigs(final @PathParam("connector") String connector, final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback> cb = new FutureCallback<>(); - herder.taskConfigs(connector, cb); + herder().taskConfigs(connector, cb); return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", null, new TypeReference>() { }, forward); } @@ -191,7 +196,7 @@ public void putTaskConfigs(final @PathParam("connector") String connector, final @QueryParam("forward") Boolean forward, final List> taskConfigs) throws Throwable { FutureCallback cb = new FutureCallback<>(); - herder.putTaskConfigs(connector, taskConfigs, cb); + herder().putTaskConfigs(connector, taskConfigs, cb); completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs, forward); } @@ -199,7 +204,7 @@ public void putTaskConfigs(final @PathParam("connector") String connector, @Path("/{connector}/tasks/{task}/status") public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector") String connector, final @PathParam("task") Integer task) throws Throwable { - return herder.taskStatus(new ConnectorTaskId(connector, task)); + return herder().taskStatus(new ConnectorTaskId(connector, task)); } @POST @@ -209,7 +214,7 @@ public void restartTask(final @PathParam("connector") String connector, final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback cb = new FutureCallback<>(); ConnectorTaskId taskId = new ConnectorTaskId(connector, task); - herder.restartTask(taskId, cb); + herder().restartTask(taskId, cb); completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", null, forward); } @@ -218,7 +223,7 @@ public void restartTask(final @PathParam("connector") String connector, public void destroyConnector(final @PathParam("connector") String connector, final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback> cb = new FutureCallback<>(); - herder.deleteConnectorConfig(connector, cb); + herder().deleteConnectorConfig(connector, cb); completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null, forward); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java index 9666bf15954f9..56516cd410924 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.connect.runtime.rest.resources; -import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.rest.entities.ServerInfo; import javax.ws.rs.GET; @@ -28,15 +28,15 @@ @Produces(MediaType.APPLICATION_JSON) public class RootResource { - private final Herder herder; + private final HerderProvider herder; - public RootResource(Herder herder) { + public RootResource(HerderProvider herder) { this.herder = herder; } @GET @Path("/") public ServerInfo serverInfo() { - return new ServerInfo(herder.kafkaClusterId()); + return new ServerInfo(herder.get().kafkaClusterId()); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java new file mode 100644 index 0000000000000..e59691b843d09 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import org.apache.kafka.connect.errors.DataException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * A handle to a connector executing in a Connect cluster. + */ +public class ConnectorHandle { + + private static final Logger log = LoggerFactory.getLogger(ConnectorHandle.class); + + private final String connectorName; + private final Map taskHandles = new ConcurrentHashMap<>(); + + private CountDownLatch recordsRemainingLatch; + private int expectedRecords = -1; + + public ConnectorHandle(String connectorName) { + this.connectorName = connectorName; + } + + /** + * Get or create a task handle for a given task id. The task need not be created when this method is called. If the + * handle is called before the task is created, the task will bind to the handle once it starts (or restarts). + * + * @param taskId the task id + * @return a non-null {@link TaskHandle} + */ + public TaskHandle taskHandle(String taskId) { + return taskHandles.computeIfAbsent(taskId, k -> new TaskHandle(this, taskId)); + } + + public Collection tasks() { + return taskHandles.values(); + } + + /** + * Delete the task handle for this task id. + * + * @param taskId the task id. + */ + public void deleteTask(String taskId) { + log.info("Removing handle for {} task in connector {}", taskId, connectorName); + taskHandles.remove(taskId); + } + + /** + * Set the number of expected records for this task. + * + * @param expectedRecords number of records + */ + public void expectedRecords(int expectedRecords) { + this.expectedRecords = expectedRecords; + this.recordsRemainingLatch = new CountDownLatch(expectedRecords); + } + + /** + * Record a message arrival at the connector. + */ + public void record() { + if (recordsRemainingLatch != null) { + recordsRemainingLatch.countDown(); + } + } + + /** + * Wait for this task to receive the expected number of records. + * + * @param consumeMaxDurationMs max duration to wait for records + * @throws InterruptedException if another threads interrupts this one while waiting for records + */ + public void awaitRecords(int consumeMaxDurationMs) throws InterruptedException { + if (recordsRemainingLatch == null || expectedRecords < 0) { + throw new IllegalStateException("expectedRecords() was not set for this task?"); + } + if (!recordsRemainingLatch.await(consumeMaxDurationMs, TimeUnit.MILLISECONDS)) { + String msg = String.format("Insufficient records seen by connector %s in %d millis. Records expected=%d, actual=%d", + connectorName, + consumeMaxDurationMs, + expectedRecords, + expectedRecords - recordsRemainingLatch.getCount()); + throw new DataException(msg); + } + } + + @Override + public String toString() { + return "ConnectorHandle{" + + "connectorName='" + connectorName + '\'' + + '}'; + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java new file mode 100644 index 0000000000000..af3ab4421a363 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.storage.StringConverter; +import org.apache.kafka.connect.transforms.Transformation; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.IntegrationTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_RETRY_TIMEOUT_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_TOLERANCE_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG; +import static org.apache.kafka.connect.runtime.SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG; +import static org.apache.kafka.connect.runtime.SinkConnectorConfig.DLQ_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG; +import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION; +import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE; +import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Integration test for the different error handling policies in Connect (namely, retry policies, skipping bad records, + * and dead letter queues). + */ +@Category(IntegrationTest.class) +public class ErrorHandlingIntegrationTest { + + private static final Logger log = LoggerFactory.getLogger(ErrorHandlingIntegrationTest.class); + + private static final String DLQ_TOPIC = "my-connector-errors"; + private static final String CONNECTOR_NAME = "error-conn"; + private static final String TASK_ID = "error-conn-0"; + private static final int NUM_RECORDS_PRODUCED = 20; + private static final int EXPECTED_CORRECT_RECORDS = 19; + private static final int EXPECTED_INCORRECT_RECORDS = 1; + private static final int CONNECTOR_SETUP_DURATION_MS = 5000; + private static final int CONSUME_MAX_DURATION_MS = 5000; + + private EmbeddedConnectCluster connect; + private ConnectorHandle connectorHandle; + + @Before + public void setup() throws IOException { + // setup Connect cluster with defaults + connect = new EmbeddedConnectCluster.Builder().build(); + + // start Connect cluster + connect.start(); + + // get connector handles before starting test. + connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); + } + + @After + public void close() { + RuntimeHandles.get().deleteConnector(CONNECTOR_NAME); + connect.stop(); + } + + @Test + public void testSkipRetryAndDLQWithHeaders() throws Exception { + // create test topic + connect.kafka().createTopic("test-topic"); + + // setup connector config + Map props = new HashMap<>(); + props.put(CONNECTOR_CLASS_CONFIG, "MonitorableSink"); + props.put(TASKS_MAX_CONFIG, "1"); + props.put(TOPICS_CONFIG, "test-topic"); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(TRANSFORMS_CONFIG, "failing_transform"); + props.put("transforms.failing_transform.type", FaultyPassthrough.class.getName()); + + // log all errors, along with message metadata + props.put(ERRORS_LOG_ENABLE_CONFIG, "true"); + props.put(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); + + // produce bad messages into dead letter queue + props.put(DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC); + props.put(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true"); + props.put(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); + + // tolerate all erros + props.put(ERRORS_TOLERANCE_CONFIG, "all"); + + // retry for up to one second + props.put(ERRORS_RETRY_TIMEOUT_CONFIG, "1000"); + + // set expected records to successfully reach the task + connectorHandle.taskHandle(TASK_ID).expectedRecords(EXPECTED_CORRECT_RECORDS); + + connect.configureConnector(CONNECTOR_NAME, props); + + waitForCondition(() -> connect.connectorStatus(CONNECTOR_NAME).tasks().size() == 1 + && connectorHandle.taskHandle(TASK_ID).partitionsAssigned() == 1, + CONNECTOR_SETUP_DURATION_MS, + "Connector task was not assigned a partition."); + + // produce some strings into test topic + for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) { + connect.kafka().produce("test-topic", "key-" + i, "value-" + i); + } + + // consume all records from test topic + log.info("Consuming records from test topic"); + int i = 0; + for (ConsumerRecord rec : connect.kafka().consume(NUM_RECORDS_PRODUCED, CONSUME_MAX_DURATION_MS, "test-topic")) { + String k = new String(rec.key()); + String v = new String(rec.value()); + log.debug("Consumed record (key='{}', value='{}') from topic {}", k, v, rec.topic()); + assertEquals("Unexpected key", k, "key-" + i); + assertEquals("Unexpected value", v, "value-" + i); + i++; + } + + // wait for records to reach the task + connectorHandle.taskHandle(TASK_ID).awaitRecords(CONSUME_MAX_DURATION_MS); + + // consume failed records from dead letter queue topic + log.info("Consuming records from test topic"); + ConsumerRecords messages = connect.kafka().consume(EXPECTED_INCORRECT_RECORDS, CONSUME_MAX_DURATION_MS, DLQ_TOPIC); + for (ConsumerRecord recs : messages) { + log.debug("Consumed record (key={}, value={}) from dead letter queue topic {}", + new String(recs.key()), new String(recs.value()), DLQ_TOPIC); + assertTrue(recs.headers().toArray().length > 0); + assertValue("test-topic", recs.headers(), ERROR_HEADER_ORIG_TOPIC); + assertValue(RetriableException.class.getName(), recs.headers(), ERROR_HEADER_EXCEPTION); + assertValue("Error when value='value-7'", recs.headers(), ERROR_HEADER_EXCEPTION_MESSAGE); + } + + connect.deleteConnector(CONNECTOR_NAME); + } + + private void assertValue(String expected, Headers headers, String headerKey) { + byte[] actual = headers.lastHeader(headerKey).value(); + if (expected == null && actual == null) { + return; + } + if (expected == null || actual == null) { + fail(); + } + assertEquals(expected, new String(actual)); + } + + public static class FaultyPassthrough> implements Transformation { + + static final ConfigDef CONFIG_DEF = new ConfigDef(); + + /** + * An arbitrary id which causes this transformation to fail with a {@link RetriableException}, but succeeds + * on subsequent attempt. + */ + static final int BAD_RECORD_VAL_RETRIABLE = 4; + + /** + * An arbitrary id which causes this transformation to fail with a {@link RetriableException}. + */ + static final int BAD_RECORD_VAL = 7; + + private boolean shouldFail = true; + + @Override + public R apply(R record) { + String badValRetriable = "value-" + BAD_RECORD_VAL_RETRIABLE; + if (badValRetriable.equals(record.value()) && shouldFail) { + shouldFail = false; + throw new RetriableException("Error when value='" + badValRetriable + + "'. A reattempt with this record will succeed."); + } + String badVal = "value-" + BAD_RECORD_VAL; + if (badVal.equals(record.value())) { + throw new RetriableException("Error when value='" + badVal + "'"); + } + return record; + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() { + } + + @Override + public void configure(Map configs) { + } + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java new file mode 100644 index 0000000000000..5d887cf4cbf89 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import org.apache.kafka.connect.storage.StringConverter; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.IntegrationTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.Assert.assertEquals; + +/** + * An example integration test that demonstrates how to setup an integration test for Connect. + *

+ * The following test configures and executes up a sink connector pipeline in a worker, produces messages into + * the source topic-partitions, and demonstrates how to check the overall behavior of the pipeline. + */ +@Category(IntegrationTest.class) +public class ExampleConnectIntegrationTest { + + private static final int NUM_RECORDS_PRODUCED = 2000; + private static final int NUM_TOPIC_PARTITIONS = 3; + private static final int CONSUME_MAX_DURATION_MS = 5000; + private static final int CONNECTOR_SETUP_DURATION_MS = 15000; + private static final String CONNECTOR_NAME = "simple-conn"; + + private EmbeddedConnectCluster connect; + private ConnectorHandle connectorHandle; + + @Before + public void setup() throws IOException { + // setup Connect worker properties + Map exampleWorkerProps = new HashMap<>(); + exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, "30000"); + + // setup Kafka broker properties + Properties exampleBrokerProps = new Properties(); + exampleBrokerProps.put("auto.create.topics.enable", "false"); + + // build a Connect cluster backed by Kakfa and Zk + connect = new EmbeddedConnectCluster.Builder() + .name("example-cluster") + .numWorkers(3) + .numBrokers(1) + .workerProps(exampleWorkerProps) + .brokerProps(exampleBrokerProps) + .build(); + + // start the clusters + connect.start(); + + // get a handle to the connector + connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); + } + + @After + public void close() { + // delete connector handle + RuntimeHandles.get().deleteConnector(CONNECTOR_NAME); + + // stop all Connect, Kakfa and Zk threads. + connect.stop(); + } + + /** + * Simple test case to configure and execute an embedded Connect cluster. The test will produce and consume + * records, and start up a sink connector which will consume these records. + */ + @Test + public void testProduceConsumeConnector() throws Exception { + // create test topic + connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS); + + // setup up props for the sink connector + Map props = new HashMap<>(); + props.put(CONNECTOR_CLASS_CONFIG, "MonitorableSink"); + props.put(TASKS_MAX_CONFIG, "3"); + props.put(TOPICS_CONFIG, "test-topic"); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + + // expect all records to be consumed by the connector + connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED); + + // start a sink connector + connect.configureConnector(CONNECTOR_NAME, props); + + waitForCondition(() -> connect.connectorStatus(CONNECTOR_NAME).tasks().size() == 3 + && connectorHandle.tasks().stream().allMatch(th -> th.partitionsAssigned() == 1), + CONNECTOR_SETUP_DURATION_MS, + "Connector tasks were not assigned a partition each."); + + // produce some messages into source topic partitions + for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) { + connect.kafka().produce("test-topic", i % NUM_TOPIC_PARTITIONS, "key", "simple-message-value-" + i); + } + + // consume all records from the source topic or fail, to ensure that they were correctly produced. + assertEquals("Unexpected number of records consumed", NUM_RECORDS_PRODUCED, + connect.kafka().consume(NUM_RECORDS_PRODUCED, CONSUME_MAX_DURATION_MS, "test-topic").count()); + + // wait for the connector tasks to consume all records. + connectorHandle.awaitRecords(CONSUME_MAX_DURATION_MS); + + // delete connector + connect.deleteConnector(CONNECTOR_NAME); + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java new file mode 100644 index 0000000000000..23a8d99e84edc --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.runtime.TestSinkConnector; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A connector to be used in integration tests. This class provides methods to find task instances + * which are initiated by the embedded connector, and wait for them to consume a desired number of + * messages. + */ +public class MonitorableSinkConnector extends TestSinkConnector { + + private static final Logger log = LoggerFactory.getLogger(MonitorableSinkConnector.class); + + private String connectorName; + + @Override + public void start(Map props) { + connectorName = props.get("name"); + log.info("Starting connector {}", props.get("name")); + } + + @Override + public Class taskClass() { + return MonitorableSinkTask.class; + } + + @Override + public List> taskConfigs(int maxTasks) { + List> configs = new ArrayList<>(); + for (int i = 0; i < maxTasks; i++) { + Map config = new HashMap<>(); + config.put("connector.name", connectorName); + config.put("task.id", connectorName + "-" + i); + configs.add(config); + } + return configs; + } + + @Override + public void stop() { + } + + @Override + public ConfigDef config() { + return new ConfigDef(); + } + + public static class MonitorableSinkTask extends SinkTask { + + private String connectorName; + private String taskId; + private TaskHandle taskHandle; + + @Override + public String version() { + return "unknown"; + } + + @Override + public void start(Map props) { + taskId = props.get("task.id"); + connectorName = props.get("connector.name"); + taskHandle = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId); + log.debug("Starting task {}", taskId); + } + + @Override + public void open(Collection partitions) { + log.debug("Opening {} partitions", partitions.size()); + super.open(partitions); + taskHandle.partitionsAssigned(partitions.size()); + } + + @Override + public void put(Collection records) { + for (SinkRecord rec : records) { + taskHandle.record(); + log.trace("Task {} obtained record (key='{}' value='{}')", taskId, rec.key(), rec.value()); + } + } + + @Override + public void stop() { + } + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RuntimeHandles.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RuntimeHandles.java new file mode 100644 index 0000000000000..c9900f3a7fb62 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RuntimeHandles.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A singleton class which provides a shared class for {@link ConnectorHandle}s and {@link TaskHandle}s that are + * required for integration tests. + */ +public class RuntimeHandles { + + private static final RuntimeHandles INSTANCE = new RuntimeHandles(); + + private final Map connectorHandles = new ConcurrentHashMap<>(); + + private RuntimeHandles() { + } + + /** + * @return the shared {@link RuntimeHandles} instance. + */ + public static RuntimeHandles get() { + return INSTANCE; + } + + /** + * Get or create a connector handle for a given connector name. The connector need not be running at the time + * this method is called. Once the connector is created, it will bind to this handle. Binding happens with the + * connectorName. + * + * @param connectorName the name of the connector + * @return a non-null {@link ConnectorHandle} + */ + public ConnectorHandle connectorHandle(String connectorName) { + return connectorHandles.computeIfAbsent(connectorName, k -> new ConnectorHandle(connectorName)); + } + + /** + * Delete the connector handle for this connector name. + * + * @param connectorName name of the connector + */ + public void deleteConnector(String connectorName) { + connectorHandles.remove(connectorName); + } + +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java new file mode 100644 index 0000000000000..de3d9240d1be7 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import org.apache.kafka.connect.errors.DataException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A handle to an executing task in a worker. Use this class to record progress, for example: number of records seen + * by the task using so far, or waiting for partitions to be assigned to the task. + */ +public class TaskHandle { + + private static final Logger log = LoggerFactory.getLogger(TaskHandle.class); + + private final String taskId; + private final ConnectorHandle connectorHandle; + private final AtomicInteger partitionsAssigned = new AtomicInteger(0); + + private CountDownLatch recordsRemainingLatch; + private int expectedRecords = -1; + + public TaskHandle(ConnectorHandle connectorHandle, String taskId) { + log.info("Created task {} for connector {}", taskId, connectorHandle); + this.taskId = taskId; + this.connectorHandle = connectorHandle; + } + + /** + * Record a message arrival at the task. + */ + public void record() { + if (recordsRemainingLatch != null) { + recordsRemainingLatch.countDown(); + } + connectorHandle.record(); + } + + /** + * Set the number of expected records for this task. + * + * @param expectedRecords number of records + */ + public void expectedRecords(int expectedRecords) { + this.expectedRecords = expectedRecords; + this.recordsRemainingLatch = new CountDownLatch(expectedRecords); + } + + /** + * Set the number of partitions assigned to this task. + * + * @param numPartitions number of partitions + */ + public void partitionsAssigned(int numPartitions) { + partitionsAssigned.set(numPartitions); + } + + /** + * @return the number of topic partitions assigned to this task. + */ + public int partitionsAssigned() { + return partitionsAssigned.get(); + } + + /** + * Wait for this task to receive the expected number of records. + * + * @param consumeMaxDurationMs max duration to wait for records + * @throws InterruptedException if another threads interrupts this one while waiting for records + */ + public void awaitRecords(int consumeMaxDurationMs) throws InterruptedException { + if (recordsRemainingLatch == null) { + throw new IllegalStateException("Illegal state encountered. expectedRecords() was not set for this task?"); + } + if (!recordsRemainingLatch.await(consumeMaxDurationMs, TimeUnit.MILLISECONDS)) { + String msg = String.format("Insufficient records seen by task %s in %d millis. Records expected=%d, actual=%d", + taskId, + consumeMaxDurationMs, + expectedRecords, + expectedRecords - recordsRemainingLatch.getCount()); + throw new DataException(msg); + } + log.debug("Task {} saw {} records, expected {} records", taskId, expectedRecords - recordsRemainingLatch.getCount(), expectedRecords); + } + + @Override + public String toString() { + return "Handle{" + + "taskId='" + taskId + '\'' + + '}'; + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java index 39e35cb01df76..d5802cb5222d2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.runtime.isolation.Plugins; @@ -177,7 +178,7 @@ public void testOptionsDoesNotIncludeWadlOutput() { PowerMock.replayAll(); server = new RestServer(workerConfig); - server.start(herder); + server.start(new HerderProvider(herder), herder.plugins()); Response response = request("/connectors") .accept(MediaType.WILDCARD) @@ -214,7 +215,7 @@ public void checkCORSRequest(String corsDomain, String origin, String expectedHe server = new RestServer(workerConfig); - server.start(herder); + server.start(new HerderProvider(herder), herder.plugins()); Response response = request("/connectors") .header("Referer", origin + "/page") diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index a3aee6a407d2e..684064d30df4d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.connect.runtime.AbstractHerder; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.TestSinkConnector; import org.apache.kafka.connect.runtime.TestSourceConnector; import org.apache.kafka.connect.runtime.WorkerConfig; @@ -186,7 +187,7 @@ public void setUp() throws Exception { plugins = PowerMock.createMock(Plugins.class); herder = PowerMock.createMock(AbstractHerder.class); - connectorPluginsResource = new ConnectorPluginsResource(herder); + connectorPluginsResource = new ConnectorPluginsResource(new HerderProvider(herder)); } private void expectPlugins() { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index f84cd258fd4ef..5a520744bcd26 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.NotAssignedException; import org.apache.kafka.connect.runtime.distributed.NotLeaderException; @@ -126,7 +127,7 @@ public class ConnectorsResourceTest { public void setUp() throws NoSuchMethodException { PowerMock.mockStatic(RestClient.class, RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class, WorkerConfig.class)); - connectorsResource = new ConnectorsResource(herder, null); + connectorsResource = new ConnectorsResource(new HerderProvider(herder), null); } private static final Map getConnectorConfig(Map mapToClone) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java index 4e928a370372f..be80e28f42da9 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.rest.entities.ServerInfo; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -39,7 +40,7 @@ public class RootResourceTest extends EasyMockSupport { @Before public void setUp() { - rootResource = new RootResource(herder); + rootResource = new RootResource(new HerderProvider(herder)); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java new file mode 100644 index 0000000000000..9ba0e06bfd013 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util.clusters; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.connect.cli.ConnectDistributed; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.Connect; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.REST_HOST_NAME_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.REST_PORT_CONFIG; +import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG; +import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG; +import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG; +import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG; +import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG; +import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG; + +/** + * Start an embedded connect worker. Internally, this class will spin up a Kafka and Zk cluster, setup any tmp + * directories and clean up them on them. + */ +public class EmbeddedConnectCluster { + + private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectCluster.class); + + private static final int DEFAULT_NUM_BROKERS = 1; + private static final int DEFAULT_NUM_WORKERS = 1; + private static final Properties DEFAULT_BROKER_CONFIG = new Properties(); + private static final String REST_HOST_NAME = "localhost"; + + private final Connect[] connectCluster; + private final EmbeddedKafkaCluster kafkaCluster; + private final Map workerProps; + private final String connectClusterName; + private final int numBrokers; + + private EmbeddedConnectCluster(String name, Map workerProps, int numWorkers, int numBrokers, Properties brokerProps) { + this.workerProps = workerProps; + this.connectClusterName = name; + this.numBrokers = numBrokers; + this.kafkaCluster = new EmbeddedKafkaCluster(numBrokers, brokerProps); + this.connectCluster = new Connect[numWorkers]; + } + + /** + * Start the connect cluster and the embedded Kafka and Zookeeper cluster. + */ + public void start() throws IOException { + kafkaCluster.before(); + startConnect(); + } + + /** + * Stop the connect cluster and the embedded Kafka and Zookeeper cluster. + * Clean up any temp directories created locally. + */ + public void stop() { + for (Connect worker : this.connectCluster) { + try { + worker.stop(); + } catch (Exception e) { + log.error("Could not stop connect", e); + throw new RuntimeException("Could not stop worker", e); + } + } + + try { + kafkaCluster.after(); + } catch (Exception e) { + log.error("Could not stop kafka", e); + throw new RuntimeException("Could not stop brokers", e); + } + } + + @SuppressWarnings("deprecation") + public void startConnect() { + log.info("Starting Connect cluster with {} workers. clusterName {}", connectCluster.length, connectClusterName); + + workerProps.put(BOOTSTRAP_SERVERS_CONFIG, kafka().bootstrapServers()); + workerProps.put(REST_HOST_NAME_CONFIG, REST_HOST_NAME); + workerProps.put(REST_PORT_CONFIG, "0"); // use a random available port + + String internalTopicsReplFactor = String.valueOf(numBrokers); + putIfAbsent(workerProps, GROUP_ID_CONFIG, "connect-integration-test-" + connectClusterName); + putIfAbsent(workerProps, OFFSET_STORAGE_TOPIC_CONFIG, "connect-offset-topic-" + connectClusterName); + putIfAbsent(workerProps, OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor); + putIfAbsent(workerProps, CONFIG_TOPIC_CONFIG, "connect-config-topic-" + connectClusterName); + putIfAbsent(workerProps, CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor); + putIfAbsent(workerProps, STATUS_STORAGE_TOPIC_CONFIG, "connect-storage-topic-" + connectClusterName); + putIfAbsent(workerProps, STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor); + putIfAbsent(workerProps, KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); + putIfAbsent(workerProps, VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); + + for (int i = 0; i < connectCluster.length; i++) { + connectCluster[i] = new ConnectDistributed().startConnect(workerProps); + } + } + + /** + * Configure a connector. If the connector does not already exist, a new one will be created and + * the given configuration will be applied to it. + * + * @param connName the name of the connector + * @param connConfig the intended configuration + * @throws IOException if call to the REST api fails. + * @throws ConnectRestException if REST api returns error status + */ + public void configureConnector(String connName, Map connConfig) throws IOException { + String url = endpointForResource(String.format("connectors/%s/config", connName)); + ObjectMapper mapper = new ObjectMapper(); + int status; + try { + String content = mapper.writeValueAsString(connConfig); + status = executePut(url, content); + } catch (IOException e) { + log.error("Could not execute PUT request to " + url, e); + throw e; + } + if (status >= HttpServletResponse.SC_BAD_REQUEST) { + throw new ConnectRestException(status, "Could not execute PUT request"); + } + } + + /** + * Delete an existing connector. + * + * @param connName name of the connector to be deleted + * @throws IOException if call to the REST api fails. + */ + public void deleteConnector(String connName) throws IOException { + String url = endpointForResource(String.format("connectors/%s", connName)); + int status = executeDelete(url); + if (status >= HttpServletResponse.SC_BAD_REQUEST) { + throw new ConnectRestException(status, "Could not execute DELETE request."); + } + } + + public ConnectorStateInfo connectorStatus(String connectorName) { + ObjectMapper mapper = new ObjectMapper(); + String url = endpointForResource(String.format("connectors/%s/status", connectorName)); + try { + return mapper.readerFor(ConnectorStateInfo.class).readValue(executeGet(url)); + } catch (IOException e) { + log.error("Could not read connector state", e); + throw new ConnectException("Could not read connector state", e); + } + } + + private String endpointForResource(String resource) { + String url = String.valueOf(connectCluster[0].restUrl()); + return url + resource; + } + + private static void putIfAbsent(Map props, String propertyKey, String propertyValue) { + if (!props.containsKey(propertyKey)) { + props.put(propertyKey, propertyValue); + } + } + + public EmbeddedKafkaCluster kafka() { + return kafkaCluster; + } + + public int executePut(String url, String body) throws IOException { + log.debug("Executing PUT request to URL={}. Payload={}", url, body); + HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection(); + httpCon.setDoOutput(true); + httpCon.setRequestProperty("Content-Type", "application/json"); + httpCon.setRequestMethod("PUT"); + try (OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream())) { + out.write(body); + } + try (InputStream is = httpCon.getInputStream()) { + int c; + StringBuilder response = new StringBuilder(); + while ((c = is.read()) != -1) { + response.append((char) c); + } + log.info("Put response for URL={} is {}", url, response); + } + return httpCon.getResponseCode(); + } + + public String executeGet(String url) throws IOException { + log.debug("Executing GET request to URL={}.", url); + HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection(); + httpCon.setDoOutput(true); + httpCon.setRequestMethod("GET"); + try (InputStream is = httpCon.getInputStream()) { + int c; + StringBuilder response = new StringBuilder(); + while ((c = is.read()) != -1) { + response.append((char) c); + } + log.debug("Get response for URL={} is {}", url, response); + return response.toString(); + } + } + + public int executeDelete(String url) throws IOException { + log.debug("Executing DELETE request to URL={}", url); + HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection(); + httpCon.setDoOutput(true); + httpCon.setRequestMethod("DELETE"); + httpCon.connect(); + return httpCon.getResponseCode(); + } + + public static class Builder { + private String name = UUID.randomUUID().toString(); + private Map workerProps = new HashMap<>(); + private int numWorkers = DEFAULT_NUM_WORKERS; + private int numBrokers = DEFAULT_NUM_BROKERS; + private Properties brokerProps = DEFAULT_BROKER_CONFIG; + + public Builder name(String name) { + this.name = name; + return this; + } + + public Builder workerProps(Map workerProps) { + this.workerProps = workerProps; + return this; + } + + public Builder numWorkers(int numWorkers) { + this.numWorkers = numWorkers; + return this; + } + + public Builder numBrokers(int numBrokers) { + this.numBrokers = numBrokers; + return this; + } + + public Builder brokerProps(Properties brokerProps) { + this.brokerProps = brokerProps; + return this; + } + + public EmbeddedConnectCluster build() { + return new EmbeddedConnectCluster(name, workerProps, numWorkers, numBrokers, brokerProps); + } + } + +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java new file mode 100644 index 0000000000000..109ba14a0ddb2 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util.clusters; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaConfig$; +import kafka.server.KafkaServer; +import kafka.utils.CoreUtils; +import kafka.utils.TestUtils; +import kafka.zk.EmbeddedZookeeper; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.errors.ConnectException; +import org.junit.rules.ExternalResource; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; + +/** + * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for + * integration tests. + */ +public class EmbeddedKafkaCluster extends ExternalResource { + + private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); + + private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 500; + + // Kafka Config + private final KafkaServer[] brokers; + private final Properties brokerConfig; + private final Time time = new MockTime(); + + private EmbeddedZookeeper zookeeper = null; + private ListenerName listenerName = new ListenerName("PLAINTEXT"); + private KafkaProducer producer; + + public EmbeddedKafkaCluster(final int numBrokers, + final Properties brokerConfig) { + brokers = new KafkaServer[numBrokers]; + this.brokerConfig = brokerConfig; + } + + @Override + protected void before() throws IOException { + start(); + } + + @Override + protected void after() { + stop(); + } + + private void start() throws IOException { + zookeeper = new EmbeddedZookeeper(); + + brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString()); + brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), 0); // pick a random port + + putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.HostNameProp(), "localhost"); + putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true); + putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0); + putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) brokers.length); + putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false); + + Object listenerConfig = brokerConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp()); + if (listenerConfig != null) { + listenerName = new ListenerName(listenerConfig.toString()); + } + + for (int i = 0; i < brokers.length; i++) { + brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i); + brokerConfig.put(KafkaConfig$.MODULE$.LogDirProp(), createLogDir()); + brokers[i] = TestUtils.createServer(new KafkaConfig(brokerConfig, true), time); + } + + Map producerProps = new HashMap<>(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + producer = new KafkaProducer<>(producerProps); + } + + private void stop() { + + try { + producer.close(); + } catch (Exception e) { + log.error("Could not shutdown producer ", e); + throw new RuntimeException("Could not shutdown producer", e); + } + + for (KafkaServer broker : brokers) { + try { + broker.shutdown(); + } catch (Throwable t) { + String msg = String.format("Could not shutdown broker at %s", address(broker)); + log.error(msg, t); + throw new RuntimeException(msg, t); + } + } + + for (KafkaServer broker : brokers) { + try { + log.info("Cleaning up kafka log dirs at {}", broker.config().logDirs()); + CoreUtils.delete(broker.config().logDirs()); + } catch (Throwable t) { + String msg = String.format("Could not clean up log dirs for broker at %s", address(broker)); + log.error(msg, t); + throw new RuntimeException(msg, t); + } + } + + try { + zookeeper.shutdown(); + } catch (Throwable t) { + String msg = String.format("Could not shutdown zookeeper at %s", zKConnectString()); + log.error(msg, t); + throw new RuntimeException(msg, t); + } + } + + private static void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) { + if (!props.containsKey(propertyKey)) { + props.put(propertyKey, propertyValue); + } + } + + private String createLogDir() throws IOException { + TemporaryFolder tmpFolder = new TemporaryFolder(); + tmpFolder.create(); + return tmpFolder.newFolder().getAbsolutePath(); + } + + public String bootstrapServers() { + return Arrays.stream(brokers) + .map(this::address) + .collect(Collectors.joining(",")); + } + + public String address(KafkaServer server) { + return server.config().hostName() + ":" + server.boundPort(listenerName); + } + + public String zKConnectString() { + return "127.0.0.1:" + zookeeper.port(); + } + + /** + * Create a Kafka topic with 1 partition and a replication factor of 1. + * + * @param topic The name of the topic. + */ + public void createTopic(String topic) { + createTopic(topic, 1); + } + + /** + * Create a Kafka topic with given partition and a replication factor of 1. + * + * @param topic The name of the topic. + */ + public void createTopic(String topic, int partitions) { + createTopic(topic, partitions, 1, new HashMap<>()); + } + + /** + * Create a Kafka topic with the given parameters. + * + * @param topic The name of the topic. + * @param partitions The number of partitions for this topic. + * @param replication The replication factor for (partitions of) this topic. + * @param topicConfig Additional topic-level configuration settings. + */ + public void createTopic(String topic, int partitions, int replication, Map topicConfig) { + if (replication > brokers.length) { + throw new InvalidReplicationFactorException("Insufficient brokers (" + + brokers.length + ") for desired replication (" + replication + ")"); + } + + log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", + topic, partitions, replication, topicConfig); + final NewTopic newTopic = new NewTopic(topic, partitions, (short) replication); + newTopic.configs(topicConfig); + + try (final AdminClient adminClient = createAdminClient()) { + adminClient.createTopics(Collections.singletonList(newTopic)).all().get(); + } catch (final InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + public void produce(String topic, String value) { + produce(topic, null, null, value); + } + + public void produce(String topic, String key, String value) { + produce(topic, null, key, value); + } + + public void produce(String topic, Integer partition, String key, String value) { + ProducerRecord msg = new ProducerRecord<>(topic, partition, key == null ? null : key.getBytes(), value == null ? null : value.getBytes()); + try { + producer.send(msg).get(DEFAULT_PRODUCE_SEND_DURATION_MS, TimeUnit.MILLISECONDS); + } catch (Exception e) { + throw new KafkaException("Could not produce message to topic=" + topic, e); + } + } + + public AdminClient createAdminClient() { + final Properties adminClientConfig = new Properties(); + adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); + final Object listeners = brokerConfig.get(KafkaConfig$.MODULE$.ListenersProp()); + if (listeners != null && listeners.toString().contains("SSL")) { + adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); + adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value()); + adminClientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + } + return AdminClient.create(adminClientConfig); + } + + /** + * Consume at least n records in a given duration or throw an exception. + * + * @param n the number of expected records in this topic. + * @param maxDuration the max duration to wait for these records (in milliseconds). + * @param topics the topics to subscribe and consume records from. + * @return a {@link ConsumerRecords} collection containing at least n records. + */ + public ConsumerRecords consume(int n, long maxDuration, String... topics) { + Map>> records = new HashMap<>(); + int consumedRecords = 0; + try (KafkaConsumer consumer = createConsumerAndSubscribeTo(Collections.emptyMap(), topics)) { + final long startMillis = System.currentTimeMillis(); + long allowedDuration = maxDuration; + while (allowedDuration > 0) { + log.debug("Consuming from {} for {} millis.", Arrays.toString(topics), allowedDuration); + ConsumerRecords rec = consumer.poll(Duration.ofMillis(allowedDuration)); + if (rec.isEmpty()) { + allowedDuration = maxDuration - (System.currentTimeMillis() - startMillis); + continue; + } + for (TopicPartition partition: rec.partitions()) { + final List> r = rec.records(partition); + records.computeIfAbsent(partition, t -> new ArrayList<>()).addAll(r); + consumedRecords += r.size(); + } + if (consumedRecords >= n) { + return new ConsumerRecords<>(records); + } + allowedDuration = maxDuration - (System.currentTimeMillis() - startMillis); + } + } + + throw new RuntimeException("Could not find enough records. found " + consumedRecords + ", expected " + n); + } + + public KafkaConsumer createConsumer(Map consumerProps) { + Map props = new HashMap<>(consumerProps); + + putIfAbsent(props, GROUP_ID_CONFIG, UUID.randomUUID().toString()); + putIfAbsent(props, BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); + putIfAbsent(props, ENABLE_AUTO_COMMIT_CONFIG, "false"); + putIfAbsent(props, AUTO_OFFSET_RESET_CONFIG, "earliest"); + putIfAbsent(props, KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + putIfAbsent(props, VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + + KafkaConsumer consumer; + try { + consumer = new KafkaConsumer<>(props); + } catch (Throwable t) { + throw new ConnectException("Failed to create consumer", t); + } + return consumer; + } + + public KafkaConsumer createConsumerAndSubscribeTo(Map consumerProps, String... topics) { + KafkaConsumer consumer = createConsumer(consumerProps); + consumer.subscribe(Arrays.asList(topics)); + return consumer; + } + + private static void putIfAbsent(final Map props, final String propertyKey, final Object propertyValue) { + if (!props.containsKey(propertyKey)) { + props.put(propertyKey, propertyValue); + } + } +} diff --git a/connect/runtime/src/test/resources/log4j.properties b/connect/runtime/src/test/resources/log4j.properties index d5e90fe788f76..1feedb89721bc 100644 --- a/connect/runtime/src/test/resources/log4j.properties +++ b/connect/runtime/src/test/resources/log4j.properties @@ -18,6 +18,7 @@ log4j.rootLogger=OFF, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n +log4j.appender.stdout.layout.ConversionPattern=[%d] (%t) %p %m (%c:%L)%n +log4j.logger.org.reflections=ERROR log4j.logger.org.apache.kafka=ERROR