diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/pom.xml b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/pom.xml
new file mode 100644
index 000000000..b528135e0
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/pom.xml
@@ -0,0 +1,62 @@
+
+
+
+
+
+ org.apache.geaflow
+ geaflow-dsl-connector
+ 0.6.8-SNAPSHOT
+
+
+ 4.0.0
+
+ geaflow-dsl-connector-elasticsearch
+
+
+ 7.17.0
+
+
+
+
+ org.apache.geaflow
+ geaflow-dsl-common
+
+
+
+ org.apache.geaflow
+ geaflow-dsl-connector-api
+
+
+
+ org.elasticsearch.client
+ elasticsearch-rest-high-level-client
+ ${elasticsearch.version}
+
+
+
+ org.testng
+ testng
+ ${testng.version}
+ test
+
+
+
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchConfigKeys.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchConfigKeys.java
new file mode 100644
index 000000000..37ea65911
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchConfigKeys.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import org.apache.geaflow.common.config.ConfigKey;
+import org.apache.geaflow.common.config.ConfigKeys;
+
+/**
+ * Configuration keys of the ElasticSearch table connector.
+ *
+ * <p>Every key is declared with its property name, an optional default value and a short
+ * description. Keys built with {@code noDefaultValue()} carry no default.
+ */
+public class ElasticSearchConfigKeys {
+
+    /** Cluster hosts as one comma-separated string; no default. */
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_HOSTS = ConfigKeys
+        .key("geaflow.dsl.elasticsearch.hosts")
+        .noDefaultValue()
+        .description("ElasticSearch cluster hosts list (comma-separated).");
+
+    /** Name of the index to read from or write to; no default. */
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_INDEX = ConfigKeys
+        .key("geaflow.dsl.elasticsearch.index")
+        .noDefaultValue()
+        .description("ElasticSearch index name.");
+
+    /** Name of the timestamp field; defaults to {@code @timestamp}. */
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_TIMESTAMP_FIELD = ConfigKeys
+        .key("geaflow.dsl.elasticsearch.timestamp.field")
+        .defaultValue("@timestamp")
+        .description("ElasticSearch timestamp field name.");
+
+    /** Number of documents requested per fetch; defaults to 1000. */
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_MAX_FETCH_SIZE = ConfigKeys
+        .key("geaflow.dsl.elasticsearch.max.fetch.size")
+        .defaultValue(1000)
+        .description("Batch size for each fetch operation.");
+
+    /** Client connect timeout in milliseconds; defaults to 5000. */
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_CONNECT_TIMEOUT = ConfigKeys
+        .key("geaflow.dsl.elasticsearch.connect.timeout.ms")
+        .defaultValue(5000)
+        .description("ElasticSearch client connection timeout in milliseconds.");
+
+    /** Client socket timeout in milliseconds; defaults to 60000. */
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT = ConfigKeys
+        .key("geaflow.dsl.elasticsearch.socket.timeout.ms")
+        .defaultValue(60000)
+        .description("ElasticSearch client socket timeout in milliseconds.");
+
+    /** Client request timeout in milliseconds; defaults to 1000. */
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_REQUEST_TIMEOUT = ConfigKeys
+        .key("geaflow.dsl.elasticsearch.request.timeout.ms")
+        .defaultValue(1000)
+        .description("ElasticSearch client request timeout in milliseconds.");
+
+    /**
+     * Name of the table field whose value becomes the document id; no default.
+     * The previous description ("ElasticSearch client field name.") did not say what
+     * the field is used for.
+     */
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_ID_FIELD = ConfigKeys
+        .key("geaflow.dsl.elasticsearch.id.field")
+        .noDefaultValue()
+        .description("Name of the table field whose value is used as the ElasticSearch document id.");
+
+    /** Bulk flush interval in milliseconds; defaults to 1000. */
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_FLUSH_INTERVAL_MS = ConfigKeys
+        .key("geaflow.dsl.elasticsearch.flush.interval.ms")
+        .defaultValue(1000L)
+        .description("ElasticSearch client flush interval in milliseconds.");
+
+    /** Bulk request size threshold in megabytes; defaults to 5. */
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_BULK_SIZE_MB = ConfigKeys
+        .key("geaflow.dsl.elasticsearch.bulk.size.mb")
+        .defaultValue(5L)
+        .description("ElasticSearch client bulk size in megabytes.");
+
+    /** Number of concurrent bulk requests; defaults to 1. */
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_CONCURRENT_REQUESTS = ConfigKeys
+        .key("geaflow.dsl.elasticsearch.concurrent.requests")
+        .defaultValue(1)
+        .description("ElasticSearch client concurrent requests");
+
+    /** Backoff switch, kept as the string {@code "true"} / {@code "false"}; defaults to "true". */
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_BACKOFF = ConfigKeys
+        .key("geaflow.dsl.elasticsearch.backoff")
+        .defaultValue("true")
+        .description("ElasticSearch client backoff");
+
+    /** User name for authentication; no default. */
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_USERNAME = ConfigKeys
+        .key("geaflow.dsl.elasticsearch.username")
+        .noDefaultValue()
+        .description("Elasticsearch username for authentication.");
+
+    /** Password for authentication; no default. */
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_PASSWORD = ConfigKeys
+        .key("geaflow.dsl.elasticsearch.password")
+        .noDefaultValue()
+        .description("Elasticsearch password for authentication.");
+
+}
\ No newline at end of file
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableConnector.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableConnector.java
new file mode 100644
index 000000000..19fabb549
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableConnector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.dsl.connector.api.TableReadableConnector;
+import org.apache.geaflow.dsl.connector.api.TableSink;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.apache.geaflow.dsl.connector.api.TableWritableConnector;
+
+/**
+ * Connector entry point for ElasticSearch tables. It identifies itself with the type
+ * name {@code ELASTICSEARCH} and creates both the readable and the writable side.
+ */
+public class ElasticSearchTableConnector implements TableReadableConnector, TableWritableConnector {
+
+    /** Type name reported by {@link #getType()}. */
+    public static final String TYPE = "ELASTICSEARCH";
+
+    /**
+     * Creates a new, not yet initialised sink. The configuration is not read here.
+     *
+     * @param conf table configuration (unused by this method)
+     * @return a fresh {@link ElasticSearchTableSink}
+     */
+    @Override
+    public TableSink createSink(Configuration conf) {
+        return new ElasticSearchTableSink();
+    }
+
+    /**
+     * Creates a new, not yet initialised source. The configuration is not read here.
+     *
+     * @param conf table configuration (unused by this method)
+     * @return a fresh {@link ElasticSearchTableSource}
+     */
+    @Override
+    public TableSource createSource(Configuration conf) {
+        return new ElasticSearchTableSource();
+    }
+
+    /**
+     * @return the connector type name, {@link #TYPE}
+     */
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+}
\ No newline at end of file
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableSink.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableSink.java
new file mode 100644
index 000000000..5d8f02020
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableSink.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.connector.api.TableSink;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.api.context.RuntimeContext;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.connector.elasticsearch.utils.ElasticSearchConnectorUtils;
+import org.apache.geaflow.dsl.connector.elasticsearch.utils.ElasticSearchJsonSerializer;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.action.bulk.*;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Table sink that indexes rows into one ElasticSearch index through a {@link BulkProcessor}.
+ *
+ * <p>Lifecycle as implemented here: {@link #init} reads the configuration, {@link #open}
+ * builds the REST client and the bulk processor, {@link #write} queues one index request
+ * per row, {@link #finish} flushes and waits until every queued document has been
+ * acknowledged, and {@link #close} shuts the bulk processor and the client down.
+ *
+ * <p>{@code finish()} deliberately leaves the bulk processor open: a closed
+ * {@code BulkProcessor} rejects further {@code add}/{@code flush} calls, which made a
+ * {@code close()} (or another {@code write()}) after {@code finish()} fail.
+ */
+public class ElasticSearchTableSink implements TableSink {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchTableSink.class);
+
+    /** Upper bound, in seconds, for waiting on outstanding bulk requests. */
+    private static final long DRAIN_TIMEOUT_SECONDS = 30L;
+    /** Pause between two checks of the pending-document counter, in milliseconds. */
+    private static final long DRAIN_POLL_MILLIS = 10L;
+
+    private String hosts;
+    private String indexName;
+    private String timestampField;
+    private String idField;
+    private int connectTimeout;
+    private int socketTimeout;
+    private int requestTimeout;
+    private long flushIntervalMs;
+    private long bulkSizeMb;
+    private int concurrentRequests;
+    private String backoffEnabled;
+    private StructType schema;
+    private String username;
+    private String password;
+
+    private transient RestHighLevelClient esClient;
+    private transient BulkProcessor bulkProcessor;
+    /** Documents handed to the bulk processor and not yet reported back by the listener. */
+    private transient AtomicInteger pendingDocs = new AtomicInteger(0);
+    /** Failure message mapped to the number of documents that failed with it. */
+    private transient Map<String, Integer> failedDocs = new ConcurrentHashMap<>();
+    private transient ElasticSearchJsonSerializer elasticSearchJsonSerializer;
+    /** Position of the id field in the schema, or -1 when no usable id field is configured. */
+    private transient int idFieldIndex;
+
+    /**
+     * Copies all connector settings out of the configuration.
+     *
+     * @param conf   table configuration
+     * @param schema schema of the rows passed to {@link #write(Row)}
+     */
+    @Override
+    public void init(Configuration conf, StructType schema) {
+        this.schema = schema;
+        this.hosts = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS);
+        this.indexName = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX);
+        this.timestampField = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_TIMESTAMP_FIELD);
+        // The id field is optional (write() copes with its absence), so it is read with an
+        // explicit fallback in the same way as the optional credentials below.
+        this.idField = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_ID_FIELD, "");
+        this.flushIntervalMs = conf.getLong(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_FLUSH_INTERVAL_MS);
+        this.bulkSizeMb = conf.getLong(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_BULK_SIZE_MB);
+        this.concurrentRequests = conf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_CONCURRENT_REQUESTS);
+        this.backoffEnabled = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_BACKOFF);
+
+        this.connectTimeout = conf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_CONNECT_TIMEOUT);
+        this.socketTimeout = conf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT);
+        this.requestTimeout = conf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_REQUEST_TIMEOUT);
+
+        this.username = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME, "");
+        this.password = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_PASSWORD, "");
+    }
+
+    /**
+     * Builds the serializer, the REST client and the bulk processor.
+     *
+     * @param context runtime context (not used by this sink)
+     */
+    @Override
+    public void open(RuntimeContext context) {
+        // Field initialisers of transient fields do not run when an object is restored by
+        // Java deserialization, so the runtime state is (re)created here.
+        pendingDocs = new AtomicInteger(0);
+        failedDocs = new ConcurrentHashMap<>();
+        elasticSearchJsonSerializer = new ElasticSearchJsonSerializer(schema, timestampField);
+
+        // Resolve the id column once instead of once per row.
+        idFieldIndex = (idField == null || idField.isEmpty()) ? -1 : schema.indexOf(idField);
+        if (idField != null && !idField.isEmpty() && idFieldIndex < 0) {
+            LOGGER.warn("Id field {} is not part of the table schema, document ids will be generated",
+                idField);
+        }
+
+        this.esClient = createClient();
+        this.bulkProcessor = createBulkProcessor();
+    }
+
+    /** Creates the REST client with the configured timeouts and optional basic authentication. */
+    private RestHighLevelClient createClient() {
+        HttpHost[] httpHosts = ElasticSearchConnectorUtils.toHttpHosts(this.hosts.split(","));
+        RestClientBuilder builder = RestClient.builder(httpHosts)
+            .setRequestConfigCallback(
+                requestConfigBuilder -> requestConfigBuilder
+                    .setConnectTimeout(connectTimeout)
+                    .setSocketTimeout(socketTimeout)
+                    .setConnectionRequestTimeout(requestTimeout)
+            );
+
+        if (username != null && !username.isEmpty() && password != null) {
+            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+            credentialsProvider.setCredentials(AuthScope.ANY,
+                new UsernamePasswordCredentials(username, password));
+            builder.setHttpClientConfigCallback(httpClientBuilder ->
+                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
+            );
+        }
+        return new RestHighLevelClient(builder);
+    }
+
+    /** Creates the bulk processor together with the listener that tracks pending and failed documents. */
+    private BulkProcessor createBulkProcessor() {
+        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
+            @Override
+            public void beforeBulk(long executionId, BulkRequest request) {
+                LOGGER.debug("Executing bulk request with {} actions", request.numberOfActions());
+            }
+
+            @Override
+            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                // Record failures before releasing the pending counter, so that a thread
+                // waiting in finish() sees them as soon as the counter reaches zero.
+                if (response.hasFailures()) {
+                    handleBulkFailures(response);
+                }
+                pendingDocs.addAndGet(-request.numberOfActions());
+            }
+
+            @Override
+            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+                int count = request.numberOfActions();
+                LOGGER.error("Bulk request failed", failure);
+                // A request that failed as a whole lost all of its documents; remember that
+                // so finish() reports the loss instead of succeeding silently.
+                failedDocs.merge("Bulk request failed: " + failure, count, Integer::sum);
+                pendingDocs.addAndGet(-count);
+            }
+        };
+
+        BulkProcessor.Builder bulkBuilder = BulkProcessor.builder(
+            (request, bulkListener) -> esClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
+            listener,
+            "geaflow-bulk-processor"
+        );
+        bulkBuilder.setBulkSize(new ByteSizeValue(bulkSizeMb, ByteSizeUnit.MB));
+        bulkBuilder.setFlushInterval(TimeValue.timeValueMillis(flushIntervalMs));
+        bulkBuilder.setConcurrentRequests(concurrentRequests);
+        if ("true".equalsIgnoreCase(backoffEnabled)) {
+            bulkBuilder.setBackoffPolicy(
+                BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
+        }
+        return bulkBuilder.build();
+    }
+
+    /**
+     * Converts the row into a document and queues it for bulk indexing.
+     *
+     * @param row row to index
+     * @throws IOException if the row cannot be converted or queued
+     */
+    @Override
+    public void write(Row row) throws IOException {
+        try {
+            Map<String, Object> document = elasticSearchJsonSerializer.convert(row);
+            IndexRequest request = new IndexRequest(indexName).source(document);
+
+            if (idFieldIndex >= 0) {
+                Object idValue = row.getField(idFieldIndex, schema.getType(idFieldIndex));
+                // A null id would otherwise become the literal id "null" and make all such
+                // rows overwrite each other; leave the id unset in that case.
+                if (idValue != null) {
+                    request.id(String.valueOf(idValue));
+                }
+            }
+
+            // Count before handing over: the listener may run (and decrement) before
+            // add() returns.
+            pendingDocs.incrementAndGet();
+            try {
+                bulkProcessor.add(request);
+            } catch (RuntimeException e) {
+                pendingDocs.decrementAndGet();
+                throw e;
+            }
+        } catch (Exception e) {
+            LOGGER.error("Failed to convert row to document", e);
+            throw new IOException("Row conversion error", e);
+        }
+    }
+
+    /**
+     * Flushes buffered documents, waits until all queued documents were acknowledged and
+     * fails if any of them could not be indexed. The bulk processor stays usable afterwards.
+     *
+     * @throws IOException         if waiting is interrupted or times out
+     * @throws GeaFlowDSLException if at least one document failed to be indexed
+     */
+    @Override
+    public void finish() throws IOException {
+        if (bulkProcessor == null) {
+            return;
+        }
+        try {
+            flush();
+            awaitPendingDocs();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted while waiting for bulk requests to complete", e);
+        }
+        reportFailures();
+    }
+
+    /** Blocks until the pending-document counter reaches zero or the drain timeout elapses. */
+    private void awaitPendingDocs() throws IOException, InterruptedException {
+        long deadline = System.nanoTime()
+            + java.util.concurrent.TimeUnit.SECONDS.toNanos(DRAIN_TIMEOUT_SECONDS);
+        while (pendingDocs.get() > 0) {
+            if (System.nanoTime() - deadline > 0) {
+                throw new IOException("Timed out after " + DRAIN_TIMEOUT_SECONDS
+                    + " seconds waiting for " + pendingDocs.get() + " pending documents");
+            }
+            Thread.sleep(DRAIN_POLL_MILLIS);
+        }
+    }
+
+    /** Logs and clears the collected failures and throws if there were any. */
+    private void reportFailures() {
+        if (failedDocs.isEmpty()) {
+            return;
+        }
+        int failed = failedDocs.values().stream().mapToInt(Integer::intValue).sum();
+        LOGGER.error("Failed to index {} documents with errors:", failed);
+        failedDocs.forEach((msg, count) ->
+            LOGGER.error("  - {} occurrences: {}", count, msg)
+        );
+        // Start the next round with a clean slate; these failures are reported by the throw.
+        failedDocs.clear();
+        throw new GeaFlowDSLException(
+            "Failed to index " + failed + " documents. See logs for details.");
+    }
+
+    /** Sends whatever the bulk processor has buffered, if a processor exists. */
+    private void flush() {
+        if (bulkProcessor != null) {
+            bulkProcessor.flush();
+        }
+    }
+
+    /**
+     * Closes the bulk processor (flushing and waiting for outstanding requests) and then
+     * the client. Safe to call after {@link #finish()} and more than once.
+     *
+     * @throws GeaFlowDSLException if the client cannot be closed
+     */
+    @Override
+    public void close() {
+        try {
+            if (bulkProcessor != null) {
+                // awaitClose flushes remaining actions itself; calling flush() on an
+                // already closed processor would throw.
+                if (!bulkProcessor.awaitClose(DRAIN_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS)) {
+                    LOGGER.warn("Bulk processor did not complete all requests within {} seconds",
+                        DRAIN_TIMEOUT_SECONDS);
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOGGER.warn("Interrupted while closing the bulk processor", e);
+        } finally {
+            bulkProcessor = null;
+            closeClient();
+        }
+    }
+
+    /** Closes the REST client if one was created. */
+    private void closeClient() {
+        if (esClient == null) {
+            return;
+        }
+        try {
+            esClient.close();
+        } catch (IOException e) {
+            throw new GeaFlowDSLException("Failed to close Elasticsearch client", e);
+        } finally {
+            esClient = null;
+        }
+    }
+
+    /** Adds every failed item of the response to {@code failedDocs}, keyed by failure message. */
+    private void handleBulkFailures(BulkResponse response) {
+        int failureCount = 0;
+        for (BulkItemResponse item : response) {
+            if (item.isFailed()) {
+                failureCount++;
+                failedDocs.merge(item.getFailureMessage(), 1, Integer::sum);
+
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("Failed to index document {}: {}", extractDocId(item), item.getFailureMessage());
+                }
+            }
+        }
+        LOGGER.warn("Bulk operation had {} failures out of {} requests",
+            failureCount, response.getItems().length);
+    }
+
+    /** Returns the id carried by the item's failure, or {@code "unknown"} when it has none. */
+    private String extractDocId(BulkItemResponse item) {
+        String id = item.getFailure().getId();
+        return id != null ? id : "unknown";
+    }
+}
\ No newline at end of file
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableSource.java
new file mode 100644
index 000000000..afb201ac7
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableSource.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.TableSchema;
+import org.apache.geaflow.dsl.connector.api.AbstractTableSource;
+import org.apache.geaflow.dsl.connector.api.FetchData;
+import org.apache.geaflow.dsl.connector.api.Offset;
+import org.apache.geaflow.dsl.connector.api.Partition;
+import org.apache.geaflow.dsl.connector.api.serde.DeserializerFactory;
+import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
+import org.apache.geaflow.dsl.connector.api.window.SizeFetchWindow;
+import org.apache.geaflow.dsl.connector.api.window.TimeFetchWindow;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.api.context.RuntimeContext;
+import org.apache.geaflow.dsl.connector.elasticsearch.utils.ElasticSearchConnectorUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
+import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Table source that reads documents of one ElasticSearch index, one partition per shard.
+ *
+ * <p>Documents are returned as their JSON source strings. Paging uses {@code search_after}
+ * with the sort values of the last returned hit, wrapped in {@link ElasticSearchOffset}.
+ *
+ * <p>NOTE(review): generic type arguments are absent from the method signatures of this
+ * file as reviewed; they are kept exactly as found — confirm against the connector API.
+ */
+public class ElasticSearchTableSource extends AbstractTableSource {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchTableSource.class);
+
+    /** Index setting that holds the number of primary shards. */
+    private static final String NUMBER_OF_SHARDS_SETTING = "index.number_of_shards";
+
+    private String hosts;
+    private String indexName;
+    private String timestampField;
+    private int maxFetchSize;
+    private int connectTimeout;
+    private int socketTimeout;
+    private int requestTimeout;
+    private String username;
+    private String password;
+
+    private transient RestHighLevelClient esClient;
+
+    /**
+     * Copies all connector settings out of the configuration.
+     *
+     * @param conf   table configuration
+     * @param schema table schema (not used by this source)
+     */
+    @Override
+    public void init(Configuration conf, TableSchema schema) {
+        this.hosts = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS);
+        this.indexName = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX);
+        this.timestampField = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_TIMESTAMP_FIELD);
+        this.maxFetchSize = conf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_MAX_FETCH_SIZE);
+
+        this.connectTimeout = conf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_CONNECT_TIMEOUT);
+        this.socketTimeout = conf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT);
+        this.requestTimeout = conf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_REQUEST_TIMEOUT);
+
+        this.username = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME, "");
+        this.password = conf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_PASSWORD, "");
+    }
+
+    /**
+     * Creates the REST client with the configured timeouts and, when a user name is set,
+     * basic authentication.
+     *
+     * @param context runtime context (not used by this source)
+     */
+    @Override
+    public void open(RuntimeContext context) {
+        HttpHost[] httpHosts = ElasticSearchConnectorUtils.toHttpHosts(this.hosts.split(","));
+        RestClientBuilder builder = RestClient.builder(httpHosts)
+            .setRequestConfigCallback(
+                requestConfigBuilder -> requestConfigBuilder
+                    .setConnectTimeout(connectTimeout)
+                    .setSocketTimeout(socketTimeout)
+                    .setConnectionRequestTimeout(requestTimeout)
+            );
+
+        if (username != null && !username.isEmpty() && password != null) {
+            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+            credentialsProvider.setCredentials(AuthScope.ANY,
+                new UsernamePasswordCredentials(username, password));
+            builder.setHttpClientConfigCallback(httpClientBuilder ->
+                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
+            );
+        }
+        this.esClient = new RestHighLevelClient(builder);
+    }
+
+    /**
+     * Lists one partition per shard of the configured index, based on the index setting
+     * {@code index.number_of_shards}.
+     *
+     * @return partitions with shard ids {@code 0 .. shardCount - 1}
+     * @throws RuntimeException if the settings cannot be fetched or do not contain the
+     *                          shard count for the configured index name
+     */
+    @Override
+    public List listPartitions() {
+        List<Partition> partitions = new ArrayList<>();
+
+        try {
+            GetSettingsRequest request = new GetSettingsRequest();
+            request.indices(indexName);
+
+            GetSettingsResponse response = esClient.indices().getSettings(request, RequestOptions.DEFAULT);
+
+            // The response is keyed by concrete index name; fail with a readable message
+            // instead of a NullPointerException when the configured name is not among them.
+            if (!response.getIndexToSettings().containsKey(indexName)) {
+                throw new IllegalStateException("No settings returned for index: " + indexName);
+            }
+            String shardSetting = response.getIndexToSettings()
+                .get(indexName)
+                .get(NUMBER_OF_SHARDS_SETTING);
+            if (shardSetting == null) {
+                throw new IllegalStateException(
+                    "Setting " + NUMBER_OF_SHARDS_SETTING + " is missing for index: " + indexName);
+            }
+
+            int shardCount = Integer.parseInt(shardSetting);
+            for (int shardId = 0; shardId < shardCount; shardId++) {
+                partitions.add(new ElasticSearchPartition(indexName, shardId));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to fetch Elasticsearch partitions for index: " + indexName, e);
+        }
+
+        return partitions;
+    }
+
+    /**
+     * @param conf table configuration handed to the deserializer factory
+     * @return the deserializer chosen by {@link DeserializerFactory#loadDeserializer}
+     */
+    @Override
+    public TableDeserializer getDeserializer(Configuration conf) {
+        return DeserializerFactory.loadDeserializer(conf);
+    }
+
+    /**
+     * Fetches up to {@code maxFetchSize} documents whose timestamp field lies in
+     * {@code [windowStart, windowEnd)} of the given time window.
+     *
+     * @param partition   an {@link ElasticSearchPartition}
+     * @param startOffset offset to continue after, if any
+     * @param window      time window supplying the timestamp range
+     * @return fetched JSON documents with the next offset
+     * @throws IOException if the search request fails
+     */
+    @Override
+    public FetchData fetch(Partition partition, Optional startOffset, TimeFetchWindow window) throws IOException {
+        long windowStartTime = window.getStartWindowTime(0);
+        long windowEndTime = window.getEndWindowTime(0);
+
+        RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery(timestampField)
+            .gte(windowStartTime)
+            .lt(windowEndTime);
+        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(rangeQuery);
+
+        ProcessedHits processedHits = search(partition, startOffset, sourceBuilder, maxFetchSize);
+        // NOTE(review): the flag passed last is true when a full page came back ("has more");
+        // confirm this matches what createStreamFetch expects for its boolean argument.
+        return (FetchData) FetchData.createStreamFetch(processedHits.records, processedHits.nextOffset, processedHits.hasMore);
+    }
+
+    /**
+     * Fetches up to {@code min(window size, maxFetchSize)} documents without a query filter.
+     *
+     * @param partition   an {@link ElasticSearchPartition}
+     * @param startOffset offset to continue after, if any
+     * @param window      size window supplying the requested number of documents
+     * @return fetched JSON documents with the next offset
+     * @throws IOException if the search request fails
+     */
+    @Override
+    public FetchData fetch(Partition partition, Optional startOffset, SizeFetchWindow window) throws IOException {
+        long windowSize = Math.min(window.windowSize(), maxFetchSize);
+        if (window.windowSize() > maxFetchSize) {
+            LOGGER.warn("windowSize {} exceeds maxFetchSize {}, using {}",
+                window.windowSize(), maxFetchSize, maxFetchSize);
+        }
+
+        ProcessedHits processedHits = search(partition, startOffset, new SearchSourceBuilder(), (int) windowSize);
+        // NOTE(review): see the time-window fetch — confirm the meaning of the boolean flag.
+        return (FetchData) FetchData.createStreamFetch(processedHits.records, processedHits.nextOffset, processedHits.hasMore);
+    }
+
+    /**
+     * Runs one page of a search shared by both fetch variants: applies page size, sort
+     * order, {@code search_after} and shard preference, then converts the hits.
+     *
+     * @param partition     partition to read; must be an {@link ElasticSearchPartition}
+     * @param startOffset   offset whose sort values are used for {@code search_after}, if any
+     * @param sourceBuilder builder that may already carry a query
+     * @param pageSize      maximum number of hits to request
+     */
+    private ProcessedHits search(Partition partition, Optional startOffset,
+                                 SearchSourceBuilder sourceBuilder, int pageSize) throws IOException {
+        ElasticSearchPartition esPartition = (ElasticSearchPartition) partition;
+
+        // NOTE(review): Elasticsearch documents "_shard_doc" as the tiebreaker of
+        // point-in-time searches, and no point in time is opened here — confirm that the
+        // target cluster accepts this sort.
+        sourceBuilder.size(pageSize)
+            .sort(timestampField, SortOrder.ASC)
+            .sort("_shard_doc", SortOrder.ASC);
+
+        if (startOffset.isPresent()) {
+            ElasticSearchOffset offset = (ElasticSearchOffset) startOffset.get();
+            sourceBuilder.searchAfter(offset.getSortValues());
+        }
+
+        SearchRequest searchRequest = new SearchRequest(indexName);
+        searchRequest.source(sourceBuilder);
+        if (esPartition.getShardId() >= 0) {
+            searchRequest.preference("_shards:" + esPartition.getShardId());
+        }
+
+        SearchResponse response = esClient.search(searchRequest, RequestOptions.DEFAULT);
+        return processSearchHits(response.getHits().getHits(), startOffset, pageSize);
+    }
+
+    /** Result of one search page: the documents, the offset to continue from and the page-full flag. */
+    private static class ProcessedHits {
+        List<String> records = new ArrayList<>();
+        ElasticSearchOffset nextOffset;
+        boolean hasMore;
+    }
+
+    /**
+     * Collects the JSON source of every hit and derives the next offset: the sort values of
+     * the last hit, or a copy of the start offset when the page is empty (null when there
+     * is no start offset either). {@code hasMore} is set when the page is exactly full.
+     */
+    private ProcessedHits processSearchHits(SearchHit[] hits, Optional startOffset, int requestedSize) {
+        ProcessedHits result = new ProcessedHits();
+
+        for (SearchHit hit : hits) {
+            result.records.add(hit.getSourceAsString());
+        }
+
+        if (hits.length > 0) {
+            result.nextOffset = new ElasticSearchOffset(hits[hits.length - 1].getSortValues());
+            result.hasMore = hits.length == requestedSize;
+        } else if (startOffset.isPresent()) {
+            result.nextOffset = new ElasticSearchOffset(((ElasticSearchOffset) startOffset.get()).getSortValues());
+        }
+        return result;
+    }
+
+    /**
+     * Closes the REST client if one was created.
+     *
+     * @throws GeaFlowDSLException if closing the client fails
+     */
+    @Override
+    public void close() {
+        try {
+            if (esClient != null) {
+                esClient.close();
+            }
+        } catch (IOException e) {
+            throw new GeaFlowDSLException("Failed to close Elasticsearch client", e);
+        } finally {
+            esClient = null;
+        }
+    }
+
+    /** Partition identified by an index name and a shard id; its name is {@code index-shardId}. */
+    public static class ElasticSearchPartition implements Partition {
+        private final String index;
+        private final int shardId;
+
+        public ElasticSearchPartition(String index, int shardId) {
+            this.index = index;
+            this.shardId = shardId;
+        }
+
+        @Override
+        public String getName() {
+            return index + "-" + shardId;
+        }
+
+        public String getIndex() {
+            return index;
+        }
+
+        public int getShardId() {
+            return shardId;
+        }
+
+        // listPartitions() builds new instances on every call, so value equality is needed
+        // for two listings of the same shard to compare equal.
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ElasticSearchPartition that = (ElasticSearchPartition) o;
+            return shardId == that.shardId && Objects.equals(index, that.index);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(index, shardId);
+        }
+    }
+
+    /** Offset holding the sort values of a search hit, as used for {@code search_after}. */
+    public static class ElasticSearchOffset implements Offset {
+
+        private final Object[] sortValues;
+        private final String sortValuesStr; // for serialization
+
+        /**
+         * @param sortValues sort values of a hit; may be null, which is treated as empty
+         *                   by the other methods of this class
+         */
+        public ElasticSearchOffset(Object[] sortValues) {
+            this.sortValues = sortValues;
+            // Null is tolerated by getOffset()/humanReadable(), so the constructor must not
+            // throw on it either.
+            this.sortValuesStr = sortValues == null
+                ? ""
+                : Arrays.stream(sortValues)
+                    .map(Objects::toString)
+                    .collect(Collectors.joining(","));
+        }
+
+        public Object[] getSortValues() {
+            return sortValues;
+        }
+
+        /** @return the first sort value as a long when it is a number, otherwise 0 */
+        @Override
+        public long getOffset() {
+            if (sortValues != null && sortValues.length > 0 && sortValues[0] instanceof Number) {
+                return ((Number) sortValues[0]).longValue();
+            }
+            return 0L;
+        }
+
+        @Override
+        public boolean isTimestamp() {
+            return true;
+        }
+
+        /** @return {@code "EmptyOffset"} without sort values, otherwise {@code [v1,v2,...]} */
+        @Override
+        public String humanReadable() {
+            if (sortValues == null || sortValues.length == 0) {
+                return "EmptyOffset";
+            }
+            return "[" + sortValuesStr + "]";
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ElasticSearchOffset that = (ElasticSearchOffset) o;
+            return Arrays.equals(sortValues, that.sortValues);
+        }
+
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(sortValues);
+        }
+    }
+}
\ No newline at end of file
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/utils/ElasticSearchConnectorUtils.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/utils/ElasticSearchConnectorUtils.java
new file mode 100644
index 000000000..2329f3f3e
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/utils/ElasticSearchConnectorUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch.utils;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+
+/**
+ * Helper methods for the ElasticSearch connector.
+ *
+ * @author wanhanbo
+ * @version ElasticSearchConnectorUtils.java, v 0.1 2025-11-15 22:14 wanhanbo
+ */
+public class ElasticSearchConnectorUtils {
+
+    private static final String HTTP_PREFIX = "http://";
+    private static final String HTTPS_PREFIX = "https://";
+
+    /**
+     * Converts host strings into {@link HttpHost} instances.
+     *
+     * <p>Each entry is trimmed. Entries without an {@code http://} or {@code https://}
+     * prefix (compared case-insensitively) get {@code http://} prepended. Null and blank
+     * entries, e.g. produced by splitting {@code "a,,b"}, are skipped.
+     *
+     * @param hostStrings host entries such as {@code "host:9200"} or {@code "https://host:9200"}
+     * @return one {@link HttpHost} per non-blank entry, in input order
+     */
+    public static HttpHost[] toHttpHosts(String[] hostStrings) {
+        java.util.List<HttpHost> hosts = new java.util.ArrayList<>(hostStrings.length);
+        for (String hostString : hostStrings) {
+            if (hostString == null) {
+                continue;
+            }
+            String host = hostString.trim();
+            if (host.isEmpty()) {
+                continue;
+            }
+            if (!hasPrefixIgnoreCase(host, HTTP_PREFIX) && !hasPrefixIgnoreCase(host, HTTPS_PREFIX)) {
+                host = HTTP_PREFIX + host; // default to HTTP
+            }
+            hosts.add(HttpHost.create(host));
+        }
+        return hosts.toArray(new HttpHost[0]);
+    }
+
+    /** Case-insensitive {@code startsWith}, so that e.g. {@code HTTP://host} keeps its scheme. */
+    private static boolean hasPrefixIgnoreCase(String value, String prefix) {
+        return value.regionMatches(true, 0, prefix, 0, prefix.length());
+    }
+
+}
\ No newline at end of file
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/utils/ElasticSearchJsonSerializer.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/utils/ElasticSearchJsonSerializer.java
new file mode 100644
index 000000000..17134bcfb
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/utils/ElasticSearchJsonSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch.utils;
+
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.types.StructType;
+
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Turns a {@link Row} into the field-name to value map written as an Elasticsearch document. */
+public class ElasticSearchJsonSerializer {
+    private final StructType schema;
+    private final String timestampField;
+
+    public ElasticSearchJsonSerializer(StructType schema, String timestampField) {
+        this.schema = schema;
+        this.timestampField = timestampField;
+    }
+
+    /**
+     * Copies every schema field of {@code row} into a map keyed by field name, then adds the current
+     * epoch milliseconds under {@code timestampField} if one is set and no such key exists yet.
+     */
+    public Map<String, Object> convert(Row row) {
+        List<String> fieldNames = schema.getFieldNames();
+        Map<String, Object> document = new HashMap<>();
+        for (int i = 0; i < fieldNames.size(); i++) {
+            document.put(fieldNames.get(i), row.getField(i, schema.getType(i)));
+        }
+        if (timestampField != null && !document.containsKey(timestampField)) {
+            document.put(timestampField, System.currentTimeMillis());
+        }
+        return document;
+    }
+}
\ No newline at end of file
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
new file mode 100644
index 000000000..d4e71281b
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.geaflow.dsl.connector.elasticsearch.ElasticSearchTableConnector
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnectorTest.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnectorTest.java
new file mode 100644
index 000000000..4dcba39ea
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnectorTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import org.testng.annotations.Test;
+import static org.testng.Assert.*;
+import org.apache.geaflow.common.config.Configuration;
+
+
+/** Smoke tests for {@link ElasticSearchTableConnector}: type name, factories and config keys. */
+public class ElasticsearchTableConnectorTest {
+
+    /** Builds the hosts + index configuration shared by the source and sink factory tests. */
+    private static Configuration minimalConf() {
+        Configuration conf = new Configuration();
+        conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, "localhost:9200");
+        conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX, "test_index");
+        return conf;
+    }
+
+    @Test
+    public void testGetType() {
+        // getType() must report the literal connector type name.
+        ElasticSearchTableConnector connector = new ElasticSearchTableConnector();
+        assertEquals(connector.getType(), "ELASTICSEARCH");
+    }
+
+    @Test
+    public void testCreateSource() {
+        // Creating a source from the minimal configuration must return an instance, not throw.
+        assertNotNull(new ElasticSearchTableConnector().createSource(minimalConf()));
+    }
+
+    @Test
+    public void testCreateSink() {
+        // Creating a sink from the minimal configuration must return an instance, not throw.
+        assertNotNull(new ElasticSearchTableConnector().createSink(minimalConf()));
+    }
+
+    @Test
+    public void testConfigKeys() {
+        // Each config key constant referenced by the connector must be initialised.
+        assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS);
+        assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX);
+        assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME);
+        assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_PASSWORD);
+        assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT);
+        assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_BACKOFF);
+        assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_BULK_SIZE_MB);
+    }
+}
\ No newline at end of file
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml b/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml
index b7c9821a6..d43e32050 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml
@@ -47,6 +47,7 @@
geaflow-dsl-connector-pulsar
geaflow-dsl-connector-random
geaflow-dsl-connector-paimon
+ geaflow-dsl-connector-elasticsearch