From f94c21bf2fab9c0252a5b2a156a3166643ff7fcf Mon Sep 17 00:00:00 2001 From: Venkata Sai Dheeraj Narayanabhatla Date: Mon, 8 May 2023 11:26:33 -0400 Subject: [PATCH 1/2] Implemented WebDAV reader and writer Implemented WebDAV reader and writer --- pom.xml | 5 ++ .../odstransferservice/Enum/EndpointType.java | 2 +- .../service/JobControl.java | 8 ++ .../service/step/webdav/WebDAVReader.java | 75 +++++++++++++++++++ .../service/step/webdav/WebDAVWriter.java | 71 ++++++++++++++++++ 5 files changed, 160 insertions(+), 1 deletion(-) create mode 100644 src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/webdav/WebDAVReader.java create mode 100644 src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/webdav/WebDAVWriter.java diff --git a/pom.xml b/pom.xml index ab72a72f..c36238ac 100644 --- a/pom.xml +++ b/pom.xml @@ -178,6 +178,11 @@ json 20211205 + + com.github.lookfirst + sardine + 5.10 + diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/Enum/EndpointType.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/Enum/EndpointType.java index 5dd3ca7f..72ba49aa 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/Enum/EndpointType.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/Enum/EndpointType.java @@ -1,5 +1,5 @@ package org.onedatashare.transferservice.odstransferservice.Enum; public enum EndpointType { - dropbox, gdrive, sftp, ftp, box, s3, gftp, http, vfs, scp + dropbox, gdrive, sftp, ftp, box, s3, gftp, http, vfs, scp, webdav } \ No newline at end of file diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java index 13d6092c..52fa437f 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java @@ -30,6 +30,8 @@ import org.onedatashare.transferservice.odstransferservice.service.step.sftp.SFTPWriter; import org.onedatashare.transferservice.odstransferservice.service.step.vfs.VfsReader; import org.onedatashare.transferservice.odstransferservice.service.step.vfs.VfsWriter; +import org.onedatashare.transferservice.odstransferservice.service.step.webdav.WebDAVReader; +import org.onedatashare.transferservice.odstransferservice.service.step.webdav.WebDAVWriter; import org.onedatashare.transferservice.odstransferservice.utility.ODSUtility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,6 +176,9 @@ protected AbstractItemCountingItemStreamItemReader getRightReader(End case gdrive: GDriveReader dDriveReader = new GDriveReader(request.getSource().getOauthSourceCredential(), fileInfo); return dDriveReader; + case webdav: + WebDAVReader webDAVReader = new WebDAVReader(request.getSource().getVfsSourceCredential(),fileInfo); + return webDAVReader; } return null; } @@ -227,6 +232,9 @@ protected ItemWriter getRightWriter(EndpointType type, EntityInfo fil writer.setPool(connectionBag.getGoogleDriveWriterPool()); return writer; } + case webdav: + WebDAVWriter writer = new WebDAVWriter(request.getDestination().getVfsDestCredential(),fileInfo,this.metricsCollector,this.influxCache); + return writer; } return null; } diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/webdav/WebDAVReader.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/webdav/WebDAVReader.java new file mode 100644 index 00000000..8d973def --- /dev/null +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/webdav/WebDAVReader.java @@ -0,0 +1,75 @@ +package org.onedatashare.transferservice.odstransferservice.service.step.webdav; + +import com.github.sardine.Sardine; +import com.github.sardine.SardineFactory; +import org.onedatashare.transferservice.odstransferservice.constant.ODSConstants; +import org.onedatashare.transferservice.odstransferservice.model.DataChunk; +import org.onedatashare.transferservice.odstransferservice.model.EntityInfo; +import org.onedatashare.transferservice.odstransferservice.model.FilePart; +import org.onedatashare.transferservice.odstransferservice.model.credential.AccountEndpointCredential; +import org.onedatashare.transferservice.odstransferservice.service.FilePartitioner; +import org.onedatashare.transferservice.odstransferservice.utility.ODSUtility; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.annotation.BeforeStep; +import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader; +import org.springframework.util.ClassUtils; + +import java.io.InputStream; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +public class WebDAVReader extends AbstractItemCountingItemStreamItemReader { + + Logger logger = LoggerFactory.getLogger(WebDAVReader.class); + private final EntityInfo fileInfo; + private final AccountEndpointCredential credential; + private final FilePartitioner filePartitioner; + private Sardine client; + + private String fileName; + private String uri; + + public WebDAVReader(AccountEndpointCredential credential, EntityInfo fileInfo){ + this.fileInfo = fileInfo; + this.filePartitioner = new FilePartitioner(fileInfo.getChunkSize()); + this.setName(ClassUtils.getShortName(WebDAVReader.class)); + this.credential = credential; + } + + @BeforeStep + public void beforeStep(StepExecution stepExecution){ + + this.filePartitioner.createParts(this.fileInfo.getSize(), this.fileInfo.getId()); + this.fileName = fileInfo.getId(); + this.uri = credential.getUri() + Paths.get(fileInfo.getPath()).toString(); + } + @Override + protected DataChunk doRead() throws Exception { + FilePart filePart = this.filePartitioner.nextPart(); + if (filePart == null) return null; + Map headers = new HashMap<>(); + headers.put(ODSConstants.RANGE,String.format(ODSConstants.byteRange,filePart.getStart(), filePart.getEnd())); + InputStream response = client.get(this.uri,headers); + byte[] body = response.readAllBytes(); + DataChunk chunk = ODSUtility.makeChunk(body.length, body, filePart.getStart(), Long.valueOf(filePart.getPartIdx()).intValue(), this.fileName); + logger.info(chunk.toString()); + return chunk; + } + + @Override + protected void doOpen() throws Exception { + if(this.credential!=null && this.credential.getUsername()!=null){ + client = SardineFactory.begin(this.credential.getUsername(),this.credential.getSecret()); + }else{ + client = SardineFactory.begin(); + } + + } + + @Override + protected void doClose() throws Exception { + client.shutdown(); + } +} diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/webdav/WebDAVWriter.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/webdav/WebDAVWriter.java new file mode 100644 index 00000000..5a963963 --- /dev/null +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/webdav/WebDAVWriter.java @@ -0,0 +1,71 @@ +package org.onedatashare.transferservice.odstransferservice.service.step.webdav; + +import com.github.sardine.Sardine; +import com.github.sardine.SardineFactory; +import org.apache.http.HttpHeaders; +import org.onedatashare.transferservice.odstransferservice.model.DataChunk; +import org.onedatashare.transferservice.odstransferservice.model.EntityInfo; +import org.onedatashare.transferservice.odstransferservice.model.credential.AccountEndpointCredential; +import org.onedatashare.transferservice.odstransferservice.service.InfluxCache; +import org.onedatashare.transferservice.odstransferservice.service.cron.MetricsCollector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.annotation.AfterStep; +import org.springframework.batch.core.annotation.BeforeWrite; +import org.springframework.batch.item.ItemWriter; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.net.URLConnection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +public class WebDAVWriter implements ItemWriter { + Logger logger = LoggerFactory.getLogger(WebDAVWriter.class); + private EntityInfo fileInfo; + private Sardine client; + private AccountEndpointCredential credential; + private String uri; + private String fileName; + + public WebDAVWriter(AccountEndpointCredential credential, EntityInfo fileInfo, MetricsCollector metricsCollector, InfluxCache influxCache) { + this.credential = credential; + this.fileInfo = fileInfo; + this.uri = credential.getUri(); + } + + @BeforeWrite + public void beforeWrite(List items){ + this.fileName = items.get(0).getFileName(); + if(client==null){ + if(this.credential.getUsername()!=null){ + logger.debug("Setting authentication credentials for WebDAV client"); + client= SardineFactory.begin(this.credential.getUsername(),this.credential.getSecret()); + }else{ + client= SardineFactory.begin(); + } + } + } + + + @Override + public void write(List items) throws Exception { + for(DataChunk item : items){ + Map headers = new HashMap<>(); + long bytesTo = item.getStartPosition()+item.getSize() - 1; + headers.put(HttpHeaders.CONTENT_RANGE,"bytes "+item.getStartPosition()+"-"+bytesTo+"/"+this.fileInfo.getSize()); + boolean fileExist = client.exists(this.uri+item.getFileName()); + if(fileExist==true){ + logger.debug("File already exists. Overriding the file"); + } + client.put(this.uri+item.getFileName(),new ByteArrayInputStream(item.getData()), URLConnection.guessContentTypeFromName(item.getFileName()), true); + } + } + + @AfterStep + public void afterStep() throws IOException { + client.shutdown(); + } +} From 2720b0a3f7ff81922fc222fb24320a5896c56bee Mon Sep 17 00:00:00 2001 From: Venkata Sai Dheeraj Narayanabhatla Date: Mon, 8 May 2023 11:28:55 -0400 Subject: [PATCH 2/2] Refactoring webdav writer Refactoring webdav writer --- .../odstransferservice/service/step/webdav/WebDAVWriter.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/webdav/WebDAVWriter.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/webdav/WebDAVWriter.java index 5a963963..3f8d9750 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/webdav/WebDAVWriter.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/webdav/WebDAVWriter.java @@ -53,9 +53,6 @@ public void beforeWrite(List items){ @Override public void write(List items) throws Exception { for(DataChunk item : items){ - Map headers = new HashMap<>(); - long bytesTo = item.getStartPosition()+item.getSize() - 1; - headers.put(HttpHeaders.CONTENT_RANGE,"bytes "+item.getStartPosition()+"-"+bytesTo+"/"+this.fileInfo.getSize()); boolean fileExist = client.exists(this.uri+item.getFileName()); if(fileExist==true){ logger.debug("File already exists. Overriding the file");