diff --git a/pom.xml b/pom.xml
index de0bcc9..912e947 100644
--- a/pom.xml
+++ b/pom.xml
@@ -399,6 +399,11 @@
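<version>${wiremock.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-sparkreceiver</artifactId>
+ <version>2.41.0</version>
+ </dependency>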
diff --git a/src/main/java/io/cdap/plugin/hubspot/common/HubspotPagesIterator.java b/src/main/java/io/cdap/plugin/hubspot/common/HubspotPagesIterator.java
index 696260f..7e6a334 100644
--- a/src/main/java/io/cdap/plugin/hubspot/common/HubspotPagesIterator.java
+++ b/src/main/java/io/cdap/plugin/hubspot/common/HubspotPagesIterator.java
@@ -28,7 +28,7 @@ public class HubspotPagesIterator implements Iterator {
private HubspotPage currentPage;
private Iterator currentPageIterator;
private int iteratorPosition = 0;
- private String currentPageOffset = null;
+ private String currentPageOffset;
/**
* Constructor for HubspotPagesIterator object.
@@ -43,8 +43,12 @@ public HubspotPagesIterator(SourceHubspotConfig config, HubspotPage currentPage,
this.currentPageOffset = currentPageOffset;
}
+ /**
+ * Constructor for HubspotPagesIterator object that begins at the given offset.
+ * Loads the first page through {@link HubspotHelper} using that offset and delegates to the
+ * three-argument constructor with the loaded page and the same offset.
+ *
+ * @param config the source hubspot config
+ * @param offset the offset handed to the page request; may be null
+ * @throws IOException declared by this constructor; presumably raised when the page request fails
+ */
+ public HubspotPagesIterator(SourceHubspotConfig config, String offset) throws IOException {
+ this(config, new HubspotHelper().getHubspotPage(config, offset), offset);
+ }
+
+ /**
+ * Constructor for HubspotPagesIterator object that starts without an offset (null is passed on).
+ *
+ * @param config the source hubspot config
+ * @throws IOException declared by this constructor; presumably raised when the first page cannot be loaded
+ */
public HubspotPagesIterator(SourceHubspotConfig config) throws IOException {
- this(config, new HubspotHelper().getHubspotPage(config, null), null);
+ this(config, null);
}
/**
@@ -91,6 +95,10 @@ public int getIteratorPosition() {
return iteratorPosition;
}
+ /**
+ * Returns the page currently held by this iterator.
+ *
+ * @return the current {@link HubspotPage}
+ */
+ public HubspotPage getCurrentPage() {
+ return currentPage;
+ }
+
/**
* Here, just set the position of iteration.
* @param iteratorPosition the iterator position
diff --git a/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java b/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java
index b5bb661..480cb2b 100644
--- a/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java
+++ b/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java
@@ -20,6 +20,7 @@
import io.cdap.plugin.hubspot.common.HubspotHelper;
import io.cdap.plugin.hubspot.common.HubspotPage;
import io.cdap.plugin.hubspot.common.HubspotPagesIterator;
+import org.apache.beam.sdk.io.sparkreceiver.HasOffset;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
@@ -33,10 +34,12 @@
/**
- * Implementation of Spark receiver to receive Salesforce push topic events.
+ * Implementation of Spark receiver to receive messages from the Hubspot API.
*/
-public class HubspotReceiver extends Receiver {
+public class HubspotReceiver extends Receiver implements HasOffset {
private static final Logger LOG = LoggerFactory.getLogger(HubspotReceiver.class);
private static final String RECEIVER_THREAD_NAME = "hubspot_api_listener";
private final HubspotStreamingSourceConfig config;
+ private String startOffset = null;
+ private Long endOffset = Long.MAX_VALUE;
HubspotReceiver(HubspotStreamingSourceConfig config) throws IOException {
super(StorageLevel.MEMORY_AND_DISK_2());
@@ -44,10 +47,23 @@ public class HubspotReceiver extends Receiver {
}
+ /**
+ * Records the offset from which receiving should begin.
+ * A null argument leaves the previously stored offset untouched.
+ *
+ * @param startOffset the requested start offset, or null to keep the current one
+ */
@Override
+ public void setStartOffset(Long startOffset) {
+ if (startOffset == null) {
+ return;
+ }
+ // Store startOffset - 1 (zero stays zero), because offset should be inclusive.
+ long inclusiveOffset = (startOffset == 0L) ? 0L : startOffset - 1;
+ this.startOffset = String.valueOf(inclusiveOffset);
+ }
+
+ /**
+ * Returns the streaming source config held by this receiver.
+ *
+ * @return the {@link HubspotStreamingSourceConfig} stored in the {@code config} field
+ */
+ public HubspotStreamingSourceConfig getConfig() {
+ return config;
+ }
+
+ /**
+ * Starts the receiving loop on a dedicated background thread named after
+ * {@code RECEIVER_THREAD_NAME}. The executor is shut down right after the task is submitted,
+ * so its worker thread can exit once {@code receive()} returns instead of staying alive idle.
+ */
+ @Override
+ @SuppressWarnings("FutureReturnValueIgnored")
public void onStart() {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
- .setNameFormat(RECEIVER_THREAD_NAME + "-%d")
- .build();
+ .setNameFormat(RECEIVER_THREAD_NAME + "-%d")
+ .build();
- Executors.newSingleThreadExecutor(namedThreadFactory).submit(this::receive);
+ java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor(namedThreadFactory);
+ executor.submit(this::receive);
+ // shutdown() only rejects new tasks; the already submitted receive() task keeps running.
+ executor.shutdown();
}
@@ -58,13 +74,21 @@ public void onStop() {
- // is designed to stop by itself if isStopped() returns false
+ // is designed to stop by itself if isStopped() returns true
}
+ /**
+ * Returns the most recently recorded end offset.
+ * The field starts at {@code Long.MAX_VALUE} and is overwritten by {@code receive()} with the
+ * parsed offset of the page being iterated.
+ *
+ * @return the current value of {@code endOffset}
+ */
+ @Override
+ public Long getEndOffset() {
+ return endOffset;
+ }
+
private void receive() {
try {
- HubspotPagesIterator hubspotPagesIterator = new HubspotPagesIterator(config);
+ HubspotPagesIterator hubspotPagesIterator = new HubspotPagesIterator(config, startOffset);
while (!isStopped()) {
+ this.endOffset = Long.parseLong(hubspotPagesIterator.getCurrentPage().getOffset());
if (hubspotPagesIterator.hasNext()) {
- store(hubspotPagesIterator.next().toString());
+ if (!isStopped()) {
+ store(hubspotPagesIterator.next().toString());
+ }
} else {
Integer minutesToSleep = config.getPullFrequency().getMinutesValue();
LOG.debug(String.format("Waiting for '%d' minutes to pull.", minutesToSleep));
@@ -72,17 +96,17 @@ private void receive() {
// reload current page
HubspotPage currentPage = new HubspotHelper().getHubspotPage(config,
- hubspotPagesIterator.getCurrentPageOffset());
+ hubspotPagesIterator.getCurrentPageOffset());
int iteratorPosition = hubspotPagesIterator.getIteratorPosition();
hubspotPagesIterator = new HubspotPagesIterator(config, currentPage,
- hubspotPagesIterator.getCurrentPageOffset());
+ hubspotPagesIterator.getCurrentPageOffset());
hubspotPagesIterator.setIteratorPosition(iteratorPosition);
}
}
} catch (Exception e) {
String errorMessage = "Exception while receiving messages from hubspot";
- /* TO DO https://issues.cask.co/browse/PLUGIN-357
+ /* TODO https://issues.cask.co/browse/PLUGIN-357
The receiver will get terminated on error and stop receiving messages.
Retry Logic needs to be implemented.
*/