From b1f117acf2320fe7790504888a879377c8f32a37 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sun, 22 Feb 2026 21:48:55 +0100 Subject: [PATCH 1/2] revert recent changes to recoverWith Update Ops.scala revert recent changes to recoverWith --- .../apache/pekko/stream/impl/fusing/Ops.scala | 29 ++++--------------- 1 file changed, 6 insertions(+), 23 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index d02744c8d3..6f01f9e8b5 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -2169,36 +2169,19 @@ private[pekko] object TakeWithin { override def onPull(): Unit = pull(in) @nowarn("msg=Any") - @tailrec def onFailure(ex: Throwable): Unit = { import Collect.NotApplied if (maximumRetries < 0 || attempt < maximumRetries) { pf.applyOrElse(ex, NotApplied) match { - case _: NotApplied.type => failStage(ex) + case _: NotApplied.type => failStage(ex) case source: Graph[SourceShape[T] @unchecked, M @unchecked] if TraversalBuilder.isEmptySource(source) => completeStage() - case source: Graph[SourceShape[T] @unchecked, M @unchecked] => - TraversalBuilder.getValuePresentedSource(source) match { - case OptionVal.Some(graph) => graph match { - case singleSource: SingleSource[T @unchecked] => emit(out, singleSource.elem, () => completeStage()) - case failed: FailedSource[T @unchecked] => onFailure(failed.failure) - case futureSource: FutureSource[T @unchecked] => futureSource.future.value match { - case Some(Success(elem)) => emit(out, elem, () => completeStage()) - case Some(Failure(ex)) => onFailure(ex) - case None => - switchTo(source) - attempt += 1 - } - case iterableSource: IterableSource[T @unchecked] => - emitMultiple(out, iterableSource.elements, () => completeStage()) - case javaStreamSource: JavaStreamSource[T @unchecked, _] => - emitMultiple(out, javaStreamSource.open().spliterator(), () => completeStage()) - case _ => - switchTo(source) - attempt += 1 - } + case other: Graph[SourceShape[T] @unchecked, M @unchecked] => + TraversalBuilder.getSingleSource(other) match { + case OptionVal.Some(singleSource) => + emit(out, singleSource.elem.asInstanceOf[T], () => completeStage()) case _ => - switchTo(source) + switchTo(other) attempt += 1 } case _ => throw new IllegalStateException() // won't happen, compiler exhaustiveness check pleaser From 02e1b13da87c1b73dbf1f85048d962af0e243034 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 23 Feb 2026 10:26:07 +0100 Subject: [PATCH 2/2] scalafmt --- .../org/apache/pekko/stream/impl/fusing/Ops.scala | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index 6f01f9e8b5..0635e52d51 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -36,16 +36,9 @@ import pekko.stream.Attributes.{ InputBuffer, LogLevels } import pekko.stream.Attributes.SourceLocation import pekko.stream.OverflowStrategies._ import pekko.stream.Supervision.Decider -import pekko.stream.impl.{ - Buffer => BufferImpl, - ContextPropagation, - FailedSource, - JavaStreamSource, - ReactiveStreamsCompliance, - TraversalBuilder -} +import pekko.stream.impl.{ Buffer => BufferImpl, ContextPropagation, ReactiveStreamsCompliance, TraversalBuilder } import pekko.stream.impl.Stages.DefaultAttributes -import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SimpleLinearGraphStage, SingleSource } +import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import pekko.stream.scaladsl.{ DelayStrategy, Source, @@ -2173,7 +2166,7 @@ private[pekko] object TakeWithin { import Collect.NotApplied if (maximumRetries < 0 || attempt < maximumRetries) { pf.applyOrElse(ex, NotApplied) match { - case _: NotApplied.type => failStage(ex) + case _: NotApplied.type => failStage(ex) case source: Graph[SourceShape[T] @unchecked, M @unchecked] if TraversalBuilder.isEmptySource(source) => completeStage() case other: Graph[SourceShape[T] @unchecked, M @unchecked] =>