diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java index a8aea527d..a7e3211ec 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java @@ -306,10 +306,6 @@ public void entityCommand(CommandLine commandLine, FalconClient client) throws I validateNotEmpty(filePath, FILE_PATH_OPT); validateColo(optionsList); result = client.validate(entityType, filePath, skipDryRun, doAsUser).getMessage(); - } else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) { - validateNotEmpty(filePath, "file"); - validateColo(optionsList); - result = client.submitAndSchedule(entityType, filePath, skipDryRun, doAsUser, userProps).getMessage(); } else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) { validateNotEmpty(entityName, FalconCLIConstants.ENTITY_NAME_OPT); colo = getColo(colo); diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconInstanceCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconInstanceCLI.java index cd2ade072..d27983f14 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconInstanceCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconInstanceCLI.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Set; +import static org.apache.falcon.client.FalconCLIConstants.LIB_OPT; import static org.apache.falcon.client.FalconCLIConstants.RUNNING_OPT; import static org.apache.falcon.client.FalconCLIConstants.RUNNING_OPT_DESCRIPTION; import static org.apache.falcon.client.FalconCLIConstants.LIST_OPT; @@ -53,7 +54,7 @@ import static org.apache.falcon.client.FalconCLIConstants.LOG_OPT; import static org.apache.falcon.client.FalconCLIConstants.LOG_OPT_DESCRIPTION; import static org.apache.falcon.client.FalconCLIConstants.PARARMS_OPT; -import static org.apache.falcon.client.FalconCLIConstants.PARARMS_OPT_DESCRIPTION; +import static org.apache.falcon.client.FalconCLIConstants.PARAMS_OPT_DESCRIPTION; import static org.apache.falcon.client.FalconCLIConstants.LISTING_OPT; import static org.apache.falcon.client.FalconCLIConstants.LISTING_OPT_DESCRIPTION; import static org.apache.falcon.client.FalconCLIConstants.DEPENDENCY_OPT_DESCRIPTION; @@ -69,8 +70,8 @@ import static org.apache.falcon.client.FalconCLIConstants.RUNID_OPT; import static org.apache.falcon.client.FalconCLIConstants.CLUSTERS_OPT; import static org.apache.falcon.client.FalconCLIConstants.CLUSTERS_OPT_DESCRIPTION; -import static org.apache.falcon.client.FalconCLIConstants.SOURCECLUSTER_OPT; -import static org.apache.falcon.client.FalconCLIConstants.SOURCECLUSTER_OPT_DESCRIPTION; +import static org.apache.falcon.client.FalconCLIConstants.SOURCE_CLUSTER_OPT; +import static org.apache.falcon.client.FalconCLIConstants.SOURCE_CLUSTER_OPT_DESCRIPTION; import static org.apache.falcon.client.FalconCLIConstants.FILE_PATH_OPT; import static org.apache.falcon.client.FalconCLIConstants.FILE_PATH_OPT_DESCRIPTION; import static org.apache.falcon.client.FalconCLIConstants.TYPE_OPT; @@ -130,7 +131,7 @@ public Options createInstanceOptions() { Option resume = new Option(RESUME_OPT, false, RESUME_OPT_DESCRIPTION); Option rerun = new Option(RERUN_OPT, false, RERUN_OPT_DESCRIPTION); Option logs = new Option(LOG_OPT, false, LOG_OPT_DESCRIPTION); - Option params = new Option(PARARMS_OPT, false, PARARMS_OPT_DESCRIPTION); + Option params = new Option(PARARMS_OPT, false, PARAMS_OPT_DESCRIPTION); Option listing = new Option(LISTING_OPT, false, LISTING_OPT_DESCRIPTION); Option dependency = new Option(DEPENDENCY_OPT, false, DEPENDENCY_OPT_DESCRIPTION); Option triage = new Option(TRIAGE_OPT, false, TRIAGE_OPT_DESCRIPTION); @@ -156,9 +157,9 @@ public Options createInstanceOptions() { Option url = new Option(URL_OPTION, true, URL_OPTION_DESCRIPTION); Option start = new Option(START_OPT, true, START_OPT_DESCRIPTION); Option end = new Option(END_OPT, true, END_OPT_DESCRIPTION); - Option runid = new Option(RUNID_OPT, true, RUNID_OPT_DESCRIPTION); + Option runId = new Option(RUNID_OPT, true, RUNID_OPT_DESCRIPTION); Option clusters = new Option(CLUSTERS_OPT, true, CLUSTERS_OPT_DESCRIPTION); - Option sourceClusters = new Option(SOURCECLUSTER_OPT, true, SOURCECLUSTER_OPT_DESCRIPTION); + Option sourceClusters = new Option(SOURCE_CLUSTER_OPT, true, SOURCE_CLUSTER_OPT_DESCRIPTION); Option filePath = new Option(FILE_PATH_OPT, true, FILE_PATH_OPT_DESCRIPTION); Option entityType = new Option(TYPE_OPT, true, TYPE_OPT_DESCRIPTION); Option entityName = new Option(ENTITY_NAME_OPT, true, ENTITY_NAME_OPT_DESCRIPTION); @@ -177,7 +178,7 @@ public Options createInstanceOptions() { Option allAttempts = new Option(ALL_ATTEMPTS, false, ALL_ATTEMPTS_DESCRIPTION); Option instanceStatus = new Option(FalconCLIConstants.INSTANCE_STATUS_OPT, true, "Instance status"); - Option nameSubsequence = new Option(FalconCLIConstants.NAMESEQ_OPT, true, "Subsequence of entity name"); + Option nameSubSequence = new Option(FalconCLIConstants.NAMESEQ_OPT, true, "SubSequence of entity name"); Option tagKeywords = new Option(FalconCLIConstants.TAGKEYS_OPT, true, "Keywords in tags"); instanceOptions.addOption(url); @@ -187,7 +188,7 @@ public Options createInstanceOptions() { instanceOptions.addOption(filePath); instanceOptions.addOption(entityType); instanceOptions.addOption(entityName); - instanceOptions.addOption(runid); + instanceOptions.addOption(runId); instanceOptions.addOption(clusters); instanceOptions.addOption(sourceClusters); instanceOptions.addOption(colo); @@ -202,7 +203,7 @@ public Options createInstanceOptions() { instanceOptions.addOption(debug); instanceOptions.addOption(instanceTime); instanceOptions.addOption(instanceStatus); - instanceOptions.addOption(nameSubsequence); + instanceOptions.addOption(nameSubSequence); instanceOptions.addOption(tagKeywords); instanceOptions.addOption(allAttempts); @@ -221,6 +222,7 @@ public void instanceCommand(CommandLine commandLine, FalconClient client) throws String instanceTime = commandLine.getOptionValue(INSTANCE_TIME_OPT); String start = commandLine.getOptionValue(FalconCLIConstants.START_OPT); String end = commandLine.getOptionValue(FalconCLIConstants.END_OPT); + String lib = commandLine.getOptionValue(FalconCLIConstants.LIB_OPT); String status = commandLine.getOptionValue(FalconCLIConstants.INSTANCE_STATUS_OPT); String nameSubsequence = commandLine.getOptionValue(FalconCLIConstants.NAMESEQ_OPT); String tagKeywords = commandLine.getOptionValue(FalconCLIConstants.TAGKEYS_OPT); @@ -228,7 +230,7 @@ public void instanceCommand(CommandLine commandLine, FalconClient client) throws String runId = commandLine.getOptionValue(RUNID_OPT); String colo = commandLine.getOptionValue(FalconCLIConstants.COLO_OPT); String clusters = commandLine.getOptionValue(CLUSTERS_OPT); - String sourceClusters = commandLine.getOptionValue(SOURCECLUSTER_OPT); + String sourceClusters = commandLine.getOptionValue(SOURCE_CLUSTER_OPT); List lifeCycles = getLifeCycle(commandLine.getOptionValue(LIFECYCLE_OPT)); String filterBy = commandLine.getOptionValue(FalconCLIConstants.FILTER_BY_OPT); String orderBy = commandLine.getOptionValue(FalconCLIConstants.ORDER_BY_OPT); @@ -244,6 +246,9 @@ public void instanceCommand(CommandLine commandLine, FalconClient client) throws if (!optionsList.contains(SEARCH_OPT)) { validateInstanceCommands(optionsList, entity, type, colo); } + if (optionsList.contains(LIB_OPT)) { + validateNotEmpty(lib, LIB_OPT); + } if (optionsList.contains(TRIAGE_OPT)) { validateNotEmpty(colo, FalconCLIConstants.COLO_OPT); @@ -300,7 +305,7 @@ public void instanceCommand(CommandLine commandLine, FalconClient client) throws isForced = true; } result = ResponseHelper.getString(client.rerunInstances(type, entity, start, end, filePath, colo, - clusters, sourceClusters, lifeCycles, isForced, doAsUser)); + clusters, sourceClusters, lifeCycles, isForced, doAsUser, lib)); } else if (optionsList.contains(LOG_OPT)) { ValidationUtil.validateOrderBy(orderBy, instanceAction); ValidationUtil.validateFilterBy(filterBy, instanceAction); @@ -341,7 +346,7 @@ private void validateInstanceCommands(Set optionsList, } } - if (optionsList.contains(SOURCECLUSTER_OPT)) { + if (optionsList.contains(SOURCE_CLUSTER_OPT)) { if (optionsList.contains(RUNNING_OPT) || optionsList.contains(LOG_OPT) || optionsList.contains(FalconCLIConstants.STATUS_OPT) diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index 5d6eff5f2..c0392a60c 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -323,13 +323,14 @@ public abstract InstancesResult resumeInstances(String type, String entity, Stri * process. * @param isForced can be used to forcefully rerun the entire instance. * @param doAsUser proxy user + * @param lib can be used to rerun the instance with comma separated lib paths. * @return Results of the rerun command. * @throws IOException */ public abstract InstancesResult rerunInstances(String type, String entity, String start, String end, String filePath, String colo, String clusters, String sourceClusters, List lifeCycles, Boolean isForced, - String doAsUser) throws IOException; + String doAsUser, String lib) throws IOException; /** * Get summary of instance/instances of an entity. diff --git a/client/src/main/java/org/apache/falcon/client/FalconCLIConstants.java b/client/src/main/java/org/apache/falcon/client/FalconCLIConstants.java index 04f15992c..23dd31f5c 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconCLIConstants.java +++ b/client/src/main/java/org/apache/falcon/client/FalconCLIConstants.java @@ -126,9 +126,11 @@ private FalconCLIConstants(){ + " process in the range start time and optional end time"; public static final String RERUN_OPT_DESCRIPTION = "Reruns process instances for a given process in the" + " range start time and optional end time and overrides properties present in job.properties file"; + public static final String LIB_OPT_DESCRIPTION = "List of comma separated lib paths to be used for the rerun " + + "of the given instances"; public static final String LOG_OPT_DESCRIPTION = "Logs print the logs for process instances for a given" + " process in the range start time and optional end time"; - public static final String PARARMS_OPT_DESCRIPTION = "Displays the workflow parameters for a given instance" + public static final String PARAMS_OPT_DESCRIPTION = "Displays the workflow parameters for a given instance" + " of specified nominal time start time represents nominal time and end time is not considered"; public static final String LISTING_OPT_DESCRIPTION = "Displays feed listing and their status between a" + " start and end time range."; @@ -145,7 +147,7 @@ private FalconCLIConstants(){ + "runid, defaults to 0"; public static final String CLUSTERS_OPT_DESCRIPTION = "clusters is optional for commands kill, suspend and " + "resume, should not be specified for other commands"; - public static final String SOURCECLUSTER_OPT_DESCRIPTION = " source cluster is optional for commands kill, " + public static final String SOURCE_CLUSTER_OPT_DESCRIPTION = " source cluster is optional for commands kill, " + "suspend and resume, should not be specified for other commands (required for only feed)"; public static final String FILE_PATH_OPT_DESCRIPTION = "Path to job.properties file is required for rerun " + "command, it should contain name=value pair for properties to override for rerun"; @@ -168,9 +170,10 @@ private FalconCLIConstants(){ public static final String RUNNING_OPT = "running"; public static final String KILL_OPT = "kill"; public static final String RERUN_OPT = "rerun"; + public static final String LIB_OPT = "lib"; public static final String LOG_OPT = "logs"; public static final String CLUSTERS_OPT = "clusters"; - public static final String SOURCECLUSTER_OPT = "sourceClusters"; + public static final String SOURCE_CLUSTER_OPT = "sourceClusters"; public static final String LIFECYCLE_OPT = "lifecycle"; public static final String PARARMS_OPT = "params"; public static final String LISTING_OPT = "listing"; diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 8f77faddb..cac7bff56 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -86,6 +86,7 @@ public class FalconClient extends AbstractFalconClient { public static final String PATH = "path"; + public static final String LIB = "lib"; public static final String COLO = "colo"; private static final String KEY = "key"; private static final String VALUE = "value"; @@ -632,7 +633,7 @@ public InstancesResult resumeInstances(String type, String entity, String start, public InstancesResult rerunInstances(String type, String entity, String start, String end, String filePath, String colo, String clusters, String sourceClusters, List lifeCycles, - Boolean isForced, String doAsUser) throws IOException { + Boolean isForced, String doAsUser, String lib) throws IOException { StringBuilder buffer = new StringBuilder(); if (filePath != null) { @@ -653,7 +654,7 @@ public InstancesResult rerunInstances(String type, String entity, String start, ClientResponse clientResponse = new ResourceBuilder().path(Instances.RERUN.path, type, entity) .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo) .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(FORCE, isForced) - .addQueryParam(USER, doAsUser).call(Instances.RERUN, props); + .addQueryParam(USER, doAsUser).addQueryParam(LIB, lib).call(Instances.RERUN, props); return getResponse(InstancesResult.class, clientResponse); } diff --git a/docs/src/site/twiki/falconcli/RerunInstance.twiki b/docs/src/site/twiki/falconcli/RerunInstance.twiki index aac844ca3..4f8ad0dc2 100644 --- a/docs/src/site/twiki/falconcli/RerunInstance.twiki +++ b/docs/src/site/twiki/falconcli/RerunInstance.twiki @@ -4,6 +4,7 @@ Rerun option is used to rerun instances of a given process. On issuing a rerun, by default the execution resumes from the last failed node in the workflow. This option is valid only for process instances in terminal state, i.e. SUCCEEDED, KILLED or FAILED. If one wants to forcefully rerun the entire workflow, -force should be passed along with -rerun +If one wants to pass specific lib paths to be used for rerunning specified instances, -lib should be passed along with -rerun passing comma separated lib paths with -lib option. Additionally, you can also specify properties to override via a properties file. Usage: diff --git a/docs/src/site/twiki/restapi/InstanceRerun.twiki b/docs/src/site/twiki/restapi/InstanceRerun.twiki index eef0e1a40..c10434d24 100644 --- a/docs/src/site/twiki/restapi/InstanceRerun.twiki +++ b/docs/src/site/twiki/restapi/InstanceRerun.twiki @@ -15,6 +15,7 @@ Rerun instances of an entity. On issuing a rerun, by default the execution resum * lifecycle can be Eviction/Replication(default) for feed and Execution(default) for process. * force can be used to forcefully rerun the entire instance. * doAs allows the current user to impersonate the user passed in doAs when interacting with the Falcon system. + * lib allows the user to rerun the instance with specified comma separated lib paths. ---++ Results Results of the rerun command. @@ -44,7 +45,7 @@ POST http://localhost:15000/api/instance/rerun/process/SampleProcess?colo=*&star -POST http://localhost:15000/api/instance/rerun/process/SampleProcess?colo=*&start=2013-04-03T07:00Z&end=2014-04-03T07:00Z&force=true&doAs=joe +POST http://localhost:15000/api/instance/rerun/process/SampleProcess?colo=*&start=2013-04-03T07:00Z&end=2014-04-03T07:00Z&force=true&doAs=joe&lib=lib1,lib2 ---+++ Result diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java index f86f09775..18828f4c8 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java @@ -52,6 +52,7 @@ import org.apache.falcon.util.DeploymentUtil; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; +import org.apache.oozie.client.OozieClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -908,15 +909,18 @@ private InstancesResult.WorkflowStatus getProcessInstanceStatus(Process process, public InstancesResult reRunInstance(String type, String entity, String startStr, String endStr, HttpServletRequest request, String colo, List lifeCycles, - Boolean isForced) { + Boolean isForced, String lib) { Properties props = getProperties(request); - return reRunInstance(type, entity, startStr, endStr, props, colo, lifeCycles, isForced); + return reRunInstance(type, entity, startStr, endStr, props, colo, lifeCycles, isForced, lib); } public InstancesResult reRunInstance(String type, String entity, String startStr, String endStr, Properties props, - String colo, List lifeCycles, Boolean isForced) { + String colo, List lifeCycles, Boolean isForced, String lib) { checkColo(colo); checkType(type); + if (StringUtils.isNotBlank(lib)) { + props.put(OozieClient.LIBPATH, lib); + } if (StartupProperties.isServerInSafeMode()) { throwSafemodeException("RERUN"); } diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java index 102392393..0445699e8 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java @@ -520,6 +520,7 @@ protected InstancesResult doExecute(String colo) throws FalconException { * @param lifeCycles can be Eviction/Replication(default) for feed and Execution(default) for * process. * @param isForced can be used to forcefully rerun the entire instance. + * @param lib can be used to pass specific lib paths to rerun the instance. * @return Results of the rerun command. */ @POST @@ -535,14 +536,15 @@ public InstancesResult reRunInstance( @Context HttpServletRequest request, @Dimension("colo") @QueryParam("colo") String colo, @Dimension("lifecycle") @QueryParam("lifecycle") final List lifeCycles, - @Dimension("force") @QueryParam("force") final Boolean isForced) { + @Dimension("force") @QueryParam("force") final Boolean isForced, + @Dimension("lib") @QueryParam("lib") final String lib) { final HttpServletRequest bufferedRequest = new BufferedRequest(request); return new InstanceProxy(InstancesResult.class) { @Override protected InstancesResult doExecute(String colo) throws FalconException { return getInstanceManager(colo).invoke("reRunInstance", - type, entity, startStr, endStr, bufferedRequest, colo, lifeCycles, isForced); + type, entity, startStr, endStr, bufferedRequest, colo, lifeCycles, isForced, lib); } }.execute(colo, type, entity); } diff --git a/shell/src/main/java/org/apache/falcon/shell/commands/FalconInstanceCommands.java b/shell/src/main/java/org/apache/falcon/shell/commands/FalconInstanceCommands.java index f20e20119..494eb6ae7 100644 --- a/shell/src/main/java/org/apache/falcon/shell/commands/FalconInstanceCommands.java +++ b/shell/src/main/java/org/apache/falcon/shell/commands/FalconInstanceCommands.java @@ -41,6 +41,8 @@ import static org.apache.falcon.client.FalconCLIConstants.FILE_PATH_OPT_DESCRIPTION; import static org.apache.falcon.client.FalconCLIConstants.FILTER_BY_OPT; import static org.apache.falcon.client.FalconCLIConstants.FILTER_BY_OPT_DESCRIPTION; +import static org.apache.falcon.client.FalconCLIConstants.LIB_OPT; +import static org.apache.falcon.client.FalconCLIConstants.LIB_OPT_DESCRIPTION; import static org.apache.falcon.client.FalconCLIConstants.LIST_OPT; import static org.apache.falcon.client.FalconCLIConstants.NUM_RESULTS_OPT; import static org.apache.falcon.client.FalconCLIConstants.NUM_RESULTS_OPT_DESCRIPTION; @@ -81,15 +83,15 @@ import static org.apache.falcon.client.FalconCLIConstants.LOG_OPT_DESCRIPTION; import static org.apache.falcon.client.FalconCLIConstants.PARARMS_OPT; -import static org.apache.falcon.client.FalconCLIConstants.PARARMS_OPT_DESCRIPTION; +import static org.apache.falcon.client.FalconCLIConstants.PARAMS_OPT_DESCRIPTION; import static org.apache.falcon.client.FalconCLIConstants.RERUN_OPT; import static org.apache.falcon.client.FalconCLIConstants.RERUN_OPT_DESCRIPTION; import static org.apache.falcon.client.FalconCLIConstants.RUNID_OPT; import static org.apache.falcon.client.FalconCLIConstants.RUNID_OPT_DESCRIPTION; import static org.apache.falcon.client.FalconCLIConstants.RUNNING_OPT; import static org.apache.falcon.client.FalconCLIConstants.RUNNING_OPT_DESCRIPTION; -import static org.apache.falcon.client.FalconCLIConstants.SOURCECLUSTER_OPT; -import static org.apache.falcon.client.FalconCLIConstants.SOURCECLUSTER_OPT_DESCRIPTION; +import static org.apache.falcon.client.FalconCLIConstants.SOURCE_CLUSTER_OPT; +import static org.apache.falcon.client.FalconCLIConstants.SOURCE_CLUSTER_OPT_DESCRIPTION; import static org.apache.falcon.client.FalconCLIConstants.TRIAGE_OPT; import static org.apache.falcon.client.FalconCLIConstants.TRIAGE_OPT_DESCRIPTION; import static org.apache.falcon.ValidationUtil.getLifeCycle; @@ -220,7 +222,7 @@ public String kill( @CliOption(key = {START_OPT}, mandatory = true, help = START_OPT_DESCRIPTION) final String start, @CliOption(key = {END_OPT}, mandatory = true, help = END_OPT_DESCRIPTION) final String end, @CliOption(key = {CLUSTERS_OPT}, mandatory = false, help = CLUSTERS_OPT_DESCRIPTION) final String clusters, - @CliOption(key = {SOURCECLUSTER_OPT}, mandatory = false, help = SOURCECLUSTER_OPT_DESCRIPTION) + @CliOption(key = {SOURCE_CLUSTER_OPT}, mandatory = false, help = SOURCE_CLUSTER_OPT_DESCRIPTION) final String sourceClusters, @CliOption(key = {LIFECYCLE_OPT}, mandatory = false, help = LIFECYCLE_OPT_DESCRIPTION) final String lifeCycle @@ -240,7 +242,7 @@ public String suspend( @CliOption(key = {START_OPT}, mandatory = true, help = START_OPT_DESCRIPTION) final String start, @CliOption(key = {END_OPT}, mandatory = true, help = END_OPT_DESCRIPTION) final String end, @CliOption(key = {CLUSTERS_OPT}, mandatory = false, help = CLUSTERS_OPT_DESCRIPTION) final String clusters, - @CliOption(key = {SOURCECLUSTER_OPT}, mandatory = false, help = SOURCECLUSTER_OPT_DESCRIPTION) + @CliOption(key = {SOURCE_CLUSTER_OPT}, mandatory = false, help = SOURCE_CLUSTER_OPT_DESCRIPTION) final String sourceClusters, @CliOption(key = {LIFECYCLE_OPT}, mandatory = false, help = LIFECYCLE_OPT_DESCRIPTION) final String lifeCycle @@ -260,7 +262,7 @@ public String resume( @CliOption(key = {START_OPT}, mandatory = true, help = START_OPT_DESCRIPTION) final String start, @CliOption(key = {END_OPT}, mandatory = true, help = END_OPT_DESCRIPTION) final String end, @CliOption(key = {CLUSTERS_OPT}, mandatory = false, help = CLUSTERS_OPT_DESCRIPTION) final String clusters, - @CliOption(key = {SOURCECLUSTER_OPT}, mandatory = false, help = SOURCECLUSTER_OPT_DESCRIPTION) + @CliOption(key = {SOURCE_CLUSTER_OPT}, mandatory = false, help = SOURCE_CLUSTER_OPT_DESCRIPTION) final String sourceClusters, @CliOption(key = {LIFECYCLE_OPT}, mandatory = false, help = LIFECYCLE_OPT_DESCRIPTION) final String lifeCycle @@ -282,16 +284,18 @@ public String rerun( @CliOption(key = {FILE_PATH_OPT}, mandatory = false, help = FILE_PATH_OPT_DESCRIPTION) final String filePath, @CliOption(key = {CLUSTERS_OPT}, mandatory = false, help = CLUSTERS_OPT_DESCRIPTION) final String clusters, - @CliOption(key = {SOURCECLUSTER_OPT}, mandatory = false, help = SOURCECLUSTER_OPT_DESCRIPTION) + @CliOption(key = {SOURCE_CLUSTER_OPT}, mandatory = false, help = SOURCE_CLUSTER_OPT_DESCRIPTION) final String sourceClusters, @CliOption(key = {LIFECYCLE_OPT}, mandatory = false, help = LIFECYCLE_OPT_DESCRIPTION) final String lifeCycle, @CliOption(key = {FORCE_RERUN_FLAG}, mandatory = false, specifiedDefaultValue = "true", - help = FORCE_RERUN_FLAG_DESCRIPTION) final Boolean forceRerun + help = FORCE_RERUN_FLAG_DESCRIPTION) final Boolean forceRerun, + @CliOption(key = {LIB_OPT}, mandatory = false, help = LIB_OPT_DESCRIPTION) + final String lib ) throws IOException { return ResponseHelper.getString(getFalconClient().rerunInstances(entityType.name(), entityName, start, end, filePath, getColo(colo), clusters, sourceClusters, getLifeCycle(lifeCycle), - forceRerun, getDoAs())); + forceRerun, getDoAs(), lib)); } @CliCommand(value = {INSTANCE_COMMAND_PREFIX + LOG_OPT}, @@ -326,7 +330,7 @@ entityName, start, end, getColo(colo), runId, getLifeCycle(lifeCycle), filterBy, } // RESUME CHECKSTYLE CHECK ParameterNumberCheck @CliCommand(value = {INSTANCE_COMMAND_PREFIX + PARARMS_OPT}, - help = PARARMS_OPT_DESCRIPTION) + help = PARAMS_OPT_DESCRIPTION) public String params( @CliOption(key = {TYPE_OPT}, mandatory = true, help = TYPE_OPT_DESCRIPTION) final EntityType entityType, @CliOption(key = {ENTITY_NAME_OPT}, mandatory = true, help = ENTITY_NAME_OPT_DESCRIPTION) diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 53073f0f9..88bc653bb 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -311,10 +311,10 @@ public InstancesResult resumeInstances(String type, String entity, String start, public InstancesResult rerunInstances(String type, String entity, String start, String end, String filePath, String colo, String clusters, String sourceClusters, - List lifeCycles, Boolean isForced, String doAsUser) throws - IOException { + List lifeCycles, Boolean isForced, String doAsUser, + String lib) throws IOException { Properties props = getProperties(clusters, sourceClusters); - return localInstanceManager.reRunInstance(type, entity, start, end, props, colo, lifeCycles, isForced); + return localInstanceManager.reRunInstance(type, entity, start, end, props, colo, lifeCycles, isForced, lib); } public InstancesSummaryResult getSummaryOfInstances(String type, String entity, String start, String end, diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java index bd42270aa..2ac755722 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java @@ -52,8 +52,8 @@ public InstancesResult resumeInstance(Properties properties, String type, String public InstancesResult reRunInstance(String type, String entity, String startStr, String endStr, Properties properties, String colo, List lifeCycles, - Boolean isForced) { - return super.reRunInstance(type, entity, startStr, endStr, properties, colo, lifeCycles, isForced); + Boolean isForced, String lib) { + return super.reRunInstance(type, entity, startStr, endStr, properties, colo, lifeCycles, isForced, lib); } public InstancesResult getStatusOfInstances(String type, String entity, String start, String end, diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 0bc775501..c82e513eb 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -31,6 +31,7 @@ import org.apache.falcon.resource.InstancesSummaryResult; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.oozie.client.OozieClient; import org.testng.Assert; import org.testng.annotations.Test; @@ -278,10 +279,19 @@ public void testProcessInstanceManagementAPI1() throws Exception { Assert.assertEquals(currentStatus, InstancesResult.WorkflowStatus.KILLED); getClient().rerunInstances(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, END_TIME, null, null, - CLUSTER_NAME, null, null, true, null); + CLUSTER_NAME, null, null, true, null, "testRerunLib"); waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.RUNNING); - currentStatus = getClient().getInstanceStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME); - Assert.assertEquals(currentStatus, InstancesResult.WorkflowStatus.RUNNING); + InstancesResult params = getClient().getParamsOfInstance(EntityType.PROCESS.name(), PROCESS_NAME, + SCHEDULE_TIME, null, null, null); + Assert.assertEquals(params.getInstances()[0].getStatus(), InstancesResult.WorkflowStatus.RUNNING); + String actualLib = null; + for (InstancesResult.KeyValuePair property : params.getInstances()[0].getWfParams()) { + if (property.getKey().equals(OozieClient.LIBPATH)) { + actualLib = property.getValue(); + break; + } + } + Assert.assertEquals(actualLib, "testRerunLib"); } @Test diff --git a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java index 5b47dc940..847d4be7c 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java @@ -299,9 +299,10 @@ public InstancesResult reRunInstance( @Context HttpServletRequest request, @Dimension("colo") @QueryParam("colo") String colo, @Dimension("lifecycle") @QueryParam("lifecycle") List lifeCycles, - @Dimension("force") @QueryParam("force") Boolean isForced) { + @Dimension("force") @QueryParam("force") Boolean isForced, + @Dimension("lib") @QueryParam("lib") String lib) { try { - return super.reRunInstance(type, entity, startStr, endStr, request, colo, lifeCycles, isForced); + return super.reRunInstance(type, entity, startStr, endStr, request, colo, lifeCycles, isForced, lib); } catch (Throwable throwable) { throw FalconWebException.newAPIException(throwable); } diff --git a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java index 00dbf7a96..37056c0ce 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java @@ -87,7 +87,7 @@ public void testKillAndRerunInstances() throws Exception { Assert.assertEquals(status, InstancesResult.WorkflowStatus.KILLED); result = falconUnitClient.rerunInstances(EntityType.PROCESS.toString(), - processName, START_INSTANCE, END_TIME, colo, null, null, null, null, true, null); + processName, START_INSTANCE, END_TIME, colo, null, null, null, null, true, null, null); assertStatus(result); waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, diff --git a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java index 37f8b9929..7b0cd9e94 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java @@ -191,7 +191,7 @@ public void testReRunInstances() throws Exception { assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.KILLED); context.getClient().rerunInstances(EntityType.PROCESS.name(), context.processName, - START_INSTANCE, endTime, null, context.colo, null, null, null, true, null); + START_INSTANCE, endTime, null, context.colo, null, null, null, true, null, null); waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); response = context.getClient().getStatusOfInstances(EntityType.PROCESS.name(),