-
Notifications
You must be signed in to change notification settings - Fork 111
Archival patch (Checkstyles not handled) #45
base: master
Are you sure you want to change the base?
Changes from all commits
3b7fd63
4c19ec0
f037385
3c302e2
db094ea
cd0664a
26c462a
b674558
bbf551a
a1a15af
5ff0493
97a155f
b92d3f2
9a882df
d244279
cb335be
8b95448
9009d39
4fd3cf8
c800405
a7baab5
4727e1d
9b65194
d53fd46
fcfc291
93a7253
204532f
753f7ca
ab8eb90
73c354c
e332108
e107a94
38a70c3
0bbe6e4
724f8b5
16955ca
098548f
3eb2109
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -363,7 +363,8 @@ | |
| </xs:annotation> | ||
|
|
||
| <xs:all> | ||
| <xs:element type="retention-stage" name="retention-stage" minOccurs="0"></xs:element> | ||
| <xs:element type="retention-stage" name="retention-stage" minOccurs="0" maxOccurs="1"></xs:element> | ||
| <xs:element type="archival-stage" name="archival-stage" minOccurs="0" maxOccurs="1"></xs:element> | ||
| </xs:all> | ||
|
|
||
| </xs:complexType> | ||
|
|
@@ -575,4 +576,22 @@ | |
| <xs:element type="properties" name="properties" minOccurs="0" maxOccurs="1"></xs:element> | ||
| </xs:all> | ||
| </xs:complexType> | ||
|
|
||
| <xs:complexType name="archival-stage"> | ||
| <xs:annotation> | ||
| <xs:documentation> | ||
| Archival stage is the new way to define archival for a feed using feed lifecycle feature. Archival | ||
| has a configurable policy which does the validation and the real execution through workflow engine. | ||
| This method of specifying archival gives you more control for archival rather than defining archival | ||
| source and target in replication lifecycle stage of feed. | ||
| </xs:documentation> | ||
| </xs:annotation> | ||
| <xs:all> | ||
| <xs:element type="non-empty-string" name="policy" minOccurs="0" maxOccurs="1"></xs:element> | ||
| <xs:element type="xs:string" name="deleteFromSource" minOccurs="0"></xs:element> | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This should be a boolean. It is also missing maxOccurs="1". |
||
| <xs:element type="xs:string" name="queue" minOccurs="0" maxOccurs="1"></xs:element> | ||
| <xs:element type="location" name="location" minOccurs="1"></xs:element> | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we allow more than one location? If not, maxOccurs should be present. |
||
| <xs:element type="properties" name="properties" minOccurs="0"></xs:element> | ||
| </xs:all> | ||
| </xs:complexType> | ||
| </xs:schema> | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,22 +27,7 @@ | |
| import org.apache.falcon.entity.v0.EntityType; | ||
| import org.apache.falcon.entity.v0.Frequency; | ||
| import org.apache.falcon.entity.v0.datasource.DatasourceType; | ||
| import org.apache.falcon.entity.v0.feed.CatalogTable; | ||
| import org.apache.falcon.entity.v0.feed.Cluster; | ||
| import org.apache.falcon.entity.v0.feed.ClusterType; | ||
| import org.apache.falcon.entity.v0.feed.ExtractMethod; | ||
| import org.apache.falcon.entity.v0.feed.Feed; | ||
| import org.apache.falcon.entity.v0.feed.FieldIncludeExclude; | ||
| import org.apache.falcon.entity.v0.feed.Lifecycle; | ||
| import org.apache.falcon.entity.v0.feed.Load; | ||
| import org.apache.falcon.entity.v0.feed.Location; | ||
| import org.apache.falcon.entity.v0.feed.LocationType; | ||
| import org.apache.falcon.entity.v0.feed.Locations; | ||
| import org.apache.falcon.entity.v0.feed.MergeType; | ||
| import org.apache.falcon.entity.v0.feed.Property; | ||
| import org.apache.falcon.entity.v0.feed.RetentionStage; | ||
| import org.apache.falcon.entity.v0.feed.Sla; | ||
| import org.apache.falcon.entity.v0.feed.Validity; | ||
| import org.apache.falcon.entity.v0.feed.*; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It has been a conscious choice, so far, not to use wildcard imports. Let's not deviate from that. |
||
| import org.apache.falcon.entity.v0.process.Input; | ||
| import org.apache.falcon.entity.v0.process.Output; | ||
| import org.apache.falcon.entity.v0.process.Process; | ||
|
|
@@ -401,13 +386,27 @@ public static RetentionStage getRetentionStage(Feed feed, String clusterName) th | |
|
|
||
| if (clusterLifecycle != null && clusterLifecycle.getRetentionStage() != null) { | ||
| return clusterLifecycle.getRetentionStage(); | ||
| } else if (globalLifecycle != null) { | ||
| } else if (globalLifecycle != null && globalLifecycle.getRetentionStage() != null) { | ||
| return globalLifecycle.getRetentionStage(); | ||
| } | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| /** | ||
| * Returns the archival stage in effect for the feed on the named cluster. | ||
| * The stage on the cluster's own lifecycle is preferred; the feed-level lifecycle is the fallback. | ||
| * | ||
| * @param feed feed whose lifecycle definitions are inspected | ||
| * @param clusterName name of the cluster within the feed | ||
| * @return the archival stage, or null when lifecycle is not enabled or neither lifecycle defines one | ||
| * @throws FalconException declared for the helper lookups; this method has no throw statement of its own | ||
| */ | ||
| public static ArchivalStage getArchivalStage(Feed feed, String clusterName) throws FalconException { | ||
| if (!isLifecycleEnabled(feed, clusterName)) { | ||
| return null; | ||
| } | ||
|
|
||
| Lifecycle feedLevel = feed.getLifecycle(); | ||
| Lifecycle clusterLevel = getCluster(feed, clusterName).getLifecycle(); | ||
|
|
||
| // Cluster-level definition overrides the feed-level one. | ||
| if (clusterLevel != null && clusterLevel.getArchivalStage() != null) { | ||
| return clusterLevel.getArchivalStage(); | ||
| } | ||
| if (feedLevel != null && feedLevel.getArchivalStage() != null) { | ||
| return feedLevel.getArchivalStage(); | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| public static Date getFeedValidityStart(Feed feed, String clusterName) throws FalconException { | ||
| org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName); | ||
| if (feedCluster != null) { | ||
|
|
@@ -437,10 +436,20 @@ public static List<String> getPolicies(Feed feed, String clusterName) throws Fal | |
| Cluster cluster = getCluster(feed, clusterName); | ||
| if (cluster != null) { | ||
| if (isLifecycleEnabled(feed, clusterName)) { | ||
| String policy = getRetentionStage(feed, clusterName).getPolicy(); | ||
| policy = StringUtils.isBlank(policy) | ||
| ? FeedLifecycleStage.RETENTION.getDefaultPolicyName() : policy; | ||
| result.add(policy); | ||
| String policy = ""; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. getPolicies method: rather than returning a generic list of all policies across stages, please have separate methods. Clubbing policies across lifecycle stages does not make sense. |
||
| if (getRetentionStage(feed, clusterName) != null) { | ||
| policy = getRetentionStage(feed, clusterName).getPolicy(); | ||
| policy = StringUtils.isBlank(policy) | ||
| ? FeedLifecycleStage.RETENTION.getDefaultPolicyName() : policy; | ||
| result.add(policy); | ||
| } | ||
|
|
||
| if (getArchivalStage(feed, clusterName) != null) { | ||
| policy = getArchivalStage(feed, clusterName).getPolicy(); | ||
| policy = StringUtils.isBlank(policy) | ||
| ? FeedLifecycleStage.ARCHIVAL.getDefaultPolicyName() : policy; | ||
| result.add(policy); | ||
| } | ||
| } | ||
| return result; | ||
| } | ||
|
|
@@ -1295,4 +1304,17 @@ public static List<FeedInstanceStatus> getListing(Feed feed, String clusterName, | |
| return storage.getListing(feed, clusterName, locationType, start, end); | ||
| } | ||
|
|
||
| public static String getArchivalPath(Feed feed, String clusterName) throws FalconException { | ||
| String archivalPath = ""; | ||
| ArchivalStage archivalStage = getArchivalStage(feed, clusterName); | ||
| if (archivalStage != null) { | ||
| Location location = archivalStage.getLocation(); | ||
| if ((location != null) && (location.getPath() != null)) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Use StringUtils. |
||
| archivalPath = location.getPath(); | ||
| } else { | ||
| throw new FalconException("Location cannot be empty."); | ||
| } | ||
| } | ||
| return archivalPath; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,18 +33,18 @@ | |
| import org.apache.falcon.entity.v0.EntityGraph; | ||
| import org.apache.falcon.entity.v0.EntityType; | ||
| import org.apache.falcon.entity.v0.Frequency; | ||
| import org.apache.falcon.entity.v0.feed.ACL; | ||
| import org.apache.falcon.entity.v0.feed.Extract; | ||
| import org.apache.falcon.entity.v0.feed.ExtractMethod; | ||
| import org.apache.falcon.entity.v0.feed.Feed; | ||
| import org.apache.falcon.entity.v0.feed.Cluster; | ||
| import org.apache.falcon.entity.v0.feed.ClusterType; | ||
| import org.apache.falcon.entity.v0.feed.Location; | ||
| import org.apache.falcon.entity.v0.feed.LocationType; | ||
| import org.apache.falcon.entity.v0.feed.MergeType; | ||
| import org.apache.falcon.entity.v0.feed.Properties; | ||
| import org.apache.falcon.entity.v0.feed.Property; | ||
| import org.apache.falcon.entity.v0.feed.Sla; | ||
| import org.apache.falcon.entity.v0.feed.Feed; | ||
| import org.apache.falcon.entity.v0.feed.Cluster; | ||
| import org.apache.falcon.entity.v0.feed.ClusterType; | ||
| import org.apache.falcon.entity.v0.feed.ACL; | ||
| import org.apache.falcon.entity.v0.feed.Location; | ||
| import org.apache.falcon.entity.v0.feed.Extract; | ||
| import org.apache.falcon.entity.v0.feed.ExtractMethod; | ||
| import org.apache.falcon.entity.v0.process.Input; | ||
| import org.apache.falcon.entity.v0.process.Output; | ||
| import org.apache.falcon.entity.v0.process.Process; | ||
|
|
@@ -157,14 +157,30 @@ private void validateLifecycle(Feed feed) throws FalconException { | |
| LifecyclePolicyMap map = LifecyclePolicyMap.get(); | ||
| for (Cluster cluster : feed.getClusters().getClusters()) { | ||
| if (FeedHelper.isLifecycleEnabled(feed, cluster.getName())) { | ||
| if (FeedHelper.getRetentionStage(feed, cluster.getName()) == null) { | ||
| throw new ValidationException("Retention is a mandatory stage, didn't find it for cluster: " | ||
| if ((FeedHelper.getRetentionStage(feed, cluster.getName()) != null) || (FeedHelper.getArchivalStage(feed, cluster.getName()) != null)) { | ||
| validateRetentionStage(feed, cluster); | ||
| validateArchivalStage(feed, cluster); | ||
| for (String policyName : FeedHelper.getPolicies(feed, cluster.getName())) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Once you split the getPolicies method, use the appropriate ones inside validateRetentionStage and validateArchivalStage. |
||
| map.get(policyName).validate(feed, cluster.getName()); | ||
| } | ||
| } else { | ||
| throw new ValidationException("Atleast one of Retention/Archival is a mandatory stage, didn't find it for cluster: " | ||
| + cluster.getName()); | ||
| } | ||
| validateRetentionFrequency(feed, cluster.getName()); | ||
| for (String policyName : FeedHelper.getPolicies(feed, cluster.getName())) { | ||
| map.get(policyName).validate(feed, cluster.getName()); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private void validateRetentionStage(Feed feed, Cluster cluster) throws FalconException { | ||
| if (FeedHelper.getRetentionStage(feed, cluster.getName()) != null) { | ||
| validateRetentionFrequency(feed, cluster.getName()); | ||
| } | ||
| } | ||
|
|
||
| private void validateArchivalStage(Feed feed, Cluster cluster) throws FalconException { | ||
| if (FeedHelper.getArchivalStage(feed, cluster.getName()) != null) { | ||
| if (FeedHelper.getArchivalStage(feed, cluster.getName()).getLocation().getPath().isEmpty()) { | ||
| throw new ValidationException("Location path cannot be empty."); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A little more detail in the message, such as a mention of the archival stage for the given cluster, would be useful. |
||
| } | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,7 +22,8 @@ | |
| */ | ||
| public enum FeedLifecycleStage { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This class needs to be revisited. A common set of policies for all lifecycles and a default policy across stages does not make sense to me. Maybe have a base class and child classes. |
||
|
|
||
| RETENTION("AgeBasedDelete"); | ||
| RETENTION("AgeBasedDelete"), | ||
| ARCHIVAL("AgeBasedArchival"); | ||
|
|
||
| private String defaultPolicyName; | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
|
|
||
| package org.apache.falcon.lifecycle.archival; | ||
|
|
||
| import org.apache.falcon.FalconException; | ||
| import org.apache.falcon.entity.v0.feed.Feed; | ||
|
|
||
| /** | ||
| * Age-based archival policy for feeds. | ||
| * NOTE(review): intended to archive feed instances by instance time at the configured frequency; | ||
| * the scheduling artifacts are presumably produced by the inherited build step - confirm. | ||
| */ | ||
| public class AgeBasedArchival extends ArchivalPolicy { | ||
|
|
||
||
| /** | ||
| * Validates the feed against this policy for the given cluster. | ||
| * Currently a no-op: every feed is accepted. | ||
| */ | ||
| @Override | ||
| public void validate(final Feed feed, final String clusterName) throws FalconException { | ||
| // No policy-specific checks are performed here. | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.falcon.lifecycle.archival; | ||
|
|
||
| import org.apache.falcon.FalconException; | ||
| import org.apache.falcon.entity.v0.cluster.Cluster; | ||
| import org.apache.falcon.entity.v0.feed.Feed; | ||
| import org.apache.falcon.lifecycle.AbstractPolicyBuilderFactory; | ||
| import org.apache.falcon.lifecycle.FeedLifecycleStage; | ||
| import org.apache.falcon.lifecycle.LifecyclePolicy; | ||
| import org.apache.falcon.lifecycle.PolicyBuilder; | ||
| import org.apache.falcon.workflow.WorkflowEngineFactory; | ||
| import org.apache.hadoop.fs.Path; | ||
|
|
||
| import java.util.Properties; | ||
|
|
||
| /** | ||
| * Base class for archival lifecycle policies; concrete archival policies extend it. | ||
| */ | ||
| public abstract class ArchivalPolicy implements LifecyclePolicy { | ||
|
|
||
||
| /** | ||
| * @return the policy name, which is the simple name of the concrete class | ||
| */ | ||
| @Override | ||
| public String getName() { | ||
| return getClass().getSimpleName(); | ||
| } | ||
|
|
||
||
| /** | ||
| * @return the lifecycle stage of this policy, always ARCHIVAL | ||
| */ | ||
| @Override | ||
| public FeedLifecycleStage getStage() { | ||
| return FeedLifecycleStage.ARCHIVAL; | ||
| } | ||
|
|
||
||
| /** | ||
| * Delegates the build to the policy builder that the lifecycle engine registers under this policy's name. | ||
| * | ||
| * @return the properties returned by the policy builder | ||
| */ | ||
| @Override | ||
| public Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException { | ||
| return WorkflowEngineFactory.getLifecycleEngine() | ||
| .getPolicyBuilder(getName()) | ||
| .build(cluster, buildPath, feed); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the word "new"; it will get old at some point :-).
In my mind, the key difference between replication and archival is that replication is a data copy to the same type of storage, whereas with archival the destination storage can be different and it is a data move. Bring that difference out in the documentation.