diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java index 78e049de9..cf93db088 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java @@ -34,6 +34,7 @@ import org.apache.falcon.entity.v0.process.Property; import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.util.DateUtil; +import org.apache.falcon.util.OozieUtils; import org.apache.falcon.workflow.WorkflowExecutionArgs; import org.apache.hadoop.fs.Path; import org.joda.time.format.DateTimeFormat; @@ -69,7 +70,7 @@ public java.util.Properties build(Cluster cluster, DateUtil.setTimeZone(entity.getTimezone().getID()); ExpressionHelper.setReferenceDate(new Date(getNominalTime().getMillis())); - elProps.putAll(getInputProps(cluster)); + elProps.putAll(getInputProps(cluster, suppliedProps)); elProps.putAll(getOutputProps()); elProps.putAll(evalProperties()); Properties buildProps = build(cluster, buildPath); @@ -144,7 +145,7 @@ private Properties getOutputProps() throws FalconException { return props; } - private Properties getInputProps(Cluster clusterObj) throws FalconException { + private Properties getInputProps(Cluster clusterObj, Properties suppliedProps) throws FalconException { Properties props = new Properties(); if (entity.getInputs() == null) { @@ -168,47 +169,12 @@ private Properties getInputProps(Cluster clusterObj) throws FalconException { falconInputNames.add(input.getName()); falconInputFeedStorageTypes.add(storage.getType().name()); String partition = input.getPartition(); - - String startTimeExp = input.getStart(); - String endTimeExp = input.getEnd(); - ExpressionHelper.setReferenceDate(new Date(getNominalTime().getMillis())); - Date startTime = EXPRESSION_HELPER.evaluate(startTimeExp, Date.class); - Date endTime = EXPRESSION_HELPER.evaluate(endTimeExp, Date.class); - - for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) { - org.apache.falcon.entity.v0.cluster.Cluster clusterEntity = - EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName()); - if (!EntityUtil.responsibleFor(clusterEntity.getColo())) { - continue; - } - - List locations = FeedHelper.getLocations(cluster, feed); - for (Location loc : locations) { - if (loc.getType() != LocationType.DATA) { - continue; - } - List paths = new ArrayList<>(); - List instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(), - startTime, endTime); // test when startTime and endTime are equal. - for (Date instanceTime : instanceTimes) { - String path = EntityUtil.evaluateDependentPath(loc.getPath(), instanceTime); - if (StringUtils.isNotBlank(partition)) { - if (!path.endsWith("/") && !partition.startsWith("/")) { - path = path + "/"; - } - path = path + partition; - } - path = getStoragePath(path); - paths.add(path); - } - if (loc.getType() != LocationType.DATA) { - props.put(input.getName() + "." + loc.getType().toString().toLowerCase(), - StringUtils.join(paths, ",")); - } else { - props.put(input.getName(), StringUtils.join(paths, ",")); - } - falconInputPaths.add(StringUtils.join(paths, ",")); - } + if (suppliedProps!= null + && suppliedProps.containsKey(OozieUtils.FALCON_PROCESS_INPUT_PATHS + "." + input.getName())) { + String paths = suppliedProps.getProperty(OozieUtils.FALCON_PROCESS_INPUT_PATHS + + "." + input.getName()); + List inPaths = getInPaths(paths, partition); + falconInputPaths.addAll(inPaths); } } props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), StringUtils.join(falconInputFeeds, "#")); @@ -219,4 +185,22 @@ private Properties getInputProps(Cluster clusterObj) throws FalconException { return props; } + private String addPartition(String path, String partition) { + if (StringUtils.isNotBlank(partition)) { + if (!path.endsWith("/") && !partition.startsWith("/")) { + path = path + "/"; + } + path = path + partition; + } + return path; + } + + private List getInPaths(String paths, String partition) { + List inPaths = new ArrayList<>(); + for (String path : paths.split(",")) { + path = addPartition(path, partition); + inPaths.add(getStoragePath(path)); + } + return inPaths; + } } diff --git a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java index 708788ba7..70c1a4d75 100644 --- a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java +++ b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java @@ -40,6 +40,7 @@ * Help methods relating to oozie configuration. */ public final class OozieUtils { + public static final String FALCON_PROCESS_INPUT_PATHS = "falcon.system.inpaths"; public static final JAXBContext WORKFLOW_JAXB_CONTEXT; public static final JAXBContext ACTION_JAXB_CONTEXT; public static final JAXBContext COORD_JAXB_CONTEXT; diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java index 49e112090..fe8088309 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java @@ -39,6 +39,7 @@ import org.apache.falcon.notification.service.impl.DataAvailabilityService; import org.apache.falcon.predicate.Predicate; import org.apache.falcon.state.InstanceID; +import org.apache.falcon.util.OozieUtils; import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.workflow.engine.DAGEngine; import org.apache.falcon.workflow.engine.DAGEngineFactory; @@ -122,18 +123,22 @@ public void registerForNotifications(boolean isResume) throws FalconException { return; } for (Input input : process.getInputs().getInputs()) { - // Register for notification for every required input - if (input.isOptional()) { - continue; - } Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed()); String startTimeExp = input.getStart(); String endTimeExp = input.getEnd(); + SchedulerUtil.validateELExpType(startTimeExp, endTimeExp, input.getName()); DateTime processInstanceTime = getInstanceTime(); - expressionHelper.setReferenceDate(new Date(processInstanceTime.getMillis())); - - Date startTime = expressionHelper.evaluate(startTimeExp, Date.class); - Date endTime = expressionHelper.evaluate(endTimeExp, Date.class); + Date startTime = null, endTime = null; + SchedulerUtil.EXPTYPE exptype = SchedulerUtil.getExpType(startTimeExp); + if (exptype == SchedulerUtil.EXPTYPE.ABSOLUTE) { + expressionHelper.setReferenceDate(new Date(processInstanceTime.getMillis())); + startTime = expressionHelper.evaluate(startTimeExp, Date.class); + endTime = expressionHelper.evaluate(endTimeExp, Date.class); + SchedulerUtil.validateStartAndEndTime(startTime, endTime); + } else { + SchedulerUtil.validateStartEndForNonAbsExp(startTimeExp, endTimeExp, input.getName(), + process.getName()); + } for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) { org.apache.falcon.entity.v0.cluster.Cluster clusterEntity = @@ -141,48 +146,96 @@ public void registerForNotifications(boolean isResume) throws FalconException { if (!EntityUtil.responsibleFor(clusterEntity.getColo())) { continue; } - List paths = new ArrayList<>(); - List locations = FeedHelper.getLocations(cluster, feed); - for (Location loc : locations) { - if (loc.getType() != LocationType.DATA) { + + if (exptype == SchedulerUtil.EXPTYPE.ABSOLUTE) { + List paths = getPaths(cluster, feed, startTime, endTime); + Predicate predicate = Predicate.createDataPredicate(paths.size()); + // To ensure we evaluate only predicates not evaluated before when an instance is resumed. + if (isResume && !awaitedPredicates.contains(predicate)) { continue; } - List instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(), - startTime, endTime); - for (Date instanceTime : instanceTimes) { - String path = EntityUtil.evaluateDependentPath(loc.getPath(), instanceTime); - if (feed.getAvailabilityFlag() != null && !feed.getAvailabilityFlag().isEmpty()) { - if (!path.endsWith("/")) { - path = path + "/"; - } - path = path + feed.getAvailabilityFlag(); - } - if (!paths.contains(new Path(path))) { - paths.add(new Path(path)); - } + addDataPredicate(predicate); + DataAvailabilityService.DataRequestBuilder requestBuilder = + (DataAvailabilityService.DataRequestBuilder) + NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA) + .createRequestBuilder(executionService, getId()); + requestBuilder.setLocations(paths) + .setCluster(cluster.getName()) + .setPollingFrequencyInMillis(SchedulerUtil + .getPollingFrequencyinMillis(process.getFrequency())) + .setTimeoutInMillis(getTimeOutInMillis()) + .setLocations(paths) + .setInputName(input.getName()) + .setExpType(exptype) + .setIsOptional(input.isOptional()); + NotificationServicesRegistry.register(requestBuilder.build()); + LOG.info("Registered for a data notification for process {} of instance time {} " + + "for data location {}", process.getName(), getInstanceTime(), + StringUtils.join(paths, ",")); + } else { + int startInstance = Math.abs(SchedulerUtil.getExpInstance(startTimeExp, exptype)); + int endInstance = Math.abs(SchedulerUtil.getExpInstance(endTimeExp, exptype)); + int noOfPaths = Math.abs(endInstance - startInstance) + 1; + Predicate predicate = Predicate.createDataPredicate(noOfPaths); + + // check may be awaiting predicates already contains this + if (isResume && !awaitedPredicates.contains(predicate)) { + continue; } + addDataPredicate(predicate); + DataAvailabilityService.DataRequestBuilder regexDataRequestBuilder = + (DataAvailabilityService.DataRequestBuilder) + NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA) + .createRequestBuilder(executionService, getId()); + Date referenceTime = new Date(System.currentTimeMillis()); // need to configure this. + regexDataRequestBuilder.setExpType(exptype) + .setCluster(cluster.getName()) + .setPollingFrequencyInMillis(SchedulerUtil. + getPollingFrequencyinMillis(process.getFrequency())) + .setTimeoutInMillis(getTimeOutInMillis()) + .setStartInstance(startInstance) + .setEndInstance(endInstance) + .setStartTimeInMillis(SchedulerUtil.getStartTimeInMillis(cluster.getValidity().getStart(), + feed.getFrequency(), process.getTimezone(), referenceTime, exptype)) + .setEndTimeInMillis(SchedulerUtil.getEndTimeInMillis(cluster.getValidity().getStart(), + feed.getFrequency(), process.getTimezone(), referenceTime, exptype, + SchedulerUtil.getExpLimit(startTimeExp, exptype))) + .setBasePath(FeedHelper.getLocation(feed, clusterEntity, LocationType.DATA).getPath()) + .setInputName(input.getName()) + .setFrequencyInMillis(SchedulerUtil.getFrequencyInMillis(feed.getFrequency())) + .setIsOptional(input.isOptional()); + NotificationServicesRegistry.register(regexDataRequestBuilder.build()); + LOG.info("Registered for a data notification for process {} of instance time {} " + + "for expression type {}", process.getName(), getInstanceTime(), exptype); } + } + } + } - Predicate predicate = Predicate.createDataPredicate(paths); - // To ensure we evaluate only predicates not evaluated before when an instance is resumed. - if (isResume && !awaitedPredicates.contains(predicate)) { - continue; + private List getPaths(org.apache.falcon.entity.v0.feed.Cluster cluster, + Feed feed, Date startTime, Date endTime) { + List paths = new ArrayList<>(); + List locations = FeedHelper.getLocations(cluster, feed); + for (Location loc : locations) { + if (loc.getType() != LocationType.DATA) { + continue; + } + List instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(), + startTime, endTime); + for (Date instanceTime : instanceTimes) { + String path = EntityUtil.evaluateDependentPath(loc.getPath(), instanceTime); + if (feed.getAvailabilityFlag() != null && !feed.getAvailabilityFlag().isEmpty()) { + if (!path.endsWith("/")) { + path = path + "/"; + } + path = path + feed.getAvailabilityFlag(); + } + if (!paths.contains(new Path(path))) { + paths.add(new Path(path)); } - addDataPredicate(predicate); - DataAvailabilityService.DataRequestBuilder requestBuilder = - (DataAvailabilityService.DataRequestBuilder) - NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA) - .createRequestBuilder(executionService, getId()); - requestBuilder.setLocations(paths) - .setCluster(cluster.getName()) - .setPollingFrequencyInMillis(SchedulerUtil.getPollingFrequencyinMillis(process.getFrequency())) - .setTimeoutInMillis(getTimeOutInMillis()) - .setLocations(paths); - NotificationServicesRegistry.register(requestBuilder.build()); - LOG.info("Registered for a data notification for process {} of instance time {} for data location {}", - process.getName(), getInstanceTime(), StringUtils.join(paths, ",")); } } + return paths; } @Override @@ -197,9 +250,17 @@ public void onEvent(Event event) throws FalconException { setActualEnd(((JobCompletedEvent)event).getEndTime()); break; case DATA_AVAILABLE: + DataEvent dataEvent = (DataEvent) event; // Data has not become available and the wait time has passed - if (((DataEvent) event).getStatus() == DataEvent.STATUS.UNAVAILABLE) { + if (dataEvent.getStatus() == DataEvent.STATUS.UNAVAILABLE) { hasTimedOut = true; + } else { + String feedName = dataEvent.getInputName(); + if (this.getProperties() == null) { + this.setProperties(new Properties()); + } + this.getProperties().setProperty(OozieUtils.FALCON_PROCESS_INPUT_PATHS + + "." + feedName, StringUtils.join(dataEvent.getDataLocations(), ",")); } // If the event matches any of the awaited predicates, remove the predicate of the awaited list Predicate toRemove = null; diff --git a/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java b/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java index 236da11c9..81ab8fc51 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java @@ -17,10 +17,18 @@ */ package org.apache.falcon.execution; +import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.util.RuntimeProperties; import org.joda.time.DateTime; +import java.util.Arrays; +import java.util.Date; +import java.util.TimeZone; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + /** * Contains utility methods. */ @@ -28,6 +36,8 @@ public final class SchedulerUtil { private static final long MINUTE_IN_MS = 60 * 1000L; private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS; + private static final long DAY_IN_MS = 24 * HOUR_IN_MS; + private static final long MONTH_IN_MS = 30 * HOUR_IN_MS; public static final String MINUTELY_PROCESS_FREQUENCY_POLLING_IN_MILLIS = "falcon.scheduler.minutely.process.polling.frequency.millis"; public static final String HOURLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS = @@ -37,6 +47,19 @@ public final class SchedulerUtil { public static final String MONTHLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS = "falcon.scheduler.monthly.process.polling.frequency.millis"; + private static final Pattern LATEST_PATTERN = Pattern.compile("latest\\(\\s*(-*\\d+)\\)"); + private static final Pattern FUTURE_PATTERN = Pattern.compile("future\\(\\s*(\\d+),\\s*(\\d+)\\)"); + + + /** + * Type of EL Expressions. + */ + public enum EXPTYPE { + ABSOLUTE, + LATEST, + FUTURE + } + private SchedulerUtil(){}; /** @@ -61,6 +84,62 @@ public static long getFrequencyInMillis(DateTime referenceTime, Frequency freque } } + public static long getFrequencyInMillis(Frequency frequency) { + switch (frequency.getTimeUnit()) { + case minutes: + return MINUTE_IN_MS * frequency.getFrequencyAsInt(); + case hours: + return HOUR_IN_MS * frequency.getFrequencyAsInt(); + case days: + return DAY_IN_MS * frequency.getFrequencyAsInt(); + case months: + return MONTH_IN_MS * frequency.getFrequencyAsInt(); + default: + throw new IllegalArgumentException("Invalid time unit " + frequency.getTimeUnit().name()); + } + } + + /** + * + * @param start + * @param frequency + * @param timezone + * @param referenceTime + * @return + */ + public static long getEndTimeInMillis(Date start, Frequency frequency, TimeZone timezone, + Date referenceTime, EXPTYPE exptype, int limit) { + Date startTime = new Date(getStartTimeInMillis(start, frequency, timezone, referenceTime, exptype)); + if (exptype == EXPTYPE.LATEST) { + return start.getTime(); + } else if (exptype == EXPTYPE.FUTURE) { + return EntityUtil.getNextInstanceTime(startTime, frequency, timezone, limit).getTime(); + } else { + throw new IllegalArgumentException("End time cannot be obtained for exp " + exptype); + } + } + + /** + * + * @param start + * @param frequency + * @param timezone + * @param referenceTime + * @param exptype + * @return + */ + public static long getStartTimeInMillis(Date start, Frequency frequency, TimeZone timezone, + Date referenceTime, EXPTYPE exptype) { + if (exptype == EXPTYPE.LATEST) { + return EntityUtil.getPreviousInstanceTime(start, frequency, timezone, referenceTime).getTime(); + } else if (exptype == EXPTYPE.FUTURE) { + return EntityUtil.getNextStartTime(start, frequency, timezone, referenceTime).getTime(); + } else { + throw new IllegalArgumentException("Start time cannot be obtained for exp " + exptype); + } + } + + /** * * @param frequency @@ -85,4 +164,130 @@ public static long getPollingFrequencyinMillis(Frequency frequency) { } } + /** + * Retrives expression type from given feed input expression. + * @param expression + * @return + */ + public static EXPTYPE getExpType(String expression) { + if (expression.contains("latest")) { + return EXPTYPE.LATEST; + } else if (expression.contains("future")) { + return EXPTYPE.FUTURE; + } else { + return EXPTYPE.ABSOLUTE; + } + } + + /** + * Retrieves index inside Latest EL Expression. + * @param expression + * @return + */ + public static int getLatestInstance(String expression) { + Matcher matcher = LATEST_PATTERN.matcher(expression); + if (matcher.find()) { + return Integer.parseInt(matcher.group(1)); + } + throw new IllegalArgumentException("Expression not valid " + expression); + } + + /** + * Retrieves index inside Input Expression based on expression type. + * @param expression + * @param exptype + * @return + */ + public static int getExpInstance(String expression, EXPTYPE exptype) { + if (exptype == EXPTYPE.LATEST) { + return getLatestInstance(expression); + } else if (exptype == EXPTYPE.FUTURE) { + return getFutureExpInstance(expression); + } else { + throw new IllegalArgumentException("Expression not valid " + expression); + } + } + + /** + * Retrieves index inside Future EL Expression. + * @param expression + * @return + */ + public static int getFutureExpInstance(String expression) { + Matcher matcher = FUTURE_PATTERN.matcher(expression); + if (matcher.find()) { + return Integer.parseInt(matcher.group(1)); + } + throw new IllegalArgumentException("Expression not valid " + expression); + } + + public static int getExpLimit(String expression, EXPTYPE exptype) { + if (exptype == EXPTYPE.FUTURE) { + Matcher matcher = FUTURE_PATTERN.matcher(expression); + if (matcher.find()) { + return Integer.parseInt(matcher.group(2)); + } + throw new IllegalArgumentException("Expression not valid " + expression); + } + return 0; + } + + /** + * Validates Start and End Expressions for given input feed. + * @param startTimeExp + * @param endTimeExp + * @param inputName + */ + public static void validateELExpType(String startTimeExp, String endTimeExp, String inputName) { + EXPTYPE startExpType = getExpType(startTimeExp); + EXPTYPE endExpType = getExpType(endTimeExp); + if (startExpType != endExpType) { + throw new IllegalArgumentException("Start exp and End exp for given input: " + inputName + + " should be of same type. Type can be of: " + + Arrays.toString(EXPTYPE.values())); + } + } + + /** + * Validates Start and End Times for given input feed. + * @param startTime + * @param endTime + */ + public static void validateStartAndEndTime(Date startTime, Date endTime) { + if (startTime.after(endTime)) { + throw new IllegalArgumentException("Specified End time " + + SchemaHelper.getDateFormat().format(endTime) + + " is before the start time of given input" + + SchemaHelper.getDateFormat().format(startTime)); + } + } + + /** + * Validates Start and End expressions for latest and future expressions. + * @param startTimeExp + * @param endTimeExp + * @param inputName + * @param processName + */ + public static void validateStartEndForNonAbsExp(String startTimeExp, String endTimeExp, String inputName, + String processName) { + EXPTYPE exptype = getExpType(startTimeExp); // we can take any exptype since both exp's should have same type + int startExpInstance = getExpInstance(startTimeExp, exptype); + int endExpInstance = getExpInstance(endTimeExp, exptype); + if (exptype == EXPTYPE.LATEST) { + if ((startExpInstance > 0) || (endExpInstance > 0) || (startExpInstance > endExpInstance)) { + throw new IllegalArgumentException(" Start el expression and End el expression should not be positive" + + " and start should be before end for expType: " + exptype.name() + "for input: " + inputName + + "of process: " + processName); + } + } else if (exptype == EXPTYPE.FUTURE) { + if ((startExpInstance < 0) || (endExpInstance < 0) || (getExpLimit(startTimeExp, exptype) <=0) + || (startExpInstance > endExpInstance)) { + throw new IllegalArgumentException(" Start el expression and End el expression should not be negative" + + " and start should be before end for expType: " + exptype.name() + "for input: " + inputName + + "of process: " + processName); + } + } + } + } diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/FalconNotificationService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/FalconNotificationService.java index 41d20a832..d28ef5eb4 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/FalconNotificationService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/FalconNotificationService.java @@ -51,6 +51,7 @@ public interface FalconNotificationService extends FalconService { */ RequestBuilder createRequestBuilder(NotificationHandler handler, ID callbackID); + /** * Builder to build appropriate {@link NotificationRequest}. * @param diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java index 083f66c43..39e2bd29a 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java @@ -31,6 +31,7 @@ public class DataEvent extends Event { private final ID callbackID; private List dataLocations; private STATUS status; + private String inputName; /** * Enumerates the status of data. @@ -40,11 +41,12 @@ public enum STATUS { UNAVAILABLE } - public DataEvent(ID callbackID, List dataLocations, STATUS availability) { + public DataEvent(ID callbackID, List dataLocations, STATUS availability, String inName) { this.callbackID = callbackID; this.dataLocations = dataLocations; this.status = availability; this.type = EventType.DATA_AVAILABLE; + this.inputName = inName; } public STATUS getStatus() { @@ -59,12 +61,12 @@ public List getDataLocations() { return dataLocations; } - public void setDataLocations(List locations) { - this.dataLocations = locations; - } - @Override public ID getTarget() { return callbackID; } + + public String getInputName() { + return inputName; + } } diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java index 1240be9b5..189de6159 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java @@ -17,29 +17,23 @@ */ package org.apache.falcon.notification.service.impl; +import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; -import org.apache.falcon.entity.ClusterHelper; -import org.apache.falcon.entity.EntityUtil; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.exception.NotificationServiceException; import org.apache.falcon.execution.NotificationHandler; -import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.execution.SchedulerUtil; import org.apache.falcon.notification.service.FalconNotificationService; import org.apache.falcon.notification.service.event.DataEvent; +import org.apache.falcon.notification.service.request.LocationBasedDataNotificationRequest; +import org.apache.falcon.notification.service.request.RegexBasedDataNotificationRequest; import org.apache.falcon.notification.service.request.DataNotificationRequest; import org.apache.falcon.notification.service.request.NotificationRequest; import org.apache.falcon.state.ID; import org.apache.falcon.util.StartupProperties; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -107,6 +101,7 @@ public void destroy() throws FalconException { executorService.shutdown(); } + /** * Builds {@link DataNotificationRequest}. */ @@ -115,11 +110,58 @@ public static class DataRequestBuilder extends RequestBuilder locations; + private long startTimeInMillis; + private long endTimeInMillis; + private int startInstance; + private int endInstance; + private SchedulerUtil.EXPTYPE expType; + private String basePath; + private long frequencyInMillis; + private String inputName; + private boolean isOptional; + public DataRequestBuilder(NotificationHandler handler, ID callbackID) { super(handler, callbackID); } + @Override + public DataNotificationRequest build() { + checkMandatoryArgs(); + if (expType == SchedulerUtil.EXPTYPE.ABSOLUTE) { + checkLocations(); + return new LocationBasedDataNotificationRequest(handler, callbackId, cluster, + pollingFrequencyInMillis, timeoutInMillis, locations, inputName, isOptional); + } else { + checkArgsForRegexRequest(); + return new RegexBasedDataNotificationRequest(handler, callbackId, cluster, + pollingFrequencyInMillis, timeoutInMillis, startTimeInMillis, + endTimeInMillis, startInstance, endInstance, expType, basePath, + frequencyInMillis, inputName, isOptional); + } + } + + private void checkArgsForRegexRequest() { + if (startTimeInMillis <= 0 || endTimeInMillis <=0 || basePath == null || frequencyInMillis <=0) { + throw new IllegalArgumentException("Missing or incorrect, one or more of the mandatory arguments" + + " for Regex Request: startTime, endTime, basepath, frequency"); + } + } + + private void checkLocations() { + if (locations == null) { + throw new IllegalArgumentException("Locations cannot be null for Absolute Expression Request"); + } + } + + private void checkMandatoryArgs() { + if (callbackId == null || cluster == null || pollingFrequencyInMillis <= 0 + || timeoutInMillis < pollingFrequencyInMillis || inputName == null || expType == null) { + throw new IllegalArgumentException("Missing or incorrect, one or more of the mandatory arguments:" + + " callbackId, cluster, pollingFrequency, timeOut, inputName, expType"); + } + } + public DataRequestBuilder setLocations(List locPaths) { Map paths = new HashMap<>(); for (Path loc : locPaths) { @@ -129,18 +171,6 @@ public DataRequestBuilder setLocations(List locPaths) { return this; } - @Override - public DataNotificationRequest build() { - if (callbackId == null || locations == null - || cluster == null || pollingFrequencyInMillis <= 0 - || timeoutInMillis < pollingFrequencyInMillis) { - throw new IllegalArgumentException("Missing or incorrect, one or more of the mandatory arguments:" - + " callbackId, locations, dataType, cluster, pollingFrequency, waitTime"); - } - return new DataNotificationRequest(handler, callbackId, cluster, - pollingFrequencyInMillis, timeoutInMillis, locations); - } - public DataRequestBuilder setCluster(String clusterName) { this.cluster = clusterName; return this; @@ -161,9 +191,82 @@ public DataRequestBuilder setTimeoutInMillis(long timeout) { this.timeoutInMillis = timeout; return this; } + + public DataRequestBuilder setStartTimeInMillis(long startTime) { + if (startTime < 0) { + throw new IllegalArgumentException("StartTime should be greater than zero"); + } + this.startTimeInMillis = startTime; + return this; + } + + public DataRequestBuilder setEndTimeInMillis(long endTime) { + if (endTime < 0) { + throw new IllegalArgumentException("EndTime should be greater than startTime"); + } + this.endTimeInMillis = endTime; + return this; + } + + public DataRequestBuilder setStartInstance(int startInst) { + if (startInst < 0) { + throw new IllegalArgumentException("startInstance should be greater than zero"); + } + this.startInstance = startInst; + return this; + } + + public DataRequestBuilder setEndInstance(int endInst) { + if (endInst < 0) { + throw new IllegalArgumentException("endInstance should be greater than zero"); + } + this.endInstance = endInst; + return this; + } + + public DataRequestBuilder setBasePath(String basePathExp) { + if (StringUtils.isBlank(basePathExp)) { + throw new IllegalArgumentException("base path cannot be null or empty"); + } + this.basePath = basePathExp; + return this; + } + + public DataRequestBuilder setFrequencyInMillis(long freqInMillis) { + if (freqInMillis <= 0) { + throw new IllegalArgumentException("Frequency cannot be less than zero"); + } + this.frequencyInMillis = freqInMillis; + return this; + } + + public DataRequestBuilder setInputName(String inName) { + if (StringUtils.isBlank(inName)) { + throw new IllegalArgumentException("Feed name cannot be null or empty"); + } + this.inputName = inName; + return this; + } + + public DataRequestBuilder setExpType(SchedulerUtil.EXPTYPE exp) { + if (exp == null) { + throw new IllegalArgumentException("exptype cannot be null"); + } + this.expType = exp; + return this; + } + + public DataRequestBuilder setIsOptional(boolean optional) { + this.isOptional = optional; + return this; + } + } + + + private class EventConsumer implements Runnable { public EventConsumer() { @@ -179,10 +282,17 @@ public void run() { if (isUnRegistered) { continue; } - boolean isDataArrived = checkConditions(dataNotificationRequest); + DataServiceHandler dataServiceHandler = getDataServiceHandler(dataNotificationRequest); + boolean isDataArrived = dataServiceHandler.handleRequest(dataNotificationRequest); + if (isDataArrived) { notifyHandler(dataNotificationRequest, DataEvent.STATUS.AVAILABLE); } else { + if (dataNotificationRequest.isOptional()) { + getAvailablePaths(dataNotificationRequest); + notifyHandler(dataNotificationRequest, DataEvent.STATUS.AVAILABLE); + continue; + } if (dataNotificationRequest.isTimedout()) { notifyHandler(dataNotificationRequest, DataEvent.STATUS.UNAVAILABLE); continue; @@ -196,16 +306,42 @@ public void run() { } } + private void getAvailablePaths(DataNotificationRequest dataNotificationRequest) { + Map locPaths = dataNotificationRequest.getLocationMap(); + Map availableLocPaths = new HashMap<>(); + for (Map.Entry entry : locPaths.entrySet()) { + if (entry.getValue()) { + availableLocPaths.put(entry.getKey(), entry.getValue()); + } + } + if (availableLocPaths.isEmpty()) { + // add Empty Directory from clusterHelper + System.out.println("have to modify"); + } + dataNotificationRequest.setLocationMap(availableLocPaths); + } + + private DataServiceHandler getDataServiceHandler(DataNotificationRequest dataNotificationRequest) { + if (dataNotificationRequest instanceof LocationBasedDataNotificationRequest) { + return new PathServiceHandler(); + } else if (dataNotificationRequest instanceof RegexBasedDataNotificationRequest) { + return new RegexServiceHandler(); + } else { + throw new IllegalArgumentException(" Service Handler not defined for request " + + dataNotificationRequest); // add toString to this + } + } + private void notifyHandler(DataNotificationRequest dataNotificationRequest, DataEvent.STATUS status) { DataEvent dataEvent = new DataEvent(dataNotificationRequest.getCallbackId(), - dataNotificationRequest.getLocations(), status); + dataNotificationRequest.getLocations(), status, dataNotificationRequest.getInputName()); boolean isUnRegistered = isUnRegistered(dataNotificationRequest); if (isUnRegistered) { return; } try { - LOG.debug("Notifying Handler for Data Notification Request of id {} " , + LOG.debug("Notifying Handler for Data Notification Request of id {} ", dataNotificationRequest.getCallbackId().toString()); dataNotificationRequest.getHandler().onEvent(dataEvent); } catch (FalconException e) { @@ -224,56 +360,6 @@ private boolean isUnRegistered(DataNotificationRequest dataNotificationRequest) } return false; } - - private boolean checkConditions(DataNotificationRequest dataNotificationRequest) { - try { - Entity entity = EntityUtil.getEntity(EntityType.CLUSTER, dataNotificationRequest.getCluster()); - Cluster clusterEntity = (Cluster) entity; - Configuration conf = ClusterHelper.getConfiguration(clusterEntity); - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf); - Map locations = dataNotificationRequest.getLocationMap(); - List nonAvailablePaths = getUnAvailablePaths(locations); - updatePathsAvailability(nonAvailablePaths, fs, locations); - if (allPathsExist(locations)) { - return true; - } - } catch (FalconException e) { - LOG.error("Retrieving the Cluster Entity " + e); - } catch (IOException e) { - LOG.error("Unable to connect to FileSystem " + e); - } - return false; - } - - private void updatePathsAvailability(List unAvailablePaths, FileSystem fs, - Map locations) throws IOException { - for (Path path : unAvailablePaths) { - if (fs.exists(path)) { - if (locations.containsKey(path)) { - locations.put(path, true); - } else { - locations.put(new Path(path.toUri().getPath()), true); - } - } - } - } - - private List getUnAvailablePaths(Map locations) { - List paths = new ArrayList<>(); - for (Map.Entry pathInfo : locations.entrySet()) { - if (!pathInfo.getValue()) { - paths.add(pathInfo.getKey()); - } - } - return paths; - } - - private boolean allPathsExist(Map locations) { - if (locations.containsValue(Boolean.FALSE)) { - return false; - } - return true; - } } } diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataServiceHandler.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataServiceHandler.java new file mode 100644 index 000000000..6235e3ff4 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataServiceHandler.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service.impl; + +import org.apache.falcon.notification.service.request.DataNotificationRequest; + +/** + * Handler based on Data based Requests. + */ +public interface DataServiceHandler { + boolean handleRequest(DataNotificationRequest dataNotificationRequest); +} diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/PathServiceHandler.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/PathServiceHandler.java new file mode 100644 index 000000000..e30820673 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/PathServiceHandler.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service.impl; + +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.notification.service.request.DataNotificationRequest; +import org.apache.falcon.notification.service.request.LocationBasedDataNotificationRequest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Handles Path based Data requests for ABSOLUTE El expressions. + */ +public class PathServiceHandler implements DataServiceHandler { + private static final Logger LOG = LoggerFactory.getLogger(PathServiceHandler.class); + + @Override + public boolean handleRequest(DataNotificationRequest request) { + LocationBasedDataNotificationRequest dataNotificationRequest = null; + if (request instanceof LocationBasedDataNotificationRequest) { + dataNotificationRequest = (LocationBasedDataNotificationRequest)request; + } else { + throw new IllegalArgumentException("Request should be LocationBased for to be processed"); + } + try { + Entity entity = EntityUtil.getEntity(EntityType.CLUSTER, dataNotificationRequest.getCluster()); + Cluster clusterEntity = (Cluster) entity; + Configuration conf = ClusterHelper.getConfiguration(clusterEntity); + FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf); + Map locations = dataNotificationRequest.getLocationMap(); + List nonAvailablePaths = getUnAvailablePaths(locations); + updatePathsAvailability(nonAvailablePaths, fs, locations); + if (allPathsExist(locations)) { + return true; + } + } catch (FalconException e) { + LOG.error("Retrieving the Cluster Entity " + e); + } catch (IOException e) { + LOG.error("Unable to connect to FileSystem " + e); + } + return false; + } + + private void updatePathsAvailability(List unAvailablePaths, FileSystem fs, + Map locations) throws IOException { + for (Path path : unAvailablePaths) { + if (fs.exists(path)) { + if (locations.containsKey(path)) { + locations.put(path, true); + } else { + locations.put(new Path(path.toUri().getPath()), true); + } + } + } + } + + private List getUnAvailablePaths(Map locations) { + List paths = new ArrayList<>(); + for (Map.Entry pathInfo : locations.entrySet()) { + if (!pathInfo.getValue()) { + paths.add(pathInfo.getKey()); + } + } + return paths; + } + + private boolean allPathsExist(Map locations) { + if (locations.containsValue(Boolean.FALSE)) { + return false; + } + return true; + } +} diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/RegexServiceHandler.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/RegexServiceHandler.java new file mode 100644 index 000000000..bb703d862 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/RegexServiceHandler.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service.impl; + +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.execution.SchedulerUtil; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.notification.service.request.DataNotificationRequest; +import org.apache.falcon.notification.service.request.RegexBasedDataNotificationRequest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + + +/** + * Handler interface for DataNotificationRequests for Latest and Future EL Exps. + */ +public class RegexServiceHandler implements DataServiceHandler { + private static final Logger LOG = LoggerFactory.getLogger(RegexServiceHandler.class); + + @Override + public boolean handleRequest(DataNotificationRequest request) { + RegexBasedDataNotificationRequest regexBasedDataNotificationRequest; + if (request instanceof RegexBasedDataNotificationRequest) { + regexBasedDataNotificationRequest = (RegexBasedDataNotificationRequest)request; + } else { + throw new IllegalArgumentException("Request should be Regex based to be processed"); + } + try { + Entity entity = EntityUtil.getEntity(EntityType.CLUSTER, regexBasedDataNotificationRequest.getCluster()); + Cluster clusterEntity = (Cluster) entity; + Configuration conf = ClusterHelper.getConfiguration(clusterEntity); + FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf); + return checkPaths(fs, regexBasedDataNotificationRequest); + } catch (FalconException e) { + LOG.error("Retrieving the Cluster Entity " + e); + } catch (IOException e) { + LOG.error("unable to get path"); + } + return false; + } + + private boolean checkPaths(FileSystem fs, RegexBasedDataNotificationRequest regexBasedDataNotificationRequest) + throws IOException { + Date startTime = new Date(regexBasedDataNotificationRequest.getStartTimeinMillis()); + Date endTime = new Date(regexBasedDataNotificationRequest.getEndTimeInMillis()); + Map locations = regexBasedDataNotificationRequest.getLocationMap(); + if (locations == null) { + locations = new HashMap<>(); + } + String basePath = regexBasedDataNotificationRequest.getBasePath(); + int endInstance = regexBasedDataNotificationRequest.getEndInstance(); + int startInstance = regexBasedDataNotificationRequest.getStartInstance(); + int requiredPaths = Math.abs(endInstance - startInstance) + 1; + int availablePaths = 0; + int pathsAdded = 0; + while (shouldRun(startTime, endTime, regexBasedDataNotificationRequest.getExpType()) + && pathsAdded < requiredPaths) { + String pathString = EntityUtil.evaluateDependentPath(basePath, startTime); + Path path = new Path(pathString); + if (fs.exists(path)) { + if (shouldAdd(availablePaths, startInstance, endInstance, + regexBasedDataNotificationRequest.getExpType())) { + if (locations.containsKey(path)) { + locations.put(path, true); + } else { + locations.put(new Path(path.toUri().getPath()), true); + } + pathsAdded++; + } + availablePaths++; + } + Calendar calendar = Calendar.getInstance(); + calendar.setTime(startTime); + int mul = getMultipleConstant(regexBasedDataNotificationRequest.getExpType()); + calendar.add(Calendar.MILLISECOND, (int) (mul * regexBasedDataNotificationRequest.getFrequencyInMillis())); + startTime = calendar.getTime(); + } + regexBasedDataNotificationRequest.setLocationMap(locations); + if (pathsAdded == requiredPaths) { + return true; + } + return false; + } + + private boolean shouldAdd(int availablePaths, int startInstance, int endInstance, SchedulerUtil.EXPTYPE expType) { + if (expType == SchedulerUtil.EXPTYPE.LATEST) { + return (availablePaths >= endInstance && availablePaths <= startInstance); + } + return (availablePaths >= startInstance && availablePaths <= endInstance); + } + + private boolean shouldRun(Date startTime, Date endTime, SchedulerUtil.EXPTYPE exptype) { + if (exptype == SchedulerUtil.EXPTYPE.LATEST) { + return startTime.compareTo(endTime) > 0; + } + return startTime.compareTo(endTime) < 0; + } + + private int getMultipleConstant(SchedulerUtil.EXPTYPE expType) { + if (expType == SchedulerUtil.EXPTYPE.LATEST) { + return -1; + } + return 1; + } + +} diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java index a110e6485..228e71175 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java @@ -306,9 +306,16 @@ public void run() { DAGEngineFactory.getDAGEngine(instance.getCluster()).reRun(instance, props, isForced); } } else { + Properties props = new Properties(); + if (instance.getProperties() != null && !instance.getProperties().isEmpty()) { + props.putAll(instance.getProperties()); + } EntityState entityState = STATE_STORE.getEntity(new EntityID(instance.getEntity())); + if (entityState.getProperties() != null && !entityState.getProperties().isEmpty()) { + props.putAll(entityState.getProperties()); + } externalId = DAGEngineFactory.getDAGEngine(instance.getCluster()) - .run(instance, entityState.getProperties()); + .run(instance, props); } LOG.info("Scheduled job {} for instance {}", externalId, instance.getId()); JobScheduledEvent event = new JobScheduledEvent(instance.getId(), diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java index 9e2b993c5..ebe889c22 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java @@ -19,6 +19,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.execution.SchedulerUtil; import org.apache.falcon.notification.service.NotificationServicesRegistry; import org.apache.falcon.state.ID; import org.apache.hadoop.fs.Path; @@ -35,15 +36,18 @@ * The setter methods of the class support chaining similar to a builder class. */ public class DataNotificationRequest extends NotificationRequest implements Delayed { - // Boolean represents path availability to avoid checking all paths for every poll. - private Map locations; - private long pollingFrequencyInMillis; - private long timeoutInMillis; - private String cluster; - private long accessTimeInMillis; - private long createdTimeInMillis; + + protected long pollingFrequencyInMillis; + protected long timeoutInMillis; + protected String cluster; + protected long accessTimeInMillis; + protected long createdTimeInMillis; + protected Map locations; + protected String inputName; // Represents request was accessed by DataAvailability service first time or not. private boolean isFirst; + protected SchedulerUtil.EXPTYPE expType; + protected boolean isOptional; /** @@ -63,21 +67,23 @@ public enum INSTANCELIMIT { * @param cluster * @param pollingFrequencyInMillis * @param timeoutInMillis - * @param locations + * @param input + * @param optional */ public DataNotificationRequest(NotificationHandler notifHandler, ID callbackId, String cluster, long pollingFrequencyInMillis, - long timeoutInMillis, Map locations) { + long timeoutInMillis, String input, boolean optional) { this.handler = notifHandler; this.callbackId = callbackId; this.service = NotificationServicesRegistry.SERVICE.DATA; this.cluster = cluster; this.pollingFrequencyInMillis = pollingFrequencyInMillis; this.timeoutInMillis = timeoutInMillis; - this.locations = locations; this.accessTimeInMillis = System.currentTimeMillis(); this.createdTimeInMillis = accessTimeInMillis; this.isFirst = true; + this.inputName = input; + this.isOptional = optional; } @@ -98,7 +104,6 @@ public boolean isTimedout() { return false; } - /** * Obtain list of paths from locations map. * @return List of paths to check. @@ -121,6 +126,12 @@ public Map getLocationMap() { return this.locations; } + + public void setLocationMap(Map locationMap) { + this.locations = locationMap; + } + + @Override public long getDelay(TimeUnit unit) { if (isFirst) { @@ -136,6 +147,20 @@ public int compareTo(Delayed other) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS)); } + public String getInputName() { + return inputName; + } + + public SchedulerUtil.EXPTYPE getExpType() { + return expType; + } + + public boolean isOptional() { + return isOptional; + } + + + @Override public boolean equals(Object o) { if (this == o) { @@ -144,7 +169,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - DataNotificationRequest that = (DataNotificationRequest) o; + LocationBasedDataNotificationRequest that = (LocationBasedDataNotificationRequest) o; if (!StringUtils.equals(cluster, that.cluster)) { return false; } @@ -170,13 +195,14 @@ public int hashCode() { result = 31 * result + Long.valueOf(pollingFrequencyInMillis).hashCode(); result = 31 * result + Long.valueOf(timeoutInMillis).hashCode(); result = 31 * result + Long.valueOf(createdTimeInMillis).hashCode(); + result = 31 * result + expType.hashCode(); return result; } @Override public String toString() { return "cluster: " + this.getCluster() + " locations: " + this.locations + " createdTime: " - + this.createdTimeInMillis; + + this.createdTimeInMillis + " expType: " + this.expType; } } diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/LocationBasedDataNotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/LocationBasedDataNotificationRequest.java new file mode 100644 index 000000000..7ac44ead0 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/LocationBasedDataNotificationRequest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service.request; + +import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.state.ID; +import org.apache.hadoop.fs.Path; + +import java.util.Map; + +/** + * Request intended for {@link import org.apache.falcon.notification.service.impl.DataAvailabilityService} + * for data notifications. + * The setter methods of the class support chaining similar to a builder class. + */ +public class LocationBasedDataNotificationRequest extends DataNotificationRequest { + + //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck + /** + * Constructor. + * @param notifHandler + * @param callbackId + * @param cluster + * @param pollingFrequencyInMillis + * @param timeoutInMillis + * @param locations + * @param inputName + */ + public LocationBasedDataNotificationRequest(NotificationHandler notifHandler, ID callbackId, + String cluster, long pollingFrequencyInMillis, + long timeoutInMillis, Map locations, + String inputName, boolean optional) { + super(notifHandler, callbackId, cluster, pollingFrequencyInMillis, timeoutInMillis, inputName, optional); + this.locations = locations; + } + + //RESUME CHECKSTYLE CHECK ParameterNumberCheck +} diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/RegexBasedDataNotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/RegexBasedDataNotificationRequest.java new file mode 100644 index 000000000..c42464384 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/RegexBasedDataNotificationRequest.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service.request; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.execution.SchedulerUtil; +import org.apache.falcon.state.ID; + + +/** + * Request intended for {@link import org.apache.falcon.notification.service.impl.DataAvailabilityService} + * for data notifications. + * The setter methods of the class support chaining similar to a builder class. + */ +public class RegexBasedDataNotificationRequest extends DataNotificationRequest { + + + private long startTimeinMillis; + private int startInstance; + private int endInstance; + private long endTimeInMillis; + private String basePath; + private long frequencyInMillis; + + //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck + /** + * Constructor. + * @param notifHandler + * @param callbackId + * @param cluster + * @param pollingFrequencyInMillis + * @param timeoutInMillis + * @param basePath + * @param frequencyInMillis + * @param inputName + */ + public RegexBasedDataNotificationRequest(NotificationHandler notifHandler, ID callbackId, String cluster, + long pollingFrequencyInMillis, long timeoutInMillis, + long startTimeinMillis, long endTimeInMillis, int startInstance, + int endInstance, SchedulerUtil.EXPTYPE expType, String basePath, + long frequencyInMillis, String inputName, boolean optional) { + super(notifHandler, callbackId, cluster, pollingFrequencyInMillis, timeoutInMillis, inputName, optional); + this.startTimeinMillis = startTimeinMillis; + this.endTimeInMillis = endTimeInMillis; + this.startInstance = startInstance; + this.endInstance = endInstance; + this.expType = expType; + this.basePath = basePath; + this.frequencyInMillis = frequencyInMillis; + } + //RESUME CHECKSTYLE CHECK ParameterNumberCheck + + public long getEndTimeInMillis() { + return endTimeInMillis; + } + + public String getBasePath() { + return basePath; + } + + public long getFrequencyInMillis() { + return frequencyInMillis; + } + + public long getStartTimeinMillis() { + return startTimeinMillis; + } + + public int getStartInstance() { + return startInstance; + } + + public int getEndInstance() { + return endInstance; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RegexBasedDataNotificationRequest that = (RegexBasedDataNotificationRequest) o; + if (!StringUtils.equals(cluster, that.cluster)) { + return false; + } + if (pollingFrequencyInMillis != (that.pollingFrequencyInMillis)) { + return false; + } + if (timeoutInMillis != that.timeoutInMillis) { + return false; + } + if (createdTimeInMillis != that.createdTimeInMillis) { + return false; + } + if (expType != that.expType) { + return false; + } + if (startTimeinMillis != that.startTimeinMillis) { + return false; + } + if (endTimeInMillis != that.endTimeInMillis) { + return false; + } + if (!StringUtils.equals(basePath, that.basePath)) { + return false; + } + return true; + } + + @Override + public int hashCode() { + int result = cluster.hashCode(); + result = 31 * result + Long.valueOf(pollingFrequencyInMillis).hashCode(); + result = 31 * result + Long.valueOf(timeoutInMillis).hashCode(); + result = 31 * result + Long.valueOf(createdTimeInMillis).hashCode(); + result = 31 * result + expType.hashCode(); + result = 31 * result + Long.valueOf(startTimeinMillis).hashCode(); + result = 31 * result + Long.valueOf(endTimeInMillis).hashCode(); + result = 31 * result + basePath.hashCode(); + result = 31 * result + Long.valueOf(startInstance).hashCode(); + result = 31 * result + Long.valueOf(endInstance).hashCode(); + result = 31 * result + Long.valueOf(frequencyInMillis).hashCode(); + return result; + } + + @Override + public String toString() { + return "cluster: " + this.getCluster() + " expType: " + this.expType + " createdTime: " + + this.createdTimeInMillis + " basePath: " + this.basePath + " pollingFrequencyInMillis: " + + this.pollingFrequencyInMillis + " frequencyInMillis: " + this.frequencyInMillis; + } + +} diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java index 93dcb123a..b83e02a0e 100644 --- a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java +++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java @@ -17,7 +17,6 @@ */ package org.apache.falcon.predicate; -import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.execution.NotificationHandler; import org.apache.falcon.notification.service.event.DataEvent; @@ -26,7 +25,6 @@ import org.apache.falcon.notification.service.event.RerunEvent; import org.apache.falcon.notification.service.event.TimeElapsedEvent; import org.apache.falcon.state.ID; -import org.apache.hadoop.fs.Path; import java.io.Serializable; import java.util.Collections; @@ -42,6 +40,8 @@ */ public class Predicate implements Serializable { + + /** * Type of predicate, currently data and time are supported. */ @@ -159,16 +159,14 @@ public static Predicate createTimePredicate(long start, long end, long instanceT /** * Creates a predicate of type DATA. * - * @param paths List of paths to check + * @param noOfPaths Lengths of paths to check * @return */ - public static Predicate createDataPredicate(List paths) { - Collections.sort(paths); + public static Predicate createDataPredicate(int noOfPaths) { return new Predicate(TYPE.DATA) - .addClause("path", StringUtils.join(paths, ",")); + .addClause("numpaths", noOfPaths); } - /** * Creates a predicate of type JOB_COMPLETION. * @@ -205,7 +203,7 @@ public static Predicate getPredicate(Event event) throws FalconException { if (event.getType() == EventType.DATA_AVAILABLE) { DataEvent dataEvent = (DataEvent) event; if (dataEvent.getDataLocations() != null) { - return createDataPredicate(dataEvent.getDataLocations()); + return createDataPredicate(dataEvent.getDataLocations().size()); } else { throw new FalconException("Event does not have enough data to create a predicate"); } diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java index 89709dc48..2f66d3fbc 100644 --- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java @@ -606,7 +606,7 @@ private Event createEvent(NotificationServicesRegistry.SERVICE type, Process pro case DATA: DataEvent dataEvent = new DataEvent(id, new ArrayList(Arrays.asList(new Path("/projects/falcon/clicks"))), - DataEvent.STATUS.AVAILABLE); + DataEvent.STATUS.AVAILABLE, "in"); return dataEvent; default: return null; @@ -620,7 +620,7 @@ private Event createEvent(NotificationServicesRegistry.SERVICE type, case DATA: DataEvent dataEvent = new DataEvent(id, new ArrayList(Arrays.asList(new Path("/projects/falcon/clicks/_SUCCESS"))), - DataEvent.STATUS.AVAILABLE); + DataEvent.STATUS.AVAILABLE, "in"); return dataEvent; case JOB_SCHEDULE: JobScheduledEvent scheduledEvent = new JobScheduledEvent(id, JobScheduledEvent.STATUS.SUCCESSFUL); diff --git a/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java b/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java index 768be7a3c..1a0a19b3a 100644 --- a/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java +++ b/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java @@ -47,4 +47,22 @@ public Object[][] getTestFrequencies() { {formatter.parseDateTime("02/10/2015 03:30:00 +0530"), new Frequency("months(2)"), (31+30)*24*60*60*1000L}, }; } + + @Test + public void testLatestExp() { + String exp = "latest( -2)"; + Assert.assertEquals(SchedulerUtil.getLatestInstance(exp), -2); + exp = "latest(10)"; + Assert.assertEquals(SchedulerUtil.getLatestInstance(exp), 10); + } + + @Test + public void testFutureExp() { + String exp = "future(0,25)"; + Assert.assertEquals(SchedulerUtil.getFutureExpInstance(exp), 0); + Assert.assertEquals(SchedulerUtil.getExpLimit(exp, SchedulerUtil.EXPTYPE.FUTURE), 25); + exp = "future(10, 20)"; + Assert.assertEquals(SchedulerUtil.getFutureExpInstance(exp), 10); + Assert.assertEquals(SchedulerUtil.getExpLimit(exp, SchedulerUtil.EXPTYPE.FUTURE), 20); + } } diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/DataAvailabilityServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/DataAvailabilityServiceTest.java index 20c99b583..ddf992681 100644 --- a/scheduler/src/test/java/org/apache/falcon/notification/service/DataAvailabilityServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/notification/service/DataAvailabilityServiceTest.java @@ -23,6 +23,7 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.execution.SchedulerUtil; import org.apache.falcon.notification.service.event.DataEvent; import org.apache.falcon.notification.service.impl.DataAvailabilityService; import org.apache.falcon.notification.service.request.DataNotificationRequest; @@ -128,7 +129,8 @@ private DataNotificationRequest getDataNotificationRequest(List locations, DataAvailabilityService.DataRequestBuilder dataRequestBuilder = new DataAvailabilityService.DataRequestBuilder(handler, id); dataRequestBuilder.setPollingFrequencyInMillis(20).setCluster("testCluster") - .setTimeoutInMillis(100).setLocations(locations); + .setTimeoutInMillis(100).setLocations(locations).setExpType(SchedulerUtil.EXPTYPE.ABSOLUTE) + .setInputName("input"); return dataRequestBuilder.build(); } diff --git a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java index c5425b206..8c07fb974 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java @@ -58,6 +58,7 @@ public class AbstractSchedulerManagerJerseyIT extends FalconUnitTestBase { public static final String PROCESS_TEMPLATE = "/local-process-noinputs-template.xml"; public static final String PROCESS_TEMPLATE_NOLATE_DATA = "/process-nolatedata-template.xml"; + public static final String PROCESS_TEMPLATE_NOLATE_DATA_LATEST = "/process-nolatedata-latest-template.xml"; public static final String PROCESS_NAME = "processName"; protected static final String START_INSTANCE = "2012-04-20T00:00Z"; private static FalconJPAService falconJPAService = FalconJPAService.get(); diff --git a/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java b/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java index ed273069f..ebd0e2c78 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java +++ b/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java @@ -36,6 +36,8 @@ public abstract class AbstractTestContext { public static final String FEED_TEMPLATE5 = "/feed-template5.xml"; public static final String FEED_EXPORT_TEMPLATE6 = "/feed-export-template6.xml"; public static final String PROCESS_TEMPLATE = "/process-template.xml"; + public static final String FEED_TEMPLATE6 = "/feed-template6.xml"; + public static final String FEED_TEMPLATE7 = "/feed-template7.xml"; protected static void mkdir(FileSystem fileSystem, Path path) throws Exception { if (!fileSystem.exists(path) && !fileSystem.mkdirs(path)) { diff --git a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java index 6d6d40b49..981f0d956 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java @@ -196,4 +196,33 @@ public void testProcessWithInputs() throws Exception { processName, START_INSTANCE); Assert.assertEquals(status, InstancesResult.WorkflowStatus.SUCCEEDED); } + + @Test + public void testProcessWithInputsLatest() throws Exception { + UnitTestContext context = new UnitTestContext(); + Map overlay = context.getUniqueOverlay(); + String colo = overlay.get(COLO); + String cluster = overlay.get(CLUSTER); + + submitCluster(colo, cluster, null); + String tmpFile = TestContext.overlayParametersOverTemplate(UnitTestContext.FEED_TEMPLATE6, overlay); + APIResult result = falconUnitClient.submit(EntityType.FEED.name(), tmpFile, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + tmpFile = TestContext.overlayParametersOverTemplate(UnitTestContext.FEED_TEMPLATE7, overlay); + result = falconUnitClient.submit(EntityType.FEED.name(), tmpFile, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + context.prepare(HELLO_WORLD_WORKFLOW); + + submitProcess(PROCESS_TEMPLATE_NOLATE_DATA_LATEST, overlay); + + String processName = overlay.get(PROCESS_NAME); + scheduleProcess(processName, cluster, "2016-03-20T00:00Z", 1); + + waitForStatus(EntityType.PROCESS.toString(), processName, + "2016-03-20T00:00Z", InstancesResult.WorkflowStatus.SUCCEEDED); + + InstancesResult.WorkflowStatus status = getClient().getInstanceStatus(EntityType.PROCESS.name(), + processName, "2016-03-20T00:00Z"); + Assert.assertEquals(status, InstancesResult.WorkflowStatus.SUCCEEDED); + } } diff --git a/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java b/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java index 2eb2e69e6..b56629b01 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java +++ b/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java @@ -88,6 +88,12 @@ protected void prepare(String workflow) throws Exception { new Path(wfPath, "workflow.xml")); mkdir(fs, new Path(wfParent, "input/2012/04/20/00")); mkdir(fs, new Path(wfParent, "input/2012/04/21/00")); + mkdir(fs, new Path(wfParent, "input/2016/03/20/00")); + mkdir(fs, new Path(wfParent, "input/2016/03/18/00")); + mkdir(fs, new Path(wfParent, "input/2016/03/16/00")); + mkdir(fs, new Path(wfParent, "input/2016/03/31/20")); + mkdir(fs, new Path(wfParent, "input/2016/03/31/12")); + mkdir(fs, new Path(wfParent, "input/2016/04/01/00")); Path outPath = new Path(wfParent, "output"); mkdir(fs, outPath, new FsPermission((short) 511)); } diff --git a/webapp/src/test/resources/feed-template6.xml b/webapp/src/test/resources/feed-template6.xml new file mode 100644 index 000000000..f98d0b3a7 --- /dev/null +++ b/webapp/src/test/resources/feed-template6.xml @@ -0,0 +1,45 @@ + + + + + + + input + + hours(1) + UTC + + + + + + + + + + + + + + + + + + + diff --git a/webapp/src/test/resources/feed-template7.xml b/webapp/src/test/resources/feed-template7.xml new file mode 100644 index 000000000..4a23820f6 --- /dev/null +++ b/webapp/src/test/resources/feed-template7.xml @@ -0,0 +1,46 @@ + + + + output + + days(1) + UTC + + + + + + + + + + + + + + + + + + + + + + + diff --git a/webapp/src/test/resources/process-nolatedata-latest-template.xml b/webapp/src/test/resources/process-nolatedata-latest-template.xml new file mode 100644 index 000000000..816b56d5a --- /dev/null +++ b/webapp/src/test/resources/process-nolatedata-latest-template.xml @@ -0,0 +1,49 @@ + + + + + consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting + testPipeline,dataReplicationPipeline + + + + + + + 1 + FIFO + days(1) + UTC + + + + + + + + + + + + + + + + +