From be4eb3de4010cc13cbde1e8616a1336a5c8ed447 Mon Sep 17 00:00:00 2001 From: TejasBhitle Date: Sun, 17 Dec 2023 11:55:56 -0500 Subject: [PATCH 1/6] Merge conflicts resolved --- .../controller/TransferController.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java index fcb2bce3..991c0441 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java @@ -12,6 +12,7 @@ import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; @@ -48,6 +49,15 @@ public class TransferController { @Autowired VfsExpander vfsExpander; + @Autowired + JobExplorer jobExplorer; + + Set jobIds; + public TransferController(Set jobIds){ + this.jobIds = jobIds; + } + + @RequestMapping(value = "/start", method = RequestMethod.POST) @Async public ResponseEntity start(@RequestBody TransferJobRequest request) throws Exception { @@ -62,5 +72,27 @@ public ResponseEntity start(@RequestBody TransferJobRequest request) thr JobExecution jobExecution = asyncJobLauncher.run(job, parameters); return ResponseEntity.status(HttpStatus.OK).body("Your batch job has been submitted with \n ID: " + jobExecution.getJobId()); } + + @RequestMapping(value = "/pause", method = RequestMethod.POST) + @Async + public ResponseEntity pause() throws Exception{ + logger.info("Pause Controller Entry point"); + Long runningJobId = 0L; + for(Long jobId : jobIds){ + JobExecution jobExecution = jobExplorer.getJobExecution(jobId); + if(jobExecution != null && jobExecution.isRunning()){ + runningJobId = jobId; + break; + } + } + + if(runningJobId == 0L){ + return ResponseEntity.status(HttpStatus.OK).body("No running job found"); + } + + + return ResponseEntity.status(HttpStatus.OK).body("Your batch job with id "+runningJobId+"has been paused"); + } + } From 815ce1bc5159111ab69f0f3f7dd7cc9d10ae64f5 Mon Sep 17 00:00:00 2001 From: TejasBhitle Date: Sun, 17 Dec 2023 12:42:16 -0500 Subject: [PATCH 2/6] Merge conflicts resolved --- .../controller/TransferController.java | 40 ++++++++++++++++++- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java index 991c0441..f65042da 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java @@ -53,6 +53,11 @@ public class TransferController { JobExplorer jobExplorer; Set jobIds; + + Long pausedJobId; + Integer pausedJobParallelism; + Integer pausedJobConcurrency; + public TransferController(Set jobIds){ this.jobIds = jobIds; } @@ -77,7 +82,7 @@ public ResponseEntity start(@RequestBody TransferJobRequest request) thr @Async public ResponseEntity pause() throws Exception{ logger.info("Pause Controller Entry point"); - Long runningJobId = 0L; + Long runningJobId = null; for(Long jobId : jobIds){ JobExecution jobExecution = jobExplorer.getJobExecution(jobId); if(jobExecution != null && jobExecution.isRunning()){ @@ -86,13 +91,44 @@ public ResponseEntity pause() throws Exception{ } } - if(runningJobId == 0L){ + if(runningJobId == null){ return ResponseEntity.status(HttpStatus.OK).body("No running job found"); } + pausedJobId = runningJobId; + + pausedJobParallelism = jc.getParallelismChunkListener().getParallelism(); + jc.getParallelismChunkListener().changeParallelism(0); + + pausedJobConcurrency = jc.getConcurrencyStepListener().getConcurrency(); + jc.getConcurrencyStepListener().changeConcurrency(0); return ResponseEntity.status(HttpStatus.OK).body("Your batch job with id "+runningJobId+"has been paused"); } + @RequestMapping(value = "/resume", method = RequestMethod.POST) + @Async + public ResponseEntity resume() throws Exception{ + logger.info("Pause Controller Entry point"); + if(pausedJobId == null){ + return ResponseEntity.status(HttpStatus.OK).body("No paused job found"); + } + + JobExecution jobExecution = jobExplorer.getJobExecution(pausedJobId); + if(jobExecution == null || !(jobExecution.getStatus().equals(BatchStatus.STOPPING) || jobExecution.getStatus().equals(BatchStatus.STOPPED))){ + return ResponseEntity.status(HttpStatus.OK).body("No paused job found with "+pausedJobId+" jobId"); + } + + jc.getParallelismChunkListener().changeParallelism(pausedJobParallelism); + jc.getConcurrencyStepListener().changeConcurrency(pausedJobConcurrency); + pausedJobParallelism = 0; + pausedJobConcurrency = 0; + + Long resumedJobId = pausedJobId; + pausedJobId = null; + + return ResponseEntity.status(HttpStatus.OK).body("Your batch job with id "+resumedJobId+"has been resumed"); + } + } From 27bca82e619cac246db349863dc1f401c1033b22 Mon Sep 17 00:00:00 2001 From: TejasBhitle Date: Tue, 28 Nov 2023 17:16:25 -0500 Subject: [PATCH 3/6] pause and resume API added --- .../odstransferservice/controller/TransferController.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java index f65042da..40ec748e 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java @@ -62,7 +62,6 @@ public TransferController(Set jobIds){ this.jobIds = jobIds; } - @RequestMapping(value = "/start", method = RequestMethod.POST) @Async public ResponseEntity start(@RequestBody TransferJobRequest request) throws Exception { @@ -75,6 +74,7 @@ public ResponseEntity start(@RequestBody TransferJobRequest request) thr jc.setRequest(request); Job job = jc.concurrentJobDefinition(); JobExecution jobExecution = asyncJobLauncher.run(job, parameters); + this.jobIds.add(jobExecution.getJobId()); return ResponseEntity.status(HttpStatus.OK).body("Your batch job has been submitted with \n ID: " + jobExecution.getJobId()); } @@ -82,7 +82,7 @@ public ResponseEntity start(@RequestBody TransferJobRequest request) thr @Async public ResponseEntity pause() throws Exception{ logger.info("Pause Controller Entry point"); - Long runningJobId = null; + Long runningJobId = 0L; for(Long jobId : jobIds){ JobExecution jobExecution = jobExplorer.getJobExecution(jobId); if(jobExecution != null && jobExecution.isRunning()){ @@ -91,7 +91,7 @@ public ResponseEntity pause() throws Exception{ } } - if(runningJobId == null){ + if(runningJobId == 0L){ return ResponseEntity.status(HttpStatus.OK).body("No running job found"); } From a6b3e7595e66ffa48608ea7038fe8095ebc02902 Mon Sep 17 00:00:00 2001 From: TejasBhitle Date: Wed, 29 Nov 2023 16:58:45 -0500 Subject: [PATCH 4/6] added logic to change concurrency and parallelism --- .../controller/TransferController.java | 14 ++++----- .../OdsTransferServiceApplicationTests.java | 30 +++++++++++++++++++ 2 files changed, 37 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java index 40ec748e..7db5ff43 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java @@ -8,12 +8,10 @@ import org.onedatashare.transferservice.odstransferservice.service.VfsExpander; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.batch.core.Job; -import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobParameters; -import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.*; import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.launch.JobOperator; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; @@ -52,6 +50,9 @@ public class TransferController { @Autowired JobExplorer jobExplorer; + @Autowired + JobOperator jobOperator; + Set jobIds; Long pausedJobId; @@ -82,7 +83,7 @@ public ResponseEntity start(@RequestBody TransferJobRequest request) thr @Async public ResponseEntity pause() throws Exception{ logger.info("Pause Controller Entry point"); - Long runningJobId = 0L; + Long runningJobId = null; for(Long jobId : jobIds){ JobExecution jobExecution = jobExplorer.getJobExecution(jobId); if(jobExecution != null && jobExecution.isRunning()){ @@ -91,7 +92,7 @@ public ResponseEntity pause() throws Exception{ } } - if(runningJobId == 0L){ + if(runningJobId == null){ return ResponseEntity.status(HttpStatus.OK).body("No running job found"); } @@ -126,7 +127,6 @@ public ResponseEntity resume() throws Exception{ Long resumedJobId = pausedJobId; pausedJobId = null; - return ResponseEntity.status(HttpStatus.OK).body("Your batch job with id "+resumedJobId+"has been resumed"); } diff --git a/src/test/java/org/onedatashare/transferservice/odstransferservice/OdsTransferServiceApplicationTests.java b/src/test/java/org/onedatashare/transferservice/odstransferservice/OdsTransferServiceApplicationTests.java index b10ee3cc..9427eee5 100644 --- a/src/test/java/org/onedatashare/transferservice/odstransferservice/OdsTransferServiceApplicationTests.java +++ b/src/test/java/org/onedatashare/transferservice/odstransferservice/OdsTransferServiceApplicationTests.java @@ -1,8 +1,18 @@ package org.onedatashare.transferservice.odstransferservice; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.onedatashare.transferservice.odstransferservice.controller.TransferController; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.launch.JobOperator; import org.springframework.boot.test.context.SpringBootTest; +import java.util.HashSet; +import java.util.Set; + +import static org.mockito.Mockito.*; + @SpringBootTest class OdsTransferServiceApplicationTests { @@ -10,4 +20,24 @@ class OdsTransferServiceApplicationTests { void contextLoads() { } + @Test + public void testPause() throws Exception { + // setup + long jobId = 5L; + Set jobIds = new HashSet<>(); + TransferController transferController = new TransferController(jobIds); + + JobExecution jobExecution = mock(JobExecution.class); + JobExplorer jobExplorer = mock(JobExplorer.class); + JobOperator jobOperator = mock(JobOperator.class); + Mockito.when(jobExplorer.getJobExecution(jobId)).thenReturn(jobExecution); + Mockito.when(jobExecution.isRunning()).thenReturn(Boolean.TRUE); + + //action + transferController.pause(); + + //assert + verify(jobOperator, times(1)).stop(jobId); + } + } From 854ed7c3be5913da90f361286e9f4afe93029dcd Mon Sep 17 00:00:00 2001 From: TejasBhitle Date: Sun, 17 Dec 2023 11:50:54 -0500 Subject: [PATCH 5/6] Code cleanup --- .../OdsTransferServiceApplicationTests.java | 38 ++----------------- 1 file changed, 4 insertions(+), 34 deletions(-) diff --git a/src/test/java/org/onedatashare/transferservice/odstransferservice/OdsTransferServiceApplicationTests.java b/src/test/java/org/onedatashare/transferservice/odstransferservice/OdsTransferServiceApplicationTests.java index 9427eee5..d6371fc7 100644 --- a/src/test/java/org/onedatashare/transferservice/odstransferservice/OdsTransferServiceApplicationTests.java +++ b/src/test/java/org/onedatashare/transferservice/odstransferservice/OdsTransferServiceApplicationTests.java @@ -1,43 +1,13 @@ package org.onedatashare.transferservice.odstransferservice; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; -import org.onedatashare.transferservice.odstransferservice.controller.TransferController; -import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.explore.JobExplorer; -import org.springframework.batch.core.launch.JobOperator; import org.springframework.boot.test.context.SpringBootTest; -import java.util.HashSet; -import java.util.Set; - -import static org.mockito.Mockito.*; - @SpringBootTest class OdsTransferServiceApplicationTests { - @Test - void contextLoads() { - } - - @Test - public void testPause() throws Exception { - // setup - long jobId = 5L; - Set jobIds = new HashSet<>(); - TransferController transferController = new TransferController(jobIds); - - JobExecution jobExecution = mock(JobExecution.class); - JobExplorer jobExplorer = mock(JobExplorer.class); - JobOperator jobOperator = mock(JobOperator.class); - Mockito.when(jobExplorer.getJobExecution(jobId)).thenReturn(jobExecution); - Mockito.when(jobExecution.isRunning()).thenReturn(Boolean.TRUE); - - //action - transferController.pause(); - - //assert - verify(jobOperator, times(1)).stop(jobId); - } + @Test + void contextLoads() { + } -} +} \ No newline at end of file From 28cbb8ae7f61a710e839497b6a04fa71f67091f2 Mon Sep 17 00:00:00 2001 From: TejasBhitle Date: Sun, 17 Dec 2023 12:50:51 -0500 Subject: [PATCH 6/6] Code cleanup --- .../controller/TransferController.java | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java index 7db5ff43..ac76457e 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/controller/TransferController.java @@ -56,8 +56,6 @@ public class TransferController { Set jobIds; Long pausedJobId; - Integer pausedJobParallelism; - Integer pausedJobConcurrency; public TransferController(Set jobIds){ this.jobIds = jobIds; @@ -97,12 +95,7 @@ public ResponseEntity pause() throws Exception{ } pausedJobId = runningJobId; - - pausedJobParallelism = jc.getParallelismChunkListener().getParallelism(); - jc.getParallelismChunkListener().changeParallelism(0); - - pausedJobConcurrency = jc.getConcurrencyStepListener().getConcurrency(); - jc.getConcurrencyStepListener().changeConcurrency(0); + jobOperator.stop(pausedJobId); return ResponseEntity.status(HttpStatus.OK).body("Your batch job with id "+runningJobId+"has been paused"); } @@ -120,10 +113,7 @@ public ResponseEntity resume() throws Exception{ return ResponseEntity.status(HttpStatus.OK).body("No paused job found with "+pausedJobId+" jobId"); } - jc.getParallelismChunkListener().changeParallelism(pausedJobParallelism); - jc.getConcurrencyStepListener().changeConcurrency(pausedJobConcurrency); - pausedJobParallelism = 0; - pausedJobConcurrency = 0; + jobOperator.restart(pausedJobId); Long resumedJobId = pausedJobId; pausedJobId = null;