This repository was archived by the owner on Apr 4, 2021. It is now read-only.
Changes from all commits
38 commits
3b7fd63
FALCON-1829 Add regression for submit and schedule process on native …
pragya-mittal Feb 11, 2016
4c19ec0
Merge branch 'master' of https://github.com/apache/falcon
pragya-mittal Feb 11, 2016
f037385
Merge branch 'master' of https://github.com/apache/falcon
pragya-mittal Feb 12, 2016
3c302e2
Merge branch 'master' of https://github.com/apache/falcon
pragya-mittal Feb 17, 2016
db094ea
FALCON-1832 Adding tags in SchedulableEntityInstance.compareTo()
pragya-mittal Feb 17, 2016
cd0664a
Revert "FALCON-1832 Adding tags in SchedulableEntityInstance.compareT…
pragya-mittal Feb 24, 2016
26c462a
Merge branch 'master' of https://github.com/apache/falcon
pragya-mittal Feb 24, 2016
b674558
FALCON-1839 Test case for APIs for entities scheduled on native sched…
pragya-mittal Feb 24, 2016
bbf551a
Review comments addressed
pragya-mittal Feb 25, 2016
a1a15af
FALCON-1841 Grouping test in falcon for running nightly regression
pragya-mittal Feb 29, 2016
5ff0493
Merge branch 'master' of https://github.com/apache/falcon
pragya-mittal Feb 29, 2016
97a155f
Review comments addressed
pragya-mittal Feb 29, 2016
b92d3f2
Review comments addressed
pragya-mittal Feb 29, 2016
9a882df
Modifying totalMinutesToWait in NativeInstanceUtil to wait for defin…
pragya-mittal Feb 29, 2016
d244279
Modifying NativeInstanceUtil.waitTillInstancesAreCreated based on rev…
pragya-mittal Mar 1, 2016
cb335be
Modifying CHANGES.txt
pragya-mittal Mar 1, 2016
8b95448
Adding doc for NativeInstanceUtil.waitTillInstancesAreCreated
pragya-mittal Mar 1, 2016
9009d39
Adding assertion
pragya-mittal Mar 1, 2016
4fd3cf8
Making waitTillInstanceReachState generic
pragya-mittal Mar 3, 2016
c800405
Addressed review comments
pragya-mittal Mar 4, 2016
a7baab5
Modified Changes.txt
pragya-mittal Mar 7, 2016
4727e1d
Merge branch 'grouping'
pragya-mittal Mar 7, 2016
9b65194
Merge branch 'native-api'
pragya-mittal Mar 7, 2016
d53fd46
Merge branch 'master' of https://github.com/apache/falcon
pragya-mittal Mar 7, 2016
fcfc291
Merge branch 'master' of https://github.com/apache/falcon
pragya-mittal Oct 11, 2017
93a7253
Initial archival patch (Checkstyles, docs and test cases not handled)
pragya-mittal Nov 20, 2017
204532f
Adding queue to archival workflow
pragya-mittal Nov 21, 2017
753f7ca
Adding validation for archivalPolicy
pragya-mittal Nov 21, 2017
ab8eb90
Adding policy to startup.properties
pragya-mittal Nov 21, 2017
73c354c
Changing validate lifecycle in FeedEntityParser
pragya-mittal Nov 21, 2017
e332108
Adding twiki changes and fixing FeedHelperTest
pragya-mittal Nov 24, 2017
e107a94
Adding test cases for feed archival
pragya-mittal Nov 27, 2017
38a70c3
Setting datasets in AgeBasedArchivalCoordinatorBuilder
pragya-mittal Nov 30, 2017
0bbe6e4
Minor nit
pragya-mittal Nov 30, 2017
724f8b5
Adding EntityOperations.ARCHIVE
pragya-mittal Dec 4, 2017
16955ca
Fixing path in workflow builder
pragya-mittal Dec 4, 2017
098548f
Minor checkstyle fix
pragya-mittal Dec 4, 2017
3eb2109
Minor checkstyle fix
pragya-mittal Dec 4, 2017
3 changes: 2 additions & 1 deletion common-types/src/main/java/org/apache/falcon/LifeCycle.java
@@ -27,7 +27,8 @@ public enum LifeCycle {
EVICTION(Tag.RETENTION),
REPLICATION(Tag.REPLICATION),
IMPORT(Tag.IMPORT),
EXPORT(Tag.EXPORT);
EXPORT(Tag.EXPORT),
ARCHIVAL(Tag.ARCHIVAL);

private final Tag tag;

3 changes: 2 additions & 1 deletion common-types/src/main/java/org/apache/falcon/Tag.java
@@ -28,7 +28,8 @@ public enum Tag {
RETENTION(EntityType.FEED),
REPLICATION(EntityType.FEED),
IMPORT(EntityType.FEED),
EXPORT(EntityType.FEED);
EXPORT(EntityType.FEED),
ARCHIVAL(EntityType.FEED);

private final EntityType entityType;

21 changes: 20 additions & 1 deletion common-types/src/main/resources/feed-0.1.xsd
@@ -363,7 +363,8 @@
</xs:annotation>

<xs:all>
<xs:element type="retention-stage" name="retention-stage" minOccurs="0"></xs:element>
<xs:element type="retention-stage" name="retention-stage" minOccurs="0" maxOccurs="1"></xs:element>
<xs:element type="archival-stage" name="archival-stage" minOccurs="0" maxOccurs="1"></xs:element>
</xs:all>

</xs:complexType>
@@ -575,4 +576,22 @@
<xs:element type="properties" name="properties" minOccurs="0" maxOccurs="1"></xs:element>
</xs:all>
</xs:complexType>

<xs:complexType name="archival-stage">
<xs:annotation>
<xs:documentation>
Archival stage is the new way to define archival for a feed using feed lifecycle feature. Archival
has a configurable policy which does the validation and the real execution through workflow engine.
This method of specifying archival gives you more control for archival rather than defining archival
source and target in replication lifecycle stage of feed.
</xs:documentation>
</xs:annotation>

[Review comment] Remove "new"... it will get old at some point :-).
In my mind, the key difference between replication and archival is that, in replication, it is a data copy to the same type of storage. With archival, the destination storage can be different and it is a data move. Bring that difference out in the documentation.
<xs:all>
<xs:element type="non-empty-string" name="policy" minOccurs="0" maxOccurs="1"></xs:element>
<xs:element type="xs:string" name="deleteFromSource" minOccurs="0"></xs:element>

[Review comment] This should be a boolean. Missing maxOccurs=1.

<xs:element type="xs:string" name="queue" minOccurs="0" maxOccurs="1"></xs:element>
<xs:element type="location" name="location" minOccurs="1"></xs:element>

[Review comment] Do we allow more than one location? If not, maxOccurs should be present.

<xs:element type="properties" name="properties" minOccurs="0"></xs:element>
</xs:all>
</xs:complexType>
</xs:schema>
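A hypothetical feed lifecycle snippet exercising the new archival-stage could look as follows. The element names come from the schema above; the policy, queue, flag, and path values are made up for illustration only:

```xml
<lifecycle>
    <archival-stage>
        <!-- Policy name; defaults to the stage default when omitted -->
        <policy>AgeBasedArchival</policy>
        <!-- Reviewer notes this should really be a boolean type -->
        <deleteFromSource>true</deleteFromSource>
        <queue>archivalQueue</queue>
        <!-- Destination for archived instances (hypothetical path) -->
        <location type="data" path="/falcon/archive/sample-feed"/>
    </archival-stage>
</lifecycle>
```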
64 changes: 43 additions & 21 deletions common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -27,22 +27,7 @@
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.datasource.DatasourceType;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Cluster;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.ExtractMethod;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
import org.apache.falcon.entity.v0.feed.Lifecycle;
import org.apache.falcon.entity.v0.feed.Load;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.feed.MergeType;
import org.apache.falcon.entity.v0.feed.Property;
import org.apache.falcon.entity.v0.feed.RetentionStage;
import org.apache.falcon.entity.v0.feed.Sla;
import org.apache.falcon.entity.v0.feed.Validity;
import org.apache.falcon.entity.v0.feed.*;

[Review comment] It has been a conscious choice not to use wildcard imports so far. Let's not deviate from that.

import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
@@ -401,13 +386,27 @@ public static RetentionStage getRetentionStage(Feed feed, String clusterName) th

if (clusterLifecycle != null && clusterLifecycle.getRetentionStage() != null) {
return clusterLifecycle.getRetentionStage();
} else if (globalLifecycle != null) {
} else if (globalLifecycle != null && globalLifecycle.getRetentionStage() != null) {
return globalLifecycle.getRetentionStage();
}
}
return null;
}

public static ArchivalStage getArchivalStage(Feed feed, String clusterName) throws FalconException {
if (isLifecycleEnabled(feed, clusterName)) {
Lifecycle globalLifecycle = feed.getLifecycle();
Lifecycle clusterLifecycle = getCluster(feed, clusterName).getLifecycle();

if (clusterLifecycle != null && clusterLifecycle.getArchivalStage() != null) {
return clusterLifecycle.getArchivalStage();
} else if (globalLifecycle != null && globalLifecycle.getArchivalStage() != null) {
return globalLifecycle.getArchivalStage();
}
}
return null;
}
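The lookup order in getRetentionStage and getArchivalStage above is: a stage defined on the feed's cluster wins over the feed-level (global) one, and absence of both yields null. A minimal stand-alone sketch of that fallback rule, using stand-in generic types rather than the real Lifecycle/RetentionStage/ArchivalStage classes:

```java
import java.util.Optional;

// Generic sketch of the cluster-overrides-global fallback used by
// getRetentionStage/getArchivalStage. Stand-in types only; not Falcon API.
class StageLookup {
    /** Cluster-level stage wins; otherwise fall back to the global one. */
    static <T> Optional<T> resolve(T clusterStage, T globalStage) {
        if (clusterStage != null) {
            return Optional.of(clusterStage);
        }
        return Optional.ofNullable(globalStage);
    }
}
```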

public static Date getFeedValidityStart(Feed feed, String clusterName) throws FalconException {
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
if (feedCluster != null) {
@@ -437,10 +436,20 @@ public static List<String> getPolicies(Feed feed, String clusterName) throws Fal
Cluster cluster = getCluster(feed, clusterName);
if (cluster != null) {
if (isLifecycleEnabled(feed, clusterName)) {
String policy = getRetentionStage(feed, clusterName).getPolicy();
policy = StringUtils.isBlank(policy)
? FeedLifecycleStage.RETENTION.getDefaultPolicyName() : policy;
result.add(policy);
String policy = "";

[Review comment] getPolicies method: rather than returning the generic list of all policies across stages, please have separate methods. Clubbing policies across lifecycle stages does not make sense.

if (getRetentionStage(feed, clusterName) != null) {
policy = getRetentionStage(feed, clusterName).getPolicy();
policy = StringUtils.isBlank(policy)
? FeedLifecycleStage.RETENTION.getDefaultPolicyName() : policy;
result.add(policy);
}

if (getArchivalStage(feed, clusterName) != null) {
policy = getArchivalStage(feed, clusterName).getPolicy();
policy = StringUtils.isBlank(policy)
? FeedLifecycleStage.ARCHIVAL.getDefaultPolicyName() : policy;
result.add(policy);
}
}
return result;
}
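Per the review comment, getPolicies could be split into one accessor per stage. A hypothetical sketch of that shape, with the stage lookup reduced to a boolean plus a nullable policy-name string (the real methods would take a Feed and cluster name); the default names mirror FeedLifecycleStage in this patch:

```java
import java.util.Optional;

// Hypothetical per-stage policy accessors, as the reviewer suggests, instead of one
// combined getPolicies(). Stand-in signatures; not the real Falcon API.
class PolicyResolver {
    static final String DEFAULT_RETENTION_POLICY = "AgeBasedDelete";
    static final String DEFAULT_ARCHIVAL_POLICY = "AgeBasedArchival";

    /** Empty when no retention-stage is configured; default name when the policy is blank. */
    static Optional<String> retentionPolicy(boolean stagePresent, String configured) {
        return policy(stagePresent, configured, DEFAULT_RETENTION_POLICY);
    }

    /** Empty when no archival-stage is configured; default name when the policy is blank. */
    static Optional<String> archivalPolicy(boolean stagePresent, String configured) {
        return policy(stagePresent, configured, DEFAULT_ARCHIVAL_POLICY);
    }

    private static Optional<String> policy(boolean present, String configured, String dflt) {
        if (!present) {
            return Optional.empty();
        }
        // Plain-Java stand-in for StringUtils.isBlank
        boolean blank = configured == null || configured.trim().isEmpty();
        return Optional.of(blank ? dflt : configured);
    }
}
```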
@@ -1295,4 +1304,17 @@ public static List<FeedInstanceStatus> getListing(Feed feed, String clusterName,
return storage.getListing(feed, clusterName, locationType, start, end);
}

public static String getArchivalPath(Feed feed, String clusterName) throws FalconException {
String archivalPath = "";
ArchivalStage archivalStage = getArchivalStage(feed, clusterName);
if (archivalStage != null) {
Location location = archivalStage.getLocation();
if ((location != null) && (location.getPath() != null)) {

[Review comment] Use StringUtils.

archivalPath = location.getPath();
} else {
throw new FalconException("Location cannot be empty.");
}
}
return archivalPath;
}
}
@@ -33,18 +33,18 @@
import org.apache.falcon.entity.v0.EntityGraph;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.feed.ACL;
import org.apache.falcon.entity.v0.feed.Extract;
import org.apache.falcon.entity.v0.feed.ExtractMethod;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Cluster;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.MergeType;
import org.apache.falcon.entity.v0.feed.Properties;
import org.apache.falcon.entity.v0.feed.Property;
import org.apache.falcon.entity.v0.feed.Sla;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Cluster;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.ACL;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.Extract;
import org.apache.falcon.entity.v0.feed.ExtractMethod;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
@@ -157,14 +157,30 @@ private void validateLifecycle(Feed feed) throws FalconException {
LifecyclePolicyMap map = LifecyclePolicyMap.get();
for (Cluster cluster : feed.getClusters().getClusters()) {
if (FeedHelper.isLifecycleEnabled(feed, cluster.getName())) {
if (FeedHelper.getRetentionStage(feed, cluster.getName()) == null) {
throw new ValidationException("Retention is a mandatory stage, didn't find it for cluster: "
if ((FeedHelper.getRetentionStage(feed, cluster.getName()) != null) || (FeedHelper.getArchivalStage(feed, cluster.getName()) != null)) {
validateRetentionStage(feed, cluster);
validateArchivalStage(feed, cluster);
for (String policyName : FeedHelper.getPolicies(feed, cluster.getName())) {

[Review comment] Once you split the getPolicies method, use the appropriate ones inside validateRetentionStage and validateArchivalStage.

map.get(policyName).validate(feed, cluster.getName());
}
} else {
throw new ValidationException("Atleast one of Retention/Archival is a mandatory stage, didn't find it for cluster: "
+ cluster.getName());
}
validateRetentionFrequency(feed, cluster.getName());
for (String policyName : FeedHelper.getPolicies(feed, cluster.getName())) {
map.get(policyName).validate(feed, cluster.getName());
}
}
}
}

private void validateRetentionStage(Feed feed, Cluster cluster) throws FalconException {
if (FeedHelper.getRetentionStage(feed, cluster.getName()) != null) {
validateRetentionFrequency(feed, cluster.getName());
}
}

private void validateArchivalStage(Feed feed, Cluster cluster) throws FalconException {
if (FeedHelper.getArchivalStage(feed, cluster.getName()) != null) {
if (FeedHelper.getArchivalStage(feed, cluster.getName()).getLocation().getPath().isEmpty()) {
throw new ValidationException("Location path cannot be empty.");

[Review comment] A little more detail in the message, like mentioning the archival stage for the given cluster, would be useful.

}
}
}
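The reviewer asks for a more descriptive message naming the stage and the cluster. A sketch of what that check might look like, with a stubbed ValidationException and a plain-Java stand-in for StringUtils.isBlank (not the actual Falcon classes):

```java
// Sketch of the richer validation message requested in review: say which stage
// and which cluster failed. All types here are stubs for illustration.
class ArchivalValidation {
    static class ValidationException extends Exception {
        ValidationException(String message) { super(message); }
    }

    /** Rejects blank archival location paths with a stage- and cluster-specific message. */
    static void checkArchivalLocation(String path, String clusterName) throws ValidationException {
        // The real code would use StringUtils.isBlank(path)
        if (path == null || path.trim().isEmpty()) {
            throw new ValidationException(
                "Location path cannot be empty in archival stage for cluster: " + clusterName);
        }
    }
}
```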
@@ -22,7 +22,8 @@
*/
public enum FeedLifecycleStage {

[Review comment] This class needs to be revisited. A common set of policies for all lifecycles and a default policy across stages does not make sense to me. Maybe have a base class and child classes.


RETENTION("AgeBasedDelete");
RETENTION("AgeBasedDelete"),
ARCHIVAL("AgeBasedArchival");

private String defaultPolicyName;

@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package org.apache.falcon.lifecycle.archival;

import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.feed.Feed;

/**
* Archival policy which archives all instances of instance time depending on the given frequency.
* It will create the workflow and coordinators for this policy.
*/
public class AgeBasedArchival extends ArchivalPolicy {

@Override
public void validate(Feed feed, String clusterName) throws FalconException {

}
}
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.falcon.lifecycle.archival;

import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.lifecycle.AbstractPolicyBuilderFactory;
import org.apache.falcon.lifecycle.FeedLifecycleStage;
import org.apache.falcon.lifecycle.LifecyclePolicy;
import org.apache.falcon.lifecycle.PolicyBuilder;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.hadoop.fs.Path;

import java.util.Properties;

/**
* All archival policies must implement this interface.
*/
public abstract class ArchivalPolicy implements LifecyclePolicy {

@Override
public String getName() {
return this.getClass().getSimpleName();
}

@Override
public FeedLifecycleStage getStage() {
return FeedLifecycleStage.ARCHIVAL;
}

@Override
public Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException {
AbstractPolicyBuilderFactory factory = WorkflowEngineFactory.getLifecycleEngine();
PolicyBuilder builder = factory.getPolicyBuilder(getName());
return builder.build(cluster, buildPath, feed);
}
}
@@ -46,7 +46,7 @@ public void validate(Feed feed, String clusterName) throws FalconException {
// validate that it is a valid cluster
Cluster cluster = FeedHelper.getCluster(feed, clusterName);
Frequency retentionLimit = getRetentionLimit(feed, clusterName);
if (cluster != null) {
if (cluster != null && retentionLimit != null) {
validateLimitWithSla(feed, cluster, retentionLimit.toString());
validateLimitWithLateData(feed, cluster, retentionLimit.toString());
String lifecycleEngine = StartupProperties.get().getProperty("lifecycle.engine.impl",
@@ -122,9 +122,8 @@ public Frequency getRetentionLimit(Feed feed, String clusterName) throws FalconE
throw new FalconException("Invalid value for property: " + LIMIT_PROPERTY_NAME + ", should be a valid "
+ "frequency e.g. hours(2)", e);
}
} else {
throw new FalconException("Cluster " + clusterName + " doesn't contain retention stage");
}
return null;
}

}
@@ -75,7 +75,7 @@ public enum Type {PRE_PROCESSING, POST_PROCESSING, WORKFLOW_JOB, COORDINATOR_ACT
* Entity operations supported.
*/
public enum EntityOperations {
GENERATE, DELETE, REPLICATE, IMPORT, EXPORT
GENERATE, DELETE, REPLICATE, IMPORT, EXPORT, ARCHIVE
}

public static final WorkflowExecutionArgs[] USER_MESSAGE_ARGS = {
7 changes: 5 additions & 2 deletions common/src/main/resources/startup.properties
@@ -69,9 +69,12 @@


# List of Lifecycle policies configured.
*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete,\
org.apache.falcon.lifecycle.archival.AgeBasedArchival

# List of builders for the policies.
*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder
*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder,\
org.apache.falcon.lifecycle.engine.oozie.archival.AgeBasedArchivalBuilder
##### Falcon Configuration Store Change listeners #####
*.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
org.apache.falcon.entity.ColoClusterRelation,\