Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions src/main/scala/shade/memcached/Codec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,23 @@ import scala.util.control.NonFatal
*/
@implicitNotFound("Could not find any Codec implementation for type ${T}. Please provide one or import shade.memcached.MemcachedCodecs._")
trait Codec[T] {
def serialize(value: T): Array[Byte]
def deserialize(data: Array[Byte]): T
def serialize(value: T, flags: Int): Array[Byte]
def deserialize(data: Array[Byte], flags: Int): T
}

object Codec extends BaseCodecs

trait BaseCodecs {
implicit object IntBinaryCodec extends Codec[Int] {
def serialize(value: Int): Array[Byte] =
def serialize(value: Int, flags: Int): Array[Byte] =
Array(
(value >>> 24).asInstanceOf[Byte],
(value >>> 16).asInstanceOf[Byte],
(value >>> 8).asInstanceOf[Byte],
value.asInstanceOf[Byte]
)

def deserialize(data: Array[Byte]): Int =
def deserialize(data: Array[Byte], flags: Int): Int =
(data(0).asInstanceOf[Int] & 255) << 24 |
(data(1).asInstanceOf[Int] & 255) << 16 |
(data(2).asInstanceOf[Int] & 255) << 8 |
Expand All @@ -49,32 +49,32 @@ trait BaseCodecs {

implicit object DoubleBinaryCodec extends Codec[Double] {
import java.lang.{ Double => JvmDouble }
def serialize(value: Double): Array[Byte] = {
def serialize(value: Double, flags: Int): Array[Byte] = {
val l = JvmDouble.doubleToLongBits(value)
LongBinaryCodec.serialize(l)
LongBinaryCodec.serialize(l, flags)
}

def deserialize(data: Array[Byte]): Double = {
val l = LongBinaryCodec.deserialize(data)
def deserialize(data: Array[Byte], flags: Int): Double = {
val l = LongBinaryCodec.deserialize(data, flags)
JvmDouble.longBitsToDouble(l)
}
}

implicit object FloatBinaryCodec extends Codec[Float] {
import java.lang.{ Float => JvmFloat }
def serialize(value: Float): Array[Byte] = {
def serialize(value: Float, flags: Int): Array[Byte] = {
val i = JvmFloat.floatToIntBits(value)
IntBinaryCodec.serialize(i)
IntBinaryCodec.serialize(i, flags)
}

def deserialize(data: Array[Byte]): Float = {
val i = IntBinaryCodec.deserialize(data)
def deserialize(data: Array[Byte], flags: Int): Float = {
val i = IntBinaryCodec.deserialize(data, flags)
JvmFloat.intBitsToFloat(i)
}
}

implicit object LongBinaryCodec extends Codec[Long] {
def serialize(value: Long): Array[Byte] =
def serialize(value: Long, flags: Int): Array[Byte] =
Array(
(value >>> 56).asInstanceOf[Byte],
(value >>> 48).asInstanceOf[Byte],
Expand All @@ -86,7 +86,7 @@ trait BaseCodecs {
value.asInstanceOf[Byte]
)

def deserialize(data: Array[Byte]): Long =
def deserialize(data: Array[Byte], flags: Int): Long =
(data(0).asInstanceOf[Long] & 255) << 56 |
(data(1).asInstanceOf[Long] & 255) << 48 |
(data(2).asInstanceOf[Long] & 255) << 40 |
Expand All @@ -98,45 +98,45 @@ trait BaseCodecs {
}

implicit object BooleanBinaryCodec extends Codec[Boolean] {
def serialize(value: Boolean): Array[Byte] =
def serialize(value: Boolean, flags: Int): Array[Byte] =
Array((if (value) 1 else 0).asInstanceOf[Byte])

def deserialize(data: Array[Byte]): Boolean =
def deserialize(data: Array[Byte], flags: Int): Boolean =
data.isDefinedAt(0) && data(0) == 1
}

implicit object CharBinaryCodec extends Codec[Char] {
def serialize(value: Char): Array[Byte] = Array(
def serialize(value: Char, flags: Int): Array[Byte] = Array(
(value >>> 8).asInstanceOf[Byte],
value.asInstanceOf[Byte]
)

def deserialize(data: Array[Byte]): Char =
def deserialize(data: Array[Byte], flags: Int): Char =
((data(0).asInstanceOf[Int] & 255) << 8 |
data(1).asInstanceOf[Int] & 255)
.asInstanceOf[Char]
}

implicit object ShortBinaryCodec extends Codec[Short] {
def serialize(value: Short): Array[Byte] = Array(
def serialize(value: Short, flags: Int): Array[Byte] = Array(
(value >>> 8).asInstanceOf[Byte],
value.asInstanceOf[Byte]
)

def deserialize(data: Array[Byte]): Short =
def deserialize(data: Array[Byte], flags: Int): Short =
((data(0).asInstanceOf[Short] & 255) << 8 |
data(1).asInstanceOf[Short] & 255)
.asInstanceOf[Short]
}

implicit object StringBinaryCodec extends Codec[String] {
def serialize(value: String): Array[Byte] = value.getBytes("UTF-8")
def deserialize(data: Array[Byte]): String = new String(data, "UTF-8")
def serialize(value: String, flags: Int): Array[Byte] = value.getBytes("UTF-8")
def deserialize(data: Array[Byte], flags: Int): String = new String(data, "UTF-8")
}

implicit object ArrayByteBinaryCodec extends Codec[Array[Byte]] {
def serialize(value: Array[Byte]): Array[Byte] = value
def deserialize(data: Array[Byte]): Array[Byte] = data
def serialize(value: Array[Byte], flags: Int): Array[Byte] = value
def deserialize(data: Array[Byte], flags: Int): Array[Byte] = data
}
}

Expand All @@ -152,7 +152,7 @@ trait GenericCodec {
case NonFatal(_) => // does nothing
}

def serialize(value: S): Array[Byte] =
def serialize(value: S, flags: Int): Array[Byte] =
using (new ByteArrayOutputStream()) { buf =>
using (new ObjectOutputStream(buf)) { out =>
out.writeObject(value)
Expand All @@ -161,7 +161,7 @@ trait GenericCodec {
}
}

def deserialize(data: Array[Byte]): S =
def deserialize(data: Array[Byte], flags: Int): S =
using (new ByteArrayInputStream(data)) { buf =>
val in = new GenericCodecObjectInputStream(classTag, buf)
using (in) { inp =>
Expand Down
26 changes: 13 additions & 13 deletions src/main/scala/shade/memcached/FakeMemcached.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,47 +21,47 @@ import scala.concurrent.{ ExecutionContext, Future }
class FakeMemcached(context: ExecutionContext) extends Memcached {
private[this] implicit val ec = context

def add[T](key: String, value: T, exp: Duration)(implicit codec: Codec[T]): CancelableFuture[Boolean] =
def add[T](key: String, value: T, flags: Int, exp: Duration)(implicit codec: Codec[T]): CancelableFuture[Boolean] =
value match {
case null =>
CancelableFuture.successful(false)
case _ =>
CancelableFuture.successful(cache.add(key, codec.serialize(value).toSeq, exp))
CancelableFuture.successful(cache.add(key, codec.serialize(value, flags).toSeq, exp))
}

def set[T](key: String, value: T, exp: Duration)(implicit codec: Codec[T]): CancelableFuture[Unit] =
def set[T](key: String, value: T, flags: Int, exp: Duration)(implicit codec: Codec[T]): CancelableFuture[Unit] =
value match {
case null =>
CancelableFuture.successful(())
case _ =>
CancelableFuture.successful(cache.set(key, codec.serialize(value).toSeq, exp))
CancelableFuture.successful(cache.set(key, codec.serialize(value, flags).toSeq, exp))
}

def delete(key: String): CancelableFuture[Boolean] =
CancelableFuture.successful(cache.delete(key))

def get[T](key: String)(implicit codec: Codec[T]): Future[Option[T]] =
Future.successful(cache.get[Seq[Byte]](key)).map(_.map(x => codec.deserialize(x.toArray)))
Future.successful(cache.get[Seq[Byte]](key)).map(_.map(x => codec.deserialize(x.toArray, 0)))

def compareAndSet[T](key: String, expecting: Option[T], newValue: T, exp: Duration)(implicit codec: Codec[T]): Future[Boolean] =
Future.successful(cache.compareAndSet(key, expecting.map(x => codec.serialize(x).toSeq), codec.serialize(newValue).toSeq, exp))
def compareAndSet[T](key: String, expecting: Option[T], newValue: T, flags: Int, exp: Duration)(implicit codec: Codec[T]): Future[Boolean] =
Future.successful(cache.compareAndSet(key, expecting.map(x => codec.serialize(x, flags).toSeq), codec.serialize(newValue, flags).toSeq, exp))

def transformAndGet[T](key: String, exp: Duration)(cb: (Option[T]) => T)(implicit codec: Codec[T]): Future[T] =
Future.successful(cache.transformAndGet[Seq[Byte]](key: String, exp) { current =>
val cValue = current.map(x => codec.deserialize(x.toArray))
val cValue = current.map(x => codec.deserialize(x.toArray, 0))
val update = cb(cValue)
codec.serialize(update).toSeq
codec.serialize(update, 0).toSeq
}) map { update =>
codec.deserialize(update.toArray)
codec.deserialize(update.toArray, 0)
}

def getAndTransform[T](key: String, exp: Duration)(cb: (Option[T]) => T)(implicit codec: Codec[T]): Future[Option[T]] =
Future.successful(cache.getAndTransform[Seq[Byte]](key: String, exp) { current =>
val cValue = current.map(x => codec.deserialize(x.toArray))
val cValue = current.map(x => codec.deserialize(x.toArray, 0))
val update = cb(cValue)
codec.serialize(update).toSeq
codec.serialize(update, 0).toSeq
}) map { update =>
update.map(x => codec.deserialize(x.toArray))
update.map(x => codec.deserialize(x.toArray, 0))
}

def increment(key: String, by: Long, default: Option[Long], exp: Duration): Future[Long] = {
Expand Down
14 changes: 7 additions & 7 deletions src/main/scala/shade/memcached/Memcached.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,20 @@ trait Memcached extends java.io.Closeable {
*
* @return either true, in case the value was set, or false otherwise
*/
def add[T](key: String, value: T, exp: Duration)(implicit codec: Codec[T]): CancelableFuture[Boolean]
def add[T](key: String, value: T, flags: Int, exp: Duration)(implicit codec: Codec[T]): CancelableFuture[Boolean]

def awaitAdd[T](key: String, value: T, exp: Duration)(implicit codec: Codec[T]): Boolean =
Await.result(add(key, value, exp), Duration.Inf)
def awaitAdd[T](key: String, value: T, flags: Int, exp: Duration)(implicit codec: Codec[T]): Boolean =
Await.result(add(key, value, flags, exp), Duration.Inf)

/**
* Sets a (key, value) in the cache store.
*
* The expiry time can be Duration.Inf (infinite duration).
*/
def set[T](key: String, value: T, exp: Duration)(implicit codec: Codec[T]): CancelableFuture[Unit]
def set[T](key: String, value: T, flags: Int, exp: Duration)(implicit codec: Codec[T]): CancelableFuture[Unit]

def awaitSet[T](key: String, value: T, exp: Duration)(implicit codec: Codec[T]) {
Await.result(set(key, value, exp), Duration.Inf)
def awaitSet[T](key: String, value: T, flags: Int, exp: Duration)(implicit codec: Codec[T]) {
Await.result(set(key, value, flags, exp), Duration.Inf)
}

/**
Expand Down Expand Up @@ -71,7 +71,7 @@ trait Memcached extends java.io.Closeable {
* @param exp can be Duration.Inf (infinite) for not setting an expiration
* @return either true (in case the compare-and-set succeeded) or false otherwise
*/
def compareAndSet[T](key: String, expecting: Option[T], newValue: T, exp: Duration)(implicit codec: Codec[T]): Future[Boolean]
def compareAndSet[T](key: String, expecting: Option[T], newValue: T, flags: Int, exp: Duration)(implicit codec: Codec[T]): Future[Boolean]

/**
* Transforms the given key and returns the new value.
Expand Down
54 changes: 26 additions & 28 deletions src/main/scala/shade/memcached/MemcachedImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,14 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach
*
* @return either true, in case the value was set, or false otherwise
*/
def add[T](key: String, value: T, exp: Duration)(implicit codec: Codec[T]): CancelableFuture[Boolean] =
def add[T](key: String, value: T, flags: Int, exp: Duration)(implicit codec: Codec[T]): CancelableFuture[Boolean] =
value match {
case null =>
CancelableFuture.successful(false)
case _ =>
instance.realAsyncAdd(withPrefix(key), codec.serialize(value), 0, exp, config.operationTimeout) map {
case SuccessfulResult(givenKey, Some(_)) =>
true
case SuccessfulResult(givenKey, None) =>
false
instance.realAsyncAdd(withPrefix(key), codec.serialize(value, flags), flags, exp, config.operationTimeout) map {
case SuccessfulResult(givenKey, option, _) =>
option.isDefined
case failure: FailedResult =>
throwExceptionOn(failure)
}
Expand All @@ -63,13 +61,13 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach
*
* The expiry time can be Duration.Inf (infinite duration).
*/
def set[T](key: String, value: T, exp: Duration)(implicit codec: Codec[T]): CancelableFuture[Unit] =
def set[T](key: String, value: T, flags: Int, exp: Duration)(implicit codec: Codec[T]): CancelableFuture[Unit] =
value match {
case null =>
CancelableFuture.successful(())
case _ =>
instance.realAsyncSet(withPrefix(key), codec.serialize(value), 0, exp, config.operationTimeout) map {
case SuccessfulResult(givenKey, _) =>
instance.realAsyncSet(withPrefix(key), codec.serialize(value, flags), flags, exp, config.operationTimeout) map {
case SuccessfulResult(givenKey, _, _) =>
()
case failure: FailedResult =>
throwExceptionOn(failure)
Expand All @@ -83,7 +81,7 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach
*/
def delete(key: String): CancelableFuture[Boolean] =
instance.realAsyncDelete(withPrefix(key), config.operationTimeout) map {
case SuccessfulResult(givenKey, result) =>
case SuccessfulResult(givenKey, result, _) =>
result
case failure: FailedResult =>
throwExceptionOn(failure)
Expand All @@ -96,8 +94,8 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach
*/
def get[T](key: String)(implicit codec: Codec[T]): Future[Option[T]] =
instance.realAsyncGet(withPrefix(key), config.operationTimeout) map {
case SuccessfulResult(givenKey, option) =>
option.map(codec.deserialize)
case SuccessfulResult(givenKey, option, flags) =>
option.map(codec.deserialize(_, flags))
case failure: FailedResult =>
throwExceptionOn(failure)
}
Expand All @@ -115,20 +113,20 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach
* @param exp can be Duration.Inf (infinite) for not setting an expiration
* @return either true (in case the compare-and-set succeeded) or false otherwise
*/
def compareAndSet[T](key: String, expecting: Option[T], newValue: T, exp: Duration)(implicit codec: Codec[T]): Future[Boolean] =
def compareAndSet[T](key: String, expecting: Option[T], newValue: T, flags: Int, exp: Duration)(implicit codec: Codec[T]): Future[Boolean] =
expecting match {
case None =>
add[T](key, newValue, exp)
add[T](key, newValue, flags, exp)

case Some(expectingValue) =>
instance.realAsyncGets(withPrefix(key), config.operationTimeout) flatMap {
case SuccessfulResult(givenKey, None) =>
case SuccessfulResult(givenKey, None, _) =>
Future.successful(false)

case SuccessfulResult(givenKey, Some((currentData, casID))) =>
if (codec.deserialize(currentData) == expectingValue)
instance.realAsyncCAS(withPrefix(key), casID, 0, codec.serialize(newValue), exp, config.operationTimeout) map {
case SuccessfulResult(_, bool) =>
case SuccessfulResult(givenKey, Some((currentData, casID)), f) =>
if (codec.deserialize(currentData, f) == expectingValue)
instance.realAsyncCAS(withPrefix(key), casID, flags, codec.serialize(newValue, flags), exp, config.operationTimeout) map {
case SuccessfulResult(_, bool, _) =>
bool
case failure: FailedResult =>
throwExceptionOn(failure)
Expand Down Expand Up @@ -163,22 +161,22 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach
throw new TimeoutException(key)

instance.realAsyncGets(keyWithPrefix, remainingTime.millis) flatMap {
case SuccessfulResult(_, None) =>
case SuccessfulResult(_, None, flags) =>
val result = cb(None)
add(key, result, exp) flatMap {
add(key, result, flags, exp) flatMap {
case true =>
Future.successful(f(None, result))
case false =>
loop(retry + 1)
}
case SuccessfulResult(_, Some((current, casID))) =>
val currentOpt = Some(codec.deserialize(current))
case SuccessfulResult(_, Some((current, casID)), flags) =>
val currentOpt = Some(codec.deserialize(current, flags))
val result = cb(currentOpt)

instance.realAsyncCAS(keyWithPrefix, casID, 0, codec.serialize(result), exp, remainingTime.millis) flatMap {
case SuccessfulResult(_, true) =>
instance.realAsyncCAS(keyWithPrefix, casID, flags, codec.serialize(result, flags), exp, remainingTime.millis) flatMap {
case SuccessfulResult(_, true, _) =>
Future.successful(f(currentOpt, result))
case SuccessfulResult(_, false) =>
case SuccessfulResult(_, false, _) =>
loop(retry + 1)
case failure: FailedResult =>
throwExceptionOn(failure)
Expand Down Expand Up @@ -258,7 +256,7 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach
*/
def increment(key: String, by: Long, default: Option[Long], exp: Duration): Future[Long] =
instance.realAsyncMutate(withPrefix(key), by, Mutator.incr, default, exp, config.operationTimeout) map {
case SuccessfulResult(_, value) =>
case SuccessfulResult(_, value, _) =>
value
case failure: FailedResult =>
throwExceptionOn(failure)
Expand Down Expand Up @@ -287,7 +285,7 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach
*/
def decrement(key: String, by: Long, default: Option[Long], exp: Duration): Future[Long] =
instance.realAsyncMutate(withPrefix(key), by, Mutator.decr, default, exp, config.operationTimeout) map {
case SuccessfulResult(_, value) =>
case SuccessfulResult(_, value, _) =>
value
case failure: FailedResult =>
throwExceptionOn(failure)
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/shade/memcached/internals/Result.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
package shade.memcached.internals

sealed trait Result[+T]
case class SuccessfulResult[+T](key: String, result: T) extends Result[T]
case class SuccessfulResult[+T](key: String, result: T, flags: Int) extends Result[T]
case class FailedResult(key: String, state: Status) extends Result[Nothing]
Loading