diff --git a/common-lib/src/main/scala/com/gu/mediaservice/lib/BaseStore.scala b/common-lib/src/main/scala/com/gu/mediaservice/lib/BaseStore.scala index 0aeb698ccc..5a0f2eb36d 100644 --- a/common-lib/src/main/scala/com/gu/mediaservice/lib/BaseStore.scala +++ b/common-lib/src/main/scala/com/gu/mediaservice/lib/BaseStore.scala @@ -1,7 +1,7 @@ package com.gu.mediaservice.lib import org.apache.pekko.actor.{Cancellable, Scheduler} -import com.gu.mediaservice.lib.aws.S3 +import com.gu.mediaservice.lib.aws.{S3, S3Bucket} import com.gu.mediaservice.lib.config.CommonConfig import com.gu.mediaservice.lib.logging.GridLogging import org.joda.time.DateTime @@ -14,7 +14,7 @@ import scala.concurrent.duration._ import scala.util.control.NonFatal -abstract class BaseStore[TStoreKey, TStoreVal](bucket: String, config: CommonConfig)(implicit ec: ExecutionContext) +abstract class BaseStore[TStoreKey, TStoreVal](bucket: S3Bucket, config: CommonConfig)(implicit ec: ExecutionContext) extends GridLogging { val s3 = new S3(config) @@ -25,15 +25,14 @@ abstract class BaseStore[TStoreKey, TStoreVal](bucket: String, config: CommonCon protected def getS3Object(key: String): Option[String] = s3.getObjectAsString(bucket, key) protected def getLatestS3Stream: Option[InputStream] = { - val objects = s3.client - .listObjects(bucket).getObjectSummaries.asScala + val objects = s3.listObjects(bucket).getObjectSummaries.asScala .filterNot(_.getKey == "AMAZON_SES_SETUP_NOTIFICATION") if (objects.nonEmpty) { val obj = objects.maxBy(_.getLastModified) logger.info(s"Latest key ${obj.getKey} in bucket $bucket") - val stream = s3.client.getObject(bucket, obj.getKey).getObjectContent + val stream = s3.getObject(bucket, obj).getObjectContent Some(stream) } else { logger.error(s"Bucket $bucket is empty") diff --git a/common-lib/src/main/scala/com/gu/mediaservice/lib/ImageIngestOperations.scala b/common-lib/src/main/scala/com/gu/mediaservice/lib/ImageIngestOperations.scala index 96c9412556..c158008cfd 100644 --- a/common-lib/src/main/scala/com/gu/mediaservice/lib/ImageIngestOperations.scala +++ b/common-lib/src/main/scala/com/gu/mediaservice/lib/ImageIngestOperations.scala @@ -1,10 +1,10 @@ package com.gu.mediaservice.lib -import com.amazonaws.services.s3.model.{DeleteObjectsRequest, MultiObjectDeleteException} +import com.amazonaws.services.s3.model.MultiObjectDeleteException import java.io.File import com.gu.mediaservice.lib.config.CommonConfig -import com.gu.mediaservice.lib.aws.S3Object +import com.gu.mediaservice.lib.aws.{S3Bucket, S3Object} import com.gu.mediaservice.lib.logging.LogMarker import com.gu.mediaservice.model.{Instance, MimeType, Png} import com.typesafe.scalalogging.StrictLogging @@ -21,7 +21,7 @@ object ImageIngestOperations { private def snippetForId(id: String) = id.take(6).mkString("/") + "/" + id } -class ImageIngestOperations(imageBucket: String, thumbnailBucket: String, config: CommonConfig, isVersionedS3: Boolean = false) +class ImageIngestOperations(imageBucket: S3Bucket, thumbnailBucket: S3Bucket, config: CommonConfig, isVersionedS3: Boolean = false) extends S3ImageStorage(config) with StrictLogging { import ImageIngestOperations.{fileKeyFromId, optimisedPngKeyFromId} @@ -44,7 +44,7 @@ class ImageIngestOperations(imageBucket: String, thumbnailBucket: String, config private def storeThumbnailImage(storableImage: StorableThumbImage) (implicit logMarker: LogMarker): Future[S3Object] = { val instanceSpecificKey = instanceAwareThumbnailImageKey(storableImage) - logger.info(s"Storing thumbnail to instance specific key: $thumbnailBucket / $instanceSpecificKey") + logger.info(s"Storing thumbnail to instance specific key: ${thumbnailBucket.bucket} / $instanceSpecificKey") storeImage(thumbnailBucket, instanceSpecificKey, storableImage.file, Some(storableImage.mimeType), overwrite = true) } @@ -57,26 +57,45 @@ class ImageIngestOperations(imageBucket: String, thumbnailBucket: String, config overwrite = true) } - private def bulkDelete(bucket: String, keys: List[String]): Future[Map[String, Boolean]] = keys match { + private def bulkDelete(bucket: S3Bucket, keys: List[String]): Future[Map[String, Boolean]] = keys match { case Nil => Future.successful(Map.empty) - case _ => Future { - try { - logger.info(s"Creating S3 bulkDelete request for $bucket / keys: " + keys.mkString(",")) - client.deleteObjects( - new DeleteObjectsRequest(bucket).withKeys(keys: _*) - ) - keys.map { key => - key -> true - }.toMap - } catch { - case partialFailure: MultiObjectDeleteException => - logger.warn(s"Partial failure when deleting images from $bucket: ${partialFailure.getMessage} ${partialFailure.getErrors}") - val errorKeys = partialFailure.getErrors.asScala.map(_.getKey).toSet + case _ => + val bulkDeleteImplemented = bucket.endpoint != "storage.googleapis.com" + if (bulkDeleteImplemented) { + Future { + try { + logger.info(s"Bulk deleting S3 objects from ${bucket.bucket}: " + keys.mkString(",")) + deleteObjects(bucket, keys) + keys.map { key => + key -> true + }.toMap + } catch { + case partialFailure: MultiObjectDeleteException => + logger.warn(s"Partial failure when deleting images from $bucket: ${partialFailure.getMessage} ${partialFailure.getErrors}") + val errorKeys = partialFailure.getErrors.asScala.map(_.getKey).toSet + keys.map { key => + key -> !errorKeys.contains(key) + }.toMap + } + } + + } else { + Future.sequence { keys.map { key => - key -> !errorKeys.contains(key) - }.toMap + Future { + logger.info(s"Deleting S3 objects from ${bucket.bucket}: " + key) + try { + deleteObject(bucket, key) + (key, true) + } catch { + case e: Exception => + logger.debug(s"Failure when deleting images from $bucket: $key, ${e.getMessage}") + (key, false) + } + } + } + }.map(_.toMap) } - } } def deleteOriginal(id: String)(implicit logMarker: LogMarker, instance: Instance): Future[Unit] = if(isVersionedS3) deleteVersionedImage(imageBucket, fileKeyFromId(id)) else deleteImage(imageBucket, fileKeyFromId(id)) @@ -87,7 +106,7 @@ class ImageIngestOperations(imageBucket: String, thumbnailBucket: String, config def deletePNGs(ids: Set[String])(implicit instance: Instance) = bulkDelete(imageBucket, ids.map(id => optimisedPngKeyFromId(id)).toList) def doesOriginalExist(id: String)(implicit instance: Instance): Boolean = - client.doesObjectExist(imageBucket, fileKeyFromId(id)) + doesObjectExist(imageBucket, fileKeyFromId(id)) private def instanceAwareOriginalImageKey(storableImage: StorableOriginalImage) = { fileKeyFromId(storableImage.id)(storableImage.instance) @@ -107,7 +126,7 @@ sealed trait ImageWrapper { val instance: Instance } sealed trait StorableImage extends ImageWrapper { - def toProjectedS3Object(thumbBucket: String): S3Object = S3Object( + def toProjectedS3Object(thumbBucket: S3Bucket): S3Object = S3Object( thumbBucket, ImageIngestOperations.fileKeyFromId(id)(instance), file, @@ -119,7 +138,7 @@ sealed trait StorableImage extends ImageWrapper { case class StorableThumbImage(id: String, file: File, mimeType: MimeType, meta: Map[String, String] = Map.empty, instance: Instance) extends StorableImage case class StorableOriginalImage(id: String, file: File, mimeType: MimeType, lastModified: DateTime, meta: Map[String, String] = Map.empty, instance: Instance) extends StorableImage { - override def toProjectedS3Object(thumbBucket: String): S3Object = S3Object( + override def toProjectedS3Object(thumbBucket: S3Bucket): S3Object = S3Object( thumbBucket, ImageIngestOperations.fileKeyFromId(id)(instance), file, @@ -129,7 +148,7 @@ case class StorableOriginalImage(id: String, file: File, mimeType: MimeType, las ) } case class StorableOptimisedImage(id: String, file: File, mimeType: MimeType, meta: Map[String, String] = Map.empty, instance: Instance) extends StorableImage { - override def toProjectedS3Object(thumbBucket: String): S3Object = S3Object( + override def toProjectedS3Object(thumbBucket: S3Bucket): S3Object = S3Object( thumbBucket, ImageIngestOperations.optimisedPngKeyFromId(id)(instance), file, diff --git a/common-lib/src/main/scala/com/gu/mediaservice/lib/ImageQuarantineOperations.scala b/common-lib/src/main/scala/com/gu/mediaservice/lib/ImageQuarantineOperations.scala index 0cc3a146a0..eaf6b82979 100644 --- a/common-lib/src/main/scala/com/gu/mediaservice/lib/ImageQuarantineOperations.scala +++ b/common-lib/src/main/scala/com/gu/mediaservice/lib/ImageQuarantineOperations.scala @@ -2,13 +2,13 @@ package com.gu.mediaservice.lib import java.io.File import com.gu.mediaservice.lib.config.CommonConfig -import com.gu.mediaservice.lib.aws.S3Object +import com.gu.mediaservice.lib.aws.{S3Bucket, S3Object} import com.gu.mediaservice.lib.logging.LogMarker import com.gu.mediaservice.model.{Instance, MimeType} import scala.concurrent.Future -class ImageQuarantineOperations(quarantineBucket: String, config: CommonConfig, isVersionedS3: Boolean = false) +class ImageQuarantineOperations(quarantineBucket: S3Bucket, config: CommonConfig, isVersionedS3: Boolean = false) extends S3ImageStorage(config) { def storeQuarantineImage(id: String, file: File, mimeType: Option[MimeType], meta: Map[String, String] = Map.empty) diff --git a/common-lib/src/main/scala/com/gu/mediaservice/lib/ImageStorage.scala b/common-lib/src/main/scala/com/gu/mediaservice/lib/ImageStorage.scala index 6312450b97..a6c45c346c 100644 --- a/common-lib/src/main/scala/com/gu/mediaservice/lib/ImageStorage.scala +++ b/common-lib/src/main/scala/com/gu/mediaservice/lib/ImageStorage.scala @@ -2,11 +2,10 @@ package com.gu.mediaservice.lib import java.util.concurrent.Executors import java.io.File - import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.language.postfixOps -import com.gu.mediaservice.lib.aws.S3Object +import com.gu.mediaservice.lib.aws.{S3Bucket, S3Object} import com.gu.mediaservice.lib.logging.LogMarker import com.gu.mediaservice.model.MimeType @@ -35,9 +34,9 @@ trait ImageStorage { /** Store a copy of the given file and return the URI of that copy. * The file can safely be deleted afterwards. */ - def storeImage(bucket: String, id: String, file: File, mimeType: Option[MimeType], + def storeImage(bucket: S3Bucket, id: String, file: File, mimeType: Option[MimeType], meta: Map[String, String] = Map.empty, overwrite: Boolean) (implicit logMarker: LogMarker): Future[S3Object] - def deleteImage(bucket: String, id: String)(implicit logMarker: LogMarker): Future[Unit] + def deleteImage(bucket: S3Bucket, id: String)(implicit logMarker: LogMarker): Future[Unit] } diff --git a/common-lib/src/main/scala/com/gu/mediaservice/lib/S3ImageStorage.scala b/common-lib/src/main/scala/com/gu/mediaservice/lib/S3ImageStorage.scala index 30b5d4b21c..a4d603f1a6 100644 --- a/common-lib/src/main/scala/com/gu/mediaservice/lib/S3ImageStorage.scala +++ b/common-lib/src/main/scala/com/gu/mediaservice/lib/S3ImageStorage.scala @@ -1,6 +1,6 @@ package com.gu.mediaservice.lib -import com.gu.mediaservice.lib.aws.S3 +import com.gu.mediaservice.lib.aws.{S3, S3Bucket} import com.gu.mediaservice.lib.config.CommonConfig import com.gu.mediaservice.lib.logging.{GridLogging, LogMarker} import com.gu.mediaservice.model.MimeType @@ -14,10 +14,10 @@ import scala.concurrent.Future class S3ImageStorage(config: CommonConfig) extends S3(config) with ImageStorage with GridLogging { private val cacheSetting = Some(cacheForever) - def storeImage(bucket: String, id: String, file: File, mimeType: Option[MimeType], + def storeImage(bucket: S3Bucket, id: String, file: File, mimeType: Option[MimeType], meta: Map[String, String] = Map.empty, overwrite: Boolean) (implicit logMarker: LogMarker) = { - logger.info(logMarker, s"bucket: $bucket, id: $id, meta: $meta") + logger.info(logMarker, s"storeImage to bucket: ${bucket.bucket}, id: $id, meta: $meta") val eventualObject = if (overwrite) { store(bucket, id, file, mimeType, meta, cacheSetting) } else { @@ -27,21 +27,21 @@ class S3ImageStorage(config: CommonConfig) extends S3(config) with ImageStorage eventualObject } - def deleteImage(bucket: String, id: String)(implicit logMarker: LogMarker) = Future { - client.deleteObject(bucket, id) + def deleteImage(bucket: S3Bucket, id: String)(implicit logMarker: LogMarker) = Future { + deleteObject(bucket, id) logger.info(logMarker, s"Deleted image $id from bucket $bucket") } - def deleteVersionedImage(bucket: String, id: String)(implicit logMarker: LogMarker) = Future { - val objectVersion = client.getObjectMetadata(bucket, id).getVersionId - client.deleteVersion(bucket, id, objectVersion) + def deleteVersionedImage(bucket: S3Bucket, id: String)(implicit logMarker: LogMarker) = Future { + val objectVersion = getObjectMetadata(bucket, id).getVersionId + deleteVersion(bucket, id, objectVersion) logger.info(logMarker, s"Deleted image $id from bucket $bucket (version: $objectVersion)") } - def deleteFolder(bucket: String, id: String)(implicit logMarker: LogMarker) = Future { - val files = client.listObjects(bucket, id).getObjectSummaries.asScala + def deleteFolder(bucket: S3Bucket, id: String)(implicit logMarker: LogMarker) = Future { + val files = listObjects(bucket, id).getObjectSummaries.asScala logger.info(s"Found ${files.size} files to delete in folder $id") - files.foreach(file => client.deleteObject(bucket, file.getKey)) + files.foreach(file => deleteObject(bucket, file.getKey)) logger.info(logMarker, s"Deleting images in folder $id from bucket $bucket") } diff --git a/common-lib/src/main/scala/com/gu/mediaservice/lib/auth/KeyStore.scala b/common-lib/src/main/scala/com/gu/mediaservice/lib/auth/KeyStore.scala index bbb9da0ca1..ebc43bf09c 100644 --- a/common-lib/src/main/scala/com/gu/mediaservice/lib/auth/KeyStore.scala +++ b/common-lib/src/main/scala/com/gu/mediaservice/lib/auth/KeyStore.scala @@ -1,13 +1,13 @@ package com.gu.mediaservice.lib.auth import com.gu.mediaservice.lib.BaseStore +import com.gu.mediaservice.lib.aws.S3Bucket import com.gu.mediaservice.lib.config.CommonConfig import com.gu.mediaservice.model.Instance -import scala.jdk.CollectionConverters._ import scala.concurrent.ExecutionContext -class KeyStore(bucket: String, config: CommonConfig)(implicit ec: ExecutionContext) +class KeyStore(bucket: S3Bucket, config: CommonConfig)(implicit ec: ExecutionContext) extends BaseStore[String, ApiAccessor](bucket, config)(ec) { def lookupIdentity(key: String)(implicit instance: Instance): Option[ApiAccessor] = store.get().get(instance.id + "/" + key) @@ -17,7 +17,6 @@ class KeyStore(bucket: String, config: CommonConfig)(implicit ec: ExecutionConte } private def fetchAll: Map[String, ApiAccessor] = { - val keys = s3.client.listObjects(bucket).getObjectSummaries.asScala.map(_.getKey) - keys.flatMap(k => getS3Object(k).map(k -> ApiAccessor(_))).toMap + s3.listObjectKeys(bucket).flatMap(k => getS3Object(k).map(k -> ApiAccessor(_))).toMap } } diff --git a/common-lib/src/main/scala/com/gu/mediaservice/lib/aws/S3.scala b/common-lib/src/main/scala/com/gu/mediaservice/lib/aws/S3.scala index 04628b72e7..628b4db8b7 100644 --- a/common-lib/src/main/scala/com/gu/mediaservice/lib/aws/S3.scala +++ b/common-lib/src/main/scala/com/gu/mediaservice/lib/aws/S3.scala @@ -1,5 +1,7 @@ package com.gu.mediaservice.lib.aws +import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials} +import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration import com.amazonaws.services.s3.model._ import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder, model} import com.amazonaws.util.IOUtils @@ -7,7 +9,7 @@ import com.amazonaws.{AmazonServiceException, ClientConfiguration} import com.gu.mediaservice.lib.config.CommonConfig import com.gu.mediaservice.lib.logging.{GridLogging, LogMarker, Stopwatch} import com.gu.mediaservice.model._ -import org.joda.time.{DateTime, Duration} +import org.joda.time.DateTime import java.io.File import java.net.{URI, URL} @@ -17,15 +19,11 @@ import scala.concurrent.{ExecutionContext, Future} case class S3Object(uri: URI, size: Long, metadata: S3Metadata) object S3Object { - def objectUrl(bucket: String, key: String): URI = { - val bucketUrl = s"$bucket.${S3Ops.s3Endpoint}" - new URI("http", bucketUrl, s"/$key", null) - } - def apply(bucket: String, key: String, size: Long, metadata: S3Metadata): S3Object = - apply(objectUrl(bucket, key), size, metadata) + def apply(bucket: S3Bucket, key: String, size: Long, metadata: S3Metadata): S3Object = + apply(bucket.objectUrl(key), size, metadata) - def apply(bucket: String, key: String, file: File, mimeType: Option[MimeType], lastModified: Option[DateTime], + def apply(bucket: S3Bucket, key: String, file: File, mimeType: Option[MimeType], lastModified: Option[DateTime], meta: Map[String, String] = Map.empty, cacheControl: Option[String] = None): S3Object = { S3Object( bucket, @@ -61,42 +59,78 @@ object S3Metadata { case class S3ObjectMetadata(contentType: Option[MimeType], cacheControl: Option[String], lastModified: Option[DateTime]) class S3(config: CommonConfig) extends GridLogging with ContentDisposition with RoundedExpiration { - type Bucket = String type Key = String type UserMetadata = Map[String, String] - lazy val client: AmazonS3 = S3Ops.buildS3Client(config) - // also create a legacy client that uses v2 signatures for URL signing - private lazy val legacySigningClient: AmazonS3 = S3Ops.buildS3Client(config, forceV2Sigs = true) - - def signUrl(bucket: Bucket, url: URI, image: Image, expiration: DateTime = cachableExpiration(), imageType: ImageFileType = Source): String = { - // get path and remove leading `/` - val key: Key = url.getPath.drop(1) + val AmazonAwsS3Endpoint: String = S3.AmazonAwsS3Endpoint + + private val amazonS3: AmazonS3 = S3Ops.buildS3Client(config, forceV2Sigs = true) + private val googleS3: Option[AmazonS3] = S3Ops.buildGoogleS3Client(config) + private val localS3: Option[AmazonS3] = S3Ops.buildLocalS3Client(config) + + private def clientFor(bucket: S3Bucket): AmazonS3 = { + (bucket.endpoint match { + case "storage.googleapis.com" => + googleS3 + case "minio.griddev.eelpieconsulting.co.uk" => + localS3 + case _ => + Some(amazonS3) + }).getOrElse { + amazonS3 + } + } + def signUrl(bucket: S3Bucket, key: String, image: Image, expiration: DateTime = cachableExpiration(), imageType: ImageFileType = Source): String = { val contentDisposition = getContentDisposition(image, imageType, config.shortenDownloadFilename) val headers = new ResponseHeaderOverrides().withContentDisposition(contentDisposition) - val request = new GeneratePresignedUrlRequest(bucket, key).withExpiration(expiration.toDate).withResponseHeaders(headers) - legacySigningClient.generatePresignedUrl(request).toExternalForm + val request = new GeneratePresignedUrlRequest(bucket.bucket, key).withExpiration(expiration.toDate).withResponseHeaders(headers) + clientFor(bucket).generatePresignedUrl(request).toExternalForm + } + + def signUrlTony(bucket: S3Bucket, key: String, expiration: DateTime = cachableExpiration()): URL = { + val request = new GeneratePresignedUrlRequest(bucket.bucket, key).withExpiration(expiration.toDate) + clientFor(bucket).generatePresignedUrl(request) + } + + def copyObject(sourceBucket: S3Bucket, destinationBucket: S3Bucket, key: String): CopyObjectResult = { + clientFor(sourceBucket).copyObject(sourceBucket.bucket, key, destinationBucket.bucket, key) + } + + def generatePresignedRequest(request: GeneratePresignedUrlRequest, bucket: S3Bucket): URL = { + clientFor(bucket).generatePresignedUrl(request) } - def signUrlTony(bucket: Bucket, url: URI, expiration: DateTime = cachableExpiration()): URL = { - // get path and remove leading `/` - val key: Key = url.getPath.drop(1) + def deleteObject(bucket: S3Bucket, key: String): Unit = { + clientFor(bucket).deleteObject(bucket.bucket, key) + } + + def deleteObjects(bucket: S3Bucket, keys: Seq[String]): DeleteObjectsResult = { + clientFor(bucket).deleteObjects( + new DeleteObjectsRequest(bucket.bucket).withKeys(keys: _*) + ) + } + + def deleteVersion(bucket: S3Bucket, id: String, objectVersion: String): Unit = { + clientFor(bucket).deleteVersion(bucket.bucket, id, objectVersion) + } + + def doesObjectExist(bucket: S3Bucket, key: String) = { + clientFor(bucket).doesObjectExist(bucket.bucket, key) + } - val request = new GeneratePresignedUrlRequest(bucket, key).withExpiration(expiration.toDate) - legacySigningClient.generatePresignedUrl(request) + def getObject(bucket: S3Bucket, key: String): model.S3Object = { + clientFor(bucket).getObject(new GetObjectRequest(bucket.bucket, key)) } - def getObject(bucket: Bucket, url: URI): model.S3Object = { - // get path and remove leading `/` - val key: Key = url.getPath.drop(1) - client.getObject(new GetObjectRequest(bucket, key)) + def getObject(bucket: S3Bucket, obj: S3ObjectSummary): model.S3Object = { + clientFor(bucket).getObject(bucket.bucket, obj.getKey) } - def getObjectAsString(bucket: Bucket, key: String): Option[String] = { - val content = client.getObject(new GetObjectRequest(bucket, key)) + def getObjectAsString(bucket: S3Bucket, key: String): Option[String] = { + val content = clientFor(bucket).getObject(new GetObjectRequest(bucket.bucket, key)) val stream = content.getObjectContent try { Some(IOUtils.toString(stream).trim) @@ -110,7 +144,31 @@ class S3(config: CommonConfig) extends GridLogging with ContentDisposition with } } - def store(bucket: Bucket, id: Key, file: File, mimeType: Option[MimeType], meta: UserMetadata = Map.empty, cacheControl: Option[String] = None) + def getObjectMetadata(bucket: S3Bucket, id: String): ObjectMetadata = { + clientFor(bucket).getObjectMetadata(bucket.bucket, id) + } + + def listObjects(bucket: S3Bucket): ObjectListing = { + clientFor(bucket).listObjects(bucket.bucket) + } + + def listObjects(bucket: S3Bucket, prefix: String): ObjectListing = { + clientFor(bucket).listObjects(bucket.bucket, prefix) + } + + def listObjects(bucket: S3Bucket, request: ListObjectsRequest): ObjectListing = { + clientFor(bucket).listObjects(request) + } + + def listObjectKeys(bucket: S3Bucket): Seq[String] = { + clientFor(bucket).listObjects(bucket.bucket).getObjectSummaries.asScala.map(_.getKey).toSeq + } + + def putObject(bucket: S3Bucket, key: String, content: String): Unit = { + clientFor(bucket).putObject(bucket.bucket, key, content) + } + + def store(bucket: S3Bucket, id: Key, file: File, mimeType: Option[MimeType], meta: UserMetadata = Map.empty, cacheControl: Option[String] = None) (implicit ex: ExecutionContext, logMarker: LogMarker): Future[S3Object] = Future { val metadata = new ObjectMetadata @@ -125,19 +183,20 @@ class S3(config: CommonConfig) extends GridLogging with ContentDisposition with ) val markers = logMarker ++ fileMarkers - val req = new PutObjectRequest(bucket, id, file).withMetadata(metadata) + val req = new PutObjectRequest(bucket.bucket, id, file).withMetadata(metadata) Stopwatch(s"S3 client.putObject ($req)"){ + val client = clientFor(bucket) client.putObject(req) // once we've completed the PUT read back to ensure that we are returning reality - val metadata = client.getObjectMetadata(bucket, id) + val metadata = client.getObjectMetadata(bucket.bucket, id) S3Object(bucket, id, metadata.getContentLength, S3Metadata(metadata)) }(markers) } - def storeIfNotPresent(bucket: Bucket, id: Key, file: File, mimeType: Option[MimeType], meta: UserMetadata = Map.empty, cacheControl: Option[String] = None) + def storeIfNotPresent(bucket: S3Bucket, id: Key, file: File, mimeType: Option[MimeType], meta: UserMetadata = Map.empty, cacheControl: Option[String] = None) (implicit ex: ExecutionContext, logMarker: LogMarker): Future[S3Object] = { Future{ - Some(client.getObjectMetadata(bucket, id)) + Some(clientFor(bucket).getObjectMetadata(bucket.bucket, id)) }.recover { // translate this exception into the object not existing case as3e:AmazonS3Exception if as3e.getStatusCode == 404 => None @@ -150,11 +209,11 @@ class S3(config: CommonConfig) extends GridLogging with ContentDisposition with } } - def list(bucket: Bucket, prefixDir: String) + def list(bucket: S3Bucket, prefixDir: String) (implicit ex: ExecutionContext): Future[List[S3Object]] = Future { - val req = new ListObjectsRequest().withBucketName(bucket).withPrefix(s"$prefixDir/") - val listing = client.listObjects(req) + val req = new ListObjectsRequest().withBucketName(bucket.bucket).withPrefix(s"$prefixDir/") + val listing = clientFor(bucket).listObjects(req) val summaries = listing.getObjectSummaries.asScala summaries.map(summary => (summary.getKey, summary)).foldLeft(List[S3Object]()) { case (memo: List[S3Object], (key: String, summary: S3ObjectSummary)) => @@ -162,17 +221,17 @@ class S3(config: CommonConfig) extends GridLogging with ContentDisposition with } } - def getMetadata(bucket: Bucket, key: Key): S3Metadata = { - val meta = client.getObjectMetadata(bucket, key) + def getMetadata(bucket: S3Bucket, key: Key): S3Metadata = { + val meta = clientFor(bucket).getObjectMetadata(bucket.bucket, key) S3Metadata(meta) } - def getUserMetadata(bucket: Bucket, key: Key): Map[Bucket, Bucket] = - client.getObjectMetadata(bucket, key).getUserMetadata.asScala.toMap + def getUserMetadata(bucket: S3Bucket, key: Key): Map[String, String] = + clientFor(bucket).getObjectMetadata(bucket.bucket, key).getUserMetadata.asScala.toMap - def syncFindKey(bucket: Bucket, prefixName: String): Option[Key] = { - val req = new ListObjectsRequest().withBucketName(bucket).withPrefix(s"$prefixName-") - val listing = client.listObjects(req) + def syncFindKey(bucket: S3Bucket, prefixName: String): Option[Key] = { + val req = new ListObjectsRequest().withBucketName(bucket.bucket).withPrefix(s"$prefixName-") + val listing = clientFor(bucket).listObjects(req) val summaries = listing.getObjectSummaries.asScala summaries.headOption.map(_.getKey) } @@ -180,9 +239,44 @@ class S3(config: CommonConfig) extends GridLogging with ContentDisposition with } object S3Ops { - // TODO make this localstack friendly - // TODO: Make this region aware - i.e. RegionUtils.getRegion(region).getServiceEndpoint(AmazonS3.ENDPOINT_PREFIX) - val s3Endpoint = "s3.amazonaws.com" + def buildGoogleS3Client(config: CommonConfig): Option[AmazonS3] = { + config.googleS3AccessKey.flatMap { accessKey => + config.googleS3SecretKey.map { secretKey => + val endpointConfig = new EndpointConfiguration("https://storage.googleapis.com", null) + // create credentials provider + val credentials = new BasicAWSCredentials(accessKey, secretKey) + val credentialsProvider = new AWSStaticCredentialsProvider(credentials) + // create a client config + val clientConfig = new ClientConfiguration() + + val clientBuilder = AmazonS3ClientBuilder.standard() + clientBuilder.setEndpointConfiguration(endpointConfig) + clientBuilder.withCredentials(credentialsProvider) + clientBuilder.withClientConfiguration(clientConfig) + clientBuilder.build() + } + } + } + + def buildLocalS3Client(config: CommonConfig): Option[AmazonS3] = { + config.googleS3AccessKey.flatMap { accessKey => + config.googleS3SecretKey.map { secretKey => + val endpointConfig = new EndpointConfiguration("https://minio.griddev.eelpieconsulting.co.uk", null) + // create credentials provider + val credentials = new BasicAWSCredentials(accessKey, secretKey) + val credentialsProvider = new AWSStaticCredentialsProvider(credentials) + // create a client config + val clientConfig = new ClientConfiguration() + + val clientBuilder = AmazonS3ClientBuilder.standard() + clientBuilder.setEndpointConfiguration(endpointConfig) + clientBuilder.withCredentials(credentialsProvider) + clientBuilder.withClientConfiguration(clientConfig) + clientBuilder.withPathStyleAccessEnabled(true) + clientBuilder.build() + } + } + } def buildS3Client(config: CommonConfig, forceV2Sigs: Boolean = false, localstackAware: Boolean = true, maybeRegionOverride: Option[String] = None): AmazonS3 = { @@ -204,4 +298,9 @@ object S3Ops { config.withAWSCredentials(builder, localstackAware, maybeRegionOverride).build() } + +} + +object S3 { + val AmazonAwsS3Endpoint: String = "s3.amazonaws.com" } diff --git a/common-lib/src/main/scala/com/gu/mediaservice/lib/aws/S3Bucket.scala b/common-lib/src/main/scala/com/gu/mediaservice/lib/aws/S3Bucket.scala new file mode 100644 index 0000000000..bcc3c58ae5 --- /dev/null +++ b/common-lib/src/main/scala/com/gu/mediaservice/lib/aws/S3Bucket.scala @@ -0,0 +1,27 @@ +package com.gu.mediaservice.lib.aws + +import java.net.URI + +case class S3Bucket(bucket: String, endpoint: String, usesPathStyleURLs: Boolean) { + def objectUrl(key: String): URI = { + val bucketBaseURL = bucketURL() + new URI("http", bucketBaseURL.getHost, bucketBaseURL.getPath + key, null) + } + + def keyFromS3URL(url: URI): String = { + if (usesPathStyleURLs) { + url.getPath.drop(bucket.length + 2) + } else { + url.getPath.drop(1) + } + } + + def bucketURL(): URI = { + if (usesPathStyleURLs) { + new URI("https", endpoint, s"/$bucket/", null) + } else { + new URI("https", s"$bucket.$endpoint", "/", null) + } + } + +} diff --git a/common-lib/src/main/scala/com/gu/mediaservice/lib/config/CommonConfig.scala b/common-lib/src/main/scala/com/gu/mediaservice/lib/config/CommonConfig.scala index 51adb4793f..acc0edc4dc 100644 --- a/common-lib/src/main/scala/com/gu/mediaservice/lib/config/CommonConfig.scala +++ b/common-lib/src/main/scala/com/gu/mediaservice/lib/config/CommonConfig.scala @@ -1,6 +1,6 @@ package com.gu.mediaservice.lib.config -import com.gu.mediaservice.lib.aws.{AwsClientV1BuilderUtils, AwsClientV2BuilderUtils, KinesisSenderConfig} +import com.gu.mediaservice.lib.aws.{AwsClientV1BuilderUtils, AwsClientV2BuilderUtils, KinesisSenderConfig, S3, S3Bucket} import com.gu.mediaservice.model.UsageRightsSpec import com.typesafe.config.Config import com.typesafe.scalalogging.StrictLogging @@ -53,8 +53,18 @@ abstract class CommonConfig(resources: GridConfigResources) extends AwsClientV1B lazy val softDeletedMetadataTable: String = string("dynamo.table.softDelete.metadata") val maybeIngestSqsQueueUrl: Option[String] = stringOpt("sqs.ingest.queue.url") - val maybeIngestBucket: Option[String] = stringOpt("s3.ingest.bucket") - val maybeFailBucket: Option[String] = stringOpt("s3.fail.bucket") + val maybeIngestBucket: Option[S3Bucket] = for { + ingestBucket <- stringOpt("s3.ingest.bucket.name") + ingestBucketEndpoint <- stringOpt("s3.ingest.bucket.endpoint") + } yield { + S3Bucket(ingestBucket, ingestBucketEndpoint, usesPathStyleURLs = booleanOpt("s3.ingest.bucket.pathStyleURLs").getOrElse(false)) + } + val maybeFailBucket: Option[S3Bucket] = for { + failBucket <- stringOpt("s3.fail.bucket.name") + failBucketEndpoint <- stringOpt("s3.fail.bucket.endpoint") + } yield { + S3Bucket(failBucket, failBucketEndpoint, usesPathStyleURLs = booleanOpt("s3.fail.bucket.pathStyleURLs").getOrElse(false)) + } val maybeUploadLimitInBytes: Option[Int] = intOpt("upload.limit.mb").map(_ * 1024 * 1024) @@ -69,6 +79,12 @@ abstract class CommonConfig(resources: GridConfigResources) extends AwsClientV1B val services = new SingleHostServices(domainRoot) + val imageBucket: S3Bucket = S3Bucket(string("s3.image.bucket.name"), string("s3.image.bucket.endpoint"), usesPathStyleURLs = booleanOpt("s3.image.bucket.pathStyleURLs").getOrElse(false)) + val thumbnailBucket: S3Bucket = S3Bucket(string("s3.thumb.bucket.name"), string("s3.thumb.bucket.endpoint"), usesPathStyleURLs = booleanOpt("s3.thumb.bucket.pathStyleURLs").getOrElse(false)) + + val googleS3AccessKey = stringOpt("s3.accessKey") + val googleS3SecretKey = stringOpt("s3.secretKey") + /** * Load in a list of domain metadata specifications from configuration. For example: * {{{ diff --git a/common-lib/src/main/scala/com/gu/mediaservice/lib/instances/Instances.scala b/common-lib/src/main/scala/com/gu/mediaservice/lib/instances/Instances.scala index eafe858fa4..e23f9d208e 100644 --- a/common-lib/src/main/scala/com/gu/mediaservice/lib/instances/Instances.scala +++ b/common-lib/src/main/scala/com/gu/mediaservice/lib/instances/Instances.scala @@ -18,9 +18,7 @@ trait Instances extends StrictLogging { r.status match { case 200 => implicit val ir = Json.reads[Instance] - val instances = Json.parse(r.body).as[Seq[Instance]] - logger.info("Got instances: " + instances.map(_.id).mkString(",")) - instances + Json.parse(r.body).as[Seq[Instance]] case _ => logger.warn("Got non 200 status for instances call: " + r.status) Seq.empty diff --git a/common-lib/src/main/scala/com/gu/mediaservice/model/ThrallMessage.scala b/common-lib/src/main/scala/com/gu/mediaservice/model/ThrallMessage.scala index d37d61d388..f7fc7d26dc 100644 --- a/common-lib/src/main/scala/com/gu/mediaservice/model/ThrallMessage.scala +++ b/common-lib/src/main/scala/com/gu/mediaservice/model/ThrallMessage.scala @@ -89,6 +89,7 @@ object ExternalThrallMessage{ implicit val upsertFromProjectionMessage: OFormat[UpsertFromProjectionMessage] = Json.format[UpsertFromProjectionMessage] implicit val createInstanceMessage: OFormat[CreateInstanceMessage] = Json.format[CreateInstanceMessage] + implicit val reindexImageMessage: OFormat[ReindexImageMessage] = Json.format[ReindexImageMessage] implicit val writes: OWrites[ExternalThrallMessage] = Json.writes[ExternalThrallMessage] implicit val reads: Reads[ExternalThrallMessage] = Json.reads[ExternalThrallMessage] @@ -168,3 +169,5 @@ case class CompleteMigrationMessage(lastModified: DateTime, instance: Instance) } case class CreateInstanceMessage(id: String, lastModified: DateTime, instance: Instance) extends ExternalThrallMessage + +case class ReindexImageMessage(id: String, lastModified: DateTime, instance: Instance) extends ExternalThrallMessage diff --git a/common-lib/src/main/scala/com/gu/mediaservice/syntax/MessageSubjects.scala b/common-lib/src/main/scala/com/gu/mediaservice/syntax/MessageSubjects.scala index c40ff2de17..b0a2f32c6e 100644 --- a/common-lib/src/main/scala/com/gu/mediaservice/syntax/MessageSubjects.scala +++ b/common-lib/src/main/scala/com/gu/mediaservice/syntax/MessageSubjects.scala @@ -22,7 +22,7 @@ trait MessageSubjects { val UpdateImageSyndicationMetadata = "update-image-syndication-metadata" val UpdateImagePhotoshootMetadata = "update-image-photoshoot-metadata" val CreateInstance = "create-instance" - + val ReindexImage = "reindex-image" } object MessageSubjects extends MessageSubjects diff --git a/common-lib/src/test/resources/application.conf b/common-lib/src/test/resources/application.conf index 05d1db98e6..b5c3e1aee2 100644 --- a/common-lib/src/test/resources/application.conf +++ b/common-lib/src/test/resources/application.conf @@ -61,3 +61,10 @@ instance.service.my="" instance.service.instances="" usageEvents.queue.name="" + +s3.image.bucket.name="images" +s3.image.bucket.endpoint="some-providers-s3-endpoint" + +s3.thumb.bucket.name="thumbs" +s3.thumb.bucket.endpoint="some-providers-s3-endpoint" + diff --git a/cropper/app/CropperComponents.scala b/cropper/app/CropperComponents.scala index 509192dedc..d570859ade 100644 --- a/cropper/app/CropperComponents.scala +++ b/cropper/app/CropperComponents.scala @@ -1,4 +1,5 @@ import com.gu.mediaservice.GridClient +import com.gu.mediaservice.lib.aws.S3 import com.gu.mediaservice.lib.imaging.ImageOperations import com.gu.mediaservice.lib.management.Management import com.gu.mediaservice.lib.play.GridComponents @@ -12,8 +13,9 @@ class CropperComponents(context: Context) extends GridComponents(context, new Cr val store = new CropStore(config) val imageOperations = new ImageOperations(context.environment.rootPath.getAbsolutePath) + val s3 = new S3(config) - val crops = new Crops(config, store, imageOperations, config.imageBucket) + val crops = new Crops(config, store, imageOperations, config.imageBucket, s3) val notifications = new Notifications(config) private val gridClient = GridClient(config.services)(wsClient) diff --git a/cropper/app/lib/CropStore.scala b/cropper/app/lib/CropStore.scala index 1cc815d2cb..034790420f 100644 --- a/cropper/app/lib/CropStore.scala +++ b/cropper/app/lib/CropStore.scala @@ -49,9 +49,11 @@ class CropStore(config: CropperConfig) extends S3ImageStorage(config) with CropS date = userMetadata.get("date").flatMap(parseDateTime) dimensions = Dimensions(width, height) + key = config.imgPublishingBucket.keyFromS3URL(s3Object.uri) + sizing = Asset( - signedCropAssetUrl(s3Object.uri), + signedCropAssetUrl(key), Some(s3Object.size), objectMetadata.contentType, Some(dimensions), @@ -78,12 +80,12 @@ class CropStore(config: CropperConfig) extends S3ImageStorage(config) with CropS def translateImgHost(uri: URI): URI = new URI("https", config.imgPublishingHost, uri.getPath, uri.getFragment) - private def folderForImagesCrops(id: Bucket, instance: Instance) = { + private def folderForImagesCrops(id: String, instance: Instance) = { instance.id + "/" + id } - private def signedCropAssetUrl(uri: URI): URI = { - signUrlTony(config.imgPublishingBucket, uri).toURI + private def signedCropAssetUrl(key: String): URI = { + signUrlTony(config.imgPublishingBucket, key).toURI } } diff --git a/cropper/app/lib/CropperConfig.scala b/cropper/app/lib/CropperConfig.scala index 96d043d60a..4a4739d8ce 100644 --- a/cropper/app/lib/CropperConfig.scala +++ b/cropper/app/lib/CropperConfig.scala @@ -1,5 +1,6 @@ package lib +import com.gu.mediaservice.lib.aws.{S3, S3Bucket} import com.gu.mediaservice.lib.config.{CommonConfig, GridConfigResources} import com.gu.mediaservice.model.Instance @@ -7,10 +8,11 @@ import java.io.File class CropperConfig(resources: GridConfigResources) extends CommonConfig(resources) { - val imageBucket: String = string("s3.image.bucket") - - val imgPublishingBucket = string("publishing.image.bucket") - + val imgPublishingBucket: S3Bucket = S3Bucket( + string("publishing.image.bucket.name"), + string("publishing.image.bucket.endpoint"), + boolean("publishing.image.bucket.pathStyleURLs") + ) val canDownloadCrop: Boolean = boolean("canDownloadCrop") val imgPublishingHost = string("publishing.image.host") diff --git a/cropper/app/lib/Crops.scala b/cropper/app/lib/Crops.scala index a58485af65..be3481c014 100644 --- a/cropper/app/lib/Crops.scala +++ b/cropper/app/lib/Crops.scala @@ -3,7 +3,7 @@ package lib import java.io.File import com.gu.mediaservice.lib.metadata.FileMetadataHelper import com.gu.mediaservice.lib.Files -import com.gu.mediaservice.lib.aws.S3 +import com.gu.mediaservice.lib.aws.{S3, S3Bucket} import com.gu.mediaservice.lib.imaging.{ExportResult, ImageOperations} import com.gu.mediaservice.lib.logging.{GridLogging, LogMarker, Stopwatch} import com.gu.mediaservice.model._ @@ -17,7 +17,7 @@ case object InvalidCropRequest extends Exception("Crop request invalid for image case class MasterCrop(sizing: Future[Asset], file: File, dimensions: Dimensions, aspectRatio: Float) -class Crops(config: CropperConfig, store: CropStore, imageOperations: ImageOperations, imageBucket: String)(implicit ec: ExecutionContext) extends GridLogging { +class Crops(config: CropperConfig, store: CropStore, imageOperations: ImageOperations, imageBucket: S3Bucket, s3: S3)(implicit ec: ExecutionContext) extends GridLogging { import Files._ private val cropQuality = 75d @@ -26,8 +26,6 @@ class Crops(config: CropperConfig, store: CropStore, imageOperations: ImageOpera // We don't overly care about output crop file sizes here, but prefer a fast output, so turn it right down. private val pngCropQuality = 1d - private val s3 = new S3(config) - def outputFilename(source: SourceImage, bounds: Bounds, outputWidth: Int, fileType: MimeType, isMaster: Boolean = false, instance: Instance): String = { val masterString: String = if (isMaster) "master/" else "" instance.id + "/" + s"${source.id}/${Crop.getCropId(bounds)}/$masterString$outputWidth${fileType.fileExtension}" @@ -117,7 +115,8 @@ class Crops(config: CropperConfig, store: CropStore, imageOperations: ImageOpera val hasAlpha = apiImage.fileMetadata.colourModelInformation.get("hasAlpha").flatMap(a => Try(a.toBoolean).toOption).getOrElse(true) val cropType = Crops.cropType(mimeType, colourType, hasAlpha) - val secureUrl = s3.signUrlTony(imageBucket, secureFile) + val key = imageBucket.keyFromS3URL(secureFile) + val secureUrl = s3.signUrlTony(imageBucket, key) Stopwatch.async(s"making crop assets for ${apiImage.id} ${Crop.getCropId(source.bounds)}") { for { diff --git a/cropper/test/lib/CropsTest.scala b/cropper/test/lib/CropsTest.scala index debab1aab0..77f6c4ace7 100644 --- a/cropper/test/lib/CropsTest.scala +++ b/cropper/test/lib/CropsTest.scala @@ -1,7 +1,9 @@ package lib +import com.gu.mediaservice.lib.aws.{S3, S3Bucket} import com.gu.mediaservice.lib.imaging.ImageOperations import com.gu.mediaservice.model._ +import org.mockito.Mockito.when import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers import org.scalatestplus.mockito.MockitoSugar @@ -44,31 +46,38 @@ class CropsTest extends AnyFunSpec with Matchers with MockitoSugar { Crops.cropType(Tiff, "TrueColor", hasAlpha = false) shouldBe Jpeg } - private val config = mock[CropperConfig] + private val config = { + val mockConfig = mock[CropperConfig] + when(mockConfig.awsRegion).thenReturn("eu-west-1") + when(mockConfig.googleS3AccessKey).thenReturn(None) + when(mockConfig.googleS3SecretKey).thenReturn(None) + mockConfig + } private val store = mock[CropStore] private val imageOperations: ImageOperations = mock[ImageOperations] private val source: SourceImage = SourceImage("test", mock[Asset], valid = true, mock[ImageMetadata], mock[FileMetadata]) private val bounds: Bounds = Bounds(10, 20, 30, 40) private val outputWidth = 1234 - private val imageBucket = "crops-bucket" + private val imageBucket = S3Bucket("crops-bucket", S3.AmazonAwsS3Endpoint, usesPathStyleURLs = false) + private val s3 = new S3(config) it("should should construct a correct address for a master jpg") { - val outputFilename = new Crops(config, store, imageOperations, imageBucket) + val outputFilename = new Crops(config, store, imageOperations, imageBucket, s3) .outputFilename(source, bounds, outputWidth, Jpeg, isMaster = true, instance = instance) outputFilename shouldBe "an-instance/test/10_20_30_40/master/1234.jpg" } it("should should construct a correct address for a non-master jpg") { - val outputFilename = new Crops(config, store, imageOperations, imageBucket) + val outputFilename = new Crops(config, store, imageOperations, imageBucket, s3) .outputFilename(source, bounds, outputWidth, Jpeg, instance = instance) outputFilename shouldBe "an-instance/test/10_20_30_40/1234.jpg" } it("should should construct a correct address for a non-master tiff") { - val outputFilename = new Crops(config, store, imageOperations, imageBucket) + val outputFilename = new Crops(config, store, imageOperations, imageBucket, s3) .outputFilename(source, bounds, outputWidth, Tiff, instance = instance) outputFilename shouldBe "an-instance/test/10_20_30_40/1234.tiff" } it("should should construct a correct address for a non-master png") { - val outputFilename = new Crops(config, store, imageOperations, imageBucket) + val outputFilename = new Crops(config, store, imageOperations, imageBucket, s3) .outputFilename(source, bounds, outputWidth, Png, instance = instance) outputFilename shouldBe "an-instance/test/10_20_30_40/1234.png" } diff --git a/image-loader/app/ImageLoaderComponents.scala b/image-loader/app/ImageLoaderComponents.scala index 4c26ac9eb3..c04d624203 100644 --- a/image-loader/app/ImageLoaderComponents.scala +++ b/image-loader/app/ImageLoaderComponents.scala @@ -1,5 +1,5 @@ import com.gu.mediaservice.GridClient -import com.gu.mediaservice.lib.aws.SimpleSqsMessageConsumer +import com.gu.mediaservice.lib.aws.{S3, SimpleSqsMessageConsumer} import com.gu.mediaservice.lib.imaging.ImageOperations import com.gu.mediaservice.lib.logging.GridLogging import com.gu.mediaservice.lib.play.GridComponents @@ -26,7 +26,8 @@ class ImageLoaderComponents(context: Context) extends GridComponents(context, ne val notifications = new Notifications(config) val downloader = new Downloader()(ec,wsClient) val uploader = new Uploader(store, config, imageOperations, notifications, imageProcessor) - val projector = Projector(config, imageOperations, imageProcessor, auth) + val s3 = new S3(config) + val projector = Projector(config, imageOperations, imageProcessor, auth, s3) val quarantineUploader: Option[QuarantineUploader] = (config.uploadToQuarantineEnabled, config.quarantineBucket) match { case (true, Some(bucketName)) =>{ val quarantineStore = new QuarantineStore(config) diff --git a/image-loader/app/controllers/ImageLoaderController.scala b/image-loader/app/controllers/ImageLoaderController.scala index 192a36cb0b..6ada971220 100644 --- a/image-loader/app/controllers/ImageLoaderController.scala +++ b/image-loader/app/controllers/ImageLoaderController.scala @@ -122,8 +122,6 @@ class ImageLoaderController(auth: Authentication, } private def handleMessageFromIngestBucket(sqsMessage: SQSMessage)(basicLogMarker: LogMarker): Future[Unit] = { - logger.info(basicLogMarker, sqsMessage.toString) - extractS3KeyFromSqsMessage(sqsMessage) match { case Failure(exception) => metrics.failedIngestsFromQueue.increment() diff --git a/image-loader/app/lib/ImageLoaderConfig.scala b/image-loader/app/lib/ImageLoaderConfig.scala index fb40a8889f..10cc71952b 100644 --- a/image-loader/app/lib/ImageLoaderConfig.scala +++ b/image-loader/app/lib/ImageLoaderConfig.scala @@ -1,5 +1,7 @@ package lib +import com.gu.mediaservice.lib.aws.{S3, S3Bucket} + import java.io.File import com.gu.mediaservice.lib.cleanup.{ComposedImageProcessor, ImageProcessor, ImageProcessorResources} import com.gu.mediaservice.lib.config.{CommonConfig, GridConfigResources, ImageProcessorLoader} @@ -10,12 +12,12 @@ import play.api.inject.ApplicationLifecycle import scala.concurrent.duration.FiniteDuration class ImageLoaderConfig(resources: GridConfigResources) extends CommonConfig(resources) with StrictLogging { - val imageBucket: String = string("s3.image.bucket") val maybeImageReplicaBucket: Option[String] = stringOpt("s3.image.replicaBucket") - val thumbnailBucket: String = string("s3.thumb.bucket") - val quarantineBucket: Option[String] = stringOpt("s3.quarantine.bucket") + val quarantineBucket: Option[S3Bucket] = stringOpt("s3.quarantine.bucket").map { bucket => + S3Bucket(bucket, S3.AmazonAwsS3Endpoint, usesPathStyleURLs = false) + } val uploadToQuarantineEnabled: Boolean = boolean("upload.quarantine.enabled") val tempDir: File = new File(stringDefault("upload.tmp.dir", "/tmp")) @@ -66,4 +68,5 @@ class ImageLoaderConfig(resources: GridConfigResources) extends CommonConfig(res .get[Seq[ImageProcessor]]("image.processors")(configLoader) ImageProcessor.compose("ImageConfigLoader-imageProcessor", processors:_*) } + } diff --git a/image-loader/app/lib/ImageLoaderStore.scala b/image-loader/app/lib/ImageLoaderStore.scala index 367254a822..83ed57da63 100644 --- a/image-loader/app/lib/ImageLoaderStore.scala +++ b/image-loader/app/lib/ImageLoaderStore.scala @@ -28,14 +28,14 @@ class ImageLoaderStore(config: ImageLoaderConfig) extends lib.ImageIngestOperati } def getS3Object(key: String)(implicit logMarker: LogMarker): S3Object = handleNotFound(key) { - client.getObject(config.maybeIngestBucket.get, key) + getObject(config.maybeIngestBucket.get, key) } { logger.error(logMarker, s"Attempted to read $key from ingest bucket, but it does not exist.") } def generatePreSignedUploadUrl(filename: String, expiration: ZonedDateTime, uploadedBy: String, mediaId: String)(implicit instance: Instance): String = { val request = new GeneratePresignedUrlRequest( - config.maybeIngestBucket.get, // bucket + config.maybeIngestBucket.get.bucket, // bucket s"${instance.id}/$uploadedBy/$filename", // key ) .withMethod(HttpMethod.PUT) @@ -44,18 +44,18 @@ class ImageLoaderStore(config: ImageLoaderConfig) extends lib.ImageIngestOperati // sent by the client in manager.js request.putCustomRequestHeader("x-amz-meta-media-id", mediaId) - client.generatePresignedUrl(request).toString + generatePresignedRequest(request, config.maybeIngestBucket.get).toString } def moveObjectToFailedBucket(key: String)(implicit logMarker: LogMarker) = handleNotFound(key){ - client.copyObject(config.maybeIngestBucket.get, key, config.maybeFailBucket.get, key) // TODO Naked get - make optional + copyObject(config.maybeIngestBucket.get, config.maybeFailBucket.get, key) // TODO Naked get - make optional deleteObjectFromIngestBucket(key) } { logger.warn(logMarker, s"Attempted to copy $key from ingest bucket to fail bucket, but it does not exist.") } def deleteObjectFromIngestBucket(key: String)(implicit logMarker: LogMarker) = handleNotFound(key) { - client.deleteObject(config.maybeIngestBucket.get,key) + deleteObject(config.maybeIngestBucket.get, key) } { logger.warn(logMarker, s"Attempted to delete $key from ingest bucket, but it does not exist.") } diff --git a/image-loader/app/lib/QuarantineStore.scala b/image-loader/app/lib/QuarantineStore.scala index 1c750bb7c1..0c7e5f515f 100644 --- a/image-loader/app/lib/QuarantineStore.scala +++ b/image-loader/app/lib/QuarantineStore.scala @@ -3,4 +3,4 @@ package lib.storage import lib.ImageLoaderConfig import com.gu.mediaservice.lib -class QuarantineStore(config: ImageLoaderConfig) extends lib.ImageQuarantineOperations(config.quarantineBucket.get, config) \ No newline at end of file +class QuarantineStore(config: ImageLoaderConfig) extends lib.ImageQuarantineOperations(config.quarantineBucket.get, config) diff --git a/image-loader/app/lib/imaging/FileMetadataReader.scala b/image-loader/app/lib/imaging/FileMetadataReader.scala index f81b9ce3a7..40b9ad413e 100644 --- a/image-loader/app/lib/imaging/FileMetadataReader.scala +++ b/image-loader/app/lib/imaging/FileMetadataReader.scala @@ -125,7 +125,7 @@ object FileMetadataReader extends GridLogging { val redactionReplacementValue = s"REDACTED (value longer than $redactionThreshold characters, please refer to the metadata stored in the file itself)" private def redactLongFieldValues(imageId: String, metadataType: String, exceptions: List[String] = Nil)(props: Map[String, String]) = props.map { case (fieldName, value) if value.length > redactionThreshold && !exceptions.exists(fieldName.contains) => - logger.warn(s"Redacting '$fieldName' $metadataType field for image $imageId, as it's problematically long (longer than $redactionThreshold characters") + logger.debug(s"Redacting '$fieldName' $metadataType field for image $imageId, as it's problematically long (longer than $redactionThreshold characters") fieldName -> redactionReplacementValue case keyValuePair => keyValuePair } diff --git a/image-loader/app/model/Projector.scala b/image-loader/app/model/Projector.scala index 2be26928a3..0fb8f8c0dc 100644 --- a/image-loader/app/model/Projector.scala +++ b/image-loader/app/model/Projector.scala @@ -1,13 +1,12 @@ package model import java.io.{File, FileOutputStream} -import com.amazonaws.services.s3.AmazonS3 import com.gu.mediaservice.{GridClient, ImageDataMerger} import com.gu.mediaservice.lib.auth.Authentication -import com.amazonaws.services.s3.model.{GetObjectRequest, ObjectMetadata, S3Object => AwsS3Object} +import com.amazonaws.services.s3.model.{ObjectMetadata, S3Object => AwsS3Object} import com.gu.mediaservice.lib.ImageIngestOperations.{fileKeyFromId, optimisedPngKeyFromId} import com.gu.mediaservice.lib.{ImageIngestOperations, ImageStorageProps, StorableOptimisedImage, StorableOriginalImage, StorableThumbImage} -import com.gu.mediaservice.lib.aws.S3Ops +import com.gu.mediaservice.lib.aws.{S3, S3Bucket} import com.gu.mediaservice.lib.cleanup.ImageProcessor import com.gu.mediaservice.lib.config.InstanceForRequest import com.gu.mediaservice.lib.imaging.ImageOperations @@ -30,8 +29,8 @@ object Projector { import Uploader.toImageUploadOpsCfg - def apply(config: ImageLoaderConfig, imageOps: ImageOperations, processor: ImageProcessor, auth: Authentication)(implicit ec: ExecutionContext): Projector - = new Projector(toImageUploadOpsCfg(config), S3Ops.buildS3Client(config), imageOps, processor, auth) + def apply(config: ImageLoaderConfig, imageOps: ImageOperations, processor: ImageProcessor, auth: Authentication, s3: S3)(implicit ec: ExecutionContext): Projector + = new Projector(toImageUploadOpsCfg(config), s3, imageOps, processor, auth) } case class S3FileExtractedMetadata( @@ -84,7 +83,7 @@ object S3FileExtractedMetadata { } class Projector(config: ImageUploadOpsCfg, - s3: AmazonS3, + s3: S3, imageOps: ImageOperations, processor: ImageProcessor, auth: Authentication) extends GridLogging with InstanceForRequest { @@ -98,7 +97,7 @@ class Projector(config: ImageUploadOpsCfg, val s3Key = fileKeyFromId(imageId) if (!s3.doesObjectExist(config.originalFileBucket, s3Key)) - throw new NoSuchImageExistsInS3(config.originalFileBucket, s3Key) + throw new NoSuchImageExistsInS3(config.originalFileBucket.bucket, s3Key) val s3Source = Stopwatch(s"object exists, getting s3 object at s3://${config.originalFileBucket}/$s3Key to perform Image projection"){ s3.getObject(config.originalFileBucket, s3Key) @@ -162,7 +161,7 @@ class Projector(config: ImageUploadOpsCfg, class ImageUploadProjectionOps(config: ImageUploadOpsCfg, imageOps: ImageOperations, processor: ImageProcessor, - s3: AmazonS3 + s3: S3 ) extends GridLogging { import Uploader.{fromUploadRequestShared, toMetaMap} @@ -207,7 +206,7 @@ class ImageUploadProjectionOps(config: ImageUploadOpsCfg, } private def fetchFile( - bucket: String, key: String, outFile: File + bucket: S3Bucket, key: String, outFile: File )(implicit ec: ExecutionContext, logMarker: LogMarker): Future[Option[(File, MimeType)]] = { logger.info(logMarker, s"Trying fetch existing image from S3 bucket - $bucket at key $key") val doesFileExist = Future { s3.doesObjectExist(bucket, key) } recover { case _ => false } @@ -216,7 +215,7 @@ class ImageUploadProjectionOps(config: ImageUploadOpsCfg, logger.warn(logMarker, s"image did not exist in bucket $bucket at key $key") Future.successful(None) // falls back to creating from original file case true => - val obj = s3.getObject(new GetObjectRequest(bucket, key)) + val obj = s3.getObject(bucket, key) val fos = new FileOutputStream(outFile) try { IOUtils.copy(obj.getObjectContent, fos) diff --git a/image-loader/app/model/Uploader.scala b/image-loader/app/model/Uploader.scala index c10cfa6013..3bf87d9bf7 100644 --- a/image-loader/app/model/Uploader.scala +++ b/image-loader/app/model/Uploader.scala @@ -12,7 +12,7 @@ import com.gu.mediaservice.lib.argo.ArgoHelpers import com.gu.mediaservice.lib.auth.Authentication import com.gu.mediaservice.lib.auth.Authentication.Principal import com.gu.mediaservice.lib.{BrowserViewableImage, ImageStorageProps, StorableOptimisedImage, StorableOriginalImage, StorableThumbImage} -import com.gu.mediaservice.lib.aws.{S3Object, UpdateMessage} +import com.gu.mediaservice.lib.aws.{S3Bucket, S3Object, UpdateMessage} import com.gu.mediaservice.lib.cleanup.ImageProcessor import com.gu.mediaservice.lib.formatting._ import com.gu.mediaservice.lib.imaging.ImageOperations @@ -70,8 +70,8 @@ case class ImageUploadOpsCfg( thumbWidth: Int, thumbQuality: Double, transcodedMimeTypes: List[MimeType], - originalFileBucket: String, - thumbBucket: String + originalFileBucket: S3Bucket, + thumbBucket: S3Bucket, ) case class ImageUploadOpsDependencies( @@ -97,7 +97,7 @@ object Uploader extends GridLogging { config.thumbQuality, config.transcodedMimeTypes, config.imageBucket, - config.thumbnailBucket + config.thumbnailBucket, ) } diff --git a/image-loader/test/scala/model/ImageUploadTest.scala b/image-loader/test/scala/model/ImageUploadTest.scala index 0247f56d15..027c2c6a00 100644 --- a/image-loader/test/scala/model/ImageUploadTest.scala +++ b/image-loader/test/scala/model/ImageUploadTest.scala @@ -5,7 +5,7 @@ import java.net.URI import java.util.UUID import com.drew.imaging.ImageProcessingException import com.gu.mediaservice.lib.{StorableImage, StorableOptimisedImage, StorableOriginalImage, StorableThumbImage} -import com.gu.mediaservice.lib.aws.{S3Metadata, S3Object, S3ObjectMetadata, S3Ops} +import com.gu.mediaservice.lib.aws.{S3, S3Bucket, S3Metadata, S3Object, S3ObjectMetadata, S3Ops} import com.gu.mediaservice.lib.cleanup.ImageProcessor import com.gu.mediaservice.lib.imaging.ImageOperations import com.gu.mediaservice.lib.logging.LogMarker @@ -32,7 +32,7 @@ class ImageUploadTest extends AsyncFunSuite with Matchers with MockitoSugar { private implicit val logMarker: MockLogMarker = new MockLogMarker() // For mime type info, see https://github.com/guardian/grid/pull/2568 val tempDir = new File("/tmp") - val mockConfig: ImageUploadOpsCfg = ImageUploadOpsCfg(tempDir, 256, 85d, List(Tiff), "img-bucket", "thumb-bucket") + val mockConfig: ImageUploadOpsCfg = ImageUploadOpsCfg(tempDir, 256, 85d, List(Tiff), S3Bucket("img-bucket", S3.AmazonAwsS3Endpoint, usesPathStyleURLs = false), S3Bucket("thumb-bucket", S3.AmazonAwsS3Endpoint, usesPathStyleURLs = false)) /** * @todo: I flailed about until I found a path that worked, but @@ -53,7 +53,7 @@ class ImageUploadTest extends AsyncFunSuite with Matchers with MockitoSugar { def mockStore = (a: StorableImage) => Future.successful( - S3Object("madeupname", "madeupkey", a.file, Some(a.mimeType), None, a.meta, None) + S3Object(S3Bucket("madeupname", S3.AmazonAwsS3Endpoint, usesPathStyleURLs = false), "madeupkey", a.file, Some(a.mimeType), None, a.meta, None) ) def storeOrProjectOriginalFile: StorableOriginalImage => Future[S3Object] = mockStore diff --git a/image-loader/test/scala/model/ProjectorTest.scala b/image-loader/test/scala/model/ProjectorTest.scala index e7e5365d09..d9748f588d 100644 --- a/image-loader/test/scala/model/ProjectorTest.scala +++ b/image-loader/test/scala/model/ProjectorTest.scala @@ -3,10 +3,10 @@ package model import java.io.File import java.net.URI import java.util.{Date, UUID} -import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.ObjectMetadata import com.gu.mediaservice.GridClient import com.gu.mediaservice.lib.auth.Authentication +import com.gu.mediaservice.lib.aws.{S3, S3Bucket} import com.gu.mediaservice.lib.cleanup.ImageProcessor import com.gu.mediaservice.lib.imaging.ImageOperations import com.gu.mediaservice.lib.logging.{LogMarker, MarkerMap} @@ -38,9 +38,9 @@ class ProjectorTest extends AnyFreeSpec with Matchers with ScalaFutures with Moc private val imageOperations = new ImageOperations(ctxPath) - private val config = ImageUploadOpsCfg(new File("/tmp"), 256, 85d, Nil, "img-bucket", "thumb-bucket") + private val config = ImageUploadOpsCfg(new File("/tmp"), 256, 85d, Nil, S3Bucket("img-bucket", S3.AmazonAwsS3Endpoint, usesPathStyleURLs = false), S3Bucket("thumb-bucket", S3.AmazonAwsS3Endpoint, usesPathStyleURLs = false)) - private val s3 = mock[AmazonS3] + private val s3 = mock[S3] private val auth = mock[Authentication] private val projector = new Projector(config, s3, imageOperations, ImageProcessor.identity, auth) diff --git a/kahuna/app/lib/KahunaConfig.scala b/kahuna/app/lib/KahunaConfig.scala index eb462c0f80..1ddb1bde36 100644 --- a/kahuna/app/lib/KahunaConfig.scala +++ b/kahuna/app/lib/KahunaConfig.scala @@ -2,6 +2,7 @@ package lib import com.gu.mediaservice.lib.auth.Permissions.Pinboard import com.gu.mediaservice.lib.auth.SimplePermission +import com.gu.mediaservice.lib.aws.S3Bucket import com.gu.mediaservice.lib.config.{CommonConfig, GridConfigResources} import com.gu.mediaservice.model.Instance import play.api.libs.json._ @@ -50,8 +51,7 @@ class KahunaConfig(resources: GridConfigResources) extends CommonConfig(resource val frameAncestors: Set[String] = getStringSet("security.frameAncestors") val connectSources: Set[String] = getStringSet("security.connectSources") ++ maybeIngestBucket.map { ingestBucket => - if (isDev) "https://localstack.media.local.dev-gutools.co.uk" - else s"https://$ingestBucket.s3.$awsRegion.amazonaws.com" + ingestBucket.bucketURL().toURL.toExternalForm } ++ telemetryUri val fontSources: Set[String] = getStringSet("security.fontSources") val imageSources: Set[String] = getStringSet("security.imageSources") diff --git a/media-api/app/MediaApiComponents.scala b/media-api/app/MediaApiComponents.scala index e84b796bb0..bbbc4e0d15 100644 --- a/media-api/app/MediaApiComponents.scala +++ b/media-api/app/MediaApiComponents.scala @@ -1,4 +1,4 @@ -import com.gu.mediaservice.lib.aws.ThrallMessageSender +import com.gu.mediaservice.lib.aws.{S3, ThrallMessageSender} import com.gu.mediaservice.lib.instances.InstancesClient import com.gu.mediaservice.lib.management.{ElasticSearchHealthCheck, Management} import com.gu.mediaservice.lib.metadata.SoftDeletedMetadataTable @@ -18,6 +18,7 @@ class MediaApiComponents(context: Context) extends GridComponents(context, new M val mediaApiMetrics = new MediaApiMetrics(config, actorSystem, applicationLifecycle) val s3Client = new S3Client(config) + val s3 = new S3(config) val usageQuota = new UsageQuota(config, actorSystem.scheduler) usageQuota.quotaStore.update() @@ -31,7 +32,7 @@ class MediaApiComponents(context: Context) extends GridComponents(context, new M val softDeletedMetadataTable = new SoftDeletedMetadataTable(config) - val mediaApi = new MediaApi(auth, messageSender, softDeletedMetadataTable, elasticSearch, imageResponse, config, controllerComponents, s3Client, mediaApiMetrics, wsClient, authorisation, events) + val mediaApi = new MediaApi(auth, messageSender, softDeletedMetadataTable, elasticSearch, imageResponse, config, controllerComponents, s3, mediaApiMetrics, wsClient, authorisation, events) val suggestionController = new SuggestionController(auth, elasticSearch, controllerComponents) val aggController = new AggregationController(auth, elasticSearch, controllerComponents) val usageController = new UsageController(auth, config, elasticSearch, usageQuota, controllerComponents) diff --git a/media-api/app/controllers/MediaApi.scala b/media-api/app/controllers/MediaApi.scala index 4504e466cb..e566bc4ad6 100644 --- a/media-api/app/controllers/MediaApi.scala +++ b/media-api/app/controllers/MediaApi.scala @@ -10,6 +10,7 @@ import com.gu.mediaservice.lib.auth.Permissions.{ArchiveImages, DeleteCropsOrUsa import com.gu.mediaservice.lib.auth._ import com.gu.mediaservice.lib.auth.provider.ApiKeyAuthenticationProvider import com.gu.mediaservice.lib.aws.{ContentDisposition, ThrallMessageSender, UpdateMessage} +import com.gu.mediaservice.lib.aws.{ContentDisposition, S3, ThrallMessageSender, UpdateMessage} import com.gu.mediaservice.lib.config.InstanceForRequest import com.gu.mediaservice.lib.events.UsageEvents import com.gu.mediaservice.lib.formatting.printDateTime @@ -40,7 +41,7 @@ class MediaApi( imageResponse: ImageResponse, config: MediaApiConfig, override val controllerComponents: ControllerComponents, - s3Client: S3Client, + s3: S3, mediaApiMetrics: MediaApiMetrics, ws: WSClient, authorisation: Authorisation, @@ -250,7 +251,8 @@ class MediaApi( val maybeResult = for { export <- source.exports.find(_.id.contains(exportId)) asset <- export.assets.find(_.dimensions.exists(_.width == width)) - s3Object <- Try(s3Client.getObject(config.imgPublishingBucket, asset.file)).toOption + key = config.imgPublishingBucket.keyFromS3URL(asset.file) + s3Object <- Try(s3.getObject(config.imgPublishingBucket, key)).toOption file = StreamConverters.fromInputStream(() => s3Object.getObjectContent) entity = HttpEntity.Streamed(file, asset.size, asset.mimeType.map(_.name)) result = Result(ResponseHeader(OK), entity).withHeaders("Content-Disposition" -> getContentDisposition(source, export, asset, config.shortenDownloadFilename)) @@ -357,7 +359,8 @@ class MediaApi( val apiKey = request.user.accessor logger.info(s"Download original image: $id from user: ${Authentication.getIdentity(request.user)}", apiKey, id) mediaApiMetrics.incrementImageDownload(apiKey, mediaApiMetrics.OriginalDownloadType) - val s3Object = s3Client.getObject(config.imageBucket, image.source.file) + val key = config.imageBucket.keyFromS3URL(image.source.file) + val s3Object = s3.getObject(config.imageBucket, key) val file = StreamConverters.fromInputStream(() => s3Object.getObjectContent) val entity = HttpEntity.Streamed(file, image.source.size, image.source.mimeType.map(_.name)) @@ -410,8 +413,9 @@ class MediaApi( logger.info(s"Download optimised image: $id from user: ${Authentication.getIdentity(request.user)}", apiKey, id) mediaApiMetrics.incrementImageDownload(apiKey, mediaApiMetrics.OptimisedDownloadType) + val key = config.imageBucket.keyFromS3URL(image.optimisedPng.getOrElse(image.source).file) val sourceImageUri = - new URI(s3Client.signUrl(config.imageBucket, image.optimisedPng.getOrElse(image.source).file, image, imageType = image.optimisedPng match { + new URI(s3.signUrl(config.imageBucket, key, image, imageType = image.optimisedPng match { case Some(_) => OptimisedPng case _ => Source })) diff --git a/media-api/app/lib/ImageResponse.scala b/media-api/app/lib/ImageResponse.scala index c96aaade7c..8fff479bee 100644 --- a/media-api/app/lib/ImageResponse.scala +++ b/media-api/app/lib/ImageResponse.scala @@ -75,11 +75,12 @@ class ImageResponse(config: MediaApiConfig, s3Client: S3Client, usageQuota: Usag val fileUri = image.source.file - val imageUrl = s3Client.signUrl(config.imageBucket, fileUri, image, imageType = Source) + val key = config.imageBucket.keyFromS3URL(fileUri) + val imageUrl = s3Client.signUrl(config.imageBucket, key, image, imageType = Source) val pngUrl: Option[String] = pngFileUri - .map(s3Client.signUrl(config.imageBucket, _, image, imageType = OptimisedPng)) + .map(uri => s3Client.signUrl(config.imageBucket, config.imageBucket.keyFromS3URL(uri), image, imageType = OptimisedPng)) - def s3SignedThumbUrl = s3Client.signUrl(config.thumbBucket, fileUri, image, imageType = Thumbnail) + def s3SignedThumbUrl = s3Client.signUrl(config.thumbnailBucket, key, image, imageType = Thumbnail) val thumbUrl = config.cloudFrontDomainThumbBucket .flatMap(s3Client.signedCloudFrontUrl(_, fileUri.getPath.drop(1))) diff --git a/media-api/app/lib/MediaApiConfig.scala b/media-api/app/lib/MediaApiConfig.scala index 081a9f347b..1eb97adef9 100644 --- a/media-api/app/lib/MediaApiConfig.scala +++ b/media-api/app/lib/MediaApiConfig.scala @@ -1,34 +1,30 @@ package lib -import com.amazonaws.services.cloudfront.util.SignerUtils +import com.gu.mediaservice.lib.aws.{S3, S3Bucket} import com.gu.mediaservice.lib.config.{CommonConfigWithElastic, GridConfigResources} import com.gu.mediaservice.model.Instance import org.joda.time.DateTime import scalaz.NonEmptyList -import java.security.PrivateKey import scala.util.Try case class StoreConfig( - storeBucket: String, - storeKey: String + storeBucket: S3Bucket, + storeKey: String, ) class MediaApiConfig(resources: GridConfigResources) extends CommonConfigWithElastic(resources) { - val configBucket: String = string("s3.config.bucket") - val usageMailBucket: String = string("s3.usagemail.bucket") + val configBucket: S3Bucket = S3Bucket(string("s3.config.bucket"), S3.AmazonAwsS3Endpoint, usesPathStyleURLs = false) + val usageMailBucket: S3Bucket = S3Bucket(string("s3.usagemail.bucket"), S3.AmazonAwsS3Endpoint, usesPathStyleURLs = false) val quotaStoreKey: String = string("quota.store.key") val quotaStoreConfig: StoreConfig = StoreConfig(configBucket, quotaStoreKey) //Lazy allows this to be empty and not break things unless used somewhere - lazy val imgPublishingBucket = string("publishing.image.bucket") - - val imageBucket: String = string("s3.image.bucket") - val thumbBucket: String = string("s3.thumb.bucket") + lazy val imgPublishingBucket: S3Bucket = S3Bucket(string("publishing.image.bucket"), S3.AmazonAwsS3Endpoint, usesPathStyleURLs = false) val cloudFrontDomainThumbBucket: Option[String] = stringOpt("cloudfront.domain.thumbbucket") - val cloudFrontPrivateKeyBucket: Option[String] = stringOpt("cloudfront.private-key.bucket") + val cloudFrontPrivateKeyBucket: Option[S3Bucket] = stringOpt("cloudfront.private-key.bucket").map(S3Bucket(_, S3.AmazonAwsS3Endpoint, usesPathStyleURLs = false)) val cloudFrontPrivateKeyBucketKey: Option[String] = stringOpt("cloudfront.private-key.key") val cloudFrontKeyPairId: Option[String] = stringOpt("cloudfront.keypair.id") diff --git a/media-api/app/lib/QuotaStore.scala b/media-api/app/lib/QuotaStore.scala index c79486fbd1..1606586554 100644 --- a/media-api/app/lib/QuotaStore.scala +++ b/media-api/app/lib/QuotaStore.scala @@ -1,14 +1,15 @@ package lib import com.gu.mediaservice.lib.BaseStore +import com.gu.mediaservice.lib.aws.S3Bucket import play.api.libs.json.Json import scala.concurrent.ExecutionContext class QuotaStore( quotaFile: String, - bucket: String, - config: MediaApiConfig + bucket: S3Bucket, + config: MediaApiConfig, )(implicit ec: ExecutionContext) extends BaseStore[String, SupplierUsageQuota](bucket, config)(ec) { def getQuota: Map[String, SupplierUsageQuota] = store.get() diff --git a/media-api/app/lib/S3Client.scala b/media-api/app/lib/S3Client.scala index 1161392cfd..a286d5420a 100644 --- a/media-api/app/lib/S3Client.scala +++ b/media-api/app/lib/S3Client.scala @@ -29,7 +29,7 @@ class S3Client(config: MediaApiConfig) extends S3(config) with CloudFrontDistrib lazy val keyPairId: Option[String] = config.cloudFrontKeyPairId lazy val privateKey: PrivateKey = { config.cloudFrontPrivateKeyBucket.flatMap(bucket => config.cloudFrontPrivateKeyBucketKey.map { key => - val privateKeyStream = client.getObject(bucket, key).getObjectContent + val privateKeyStream = getObject(bucket, key).getObjectContent try { PEM.readPrivateKey(privateKeyStream) } diff --git a/media-api/app/lib/UsageQuota.scala b/media-api/app/lib/UsageQuota.scala index bcd4e93e21..0dc049fc77 100644 --- a/media-api/app/lib/UsageQuota.scala +++ b/media-api/app/lib/UsageQuota.scala @@ -2,6 +2,7 @@ package lib import org.apache.pekko.actor.Scheduler import com.gu.mediaservice.lib.FeatureToggle +import com.gu.mediaservice.lib.aws.S3 import com.gu.mediaservice.model.UsageRights import scala.concurrent.Await @@ -16,13 +17,13 @@ class UsageQuota(config: MediaApiConfig, scheduler: Scheduler) { val quotaStore = new QuotaStore( config.quotaStoreConfig.storeKey, config.quotaStoreConfig.storeBucket, - config + config, ) val usageStore = new UsageStore( config.usageMailBucket, config, - quotaStore + quotaStore, ) def scheduleUpdates(): Unit = { diff --git a/media-api/app/lib/UsageStore.scala b/media-api/app/lib/UsageStore.scala index 2c19044131..c8b55defa0 100644 --- a/media-api/app/lib/UsageStore.scala +++ b/media-api/app/lib/UsageStore.scala @@ -1,6 +1,7 @@ package lib import com.gu.mediaservice.lib.BaseStore +import com.gu.mediaservice.lib.aws.S3Bucket import com.gu.mediaservice.lib.logging.GridLogging import com.gu.mediaservice.model.{Agencies, Agency, UsageRights} import org.joda.time.DateTime @@ -57,9 +58,9 @@ object UsageStore extends GridLogging { } class UsageStore( - bucket: String, + bucket: S3Bucket, config: MediaApiConfig, - quotaStore: QuotaStore + quotaStore: QuotaStore, )(implicit val ec: ExecutionContext) extends BaseStore[String, UsageStatus](bucket, config) with GridLogging { import UsageStore._ diff --git a/media-api/test/lib/elasticsearch/Fixtures.scala b/media-api/test/lib/elasticsearch/Fixtures.scala index 84d2fa8eee..df7334f978 100644 --- a/media-api/test/lib/elasticsearch/Fixtures.scala +++ b/media-api/test/lib/elasticsearch/Fixtures.scala @@ -35,8 +35,10 @@ trait Fixtures { "es.index.aliases.current", "es.index.aliases.migration", "es6.url", - "s3.image.bucket", - "s3.thumb.bucket", + "s3.image.bucket.name", + "s3.image.bucket.endpoint", + "s3.thumb.bucket.name", + "s3.thumb.bucket.endpoint", "grid.stage", "grid.appName", "instance.service.my", diff --git a/rest-lib/src/main/scala/com/gu/mediaservice/lib/auth/provider/ApiKeyAuthenticationProvider.scala b/rest-lib/src/main/scala/com/gu/mediaservice/lib/auth/provider/ApiKeyAuthenticationProvider.scala index f264d8589f..f282bd20b9 100644 --- a/rest-lib/src/main/scala/com/gu/mediaservice/lib/auth/provider/ApiKeyAuthenticationProvider.scala +++ b/rest-lib/src/main/scala/com/gu/mediaservice/lib/auth/provider/ApiKeyAuthenticationProvider.scala @@ -2,6 +2,7 @@ package com.gu.mediaservice.lib.auth.provider import com.gu.mediaservice.lib.auth.Authentication.{MachinePrincipal, Principal} import com.gu.mediaservice.lib.auth.provider.ApiKeyAuthenticationProvider.{ApiKeyInstance, KindeIdKey} import com.gu.mediaservice.lib.auth.{ApiAccessor, KeyStore} +import com.gu.mediaservice.lib.aws.{S3, S3Bucket} import com.gu.mediaservice.lib.config.InstanceForRequest import com.gu.mediaservice.lib.events.UsageEvents import com.gu.mediaservice.model.Instance @@ -25,7 +26,8 @@ class ApiKeyAuthenticationProvider(configuration: Configuration, resources: Auth var keyStorePlaceholder: Option[KeyStore] = _ override def initialise(): Unit = { - val store = new KeyStore(configuration.get[String]("authKeyStoreBucket"), resources.commonConfig) + val authKeyStoreBucket = S3Bucket(configuration.get[String]("authKeyStoreBucket"), S3.AmazonAwsS3Endpoint, usesPathStyleURLs = false) + val store = new KeyStore(authKeyStoreBucket, resources.commonConfig) store.scheduleUpdates(resources.actorSystem.scheduler) keyStorePlaceholder = Some(store) } diff --git a/rest-lib/src/test/resources/application.conf b/rest-lib/src/test/resources/application.conf index 930c6a00d5..7e55f72bb2 100644 --- a/rest-lib/src/test/resources/application.conf +++ b/rest-lib/src/test/resources/application.conf @@ -3,3 +3,7 @@ grid.appName: "test" thrall.kinesis.stream.name: "not-used" thrall.kinesis.lowPriorityStream.name: "not-used" domain.root: "notused.example.com" +s3.image.bucket.name: images +s3.image.bucket.endpoint: some-providers-s3-endpoint +s3.thumb.bucket.name: thumbs +s3.thumb.bucket.endpoint: some-providers-s3-endpoint diff --git a/rest-lib/src/test/scala/com/gu/mediaservice/lib/auth/ApiKeyAuthenticationProviderTest.scala b/rest-lib/src/test/scala/com/gu/mediaservice/lib/auth/ApiKeyAuthenticationProviderTest.scala index 3ba9231943..76c2bfd1b4 100644 --- a/rest-lib/src/test/scala/com/gu/mediaservice/lib/auth/ApiKeyAuthenticationProviderTest.scala +++ b/rest-lib/src/test/scala/com/gu/mediaservice/lib/auth/ApiKeyAuthenticationProviderTest.scala @@ -3,6 +3,7 @@ package com.gu.mediaservice.lib.auth import org.apache.pekko.actor.ActorSystem import com.gu.mediaservice.lib.auth.Authentication.MachinePrincipal import com.gu.mediaservice.lib.auth.provider.{ApiKeyAuthenticationProvider, Authenticated, AuthenticationProviderResources, Invalid, NotAuthenticated, NotAuthorised} +import com.gu.mediaservice.lib.aws.{S3, S3Bucket} import com.gu.mediaservice.lib.config.{CommonConfig, GridConfigResources} import com.gu.mediaservice.lib.events.UsageEvents import com.gu.mediaservice.model.Instance @@ -43,7 +44,7 @@ class ApiKeyAuthenticationProviderTest extends AsyncFreeSpec with Matchers with Future.successful(()) } - override def keyStore: KeyStore = new KeyStore("not-used", resources.commonConfig) { + override def keyStore: KeyStore = new KeyStore(S3Bucket("not-used", S3.AmazonAwsS3Endpoint, usesPathStyleURLs = false), resources.commonConfig) { override def lookupIdentity(key: String)(implicit instance: Instance): Option[ApiAccessor] = { key match { case "key-chuckle" => Some(ApiAccessor("brothers", Internal)) diff --git a/scripts/src/main/scala/com/gu/mediaservice/lib/JsonValueCodecJsValue.scala b/scripts/src/main/scala/com/gu/mediaservice/lib/JsonValueCodecJsValue.scala index da73d03db6..c6ad7b03f8 100644 --- a/scripts/src/main/scala/com/gu/mediaservice/lib/JsonValueCodecJsValue.scala +++ b/scripts/src/main/scala/com/gu/mediaservice/lib/JsonValueCodecJsValue.scala @@ -98,6 +98,7 @@ object JsonValueCodecJsValue { case JsNull => out.writeNull() case _ => + out.writeNull() } } diff --git a/thrall/app/ThrallComponents.scala b/thrall/app/ThrallComponents.scala index 7bf5eace79..671d2f9f44 100644 --- a/thrall/app/ThrallComponents.scala +++ b/thrall/app/ThrallComponents.scala @@ -2,6 +2,7 @@ import org.apache.pekko.Done import org.apache.pekko.stream.scaladsl.Source import com.gu.kinesis.{KinesisRecord, KinesisSource, ConsumerConfig => KclPekkoStreamConfig} import com.gu.mediaservice.GridClient +import com.gu.mediaservice.lib.aws.ThrallMessageSender import com.gu.mediaservice.lib.aws.{S3Ops, ThrallMessageSender} import com.gu.mediaservice.lib.instances.{Instances, InstancesClient} import com.gu.mediaservice.lib.logging.MarkerMap @@ -22,6 +23,7 @@ import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest import scala.concurrent.{Await, Future} import scala.concurrent.duration._ import scala.language.postfixOps +import com.gu.mediaservice.lib.aws.S3 class ThrallComponents(context: Context) extends GridComponents(context, new ThrallConfig(_)) with StrictLogging with AssetsComponents with Instances { @@ -61,6 +63,7 @@ class ThrallComponents(context: Context) extends GridComponents(context, new Thr Await.ready(ensureIndexes(), 60 seconds) val messageSender = new ThrallMessageSender(config.thrallKinesisStreamConfig) + val lowPriorityMessageSender = new ThrallMessageSender(config.thrallKinesisLowPriorityStreamConfig) val highPriorityKinesisConfig: KclPekkoStreamConfig = KinesisConfig.kinesisConfig(config.kinesisConfig) val lowPriorityKinesisConfig: KclPekkoStreamConfig = KinesisConfig.kinesisConfig(config.kinesisLowPriorityConfig) @@ -78,7 +81,8 @@ class ThrallComponents(context: Context) extends GridComponents(context, new Thr gridClient, auth, instanceMessageSender, - events + events, + messageSender ) val thrallStreamProcessor = new ThrallStreamProcessor( @@ -91,7 +95,7 @@ class ThrallComponents(context: Context) extends GridComponents(context, new Thr val streamRunning: Future[Done] = thrallStreamProcessor.run() - val s3 = S3Ops.buildS3Client(config) + val s3 = new S3(config) Source.repeat(()).throttle(1, per = 5.minute).map(_ => { implicit val logMarker: MarkerMap = MarkerMap() @@ -121,7 +125,7 @@ class ThrallComponents(context: Context) extends GridComponents(context, new Thr val softDeletedMetadataTable = new SoftDeletedMetadataTable(config) val maybeCustomReapableEligibility = config.maybeReapableEligibilityClass(applicationLifecycle) - val thrallController = new ThrallController(es, store, migrationSourceWithSender.send, messageSender, actorSystem, auth, config.services, controllerComponents, gridClient, s3, config.imageBucket) + val thrallController = new ThrallController(es, store, migrationSourceWithSender.send, messageSender, actorSystem, auth, config.services, controllerComponents, gridClient, s3, config.imageBucket, lowPriorityMessageSender) val reaperController = new ReaperController(es, store, authorisation, config, actorSystem.scheduler, maybeCustomReapableEligibility, softDeletedMetadataTable, thrallMetrics, auth, config.services, controllerComponents, wsClient, events) val healthCheckController = new HealthCheck(es, streamRunning.isCompleted, config, controllerComponents) diff --git a/thrall/app/controllers/ReaperController.scala b/thrall/app/controllers/ReaperController.scala index 711fabd547..51818771ba 100644 --- a/thrall/app/controllers/ReaperController.scala +++ b/thrall/app/controllers/ReaperController.scala @@ -114,7 +114,7 @@ class ReaperController( case Some(reaperBucket) => val now = DateTime.now(DateTimeZone.UTC) val key = s"$deleteType/${s3DirNameFromDate(now)}/$deleteType-${now.toString()}.json" - store.client.putObject(reaperBucket, key, json.toString()) + store.putObject(reaperBucket, key, json.toString()) json } } @@ -198,8 +198,8 @@ class ReaperController( case (Some(reaperBucket), Some(countOfImagesToReap)) => val recentRecords = List(now, now.minusDays(1), now.minusDays(2)).flatMap { day => val s3DirName = s3DirNameFromDate(day) - store.client.listObjects(reaperBucket, s"soft/$s3DirName/").getObjectSummaries.asScala.toList ++ - store.client.listObjects(reaperBucket, s"hard/$s3DirName/").getObjectSummaries.asScala.toList + store.listObjects(reaperBucket, s"soft/$s3DirName/").getObjectSummaries.asScala.toList ++ + store.listObjects(reaperBucket, s"hard/$s3DirName/").getObjectSummaries.asScala.toList } val recentRecordKeys = recentRecords @@ -214,9 +214,11 @@ class ReaperController( def reaperRecord(key: String) = auth { config.maybeReaperBucket match { case None => NotImplemented("Reaper bucket not configured") case Some(reaperBucket) => - Ok( - store.client.getObjectAsString(reaperBucket, key) - ).as(JSON) + store.getObjectAsString(reaperBucket, key).map { record => + Ok(record).as(JSON) + }.getOrElse{ + NotFound + } }} def conf() = Action.async { diff --git a/thrall/app/controllers/ThrallController.scala b/thrall/app/controllers/ThrallController.scala index 3ae766bebb..4cb8a34da7 100644 --- a/thrall/app/controllers/ThrallController.scala +++ b/thrall/app/controllers/ThrallController.scala @@ -3,16 +3,15 @@ package controllers import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.Materializer import org.apache.pekko.stream.scaladsl.{Sink, Source} -import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.ListObjectsRequest import com.gu.mediaservice.GridClient import com.gu.mediaservice.lib.auth.{Authentication, BaseControllerWithLoginRedirects} -import com.gu.mediaservice.lib.aws.{ThrallMessageSender, UpdateMessage} +import com.gu.mediaservice.lib.aws.{S3, S3Bucket, ThrallMessageSender, UpdateMessage} import com.gu.mediaservice.lib.config.{InstanceForRequest, Services} import com.gu.mediaservice.lib.elasticsearch.{NotRunning, Running} import com.gu.mediaservice.lib.logging.GridLogging -import com.gu.mediaservice.model.{CompleteMigrationMessage, CreateMigrationIndexMessage, Instance, UpsertFromProjectionMessage} -import com.gu.mediaservice.syntax.MessageSubjects.Image +import com.gu.mediaservice.model.{CompleteMigrationMessage, CreateMigrationIndexMessage, Instance, ReindexImageMessage, UpsertFromProjectionMessage} +import com.gu.mediaservice.syntax.MessageSubjects.{Image, ReindexImage} import lib.elasticsearch.ElasticSearch import lib.{MigrationRequest, OptionalFutureRunner, Paging, ThrallStore} import org.joda.time.{DateTime, DateTimeZone} @@ -40,8 +39,9 @@ class ThrallController( override val services: Services, override val controllerComponents: ControllerComponents, gridClient: GridClient, - s3: AmazonS3, - imageBucket: String, + s3: S3, + imageBucket: S3Bucket, + lowPriorityMessageSender: ThrallMessageSender )(implicit val ec: ExecutionContext) extends BaseControllerWithLoginRedirects with GridLogging with InstanceForRequest { private val numberFormatter: Long => String = java.text.NumberFormat.getIntegerInstance().format @@ -150,7 +150,7 @@ class ThrallController( def startMigration = withLoginRedirectAsync { implicit request => val instance = instanceOf(request) - if(Form(single("start-confirmation" -> text)).bindFromRequest().get != "start"){ + if (Form(single("start-confirmation" -> text)).bindFromRequest().get != "start") { Future.successful(BadRequest("you did not enter 'start' in the text box")) } else { val msgFailedToFetchIndex = s"Could not fetch ES index details for alias '${es.imagesMigrationAlias(instance)}'" @@ -192,7 +192,7 @@ class ThrallController( def completeMigration(): Action[AnyContent] = withLoginRedirectAsync { implicit request => val instance = instanceOf(request) - if(Form(single("complete-confirmation" -> text)).bindFromRequest().get != "complete"){ + if (Form(single("complete-confirmation" -> text)).bindFromRequest().get != "complete") { Future.successful(BadRequest("you did not enter 'complete' in the text box")) } else { es.refreshAndRetrieveMigrationStatus(instance) match { @@ -305,14 +305,14 @@ class ThrallController( @tailrec def getMediaIdsFromS3(all: Seq[String], nextMarker: Option[String])(implicit instance: Instance): Seq[String] = { - val baseRequest = new ListObjectsRequest().withBucketName(imageBucket).withPrefix(instance.id + "/") + val baseRequest = new ListObjectsRequest().withBucketName(imageBucket.bucket).withPrefix(instance.id + "/") val request = nextMarker.map { marker => baseRequest.withMarker(marker) }.getOrElse { baseRequest } - val listing = s3.listObjects(request) + val listing = s3.listObjects(imageBucket, request) val keys = listing.getObjectSummaries.asScala.flatMap { s3Object => logger.info("Reindexing s3 key: " + s3Object.getKey) s3Object.getKey.split("/").lastOption @@ -325,29 +325,19 @@ class ThrallController( } } + logger.info(s"Reindex requested for instance ${instance.id}") val mediaIds = getMediaIdsFromS3(Seq.empty, None) + logger.info(s"Queuing reindex requests for ${mediaIds.size} images for instance ${instance.id}") mediaIds.foreach { mediaId => - Await.result(reindexImage(mediaId), Duration(10, TimeUnit.SECONDS)) + lowPriorityMessageSender.publish( + UpdateMessage( + subject = ReindexImage, + id = Some(mediaId), + instance = instance + ) + ) } Ok("ok") } - private def reindexImage(mediaId: String)(implicit instance: Instance) = { - logger.info(s"Reindexing from s3 ${instance.id} / $mediaId") - - gridClient.getImageLoaderProjection(mediaId, auth.innerServiceCall).map { maybeImage => - logger.info(s"Projected ${instance.id} / $mediaId to $maybeImage}") - maybeImage.exists { image => - val updateMessage = UpdateMessage(subject = Image, image = Some(image), instance = instance) - logger.info(s"Publishing projected image as a thrall image message: ${updateMessage.id}") - messageSender.publish(updateMessage) - true - } - }.recover { - case _: Throwable => - logger.warn(s"Error while reindexing ${instance.id} / $mediaId - Image has not been reindexed!") - false - } - } - } diff --git a/thrall/app/lib/ThrallConfig.scala b/thrall/app/lib/ThrallConfig.scala index 25743ddf67..48ab0dd971 100644 --- a/thrall/app/lib/ThrallConfig.scala +++ b/thrall/app/lib/ThrallConfig.scala @@ -1,6 +1,6 @@ package lib -import com.gu.mediaservice.lib.aws.AwsClientV2BuilderUtils +import com.gu.mediaservice.lib.aws.{AwsClientV2BuilderUtils, S3, S3Bucket} import com.gu.mediaservice.lib.cleanup.ReapableEligibiltyResources import com.gu.mediaservice.lib.config.{CommonConfigWithElastic, GridConfigResources, ReapableEligibilityLoader} import com.gu.mediaservice.lib.elasticsearch.ReapableEligibility @@ -56,11 +56,7 @@ object KinesisReceiverConfig { } class ThrallConfig(resources: GridConfigResources) extends CommonConfigWithElastic(resources) { - val imageBucket: String = string("s3.image.bucket") - - val thumbnailBucket: String = string("s3.thumb.bucket") - - val maybeReaperBucket: Option[String] = stringOpt("s3.reaper.bucket") + val maybeReaperBucket: Option[S3Bucket] = stringOpt("s3.reaper.bucket").map(S3Bucket(_, S3.AmazonAwsS3Endpoint, usesPathStyleURLs = false)) val maybeReaperCountPerRun: Option[Int] = intOpt("reaper.countPerRun") val metadataTopicArn: String = string("indexed.image.sns.topic.arn") diff --git a/thrall/app/lib/kinesis/MessageProcessor.scala b/thrall/app/lib/kinesis/MessageProcessor.scala index c6e328b62c..490f9ac08b 100644 --- a/thrall/app/lib/kinesis/MessageProcessor.scala +++ b/thrall/app/lib/kinesis/MessageProcessor.scala @@ -2,13 +2,15 @@ package lib.kinesis import com.gu.mediaservice.GridClient import com.gu.mediaservice.lib.auth.Authentication -import com.gu.mediaservice.lib.aws.EsResponse +import com.gu.mediaservice.lib.aws.{EsResponse, ThrallMessageSender, UpdateMessage} import com.gu.mediaservice.lib.elasticsearch.{ElasticNotFoundException, Running} import com.gu.mediaservice.lib.events.UsageEvents import com.gu.mediaservice.lib.logging.{GridLogging, LogMarker, combineMarkers} import com.gu.mediaservice.model.{AddImageLeaseMessage, CreateMigrationIndexMessage, DeleteImageExportsMessage, DeleteImageMessage, DeleteUsagesMessage, ImageMessage, MigrateImageMessage, RemoveImageLeaseMessage, ReplaceImageLeasesMessage, SetImageCollectionsMessage, SoftDeleteImageMessage, ThrallMessage, UnSoftDeleteImageMessage, UpdateImageExportsMessage, UpdateImagePhotoshootMetadataMessage, UpdateImageSyndicationMetadataMessage, UpdateImageUsagesMessage, UpdateImageUserMetadataMessage} import com.gu.mediaservice.model.usage.{Usage, UsageNotice} +import com.gu.mediaservice.syntax.MessageSubjects.Image import instances.{InstanceMessageSender, InstanceStatusMessage} +import org.joda.time.DateTime // import all except `Right`, which otherwise shadows the type used in `Either`s import com.gu.mediaservice.model.{Right => _, _} import com.gu.mediaservice.syntax.MessageSubjects @@ -33,7 +35,8 @@ class MessageProcessor( gridClient: GridClient, auth: Authentication, instanceMessageSender: InstanceMessageSender, - usageEvents: UsageEvents + usageEvents: UsageEvents, + messageSender: ThrallMessageSender ) extends GridLogging with MessageSubjects { def process(updateMessage: ThrallMessage, logMarker: LogMarker)(implicit ec: ExecutionContext): Future[Any] = { @@ -60,6 +63,7 @@ class MessageProcessor( case message: UpdateUsageStatusMessage => updateUsageStatus(message, logMarker) case message: CompleteMigrationMessage => completeMigration(message, logMarker) case message: CreateInstanceMessage => setupNewInstance(message, logMarker) + case message: ReindexImageMessage => reindexImage(message, logMarker) case _ => logger.info(s"Unmatched ThrallMessage type: ${updateMessage.subject}; ignoring") Future.successful(()) @@ -267,4 +271,24 @@ class MessageProcessor( } } + private def reindexImage(message: ReindexImageMessage, logMarker: LogMarker)(implicit ec: ExecutionContext): Future[Boolean] = { + val mediaId = message.id + implicit val instance: Instance = message.instance + logger.info(s"Reindexing from s3 ${instance.id} / $mediaId") + + gridClient.getImageLoaderProjection(mediaId, auth.innerServiceCall).map { maybeImage => + logger.info(s"Projected ${instance.id} / $mediaId to $maybeImage}") + maybeImage.exists { image => + val updateMessage = UpsertFromProjectionMessage(image.id, image, DateTime.now, instance) + logger.info(s"Publishing projected image as a thrall image message: ${updateMessage.id}") + messageSender.publish(updateMessage) + true + } + }.recover { + case _: Throwable => + logger.warn(s"Error while reindexing ${instance.id} / $mediaId - Image has not been reindexed!") + false + } + } + } diff --git a/thrall/app/lib/kinesis/MessageTranslator.scala b/thrall/app/lib/kinesis/MessageTranslator.scala index 0081621c3e..ab3a88d47c 100644 --- a/thrall/app/lib/kinesis/MessageTranslator.scala +++ b/thrall/app/lib/kinesis/MessageTranslator.scala @@ -81,6 +81,10 @@ object MessageTranslator extends GridLogging { case Some(id) => Right(CreateInstanceMessage(id, updateMessage.lastModified, updateMessage.instance)) case _ => Left(MissingFieldsException(updateMessage.subject)) } + case ReindexImage => (updateMessage.id) match { + case Some(id) => Right(ReindexImageMessage(id, updateMessage.lastModified, updateMessage.instance)) + case _ => Left(MissingFieldsException(updateMessage.subject)) + } case _ => Left(ProcessorNotFoundException(updateMessage.subject)) } } diff --git a/thrall/app/lib/kinesis/ThrallEventConsumer.scala b/thrall/app/lib/kinesis/ThrallEventConsumer.scala index fc4f0ddb9e..988eb2a43e 100644 --- a/thrall/app/lib/kinesis/ThrallEventConsumer.scala +++ b/thrall/app/lib/kinesis/ThrallEventConsumer.scala @@ -3,7 +3,7 @@ package lib.kinesis import org.apache.pekko.actor.ActorSystem import com.gu.mediaservice.GridClient import com.gu.mediaservice.lib.auth.Authentication -import com.gu.mediaservice.lib.aws.UpdateMessage +import com.gu.mediaservice.lib.aws.{ThrallMessageSender, UpdateMessage} import com.gu.mediaservice.lib.events.UsageEvents import com.gu.mediaservice.lib.json.{JsonByteArrayUtil, PlayJsonHelpers} import com.gu.mediaservice.lib.logging._ @@ -26,7 +26,8 @@ class ThrallEventConsumer(es: ElasticSearch, gridClient: GridClient, auth: Authentication, instanceMessageSender: InstanceMessageSender, - usageEvents: UsageEvents + usageEvents: UsageEvents, + messageSender: ThrallMessageSender ) extends PlayJsonHelpers with GridLogging { private val attemptTimeout = FiniteDuration(20, SECONDS) @@ -34,7 +35,7 @@ class ThrallEventConsumer(es: ElasticSearch, private val attempts = 2 private val timeout = attemptTimeout * attempts + delay * (attempts - 1) - private val messageProcessor = new MessageProcessor(es, store, metadataEditorNotifications, gridClient, auth, instanceMessageSender, usageEvents) + private val messageProcessor = new MessageProcessor(es, store, metadataEditorNotifications, gridClient, auth, instanceMessageSender, usageEvents, messageSender) private implicit val implicitActorSystem: ActorSystem = actorSystem