diff --git a/Package.swift b/Package.swift index a78325e..5a3ba81 100644 --- a/Package.swift +++ b/Package.swift @@ -5,6 +5,7 @@ import PackageDescription let extraSettings: [SwiftSetting] = [ .enableExperimentalFeature("SuppressedAssociatedTypes"), .enableExperimentalFeature("LifetimeDependence"), + .enableExperimentalFeature("Lifetimes"), .enableUpcomingFeature("LifetimeDependence"), .enableUpcomingFeature("NonisolatedNonsendingByDefault"), .enableUpcomingFeature("InferIsolatedConformances"), @@ -21,7 +22,10 @@ let package = Package( targets: ["HTTPServer"]) ], dependencies: [ - .package(url: "https://github.com/apple/swift-collections.git", from: "1.0.4"), + .package( + url: "https://github.com/FranzBusch/swift-collections.git", + branch: "fb-async" + ), .package(url: "https://github.com/apple/swift-http-types.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-certificates.git", from: "1.0.4"), @@ -48,7 +52,9 @@ let package = Package( .target( name: "HTTPServer", dependencies: [ + "AsyncStreaming", .product(name: "DequeModule", package: "swift-collections"), + .product(name: "BasicContainers", package: "swift-collections"), .product(name: "X509", package: "swift-certificates"), .product(name: "HTTPTypes", package: "swift-http-types"), .product(name: "NIOCore", package: "swift-nio"), @@ -71,6 +77,13 @@ let package = Package( ], swiftSettings: extraSettings ), + .target( + name: "AsyncStreaming", + dependencies: [ + .product(name: "BasicContainers", package: "swift-collections") + ], + swiftSettings: extraSettings + ), .testTarget( name: "HTTPServerTests", dependencies: [ diff --git a/Sources/AsyncStreaming/EitherError.swift b/Sources/AsyncStreaming/EitherError.swift new file mode 100644 index 0000000..69d2138 --- /dev/null +++ b/Sources/AsyncStreaming/EitherError.swift @@ -0,0 +1,56 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP API Proposal open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift HTTP API Proposal project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// An enumeration that represents one of two possible error types. +/// +/// ``EitherError`` provides a type-safe way to represent errors that can be one of two distinct +/// error types. +public enum EitherError: Error { + /// An error of the first type. + /// + /// The associated value contains the specific error instance of type `First`. + case first(First) + + /// An error of the second type. + /// + /// The associated value contains the specific error instance of type `Second`. + case second(Second) + + /// Throws the underlying error by unwrapping this either error. + /// + /// This method extracts and throws the actual error contained within the either error, + /// whether it's the first or second type. This is useful when you need to propagate + /// the original error without the either error wrapper. + /// + /// - Throws: The underlying error, either of type `First` or `Second`. + /// + /// ## Example + /// + /// ```swift + /// do { + /// // Some operation that returns EitherError + /// let result = try await operation() + /// } catch let eitherError as EitherError { + /// try eitherError.unwrap() // Throws the original error + /// } + /// ``` + public func unwrap() throws { + switch self { + case .first(let first): + throw first + case .second(let second): + throw second + } + } +} diff --git a/Sources/AsyncStreaming/Internal/InlineArray+convenience.swift b/Sources/AsyncStreaming/Internal/InlineArray+convenience.swift new file mode 100644 index 0000000..30e66c4 --- /dev/null +++ b/Sources/AsyncStreaming/Internal/InlineArray+convenience.swift @@ -0,0 +1,24 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP API Proposal open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift HTTP API Proposal project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) +extension InlineArray where Element: ~Copyable { + package static func one(value: consuming Element) -> InlineArray<1, Element> { + return InlineArray<1, Element>(first: value) { _ in fatalError() } + } + + package static func zero(of elementType: Element.Type = Element.self) -> InlineArray<0, Element> { + return InlineArray<0, Element> { _ in } + } +} diff --git a/Sources/AsyncStreaming/Internal/Optional+SendingTake.swift b/Sources/AsyncStreaming/Internal/Optional+SendingTake.swift new file mode 100644 index 0000000..604d0a2 --- /dev/null +++ b/Sources/AsyncStreaming/Internal/Optional+SendingTake.swift @@ -0,0 +1,22 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP API Proposal open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift HTTP API Proposal project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +extension Optional where Wrapped: ~Copyable { + @inlinable + mutating func takeSending() -> sending Self { + let result = consume self + self = nil + return result + } +} diff --git a/Sources/AsyncStreaming/README.md b/Sources/AsyncStreaming/README.md new file mode 100644 index 0000000..132c07e --- /dev/null +++ b/Sources/AsyncStreaming/README.md @@ -0,0 +1,394 @@ +# AsyncReader and AsyncWriter protocols + +## Introduction + +This target introduces new `AsyncReader` and `AsyncWriter` protocols that +provide a pull/push-based interface for asynchronous streaming such as file I/O, +networking and more. It builds on the learnings of `AsyncSequence` with support +for `~Copyable` and `~Escapable` types, typed throws, lifetimes and more. + +## Motivation + +While `AsyncSequence` has seen widespread adoption for consuming asynchronous +streams, several limitations have emerged over the past years: + +### No support for `~Copyable` and `~Escapable` types + +`AsyncSequence` was introduced before `~Copyable` and `~Escapable` types were +introduced, hence, the current `AsyncSequence` protocol's does not support types +with those constraints. Furthermore, it doesn't allow elements with those +constraints either. + +### Iterator pattern isn't fitting + +`AsyncSequence` followed the design principles of its synchronous counter part +`Sequence`. While iterators are a good abstraction for those it became obvious +that for asynchronous sequences they aren't a good fit. This is due to two +reasons. First, most asynchronous sequences do not support multiple iterators. +Secondly, most asynchronous sequences are not replayable. + +### Seeking + +Some asynchronous sequences allow seeking the current position forward or +backwards such as asynchronous sequences that represent an array or a file. +However there is currently no protocol to express this. APIs such as resumable +HTTP uploads could leverage such a protocol by seeking forward the asynchronous +sequence to the position where the upload last stopped. + +### Final elements + +Some asynchronous sequences can finish with a special last element. A common +example are HTTP trailers that are an optional part at the end of an HTTP +request or response. The current `AsyncSequence` protocol only allows to express +this by making the `Element` an `Either` like type. + +### Bulk iteration + +The current `AsyncIterator.next()` method only allows iteration element by +element. This limits performance by requiring multiple calls to retrieve +elements from the iterator even if those elements are already available. + +### Bi-directional streaming and Structured Concurrency + +`AsyncSequence`s are used to express a series of asynchronous elements such as +the requests or response body parts of an HTTP request. Various APIs around the +ecosystem have adopted `AsyncSequence`s such as `NIOFileSystem`, +`AsyncHTTPClient` or `grpc-swift`. During the design and implementation of APIs +that support bi-directional streaming such as HTTP or gRPC it became apparent +that pull-based `AsyncSequence`s model is only working for one side of the +bi-directional streaming. Trying to express both side as an `AsyncSequence` +forced the introduction of unstructured tasks breaking Structured Concurrency +guarantees. + +```swift +func bidirectionalStreaming(input: some AsyncSequence) async throws -> some AsyncSequence { + // The output async sequence can start producing values before the input has been fully streamed + // this forces us to create an unstructured task to continue iterating the input after the return of this method + Task { + for await byte in input { + // Send byte + } + } + return ConcreteAsyncSequence() +} +``` + +This is due to that fact that `AsyncSequence` is a pull-based model, if the +input and output in a bi-directional streaming setup are related then using a +pull-based model into both directions can work; however, when the two are +unrelated then a push-based model for the output is a better fit. Hence, we see +a proliferation of asynchronous writer protocols and types throughout the +ecosystem such as: +- [NIOAsyncWriter](https://github.com/apple/swift-nio/blob/main/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift) +- [WritableFileHandleProtocol](https://github.com/apple/swift-nio/blob/767ea9ee09c4227d32f230c7e24bb9f5a6a5cfd9/Sources/NIOFS/FileHandleProtocol.swift#L448) +- [RPCWriterProtocol](https://github.com/grpc/grpc-swift-2/blob/5c04d83ba35f4343dcf691a000bcb89f68755587/Sources/GRPCCore/Streaming/RPCWriterProtocol.swift#L19) + +### Some algorithms break Structured Concurrency + +During the implementation of various algorithms inside `swift-async-algorithms`, +we learned that whenever the production of values needs to outlive a single call +to the iterator's `next()` method it forced us to use unstructured tasks. +Examples of this are: +- [merge](https://github.com/apple/swift-async-algorithms/blob/26111a6fb73ce448a41579bbdb12bdebd66672f1/Sources/AsyncAlgorithms/Merge/AsyncMerge2Sequence.swift#L16) + where a single call to `next` races multiple base asynchronous sequences. We + return the first value produced by any of the bases but the calls to the other + bases still need to continue. +- [zip](https://github.com/apple/swift-async-algorithms/blob/26111a6fb73ce448a41579bbdb12bdebd66672f1/Sources/AsyncAlgorithms/Zip/AsyncZip2Sequence.swift#L15) + same problem as `merge`. +- [buffer](https://github.com/apple/swift-async-algorithms/blob/26111a6fb73ce448a41579bbdb12bdebd66672f1/Sources/AsyncAlgorithms/Buffer/AsyncBufferSequence.swift#L25) + where the base needs to produce elements until the buffer is full + +While the implementations try their best to make the usage of unstructured tasks +as _structured_ as possible, there are multiple problems with their usage: +1. Cancellation needs to be propagated manually +2. Priority escalation needs to be propagated manually +3. Task executor preference needs to be propagated manually +4. Task locals are only copied on the first call to `next` + +## Proposed solution + +### `AsyncReader` + +`AsyncReader` is a replacement to `AsyncSequence` that addresses the above +limitations. It allows `~Copyable` elements and offers bulk +iteration by providing a `Span`. + +```swift +try await fileReader.read { span in + print(span.count) +} +``` + +### `ConcludingAsyncReader` + +The `ConcludingAsyncReader` is a new type that provides scoped access to an +`AsyncReader`. Once, the user is done with the `AsyncReader` the concluding +final element is returned. + +```swift +let trailers = try await httpRequestConcludingReader.consumeAndConclude { bodyReader in + // Use the bodyReader to read the HTTP request body + try await bodyReader.read { chunk in + print(chunk) + } +} + +// The trailers are returned once we are done with the body reader +print(trailers) +``` + +### `AsyncWriter` + +`AsyncWriter` is the push-based counter part to `AsyncReader` that models an +asynchronous writable type. Similar to `AsyncReader` it allows `~Copyable` elements + and offers bulk writing by offering an `OutputSpan` to write into. + +```swift +var values = [1, 2, 3, 4] +try await fileWriter.write { outputSpan in + for value in values { + outputSpan.append(value) + } +} +``` + +### `ConcludingAsyncWriter` + +`ConcludingAsyncWriter` is the counter part to the `ConcludingAsyncReader`. It +provides access to a scoped writer. Once the user is done with the writer they +can return a final element. + +```swift +try await httpRequestConcludingWriter.consumeAndConclude { bodyWriter in + // Use the bodyWriter to write the HTTP request body + try await bodyWriter.write(values.span.bytes) + + // Return the trailers as the final element + return HTTPFields(...) +} +``` + +### `Seekable` + +`Seekable` is a protocol that types can conform to indicate that their position +in a stream can be moved. + +```swift +var fileReader = ... +// Seeks to the 5th position of the front of the stream. +fileReader.seek(to: .front(5)) +``` + +### `Runnable` + +// TODO + +```swift +try await merge(reader1, reader) + .forEach { element in + print(element) + } +``` + +## Detailed design + +### `AsyncReader` + +```swift +/// A protocol that represents an asynchronous reader capable of reading elements from some source. +/// +/// ``AsyncReader`` defines an interface for types that can asynchronously read elements +/// of a specified type from a source. +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) +public protocol AsyncReader: ~Copyable, ~Escapable { + /// The type of elements that can be read by this reader. + associatedtype ReadElement: ~Copyable + + /// The type of error that can be thrown during reading operations. + associatedtype ReadFailure: Error + + /// Reads elements from the underlying source and processes them with the provided body function. + /// + /// This method asynchronously reads a span of elements from whatever source the reader + /// represents, then passes them to the provided body function. The operation may complete immediately + /// or may await resources or processing time. + /// + /// - Parameter maximumCount: The maximum count of items the caller is ready + /// to process, or nil if the caller is prepared to accept an arbitrarily + /// large span. If non-nil, the maximum must be greater than zero. + /// + /// - Parameter body: A function that consumes a span of read elements and performs some operation + /// on them, returning a value of type `Return`. When the span is empty, it indicates + /// the end of the reading operation or stream. + /// + /// - Returns: The value returned by the body function after processing the read elements. + /// + /// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation + /// or a `Failure` from the body closure. + /// + /// ```swift + /// var fileReader: FileAsyncReader = ... + /// + /// // Read data from a file asynchronously and process it + /// let result = try await fileReader.read { data in + /// guard data.count > 0 else { + /// // Handle end of stream/terminal value + /// return finalProcessedValue + /// } + /// // Process the data + /// return data + /// } + /// ``` + @_lifetime(&self) + mutating func read( + maximumCount: Int?, + body: (consuming Span) async throws(Failure) -> Return + ) async throws(EitherError) -> Return +} +``` + +### `ConcludingAsyncReader` + +```swift +/// A protocol that provides access to an asynchronous reader and concludes with a final value. +/// +/// ``ConcludingAsyncReader`` adds functionality to asynchronous readers that need to +/// provide a conclusive element after all reads are completed. This is particularly useful +/// for streams that have meaningful completion states beyond just terminating, such as +/// HTTP responses that include headers after the body is fully read. +public protocol ConcludingAsyncReader: ~Copyable { + /// The underlying asynchronous reader type that produces elements. + associatedtype Underlying: AsyncReader, ~Copyable, ~Escapable + + /// The type of the final element produced after all reads are completed. + associatedtype FinalElement + + /// Processes the underlying async reader until completion and returns both the result of processing + /// and a final element. + /// + /// - Parameter body: A closure that takes the underlying `AsyncReader` and returns a value. + /// - Returns: A tuple containing the value returned by the body closure and the final element. + /// - Throws: Any error thrown by the body closure or encountered while processing the reader. + /// + /// - Note: This method consumes the concluding async reader, meaning it can only be called once on a value type. + /// + /// ```swift + /// let responseReader: HTTPResponseReader = ... + /// + /// // Process the body while capturing the final response status + /// let (bodyData, statusCode) = try await responseReader.consumeAndConclude { reader in + /// var collectedData = Data() + /// while let chunk = try await reader.read(body: { $0 }) { + /// collectedData.append(chunk) + /// } + /// return collectedData + /// } + /// ``` + consuming func consumeAndConclude( + body: (consuming sending Underlying) async throws -> Return + ) async throws -> (Return, FinalElement) +} +``` + +### `AsyncWriter` + +```swift +/// A protocol that represents an asynchronous writer capable of providing a buffer to write into. +/// +/// ``AsyncWriter`` defines an interface for types that can asynchronously write elements +/// to a destination by providing an output span buffer for efficient batch writing operations. +public protocol AsyncWriter: ~Copyable, ~Escapable { + /// The type of elements that can be written by this writer. + associatedtype WriteElement: ~Copyable + + /// The type of error that can be thrown during writing operations. + associatedtype WriteFailure: Error + + /// Provides a buffer to write elements into. + /// + /// This method supplies an output span that the body closure can use to append elements + /// for writing. The writer manages the buffer allocation and handles the actual writing + /// operation once the body closure completes. + /// + /// - Parameter body: A closure that receives an `OutputSpan` for appending elements + /// to write. The closure can return a result of type `Result`. + /// + /// - Returns: The value returned by the body closure. + /// + /// - Throws: An `EitherError` containing either a `WriteFailure` from the write operation + /// or a `Failure` from the body closure. + /// + /// ## Example + /// + /// ```swift + /// var writer: SomeAsyncWriter = ... + /// + /// try await writer.write { outputSpan in + /// for item in items { + /// outputSpan.append(item) + /// } + /// return outputSpan.count + /// } + /// ``` + @_lifetime(self: copy self) + mutating func write( + _ body: (inout OutputSpan) async throws(Failure) -> Result + ) async throws(EitherError) -> Result +} +``` + +### `ConcludingAsyncWriter` + +```swift +/// A protocol that represents an asynchronous writer that produces a final value upon completion. +/// +/// ``ConcludingAsyncWriter`` adds functionality to asynchronous writers that need to +/// provide a conclusive element after writing is complete. This is particularly useful +/// for streams that have meaningful completion states, such as HTTP response that need +/// to finalize with optional trailers. +public protocol ConcludingAsyncWriter: ~Copyable { + /// The underlying asynchronous writer type. + associatedtype Underlying: AsyncWriter, ~Copyable, ~Escapable + + /// The type of the final element produced after writing is complete. + associatedtype FinalElement + + /// Allows writing to the underlying async writer and produces a final element upon completion. + /// + /// - Parameter body: A closure that takes the underlying writer and returns both a value and a final element. + /// - Returns: The value returned by the body closure. + /// - Throws: Any error thrown by the body closure or encountered while writing. + /// + /// - Note: This method consumes the concluding async writer, meaning it can only be called once on a value type. + /// + /// ```swift + /// let responseWriter: HTTPResponseWriter = ... + /// + /// // Write the response body and produce a final status + /// let result = try await responseWriter.produceAndConclude { writer in + /// try await writer.write(data) + /// return (true, trailers) + /// } + /// ``` + consuming func produceAndConclude( + body: (consuming sending Underlying) async throws -> (Return, FinalElement) + ) async throws -> Return +} +``` + +## Alternatives considered + +### Naming + +We considered various other names for these types such as: + +- `AsyncReader` and `AsyncWriter` alternatives: + - `AsyncReadable` and `AsyncWritable` +- `ConcludingAsyncReader` and `ConcludingAsyncWriter` alternatives: + - `FinalElementAsyncReader` and `FinalElementAsyncWriter` + +### Async generators + +Asynchronous generators might provide an alternative to the current +`AsyncSequence` and the `AsyncReader` here. However, they would require +significant compiler features and potentially only replace the _read_ side. diff --git a/Sources/AsyncStreaming/Reader/Array+AsyncReader.swift b/Sources/AsyncStreaming/Reader/Array+AsyncReader.swift new file mode 100644 index 0000000..826e86b --- /dev/null +++ b/Sources/AsyncStreaming/Reader/Array+AsyncReader.swift @@ -0,0 +1,89 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP API Proposal open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift HTTP API Proposal project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) +extension Array { + /// Creates an async reader that provides access to the array's elements. + /// + /// This method converts an array into an ``AsyncReader`` implementation, allowing + /// the array's elements to be read through the async reader interface. + /// + /// - Returns: An ``AsyncReader`` that produces all elements of the array. + /// + /// ## Example + /// + /// ```swift + /// let numbers = [1, 2, 3, 4, 5] + /// var reader = numbers.asyncReader() + /// + /// try await reader.forEach { span in + /// print("Read \(span.count) numbers") + /// } + /// ``` + public func asyncReader() -> some AsyncReader & SendableMetatype { + return ArrayAsyncReader(array: self) + } +} + +/// An async reader implementation that provides array elements through the AsyncReader interface. +/// +/// This internal reader type wraps an array and delivers its elements through the ``AsyncReader`` +/// protocol. It maintains a current read position and can deliver elements in chunks based on +/// the requested maximum count. +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) +struct ArrayAsyncReader: AsyncReader { + typealias ReadElement = Element + typealias ReadFailure = Never + + let array: [Element] + var index: Array.Index + + init(array: [Element]) { + self.array = array + self.index = array.startIndex + } + + mutating func read( + maximumCount: Int?, + body: + nonisolated(nonsending) ( + consuming Span + ) async throws(Failure) -> Return + ) async throws(EitherError) -> Return { + do { + guard self.index < self.array.endIndex else { + return try await body([Element]().span) + } + + guard let maximumCount else { + defer { + self.index = self.array.span.indices.endIndex + } + return try await body(self.array.span.extracting(self.index...)) + } + let endIndex = min( + self.array.span.indices.endIndex, + self.index.advanced( + by: maximumCount + ) + ) + defer { + self.index = endIndex + } + return try await body(self.array.span.extracting(self.index..( + upTo limit: Int, + body: (Span) async throws(Failure) -> Result + ) async throws(EitherError) -> Result { + var buffer = [ReadElement]() + buffer.reserveCapacity(limit) + var shouldContinue = true + do { + while shouldContinue { + try await self.read(maximumCount: limit - buffer.count) { span in + guard span.count > 0 else { + shouldContinue = false + return + } + precondition(span.count <= limit - buffer.count) + for index in span.indices { + buffer.append(span[index]) + } + } + } + } catch { + switch error { + case .first(let error): + throw .first(error) + case .second: + fatalError() + } + } + do { + return try await body(buffer.span) + } catch { + throw .second(error) + } + } + + /// Collects elements from the reader up to a specified limit and processes them with a body function. + /// + /// This method continuously reads elements from the async reader, accumulating them in a buffer + /// until either the end of the stream is reached (indicated by a `nil` element) or the specified + /// limit is exceeded. Once collection is complete, the accumulated elements are passed to the + /// provided body function as a `Span` for processing. + /// + /// - Parameters: + /// - limit: The maximum number of elements to collect before throwing a `LimitExceeded` error. + /// This prevents unbounded memory growth when reading from potentially infinite streams. + /// - body: A closure that receives a `Span` containing all collected elements and returns + /// a result of type `Result`. This closure is called once after all elements have been + /// collected successfully. + /// + /// - Returns: The value returned by the body closure after processing the collected elements. + /// + /// ## Example + /// + /// ```swift + /// var reader: SomeAsyncReader = ... + /// + /// let processedData = try await reader.collect(upTo: 1000) { span in + /// // Process all collected elements + /// } + /// ``` + /// + /// ## Memory Considerations + /// + /// Since this method buffers all elements in memory before processing, it should be used + /// with caution on large datasets. The `limit` parameter serves as a safety mechanism + /// to prevent excessive memory usage. + #if compiler(<6.3) + @_lifetime(&self) + #endif + public mutating func collect( + upTo limit: Int, + body: (Span) async -> Result + ) async -> Result where ReadFailure == Never { + var buffer = [ReadElement]() + buffer.reserveCapacity(limit) + var shouldContinue = true + while limit - buffer.count > 0 && shouldContinue { + // This force-try is safe since neither read nor the closure are throwing + try! await self.read(maximumCount: limit - buffer.count) { span in + precondition(span.count <= limit - buffer.count) + guard span.count > 0 else { + // This means the underlying reader is finished and we can return + shouldContinue = false + return + } + for index in span.indices { + buffer.append(span[index]) + } + } + } + return await body(buffer.span) + } + + /// Collects elements from the reader into an output span until the span is full. + /// + /// This method continuously reads elements from the async reader and appends them to the + /// provided output span until the span reaches its capacity. This provides an efficient + /// way to fill a pre-allocated buffer with elements from the reader. + /// + /// - Parameter outputSpan: An `OutputSpan` to append read elements into. The method continues + /// reading until this span is full. + /// + /// - Throws: An error of type `ReadFailure` if any read operation fails. + /// + /// ## Example + /// + /// ```swift + /// var reader: SomeAsyncReader = ... + /// var buffer = [Int](repeating: 0, count: 100) + /// + /// try await buffer.withOutputSpan { outputSpan in + /// try await reader.collect(into: &outputSpan) + /// } + /// ``` + #if compiler(<6.3) + @_lifetime(&self) + #endif + public mutating func collect( + into outputSpan: inout OutputSpan + ) async throws(ReadFailure) { + while !outputSpan.isFull { + do { + try await self.read(maximumCount: outputSpan.freeCapacity) { span in + for index in span.indices { + outputSpan.append(span[index]) + } + } + } catch { + switch error { + case .first(let error): + throw error + case .second: + fatalError() + } + } + } + } +} diff --git a/Sources/AsyncStreaming/Reader/AsyncReader+forEach.swift b/Sources/AsyncStreaming/Reader/AsyncReader+forEach.swift new file mode 100644 index 0000000..6908a8e --- /dev/null +++ b/Sources/AsyncStreaming/Reader/AsyncReader+forEach.swift @@ -0,0 +1,104 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP API Proposal open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift HTTP API Proposal project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +// swift-format-ignore: AmbiguousTrailingClosureOverload +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) +extension AsyncReader where Self: ~Copyable, Self: ~Escapable { + /// Iterates over all elements from the reader, executing the provided body for each span. + /// + /// This method continuously reads elements from the async reader until the stream ends, + /// executing the provided closure for each span of elements read. The iteration terminates + /// when the reader produces an empty span, indicating the end of the stream. + /// + /// - Parameter body: An asynchronous closure that processes each span of elements read + /// from the stream. The closure receives a `Span` for each read operation. + /// + /// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation + /// or a `Failure` from the body closure. + /// + /// ## Example + /// + /// ```swift + /// var fileReader: FileAsyncReader = ... + /// + /// // Process each chunk of data from the file + /// try await fileReader.forEach { chunk in + /// print("Processing \(chunk.count) elements") + /// // Process the chunk + /// } + /// ``` + public consuming func forEach( + body: (consuming Span) async throws(Failure) -> Void + ) async throws(EitherError) { + var shouldContinue = true + while shouldContinue { + try await self.read(maximumCount: nil) { (next) throws(Failure) -> Void in + guard next.count > 0 else { + shouldContinue = false + return + } + + try await body(next) + } + } + } + + /// Iterates over all elements from the reader, executing the provided body for each span. + /// + /// This method continuously reads elements from the async reader until the stream ends, + /// executing the provided closure for each span of elements read. The iteration terminates + /// when the reader produces an empty span, indicating the end of the stream. + /// + /// - Parameter body: An asynchronous closure that processes each span of elements read + /// from the stream. The closure receives a `Span` for each read operation. + /// + /// - Throws: An error of type `Failure` from the body closure. Since this reader never fails, + /// only the body closure can throw errors. + /// + /// ## Example + /// + /// ```swift + /// var fileReader: FileAsyncReader = ... + /// + /// // Process each chunk of data from the file + /// try await fileReader.forEach { chunk in + /// print("Processing \(chunk.count) elements") + /// // Process the chunk + /// } + /// ``` + public consuming func forEach( + body: (consuming Span) async throws(Failure) -> Void + ) async throws(Failure) where ReadFailure == Never { + var shouldContinue = true + while shouldContinue { + do { + try await self.read(maximumCount: nil) { (next) throws(Failure) -> Void in + guard next.count > 0 else { + shouldContinue = false + return + } + + try await body(next) + } + } catch { + switch error { + case .first: + fatalError() + case .second(let error): + throw error + } + } + } + } +} diff --git a/Sources/AsyncStreaming/Reader/AsyncReader+map.swift b/Sources/AsyncStreaming/Reader/AsyncReader+map.swift new file mode 100644 index 0000000..3d2c7f5 --- /dev/null +++ b/Sources/AsyncStreaming/Reader/AsyncReader+map.swift @@ -0,0 +1,99 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP API Proposal open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift HTTP API Proposal project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import BasicContainers + +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) +extension AsyncReader where Self: ~Copyable, Self: ~Escapable { + /// Transforms elements read from this reader using the provided transformation function. + /// + /// This method creates a new async reader that applies the specified transformation to each + /// element read from the underlying reader. The transformation is applied lazily as elements + /// are read, maintaining the streaming nature of the operation. + /// + /// - Parameter transformation: An asynchronous closure that transforms each read element + /// of type `ReadElement` into a new element of type `MappedElement`. + /// + /// - Returns: A new ``AsyncReader`` that produces transformed elements of type `MappedElement`. + /// + /// ## Example + /// + /// ```swift + /// var dataReader: SomeAsyncReader = ... + /// + /// // Transform the spans into their element count + /// var countReader = dataReader.map { span in + /// span.count + /// } + /// + /// try await countReader.forEach { span in + /// print("Received chunk with \(span[0]) values") + /// } + /// ``` + @_lifetime(copy self) + public consuming func map( + _ transformation: @escaping (borrowing ReadElement) async -> MappedElement + ) -> some (AsyncReader & ~Copyable & ~Escapable) { + return AsyncMapReader(base: self, transformation: transformation) + } +} + +/// An async reader that transforms elements from a base reader using a mapping function. +/// +/// This internal reader type wraps another async reader and applies a transformation +/// to each element read from the base reader. The transformation is applied lazily +/// as elements are read, maintaining the streaming nature of the operation. +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) +struct AsyncMapReader: AsyncReader, ~Copyable, ~Escapable { + typealias ReadElement = MappedElement + typealias ReadFailure = Base.ReadFailure + + var base: Base + var transformation: (borrowing Base.ReadElement) async -> MappedElement + + @_lifetime(copy base) + init( + base: consuming Base, + transformation: @escaping (borrowing Base.ReadElement) async -> MappedElement + ) { + self.base = base + self.transformation = transformation + } + + #if compiler(<6.3) + @_lifetime(&self) + #endif + mutating func read( + maximumCount: Int?, + body: nonisolated(nonsending) (consuming Span) async throws(Failure) -> Return + ) async throws(EitherError) -> Return { + var buffer = RigidArray() + return try await self.base + .read(maximumCount: maximumCount) { (span) throws(Failure) -> Return in + guard span.count > 0 else { + let emptySpan = InlineArray<0, MappedElement>.zero() + return try await body(emptySpan.span) + } + + buffer.reserveCapacity(span.count) + + for index in span.indices { + let transformed = await self.transformation(span[index]) + buffer.append(transformed) + } + + return try await body(buffer.span) + } + } +} diff --git a/Sources/AsyncStreaming/Reader/AsyncReader.swift b/Sources/AsyncStreaming/Reader/AsyncReader.swift new file mode 100644 index 0000000..68a8e77 --- /dev/null +++ b/Sources/AsyncStreaming/Reader/AsyncReader.swift @@ -0,0 +1,159 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP API Proposal open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift HTTP API Proposal project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// A protocol that represents an asynchronous reader capable of reading elements from some source. +/// +/// ``AsyncReader`` defines an interface for types that can asynchronously read elements +/// of a specified type from a source. +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) +public protocol AsyncReader: ~Copyable, ~Escapable { + /// The type of elements that can be read by this reader. + associatedtype ReadElement: ~Copyable + + /// The type of error that can be thrown during reading operations. + associatedtype ReadFailure: Error + + /// Reads elements from the underlying source and processes them with the provided body closure. + /// + /// This method asynchronously reads a span of elements from whatever source the reader + /// represents, then passes them to the provided body closure. The operation may complete immediately + /// or may await resources or processing time. + /// + /// - Parameter maximumCount: The maximum count of items the caller is ready + /// to process, or nil if the caller is prepared to accept an arbitrarily + /// large span. If non-nil, the maximum must be greater than zero. + /// + /// - Parameter body: A closure that consumes a span of read elements and performs some operation + /// on them, returning a value of type `Return`. When the span is empty, it indicates + /// the end of the reading operation or stream. + /// + /// - Returns: The value returned by the body closure after processing the read elements. + /// + /// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation + /// or a `Failure` from the body closure. + /// + /// ```swift + /// var fileReader: FileAsyncReader = ... + /// + /// // Read data from a file asynchronously and process it + /// let result = try await fileReader.read { data in + /// guard data.count > 0 else { + /// // Handle end of stream/terminal value + /// return finalProcessedValue + /// } + /// // Process the data + /// return data + /// } + /// ``` + #if compiler(<6.3) + @_lifetime(&self) + #endif + mutating func read( + maximumCount: Int?, + body: (consuming Span) async throws(Failure) -> Return + ) async throws(EitherError) -> Return + +} + +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) +extension AsyncReader where Self: ~Copyable, Self: ~Escapable { + /// Reads elements from the underlying source and processes them with the provided body closure. + /// + /// This is a convenience method for async readers that never fail, simplifying the error handling + /// by directly throwing the body closure's error type instead of wrapping it in `EitherError`. + /// + /// - Parameter maximumCount: The maximum count of items the caller is ready to process, + /// or nil if the caller is prepared to accept an arbitrarily large span. + /// + /// - Parameter body: A closure that consumes a span of read elements and performs some operation + /// on them, returning a value of type `Return`. + /// + /// - Returns: The value returned by the body closure after processing the read elements. + /// + /// - Throws: An error of type `Failure` if the body closure throws. + /// + /// ## Example + /// + /// ```swift + /// var reader: some AsyncReader = ... // Never-failing reader + /// + /// let result = try await reader.read(maximumCount: 100) { span in + /// // Process the span + /// return span.count + /// } + /// ``` + #if compiler(<6.3) + @_lifetime(&self) + #endif + public mutating func read( + maximumCount: Int?, + body: (consuming Span) async throws(Failure) -> Return + ) async throws(Failure) -> Return where Self.ReadFailure == Never { + do { + return try await self.read(maximumCount: maximumCount) { (span) throws(Failure) -> Return in + return try await body(span) + } + } catch { + switch error { + case .first: + fatalError() + case .second(let error): + throw error + } + } + } +} + +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) +extension AsyncReader where ReadElement: Copyable { + /// Reads elements from this reader into the provided output span. + /// + /// This method reads a span of elements from the underlying reader and appends them + /// to the provided output span. This is a convenience method for readers with copyable + /// elements that need to populate an existing output buffer. The method reads up to + /// the free capacity available in the output span. + /// + /// - Parameter outputSpan: An `OutputSpan` to append read elements into. + /// + /// - Throws: An error of type `ReadFailure` if the read operation cannot be completed successfully. + /// + /// ## Example + /// + /// ```swift + /// var reader: some AsyncReader = ... + /// var buffer = [Int](repeating: 0, count: 100) + /// + /// await buffer.withOutputSpan { outputSpan in + /// await reader.read(into: &outputSpan) + /// } + /// ``` + public mutating func read( + into outputSpan: inout OutputSpan + ) async throws(ReadFailure) { + do { + try await self.read(maximumCount: outputSpan.freeCapacity) { span in + for i in span.indices { + outputSpan.append(span[i]) + } + } + } catch { + switch error { + case .first(let error): + throw error + case .second: + fatalError() + } + } + } +} diff --git a/Sources/AsyncStreaming/Reader/ConcludingAsyncReader+collect.swift b/Sources/AsyncStreaming/Reader/ConcludingAsyncReader+collect.swift new file mode 100644 index 0000000..c2b7f9c --- /dev/null +++ b/Sources/AsyncStreaming/Reader/ConcludingAsyncReader+collect.swift @@ -0,0 +1,54 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP API Proposal open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift HTTP API Proposal project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) +extension ConcludingAsyncReader where Self: ~Copyable { + /// Collects elements from the underlying async reader and returns both the processed result and final element. + /// + /// This method provides a convenient way to collect elements from the underlying reader while + /// capturing both the processing result and the final element that concludes the reading operation. + /// It combines the functionality of ``AsyncReader/collect(upTo:body:)-(_,(Span) -> Result)`` from ``AsyncReader`` with the concluding + /// behavior of ``ConcludingAsyncReader``. + /// + /// - Parameters: + /// - limit: The maximum number of elements to collect from the underlying reader. + /// - body: A closure that processes the collected elements as a `Span` and returns a result. + /// + /// - Returns: A tuple containing the result from processing the collected elements and the final element. + /// + /// - Throws: Any error thrown by the underlying read operations or the body closure during + /// the collection and processing of elements. + /// + /// ## Example + /// + /// ```swift + /// let responseReader: HTTPConcludingReader = ... + /// + /// // Collect response data and get final headers + /// let (processedData, finalHeaders) = try await responseReader.collect(upTo: 1024 * 1024) { span in + /// // Process all collected elements + /// } + /// ``` + public consuming func collect( + upTo limit: Int, + body: (Span) async throws -> Result + ) async throws -> (Result, FinalElement) where Underlying.ReadElement: Copyable { + try await self.consumeAndConclude { reader in + var reader = reader + return try await reader.collect(upTo: limit) { span in + try await body(span) + } + } + } +} diff --git a/Sources/AsyncStreaming/Reader/ConcludingAsyncReader.swift b/Sources/AsyncStreaming/Reader/ConcludingAsyncReader.swift new file mode 100644 index 0000000..fdeb697 --- /dev/null +++ b/Sources/AsyncStreaming/Reader/ConcludingAsyncReader.swift @@ -0,0 +1,53 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP API Proposal open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift HTTP API Proposal project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// A protocol that represents an asynchronous reader that produces elements and concludes with a final value. +/// +/// ``ConcludingAsyncReader`` adds functionality to asynchronous readers that need to +/// provide a conclusive element after all reads are completed. This is particularly useful +/// for streams that have meaningful completion states beyond just terminating, such as +/// HTTP responses that include headers after the body is fully read. +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) +public protocol ConcludingAsyncReader: ~Copyable { + /// The underlying asynchronous reader type that produces elements. + associatedtype Underlying: AsyncReader, ~Copyable, ~Escapable + + /// The type of the final element produced after all reads are completed. + associatedtype FinalElement + + /// Processes the underlying async reader until completion and returns both the result of processing + /// and a final element. + /// + /// - Parameter body: A closure that takes the underlying `AsyncReader` and returns a value. + /// - Returns: A tuple containing the value returned by the body closure and the final element. + /// - Throws: Any error thrown by the body closure or encountered while processing the reader. + /// + /// - Note: This method consumes the concluding async reader, meaning it can only be called once on a value type. + /// + /// ```swift + /// let responseReader: HTTPResponseReader = ... + /// + /// // Process the body while capturing the final response status + /// let (bodyData, statusCode) = try await responseReader.consumeAndConclude { reader in + /// var collectedData = Data() + /// while let chunk = try await reader.read(body: { $0 }) { + /// collectedData.append(chunk) + /// } + /// return collectedData + /// } + /// ``` + consuming func consumeAndConclude( + body: (consuming sending Underlying) async throws(Failure) -> Return + ) async throws(Failure) -> (Return, FinalElement) +} diff --git a/Sources/AsyncStreaming/Writer/AsyncWriter+AsyncReader.swift b/Sources/AsyncStreaming/Writer/AsyncWriter+AsyncReader.swift new file mode 100644 index 0000000..6f90f9a --- /dev/null +++ b/Sources/AsyncStreaming/Writer/AsyncWriter+AsyncReader.swift @@ -0,0 +1,52 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP API Proposal open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift HTTP API Proposal project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) +extension AsyncWriter where Self: ~Copyable, Self: ~Escapable { + /// Writes all elements from an async reader to this writer. + /// + /// This method consumes an async reader and writes all its elements to the underlying + /// writer destination. It continuously reads spans of elements from the reader and writes + /// them until the reader stream ends. + /// + /// - Parameter reader: An ``AsyncReader`` providing elements to write. The reader is + /// consumed by this operation. + /// + /// - Throws: An `EitherError` containing either a `ReadFailure` from the reader or a nested + /// `EitherError` from the write operation. + /// + /// ## Example + /// + /// ```swift + /// var fileWriter: FileAsyncWriter = ... + /// let dataReader: DataAsyncReader = ... + /// + /// // Copy all data from reader to writer + /// try await fileWriter.write(dataReader) + /// ``` + /// + /// ## Discussion + /// + /// This method provides a convenient way to pipe data from one async stream to another, + /// automatically handling the iteration and transfer of elements. The operation continues + /// until the reader signals completion by producing an empty span. + @_lifetime(self: copy self) + public mutating func write( + _ reader: consuming some (AsyncReader & ~Copyable & ~Escapable) + ) async throws(EitherError>) where WriteElement: Copyable { + try await reader.forEach { (span) throws(EitherError) -> Void in + try await self.write(span) + } + } +} diff --git a/Sources/AsyncStreaming/Writer/AsyncWriter.swift b/Sources/AsyncStreaming/Writer/AsyncWriter.swift new file mode 100644 index 0000000..300e76a --- /dev/null +++ b/Sources/AsyncStreaming/Writer/AsyncWriter.swift @@ -0,0 +1,151 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP API Proposal open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift HTTP API Proposal project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// A protocol that represents an asynchronous writer capable of providing a buffer to write into. +/// +/// ``AsyncWriter`` defines an interface for types that can asynchronously write elements +/// to a destination by providing an output span buffer for efficient batch writing operations. +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) +public protocol AsyncWriter: ~Copyable, ~Escapable { + /// The type of elements that can be written by this writer. + associatedtype WriteElement: ~Copyable + + /// The type of error that can be thrown during writing operations. + associatedtype WriteFailure: Error + + /// Provides a buffer to write elements into. + /// + /// This method supplies an output span that the body closure can use to append elements + /// for writing. The writer manages the buffer allocation and handles the actual writing + /// operation once the body closure completes. + /// + /// - Parameter body: A closure that receives an `OutputSpan` for appending elements + /// to write. The closure can return a result of type `Result`. + /// + /// - Returns: The value returned by the body closure. + /// + /// - Throws: An `EitherError` containing either a `WriteFailure` from the write operation + /// or a `Failure` from the body closure. + /// + /// ## Example + /// + /// ```swift + /// var writer: SomeAsyncWriter = ... + /// + /// try await writer.write { outputSpan in + /// for item in items { + /// outputSpan.append(item) + /// } + /// return outputSpan.count + /// } + /// ``` + // TODO: EOF should be signaled by providing an empty output span? + @_lifetime(self: copy self) + mutating func write( + _ body: (inout OutputSpan) async throws(Failure) -> Result + ) async throws(EitherError) -> Result + + /// Writes a span of elements to the underlying destination. + /// + /// This method asynchronously writes all elements from the provided span to whatever destination + /// the writer represents. The operation may require multiple write calls to complete if the + /// writer cannot accept all elements at once. + /// + /// - Parameter span: The span of elements to write. + /// + /// - Throws: An `EitherError` containing either a `WriteFailure` from the write operation + /// or an `AsyncWriterWroteShortError` if the writer cannot accept any more data before + /// all elements are written. + /// + /// ## Example + /// + /// ```swift + /// var fileWriter: FileAsyncWriter = ... + /// let dataBuffer: [UInt8] = [1, 2, 3, 4, 5] + /// + /// // Write the entire span to a file asynchronously + /// try await fileWriter.write(dataBuffer.span) + /// ``` + @_lifetime(self: copy self) + mutating func write( + _ span: Span + ) async throws(EitherError) +} + +/// An error that indicates the writer was unable to accept all provided elements. +/// +/// This error is thrown when an async writer signals that it cannot accept any more data +/// by providing an empty output span, but there are still elements remaining to be written. +public struct AsyncWriterWroteShortError: Error { + public init() {} +} + +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) +extension AsyncWriter where Self: ~Copyable, Self: ~Escapable { + /// Writes the provided element to the underlying destination. + /// + /// This method asynchronously writes the given element to whatever destination the writer + /// represents. The operation may complete immediately or may await resources or processing time. + /// + /// - Parameter element: The element to write. This typically represents a single item or a collection + /// of items depending on the specific writer implementation. + /// + /// - Throws: An error of type `WriteFailure` if the write operation cannot be completed successfully. + /// + /// - Note: This method is marked as `mutating` because writing operations often change the internal + /// state of the writer. + /// + /// ```swift + /// var fileWriter: FileAsyncWriter = ... + /// + /// // Write data to a file asynchronously + /// try await fileWriter.write(dataChunk) + /// ``` + @_lifetime(self: copy self) + public mutating func write(_ element: consuming WriteElement) async throws(WriteFailure) { + // Since the element is ~Copyable but we don't have call-once closures + // we need to move it into an Optional and then take it out once. This + // also makes the below force unwrap safe + var opt = Optional(element) + do { + try await self.write { outputSpan in + outputSpan.append(opt.take()!) + } + } catch { + switch error { + case .first(let error): + throw error + case .second: + fatalError() + } + } + } + + @_lifetime(self: copy self) + public mutating func write(_ span: Span) async throws(EitherError) + where WriteElement: Copyable { + var index = span.indices.startIndex + while index < span.indices.endIndex { + try await self.write { (outputSpan) throws(AsyncWriterWroteShortError) -> Void in + guard outputSpan.capacity != 0 else { + throw AsyncWriterWroteShortError() + } + while !outputSpan.isFull && index < span.indices.endIndex { + outputSpan.append(span[index]) + index += 1 + } + } + } + } +} diff --git a/Sources/HTTPServer/AsyncPrimitives/ConcludingAsyncWriter.swift b/Sources/AsyncStreaming/Writer/ConcludingAsyncWriter.swift similarity index 65% rename from Sources/HTTPServer/AsyncPrimitives/ConcludingAsyncWriter.swift rename to Sources/AsyncStreaming/Writer/ConcludingAsyncWriter.swift index c92d880..b3fd5dd 100644 --- a/Sources/HTTPServer/AsyncPrimitives/ConcludingAsyncWriter.swift +++ b/Sources/AsyncStreaming/Writer/ConcludingAsyncWriter.swift @@ -1,12 +1,27 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP API Proposal open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift HTTP API Proposal project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + /// A protocol that represents an asynchronous writer that produces a final value upon completion. /// /// ``ConcludingAsyncWriter`` adds functionality to asynchronous writers that need to /// provide a conclusive element after writing is complete. This is particularly useful /// for streams that have meaningful completion states, such as HTTP response that need /// to finalize with optional trailers. -public protocol ConcludingAsyncWriter: ~Copyable { +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) +public protocol ConcludingAsyncWriter: ~Copyable, ~Escapable { /// The underlying asynchronous writer type. - associatedtype Underlying: AsyncWriter, ~Copyable + associatedtype Underlying: AsyncWriter, ~Copyable, ~Escapable /// The type of the final element produced after writing is complete. associatedtype FinalElement @@ -33,6 +48,7 @@ public protocol ConcludingAsyncWriter: ~Copyable { ) async throws -> Return } +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) extension ConcludingAsyncWriter where Self: ~Copyable { /// Produces a final element using the underlying async writer without returning a separate value. /// @@ -64,6 +80,7 @@ extension ConcludingAsyncWriter where Self: ~Copyable { } } +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) extension ConcludingAsyncWriter where Self: ~Copyable { /// Writes a single element to the underlying writer and concludes with a final element. /// @@ -86,7 +103,7 @@ extension ConcludingAsyncWriter where Self: ~Copyable { /// ) /// ``` public consuming func writeAndConclude( - element: consuming Underlying.WriteElement, + _ element: consuming Underlying.WriteElement, finalElement: FinalElement ) async throws { var element = Optional.some(element) @@ -96,4 +113,35 @@ extension ConcludingAsyncWriter where Self: ~Copyable { return finalElement } } + + /// Writes a span of elements to the underlying writer and concludes with a final element. + /// + /// This is a convenience method for scenarios where you need to write multiple elements + /// from a span and then conclude the writing operation with a final element. It provides a + /// streamlined interface for batch write operations. + /// + /// - Parameter span: The span of elements to write to the underlying writer. + /// - Parameter finalElement: The final element to produce after writing is complete. + /// + /// - Throws: Any error encountered while writing the elements or during the concluding operation. + /// + /// ```swift + /// let responseWriter: HTTPResponseWriter = ... + /// + /// // Write multiple response chunks and conclude with headers + /// try await responseWriter.writeAndConclude( + /// dataSpan, + /// finalElement: responseHeaders + /// ) + /// ``` + public consuming func writeAndConclude( + _ span: consuming Span, + finalElement: FinalElement + ) async throws where Underlying.WriteElement: Copyable { + try await self.produceAndConclude { writer in + var writer = writer + try await writer.write(span) + return finalElement + } + } } diff --git a/Sources/AsyncStreaming/Writer/RigidArray+AsyncWriter.swift b/Sources/AsyncStreaming/Writer/RigidArray+AsyncWriter.swift new file mode 100644 index 0000000..a566483 --- /dev/null +++ b/Sources/AsyncStreaming/Writer/RigidArray+AsyncWriter.swift @@ -0,0 +1,58 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift HTTP API Proposal open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift HTTP API Proposal project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift HTTP API Proposal project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +public import BasicContainers + +/// Conforms `RigidArray` to the ``AsyncWriter`` protocol. +/// +/// This extension enables `RigidArray` to be used as an asynchronous writer, allowing +/// elements to be appended through the async writer interface. +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) +extension RigidArray: AsyncWriter { + /// Provides a buffer to write elements into the rigid array. + /// + /// This method allocates space for elements in the array and provides an `OutputSpan` + /// that the body closure can use to append elements. The method appends up to the + /// specified count of elements to the array. + /// + /// - Parameter body: A closure that receives an `OutputSpan` to write elements into. + /// + /// - Returns: The value returned by the body closure. + /// + /// - Throws: Any error thrown by the body closure. + /// + /// ## Example + /// + /// ```swift + /// var array = RigidArray() + /// + /// try await array.write { outputSpan in + /// for i in 0..<5 { + /// outputSpan.append(i) + /// } + /// } + /// ``` + public mutating func write( + _ body: nonisolated(nonsending) (inout OutputSpan) async throws(Failure) -> Result + ) async throws(EitherError) -> Result { + do { + // TODO: Reconsider adding count to AsyncWriter + return try await self.append(count: 10) { (outputSpan) async throws(Failure) -> Result in + try await body(&outputSpan) + } + } catch { + throw .second(error) + } + } +} diff --git a/Sources/Example/Example.swift b/Sources/Example/Example.swift index 036e87f..0e87f9c 100644 --- a/Sources/Example/Example.swift +++ b/Sources/Example/Example.swift @@ -19,6 +19,7 @@ import Instrumentation import Logging import Middleware import X509 +internal import AsyncStreaming @main @available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) @@ -61,7 +62,7 @@ struct Example { try await server.serve { request, requestContext, requestBodyAndTrailers, responseSender in let writer = try await responseSender.send(HTTPResponse(status: .ok)) - try await writer.writeAndConclude(element: "Well, hello!".utf8.span, finalElement: nil) + try await writer.writeAndConclude("Well, hello!".utf8.span, finalElement: nil) } } } diff --git a/Sources/Example/Middlewares/HTTPRequestLoggingMiddleware.swift b/Sources/Example/Middlewares/HTTPRequestLoggingMiddleware.swift index d90ae4b..6a4f83b 100644 --- a/Sources/Example/Middlewares/HTTPRequestLoggingMiddleware.swift +++ b/Sources/Example/Middlewares/HTTPRequestLoggingMiddleware.swift @@ -2,6 +2,8 @@ import HTTPServer import HTTPTypes import Logging import Middleware +import AsyncStreaming +import BasicContainers @available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) struct HTTPRequestLoggingMiddleware< @@ -9,9 +11,9 @@ struct HTTPRequestLoggingMiddleware< ResponseConcludingAsyncWriter: ConcludingAsyncWriter & ~Copyable >: Middleware where - RequestConcludingAsyncReader.Underlying.ReadElement == Span, + RequestConcludingAsyncReader.Underlying.ReadElement == UInt8, RequestConcludingAsyncReader.FinalElement == HTTPFields?, - ResponseConcludingAsyncWriter.Underlying.WriteElement == Span, + ResponseConcludingAsyncWriter.Underlying.WriteElement == UInt8, ResponseConcludingAsyncWriter.FinalElement == HTTPFields? { typealias Input = RequestResponseMiddlewareBox @@ -75,30 +77,34 @@ struct HTTPRequestLoggingConcludingAsyncReader< Base: ConcludingAsyncReader & ~Copyable >: ConcludingAsyncReader, ~Copyable where - Base.Underlying.ReadElement == Span, + Base.Underlying.ReadElement == UInt8, Base.FinalElement == HTTPFields? { typealias Underlying = RequestBodyAsyncReader typealias FinalElement = HTTPFields? - struct RequestBodyAsyncReader: AsyncReader, ~Copyable { - typealias ReadElement = Span - typealias ReadFailure = any Error + struct RequestBodyAsyncReader: AsyncReader, ~Copyable, ~Escapable { + typealias ReadElement = Base.Underlying.ReadElement + typealias ReadFailure = Base.Underlying.ReadFailure private var underlying: Base.Underlying private let logger: Logger + @_lifetime(copy underlying) init(underlying: consuming Base.Underlying, logger: Logger) { self.underlying = underlying self.logger = logger } - mutating func read( - body: (consuming Span?) async throws -> Return - ) async throws -> Return { - let logger = self.logger - return try await self.underlying.read { span in - logger.info("Received next chunk \(span?.count ?? 0)") + #if compiler(<6.3) + @_lifetime(&self) + #endif + mutating func read( + maximumCount: Int?, + body: nonisolated(nonsending) (consuming Span) async throws(Failure) -> Return + ) async throws(EitherError) -> Return { + return try await self.underlying.read(maximumCount: maximumCount) { span throws(Failure) in + logger.info("Received next chunk \(span.count)") return try await body(span) } } @@ -112,10 +118,10 @@ where self.logger = logger } - consuming func consumeAndConclude( - body: (consuming sending Underlying) async throws -> Return - ) async throws -> (Return, FinalElement) { - let (result, trailers) = try await self.base.consumeAndConclude { [logger] reader in + consuming func consumeAndConclude( + body: nonisolated(nonsending) (consuming sending Underlying) async throws(Failure) -> Return + ) async throws(Failure) -> (Return, FinalElement) { + let (result, trailers) = try await self.base.consumeAndConclude { reader throws(Failure) in let wrappedReader = RequestBodyAsyncReader( underlying: reader, logger: logger @@ -138,27 +144,41 @@ struct HTTPResponseLoggingConcludingAsyncWriter< Base: ConcludingAsyncWriter & ~Copyable >: ConcludingAsyncWriter, ~Copyable where - Base.Underlying.WriteElement == Span, + Base.Underlying.WriteElement == UInt8, Base.FinalElement == HTTPFields? { typealias Underlying = ResponseBodyAsyncWriter typealias FinalElement = HTTPFields? - struct ResponseBodyAsyncWriter: AsyncWriter, ~Copyable { - typealias WriteElement = Span - typealias WriteFailure = any Error + struct ResponseBodyAsyncWriter: AsyncWriter, ~Copyable, ~Escapable { + typealias WriteElement = Base.Underlying.WriteElement + typealias WriteFailure = Base.Underlying.WriteFailure private var underlying: Base.Underlying private let logger: Logger + @_lifetime(copy underlying) init(underlying: consuming Base.Underlying, logger: Logger) { self.underlying = underlying self.logger = logger } - mutating func write(_ elements: consuming Span) async throws(any Error) { - logger.info("Wrote next chunk \(elements.count)") - try await self.underlying.write(elements) + @_lifetime(self: copy self) + mutating func write( + _ body: (inout OutputSpan) async throws(Failure) -> Result + ) async throws(EitherError) -> Result { + try await self.underlying.write { span throws(Failure) in + self.logger.info("Wrote next chunk \(span.count)") + return try await body(&span) + } + } + + @_lifetime(self: copy self) + mutating func write( + _ span: Span + ) async throws(EitherError) { + self.logger.info("Wrote next chunk") + try await self.underlying.write(span) } } diff --git a/Sources/Example/Middlewares/RouteHandlerMiddleware.swift b/Sources/Example/Middlewares/RouteHandlerMiddleware.swift index a6a62bb..db546f9 100644 --- a/Sources/Example/Middlewares/RouteHandlerMiddleware.swift +++ b/Sources/Example/Middlewares/RouteHandlerMiddleware.swift @@ -1,22 +1,23 @@ -import HTTPServer -import HTTPTypes -import Middleware +public import HTTPTypes +public import AsyncStreaming +public import Middleware +public import HTTPServer @available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) -struct RouteHandlerMiddleware< +public struct RouteHandlerMiddleware< RequestConcludingAsyncReader: ConcludingAsyncReader & ~Copyable, ResponseConcludingAsyncWriter: ConcludingAsyncWriter & ~Copyable, >: Middleware, Sendable where - RequestConcludingAsyncReader.Underlying: AsyncReader, any Error>, + RequestConcludingAsyncReader.Underlying: AsyncReader, RequestConcludingAsyncReader.FinalElement == HTTPFields?, - ResponseConcludingAsyncWriter.Underlying: AsyncWriter, any Error>, + ResponseConcludingAsyncWriter.Underlying: AsyncWriter, ResponseConcludingAsyncWriter.FinalElement == HTTPFields? { - typealias Input = RequestResponseMiddlewareBox - typealias NextInput = Never + public typealias Input = RequestResponseMiddlewareBox + public typealias NextInput = Never - func intercept( + public func intercept( input: consuming Input, next: (consuming NextInput) async throws -> Void ) async throws { @@ -27,16 +28,9 @@ where var responseBodyAsyncWriter = responseBodyAsyncWriter if let reader = maybeReader.take() { _ = try await reader.consumeAndConclude { bodyAsyncReader in - var shouldContinue = true var bodyAsyncReader = bodyAsyncReader - while shouldContinue { - try await bodyAsyncReader.read { span in - guard let span else { - shouldContinue = false - return - } - try await responseBodyAsyncWriter.write(span) - } + try await bodyAsyncReader.read(maximumCount: nil) { span in + try await responseBodyAsyncWriter.write(span) } } return HTTPFields(dictionaryLiteral: (HTTPField.Name.acceptEncoding, "encoding")) diff --git a/Sources/HTTPServer/AsyncPrimitives/AsyncReader.swift b/Sources/HTTPServer/AsyncPrimitives/AsyncReader.swift deleted file mode 100644 index 3a83fe5..0000000 --- a/Sources/HTTPServer/AsyncPrimitives/AsyncReader.swift +++ /dev/null @@ -1,165 +0,0 @@ -/// A protocol that represents an asynchronous reader capable of reading elements from some source. -/// -/// ``AsyncReader`` defines an interface for types that can asynchronously read elements -/// of a specified type from a source. -@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) -public protocol AsyncReader: ~Copyable { - /// The type of elements that can be read by this reader. - associatedtype ReadElement: ~Copyable, ~Escapable - - /// The type of error that can be thrown during reading operations. - associatedtype ReadFailure: Error - - /// Reads an element from the underlying source and processes it with the provided body function. - /// - /// This method asynchronously reads an element from whatever source the reader - /// represents, then passes it to the provided body function. The operation may complete immediately - /// or may await resources or processing time. - /// - /// - Parameter body: A function that consumes the read element and performs some operation - /// on it, returning a value of type `Return`. When the element is `nil`, it indicates that - /// this is a terminal value, signaling the end of the reading operation or stream. - /// - /// - Returns: The value returned by the body function after processing the read element. - /// - /// - Throws: An error of type `ReadFailure` if the read operation cannot be completed successfully. - /// - /// - Note: This method is marked as `mutating` because reading operations often change the internal - /// state of the reader. - /// - /// ```swift - /// var fileReader: FileAsyncReader = ... - /// - /// // Read data from a file asynchronously and process it - /// let result = try await fileReader.read { data in - /// guard let data else { - /// // Handle end of stream/terminal value - /// return finalProcessedValue - /// } - /// // Process the non-nil data - /// return processedValue - /// } - /// ``` - mutating func read( - body: (consuming ReadElement?) async throws -> Return - ) async throws(ReadFailure) -> Return -} - -@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) -extension AsyncReader where Self: ~Copyable { - /// Collects elements from the reader up to a specified limit and processes them with a body function. - /// - /// This method continuously reads elements from the async reader, accumulating them in a buffer - /// until either the end of the stream is reached (indicated by a `nil` element) or the specified - /// limit is exceeded. Once collection is complete, the accumulated elements are passed to the - /// provided body function as a `Span` for processing. - /// - /// - Parameters: - /// - limit: The maximum number of elements to collect before throwing a `LimitExceeded` error. - /// This prevents unbounded memory growth when reading from potentially infinite streams. - /// - body: A closure that receives a `Span` containing all collected elements and returns - /// a result of type `Result`. This closure is called once after all elements have been - /// collected successfully. - /// - /// - Returns: The value returned by the body closure after processing the collected elements. - /// - /// - Throws: - /// - `LimitExceeded` if the number of elements exceeds the specified limit before the stream ends. - /// - Any error thrown by the underlying read operations or the body closure. - /// - /// ## Example - /// - /// ```swift - /// var reader: SomeAsyncReader = ... - /// - /// let processedData = try await reader.collect(upTo: 1000) { span in - /// // Process all collected elements - /// } - /// ``` - /// - /// ## Memory Considerations - /// - /// Since this method buffers all elements in memory before processing, it should be used - /// with caution on large datasets. The `limit` parameter serves as a safety mechanism - /// to prevent excessive memory usage. - public mutating func collect( - upTo limit: Int, - body: (Span) async throws -> Result - ) async throws -> Result where ReadElement: Copyable { - var buffer = [ReadElement]() - var shouldContinue = true - while shouldContinue { - try await self.read { element in - guard let element else { - shouldContinue = false - return - } - guard buffer.count < limit else { - throw LimitExceeded() - } - buffer.append(element) - } - } - return try await body(buffer.span) - } - - /// Collects elements from the reader up to a specified limit and processes them with a body function. - /// - /// This method continuously reads elements from the async reader, accumulating them in a buffer - /// until either the end of the stream is reached (indicated by a `nil` element) or the specified - /// limit is exceeded. Once collection is complete, the accumulated elements are passed to the - /// provided body function as a `Span` for processing. - /// - /// - Parameters: - /// - limit: The maximum number of elements to collect before throwing a `LimitExceeded` error. - /// This prevents unbounded memory growth when reading from potentially infinite streams. - /// - body: A closure that receives a `Span` containing all collected elements and returns - /// a result of type `Result`. This closure is called once after all elements have been - /// collected successfully. - /// - /// - Returns: The value returned by the body closure after processing the collected elements. - /// - /// - Throws: - /// - `LimitExceeded` if the number of elements exceeds the specified limit before the stream ends. - /// - Any error thrown by the underlying read operations or the body closure. - /// - /// ## Example - /// - /// ```swift - /// var reader: SomeAsyncReader = ... - /// - /// let processedData = try await reader.collect(upTo: 1000) { span in - /// // Process all collected elements - /// } - /// ``` - /// - /// ## Memory Considerations - /// - /// Since this method buffers all elements in memory before processing, it should be used - /// with caution on large datasets. The `limit` parameter serves as a safety mechanism - /// to prevent excessive memory usage. - public mutating func collect( - upTo limit: Int, - body: (Span) async throws -> Result - ) async throws -> Result where ReadElement == Span { - var buffer = [Element]() - var shouldContinue = true - while shouldContinue { - try await self.read { span in - guard let span else { - shouldContinue = false - return - } - guard (buffer.count + span.count) < limit else { - throw LimitExceeded() - } - - buffer.reserveCapacity(span.count) - for index in span.indices { - buffer.append(span[index]) - } - } - } - return try await body(buffer.span) - } -} diff --git a/Sources/HTTPServer/AsyncPrimitives/AsyncWriter.swift b/Sources/HTTPServer/AsyncPrimitives/AsyncWriter.swift deleted file mode 100644 index 8e734e2..0000000 --- a/Sources/HTTPServer/AsyncPrimitives/AsyncWriter.swift +++ /dev/null @@ -1,32 +0,0 @@ -/// A protocol that represents an asynchronous writer capable of writing elements to some destination. -/// -/// ``AsyncWriter`` defines an interface for types that can asynchronously write elements -/// of a specified type to a destination. -public protocol AsyncWriter: ~Copyable { - /// The type of elements that can be written by this writer. - associatedtype WriteElement: ~Copyable, ~Escapable - - /// The type of error that can be thrown during writing operations. - associatedtype WriteFailure: Error - - /// Writes the provided element to the underlying destination. - /// - /// This method asynchronously writes the given element to whatever destination the writer - /// represents. The operation may complete immediately or may await resources or processing time. - /// - /// - Parameter element: The element to write. This typically represents a single item or a collection - /// of items depending on the specific writer implementation. - /// - /// - Throws: An error of type `WriteFailure` if the write operation cannot be completed successfully. - /// - /// - Note: This method is marked as `mutating` because writing operations often change the internal - /// state of the writer. - /// - /// ```swift - /// var fileWriter: FileAsyncWriter = ... - /// - /// // Write data to a file asynchronously - /// try await fileWriter.write(dataChunk) - /// ``` - mutating func write(_ element: consuming WriteElement) async throws(WriteFailure) -} diff --git a/Sources/HTTPServer/AsyncPrimitives/ConcludingAsyncReader.swift b/Sources/HTTPServer/AsyncPrimitives/ConcludingAsyncReader.swift deleted file mode 100644 index 549c132..0000000 --- a/Sources/HTTPServer/AsyncPrimitives/ConcludingAsyncReader.swift +++ /dev/null @@ -1,144 +0,0 @@ -/// A protocol that represents an asynchronous reader that produces elements and concludes with a final value. -/// -/// ``ConcludingAsyncReader`` adds functionality to asynchronous readers that need to -/// provide a conclusive element after all reads are completed. This is particularly useful -/// for streams that have meaningful completion states beyond just terminating, such as -/// HTTP responses that include headers after the body is fully read. -@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) -public protocol ConcludingAsyncReader: ~Copyable { - /// The underlying asynchronous reader type that produces elements. - associatedtype Underlying: AsyncReader, ~Copyable, ~Escapable - - /// The type of the final element produced after all reads are completed. - associatedtype FinalElement - - /// Processes the underlying async reader until completion and returns both the result of processing - /// and a final element. - /// - /// - Parameter body: A closure that takes the underlying `AsyncReader` and returns a value. - /// - Returns: A tuple containing the value returned by the body closure and the final element. - /// - Throws: Any error thrown by the body closure or encountered while processing the reader. - /// - /// - Note: This method consumes the concluding async reader, meaning it can only be called once on a value type. - /// - /// ```swift - /// let responseReader: HTTPResponseReader = ... - /// - /// // Process the body while capturing the final response status - /// let (bodyData, statusCode) = try await responseReader.consumeAndConclude { reader in - /// var collectedData = Data() - /// while let chunk = try await reader.read(body: { $0 }) { - /// collectedData.append(chunk) - /// } - /// return collectedData - /// } - /// ``` - consuming func consumeAndConclude( - body: (consuming sending Underlying) async throws -> Return - ) async throws -> (Return, FinalElement) -} - -@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) -extension ConcludingAsyncReader where Self: ~Copyable { - /// Processes the underlying async reader until completion and returns only the final element. - /// - /// This is a convenience method when the body's return value is `Void` and only returns the final element. - /// - /// - Parameter body: A closure that takes the underlying `AsyncReader`. - /// - Returns: The final element produced after all reads are completed. - /// - Throws: Any error thrown by the body closure or encountered while processing the reader. - /// - /// - Note: This method consumes the concluding async reader, meaning it can only be called once on a value type. - /// - /// ```swift - /// let responseReader: HTTPResponseReader = ... - /// - /// // Process the body but only capture the final response status - /// let statusCode = try await responseReader.consumeAndConclude { reader in - /// while let chunk = try await reader.read(body: { $0 }) { - /// // Process chunks but don't collect them - /// print("Received chunk of size: \(chunk.count)") - /// } - /// } - /// ``` - public consuming func consumeAndConclude( - body: (consuming sending Underlying) async throws -> Void - ) async throws -> FinalElement { - let (_, finalElement) = try await self.consumeAndConclude { reader in - try await body(reader) - } - return finalElement - } - - /// Collects elements from the underlying async reader and returns both the processed result and final element. - /// - /// This method provides a convenient way to collect elements from the underlying reader while - /// capturing both the processing result and the final element that concludes the reading operation. - /// It combines the functionality of ``AsyncReader/collect(upTo:body:)-(_,(Span) -> Result)`` from ``AsyncReader`` with the concluding - /// behavior of ``ConcludingAsyncReader``. - /// - /// - Parameter limit: The maximum number of elements to collect before throwing a `LimitExceeded` error. - /// - Parameter body: A closure that processes the collected elements as a `Span` and returns a result. - /// - /// - Returns: A tuple containing the result from processing the collected elements and the final element. - /// - /// - Throws: - /// - `LimitExceeded` if the number of elements exceeds the specified limit. - /// - Any error thrown by the body closure or the underlying read operations. - /// - /// ```swift - /// let responseReader: HTTPConcludingReader = ... - /// - /// // Collect response data and get final headers - /// let (processedData, finalHeaders) = try await responseReader.collect(upTo: 1024 * 1024) { span in - /// // Process all collected elements - /// } - /// ``` - public consuming func collect( - upTo limit: Int, - body: (Span) async throws -> Result - ) async throws -> (Result, FinalElement) where Underlying.ReadElement: Copyable { - try await self.consumeAndConclude { reader in - var reader = reader - return try await reader.collect(upTo: limit) { span in - try await body(span) - } - } - } - - /// Collects elements from the underlying async reader and returns both the processed result and final element. - /// - /// This method provides a convenient way to collect elements from the underlying reader while - /// capturing both the processing result and the final element that concludes the reading operation. - /// It combines the functionality of ``AsyncReader/collect(upTo:body:)-(_,(Span) -> Result)`` from ``AsyncReader`` with the concluding - /// behavior of ``ConcludingAsyncReader``. - /// - /// - Parameter limit: The maximum number of elements to collect before throwing a `LimitExceeded` error. - /// - Parameter body: A closure that processes the collected elements as a `Span` and returns a result. - /// - /// - Returns: A tuple containing the result from processing the collected elements and the final element. - /// - /// - Throws: - /// - `LimitExceeded` if the number of elements exceeds the specified limit. - /// - Any error thrown by the body closure or the underlying read operations. - /// - /// ```swift - /// let responseReader: HTTPConcludingReader = ... - /// - /// // Collect response data and get final headers - /// let (processedData, finalHeaders) = try await responseReader.collect(upTo: 1024 * 1024) { span in - /// // Process all collected elements - /// } - /// ``` - public consuming func collect( - upTo limit: Int, - body: (Span) async throws -> Result - ) async throws -> (Result, FinalElement) where Underlying.ReadElement == Span { - try await self.consumeAndConclude { reader in - var reader = reader - return try await reader.collect(upTo: limit) { span in - try await body(span) - } - } - } -} diff --git a/Sources/HTTPServer/AsyncPrimitives/LimitExceededError.swift b/Sources/HTTPServer/AsyncPrimitives/LimitExceededError.swift deleted file mode 100644 index 99b208b..0000000 --- a/Sources/HTTPServer/AsyncPrimitives/LimitExceededError.swift +++ /dev/null @@ -1,3 +0,0 @@ -struct LimitExceeded: Error { - init() {} -} diff --git a/Sources/HTTPServer/HTTPRequestConcludingAsyncReader.swift b/Sources/HTTPServer/HTTPRequestConcludingAsyncReader.swift index db557fe..5de42e6 100644 --- a/Sources/HTTPServer/HTTPRequestConcludingAsyncReader.swift +++ b/Sources/HTTPServer/HTTPRequestConcludingAsyncReader.swift @@ -12,6 +12,7 @@ //===----------------------------------------------------------------------===// public import HTTPTypes +public import AsyncStreaming import NIOCore import NIOHTTPTypes import Synchronization @@ -30,8 +31,8 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable /// This reader processes the body parts of an HTTP request and provides them as spans of bytes, /// while also capturing any trailer fields received at the end of the request. public struct RequestBodyAsyncReader: AsyncReader, ~Copyable { - /// The type of elements this reader provides (byte spans representing body chunks). - public typealias ReadElement = Span + /// The type of elements this reader provides. + public typealias ReadElement = UInt8 /// The type of errors that can occur during reading operations. public typealias ReadFailure = any Error @@ -59,23 +60,34 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable /// and returns a value of type `Return`. /// - Returns: The value returned by the body function after processing the read element. /// - Throws: An error if the reading operation fails. - public mutating func read( - body: (consuming ReadElement?) async throws -> Return - ) async throws(ReadFailure) -> Return { - switch try await self.iterator.next(isolation: #isolation) { - case .head: - fatalError() - case .body(let element): - // TODO: Add ByteBuffer span interfaces - return try await body(Array(buffer: element).span) - case .end(let trailers): - self.state.wrapped.withLock { state in - state.trailers = trailers - state.finishedReading = true + public mutating func read( + maximumCount: Int?, + body: nonisolated(nonsending) (consuming Span) async throws(Failure) -> Return + ) async throws(EitherError) -> Return { + let requestPart: HTTPRequestPart? + do { + requestPart = try await self.iterator.next(isolation: #isolation) + } catch { + throw .first(error) + } + + do { + switch requestPart { + case .head: + fatalError() + case .body(let element): + return try await body(Array(buffer: element).span) + case .end(let trailers): + self.state.wrapped.withLock { state in + state.trailers = trailers + state.finishedReading = true + } + return try await body(.init()) + case .none: + return try await body(.init()) } - return try await body(nil) - case .none: - return try await body(nil) + } catch { + throw .second(error) } } } @@ -142,9 +154,9 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable /// return collectedData /// } /// ``` - public consuming func consumeAndConclude( - body: (consuming sending RequestBodyAsyncReader) async throws -> Return - ) async throws -> (Return, HTTPFields?) { + public consuming func consumeAndConclude( + body: nonisolated(nonsending) (consuming sending RequestBodyAsyncReader) async throws(Failure) -> Return + ) async throws(Failure) -> (Return, HTTPFields?) { if let iterator = self.iterator.sendingTake() { let partsReader = RequestBodyAsyncReader(iterator: iterator, readerState: self.state) let result = try await body(partsReader) diff --git a/Sources/HTTPServer/HTTPResponseConcludingAsyncWriter.swift b/Sources/HTTPServer/HTTPResponseConcludingAsyncWriter.swift index e055fdf..073626b 100644 --- a/Sources/HTTPServer/HTTPResponseConcludingAsyncWriter.swift +++ b/Sources/HTTPServer/HTTPResponseConcludingAsyncWriter.swift @@ -12,6 +12,8 @@ //===----------------------------------------------------------------------===// public import HTTPTypes +public import AsyncStreaming +import BasicContainers import NIOCore import NIOHTTPTypes import Synchronization @@ -33,7 +35,7 @@ public struct HTTPResponseConcludingAsyncWriter: ConcludingAsyncWriter, ~Copyabl /// incrementally as spans of bytes. public struct ResponseBodyAsyncWriter: AsyncWriter { /// The type of elements this writer accepts (byte arrays representing body chunks). - public typealias WriteElement = Span + public typealias WriteElement = UInt8 /// The type of errors that can occur during writing operations. public typealias WriteFailure = any Error @@ -52,13 +54,51 @@ public struct HTTPResponseConcludingAsyncWriter: ConcludingAsyncWriter, ~Copyabl /// /// - Parameter element: A span of bytes representing the body chunk to write. /// - Throws: An error if the writing operation fails. - public mutating func write(_ element: consuming Span) async throws(any Error) { - var buffer = ByteBuffer() - buffer.reserveCapacity(element.count) - for index in element.indices { - buffer.writeInteger(element[index]) + public mutating func write( + _ body: nonisolated(nonsending) (inout OutputSpan) async throws(Failure) -> Result + ) async throws(EitherError) -> Result { + var buffer = RigidArray.init(capacity: 1024) + + let result: Result + do { + result = try await buffer.append(count: 1024) { span in + try await body(&span) + } + } catch { + throw .first(error) } - try await self.writer.write(.body(buffer)) + + var byteBuffer = ByteBuffer() + byteBuffer.reserveCapacity(buffer.count) + for index in buffer.indices { + byteBuffer.writeInteger(buffer[index]) + } + +// buffer.span.withUnsafeBufferPointer { buffer in +// <#code#> +// } +// var byteBuffer = ByteBuffer() + + do { + try await self.writer.write(.body(byteBuffer)) + } catch { + throw .first(error) + } + + return result + + +// let pointer = buffer.withUnsafeMutableBufferPointer { $0 } +// var span = OutputSpan( +// buffer: pointer, +// initializedCount: 0 +// ) +// do { +// let bodyResult = try await body(&span) +// +// } catch { +// throw .second(error) +// } } } diff --git a/Sources/HTTPServer/HTTPResponseSender.swift b/Sources/HTTPServer/HTTPResponseSender.swift index 4666098..2ada0e6 100644 --- a/Sources/HTTPServer/HTTPResponseSender.swift +++ b/Sources/HTTPServer/HTTPResponseSender.swift @@ -12,6 +12,7 @@ //===----------------------------------------------------------------------===// public import HTTPTypes +public import AsyncStreaming /// This type ensures that a single non-informational (1xx) `HTTPResponse` is sent back to the client when handling a request. /// @@ -22,6 +23,7 @@ public import HTTPTypes /// /// This forces structure in the response flow, requiring users to send a single response before they can stream a response body and /// trailers using the returned `ResponseWriter`. +@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) public struct HTTPResponseSender: ~Copyable { private let _sendInformational: (HTTPResponse) async throws -> Void private let _send: (HTTPResponse) async throws -> ResponseWriter diff --git a/Sources/HTTPServer/HTTPServerClosureRequestHandler.swift b/Sources/HTTPServer/HTTPServerClosureRequestHandler.swift index a70ec6c..9365699 100644 --- a/Sources/HTTPServer/HTTPServerClosureRequestHandler.swift +++ b/Sources/HTTPServer/HTTPServerClosureRequestHandler.swift @@ -12,6 +12,7 @@ //===----------------------------------------------------------------------===// public import HTTPTypes +public import AsyncStreaming /// A closure-based implementation of ``HTTPServerRequestHandler``. /// @@ -39,9 +40,9 @@ public import HTTPTypes @available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) public struct HTTPServerClosureRequestHandler< ConcludingRequestReader: ConcludingAsyncReader & ~Copyable & SendableMetatype, - RequestReader: AsyncReader, any Error> & ~Copyable, + RequestReader: AsyncReader & ~Copyable & ~Escapable, ConcludingResponseWriter: ConcludingAsyncWriter & ~Copyable & SendableMetatype, - RequestWriter: AsyncWriter, any Error> & ~Copyable + RequestWriter: AsyncWriter & ~Copyable & ~Escapable >: HTTPServerRequestHandler { /// The underlying closure that handles HTTP requests private let _handler: diff --git a/Sources/HTTPServer/HTTPServerProtocol.swift b/Sources/HTTPServer/HTTPServerProtocol.swift index f6a1ba9..05d81be 100644 --- a/Sources/HTTPServer/HTTPServerProtocol.swift +++ b/Sources/HTTPServer/HTTPServerProtocol.swift @@ -12,6 +12,7 @@ //===----------------------------------------------------------------------===// public import HTTPTypes +public import AsyncStreaming @available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *) /// A generic HTTP server protocol that can handle incoming HTTP requests. @@ -20,7 +21,7 @@ public protocol HTTPServerProtocol: Sendable, ~Copyable, ~Escapable { /// must be an optional `HTTPFields`, and ``ConcludingAsyncReader/Underlying`` must use `Span` as its /// `ReadElement`. associatedtype RequestReader: ConcludingAsyncReader & ~Copyable & SendableMetatype - where RequestReader.Underlying.ReadElement == Span, + where RequestReader.Underlying.ReadElement == UInt8, RequestReader.Underlying.ReadFailure == any Error, RequestReader.FinalElement == HTTPFields? @@ -28,7 +29,7 @@ public protocol HTTPServerProtocol: Sendable, ~Copyable, ~Escapable { /// must be an optional `HTTPFields`, and ``ConcludingAsyncWriter/Underlying`` must use `Span` as its /// `WriteElement`. associatedtype ResponseWriter: ConcludingAsyncWriter & ~Copyable & SendableMetatype - where ResponseWriter.Underlying.WriteElement == Span, + where ResponseWriter.Underlying.WriteElement == UInt8, ResponseWriter.Underlying.WriteFailure == any Error, ResponseWriter.FinalElement == HTTPFields? diff --git a/Sources/HTTPServer/HTTPServerRequestHandler.swift b/Sources/HTTPServer/HTTPServerRequestHandler.swift index 0ca6b21..1ba9896 100644 --- a/Sources/HTTPServer/HTTPServerRequestHandler.swift +++ b/Sources/HTTPServer/HTTPServerRequestHandler.swift @@ -12,6 +12,7 @@ //===----------------------------------------------------------------------===// public import HTTPTypes +public import AsyncStreaming /// A protocol that defines the contract for handling HTTP server requests. /// @@ -75,14 +76,14 @@ public protocol HTTPServerRequestHandler: Sendabl /// must be an optional `HTTPFields`, and ``ConcludingAsyncReader/Underlying`` must use `Span` as its /// `ReadElement`. associatedtype RequestReader: ConcludingAsyncReader & ~Copyable & SendableMetatype - where RequestReader.Underlying.ReadElement == Span, + where RequestReader.Underlying.ReadElement == UInt8, RequestReader.FinalElement == HTTPFields? /// The ``ConcludingAsyncWriter`` to use when writing responses. ``ConcludingAsyncWriter/FinalElement`` /// must be an optional `HTTPFields`, and ``ConcludingAsyncWriter/Underlying`` must use `Span` as its /// `WriteElement`. associatedtype ResponseWriter: ConcludingAsyncWriter & ~Copyable & SendableMetatype - where ResponseWriter.Underlying.WriteElement == Span, + where ResponseWriter.Underlying.WriteElement == UInt8, ResponseWriter.FinalElement == HTTPFields? /// Handles an incoming HTTP request and generates a response. diff --git a/Sources/HTTPServer/RequestResponseMiddlewareBox.swift b/Sources/HTTPServer/RequestResponseMiddlewareBox.swift index 28d6f42..b2a757d 100644 --- a/Sources/HTTPServer/RequestResponseMiddlewareBox.swift +++ b/Sources/HTTPServer/RequestResponseMiddlewareBox.swift @@ -1,4 +1,5 @@ public import HTTPTypes +public import AsyncStreaming /// This type holds the values passed to the ``HTTPServerRequestHandler`` when handling a request. /// It is necessary to box them together so that they can be used with `Middlewares`, as this will be the `Middleware.Input`.