diff --git a/common-types/src/main/java/org/apache/falcon/LifeCycle.java b/common-types/src/main/java/org/apache/falcon/LifeCycle.java
index 0ecddd1ea..766a9612a 100644
--- a/common-types/src/main/java/org/apache/falcon/LifeCycle.java
+++ b/common-types/src/main/java/org/apache/falcon/LifeCycle.java
@@ -27,7 +27,8 @@ public enum LifeCycle {
EVICTION(Tag.RETENTION),
REPLICATION(Tag.REPLICATION),
IMPORT(Tag.IMPORT),
- EXPORT(Tag.EXPORT);
+ EXPORT(Tag.EXPORT),
+ ARCHIVAL(Tag.ARCHIVAL);
private final Tag tag;
diff --git a/common-types/src/main/java/org/apache/falcon/Tag.java b/common-types/src/main/java/org/apache/falcon/Tag.java
index 3c2f25e20..d2efaae9a 100644
--- a/common-types/src/main/java/org/apache/falcon/Tag.java
+++ b/common-types/src/main/java/org/apache/falcon/Tag.java
@@ -28,7 +28,8 @@ public enum Tag {
RETENTION(EntityType.FEED),
REPLICATION(EntityType.FEED),
IMPORT(EntityType.FEED),
- EXPORT(EntityType.FEED);
+ EXPORT(EntityType.FEED),
+ ARCHIVAL(EntityType.FEED);
private final EntityType entityType;
diff --git a/common-types/src/main/resources/feed-0.1.xsd b/common-types/src/main/resources/feed-0.1.xsd
index cbc97b970..2be7cf22d 100644
--- a/common-types/src/main/resources/feed-0.1.xsd
+++ b/common-types/src/main/resources/feed-0.1.xsd
@@ -363,7 +363,8 @@
-                <xs:element type="retention-stage" name="retention-stage" minOccurs="1" maxOccurs="1"/>
+                <xs:element type="retention-stage" name="retention-stage" minOccurs="0" maxOccurs="1"/>
+                <xs:element type="archival-stage" name="archival-stage" minOccurs="0" maxOccurs="1"/>
@@ -575,4 +576,22 @@
+
+    <xs:complexType name="archival-stage">
+        <xs:annotation>
+            <xs:documentation>
+                Archival stage is the new way to define archival for a feed using the feed lifecycle feature.
+                Archival has a configurable policy which performs the validation and the real execution through
+                the workflow engine. This method of specifying archival gives you more control over archival than
+                defining an archival source and target in the replication lifecycle stage of the feed.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:all>
+            <xs:element type="xs:string" name="policy" minOccurs="0" maxOccurs="1"/>
+            <xs:element type="frequency-type" name="frequency" minOccurs="0" maxOccurs="1"/>
+            <xs:element type="xs:string" name="queue" minOccurs="0" maxOccurs="1"/>
+            <xs:element type="xs:string" name="priority" minOccurs="0" maxOccurs="1"/>
+            <xs:element type="location" name="location" minOccurs="1" maxOccurs="1"/>
+        </xs:all>
+    </xs:complexType>
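
A feed snippet exercising the new stage might look like the following sketch (the element names follow the
reconstructed schema above; the policy, queue, and s4:// path values are illustrative):

    <lifecycle>
        <archival-stage>
            <policy>AgeBasedArchival</policy>
            <queue>default</queue>
            <location type="data" path="s4://prod/falcon/${YEAR}-${MONTH}-${DAY}"/>
        </archival-stage>
    </lifecycle>
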
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 757359f9c..44cc3a4c6 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -27,6 +27,7 @@
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.datasource.DatasourceType;
+import org.apache.falcon.entity.v0.feed.ArchivalStage;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Cluster;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.ExtractMethod;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
import org.apache.falcon.entity.v0.feed.Lifecycle;
import org.apache.falcon.entity.v0.feed.Load;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.feed.MergeType;
import org.apache.falcon.entity.v0.feed.Property;
import org.apache.falcon.entity.v0.feed.RetentionStage;
import org.apache.falcon.entity.v0.feed.Sla;
import org.apache.falcon.entity.v0.feed.Validity;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
@@ -401,13 +386,27 @@ public static RetentionStage getRetentionStage(Feed feed, String clusterName) th
if (clusterLifecycle != null && clusterLifecycle.getRetentionStage() != null) {
return clusterLifecycle.getRetentionStage();
- } else if (globalLifecycle != null) {
+ } else if (globalLifecycle != null && globalLifecycle.getRetentionStage() != null) {
return globalLifecycle.getRetentionStage();
}
}
return null;
}
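+
+    /**
+     * Returns the archival stage for the given cluster. A cluster-level stage, when present,
+     * takes precedence over the global one; returns null when neither is defined.
+     */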
+ public static ArchivalStage getArchivalStage(Feed feed, String clusterName) throws FalconException {
+ if (isLifecycleEnabled(feed, clusterName)) {
+ Lifecycle globalLifecycle = feed.getLifecycle();
+ Lifecycle clusterLifecycle = getCluster(feed, clusterName).getLifecycle();
+
+ if (clusterLifecycle != null && clusterLifecycle.getArchivalStage() != null) {
+ return clusterLifecycle.getArchivalStage();
+ } else if (globalLifecycle != null && globalLifecycle.getArchivalStage() != null) {
+ return globalLifecycle.getArchivalStage();
+ }
+ }
+ return null;
+ }
+
public static Date getFeedValidityStart(Feed feed, String clusterName) throws FalconException {
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
if (feedCluster != null) {
@@ -437,10 +436,20 @@ public static List getPolicies(Feed feed, String clusterName) throws Fal
Cluster cluster = getCluster(feed, clusterName);
if (cluster != null) {
if (isLifecycleEnabled(feed, clusterName)) {
-                String policy = getRetentionStage(feed, clusterName).getPolicy();
-                policy = StringUtils.isBlank(policy)
-                        ? FeedLifecycleStage.RETENTION.getDefaultPolicyName() : policy;
-                result.add(policy);
+                if (getRetentionStage(feed, clusterName) != null) {
+                    String policy = getRetentionStage(feed, clusterName).getPolicy();
+                    policy = StringUtils.isBlank(policy)
+                            ? FeedLifecycleStage.RETENTION.getDefaultPolicyName() : policy;
+                    result.add(policy);
+                }
+
+                if (getArchivalStage(feed, clusterName) != null) {
+                    String policy = getArchivalStage(feed, clusterName).getPolicy();
+                    policy = StringUtils.isBlank(policy)
+                            ? FeedLifecycleStage.ARCHIVAL.getDefaultPolicyName() : policy;
+                    result.add(policy);
+                }
}
return result;
}
@@ -1295,4 +1304,17 @@ public static List getListing(Feed feed, String clusterName,
return storage.getListing(feed, clusterName, locationType, start, end);
}
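+
+    /**
+     * Returns the archival location path for the given cluster, or an empty string when no archival
+     * stage is defined. Throws when a stage exists but its location path is missing.
+     */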
+ public static String getArchivalPath(Feed feed, String clusterName) throws FalconException {
+ String archivalPath = "";
+ ArchivalStage archivalStage = getArchivalStage(feed, clusterName);
+ if (archivalStage != null) {
+ Location location = archivalStage.getLocation();
+ if ((location != null) && (location.getPath() != null)) {
+ archivalPath = location.getPath();
+ } else {
+ throw new FalconException("Location cannot be empty.");
+ }
+ }
+ return archivalPath;
+ }
}
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 6b72174ac..381495c42 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -33,18 +33,18 @@
import org.apache.falcon.entity.v0.EntityGraph;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.ACL;
-import org.apache.falcon.entity.v0.feed.Extract;
-import org.apache.falcon.entity.v0.feed.ExtractMethod;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.MergeType;
import org.apache.falcon.entity.v0.feed.Properties;
import org.apache.falcon.entity.v0.feed.Property;
import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.ACL;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.Extract;
+import org.apache.falcon.entity.v0.feed.ExtractMethod;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
@@ -157,14 +157,30 @@ private void validateLifecycle(Feed feed) throws FalconException {
LifecyclePolicyMap map = LifecyclePolicyMap.get();
for (Cluster cluster : feed.getClusters().getClusters()) {
if (FeedHelper.isLifecycleEnabled(feed, cluster.getName())) {
-                if (FeedHelper.getRetentionStage(feed, cluster.getName()) == null) {
-                    throw new ValidationException("Retention is a mandatory stage, didn't find it for cluster: "
+                if (FeedHelper.getRetentionStage(feed, cluster.getName()) != null
+                        || FeedHelper.getArchivalStage(feed, cluster.getName()) != null) {
+                    validateRetentionStage(feed, cluster);
+                    validateArchivalStage(feed, cluster);
+                    for (String policyName : FeedHelper.getPolicies(feed, cluster.getName())) {
+                        map.get(policyName).validate(feed, cluster.getName());
+                    }
+                } else {
+                    throw new ValidationException("At least one of retention/archival stages is mandatory, "
+                            + "didn't find it for cluster: "
                            + cluster.getName());
                }
-                validateRetentionFrequency(feed, cluster.getName());
-                for (String policyName : FeedHelper.getPolicies(feed, cluster.getName())) {
-                    map.get(policyName).validate(feed, cluster.getName());
-                }
+            }
+ }
+ }
+
+ private void validateRetentionStage(Feed feed, Cluster cluster) throws FalconException {
+ if (FeedHelper.getRetentionStage(feed, cluster.getName()) != null) {
+ validateRetentionFrequency(feed, cluster.getName());
+ }
+ }
+
+ private void validateArchivalStage(Feed feed, Cluster cluster) throws FalconException {
+ if (FeedHelper.getArchivalStage(feed, cluster.getName()) != null) {
+            Location location = FeedHelper.getArchivalStage(feed, cluster.getName()).getLocation();
+            if (location == null || location.getPath() == null || location.getPath().isEmpty()) {
+ throw new ValidationException("Location path cannot be empty.");
}
}
}
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java b/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
index 833ad0488..5b2775688 100644
--- a/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
+++ b/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
@@ -22,7 +22,8 @@
*/
public enum FeedLifecycleStage {
- RETENTION("AgeBasedDelete");
+ RETENTION("AgeBasedDelete"),
+ ARCHIVAL("AgeBasedArchival");
private String defaultPolicyName;
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/archival/AgeBasedArchival.java b/common/src/main/java/org/apache/falcon/lifecycle/archival/AgeBasedArchival.java
new file mode 100644
index 000000000..f14a2a0b1
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/lifecycle/archival/AgeBasedArchival.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.falcon.lifecycle.archival;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.feed.Feed;
+
+/**
+ * Archival policy which archives all instances of instance time depending on the given frequency.
+ * It will create the workflow and coordinators for this policy.
+ */
+public class AgeBasedArchival extends ArchivalPolicy {
+
+ @Override
+ public void validate(Feed feed, String clusterName) throws FalconException {
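+        // no policy-specific checks yet; the archival location path is validated in FeedEntityParser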
+
+ }
+}
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/archival/ArchivalPolicy.java b/common/src/main/java/org/apache/falcon/lifecycle/archival/ArchivalPolicy.java
new file mode 100644
index 000000000..b1c7df5e7
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/lifecycle/archival/ArchivalPolicy.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.archival;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.lifecycle.AbstractPolicyBuilderFactory;
+import org.apache.falcon.lifecycle.FeedLifecycleStage;
+import org.apache.falcon.lifecycle.LifecyclePolicy;
+import org.apache.falcon.lifecycle.PolicyBuilder;
+import org.apache.falcon.workflow.WorkflowEngineFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * All archival policies must extend this class.
+ */
+public abstract class ArchivalPolicy implements LifecyclePolicy {
+
+ @Override
+ public String getName() {
+ return this.getClass().getSimpleName();
+ }
+
+ @Override
+ public FeedLifecycleStage getStage() {
+ return FeedLifecycleStage.ARCHIVAL;
+ }
+
+ @Override
+ public Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException {
+ AbstractPolicyBuilderFactory factory = WorkflowEngineFactory.getLifecycleEngine();
+ PolicyBuilder builder = factory.getPolicyBuilder(getName());
+ return builder.build(cluster, buildPath, feed);
+ }
+}
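
Any alternative archival policy only needs to extend this base class and supply its own validation; a
minimal sketch (the SizeBasedArchival name and its check are hypothetical, not part of this patch):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.v0.feed.Feed;
    import org.apache.falcon.lifecycle.archival.ArchivalPolicy;

    public class SizeBasedArchival extends ArchivalPolicy {
        @Override
        public void validate(Feed feed, String clusterName) throws FalconException {
            // reject feeds that this hypothetical policy cannot archive
        }
    }

Such a policy would also have to be registered in *.falcon.feed.lifecycle.policies and paired with a
PolicyBuilder, as done for AgeBasedArchival in startup.properties further down.
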
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
index 8d735f97a..30f2f613a 100644
--- a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
+++ b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
@@ -46,7 +46,7 @@ public void validate(Feed feed, String clusterName) throws FalconException {
// validate that it is a valid cluster
Cluster cluster = FeedHelper.getCluster(feed, clusterName);
Frequency retentionLimit = getRetentionLimit(feed, clusterName);
- if (cluster != null) {
+ if (cluster != null && retentionLimit != null) {
validateLimitWithSla(feed, cluster, retentionLimit.toString());
validateLimitWithLateData(feed, cluster, retentionLimit.toString());
String lifecycleEngine = StartupProperties.get().getProperty("lifecycle.engine.impl",
@@ -122,9 +122,8 @@ public Frequency getRetentionLimit(Feed feed, String clusterName) throws FalconE
throw new FalconException("Invalid value for property: " + LIMIT_PROPERTY_NAME + ", should be a valid "
+ "frequency e.g. hours(2)", e);
}
- } else {
- throw new FalconException("Cluster " + clusterName + " doesn't contain retention stage");
}
+ return null;
}
}
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index d8040f08d..30731ec3d 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -75,7 +75,7 @@ public enum Type {PRE_PROCESSING, POST_PROCESSING, WORKFLOW_JOB, COORDINATOR_ACT
* Entity operations supported.
*/
public enum EntityOperations {
- GENERATE, DELETE, REPLICATE, IMPORT, EXPORT
+ GENERATE, DELETE, REPLICATE, IMPORT, EXPORT, ARCHIVE
}
public static final WorkflowExecutionArgs[] USER_MESSAGE_ARGS = {
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index f89990577..7878403ea 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -69,9 +69,12 @@
# List of Lifecycle policies configured.
-*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
+*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete,\
+ org.apache.falcon.lifecycle.archival.AgeBasedArchival
+
# List of builders for the policies.
-*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder
+*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder,\
+ org.apache.falcon.lifecycle.engine.oozie.archival.AgeBasedArchivalBuilder
##### Falcon Configuration Store Change listeners #####
*.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
org.apache.falcon.entity.ColoClusterRelation,\
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
index 450b25198..ffbace661 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
@@ -28,8 +28,8 @@
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Properties;
import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.entity.v0.feed.Argument;
import org.apache.falcon.entity.v0.feed.Arguments;
+import org.apache.falcon.entity.v0.feed.Argument;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Clusters;
import org.apache.falcon.entity.v0.feed.Extract;
@@ -38,12 +38,13 @@
import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
import org.apache.falcon.entity.v0.feed.FieldsType;
import org.apache.falcon.entity.v0.feed.Import;
-import org.apache.falcon.entity.v0.feed.Lifecycle;
+import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.Lifecycle;
import org.apache.falcon.entity.v0.feed.MergeType;
import org.apache.falcon.entity.v0.feed.RetentionStage;
+import org.apache.falcon.entity.v0.feed.ArchivalStage;
import org.apache.falcon.entity.v0.feed.Datasource;
import org.apache.falcon.entity.v0.feed.Validity;
import org.apache.falcon.entity.v0.process.Input;
@@ -266,8 +267,14 @@ public void testGetPolicies() throws Exception {
.getParser(EntityType.FEED);
Feed feed = parser.parse(this.getClass().getResourceAsStream(FEED3_XML));
List policies = FeedHelper.getPolicies(feed, "testCluster");
- Assert.assertEquals(policies.size(), 1);
+ Assert.assertEquals(policies.size(), 2);
Assert.assertEquals(policies.get(0), "AgeBasedDelete");
+ Assert.assertEquals(policies.get(1), "AgeBasedArchival");
+
+ feed.getLifecycle().setRetentionStage(null);
+ policies = FeedHelper.getPolicies(feed, "backupCluster");
+ Assert.assertEquals(policies.size(), 1);
+ Assert.assertEquals(policies.get(0), "AgeBasedArchival");
}
@Test
@@ -903,6 +910,180 @@ public void testGetRetentionFrequency() throws Exception {
new Frequency("hours(4)"));
}
+ @Test
+ public void testGetArchivalStage() throws Exception {
+ Feed feed = new Feed();
+ feed.setFrequency(new Frequency("hours(1)"));
+
+ // archival stage location is not defined
+ Lifecycle globalLifecycle = new Lifecycle();
+ ArchivalStage globalArchivalStage = new ArchivalStage();
+ globalLifecycle.setArchivalStage(globalArchivalStage);
+ feed.setLifecycle(globalLifecycle);
+
+ Clusters clusters = new Clusters();
+ org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster();
+ cluster.setName("cluster1");
+ clusters.getClusters().add(cluster);
+ feed.setClusters(clusters);
+
+ // lifecycle is defined only at global level
+ Location globalLocation = new Location();
+ globalLocation.setType(LocationType.DATA);
+ globalLocation.setPath("s4://globalPath/archival/");
+ globalArchivalStage.setLocation(globalLocation);
+ globalLifecycle.setArchivalStage(globalArchivalStage);
+ feed.setLifecycle(globalLifecycle);
+ Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+ feed.getLifecycle().getArchivalStage().getLocation().getPath());
+
+ // lifecycle is defined at both global and cluster level
+ Lifecycle clusterLifecycle = new Lifecycle();
+ ArchivalStage clusterArchivalStage = new ArchivalStage();
+ Location clusterLocation = new Location();
+ clusterLocation.setType(LocationType.DATA);
+ clusterLocation.setPath("s4://clusterPath/archival/");
+ clusterArchivalStage.setLocation(clusterLocation);
+ clusterLifecycle.setArchivalStage(clusterArchivalStage);
+ feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle);
+ Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+ cluster.getLifecycle().getArchivalStage().getLocation().getPath());
+
+ // lifecycle at both level - archival only at cluster level.
+ feed.getLifecycle().setArchivalStage(null);
+ Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+ cluster.getLifecycle().getArchivalStage().getLocation().getPath());
+
+ // lifecycle at both level - archival only at global level.
+ feed.getLifecycle().setArchivalStage(globalArchivalStage);
+ feed.getClusters().getClusters().get(0).getLifecycle().setArchivalStage(null);
+ Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+ feed.getLifecycle().getArchivalStage().getLocation().getPath());
+
+ // lifecycle is defined only at cluster level
+ feed.setLifecycle(null);
+ feed.getClusters().getClusters().get(0).getLifecycle().setArchivalStage(clusterArchivalStage);
+ Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+ cluster.getLifecycle().getArchivalStage().getLocation().getPath());
+ }
+
+ @Test
+ public void testLifecycleStage() throws Exception {
+ Feed feed = new Feed();
+ feed.setFrequency(new Frequency("days(1)"));
+
+ Clusters clusters = new Clusters();
+ org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster();
+ cluster.setName("cluster1");
+ clusters.getClusters().add(cluster);
+ feed.setClusters(clusters);
+
+ // retention stage frequency is not defined
+ // archival stage location is not defined
+ Lifecycle globalLifecycle = new Lifecycle();
+
+ ArchivalStage globalArchivalStage = new ArchivalStage();
+ globalLifecycle.setArchivalStage(globalArchivalStage);
+ RetentionStage globalRetentionStage = new RetentionStage();
+ globalLifecycle.setRetentionStage(globalRetentionStage);
+ feed.setLifecycle(globalLifecycle);
+
+ // lifecycle is defined only at global level
+ Location globalLocation = new Location();
+ globalLocation.setType(LocationType.DATA);
+ globalLocation.setPath("s4://globalPath/archival/");
+ globalArchivalStage.setLocation(globalLocation);
+ globalLifecycle.setArchivalStage(globalArchivalStage);
+ globalRetentionStage.setFrequency(new Frequency("hours(2)"));
+ globalLifecycle.setRetentionStage(globalRetentionStage);
+ feed.setLifecycle(globalLifecycle);
+ Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+ Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+ feed.getLifecycle().getArchivalStage().getLocation().getPath());
+ Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
+ feed.getLifecycle().getRetentionStage().getFrequency());
+
+ // lifecycle is defined at both global and cluster level
+ Lifecycle clusterLifecycle = new Lifecycle();
+ ArchivalStage clusterArchivalStage = new ArchivalStage();
+ Location clusterLocation = new Location();
+ clusterLocation.setType(LocationType.DATA);
+ clusterLocation.setPath("s4://clusterPath/archival/");
+ clusterArchivalStage.setLocation(clusterLocation);
+ clusterLifecycle.setArchivalStage(clusterArchivalStage);
+ RetentionStage clusterRetentionStage = new RetentionStage();
+ clusterRetentionStage.setFrequency(new Frequency("hours(4)"));
+ clusterLifecycle.setRetentionStage(clusterRetentionStage);
+ feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle);
+ Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+ cluster.getLifecycle().getArchivalStage().getLocation().getPath());
+ Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
+ cluster.getLifecycle().getRetentionStage().getFrequency());
+
+ // lifecycle at both level - retention/archival only at cluster level.
+ feed.getLifecycle().setArchivalStage(null);
+ feed.getLifecycle().setRetentionStage(null);
+ Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+ cluster.getLifecycle().getArchivalStage().getLocation().getPath());
+ Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
+ cluster.getLifecycle().getRetentionStage().getFrequency());
+
+ // lifecycle at both level - retention/archival only at global level.
+ feed.getLifecycle().setArchivalStage(globalArchivalStage);
+ feed.getLifecycle().setRetentionStage(globalRetentionStage);
+ feed.getClusters().getClusters().get(0).getLifecycle().setArchivalStage(null);
+ feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(null);
+ Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+ feed.getLifecycle().getArchivalStage().getLocation().getPath());
+ Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
+ feed.getLifecycle().getRetentionStage().getFrequency());
+
+ // lifecycle at both level- retention at cluster level, archival at global level
+ feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(clusterRetentionStage);
+ feed.getLifecycle().setRetentionStage(null);
+ Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+ feed.getLifecycle().getArchivalStage().getLocation().getPath());
+ Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
+ cluster.getLifecycle().getRetentionStage().getFrequency());
+
+ // lifecycle at both level- retention at global level, archival at cluster level
+ feed.getClusters().getClusters().get(0).getLifecycle().setArchivalStage(clusterArchivalStage);
+ feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(null);
+ feed.getLifecycle().setRetentionStage(globalRetentionStage);
+ feed.getLifecycle().setArchivalStage(null);
+ Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+ cluster.getLifecycle().getArchivalStage().getLocation().getPath());
+ Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
+ feed.getLifecycle().getRetentionStage().getFrequency());
+
+ // lifecycle is defined only at cluster level
+ feed.setLifecycle(null);
+ feed.getClusters().getClusters().get(0).getLifecycle().setArchivalStage(clusterArchivalStage);
+ feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(clusterRetentionStage);
+ Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+ cluster.getLifecycle().getArchivalStage().getLocation().getPath());
+ Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+ Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
+ cluster.getLifecycle().getRetentionStage().getFrequency());
+ }
+
@Test
public void testFeedImportSnapshot() throws Exception {
Cluster cluster = publishCluster();
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index ced4fc502..559b7116c 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -187,14 +187,20 @@ public void testLifecycleParse() throws Exception {
assertEquals("AgeBasedDelete", FeedHelper.getPolicies(feed, "testCluster").get(0));
assertEquals("reports", feed.getLifecycle().getRetentionStage().getQueue());
assertEquals("NORMAL", feed.getLifecycle().getRetentionStage().getPriority());
+ assertEquals("default", feed.getLifecycle().getArchivalStage().getQueue());
+        assertEquals("s4://prod/falcon/${YEAR}-${MONTH}-${DAY}-${HOUR}/",
+                feed.getLifecycle().getArchivalStage().getLocation().getPath());
+        assertEquals("AgeBasedArchival", FeedHelper.getPolicies(feed, "testCluster").get(1));
+        assertEquals("s4://prodnew/falcon/${YEAR}-${MONTH}-${DAY}-${HOUR}/",
+                feed.getClusters().getClusters().get(0).getLifecycle().getArchivalStage().getLocation().getPath());
}
@Test(expectedExceptions = ValidationException.class,
- expectedExceptionsMessageRegExp = ".*Retention is a mandatory stage.*")
- public void testMandatoryRetention() throws Exception {
+            expectedExceptionsMessageRegExp = ".*At least one of retention/archival stages is mandatory.*")
+ public void testLifecycleStage() throws Exception {
Feed feed = parser.parseAndValidate(this.getClass()
.getResourceAsStream(FEED3_XML));
feed.getLifecycle().setRetentionStage(null);
+ feed.getLifecycle().setArchivalStage(null);
+ assertEquals("AgeBasedDelete", FeedHelper.getPolicies(feed, "testCluster").get(0));
parser.validate(feed);
}
@@ -253,6 +259,15 @@ public void testRetentionFrequency() throws Exception {
parser.validate(feed);
}
+ @Test(expectedExceptions = ValidationException.class,
+ expectedExceptionsMessageRegExp = ".*Location path cannot be empty.*")
+ public void testArchivalPath() throws Exception {
+ Feed feed = parser.parseAndValidate(this.getClass()
+ .getResourceAsStream(FEED3_XML));
+ feed.getLifecycle().getArchivalStage().getLocation().setPath("");
+ parser.validate(feed);
+ }
+
@Test(expectedExceptions = ValidationException.class)
public void applyValidationInvalidFeed() throws Exception {
Feed feed = parser.parseAndValidate(ProcessEntityParserTest.class
diff --git a/common/src/test/resources/config/feed/feed-0.3.xml b/common/src/test/resources/config/feed/feed-0.3.xml
index e6d3e01ac..ccfd0dad1 100644
--- a/common/src/test/resources/config/feed/feed-0.3.xml
+++ b/common/src/test/resources/config/feed/feed-0.3.xml
@@ -53,6 +53,11 @@
+                <archival-stage>
+                    <queue>default</queue>
+                    <location type="data" path="s4://prodnew/falcon/${YEAR}-${MONTH}-${DAY}-${HOUR}/"/>
+                </archival-stage>
@@ -79,5 +84,10 @@
+        <archival-stage>
+            <queue>default</queue>
+            <location type="data" path="s4://prod/falcon/${YEAR}-${MONTH}-${DAY}-${HOUR}/"/>
+        </archival-stage>
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index 11d1e1b0b..332df28d7 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
replication. "preserveReplicationNumber" represents preserving replication number.
+        <archival-stage>
+            <queue>default</queue>
+            <location type="data" path="s4://prod/falcon/${YEAR}-${MONTH}-${DAY}-${HOUR}/"/>
+        </archival-stage>
+
-lifecycle tag is the new way to define various stages of a feed's lifecycle. In the example above we have defined a
-retention-stage using lifecycle tag. You may define lifecycle at global level or a cluster level or both. Cluster level
-configuration takes precedence and falcon falls back to global definition if cluster level specification is missing.
+lifecycle tag is the new way to define various stages of a feed's lifecycle. In the example above we have defined a
+retention-stage and an archival-stage using the lifecycle tag. At least one of the stages should be defined if the
+lifecycle tag is used. You may define lifecycle at global level or a cluster level or both. Cluster level configuration
+takes precedence and Falcon falls back to the global definition if the cluster level specification is missing.
----++++ Retention Stage
@@ -468,6 +471,13 @@ wastage of compute resources.
In future, we will allow more customisation like customising how to choose instances to be deleted through this method.
+----++++ Archival Stage
+Lifecycle archival stage allows users to archive data instead of creating a dummy replication feed for archival. This
+is done through the archival-stage tag inside the lifecycle tag.
+
+In this new method of defining archival you can specify the path at which data is to be archived and the queue for
+archival jobs. The behavior of the archival-stage is to archive all the data corresponding to the given instance time.
+The "location" tag is mandatory and must contain a valid path, e.g. "s4://prod/falcon/${YEAR}-${MONTH}-${DAY}".
---++ Process Specification
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalBuilder.java
new file mode 100644
index 000000000..0bab471af
--- /dev/null
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalBuilder.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.engine.oozie.archival;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.lifecycle.PolicyBuilder;
+import org.apache.falcon.lifecycle.archival.AgeBasedArchival;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
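+/**
+ * Policy builder for the AgeBasedArchival lifecycle policy; delegates to the workflow
+ * and coordinator builders below.
+ */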
+public class AgeBasedArchivalBuilder implements PolicyBuilder {
+
+ private static final String NAME = new AgeBasedArchival().getName();
+
+ @Override
+ public Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException {
+ Properties wfProps = buildWorkflow(cluster, buildPath, feed);
+ return buildCoordinator(cluster, buildPath, feed, wfProps);
+ }
+
+ @Override
+ public String getPolicyName() {
+ return NAME;
+ }
+
+ public Properties buildCoordinator(Cluster cluster, Path buildPath, Feed feed, Properties wfProps)
+ throws FalconException {
+ return AgeBasedArchivalCoordinatorBuilder.build(cluster, buildPath, feed, wfProps);
+ }
+
+ public Properties buildWorkflow(Cluster cluster, Path buildPath, Feed feed) throws FalconException {
+ return AgeBasedArchivalWorkflowBuilder.build(cluster, buildPath, feed);
+ }
+}
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalCoordinatorBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalCoordinatorBuilder.java
new file mode 100644
index 000000000..7ffe5baf7
--- /dev/null
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalCoordinatorBuilder.java
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.engine.oozie.archival;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.ExecutionType;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.lifecycle.engine.oozie.utils.OozieBuilderUtils;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.coordinator.DATAOUT;
+import org.apache.falcon.oozie.coordinator.DATAIN;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.ACTION;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.oozie.coordinator.DATASETS;
+import org.apache.falcon.oozie.coordinator.CONTROLS;
+import org.apache.falcon.oozie.coordinator.INPUTEVENTS;
+import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.Properties;
+
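+/**
+ * Builds the Oozie coordinator app for the AgeBasedArchival lifecycle policy.
+ */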
+public class AgeBasedArchivalCoordinatorBuilder {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AgeBasedArchivalCoordinatorBuilder.class);
+    private static final int THIRTY_MINUTES = 30 * 60 * 1000;
+    private static final String PARALLEL = "parallel";
+    private static final String TIMEOUT = "timeout";
+    private static final String ORDER = "order";
+    public static final String IN_DATASET_NAME = "input-dataset";
+    public static final String OUT_DATASET_NAME = "output-dataset";
+    public static final String DATAIN_NAME = "input";
+    public static final String DATAOUT_NAME = "output";
+
+    private AgeBasedArchivalCoordinatorBuilder() {
+    }
+
+ /**
+ * Builds the coordinator app.
+     * @param cluster - cluster to schedule archival on.
+     * @param basePath - Base path to marshal coordinator app.
+     * @param feed - feed for which archival is to be scheduled.
+ * @param wfProp - properties passed from workflow to coordinator e.g. ENTITY_PATH
+ * @return - Properties from creating the coordinator application to be used by Bundle.
+ * @throws FalconException
+ */
+ public static Properties build(Cluster cluster, Path basePath, Feed feed, Properties wfProp)
+ throws FalconException {
+
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+
+        // the coordinator is serialized to a specific dir
+ Path coordPath = new Path(basePath, Tag.ARCHIVAL.name() + "/" + feedCluster.getName());
+
+ COORDINATORAPP coord = new COORDINATORAPP();
+
+ String coordName = EntityUtil.getWorkflowName(LifeCycle.ARCHIVAL.getTag(), feed).toString();
+
+ long replicationDelayInMillis = getReplicationDelayInMillis(feed, cluster);
+ Date sourceStartDate = getStartDate(feed, cluster, replicationDelayInMillis);
+ Date sourceEndDate = getEndDate(feed, cluster);
+
+ coord.setName(coordName);
+ coord.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+ coord.setEnd(SchemaHelper.formatDateUTC(sourceEndDate));
+ coord.setStart(SchemaHelper.formatDateUTC(sourceStartDate));
+ coord.setTimezone(feed.getTimezone().getID());
+
+        initializeInputOutputPath(coord, cluster, feed);
+
+        if (replicationDelayInMillis > 0) {
+            long delayInMins = -1 * replicationDelayInMillis / (1000 * 60);
+            String elExp = "${now(0," + delayInMins + ")}";
+
+            // shift both input and output events by the configured delay
+            coord.getInputEvents().getDataIn().get(0).getInstance().set(0, elExp);
+            coord.getOutputEvents().getDataOut().get(0).setInstance(elExp);
+        }
+
+ setCoordControls(feed, coord);
+
+ final Storage sourceStorage = FeedHelper.createReadOnlyStorage(cluster, feed);
+ initializeInputDataSet(feed, cluster, coord, sourceStorage);
+
+ String archivalPath = FeedHelper.getArchivalPath(feed, cluster.getName());
+ initializeOutputDataSet(feed, cluster, coord, archivalPath);
+
+        ACTION archivalWorkflowAction = getArchivalWorkflowAction(feed, cluster, coordPath, coordName,
+                sourceStorage);
+        coord.setAction(archivalWorkflowAction);
+
+ Path marshalPath = OozieBuilderUtils.marshalCoordinator(cluster, coord, coordPath);
+ wfProp.putAll(OozieBuilderUtils.getProperties(marshalPath, coordName));
+
+ return wfProp;
+
+ }
+
+    private static void initializeInputOutputPath(COORDINATORAPP coord, Cluster cluster, Feed feed)
+            throws FalconException {
+
+ if (coord.getInputEvents() == null) {
+ coord.setInputEvents(new INPUTEVENTS());
+ }
+
+ if (coord.getOutputEvents() == null) {
+ coord.setOutputEvents(new OUTPUTEVENTS());
+ }
+
+ DATAIN datain = createDataIn(feed, cluster);
+ coord.getInputEvents().getDataIn().add(datain);
+
+ DATAOUT dataout = createDataOut(feed, cluster);
+ coord.getOutputEvents().getDataOut().add(dataout);
+
+ }
+
+
+ private static DATAIN createDataIn(Feed feed, Cluster cluster) {
+ DATAIN datain = new DATAIN();
+ datain.setName(DATAIN_NAME);
+ datain.setDataset(IN_DATASET_NAME);
+        datain.getInstance().add("${coord:current(0)}");
+ return datain;
+ }
+
+ private static DATAOUT createDataOut(Feed feed, Cluster cluster) {
+ DATAOUT dataout = new DATAOUT();
+ dataout.setName(DATAOUT_NAME);
+ dataout.setDataset(OUT_DATASET_NAME);
+ dataout.setInstance("${coord:current(0)}");
+ return dataout;
+ }
+
+    private static ACTION getArchivalWorkflowAction(Feed feed, Cluster cluster, Path buildPath, String coordName,
+                                                    Storage sourceStorage) throws FalconException {
+ ACTION action = new ACTION();
+ WORKFLOW workflow = new WORKFLOW();
+
+ workflow.setAppPath(OozieBuilderUtils.getStoragePath(String.valueOf(buildPath)));
+ Properties props = OozieBuilderUtils.createCoordDefaultConfiguration(coordName, feed);
+
+ // Setting CLUSTER_NAME property to include source cluster
+ props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName());
+ props.put("srcClusterName", cluster.getName());
+ props.put("srcClusterColo", cluster.getColo());
+
+        // propagate the storage type of the source feed to the archival workflow
+ props.put("falconFeedStorageType", sourceStorage.getType().name());
+
+ String instancePaths = "";
+ if (sourceStorage.getType() == Storage.TYPE.FILESYSTEM) {
+ String pathsWithPartitions = getPathsWithPartitions(feed, cluster);
+ instancePaths = pathsWithPartitions;
+ propagateFileSystemCopyProperties(pathsWithPartitions, props);
+ }
+
+ propagateLateDataProperties(feed, instancePaths, sourceStorage.getType().name(), props);
+ // Add the custom properties set in feed. Else, dryrun won't catch any missing props.
+ props.putAll(EntityUtil.getEntityProperties(feed));
+ workflow.setConfiguration(OozieBuilderUtils.getCoordinatorConfig(props));
+ action.setWorkflow(workflow);
+
+ return action;
+ }
+
+ private static String getPathsWithPartitions(Feed feed, Cluster cluster) throws FalconException {
+ String srcPart = FeedHelper.normalizePartitionExpression(
+ FeedHelper.getCluster(feed, cluster.getName()).getPartition());
+ srcPart = FeedHelper.evaluateClusterExp(cluster, srcPart);
+
+ StringBuilder pathsWithPartitions = new StringBuilder();
+ pathsWithPartitions.append("${coord:dataIn('input')}/")
+ .append(FeedHelper.normalizePartitionExpression(srcPart));
+
+ String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
+ parts = StringUtils.stripEnd(parts, "/");
+ return parts;
+ }
+
+ private static void propagateLateDataProperties(Feed feed, String instancePaths,
+ String falconFeedStorageType, Properties props) {
+ // todo these pairs are the same but used in different context
+ // late data handler - should-record action
+ props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), feed.getName());
+ props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), instancePaths);
+ props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), feed.getName());
+
+ // storage type for each corresponding feed - in this case only one feed is involved
+ // needed to compute usage based on storage type in LateDataHandler
+ props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), falconFeedStorageType);
+
+ // falcon post processing
+ props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), feed.getName());
+ props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), feed.getName());
+ props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), "${coord:dataOut('output')}");
+ }
+
+ private static void propagateFileSystemCopyProperties(String paths, Properties props) throws FalconException {
+ props.put("sourceRelativePaths", paths);
+
+ props.put("distcpSourcePaths", "${coord:dataIn('input')}");
+ props.put("distcpTargetPaths", "${coord:dataOut('output')}");
+ }
+
+ private static void setCoordControls(Feed feed, COORDINATORAPP coord) throws FalconException {
+ // set controls
+ CONTROLS controls = new CONTROLS();
+
+ long frequencyInMillis = ExpressionHelper.get().evaluate(feed.getFrequency().toString(), Long.class);
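+        // default timeout is six times the feed frequency (e.g. 360 minutes for an hours(1) feed),
+        // floored at thirty minutes below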
+ long timeoutInMillis = frequencyInMillis * 6;
+ if (timeoutInMillis < THIRTY_MINUTES) {
+ timeoutInMillis = THIRTY_MINUTES;
+ }
+
+ Properties props = EntityUtil.getEntityProperties(feed);
+ String timeout = props.getProperty(TIMEOUT);
+        if (timeout != null) {
+            try {
+                timeoutInMillis = ExpressionHelper.get().evaluate(timeout, Long.class);
+            } catch (Exception ignore) {
+                LOG.error("Unable to evaluate timeout:", ignore);
+            }
+ }
+
+ String parallelProp = props.getProperty(PARALLEL);
+ int parallel = 1;
+ if (parallelProp != null) {
+ try {
+ parallel = Integer.parseInt(parallelProp);
+ } catch (NumberFormatException ignore) {
+ LOG.error("Unable to parse parallel:", ignore);
+ }
+ }
+
+ String orderProp = props.getProperty(ORDER);
+ ExecutionType order = ExecutionType.FIFO;
+ if (orderProp != null) {
+ try {
+ order = ExecutionType.fromValue(orderProp);
+ } catch (IllegalArgumentException ignore) {
+ LOG.error("Unable to parse order:", ignore);
+ }
+ }
+
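+        // throttle allows roughly twice the number of frequency periods that fit in the timeout window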
+ controls.setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
+ controls.setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
+ controls.setConcurrency(String.valueOf(parallel));
+ controls.setExecution(order.name());
+ coord.setControls(controls);
+ }
+
+ private static void initializeInputDataSet(Feed feed, Cluster cluster, COORDINATORAPP coord, Storage storage) throws FalconException {
+ if (coord.getDatasets() == null) {
+ coord.setDatasets(new DATASETS());
+ }
+
+ SYNCDATASET inputDataset = new SYNCDATASET();
+ inputDataset.setName(IN_DATASET_NAME);
+
+ String uriTemplate = storage.getUriTemplate(LocationType.DATA);
+ inputDataset.setUriTemplate(uriTemplate);
+
+ setDatasetValues(feed, inputDataset, cluster);
+
+ if (feed.getAvailabilityFlag() == null) {
+ inputDataset.setDoneFlag("");
+ } else {
+ inputDataset.setDoneFlag(feed.getAvailabilityFlag());
+ }
+
+ coord.getDatasets().getDatasetOrAsyncDataset().add(inputDataset);
+ }
+
+ private static void initializeOutputDataSet(Feed feed, Cluster cluster, COORDINATORAPP coord,
+ String targetPath) throws FalconException {
+ if (coord.getDatasets() == null) {
+ coord.setDatasets(new DATASETS());
+ }
+
+ SYNCDATASET outputDataset = new SYNCDATASET();
+ outputDataset.setName(OUT_DATASET_NAME);
+ outputDataset.setUriTemplate(targetPath);
+
+ setDatasetValues(feed, outputDataset, cluster);
+ coord.getDatasets().getDatasetOrAsyncDataset().add(outputDataset);
+ }
+
+
+ private static void setDatasetValues(Feed feed, SYNCDATASET dataset, Cluster cluster) {
+ dataset.setInitialInstance(SchemaHelper.formatDateUTC(
+ FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart()));
+ dataset.setTimezone(feed.getTimezone().getID());
+ dataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+ }
+
+ private static long getReplicationDelayInMillis(Feed feed, Cluster srcCluster) throws FalconException {
+ Frequency replicationDelay = FeedHelper.getCluster(feed, srcCluster.getName()).getDelay();
+        long delayInMillis = 0;
+ if (replicationDelay != null) {
+ delayInMillis = ExpressionHelper.get().evaluate(
+ replicationDelay.toString(), Long.class);
+ }
+
+ return delayInMillis;
+ }
+
+ private static Date getStartDate(Feed feed, Cluster cluster, long replicationDelayInMillis) {
+ Date startDate = FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart();
+ return replicationDelayInMillis == 0 ? startDate : new Date(startDate.getTime() + replicationDelayInMillis);
+ }
+
+ private static Date getEndDate(Feed feed, Cluster cluster) {
+ return FeedHelper.getCluster(feed, cluster.getName()).getValidity().getEnd();
+ }
+
+}
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalWorkflowBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalWorkflowBuilder.java
new file mode 100644
index 000000000..5ba60c50b
--- /dev/null
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalWorkflowBuilder.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.engine.oozie.archival;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.ArchivalStage;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Property;
+import org.apache.falcon.lifecycle.engine.oozie.utils.OozieBuilderUtils;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.util.ReplicationDistCpOption;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+import java.util.Properties;
+
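+/**
+ * Builds the Oozie workflow for the AgeBasedArchival lifecycle policy.
+ */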
+public class AgeBasedArchivalWorkflowBuilder {
+ private static final String ARCHIVAL_ACTION_TEMPLATE = "/action/feed/archival-action.xml";
+ private static final String ARCHIVAL_ACTION_NAME = "archival";
+
+ private static final String ARCHIVAL_JOB_COUNTER = "job.counter";
+ private static final String TDE_ENCRYPTION_ENABLED = "tdeEncryptionEnabled";
+ private static final String MR_MAX_MAPS = "maxMaps";
+ private static final String MR_MAP_BANDWIDTH = "mapBandwidth";
+
+    private AgeBasedArchivalWorkflowBuilder() {
+    }
+
+ public static Properties build(Cluster cluster, Path basePath, Feed feed) throws FalconException {
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+
+ // workflow is serialized to a specific dir
+ Path buildPath = new Path(basePath, Tag.ARCHIVAL.name() + "/" + feedCluster.getName());
+
+ WORKFLOWAPP workflow = new WORKFLOWAPP();
+ String wfName = EntityUtil.getWorkflowName(Tag.ARCHIVAL, feed).toString();
+
+ String start = ARCHIVAL_ACTION_NAME;
+
+ //Add pre-processing
+ if (shouldPreProcess(feed)) {
+ ACTION action = OozieBuilderUtils.getPreProcessingAction(Tag.ARCHIVAL);
+ addTransition(action, ARCHIVAL_ACTION_NAME, OozieBuilderUtils.getFailAction());
+ workflow.getDecisionOrForkOrJoin().add(action);
+ start = OozieBuilderUtils.PREPROCESS_ACTION_NAME;
+ }
+
+        //Add archival
+ ACTION archival = OozieBuilderUtils.unmarshalAction(ARCHIVAL_ACTION_TEMPLATE);
+ addAdditionalArchivalProperties(feed, archival);
+ enableCounters(feed, archival);
+ enableTDE(feed, archival);
+ addPostProcessing(workflow, archival);
+ OozieBuilderUtils.decorateWorkflow(workflow, wfName, start);
+
+ OozieBuilderUtils.addLibExtensionsToWorkflow(cluster, workflow, Tag.ARCHIVAL, EntityType.FEED);
+
+ OozieBuilderUtils.marshalWokflow(cluster, workflow, buildPath);
+
+
+ Properties props = OozieBuilderUtils.getProperties(buildPath, wfName);
+        props.putAll(OozieBuilderUtils.createDefaultConfiguration(cluster, feed,
+                WorkflowExecutionContext.EntityOperations.ARCHIVE));
+
+ props.putAll(getWorkflowProperties(feed, cluster));
+ props.putAll(FeedHelper.getUserWorkflowProperties(LifeCycle.ARCHIVAL));
+
+ ArchivalStage archivalStage = FeedHelper.getArchivalStage(feed, cluster.getName());
+ if (StringUtils.isNotBlank(archivalStage.getQueue())) {
+ props.put(OozieBuilderUtils.MR_QUEUE_NAME, archivalStage.getQueue());
+ }
+
+ // Write out the config to config-default.xml
+ OozieBuilderUtils.marshalDefaultConfig(cluster, workflow, props, buildPath);
+ return props;
+
+ }
+
+ private static Properties getWorkflowProperties(Feed feed, Cluster cluster) throws FalconException {
+ Properties props = FeedHelper.getFeedProperties(feed);
+ if (props.getProperty(MR_MAX_MAPS) == null) { // set default if user has not overridden
+ props.put(MR_MAX_MAPS, getDefaultMaxMaps());
+ }
+ if (props.getProperty(MR_MAP_BANDWIDTH) == null) { // set default if user has not overridden
+ props.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth());
+ }
+
+ if (feed.getAvailabilityFlag() == null) {
+ props.put("availabilityFlag", "NA");
+ } else {
+ props.put("availabilityFlag", feed.getAvailabilityFlag());
+ }
+ props.put(WorkflowExecutionArgs.DATASOURCE_NAME.getName(), "NA");
+
+ return props;
+ }
+
+ private static String getDefaultMaxMaps() {
+ return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
+ }
+
+ private static String getDefaultMapBandwidth() {
+ return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidth", "100");
+ }
+
+ private static boolean shouldPreProcess(Feed feed) throws FalconException {
+ return !(EntityUtil.getLateProcess(feed) == null
+ || EntityUtil.getLateProcess(feed).getLateInputs() == null
+ || EntityUtil.getLateProcess(feed).getLateInputs().size() == 0);
+ }
+
+ private static void addTransition(ACTION action, String ok, String fail) {
+ action.getOk().setTo(ok);
+ action.getError().setTo(fail);
+ }
+
+ private static void addAdditionalArchivalProperties(Feed feed, ACTION archivalAction) {
+ List<String> args = archivalAction.getJava().getArg();
+ Properties props = EntityUtil.getEntityProperties(feed);
+
+ for (ReplicationDistCpOption distcpOption : ReplicationDistCpOption.values()) {
+ String propertyValue = props.getProperty(distcpOption.getName());
+ if (StringUtils.isNotEmpty(propertyValue)) {
+ args.add("-" + distcpOption.getName());
+ args.add(propertyValue);
+ }
+ }
+ }
+
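+ // Emit job counters under the log directory when the feed opts in.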
+ private static ACTION enableCounters(Feed feed, ACTION action) throws FalconException {
+ if (isCounterEnabled(feed)) {
+ List<String> args = action.getJava().getArg();
+ args.add("-counterLogDir");
+ args.add("${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}");
+ }
+ return action;
+ }
+
+ private static boolean isCounterEnabled(Feed feed) throws FalconException {
+ if (feed.getProperties() != null) {
+ List<Property> propertyList = feed.getProperties().getProperties();
+ for (Property prop : propertyList) {
+ if (prop.getName().equals(ARCHIVAL_JOB_COUNTER) && "true".equalsIgnoreCase(prop.getValue())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
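+ // Tell FeedReplicator that the data lives in TDE-encrypted zones.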
+ private static ACTION enableTDE(Feed feed, ACTION action) throws FalconException {
+ if (isTDEEnabled(feed)) {
+ List<String> args = action.getJava().getArg();
+ args.add("-tdeEncryptionEnabled");
+ args.add("true");
+ }
+ return action;
+ }
+
+ private static boolean isTDEEnabled(Feed feed) {
+ String tdeEncryptionEnabled = FeedHelper.getPropertyValue(feed, TDE_ENCRYPTION_ENABLED);
+ return "true" .equalsIgnoreCase(tdeEncryptionEnabled);
+ }
+
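+ // Wires the action's ok/error transitions, routing through the JMS
+ // post-processing actions only when post-processing is enabled.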
+ private static void addPostProcessing(WORKFLOWAPP workflow, ACTION action) throws FalconException {
+ if (!Boolean.parseBoolean(OozieBuilderUtils.ENABLE_POSTPROCESSING)) {
+ OozieBuilderUtils.addTransition(action, OozieBuilderUtils.OK_ACTION_NAME,
+ OozieBuilderUtils.FAIL_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(action);
+ } else {
+ OozieBuilderUtils.addTransition(action, OozieBuilderUtils.SUCCESS_POSTPROCESS_ACTION_NAME,
+ OozieBuilderUtils.FAIL_POSTPROCESS_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(action);
+
+ //Add post-processing actions
+ ACTION success = OozieBuilderUtils.getSuccessPostProcessAction();
+ OozieBuilderUtils.addTransition(success, OozieBuilderUtils.OK_ACTION_NAME,
+ OozieBuilderUtils.FAIL_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(success);
+
+ ACTION fail = OozieBuilderUtils.getFailPostProcessAction();
+ OozieBuilderUtils.addTransition(fail, OozieBuilderUtils.FAIL_ACTION_NAME,
+ OozieBuilderUtils.FAIL_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(fail);
+ }
+ }
+
+}
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
index 7d51c9a71..5970c3cb8 100644
--- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
@@ -79,6 +79,7 @@ public final class OozieBuilderUtils {
private static final Logger LOG = LoggerFactory.getLogger(OozieBuilderUtils.class);
private static final String POSTPROCESS_TEMPLATE = "/action/post-process.xml";
+ private static final String PREPROCESS_TEMPLATE = "/action/pre-process.xml";
public static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
public static final String MR_QUEUE_NAME = "queueName";
@@ -97,6 +98,7 @@ public final class OozieBuilderUtils {
public static final String FAIL_POSTPROCESS_ACTION_NAME = "failed-post-processing";
public static final String OK_ACTION_NAME = "end";
public static final String FAIL_ACTION_NAME = "fail";
+ public static final String PREPROCESS_ACTION_NAME = "pre-processing";
public static final String ENTITY_PATH = "ENTITY_PATH";
@@ -557,4 +559,26 @@ public static CONFIGURATION getCoordinatorConfig(Properties props) {
}
return conf;
}
+
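+ // Late-data handler action; archival scopes its output path by source cluster.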
+ public static ACTION getPreProcessingAction(Tag tag) throws FalconException {
+ ACTION action = unmarshalAction(PREPROCESS_TEMPLATE);
+ decorateWithOozieRetries(action);
+
+ List<String> args = action.getJava().getArg();
+ args.add("-out");
+ if (tag == Tag.ARCHIVAL) {
+ args.add("${logDir}/latedata/${nominalTime}/${srcClusterName}");
+ } else {
+ args.add("${logDir}/latedata/${nominalTime}");
+ }
+ return action;
+ }
+
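+ // The failure target depends on whether post-processing is enabled.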
+ public static String getFailAction() {
+ if (!Boolean.parseBoolean(ENABLE_POSTPROCESSING)) {
+ return FAIL_ACTION_NAME;
+ } else {
+ return FAIL_POSTPROCESS_ACTION_NAME;
+ }
+ }
}
diff --git a/lifecycle/src/main/resources/action/feed/archival-action.xml b/lifecycle/src/main/resources/action/feed/archival-action.xml
new file mode 100644
index 000000000..cca4517cb
--- /dev/null
+++ b/lifecycle/src/main/resources/action/feed/archival-action.xml
@@ -0,0 +1,64 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<action name="archival" xmlns="uri:oozie:workflow:0.3">
+    <java>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+                <value>true</value>
+            </property>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+            <property>
+                <name>oozie.action.sharelib.for.java</name>
+                <value>distcp</value>
+            </property>
+            <property>
+                <name>oozie.launcher.oozie.libpath</name>
+                <value>${wf:conf("falcon.libpath")}</value>
+            </property>
+        </configuration>
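+        <!-- FeedReplicator runs distcp from the feed's source paths to the archival target. -->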
+        <main-class>org.apache.falcon.replication.FeedReplicator</main-class>
+        <arg>-Dfalcon.include.path=${sourceRelativePaths}</arg>
+        <arg>-Dmapred.job.queue.name=${queueName}</arg>
+        <arg>-Dmapred.job.priority=${jobPriority}</arg>
+        <arg>-maxMaps</arg>
+        <arg>${maxMaps}</arg>
+        <arg>-mapBandwidth</arg>
+        <arg>${mapBandwidth}</arg>
+        <arg>-sourcePaths</arg>
+        <arg>${distcpSourcePaths}</arg>
+        <arg>-targetPath</arg>
+        <arg>${distcpTargetPaths}</arg>
+        <arg>-falconFeedStorageType</arg>
+        <arg>${falconFeedStorageType}</arg>
+        <arg>-availabilityFlag</arg>
+        <arg>${availabilityFlag == 'NA' ? "NA" : availabilityFlag}</arg>
+    </java>
+    <ok to="end"/>
+    <error to="fail"/>
+</action>
diff --git a/lifecycle/src/main/resources/action/post-process.xml b/lifecycle/src/main/resources/action/post-process.xml
new file mode 100644
index 000000000..d649a0f80
--- /dev/null
+++ b/lifecycle/src/main/resources/action/post-process.xml
@@ -0,0 +1,98 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<action name="succeeded-post-processing" xmlns="uri:oozie:workflow:0.3">
+    <java>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.oozie.libpath</name>
+                <value>${wf:conf("falcon.libpath")}</value>
+            </property>
+        </configuration>
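+        <!-- FalconPostProcessing publishes JMS notifications and records the workflow's execution context. -->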
+        <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
+        <arg>-cluster</arg>
+        <arg>${cluster}</arg>
+        <arg>-entityType</arg>
+        <arg>${entityType}</arg>
+        <arg>-entityName</arg>
+        <arg>${entityName}</arg>
+        <arg>-nominalTime</arg>
+        <arg>${nominalTime}</arg>
+        <arg>-operation</arg>
+        <arg>${falconDataOperation}</arg>
+        <arg>-workflowId</arg>
+        <arg>${wf:id()}</arg>
+        <arg>-runId</arg>
+        <arg>${wf:run()}</arg>
+        <arg>-status</arg>
+        <arg>${wf:lastErrorNode() == null ? 'SUCCEEDED' : 'FAILED'}</arg>
+        <arg>-timeStamp</arg>
+        <arg>${timeStamp}</arg>
+        <arg>-brokerImplClass</arg>
+        <arg>${brokerImplClass}</arg>
+        <arg>-brokerUrl</arg>
+        <arg>${brokerUrl}</arg>
+        <arg>-userBrokerImplClass</arg>
+        <arg>${userBrokerImplClass}</arg>
+        <arg>-userBrokerUrl</arg>
+        <arg>${userBrokerUrl}</arg>
+        <arg>-userJMSNotificationEnabled</arg>
+        <arg>${userJMSNotificationEnabled}</arg>
+        <arg>-systemJMSNotificationEnabled</arg>
+        <arg>${systemJMSNotificationEnabled}</arg>
+        <arg>-brokerTTL</arg>
+        <arg>${brokerTTL}</arg>
+        <arg>-feedNames</arg>
+        <arg>${feedNames}</arg>
+        <arg>-feedInstancePaths</arg>
+        <arg>${feedInstancePaths}</arg>
+        <arg>-logFile</arg>
+        <arg>${logDir}/job-${nominalTime}/${wf:run()}/evicted-instancePaths.csv</arg>
+        <arg>-workflowEngineUrl</arg>
+        <arg>${workflowEngineUrl}</arg>
+        <arg>-subflowId</arg>
+        <arg>${wf:id()}${userWorkflowEngine == "oozie" ? "@user-action" : ""}</arg>
+        <arg>-userWorkflowEngine</arg>
+        <arg>${userWorkflowEngine}</arg>
+        <arg>-userWorkflowName</arg>
+        <arg>${userWorkflowName}</arg>
+        <arg>-userWorkflowVersion</arg>
+        <arg>${userWorkflowVersion}</arg>
+        <arg>-logDir</arg>
+        <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}/</arg>
+        <arg>-workflowUser</arg>
+        <arg>${wf:user()}</arg>
+        <arg>-falconInputFeeds</arg>
+        <arg>${falconInputFeeds}</arg>
+        <arg>-falconInPaths</arg>
+        <arg>${falconInPaths}</arg>
+        <arg>-datasource</arg>
+        <arg>${datasource == 'NA' ? 'IGNORE' : datasource}</arg>
+    </java>
+    <ok to="end"/>
+    <error to="fail"/>
+</action>
diff --git a/lifecycle/src/main/resources/action/pre-process.xml b/lifecycle/src/main/resources/action/pre-process.xml
new file mode 100644
index 000000000..fc4125c05
--- /dev/null
+++ b/lifecycle/src/main/resources/action/pre-process.xml
@@ -0,0 +1,54 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<action name="pre-processing" xmlns="uri:oozie:workflow:0.3">
+    <java>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+            <!-- HCatalog jars -->
+            <property>
+                <name>oozie.action.sharelib.for.java</name>
+                <value>hcatalog</value>
+            </property>
+            <property>
+                <name>oozie.launcher.oozie.libpath</name>
+                <value>${wf:conf("falcon.libpath")}</value>
+            </property>
+        </configuration>
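+        <!-- LateDataHandler records input path metrics so late-arriving data can be detected. -->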
+        <main-class>org.apache.falcon.workflow.LateDataHandler</main-class>
+        <arg>-out</arg>
+        <arg>${logDir}/latedata/${nominalTime}/${srcClusterName}</arg>
+        <arg>-paths</arg>
+        <arg>${falconInPaths}</arg>
+        <arg>-falconInputNames</arg>
+        <arg>${falconInputNames}</arg>
+        <arg>-falconInputFeedStorageTypes</arg>
+        <arg>${falconInputFeedStorageTypes}</arg>
+    </java>
+    <ok to="end"/>
+    <error to="fail"/>
+</action>
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
index c7584119b..e094e0970 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
@@ -58,7 +58,8 @@ protected List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
props.add(appProps);
}
}
- } else {
+ }
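+ // Build the default eviction coordinator only when no lifecycle retention stage is configured.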
+ if (FeedHelper.getRetentionStage(this.entity, cluster.getName()) == null) {
List evictionProps =
OozieCoordinatorBuilder.get(entity, Tag.RETENTION).buildCoords(cluster, buildPath);
if (evictionProps != null) {
diff --git a/prism/src/test/resources/startup.properties b/prism/src/test/resources/startup.properties
index 63fcd3b70..d66d50534 100644
--- a/prism/src/test/resources/startup.properties
+++ b/prism/src/test/resources/startup.properties
@@ -59,9 +59,13 @@
# List of Lifecycle policies configured.
-*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
+*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete,\
+ org.apache.falcon.lifecycle.archival.AgeBasedArchival
+
# List of builders for the policies.
-*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder
+*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder,\
+ org.apache.falcon.lifecycle.engine.oozie.archival.AgeBasedArchivalBuilder
+
##### Falcon Configuration Store Change listeners #####
*.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
org.apache.falcon.entity.ColoClusterRelation,\
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index fae937ac0..8bd2bbe31 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -104,9 +104,12 @@ prism.application.services=org.apache.falcon.service.LifecyclePolicyMap,\
# List of Lifecycle policies configured.
-*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
+*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete,\
+ org.apache.falcon.lifecycle.archival.AgeBasedArchival
+
# List of builders for the policies.
-*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder
+*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder,\
+ org.apache.falcon.lifecycle.engine.oozie.archival.AgeBasedArchivalBuilder
##### Falcon Configuration Store Change listeners #####
*.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\