diff --git a/connectors/rocketmq-connect-clickhouse/README.md b/connectors/rocketmq-connect-clickhouse/README.md
new file mode 100644
index 00000000..48b19874
--- /dev/null
+++ b/connectors/rocketmq-connect-clickhouse/README.md
@@ -0,0 +1,51 @@
+##### ClickHouseSourceConnector fully-qualified name
+org.apache.rocketmq.connect.clickhouse.source.ClickHouseSourceConnector
+
+**clickhouse-source-connector** start
+
+```
+POST http://${runtime-ip}:${runtime-port}/connectors/clickhouseSourceConnector
+{
+ "connector.class":"org.apache.rocketmq.connect.clickhouse.source.ClickHouseSourceConnector",
+ "clickhousehost":"localhost",
+ "clickhouseport":8123,
+ "database":"default",
+ "username":"default",
+ "password":"123456",
+ "table":"tableName",
+ "topic":"testClickHouseTopic",
+ "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+ "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}
+```
+
+##### ClickHouseSinkConnector fully-qualified name
+org.apache.rocketmq.connect.clickhouse.sink.ClickHouseSinkConnector
+
+**clickhouse-sink-connector** start
+
+```
+POST http://${runtime-ip}:${runtime-port}/connectors/clickhouseSinkConnector
+{
+ "connector.class":"org.apache.rocketmq.connect.clickhouse.sink.ClickHouseSinkConnector",
+ "clickhousehost":"localhost",
+ "clickhouseport":8123,
+ "database":"clickhouse",
+ "username":"default",
+ "password":"123456",
+ "connect.topicnames":"testClickHouseTopic",
+ "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+ "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}
+```
+
+##### parameter configuration
+
+| parameter | effect | required | default |
+|--------------------|---------------------------------------------------|-------------------|---------|
+| clickhousehost | The Host of the Clickhouse server | yes | null |
+| clickhouseport | The Port of the Clickhouse server | yes | null |
+| database | The database to read or write | yes | null |
+| table | The source table to read | yes (source only) | null |
+| topic | RocketMQ topic for source connector to write into | yes (source only) | null |
+| connect.topicnames | RocketMQ topic for sink connector to read from | yes (sink only) | null |
diff --git a/connectors/rocketmq-connect-clickhouse/pom.xml b/connectors/rocketmq-connect-clickhouse/pom.xml
new file mode 100644
index 00000000..76d7b868
--- /dev/null
+++ b/connectors/rocketmq-connect-clickhouse/pom.xml
@@ -0,0 +1,204 @@
+
+
+
+
+ 4.0.0
+
+ org.apache.rocketmq
+ rocketmq-connect-clickhouse
+ 1.0-SNAPSHOT
+
+ connect-clickhouse
+
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+
+
+
+
+ jira
+ https://issues.apache.org/jira/browse/RocketMQ
+
+
+
+
+
+ org.codehaus.mojo
+ versions-maven-plugin
+ 2.3
+
+
+ org.codehaus.mojo
+ clirr-maven-plugin
+ 2.7
+
+
+ maven-compiler-plugin
+ 3.6.1
+
+ ${maven.compiler.source}
+ ${maven.compiler.target}
+ ${maven.compiler.source}
+ true
+ true
+
+
+
+ maven-surefire-plugin
+ 2.19.1
+
+ -Xms512m -Xmx1024m
+ always
+
+ **/*Test.java
+
+
+
+
+ maven-site-plugin
+ 3.6
+
+ en_US
+ UTF-8
+ UTF-8
+
+
+
+ maven-source-plugin
+ 3.0.1
+
+
+ attach-sources
+
+ jar
+
+
+
+
+
+ maven-javadoc-plugin
+ 2.10.4
+
+ UTF-8
+ en_US
+ io.openmessaging.internal
+
+
+
+ aggregate
+
+ aggregate
+
+ site
+
+
+
+
+ maven-resources-plugin
+ 3.0.2
+
+ ${project.build.sourceEncoding}
+
+
+
+ org.codehaus.mojo
+ findbugs-maven-plugin
+ 3.0.4
+
+
+ org.apache.rat
+ apache-rat-plugin
+ 0.12
+
+
+ README.md
+ README-CN.md
+
+
+
+
+ maven-assembly-plugin
+ 3.0.0
+
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
+
+ 8
+ 8
+ UTF-8
+
+
+
+ io.openmessaging
+ openmessaging-connector
+ 0.1.4
+ compile
+
+
+ com.clickhouse
+ clickhouse-jdbc
+ 0.4.5
+
+ all
+
+
+
+ org.lz4
+ lz4-java
+ 1.8.0
+
+
+
+ com.clickhouse
+
+ clickhouse-http-client
+ 0.4.5
+
+
+ com.alibaba
+ fastjson
+ 1.2.83
+ compile
+
+
+ junit
+ junit
+ RELEASE
+ test
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.7
+
+
+
+
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseBaseConfig.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseBaseConfig.java
new file mode 100644
index 00000000..64eb6766
--- /dev/null
+++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseBaseConfig.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.clickhouse.config;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+
+public class ClickHouseBaseConfig {
+
+ private String clickHouseHost;
+
+ private Integer clickHousePort;
+
+ private String database;
+
+ private String userName;
+
+ private String passWord;
+
+ private String accessToken;
+
+ private String topic;
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getClickHouseHost() {
+ return clickHouseHost;
+ }
+
+ public void setClickHouseHost(String clickHouseHost) {
+ this.clickHouseHost = clickHouseHost;
+ }
+
+ public Integer getClickHousePort() {
+ return clickHousePort;
+ }
+
+ public void setClickHousePort(Integer clickHousePort) {
+ this.clickHousePort = clickHousePort;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ public String getPassWord() {
+ return passWord;
+ }
+
+ public void setPassWord(String passWord) {
+ this.passWord = passWord;
+ }
+
+ public String getAccessToken() {
+ return accessToken;
+ }
+
+ public void setAccessToken(String accessToken) {
+ this.accessToken = accessToken;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public void load(KeyValue props) {
+ properties2Object(props, this);
+ }
+
+ private void properties2Object(final KeyValue p, final Object object) {
+
+ Method[] methods = object.getClass().getMethods();
+ for (Method method : methods) {
+ String mn = method.getName();
+ if (mn.startsWith("set")) {
+ try {
+ String tmp = mn.substring(3);
+ String key = tmp.toLowerCase();
+
+ String property = p.getString(key);
+ if (property != null) {
+ Class>[] pt = method.getParameterTypes();
+ if (pt != null && pt.length > 0) {
+ String cn = pt[0].getSimpleName();
+ Object arg;
+ if (cn.equals("int") || cn.equals("Integer")) {
+ arg = Integer.parseInt(property);
+ } else if (cn.equals("long") || cn.equals("Long")) {
+ arg = Long.parseLong(property);
+ } else if (cn.equals("double") || cn.equals("Double")) {
+ arg = Double.parseDouble(property);
+ } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+ arg = Boolean.parseBoolean(property);
+ } else if (cn.equals("float") || cn.equals("Float")) {
+ arg = Float.parseFloat(property);
+ } else if (cn.equals("String")) {
+ arg = property;
+ } else {
+ continue;
+ }
+ method.invoke(object, arg);
+ }
+ }
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+ }
+}
diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseConstants.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseConstants.java
new file mode 100644
index 00000000..e9c026d1
--- /dev/null
+++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseConstants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.clickhouse.config;
+
+public class ClickHouseConstants {
+ public static final String CLICKHOUSE_HOST = "clickhousehost";
+
+ public static final String CLICKHOUSE_PORT = "clickhouseport";
+
+ public static final String CLICKHOUSE_DATABASE = "database";
+
+ public static final String CLICKHOUSE_USERNAME = "username";
+
+ public static final String CLICKHOUSE_PASSWORD = "password";
+
+ public static final String CLICKHOUSE_ACCESSTOKEN = "accesstoken";
+
+ public static final String CLICKHOUSE_TABLE = "table";
+
+ public static final String TOPIC = "topic";
+
+ public static final String CLICKHOUSE_OFFSET = "OFFSET";
+
+ public static final String CLICKHOUSE_PARTITION = "CLICKHOUSE_PARTITION";
+
+ public static final Integer timeoutSecondsDefault = 30;
+
+ public static final int MILLI_IN_A_SEC = 1000;
+
+ public static final Integer retryCountDefault = 3;
+
+ public static final int MAX_NUMBER_SEND_CONNECT_RECORD_EACH_TIME = 2000;
+
+}
diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSinkConfig.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSinkConfig.java
new file mode 100644
index 00000000..2353d1a8
--- /dev/null
+++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSinkConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.clickhouse.config;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class ClickHouseSinkConfig extends ClickHouseBaseConfig {
+ public static final Set SINK_REQUEST_CONFIG = new HashSet() {
+ {
+ add(ClickHouseConstants.CLICKHOUSE_HOST);
+ add(ClickHouseConstants.CLICKHOUSE_PORT);
+ }
+ };
+}
diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSourceConfig.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSourceConfig.java
new file mode 100644
index 00000000..c7e754d9
--- /dev/null
+++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSourceConfig.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.clickhouse.config;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class ClickHouseSourceConfig extends ClickHouseBaseConfig {
+
+ public static final Set REQUEST_CONFIG = new HashSet() {
+ {
+ add(ClickHouseConstants.CLICKHOUSE_HOST);
+ add(ClickHouseConstants.CLICKHOUSE_PORT);
+ add(ClickHouseConstants.CLICKHOUSE_TABLE);
+ add(ClickHouseConstants.TOPIC);
+ }
+ };
+ private String table;
+
+ public String getTable() {
+ return table;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+}
diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/helper/ClickHouseHelperClient.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/helper/ClickHouseHelperClient.java
new file mode 100644
index 00000000..ccd73c8e
--- /dev/null
+++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/helper/ClickHouseHelperClient.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.clickhouse.helper;
+
+import com.clickhouse.client.ClickHouseClient;
+import com.clickhouse.client.ClickHouseCredentials;
+import com.clickhouse.client.ClickHouseNode;
+import com.clickhouse.client.ClickHouseProtocol;
+import com.clickhouse.client.ClickHouseResponse;
+import com.clickhouse.data.ClickHouseFormat;
+import com.clickhouse.data.ClickHouseOutputStream;
+import com.clickhouse.data.ClickHouseRecord;
+import com.clickhouse.data.ClickHouseWriter;
+import com.clickhouse.jdbc.ClickHouseDataSource;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.rocketmq.connect.clickhouse.config.ClickHouseConstants;
+import org.apache.rocketmq.connect.clickhouse.config.ClickHouseBaseConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClickHouseHelperClient {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseHelperClient.class);
+
+ private ClickHouseBaseConfig config;
+ private int timeout = ClickHouseConstants.timeoutSecondsDefault * ClickHouseConstants.MILLI_IN_A_SEC;
+ private ClickHouseNode server = null;
+ private int retry = ClickHouseConstants.retryCountDefault;
+
+ public ClickHouseHelperClient(ClickHouseBaseConfig config) {
+ this.config = config;
+ this.server = create(config);
+ }
+
+ private ClickHouseNode create(ClickHouseBaseConfig config) {
+ this.server = ClickHouseNode.builder()
+ .host(config.getClickHouseHost())
+ .port(ClickHouseProtocol.HTTP, config.getClickHousePort())
+ .database(config.getDatabase()).credentials(getCredentials(config))
+ .build();
+
+ return this.server;
+ }
+
+ private ClickHouseCredentials getCredentials(ClickHouseBaseConfig config) {
+ if (config.getUserName() != null && config.getPassWord() != null) {
+ return ClickHouseCredentials.fromUserAndPassword(config.getUserName(), config.getPassWord());
+ }
+ if (config.getAccessToken() != null) {
+ return ClickHouseCredentials.fromAccessToken(config.getAccessToken());
+ }
+ throw new RuntimeException("Credentials cannot be empty!");
+
+ }
+
+ public boolean ping() {
+ ClickHouseClient clientPing = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
+ LOGGER.debug(String.format("server [%s] , timeout [%d]", server, timeout));
+ int retryCount = 0;
+
+ while (retryCount < retry) {
+ if (clientPing.ping(server, timeout)) {
+ clientPing.close();
+ return true;
+ }
+ retryCount++;
+ LOGGER.warn(String.format("Ping retry %d out of %d", retryCount, retry));
+ }
+ LOGGER.error("unable to ping to clickhouse server. ");
+ clientPing.close();
+ return false;
+ }
+
+ public ClickHouseNode getServer() {
+ return this.server;
+ }
+
+ public List query(String query) {
+ return query(query, ClickHouseFormat.RowBinaryWithNamesAndTypes);
+ }
+
+ public List query(String query, ClickHouseFormat clickHouseFormat) {
+ int retryCount = 0;
+ Exception ce = null;
+ while (retryCount < retry) {
+ try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
+ ClickHouseResponse response = client.read(server)
+ .format(clickHouseFormat)
+ .query(query)
+ .execute().get()) {
+
+ List recordList = new ArrayList<>();
+ for (ClickHouseRecord r : response.records()) {
+ recordList.add(r);
+ }
+ return recordList;
+
+ } catch (Exception e) {
+ retryCount++;
+ LOGGER.warn(String.format("Query retry %d out of %d", retryCount, retry), e);
+ ce = e;
+ }
+ }
+ throw new RuntimeException(ce);
+
+ }
+
+ private Connection getConnection(String url, Properties properties) throws SQLException {
+ ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);
+ Connection conn = dataSource.getConnection(config.getUserName(), config.getPassWord());
+
+ System.out.println("Connected to: " + conn.getMetaData().getURL());
+ return conn;
+ }
+
+ private boolean insertJson(String jsonString, String table, String sql, String url) {
+
+ try (Connection connection = getConnection(url, new Properties());
+ PreparedStatement ps = connection.prepareStatement(sql)) {
+ ps.setObject(1, new ClickHouseWriter() {
+ @Override
+ public void write(ClickHouseOutputStream output) throws IOException {
+ output.writeBytes(jsonString.getBytes());
+ }
+ });
+ ps.executeUpdate();
+
+ } catch (Exception e) {
+ return false;
+ }
+ return true;
+ }
+
+ public void insertJson(String jsonString, String table) {
+ String url = String.format("jdbc:clickhouse://%s:%s/%s", config.getClickHouseHost(), config.getClickHousePort(), config.getDatabase());
+ String sql = String.format("INSERT INTO %s FORMAT JSONEachRow", table);
+ int retryCount = 0;
+
+ while (retryCount < this.retry) {
+ if (insertJson(jsonString, table, sql, url)) {
+ return;
+ }
+ }
+ LOGGER.error(String.format("Insert into table %s error", table));
+ }
+
+}
diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkConnector.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkConnector.java
new file mode 100644
index 00000000..ae236ca2
--- /dev/null
+++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkConnector.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.clickhouse.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.connect.clickhouse.config.ClickHouseSinkConfig;
+
+public class ClickHouseSinkConnector extends SinkConnector {
+
+ private KeyValue keyValue;
+
+ @Override public List taskConfigs(int maxTasks) {
+ List configs = new ArrayList<>();
+ for (int i = 0; i < maxTasks; i++) {
+ configs.add(this.keyValue);
+ }
+ return configs;
+ }
+
+ @Override public Class extends Task> taskClass() {
+ return ClickHouseSinkTask.class;
+ }
+
+ @Override public void start(KeyValue value) {
+
+ for (String requestKey : ClickHouseSinkConfig.SINK_REQUEST_CONFIG) {
+ if (!value.containsKey(requestKey)) {
+ throw new RuntimeException("Request config key: " + requestKey);
+ }
+ }
+
+ this.keyValue = value;
+ }
+
+ @Override public void stop() {
+ this.keyValue = null;
+ }
+}
diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java
new file mode 100644
index 00000000..2cfc1c12
--- /dev/null
+++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.clickhouse.sink;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.errors.ConnectException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.connect.clickhouse.helper.ClickHouseHelperClient;
+import org.apache.rocketmq.connect.clickhouse.config.ClickHouseSinkConfig;
+
+public class ClickHouseSinkTask extends SinkTask {
+
+ public ClickHouseSinkConfig config;
+
+ private ClickHouseHelperClient helperClient;
+
+ @Override public void put(List sinkRecords) throws ConnectException {
+ if (sinkRecords == null || sinkRecords.size() < 1) {
+ return;
+ }
+ Map valueMap = new HashMap<>();
+ for (ConnectRecord record : sinkRecords) {
+ String table = record.getSchema().getName();
+ JSONArray jsonArray = valueMap.getOrDefault(table, new JSONArray());
+
+ final List fields = record.getSchema().getFields();
+ final Struct structData = (Struct) record.getData();
+
+ JSONObject object = new JSONObject();
+ for (Field field : fields) {
+ object.put(field.getName(), structData.get(field));
+ }
+
+ jsonArray.add(object);
+ valueMap.put(table, jsonArray);
+ }
+
+ for (Map.Entry entry : valueMap.entrySet()) {
+ String jsonString = entry.getValue().toString();
+ helperClient.insertJson(jsonString, entry.getKey());
+ }
+
+ }
+
+ @Override public void start(KeyValue keyValue) {
+ this.config = new ClickHouseSinkConfig();
+ this.config.load(keyValue);
+ this.helperClient = new ClickHouseHelperClient(this.config);
+ if (!helperClient.ping()) {
+ throw new RuntimeException("Cannot connect to clickhouse server!");
+ }
+ }
+
+ @Override public void stop() {
+ this.helperClient = null;
+ }
+}
diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceConnector.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceConnector.java
new file mode 100644
index 00000000..3a9f8e4b
--- /dev/null
+++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceConnector.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.clickhouse.source;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.connect.clickhouse.config.ClickHouseSourceConfig;
+
+public class ClickHouseSourceConnector extends SourceConnector {
+
+ private KeyValue keyValue;
+
+ @Override public List taskConfigs(int maxTasks) {
+ List configs = new ArrayList<>();
+ for (int i = 0; i < maxTasks; i++) {
+ configs.add(this.keyValue);
+ }
+ return configs;
+ }
+
+ @Override public Class extends Task> taskClass() {
+ return ClickHouseSourceTask.class;
+ }
+
+ @Override public void start(KeyValue config) {
+
+ for (String requestKey : ClickHouseSourceConfig.REQUEST_CONFIG) {
+ if (!config.containsKey(requestKey)) {
+ throw new RuntimeException("Request config key: " + requestKey);
+ }
+ }
+ this.keyValue = config;
+
+ }
+
+ @Override public void stop() {
+ this.keyValue = null;
+ }
+}
diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java
new file mode 100644
index 00000000..0de60c2e
--- /dev/null
+++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.clickhouse.source;
+
+import com.clickhouse.data.ClickHouseColumn;
+import com.clickhouse.data.ClickHouseRecord;
+import com.clickhouse.data.value.UnsignedByte;
+import com.clickhouse.data.value.UnsignedInteger;
+import com.clickhouse.data.value.UnsignedShort;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.connect.clickhouse.helper.ClickHouseHelperClient;
+import org.apache.rocketmq.connect.clickhouse.config.ClickHouseConstants;
+import org.apache.rocketmq.connect.clickhouse.config.ClickHouseSourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClickHouseSourceTask extends SourceTask {
+
+ private static final Logger log = LoggerFactory.getLogger(ClickHouseSourceTask.class);
+
+ private ClickHouseSourceConfig config;
+
+ private ClickHouseHelperClient helperClient;
+
+ @Override public List poll() {
+ List res = new ArrayList<>();
+ long offset = readRecordOffset();
+ String sql = buildSql(config.getTable(), ClickHouseConstants.MAX_NUMBER_SEND_CONNECT_RECORD_EACH_TIME, offset);
+
+ try {
+ List recordList = helperClient.query(sql);
+ for (ClickHouseRecord clickHouseRecord : recordList) {
+ res.add(clickHouseRecord2ConnectRecord(clickHouseRecord, ++offset));
+ }
+ } catch (Exception e) {
+ log.error(String.format("Fail to poll data from clickhouse! Table=%s offset=%d", config.getTable(), offset));
+ }
+ return res;
+ }
+
+ private long readRecordOffset() {
+ final RecordOffset positionInfo = this.sourceTaskContext.offsetStorageReader().readOffset(buildRecordPartition(config.getTable()));
+ if (positionInfo == null) {
+ return 0;
+ }
+ Object offset = positionInfo.getOffset().get(config.getTable() + "_" + ClickHouseConstants.CLICKHOUSE_OFFSET);
+ return offset == null ? 0 : Long.parseLong(offset.toString());
+ }
+
+ private String buildSql(String table, int maxNum, long offset) {
+ return String.format("SELECT * FROM `%s` LIMIT %d OFFSET %d;", table, maxNum, offset);
+ }
+
+ private ConnectRecord clickHouseRecord2ConnectRecord(ClickHouseRecord clickHouseRecord,
+ long offset) throws NoSuchFieldException, IllegalAccessException {
+ Schema schema = SchemaBuilder.struct().name(config.getTable()).build();
+ final List fields = buildFields(clickHouseRecord);
+ schema.setFields(fields);
+ final ConnectRecord connectRecord = new ConnectRecord(buildRecordPartition(config.getTable()),
+ buildRecordOffset(offset),
+ System.currentTimeMillis(),
+ schema,
+ this.buildPayLoad(fields, schema, clickHouseRecord));
+ connectRecord.setExtensions(this.buildExtensions(clickHouseRecord));
+ return connectRecord;
+ }
+
+ private List buildFields(
+ ClickHouseRecord clickHouseRecord) throws NoSuchFieldException, IllegalAccessException {
+ java.lang.reflect.Field columns = clickHouseRecord.getClass().getDeclaredField("columns");
+ columns.setAccessible(true);
+ List fields = new ArrayList<>();
+ for (ClickHouseColumn column : (Iterable extends ClickHouseColumn>) columns.get(clickHouseRecord)) {
+ fields.add(new Field(column.getColumnIndex(), column.getColumnName(), getSchema(column.getDataType().getObjectClass())));
+ }
+ return fields;
+ }
+
+ private RecordPartition buildRecordPartition(String partitionValue) {
+ Map partitionMap = new HashMap<>();
+ partitionMap.put(ClickHouseConstants.CLICKHOUSE_PARTITION, partitionValue);
+ return new RecordPartition(partitionMap);
+ }
+
+ private Struct buildPayLoad(List fields, Schema schema, ClickHouseRecord clickHouseRecord) {
+ Struct payLoad = new Struct(schema);
+ for (int i = 0; i < fields.size(); i++) {
+ payLoad.put(fields.get(i), clickHouseRecord.getValue(i).asObject());
+ }
+ return payLoad;
+ }
+
+ private KeyValue buildExtensions(ClickHouseRecord clickHouseRecord) {
+ KeyValue keyValue = new DefaultKeyValue();
+ String topicName = config.getTopic();
+ if (topicName == null || topicName.equals("")) {
+ String connectorName = this.sourceTaskContext.getConnectorName();
+ topicName = config.getTable() + "_" + connectorName;
+ }
+ keyValue.put(ClickHouseConstants.TOPIC, topicName);
+ return keyValue;
+ }
+
+ private RecordOffset buildRecordOffset(long offset) {
+ Map offsetMap = new HashMap<>();
+ offsetMap.put(config.getTable() + "_" + ClickHouseConstants.CLICKHOUSE_OFFSET, offset);
+ return new RecordOffset(offsetMap);
+ }
+
+ private static Schema getSchema(Class clazz) {
+ if (clazz.equals(Byte.class)) {
+ return SchemaBuilder.int8().build();
+ } else if (clazz.equals(Short.class) || clazz.equals(UnsignedByte.class)) {
+ return SchemaBuilder.int16().build();
+ } else if (clazz.equals(Integer.class) || clazz.equals(UnsignedShort.class)) {
+ return SchemaBuilder.int32().build();
+ } else if (clazz.equals(Long.class) || clazz.equals(UnsignedInteger.class)) {
+ return SchemaBuilder.int64().build();
+ } else if (clazz.equals(Float.class)) {
+ return SchemaBuilder.float32().build();
+ } else if (clazz.equals(Double.class)) {
+ return SchemaBuilder.float64().build();
+ } else if (clazz.equals(String.class)) {
+ return SchemaBuilder.string().build();
+ } else if (clazz.equals(Date.class) || clazz.equals(LocalDateTime.class) || clazz.equals(LocalDate.class)) {
+ return SchemaBuilder.time().build();
+ } else if (clazz.equals(Timestamp.class)) {
+ return SchemaBuilder.timestamp().build();
+ } else if (clazz.equals(Boolean.class)) {
+ return SchemaBuilder.bool().build();
+ }
+ return SchemaBuilder.string().build();
+ }
+
+ @Override public void start(KeyValue keyValue) {
+ this.config = new ClickHouseSourceConfig();
+ this.config.load(keyValue);
+ this.helperClient = new ClickHouseHelperClient(this.config);
+ if (!helperClient.ping()) {
+ throw new RuntimeException("Cannot connect to clickhouse server!");
+ }
+ }
+
+ @Override public void stop() {
+ this.helperClient = null;
+ }
+}
diff --git a/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTaskTest.java b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTaskTest.java
new file mode 100644
index 00000000..11be1300
--- /dev/null
+++ b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTaskTest.java
@@ -0,0 +1,89 @@
+package org.apache.rocketmq.connect.clickhouse.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.connect.clickhouse.config.ClickHouseConstants;
+
+
+class ClickHouseSinkTaskTest {
+//
+// private static final String host = "120.48.26.195";
+// private static final String port = "8123";
+// private static final String db = "default";
+// private static final String username = "default";
+// private static final String password = "123456";
+//
+//
+//
+// public static void main(String[] args) {
+// List records = new ArrayList<>();
+// // build schema
+// Schema schema = SchemaBuilder.struct()
+// .name("tableName")
+// .field("c1",SchemaBuilder.string().build())
+// .field("c2", SchemaBuilder.string().build())
+// .build();
+// // build record
+// String param0 = "1001";
+// Struct struct= new Struct(schema);
+// struct.put("c1",param0);
+// struct.put("c2",String.format("test-data-%s", param0));
+//
+// Schema schema2 = SchemaBuilder.struct()
+// .name("t1")
+// .field("c1",SchemaBuilder.string().build())
+// .field("c2", SchemaBuilder.string().build())
+// .build();
+// // build record
+// Struct struct2= new Struct(schema2);
+// struct.put("c1",param0);
+// struct.put("c2",String.format("test-data-%s", param0));
+//
+// for (int i = 0; i < 4; i++) {
+// ConnectRecord record = new ConnectRecord(
+// // offset partition
+// // offset partition"
+// new RecordPartition(new ConcurrentHashMap<>()),
+// new RecordOffset(new HashMap<>()),
+// System.currentTimeMillis(),
+// schema,
+// struct
+// );
+// records.add(record);
+//
+// ConnectRecord record2 = new ConnectRecord(
+// // offset partition
+// // offset partition"
+// new RecordPartition(new ConcurrentHashMap<>()),
+// new RecordOffset(new HashMap<>()),
+// System.currentTimeMillis(),
+// schema2,
+// struct
+// );
+// records.add(record2);
+//
+// }
+//
+// ClickHouseSinkTask task = new ClickHouseSinkTask();
+// KeyValue config = new DefaultKeyValue();
+// config.put(ClickHouseConstants.CLICKHOUSE_HOST, host);
+// config.put(ClickHouseConstants.CLICKHOUSE_PORT, port);
+// config.put(ClickHouseConstants.CLICKHOUSE_DATABASE, db);
+// config.put(ClickHouseConstants.CLICKHOUSE_USERNAME, username);
+// config.put(ClickHouseConstants.CLICKHOUSE_PASSWORD, password);
+// task.start(config);
+// task.put(records);
+//
+// }
+
+}
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTaskTest.java b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTaskTest.java
new file mode 100644
index 00000000..e7a10a80
--- /dev/null
+++ b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTaskTest.java
@@ -0,0 +1,41 @@
+package org.apache.rocketmq.connect.clickhouse.source;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.List;
+import junit.framework.TestCase;
+import org.apache.rocketmq.connect.clickhouse.config.ClickHouseConstants;
+
+import static java.lang.Thread.sleep;
+
+public class ClickHouseSourceTaskTest {
+
+// private static final String host = "120.48.26.195";
+// private static final String port = "8123";
+// private static final String db = "default";
+// private static final String username = "default";
+// private static final String password = "123456";
+//
+// public void testPoll() {
+// }
+//
+// public void testStart() throws InterruptedException {
+// ClickHouseSourceTask task = new ClickHouseSourceTask();
+// KeyValue config = new DefaultKeyValue();
+// config.put(ClickHouseConstants.CLICKHOUSE_HOST, host);
+// config.put(ClickHouseConstants.CLICKHOUSE_PORT, port);
+// config.put(ClickHouseConstants.CLICKHOUSE_DATABASE, db);
+// config.put(ClickHouseConstants.CLICKHOUSE_USERNAME, username);
+// config.put(ClickHouseConstants.CLICKHOUSE_PASSWORD, password);
+// config.put(ClickHouseConstants.CLICKHOUSE_TABLE, "tableName");
+// task.start(config);
+// while (true) {
+// List records = task.poll();
+// for (ConnectRecord r : records) {
+// System.out.println(r);
+// }
+// sleep(3000);
+// }
+// }
+}
\ No newline at end of file