diff --git a/app/controllers/ProjectEntryController.scala b/app/controllers/ProjectEntryController.scala index 819826f8..a4b6fa11 100644 --- a/app/controllers/ProjectEntryController.scala +++ b/app/controllers/ProjectEntryController.scala @@ -19,14 +19,14 @@ import services.RabbitMqSend.FixEvent import services.actors.Auditor import services.actors.creation.GenericCreationActor.{NewProjectRequest, ProjectCreateTransientData} import services.actors.creation.{CreationMessage, GenericCreationActor} -import services.{CreateOperation, UpdateOperation} +import services.{CreateOperation, UpdateOperation, ZipService} import slick.dbio.DBIOAction import slick.jdbc.PostgresProfile import slick.jdbc.PostgresProfile.api._ import slick.lifted.TableQuery -import java.io.File -import java.nio.file.Paths +import java.io.{BufferedInputStream, File, FileInputStream, PipedInputStream, PipedOutputStream} +import java.nio.file.{Files, Path, Paths} import java.time.ZonedDateTime import javax.inject.{Inject, Named, Singleton} import scala.concurrent.{Await, ExecutionContext, Future} @@ -38,20 +38,32 @@ import vidispine.{VSOnlineOutputMessage, VidispineCommunicator, VidispineConfig} import mes.OnlineOutputMessage import mess.InternalOnlineOutputMessage import akka.actor.ActorSystem -import akka.stream.Materializer +import akka.stream.{IOResult, Materializer} +import akka.stream.scaladsl.{Source, StreamConverters} +import akka.util.ByteString + +import java.io._ +import java.nio.file._ +import java.util.zip.{ZipEntry, ZipOutputStream} +import scala.concurrent.Future import java.util.concurrent.{Executors, TimeUnit} import de.geekonaut.slickmdc.MdcExecutionContext import services.RabbitMqSAN.SANEvent import com.om.mxs.client.japi.Vault -import akka.stream.scaladsl.{Keep, Sink, Source} +import akka.stream.scaladsl.{Concat, FileIO, Keep, Sink, Source, StreamConverters} +import akka.util.ByteString import mxscopy.streamcomponents.OMFastContentSearchSource import mxscopy.models.ObjectMatrixEntry import matrixstore.MatrixStoreEnvironmentConfigProvider import mxscopy.MXSConnectionBuilderImpl import mxscopy.MXSConnectionBuilder import services.RabbitMqMatrix.MatrixEvent + import java.util.Date import java.sql.Timestamp +import java.util.zip.{ZipEntry, ZipOutputStream} +import scala.compat.java8.StreamConverters.StreamHasToScala +import scala.jdk.CollectionConverters.IteratorHasAsScala @Singleton class ProjectEntryController @Inject() (@Named("project-creation-actor") projectCreationActor:ActorRef, @@ -64,14 +76,14 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project @Named("rabbitmq-san") rabbitMqSAN:ActorRef, @Named("rabbitmq-matrix") rabbitMqMatrix:ActorRef, @Named("auditor") auditor:ActorRef, - override val controllerComponents:ControllerComponents, override val bearerTokenAuth:BearerTokenAuth) + + override val controllerComponents:ControllerComponents, override val bearerTokenAuth:BearerTokenAuth, zipService: ZipService) (implicit fileEntryDAO:FileEntryDAO, injector: Injector, mat: Materializer) extends GenericDatabaseObjectControllerWithFilter[ProjectEntry,ProjectEntryFilterTerms] with ProjectEntrySerializer with ProjectRequestSerializer with ProjectEntryFilterTermsSerializer with UpdateTitleRequestSerializer with FileEntrySerializer with AssetFolderFileEntrySerializer - with Security -{ - override implicit val cache:SyncCacheApi = cacheImpl + with Security { + override implicit val cache: SyncCacheApi = cacheImpl val dbConfig = dbConfigProvider.get[PostgresProfile] implicit val implicitConfig = config @@ -80,22 +92,22 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project TableQuery[ProjectEntryRow].filter(_.id === requestedId).delete.asTry ) - override def selectid(requestedId: Int):Future[Try[Seq[ProjectEntry]]] = dbConfig.db.run( + override def selectid(requestedId: Int): Future[Try[Seq[ProjectEntry]]] = dbConfig.db.run( TableQuery[ProjectEntryRow].filter(_.id === requestedId).result.asTry ) - protected def selectVsid(vsid: String):Future[Try[Seq[ProjectEntry]]] = dbConfig.db.run( + protected def selectVsid(vsid: String): Future[Try[Seq[ProjectEntry]]] = dbConfig.db.run( TableQuery[ProjectEntryRow].filter(_.vidispineProjectId === vsid).result.asTry ) - override def dbupdate(itemId:Int, entry:ProjectEntry) :Future[Try[Int]] = { + override def dbupdate(itemId: Int, entry: ProjectEntry): Future[Try[Int]] = { logger.info(s"Updating project id ${itemId} and status ${entry.status}") val newRecord = entry.id match { - case Some(id)=>entry - case None=>entry.copy(id=Some(itemId)) + case Some(id) => entry + case None => entry.copy(id = Some(itemId)) } - dbConfig.db.run(TableQuery[ProjectEntryRow].filter(_.id===itemId).update(newRecord).asTry) + dbConfig.db.run(TableQuery[ProjectEntryRow].filter(_.id === itemId).update(newRecord).asTry) .map(rows => { sendToRabbitMq(UpdateOperation(), itemId, rabbitMqPropagator) rows @@ -104,9 +116,9 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project override def notifyRequested[T](requestedId: Int, username: String, request: Request[T]): Unit = { request.headers.get("User-Agent") match { - case None=> - case Some(userAgent)=> - if(userAgent.contains("Mozilla")) { //we are only interested in logging requests that came from a browser, otherwise the log would fill with the automated requests + case None => + case Some(userAgent) => + if (userAgent.contains("Mozilla")) { //we are only interested in logging requests that came from a browser, otherwise the log would fill with the automated requests auditor ! Auditor.LogEvent( username, AuditAction.ViewProjectPage, @@ -119,64 +131,68 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project } /** - * Fully generic container method to process an update request - * @param requestedId an ID to identify what should be updated, this is passed to `selector` - * @param selector a function that takes `requestedId` and returns a Future, containing a Try, containing a sequence of ProjectEntries - * that correspond to the provided ID - * @param f a function to perform the actual update. This is only called if selector returns a valid sequence of at least one ProjectEntry, - * and is called for each ProjectEntry in the sequence that `selector` returns. - * It should return a Future containing a Try containing the number of rows updated. - * @tparam T the data type of `requestedId` - * @return A Future containing a sequnce of results for each invokation of f. with either a Failure indicating why - * `f` was not called, or a Success with the result of `f` - */ - def doUpdateGenericSelector[T](requestedId:T, selector:T=>Future[Try[Seq[ProjectEntry]]])(f: ProjectEntry=>Future[Try[Int]]):Future[Seq[Try[Int]]] = selector(requestedId).flatMap({ - case Success(someSeq)=> - if(someSeq.isEmpty) - Future(Seq(Failure(new RecordNotFoundException(s"No records found for id $requestedId")))) - else - Future.sequence(someSeq.map(f)) - case Failure(error)=>Future(Seq(Failure(error))) + * Fully generic container method to process an update request + * + * @param requestedId an ID to identify what should be updated, this is passed to `selector` + * @param selector a function that takes `requestedId` and returns a Future, containing a Try, containing a sequence of ProjectEntries + * that correspond to the provided ID + * @param f a function to perform the actual update. This is only called if selector returns a valid sequence of at least one ProjectEntry, + * and is called for each ProjectEntry in the sequence that `selector` returns. + * It should return a Future containing a Try containing the number of rows updated. + * @tparam T the data type of `requestedId` + * @return A Future containing a sequnce of results for each invokation of f. with either a Failure indicating why + * `f` was not called, or a Success with the result of `f` + */ + def doUpdateGenericSelector[T](requestedId: T, selector: T => Future[Try[Seq[ProjectEntry]]])(f: ProjectEntry => Future[Try[Int]]): Future[Seq[Try[Int]]] = selector(requestedId).flatMap({ + case Success(someSeq) => + if (someSeq.isEmpty) + Future(Seq(Failure(new RecordNotFoundException(s"No records found for id $requestedId")))) + else + Future.sequence(someSeq.map(f)) + case Failure(error) => Future(Seq(Failure(error))) }) /** - * Most updates are done with the primary key, this is a convenience method to call [[doUpdateGenericSelector]] - * with the appropriate selector and data type for the primary key - * @param requestedId integer primary key value identifying what should be updated - * @param f a function to perform the actual update. See [[doUpdateGenericSelector]] for details - * @return see [[doUpdateGenericSelector]] - */ - def doUpdateGeneric(requestedId:Int)(f: ProjectEntry=>Future[Try[Int]]) = doUpdateGenericSelector[Int](requestedId,selectid)(f) + * Most updates are done with the primary key, this is a convenience method to call [[doUpdateGenericSelector]] + * with the appropriate selector and data type for the primary key + * + * @param requestedId integer primary key value identifying what should be updated + * @param f a function to perform the actual update. See [[doUpdateGenericSelector]] for details + * @return see [[doUpdateGenericSelector]] + */ + def doUpdateGeneric(requestedId: Int)(f: ProjectEntry => Future[Try[Int]]) = doUpdateGenericSelector[Int](requestedId, selectid)(f) /** - * Update the vidisipineId on a data record - * @param requestedId primary key of the record to update - * @param newVsid new vidispine ID. Note that this is an Option[String] as the id can be null - * @return a Future containing a Try containing an Int describing the number of records updated - */ - def doUpdateVsid(requestedId:Int, newVsid:Option[String]):Future[Seq[Try[Int]]] = doUpdateGeneric(requestedId){ record=> - val updatedProjectEntry = record.copy (vidispineProjectId = newVsid) - dbConfig.db.run ( - TableQuery[ProjectEntryRow].filter (_.id === requestedId).update (updatedProjectEntry).asTry - ) - .map(rows => { - sendToRabbitMq(UpdateOperation(), requestedId, rabbitMqPropagator) - rows - }) + * Update the vidisipineId on a data record + * + * @param requestedId primary key of the record to update + * @param newVsid new vidispine ID. Note that this is an Option[String] as the id can be null + * @return a Future containing a Try containing an Int describing the number of records updated + */ + def doUpdateVsid(requestedId: Int, newVsid: Option[String]): Future[Seq[Try[Int]]] = doUpdateGeneric(requestedId) { record => + val updatedProjectEntry = record.copy(vidispineProjectId = newVsid) + dbConfig.db.run( + TableQuery[ProjectEntryRow].filter(_.id === requestedId).update(updatedProjectEntry).asTry + ) + .map(rows => { + sendToRabbitMq(UpdateOperation(), requestedId, rabbitMqPropagator) + rows + }) } /** - * generic code for an endpoint to update the title - * @param requestedId identifier of the record to update - * @param updater function to perform the actual update. This is passed requestedId and a string to change the title to - * @tparam T type of @reqestedId - * @return a Future[Response] - */ - def genericUpdateTitleEndpoint[T](requestedId:T)(updater:(T,String)=>Future[Seq[Try[Int]]]) = IsAuthenticatedAsync(parse.json) {uid=>{request=> + * generic code for an endpoint to update the title + * + * @param requestedId identifier of the record to update + * @param updater function to perform the actual update. This is passed requestedId and a string to change the title to + * @tparam T type of @reqestedId + * @return a Future[Response] + */ + def genericUpdateTitleEndpoint[T](requestedId: T)(updater: (T, String) => Future[Seq[Try[Int]]]) = IsAuthenticatedAsync(parse.json) { uid => { request => request.body.validate[UpdateTitleRequest].fold( - errors=> - Future(BadRequest(Json.obj("status"->"error", "detail"->JsError.toJson(errors)))), - updateTitleRequest=> { + errors => + Future(BadRequest(Json.obj("status" -> "error", "detail" -> JsError.toJson(errors)))), + updateTitleRequest => { val results = updater(requestedId, updateTitleRequest.newTitle).map(_.partition(_.isSuccess)) results.map(resultTuple => { @@ -190,37 +206,40 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project }) } ) - }} + } + } /** - * endpoint to update project title field of record based on primary key - * @param requestedId - * @return - */ - def updateTitle(requestedId:Int) = genericUpdateTitleEndpoint[Int](requestedId) { (requestedId,newTitle)=> - doUpdateGeneric(requestedId) {record=> - val updatedProjectEntry = record.copy (projectTitle = newTitle) - dbConfig.db.run ( - TableQuery[ProjectEntryRow].filter (_.id === requestedId).update (updatedProjectEntry).asTry - ) - .map(rows => { - sendToRabbitMq(UpdateOperation(), requestedId, rabbitMqPropagator) - rows - }) + * endpoint to update project title field of record based on primary key + * + * @param requestedId + * @return + */ + def updateTitle(requestedId: Int) = genericUpdateTitleEndpoint[Int](requestedId) { (requestedId, newTitle) => + doUpdateGeneric(requestedId) { record => + val updatedProjectEntry = record.copy(projectTitle = newTitle) + dbConfig.db.run( + TableQuery[ProjectEntryRow].filter(_.id === requestedId).update(updatedProjectEntry).asTry + ) + .map(rows => { + sendToRabbitMq(UpdateOperation(), requestedId, rabbitMqPropagator) + rows + }) } } /** - * endoint to update project title field of record based on vidispine id - * @param vsid - * @return - */ - def updateTitleByVsid(vsid:String) = genericUpdateTitleEndpoint[String](vsid) { (vsid,newTitle)=> - doUpdateGenericSelector[String](vsid,selectVsid) { record=> //this lambda function is called once for each record + * endoint to update project title field of record based on vidispine id + * + * @param vsid + * @return + */ + def updateTitleByVsid(vsid: String) = genericUpdateTitleEndpoint[String](vsid) { (vsid, newTitle) => + doUpdateGenericSelector[String](vsid, selectVsid) { record => //this lambda function is called once for each record val updatedProjectEntry = record.copy(projectTitle = newTitle) dbConfig.db.run( - TableQuery[ProjectEntryRow].filter(_.id === record.id.get).update(updatedProjectEntry).asTry - ) + TableQuery[ProjectEntryRow].filter(_.id === record.id.get).update(updatedProjectEntry).asTry + ) .map(rows => { sendToRabbitMq(UpdateOperation(), record, rabbitMqPropagator) rows @@ -229,34 +248,35 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project } - def genericHandleFailures[T](failures:Seq[Try[Int]], requestedId:T) = { - val notFoundFailures = failures.filter(_.failed.get.getClass==classOf[RecordNotFoundException]) + def genericHandleFailures[T](failures: Seq[Try[Int]], requestedId: T) = { + val notFoundFailures = failures.filter(_.failed.get.getClass == classOf[RecordNotFoundException]) - if(notFoundFailures.length==failures.length) { + if (notFoundFailures.length == failures.length) { NotFound(Json.obj("status" -> "error", "detail" -> s"no records found for $requestedId")) } else { InternalServerError(Json.obj("status" -> "error", "detail" -> failures.map(_.failed.get.toString))) } } - def filesList(requestedId: Int, allVersions: Boolean) = IsAuthenticatedAsync {uid=>{request=> + def filesList(requestedId: Int, allVersions: Boolean) = IsAuthenticatedAsync { uid => { request => implicit val db = dbConfig.db selectid(requestedId).flatMap({ - case Failure(error)=> - logger.error(s"could not list files from project ${requestedId}",error) - Future(InternalServerError(Json.obj("status"->"error","detail"->error.toString))) - case Success(someSeq)=> + case Failure(error) => + logger.error(s"could not list files from project ${requestedId}", error) + Future(InternalServerError(Json.obj("status" -> "error", "detail" -> error.toString))) + case Success(someSeq) => someSeq.headOption match { //matching on pk, so can only be one result - case Some(projectEntry)=> - projectEntry.associatedFiles(allVersions).map(fileList=>Ok(Json.obj("status"->"ok","files"->fileList))) - case None=> - Future(NotFound(Json.obj("status"->"error","detail"->s"project $requestedId not found"))) + case Some(projectEntry) => + projectEntry.associatedFiles(allVersions).map(fileList => Ok(Json.obj("status" -> "ok", "files" -> fileList))) + case None => + Future(NotFound(Json.obj("status" -> "error", "detail" -> s"project $requestedId not found"))) } }) - }} + } + } - override def selectall(startAt:Int, limit:Int) = dbConfig.db.run( + override def selectall(startAt: Int, limit: Int) = dbConfig.db.run( TableQuery[ProjectEntryRow].length.result.zip( TableQuery[ProjectEntryRow].sortBy(_.created.desc).drop(startAt).take(limit).result ) @@ -274,93 +294,98 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project ).map(Success(_)).recover(Failure(_)) } - override def jstranslate(result: Seq[ProjectEntry]):Json.JsValueWrapper = result - override def jstranslate(result: ProjectEntry):Json.JsValueWrapper = result //implicit translation should handle this + override def jstranslate(result: Seq[ProjectEntry]): Json.JsValueWrapper = result + + override def jstranslate(result: ProjectEntry): Json.JsValueWrapper = result //implicit translation should handle this /*this is pointless because of the override of [[create]] below, so it should not get called, but is needed to conform to the [[GenericDatabaseObjectController]] protocol*/ - override def insert(entry: ProjectEntry,uid:String) = Future(Failure(new RuntimeException("ProjectEntryController::insert should not have been called"))) + override def insert(entry: ProjectEntry, uid: String) = Future(Failure(new RuntimeException("ProjectEntryController::insert should not have been called"))) - override def validate(request:Request[JsValue]) = request.body.validate[ProjectEntry] + override def validate(request: Request[JsValue]) = request.body.validate[ProjectEntry] override def validateFilterParams(request: Request[JsValue]): JsResult[ProjectEntryFilterTerms] = request.body.validate[ProjectEntryFilterTerms] private val vsidValidator = "^\\w{2}-\\d+$".r - def getByVsid(vsid:String) = IsAuthenticatedAsync { uid=> request=> - if(vsidValidator.matches(vsid)) { - dbConfig.db.run { - TableQuery[ProjectEntryRow].filter(_.vidispineProjectId===vsid).sortBy(_.created.desc).result - }.map(_.headOption match { - case Some(projectRecord)=> - Ok(Json.obj("status"->"ok","result"->projectRecord)) - case None=> - NotFound(Json.obj("status"->"notfound","detail"->"No project with that VSID")) - }).recover({ - case err:Throwable=> - logger.error(s"Could not look up VSID $vsid: ", err) - InternalServerError(Json.obj("status"->"error","detail"->"Database error looking up record, see server logs")) - }) - } else { - Future(BadRequest(Json.obj("status"->"bad_request","detail"->"Malformed vidispine ID"))) - } + def getByVsid(vsid: String) = IsAuthenticatedAsync { uid => + request => + if (vsidValidator.matches(vsid)) { + dbConfig.db.run { + TableQuery[ProjectEntryRow].filter(_.vidispineProjectId === vsid).sortBy(_.created.desc).result + }.map(_.headOption match { + case Some(projectRecord) => + Ok(Json.obj("status" -> "ok", "result" -> projectRecord)) + case None => + NotFound(Json.obj("status" -> "notfound", "detail" -> "No project with that VSID")) + }).recover({ + case err: Throwable => + logger.error(s"Could not look up VSID $vsid: ", err) + InternalServerError(Json.obj("status" -> "error", "detail" -> "Database error looking up record, see server logs")) + }) + } else { + Future(BadRequest(Json.obj("status" -> "bad_request", "detail" -> "Malformed vidispine ID"))) + } } - def createFromFullRequest(rq:ProjectRequestFull) = { - implicit val timeout:akka.util.Timeout = 60.seconds + def createFromFullRequest(rq: ProjectRequestFull) = { + implicit val timeout: akka.util.Timeout = 60.seconds val initialData = ProjectCreateTransientData(None, None, None) - val msg = NewProjectRequest(rq,None,initialData) + val msg = NewProjectRequest(rq, None, initialData) (projectCreationActor ? msg).mapTo[CreationMessage].map({ - case GenericCreationActor.ProjectCreateSucceeded(succeededRequest, projectEntry)=> + case GenericCreationActor.ProjectCreateSucceeded(succeededRequest, projectEntry) => logger.info(s"Created new project: $projectEntry") sendToRabbitMq(CreateOperation(), projectEntry, rabbitMqPropagator) - Ok(Json.obj("status"->"ok","detail"->"created project", "projectId"->projectEntry.id.get)) - case GenericCreationActor.ProjectCreateFailed(failedRequest, error)=> + Ok(Json.obj("status" -> "ok", "detail" -> "created project", "projectId" -> projectEntry.id.get)) + case GenericCreationActor.ProjectCreateFailed(failedRequest, error) => logger.error("Could not create new project", error) - InternalServerError(Json.obj("status"->"error","detail"->error.toString)) + InternalServerError(Json.obj("status" -> "error", "detail" -> error.toString)) }) } - override def create = IsAuthenticatedAsync(parse.json) {uid=>{ request => + override def create = IsAuthenticatedAsync(parse.json) { uid => { request => implicit val db = dbConfig.db request.body.validate[ProjectRequest].fold( - errors=> - Future(BadRequest(Json.obj("status"->"error","detail"->JsError.toJson(errors)))), - projectRequest=> { - val fullRequestFuture=projectRequest.hydrate + errors => + Future(BadRequest(Json.obj("status" -> "error", "detail" -> JsError.toJson(errors)))), + projectRequest => { + val fullRequestFuture = projectRequest.hydrate fullRequestFuture.flatMap({ - case None=> - Future(BadRequest(Json.obj("status"->"error","detail"->"Invalid template or storage ID"))) - case Some(rq)=> + case None => + Future(BadRequest(Json.obj("status" -> "error", "detail" -> "Invalid template or storage ID"))) + case Some(rq) => createFromFullRequest(rq) }) }) - }} + } + } - def getDistinctOwnersList:Future[Try[Seq[String]]] = { + def getDistinctOwnersList: Future[Try[Seq[String]]] = { //work around distinctOn bug - https://github.com/slick/slick/issues/1712 dbConfig.db.run(sql"""select distinct(s_user) from "ProjectEntry" where s_user not like '%|%'""".as[String].asTry) } - def distinctOwners = IsAuthenticatedAsync {uid=>{request=> + def distinctOwners = IsAuthenticatedAsync { uid => { request => getDistinctOwnersList.map({ - case Success(ownerList)=> - Ok(Json.obj("status"->"ok","result"->ownerList)) - case Failure(error)=> + case Success(ownerList) => + Ok(Json.obj("status" -> "ok", "result" -> ownerList)) + case Failure(error) => logger.error("Could not look up distinct project owners: ", error) - InternalServerError(Json.obj("status"->"error","detail"->error.toString)) + InternalServerError(Json.obj("status" -> "error", "detail" -> error.toString)) }) - }} + } + } /** - * respond to CORS options requests for login from vaultdoor - * see https://developer.mozilla.org/en-US/docs/Glossary/Preflight_request - * @return - */ - def searchOptions = Action { request=> + * respond to CORS options requests for login from vaultdoor + * see https://developer.mozilla.org/en-US/docs/Glossary/Preflight_request + * + * @return + */ + def searchOptions = Action { request => AllowCORSFunctions.checkCorsOrigins(config, request) match { case Right(allowedOrigin) => val returnHeaders = Map( @@ -378,46 +403,47 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project } } - def projectWasOpened(id: Int): EssentialAction = IsAuthenticatedAsync { uid=> request => - import models.EntryStatusMapper._ + def projectWasOpened(id: Int): EssentialAction = IsAuthenticatedAsync { uid => + request => + import models.EntryStatusMapper._ - def updateProject() = TableQuery[ProjectEntryRow] - .filter(_.id === id) - .filter(_.status === EntryStatus.New) - .map(_.status) - .update(EntryStatus.InProduction) - .map(rows => { - if (rows > 0) { - sendToRabbitMq(UpdateOperation(), id, rabbitMqPropagator) - } - }) + def updateProject() = TableQuery[ProjectEntryRow] + .filter(_.id === id) + .filter(_.status === EntryStatus.New) + .map(_.status) + .update(EntryStatus.InProduction) + .map(rows => { + if (rows > 0) { + sendToRabbitMq(UpdateOperation(), id, rabbitMqPropagator) + } + }) - def updateCommission(commissionId: Option[Int]) = TableQuery[PlutoCommissionRow] - .filter(_.id === commissionId) - .filter(_.status === EntryStatus.New) - .map(_.status) - .update(EntryStatus.InProduction).flatMap(rows => { - if (rows > 0) { - TableQuery[PlutoCommissionRow].filter(_.id === commissionId).result.map({ - case Seq() => - logger.error(s"Failed to update commission, commission not updated: $commissionId") - throw new IllegalStateException(s"Failed to update commission, commission not updated: $commissionId") - case Seq(commission) => - val commissionsSerializer = new PlutoCommissionSerializer {} - implicit val commissionsWrites: Writes[PlutoCommission] = commissionsSerializer.plutoCommissionWrites - rabbitMqPropagator ! ChangeEvent(Seq(commissionsWrites.writes(commission)), getItemType(commission), UpdateOperation()) - case _ => - logger.error(s"Failed to update commission, multiple commissions updated: $commissionId") - throw new IllegalStateException(s"Failed to update commission, multiple commissions updated: $commissionId") + def updateCommission(commissionId: Option[Int]) = TableQuery[PlutoCommissionRow] + .filter(_.id === commissionId) + .filter(_.status === EntryStatus.New) + .map(_.status) + .update(EntryStatus.InProduction).flatMap(rows => { + if (rows > 0) { + TableQuery[PlutoCommissionRow].filter(_.id === commissionId).result.map({ + case Seq() => + logger.error(s"Failed to update commission, commission not updated: $commissionId") + throw new IllegalStateException(s"Failed to update commission, commission not updated: $commissionId") + case Seq(commission) => + val commissionsSerializer = new PlutoCommissionSerializer {} + implicit val commissionsWrites: Writes[PlutoCommission] = commissionsSerializer.plutoCommissionWrites + rabbitMqPropagator ! ChangeEvent(Seq(commissionsWrites.writes(commission)), getItemType(commission), UpdateOperation()) + case _ => + logger.error(s"Failed to update commission, multiple commissions updated: $commissionId") + throw new IllegalStateException(s"Failed to update commission, multiple commissions updated: $commissionId") + }) + } else { + DBIOAction.successful(()) + } }) - } else { - DBIOAction.successful(()) - } - }) - auditor ! Auditor.LogEvent(uid, AuditAction.OpenProject, id, ZonedDateTime.now(), request.headers.get("User-Agent")) + auditor ! Auditor.LogEvent(uid, AuditAction.OpenProject, id, ZonedDateTime.now(), request.headers.get("User-Agent")) - dbConfig.db.run( + dbConfig.db.run( TableQuery[ProjectEntryRow] .filter(_.id === id) .result @@ -432,14 +458,14 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project } acts }) - ).recover({ - case err: Throwable => - logger.error("Failed to mark project as opened", err) - InternalServerError(Json.obj("status" -> "error", "detail" -> "Failed to mark project as opened")) - }) + ).recover({ + case err: Throwable => + logger.error("Failed to mark project as opened", err) + InternalServerError(Json.obj("status" -> "error", "detail" -> "Failed to mark project as opened")) + }) } - private def updateStatusColumn(projectId:Int, newValue:EntryStatus.Value) = { + private def updateStatusColumn(projectId: Int, newValue: EntryStatus.Value) = { import EntryStatusMapper._ dbConfig.db.run { @@ -448,129 +474,137 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project } } - def updateStatus(projectId: Int) = IsAuthenticatedAsync(parse.json) {uid=> request=> - import PlutoCommissionStatusUpdateRequestSerializer._ - request.body.validate[PlutoCommissionStatusUpdateRequest].fold( - invalidErrs=> - Future(BadRequest(Json.obj("status"->"bad_request","detail"->JsError.toJson(invalidErrs)))), - requiredUpdate=> - updateStatusColumn(projectId, requiredUpdate.status).map(rowsUpdated=>{ - if(rowsUpdated==0){ - NotFound(Json.obj("status"->"not_found","detail"->s"No project with id $projectId")) - } else { - if(rowsUpdated>1) logger.error(s"Status update request for project $projectId returned $rowsUpdated rows updated, expected 1! This indicates a database problem") - auditor ! Auditor.LogEvent(uid, AuditAction.ChangeProjectStatus, projectId, ZonedDateTime.now, request.headers.get("User-Agent")) - sendToRabbitMq(UpdateOperation(), projectId, rabbitMqPropagator).foreach(_ => ()) - Ok(Json.obj("status"->"ok","detail"->"Project status updated")) - } - }).recover({ - case err:Throwable=> - logger.error(s"Could not update status of project $projectId to ${requiredUpdate.status}: ", err) - InternalServerError(Json.obj("status"->"db_error","detail"->"Database error, see logs for details")) - }) - ) + def updateStatus(projectId: Int) = IsAuthenticatedAsync(parse.json) { uid => + request => + import PlutoCommissionStatusUpdateRequestSerializer._ + request.body.validate[PlutoCommissionStatusUpdateRequest].fold( + invalidErrs => + Future(BadRequest(Json.obj("status" -> "bad_request", "detail" -> JsError.toJson(invalidErrs)))), + requiredUpdate => + updateStatusColumn(projectId, requiredUpdate.status).map(rowsUpdated => { + if (rowsUpdated == 0) { + NotFound(Json.obj("status" -> "not_found", "detail" -> s"No project with id $projectId")) + } else { + if (rowsUpdated > 1) logger.error(s"Status update request for project $projectId returned $rowsUpdated rows updated, expected 1! This indicates a database problem") + auditor ! Auditor.LogEvent(uid, AuditAction.ChangeProjectStatus, projectId, ZonedDateTime.now, request.headers.get("User-Agent")) + sendToRabbitMq(UpdateOperation(), projectId, rabbitMqPropagator).foreach(_ => ()) + Ok(Json.obj("status" -> "ok", "detail" -> "Project status updated")) + } + }).recover({ + case err: Throwable => + logger.error(s"Could not update status of project $projectId to ${requiredUpdate.status}: ", err) + InternalServerError(Json.obj("status" -> "db_error", "detail" -> "Database error, see logs for details")) + }) + ) } - def queryUsersForAutocomplete(prefix:String, limit:Option[Int]) = IsAuthenticatedAsync { uid=> request=> - implicit val db = dbConfig.db - implicit val ordering = Ordering.String - ProjectEntry.listUsers(prefix, limit.getOrElse(10)) - .map(results=>{ - Ok(Json.obj("status"->"ok","users"->results.sorted)) - }) - .recover({ - case err:Throwable=> - logger.error(s"Could not look up users with prefix $prefix and limit ${limit.getOrElse(10)}: ${err.getMessage}", err) - InternalServerError(Json.obj("status"->"db_error", "detail"->"Database error, see logs for details")) - }) + def queryUsersForAutocomplete(prefix: String, limit: Option[Int]) = IsAuthenticatedAsync { uid => + request => + implicit val db = dbConfig.db + implicit val ordering = Ordering.String + ProjectEntry.listUsers(prefix, limit.getOrElse(10)) + .map(results => { + Ok(Json.obj("status" -> "ok", "users" -> results.sorted)) + }) + .recover({ + case err: Throwable => + logger.error(s"Could not look up users with prefix $prefix and limit ${limit.getOrElse(10)}: ${err.getMessage}", err) + InternalServerError(Json.obj("status" -> "db_error", "detail" -> "Database error, see logs for details")) + }) } - def isUserKnown(uname:String) = IsAuthenticatedAsync { uid=> request=> - implicit val db = dbConfig.db + def isUserKnown(uname: String) = IsAuthenticatedAsync { uid => + request => + implicit val db = dbConfig.db - ProjectEntry.isUserKnown(uname) - .map(result=>Ok(Json.obj("status"->"ok", "known"->result))) - .recover(err=>{ - logger.error(s"Could not check if '$uname' is known: ${err.getMessage}", err) - InternalServerError(Json.obj("status"->"error", "detail"->"Database error, see logs for details")) - }) + ProjectEntry.isUserKnown(uname) + .map(result => Ok(Json.obj("status" -> "ok", "known" -> result))) + .recover(err => { + logger.error(s"Could not check if '$uname' is known: ${err.getMessage}", err) + InternalServerError(Json.obj("status" -> "error", "detail" -> "Database error, see logs for details")) + }) } object SortDirection extends Enumeration { val desc, asc = Value } - private def getSortDirection(directionString:String):Option[SortDirection.Value] = Try { SortDirection.withName(directionString) }.toOption + private def getSortDirection(directionString: String): Option[SortDirection.Value] = Try { + SortDirection.withName(directionString) + }.toOption - def obitsListSorted(name:Option[String], startAt:Int, limit:Int, sort: String, sortDirection: String) = IsAuthenticatedAsync { uid => request => - implicit val db = dbConfig.db + def obitsListSorted(name: Option[String], startAt: Int, limit: Int, sort: String, sortDirection: String) = IsAuthenticatedAsync { uid => + request => + implicit val db = dbConfig.db - val baseQuery = name match { - case None=> - TableQuery[ProjectEntryRow].filter(_.isObitProject.nonEmpty) - case Some(obitName)=> - TableQuery[ProjectEntryRow].filter(_.isObitProject.toLowerCase like s"%$obitName%") - } + val baseQuery = name match { + case None => + TableQuery[ProjectEntryRow].filter(_.isObitProject.nonEmpty) + case Some(obitName) => + TableQuery[ProjectEntryRow].filter(_.isObitProject.toLowerCase like s"%$obitName%") + } - val sortedQuery = (sort, getSortDirection(sortDirection).getOrElse(SortDirection.asc)) match { - case ("created", SortDirection.desc) => baseQuery.sortBy(_.created.desc) - case ("created", SortDirection.asc) => baseQuery.sortBy(_.created.asc) - case ("title", SortDirection.desc) => baseQuery.sortBy(_.projectTitle.desc) - case ("title", SortDirection.asc) => baseQuery.sortBy(_.projectTitle.asc) - case ("isObitProject", SortDirection.desc) => baseQuery.sortBy(_.isObitProject.desc) - case ("isObitProject", SortDirection.asc) => baseQuery.sortBy(_.isObitProject.asc) - case _ => - logger.warn(s"Sort field $sort was not recognised, ignoring.") - baseQuery - } + val sortedQuery = (sort, getSortDirection(sortDirection).getOrElse(SortDirection.asc)) match { + case ("created", SortDirection.desc) => baseQuery.sortBy(_.created.desc) + case ("created", SortDirection.asc) => baseQuery.sortBy(_.created.asc) + case ("title", SortDirection.desc) => baseQuery.sortBy(_.projectTitle.desc) + case ("title", SortDirection.asc) => baseQuery.sortBy(_.projectTitle.asc) + case ("isObitProject", SortDirection.desc) => baseQuery.sortBy(_.isObitProject.desc) + case ("isObitProject", SortDirection.asc) => baseQuery.sortBy(_.isObitProject.asc) + case _ => + logger.warn(s"Sort field $sort was not recognised, ignoring.") + baseQuery + } - db.run( - for { - content <- sortedQuery.drop(startAt).take(limit).result - count <- sortedQuery.length.result - } yield (content, count) - ) - .map(results=>Ok(Json.obj("status"->"ok","count"->results._2,"result"->jstranslate(results._1)))) - .recover({ - case err:Throwable=> - logger.error(s"Could not query database for obituaries: ${err.getMessage}", err) - InternalServerError(Json.obj("status"->"error", "detail"->"Database error, see logs for details")) - }) + db.run( + for { + content <- sortedQuery.drop(startAt).take(limit).result + count <- sortedQuery.length.result + } yield (content, count) + ) + .map(results => Ok(Json.obj("status" -> "ok", "count" -> results._2, "result" -> jstranslate(results._1)))) + .recover({ + case err: Throwable => + logger.error(s"Could not query database for obituaries: ${err.getMessage}", err) + InternalServerError(Json.obj("status" -> "error", "detail" -> "Database error, see logs for details")) + }) } /** - * Returns a JSON object containing a list of strings for names of valid obituaries startig with the given prefix. - * If no prefix is supplied, then everything is returned (up to the given limit) - * @param prefix optional prefix to limit the search to - * @param limit don't return more than this number of results - * @return - */ - def findAvailableObits(prefix:Option[String], limit:Int) = IsAuthenticatedAsync { uid=> request=> - implicit val db = dbConfig.db - implicit val ordering = Ordering.String - ProjectEntry.listObits(prefix.getOrElse(""), limit) - .map(results=>{ - Ok(Json.obj("status"->"ok","obitNames"->results.sorted)) - }) - .recover({ - case err:Throwable=> - logger.error(s"Could not look up obituaries with prefix $prefix and limit ${limit}: ${err.getMessage}", err) - InternalServerError(Json.obj("status"->"db_error", "detail"->"Database error, see logs for details")) - }) + * Returns a JSON object containing a list of strings for names of valid obituaries startig with the given prefix. + * If no prefix is supplied, then everything is returned (up to the given limit) + * + * @param prefix optional prefix to limit the search to + * @param limit don't return more than this number of results + * @return + */ + def findAvailableObits(prefix: Option[String], limit: Int) = IsAuthenticatedAsync { uid => + request => + implicit val db = dbConfig.db + implicit val ordering = Ordering.String + ProjectEntry.listObits(prefix.getOrElse(""), limit) + .map(results => { + Ok(Json.obj("status" -> "ok", "obitNames" -> results.sorted)) + }) + .recover({ + case err: Throwable => + logger.error(s"Could not look up obituaries with prefix $prefix and limit ${limit}: ${err.getMessage}", err) + InternalServerError(Json.obj("status" -> "db_error", "detail" -> "Database error, see logs for details")) + }) } - def assetFolderForProject(projectId:Int) = { + def assetFolderForProject(projectId: Int) = { implicit val db = dbConfig.db db.run( TableQuery[ProjectMetadataRow] - .filter(_.key===ProjectMetadata.ASSET_FOLDER_KEY) - .filter(_.projectRef===projectId) + .filter(_.key === ProjectMetadata.ASSET_FOLDER_KEY) + .filter(_.projectRef === projectId) .result - ).map(results=>{ + ).map(results => { val resultCount = results.length - if(resultCount==0){ + if (resultCount == 0) { logger.error("No asset folder registered under that project id.") - } else if(resultCount>1){ + } else if (resultCount > 1) { logger.warn(s"Multiple asset folders found for project $projectId: $results") } else { results.head.value.getOrElse("") @@ -581,12 +615,13 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project }) } - def fixPermissions(projectId: Int) = IsAuthenticatedAsync {uid=> request=> - val assetFolderString = Await.result(assetFolderForProject(projectId), Duration.Inf).toString - val fileName = Paths.get(assetFolderString).getFileName.toString - val parentDir = Paths.get(assetFolderString).getParent.toString - rabbitMqSend ! FixEvent(true,false,fileName,parentDir) - Future(Ok(Json.obj("status"->"ok","detail"->"Fix permissions run."))) + def fixPermissions(projectId: Int) = IsAuthenticatedAsync { uid => + request => + val assetFolderString = Await.result(assetFolderForProject(projectId), Duration.Inf).toString + val fileName = Paths.get(assetFolderString).getFileName.toString + val parentDir = Paths.get(assetFolderString).getParent.toString + rabbitMqSend ! FixEvent(true, false, fileName, parentDir) + Future(Ok(Json.obj("status" -> "ok", "detail" -> "Fix permissions run."))) } def deleteDataRunner(projectId: Int, delay: Int, pluto: Boolean, file: Boolean, backups: Boolean, pTR: Boolean, deliverables: Boolean, sAN: Boolean, matrix: Boolean, s3: Boolean, buckets: Array[String], bucketBooleans: Array[Boolean]): Unit = { @@ -601,18 +636,18 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project fileEntryDAO .deleteFromDisk(entry) .andThen(_ => fileEntryDAO.deleteRecord(entry)) - if(entry.filepath.endsWith(".cpr")) { + if (entry.filepath.endsWith(".cpr")) { db.run( TableQuery[ProjectMetadataRow] - .filter(_.key===ProjectMetadata.ASSET_FOLDER_KEY) - .filter(_.projectRef===projectId) + .filter(_.key === ProjectMetadata.ASSET_FOLDER_KEY) + .filter(_.projectRef === projectId) .result - ).map(results=>{ + ).map(results => { val resultCount = results.length - if(resultCount==0){ + if (resultCount == 0) { logger.info(s"No asset folder registered for that project id.") } else { - logger.info(s"Found the asset folder at: ${results.head.value.get} Attempting to delete any Cubase files present." ) + logger.info(s"Found the asset folder at: ${results.head.value.get} Attempting to delete any Cubase files present.") for { files <- Option(new File(results.head.value.get).listFiles) file <- files if file.getName.endsWith(".cpr") @@ -623,8 +658,8 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project logger.error(s"Could not look up asset folder for project id $projectId: ", err) }) } - }) - } + }) + } ) case Failure(error) => logger.error(s"Could not look up project entry for ${projectId}: ", error) @@ -658,11 +693,11 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project } } - val xtensionXtractor="^(.*)\\.([^.]+)$".r + val xtensionXtractor = "^(.*)\\.([^.]+)$".r - def removeProjectFileExtension(projectFileName:String) = projectFileName match { - case xtensionXtractor(barePath,_)=>barePath - case _=> + def removeProjectFileExtension(projectFileName: String) = projectFileName match { + case xtensionXtractor(barePath, _) => barePath + case _ => logger.warn(s"The project file '$projectFileName' does not appear to have a file extension") projectFileName } @@ -686,7 +721,7 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project case None => logger.info(s"Attempt at loading storage data failed.") } - case Failure(err)=> + case Failure(err) => logger.error(s"Attempt at loading storage data failed.", err) }) }) @@ -706,7 +741,7 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project def deleteS3() = Future { if (s3) { - for((bucket,i) <- buckets.view.zipWithIndex) { + for ((bucket, i) <- buckets.view.zipWithIndex) { if (bucketBooleans(i)) { val assetFolderString = Await.result(assetFolderForProject(projectId), Duration.Inf).toString logger.info(s"Asset folder for project: $assetFolderString") @@ -773,8 +808,8 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project Executors.newWorkStealingPool(10) ) ) - implicit lazy val actorSystem:ActorSystem = ActorSystem("pluto-core-delete", defaultExecutionContext=Some(executionContext)) - implicit lazy val mat:Materializer = Materializer(actorSystem) + implicit lazy val actorSystem: ActorSystem = ActorSystem("pluto-core-delete", defaultExecutionContext = Some(executionContext)) + implicit lazy val mat: Materializer = Materializer(actorSystem) implicit lazy val vidispineCommunicator = new VidispineCommunicator(vidispineConfig) val vidispineMethodOut = Await.result(onlineFilesByProject(vidispineCommunicator, projectId), 120.seconds) vidispineMethodOut.map(onlineOutputMessage => { @@ -794,10 +829,10 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project def nearlineFilesByProject(vault: Vault, projectId: String): Future[Seq[OnlineOutputMessage]] = { val sinkFactory = Sink.seq[OnlineOutputMessage] Source.fromGraph(new OMFastContentSearchSource(vault, - s"""GNM_PROJECT_ID:\"$projectId\"""", - Array("MXFS_PATH", "MXFS_FILENAME", "GNM_PROJECT_ID", "GNM_TYPE", "__mxs__length") - ) - ).filterNot(isBrandingMatrix) + s"""GNM_PROJECT_ID:\"$projectId\"""", + Array("MXFS_PATH", "MXFS_FILENAME", "GNM_PROJECT_ID", "GNM_TYPE", "__mxs__length") + ) + ).filterNot(isBrandingMatrix) .map(InternalOnlineOutputMessage.toOnlineOutputMessage) .toMat(sinkFactory)(Keep.right) .run() @@ -828,18 +863,18 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project implicit val db = dbConfig.db MatrixDeleteJobDAO.getOrCreate(projectId, "Started") lazy val matrixStoreConfig = new MatrixStoreEnvironmentConfigProvider().get() match { - case Left(err)=> + case Left(err) => logger.error(s"Could not initialise due to incorrect matrix-store config: $err") sys.exit(1) - case Right(config)=>config + case Right(config) => config } implicit lazy val executionContext = new MdcExecutionContext( ExecutionContext.fromExecutor( Executors.newWorkStealingPool(10) ) ) - implicit lazy val actorSystem:ActorSystem = ActorSystem("pluto-core-delete-matrix", defaultExecutionContext=Some(executionContext)) - implicit lazy val mat:Materializer = Materializer(actorSystem) + implicit lazy val actorSystem: ActorSystem = ActorSystem("pluto-core-delete-matrix", defaultExecutionContext = Some(executionContext)) + implicit lazy val mat: Materializer = Materializer(actorSystem) val connectionIdleTime = sys.env.getOrElse("CONNECTION_MAX_IDLE", "750").toInt implicit val matrixStore = new MXSConnectionBuilderImpl( hosts = matrixStoreConfig.hosts, @@ -930,53 +965,55 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project def deleteData(projectId: Int) = IsAdmin { uid => request => - logger.info(s"Got a delete data request for project ${projectId}.") - logger.info(s"Pluto value is: ${request.body.asJson.get("pluto")}") - logger.info(s"File value is: ${request.body.asJson.get("file")}") - logger.info(s"Backups value is: ${request.body.asJson.get("backups")}") - logger.info(s"PTR value is: ${request.body.asJson.get("PTR")}") - logger.info(s"Deliverables value is: ${request.body.asJson.get("deliverables")}") - logger.info(s"SAN value is: ${request.body.asJson.get("SAN")}") - logger.info(s"Matrix value is: ${request.body.asJson.get("matrix")}") - logger.info(s"S3 value is: ${request.body.asJson.get("S3")}") - logger.info(s"Buckets value is: ${request.body.asJson.get("buckets")}") - logger.info(s"Bucket Booleans value is: ${request.body.asJson.get("bucketBooleans")}") - deleteDataRunner(projectId, 0, request.body.asJson.get("pluto").toString().toBoolean, request.body.asJson.get("file").toString().toBoolean, request.body.asJson.get("backups").toString().toBoolean, request.body.asJson.get("PTR").toString().toBoolean, request.body.asJson.get("deliverables").toString().toBoolean, request.body.asJson.get("SAN").toString().toBoolean, request.body.asJson.get("matrix").toString().toBoolean, request.body.asJson.get("S3").toString().toBoolean, request.body.asJson.get("buckets").validate[Array[String]].get, request.body.asJson.get("bucketBooleans").validate[Array[Boolean]].get) - Ok(Json.obj("status"->"ok","detail"->"Delete data run.")) + logger.info(s"Got a delete data request for project ${projectId}.") + logger.info(s"Pluto value is: ${request.body.asJson.get("pluto")}") + logger.info(s"File value is: ${request.body.asJson.get("file")}") + logger.info(s"Backups value is: ${request.body.asJson.get("backups")}") + logger.info(s"PTR value is: ${request.body.asJson.get("PTR")}") + logger.info(s"Deliverables value is: ${request.body.asJson.get("deliverables")}") + logger.info(s"SAN value is: ${request.body.asJson.get("SAN")}") + logger.info(s"Matrix value is: ${request.body.asJson.get("matrix")}") + logger.info(s"S3 value is: ${request.body.asJson.get("S3")}") + logger.info(s"Buckets value is: ${request.body.asJson.get("buckets")}") + logger.info(s"Bucket Booleans value is: ${request.body.asJson.get("bucketBooleans")}") + deleteDataRunner(projectId, 0, request.body.asJson.get("pluto").toString().toBoolean, request.body.asJson.get("file").toString().toBoolean, request.body.asJson.get("backups").toString().toBoolean, request.body.asJson.get("PTR").toString().toBoolean, request.body.asJson.get("deliverables").toString().toBoolean, request.body.asJson.get("SAN").toString().toBoolean, request.body.asJson.get("matrix").toString().toBoolean, request.body.asJson.get("S3").toString().toBoolean, request.body.asJson.get("buckets").validate[Array[String]].get, request.body.asJson.get("bucketBooleans").validate[Array[Boolean]].get) + Ok(Json.obj("status" -> "ok", "detail" -> "Delete data run.")) } - def deleteJob(projectId: Int) = IsAdminAsync { uid => request => - dbConfig.db.run( - TableQuery[DeleteJob].filter(_.projectEntry===projectId).result - ).map(_.headOption match { - case Some(jobRecord)=> - Ok(Json.obj("status"->"ok","job_status"->jobRecord.status)) - case None=> - NotFound(Json.obj("status"->"notfound","detail"->s"No job with project id: $projectId")) - }).recover({ - case err:Throwable=> - logger.error(s"Could not look up project $projectId: ", err) - InternalServerError(Json.obj("status"->"error","detail"->"Database error looking up job, see server logs")) - }) + def deleteJob(projectId: Int) = IsAdminAsync { uid => + request => + dbConfig.db.run( + TableQuery[DeleteJob].filter(_.projectEntry === projectId).result + ).map(_.headOption match { + case Some(jobRecord) => + Ok(Json.obj("status" -> "ok", "job_status" -> jobRecord.status)) + case None => + NotFound(Json.obj("status" -> "notfound", "detail" -> s"No job with project id: $projectId")) + }).recover({ + case err: Throwable => + logger.error(s"Could not look up project $projectId: ", err) + InternalServerError(Json.obj("status" -> "error", "detail" -> "Database error looking up job, see server logs")) + }) } - def matrixDeleteJob(projectId: Int) = IsAdminAsync { uid => request => - dbConfig.db.run( - TableQuery[MatrixDeleteJob].filter(_.projectEntry===projectId).result - ).map(_.headOption match { - case Some(jobRecord)=> - Ok(Json.obj("status"->"ok","job_status"->jobRecord.status)) - case None=> - NotFound(Json.obj("status"->"notfound","detail"->s"No job with project id: $projectId")) - }).recover({ - case err:Throwable=> - logger.error(s"Could not look up project $projectId: ", err) - InternalServerError(Json.obj("status"->"error","detail"->"Database error looking up job, see server logs")) - }) + def matrixDeleteJob(projectId: Int) = IsAdminAsync { uid => + request => + dbConfig.db.run( + TableQuery[MatrixDeleteJob].filter(_.projectEntry === projectId).result + ).map(_.headOption match { + case Some(jobRecord) => + Ok(Json.obj("status" -> "ok", "job_status" -> jobRecord.status)) + case None => + NotFound(Json.obj("status" -> "notfound", "detail" -> s"No job with project id: $projectId")) + }).recover({ + case err: Throwable => + logger.error(s"Could not look up project $projectId: ", err) + InternalServerError(Json.obj("status" -> "error", "detail" -> "Database error looking up job, see server logs")) + }) } def getProjectsForCommission(commission: Int) = dbConfig.db.run( - TableQuery[ProjectEntryRow].filter(_.commission===commission).sortBy(_.created.desc).result + TableQuery[ProjectEntryRow].filter(_.commission === commission).sortBy(_.created.desc).result ).map(Success(_)).recover(Failure(_)) def deleteCommissionData(commissionId: Int) = IsAdmin { uid => @@ -997,7 +1034,7 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project implicit val db = dbConfig.db getProjectsForCommission(commissionId).map({ - case Success(result)=> + case Success(result) => result.map((project) => { logger.info(s"Found project ${project.id.get}.") deleteDataRunner(project.id.get, 400, request.body.asJson.get("pluto").toString().toBoolean, request.body.asJson.get("file").toString().toBoolean, request.body.asJson.get("backups").toString().toBoolean, request.body.asJson.get("PTR").toString().toBoolean, request.body.asJson.get("deliverables").toString().toBoolean, request.body.asJson.get("SAN").toString().toBoolean, request.body.asJson.get("matrix").toString().toBoolean, request.body.asJson.get("S3").toString().toBoolean, request.body.asJson.get("buckets").validate[Array[String]].get, request.body.asJson.get("bucketBooleans").validate[Array[Boolean]].get) @@ -1017,58 +1054,122 @@ class ProjectEntryController @Inject() (@Named("project-creation-actor") project logger.error(s"Could not look up commission entry for ${commissionId}: ") }) } - case Failure(error)=> + case Failure(error) => logger.error(error.toString) }) - Ok(Json.obj("status"->"ok","detail"->"Delete data run.")) + Ok(Json.obj("status" -> "ok", "detail" -> "Delete data run.")) } - def assetFolderFilesList(requestedId: Int, allVersions: Boolean) = IsAuthenticatedAsync {uid=>{request=> + def assetFolderFilesList(requestedId: Int, allVersions: Boolean) = IsAuthenticatedAsync { uid => { request => implicit val db = dbConfig.db selectid(requestedId).flatMap({ - case Failure(error)=> - logger.error(s"Could not list files from project ${requestedId}",error) - Future(InternalServerError(Json.obj("status"->"error","detail"->error.toString))) - case Success(someSeq)=> + case Failure(error) => + logger.error(s"Could not list files from project ${requestedId}", error) + Future(InternalServerError(Json.obj("status" -> "error", "detail" -> error.toString))) + case Success(someSeq) => someSeq.headOption match { //matching on pk, so can only be one result - case Some(projectEntry)=> - projectEntry.associatedAssetFolderFiles(allVersions, implicitConfig).map(fileList=>Ok(Json.obj("status"->"ok","files"->fileList))) - case None=> - Future(NotFound(Json.obj("status"->"error","detail"->s"project $requestedId not found"))) + case Some(projectEntry) => + projectEntry.associatedAssetFolderFiles(allVersions, implicitConfig).map(fileList => Ok(Json.obj("status" -> "ok", "files" -> fileList))) + case None => + Future(NotFound(Json.obj("status" -> "error", "detail" -> s"project $requestedId not found"))) } }) - }} + } + } + + def logAssetFolderContents(projectId: Int): Unit = { + val assetFolderPath = Await.result(assetFolderForProject(projectId), Duration.Inf).toString + + def logFilesInDirectory(directoryPath: String): Unit = { + val directory = new File(directoryPath) + val directoryContents = directory.listFiles() - def fileDownload(requestedId: Int) = IsAuthenticatedAsync {uid=>{request=> + directoryContents.foreach { item => + if (item.isFile) { + logger.info(s"File: ${item.getAbsolutePath}") + } else if (item.isDirectory) { + logger.info(s"Directory: ${item.getAbsolutePath}") + logFilesInDirectory(item.getAbsolutePath) + } + } + } + + Try(logFilesInDirectory(assetFolderPath)).recover { + case e: Exception => logger.error(s"Error while logging asset folder contents: ${e.getMessage}") + } + } + + def fileDownload(requestedId: Int): Action[AnyContent] = Action.async { request => + logger.info(s"Attempting to zip files for project ${requestedId}") + + lazy val vidispineConfig = VidispineConfig.fromEnvironment.toOption.get + implicit lazy val executionContext = new MdcExecutionContext( + ExecutionContext.fromExecutor( + Executors.newWorkStealingPool(10) + ) + ) + implicit lazy val actorSystem: ActorSystem = ActorSystem("pluto-core-delete", defaultExecutionContext = Some(executionContext)) + implicit lazy val mat: Materializer = Materializer(actorSystem) + implicit lazy val vidispineCommunicator = new VidispineCommunicator(vidispineConfig) + val vidispineMethodOut = Await.result(vidispineCommunicator.getFilesOfProject(requestedId), 120.seconds) + + vidispineMethodOut.map(_.filePath).map(filePath => { + logger.info(s"Vidispine file paths for project are: $filePath") + }) + + logger.info(s"Got a download request for project $requestedId") implicit val db = dbConfig.db + logAssetFolderContents(requestedId) - selectid(requestedId).flatMap({ - case Failure(error)=> - logger.error(s"Could not download file for project ${requestedId}",error) - Future(InternalServerError(Json.obj("status"->"error","detail"->error.toString))) - case Success(someSeq)=> - someSeq.headOption match { - case Some(projectEntry)=> + selectid(requestedId).flatMap { + case Failure(error) => + logger.error(s"Could not download file for project $requestedId", error) + Future.successful(InternalServerError(Json.obj("status" -> "error", "detail" -> error.toString))) + case Success(someSeq) => + someSeq.headOption match { + case Some(projectEntry) => val fileData = for { - f1 <- projectEntry.associatedFiles(false).map(fileList=>fileList(0)) + f1 <- projectEntry.associatedFiles(false).map(_.head) f2 <- f1.getFullPath } yield (f1, f2) - val (fileEntry, fullPath) = (fileData.map(_._1), fileData.map(_._2)) + val (fileEntryFuture, fullPathFuture) = (fileData.map(_._1), fileData.map(_._2)) + + val combinedFuture = for { + fileEntryData <- fileEntryFuture + fullPathData <- fullPathFuture + } yield (fileEntryData, fullPathData) - val fileEntryData = Await.result(fileEntry, Duration(10, TimeUnit.SECONDS)) - val fullPathData = Await.result(fullPath, Duration(10, TimeUnit.SECONDS)) + combinedFuture.flatMap { case (_, fullPathData) => + if (!Files.exists(Paths.get(fullPathData))) { + val errorMessage = s"Project file not found at path: $fullPathData" + logger.error(errorMessage) + Future.successful(InternalServerError(Json.obj("status" -> "error", "detail" -> errorMessage))) + } else { + val assetFolderPathFuture = assetFolderForProject(requestedId) + assetFolderPathFuture.flatMap { assetFolderPath => + val source: Source[ByteString, _] = zipService.zipProjectAndAssets(fullPathData, assetFolderPath.toString) + val response = Ok.sendEntity(HttpEntity.Streamed(source, None, Some("application/zip"))) + .withHeaders("Content-Disposition" -> s"""attachment; filename="archive.zip"""") + Future.successful(response).recover { + case ex: Exception => + logger.error("Error processing file download", ex) + InternalServerError(Json.obj("status" -> "error", "detail" -> "Error processing file download")) + } + } + } + }.recover { + case ex: Exception => + logger.error("Error processing file download", ex) + InternalServerError(Json.obj("status" -> "error", "detail" -> "Error processing file download")) + } - Future(Ok.sendFile( - content = new java.io.File(fullPathData), - fileName = _ => Some(fileEntryData.filepath) - )) - case None=> - Future(NotFound(Json.obj("status"->"error","detail"->s"Project $requestedId not found"))) + case None => + Future.successful(NotFound(Json.obj("status" -> "error", "detail" -> s"Project $requestedId not found"))) } - }) - }} + } + } } diff --git a/app/services/ZipService.scala b/app/services/ZipService.scala new file mode 100644 index 00000000..a4f3668c --- /dev/null +++ b/app/services/ZipService.scala @@ -0,0 +1,55 @@ +package services + +import java.io.File +import java.nio.file.{Files, Paths} +import java.util.zip.{ZipEntry, ZipOutputStream} +import scala.concurrent.Future +import akka.stream.scaladsl.Source +import akka.util.ByteString +import akka.stream.scaladsl.StreamConverters +import javax.inject.Inject + +class ZipService @Inject()() { + + def zipProjectAndAssets(projectFilePath: String, assetFolderPath: String): Source[ByteString, _] = { + import scala.concurrent.ExecutionContext.Implicits.global + StreamConverters.asOutputStream().mapMaterializedValue { os => + Future { + val zipOut = new ZipOutputStream(os) + try { + // Add project file to zip + val projectFile = Paths.get(projectFilePath) + val projectFileEntry = new ZipEntry(projectFile.getFileName.toString) + zipOut.putNextEntry(projectFileEntry) + Files.copy(projectFile, zipOut) + zipOut.closeEntry() + + // Add asset folder contents to zip + def addFolderToZip(folder: File, parentFolder: String): Unit = { + val files = folder.listFiles() + if (files != null && files.nonEmpty) { + files.foreach { file => + val entryName = parentFolder + "/" + file.getName + if (file.isDirectory) { + addFolderToZip(file, entryName) + } else { + zipOut.putNextEntry(new ZipEntry(entryName)) + Files.copy(file.toPath, zipOut) + zipOut.closeEntry() + } + } + } else { + zipOut.putNextEntry(new ZipEntry(parentFolder + "/")) + zipOut.closeEntry() + } + } + + val assetFolder = new File(assetFolderPath) + addFolderToZip(assetFolder, assetFolder.getName) + } finally { + zipOut.close() + } + } + } + } +} \ No newline at end of file diff --git a/build.sbt b/build.sbt index 8eac32e2..3a037006 100644 --- a/build.sbt +++ b/build.sbt @@ -94,6 +94,7 @@ val akkaVersion = "2.8.4" //messaging persistence and clustering libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-persistence" % akkaVersion, + "com.typesafe.akka" %% "akka-stream" % akkaVersion, "com.typesafe.akka" %% "akka-persistence-query" % akkaVersion, "com.github.dnvriend" %% "akka-persistence-jdbc" % "3.5.3", "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion, diff --git a/frontend/app/ProjectEntryList/ProjectEntryEditComponent.tsx b/frontend/app/ProjectEntryList/ProjectEntryEditComponent.tsx index d967dd05..9d29b727 100644 --- a/frontend/app/ProjectEntryList/ProjectEntryEditComponent.tsx +++ b/frontend/app/ProjectEntryList/ProjectEntryEditComponent.tsx @@ -24,7 +24,7 @@ import { updateProjectOpenedStatus, getSimpleProjectTypeData, getMissingFiles, - downloadProjectFile, + downloadProject, } from "./helpers"; import { SystemNotification, @@ -457,7 +457,7 @@ const ProjectEntryEditComponent: React.FC = ( variant="contained" onClick={async () => { try { - await downloadProjectFile(project.id); + await downloadProject(project.id); } catch (error) { SystemNotification.open( SystemNotifcationKind.Error, @@ -467,7 +467,7 @@ const ProjectEntryEditComponent: React.FC = ( } }} > - Download Project File + Download Project ) : null} diff --git a/frontend/app/ProjectEntryList/helpers.ts b/frontend/app/ProjectEntryList/helpers.ts index d9d193fd..6580d3b6 100644 --- a/frontend/app/ProjectEntryList/helpers.ts +++ b/frontend/app/ProjectEntryList/helpers.ts @@ -581,7 +581,7 @@ export const getMissingFiles = async (id: number): Promise => { } }; -export const downloadProjectFile = async (id: number) => { +export const downloadProject = async (id: number) => { const url = `${deploymentRootPath}${API_PROJECTS}/${id}/fileDownload`; const token = localStorage.getItem("pluto:access-token"); @@ -599,24 +599,31 @@ export const downloadProjectFile = async (id: number) => { let filename = ""; - fetch(url, newInit) - .then((response) => { - // @ts-ignore - filename = response.headers - .get("Content-Disposition") - .split('filename="')[1] - .split('";')[0]; + try { + const response = await fetch(url, newInit); - if (filename.substr(filename.length - 1)) { - filename = filename.slice(0, -1); - } + if (!response.ok) { + throw new Error( + `HTTP error! status: ${response.status} ${response.statusText}` + ); + } - return response.blob(); - }) - .then((blob) => { - saveAs(blob, filename); - }) - .catch((err) => { - console.log(err); - }); + // @ts-ignore + filename = response.headers + .get("Content-Disposition") + .split('filename="')[1] + .split('";')[0]; + + if (filename.substr(filename.length - 1)) { + filename = filename.slice(0, -1); + } + + const blob = await response.blob(); + saveAs(blob, filename); + } catch (err) { + console.error(err); + // Display the error to the user + // replace this with your notification or alert system + alert(`An error occurred while downloading the project: ${err.message}`); + } }; diff --git a/test/controllers/ProjectEntryControllerSpec.scala b/test/controllers/ProjectEntryControllerSpec.scala index 25a3c1f5..7645d00e 100644 --- a/test/controllers/ProjectEntryControllerSpec.scala +++ b/test/controllers/ProjectEntryControllerSpec.scala @@ -10,6 +10,7 @@ import play.api.db.slick.DatabaseConfigProvider import play.api.libs.json.JsValue import play.api.test.Helpers._ import play.api.test._ +import services.ZipService import slick.jdbc.JdbcProfile import scala.concurrent.Await @@ -248,4 +249,12 @@ class ProjectEntryControllerSpec extends Specification with utils.BuildMyApp wit resultList.length mustEqual 0 } } +// +// "ProjectEntryController fileDownload" should { +// "Return 200 OK for a valid file download request" in new WithApplication(buildApp) { +// val response = route(app, FakeRequest(GET, "/api/project/1/fileDownload").withSession("uid" -> "testuser")).get +// +// status(response) mustEqual OK +// } +// } } diff --git a/test/services/ZipServiceSpec.scala b/test/services/ZipServiceSpec.scala new file mode 100644 index 00000000..6074a1d3 --- /dev/null +++ b/test/services/ZipServiceSpec.scala @@ -0,0 +1,66 @@ +package services + +import akka.actor.ActorSystem +import akka.stream.Materializer +import org.specs2.mock.Mockito +import org.specs2.mutable.Specification + +import scala.concurrent.duration._ +import akka.stream.scaladsl._ +import akka.util.ByteString +import org.specs2.runner.sbtRun.env.executionContext + +import java.nio.file.{Files, Path} +import java.io._ +import java.util.zip.ZipInputStream +import scala.concurrent.{Await, ExecutionContext} +import scala.util.Try + +class ZipServiceSpec extends Specification with Mockito { + + implicit lazy val actorSystem: ActorSystem = ActorSystem("pluto-core-download", defaultExecutionContext = Some(executionContext)) + implicit lazy val mat: Materializer = Materializer(actorSystem) + + implicit val ec: ExecutionContext = ExecutionContext.Implicits.global + + val zipService = new ZipService() + + "ZipService" should { + + "zip project and assets correctly" in { + // Initialize projectFile and assetDir + val projectFile: Path = Files.createTempFile("project", ".pproj") + val assetDir: Path = Files.createTempDirectory("assets") + + // Write some content to the project file and asset directory + Files.write(projectFile, "Hello, project!".getBytes) + val assetFile = Files.createFile(assetDir.resolve("asset.mp4")) + Files.write(assetFile, "Hello, asset!".getBytes) + + // Call the method under test + val resultSource: Source[ByteString, _] = zipService.zipProjectAndAssets(projectFile.toString, assetDir.toString) + + // Collect the result into a byte array + val resultFuture = resultSource.runFold(ByteString.empty)(_ ++ _) + val resultBytes = Await.result(resultFuture, 5.seconds).toArray + + // Check that the result is a valid zip file containing the expected entries + val zipInputStream = new ZipInputStream(new ByteArrayInputStream(resultBytes)) + + val projectEntry = zipInputStream.getNextEntry + projectEntry.getName must beEqualTo(projectFile.getFileName.toString) + scala.io.Source.fromInputStream(zipInputStream).mkString must beEqualTo("Hello, project!") + + val assetEntry = zipInputStream.getNextEntry + assetEntry.getName must beEqualTo(assetDir.getFileName.toString + "/asset.mp4") + scala.io.Source.fromInputStream(zipInputStream).mkString must beEqualTo("Hello, asset!") + + // Cleanup + Try(Files.deleteIfExists(projectFile)) + Try(Files.deleteIfExists(assetFile)) + Try(Files.deleteIfExists(assetDir)) + + success + } + } +}