From 3b7fd63149f874c0d533957ba9c313d9ff81cb4b Mon Sep 17 00:00:00 2001 From: Pragya Date: Thu, 11 Feb 2016 10:23:51 +0000 Subject: [PATCH 1/3] FALCON-1829 Add regression for submit and schedule process on native scheduler (time based) --- falcon-regression/CHANGES.txt | 2 + .../helpers/entity/AbstractEntityHelper.java | 52 ++-- .../falcon/regression/AuthorizationTest.java | 4 +- .../nativeScheduler/NativeScheduleTest.java | 231 ++++++++++++++++++ .../falcon/regression/security/EntityOp.java | 4 +- .../src/test/resources/sleep/workflow.xml | 85 +++++++ 6 files changed, 354 insertions(+), 24 deletions(-) create mode 100644 falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java create mode 100644 falcon-regression/merlin/src/test/resources/sleep/workflow.xml diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt index b4717f4d5..b3769f031 100644 --- a/falcon-regression/CHANGES.txt +++ b/falcon-regression/CHANGES.txt @@ -5,6 +5,8 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1829 Add regression for submit and schedule process on native scheduler (time based) (Pragya Mittal) + FALCON-1766 Add CLI metrics check for HiveDR, HDFS and feed replication (Paul Isaychuk) FALCON-1777 Add regression for HDFS replication (recipe) (Paul Isaychuk) diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java index 29c97b2ff..27e12d00f 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java @@ -301,44 +301,55 @@ public ServiceResponse validateEntity(String data) public ServiceResponse submitEntity(String data, String user) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { LOGGER.info("Submitting " + getEntityType() + ": \n" + Util.prettyPrintXml(data)); - return Util.sendRequest(createUrl(this.hostname + URLS.SUBMIT_URL.getValue(), - getEntityType() + colo), "post", data, user); + return Util.sendRequest(createUrl(this.hostname + URLS.SUBMIT_URL.getValue(), getEntityType() + colo), "post", + data, user); } public ServiceResponse validateEntity(String data, String user) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { LOGGER.info("Validating " + getEntityType() + ": \n" + Util.prettyPrintXml(data)); - return Util.sendRequest(createUrl(this.hostname + URLS.VALIDATE_URL.getValue(), - getEntityType() + colo), "post", data, user); + return Util.sendRequest(createUrl(this.hostname + URLS.VALIDATE_URL.getValue(), getEntityType() + colo), "post", + data, user); } public ServiceResponse schedule(String processData) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { - return schedule(processData, null); + return schedule(processData, null, ""); } - public ServiceResponse schedule(String processData, String user) + public ServiceResponse schedule(String data, String user, String params) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { - return Util.sendRequest(createUrl(this.hostname + URLS.SCHEDULE_URL.getValue(), - getEntityType(), getEntityName(processData) + colo), "post", user); + + String 
url = createUrl(this.hostname + URLS.SCHEDULE_URL.getValue(), getEntityType(), + getEntityName(data) + colo); + if (StringUtils.isNotBlank(params)) { + url += (colo.isEmpty() ? "?" : "&") + params; + } + LOGGER.info("url is : " + url); + return Util.sendRequest(createUrl(url), "post", data, user); } public ServiceResponse submitAndSchedule(String data) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { - return submitAndSchedule(data, null); + return submitAndSchedule(data, null, ""); } - public ServiceResponse submitAndSchedule(String data, String user) + public ServiceResponse submitAndSchedule(String data, String user, String params) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { LOGGER.info("Submitting " + getEntityType() + ": \n" + Util.prettyPrintXml(data)); - return Util.sendRequest(createUrl(this.hostname + URLS.SUBMIT_AND_SCHEDULE_URL.getValue(), - getEntityType()), "post", data, user); + + String url = createUrl(this.hostname + URLS.SUBMIT_AND_SCHEDULE_URL.getValue(), getEntityType() + colo); + if (StringUtils.isNotBlank(params)) { + url += (colo.isEmpty() ? "?" : "&") + params; + } + return Util.sendRequest(createUrl(url), "post", data, user); } public ServiceResponse deleteByName(String entityName, String user) throws AuthenticationException, IOException, URISyntaxException, InterruptedException { - return Util.sendRequest(createUrl(this.hostname + URLS.DELETE_URL.getValue(), - getEntityType(), entityName + colo), "delete", user); + return Util.sendRequest( + createUrl(this.hostname + URLS.DELETE_URL.getValue(), getEntityType(), entityName + colo), "delete", + user); } public ServiceResponse delete(String data) @@ -348,8 +359,9 @@ public ServiceResponse delete(String data) public ServiceResponse delete(String data, String user) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { - return Util.sendRequest(createUrl(this.hostname + URLS.DELETE_URL.getValue(), - getEntityType(), getEntityName(data) + colo), "delete", user); + return Util.sendRequest( + createUrl(this.hostname + URLS.DELETE_URL.getValue(), getEntityType(), getEntityName(data) + colo), + "delete", user); } public ServiceResponse suspend(String data) @@ -398,8 +410,9 @@ public ServiceResponse getEntityDefinition(String data, String user) public ServiceResponse getEntityDependencies(String data, String user) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { - return Util.sendRequest(createUrl(this.hostname + URLS.DEPENDENCIES.getValue(), - getEntityType(), getEntityName(data) + colo), "get", user); + return Util.sendRequest( + createUrl(this.hostname + URLS.DEPENDENCIES.getValue(), getEntityType(), getEntityName(data) + colo), + "get", user); } public InstancesResult getRunningInstance(String name) @@ -661,8 +674,7 @@ public InstancesResult listInstances(String entityName, String params, String us */ public ServiceResponse getDependencies(String entityName) throws URISyntaxException, AuthenticationException, InterruptedException, IOException { - String url = createUrl(this.hostname + URLS.DEPENDENCIES.getValue(), getEntityType(), - entityName + colo); + String url = createUrl(this.hostname + URLS.DEPENDENCIES.getValue(), getEntityType(), entityName + colo); return Util.sendRequest(url, "get", null, null); } diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java 
b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java index 9c37562b2..714a21f81 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java @@ -581,7 +581,7 @@ public void u1ScheduleFeedU2ScheduleDependantProcessU1UpdateFeed() throws Except //by U2 schedule process dependant on scheduled feed by U1 ServiceResponse serviceResponse = prism.getProcessHelper() - .submitAndSchedule(bundles[0].getProcessData(), MerlinConstants.USER2_NAME); + .submitAndSchedule(bundles[0].getProcessData(), MerlinConstants.USER2_NAME, ""); AssertUtil.assertSucceeded(serviceResponse); AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING); @@ -631,7 +631,7 @@ public void u1ScheduleFeedU2ScheduleDependantProcessU2UpdateFeed() throws Except //by U2 schedule process dependent on scheduled feed by U1 ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData(), - MerlinConstants.USER2_NAME); + MerlinConstants.USER2_NAME, ""); AssertUtil.assertSucceeded(serviceResponse); AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING); diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java new file mode 100644 index 000000000..fe61cdf28 --- /dev/null +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.regression.nativeScheduler; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.regression.Entities.ProcessMerlin; +import org.apache.falcon.regression.core.bundle.Bundle; +import org.apache.falcon.regression.core.helpers.ColoHelper; +import org.apache.falcon.regression.core.response.ServiceResponse; +import org.apache.falcon.regression.core.util.*; +import org.apache.falcon.regression.testHelper.BaseTestClass; +import org.apache.log4j.Logger; +import org.apache.oozie.client.OozieClient; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Schedule process via native scheduler. 
+ */ + +@Test(groups = "distributed") +public class NativeScheduleTest extends BaseTestClass { + private ColoHelper cluster1 = servers.get(0); + private ColoHelper cluster2 = servers.get(1); + private String baseTestHDFSDir = cleanAndGetTestDir(); + private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator"; + private static final Logger LOGGER = Logger.getLogger(NativeScheduleTest.class); + private String startTime; + private String endTime; + + + + @BeforeClass(alwaysRun = true) + public void uploadWorkflow() throws Exception { + uploadDirToClusters(aggregateWorkflowDir, OSUtil.concat(OSUtil.RESOURCES, "sleep")); + } + + @BeforeMethod(alwaysRun = true) + public void setUp() throws Exception { + startTime = TimeUtil.getTimeWrtSystemTime(-10); + endTime = TimeUtil.addMinsToTime(startTime, 50); + LOGGER.info("Time range between : " + startTime + " and " + endTime); + Bundle bundle = BundleUtil.readELBundle(); + + for (int i = 0; i < 2; i++) { + bundles[i] = new Bundle(bundle, servers.get(i)); + bundles[i].generateUniqueBundle(this); + bundles[i].setProcessWorkflow(aggregateWorkflowDir); + bundles[i].submitClusters(prism); + bundles[i].setProcessConcurrency(2); + bundles[i].setProcessValidity(startTime, endTime); + bundles[i].setProcessPeriodicity(1, Frequency.TimeUnit.minutes); + } + } + + @AfterMethod(alwaysRun = true) + public void tearDown() { + removeTestClassEntities(); + } + + + /** + * Successfully schedule process via native scheduler through prism and server on single cluster. + * Schedule the same process on oozie. It should fail. + */ + @Test + public void scheduleProcessWithNativeUsingProperties() throws Exception { + ProcessMerlin processMerlin = bundles[0].getProcessObject(); + processMerlin.setInputs(null); + processMerlin.setOutputs(null); + LOGGER.info(processMerlin.toString()); + + ServiceResponse response = prism.getProcessHelper().submitEntity(processMerlin.toString()); + AssertUtil.assertSucceeded(response); + + // Schedule with prism + response = prism.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:native"); + AssertUtil.assertSucceeded(response); + + // Schedule with server + response = cluster1.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:oozie"); + AssertUtil.assertFailed(response); + + // Schedule with oozie via prism + response = prism.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:oozie"); + AssertUtil.assertFailed(response); + + // Schedule with oozie via server + response = cluster1.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:native"); + AssertUtil.assertSucceeded(response); + + } + + /** + * Successfully schedule process via oozie scheduler (using properties) through prism and server on single cluster. + * Schedule the same process on native scheduler. It should fail. 
+ */ + @Test + public void scheduleProcessWithOozieUsingProperties() throws Exception { + ProcessMerlin processMerlin = bundles[0].getProcessObject(); + processMerlin.setInputs(null); + processMerlin.setOutputs(null); + LOGGER.info(processMerlin.toString()); + + ServiceResponse response = prism.getProcessHelper().submitEntity(processMerlin.toString()); + AssertUtil.assertSucceeded(response); + + // Schedule with prism + response = prism.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:oozie"); + AssertUtil.assertSucceeded(response); + + // Schedule with server + response = cluster1.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:oozie"); + AssertUtil.assertSucceeded(response); + + // Schedule with native via server + response = cluster1.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:native"); + AssertUtil.assertFailed(response); + + // Schedule with native via prism + response = prism.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:native"); + AssertUtil.assertFailed(response); + + } + + /** + * Successfully schedule process via oozie scheduler(without properties) through prism and server on single cluster. + * Schedule the same process on native using properties. It should fail. + */ + @Test + public void scheduleProcessWithOozieWithNoParams() throws Exception { + ProcessMerlin processMerlin = bundles[0].getProcessObject(); + processMerlin.setInputs(null); + processMerlin.setOutputs(null); + LOGGER.info(processMerlin.toString()); + + ServiceResponse response = prism.getProcessHelper().submitEntity(processMerlin.toString()); + AssertUtil.assertSucceeded(response); + + // Schedule with prism + response = prism.getProcessHelper().schedule(processMerlin.toString(), null, ""); + AssertUtil.assertSucceeded(response); + + // Schedule with native via server + response = cluster1.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:native"); + AssertUtil.assertFailed(response); + + // Schedule with native via prism + response = prism.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:native"); + AssertUtil.assertFailed(response); + + } + + /** + * Successfully schedule process via native scheduler through prism and server on multiple cluster. + * Schedule the same process on oozie. It should fail. + */ + @Test(groups = {"prism", "0.2"}) + public void scheduleProcessWithNativeOnTwoClusters() throws Exception { + + ProcessMerlin processMerlinNative = bundles[0].getProcessObject(); + processMerlinNative.clearProcessCluster(); + processMerlinNative.addProcessCluster( + new ProcessMerlin.ProcessClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) + .withValidity(startTime, endTime).build()); + processMerlinNative.addProcessCluster( + new ProcessMerlin.ProcessClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) + .withValidity(startTime, endTime).build()); + processMerlinNative.setInputs(null); + processMerlinNative.setOutputs(null); + LOGGER.info(processMerlinNative.toString()); + + // Schedule with native via prism + ServiceResponse response = prism.getProcessHelper(). 
+ submitAndSchedule(processMerlinNative.toString(), null, "properties=falcon.scheduler:native"); + AssertUtil.assertSucceeded(response); + + // Schedule with native via server1 + response = cluster1.getProcessHelper().schedule(processMerlinNative.toString(), null, + "properties=falcon.scheduler:native"); + AssertUtil.assertSucceeded(response); + + // Schedule with native via server2 + response = cluster2.getProcessHelper().schedule(processMerlinNative.toString(), null, + "properties=falcon.scheduler:native"); + AssertUtil.assertSucceeded(response); + + // Schedule with oozie via prism + response = prism.getProcessHelper().schedule(processMerlinNative.toString(), null, + "properties=falcon.scheduler:oozie"); + AssertUtil.assertFailed(response); + + // Schedule with oozie via server1 + response = cluster1.getProcessHelper().schedule(processMerlinNative.toString(), null, + "properties=falcon.scheduler:oozie"); + AssertUtil.assertFailed(response); + + // Schedule with oozie via server2 + response = cluster2.getProcessHelper().schedule(processMerlinNative.toString(), null, + "properties=falcon.scheduler:oozie"); + AssertUtil.assertFailed(response); + + } + +} diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/security/EntityOp.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/security/EntityOp.java index dbaae9b2c..7b03f326e 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/security/EntityOp.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/security/EntityOp.java @@ -163,7 +163,7 @@ public boolean executeAs(String user, AbstractEntityHelper helper, String data) public boolean executeAs(String user, AbstractEntityHelper helper, String data) { final ServiceResponse response; try { - response = helper.schedule(data, user); + response = helper.schedule(data, user, ""); } catch (IOException e) { logger.error("Caught exception: " + e); return false; @@ -207,7 +207,7 @@ public boolean executeAs(String user, AbstractEntityHelper helper, String data) public boolean executeAs(String user, AbstractEntityHelper helper, String data) { final ServiceResponse response; try { - response = helper.submitAndSchedule(data, user); + response = helper.submitAndSchedule(data, user, ""); } catch (IOException e) { logger.error("Caught exception: " + e); return false; diff --git a/falcon-regression/merlin/src/test/resources/sleep/workflow.xml b/falcon-regression/merlin/src/test/resources/sleep/workflow.xml new file mode 100644 index 000000000..bd7c82192 --- /dev/null +++ b/falcon-regression/merlin/src/test/resources/sleep/workflow.xml @@ -0,0 +1,85 @@ + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + + + + + + mapred.job.queue.name + ${queueName} + + + mapred.mapper.class + org.apache.hadoop.mapred.lib.IdentityMapper + + + mapred.reducer.class + org.apache.hadoop.mapred.lib.IdentityReducer + + + mapred.map.tasks + 1 + + + mapred.input.dir + ${nameNode}/tmp/falcon-regression/test + + + mapred.output.dir + ${nameNode}/tmp/falcon-regression/test/output/ + + + + + + + + + ${jobTracker} + ${nameNode} + org.apache.hadoop.mapreduce.SleepJob + -m + 1 + -mt + 60000 + -r + 0 + + + + + + + Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + From f5b2888605223b1c15da588902d8f44c2a5b9d9c Mon Sep 17 00:00:00 2001 From: Pragya Date: Mon, 15 Feb 2016 12:01:27 +0000 Subject: [PATCH 2/3] FALCON-1566 Add test for SLA monitoring API --- falcon-regression/CHANGES.txt | 2 + 
.../helpers/entity/AbstractEntityHelper.java | 17 +- .../core/response/ServiceResponse.java | 10 + .../falcon/regression/core/util/Util.java | 3 +- .../regression/SLA/FeedSLAMonitoringTest.java | 278 ++++++++++++++++++ .../nativeScheduler/NativeScheduleTest.java | 7 +- 6 files changed, 313 insertions(+), 4 deletions(-) create mode 100644 falcon-regression/merlin/src/test/java/org/apache/falcon/regression/SLA/FeedSLAMonitoringTest.java diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt index d22d66283..48edd8d2f 100644 --- a/falcon-regression/CHANGES.txt +++ b/falcon-regression/CHANGES.txt @@ -5,6 +5,8 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1566 Add test for SLA monitoring API (Pragya Mittal) + FALCON-1829 Add regression for submit and schedule process on native scheduler (time based) (Pragya Mittal) FALCON-1766 Add CLI metrics check for HiveDR, HDFS and feed replication (Paul Isaychuk) diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java index 27e12d00f..e1a92889d 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java @@ -701,7 +701,7 @@ public ServiceResponse getEntityLineage(String params) if (StringUtils.isNotEmpty(params)){ url += colo.isEmpty() ? "?" + params : "&" + params; } - return Util.sendRequestLineage(createUrl(url), "get", null, null); + return Util.sendJSONRequest(createUrl(url), "get", null, null); } /** @@ -715,4 +715,19 @@ public InstanceDependencyResult getInstanceDependencies( .createAndSendRequestProcessInstance(url, params, allColo, user); } + /** + * Retrieves sla alerts. + * @param params list of optional parameters + * @return instances with sla missed. + */ + public ServiceResponse getSlaAlert(String params) + throws URISyntaxException, AuthenticationException, InterruptedException, IOException { + String url = createUrl(this.hostname + URLS.SLA.getValue(), + getEntityType()); + if (StringUtils.isNotEmpty(params)) { + url += params; + } + return Util.sendJSONRequest(createUrl(url), "get", null, null); + } + } diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java index 55e862cf4..f66d426ef 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java @@ -25,6 +25,7 @@ import org.apache.falcon.resource.EntityList; import org.apache.falcon.resource.EntitySummaryResult; import org.apache.falcon.resource.LineageGraphResult; +import org.apache.falcon.resource.SchedulableEntityInstanceResult; import org.apache.http.HttpResponse; import org.apache.log4j.Logger; @@ -121,4 +122,13 @@ public LineageGraphResult getLineageGraphResult() { return lineageGraphResult; } + /** + * Retrieves SchedulableEntityInstanceResult from a message if possible. 
+ * @return SchedulableEntityInstanceResult + */ + public SchedulableEntityInstanceResult getSlaResult() { + SchedulableEntityInstanceResult schedulableEntityInstanceResult = new GsonBuilder().create().fromJson(message, + SchedulableEntityInstanceResult.class); + return schedulableEntityInstanceResult; + } } diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java index ccd083b84..452effa63 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java @@ -385,6 +385,7 @@ public enum URLS { STATUS_URL("/api/entities/status"), ENTITY_SUMMARY("/api/entities/summary"), SUBMIT_AND_SCHEDULE_URL("/api/entities/submitAndSchedule"), + SLA("/api/entities/sla-alert"), ENTITY_LINEAGE("/api/metadata/lineage/entities"), INSTANCE_RUNNING("/api/instance/running"), INSTANCE_STATUS("/api/instance/status"), @@ -595,7 +596,7 @@ public static Document convertStringToDocument(String xmlStr) { * @throws URISyntaxException * @throws AuthenticationException */ - public static ServiceResponse sendRequestLineage(String url, String method, String data, String user) + public static ServiceResponse sendJSONRequest(String url, String method, String data, String user) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { BaseRequest request = new BaseRequest(url, method, user, data); request.addHeader(RequestKeys.CONTENT_TYPE_HEADER, RequestKeys.JSON_CONTENT_TYPE); diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/SLA/FeedSLAMonitoringTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/SLA/FeedSLAMonitoringTest.java new file mode 100644 index 000000000..fd26c71eb --- /dev/null +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/SLA/FeedSLAMonitoringTest.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.falcon.regression.SLA;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.regression.Entities.FeedMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.falcon.resource.SchedulableEntityInstance;
+import org.apache.falcon.resource.SchedulableEntityInstanceResult;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collections;
+
+
+/**
+ * Feed SLA monitoring tests.
+ * Test assumes following properties are set in startup.properties of server :
+ * *.feed.sla.statusCheck.frequency.seconds=60
+ * *.feed.sla.lookAheadWindow.millis=60000
+ */
+@Test(groups = "distributed")
+public class FeedSLAMonitoringTest extends BaseTestClass {
+
+    private ColoHelper cluster = servers.get(0);
+    private FileSystem clusterFS = serverFS.get(0);
+    private String baseTestHDFSDir = cleanAndGetTestDir();
+    private String feedInputPath = baseTestHDFSDir + "/input" + MINUTE_DATE_PATTERN;
+    private List<String> slaFeedNames;
+    private List<Frequency> slaFeedFrequencies;
+    private String clusterName;
+    private static final Logger LOGGER = Logger.getLogger(FeedSLAMonitoringTest.class);
+
+    private String startTime;
+    private String endTime;
+    private String slaStartTime;
+    private String slaEndTime;
+    private int noOfFeeds;
+    private int statusCheckFrequency;
+
+    private static final Comparator<SchedulableEntityInstance> DEPENDENCY_COMPARATOR =
+        new Comparator<SchedulableEntityInstance>() {
+            @Override
+            public int compare(SchedulableEntityInstance o1, SchedulableEntityInstance o2) {
+                int tagDiff = o1.getTags().compareTo(o2.getTags());
+                if (tagDiff != 0) {
+                    return tagDiff;
+                }
+                int clusterDiff = o1.getCluster().compareTo(o2.getCluster());
+                if (clusterDiff != 0) {
+                    return clusterDiff;
+                }
+                int typeDiff = o1.getEntityType().compareTo(o2.getEntityType());
+                if (typeDiff != 0) {
+                    return typeDiff;
+                }
+                int nameDiff = o1.getEntityName().compareTo(o2.getEntityName());
+                if (nameDiff != 0) {
+                    return nameDiff;
+                }
+                int dateDiff = o1.getInstanceTime().compareTo(o2.getInstanceTime());
+                if (dateDiff != 0) {
+                    return dateDiff;
+                }
+                return 0;
+            }
+        };
+
+    /**
+     * Submitting 3 feeds with different frequencies and sla values.
+ * @throws Exception + */ + @BeforeMethod(alwaysRun = true) + public void setup() throws Exception { + + bundles[0] = BundleUtil.readELBundle(); + bundles[0] = new Bundle(bundles[0], cluster); + bundles[0].generateUniqueBundle(this); + bundles[0].setInputFeedDataPath(feedInputPath); + clusterName = bundles[0].getClusterNames().get(0); + ServiceResponse response = + prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0)); + AssertUtil.assertSucceeded(response); + + startTime = TimeUtil.getTimeWrtSystemTime(-10); + endTime = TimeUtil.addMinsToTime(startTime, 20); + noOfFeeds=3; + + LOGGER.info("Time range between : " + startTime + " and " + endTime); + final String oldFeedName = bundles[0].getInputFeedNameFromBundle(); + slaFeedFrequencies = Arrays.asList(new Frequency("1", Frequency.TimeUnit.minutes), + new Frequency("2", Frequency.TimeUnit.minutes), + new Frequency("4", Frequency.TimeUnit.minutes)); + + slaFeedNames = Arrays.asList(oldFeedName + "-1", oldFeedName + "-2", oldFeedName + "-3"); + + //Submit 3 feeds with different frequencies and sla values. + for (int bIndex = 0; bIndex < noOfFeeds; ++bIndex) { + final FeedMerlin ipFeed = new FeedMerlin(bundles[0].getInputFeedFromBundle()); + + ipFeed.setValidity(startTime, endTime); + ipFeed.setAvailabilityFlag("_SUCCESS"); + + //set slaLow and slaHigh + ipFeed.setSla(new Frequency("1", Frequency.TimeUnit.minutes), + new Frequency("2", Frequency.TimeUnit.minutes)); + ipFeed.setName(slaFeedNames.get(bIndex)); + ipFeed.setFrequency(slaFeedFrequencies.get(bIndex)); + + LOGGER.info("Feed is : " + ipFeed.toString()); + + AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(ipFeed.toString())); + } + } + + @AfterMethod(alwaysRun = true) + public void tearDown() throws IOException { + cleanTestsDirs(); + removeTestClassEntities(); + } + + /** + * The following test submits 3 feeds, checks the slaAlert for a given time range and validates its output. + * It also checks the sla status when feed is deleted , data created with/without _SUCCESS folder. 
+     * @throws Exception
+     */
+    @Test
+    public void feedSLATest() throws Exception {
+        /**TEST : Check sla response for a given time range
+         */
+
+        statusCheckFrequency=1*60; // 60 seconds
+
+        // Map of instanceDate and corresponding list of SchedulableEntityInstance
+        Map<String, List<SchedulableEntityInstance>> instanceEntityMap = new HashMap<>();
+
+        slaStartTime = startTime;
+        slaEndTime = TimeUtil.addMinsToTime(slaStartTime, 10);
+        DateTime slaStartDate = TimeUtil.oozieDateToDate(slaStartTime);
+        DateTime slaEndDate = TimeUtil.oozieDateToDate(slaEndTime);
+
+        List<SchedulableEntityInstance> expectedInstances = new ArrayList<>();
+        SchedulableEntityInstance expectedSchedulableEntityInstance;
+
+        for (int index = 0; index < noOfFeeds; ++index) {
+
+            DateTime dt = new DateTime(slaStartDate);
+            while (!dt.isAfter(slaEndDate)) {
+
+                expectedSchedulableEntityInstance = new SchedulableEntityInstance(slaFeedNames.get(index),
+                    clusterName, dt.toDate(), EntityType.FEED);
+                expectedSchedulableEntityInstance.setTags("Missed SLA High");
+                expectedInstances.add(expectedSchedulableEntityInstance);
+
+                if (!instanceEntityMap.containsKey(dt.toString())) {
+                    instanceEntityMap.put(dt.toString(), new ArrayList<SchedulableEntityInstance>());
+                }
+                instanceEntityMap.get(dt.toString()).add(expectedSchedulableEntityInstance);
+                dt = dt.plusMinutes(slaFeedFrequencies.get(index).getFrequencyAsInt());
+
+            }
+        }
+
+        TimeUtil.sleepSeconds(statusCheckFrequency);
+
+        SchedulableEntityInstanceResult response = prism.getFeedHelper().getSlaAlert(
+            "?start=" + slaStartTime + "&end=" + slaEndTime).getSlaResult();
+
+        LOGGER.info(response.getMessage());
+
+        validateInstances(response, expectedInstances);
+
+        /**TEST : Create missing dependencies with _SUCCESS directory and check sla response
+         */
+
+        String dateEntry = (String) instanceEntityMap.keySet().toArray()[1];
+        System.out.println(dateEntry + "/" + instanceEntityMap.get(dateEntry));
+        List<String> dataDates = InstanceUtil.getMinuteDatesToPath(dateEntry, dateEntry, 0);
+
+        HadoopUtil.createFolders(clusterFS, baseTestHDFSDir + "/input/", dataDates);
+
+        //sla response for feeds when _SUCCESS file is missing from dataPath
+        response = prism.getFeedHelper().getSlaAlert("?start=" + slaStartTime + "&end=" + slaEndTime).getSlaResult();
+
+        // Response does not change as it checks for _SUCCESS file
+        validateInstances(response, expectedInstances);
+
+        //Create _SUCCESS file
+        HadoopUtil.recreateDir(clusterFS, baseTestHDFSDir + "/input/" + dataDates.get(0) + "/_SUCCESS");
+        for (SchedulableEntityInstance instance : instanceEntityMap.get(dateEntry)) {
+            expectedInstances.remove(instance);
+        }
+        instanceEntityMap.remove(dateEntry);
+
+        TimeUtil.sleepSeconds(statusCheckFrequency);
+
+        //sla response for feeds when _SUCCESS file is available in dataPath
+        response = prism.getFeedHelper().getSlaAlert("?start=" + slaStartTime + "&end=" + slaEndTime).getSlaResult();
+        validateInstances(response, expectedInstances);
+
+        /** TEST : Delete feed and check sla response
+         */
+        String deletedFeed = slaFeedNames.get(0);
+        prism.getFeedHelper().deleteByName(deletedFeed, null);
+
+        for (Map.Entry<String, List<SchedulableEntityInstance>> entry : instanceEntityMap.entrySet())
+        {
+            System.out.println(entry.getKey() + "/" + entry.getValue());
+            for (SchedulableEntityInstance instance : entry.getValue()) {
+                if (instance.getEntityName().equals(deletedFeed)) {
+                    expectedInstances.remove(instance);
+                }
+            }
+
+        }
+        TimeUtil.sleepSeconds(statusCheckFrequency);
+        response = prism.getFeedHelper().getSlaAlert("?start=" + slaStartTime + "&end=" + slaEndTime).getSlaResult();
+        validateInstances(response, expectedInstances);
+
+    }
+
+    /**
+     * Validating expected response with actual response.
+     * @param response SchedulableEntityInstanceResult response
+     * @param expectedInstances List of expected instances
+     */
+    private static void validateInstances(SchedulableEntityInstanceResult response,
+                                          List<SchedulableEntityInstance> expectedInstances) {
+
+        List<SchedulableEntityInstance> actualInstances = Arrays.asList(response.getInstances());
+
+        for (SchedulableEntityInstance instance : actualInstances) {
+            instance.setTags("Missed SLA High");
+        }
+
+        Collections.sort(expectedInstances, DEPENDENCY_COMPARATOR);
+        Collections.sort(actualInstances, DEPENDENCY_COMPARATOR);
+
+        Assert.assertEquals(actualInstances, expectedInstances, "Instances mismatch for");
+    }
+}
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java
index fe61cdf28..54e7805ff 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java
@@ -21,10 +21,13 @@
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.regression.core.helpers.ColoHelper;
 import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.*;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
 import org.apache.log4j.Logger;
-import org.apache.oozie.client.OozieClient;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;

From c6c6f650d32fc8d40f39795e402093507e7462ae Mon Sep 17 00:00:00 2001
From: Pragya
Date: Wed, 17 Feb 2016 10:08:52 +0000
Subject: [PATCH 3/3] Review comments addressed

---
 .../regression/SLA/FeedSLAMonitoringTest.java | 30 +++++-------------------------
 1 file changed, 5 insertions(+), 25 deletions(-)

diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/SLA/FeedSLAMonitoringTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/SLA/FeedSLAMonitoringTest.java
index fd26c71eb..fa1f8085f 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/SLA/FeedSLAMonitoringTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/SLA/FeedSLAMonitoringTest.java
@@ -56,7 +56,7 @@
  * *.feed.sla.statusCheck.frequency.seconds=60
  * *.feed.sla.lookAheadWindow.millis=60000
  */
-@Test(groups = "distributed")
+@Test(groups = { "distributed", "embedded" })
 public class FeedSLAMonitoringTest extends BaseTestClass {
 
     private ColoHelper cluster = servers.get(0);
@@ -79,27 +79,7 @@ public class FeedSLAMonitoringTest extends BaseTestClass {
         new Comparator<SchedulableEntityInstance>() {
             @Override
             public int compare(SchedulableEntityInstance o1, SchedulableEntityInstance o2) {
-                int tagDiff = o1.getTags().compareTo(o2.getTags());
-                if (tagDiff != 0) {
-                    return tagDiff;
-                }
-                int clusterDiff = o1.getCluster().compareTo(o2.getCluster());
-                if (clusterDiff != 0) {
-                    return clusterDiff;
-                }
-                int typeDiff = o1.getEntityType().compareTo(o2.getEntityType());
-                if (typeDiff != 0) {
-                    return typeDiff;
-                }
-                int nameDiff = o1.getEntityName().compareTo(o2.getEntityName());
-                if (nameDiff != 0) {
-                    return nameDiff;
-                }
-                int dateDiff = o1.getInstanceTime().compareTo(o2.getInstanceTime());
-                if (dateDiff != 0) {
-                    return dateDiff;
-                }
-                return 0;
+                return o1.compareTo(o2);
             }
         };
 
@@ -166,7 +146,7 @@ public void feedSLATest() throws Exception {
         /**TEST : Check sla response for a given time range
          */
 
-        statusCheckFrequency=1*60; // 60 seconds
+        statusCheckFrequency=60; // 60 seconds
 
         // Map of instanceDate and corresponding list of SchedulableEntityInstance
         Map<String, List<SchedulableEntityInstance>> instanceEntityMap = new HashMap<>();
@@ -211,7 +191,7 @@ public void feedSLATest() throws Exception {
          */
 
         String dateEntry = (String) instanceEntityMap.keySet().toArray()[1];
-        System.out.println(dateEntry + "/" + instanceEntityMap.get(dateEntry));
+        LOGGER.info(dateEntry + "/" + instanceEntityMap.get(dateEntry));
         List<String> dataDates = InstanceUtil.getMinuteDatesToPath(dateEntry, dateEntry, 0);
 
         HadoopUtil.createFolders(clusterFS, baseTestHDFSDir + "/input/", dataDates);
@@ -242,7 +222,7 @@ public void feedSLATest() throws Exception {
         for (Map.Entry<String, List<SchedulableEntityInstance>> entry : instanceEntityMap.entrySet())
         {
-            System.out.println(entry.getKey() + "/" + entry.getValue());
+            LOGGER.info(entry.getKey() + "/" + entry.getValue());
             for (SchedulableEntityInstance instance : entry.getValue()) {
                 if (instance.getEntityName().equals(deletedFeed)) {
                     expectedInstances.remove(instance);