11public import HTTPTypes
22import NIOCore
33import NIOHTTPTypes
4+ import Synchronization
45
56/// A specialized reader for HTTP request bodies and trailers that manages the reading process
67/// and captures the final trailer fields.
@@ -23,7 +24,7 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
2324 public typealias ReadFailure = any Error
2425
2526 /// The HTTP trailer fields captured at the end of the request.
26- fileprivate var state : ReaderState ?
27+ fileprivate var state : ReaderState
2728
2829 /// The iterator that provides HTTP request parts from the underlying channel.
2930 private var iterator : NIOAsyncChannelInboundStream < HTTPRequestPart > . AsyncIterator
@@ -32,9 +33,11 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
3233 ///
3334 /// - Parameter iterator: The NIO async channel inbound stream iterator to use for reading request parts.
3435 fileprivate init (
35- iterator: consuming sending NIOAsyncChannelInboundStream< HTTPRequestPart > . AsyncIterator
36+ iterator: consuming sending NIOAsyncChannelInboundStream< HTTPRequestPart > . AsyncIterator ,
37+ readerState: ReaderState
3638 ) {
3739 self . iterator = iterator
40+ self . state = readerState
3841 }
3942
4043 /// Reads a chunk of request body data.
@@ -53,18 +56,28 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
5356 // TODO: Add ByteBuffer span interfaces
5457 return try await body ( Array ( buffer: element) . span)
5558 case . end ( let trailers) :
56- self . state? . trailers = trailers
57- self . state? . finishedReading = true
59+ self . state. wrapped. withLock { state in
60+ state. trailers = trailers
61+ state. finishedReading = true
62+ }
5863 return try await body ( nil )
5964 case . none:
6065 return try await body ( nil )
6166 }
6267 }
6368 }
6469
65- final class ReaderState {
66- var trailers : HTTPFields ? = nil
67- var finishedReading : Bool = false
70+ final class ReaderState: Sendable {
71+ struct Wrapped {
72+ var trailers : HTTPFields ? = nil
73+ var finishedReading : Bool = false
74+ }
75+
76+ let wrapped : Mutex < Wrapped >
77+
78+ init ( ) {
79+ self . wrapped = . init( . init( ) )
80+ }
6881 }
6982
7083 /// The underlying reader type for the HTTP request body.
@@ -76,10 +89,9 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
7689 /// The type of errors that can occur during reading operations.
7790 public typealias Failure = any Error
7891
79- /// The internal reader that provides HTTP request parts from the underlying channel.
80- private var partsReader: RequestBodyAsyncReader
92+ private var iterator: NIOAsyncChannelInboundStream< HTTPRequestPart> . AsyncIterator?
8193
82- fileprivate let readerState : ReaderState
94+ internal var state : ReaderState
8395
8496 /// Initializes a new HTTP request body and trailers reader with the given NIO async channel iterator.
8597 ///
@@ -88,8 +100,8 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
88100 iterator: consuming sending NIOAsyncChannelInboundStream< HTTPRequestPart> . AsyncIterator,
89101 readerState: ReaderState
90102 ) {
91- self . partsReader = RequestBodyAsyncReader ( iterator: iterator )
92- self . readerState = readerState
103+ self . iterator = iterator
104+ self . state = readerState
93105 }
94106
95107 /// Processes the request body reading operation and captures the final trailer fields.
@@ -118,11 +130,16 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
118130 /// }
119131 /// ```
120132 public consuming func consumeAndConclude< Return> (
121- body: ( consuming RequestBodyAsyncReader) async throws -> Return
133+ body: ( consuming sending RequestBodyAsyncReader) async throws -> Return
122134 ) async throws -> ( Return, HTTPFields? ) {
123- self . partsReader. state = self . readerState
124- let result = try await body ( self . partsReader)
125- return ( result, self . readerState. trailers)
135+ if let iterator = self . iterator. sendingTake ( ) {
136+ let partsReader = RequestBodyAsyncReader ( iterator: iterator, readerState: self . state)
137+ let result = try await body ( partsReader)
138+ let trailers = self . state. wrapped. withLock { $0. trailers }
139+ return ( result, trailers)
140+ } else {
141+ fatalError ( " consumeAndConclude called more than once " )
142+ }
126143 }
127144}
128145
@@ -131,3 +148,12 @@ extension HTTPRequestConcludingAsyncReader: Sendable {}
131148
132149@available( * , unavailable)
133150extension HTTPRequestConcludingAsyncReader. RequestBodyAsyncReader: Sendable { }
151+
152+
153+ extension Optional {
154+ mutating func sendingTake( ) - > sending Self {
155+ let result = consume self
156+ self = nil
157+ return result
158+ }
159+ }
0 commit comments