From 9d007220ae067e45e2dd6704565ed8ca0b673304 Mon Sep 17 00:00:00 2001 From: sandeep Date: Mon, 20 Jun 2016 14:21:00 +0530 Subject: [PATCH 1/7] FALCON-2034. Make numThreads and timeOut configurable In ConfigurationStore init --- .../entity/store/ConfigurationStore.java | 29 ++++++++++++++++--- src/conf/startup.properties | 6 ++++ 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java index debf106af..6711b5135 100644 --- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java +++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java @@ -70,6 +70,10 @@ public final class ConfigurationStore implements FalconService { private static final Logger LOG = LoggerFactory.getLogger(ConfigurationStore.class); private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT"); private static final String UTF_8 = CharEncoding.UTF_8; + private static final String LOAD_ENTITIES_THREADS = "falcon.default.threads.load.restore.entities"; + private static final String TIMEOUT_MINS_LOAD_ENTITIES = "falcon.default.timeout.minutes"; + private int numThreads; + private int restoreTimeOutInMins; private final boolean shouldPersist; private static final FsPermission STORE_PERMISSION = @@ -150,6 +154,21 @@ private FileSystem initializeFileSystem() { @Override public void init() throws FalconException { + try { + numThreads = Integer.parseInt(StartupProperties.get().getProperty(LOAD_ENTITIES_THREADS, "100")); + LOG.info("Number of threads used to restore entities: {}", restoreTimeOutInMins); + } catch (NumberFormatException nfe) { + throw new FalconException("Invalid value specified for start up property \"" + + LOAD_ENTITIES_THREADS + "\".Please provide an integer value"); + } + try { + restoreTimeOutInMins = Integer.parseInt(StartupProperties.get(). 
+ getProperty(TIMEOUT_MINS_LOAD_ENTITIES, "30")); + LOG.info("TimeOut to load Entities is taken as {} mins", restoreTimeOutInMins); + } catch (NumberFormatException nfe) { + throw new FalconException("Invalid value specified for start up property \"" + + TIMEOUT_MINS_LOAD_ENTITIES + "\".Please provide an integer value"); + } String listenerClassNames = StartupProperties.get(). getProperty("configstore.listeners", "org.apache.falcon.entity.v0.EntityGraph"); for (String listenerClassName : listenerClassNames.split(",")) { @@ -173,7 +192,8 @@ private void loadEntity(final EntityType type) throws FalconException { final ConcurrentHashMap entityMap = dictionary.get(type); FileStatus[] files = fs.globStatus(new Path(storePath, type.name() + Path.SEPARATOR + "*")); if (files != null) { - final ExecutorService service = Executors.newFixedThreadPool(100); + + final ExecutorService service = Executors.newFixedThreadPool(numThreads); for (final FileStatus file : files) { service.execute(new Runnable() { @Override @@ -184,6 +204,7 @@ public void run() { // ".xml" String entityName = URLDecoder.decode(encodedEntityName, UTF_8); Entity entity = restore(type, entityName); + LOG.info("Restored configuration {}/{}", type, entityName); entityMap.put(entityName, entity); } catch (IOException | FalconException e) { LOG.error("Unable to restore entity of", file); @@ -192,10 +213,10 @@ public void run() { }); } service.shutdown(); - if (service.awaitTermination(10, TimeUnit.MINUTES)) { + if (service.awaitTermination(restoreTimeOutInMins, TimeUnit.MINUTES)) { LOG.info("Restored Configurations for entity type: {} ", type.name()); } else { - LOG.warn("Time out happened while waiting for all threads to finish while restoring entities " + LOG.warn("Timed out while waiting for all threads to finish while restoring entities " + "for type: {}", type.name()); } // Checking if all entities were loaded @@ -341,6 +362,7 @@ public T get(EntityType type, String name) throws FalconExcep } catch 
(IOException e) { throw new StoreAccessException(e); } + LOG.info("Restored configuration {}/{}", type, name); entityMap.put(name, entity); return entity; } else { @@ -450,7 +472,6 @@ private synchronized T restore(EntityType type, String name) throw new StoreAccessException("Unable to un-marshall xml definition for " + type + "/" + name, e); } finally { in.close(); - LOG.info("Restored configuration {}/{}", type, name); } } diff --git a/src/conf/startup.properties b/src/conf/startup.properties index ae50d518c..c1602c031 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -136,6 +136,12 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ *.falcon.cleanup.service.frequency=days(1) +# Default number of threads to be used to restore entities. +*.falcon.default.threads.load.restore.entities=100 + +# Default timeout in minutes to load entities +*.falcon.default.timeout.minutes=30 + ######### Properties for Feed SLA Monitoring ######### # frequency of serialization for the state of FeedSLAMonitoringService - 1 hour *.feed.sla.serialization.frequency.millis=3600000 From 78b98d5b9a555a56ff0155a8e198a9aa7dd771a0 Mon Sep 17 00:00:00 2001 From: sandeep Date: Mon, 20 Jun 2016 14:42:05 +0530 Subject: [PATCH 2/7] FALCON-2034. 
Make numThreads and timeOut configurable In ConfigurationStore init --- .../org/apache/falcon/entity/store/ConfigurationStore.java | 4 ++-- src/conf/startup.properties | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java index 6711b5135..19e10bd2d 100644 --- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java +++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java @@ -70,8 +70,8 @@ public final class ConfigurationStore implements FalconService { private static final Logger LOG = LoggerFactory.getLogger(ConfigurationStore.class); private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT"); private static final String UTF_8 = CharEncoding.UTF_8; - private static final String LOAD_ENTITIES_THREADS = "falcon.default.threads.load.restore.entities"; - private static final String TIMEOUT_MINS_LOAD_ENTITIES = "falcon.default.timeout.minutes"; + private static final String LOAD_ENTITIES_THREADS = "config.store.num.threads.load.entities"; + private static final String TIMEOUT_MINS_LOAD_ENTITIES = "config.store.start.timeout.minutes"; private int numThreads; private int restoreTimeOutInMins; private final boolean shouldPersist; diff --git a/src/conf/startup.properties b/src/conf/startup.properties index c1602c031..5ac3d5c1e 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -137,10 +137,10 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ *.falcon.cleanup.service.frequency=days(1) # Default number of threads to be used to restore entities. 
-*.falcon.default.threads.load.restore.entities=100 +*.config.store.num.threads.load.entities=100 # Default timeout in minutes to load entities -*.falcon.default.timeout.minutes=30 +*.config.store.start.timeout.minutes=30 ######### Properties for Feed SLA Monitoring ######### # frequency of serialization for the state of FeedSLAMonitoringService - 1 hour From 759e88929ab822bc74d39fa19b837537f7c4423e Mon Sep 17 00:00:00 2001 From: sandeep Date: Sat, 13 Aug 2016 16:22:53 +0530 Subject: [PATCH 3/7] FALCON-2112. Set property value to set map memory for replication and retention --- common/src/main/resources/runtime.properties | 2 ++ .../engine/oozie/retention/AgeBasedWorkflowBuilder.java | 5 +++++ .../src/main/resources/action/feed/eviction-action.xml | 1 + .../falcon/oozie/feed/FeedReplicationWorkflowBuilder.java | 8 ++++++++ .../falcon/oozie/feed/FeedRetentionWorkflowBuilder.java | 7 +++++++ oozie/src/main/resources/action/feed/eviction-action.xml | 5 +++++ .../src/main/resources/action/feed/replication-action.xml | 5 +++++ .../falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java | 6 ++++-- oozie/src/test/resources/feed/feed.xml | 1 + .../resources/feed/fs-local-retention-lifecycle-feed.xml | 1 + .../test/resources/feed/fs-replication-feed-counters.xml | 1 + oozie/src/test/resources/feed/fs-replication-feed.xml | 1 + oozie/src/test/resources/feed/fs-retention-feed.xml | 1 + .../test/resources/feed/fs-retention-lifecycle-feed.xml | 1 + webapp/src/test/resources/runtime.properties | 1 + 15 files changed, 44 insertions(+), 2 deletions(-) diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties index ba4c0558d..505171710 100644 --- a/common/src/main/resources/runtime.properties +++ b/common/src/main/resources/runtime.properties @@ -23,8 +23,10 @@ *.falcon.replication.workflow.maxmaps=5 *.falcon.replication.workflow.mapbandwidth=100 +*.falcon.feed.workflow.mapreduce.map.memory.mb=512 *.webservices.default.results.per.page=10 
+ # If true, do not run retention past feedCluster validity end time. # This will retain recent instances beyond feedCluster validity end time. *.falcon.retention.keep.instances.beyond.validity=true diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java index dd0c6d2ad..7cc62371e 100644 --- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java +++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java @@ -29,6 +29,7 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.Property; import org.apache.falcon.entity.v0.feed.RetentionStage; import org.apache.falcon.lifecycle.engine.oozie.utils.OozieBuilderUtils; import org.apache.falcon.lifecycle.retention.AgeBasedDelete; @@ -98,6 +99,10 @@ public static Properties build(Cluster cluster, Path basePath, Feed feed) throws props.put(OozieBuilderUtils.MR_JOB_PRIORITY, retentionStage.getPriority()); } + for (Property retentionStageProperty : retentionStage.getProperties().getProperties()) { + props.put(retentionStageProperty.getName(), retentionStageProperty.getValue()); + } + if (EntityUtil.isTableStorageType(cluster, feed)) { setupHiveCredentials(cluster, buildPath, workflow); // copy paste todo kludge send source hcat creds for coord dependency check to pass diff --git a/lifecycle/src/main/resources/action/feed/eviction-action.xml b/lifecycle/src/main/resources/action/feed/eviction-action.xml index bded1d6bf..64635c256 100644 --- a/lifecycle/src/main/resources/action/feed/eviction-action.xml +++ b/lifecycle/src/main/resources/action/feed/eviction-action.xml @@ -39,6 +39,7 @@ org.apache.falcon.retention.FeedEvictor + 
-Dmapreduce.map.memory.mb={mapMemory} -feedBasePath ${feedDataPath} -falconFeedStorageType diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java index db647aa6f..109381ea3 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java @@ -48,6 +48,7 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW protected static final String REPLICATION_ACTION_NAME = "replication"; private static final String MR_MAX_MAPS = "maxMaps"; private static final String MR_MAP_BANDWIDTH = "mapBandwidth"; + private static final String MR_MAP_MEMORY = "mapMemory"; private static final String REPLICATION_JOB_COUNTER = "job.counter"; private static final String TDE_ENCRYPTION_ENABLED = "tdeEncryptionEnabled"; @@ -96,6 +97,9 @@ protected Properties getWorkflowProperties(Feed feed) throws FalconException { if (props.getProperty(MR_MAP_BANDWIDTH) == null) { // set default if user has not overridden props.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth()); } + if (props.getProperty(MR_MAP_MEMORY) == null) { // set default memory if user has not overridden + props.put(MR_MAP_MEMORY, getDefaultMapMemory()); + } return props; } @@ -145,6 +149,10 @@ private String getDefaultMapBandwidth() { return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidth", "100"); } + private String getDefaultMapMemory() { + return RuntimeProperties.get().getProperty("falcon.feed.workflow.mapreduce.map.memory.mb", "512"); + } + private boolean isTDEEnabled() { String tdeEncryptionEnabled = FeedHelper.getPropertyValue(entity, TDE_ENCRYPTION_ENABLED); return "true" .equalsIgnoreCase(tdeEncryptionEnabled); diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java index fd51ed06f..5ed63513d 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java @@ -30,6 +30,7 @@ import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder; import org.apache.falcon.oozie.workflow.ACTION; import org.apache.falcon.oozie.workflow.WORKFLOWAPP; +import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.workflow.WorkflowExecutionArgs; import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.hadoop.fs.Path; @@ -43,6 +44,7 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil private static final String EVICTION_ACTION_TEMPLATE = "/action/feed/eviction-action.xml"; private static final String EVICTION_ACTION_NAME = "eviction"; + private static final String MR_MAP_MEMORY = "mapMemory"; public FeedRetentionWorkflowBuilder(Feed entity) { super(entity, LifeCycle.EVICTION); @@ -63,6 +65,11 @@ public FeedRetentionWorkflowBuilder(Feed entity) { props.putAll(createDefaultConfiguration(cluster)); props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle())); + if (props.getProperty(MR_MAP_MEMORY) == null) { // set default memory if user has not overridden + props.put(MR_MAP_MEMORY, + RuntimeProperties.get().getProperty("falcon.feed.workflow.mapreduce.map.memory.mb", "512")); + } + if (EntityUtil.isTableStorageType(cluster, entity)) { setupHiveCredentials(cluster, buildPath, workflow); // todo: kludge send source hcat creds for coord dependency check to pass diff --git a/oozie/src/main/resources/action/feed/eviction-action.xml b/oozie/src/main/resources/action/feed/eviction-action.xml index bded1d6bf..07a55fd98 100644 --- a/oozie/src/main/resources/action/feed/eviction-action.xml +++ b/oozie/src/main/resources/action/feed/eviction-action.xml @@ -37,8 +37,13 @@ 
oozie.launcher.oozie.libpath ${wf:conf("falcon.libpath")} + + oozie.launcher.mapreduce.map.memory.mb + ${mapMemory} + org.apache.falcon.retention.FeedEvictor + -Dmapreduce.map.memory.mb=${mapMemory} -feedBasePath ${feedDataPath} -falconFeedStorageType diff --git a/oozie/src/main/resources/action/feed/replication-action.xml b/oozie/src/main/resources/action/feed/replication-action.xml index ff8f4f39d..1493f0988 100644 --- a/oozie/src/main/resources/action/feed/replication-action.xml +++ b/oozie/src/main/resources/action/feed/replication-action.xml @@ -41,11 +41,16 @@ oozie.launcher.oozie.libpath ${wf:conf("falcon.libpath")} + + oozie.launcher.mapreduce.map.memory.mb + ${mapMemory} + org.apache.falcon.replication.FeedReplicator -Dfalcon.include.path=${sourceRelativePaths} -Dmapred.job.queue.name=${queueName} -Dmapred.job.priority=${jobPriority} + -Dmapreduce.map.memory.mb=${mapMemory} -maxMaps ${maxMaps} -mapBandwidth diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java index fde5532d4..e9044df52 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java @@ -200,6 +200,7 @@ public void testRetentionWithLifecycle(String keepInstancesPostValidity, String Assert.assertEquals(wfProps.get("queueName"), "retention"); Assert.assertEquals(wfProps.get("limit"), "hours(2)"); Assert.assertEquals(wfProps.get("jobPriority"), "LOW"); + Assert.assertEquals(wfProps.get("mapMemory"), "1024"); } @Test @@ -392,6 +393,7 @@ public void testReplicationCoordsForFSStorage() throws Exception { Assert.assertEquals(wfProps.get("jobPriority"), "NORMAL"); Assert.assertEquals(wfProps.get("maxMaps"), "5"); Assert.assertEquals(wfProps.get("mapBandwidth"), "100"); + Assert.assertEquals(wfProps.get("mapMemory"), "1024"); assertLibExtensions(coord, 
"replication"); WORKFLOWAPP wf = getWorkflowapp(trgMiniDFS.getFileSystem(), coord); @@ -501,9 +503,9 @@ private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, Cluster aCluster, JAVA replication = replicationActionNode.getJava(); List args = replication.getArg(); if (args.contains("-counterLogDir")) { - Assert.assertEquals(args.size(), 17); + Assert.assertEquals(args.size(), 18); } else { - Assert.assertEquals(args.size(), 15); + Assert.assertEquals(args.size(), 16); } HashMap props = getCoordProperties(coord); diff --git a/oozie/src/test/resources/feed/feed.xml b/oozie/src/test/resources/feed/feed.xml index 6e3126273..ee146804c 100644 --- a/oozie/src/test/resources/feed/feed.xml +++ b/oozie/src/test/resources/feed/feed.xml @@ -53,5 +53,6 @@ + diff --git a/oozie/src/test/resources/feed/fs-local-retention-lifecycle-feed.xml b/oozie/src/test/resources/feed/fs-local-retention-lifecycle-feed.xml index bdf1e59dd..4a4e18fea 100644 --- a/oozie/src/test/resources/feed/fs-local-retention-lifecycle-feed.xml +++ b/oozie/src/test/resources/feed/fs-local-retention-lifecycle-feed.xml @@ -38,6 +38,7 @@ HIGH + diff --git a/oozie/src/test/resources/feed/fs-replication-feed-counters.xml b/oozie/src/test/resources/feed/fs-replication-feed-counters.xml index 230e2b096..2518a9a89 100644 --- a/oozie/src/test/resources/feed/fs-replication-feed-counters.xml +++ b/oozie/src/test/resources/feed/fs-replication-feed-counters.xml @@ -55,5 +55,6 @@ + diff --git a/oozie/src/test/resources/feed/fs-replication-feed.xml b/oozie/src/test/resources/feed/fs-replication-feed.xml index 0e9065c27..53afcddc9 100644 --- a/oozie/src/test/resources/feed/fs-replication-feed.xml +++ b/oozie/src/test/resources/feed/fs-replication-feed.xml @@ -64,5 +64,6 @@ + diff --git a/oozie/src/test/resources/feed/fs-retention-feed.xml b/oozie/src/test/resources/feed/fs-retention-feed.xml index 7eb85fa3d..b655c3414 100644 --- a/oozie/src/test/resources/feed/fs-retention-feed.xml +++ 
b/oozie/src/test/resources/feed/fs-retention-feed.xml @@ -45,6 +45,7 @@ + diff --git a/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml b/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml index 2cadfe001..8ddfdfb8d 100644 --- a/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml +++ b/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml @@ -45,6 +45,7 @@ + diff --git a/webapp/src/test/resources/runtime.properties b/webapp/src/test/resources/runtime.properties index 7dec191a8..d1c47613d 100644 --- a/webapp/src/test/resources/runtime.properties +++ b/webapp/src/test/resources/runtime.properties @@ -23,6 +23,7 @@ *.falcon.replication.workflow.maxmaps=5 *.falcon.replication.workflow.mapbandwidth=100 +*.falcon.feed.workflow.mapreduce.map.memory.mb=512 *.webservices.default.results.per.page=10 # If true, do not run retention past feedCluster validity end time. From 4f9b32c4ed7504e3e6f2b4397af406bf9b363979 Mon Sep 17 00:00:00 2001 From: sandeep Date: Sat, 13 Aug 2016 16:25:24 +0530 Subject: [PATCH 4/7] FALCON-2112. Set property value to set map memory for replication and retention. Removed extra unwanted line --- common/src/main/resources/runtime.properties | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties index 505171710..d0ea24ec5 100644 --- a/common/src/main/resources/runtime.properties +++ b/common/src/main/resources/runtime.properties @@ -25,8 +25,7 @@ *.falcon.replication.workflow.mapbandwidth=100 *.falcon.feed.workflow.mapreduce.map.memory.mb=512 *.webservices.default.results.per.page=10 - - + # If true, do not run retention past feedCluster validity end time. # This will retain recent instances beyond feedCluster validity end time. 
*.falcon.retention.keep.instances.beyond.validity=true From dd93d27df41d7fb1433dd806a29e0ea98c7fe26d Mon Sep 17 00:00:00 2001 From: sandeep Date: Tue, 16 Aug 2016 13:44:44 +0530 Subject: [PATCH 5/7] FALCON-2112 Incorporated review comments. Added Launcher memory to all eviction actions and updated the documentation --- docs/src/site/twiki/EntitySpecification.twiki | 4 +++- lifecycle/src/main/resources/action/feed/eviction-action.xml | 4 ++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki index 11d1e1b0b..b62ad8bb8 100644 --- a/docs/src/site/twiki/EntitySpecification.twiki +++ b/docs/src/site/twiki/EntitySpecification.twiki @@ -392,6 +392,7 @@ permission indicates the permission. + @@ -416,7 +417,8 @@ waiting for the feed instance, parallel decides the concurrent replication insta order decides the execution order for replication instances like FIFO, LIFO and LAST_ONLY. DistCp options can be passed as custom properties, which will be propagated to the DistCp tool. "maxMaps" represents the maximum number of maps used during replication. "mapBandwidth" represents the bandwidth in MB/s -used by each mapper during replication. "overwrite" represents overwrite destination during replication. +used by each mapper during replication. "mapMemory" represents the mapreduce map memory in mb to be specified to the +respective replication and retention jobs. "overwrite" represents overwrite destination during replication. "ignoreErrors" represents ignore failures not causing the job to fail during replication. "skipChecksum" represents bypassing checksum verification during replication. "removeDeletedFiles" represents deleting the files existing in the destination but not in source during replication. 
"preserveBlockSize" represents preserving block size during diff --git a/lifecycle/src/main/resources/action/feed/eviction-action.xml b/lifecycle/src/main/resources/action/feed/eviction-action.xml index 64635c256..0a7c3803a 100644 --- a/lifecycle/src/main/resources/action/feed/eviction-action.xml +++ b/lifecycle/src/main/resources/action/feed/eviction-action.xml @@ -37,6 +37,10 @@ oozie.launcher.oozie.libpath ${wf:conf("falcon.libpath")} + + oozie.launcher.mapreduce.map.memory.mb + ${mapMemory} + org.apache.falcon.retention.FeedEvictor -Dmapreduce.map.memory.mb={mapMemory} From e59b3b5c150cd7f9f3638074f2b71593b24ac1ff Mon Sep 17 00:00:00 2001 From: sandeep Date: Sun, 21 Aug 2016 13:01:27 +0530 Subject: [PATCH 6/7] FALCON-2112 Adding AM memory, am command opts and map Java Opts to configs for feed workflows --- common/src/main/resources/runtime.properties | 4 +++ docs/src/site/twiki/EntitySpecification.twiki | 11 ++++++-- .../resources/action/feed/eviction-action.xml | 15 ++++++++++ .../feed/FeedReplicationWorkflowBuilder.java | 28 ++++++++++++++++++- .../feed/FeedRetentionWorkflowBuilder.java | 22 +++++++++++++-- .../resources/action/feed/eviction-action.xml | 17 ++++++++++- .../action/feed/replication-action.xml | 17 ++++++++++- .../feed/OozieFeedWorkflowBuilderTest.java | 12 ++++++-- oozie/src/test/resources/feed/feed.xml | 5 +++- .../fs-local-retention-lifecycle-feed.xml | 5 +++- .../feed/fs-replication-feed-counters.xml | 5 +++- .../resources/feed/fs-replication-feed.xml | 5 +++- .../test/resources/feed/fs-retention-feed.xml | 5 +++- .../feed/fs-retention-lifecycle-feed.xml | 3 ++ webapp/src/test/resources/runtime.properties | 3 ++ 15 files changed, 143 insertions(+), 14 deletions(-) diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties index d0ea24ec5..79faedf0a 100644 --- a/common/src/main/resources/runtime.properties +++ b/common/src/main/resources/runtime.properties @@ -23,7 +23,11 @@ 
*.falcon.replication.workflow.maxmaps=5 *.falcon.replication.workflow.mapbandwidth=100 +*.falcon.feed.workflow.yarn.app.mapreduce.am.resource.mb=512 +*.falcon.feed.workflow.yarn.app.mapreduce.am.command-opts=-Xmx400m -XX:MaxMetaspaceSize=64m *.falcon.feed.workflow.mapreduce.map.memory.mb=512 +*.falcon.feed.workflow.mapreduce.map.java.opts=-Xmx400m -XX:MaxMetaspaceSize=64m + *.webservices.default.results.per.page=10 # If true, do not run retention past feedCluster validity end time. diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki index b62ad8bb8..6c6a13857 100644 --- a/docs/src/site/twiki/EntitySpecification.twiki +++ b/docs/src/site/twiki/EntitySpecification.twiki @@ -392,7 +392,10 @@ permission indicates the permission. + + + @@ -417,8 +420,12 @@ waiting for the feed instance, parallel decides the concurrent replication insta order decides the execution order for replication instances like FIFO, LIFO and LAST_ONLY. DistCp options can be passed as custom properties, which will be propagated to the DistCp tool. "maxMaps" represents the maximum number of maps used during replication. "mapBandwidth" represents the bandwidth in MB/s -used by each mapper during replication. "mapMemory" represents the mapreduce map memory in mb to be specified to the -respective replication and retention jobs. "overwrite" represents overwrite destination during replication. +used by each mapper during replication. +"mapMemory" represents the mapreduce map memory in mb to be specified to the respective replication and retention jobs. +"mapJavaOpts" represents the mapreduce java opts to be specified to the respective replication and retention jobs. +"amMemory" represents the application master memory in mb to be specified to the respective replication and retention +application masters. "amCommandOpts" represents the application master java opts to be specified to the respective +replication and retention application masters. 
"overwrite" represents overwrite destination during replication. "ignoreErrors" represents ignore failures not causing the job to fail during replication. "skipChecksum" represents bypassing checksum verification during replication. "removeDeletedFiles" represents deleting the files existing in the destination but not in source during replication. "preserveBlockSize" represents preserving block size during diff --git a/lifecycle/src/main/resources/action/feed/eviction-action.xml b/lifecycle/src/main/resources/action/feed/eviction-action.xml index 0a7c3803a..df00beb0c 100644 --- a/lifecycle/src/main/resources/action/feed/eviction-action.xml +++ b/lifecycle/src/main/resources/action/feed/eviction-action.xml @@ -37,13 +37,28 @@ oozie.launcher.oozie.libpath ${wf:conf("falcon.libpath")} + + oozie.launcher.yarn.app.mapreduce.am.resource.mb + ${amMemory} + + + oozie.launcher.yarn.app.mapreduce.am.command-opts + ${amCommandOpts} + oozie.launcher.mapreduce.map.memory.mb ${mapMemory} + + oozie.launcher.mapreduce.map.java.opts + ${mapJavaOpts} + org.apache.falcon.retention.FeedEvictor + -Dmapreduce.map.memory.mb={amMemory} + -Dmapreduce.map.memory.mb={amCommandOpts} -Dmapreduce.map.memory.mb={mapMemory} + -Dmapreduce.map.memory.mb={mapJavaOpts} -feedBasePath ${feedDataPath} -falconFeedStorageType diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java index 109381ea3..ff83d73b9 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java @@ -48,7 +48,10 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW protected static final String REPLICATION_ACTION_NAME = "replication"; private static final String MR_MAX_MAPS = "maxMaps"; private static final String MR_MAP_BANDWIDTH = "mapBandwidth"; + 
private static final String MR_AM_MEMORY = "amMemory"; + private static final String MR_AM_COMMAND_OPTS = "amCommandOpts"; private static final String MR_MAP_MEMORY = "mapMemory"; + private static final String MR_MAP_JAVA_OPTS = "mapJavaOpts"; private static final String REPLICATION_JOB_COUNTER = "job.counter"; private static final String TDE_ENCRYPTION_ENABLED = "tdeEncryptionEnabled"; @@ -97,9 +100,18 @@ protected Properties getWorkflowProperties(Feed feed) throws FalconException { if (props.getProperty(MR_MAP_BANDWIDTH) == null) { // set default if user has not overridden props.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth()); } - if (props.getProperty(MR_MAP_MEMORY) == null) { // set default memory if user has not overridden + if (props.getProperty(MR_AM_MEMORY) == null) { // set default app master memory if user has not overridden + props.put(MR_AM_MEMORY, getDefaultAmMemory()); + } + if (props.getProperty(MR_AM_COMMAND_OPTS) == null) { // set default app master opts if user has not overridden + props.put(MR_AM_COMMAND_OPTS, getDefaultAmCommandOpts()); + } + if (props.getProperty(MR_MAP_MEMORY) == null) { // set default map memory if user has not overridden props.put(MR_MAP_MEMORY, getDefaultMapMemory()); } + if (props.getProperty(MR_MAP_JAVA_OPTS) == null) { // set default map java opts if user has not overridden + props.put(MR_MAP_JAVA_OPTS, getDefaultMapJavaOpts()); + } return props; } @@ -149,10 +161,24 @@ private String getDefaultMapBandwidth() { return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidth", "100"); } + private String getDefaultAmMemory() { + return RuntimeProperties.get().getProperty("falcon.feed.workflow.yarn.app.mapreduce.am.resource.mb", "512"); + } + + private String getDefaultAmCommandOpts() { + return RuntimeProperties.get().getProperty("falcon.feed.workflow.yarn.app.mapreduce.am.command-opts", + "-Xmx400m -XX:MaxMetaspaceSize=64m"); + } + private String getDefaultMapMemory() { return 
RuntimeProperties.get().getProperty("falcon.feed.workflow.mapreduce.map.memory.mb", "512"); } + private String getDefaultMapJavaOpts() { + return RuntimeProperties.get().getProperty("falcon.feed.workflow.mapreduce.map.java.opts", + "-Xmx400m -XX:MaxMetaspaceSize=64m"); + } + private boolean isTDEEnabled() { String tdeEncryptionEnabled = FeedHelper.getPropertyValue(entity, TDE_ENCRYPTION_ENABLED); return "true" .equalsIgnoreCase(tdeEncryptionEnabled); diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java index 5ed63513d..8e85c254e 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java @@ -44,7 +44,11 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil private static final String EVICTION_ACTION_TEMPLATE = "/action/feed/eviction-action.xml"; private static final String EVICTION_ACTION_NAME = "eviction"; + private static final String MR_AM_MEMORY = "amMemory"; + private static final String MR_AM_COMMAND_OPTS = "amCommandOpts"; private static final String MR_MAP_MEMORY = "mapMemory"; + private static final String MR_MAP_JAVA_OPTS = "mapJavaOpts"; + public FeedRetentionWorkflowBuilder(Feed entity) { super(entity, LifeCycle.EVICTION); @@ -65,9 +69,23 @@ public FeedRetentionWorkflowBuilder(Feed entity) { props.putAll(createDefaultConfiguration(cluster)); props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle())); + if (props.getProperty(MR_AM_MEMORY) == null) { // set default app master memory if user has not overridden + props.put(MR_AM_MEMORY, RuntimeProperties.get() + .getProperty("falcon.feed.workflow.yarn.app.mapreduce.am.resource.mb", "512")); + } + if (props.getProperty(MR_AM_COMMAND_OPTS) == null) { // set default app master opts if user has not overridden + 
props.put(MR_AM_COMMAND_OPTS, RuntimeProperties.get() + .getProperty("falcon.feed.workflow.yarn.app.mapreduce.am.command-opts", + "-Xmx400m -XX:MaxMetaspaceSize=64m")); + } if (props.getProperty(MR_MAP_MEMORY) == null) { // set default memory if user has not overridden - props.put(MR_MAP_MEMORY, - RuntimeProperties.get().getProperty("falcon.feed.workflow.mapreduce.map.memory.mb", "512")); + props.put(MR_MAP_MEMORY, RuntimeProperties.get(). + getProperty("falcon.feed.workflow.mapreduce.map.memory.mb", "512")); + } + if (props.getProperty(MR_MAP_JAVA_OPTS) == null) { // set default map java opts if user has not overridden + props.put(MR_MAP_JAVA_OPTS, + RuntimeProperties.get().getProperty("falcon.feed.workflow.mapreduce.map.java.opts", + "-Xmx400m -XX:MaxMetaspaceSize=64m")); } if (EntityUtil.isTableStorageType(cluster, entity)) { diff --git a/oozie/src/main/resources/action/feed/eviction-action.xml b/oozie/src/main/resources/action/feed/eviction-action.xml index 07a55fd98..df00beb0c 100644 --- a/oozie/src/main/resources/action/feed/eviction-action.xml +++ b/oozie/src/main/resources/action/feed/eviction-action.xml @@ -37,13 +37,28 @@ oozie.launcher.oozie.libpath ${wf:conf("falcon.libpath")} + + oozie.launcher.yarn.app.mapreduce.am.resource.mb + ${amMemory} + + + oozie.launcher.yarn.app.mapreduce.am.command-opts + ${amCommandOpts} + oozie.launcher.mapreduce.map.memory.mb ${mapMemory} + + oozie.launcher.mapreduce.map.java.opts + ${mapJavaOpts} + org.apache.falcon.retention.FeedEvictor - -Dmapreduce.map.memory.mb=${mapMemory} + -Dmapreduce.map.memory.mb={amMemory} + -Dmapreduce.map.memory.mb={amCommandOpts} + -Dmapreduce.map.memory.mb={mapMemory} + -Dmapreduce.map.memory.mb={mapJavaOpts} -feedBasePath ${feedDataPath} -falconFeedStorageType diff --git a/oozie/src/main/resources/action/feed/replication-action.xml b/oozie/src/main/resources/action/feed/replication-action.xml index 1493f0988..9f9503e42 100644 --- a/oozie/src/main/resources/action/feed/replication-action.xml 
+++ b/oozie/src/main/resources/action/feed/replication-action.xml @@ -41,16 +41,31 @@ oozie.launcher.oozie.libpath ${wf:conf("falcon.libpath")} + + oozie.launcher.yarn.app.mapreduce.am.resource.mb + ${amMemory} + + + oozie.launcher.yarn.app.mapreduce.am.command-opts + ${amCommandOpts} + oozie.launcher.mapreduce.map.memory.mb ${mapMemory} + + oozie.launcher.mapreduce.map.java.opts + ${mapJavaOpts} + org.apache.falcon.replication.FeedReplicator -Dfalcon.include.path=${sourceRelativePaths} -Dmapred.job.queue.name=${queueName} -Dmapred.job.priority=${jobPriority} - -Dmapreduce.map.memory.mb=${mapMemory} + -Dmapreduce.map.memory.mb={amMemory} + -Dmapreduce.map.memory.mb={amCommandOpts} + -Dmapreduce.map.memory.mb={mapMemory} + -Dmapreduce.map.memory.mb={mapJavaOpts} -maxMaps ${maxMaps} -mapBandwidth diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java index ee1de7a54..3f2079362 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java @@ -201,7 +201,11 @@ public void testRetentionWithLifecycle(String keepInstancesPostValidity, String Assert.assertEquals(wfProps.get("queueName"), "ageBasedDeleteQueue"); Assert.assertEquals(wfProps.get("limit"), "hours(2)"); Assert.assertEquals(wfProps.get("jobPriority"), "LOW"); + + Assert.assertEquals(wfProps.get("amMemory"), "1024"); + Assert.assertEquals(wfProps.get("amCommandOpts"), "-Xmx800m -XX:MaxMetaspaceSize=128m"); Assert.assertEquals(wfProps.get("mapMemory"), "1024"); + Assert.assertEquals(wfProps.get("mapJavaOpts"), "-Xmx800m -XX:MaxMetaspaceSize=128m"); } @Test @@ -394,7 +398,11 @@ public void testReplicationCoordsForFSStorage() throws Exception { Assert.assertEquals(wfProps.get("jobPriority"), "NORMAL"); Assert.assertEquals(wfProps.get("maxMaps"), "5"); 
Assert.assertEquals(wfProps.get("mapBandwidth"), "100"); + + Assert.assertEquals(wfProps.get("amMemory"), "1024"); + Assert.assertEquals(wfProps.get("amCommandOpts"), "-Xmx800m -XX:MaxMetaspaceSize=128m"); Assert.assertEquals(wfProps.get("mapMemory"), "1024"); + Assert.assertEquals(wfProps.get("mapJavaOpts"), "-Xmx800m -XX:MaxMetaspaceSize=128m"); assertLibExtensions(coord, "replication"); WORKFLOWAPP wf = getWorkflowapp(trgMiniDFS.getFileSystem(), coord); @@ -504,9 +512,9 @@ private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, Cluster aCluster, JAVA replication = replicationActionNode.getJava(); List args = replication.getArg(); if (args.contains("-counterLogDir")) { - Assert.assertEquals(args.size(), 18); + Assert.assertEquals(args.size(), 21); } else { - Assert.assertEquals(args.size(), 16); + Assert.assertEquals(args.size(), 19); } HashMap props = getCoordProperties(coord); diff --git a/oozie/src/test/resources/feed/feed.xml b/oozie/src/test/resources/feed/feed.xml index ee146804c..10271352d 100644 --- a/oozie/src/test/resources/feed/feed.xml +++ b/oozie/src/test/resources/feed/feed.xml @@ -53,6 +53,9 @@ - + + + + diff --git a/oozie/src/test/resources/feed/fs-local-retention-lifecycle-feed.xml b/oozie/src/test/resources/feed/fs-local-retention-lifecycle-feed.xml index 4a4e18fea..145708a04 100644 --- a/oozie/src/test/resources/feed/fs-local-retention-lifecycle-feed.xml +++ b/oozie/src/test/resources/feed/fs-local-retention-lifecycle-feed.xml @@ -38,7 +38,10 @@ HIGH - + + + + diff --git a/oozie/src/test/resources/feed/fs-replication-feed-counters.xml b/oozie/src/test/resources/feed/fs-replication-feed-counters.xml index 2518a9a89..aa9eaec18 100644 --- a/oozie/src/test/resources/feed/fs-replication-feed-counters.xml +++ b/oozie/src/test/resources/feed/fs-replication-feed-counters.xml @@ -55,6 +55,9 @@ - + + + + diff --git a/oozie/src/test/resources/feed/fs-replication-feed.xml b/oozie/src/test/resources/feed/fs-replication-feed.xml index 
53afcddc9..b6a34c6a1 100644 --- a/oozie/src/test/resources/feed/fs-replication-feed.xml +++ b/oozie/src/test/resources/feed/fs-replication-feed.xml @@ -64,6 +64,9 @@ - + + + + diff --git a/oozie/src/test/resources/feed/fs-retention-feed.xml b/oozie/src/test/resources/feed/fs-retention-feed.xml index b655c3414..6a144ffb2 100644 --- a/oozie/src/test/resources/feed/fs-retention-feed.xml +++ b/oozie/src/test/resources/feed/fs-retention-feed.xml @@ -45,7 +45,10 @@ - + + + + diff --git a/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml b/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml index 04725d79e..55aa228c1 100644 --- a/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml +++ b/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml @@ -45,7 +45,10 @@ + + + diff --git a/webapp/src/test/resources/runtime.properties b/webapp/src/test/resources/runtime.properties index d1c47613d..179336c75 100644 --- a/webapp/src/test/resources/runtime.properties +++ b/webapp/src/test/resources/runtime.properties @@ -23,7 +23,10 @@ *.falcon.replication.workflow.maxmaps=5 *.falcon.replication.workflow.mapbandwidth=100 +*.falcon.feed.workflow.yarn.app.mapreduce.am.resource.mb=512 +*.falcon.feed.workflow.yarn.app.mapreduce.am.command-opts=-Xmx400m -XX:MaxMetaspaceSize=64m *.falcon.feed.workflow.mapreduce.map.memory.mb=512 +*.falcon.feed.workflow.mapreduce.map.java.opts=-Xmx400m -XX:MaxMetaspaceSize=64m *.webservices.default.results.per.page=10 # If true, do not run retention past feedCluster validity end time. 
From d603fa9e2ea71f83c50749907bdd78337beca9c3 Mon Sep 17 00:00:00 2001 From: sandeep Date: Tue, 30 Aug 2016 14:16:53 +0530 Subject: [PATCH 7/7] Rebased the patch and made few documentation corrections for formatting --- docs/src/site/twiki/EntitySpecification.twiki | 34 ++++++++++++------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki index 6c6a13857..7b23aad0e 100644 --- a/docs/src/site/twiki/EntitySpecification.twiki +++ b/docs/src/site/twiki/EntitySpecification.twiki @@ -418,22 +418,30 @@ available to user to specify the Hadoop job queue and priority, the same values "timeout", "parallel" and "order" are other special properties which decides replication instance's timeout value while waiting for the feed instance, parallel decides the concurrent replication instances that can run at any given time and order decides the execution order for replication instances like FIFO, LIFO and LAST_ONLY. -DistCp options can be passed as custom properties, which will be propagated to the DistCp tool. "maxMaps" represents -the maximum number of maps used during replication. "mapBandwidth" represents the bandwidth in MB/s -used by each mapper during replication. +DistCp options can be passed as custom properties, which will be propagated to the DistCp tool. Below are the few +properties that can be passed as key value properties to propagate into workflow engine. +"maxMaps" represents the maximum number of maps used during replication. +"mapBandwidth" represents the bandwidth in MB/s used by each mapper during replication. "mapMemory" represents the mapreduce map memory in mb to be specified to the respective replication and retention jobs. "mapJavaOpts" represents the mapreduce java opts to be specified to therespective replication and retention jobs. 
"amMemory" represents the application master memory in mb to be specified to the respective replication and retention -application masters. "amJavaOpts" represents the application master java opts to be specified to the respective -replication and retention application masters. "overwrite" represents overwrite destination during replication. -"ignoreErrors" represents ignore failures not causing the job to fail during replication. "skipChecksum" represents -bypassing checksum verification during replication. "removeDeletedFiles" represents deleting the files existing in the -destination but not in source during replication. "preserveBlockSize" represents preserving block size during -replication. "preserveReplicationNumber" represents preserving replication number during replication. -"preservePermission" represents preserving permission during replication. "preserveUser" represents preserving user during replication. -"preserveGroup" represents preserving group during replication. "preserveChecksumType" represents preserving checksum type during replication. -"preserveAcl" represents preserving ACL during replication. "preserveXattr" represents preserving Xattr during replication. -"preserveTimes" represents preserving access and modification times during replication. "tdeEncryptionEnabled" if TDE is enabled. +application masters. +"amCommandOpts" represents the application master java opts to be specified to the respective +replication and retention application masters. +"overwrite" represents overwrite destination during replication. +"ignoreErrors" represents ignore failures not causing the job to fail during replication. +"skipChecksum" represents bypassing checksum verification during replication. +"removeDeletedFiles" represents deleting the files existing in the destination but not in source during replication. +"preserveBlockSize" represents preserving block size during replication. 
+"preserveReplicationNumber" represents preserving replication number during replication. +"preservePermission" represents preserving permission during replication. +"preserveUser" represents preserving user during replication. +"preserveGroup" represents preserving group during replication. +"preserveChecksumType" represents preserving checksum type during replication. +"preserveAcl" represents preserving ACL during replication. +"preserveXattr" represents preserving Xattr during replication. +"preserveTimes" represents preserving access and modification times during replication. +"tdeEncryptionEnabled" if TDE is enabled. ---+++ Lifecycle