stringNotUsedSink = Sink.fromSubscriber(processor);
}
}
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPublisherSinkSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPublisherSinkSpec.scala
index 34c0cc5de8e..fc2d453ed5c 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPublisherSinkSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPublisherSinkSpec.scala
@@ -25,16 +25,16 @@ class FlowPublisherSinkSpec extends StreamSpec {
"work with SubscriberSource" in {
val (sub, pub) =
- JavaFlowSupport.Source.asSubscriber[Int].toMat(JavaFlowSupport.Sink.asPublisher(false))(Keep.both).run()
- Source(1 to 100).to(JavaFlowSupport.Sink.fromSubscriber(sub)).run()
- Await.result(JavaFlowSupport.Source.fromPublisher(pub).limit(1000).runWith(Sink.seq), 3.seconds) should ===(
+ Source.asJavaSubscriber[Int].toMat(Sink.asJavaPublisher(false))(Keep.both).run()
+ Source(1 to 100).to(Sink.fromSubscriber(sub)).run()
+ Await.result(Source.fromPublisher(pub).limit(1000).runWith(Sink.seq), 3.seconds) should ===(
1 to 100)
}
"be able to use Publisher in materialized value transformation" in {
val f = Source(1 to 3).runWith(
- JavaFlowSupport.Sink.asPublisher[Int](false).mapMaterializedValue { p =>
- JavaFlowSupport.Source.fromPublisher(p).runFold(0)(_ + _)
+ Sink.asJavaPublisher[Int](false).mapMaterializedValue { p =>
+ Source.fromPublisher(p).runFold(0)(_ + _)
})
Await.result(f, 3.seconds) should be(6)
diff --git a/stream/src/main/java/org/apache/pekko/stream/javadsl/JavaFlowSupport.java b/stream/src/main/java/org/apache/pekko/stream/javadsl/JavaFlowSupport.java
index 603b216d545..c9db616e144 100644
--- a/stream/src/main/java/org/apache/pekko/stream/javadsl/JavaFlowSupport.java
+++ b/stream/src/main/java/org/apache/pekko/stream/javadsl/JavaFlowSupport.java
@@ -49,7 +49,10 @@ private Source() {
* See also {@code Source.fromPublisher} if wanting to integrate with {@link
* org.reactivestreams.Publisher} instead (which carries the same semantics, however existed
* before RS's inclusion in Java 9).
+ *
+ * @deprecated Use {@code Source.fromPublisher} instead (since 2.0.0).
*/
+ @Deprecated
public static org.apache.pekko.stream.javadsl.Source fromPublisher(
java.util.concurrent.Flow.Publisher publisher) {
return org.apache.pekko.stream.javadsl.Source.fromPublisher(
@@ -63,8 +66,11 @@ public static org.apache.pekko.stream.javadsl.Source fromPublish
* See also {@code Source.asSubscriber} if wanting to integrate with {@link
* org.reactivestreams.Subscriber} instead (which carries the same semantics, however existed
* before RS's inclusion in Java 9).
+ *
+ * @deprecated Use {@code Source.asJavaSubscriber} instead (since 2.0.0).
*/
// #asSubscriber
+ @Deprecated
public static
org.apache.pekko.stream.javadsl.Source>
asSubscriber() {
@@ -180,7 +186,10 @@ private Sink() {
*
* If {@code fanout} is {@code WITHOUT_FANOUT} then the materialized {@code Publisher} will
* only support a single {@code Subscriber} and reject any additional {@code Subscriber}s.
+ *
+ * @deprecated Use {@code Sink.asJavaPublisher} instead (since 2.0.0).
*/
+ @Deprecated
public static
org.apache.pekko.stream.javadsl.Sink> asPublisher(
AsPublisher fanout) {
@@ -188,7 +197,12 @@ org.apache.pekko.stream.javadsl.Sink>
.mapMaterializedValue(JavaFlowAndRsConverters::asJava);
}
- /** Helper to create <> from <>. */
+ /**
+ * Helper to create <> from <>.
+ *
+ * @deprecated Use {@code Sink.fromSubscriber} instead (since 2.0.0).
+ */
+ @Deprecated
public static org.apache.pekko.stream.javadsl.Sink fromSubscriber(
java.util.concurrent.Flow.Subscriber s) {
return org.apache.pekko.stream.javadsl.Sink.fromSubscriber(JavaFlowAndRsConverters.asRs(s));
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index b280ae3a791..d36625aec79 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -30,7 +30,7 @@ import pekko.actor.{ ActorRef, ClassicActorSystemProvider, Status }
import pekko.japi.function
import pekko.japi.function.Creator
import pekko.stream._
-import pekko.stream.impl.LinearTraversalBuilder
+import pekko.stream.impl.{ JavaFlowAndRsConverters, LinearTraversalBuilder }
import pekko.stream.scaladsl.SinkToCompletionStage
import pekko.util.ConstantFun.scalaAnyToUnit
@@ -180,6 +180,15 @@ object Sink {
def fromSubscriber[In](subs: Subscriber[In]): Sink[In, NotUsed] =
new Sink(scaladsl.Sink.fromSubscriber(subs))
+ /**
+ * Helper to create [[Sink]] from `java.util.concurrent.Flow.Subscriber`.
+ *
+ * @see pekko.stream.javadsl.JavaFlowSupport.Sink#fromSubscriber
+ * @since 2.0.0
+ */
+ def fromSubscriber[In](subs: java.util.concurrent.Flow.Subscriber[In]): Sink[In, NotUsed] =
+ new Sink(scaladsl.Sink.fromSubscriber(subs))
+
/**
* A `Sink` that immediately cancels its upstream after materialization.
*/
@@ -212,6 +221,22 @@ object Sink {
def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] =
new Sink(scaladsl.Sink.asPublisher(fanout == AsPublisher.WITH_FANOUT))
+ /**
+ * A `Sink` that materializes into a [[java.util.concurrent.Flow.Publisher]].
+ *
+ * If `fanout` is `true`, the materialized `Publisher` will support multiple `Subscriber`s and
+ * the size of the `inputBuffer` configured for this operator becomes the maximum number of elements that
+ * the fastest [[java.util.concurrent.Flow.Subscriber]] can be ahead of the slowest one before slowing
+ * the processing down due to back pressure.
+ *
+ * If `fanout` is `false` then the materialized `Publisher` will only support a single `Subscriber` and
+ * reject any additional `Subscriber`s.
+ */
+ def asJavaPublisher[T](fanout: AsPublisher): Sink[T, java.util.concurrent.Flow.Publisher[T]] = {
+ import JavaFlowAndRsConverters.Implicits._
+ asPublisher[T](fanout).mapMaterializedValue(_.asJava)
+ }
+
/**
* A `Sink` that materializes this `Sink` itself as a `Source`.
* The returned `Source` is a "live view" onto the `Sink` and only supports a single `Subscriber`.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 100eb1d3947..8441313cfae 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -36,7 +36,7 @@ import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import pekko.japi.{ function, JavaPartialFunction, Pair }
import pekko.japi.function.Creator
import pekko.stream._
-import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava }
+import pekko.stream.impl.{ JavaFlowAndRsConverters, LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava }
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
import pekko.util._
@@ -80,7 +80,7 @@ object Source {
}
/**
- * Helper to create [[Source]] from `Publisher`.
+ * Helper to create [[Source]] from `org.reactivestreams.Publisher`.
*
* Construct a transformation starting with given publisher. The transformation steps
* are executed by a series of [[org.reactivestreams.Processor]] instances
@@ -90,6 +90,15 @@ object Source {
def fromPublisher[O](publisher: Publisher[O]): javadsl.Source[O, NotUsed] =
new Source(scaladsl.Source.fromPublisher(publisher))
+ /**
+ * Helper to create [[Source]] from `java.util.concurrent.Flow.Publisher`.
+ *
+ * @see pekko.stream.javadsl.JavaFlowSupport.Source#fromPublisher
+ * @since 2.0.0
+ */
+ def fromPublisher[O](publisher: java.util.concurrent.Flow.Publisher[O]): javadsl.Source[O, NotUsed] =
+ new Source(scaladsl.Source.fromPublisher(publisher))
+
/**
* Helper to create [[Source]] from `Iterator`.
* Example usage:
@@ -405,6 +414,17 @@ object Source {
def asSubscriber[T](): Source[T, Subscriber[T]] =
new Source(scaladsl.Source.asSubscriber)
+ /**
+ * Creates a `Source` that is materialized as a [[java.util.concurrent.Flow.Subscriber]]
+ *
+ * @see pekko.stream.javadsl.JavaFlowSupport.Source#asSubscriber
+ * @since 2.0.0
+ */
+ def asJavaSubscriber[T](): Source[T, java.util.concurrent.Flow.Subscriber[T]] = {
+ import JavaFlowAndRsConverters.Implicits._
+ asSubscriber[T]().mapMaterializedValue(_.asJava)
+ }
+
/**
* Creates a `Source` that is materialized as an [[pekko.actor.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala
index 0d0e01fe8da..783ac3a37c0 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala
@@ -45,6 +45,7 @@ object JavaFlowSupport {
* @see See also [[Source.fromPublisher]] if wanting to integrate with [[org.reactivestreams.Publisher]] instead
* (which carries the same semantics, however existed before RS's inclusion in Java 9).
*/
+ @deprecated("Use pekko.stream.scaladsl.Source.fromPublisher", "2.0.0")
final
// #fromPublisher
def fromPublisher[T](publisher: java.util.concurrent.Flow.Publisher[T]): Source[T, NotUsed] =
@@ -57,6 +58,7 @@ object JavaFlowSupport {
* @see See also [[Source.asSubscriber]] if wanting to integrate with [[org.reactivestreams.Subscriber]] instead
* (which carries the same semantics, however existed before RS's inclusion in Java 9).
*/
+ @deprecated("Use pekko.stream.scaladsl.Source.asJavaSubscriber", "2.0.0")
final
// #asSubscriber
def asSubscriber[T]: Source[T, java.util.concurrent.Flow.Subscriber[T]] =
@@ -94,8 +96,8 @@ object JavaFlowSupport {
*/
def toProcessor[In, Out, Mat](
self: Flow[In, Out, Mat]): RunnableGraph[juc.Flow.Processor[In @uncheckedVariance, Out @uncheckedVariance]] =
- Source.asSubscriber[In].via(self)
- .toMat(Sink.asPublisher[Out](fanout = false))(Keep.both)
+ scaladsl.Source.asJavaSubscriber[In].via(self)
+ .toMat(scaladsl.Sink.asJavaPublisher[Out](fanout = false))(Keep.both)
.mapMaterializedValue {
case (sub, pub) => new juc.Flow.Processor[In, Out] {
override def onError(t: Throwable): Unit = sub.onError(t)
@@ -123,12 +125,14 @@ object JavaFlowSupport {
* If `fanout` is `WITHOUT_FANOUT` then the materialized `Publisher` will only support a single `Subscriber` and
* reject any additional `Subscriber`s.
*/
+ @deprecated("Use pekko.stream.scaladsl.Sink.asJavaPublisher", "2.0.0")
final def asPublisher[T](fanout: Boolean): Sink[T, juc.Flow.Publisher[T]] =
scaladsl.Sink.asPublisher[T](fanout).mapMaterializedValue(_.asJava)
/**
* Helper to create [[Sink]] from [[java.util.concurrent.Flow.Subscriber]].
*/
+ @deprecated("Use pekko.stream.scaladsl.Sink.fromSubscriber", "2.0.0")
final def fromSubscriber[T](s: juc.Flow.Subscriber[T]): Sink[T, NotUsed] =
scaladsl.Sink.fromSubscriber(s.asRs)
}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index 03f97f94f8d..9ec4e7f2cd4 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -182,6 +182,17 @@ object Sink {
def fromSubscriber[T](subscriber: Subscriber[T]): Sink[T, NotUsed] =
fromGraph(new SubscriberSink(subscriber, DefaultAttributes.subscriberSink, shape("SubscriberSink")))
+ /**
+ * Helper to create [[Sink]] from `java.util.concurrent.Flow.Subscriber`.
+ *
+ * @see pekko.stream.scaladsl.JavaFlowSupport.Sink#fromSubscriber
+ * @since 2.0.0
+ */
+ def fromSubscriber[T](subscriber: java.util.concurrent.Flow.Subscriber[T]) = {
+ import JavaFlowAndRsConverters.Implicits._
+ fromGraph(new SubscriberSink(subscriber.asRs, DefaultAttributes.subscriberSink, shape("SubscriberSink")))
+ }
+
/**
* A `Sink` that immediately cancels its upstream after materialization.
*/
@@ -312,6 +323,25 @@ object Sink {
if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink"))
else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink")))
+ /**
+ * A `Sink` that materializes into a [[java.util.concurrent.Flow.Publisher]].
+ *
+ * If `fanout` is `true`, the materialized `Publisher` will support multiple `Subscriber`s and
+ * the size of the `inputBuffer` configured for this operator becomes the maximum number of elements that
+ * the fastest [[java.util.concurrent.Flow.Subscriber]] can be ahead of the slowest one before slowing
+ * the processing down due to back pressure.
+ *
+ * If `fanout` is `false` then the materialized `Publisher` will only support a single `Subscriber` and
+ * reject any additional `Subscriber`s.
+ *
+ * @see pekko.stream.scaladsl.JavaFlowSupport.Sink#asPublisher
+ * @since 2.0.0
+ */
+ def asJavaPublisher[T](fanout: Boolean): Sink[T, java.util.concurrent.Flow.Publisher[T]] = {
+ import JavaFlowAndRsConverters.Implicits._
+ asPublisher[T](fanout).mapMaterializedValue(_.asJava)
+ }
+
/**
* A `Sink` that materializes this `Sink` itself as a `Source`.
* The returned `Source` is a "live view" onto the `Sink` and only supports a single `Subscriber`.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 5198444a4ba..6ea61fd6b85 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -276,6 +276,17 @@ object Source {
def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] =
fromGraph(new PublisherSource(publisher, DefaultAttributes.publisherSource, shape("PublisherSource")))
+ /**
+ * Helper to create [[Source]] from `java.util.concurrent.Flow.Publisher`.
+ *
+ * @see pekko.stream.scaladsl.JavaFlowSupport.Source#fromPublisher
+ * @since 2.0.0
+ */
+ def fromPublisher[T](publisher: java.util.concurrent.Flow.Publisher[T]) = {
+ import JavaFlowAndRsConverters.Implicits._
+ fromGraph(new PublisherSource(publisher.asRs, DefaultAttributes.publisherSource, shape("PublisherSource")))
+ }
+
/**
* Helper to create [[Source]] from `Iterator`.
* Example usage: `Source.fromIterator(() => Iterator.from(0))`
@@ -637,6 +648,17 @@ object Source {
def asSubscriber[T]: Source[T, Subscriber[T]] =
fromGraph(new SubscriberSource[T](DefaultAttributes.subscriberSource, shape("SubscriberSource")))
+ /**
+ * Creates a `Source` that is materialized as a [[java.util.concurrent.Flow.Subscriber]]
+ *
+ * @see pekko.stream.scaladsl.JavaFlowSupport.Source#asSubscriber
+ * @since 2.0.0
+ */
+ def asJavaSubscriber[T]: Source[T, java.util.concurrent.Flow.Subscriber[T]] = {
+ import JavaFlowAndRsConverters.Implicits._
+ asSubscriber[T].mapMaterializedValue(_.asJava)
+ }
+
/**
* Creates a `Source` that is materialized as an [[pekko.actor.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,