From f4357efd6abe4e7975855cd8a34f7f49147e53b6 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sat, 8 Nov 2025 23:06:04 +0100 Subject: [PATCH 1/9] move some JavaFlowSupport methods to Source and Sink --- .../scaladsl/FlowPublisherSinkSpec.scala | 8 +++---- .../apache/pekko/stream/javadsl/Sink.scala | 9 ++++++++ .../apache/pekko/stream/javadsl/Source.scala | 19 +++++++++++++++- .../stream/scaladsl/JavaFlowSupport.scala | 3 +++ .../apache/pekko/stream/scaladsl/Sink.scala | 11 ++++++++++ .../apache/pekko/stream/scaladsl/Source.scala | 22 +++++++++++++++++++ 6 files changed, 67 insertions(+), 5 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPublisherSinkSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPublisherSinkSpec.scala index 34c0cc5de8e..5d7b015e14a 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPublisherSinkSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPublisherSinkSpec.scala @@ -25,16 +25,16 @@ class FlowPublisherSinkSpec extends StreamSpec { "work with SubscriberSource" in { val (sub, pub) = - JavaFlowSupport.Source.asSubscriber[Int].toMat(JavaFlowSupport.Sink.asPublisher(false))(Keep.both).run() - Source(1 to 100).to(JavaFlowSupport.Sink.fromSubscriber(sub)).run() - Await.result(JavaFlowSupport.Source.fromPublisher(pub).limit(1000).runWith(Sink.seq), 3.seconds) should ===( + Source.asSubscriber[Int].toMat(JavaFlowSupport.Sink.asPublisher(false))(Keep.both).run() + Source(1 to 100).to(Sink.fromSubscriber(sub)).run() + Await.result(Source.fromPublisher(pub).limit(1000).runWith(Sink.seq), 3.seconds) should ===( 1 to 100) } "be able to use Publisher in materialized value transformation" in { val f = Source(1 to 3).runWith( JavaFlowSupport.Sink.asPublisher[Int](false).mapMaterializedValue { p => - JavaFlowSupport.Source.fromPublisher(p).runFold(0)(_ + _) + Source.fromPublisher(p).runFold(0)(_ + _) }) Await.result(f, 3.seconds) should be(6) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index b280ae3a791..6e49de4f406 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -180,6 +180,15 @@ object Sink { def fromSubscriber[In](subs: Subscriber[In]): Sink[In, NotUsed] = new Sink(scaladsl.Sink.fromSubscriber(subs)) + /** + * Helper to create [[Sink]] from `java.util.concurrent.Flow.Subscriber`. + * + * @see pekko.stream.javadsl.JavaFlowSupport.Sink#fromSubscriber + * @since 2.0.0 + */ + def fromSubscriber[In](subs: java.util.concurrent.Flow.Subscriber[In]): Sink[In, NotUsed] = + new Sink(scaladsl.Sink.fromSubscriber(subs)) + /** * A `Sink` that immediately cancels its upstream after materialization. */ diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 100eb1d3947..2b4b0c0c1ab 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -80,7 +80,7 @@ object Source { } /** - * Helper to create [[Source]] from `Publisher`. + * Helper to create [[Source]] from `org.reactivestreams.Publisher`. * * Construct a transformation starting with given publisher. The transformation steps * are executed by a series of [[org.reactivestreams.Processor]] instances @@ -90,6 +90,15 @@ object Source { def fromPublisher[O](publisher: Publisher[O]): javadsl.Source[O, NotUsed] = new Source(scaladsl.Source.fromPublisher(publisher)) + /** + * Helper to create [[Source]] from `java.util.concurrent.Flow.Publisher`. + * + * @see pekko.stream.javadsl.JavaFlowSupport.Source#fromPublisher + * @since 2.0.0 + */ + def fromPublisher[O](publisher: java.util.concurrent.Flow.Publisher[O]): javadsl.Source[O, NotUsed] = + new Source(scaladsl.Source.fromPublisher(publisher)) + /** * Helper to create [[Source]] from `Iterator`. * Example usage: @@ -405,6 +414,14 @@ object Source { def asSubscriber[T](): Source[T, Subscriber[T]] = new Source(scaladsl.Source.asSubscriber) + /** + * Creates a `Source` that is materialized as a [[java.util.concurrent.Flow.Subscriber]] + */ + def asSubscriber[T](): Source[T, java.util.concurrent.Flow.Subscriber[T]] = { + import JavaFlowAndRsConverters.Implicits._ + asSubscriber[T]().mapMaterializedValue(_.asJava) + } + /** * Creates a `Source` that is materialized as an [[pekko.actor.ActorRef]]. * Messages sent to this actor will be emitted to the stream if there is demand from downstream, diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala index 0d0e01fe8da..12f1b111036 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala @@ -45,6 +45,7 @@ object JavaFlowSupport { * @see See also [[Source.fromPublisher]] if wanting to integrate with [[org.reactivestreams.Publisher]] instead * (which carries the same semantics, however existed before RS's inclusion in Java 9). */ + @deprecated("Use pekko.stream.[javadsl|scaladsl].Source.fromPublisher", "2.0.0") final // #fromPublisher def fromPublisher[T](publisher: java.util.concurrent.Flow.Publisher[T]): Source[T, NotUsed] = @@ -57,6 +58,7 @@ object JavaFlowSupport { * @see See also [[Source.asSubscriber]] if wanting to integrate with [[org.reactivestreams.Subscriber]] instead * (which carries the same semantics, however existed before RS's inclusion in Java 9). */ + @deprecated("Use pekko.stream.[javadsl|scaladsl].Source.asSubscriber", "2.0.0") final // #asSubscriber def asSubscriber[T]: Source[T, java.util.concurrent.Flow.Subscriber[T]] = @@ -129,6 +131,7 @@ object JavaFlowSupport { /** * Helper to create [[Sink]] from [[java.util.concurrent.Flow.Subscriber]]. */ + @deprecated("Use pekko.stream.[javadsl|scaladsl].Sink.fromSubscriber", "2.0.0") final def fromSubscriber[T](s: juc.Flow.Subscriber[T]): Sink[T, NotUsed] = scaladsl.Sink.fromSubscriber(s.asRs) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index 03f97f94f8d..d25b1387040 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -182,6 +182,17 @@ object Sink { def fromSubscriber[T](subscriber: Subscriber[T]): Sink[T, NotUsed] = fromGraph(new SubscriberSink(subscriber, DefaultAttributes.subscriberSink, shape("SubscriberSink"))) + /** + * Helper to create [[Sink]] from `java.util.concurrent.Flow.Subscriber`. + * + * @see pekko.stream.scaladsl.JavaFlowSupport.Sink#fromSubscriber + * @since 2.0.0 + */ + def fromSubscriber[T](subscriber: java.util.concurrent.Flow.Subscriber[T]) = { + import JavaFlowAndRsConverters.Implicits._ + fromGraph(new SubscriberSink(subscriber.asRs, DefaultAttributes.subscriberSink, shape("SubscriberSink"))) + } + /** * A `Sink` that immediately cancels its upstream after materialization. */ diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 5198444a4ba..877d3904e27 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -276,6 +276,17 @@ object Source { def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] = fromGraph(new PublisherSource(publisher, DefaultAttributes.publisherSource, shape("PublisherSource"))) + /** + * Helper to create [[Source]] from `java.util.concurrent.Flow.Publisher`. + * + * @see pekko.stream.scaladsl.JavaFlowSupport.Source#fromPublisher + * @since 2.0.0 + */ + def fromPublisher[T](publisher: java.util.concurrent.Flow.Publisher[T]) = { + import JavaFlowAndRsConverters.Implicits._ + fromGraph(new PublisherSource(publisher.asRs, DefaultAttributes.publisherSource, shape("PublisherSource"))) + } + /** * Helper to create [[Source]] from `Iterator`. * Example usage: `Source.fromIterator(() => Iterator.from(0))` @@ -637,6 +648,17 @@ object Source { def asSubscriber[T]: Source[T, Subscriber[T]] = fromGraph(new SubscriberSource[T](DefaultAttributes.subscriberSource, shape("SubscriberSource"))) + /** + * Creates a `Source` that is materialized as a [[java.util.concurrent.Flow.Subscriber]] + * + * @see pekko.stream.scaladsl.JavaFlowSupport.Source#asSubscriber + * @since 2.0.0 + */ + def asSubscriber[T]: Source[T, java.util.concurrent.Flow.Subscriber[T]] = { + import JavaFlowAndRsConverters.Implicits._ + asSubscriber[T].mapMaterializedValue(_.asJava) + } + /** * Creates a `Source` that is materialized as an [[pekko.actor.ActorRef]]. * Messages sent to this actor will be emitted to the stream if there is demand from downstream, From a20db26a5a063679a62d3c9d3c32d09fe3079336 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sat, 8 Nov 2025 23:40:14 +0100 Subject: [PATCH 2/9] more changes --- .../scaladsl/FlowPublisherSinkSpec.scala | 4 ++-- .../pekko/stream/javadsl/JavaFlowSupport.java | 16 +++++++++++++++- .../apache/pekko/stream/javadsl/Sink.scala | 18 +++++++++++++++++- .../apache/pekko/stream/javadsl/Source.scala | 7 +++++-- .../stream/scaladsl/JavaFlowSupport.scala | 11 ++++++----- .../apache/pekko/stream/scaladsl/Sink.scala | 19 +++++++++++++++++++ .../apache/pekko/stream/scaladsl/Source.scala | 2 +- 7 files changed, 65 insertions(+), 12 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPublisherSinkSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPublisherSinkSpec.scala index 5d7b015e14a..fc2d453ed5c 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPublisherSinkSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPublisherSinkSpec.scala @@ -25,7 +25,7 @@ class FlowPublisherSinkSpec extends StreamSpec { "work with SubscriberSource" in { val (sub, pub) = - Source.asSubscriber[Int].toMat(JavaFlowSupport.Sink.asPublisher(false))(Keep.both).run() + Source.asJavaSubscriber[Int].toMat(Sink.asJavaPublisher(false))(Keep.both).run() Source(1 to 100).to(Sink.fromSubscriber(sub)).run() Await.result(Source.fromPublisher(pub).limit(1000).runWith(Sink.seq), 3.seconds) should ===( 1 to 100) @@ -33,7 +33,7 @@ class FlowPublisherSinkSpec extends StreamSpec { "be able to use Publisher in materialized value transformation" in { val f = Source(1 to 3).runWith( - JavaFlowSupport.Sink.asPublisher[Int](false).mapMaterializedValue { p => + Sink.asJavaPublisher[Int](false).mapMaterializedValue { p => Source.fromPublisher(p).runFold(0)(_ + _) }) diff --git a/stream/src/main/java/org/apache/pekko/stream/javadsl/JavaFlowSupport.java b/stream/src/main/java/org/apache/pekko/stream/javadsl/JavaFlowSupport.java index 603b216d545..c9db616e144 100644 --- a/stream/src/main/java/org/apache/pekko/stream/javadsl/JavaFlowSupport.java +++ b/stream/src/main/java/org/apache/pekko/stream/javadsl/JavaFlowSupport.java @@ -49,7 +49,10 @@ private Source() { *

See also {@code Source.fromPublisher} if wanting to integrate with {@link * org.reactivestreams.Publisher} instead (which carries the same semantics, however existed * before RS's inclusion in Java 9). + * + * @deprecated Use {@code Source.fromPublisher} instead (since 2.0.0). */ + @Deprecated public static org.apache.pekko.stream.javadsl.Source fromPublisher( java.util.concurrent.Flow.Publisher publisher) { return org.apache.pekko.stream.javadsl.Source.fromPublisher( @@ -63,8 +66,11 @@ public static org.apache.pekko.stream.javadsl.Source fromPublish *

See also {@code Source.asSubscriber} if wanting to integrate with {@link * org.reactivestreams.Subscriber} instead (which carries the same semantics, however existed * before RS's inclusion in Java 9). + * + * @deprecated Use {@code Source.asJavaSubscriber} instead (since 2.0.0). */ // #asSubscriber + @Deprecated public static org.apache.pekko.stream.javadsl.Source> asSubscriber() { @@ -180,7 +186,10 @@ private Sink() { * *

If {@code fanout} is {@code WITHOUT_FANOUT} then the materialized {@code Publisher} will * only support a single {@code Subscriber} and reject any additional {@code Subscriber}s. + * + * @deprecated Use {@code Sink.asJavaPublisher} instead (since 2.0.0). */ + @Deprecated public static org.apache.pekko.stream.javadsl.Sink> asPublisher( AsPublisher fanout) { @@ -188,7 +197,12 @@ org.apache.pekko.stream.javadsl.Sink> .mapMaterializedValue(JavaFlowAndRsConverters::asJava); } - /** Helper to create <> from <>. */ + /** + * Helper to create <> from <>. + * + * @deprecated Use {@code Sink.fromSubscriber} instead (since 2.0.0). + */ + @Deprecated public static org.apache.pekko.stream.javadsl.Sink fromSubscriber( java.util.concurrent.Flow.Subscriber s) { return org.apache.pekko.stream.javadsl.Sink.fromSubscriber(JavaFlowAndRsConverters.asRs(s)); diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index 6e49de4f406..d36625aec79 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -30,7 +30,7 @@ import pekko.actor.{ ActorRef, ClassicActorSystemProvider, Status } import pekko.japi.function import pekko.japi.function.Creator import pekko.stream._ -import pekko.stream.impl.LinearTraversalBuilder +import pekko.stream.impl.{ JavaFlowAndRsConverters, LinearTraversalBuilder } import pekko.stream.scaladsl.SinkToCompletionStage import pekko.util.ConstantFun.scalaAnyToUnit @@ -221,6 +221,22 @@ object Sink { def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] = new Sink(scaladsl.Sink.asPublisher(fanout == AsPublisher.WITH_FANOUT)) + /** + * A `Sink` that materializes into a [[java.util.concurrent.Flow.Publisher]]. + * + * If `fanout` is `true`, the materialized `Publisher` will support multiple `Subscriber`s and + * the size of the `inputBuffer` configured for this operator becomes the maximum number of elements that + * the fastest [[java.util.concurrent.Flow.Subscriber]] can be ahead of the slowest one before slowing + * the processing down due to back pressure. + * + * If `fanout` is `false` then the materialized `Publisher` will only support a single `Subscriber` and + * reject any additional `Subscriber`s. + */ + def asJavaPublisher[T](fanout: AsPublisher): Sink[T, java.util.concurrent.Flow.Publisher[T]] = { + import JavaFlowAndRsConverters.Implicits._ + asPublisher[T](fanout).mapMaterializedValue(_.asJava) + } + /** * A `Sink` that materializes this `Sink` itself as a `Source`. * The returned `Source` is a "live view" onto the `Sink` and only supports a single `Subscriber`. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 2b4b0c0c1ab..8441313cfae 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -36,7 +36,7 @@ import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import pekko.japi.{ function, JavaPartialFunction, Pair } import pekko.japi.function.Creator import pekko.stream._ -import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava } +import pekko.stream.impl.{ JavaFlowAndRsConverters, LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava } import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava } import pekko.util._ @@ -416,8 +416,11 @@ object Source { /** * Creates a `Source` that is materialized as a [[java.util.concurrent.Flow.Subscriber]] + * + * @see pekko.stream.javadsl.JavaFlowSupport.Source#asSubscriber + * @since 2.0.0 */ - def asSubscriber[T](): Source[T, java.util.concurrent.Flow.Subscriber[T]] = { + def asJavaSubscriber[T](): Source[T, java.util.concurrent.Flow.Subscriber[T]] = { import JavaFlowAndRsConverters.Implicits._ asSubscriber[T]().mapMaterializedValue(_.asJava) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala index 12f1b111036..7f7145dd714 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala @@ -45,7 +45,7 @@ object JavaFlowSupport { * @see See also [[Source.fromPublisher]] if wanting to integrate with [[org.reactivestreams.Publisher]] instead * (which carries the same semantics, however existed before RS's inclusion in Java 9). */ - @deprecated("Use pekko.stream.[javadsl|scaladsl].Source.fromPublisher", "2.0.0") + @deprecated("Use pekko.stream.scaladsl.Source.fromPublisher", "2.0.0") final // #fromPublisher def fromPublisher[T](publisher: java.util.concurrent.Flow.Publisher[T]): Source[T, NotUsed] = @@ -58,7 +58,7 @@ object JavaFlowSupport { * @see See also [[Source.asSubscriber]] if wanting to integrate with [[org.reactivestreams.Subscriber]] instead * (which carries the same semantics, however existed before RS's inclusion in Java 9). */ - @deprecated("Use pekko.stream.[javadsl|scaladsl].Source.asSubscriber", "2.0.0") + @deprecated("Use pekko.stream.scaladsl.Source.asSubscriber", "2.0.0") final // #asSubscriber def asSubscriber[T]: Source[T, java.util.concurrent.Flow.Subscriber[T]] = @@ -96,8 +96,8 @@ object JavaFlowSupport { */ def toProcessor[In, Out, Mat]( self: Flow[In, Out, Mat]): RunnableGraph[juc.Flow.Processor[In @uncheckedVariance, Out @uncheckedVariance]] = - Source.asSubscriber[In].via(self) - .toMat(Sink.asPublisher[Out](fanout = false))(Keep.both) + scaladsl.Source.asJavaSubscriber[In].via(self) + .toMat(scaladsl.Sink.asJavaPublisher[Out](fanout = false))(Keep.both) .mapMaterializedValue { case (sub, pub) => new juc.Flow.Processor[In, Out] { override def onError(t: Throwable): Unit = sub.onError(t) @@ -125,13 +125,14 @@ object JavaFlowSupport { * If `fanout` is `WITHOUT_FANOUT` then the materialized `Publisher` will only support a single `Subscriber` and * reject any additional `Subscriber`s. */ + @deprecated("Use pekko.stream.scaladsl.Sink.asJavaPublisher", "2.0.0") final def asPublisher[T](fanout: Boolean): Sink[T, juc.Flow.Publisher[T]] = scaladsl.Sink.asPublisher[T](fanout).mapMaterializedValue(_.asJava) /** * Helper to create [[Sink]] from [[java.util.concurrent.Flow.Subscriber]]. */ - @deprecated("Use pekko.stream.[javadsl|scaladsl].Sink.fromSubscriber", "2.0.0") + @deprecated("Use pekko.stream.scaladsl.Sink.fromSubscriber", "2.0.0") final def fromSubscriber[T](s: juc.Flow.Subscriber[T]): Sink[T, NotUsed] = scaladsl.Sink.fromSubscriber(s.asRs) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index d25b1387040..9ec4e7f2cd4 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -323,6 +323,25 @@ object Sink { if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink")) else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink"))) + /** + * A `Sink` that materializes into a [[java.util.concurrent.Flow.Publisher]]. + * + * If `fanout` is `true`, the materialized `Publisher` will support multiple `Subscriber`s and + * the size of the `inputBuffer` configured for this operator becomes the maximum number of elements that + * the fastest [[java.util.concurrent.Flow.Subscriber]] can be ahead of the slowest one before slowing + * the processing down due to back pressure. + * + * If `fanout` is `false` then the materialized `Publisher` will only support a single `Subscriber` and + * reject any additional `Subscriber`s. + * + * @see pekko.stream.scaladsl.JavaFlowSupport.Sink#asPublisher + * @since 2.0.0 + */ + def asJavaPublisher[T](fanout: Boolean): Sink[T, java.util.concurrent.Flow.Publisher[T]] = { + import JavaFlowAndRsConverters.Implicits._ + asPublisher[T](fanout).mapMaterializedValue(_.asJava) + } + /** * A `Sink` that materializes this `Sink` itself as a `Source`. * The returned `Source` is a "live view" onto the `Sink` and only supports a single `Subscriber`. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 877d3904e27..6ea61fd6b85 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -654,7 +654,7 @@ object Source { * @see pekko.stream.scaladsl.JavaFlowSupport.Source#asSubscriber * @since 2.0.0 */ - def asSubscriber[T]: Source[T, java.util.concurrent.Flow.Subscriber[T]] = { + def asJavaSubscriber[T]: Source[T, java.util.concurrent.Flow.Subscriber[T]] = { import JavaFlowAndRsConverters.Implicits._ asSubscriber[T].mapMaterializedValue(_.asJava) } From e67567ebc2b2e9e6184fd30bb54b04f62964f857 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sun, 9 Nov 2025 00:05:42 +0100 Subject: [PATCH 3/9] update docs --- .../stream/reactive-streams-interop.md | 10 +++------- .../stream/operators/source/AsSubscriber.java | 18 +----------------- .../operators/source/FromPublisher.java | 19 +------------------ .../operators/source/AsSubscriber.scala | 3 +-- .../operators/source/FromPublisher.scala | 3 +-- 5 files changed, 7 insertions(+), 46 deletions(-) diff --git a/docs/src/main/paradox/stream/reactive-streams-interop.md b/docs/src/main/paradox/stream/reactive-streams-interop.md index d952bacf2e1..278f8e4094d 100644 --- a/docs/src/main/paradox/stream/reactive-streams-interop.md +++ b/docs/src/main/paradox/stream/reactive-streams-interop.md @@ -22,14 +22,10 @@ back pressure. Since Java 9 the APIs of Reactive Streams has been included in the Java Standard library, under the `java.util.concurrent.Flow` namespace. For Java 8 there is instead a separate Reactive Streams artifact with the same APIs in the package `org.reactivestreams`. -Pekko streams provides interoperability for both these two API versions, the Reactive Streams interfaces directly through factories on the -regular `Source` and `Sink` APIs. For the Java 9 and later built in interfaces there is a separate set of factories in -@scala[`org.apache.pekko.stream.scaladsl.JavaFlowSupport`]@java[`org.apache.pekko.stream.javadsl.JavaFlowSupport`]. +Pekko streams provides interoperability for both of these API versions directly through factories on the +regular `Source` and `Sink` APIs. -In the following samples the standalone Reactive Stream API factories has been used but each such call can be replaced with the -corresponding method from `JavaFlowSupport` and the JDK @scala[`java.util.concurrent.Flow._`]@java[`java.util.concurrent.Flow.*`] interfaces. - -Note that it is not possible to use `JavaFlowSupport` on Java 8 since the needed interfaces simply is not available in the Java standard library. +In the following samples, the standalone Reactive Stream API factories has been used but the code needed to use the `java.util.concurrent.Flow` equivalents is very similar. The two most important interfaces in Reactive Streams are the `Publisher` and `Subscriber`. diff --git a/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java b/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java index b2894799105..5115f4169d2 100644 --- a/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java +++ b/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java @@ -17,26 +17,10 @@ import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; import org.apache.pekko.NotUsed; -import org.apache.pekko.stream.javadsl.JavaFlowSupport; // #imports import org.apache.pekko.stream.javadsl.Source; public interface AsSubscriber { - // We are 'faking' the JavaFlowSupport API here so we can include the signature as a snippet in - // the API, - // because we're not publishing those (jdk9+) classes in our API docs yet. - static class JavaFlowSupport { - public static final class Source { - public - // #api - static org.apache.pekko.stream.javadsl.Source> asSubscriber() - // #api - { - return org.apache.pekko.stream.javadsl.JavaFlowSupport.Source.asSubscriber(); - } - } - } - static class Row { public String getField(String fieldName) { throw new UnsupportedOperationException("Not implemented in sample"); @@ -54,7 +38,7 @@ Publisher fetchRows() { // #example class Example { Source rowSource = - JavaFlowSupport.Source.asSubscriber() + Source.asJavaSubscriber() .mapMaterializedValue( subscriber -> { // For each materialization, fetch the rows from the database: diff --git a/docs/src/test/java/jdocs/stream/operators/source/FromPublisher.java b/docs/src/test/java/jdocs/stream/operators/source/FromPublisher.java index ee4594b66cc..bb4350b26d7 100644 --- a/docs/src/test/java/jdocs/stream/operators/source/FromPublisher.java +++ b/docs/src/test/java/jdocs/stream/operators/source/FromPublisher.java @@ -16,28 +16,11 @@ // #imports import java.util.concurrent.Flow.Publisher; import org.apache.pekko.NotUsed; -import org.apache.pekko.stream.javadsl.JavaFlowSupport; import org.apache.pekko.stream.javadsl.Source; // #imports public interface FromPublisher { - // We are 'faking' the JavaFlowSupport API here so we can include the signature as a snippet in - // the API, - // because we're not publishing those (jdk9+) classes in our API docs yet. - static class JavaFlowSupport { - public static final class Source { - public - // #api - static org.apache.pekko.stream.javadsl.Source fromPublisher( - Publisher publisher) - // #api - { - return org.apache.pekko.stream.javadsl.JavaFlowSupport.Source.fromPublisher(publisher); - } - } - } - static class Row { public String getField(String fieldName) { throw new UnsupportedOperationException("Not implemented in sample"); @@ -58,7 +41,7 @@ public Source names() { // A new subscriber will subscribe to the supplied publisher for each // materialization, so depending on whether the database client supports // this the Source can be materialized more than once. - return JavaFlowSupport.Source.fromPublisher(databaseClient.fetchRows()) + return Source.fromPublisher(databaseClient.fetchRows()) .map(row -> row.getField("name")); } } diff --git a/docs/src/test/scala/docs/stream/operators/source/AsSubscriber.scala b/docs/src/test/scala/docs/stream/operators/source/AsSubscriber.scala index 24b875a5db7..96fb3920891 100644 --- a/docs/src/test/scala/docs/stream/operators/source/AsSubscriber.scala +++ b/docs/src/test/scala/docs/stream/operators/source/AsSubscriber.scala @@ -20,7 +20,6 @@ import java.util.concurrent.Flow.Publisher import org.apache.pekko import pekko.NotUsed import pekko.stream.scaladsl.Source -import pekko.stream.scaladsl.JavaFlowSupport //#imports @@ -35,7 +34,7 @@ object AsSubscriber { // #example val rowSource: Source[Row, NotUsed] = - JavaFlowSupport.Source.asSubscriber + Source.asJavaSubscriber .mapMaterializedValue((subscriber: Subscriber[Row]) => { // For each materialization, fetch the rows from the database: val rows: Publisher[Row] = databaseClient.fetchRows() diff --git a/docs/src/test/scala/docs/stream/operators/source/FromPublisher.scala b/docs/src/test/scala/docs/stream/operators/source/FromPublisher.scala index 401e0e82546..775c3d14a34 100644 --- a/docs/src/test/scala/docs/stream/operators/source/FromPublisher.scala +++ b/docs/src/test/scala/docs/stream/operators/source/FromPublisher.scala @@ -20,7 +20,6 @@ import java.util.concurrent.Flow.Publisher import org.apache.pekko import pekko.NotUsed import pekko.stream.scaladsl.Source -import pekko.stream.scaladsl.JavaFlowSupport //#imports @@ -38,7 +37,7 @@ object FromPublisher { // A new subscriber will subscribe to the supplied publisher for each // materialization, so depending on whether the database client supports // this the Source can be materialized more than once. - JavaFlowSupport.Source.fromPublisher(databaseClient.fetchRows()) + Source.fromPublisher(databaseClient.fetchRows()) .map(row => row.name) // #example } From ea20bd64ef7ff3ec18ea5ef9c320c30ed311f76f Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sun, 9 Nov 2025 00:13:44 +0100 Subject: [PATCH 4/9] javafmt --- .../test/java/jdocs/stream/operators/source/AsSubscriber.java | 1 - .../test/java/jdocs/stream/operators/source/FromPublisher.java | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java b/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java index 5115f4169d2..0f6607d7aeb 100644 --- a/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java +++ b/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java @@ -15,7 +15,6 @@ // #imports import java.util.concurrent.Flow.Publisher; -import java.util.concurrent.Flow.Subscriber; import org.apache.pekko.NotUsed; // #imports import org.apache.pekko.stream.javadsl.Source; diff --git a/docs/src/test/java/jdocs/stream/operators/source/FromPublisher.java b/docs/src/test/java/jdocs/stream/operators/source/FromPublisher.java index bb4350b26d7..f986dde843a 100644 --- a/docs/src/test/java/jdocs/stream/operators/source/FromPublisher.java +++ b/docs/src/test/java/jdocs/stream/operators/source/FromPublisher.java @@ -41,8 +41,7 @@ public Source names() { // A new subscriber will subscribe to the supplied publisher for each // materialization, so depending on whether the database client supports // this the Source can be materialized more than once. - return Source.fromPublisher(databaseClient.fetchRows()) - .map(row -> row.getField("name")); + return Source.fromPublisher(databaseClient.fetchRows()).map(row -> row.getField("name")); } } // #example From 77cc75a4c60402147ca15d8bd5c2d325eb31bb00 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sun, 9 Nov 2025 00:59:17 +0100 Subject: [PATCH 5/9] asjavapublisher --- .../stream/operators/Sink/asJavaPublisher.md | 39 +++++++++++++++++++ .../stream/operators/Sink/asPublisher.md | 5 +-- .../main/paradox/stream/operators/index.md | 2 + 3 files changed, 42 insertions(+), 4 deletions(-) create mode 100644 docs/src/main/paradox/stream/operators/Sink/asJavaPublisher.md diff --git a/docs/src/main/paradox/stream/operators/Sink/asJavaPublisher.md b/docs/src/main/paradox/stream/operators/Sink/asJavaPublisher.md new file mode 100644 index 00000000000..81c1dcf86fa --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Sink/asJavaPublisher.md @@ -0,0 +1,39 @@ +# Sink.asJavaPublisher + +Integration with Java Flow API, materializes into a `java.util.concurrent.Flow.Publisher`. + +@ref[Sink operators](../index.md#sink-operators) + +## Signature + +@apidoc[Sink.asJavaPublisher](Sink$) { scala="#asJavaPublisher[T](fanout:Boolean):org.apache.pekko.stream.scaladsl.Sink[T,java.util.concurrent.Flow.Publisher[T]]" java="#asJavaPublisher(org.apache.pekko.stream.javadsl.asJavaPublisher)" } + + + +## Description + +This method gives you the capability to publish the data from the `Sink` through a Java Flow [Publisher](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Flow.Publisher.html). +Generally, in Pekko Streams a `Sink` is considered a subscriber, which consumes the data from source. To integrate with other Reactive Stream implementations `Sink.asJavaPublisher` provides a `Publisher` materialized value when run. +Now, the data from this publisher can be consumed by subscribing to it. We can control if we allow more than one downstream subscriber from the single running Pekko stream through the `fanout` parameter. +If you want to support a ReactiveStreams Publisher, there is [Sink.asPublisher](asPublisher.md). + +## Example + +In the example we are using a source and then creating a Publisher. After that, we see that when `fanout` is true multiple subscribers can subscribe to it, +but when it is false only the first subscriber will be able to subscribe and others will be rejected. + +Scala +: @@snip [AsPublisher.scala](/docs/src/test/scala/docs/stream/operators/sink/AsPublisher.scala) { #asPublisher } + +Java +: @@snip [SinkDocExamples.java](/docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #asJavaPublisher } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** the materialized publisher + +**completes** after the source is consumed and materialized publisher is created + +@@@ diff --git a/docs/src/main/paradox/stream/operators/Sink/asPublisher.md b/docs/src/main/paradox/stream/operators/Sink/asPublisher.md index 49be6139f96..594a9be8198 100644 --- a/docs/src/main/paradox/stream/operators/Sink/asPublisher.md +++ b/docs/src/main/paradox/stream/operators/Sink/asPublisher.md @@ -15,10 +15,7 @@ Integration with Reactive Streams, materializes into a `org.reactivestreams.Publ This method gives you the capability to publish the data from the `Sink` through a Reactive Streams [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html). Generally, in Pekko Streams a `Sink` is considered a subscriber, which consumes the data from source. To integrate with other Reactive Stream implementations `Sink.asPublisher` provides a `Publisher` materialized value when run. Now, the data from this publisher can be consumed by subscribing to it. We can control if we allow more than one downstream subscriber from the single running Pekko stream through the `fanout` parameter. -In Java 9, the Reactive Stream API was included in the JDK, and `Publisher` is available through [Flow.Publisher](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.Publisher.html). -Since those APIs are identical but exist at different package namespaces and does not depend on the Reactive Streams package a separate publisher sink for those is available -through @scala[`org.apache.pekko.stream.scaladsl.JavaFlowSupport.Sink#asPublisher`]@java[`org.apache.pekko.stream.javadsl.JavaFlowSupport.Sink#asPublisher`]. - +If you want to support [Flow.Publisher](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Flow.Publisher.html), there is [Sink.asJavaPublisher](asJavaPublisher.md). ## Example diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index a728b8b8aed..db5fb8e4bcd 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -51,6 +51,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl | |Operator|Description| |--|--|--| +|Sink|@ref[asPublisher](Sink/asJavaPublisher.md)|Integration with Java Flow, materializes into a `java.util.concurrent.Flow.Publisher`.| |Sink|@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.| |Sink|@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream| |Sink|@ref[collect](Sink/collect.md)|Collect all input elements using a Java @javadoc[Collector](java.util.stream.Collector).| @@ -410,6 +411,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [askWithContext](ActorFlow/askWithContext.md) * [askWithStatus](ActorFlow/askWithStatus.md) * [askWithStatusAndContext](ActorFlow/askWithStatusAndContext.md) +* [asJavaPublisher](Sink/asJavaPublisher.md) * [asOutputStream](StreamConverters/asOutputStream.md) * [asPublisher](Sink/asPublisher.md) * [asSourceWithContext](Source/asSourceWithContext.md) From 07d82132dafbd5c4ba849d42d2f24b7191579050 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sun, 9 Nov 2025 01:18:34 +0100 Subject: [PATCH 6/9] some doc changes --- .../stream/operators/Sink/asJavaPublisher.md | 2 +- .../stream/operators/Sink/asPublisher.md | 2 +- .../operators/Source/asJavaSubscriber.md | 49 +++++++++++++++++++ .../stream/operators/Source/asSubscriber.md | 20 +++----- .../stream/operators/Source/fromPublisher.md | 2 +- .../main/paradox/stream/operators/index.md | 5 +- 6 files changed, 62 insertions(+), 18 deletions(-) create mode 100644 docs/src/main/paradox/stream/operators/Source/asJavaSubscriber.md diff --git a/docs/src/main/paradox/stream/operators/Sink/asJavaPublisher.md b/docs/src/main/paradox/stream/operators/Sink/asJavaPublisher.md index 81c1dcf86fa..ed941079ad2 100644 --- a/docs/src/main/paradox/stream/operators/Sink/asJavaPublisher.md +++ b/docs/src/main/paradox/stream/operators/Sink/asJavaPublisher.md @@ -15,7 +15,7 @@ Integration with Java Flow API, materializes into a `java.util.concurrent.Flow.P This method gives you the capability to publish the data from the `Sink` through a Java Flow [Publisher](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Flow.Publisher.html). Generally, in Pekko Streams a `Sink` is considered a subscriber, which consumes the data from source. To integrate with other Reactive Stream implementations `Sink.asJavaPublisher` provides a `Publisher` materialized value when run. Now, the data from this publisher can be consumed by subscribing to it. We can control if we allow more than one downstream subscriber from the single running Pekko stream through the `fanout` parameter. -If you want to support a ReactiveStreams Publisher, there is [Sink.asPublisher](asPublisher.md). +If you want to support a ReactiveStreams Publisher, there is @ref[Sink.asPublisher](asPublisher.md). ## Example diff --git a/docs/src/main/paradox/stream/operators/Sink/asPublisher.md b/docs/src/main/paradox/stream/operators/Sink/asPublisher.md index 594a9be8198..c0b66ace6e5 100644 --- a/docs/src/main/paradox/stream/operators/Sink/asPublisher.md +++ b/docs/src/main/paradox/stream/operators/Sink/asPublisher.md @@ -15,7 +15,7 @@ Integration with Reactive Streams, materializes into a `org.reactivestreams.Publ This method gives you the capability to publish the data from the `Sink` through a Reactive Streams [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html). Generally, in Pekko Streams a `Sink` is considered a subscriber, which consumes the data from source. To integrate with other Reactive Stream implementations `Sink.asPublisher` provides a `Publisher` materialized value when run. Now, the data from this publisher can be consumed by subscribing to it. We can control if we allow more than one downstream subscriber from the single running Pekko stream through the `fanout` parameter. -If you want to support [Flow.Publisher](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Flow.Publisher.html), there is [Sink.asJavaPublisher](asJavaPublisher.md). +If you want to support [Flow.Publisher](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Flow.Publisher.html), there is @ref[Sink.asJavaPublisher](asJavaPublisher.md). ## Example diff --git a/docs/src/main/paradox/stream/operators/Source/asJavaSubscriber.md b/docs/src/main/paradox/stream/operators/Source/asJavaSubscriber.md new file mode 100644 index 00000000000..0a8d1d0a070 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source/asJavaSubscriber.md @@ -0,0 +1,49 @@ +# Source.asJavaSubscriber + +Integration with Java Flow API, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber). + +@ref[Source operators](../index.md#source-operators) + +## Signature + +Scala +: @@snip[JavaFlowSupport.scala](/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala) { #asSubscriber } + +Java +: @@snip[AsSubscriber.java](/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java) { #api } + +## Description + +If you want to create a @apidoc[Source] that gets its elements from another library that supports +the Java Flow API, you can use `Source.asJavaSubscriber`. +Each time this @apidoc[Source] is materialized, it produces a materialized value of type +@javadoc[java.util.concurrent.Flow.Subscriber](java.util.concurrent.Flow.Subscriber). +This @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) can be attached to a +Java Flow @javadoc[Publisher](java.util.concurrent.Flow.Publisher) +to populate it. + +If the API you want to consume elements from provides a @javadoc[Publisher](java.util.concurrent.Flow.Publisher) instead of accepting a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber), see @ref[fromPublisher](fromPublisher.md). + +@@@ note + +Reactive Streams users: we prefer @javadoc[java.util.concurrent.Flow](java.util.concurrent.Flow) but you may still use the [org.reactivestreams](https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams) library with @apidoc[Source.asSubscriber](Source$) { scala="#asSubscriber[T]:org.apache.pekko.stream.scaladsl.Source[T,org.reactivestreams.Subscriber[T]]" java="#asSubscriber()" }. + +@@@ + +## Example + +Suppose we use a database client that supports the Java Flow API, +we could create a @apidoc[Source] that queries the database for its rows. That @apidoc[Source] can then +be used for further processing, for example creating a @apidoc[Source] that contains the names of the +rows. + +Note that since the database is queried for each materialization, the `rowSource` can be safely re-used. +Because both the database driver and Pekko Streams support Java Flow API, +backpressure is applied throughout the stream, preventing us from running out of memory when the database +rows are consumed slower than they are produced by the database. + +Scala +: @@snip [AsSubscriber.scala](/docs/src/test/scala/docs/stream/operators/source/AsSubscriber.scala) { #imports #example } + +Java +: @@snip [AsSubscriber.java](/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java) { #imports #example } diff --git a/docs/src/main/paradox/stream/operators/Source/asSubscriber.md b/docs/src/main/paradox/stream/operators/Source/asSubscriber.md index e19db56bd6f..0ff27e4cc79 100644 --- a/docs/src/main/paradox/stream/operators/Source/asSubscriber.md +++ b/docs/src/main/paradox/stream/operators/Source/asSubscriber.md @@ -1,6 +1,6 @@ # Source.asSubscriber -Integration with Reactive Streams, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber). +Integration with Reactive Streams, materializes into a @javadoc[Subscriber](org.reactivestreams.Subscriber). @ref[Source operators](../index.md#source-operators) @@ -10,25 +10,19 @@ Scala : @@snip[JavaFlowSupport.scala](/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala) { #asSubscriber } Java -: @@snip[JavaFlowSupport.java](/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java) { #api } +: @@snip[AsSubscriber.java](/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java) { #api } ## Description If you want to create a @apidoc[Source] that gets its elements from another library that supports -[Reactive Streams](https://www.reactive-streams.org/), you can use `JavaFlowSupport.Source.asSubscriber`. +[Reactive Streams](https://www.reactive-streams.org/), you can use `Source.asSubscriber`. Each time this @apidoc[Source] is materialized, it produces a materialized value of type -@javadoc[java.util.concurrent.Flow.Subscriber](java.util.concurrent.Flow.Subscriber). -This @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) can be attached to a -[Reactive Streams](https://www.reactive-streams.org/) @javadoc[Publisher](java.util.concurrent.Flow.Publisher) +@javadoc[org.reactivestreams.Subscriber](org.reactivestreams.Subscriber). +This @javadoc[Subscriber](org.reactivestreams.Subscriber) can be attached to a +[Reactive Streams](https://www.reactive-streams.org/) @javadoc[Publisher](org.reactivestreams.Publisher) to populate it. -If the API you want to consume elements from provides a @javadoc[Publisher](java.util.concurrent.Flow.Publisher) instead of accepting a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber), see @ref[fromPublisher](fromPublisher.md). - -@@@ note - -Reactive Streams users: we prefer @javadoc[java.util.concurrent.Flow](java.util.concurrent.Flow) you may still use the [org.reactivestreams](https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams) library with @apidoc[Source.asSubscriber](Source$) { scala="#asSubscriber[T]:org.apache.pekko.stream.scaladsl.Source[T,org.reactivestreams.Subscriber[T]]" java="#asSubscriber()" }. - -@@@ +If the API you want to consume elements from provides a @javadoc[Publisher](org.reactivestreams.Publisher) instead of accepting a @javadoc[Subscriber](org.reactivestreams.Subscriber), see @ref[fromPublisher](fromPublisher.md). ## Example diff --git a/docs/src/main/paradox/stream/operators/Source/fromPublisher.md b/docs/src/main/paradox/stream/operators/Source/fromPublisher.md index e41531e134f..df93998a6a1 100644 --- a/docs/src/main/paradox/stream/operators/Source/fromPublisher.md +++ b/docs/src/main/paradox/stream/operators/Source/fromPublisher.md @@ -20,7 +20,7 @@ If you want to create a @apidoc[Source] that gets its elements from another libr This source will produce the elements from the @javadoc[Publisher](java.util.concurrent.Flow.Publisher), and coordinate backpressure as needed. -If the API you want to consume elements from accepts a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) instead of providing a @javadoc[Publisher](java.util.concurrent.Flow.Publisher), see @ref[asSubscriber](asSubscriber.md). +If the API you want to consume elements from accepts a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) instead of providing a @javadoc[Publisher](java.util.concurrent.Flow.Publisher), see @ref[asJavaSubscriber](asJavaSubscriber.md). @@@ note diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index db5fb8e4bcd..9c541bcaf1a 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -7,8 +7,9 @@ These built-in sources are available from @scala[`org.apache.pekko.stream.scalad | |Operator|Description| |--|--|--| +|Source|@ref[asJavaSubscriber](Source/asJavaSubscriber.md)|Integration with Java Flow API, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber).| |Source|@ref[asSourceWithContext](Source/asSourceWithContext.md)|Extracts context data from the elements of a `Source` so that it can be turned into a `SourceWithContext` which can propagate that context per element along a stream.| -|Source|@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber).| +|Source|@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a @javadoc[Subscriber](org.reactivestreams.Subscriber).| |Source|@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.| |Source|@ref[completionStage](Source/completionStage.md)|Send the single value of the `CompletionStage` when it completes and there is demand.| |Source|@ref[completionStageSource](Source/completionStageSource.md)|Streams the elements of an asynchronous source once its given *completion* operator completes.| @@ -51,7 +52,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl | |Operator|Description| |--|--|--| -|Sink|@ref[asPublisher](Sink/asJavaPublisher.md)|Integration with Java Flow, materializes into a `java.util.concurrent.Flow.Publisher`.| +|Sink|@ref[asPublisher](Sink/asJavaPublisher.md)|Integration with Java Flow API, materializes into a `java.util.concurrent.Flow.Publisher`.| |Sink|@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.| |Sink|@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream| |Sink|@ref[collect](Sink/collect.md)|Collect all input elements using a Java @javadoc[Collector](java.util.stream.Collector).| From 0efea855050d77130dcc5c05e83957ffc543152d Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sun, 9 Nov 2025 11:17:45 +0100 Subject: [PATCH 7/9] build issues --- .../main/paradox/stream/operators/Source/fromPublisher.md | 6 ++---- project/VerifyJDK9Classes.scala | 6 +++--- .../tck/IterablePublisherViaJavaFlowPublisherTest.scala | 6 +++--- .../pekko/stream/javadsl/JavaFlowSupportCompileTest.java | 8 ++++---- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/docs/src/main/paradox/stream/operators/Source/fromPublisher.md b/docs/src/main/paradox/stream/operators/Source/fromPublisher.md index df93998a6a1..4616aae5658 100644 --- a/docs/src/main/paradox/stream/operators/Source/fromPublisher.md +++ b/docs/src/main/paradox/stream/operators/Source/fromPublisher.md @@ -1,6 +1,6 @@ # Source.fromPublisher -Integration with Reactive Streams, subscribes to a @javadoc[Publisher](java.util.concurrent.Flow.Publisher). +Integration with Reactive Streams, subscribes to a @javadoc[Java Flow Publisher](java.util.concurrent.Flow.Publisher) or a @javadoc[org.reactivestreams Publisher](org.reactivestreams.Publisher). @ref[Source operators](../index.md#source-operators) @@ -15,9 +15,7 @@ Java ## Description -If you want to create a @apidoc[Source] that gets its elements from another library that supports -[Reactive Streams](https://www.reactive-streams.org/), you can use `JavaFlowSupport.Source.fromPublisher`. -This source will produce the elements from the @javadoc[Publisher](java.util.concurrent.Flow.Publisher), +This source will produce the elements from the @javadoc[Java Flow Publisher](java.util.concurrent.Flow.Publisher) or the @javadoc[org.reactivestreams Publisher](org.reactivestreams.Publisher), and coordinate backpressure as needed. If the API you want to consume elements from accepts a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) instead of providing a @javadoc[Publisher](java.util.concurrent.Flow.Publisher), see @ref[asJavaSubscriber](asJavaSubscriber.md). diff --git a/project/VerifyJDK9Classes.scala b/project/VerifyJDK9Classes.scala index 6de5e62df0e..2e09961f714 100644 --- a/project/VerifyJDK9Classes.scala +++ b/project/VerifyJDK9Classes.scala @@ -63,15 +63,15 @@ object VerifyJDK9Classes { |object VerifyJDK9Classes { | def main(args: Array[String]): Unit = { | import org.apache.pekko.actor.ActorSystem - | import org.apache.pekko.stream.scaladsl.{ JavaFlowSupport, Source } + | import org.apache.pekko.stream.scaladsl.{ Sink, Source } | | import java.lang.System.exit | import scala.concurrent.Await | import scala.concurrent.duration.DurationInt | implicit val system: ActorSystem = ActorSystem.create("test") | val future = Source(1 to 3).runWith( - | JavaFlowSupport.Sink.asPublisher[Int](fanout = false).mapMaterializedValue { p => - | JavaFlowSupport.Source.fromPublisher(p).runFold(0)(_ + _) + | Sink.asJavaPublisher[Int](fanout = false).mapMaterializedValue { p => + | Source.fromPublisher(p).runFold(0)(_ + _) | }) | | val result = Await.result(future, 3.seconds) diff --git a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala index b81ca2c16bf..b6bbf02501d 100644 --- a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala +++ b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala @@ -17,7 +17,7 @@ import java.util.concurrent.{ Flow => JavaFlow } import org.apache.pekko import pekko.NotUsed -import pekko.stream.scaladsl.{ JavaFlowSupport, Sink, Source } +import pekko.stream.scaladsl.{ Sink, Source } import org.reactivestreams._ @@ -25,10 +25,10 @@ class IterablePublisherViaJavaFlowPublisherTest extends PekkoPublisherVerificati override def createPublisher(elements: Long): Publisher[Int] = { val sourceViaJavaFlowPublisher: JavaFlow.Publisher[Int] = Source(iterable(elements)) - .runWith(JavaFlowSupport.Sink.asPublisher(fanout = false)) + .runWith(Sink.asJavaPublisher(fanout = false)) val javaFlowPublisherIntoPekkoSource: Source[Int, NotUsed] = - JavaFlowSupport.Source.fromPublisher(sourceViaJavaFlowPublisher) + Source.fromPublisher(sourceViaJavaFlowPublisher) javaFlowPublisherIntoPekkoSource .runWith(Sink.asPublisher(false)) // back as RS Publisher diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/JavaFlowSupportCompileTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/JavaFlowSupportCompileTest.java index 9e778914a20..6eb8fd98366 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/JavaFlowSupportCompileTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/JavaFlowSupportCompileTest.java @@ -41,9 +41,9 @@ public void onComplete() {} }; final Source> stringSubscriberSource = - JavaFlowSupport.Source.asSubscriber(); + Source.asJavaSubscriber(); final Source stringNotUsedSource = - JavaFlowSupport.Source.fromPublisher(processor); + Source.fromPublisher(processor); final org.apache.pekko.stream.javadsl.Flow stringStringNotUsedFlow = JavaFlowSupport.Flow.fromProcessor(() -> processor); @@ -51,7 +51,7 @@ public void onComplete() {} JavaFlowSupport.Flow.fromProcessorMat(() -> Pair.apply(processor, NotUsed.getInstance())); final Sink> stringPublisherSink = - JavaFlowSupport.Sink.asPublisher(AsPublisher.WITH_FANOUT); - final Sink stringNotUsedSink = JavaFlowSupport.Sink.fromSubscriber(processor); + Sink.asJavaPublisher(AsPublisher.WITH_FANOUT); + final Sink stringNotUsedSink = Sink.fromSubscriber(processor); } } From 02072f6252716aebaa5d0ffb4b9537c3f5cda8be Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sun, 9 Nov 2025 11:25:54 +0100 Subject: [PATCH 8/9] Update JavaFlowSupportCompileTest.java --- .../pekko/stream/javadsl/JavaFlowSupportCompileTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/JavaFlowSupportCompileTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/JavaFlowSupportCompileTest.java index 6eb8fd98366..2f625e2f813 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/JavaFlowSupportCompileTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/JavaFlowSupportCompileTest.java @@ -42,8 +42,7 @@ public void onComplete() {} final Source> stringSubscriberSource = Source.asJavaSubscriber(); - final Source stringNotUsedSource = - Source.fromPublisher(processor); + final Source stringNotUsedSource = Source.fromPublisher(processor); final org.apache.pekko.stream.javadsl.Flow stringStringNotUsedFlow = JavaFlowSupport.Flow.fromProcessor(() -> processor); From 63d1cf139eea2ba04386b182b37e7401b9d0562d Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sun, 9 Nov 2025 12:08:49 +0100 Subject: [PATCH 9/9] Update JavaFlowSupport.scala --- .../org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala index 7f7145dd714..783ac3a37c0 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala @@ -58,7 +58,7 @@ object JavaFlowSupport { * @see See also [[Source.asSubscriber]] if wanting to integrate with [[org.reactivestreams.Subscriber]] instead * (which carries the same semantics, however existed before RS's inclusion in Java 9). */ - @deprecated("Use pekko.stream.scaladsl.Source.asSubscriber", "2.0.0") + @deprecated("Use pekko.stream.scaladsl.Source.asJavaSubscriber", "2.0.0") final // #asSubscriber def asSubscriber[T]: Source[T, java.util.concurrent.Flow.Subscriber[T]] =