From 213c3d65e73de846bdf8914fe86dd06d3e55831a Mon Sep 17 00:00:00 2001 From: wanhanbo Date: Sun, 7 Dec 2025 22:02:34 +0800 Subject: [PATCH] feat:support elasticSearch connector --- .../pom.xml | 62 ++++ .../ElasticSearchConfigKeys.java | 98 +++++ .../ElasticSearchTableConnector.java | 46 +++ .../elasticsearch/ElasticSearchTableSink.java | 262 ++++++++++++++ .../ElasticSearchTableSource.java | 334 ++++++++++++++++++ .../utils/ElasticSearchConnectorUtils.java | 46 +++ .../utils/ElasticSearchJsonSerializer.java | 55 +++ ...e.geaflow.dsl.connector.api.TableConnector | 20 ++ .../ElasticsearchTableConnectorTest.java | 68 ++++ .../geaflow-dsl/geaflow-dsl-connector/pom.xml | 1 + 10 files changed, 992 insertions(+) create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/pom.xml create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchConfigKeys.java create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableConnector.java create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableSink.java create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableSource.java create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/utils/ElasticSearchConnectorUtils.java create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/utils/ElasticSearchJsonSerializer.java create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnectorTest.java diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/pom.xml b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/pom.xml new file mode 100644 index 000000000..b528135e0 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/pom.xml @@ -0,0 +1,62 @@ + + + + + + org.apache.geaflow + geaflow-dsl-connector + 0.6.8-SNAPSHOT + + + 4.0.0 + + geaflow-dsl-connector-elasticsearch + + + 7.17.0 + + + + + org.apache.geaflow + geaflow-dsl-common + + + + org.apache.geaflow + geaflow-dsl-connector-api + + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + ${elasticsearch.version} + + + + org.testng + testng + ${testng.version} + test + + + diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchConfigKeys.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchConfigKeys.java new file mode 100644 index 000000000..37ea65911 --- /dev/null +++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchConfigKeys.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.geaflow.dsl.connector.elasticsearch; + +import org.apache.geaflow.common.config.ConfigKey; +import org.apache.geaflow.common.config.ConfigKeys; + +public class ElasticSearchConfigKeys { + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_HOSTS = ConfigKeys + .key("geaflow.dsl.elasticsearch.hosts") + .noDefaultValue() + .description("ElasticSearch cluster hosts list (comma-separated)."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_INDEX = ConfigKeys + .key("geaflow.dsl.elasticsearch.index") + .noDefaultValue() + .description("ElasticSearch index name."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_TIMESTAMP_FIELD = ConfigKeys + .key("geaflow.dsl.elasticsearch.timestamp.field") + .defaultValue("@timestamp") + .description("ElasticSearch timestamp field name."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_MAX_FETCH_SIZE = ConfigKeys + .key("geaflow.dsl.elasticsearch.max.fetch.size") + .defaultValue(1000) + .description("Batch size for each fetch operation."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_CONNECT_TIMEOUT = ConfigKeys + .key("geaflow.dsl.elasticsearch.connect.timeout.ms") + .defaultValue(5000) + .description("ElasticSearch client connection timeout in milliseconds."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT = ConfigKeys + .key("geaflow.dsl.elasticsearch.socket.timeout.ms") + .defaultValue(60000) + .description("ElasticSearch client socket timeout in milliseconds."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_REQUEST_TIMEOUT = ConfigKeys + .key("geaflow.dsl.elasticsearch.request.timeout.ms") + .defaultValue(1000) + .description("ElasticSearch client connection request timeout in milliseconds."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_ID_FIELD = ConfigKeys + .key("geaflow.dsl.elasticsearch.id.field") + .noDefaultValue() + .description("Field name whose value is used as the ElasticSearch document id."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_FLUSH_INTERVAL_MS = ConfigKeys + .key("geaflow.dsl.elasticsearch.flush.interval.ms") + .defaultValue(1000L) + .description("ElasticSearch bulk processor flush interval in milliseconds."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_BULK_SIZE_MB = ConfigKeys + .key("geaflow.dsl.elasticsearch.bulk.size.mb") + .defaultValue(5L) + .description("ElasticSearch bulk request size threshold in megabytes."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_CONCURRENT_REQUESTS = ConfigKeys
.key("geaflow.dsl.elasticsearch.concurrent.requests") + .defaultValue(1) + .description("Number of concurrent bulk requests allowed by the ElasticSearch bulk processor."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_BACKOFF = ConfigKeys + .key("geaflow.dsl.elasticsearch.backoff") + .defaultValue("true") + .description("Whether to enable exponential backoff retry for failed bulk requests."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_USERNAME = ConfigKeys + .key("geaflow.dsl.elasticsearch.username") + .noDefaultValue() + .description("ElasticSearch username for authentication."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_PASSWORD = ConfigKeys + .key("geaflow.dsl.elasticsearch.password") + .noDefaultValue() + .description("ElasticSearch password for authentication."); + +} \ No newline at end of file diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableConnector.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableConnector.java new file mode 100644 index 000000000..19fabb549 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableConnector.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.geaflow.dsl.connector.elasticsearch; + +import org.apache.geaflow.common.config.Configuration; +import org.apache.geaflow.dsl.connector.api.TableReadableConnector; +import org.apache.geaflow.dsl.connector.api.TableSink; +import org.apache.geaflow.dsl.connector.api.TableSource; +import org.apache.geaflow.dsl.connector.api.TableWritableConnector; + +public class ElasticSearchTableConnector implements TableReadableConnector, TableWritableConnector { + + public static final String TYPE = "ELASTICSEARCH"; + + @Override + public String getType() { + return TYPE; + } + + @Override + public TableSource createSource(Configuration conf) { + return new ElasticSearchTableSource(); + } + + @Override + public TableSink createSink(Configuration conf) { + return new ElasticSearchTableSink(); + } +} \ No newline at end of file diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableSink.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableSink.java new file mode 100644 index 000000000..5d8f02020 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableSink.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.geaflow.dsl.connector.elasticsearch; + +import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException; +import org.apache.geaflow.dsl.common.types.StructType; +import org.apache.geaflow.dsl.connector.api.TableSink; +import org.apache.geaflow.common.config.Configuration; +import org.apache.geaflow.api.context.RuntimeContext; +import org.apache.geaflow.dsl.common.data.Row; +import org.apache.geaflow.dsl.connector.elasticsearch.utils.ElasticSearchConnectorUtils; +import org.apache.geaflow.dsl.connector.elasticsearch.utils.ElasticSearchJsonSerializer; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class ElasticSearchTableSink implements TableSink { + + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchTableSink.class); + + private String hosts; + private String indexName; + private String timestampField; + private String idField; + private int connectTimeout; + private int socketTimeout; + private int requestTimeout; + private long flushIntervalMs; + private long bulkSizeMb; + private int concurrentRequests; + private String backoffEnabled; + private StructType schema; + private String username; + private String password; + + private transient RestHighLevelClient esClient; + private transient BulkProcessor bulkProcessor; + private transient AtomicInteger pendingDocs; + private transient Map<String, Integer> failedDocs; + private transient ElasticSearchJsonSerializer elasticSearchJsonSerializer; + + @Override + public void init(Configuration conf, StructType schema) { + this.schema = schema; + this.hosts = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS); + this.indexName = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX); + this.timestampField = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_TIMESTAMP_FIELD); + this.idField = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_ID_FIELD); + this.flushIntervalMs = conf.getLong(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_FLUSH_INTERVAL_MS); + this.bulkSizeMb = conf.getLong(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_BULK_SIZE_MB); + this.concurrentRequests = conf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_CONCURRENT_REQUESTS); + this.backoffEnabled = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_BACKOFF); + + this.connectTimeout = conf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_CONNECT_TIMEOUT); + this.socketTimeout = conf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT); + this.requestTimeout = conf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_REQUEST_TIMEOUT); + + this.username = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME, ""); + this.password = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_PASSWORD, ""); + } + + @Override + public void open(RuntimeContext context) { + // transient state is (re)created here so it is available after deserialization on workers + this.pendingDocs = new AtomicInteger(0); + this.failedDocs = new ConcurrentHashMap<>(); + elasticSearchJsonSerializer = new ElasticSearchJsonSerializer(schema, timestampField); + + HttpHost[] httpHosts = ElasticSearchConnectorUtils.toHttpHosts(this.hosts.split(",")); + RestClientBuilder builder = RestClient.builder(httpHosts) + .setRequestConfigCallback( + requestConfigBuilder -> requestConfigBuilder + .setConnectTimeout(connectTimeout) + .setSocketTimeout(socketTimeout) + .setConnectionRequestTimeout(requestTimeout) + ); + + if (username != null && !username.isEmpty() && password != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(username, password)); + + builder.setHttpClientConfigCallback(httpClientBuilder -> + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider) + ); + } + this.esClient = new RestHighLevelClient(builder); + + BulkProcessor.Listener listener = new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + LOGGER.debug("Executing bulk request with {} actions", request.numberOfActions()); + } + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + int count = request.numberOfActions(); + pendingDocs.addAndGet(-count); + + if (response.hasFailures()) { + handleBulkFailures(response); + } + } + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + int count = request.numberOfActions(); + pendingDocs.addAndGet(-count); + LOGGER.error("Bulk request failed", failure); + } + }; + BulkProcessor.Builder bulkBuilder = BulkProcessor.builder( + (request, bulkListener) -> esClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), + listener, + "geaflow-bulk-processor" + ); + // bulk size + bulkBuilder.setBulkSize(new ByteSizeValue(bulkSizeMb, ByteSizeUnit.MB)); + // flush interval + bulkBuilder.setFlushInterval(TimeValue.timeValueMillis(flushIntervalMs)); + // concurrent requests + bulkBuilder.setConcurrentRequests(concurrentRequests); + // backoff policy + if ("true".equalsIgnoreCase(backoffEnabled)) { + bulkBuilder.setBackoffPolicy( + BackoffPolicy.exponentialBackoff( + TimeValue.timeValueMillis(100), 3)); + } + this.bulkProcessor = bulkBuilder.build(); + } + + @Override + public void write(Row row) throws IOException { + try { + Map<String, Object> document = elasticSearchJsonSerializer.convert(row); + + IndexRequest request = new IndexRequest(indexName) + .source(document); + + if (idField != null) { + int idFieldIndex = schema.indexOf(idField); + if (idFieldIndex >= 0) { + Object idValue = row.getField(idFieldIndex, schema.getType(idFieldIndex)); + request.id(String.valueOf(idValue)); + } + } + bulkProcessor.add(request); + pendingDocs.incrementAndGet(); + } catch (Exception e) { + LOGGER.error("Failed to convert row to document", e); + throw new IOException("Row conversion error", e); + } + } + + @Override + public void finish() throws IOException { + try { + flush(); + + // close bulkProcessor and wait for completion + if (bulkProcessor != null) { + bulkProcessor.awaitClose(30, java.util.concurrent.TimeUnit.SECONDS); + } + + // check for failed documents + if (!failedDocs.isEmpty()) { + int totalFailures = failedDocs.values().stream().mapToInt(Integer::intValue).sum(); + LOGGER.error("Failed to index {} documents with errors:", totalFailures); + failedDocs.forEach((msg, count) -> + LOGGER.error(" - {} occurrences: {}", count, msg) + ); + throw new GeaFlowDSLException( + "Failed to index " + totalFailures + " documents. See logs for details."); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Bulk processor close interrupted", e); + } + } + + private void flush() { + if (bulkProcessor != null) { + bulkProcessor.flush(); + } + } + + @Override + public void close() { + try { + // flush remaining documents + flush(); + if (bulkProcessor != null) { + bulkProcessor.close(); + } + if (esClient != null) { + esClient.close(); + } + } catch (IOException e) { + throw new GeaFlowDSLException("Failed to close Elasticsearch client", e); + } + } + + private void handleBulkFailures(BulkResponse response) { + AtomicInteger failureCount = new AtomicInteger(); + for (BulkItemResponse item : response) { + if (item.isFailed()) { + failureCount.incrementAndGet(); + String docId = extractDocId(item); + failedDocs.merge(item.getFailureMessage(), 1, Integer::sum); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Failed to index document {}: {}", docId, item.getFailureMessage()); + } + } + } + LOGGER.warn("Bulk operation had {} failures out of {} requests", + failureCount.get(), response.getItems().length); + } + + private String extractDocId(BulkItemResponse item) { + return item.getFailure().getId() != null ? + item.getFailure().getId() : + "unknown"; + } +} \ No newline at end of file diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableSource.java new file mode 100644 index 000000000..afb201ac7 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableSource.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.geaflow.dsl.connector.elasticsearch; + +import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException; +import org.apache.geaflow.dsl.common.types.TableSchema; +import org.apache.geaflow.dsl.connector.api.AbstractTableSource; +import org.apache.geaflow.dsl.connector.api.FetchData; +import org.apache.geaflow.dsl.connector.api.Offset; +import org.apache.geaflow.dsl.connector.api.Partition; +import org.apache.geaflow.dsl.connector.api.serde.DeserializerFactory; +import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer; +import org.apache.geaflow.dsl.connector.api.window.SizeFetchWindow; +import org.apache.geaflow.dsl.connector.api.window.TimeFetchWindow; +import org.apache.geaflow.common.config.Configuration; +import org.apache.geaflow.api.context.RuntimeContext; +import org.apache.geaflow.dsl.connector.elasticsearch.utils.ElasticSearchConnectorUtils; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.search.sort.SortOrder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; + +public class ElasticSearchTableSource extends AbstractTableSource { + + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchTableSource.class); + + private String hosts; + private String indexName; + private String timestampField; + private int maxFetchSize; + private int connectTimeout; + private int socketTimeout; + private int requestTimeout; + private String username; + private String password; + + + + private transient RestHighLevelClient esClient; + + @Override + public void init(Configuration conf, TableSchema schema) { + this.hosts = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS); + this.indexName = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX); + this.timestampField = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_TIMESTAMP_FIELD); + this.maxFetchSize = conf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_MAX_FETCH_SIZE); + + this.connectTimeout = conf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_CONNECT_TIMEOUT); + this.socketTimeout = conf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT); + this.requestTimeout = conf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_REQUEST_TIMEOUT); + + this.username = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME, ""); + this.password = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_PASSWORD, ""); + + } + + + @Override + public void 
open(RuntimeContext context) { + HttpHost[] httpHosts = ElasticSearchConnectorUtils.toHttpHosts(this.hosts.split(",")); + RestClientBuilder builder= RestClient.builder(httpHosts) + .setRequestConfigCallback( + requestConfigBuilder -> requestConfigBuilder + .setConnectTimeout(connectTimeout) + .setSocketTimeout(socketTimeout) + .setConnectionRequestTimeout(requestTimeout) + ); + + if (username != null && !username.isEmpty() && password != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(username, password)); + + builder.setHttpClientConfigCallback(httpClientBuilder -> + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider) + ); + + } + this.esClient = new RestHighLevelClient(builder); + } + + @Override + public List listPartitions() { + List partitions = new ArrayList<>(); + + try { + GetSettingsRequest request = new GetSettingsRequest(); + request.indices(indexName); + + GetSettingsResponse response = esClient.indices().getSettings(request, RequestOptions.DEFAULT); + + int shardCount = Integer.parseInt(response.getIndexToSettings() + .get(indexName) + .get("index.number_of_shards")); + for (int shardId = 0; shardId < shardCount; shardId++) { + partitions.add(new ElasticSearchPartition(indexName, shardId)); + } + } catch (Exception e) { + throw new RuntimeException("Failed to fetch Elasticsearch partitions for index: " + indexName, e); + } + + return partitions; + } + + @Override + public TableDeserializer getDeserializer(Configuration conf) { + return DeserializerFactory.loadDeserializer(conf); + } + + @Override + public FetchData fetch(Partition partition, Optional startOffset, TimeFetchWindow window) throws IOException { + ElasticSearchPartition esPartition = (ElasticSearchPartition) partition; + long windowStartTime = window.getStartWindowTime(0); + long windowEndTime = window.getEndWindowTime(0); + + // build range query + RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery(timestampField) + .gte(windowStartTime) + .lt(windowEndTime); + + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.query(rangeQuery) + .size(maxFetchSize) + .sort(timestampField, SortOrder.ASC) + .sort("_shard_doc", SortOrder.ASC); + + if (startOffset.isPresent()) { + ElasticSearchOffset offset = (ElasticSearchOffset) startOffset.get(); + sourceBuilder.searchAfter(offset.getSortValues()); + } + + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.source(sourceBuilder); + if(esPartition.getShardId() >= 0){ + searchRequest.preference("_shards:" + esPartition.getShardId()); + } + + SearchResponse response = esClient.search(searchRequest, RequestOptions.DEFAULT); + SearchHit[] hits = response.getHits().getHits(); + + ProcessedHits processedHits = processSearchHits(hits, startOffset, maxFetchSize); + + return (FetchData) FetchData.createStreamFetch(processedHits.records, processedHits.nextOffset, processedHits.hasMore); + } + + @Override + public FetchData fetch(Partition partition, Optional startOffset, SizeFetchWindow window) throws IOException { + ElasticSearchPartition esPartition = (ElasticSearchPartition) partition; + long windowSize = Math.min(window.windowSize(), maxFetchSize); + if (window.windowSize() > maxFetchSize) { + LOGGER.warn("windowSize {} exceeds maxFetchSize {}, using {}", + window.windowSize(), maxFetchSize, maxFetchSize); + } + + List records = new ArrayList<>(); + + SearchSourceBuilder sourceBuilder = 
new SearchSourceBuilder() + .size((int) windowSize) + .sort(timestampField, SortOrder.ASC) + .sort("_shard_doc", SortOrder.ASC); + + if (startOffset.isPresent()) { + ElasticSearchOffset esOffset = (ElasticSearchOffset) startOffset.get(); + sourceBuilder.searchAfter(esOffset.getSortValues()); + } + + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.source(sourceBuilder); + if(esPartition.getShardId() >= 0){ + searchRequest.preference("_shards:" + esPartition.getShardId()); + } + + SearchResponse response = esClient.search(searchRequest, RequestOptions.DEFAULT); + SearchHit[] hits = response.getHits().getHits(); + ProcessedHits processedHits = processSearchHits(hits, startOffset, (int) windowSize); + + return (FetchData) FetchData.createStreamFetch(processedHits.records, processedHits.nextOffset, processedHits.hasMore); + } + + private static class ProcessedHits { + List records = new ArrayList<>(); + ElasticSearchOffset nextOffset; + boolean hasMore; + } + + // process search results and nextOffset + private ProcessedHits processSearchHits(SearchHit[] hits, Optional startOffset, int requestedSize) { + ProcessedHits result = new ProcessedHits(); + + for (SearchHit hit : hits) { + result.records.add(hit.getSourceAsString()); + } + + ElasticSearchOffset nextOffset = null; + boolean hasMore = false; + if (hits.length > 0) { + nextOffset = new ElasticSearchOffset(hits[hits.length - 1].getSortValues()); + hasMore = hits.length == requestedSize; + } else { + if (startOffset.isPresent()){ + nextOffset = new ElasticSearchOffset(((ElasticSearchOffset) startOffset.get()).getSortValues()); + } + } + result.hasMore = hasMore; + result.nextOffset = nextOffset; + return result; + } + + + @Override + public void close() { + try { + if(esClient != null){ + esClient.close(); + } + } catch (IOException e) { + throw new GeaFlowDSLException("Failed to close Elasticsearch client", e); + } + } + + public static class ElasticSearchPartition implements Partition { + private final String index; + private final int shardId; + + public ElasticSearchPartition(String index, int shardId) { + this.index = index; + this.shardId = shardId; + } + + @Override + public String getName() { + return index + "-" + shardId; + } + + public String getIndex() { + return index; + } + + public int getShardId() { + return shardId; + } + } + + + public static class ElasticSearchOffset implements Offset { + + private final Object[] sortValues; + private final String sortValuesStr; // for serialization + + public ElasticSearchOffset(Object[] sortValues) { + this.sortValues = sortValues; + this.sortValuesStr = Arrays.stream(sortValues) + .map(Objects::toString) + .collect(Collectors.joining(",")); + } + public Object[] getSortValues() { + return sortValues; + } + @Override + public long getOffset() { + if (sortValues != null && sortValues.length > 0 && sortValues[0] instanceof Number) { + return ((Number) sortValues[0]).longValue(); + } + return 0L; + } + @Override + public boolean isTimestamp() { + return true; + } + + @Override + public String humanReadable() { + if (sortValues == null || sortValues.length == 0) { + return "EmptyOffset"; + } + StringBuilder sb = new StringBuilder("["); + for (Object value : sortValues) { + if (sb.length() > 1) sb.append(","); + sb.append(value); + } + sb.append("]"); + return sb.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ElasticSearchOffset that = 
(ElasticSearchOffset) o; + return Arrays.equals(sortValues, that.sortValues); + } + @Override + public int hashCode() { + return Arrays.hashCode(sortValues); + } + } +} \ No newline at end of file diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/utils/ElasticSearchConnectorUtils.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/utils/ElasticSearchConnectorUtils.java new file mode 100644 index 000000000..2329f3f3e --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/utils/ElasticSearchConnectorUtils.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.connector.elasticsearch.utils; + +import org.apache.http.HttpHost; + +/** + * Utility methods for the ElasticSearch connector, e.g. parsing host strings into {@link HttpHost}s. + * + * @author wanhanbo + * @version ElasticSearchConnectorUtils.java, v 0.1 2025-11-15 22:14 wanhanbo + */ +public class ElasticSearchConnectorUtils { + + public static HttpHost[] toHttpHosts(String[] hostStrings) { + HttpHost[] hosts = new HttpHost[hostStrings.length]; + for (int i = 0; i < hostStrings.length; i++) { + String host = hostStrings[i].trim(); + if (!host.startsWith("http://") && !host.startsWith("https://")) { + host = "http://" + host; // default to HTTP scheme + } + hosts[i] = HttpHost.create(host); + } + return hosts; + } + +} \ No newline at end of file diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/utils/ElasticSearchJsonSerializer.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/utils/ElasticSearchJsonSerializer.java new file mode 100644 index 000000000..17134bcfb --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/utils/ElasticSearchJsonSerializer.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.connector.elasticsearch.utils; + +import org.apache.geaflow.dsl.common.data.Row; +import org.apache.geaflow.dsl.common.types.StructType; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ElasticSearchJsonSerializer { + private final StructType schema; + private final String timestampField; + + public ElasticSearchJsonSerializer(StructType schema, String timestampField) { + this.schema = schema; + this.timestampField = timestampField; + } + + public Map<String, Object> convert(Row row) { + Map<String, Object> document = new HashMap<>(); + List<String> fieldNames = schema.getFieldNames(); + for (int i = 0; i < fieldNames.size(); i++) { + String fieldName = fieldNames.get(i); + Object fieldValue = row.getField(i, schema.getType(i)); + document.put(fieldName, fieldValue); + } + + // add timestamp field + if (!document.containsKey(timestampField)) { + document.put(timestampField, System.currentTimeMillis()); + } + + return document; + } +} \ No newline at end of file diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector new file mode 100644 index 000000000..d4e71281b --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+# + +org.apache.geaflow.dsl.connector.elasticsearch.ElasticSearchTableConnector diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnectorTest.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnectorTest.java new file mode 100644 index 000000000..4dcba39ea --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnectorTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.connector.elasticsearch; + +import org.testng.annotations.Test; +import static org.testng.Assert.*; +import org.apache.geaflow.common.config.Configuration; + + +public class ElasticsearchTableConnectorTest { + + @Test + public void testGetType() { + ElasticSearchTableConnector connector = new ElasticSearchTableConnector(); + assertEquals(connector.getType(), "ELASTICSEARCH"); + } + + @Test + public void testCreateSource() { + ElasticSearchTableConnector connector = new ElasticSearchTableConnector(); + Configuration conf = new Configuration(); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, "localhost:9200"); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX, "test_index"); + + // Test that source can be created without throwing exception + assertNotNull(connector.createSource(conf)); + } + + @Test + public void testCreateSink() { + ElasticSearchTableConnector connector = new ElasticSearchTableConnector(); + Configuration conf = new Configuration(); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, "localhost:9200"); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX, "test_index"); + + // Test that sink can be created without throwing exception + assertNotNull(connector.createSink(conf)); + } + + @Test + public void testConfigKeys() { + // Test that all config keys are properly defined + assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS); + assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX); + assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME); + assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_PASSWORD); + assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT); + assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_BACKOFF); + assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_BULK_SIZE_MB); + } +} \ No newline at end of file diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml b/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml index b7c9821a6..d43e32050 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml @@ -47,6 +47,7 @@ geaflow-dsl-connector-pulsar geaflow-dsl-connector-random geaflow-dsl-connector-paimon + geaflow-dsl-connector-elasticsearch
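
For reference, a minimal sketch of how the new pieces fit together, in the spirit of ElasticsearchTableConnectorTest. It assumes a locally running cluster at localhost:9200, a hypothetical index "test_index", and a hypothetical row field "id" used as the document id; it stops before init/open because those steps need a table schema and a runtime context.

import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.dsl.connector.api.TableSink;
import org.apache.geaflow.dsl.connector.api.TableSource;
import org.apache.geaflow.dsl.connector.elasticsearch.ElasticSearchConfigKeys;
import org.apache.geaflow.dsl.connector.elasticsearch.ElasticSearchTableConnector;

public class ElasticSearchConnectorUsageExample {
    public static void main(String[] args) {
        // Assumed connection settings; adjust hosts/index to a real cluster.
        Configuration conf = new Configuration();
        conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, "localhost:9200");
        conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX, "test_index");
        conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_ID_FIELD, "id");

        // The connector is discovered through the META-INF/services entry and exposes type "ELASTICSEARCH".
        ElasticSearchTableConnector connector = new ElasticSearchTableConnector();
        TableSource source = connector.createSource(conf); // reads index shards as partitions
        TableSink sink = connector.createSink(conf);        // buffers writes through a BulkProcessor
        System.out.println(connector.getType() + ": " + source + " / " + sink);
    }
}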