diff --git a/pom.xml b/pom.xml index de0bcc9..912e947 100644 --- a/pom.xml +++ b/pom.xml @@ -399,6 +399,11 @@ ${wiremock.version} test + + org.apache.beam + beam-sdks-java-io-sparkreceiver + 2.41.0 + diff --git a/src/main/java/io/cdap/plugin/hubspot/common/HubspotPagesIterator.java b/src/main/java/io/cdap/plugin/hubspot/common/HubspotPagesIterator.java index 696260f..7e6a334 100644 --- a/src/main/java/io/cdap/plugin/hubspot/common/HubspotPagesIterator.java +++ b/src/main/java/io/cdap/plugin/hubspot/common/HubspotPagesIterator.java @@ -28,7 +28,7 @@ public class HubspotPagesIterator implements Iterator { private HubspotPage currentPage; private Iterator currentPageIterator; private int iteratorPosition = 0; - private String currentPageOffset = null; + private String currentPageOffset; /** * Constructor for HubspotPagesIterator object. @@ -43,8 +43,12 @@ public HubspotPagesIterator(SourceHubspotConfig config, HubspotPage currentPage, this.currentPageOffset = currentPageOffset; } + public HubspotPagesIterator(SourceHubspotConfig config, String offset) throws IOException { + this(config, new HubspotHelper().getHubspotPage(config, offset), offset); + } + public HubspotPagesIterator(SourceHubspotConfig config) throws IOException { - this(config, new HubspotHelper().getHubspotPage(config, null), null); + this(config, null); } /** @@ -91,6 +95,10 @@ public int getIteratorPosition() { return iteratorPosition; } + public HubspotPage getCurrentPage() { + return currentPage; + } + /** * Here, just set the position of iteration. * @param iteratorPosition the iterator position diff --git a/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java b/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java index b5bb661..480cb2b 100644 --- a/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java +++ b/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java @@ -20,6 +20,7 @@ import io.cdap.plugin.hubspot.common.HubspotHelper; import io.cdap.plugin.hubspot.common.HubspotPage; import io.cdap.plugin.hubspot.common.HubspotPagesIterator; +import org.apache.beam.sdk.io.sparkreceiver.HasOffset; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.receiver.Receiver; import org.slf4j.Logger; @@ -33,10 +34,12 @@ /** * Implementation of Spark receiver to receive Salesforce push topic events. */ -public class HubspotReceiver extends Receiver { +public class HubspotReceiver extends Receiver implements HasOffset { private static final Logger LOG = LoggerFactory.getLogger(HubspotReceiver.class); private static final String RECEIVER_THREAD_NAME = "hubspot_api_listener"; private final HubspotStreamingSourceConfig config; + private String startOffset = null; + private Long endOffset = Long.MAX_VALUE; HubspotReceiver(HubspotStreamingSourceConfig config) throws IOException { super(StorageLevel.MEMORY_AND_DISK_2()); @@ -44,10 +47,23 @@ public class HubspotReceiver extends Receiver { } @Override + public void setStartOffset(Long startOffset) { + if (startOffset != null) { + // startOffset - 1, because offset should be inclusive. + this.startOffset = String.valueOf(startOffset == 0L ? 0 : startOffset - 1); + } + } + + public HubspotStreamingSourceConfig getConfig() { + return config; + } + + @Override + @SuppressWarnings("FutureReturnValueIgnored") public void onStart() { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() - .setNameFormat(RECEIVER_THREAD_NAME + "-%d") - .build(); + .setNameFormat(RECEIVER_THREAD_NAME + "-%d") + .build(); Executors.newSingleThreadExecutor(namedThreadFactory).submit(this::receive); } @@ -58,13 +74,21 @@ public void onStop() { // is designed to stop by itself if isStopped() returns false } + @Override + public Long getEndOffset() { + return endOffset; + } + private void receive() { try { - HubspotPagesIterator hubspotPagesIterator = new HubspotPagesIterator(config); + HubspotPagesIterator hubspotPagesIterator = new HubspotPagesIterator(config, startOffset); while (!isStopped()) { + this.endOffset = Long.parseLong(hubspotPagesIterator.getCurrentPage().getOffset()); if (hubspotPagesIterator.hasNext()) { - store(hubspotPagesIterator.next().toString()); + if (!isStopped()) { + store(hubspotPagesIterator.next().toString()); + } } else { Integer minutesToSleep = config.getPullFrequency().getMinutesValue(); LOG.debug(String.format("Waiting for '%d' minutes to pull.", minutesToSleep)); @@ -72,17 +96,17 @@ private void receive() { // reload current page HubspotPage currentPage = new HubspotHelper().getHubspotPage(config, - hubspotPagesIterator.getCurrentPageOffset()); + hubspotPagesIterator.getCurrentPageOffset()); int iteratorPosition = hubspotPagesIterator.getIteratorPosition(); hubspotPagesIterator = new HubspotPagesIterator(config, currentPage, - hubspotPagesIterator.getCurrentPageOffset()); + hubspotPagesIterator.getCurrentPageOffset()); hubspotPagesIterator.setIteratorPosition(iteratorPosition); } } } catch (Exception e) { String errorMessage = "Exception while receiving messages from hubspot"; - /* TO DO https://issues.cask.co/browse/PLUGIN-357 + /* TODO https://issues.cask.co/browse/PLUGIN-357 The receiver will get terminated on error and stop receiving messages. Retry Logic needs to be implemented. */