diff --git a/common-types/src/main/java/org/apache/falcon/LifeCycle.java b/common-types/src/main/java/org/apache/falcon/LifeCycle.java
index 0ecddd1ea..766a9612a 100644
--- a/common-types/src/main/java/org/apache/falcon/LifeCycle.java
+++ b/common-types/src/main/java/org/apache/falcon/LifeCycle.java
@@ -27,7 +27,8 @@ public enum LifeCycle {
     EVICTION(Tag.RETENTION),
     REPLICATION(Tag.REPLICATION),
     IMPORT(Tag.IMPORT),
-    EXPORT(Tag.EXPORT);
+    EXPORT(Tag.EXPORT),
+    ARCHIVAL(Tag.ARCHIVAL);

     private final Tag tag;
diff --git a/common-types/src/main/java/org/apache/falcon/Tag.java b/common-types/src/main/java/org/apache/falcon/Tag.java
index 3c2f25e20..d2efaae9a 100644
--- a/common-types/src/main/java/org/apache/falcon/Tag.java
+++ b/common-types/src/main/java/org/apache/falcon/Tag.java
@@ -28,7 +28,8 @@ public enum Tag {
     RETENTION(EntityType.FEED),
     REPLICATION(EntityType.FEED),
     IMPORT(EntityType.FEED),
-    EXPORT(EntityType.FEED);
+    EXPORT(EntityType.FEED),
+    ARCHIVAL(EntityType.FEED);

     private final EntityType entityType;
diff --git a/common-types/src/main/resources/feed-0.1.xsd b/common-types/src/main/resources/feed-0.1.xsd
index cbc97b970..2be7cf22d 100644
--- a/common-types/src/main/resources/feed-0.1.xsd
+++ b/common-types/src/main/resources/feed-0.1.xsd
@@ -363,7 +363,8 @@
-
+
+
@@ -575,4 +576,22 @@
+
+
+
+                Archival stage is the new way to define archival for a feed using the feed lifecycle feature. Archival
+                has a configurable policy which does the validation and the real execution through the workflow engine.
+                This method of specifying archival gives you more control over archival than defining an archival
+                source and target in the replication lifecycle stage of the feed.
+
+
+
+
+
+
+
+
+
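For orientation (an editor's illustration, not part of the patch): the XSD element markup above did not survive extraction, but based on the tests and docs later in this patch (`archival-stage` with a `location` and a `queue`), a feed declaring the new stage would look roughly like this sketch:

```xml
<feed name="sample-feed" xmlns="uri:falcon:feed:0.1">
    <!-- frequency, clusters, locations, ACL, schema elided -->
    <lifecycle>
        <archival-stage>
            <location type="data" path="s4://prod/falcon/${YEAR}-${MONTH}-${DAY}-${HOUR}/"/>
            <queue>default</queue>
        </archival-stage>
    </lifecycle>
</feed>
```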
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 757359f9c..44cc3a4c6 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -27,22 +27,7 @@
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.datasource.DatasourceType;
-import org.apache.falcon.entity.v0.feed.CatalogTable;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.ExtractMethod;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
-import org.apache.falcon.entity.v0.feed.Lifecycle;
-import org.apache.falcon.entity.v0.feed.Load;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Locations;
-import org.apache.falcon.entity.v0.feed.MergeType;
-import org.apache.falcon.entity.v0.feed.Property;
-import org.apache.falcon.entity.v0.feed.RetentionStage;
-import org.apache.falcon.entity.v0.feed.Sla;
-import org.apache.falcon.entity.v0.feed.Validity;
+import org.apache.falcon.entity.v0.feed.*;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
@@ -401,13 +386,27 @@ public static RetentionStage getRetentionStage(Feed feed, String clusterName) throws FalconException {

             if (clusterLifecycle != null && clusterLifecycle.getRetentionStage() != null) {
                 return clusterLifecycle.getRetentionStage();
-            } else if (globalLifecycle != null) {
+            } else if (globalLifecycle != null && globalLifecycle.getRetentionStage() != null) {
                 return globalLifecycle.getRetentionStage();
             }
         }
         return null;
     }

+    public static ArchivalStage getArchivalStage(Feed feed, String clusterName) throws FalconException {
+        if (isLifecycleEnabled(feed, clusterName)) {
+            Lifecycle globalLifecycle = feed.getLifecycle();
+            Lifecycle clusterLifecycle = getCluster(feed, clusterName).getLifecycle();
+
+            if (clusterLifecycle != null && clusterLifecycle.getArchivalStage() != null) {
+                return clusterLifecycle.getArchivalStage();
+            } else if (globalLifecycle != null && globalLifecycle.getArchivalStage() != null) {
+                return globalLifecycle.getArchivalStage();
+            }
+        }
+        return null;
+    }
+
     public static Date getFeedValidityStart(Feed feed, String clusterName) throws FalconException {
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
         if (feedCluster != null) {
@@ -437,10 +436,20 @@ public static List<String> getPolicies(Feed feed, String clusterName) throws FalconException {
         Cluster cluster = getCluster(feed, clusterName);
         if (cluster != null) {
             if (isLifecycleEnabled(feed, clusterName)) {
-                String policy = getRetentionStage(feed, clusterName).getPolicy();
-                policy = StringUtils.isBlank(policy)
-                        ? FeedLifecycleStage.RETENTION.getDefaultPolicyName() : policy;
-                result.add(policy);
+                String policy = "";
+                if (getRetentionStage(feed, clusterName) != null) {
+                    policy = getRetentionStage(feed, clusterName).getPolicy();
+                    policy = StringUtils.isBlank(policy)
+                            ? FeedLifecycleStage.RETENTION.getDefaultPolicyName() : policy;
+                    result.add(policy);
+                }
+
+                if (getArchivalStage(feed, clusterName) != null) {
+                    policy = getArchivalStage(feed, clusterName).getPolicy();
+                    policy = StringUtils.isBlank(policy)
+                            ? FeedLifecycleStage.ARCHIVAL.getDefaultPolicyName() : policy;
+                    result.add(policy);
+                }
             }
             return result;
         }
@@ -1295,4 +1304,17 @@ public static List<FeedInstanceStatus> getListing(Feed feed, String clusterName,
         return storage.getListing(feed, clusterName, locationType, start, end);
     }

+    public static String getArchivalPath(Feed feed, String clusterName) throws FalconException {
+        String archivalPath = "";
+        ArchivalStage archivalStage = getArchivalStage(feed, clusterName);
+        if (archivalStage != null) {
+            Location location = archivalStage.getLocation();
+            if ((location != null) && (location.getPath() != null)) {
+                archivalPath = location.getPath();
+            } else {
+                throw new FalconException("Location cannot be empty.");
+            }
+        }
+        return archivalPath;
+    }
 }
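A quick usage sketch of the new helpers (editor's illustration; the feed and cluster values are hypothetical, the APIs are the ones added above):

```java
// Resolution order mirrors getRetentionStage: a cluster-level archival-stage
// wins over the global one; null means archival is not configured at all.
ArchivalStage stage = FeedHelper.getArchivalStage(feed, "backupCluster");
if (stage != null) {
    // Throws FalconException if the stage exists but has no location path.
    String path = FeedHelper.getArchivalPath(feed, "backupCluster");
}
```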
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 6b72174ac..381495c42 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -33,18 +33,18 @@
 import org.apache.falcon.entity.v0.EntityGraph;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.ACL;
-import org.apache.falcon.entity.v0.feed.Extract;
-import org.apache.falcon.entity.v0.feed.ExtractMethod;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.MergeType;
 import org.apache.falcon.entity.v0.feed.Properties;
 import org.apache.falcon.entity.v0.feed.Property;
 import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.ACL;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.Extract;
+import org.apache.falcon.entity.v0.feed.ExtractMethod;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
@@ -157,14 +157,30 @@ private void validateLifecycle(Feed feed) throws FalconException {
         LifecyclePolicyMap map = LifecyclePolicyMap.get();
         for (Cluster cluster : feed.getClusters().getClusters()) {
             if (FeedHelper.isLifecycleEnabled(feed, cluster.getName())) {
-                if (FeedHelper.getRetentionStage(feed, cluster.getName()) == null) {
-                    throw new ValidationException("Retention is a mandatory stage, didn't find it for cluster: "
+                if ((FeedHelper.getRetentionStage(feed, cluster.getName()) != null)
+                        || (FeedHelper.getArchivalStage(feed, cluster.getName()) != null)) {
+                    validateRetentionStage(feed, cluster);
+                    validateArchivalStage(feed, cluster);
+                    for (String policyName : FeedHelper.getPolicies(feed, cluster.getName())) {
+                        map.get(policyName).validate(feed, cluster.getName());
+                    }
+                } else {
+                    throw new ValidationException("At least one of Retention/Archival is a mandatory stage, "
+                            + "didn't find it for cluster: "
                             + cluster.getName());
                 }
-                validateRetentionFrequency(feed, cluster.getName());
-                for (String policyName : FeedHelper.getPolicies(feed, cluster.getName())) {
-                    map.get(policyName).validate(feed, cluster.getName());
-                }
             }
         }
     }
+
+    private void validateRetentionStage(Feed feed, Cluster cluster) throws FalconException {
+        if (FeedHelper.getRetentionStage(feed, cluster.getName()) != null) {
+            validateRetentionFrequency(feed, cluster.getName());
+        }
+    }
+
+    private void validateArchivalStage(Feed feed, Cluster cluster) throws FalconException {
+        if (FeedHelper.getArchivalStage(feed, cluster.getName()) != null) {
+            Location location = FeedHelper.getArchivalStage(feed, cluster.getName()).getLocation();
+            // guard against a missing location element as well as an empty path
+            if (location == null || location.getPath() == null || location.getPath().isEmpty()) {
+                throw new ValidationException("Location path cannot be empty.");
+            }
+        }
+    }
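The practical effect of the relaxed rule (editor's illustration): with lifecycle enabled, a feed may now supply either stage alone, but not neither.

```xml
<!-- OK: archival-only lifecycle (previously rejected, since retention was mandatory). -->
<lifecycle>
    <archival-stage>
        <location type="data" path="s4://prod/falcon/${YEAR}-${MONTH}-${DAY}/"/>
    </archival-stage>
</lifecycle>

<!-- Rejected: "At least one of Retention/Archival is a mandatory stage, didn't find it for cluster: ..." -->
<lifecycle/>
```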
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java b/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
index 833ad0488..5b2775688 100644
--- a/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
+++ b/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
@@ -22,7 +22,8 @@
  */
 public enum FeedLifecycleStage {

-    RETENTION("AgeBasedDelete");
+    RETENTION("AgeBasedDelete"),
+    ARCHIVAL("AgeBasedArchival");

     private String defaultPolicyName;
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/archival/AgeBasedArchival.java b/common/src/main/java/org/apache/falcon/lifecycle/archival/AgeBasedArchival.java
new file mode 100644
index 000000000..f14a2a0b1
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/lifecycle/archival/AgeBasedArchival.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.falcon.lifecycle.archival;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.feed.Feed;
+
+/**
+ * Archival policy which archives all instances for a given instance time, based on the feed's frequency.
+ * It creates the workflow and coordinators for this policy.
+ */
+public class AgeBasedArchival extends ArchivalPolicy {
+
+    @Override
+    public void validate(Feed feed, String clusterName) throws FalconException {
+        // the archival location is already validated by FeedEntityParser; nothing more to check here
+    }
+}
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/archival/ArchivalPolicy.java b/common/src/main/java/org/apache/falcon/lifecycle/archival/ArchivalPolicy.java
new file mode 100644
index 000000000..b1c7df5e7
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/lifecycle/archival/ArchivalPolicy.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.archival;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.lifecycle.AbstractPolicyBuilderFactory;
+import org.apache.falcon.lifecycle.FeedLifecycleStage;
+import org.apache.falcon.lifecycle.LifecyclePolicy;
+import org.apache.falcon.lifecycle.PolicyBuilder;
+import org.apache.falcon.workflow.WorkflowEngineFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * All archival policies must extend this class.
+ */
+public abstract class ArchivalPolicy implements LifecyclePolicy {
+
+    @Override
+    public String getName() {
+        return this.getClass().getSimpleName();
+    }
+
+    @Override
+    public FeedLifecycleStage getStage() {
+        return FeedLifecycleStage.ARCHIVAL;
+    }
+
+    @Override
+    public Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException {
+        AbstractPolicyBuilderFactory factory = WorkflowEngineFactory.getLifecycleEngine();
+        PolicyBuilder builder = factory.getPolicyBuilder(getName());
+        return builder.build(cluster, buildPath, feed);
+    }
+}
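Extension sketch (editor's illustration; `com.example...CustomArchival` is hypothetical, not part of this patch): a new policy only needs to override `validate`, since `ArchivalPolicy` already supplies the name, stage, and builder wiring, and `getPolicyBuilder(getName())` pairs it with a builder registered under the same simple class name.

```java
package com.example.falcon.lifecycle.archival;

import org.apache.falcon.FalconException;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.lifecycle.archival.ArchivalPolicy;

/** Hypothetical policy that insists the archival location lives on s4://. */
public class CustomArchival extends ArchivalPolicy {
    @Override
    public void validate(Feed feed, String clusterName) throws FalconException {
        // getArchivalPath returns "" when no archival-stage is configured for the cluster
        String path = FeedHelper.getArchivalPath(feed, clusterName);
        if (!path.isEmpty() && !path.startsWith("s4://")) {
            throw new FalconException("Archival location must be an s4:// path for feed " + feed.getName());
        }
    }
}
```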
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
index 8d735f97a..30f2f613a 100644
--- a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
+++ b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
@@ -46,7 +46,7 @@ public void validate(Feed feed, String clusterName) throws FalconException {
         // validate that it is a valid cluster
         Cluster cluster = FeedHelper.getCluster(feed, clusterName);
         Frequency retentionLimit = getRetentionLimit(feed, clusterName);
-        if (cluster != null) {
+        if (cluster != null && retentionLimit != null) {
             validateLimitWithSla(feed, cluster, retentionLimit.toString());
             validateLimitWithLateData(feed, cluster, retentionLimit.toString());
             String lifecycleEngine = StartupProperties.get().getProperty("lifecycle.engine.impl",
@@ -122,9 +122,8 @@ public Frequency getRetentionLimit(Feed feed, String clusterName) throws FalconException {
                 throw new FalconException("Invalid value for property: " + LIMIT_PROPERTY_NAME + ", should be a valid "
                         + "frequency e.g. hours(2)", e);
             }
-        } else {
-            throw new FalconException("Cluster " + clusterName + " doesn't contain retention stage");
         }
+        return null;
     }
 }
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index d8040f08d..30731ec3d 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -75,7 +75,7 @@ public enum Type {PRE_PROCESSING, POST_PROCESSING, WORKFLOW_JOB, COORDINATOR_ACTION}
     /**
      * Entity operations supported.
      */
     public enum EntityOperations {
-        GENERATE, DELETE, REPLICATE, IMPORT, EXPORT
+        GENERATE, DELETE, REPLICATE, IMPORT, EXPORT, ARCHIVE
     }

     public static final WorkflowExecutionArgs[] USER_MESSAGE_ARGS = {
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index f89990577..7878403ea 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -69,9 +69,12 @@

 # List of Lifecycle policies configured.
-*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
+*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete,\
+        org.apache.falcon.lifecycle.archival.AgeBasedArchival
+
 # List of builders for the policies.
-*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder
+*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder,\
+        org.apache.falcon.lifecycle.engine.oozie.archival.AgeBasedArchivalBuilder

 ##### Falcon Configuration Store Change listeners #####
 *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
                         org.apache.falcon.entity.ColoClusterRelation,\
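Policies and builders are paired by simple class name, so registering an additional custom policy would touch both lists (editor's illustration; the `com.example` entries below are hypothetical, the first two entries are the ones this patch configures):

```properties
*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete,\
        org.apache.falcon.lifecycle.archival.AgeBasedArchival,\
        com.example.falcon.lifecycle.archival.CustomArchival

*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder,\
        org.apache.falcon.lifecycle.engine.oozie.archival.AgeBasedArchivalBuilder,\
        com.example.falcon.lifecycle.engine.oozie.archival.CustomArchivalBuilder
```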
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
index 450b25198..ffbace661 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
@@ -28,8 +28,8 @@
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Properties;
 import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.entity.v0.feed.Argument;
 import org.apache.falcon.entity.v0.feed.Arguments;
+import org.apache.falcon.entity.v0.feed.Argument;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Clusters;
 import org.apache.falcon.entity.v0.feed.Extract;
@@ -38,12 +38,13 @@
 import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
 import org.apache.falcon.entity.v0.feed.FieldsType;
 import org.apache.falcon.entity.v0.feed.Import;
-import org.apache.falcon.entity.v0.feed.Lifecycle;
+import org.apache.falcon.entity.v0.feed.Locations;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.Lifecycle;
 import org.apache.falcon.entity.v0.feed.MergeType;
 import org.apache.falcon.entity.v0.feed.RetentionStage;
+import org.apache.falcon.entity.v0.feed.ArchivalStage;
 import org.apache.falcon.entity.v0.feed.Datasource;
 import org.apache.falcon.entity.v0.feed.Validity;
 import org.apache.falcon.entity.v0.process.Input;
@@ -266,8 +267,14 @@ public void testGetPolicies() throws Exception {
                 .getParser(EntityType.FEED);
         Feed feed = parser.parse(this.getClass().getResourceAsStream(FEED3_XML));
         List<String> policies = FeedHelper.getPolicies(feed, "testCluster");
-        Assert.assertEquals(policies.size(), 1);
+        Assert.assertEquals(policies.size(), 2);
         Assert.assertEquals(policies.get(0), "AgeBasedDelete");
+        Assert.assertEquals(policies.get(1), "AgeBasedArchival");
+
+        feed.getLifecycle().setRetentionStage(null);
+        policies = FeedHelper.getPolicies(feed, "backupCluster");
+        Assert.assertEquals(policies.size(), 1);
+        Assert.assertEquals(policies.get(0), "AgeBasedArchival");
     }

     @Test
@@ -903,6 +910,180 @@ public void testGetRetentionFrequency() throws Exception {
                 new Frequency("hours(4)"));
     }

+    @Test
+    public void testGetArchivalStage() throws Exception {
+        Feed feed = new Feed();
+        feed.setFrequency(new Frequency("hours(1)"));
+
+        // archival stage location is not defined
+        Lifecycle globalLifecycle = new Lifecycle();
+        ArchivalStage globalArchivalStage = new ArchivalStage();
+        globalLifecycle.setArchivalStage(globalArchivalStage);
+        feed.setLifecycle(globalLifecycle);
+
+        Clusters clusters = new Clusters();
+        org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        cluster.setName("cluster1");
+        clusters.getClusters().add(cluster);
+        feed.setClusters(clusters);
+
+        // lifecycle is defined only at global level
+        Location globalLocation = new Location();
+        globalLocation.setType(LocationType.DATA);
+        globalLocation.setPath("s4://globalPath/archival/");
+        globalArchivalStage.setLocation(globalLocation);
+        globalLifecycle.setArchivalStage(globalArchivalStage);
+        feed.setLifecycle(globalLifecycle);
+        Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+                feed.getLifecycle().getArchivalStage().getLocation().getPath());
+
+        // lifecycle is defined at both global and cluster level
+        Lifecycle clusterLifecycle = new Lifecycle();
+        ArchivalStage clusterArchivalStage = new ArchivalStage();
+        Location clusterLocation = new Location();
+        clusterLocation.setType(LocationType.DATA);
+        clusterLocation.setPath("s4://clusterPath/archival/");
+        clusterArchivalStage.setLocation(clusterLocation);
+        clusterLifecycle.setArchivalStage(clusterArchivalStage);
+        feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle);
+        Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+                cluster.getLifecycle().getArchivalStage().getLocation().getPath());
+
+        // lifecycle at both level - archival only at cluster level.
+        feed.getLifecycle().setArchivalStage(null);
+        Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+                cluster.getLifecycle().getArchivalStage().getLocation().getPath());
+
+        // lifecycle at both level - archival only at global level.
+        feed.getLifecycle().setArchivalStage(globalArchivalStage);
+        feed.getClusters().getClusters().get(0).getLifecycle().setArchivalStage(null);
+        Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+                feed.getLifecycle().getArchivalStage().getLocation().getPath());
+
+        // lifecycle is defined only at cluster level
+        feed.setLifecycle(null);
+        feed.getClusters().getClusters().get(0).getLifecycle().setArchivalStage(clusterArchivalStage);
+        Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+                cluster.getLifecycle().getArchivalStage().getLocation().getPath());
+    }
+
+    @Test
+    public void testLifecycleStage() throws Exception {
+        Feed feed = new Feed();
+        feed.setFrequency(new Frequency("days(1)"));
+
+        Clusters clusters = new Clusters();
+        org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        cluster.setName("cluster1");
+        clusters.getClusters().add(cluster);
+        feed.setClusters(clusters);
+
+        // retention stage frequency is not defined
+        // archival stage location is not defined
+        Lifecycle globalLifecycle = new Lifecycle();
+
+        ArchivalStage globalArchivalStage = new ArchivalStage();
+        globalLifecycle.setArchivalStage(globalArchivalStage);
+        RetentionStage globalRetentionStage = new RetentionStage();
+        globalLifecycle.setRetentionStage(globalRetentionStage);
+        feed.setLifecycle(globalLifecycle);
+
+        // lifecycle is defined only at global level
+        Location globalLocation = new Location();
+        globalLocation.setType(LocationType.DATA);
+        globalLocation.setPath("s4://globalPath/archival/");
+        globalArchivalStage.setLocation(globalLocation);
+        globalLifecycle.setArchivalStage(globalArchivalStage);
+        globalRetentionStage.setFrequency(new Frequency("hours(2)"));
+        globalLifecycle.setRetentionStage(globalRetentionStage);
+        feed.setLifecycle(globalLifecycle);
+        Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+                feed.getLifecycle().getArchivalStage().getLocation().getPath());
+        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
+                feed.getLifecycle().getRetentionStage().getFrequency());
+
+        // lifecycle is defined at both global and cluster level
+        Lifecycle clusterLifecycle = new Lifecycle();
+        ArchivalStage clusterArchivalStage = new ArchivalStage();
+        Location clusterLocation = new Location();
+        clusterLocation.setType(LocationType.DATA);
+        clusterLocation.setPath("s4://clusterPath/archival/");
+        clusterArchivalStage.setLocation(clusterLocation);
+        clusterLifecycle.setArchivalStage(clusterArchivalStage);
+        RetentionStage clusterRetentionStage = new RetentionStage();
+        clusterRetentionStage.setFrequency(new Frequency("hours(4)"));
+        clusterLifecycle.setRetentionStage(clusterRetentionStage);
+        feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle);
+        Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+                cluster.getLifecycle().getArchivalStage().getLocation().getPath());
+        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
+                cluster.getLifecycle().getRetentionStage().getFrequency());
+
+        // lifecycle at both level - retention/archival only at cluster level.
+        feed.getLifecycle().setArchivalStage(null);
+        feed.getLifecycle().setRetentionStage(null);
+        Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+                cluster.getLifecycle().getArchivalStage().getLocation().getPath());
+        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
+                cluster.getLifecycle().getRetentionStage().getFrequency());
+
+        // lifecycle at both level - retention/archival only at global level.
+        feed.getLifecycle().setArchivalStage(globalArchivalStage);
+        feed.getLifecycle().setRetentionStage(globalRetentionStage);
+        feed.getClusters().getClusters().get(0).getLifecycle().setArchivalStage(null);
+        feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(null);
+        Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+                feed.getLifecycle().getArchivalStage().getLocation().getPath());
+        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
+                feed.getLifecycle().getRetentionStage().getFrequency());
+
+        // lifecycle at both level - retention at cluster level, archival at global level
+        feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(clusterRetentionStage);
+        feed.getLifecycle().setRetentionStage(null);
+        Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+                feed.getLifecycle().getArchivalStage().getLocation().getPath());
+        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
+                cluster.getLifecycle().getRetentionStage().getFrequency());
+
+        // lifecycle at both level - retention at global level, archival at cluster level
+        feed.getClusters().getClusters().get(0).getLifecycle().setArchivalStage(clusterArchivalStage);
+        feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(null);
+        feed.getLifecycle().setRetentionStage(globalRetentionStage);
+        feed.getLifecycle().setArchivalStage(null);
+        Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+                cluster.getLifecycle().getArchivalStage().getLocation().getPath());
+        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
+                feed.getLifecycle().getRetentionStage().getFrequency());
+
+        // lifecycle is defined only at cluster level
+        feed.setLifecycle(null);
+        feed.getClusters().getClusters().get(0).getLifecycle().setArchivalStage(clusterArchivalStage);
+        feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(clusterRetentionStage);
+        Assert.assertNotNull(FeedHelper.getArchivalStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getArchivalPath(feed, cluster.getName()),
+                cluster.getLifecycle().getArchivalStage().getLocation().getPath());
+        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()),
+                cluster.getLifecycle().getRetentionStage().getFrequency());
+    }
+
     @Test
     public void testFeedImportSnapshot() throws Exception {
         Cluster cluster = publishCluster();
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index ced4fc502..559b7116c 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -187,14 +187,20 @@ public void testLifecycleParse() throws Exception {
         assertEquals("AgeBasedDelete", FeedHelper.getPolicies(feed, "testCluster").get(0));
         assertEquals("reports", feed.getLifecycle().getRetentionStage().getQueue());
         assertEquals("NORMAL", feed.getLifecycle().getRetentionStage().getPriority());
+        assertEquals("default", feed.getLifecycle().getArchivalStage().getQueue());
+        assertEquals("s4://prod/falcon/${YEAR}-${MONTH}-${DAY}-${HOUR}/",
+                feed.getLifecycle().getArchivalStage().getLocation().getPath());
+        assertEquals("AgeBasedArchival", FeedHelper.getPolicies(feed, "testCluster").get(1));
+        assertEquals("s4://prodnew/falcon/${YEAR}-${MONTH}-${DAY}-${HOUR}/",
+                feed.getClusters().getClusters().get(0).getLifecycle().getArchivalStage().getLocation().getPath());
     }

     @Test(expectedExceptions = ValidationException.class,
-            expectedExceptionsMessageRegExp = ".*Retention is a mandatory stage.*")
-    public void testMandatoryRetention() throws Exception {
+            expectedExceptionsMessageRegExp =
+                    ".*At least one of Retention/Archival is a mandatory stage, didn't find it for cluster.*")
+    public void testLifecycleStage() throws Exception {
         Feed feed = parser.parseAndValidate(this.getClass()
                 .getResourceAsStream(FEED3_XML));
         feed.getLifecycle().setRetentionStage(null);
+        feed.getLifecycle().setArchivalStage(null);
+        assertEquals("AgeBasedDelete", FeedHelper.getPolicies(feed, "testCluster").get(0));
         parser.validate(feed);
     }

@@ -253,6 +259,15 @@ public void testRetentionFrequency() throws Exception {
         parser.validate(feed);
     }

+    @Test(expectedExceptions = ValidationException.class,
+            expectedExceptionsMessageRegExp = ".*Location path cannot be empty.*")
+    public void testArchivalPath() throws Exception {
+        Feed feed = parser.parseAndValidate(this.getClass()
+                .getResourceAsStream(FEED3_XML));
+        feed.getLifecycle().getArchivalStage().getLocation().setPath("");
+        parser.validate(feed);
+    }
+
     @Test(expectedExceptions = ValidationException.class)
     public void applyValidationInvalidFeed() throws Exception {
         Feed feed = parser.parseAndValidate(ProcessEntityParserTest.class
diff --git a/common/src/test/resources/config/feed/feed-0.3.xml b/common/src/test/resources/config/feed/feed-0.3.xml
index e6d3e01ac..ccfd0dad1 100644
--- a/common/src/test/resources/config/feed/feed-0.3.xml
+++ b/common/src/test/resources/config/feed/feed-0.3.xml
@@ -53,6 +53,11 @@
+
+
+            false
+            default
+
@@ -79,5 +84,10 @@
+
+
+            false
+            default
+
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index 11d1e1b0b..332df28d7 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -439,13 +439,16 @@
 replication. "preserveReplicationNumber" represents preserving replication number.
+
+
+            false
+            default
+
+
-lifecycle tag is the new way to define various stages of a feed's lifecycle. In the example above we have defined a
-retention-stage using lifecycle tag. You may define lifecycle at global level or a cluster level or both. Cluster level
-configuration takes precedence and falcon falls back to global definition if cluster level specification is missing.
+lifecycle tag is the new way to define various stages of a feed's lifecycle. In the example above we have defined
+retention-stage and archival-stage using the lifecycle tag. At least one of the stages should be defined if the
+lifecycle tag is used. You may define lifecycle at the global level, the cluster level, or both. Cluster-level
+configuration takes precedence and Falcon falls back to the global definition if the cluster-level specification
+is missing.
 ----++++ Retention Stage
@@ -468,6 +471,13 @@
 wastage of compute resources.

 In future, we will allow more customisation like customising how to choose instances to be deleted through this
 method.

+----++++ Archival Stage
+Lifecycle archival stage allows users to archive data instead of creating a dummy replication feed for archival. This
+is done through the archival-stage tag inside the lifecycle tag.
+
+In this new method of defining archival you can specify the path at which data is to be archived and the queue for
+archival jobs. The archival-stage archives all the data corresponding to the given instance time.
+The "location" tag is a mandatory property and must contain a valid path, e.g. "s4://prod/falcon/${YEAR}-${MONTH}-${DAY}".

 ---++ Process Specification
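To make the two levels concrete (editor's illustration; paths and queue names are the ones used by this patch's tests, everything else is placeholder): the cluster-level stage wins for that cluster, while other clusters fall back to the global definition.

```xml
<feed name="sample-feed" xmlns="uri:falcon:feed:0.1">
    <clusters>
        <cluster name="backupCluster" type="source">
            <lifecycle>
                <archival-stage>
                    <location type="data" path="s4://prodnew/falcon/${YEAR}-${MONTH}-${DAY}-${HOUR}/"/>
                    <queue>default</queue>
                </archival-stage>
            </lifecycle>
        </cluster>
    </clusters>
    <lifecycle>
        <retention-stage>
            <frequency>hours(10)</frequency>
            <queue>reports</queue>
            <priority>NORMAL</priority>
        </retention-stage>
        <archival-stage>
            <location type="data" path="s4://prod/falcon/${YEAR}-${MONTH}-${DAY}-${HOUR}/"/>
            <queue>default</queue>
        </archival-stage>
    </lifecycle>
</feed>
```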
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalBuilder.java
new file mode 100644
index 000000000..0bab471af
--- /dev/null
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalBuilder.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.engine.oozie.archival;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.lifecycle.PolicyBuilder;
+import org.apache.falcon.lifecycle.archival.AgeBasedArchival;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * Oozie builder for the AgeBasedArchival policy: builds the workflow, then the coordinator.
+ */
+public class AgeBasedArchivalBuilder implements PolicyBuilder {
+
+    private static final String NAME = new AgeBasedArchival().getName();
+
+    @Override
+    public Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException {
+        Properties wfProps = buildWorkflow(cluster, buildPath, feed);
+        return buildCoordinator(cluster, buildPath, feed, wfProps);
+    }
+
+    @Override
+    public String getPolicyName() {
+        return NAME;
+    }
+
+    public Properties buildCoordinator(Cluster cluster, Path buildPath, Feed feed, Properties wfProps)
+        throws FalconException {
+        return AgeBasedArchivalCoordinatorBuilder.build(cluster, buildPath, feed, wfProps);
+    }
+
+    public Properties buildWorkflow(Cluster cluster, Path buildPath, Feed feed) throws FalconException {
+        return AgeBasedArchivalWorkflowBuilder.build(cluster, buildPath, feed);
+    }
+}
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalCoordinatorBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalCoordinatorBuilder.java
new file mode 100644
index 000000000..7ffe5baf7
--- /dev/null
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalCoordinatorBuilder.java
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.engine.oozie.archival;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.ExecutionType;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.lifecycle.engine.oozie.utils.OozieBuilderUtils;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.coordinator.DATAOUT;
+import org.apache.falcon.oozie.coordinator.DATAIN;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.ACTION;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.oozie.coordinator.DATASETS;
+import org.apache.falcon.oozie.coordinator.CONTROLS;
+import org.apache.falcon.oozie.coordinator.INPUTEVENTS;
+import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Builds the Oozie coordinator for the AgeBasedArchival lifecycle policy.
+ */
+public class AgeBasedArchivalCoordinatorBuilder {
+
+    private AgeBasedArchivalCoordinatorBuilder() {
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(AgeBasedArchivalCoordinatorBuilder.class);
+    private static final int THIRTY_MINUTES = 30 * 60 * 1000;
+    private static final String PARALLEL = "parallel";
+    private static final String TIMEOUT = "timeout";
+    private static final String ORDER = "order";
+    public static final String IN_DATASET_NAME = "input-dataset";
+    public static final String OUT_DATASET_NAME = "output-dataset";
+    public static final String DATAIN_NAME = "input";
+    public static final String DATAOUT_NAME = "output";
+
+    /**
+     * Builds the coordinator app.
+     * @param cluster - cluster to schedule archival on.
+     * @param basePath - Base path to marshal coordinator app.
+     * @param feed - feed for which archival is to be scheduled.
+     * @param wfProp - properties passed from workflow to coordinator e.g. ENTITY_PATH
+     * @return - Properties from creating the coordinator application to be used by Bundle.
+     * @throws FalconException
+     */
+    public static Properties build(Cluster cluster, Path basePath, Feed feed, Properties wfProp)
+        throws FalconException {
+
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+
+        // workflow is serialized to a specific dir
+        Path coordPath = new Path(basePath, Tag.ARCHIVAL.name() + "/" + feedCluster.getName());
+
+        COORDINATORAPP coord = new COORDINATORAPP();
+
+        String coordName = EntityUtil.getWorkflowName(LifeCycle.ARCHIVAL.getTag(), feed).toString();
+
+        long replicationDelayInMillis = getReplicationDelayInMillis(feed, cluster);
+        Date sourceStartDate = getStartDate(feed, cluster, replicationDelayInMillis);
+        Date sourceEndDate = getEndDate(feed, cluster);
+
+        coord.setName(coordName);
+        coord.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+        coord.setEnd(SchemaHelper.formatDateUTC(sourceEndDate));
+        coord.setStart(SchemaHelper.formatDateUTC(sourceStartDate));
+        coord.setTimezone(feed.getTimezone().getID());
+
+        if (replicationDelayInMillis > 0) {
+            long delayInMins = -1 * replicationDelayInMillis / (1000 * 60);
+            String elExp = "${now(0," + delayInMins + ")}";
+
+            initializeInputOutputPath(coord, cluster, feed, elExp);
+        }
+
+        setCoordControls(feed, coord);
+
+        final Storage sourceStorage = FeedHelper.createReadOnlyStorage(cluster, feed);
+        initializeInputDataSet(feed, cluster, coord, sourceStorage);
+
+        String archivalPath = FeedHelper.getArchivalPath(feed, cluster.getName());
+        initializeOutputDataSet(feed, cluster, coord, archivalPath);
+
+        ACTION replicationWorkflowAction =
+                getReplicationWorkflowAction(feed, cluster, coordPath, coordName, sourceStorage);
+        coord.setAction(replicationWorkflowAction);
+
+        Path marshalPath = OozieBuilderUtils.marshalCoordinator(cluster, coord, coordPath);
+        wfProp.putAll(OozieBuilderUtils.getProperties(marshalPath, coordName));
+
+        return wfProp;
+    }
+
+    private static void initializeInputOutputPath(COORDINATORAPP coord, Cluster cluster, Feed feed, String elExp)
+        throws FalconException {
+
+        if (coord.getInputEvents() == null) {
+            coord.setInputEvents(new INPUTEVENTS());
+        }
+
+        if (coord.getOutputEvents() == null) {
+            coord.setOutputEvents(new OUTPUTEVENTS());
+        }
+
+        DATAIN datain = createDataIn(feed, cluster);
+        coord.getInputEvents().getDataIn().add(datain);
+
+        DATAOUT dataout = createDataOut(feed, cluster);
+        coord.getOutputEvents().getDataOut().add(dataout);
+
+        coord.getInputEvents().getDataIn().get(0).getInstance().set(0, elExp);
+        coord.getOutputEvents().getDataOut().get(0).setInstance(elExp);
+    }
+
+    private static DATAIN createDataIn(Feed feed, Cluster cluster) {
+        DATAIN datain = new DATAIN();
+        datain.setName(DATAIN_NAME);
+        datain.setDataset(IN_DATASET_NAME);
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+        datain.getInstance().add(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
+        return datain;
+    }
+
+    private static DATAOUT createDataOut(Feed feed, Cluster cluster) {
+        DATAOUT dataout = new DATAOUT();
+        dataout.setName(DATAOUT_NAME);
+        dataout.setDataset(OUT_DATASET_NAME);
+        dataout.setInstance("${coord:current(0)}");
+        return dataout;
+    }
+
+    private static ACTION getReplicationWorkflowAction(Feed feed, Cluster cluster, Path buildPath, String coordName,
+                                                       Storage sourceStorage) throws FalconException {
+        ACTION action = new ACTION();
+        WORKFLOW workflow = new WORKFLOW();
+
+        workflow.setAppPath(OozieBuilderUtils.getStoragePath(String.valueOf(buildPath)));
+        Properties props = OozieBuilderUtils.createCoordDefaultConfiguration(coordName, feed);
+
+        // Setting CLUSTER_NAME property to include source cluster
+        props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName());
+        props.put("srcClusterName", cluster.getName());
+        props.put("srcClusterColo", cluster.getColo());
+
+        // the storage type is uniform across source and target feeds for replication
+        props.put("falconFeedStorageType", sourceStorage.getType().name());
+
+        String instancePaths = "";
+        if (sourceStorage.getType() == Storage.TYPE.FILESYSTEM) {
+            String pathsWithPartitions = getPathsWithPartitions(feed, cluster);
+            instancePaths = pathsWithPartitions;
+            propagateFileSystemCopyProperties(pathsWithPartitions, props);
+        }
+
+        propagateLateDataProperties(feed, instancePaths, sourceStorage.getType().name(), props);
+        // Add the custom properties set in feed. Else, dryrun won't catch any missing props.
+        props.putAll(EntityUtil.getEntityProperties(feed));
+        workflow.setConfiguration(OozieBuilderUtils.getCoordinatorConfig(props));
+        action.setWorkflow(workflow);
+
+        return action;
+    }
+
+    private static String getPathsWithPartitions(Feed feed, Cluster cluster) throws FalconException {
+        String srcPart = FeedHelper.normalizePartitionExpression(
+                FeedHelper.getCluster(feed, cluster.getName()).getPartition());
+        srcPart = FeedHelper.evaluateClusterExp(cluster, srcPart);
+
+        StringBuilder pathsWithPartitions = new StringBuilder();
+        pathsWithPartitions.append("${coord:dataIn('input')}/")
+                .append(FeedHelper.normalizePartitionExpression(srcPart));
+
+        String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
+        parts = StringUtils.stripEnd(parts, "/");
+        return parts;
+    }
+
+    private static void propagateLateDataProperties(Feed feed, String instancePaths,
+                                                    String falconFeedStorageType, Properties props) {
+        // todo these pairs are the same but used in different context
+        // late data handler - should-record action
+        props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), feed.getName());
+        props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), instancePaths);
+        props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), feed.getName());
+
+        // storage type for each corresponding feed - in this case only one feed is involved
+        // needed to compute usage based on storage type in LateDataHandler
+        props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), falconFeedStorageType);
+
+        // falcon post processing
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), feed.getName());
+        props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), feed.getName());
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), "${coord:dataOut('output')}");
+    }
+
+    private static void propagateFileSystemCopyProperties(String paths, Properties props) throws FalconException {
+        props.put("sourceRelativePaths", paths);
+
+        props.put("distcpSourcePaths", "${coord:dataIn('input')}");
+        props.put("distcpTargetPaths", "${coord:dataOut('output')}");
+    }
+
+    private static void setCoordControls(Feed feed, COORDINATORAPP coord) throws FalconException {
+        // set controls
+        CONTROLS controls = new CONTROLS();
+
+        long frequencyInMillis = ExpressionHelper.get().evaluate(feed.getFrequency().toString(), Long.class);
+        long timeoutInMillis = frequencyInMillis * 6;
+        if (timeoutInMillis < THIRTY_MINUTES) {
+            timeoutInMillis = THIRTY_MINUTES;
+        }
+
+        Properties props = EntityUtil.getEntityProperties(feed);
+        String timeout = props.getProperty(TIMEOUT);
+        if (timeout != null) {
+            try {
+                timeoutInMillis = ExpressionHelper.get().evaluate(timeout, Long.class);
+            } catch (Exception ignore) {
+                LOG.error("Unable to evaluate timeout:", ignore);
+            }
+        }
+
+        String parallelProp = props.getProperty(PARALLEL);
+        int parallel = 1;
+        if (parallelProp != null) {
+            try {
+                parallel = Integer.parseInt(parallelProp);
+            } catch (NumberFormatException ignore) {
+                LOG.error("Unable to parse parallel:", ignore);
+            }
+        }
+
+        String orderProp = props.getProperty(ORDER);
+        ExecutionType order = ExecutionType.FIFO;
+        if (orderProp != null) {
+            try {
+                order = ExecutionType.fromValue(orderProp);
+            } catch (IllegalArgumentException ignore) {
+                LOG.error("Unable to parse order:", ignore);
+            }
+        }
+
+        controls.setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
+        controls.setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
+        controls.setConcurrency(String.valueOf(parallel));
+        controls.setExecution(order.name());
+        coord.setControls(controls);
+    }
+
+    private static void initializeInputDataSet(Feed feed, Cluster cluster, COORDINATORAPP coord,
+                                               Storage storage) throws FalconException {
+        if (coord.getDatasets() == null) {
+            coord.setDatasets(new DATASETS());
+        }
+
+        SYNCDATASET inputDataset = new SYNCDATASET();
+        inputDataset.setName(IN_DATASET_NAME);
+
+        String uriTemplate = storage.getUriTemplate(LocationType.DATA);
+        inputDataset.setUriTemplate(uriTemplate);
+
+        setDatasetValues(feed, inputDataset, cluster);
+
+        if (feed.getAvailabilityFlag() == null) {
+            inputDataset.setDoneFlag("");
+        } else {
+            inputDataset.setDoneFlag(feed.getAvailabilityFlag());
+        }
+
+        coord.getDatasets().getDatasetOrAsyncDataset().add(inputDataset);
+    }
+
+    private static void initializeOutputDataSet(Feed feed, Cluster cluster, COORDINATORAPP coord,
+                                                String targetPath) throws FalconException {
+        if (coord.getDatasets() == null) {
+            coord.setDatasets(new DATASETS());
+        }
+
+        SYNCDATASET outputDataset = new SYNCDATASET();
+        outputDataset.setName(OUT_DATASET_NAME);
+        outputDataset.setUriTemplate(targetPath);
+
+        setDatasetValues(feed, outputDataset, cluster);
+        coord.getDatasets().getDatasetOrAsyncDataset().add(outputDataset);
+    }
+
+    private static void setDatasetValues(Feed feed, SYNCDATASET dataset, Cluster cluster) {
+        dataset.setInitialInstance(SchemaHelper.formatDateUTC(
+                FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart()));
+        dataset.setTimezone(feed.getTimezone().getID());
+        dataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+    }
+
+    private static long getReplicationDelayInMillis(Feed feed, Cluster srcCluster) throws FalconException {
+        Frequency replicationDelay = FeedHelper.getCluster(feed, srcCluster.getName()).getDelay();
+        long delayInMillis = 0;
+        if (replicationDelay != null) {
+            delayInMillis = ExpressionHelper.get().evaluate(
+                    replicationDelay.toString(), Long.class);
+        }
+
+        return delayInMillis;
+    }
+
+    private static Date getStartDate(Feed feed, Cluster cluster, long replicationDelayInMillis) {
+        Date startDate = FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart();
+        return replicationDelayInMillis == 0
+                ? startDate : new Date(startDate.getTime() + replicationDelayInMillis);
+    }
+
+    private static Date getEndDate(Feed feed, Cluster cluster) {
+        return FeedHelper.getCluster(feed, cluster.getName()).getValidity().getEnd();
+    }
+
+}
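The coordinator this builder emits has roughly the following shape (an editor's hand-written sketch, not generated output; the name pattern and EL values are abbreviated, the dataset and event names come from the constants above):

```xml
<coordinator-app name="FALCON_FEED_ARCHIVAL_sample-feed" frequency="${coord:hours(1)}"
                 start="..." end="..." timezone="UTC">
    <controls>...</controls>  <!-- timeout defaults to 6x frequency, min 30 minutes -->
    <datasets>
        <dataset name="input-dataset">...</dataset>   <!-- the feed's DATA location -->
        <dataset name="output-dataset">...</dataset>  <!-- the archival-stage location path -->
    </datasets>
    <input-events>
        <data-in name="input" dataset="input-dataset">...</data-in>
    </input-events>
    <output-events>
        <data-out name="output" dataset="output-dataset">...</data-out>
    </output-events>
    <action>
        <!-- workflow config carries distcpSourcePaths=${coord:dataIn('input')}
             and distcpTargetPaths=${coord:dataOut('output')} -->
        <workflow>...</workflow>
    </action>
</coordinator-app>
```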
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalWorkflowBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalWorkflowBuilder.java
new file mode 100644
index 000000000..5ba60c50b
--- /dev/null
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/archival/AgeBasedArchivalWorkflowBuilder.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.engine.oozie.archival;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.ArchivalStage;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Property;
+import org.apache.falcon.lifecycle.engine.oozie.utils.OozieBuilderUtils;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.util.ReplicationDistCpOption;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Builds the Oozie workflow for the AgeBasedArchival lifecycle policy.
+ */
+public class AgeBasedArchivalWorkflowBuilder {
+    private static final String ARCHIVAL_ACTION_TEMPLATE = "/action/feed/archival-action.xml";
+    private static final String ARCHIVAL_ACTION_NAME = "archival";
+
+    private static final String ARCHIVAL_JOB_COUNTER = "job.counter";
+    private static final String TDE_ENCRYPTION_ENABLED = "tdeEncryptionEnabled";
+    private static final String MR_MAX_MAPS = "maxMaps";
+    private static final String MR_MAP_BANDWIDTH = "mapBandwidth";
+
+    private AgeBasedArchivalWorkflowBuilder() {
+    }
+
+    public static Properties build(Cluster cluster, Path basePath, Feed feed) throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+
+        // workflow is serialized to a specific dir
+        Path buildPath = new Path(basePath, Tag.ARCHIVAL.name() + "/" + feedCluster.getName());
+
+        WORKFLOWAPP workflow = new WORKFLOWAPP();
+        String wfName = EntityUtil.getWorkflowName(Tag.ARCHIVAL, feed).toString();
+
+        String start = ARCHIVAL_ACTION_NAME;
+
+        // Add pre-processing
+        if (shouldPreProcess(feed)) {
+            ACTION action = OozieBuilderUtils.getPreProcessingAction(Tag.ARCHIVAL);
+            addTransition(action, ARCHIVAL_ACTION_NAME, OozieBuilderUtils.getFailAction());
+            workflow.getDecisionOrForkOrJoin().add(action);
+            start = OozieBuilderUtils.PREPROCESS_ACTION_NAME;
+        }
+
+        // Add the archival action
+        ACTION archival = OozieBuilderUtils.unmarshalAction(ARCHIVAL_ACTION_TEMPLATE);
+        addAdditionalArchivalProperties(feed, archival);
+        enableCounters(feed, archival);
+        enableTDE(feed, archival);
+        addPostProcessing(workflow, archival);
+        OozieBuilderUtils.decorateWorkflow(workflow, wfName, start);
+
+        OozieBuilderUtils.addLibExtensionsToWorkflow(cluster, workflow, Tag.ARCHIVAL, EntityType.FEED);
+
+        OozieBuilderUtils.marshalWokflow(cluster, workflow, buildPath);
+
+        Properties props = OozieBuilderUtils.getProperties(buildPath, wfName);
+        props.putAll(OozieBuilderUtils.createDefaultConfiguration(cluster, feed,
+                WorkflowExecutionContext.EntityOperations.ARCHIVE));
+
+        props.putAll(getWorkflowProperties(feed, cluster));
+        props.putAll(FeedHelper.getUserWorkflowProperties(LifeCycle.ARCHIVAL));
+
+        ArchivalStage archivalStage = FeedHelper.getArchivalStage(feed, cluster.getName());
+        if (StringUtils.isNotBlank(archivalStage.getQueue())) {
+            props.put(OozieBuilderUtils.MR_QUEUE_NAME, archivalStage.getQueue());
+        }
+
+        // Write out the config to config-default.xml
+        OozieBuilderUtils.marshalDefaultConfig(cluster, workflow, props, buildPath);
+        return props;
+    }
+
+    private static Properties getWorkflowProperties(Feed feed, Cluster cluster) throws FalconException {
+        Properties props = FeedHelper.getFeedProperties(feed);
+        if (props.getProperty(MR_MAX_MAPS) == null) { // set default if user has not overridden
+            props.put(MR_MAX_MAPS, getDefaultMaxMaps());
+        }
+        if (props.getProperty(MR_MAP_BANDWIDTH) == null) { // set default if user has not overridden
+            props.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth());
+        }
+
+        if (feed.getAvailabilityFlag() == null) {
+            props.put("availabilityFlag", "NA");
+        } else {
+            props.put("availabilityFlag", feed.getAvailabilityFlag());
+        }
+        props.put(WorkflowExecutionArgs.DATASOURCE_NAME.getName(), "NA");
+
+        return props;
+    }
+
+    private static String getDefaultMaxMaps() {
+        return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
+    }
+
+    private static String getDefaultMapBandwidth() {
+        return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidth", "100");
+    }
+
+    private static boolean shouldPreProcess(Feed feed) throws FalconException {
+        return !(EntityUtil.getLateProcess(feed) == null
+                || EntityUtil.getLateProcess(feed).getLateInputs() == null
+                || EntityUtil.getLateProcess(feed).getLateInputs().size() == 0);
+    }
+
+    private static void addTransition(ACTION action, String ok, String fail) {
+        action.getOk().setTo(ok);
+        action.getError().setTo(fail);
+    }
+
+    private static void addAdditionalArchivalProperties(Feed feed, ACTION archivalAction) {
+        List<String> args = archivalAction.getJava().getArg();
+        Properties props = EntityUtil.getEntityProperties(feed);
+
+        for (ReplicationDistCpOption distcpOption : ReplicationDistCpOption.values()) {
+            String propertyValue = props.getProperty(distcpOption.getName());
+            if (StringUtils.isNotEmpty(propertyValue)) {
+                args.add("-" + distcpOption.getName());
+                args.add(propertyValue);
+            }
+        }
+    }
+
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
index 7d51c9a71..5970c3cb8 100644
--- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
@@ -79,6 +79,7 @@ public final class OozieBuilderUtils {
     private static final Logger LOG = LoggerFactory.getLogger(OozieBuilderUtils.class);
 
     private static final String POSTPROCESS_TEMPLATE = "/action/post-process.xml";
+    private static final String PREPROCESS_TEMPLATE = "/action/pre-process.xml";
     public static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
     public static final String MR_QUEUE_NAME = "queueName";
@@ -97,6 +98,7 @@ public final class OozieBuilderUtils {
     public static final String FAIL_POSTPROCESS_ACTION_NAME = "failed-post-processing";
     public static final String OK_ACTION_NAME = "end";
     public static final String FAIL_ACTION_NAME = "fail";
+    public static final String PREPROCESS_ACTION_NAME = "pre-processing";
 
     public static final String ENTITY_PATH = "ENTITY_PATH";
@@ -557,4 +559,26 @@ public static CONFIGURATION getCoordinatorConfig(Properties props) {
         }
         return conf;
     }
+
+    public static ACTION getPreProcessingAction(Tag tag) throws FalconException {
+        ACTION action = unmarshalAction(PREPROCESS_TEMPLATE);
+        decorateWithOozieRetries(action);
+
+        List<String> args = action.getJava().getArg();
+        args.add("-out");
+        if (tag == Tag.ARCHIVAL) {
+            args.add("${logDir}/latedata/${nominalTime}/${srcClusterName}");
+        } else {
+            args.add("${logDir}/latedata/${nominalTime}");
+        }
+        return action;
+    }
+
+    public static String getFailAction() {
+        if (!Boolean.parseBoolean(OozieBuilderUtils.ENABLE_POSTPROCESSING)) {
+            return FAIL_ACTION_NAME;
+        } else {
+            return FAIL_POSTPROCESS_ACTION_NAME;
+        }
+    }
 }
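Review note: getFailAction() simply routes a failing action based on the post-processing toggle. A minimal sketch of that branch, assuming ENABLE_POSTPROCESSING carries a boolean-valued string (its declaration is outside this patch); the constant values are taken from the OozieBuilderUtils context lines above:

public class FailActionResolution {

    static final String FAIL_ACTION_NAME = "fail";
    static final String FAIL_POSTPROCESS_ACTION_NAME = "failed-post-processing";

    // Mirrors getFailAction(): the toggle decides where a failing action transitions.
    static String getFailAction(boolean postProcessingEnabled) {
        return postProcessingEnabled ? FAIL_POSTPROCESS_ACTION_NAME : FAIL_ACTION_NAME;
    }

    public static void main(String[] args) {
        System.out.println(getFailAction(false)); // fail
        System.out.println(getFailAction(true));  // failed-post-processing
    }
}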
diff --git a/lifecycle/src/main/resources/action/feed/archival-action.xml b/lifecycle/src/main/resources/action/feed/archival-action.xml
new file mode 100644
index 000000000..cca4517cb
--- /dev/null
+++ b/lifecycle/src/main/resources/action/feed/archival-action.xml
@@ -0,0 +1,64 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<action name="archival" xmlns="uri:oozie:workflow:0.3">
+    <java>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+                <value>true</value>
+            </property>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+            <property>
+                <name>oozie.action.sharelib.for.java</name>
+                <value>distcp</value>
+            </property>
+            <property>
+                <name>oozie.launcher.oozie.libpath</name>
+                <value>${wf:conf("falcon.libpath")}</value>
+            </property>
+        </configuration>
+        <main-class>org.apache.falcon.replication.FeedReplicator</main-class>
+        <arg>-Dfalcon.include.path=${sourceRelativePaths}</arg>
+        <arg>-Dmapred.job.queue.name=${queueName}</arg>
+        <arg>-Dmapred.job.priority=${jobPriority}</arg>
+        <arg>-maxMaps</arg>
+        <arg>${maxMaps}</arg>
+        <arg>-mapBandwidth</arg>
+        <arg>${mapBandwidth}</arg>
+        <arg>-sourcePaths</arg>
+        <arg>${distcpSourcePaths}</arg>
+        <arg>-targetPath</arg>
+        <arg>${distcpTargetPaths}</arg>
+        <arg>-falconFeedStorageType</arg>
+        <arg>${falconFeedStorageType}</arg>
+        <arg>-availabilityFlag</arg>
+        <arg>${availabilityFlag == 'NA' ? "NA" : availabilityFlag}</arg>
+    </java>
+    <ok to="end"/>
+    <error to="fail"/>
+</action>
diff --git a/lifecycle/src/main/resources/action/post-process.xml b/lifecycle/src/main/resources/action/post-process.xml
new file mode 100644
index 000000000..d649a0f80
--- /dev/null
+++ b/lifecycle/src/main/resources/action/post-process.xml
@@ -0,0 +1,98 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<action name="succeeded-post-processing" xmlns="uri:oozie:workflow:0.3">
+    <java>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.oozie.libpath</name>
+                <value>${wf:conf("falcon.libpath")}</value>
+            </property>
+        </configuration>
+        <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
+        <arg>-cluster</arg>
+        <arg>${cluster}</arg>
+        <arg>-entityType</arg>
+        <arg>${entityType}</arg>
+        <arg>-entityName</arg>
+        <arg>${entityName}</arg>
+        <arg>-nominalTime</arg>
+        <arg>${nominalTime}</arg>
+        <arg>-operation</arg>
+        <arg>${falconDataOperation}</arg>
+        <arg>-workflowId</arg>
+        <arg>${wf:id()}</arg>
+        <arg>-runId</arg>
+        <arg>${wf:run()}</arg>
+        <arg>-status</arg>
+        <arg>${wf:lastErrorNode() == null ? 'SUCCEEDED' : 'FAILED'}</arg>
+        <arg>-timeStamp</arg>
+        <arg>${timeStamp}</arg>
+        <arg>-brokerImplClass</arg>
+        <arg>${brokerImplClass}</arg>
+        <arg>-brokerUrl</arg>
+        <arg>${brokerUrl}</arg>
+        <arg>-userBrokerImplClass</arg>
+        <arg>${userBrokerImplClass}</arg>
+        <arg>-userBrokerUrl</arg>
+        <arg>${userBrokerUrl}</arg>
+        <arg>-userJMSNotificationEnabled</arg>
+        <arg>${userJMSNotificationEnabled}</arg>
+        <arg>-systemJMSNotificationEnabled</arg>
+        <arg>${systemJMSNotificationEnabled}</arg>
+        <arg>-brokerTTL</arg>
+        <arg>${brokerTTL}</arg>
+        <arg>-feedNames</arg>
+        <arg>${feedNames}</arg>
+        <arg>-feedInstancePaths</arg>
+        <arg>${feedInstancePaths}</arg>
+        <arg>-logFile</arg>
+        <arg>${logDir}/job-${nominalTime}/${wf:run()}/evicted-instancePaths.csv</arg>
+        <arg>-workflowEngineUrl</arg>
+        <arg>${workflowEngineUrl}</arg>
+        <arg>-subflowId</arg>
+        <arg>${wf:id()}${userWorkflowEngine == "oozie" ? "@user-action" : ""}</arg>
+        <arg>-userWorkflowEngine</arg>
+        <arg>${userWorkflowEngine}</arg>
+        <arg>-userWorkflowName</arg>
+        <arg>${userWorkflowName}</arg>
+        <arg>-userWorkflowVersion</arg>
+        <arg>${userWorkflowVersion}</arg>
+        <arg>-logDir</arg>
+        <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}/</arg>
+        <arg>-workflowUser</arg>
+        <arg>${wf:user()}</arg>
+        <arg>-falconInputFeeds</arg>
+        <arg>${falconInputFeeds}</arg>
+        <arg>-falconInPaths</arg>
+        <arg>${falconInPaths}</arg>
+        <arg>-datasource</arg>
+        <arg>${datasource == 'NA' ? 'IGNORE' : datasource}</arg>
+    </java>
+    <ok to="end"/>
+    <error to="fail"/>
+</action>
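Review note: both templates ship with placeholder <ok to="end"/> and <error to="fail"/> transitions that the builder rewires via action.getOk().setTo(...). The toy model below mimics that rewiring; Action and Transition are simplified stand-ins for the Oozie JAXB ACTION type, and "archival" is an assumed value for the ARCHIVAL_ACTION_NAME constant, which is defined outside this excerpt.

public class TransitionRewiring {

    // Simplified stand-ins for the Oozie JAXB ACTION/transition types.
    static class Transition {
        private String to;
        String getTo() { return to; }
        void setTo(String to) { this.to = to; }
    }

    static class Action {
        private final Transition ok = new Transition();
        private final Transition error = new Transition();
        Transition getOk() { return ok; }
        Transition getError() { return error; }
    }

    // Mirrors the builder's private addTransition(ACTION, String, String) helper.
    static void addTransition(Action action, String ok, String fail) {
        action.getOk().setTo(ok);
        action.getError().setTo(fail);
    }

    public static void main(String[] args) {
        Action preProcess = new Action();
        // On success, pre-processing hands off to the archival action; on error
        // it goes to whatever getFailAction() resolved.
        addTransition(preProcess, "archival", "fail");
        System.out.println("ok -> " + preProcess.getOk().getTo());       // ok -> archival
        System.out.println("error -> " + preProcess.getError().getTo()); // error -> fail
    }
}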
diff --git a/lifecycle/src/main/resources/action/pre-process.xml b/lifecycle/src/main/resources/action/pre-process.xml
new file mode 100644
index 000000000..fc4125c05
--- /dev/null
+++ b/lifecycle/src/main/resources/action/pre-process.xml
@@ -0,0 +1,54 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<action name="pre-processing" xmlns="uri:oozie:workflow:0.3">
+    <java>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+
+            <property>
+                <name>oozie.action.sharelib.for.java</name>
+                <value>hcatalog</value>
+            </property>
+            <property>
+                <name>oozie.launcher.oozie.libpath</name>
+                <value>${wf:conf("falcon.libpath")}</value>
+            </property>
+        </configuration>
+        <main-class>org.apache.falcon.workflow.LateDataHandler</main-class>
+        <arg>-out</arg>
+        <arg>${logDir}/latedata/${nominalTime}/${srcClusterName}</arg>
+        <arg>-paths</arg>
+        <arg>${falconInPaths}</arg>
+        <arg>-falconInputNames</arg>
+        <arg>${falconInputNames}</arg>
+        <arg>-falconInputFeedStorageTypes</arg>
+        <arg>${falconInputFeedStorageTypes}</arg>
+    </java>
+    <ok to="end"/>
+    <error to="fail"/>
+</action>
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
index c7584119b..e094e0970 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
@@ -58,7 +58,8 @@ protected List buildCoords(Cluster cluster, Path buildPath) throws F
                     props.add(appProps);
                 }
             }
-        } else {
+        }
+        if (FeedHelper.getRetentionStage(this.entity, cluster.getName()) == null) {
             List<Properties> evictionProps =
                     OozieCoordinatorBuilder.get(entity, Tag.RETENTION).buildCoords(cluster, buildPath);
             if (evictionProps != null) {
diff --git a/prism/src/test/resources/startup.properties b/prism/src/test/resources/startup.properties
index 63fcd3b70..d66d50534 100644
--- a/prism/src/test/resources/startup.properties
+++ b/prism/src/test/resources/startup.properties
@@ -59,9 +59,13 @@
 
 # List of Lifecycle policies configured.
-*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
+*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete,\
+    org.apache.falcon.lifecycle.archival.AgeBasedArchival
+
 # List of builders for the policies.
-*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder
+*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder,\
+    org.apache.falcon.lifecycle.engine.oozie.archival.AgeBasedArchivalBuilder
+
 ##### Falcon Configuration Store Change listeners #####
 *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
     org.apache.falcon.entity.ColoClusterRelation,\
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index fae937ac0..8bd2bbe31 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -104,9 +104,12 @@ prism.application.services=org.apache.falcon.service.LifecyclePolicyMap,\
 
 # List of Lifecycle policies configured.
-*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
+*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete,\
+    org.apache.falcon.lifecycle.archival.AgeBasedArchival
+
 # List of builders for the policies.
-*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder
+*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder,\
+    org.apache.falcon.lifecycle.engine.oozie.archival.AgeBasedArchivalBuilder
 
 ##### Falcon Configuration Store Change listeners #####
 *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
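Review note: the startup.properties changes register the new AgeBasedArchival policy and its Oozie builder as additional comma-separated, backslash-continued entries. The loader itself is not part of this patch; the sketch below only shows how such a value parses once java.util.Properties has joined the continuation lines.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PolicyListParsing {

    public static void main(String[] args) {
        // java.util.Properties resolves the trailing "\" continuations, so the value
        // arrives as one comma-separated string with leftover indentation whitespace.
        String value = "org.apache.falcon.lifecycle.retention.AgeBasedDelete,"
                + "    org.apache.falcon.lifecycle.archival.AgeBasedArchival";

        List<String> policies = Arrays.stream(value.split(","))
                .map(String::trim)             // drop the indentation from continuation lines
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());

        policies.forEach(System.out::println);
        // org.apache.falcon.lifecycle.retention.AgeBasedDelete
        // org.apache.falcon.lifecycle.archival.AgeBasedArchival
    }
}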