Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Invalid HTTP/2 request headers return 400 Bad Request instead of GOAWAY
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.http.impl.engine.http2.FrameEvent#ParsedHeadersFrame.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.http.impl.engine.http2.FrameEvent#ParsedHeadersFrame.this")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.http.impl.engine.http2.FrameEvent$ParsedHeadersFrame$")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.http.impl.engine.http2.FrameEvent#ParsedHeadersFrame.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.http.impl.engine.http2.FrameEvent#ParsedHeadersFrame.unapply")
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import pekko.annotation.InternalApi
import pekko.http.impl.engine.http2.Http2Protocol.ErrorCode
import pekko.http.impl.engine.http2.Http2Protocol.FrameType
import pekko.http.impl.engine.http2.Http2Protocol.SettingIdentifier
import pekko.http.scaladsl.model.ErrorInfo
import pekko.util.ByteString

import scala.collection.immutable
Expand Down Expand Up @@ -112,11 +113,14 @@ private[http] object FrameEvent {
/**
* Convenience (logical) representation of a parsed HEADERS frame with zero, one or
* many CONTINUATIONS Frames into a single, decompressed object.
*
* @param headerParseErrorDetails Only used server side, passes header errors from decompression into error response logic
*/
final case class ParsedHeadersFrame(
streamId: Int,
endStream: Boolean,
keyValuePairs: Seq[(String, AnyRef)],
priorityInfo: Option[PriorityFrame]) extends StreamFrameEvent
priorityInfo: Option[PriorityFrame],
headerParseErrorDetails: Option[ErrorInfo]) extends StreamFrameEvent

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private[http2] object FrameLogger {
case GoAwayFrame(lastStreamId, errorCode, debug) =>
LogEntry(0, "GOAY", s"lastStreamId = $lastStreamId, errorCode = $errorCode, debug = ${debug.utf8String}")

case ParsedHeadersFrame(streamId, endStream, kvPairs, prio) =>
case ParsedHeadersFrame(streamId, endStream, kvPairs, prio, _) =>
val prioInfo = if (prio.isDefined) display(entryForFrame(prio.get)) + " " else ""
val kvInfo = kvPairs.map {
case (key, value) => s"$key -> $value"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ private[http] object Http2Blueprint {
*
* To make use of parallelism requests and responses need to be associated (other than by ordering), suggestion
* is to add a special (virtual) header containing the streamId (or any other kind of token) is added to the HttRequest
* that must be reproduced in an HttpResponse. This can be done automatically for the `bind`` API but for
* that must be reproduced in an HttpResponse. This can be done automatically for the `bind` API but for
* `bindFlow` the user needs to take of this manually.
*/
def httpLayer(settings: ServerSettings, log: LoggingAdapter, dateHeaderRendering: DateHeaderRendering)
Expand All @@ -284,12 +284,14 @@ private[http] object Http2Blueprint {
// HttpHeaderParser is not thread safe and should not be called concurrently,
// the internal trie, however, has built-in protection and will do copy-on-write
val masterHttpHeaderParser = HttpHeaderParser(parserSettings, log)
BidiFlow.fromFlows(
Flow[HttpResponse].map(new ResponseRendering(settings, log, dateHeaderRendering)),
Flow[Http2SubStream].via(StreamUtils.statefulAttrsMap { attrs =>
val headerParser = masterHttpHeaderParser.createShallowCopy()
RequestParsing.parseRequest(headerParser, settings, attrs)
}))
RequestErrorFlow().atop(
BidiFlow.fromFlows(
Flow[HttpResponse].map(new ResponseRendering(settings, log, dateHeaderRendering)),
Flow[Http2SubStream].via(StreamUtils.statefulAttrsMap { attrs =>
val headerParser = masterHttpHeaderParser.createShallowCopy()
RequestParsing.parseRequest(headerParser, settings, attrs)
}))
)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper
nextStateStream: IncomingStreamBuffer => StreamState,
correlationAttributes: Map[AttributeKey[_], _] = Map.empty): StreamState =
event match {
case frame @ ParsedHeadersFrame(streamId, endStream, _, _) =>
case frame @ ParsedHeadersFrame(streamId, endStream, _, _, _) =>
if (endStream) {
dispatchSubstream(frame, Left(ByteString.empty), correlationAttributes)
nextStateEmpty
Expand Down Expand Up @@ -833,7 +833,7 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper
"Found both an attribute with trailing headers, and headers in the `LastChunk`. This is not supported.")
trailer = OptionVal.Some(ParsedHeadersFrame(streamId, endStream = true,
HttpMessageRendering.renderHeaders(headers, log, isServer, shouldRenderAutoHeaders = false,
dateHeaderRendering = DateHeaderRendering.Unavailable), None))
dateHeaderRendering = DateHeaderRendering.Unavailable), None, None))
}

maybePull()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ private[http2] sealed abstract class MessageRendering[R <: HttpMessage] extends
shouldRenderAutoHeaders = true, dateHeaderRendering)

val streamId = nextStreamId(r)
val headersFrame = ParsedHeadersFrame(streamId, endStream = r.entity.isKnownEmpty, headerPairs.result(), None)
val headersFrame = ParsedHeadersFrame(streamId, endStream = r.entity.isKnownEmpty, headerPairs.result(), None, None)
val trailingHeadersFrame =
r.attribute(AttributeKeys.trailer) match {
case Some(trailer) if trailer.headers.nonEmpty =>
OptionVal.Some(ParsedHeadersFrame(streamId, endStream = true, trailer.headers, None))
OptionVal.Some(ParsedHeadersFrame(streamId, endStream = true, trailer.headers, None, None))
case _ => OptionVal.None
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) 2009-2023 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.http.impl.engine.http2

import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.InternalApi
import pekko.http.impl.engine.http2.RequestParsing.ParseRequestResult
import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse, StatusCodes }
import pekko.stream.{ Attributes, BidiShape, Inlet, Outlet }
import pekko.stream.scaladsl.BidiFlow
import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

/**
* INTERNAL API
*/
@InternalApi
private[http2] object RequestErrorFlow {

private val _bidiFlow = BidiFlow.fromGraph(new RequestErrorFlow)
def apply(): BidiFlow[HttpResponse, HttpResponse, ParseRequestResult, HttpRequest, NotUsed] = _bidiFlow

}

/**
* INTERNAL API
*/
@InternalApi
private[http2] final class RequestErrorFlow
extends GraphStage[BidiShape[HttpResponse, HttpResponse, ParseRequestResult, HttpRequest]] {

val requestIn = Inlet[ParseRequestResult]("requestIn")
val requestOut = Outlet[HttpRequest]("requestOut")
val responseIn = Inlet[HttpResponse]("responseIn")
val responseOut = Outlet[HttpResponse]("responseOut")

override val shape: BidiShape[HttpResponse, HttpResponse, ParseRequestResult, HttpRequest] =
BidiShape(responseIn, responseOut, requestIn, requestOut)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandlers(requestIn, requestOut,
new InHandler with OutHandler {
override def onPush(): Unit = {
grab(requestIn) match {
case RequestParsing.OkRequest(request) => push(requestOut, request)
case notOk: RequestParsing.BadRequest =>
emit(responseOut,
HttpResponse(StatusCodes.BadRequest, entity = notOk.info.summary).addAttribute(Http2.streamId,
notOk.streamId))
pull(requestIn)
}
}

override def onPull(): Unit = pull(requestIn)
})
setHandlers(responseIn, responseOut,
new InHandler with OutHandler {
override def onPush(): Unit = push(responseOut, grab(responseIn))
override def onPull(): Unit = if (!hasBeenPulled(responseIn)) pull(responseIn)
})

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package org.apache.pekko.http.impl.engine.http2
import javax.net.ssl.SSLSession
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.http.impl.engine.http2.Http2Compliance.Http2ProtocolException
import pekko.http.impl.engine.parsing.HttpHeaderParser
import pekko.http.impl.engine.server.HttpAttributes
import pekko.http.scaladsl.model
Expand All @@ -29,15 +28,20 @@ import pekko.util.OptionVal

import scala.annotation.tailrec
import scala.collection.immutable.VectorBuilder
import scala.util.control.NoStackTrace

/**
* INTERNAL API
*/
@InternalApi
private[http2] object RequestParsing {

sealed trait ParseRequestResult
final case class OkRequest(request: HttpRequest) extends ParseRequestResult
final case class BadRequest(info: ErrorInfo, streamId: Int) extends ParseRequestResult

def parseRequest(httpHeaderParser: HttpHeaderParser, serverSettings: ServerSettings, streamAttributes: Attributes)
: Http2SubStream => HttpRequest = {
: Http2SubStream => ParseRequestResult = {

val remoteAddressAttribute: Option[RemoteAddress] =
if (serverSettings.remoteAddressAttribute) {
Expand Down Expand Up @@ -151,19 +155,19 @@ private[http2] object RequestParsing {
rec(incomingHeaders, offset + 1, method, scheme, authority, pathAndRawQuery,
OptionVal.Some(ContentType.get(value)), contentLength, cookies, true, headers)
else
malformedRequest("HTTP message must not contain more than one content-type header")
parseError("HTTP message must not contain more than one content-type header", "content-type")

case ":status" =>
malformedRequest("Pseudo-header ':status' is for responses only; it cannot appear in a request")
parseError("Pseudo-header ':status' is for responses only; it cannot appear in a request", ":status")

case "content-length" =>
if (contentLength == -1) {
val contentLengthValue = ContentLength.get(value).toLong
if (contentLengthValue < 0)
malformedRequest("HTTP message must not contain a negative content-length header")
parseError("HTTP message must not contain a negative content-length header", "content-length")
rec(incomingHeaders, offset + 1, method, scheme, authority, pathAndRawQuery, contentType,
contentLengthValue, cookies, true, headers)
} else malformedRequest("HTTP message must not contain more than one content-length header")
} else parseError("HTTP message must not contain more than one content-length header", "content-length")

case "cookie" =>
// Compress cookie headers as described here https://tools.ietf.org/html/rfc7540#section-8.1.2.5
Expand All @@ -182,11 +186,21 @@ private[http2] object RequestParsing {
}
}

val incomingHeaders = subStream.initialHeaders.keyValuePairs.toIndexedSeq
if (incomingHeaders.size > serverSettings.parserSettings.maxHeaderCount)
malformedRequest(
s"HTTP message contains more than the configured limit of ${serverSettings.parserSettings.maxHeaderCount} headers")
else rec(incomingHeaders, 0)
try {
subStream.initialHeaders.headerParseErrorDetails match {
case Some(details) =>
// header errors already found in decompression
BadRequest(details, subStream.streamId)
case None =>
val incomingHeaders = subStream.initialHeaders.keyValuePairs.toIndexedSeq
if (incomingHeaders.size > serverSettings.parserSettings.maxHeaderCount)
parseError(
s"HTTP message contains more than the configured limit of ${serverSettings.parserSettings.maxHeaderCount} headers")
else OkRequest(rec(incomingHeaders, 0))
}
} catch {
case bre: ParsingException => BadRequest(bre.info, subStream.streamId)
}
}
}

Expand All @@ -201,25 +215,34 @@ private[http2] object RequestParsing {
}

private[http2] def checkRequiredPseudoHeader(name: String, value: AnyRef): Unit =
if (value eq null) malformedRequest(s"Mandatory pseudo-header '$name' missing")
if (value eq null) protocolError(s"Mandatory pseudo-header '$name' missing")
private[http2] def checkUniquePseudoHeader(name: String, value: AnyRef): Unit =
if (value ne null) malformedRequest(s"Pseudo-header '$name' must not occur more than once")
if (value ne null) protocolError(s"Pseudo-header '$name' must not occur more than once")
private[http2] def checkNoRegularHeadersBeforePseudoHeader(name: String, seenRegularHeader: Boolean): Unit =
if (seenRegularHeader) malformedRequest(s"Pseudo-header field '$name' must not appear after a regular header")
private[http2] def malformedRequest(msg: String): Nothing =
throw new Http2ProtocolException(s"Malformed request: $msg")
if (seenRegularHeader) parseError(s"Pseudo-header field '$name' must not appear after a regular header", name)
private[http2] def validateHeader(httpHeader: HttpHeader) = httpHeader.lowercaseName match {
case "connection" =>
// https://tools.ietf.org/html/rfc7540#section-8.1.2.2
malformedRequest("Header 'Connection' must not be used with HTTP/2")
parseError("Header 'Connection' must not be used with HTTP/2", "Connection")
case "transfer-encoding" =>
// https://tools.ietf.org/html/rfc7540#section-8.1.2.2
malformedRequest("Header 'Transfer-Encoding' must not be used with HTTP/2")
parseError("Header 'Transfer-Encoding' must not be used with HTTP/2", "Transfer-encoding")
case "te" =>
// https://tools.ietf.org/html/rfc7540#section-8.1.2.2
if (httpHeader.value.compareToIgnoreCase("trailers") != 0)
malformedRequest(s"Header 'TE' must not contain value other than 'trailers', value was '${httpHeader.value}")
parseError(s"Header 'TE' must not contain value other than 'trailers', value was '${httpHeader.value}", "TE")
case _ => // ok
}

// parse errors lead to BadRequest response while Protocol
private[http2] def protocolError(summary: String): Nothing =
throw new Http2Compliance.Http2ProtocolException(s"Malformed request: $summary")

private[http2] def parseError(summary: String, headerName: String): Nothing =
throw new ParsingException(ErrorInfo(s"Malformed request: $summary").withErrorHeaderName(headerName))
with NoStackTrace

private def parseError(summary: String): Nothing =
throw new ParsingException(ErrorInfo(s"Malformed request: $summary")) with NoStackTrace

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package client

import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.http.impl.engine.http2.Http2Compliance.Http2ProtocolException
import pekko.http.impl.engine.http2.RequestParsing._
import pekko.http.impl.engine.parsing.HttpHeaderParser
import pekko.http.impl.engine.server.HttpAttributes
Expand Down Expand Up @@ -85,26 +86,26 @@ private[http2] object ResponseParsing {
rec(remainingHeaders.tail, status, OptionVal.Some(contentTypeValue), contentLength, seenRegularHeader,
headers)
else
malformedRequest("HTTP message must not contain more than one content-type header")
malformedResponse("HTTP message must not contain more than one content-type header")

case ("content-type", ct: String) =>
if (contentType.isEmpty) {
val contentTypeValue =
ContentType.parse(ct).getOrElse(malformedRequest(s"Invalid content-type: '$ct'"))
ContentType.parse(ct).getOrElse(malformedResponse(s"Invalid content-type: '$ct'"))
rec(remainingHeaders.tail, status, OptionVal.Some(contentTypeValue), contentLength, seenRegularHeader,
headers)
} else malformedRequest("HTTP message must not contain more than one content-type header")
} else malformedResponse("HTTP message must not contain more than one content-type header")

case ("content-length", length: String) =>
if (contentLength == -1) {
val contentLengthValue = length.toLong
if (contentLengthValue < 0)
malformedRequest("HTTP message must not contain a negative content-length header")
malformedResponse("HTTP message must not contain a negative content-length header")
rec(remainingHeaders.tail, status, contentType, contentLengthValue, seenRegularHeader, headers)
} else malformedRequest("HTTP message must not contain more than one content-length header")
} else malformedResponse("HTTP message must not contain more than one content-length header")

case (name, _) if name.startsWith(':') =>
malformedRequest(s"Unexpected pseudo-header '$name' in response")
malformedResponse(s"Unexpected pseudo-header '$name' in response")

case (_, httpHeader: HttpHeader) =>
rec(remainingHeaders.tail, status, contentType, contentLength, seenRegularHeader = true,
Expand All @@ -119,4 +120,7 @@ private[http2] object ResponseParsing {

rec(subStream.initialHeaders.keyValuePairs)
}

private def malformedResponse(msg: String): Nothing =
throw new Http2ProtocolException(s"Malformed response: $msg")
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ private[http2] object HeaderCompression extends GraphStage[FlowShape[FrameEvent,
case ack @ SettingsAckFrame(s) =>
applySettings(s)
push(eventsOut, ack)
case ParsedHeadersFrame(streamId, endStream, kvs, prioInfo) =>
case ParsedHeadersFrame(streamId, endStream, kvs, prioInfo, _) =>
// When ending the stream without any payload, use a DATA frame rather than
// a HEADERS frame to work around https://github.com/golang/go/issues/47851.
if (endStream && kvs.isEmpty) push(eventsOut, DataFrame(streamId, endStream, ByteString.empty))
Expand Down
Loading
Loading