diff --git a/connectors/rocketmq-connect-clickhouse/README.md b/connectors/rocketmq-connect-clickhouse/README.md new file mode 100644 index 00000000..48b19874 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/README.md @@ -0,0 +1,51 @@ +##### ClickHouseSourceConnector fully-qualified name +org.apache.rocketmq.connect.clickhouse.source.ClickHouseSourceConnector + +**clickhouse-source-connector** start + +``` +POST http://${runtime-ip}:${runtime-port}/connectors/clickhouseSourceConnector +{ + "connector.class":"org.apache.rocketmq.connect.clickhouse.source.ClickHouseSourceConnector", + "clickhousehost":"localhost", + "clickhouseport":8123, + "database":"default", + "username":"default", + "password":"123456", + "table":"tableName", + "topic":"testClickHouseTopic", + "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", + "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" +} +``` + +##### ClickHouseSinkConnector fully-qualified name +org.apache.rocketmq.connect.clickhouse.sink.ClickHouseSinkConnector + +**clickhouse-sink-connector** start + +``` +POST http://${runtime-ip}:${runtime-port}/connectors/clickhouseSinkConnector +{ + "connector.class":"org.apache.rocketmq.connect.clickhouse.sink.ClickHouseSinkConnector", + "clickhousehost":"localhost", + "clickhouseport":8123, + "database":"clickhouse", + "username":"default", + "password":"123456", + "connect.topicnames":"testClickHouseTopic", + "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", + "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" +} +``` + +##### parameter configuration + +| parameter | effect | required | default | +|--------------------|---------------------------------------------------|-------------------|---------| +| clickhousehost | The Host of the Clickhouse server | yes | null | +| clickhouseport | The Port of the Clickhouse server | yes | null | +| database | The database to read or write | yes | null | +| table | The source table to read | yes (source only) | null | +| topic | RocketMQ topic for source connector to write into | yes (source only) | null | +| connect.topicnames | RocketMQ topic for sink connector to read from | yes (sink only) | null | diff --git a/connectors/rocketmq-connect-clickhouse/pom.xml b/connectors/rocketmq-connect-clickhouse/pom.xml new file mode 100644 index 00000000..76d7b868 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/pom.xml @@ -0,0 +1,204 @@ + + + + + 4.0.0 + + org.apache.rocketmq + rocketmq-connect-clickhouse + 1.0-SNAPSHOT + + connect-clickhouse + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + jira + https://issues.apache.org/jira/browse/RocketMQ + + + + + + org.codehaus.mojo + versions-maven-plugin + 2.3 + + + org.codehaus.mojo + clirr-maven-plugin + 2.7 + + + maven-compiler-plugin + 3.6.1 + + ${maven.compiler.source} + ${maven.compiler.target} + ${maven.compiler.source} + true + true + + + + maven-surefire-plugin + 2.19.1 + + -Xms512m -Xmx1024m + always + + **/*Test.java + + + + + maven-site-plugin + 3.6 + + en_US + UTF-8 + UTF-8 + + + + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + maven-javadoc-plugin + 2.10.4 + + UTF-8 + en_US + io.openmessaging.internal + + + + aggregate + + aggregate + + site + + + + + maven-resources-plugin + 3.0.2 + + ${project.build.sourceEncoding} + + + + org.codehaus.mojo + findbugs-maven-plugin + 3.0.4 + + + org.apache.rat + apache-rat-plugin + 0.12 + + + README.md + README-CN.md + + + + + maven-assembly-plugin + 3.0.0 + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + + 8 + 8 + UTF-8 + + + + io.openmessaging + openmessaging-connector + 0.1.4 + compile + + + com.clickhouse + clickhouse-jdbc + 0.4.5 + + all + + + + org.lz4 + lz4-java + 1.8.0 + + + + com.clickhouse + + clickhouse-http-client + 0.4.5 + + + com.alibaba + fastjson + 1.2.83 + compile + + + junit + junit + RELEASE + test + + + org.slf4j + slf4j-api + 1.7.7 + + + + \ No newline at end of file diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseBaseConfig.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseBaseConfig.java new file mode 100644 index 00000000..64eb6766 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseBaseConfig.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.clickhouse.config; + +import io.openmessaging.KeyValue; +import java.lang.reflect.Method; + +public class ClickHouseBaseConfig { + + private String clickHouseHost; + + private Integer clickHousePort; + + private String database; + + private String userName; + + private String passWord; + + private String accessToken; + + private String topic; + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getClickHouseHost() { + return clickHouseHost; + } + + public void setClickHouseHost(String clickHouseHost) { + this.clickHouseHost = clickHouseHost; + } + + public Integer getClickHousePort() { + return clickHousePort; + } + + public void setClickHousePort(Integer clickHousePort) { + this.clickHousePort = clickHousePort; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getPassWord() { + return passWord; + } + + public void setPassWord(String passWord) { + this.passWord = passWord; + } + + public String getAccessToken() { + return accessToken; + } + + public void setAccessToken(String accessToken) { + this.accessToken = accessToken; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public void load(KeyValue props) { + properties2Object(props, this); + } + + private void properties2Object(final KeyValue p, final Object object) { + + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(3); + String key = tmp.toLowerCase(); + + String property = p.getString(key); + if (property != null) { + Class[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, arg); + } + } + } catch (Throwable ignored) { + } + } + } + } +} diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseConstants.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseConstants.java new file mode 100644 index 00000000..e9c026d1 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseConstants.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.clickhouse.config; + +public class ClickHouseConstants { + public static final String CLICKHOUSE_HOST = "clickhousehost"; + + public static final String CLICKHOUSE_PORT = "clickhouseport"; + + public static final String CLICKHOUSE_DATABASE = "database"; + + public static final String CLICKHOUSE_USERNAME = "username"; + + public static final String CLICKHOUSE_PASSWORD = "password"; + + public static final String CLICKHOUSE_ACCESSTOKEN = "accesstoken"; + + public static final String CLICKHOUSE_TABLE = "table"; + + public static final String TOPIC = "topic"; + + public static final String CLICKHOUSE_OFFSET = "OFFSET"; + + public static final String CLICKHOUSE_PARTITION = "CLICKHOUSE_PARTITION"; + + public static final Integer timeoutSecondsDefault = 30; + + public static final int MILLI_IN_A_SEC = 1000; + + public static final Integer retryCountDefault = 3; + + public static final int MAX_NUMBER_SEND_CONNECT_RECORD_EACH_TIME = 2000; + +} diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSinkConfig.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSinkConfig.java new file mode 100644 index 00000000..2353d1a8 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSinkConfig.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.clickhouse.config; + +import java.util.HashSet; +import java.util.Set; + +public class ClickHouseSinkConfig extends ClickHouseBaseConfig { + public static final Set SINK_REQUEST_CONFIG = new HashSet() { + { + add(ClickHouseConstants.CLICKHOUSE_HOST); + add(ClickHouseConstants.CLICKHOUSE_PORT); + } + }; +} diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSourceConfig.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSourceConfig.java new file mode 100644 index 00000000..c7e754d9 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSourceConfig.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.clickhouse.config; + +import java.util.HashSet; +import java.util.Set; + +public class ClickHouseSourceConfig extends ClickHouseBaseConfig { + + public static final Set REQUEST_CONFIG = new HashSet() { + { + add(ClickHouseConstants.CLICKHOUSE_HOST); + add(ClickHouseConstants.CLICKHOUSE_PORT); + add(ClickHouseConstants.CLICKHOUSE_TABLE); + add(ClickHouseConstants.TOPIC); + } + }; + private String table; + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } +} diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/helper/ClickHouseHelperClient.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/helper/ClickHouseHelperClient.java new file mode 100644 index 00000000..ccd73c8e --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/helper/ClickHouseHelperClient.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.clickhouse.helper; + +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseCredentials; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseResponse; +import com.clickhouse.data.ClickHouseFormat; +import com.clickhouse.data.ClickHouseOutputStream; +import com.clickhouse.data.ClickHouseRecord; +import com.clickhouse.data.ClickHouseWriter; +import com.clickhouse.jdbc.ClickHouseDataSource; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseConstants; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseBaseConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClickHouseHelperClient { + + private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseHelperClient.class); + + private ClickHouseBaseConfig config; + private int timeout = ClickHouseConstants.timeoutSecondsDefault * ClickHouseConstants.MILLI_IN_A_SEC; + private ClickHouseNode server = null; + private int retry = ClickHouseConstants.retryCountDefault; + + public ClickHouseHelperClient(ClickHouseBaseConfig config) { + this.config = config; + this.server = create(config); + } + + private ClickHouseNode create(ClickHouseBaseConfig config) { + this.server = ClickHouseNode.builder() + .host(config.getClickHouseHost()) + .port(ClickHouseProtocol.HTTP, config.getClickHousePort()) + .database(config.getDatabase()).credentials(getCredentials(config)) + .build(); + + return this.server; + } + + private ClickHouseCredentials getCredentials(ClickHouseBaseConfig config) { + if (config.getUserName() != null && config.getPassWord() != null) { + return ClickHouseCredentials.fromUserAndPassword(config.getUserName(), config.getPassWord()); + } + if (config.getAccessToken() != null) { + return ClickHouseCredentials.fromAccessToken(config.getAccessToken()); + } + throw new RuntimeException("Credentials cannot be empty!"); + + } + + public boolean ping() { + ClickHouseClient clientPing = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); + LOGGER.debug(String.format("server [%s] , timeout [%d]", server, timeout)); + int retryCount = 0; + + while (retryCount < retry) { + if (clientPing.ping(server, timeout)) { + clientPing.close(); + return true; + } + retryCount++; + LOGGER.warn(String.format("Ping retry %d out of %d", retryCount, retry)); + } + LOGGER.error("unable to ping to clickhouse server. "); + clientPing.close(); + return false; + } + + public ClickHouseNode getServer() { + return this.server; + } + + public List query(String query) { + return query(query, ClickHouseFormat.RowBinaryWithNamesAndTypes); + } + + public List query(String query, ClickHouseFormat clickHouseFormat) { + int retryCount = 0; + Exception ce = null; + while (retryCount < retry) { + try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); + ClickHouseResponse response = client.read(server) + .format(clickHouseFormat) + .query(query) + .execute().get()) { + + List recordList = new ArrayList<>(); + for (ClickHouseRecord r : response.records()) { + recordList.add(r); + } + return recordList; + + } catch (Exception e) { + retryCount++; + LOGGER.warn(String.format("Query retry %d out of %d", retryCount, retry), e); + ce = e; + } + } + throw new RuntimeException(ce); + + } + + private Connection getConnection(String url, Properties properties) throws SQLException { + ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties); + Connection conn = dataSource.getConnection(config.getUserName(), config.getPassWord()); + + System.out.println("Connected to: " + conn.getMetaData().getURL()); + return conn; + } + + private boolean insertJson(String jsonString, String table, String sql, String url) { + + try (Connection connection = getConnection(url, new Properties()); + PreparedStatement ps = connection.prepareStatement(sql)) { + ps.setObject(1, new ClickHouseWriter() { + @Override + public void write(ClickHouseOutputStream output) throws IOException { + output.writeBytes(jsonString.getBytes()); + } + }); + ps.executeUpdate(); + + } catch (Exception e) { + return false; + } + return true; + } + + public void insertJson(String jsonString, String table) { + String url = String.format("jdbc:clickhouse://%s:%s/%s", config.getClickHouseHost(), config.getClickHousePort(), config.getDatabase()); + String sql = String.format("INSERT INTO %s FORMAT JSONEachRow", table); + int retryCount = 0; + + while (retryCount < this.retry) { + if (insertJson(jsonString, table, sql, url)) { + return; + } + } + LOGGER.error(String.format("Insert into table %s error", table)); + } + +} diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkConnector.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkConnector.java new file mode 100644 index 00000000..ae236ca2 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkConnector.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.clickhouse.sink; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.sink.SinkConnector; +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseSinkConfig; + +public class ClickHouseSinkConnector extends SinkConnector { + + private KeyValue keyValue; + + @Override public List taskConfigs(int maxTasks) { + List configs = new ArrayList<>(); + for (int i = 0; i < maxTasks; i++) { + configs.add(this.keyValue); + } + return configs; + } + + @Override public Class taskClass() { + return ClickHouseSinkTask.class; + } + + @Override public void start(KeyValue value) { + + for (String requestKey : ClickHouseSinkConfig.SINK_REQUEST_CONFIG) { + if (!value.containsKey(requestKey)) { + throw new RuntimeException("Request config key: " + requestKey); + } + } + + this.keyValue = value; + } + + @Override public void stop() { + this.keyValue = null; + } +} diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java new file mode 100644 index 00000000..2cfc1c12 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.clickhouse.sink; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.sink.SinkTask; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.Field; +import io.openmessaging.connector.api.data.Struct; +import io.openmessaging.connector.api.errors.ConnectException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.rocketmq.connect.clickhouse.helper.ClickHouseHelperClient; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseSinkConfig; + +public class ClickHouseSinkTask extends SinkTask { + + public ClickHouseSinkConfig config; + + private ClickHouseHelperClient helperClient; + + @Override public void put(List sinkRecords) throws ConnectException { + if (sinkRecords == null || sinkRecords.size() < 1) { + return; + } + Map valueMap = new HashMap<>(); + for (ConnectRecord record : sinkRecords) { + String table = record.getSchema().getName(); + JSONArray jsonArray = valueMap.getOrDefault(table, new JSONArray()); + + final List fields = record.getSchema().getFields(); + final Struct structData = (Struct) record.getData(); + + JSONObject object = new JSONObject(); + for (Field field : fields) { + object.put(field.getName(), structData.get(field)); + } + + jsonArray.add(object); + valueMap.put(table, jsonArray); + } + + for (Map.Entry entry : valueMap.entrySet()) { + String jsonString = entry.getValue().toString(); + helperClient.insertJson(jsonString, entry.getKey()); + } + + } + + @Override public void start(KeyValue keyValue) { + this.config = new ClickHouseSinkConfig(); + this.config.load(keyValue); + this.helperClient = new ClickHouseHelperClient(this.config); + if (!helperClient.ping()) { + throw new RuntimeException("Cannot connect to clickhouse server!"); + } + } + + @Override public void stop() { + this.helperClient = null; + } +} diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceConnector.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceConnector.java new file mode 100644 index 00000000..3a9f8e4b --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceConnector.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.clickhouse.source; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.source.SourceConnector; +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseSourceConfig; + +public class ClickHouseSourceConnector extends SourceConnector { + + private KeyValue keyValue; + + @Override public List taskConfigs(int maxTasks) { + List configs = new ArrayList<>(); + for (int i = 0; i < maxTasks; i++) { + configs.add(this.keyValue); + } + return configs; + } + + @Override public Class taskClass() { + return ClickHouseSourceTask.class; + } + + @Override public void start(KeyValue config) { + + for (String requestKey : ClickHouseSourceConfig.REQUEST_CONFIG) { + if (!config.containsKey(requestKey)) { + throw new RuntimeException("Request config key: " + requestKey); + } + } + this.keyValue = config; + + } + + @Override public void stop() { + this.keyValue = null; + } +} diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java new file mode 100644 index 00000000..0de60c2e --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.clickhouse.source; + +import com.clickhouse.data.ClickHouseColumn; +import com.clickhouse.data.ClickHouseRecord; +import com.clickhouse.data.value.UnsignedByte; +import com.clickhouse.data.value.UnsignedInteger; +import com.clickhouse.data.value.UnsignedShort; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.source.SourceTask; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.Field; +import io.openmessaging.connector.api.data.RecordOffset; +import io.openmessaging.connector.api.data.RecordPartition; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SchemaBuilder; +import io.openmessaging.connector.api.data.Struct; +import io.openmessaging.internal.DefaultKeyValue; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.rocketmq.connect.clickhouse.helper.ClickHouseHelperClient; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseConstants; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseSourceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClickHouseSourceTask extends SourceTask { + + private static final Logger log = LoggerFactory.getLogger(ClickHouseSourceTask.class); + + private ClickHouseSourceConfig config; + + private ClickHouseHelperClient helperClient; + + @Override public List poll() { + List res = new ArrayList<>(); + long offset = readRecordOffset(); + String sql = buildSql(config.getTable(), ClickHouseConstants.MAX_NUMBER_SEND_CONNECT_RECORD_EACH_TIME, offset); + + try { + List recordList = helperClient.query(sql); + for (ClickHouseRecord clickHouseRecord : recordList) { + res.add(clickHouseRecord2ConnectRecord(clickHouseRecord, ++offset)); + } + } catch (Exception e) { + log.error(String.format("Fail to poll data from clickhouse! Table=%s offset=%d", config.getTable(), offset)); + } + return res; + } + + private long readRecordOffset() { + final RecordOffset positionInfo = this.sourceTaskContext.offsetStorageReader().readOffset(buildRecordPartition(config.getTable())); + if (positionInfo == null) { + return 0; + } + Object offset = positionInfo.getOffset().get(config.getTable() + "_" + ClickHouseConstants.CLICKHOUSE_OFFSET); + return offset == null ? 0 : Long.parseLong(offset.toString()); + } + + private String buildSql(String table, int maxNum, long offset) { + return String.format("SELECT * FROM `%s` LIMIT %d OFFSET %d;", table, maxNum, offset); + } + + private ConnectRecord clickHouseRecord2ConnectRecord(ClickHouseRecord clickHouseRecord, + long offset) throws NoSuchFieldException, IllegalAccessException { + Schema schema = SchemaBuilder.struct().name(config.getTable()).build(); + final List fields = buildFields(clickHouseRecord); + schema.setFields(fields); + final ConnectRecord connectRecord = new ConnectRecord(buildRecordPartition(config.getTable()), + buildRecordOffset(offset), + System.currentTimeMillis(), + schema, + this.buildPayLoad(fields, schema, clickHouseRecord)); + connectRecord.setExtensions(this.buildExtensions(clickHouseRecord)); + return connectRecord; + } + + private List buildFields( + ClickHouseRecord clickHouseRecord) throws NoSuchFieldException, IllegalAccessException { + java.lang.reflect.Field columns = clickHouseRecord.getClass().getDeclaredField("columns"); + columns.setAccessible(true); + List fields = new ArrayList<>(); + for (ClickHouseColumn column : (Iterable) columns.get(clickHouseRecord)) { + fields.add(new Field(column.getColumnIndex(), column.getColumnName(), getSchema(column.getDataType().getObjectClass()))); + } + return fields; + } + + private RecordPartition buildRecordPartition(String partitionValue) { + Map partitionMap = new HashMap<>(); + partitionMap.put(ClickHouseConstants.CLICKHOUSE_PARTITION, partitionValue); + return new RecordPartition(partitionMap); + } + + private Struct buildPayLoad(List fields, Schema schema, ClickHouseRecord clickHouseRecord) { + Struct payLoad = new Struct(schema); + for (int i = 0; i < fields.size(); i++) { + payLoad.put(fields.get(i), clickHouseRecord.getValue(i).asObject()); + } + return payLoad; + } + + private KeyValue buildExtensions(ClickHouseRecord clickHouseRecord) { + KeyValue keyValue = new DefaultKeyValue(); + String topicName = config.getTopic(); + if (topicName == null || topicName.equals("")) { + String connectorName = this.sourceTaskContext.getConnectorName(); + topicName = config.getTable() + "_" + connectorName; + } + keyValue.put(ClickHouseConstants.TOPIC, topicName); + return keyValue; + } + + private RecordOffset buildRecordOffset(long offset) { + Map offsetMap = new HashMap<>(); + offsetMap.put(config.getTable() + "_" + ClickHouseConstants.CLICKHOUSE_OFFSET, offset); + return new RecordOffset(offsetMap); + } + + private static Schema getSchema(Class clazz) { + if (clazz.equals(Byte.class)) { + return SchemaBuilder.int8().build(); + } else if (clazz.equals(Short.class) || clazz.equals(UnsignedByte.class)) { + return SchemaBuilder.int16().build(); + } else if (clazz.equals(Integer.class) || clazz.equals(UnsignedShort.class)) { + return SchemaBuilder.int32().build(); + } else if (clazz.equals(Long.class) || clazz.equals(UnsignedInteger.class)) { + return SchemaBuilder.int64().build(); + } else if (clazz.equals(Float.class)) { + return SchemaBuilder.float32().build(); + } else if (clazz.equals(Double.class)) { + return SchemaBuilder.float64().build(); + } else if (clazz.equals(String.class)) { + return SchemaBuilder.string().build(); + } else if (clazz.equals(Date.class) || clazz.equals(LocalDateTime.class) || clazz.equals(LocalDate.class)) { + return SchemaBuilder.time().build(); + } else if (clazz.equals(Timestamp.class)) { + return SchemaBuilder.timestamp().build(); + } else if (clazz.equals(Boolean.class)) { + return SchemaBuilder.bool().build(); + } + return SchemaBuilder.string().build(); + } + + @Override public void start(KeyValue keyValue) { + this.config = new ClickHouseSourceConfig(); + this.config.load(keyValue); + this.helperClient = new ClickHouseHelperClient(this.config); + if (!helperClient.ping()) { + throw new RuntimeException("Cannot connect to clickhouse server!"); + } + } + + @Override public void stop() { + this.helperClient = null; + } +} diff --git a/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTaskTest.java b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTaskTest.java new file mode 100644 index 00000000..11be1300 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTaskTest.java @@ -0,0 +1,89 @@ +package org.apache.rocketmq.connect.clickhouse.sink; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.RecordOffset; +import io.openmessaging.connector.api.data.RecordPartition; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SchemaBuilder; +import io.openmessaging.connector.api.data.Struct; +import io.openmessaging.internal.DefaultKeyValue; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseConstants; + + +class ClickHouseSinkTaskTest { +// +// private static final String host = "120.48.26.195"; +// private static final String port = "8123"; +// private static final String db = "default"; +// private static final String username = "default"; +// private static final String password = "123456"; +// +// +// +// public static void main(String[] args) { +// List records = new ArrayList<>(); +// // build schema +// Schema schema = SchemaBuilder.struct() +// .name("tableName") +// .field("c1",SchemaBuilder.string().build()) +// .field("c2", SchemaBuilder.string().build()) +// .build(); +// // build record +// String param0 = "1001"; +// Struct struct= new Struct(schema); +// struct.put("c1",param0); +// struct.put("c2",String.format("test-data-%s", param0)); +// +// Schema schema2 = SchemaBuilder.struct() +// .name("t1") +// .field("c1",SchemaBuilder.string().build()) +// .field("c2", SchemaBuilder.string().build()) +// .build(); +// // build record +// Struct struct2= new Struct(schema2); +// struct.put("c1",param0); +// struct.put("c2",String.format("test-data-%s", param0)); +// +// for (int i = 0; i < 4; i++) { +// ConnectRecord record = new ConnectRecord( +// // offset partition +// // offset partition" +// new RecordPartition(new ConcurrentHashMap<>()), +// new RecordOffset(new HashMap<>()), +// System.currentTimeMillis(), +// schema, +// struct +// ); +// records.add(record); +// +// ConnectRecord record2 = new ConnectRecord( +// // offset partition +// // offset partition" +// new RecordPartition(new ConcurrentHashMap<>()), +// new RecordOffset(new HashMap<>()), +// System.currentTimeMillis(), +// schema2, +// struct +// ); +// records.add(record2); +// +// } +// +// ClickHouseSinkTask task = new ClickHouseSinkTask(); +// KeyValue config = new DefaultKeyValue(); +// config.put(ClickHouseConstants.CLICKHOUSE_HOST, host); +// config.put(ClickHouseConstants.CLICKHOUSE_PORT, port); +// config.put(ClickHouseConstants.CLICKHOUSE_DATABASE, db); +// config.put(ClickHouseConstants.CLICKHOUSE_USERNAME, username); +// config.put(ClickHouseConstants.CLICKHOUSE_PASSWORD, password); +// task.start(config); +// task.put(records); +// +// } + +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTaskTest.java b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTaskTest.java new file mode 100644 index 00000000..e7a10a80 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTaskTest.java @@ -0,0 +1,41 @@ +package org.apache.rocketmq.connect.clickhouse.source; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.internal.DefaultKeyValue; +import java.util.List; +import junit.framework.TestCase; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseConstants; + +import static java.lang.Thread.sleep; + +public class ClickHouseSourceTaskTest { + +// private static final String host = "120.48.26.195"; +// private static final String port = "8123"; +// private static final String db = "default"; +// private static final String username = "default"; +// private static final String password = "123456"; +// +// public void testPoll() { +// } +// +// public void testStart() throws InterruptedException { +// ClickHouseSourceTask task = new ClickHouseSourceTask(); +// KeyValue config = new DefaultKeyValue(); +// config.put(ClickHouseConstants.CLICKHOUSE_HOST, host); +// config.put(ClickHouseConstants.CLICKHOUSE_PORT, port); +// config.put(ClickHouseConstants.CLICKHOUSE_DATABASE, db); +// config.put(ClickHouseConstants.CLICKHOUSE_USERNAME, username); +// config.put(ClickHouseConstants.CLICKHOUSE_PASSWORD, password); +// config.put(ClickHouseConstants.CLICKHOUSE_TABLE, "tableName"); +// task.start(config); +// while (true) { +// List records = task.poll(); +// for (ConnectRecord r : records) { +// System.out.println(r); +// } +// sleep(3000); +// } +// } +} \ No newline at end of file