diff --git a/docs/src/main/paradox/stream/operators/Sink/asJavaPublisher.md b/docs/src/main/paradox/stream/operators/Sink/asJavaPublisher.md new file mode 100644 index 00000000000..ed941079ad2 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Sink/asJavaPublisher.md @@ -0,0 +1,39 @@ +# Sink.asJavaPublisher + +Integration with Java Flow API, materializes into a `java.util.concurrent.Flow.Publisher`. + +@ref[Sink operators](../index.md#sink-operators) + +## Signature + +@apidoc[Sink.asJavaPublisher](Sink$) { scala="#asJavaPublisher[T](fanout:Boolean):org.apache.pekko.stream.scaladsl.Sink[T,java.util.concurrent.Flow.Publisher[T]]" java="#asJavaPublisher(org.apache.pekko.stream.javadsl.asJavaPublisher)" } + + + +## Description + +This method gives you the capability to publish the data from the `Sink` through a Java Flow [Publisher](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Flow.Publisher.html). +Generally, in Pekko Streams a `Sink` is considered a subscriber, which consumes the data from source. To integrate with other Reactive Stream implementations `Sink.asJavaPublisher` provides a `Publisher` materialized value when run. +Now, the data from this publisher can be consumed by subscribing to it. We can control if we allow more than one downstream subscriber from the single running Pekko stream through the `fanout` parameter. +If you want to support a ReactiveStreams Publisher, there is @ref[Sink.asPublisher](asPublisher.md). + +## Example + +In the example we are using a source and then creating a Publisher. After that, we see that when `fanout` is true multiple subscribers can subscribe to it, +but when it is false only the first subscriber will be able to subscribe and others will be rejected. + +Scala +: @@snip [AsPublisher.scala](/docs/src/test/scala/docs/stream/operators/sink/AsPublisher.scala) { #asPublisher } + +Java +: @@snip [SinkDocExamples.java](/docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #asJavaPublisher } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** the materialized publisher + +**completes** after the source is consumed and materialized publisher is created + +@@@ diff --git a/docs/src/main/paradox/stream/operators/Sink/asPublisher.md b/docs/src/main/paradox/stream/operators/Sink/asPublisher.md index 49be6139f96..c0b66ace6e5 100644 --- a/docs/src/main/paradox/stream/operators/Sink/asPublisher.md +++ b/docs/src/main/paradox/stream/operators/Sink/asPublisher.md @@ -15,10 +15,7 @@ Integration with Reactive Streams, materializes into a `org.reactivestreams.Publ This method gives you the capability to publish the data from the `Sink` through a Reactive Streams [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html). Generally, in Pekko Streams a `Sink` is considered a subscriber, which consumes the data from source. To integrate with other Reactive Stream implementations `Sink.asPublisher` provides a `Publisher` materialized value when run. Now, the data from this publisher can be consumed by subscribing to it. We can control if we allow more than one downstream subscriber from the single running Pekko stream through the `fanout` parameter. -In Java 9, the Reactive Stream API was included in the JDK, and `Publisher` is available through [Flow.Publisher](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.Publisher.html). -Since those APIs are identical but exist at different package namespaces and does not depend on the Reactive Streams package a separate publisher sink for those is available -through @scala[`org.apache.pekko.stream.scaladsl.JavaFlowSupport.Sink#asPublisher`]@java[`org.apache.pekko.stream.javadsl.JavaFlowSupport.Sink#asPublisher`]. - +If you want to support [Flow.Publisher](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Flow.Publisher.html), there is @ref[Sink.asJavaPublisher](asJavaPublisher.md). ## Example diff --git a/docs/src/main/paradox/stream/operators/Source/asJavaSubscriber.md b/docs/src/main/paradox/stream/operators/Source/asJavaSubscriber.md new file mode 100644 index 00000000000..0a8d1d0a070 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source/asJavaSubscriber.md @@ -0,0 +1,49 @@ +# Source.asJavaSubscriber + +Integration with Java Flow API, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber). + +@ref[Source operators](../index.md#source-operators) + +## Signature + +Scala +: @@snip[JavaFlowSupport.scala](/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala) { #asSubscriber } + +Java +: @@snip[AsSubscriber.java](/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java) { #api } + +## Description + +If you want to create a @apidoc[Source] that gets its elements from another library that supports +the Java Flow API, you can use `Source.asJavaSubscriber`. +Each time this @apidoc[Source] is materialized, it produces a materialized value of type +@javadoc[java.util.concurrent.Flow.Subscriber](java.util.concurrent.Flow.Subscriber). +This @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) can be attached to a +Java Flow @javadoc[Publisher](java.util.concurrent.Flow.Publisher) +to populate it. + +If the API you want to consume elements from provides a @javadoc[Publisher](java.util.concurrent.Flow.Publisher) instead of accepting a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber), see @ref[fromPublisher](fromPublisher.md). + +@@@ note + +Reactive Streams users: we prefer @javadoc[java.util.concurrent.Flow](java.util.concurrent.Flow) but you may still use the [org.reactivestreams](https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams) library with @apidoc[Source.asSubscriber](Source$) { scala="#asSubscriber[T]:org.apache.pekko.stream.scaladsl.Source[T,org.reactivestreams.Subscriber[T]]" java="#asSubscriber()" }. + +@@@ + +## Example + +Suppose we use a database client that supports the Java Flow API, +we could create a @apidoc[Source] that queries the database for its rows. That @apidoc[Source] can then +be used for further processing, for example creating a @apidoc[Source] that contains the names of the +rows. + +Note that since the database is queried for each materialization, the `rowSource` can be safely re-used. +Because both the database driver and Pekko Streams support Java Flow API, +backpressure is applied throughout the stream, preventing us from running out of memory when the database +rows are consumed slower than they are produced by the database. + +Scala +: @@snip [AsSubscriber.scala](/docs/src/test/scala/docs/stream/operators/source/AsSubscriber.scala) { #imports #example } + +Java +: @@snip [AsSubscriber.java](/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java) { #imports #example } diff --git a/docs/src/main/paradox/stream/operators/Source/asSubscriber.md b/docs/src/main/paradox/stream/operators/Source/asSubscriber.md index e19db56bd6f..0ff27e4cc79 100644 --- a/docs/src/main/paradox/stream/operators/Source/asSubscriber.md +++ b/docs/src/main/paradox/stream/operators/Source/asSubscriber.md @@ -1,6 +1,6 @@ # Source.asSubscriber -Integration with Reactive Streams, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber). +Integration with Reactive Streams, materializes into a @javadoc[Subscriber](org.reactivestreams.Subscriber). @ref[Source operators](../index.md#source-operators) @@ -10,25 +10,19 @@ Scala : @@snip[JavaFlowSupport.scala](/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala) { #asSubscriber } Java -: @@snip[JavaFlowSupport.java](/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java) { #api } +: @@snip[AsSubscriber.java](/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java) { #api } ## Description If you want to create a @apidoc[Source] that gets its elements from another library that supports -[Reactive Streams](https://www.reactive-streams.org/), you can use `JavaFlowSupport.Source.asSubscriber`. +[Reactive Streams](https://www.reactive-streams.org/), you can use `Source.asSubscriber`. Each time this @apidoc[Source] is materialized, it produces a materialized value of type -@javadoc[java.util.concurrent.Flow.Subscriber](java.util.concurrent.Flow.Subscriber). -This @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) can be attached to a -[Reactive Streams](https://www.reactive-streams.org/) @javadoc[Publisher](java.util.concurrent.Flow.Publisher) +@javadoc[org.reactivestreams.Subscriber](org.reactivestreams.Subscriber). +This @javadoc[Subscriber](org.reactivestreams.Subscriber) can be attached to a +[Reactive Streams](https://www.reactive-streams.org/) @javadoc[Publisher](org.reactivestreams.Publisher) to populate it. -If the API you want to consume elements from provides a @javadoc[Publisher](java.util.concurrent.Flow.Publisher) instead of accepting a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber), see @ref[fromPublisher](fromPublisher.md). - -@@@ note - -Reactive Streams users: we prefer @javadoc[java.util.concurrent.Flow](java.util.concurrent.Flow) you may still use the [org.reactivestreams](https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams) library with @apidoc[Source.asSubscriber](Source$) { scala="#asSubscriber[T]:org.apache.pekko.stream.scaladsl.Source[T,org.reactivestreams.Subscriber[T]]" java="#asSubscriber()" }. - -@@@ +If the API you want to consume elements from provides a @javadoc[Publisher](org.reactivestreams.Publisher) instead of accepting a @javadoc[Subscriber](org.reactivestreams.Subscriber), see @ref[fromPublisher](fromPublisher.md). ## Example diff --git a/docs/src/main/paradox/stream/operators/Source/fromPublisher.md b/docs/src/main/paradox/stream/operators/Source/fromPublisher.md index e41531e134f..4616aae5658 100644 --- a/docs/src/main/paradox/stream/operators/Source/fromPublisher.md +++ b/docs/src/main/paradox/stream/operators/Source/fromPublisher.md @@ -1,6 +1,6 @@ # Source.fromPublisher -Integration with Reactive Streams, subscribes to a @javadoc[Publisher](java.util.concurrent.Flow.Publisher). +Integration with Reactive Streams, subscribes to a @javadoc[Java Flow Publisher](java.util.concurrent.Flow.Publisher) or a @javadoc[org.reactivestreams Publisher](org.reactivestreams.Publisher). @ref[Source operators](../index.md#source-operators) @@ -15,12 +15,10 @@ Java ## Description -If you want to create a @apidoc[Source] that gets its elements from another library that supports -[Reactive Streams](https://www.reactive-streams.org/), you can use `JavaFlowSupport.Source.fromPublisher`. -This source will produce the elements from the @javadoc[Publisher](java.util.concurrent.Flow.Publisher), +This source will produce the elements from the @javadoc[Java Flow Publisher](java.util.concurrent.Flow.Publisher) or the @javadoc[org.reactivestreams Publisher](org.reactivestreams.Publisher), and coordinate backpressure as needed. -If the API you want to consume elements from accepts a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) instead of providing a @javadoc[Publisher](java.util.concurrent.Flow.Publisher), see @ref[asSubscriber](asSubscriber.md). +If the API you want to consume elements from accepts a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) instead of providing a @javadoc[Publisher](java.util.concurrent.Flow.Publisher), see @ref[asJavaSubscriber](asJavaSubscriber.md). @@@ note diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index a728b8b8aed..9c541bcaf1a 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -7,8 +7,9 @@ These built-in sources are available from @scala[`org.apache.pekko.stream.scalad | |Operator|Description| |--|--|--| +|Source|@ref[asJavaSubscriber](Source/asJavaSubscriber.md)|Integration with Java Flow API, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber).| |Source|@ref[asSourceWithContext](Source/asSourceWithContext.md)|Extracts context data from the elements of a `Source` so that it can be turned into a `SourceWithContext` which can propagate that context per element along a stream.| -|Source|@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber).| +|Source|@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a @javadoc[Subscriber](org.reactivestreams.Subscriber).| |Source|@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.| |Source|@ref[completionStage](Source/completionStage.md)|Send the single value of the `CompletionStage` when it completes and there is demand.| |Source|@ref[completionStageSource](Source/completionStageSource.md)|Streams the elements of an asynchronous source once its given *completion* operator completes.| @@ -51,6 +52,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl | |Operator|Description| |--|--|--| +|Sink|@ref[asPublisher](Sink/asJavaPublisher.md)|Integration with Java Flow API, materializes into a `java.util.concurrent.Flow.Publisher`.| |Sink|@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.| |Sink|@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream| |Sink|@ref[collect](Sink/collect.md)|Collect all input elements using a Java @javadoc[Collector](java.util.stream.Collector).| @@ -410,6 +412,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [askWithContext](ActorFlow/askWithContext.md) * [askWithStatus](ActorFlow/askWithStatus.md) * [askWithStatusAndContext](ActorFlow/askWithStatusAndContext.md) +* [asJavaPublisher](Sink/asJavaPublisher.md) * [asOutputStream](StreamConverters/asOutputStream.md) * [asPublisher](Sink/asPublisher.md) * [asSourceWithContext](Source/asSourceWithContext.md) diff --git a/docs/src/main/paradox/stream/reactive-streams-interop.md b/docs/src/main/paradox/stream/reactive-streams-interop.md index d952bacf2e1..278f8e4094d 100644 --- a/docs/src/main/paradox/stream/reactive-streams-interop.md +++ b/docs/src/main/paradox/stream/reactive-streams-interop.md @@ -22,14 +22,10 @@ back pressure. Since Java 9 the APIs of Reactive Streams has been included in the Java Standard library, under the `java.util.concurrent.Flow` namespace. For Java 8 there is instead a separate Reactive Streams artifact with the same APIs in the package `org.reactivestreams`. -Pekko streams provides interoperability for both these two API versions, the Reactive Streams interfaces directly through factories on the -regular `Source` and `Sink` APIs. For the Java 9 and later built in interfaces there is a separate set of factories in -@scala[`org.apache.pekko.stream.scaladsl.JavaFlowSupport`]@java[`org.apache.pekko.stream.javadsl.JavaFlowSupport`]. +Pekko streams provides interoperability for both of these API versions directly through factories on the +regular `Source` and `Sink` APIs. -In the following samples the standalone Reactive Stream API factories has been used but each such call can be replaced with the -corresponding method from `JavaFlowSupport` and the JDK @scala[`java.util.concurrent.Flow._`]@java[`java.util.concurrent.Flow.*`] interfaces. - -Note that it is not possible to use `JavaFlowSupport` on Java 8 since the needed interfaces simply is not available in the Java standard library. +In the following samples, the standalone Reactive Stream API factories has been used but the code needed to use the `java.util.concurrent.Flow` equivalents is very similar. The two most important interfaces in Reactive Streams are the `Publisher` and `Subscriber`. diff --git a/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java b/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java index b2894799105..0f6607d7aeb 100644 --- a/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java +++ b/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java @@ -15,28 +15,11 @@ // #imports import java.util.concurrent.Flow.Publisher; -import java.util.concurrent.Flow.Subscriber; import org.apache.pekko.NotUsed; -import org.apache.pekko.stream.javadsl.JavaFlowSupport; // #imports import org.apache.pekko.stream.javadsl.Source; public interface AsSubscriber { - // We are 'faking' the JavaFlowSupport API here so we can include the signature as a snippet in - // the API, - // because we're not publishing those (jdk9+) classes in our API docs yet. - static class JavaFlowSupport { - public static final class Source { - public - // #api - static org.apache.pekko.stream.javadsl.Source> asSubscriber() - // #api - { - return org.apache.pekko.stream.javadsl.JavaFlowSupport.Source.asSubscriber(); - } - } - } - static class Row { public String getField(String fieldName) { throw new UnsupportedOperationException("Not implemented in sample"); @@ -54,7 +37,7 @@ Publisher fetchRows() { // #example class Example { Source rowSource = - JavaFlowSupport.Source.asSubscriber() + Source.asJavaSubscriber() .mapMaterializedValue( subscriber -> { // For each materialization, fetch the rows from the database: diff --git a/docs/src/test/java/jdocs/stream/operators/source/FromPublisher.java b/docs/src/test/java/jdocs/stream/operators/source/FromPublisher.java index ee4594b66cc..f986dde843a 100644 --- a/docs/src/test/java/jdocs/stream/operators/source/FromPublisher.java +++ b/docs/src/test/java/jdocs/stream/operators/source/FromPublisher.java @@ -16,28 +16,11 @@ // #imports import java.util.concurrent.Flow.Publisher; import org.apache.pekko.NotUsed; -import org.apache.pekko.stream.javadsl.JavaFlowSupport; import org.apache.pekko.stream.javadsl.Source; // #imports public interface FromPublisher { - // We are 'faking' the JavaFlowSupport API here so we can include the signature as a snippet in - // the API, - // because we're not publishing those (jdk9+) classes in our API docs yet. - static class JavaFlowSupport { - public static final class Source { - public - // #api - static org.apache.pekko.stream.javadsl.Source fromPublisher( - Publisher publisher) - // #api - { - return org.apache.pekko.stream.javadsl.JavaFlowSupport.Source.fromPublisher(publisher); - } - } - } - static class Row { public String getField(String fieldName) { throw new UnsupportedOperationException("Not implemented in sample"); @@ -58,8 +41,7 @@ public Source names() { // A new subscriber will subscribe to the supplied publisher for each // materialization, so depending on whether the database client supports // this the Source can be materialized more than once. - return JavaFlowSupport.Source.fromPublisher(databaseClient.fetchRows()) - .map(row -> row.getField("name")); + return Source.fromPublisher(databaseClient.fetchRows()).map(row -> row.getField("name")); } } // #example diff --git a/docs/src/test/scala/docs/stream/operators/source/AsSubscriber.scala b/docs/src/test/scala/docs/stream/operators/source/AsSubscriber.scala index 24b875a5db7..96fb3920891 100644 --- a/docs/src/test/scala/docs/stream/operators/source/AsSubscriber.scala +++ b/docs/src/test/scala/docs/stream/operators/source/AsSubscriber.scala @@ -20,7 +20,6 @@ import java.util.concurrent.Flow.Publisher import org.apache.pekko import pekko.NotUsed import pekko.stream.scaladsl.Source -import pekko.stream.scaladsl.JavaFlowSupport //#imports @@ -35,7 +34,7 @@ object AsSubscriber { // #example val rowSource: Source[Row, NotUsed] = - JavaFlowSupport.Source.asSubscriber + Source.asJavaSubscriber .mapMaterializedValue((subscriber: Subscriber[Row]) => { // For each materialization, fetch the rows from the database: val rows: Publisher[Row] = databaseClient.fetchRows() diff --git a/docs/src/test/scala/docs/stream/operators/source/FromPublisher.scala b/docs/src/test/scala/docs/stream/operators/source/FromPublisher.scala index 401e0e82546..775c3d14a34 100644 --- a/docs/src/test/scala/docs/stream/operators/source/FromPublisher.scala +++ b/docs/src/test/scala/docs/stream/operators/source/FromPublisher.scala @@ -20,7 +20,6 @@ import java.util.concurrent.Flow.Publisher import org.apache.pekko import pekko.NotUsed import pekko.stream.scaladsl.Source -import pekko.stream.scaladsl.JavaFlowSupport //#imports @@ -38,7 +37,7 @@ object FromPublisher { // A new subscriber will subscribe to the supplied publisher for each // materialization, so depending on whether the database client supports // this the Source can be materialized more than once. - JavaFlowSupport.Source.fromPublisher(databaseClient.fetchRows()) + Source.fromPublisher(databaseClient.fetchRows()) .map(row => row.name) // #example } diff --git a/project/VerifyJDK9Classes.scala b/project/VerifyJDK9Classes.scala index 6de5e62df0e..2e09961f714 100644 --- a/project/VerifyJDK9Classes.scala +++ b/project/VerifyJDK9Classes.scala @@ -63,15 +63,15 @@ object VerifyJDK9Classes { |object VerifyJDK9Classes { | def main(args: Array[String]): Unit = { | import org.apache.pekko.actor.ActorSystem - | import org.apache.pekko.stream.scaladsl.{ JavaFlowSupport, Source } + | import org.apache.pekko.stream.scaladsl.{ Sink, Source } | | import java.lang.System.exit | import scala.concurrent.Await | import scala.concurrent.duration.DurationInt | implicit val system: ActorSystem = ActorSystem.create("test") | val future = Source(1 to 3).runWith( - | JavaFlowSupport.Sink.asPublisher[Int](fanout = false).mapMaterializedValue { p => - | JavaFlowSupport.Source.fromPublisher(p).runFold(0)(_ + _) + | Sink.asJavaPublisher[Int](fanout = false).mapMaterializedValue { p => + | Source.fromPublisher(p).runFold(0)(_ + _) | }) | | val result = Await.result(future, 3.seconds) diff --git a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala index b81ca2c16bf..b6bbf02501d 100644 --- a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala +++ b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala @@ -17,7 +17,7 @@ import java.util.concurrent.{ Flow => JavaFlow } import org.apache.pekko import pekko.NotUsed -import pekko.stream.scaladsl.{ JavaFlowSupport, Sink, Source } +import pekko.stream.scaladsl.{ Sink, Source } import org.reactivestreams._ @@ -25,10 +25,10 @@ class IterablePublisherViaJavaFlowPublisherTest extends PekkoPublisherVerificati override def createPublisher(elements: Long): Publisher[Int] = { val sourceViaJavaFlowPublisher: JavaFlow.Publisher[Int] = Source(iterable(elements)) - .runWith(JavaFlowSupport.Sink.asPublisher(fanout = false)) + .runWith(Sink.asJavaPublisher(fanout = false)) val javaFlowPublisherIntoPekkoSource: Source[Int, NotUsed] = - JavaFlowSupport.Source.fromPublisher(sourceViaJavaFlowPublisher) + Source.fromPublisher(sourceViaJavaFlowPublisher) javaFlowPublisherIntoPekkoSource .runWith(Sink.asPublisher(false)) // back as RS Publisher diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/JavaFlowSupportCompileTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/JavaFlowSupportCompileTest.java index 9e778914a20..2f625e2f813 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/JavaFlowSupportCompileTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/JavaFlowSupportCompileTest.java @@ -41,9 +41,8 @@ public void onComplete() {} }; final Source> stringSubscriberSource = - JavaFlowSupport.Source.asSubscriber(); - final Source stringNotUsedSource = - JavaFlowSupport.Source.fromPublisher(processor); + Source.asJavaSubscriber(); + final Source stringNotUsedSource = Source.fromPublisher(processor); final org.apache.pekko.stream.javadsl.Flow stringStringNotUsedFlow = JavaFlowSupport.Flow.fromProcessor(() -> processor); @@ -51,7 +50,7 @@ public void onComplete() {} JavaFlowSupport.Flow.fromProcessorMat(() -> Pair.apply(processor, NotUsed.getInstance())); final Sink> stringPublisherSink = - JavaFlowSupport.Sink.asPublisher(AsPublisher.WITH_FANOUT); - final Sink stringNotUsedSink = JavaFlowSupport.Sink.fromSubscriber(processor); + Sink.asJavaPublisher(AsPublisher.WITH_FANOUT); + final Sink stringNotUsedSink = Sink.fromSubscriber(processor); } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPublisherSinkSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPublisherSinkSpec.scala index 34c0cc5de8e..fc2d453ed5c 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPublisherSinkSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPublisherSinkSpec.scala @@ -25,16 +25,16 @@ class FlowPublisherSinkSpec extends StreamSpec { "work with SubscriberSource" in { val (sub, pub) = - JavaFlowSupport.Source.asSubscriber[Int].toMat(JavaFlowSupport.Sink.asPublisher(false))(Keep.both).run() - Source(1 to 100).to(JavaFlowSupport.Sink.fromSubscriber(sub)).run() - Await.result(JavaFlowSupport.Source.fromPublisher(pub).limit(1000).runWith(Sink.seq), 3.seconds) should ===( + Source.asJavaSubscriber[Int].toMat(Sink.asJavaPublisher(false))(Keep.both).run() + Source(1 to 100).to(Sink.fromSubscriber(sub)).run() + Await.result(Source.fromPublisher(pub).limit(1000).runWith(Sink.seq), 3.seconds) should ===( 1 to 100) } "be able to use Publisher in materialized value transformation" in { val f = Source(1 to 3).runWith( - JavaFlowSupport.Sink.asPublisher[Int](false).mapMaterializedValue { p => - JavaFlowSupport.Source.fromPublisher(p).runFold(0)(_ + _) + Sink.asJavaPublisher[Int](false).mapMaterializedValue { p => + Source.fromPublisher(p).runFold(0)(_ + _) }) Await.result(f, 3.seconds) should be(6) diff --git a/stream/src/main/java/org/apache/pekko/stream/javadsl/JavaFlowSupport.java b/stream/src/main/java/org/apache/pekko/stream/javadsl/JavaFlowSupport.java index 603b216d545..c9db616e144 100644 --- a/stream/src/main/java/org/apache/pekko/stream/javadsl/JavaFlowSupport.java +++ b/stream/src/main/java/org/apache/pekko/stream/javadsl/JavaFlowSupport.java @@ -49,7 +49,10 @@ private Source() { *

See also {@code Source.fromPublisher} if wanting to integrate with {@link * org.reactivestreams.Publisher} instead (which carries the same semantics, however existed * before RS's inclusion in Java 9). + * + * @deprecated Use {@code Source.fromPublisher} instead (since 2.0.0). */ + @Deprecated public static org.apache.pekko.stream.javadsl.Source fromPublisher( java.util.concurrent.Flow.Publisher publisher) { return org.apache.pekko.stream.javadsl.Source.fromPublisher( @@ -63,8 +66,11 @@ public static org.apache.pekko.stream.javadsl.Source fromPublish *

See also {@code Source.asSubscriber} if wanting to integrate with {@link * org.reactivestreams.Subscriber} instead (which carries the same semantics, however existed * before RS's inclusion in Java 9). + * + * @deprecated Use {@code Source.asJavaSubscriber} instead (since 2.0.0). */ // #asSubscriber + @Deprecated public static org.apache.pekko.stream.javadsl.Source> asSubscriber() { @@ -180,7 +186,10 @@ private Sink() { * *

If {@code fanout} is {@code WITHOUT_FANOUT} then the materialized {@code Publisher} will * only support a single {@code Subscriber} and reject any additional {@code Subscriber}s. + * + * @deprecated Use {@code Sink.asJavaPublisher} instead (since 2.0.0). */ + @Deprecated public static org.apache.pekko.stream.javadsl.Sink> asPublisher( AsPublisher fanout) { @@ -188,7 +197,12 @@ org.apache.pekko.stream.javadsl.Sink> .mapMaterializedValue(JavaFlowAndRsConverters::asJava); } - /** Helper to create <> from <>. */ + /** + * Helper to create <> from <>. + * + * @deprecated Use {@code Sink.fromSubscriber} instead (since 2.0.0). + */ + @Deprecated public static org.apache.pekko.stream.javadsl.Sink fromSubscriber( java.util.concurrent.Flow.Subscriber s) { return org.apache.pekko.stream.javadsl.Sink.fromSubscriber(JavaFlowAndRsConverters.asRs(s)); diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index b280ae3a791..d36625aec79 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -30,7 +30,7 @@ import pekko.actor.{ ActorRef, ClassicActorSystemProvider, Status } import pekko.japi.function import pekko.japi.function.Creator import pekko.stream._ -import pekko.stream.impl.LinearTraversalBuilder +import pekko.stream.impl.{ JavaFlowAndRsConverters, LinearTraversalBuilder } import pekko.stream.scaladsl.SinkToCompletionStage import pekko.util.ConstantFun.scalaAnyToUnit @@ -180,6 +180,15 @@ object Sink { def fromSubscriber[In](subs: Subscriber[In]): Sink[In, NotUsed] = new Sink(scaladsl.Sink.fromSubscriber(subs)) + /** + * Helper to create [[Sink]] from `java.util.concurrent.Flow.Subscriber`. + * + * @see pekko.stream.javadsl.JavaFlowSupport.Sink#fromSubscriber + * @since 2.0.0 + */ + def fromSubscriber[In](subs: java.util.concurrent.Flow.Subscriber[In]): Sink[In, NotUsed] = + new Sink(scaladsl.Sink.fromSubscriber(subs)) + /** * A `Sink` that immediately cancels its upstream after materialization. */ @@ -212,6 +221,22 @@ object Sink { def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] = new Sink(scaladsl.Sink.asPublisher(fanout == AsPublisher.WITH_FANOUT)) + /** + * A `Sink` that materializes into a [[java.util.concurrent.Flow.Publisher]]. + * + * If `fanout` is `true`, the materialized `Publisher` will support multiple `Subscriber`s and + * the size of the `inputBuffer` configured for this operator becomes the maximum number of elements that + * the fastest [[java.util.concurrent.Flow.Subscriber]] can be ahead of the slowest one before slowing + * the processing down due to back pressure. + * + * If `fanout` is `false` then the materialized `Publisher` will only support a single `Subscriber` and + * reject any additional `Subscriber`s. + */ + def asJavaPublisher[T](fanout: AsPublisher): Sink[T, java.util.concurrent.Flow.Publisher[T]] = { + import JavaFlowAndRsConverters.Implicits._ + asPublisher[T](fanout).mapMaterializedValue(_.asJava) + } + /** * A `Sink` that materializes this `Sink` itself as a `Source`. * The returned `Source` is a "live view" onto the `Sink` and only supports a single `Subscriber`. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 100eb1d3947..8441313cfae 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -36,7 +36,7 @@ import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import pekko.japi.{ function, JavaPartialFunction, Pair } import pekko.japi.function.Creator import pekko.stream._ -import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava } +import pekko.stream.impl.{ JavaFlowAndRsConverters, LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava } import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava } import pekko.util._ @@ -80,7 +80,7 @@ object Source { } /** - * Helper to create [[Source]] from `Publisher`. + * Helper to create [[Source]] from `org.reactivestreams.Publisher`. * * Construct a transformation starting with given publisher. The transformation steps * are executed by a series of [[org.reactivestreams.Processor]] instances @@ -90,6 +90,15 @@ object Source { def fromPublisher[O](publisher: Publisher[O]): javadsl.Source[O, NotUsed] = new Source(scaladsl.Source.fromPublisher(publisher)) + /** + * Helper to create [[Source]] from `java.util.concurrent.Flow.Publisher`. + * + * @see pekko.stream.javadsl.JavaFlowSupport.Source#fromPublisher + * @since 2.0.0 + */ + def fromPublisher[O](publisher: java.util.concurrent.Flow.Publisher[O]): javadsl.Source[O, NotUsed] = + new Source(scaladsl.Source.fromPublisher(publisher)) + /** * Helper to create [[Source]] from `Iterator`. * Example usage: @@ -405,6 +414,17 @@ object Source { def asSubscriber[T](): Source[T, Subscriber[T]] = new Source(scaladsl.Source.asSubscriber) + /** + * Creates a `Source` that is materialized as a [[java.util.concurrent.Flow.Subscriber]] + * + * @see pekko.stream.javadsl.JavaFlowSupport.Source#asSubscriber + * @since 2.0.0 + */ + def asJavaSubscriber[T](): Source[T, java.util.concurrent.Flow.Subscriber[T]] = { + import JavaFlowAndRsConverters.Implicits._ + asSubscriber[T]().mapMaterializedValue(_.asJava) + } + /** * Creates a `Source` that is materialized as an [[pekko.actor.ActorRef]]. * Messages sent to this actor will be emitted to the stream if there is demand from downstream, diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala index 0d0e01fe8da..783ac3a37c0 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala @@ -45,6 +45,7 @@ object JavaFlowSupport { * @see See also [[Source.fromPublisher]] if wanting to integrate with [[org.reactivestreams.Publisher]] instead * (which carries the same semantics, however existed before RS's inclusion in Java 9). */ + @deprecated("Use pekko.stream.scaladsl.Source.fromPublisher", "2.0.0") final // #fromPublisher def fromPublisher[T](publisher: java.util.concurrent.Flow.Publisher[T]): Source[T, NotUsed] = @@ -57,6 +58,7 @@ object JavaFlowSupport { * @see See also [[Source.asSubscriber]] if wanting to integrate with [[org.reactivestreams.Subscriber]] instead * (which carries the same semantics, however existed before RS's inclusion in Java 9). */ + @deprecated("Use pekko.stream.scaladsl.Source.asJavaSubscriber", "2.0.0") final // #asSubscriber def asSubscriber[T]: Source[T, java.util.concurrent.Flow.Subscriber[T]] = @@ -94,8 +96,8 @@ object JavaFlowSupport { */ def toProcessor[In, Out, Mat]( self: Flow[In, Out, Mat]): RunnableGraph[juc.Flow.Processor[In @uncheckedVariance, Out @uncheckedVariance]] = - Source.asSubscriber[In].via(self) - .toMat(Sink.asPublisher[Out](fanout = false))(Keep.both) + scaladsl.Source.asJavaSubscriber[In].via(self) + .toMat(scaladsl.Sink.asJavaPublisher[Out](fanout = false))(Keep.both) .mapMaterializedValue { case (sub, pub) => new juc.Flow.Processor[In, Out] { override def onError(t: Throwable): Unit = sub.onError(t) @@ -123,12 +125,14 @@ object JavaFlowSupport { * If `fanout` is `WITHOUT_FANOUT` then the materialized `Publisher` will only support a single `Subscriber` and * reject any additional `Subscriber`s. */ + @deprecated("Use pekko.stream.scaladsl.Sink.asJavaPublisher", "2.0.0") final def asPublisher[T](fanout: Boolean): Sink[T, juc.Flow.Publisher[T]] = scaladsl.Sink.asPublisher[T](fanout).mapMaterializedValue(_.asJava) /** * Helper to create [[Sink]] from [[java.util.concurrent.Flow.Subscriber]]. */ + @deprecated("Use pekko.stream.scaladsl.Sink.fromSubscriber", "2.0.0") final def fromSubscriber[T](s: juc.Flow.Subscriber[T]): Sink[T, NotUsed] = scaladsl.Sink.fromSubscriber(s.asRs) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index 03f97f94f8d..9ec4e7f2cd4 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -182,6 +182,17 @@ object Sink { def fromSubscriber[T](subscriber: Subscriber[T]): Sink[T, NotUsed] = fromGraph(new SubscriberSink(subscriber, DefaultAttributes.subscriberSink, shape("SubscriberSink"))) + /** + * Helper to create [[Sink]] from `java.util.concurrent.Flow.Subscriber`. + * + * @see pekko.stream.scaladsl.JavaFlowSupport.Sink#fromSubscriber + * @since 2.0.0 + */ + def fromSubscriber[T](subscriber: java.util.concurrent.Flow.Subscriber[T]) = { + import JavaFlowAndRsConverters.Implicits._ + fromGraph(new SubscriberSink(subscriber.asRs, DefaultAttributes.subscriberSink, shape("SubscriberSink"))) + } + /** * A `Sink` that immediately cancels its upstream after materialization. */ @@ -312,6 +323,25 @@ object Sink { if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink")) else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink"))) + /** + * A `Sink` that materializes into a [[java.util.concurrent.Flow.Publisher]]. + * + * If `fanout` is `true`, the materialized `Publisher` will support multiple `Subscriber`s and + * the size of the `inputBuffer` configured for this operator becomes the maximum number of elements that + * the fastest [[java.util.concurrent.Flow.Subscriber]] can be ahead of the slowest one before slowing + * the processing down due to back pressure. + * + * If `fanout` is `false` then the materialized `Publisher` will only support a single `Subscriber` and + * reject any additional `Subscriber`s. + * + * @see pekko.stream.scaladsl.JavaFlowSupport.Sink#asPublisher + * @since 2.0.0 + */ + def asJavaPublisher[T](fanout: Boolean): Sink[T, java.util.concurrent.Flow.Publisher[T]] = { + import JavaFlowAndRsConverters.Implicits._ + asPublisher[T](fanout).mapMaterializedValue(_.asJava) + } + /** * A `Sink` that materializes this `Sink` itself as a `Source`. * The returned `Source` is a "live view" onto the `Sink` and only supports a single `Subscriber`. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 5198444a4ba..6ea61fd6b85 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -276,6 +276,17 @@ object Source { def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] = fromGraph(new PublisherSource(publisher, DefaultAttributes.publisherSource, shape("PublisherSource"))) + /** + * Helper to create [[Source]] from `java.util.concurrent.Flow.Publisher`. + * + * @see pekko.stream.scaladsl.JavaFlowSupport.Source#fromPublisher + * @since 2.0.0 + */ + def fromPublisher[T](publisher: java.util.concurrent.Flow.Publisher[T]) = { + import JavaFlowAndRsConverters.Implicits._ + fromGraph(new PublisherSource(publisher.asRs, DefaultAttributes.publisherSource, shape("PublisherSource"))) + } + /** * Helper to create [[Source]] from `Iterator`. * Example usage: `Source.fromIterator(() => Iterator.from(0))` @@ -637,6 +648,17 @@ object Source { def asSubscriber[T]: Source[T, Subscriber[T]] = fromGraph(new SubscriberSource[T](DefaultAttributes.subscriberSource, shape("SubscriberSource"))) + /** + * Creates a `Source` that is materialized as a [[java.util.concurrent.Flow.Subscriber]] + * + * @see pekko.stream.scaladsl.JavaFlowSupport.Source#asSubscriber + * @since 2.0.0 + */ + def asJavaSubscriber[T]: Source[T, java.util.concurrent.Flow.Subscriber[T]] = { + import JavaFlowAndRsConverters.Implicits._ + asSubscriber[T].mapMaterializedValue(_.asJava) + } + /** * Creates a `Source` that is materialized as an [[pekko.actor.ActorRef]]. * Messages sent to this actor will be emitted to the stream if there is demand from downstream,