Swift AsyncThrowingStream and AsyncStream code examples

Preface

AsyncThrowingStream and AsyncStream are part of the concurrency framework introduced in Swift 5.5 by SE-314[1]. Asynchronous streams allow you to replace existing code that relies on closures or Combine publishers.

Before diving into the details around throwing streams, I recommend you read my article covering async-await if you haven’t already. Most of the code explained in this article will use the API explained there.

What is AsyncThrowingStream?

You can think of an AsyncThrowingStream as a stream of elements that may end by throwing an error. Its values are delivered over time, and the stream is closed by a finish event. That finish event is either a success or, once an error occurs, a failure.

What is AsyncStream?

AsyncStream is similar to the throwing variant, but will never cause an error to be thrown. A non-throwing asynchronous stream completes upon explicit completion or cancellation of the stream.

Note: In this article, we will explain how to use AsyncThrowingStream. The code examples are similar to AsyncStream except for the part where error handling occurs.
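For comparison, here is a minimal sketch of the non-throwing variant. The countdown(from:) and collectCountdown(from:) functions are hypothetical helpers made up for this illustration; the point is only that the loop uses for await without try, because an AsyncStream cannot throw.

```swift
// Hypothetical helper: emits start, start - 1, ..., 1 and then finishes.
func countdown(from start: Int) -> AsyncStream<Int> {
    AsyncStream { continuation in
        for value in stride(from: start, through: 1, by: -1) {
            continuation.yield(value)
        }
        // Without this call the consuming loop below would never end.
        continuation.finish()
    }
}

// Collects every value of the stream. Note the missing `try`:
// a non-throwing stream only needs `for await`.
func collectCountdown(from start: Int) async -> [Int] {
    var values: [Int] = []
    for await value in countdown(from: start) {
        values.append(value)
    }
    return values
}
```

The values yielded before iteration starts are buffered by the stream (the default buffering policy is unbounded), so the consumer still receives all of them.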

How to use AsyncThrowingStream

AsyncThrowingStream can be a great replacement for existing closure-based code like progress and completion handlers. To better understand what I mean, I'll walk you through a scenario we encountered in the WeTransfer application.

In our application, we have an existing closure-based class called FileDownloader:

    import Foundation

    struct FileDownloader {
        enum Status {
            case downloading(Float)
            case finished(Data)
        }

        // The closures are @escaping because a real download calls them
        // after this method has returned.
        func download(_ url: URL, progressHandler: @escaping (Float) -> Void, completion: @escaping (Result<Data, Error>) -> Void) throws {
            // .. Download implementation
        }
    }

A file downloader accepts a URL, reports progress, and completes with a result containing the downloaded data or an error on failure.

In other words, the file downloader reports a stream of status values that describe the current state of the running download. That makes FileDownloader a perfect example of code you can rewrite to use AsyncThrowingStream. A full rewrite would also require changes at every call site, so let's define an overloaded method instead:

    extension FileDownloader {
        func download(_ url: URL) -> AsyncThrowingStream<Status, Error> {
            return AsyncThrowingStream { continuation in
                do {
                    try self.download(url, progressHandler: { progress in
                        continuation.yield(.downloading(progress))
                    }, completion: { result in
                        switch result {
                        case .success(let data):
                            continuation.yield(.finished(data))
                            continuation.finish()
                        case .failure(let error):
                            continuation.finish(throwing: error)
                        }
                    })
                } catch {
                    continuation.finish(throwing: error)
                }
            }
        }
    }

As you can see, we wrapped the download method in an AsyncThrowingStream. We declare Status as the stream's generic element type, which lets us yield status updates to the stream.

Whenever an error occurs, we finish the stream by throwing that error. In the completion handler, we either finish by throwing the error or yield the downloaded data and then finish without throwing:

    switch result {
    case .success(let data):
        continuation.yield(.finished(data))
        continuation.finish()
    case .failure(let error):
        continuation.finish(throwing: error)
    }

It is critical not to forget to call finish() after yielding the last status update. Otherwise, the stream stays alive and the code that iterates over it will never continue.

We can rewrite the above code by using another yield method, accepting a Result enum as a parameter:

    continuation.yield(with: result.map { .finished($0) })
    continuation.finish()

The rewritten code is shorter and removes the switch statement. We have to map our Result enum to match the expected Status value. If we yield a failed result, the stream finishes by throwing the contained error.
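To see how yielding a Result behaves for both outcomes, here is a small self-contained sketch. DemoStatus, DemoError, statusStream(completingWith:), and collect(_:) are hypothetical names made up for this example, and a String stands in for Data so the snippet does not depend on a real download.

```swift
// Hypothetical stand-in for FileDownloader.Status.
enum DemoStatus: Equatable {
    case downloading(Float)
    case finished(String)
}

struct DemoError: Error {}

// Yields one progress update, then completes with the given result.
func statusStream(completingWith result: Result<String, Error>) -> AsyncThrowingStream<DemoStatus, Error> {
    AsyncThrowingStream { continuation in
        continuation.yield(.downloading(0.5))
        // Success yields the mapped value; failure finishes the stream with the error.
        continuation.yield(with: result.map { DemoStatus.finished($0) })
        // Required for the success path; has no further effect after a failure.
        continuation.finish()
    }
}

// Collects everything the stream delivers and reports whether it threw.
func collect(_ stream: AsyncThrowingStream<DemoStatus, Error>) async -> (values: [DemoStatus], failed: Bool) {
    var values: [DemoStatus] = []
    do {
        for try await status in stream {
            values.append(status)
        }
        return (values, false)
    } catch {
        return (values, true)
    }
}
```

In this sketch, the progress update that was yielded before the failure is still delivered to the consumer; the error is thrown only after the buffered values have been consumed.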

AsyncThrowingStream Iteration

Once you have configured your asynchronous throwing stream, you can start iterating over the stream of values. In our FileDownloader example, it will look like this:

    do {
        for try await status in download(url) {
            switch status {
            case .downloading(let progress):
                print("Downloading progress: \(progress)")
            case .finished(let data):
                print("Downloading completed with data: \(data)")
            }
        }
        print("Download finished and stream closed")
    } catch {
        print("Download failed with \(error)")
    }

We handle any status updates, and we can use the catch block to handle any errors that occur. You can iterate using a for-in loop because the stream conforms to AsyncSequence; the same works for AsyncStream, only without the try.

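If you encounter a compilation error like this:

'async' in a function that does not support concurrency

then the loop is running in a synchronous context. Asynchronous iteration has to happen inside an async function or a Task; my async-await article covers this in more detail.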
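One way to get into an asynchronous context from synchronous code is to wrap the loop in a Task. The sketch below assumes a hypothetical numbers() stream and a hypothetical startObserving() function; neither is part of the FileDownloader example.

```swift
// Hypothetical stream that yields three values and finishes.
func numbers() -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream { continuation in
        for value in 1...3 {
            continuation.yield(value)
        }
        continuation.finish()
    }
}

// A synchronous function: writing `for try await` directly in its body
// would not compile, so the loop runs inside a Task instead.
func startObserving() -> Task<Int, Never> {
    return Task {
        var sum = 0
        do {
            for try await value in numbers() {
                sum += value
            }
        } catch {
            print("Stream failed with \(error)")
        }
        return sum
    }
}
```

Returning the Task is a design choice for this sketch: it lets the caller await the result or cancel the work later.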

The print statements in the iteration example above help you understand the life cycle of an AsyncThrowingStream. You can replace them with code that handles progress updates and processes the data to provide visual feedback for your users.

Debugging AsyncStream

If a stream fails to report values, we can debug it by placing breakpoints in the callbacks that yield to the stream. It is also possible that the print statement "Download finished and stream closed" above is never reached, which means the code at the call site never continues past the loop. This is usually the result of an unfinished stream.

To verify this, we can use the onTermination callback:

    func download(_ url: URL) -> AsyncThrowingStream<Status, Error> {
        return AsyncThrowingStream { continuation in

            /// Configure a termination callback to understand the lifecycle of your stream.
            continuation.onTermination = { @Sendable status in
                print("Stream terminated with status \(status)")
            }

            // ..
        }
    }

The callback is called when the stream terminates. If it never fires, your stream is still alive.

If an error occurs, the output may look like this:

 Stream terminated with status finished(Optional(FileDownloader.FileDownloadingError.example))

The above output is only possible when using an AsyncThrowingStream. If it is a normal AsyncStream, the completed output looks like this:

 Stream terminated with status finished

The result of cancellation is the same for both types of streams:

 Stream terminated with status cancelled

You can also use this termination callback to do any cleanup after the stream ends. For example, removing any observers or cleaning up disk space after a file has been downloaded.
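As a rough sketch of such cleanup, the example below records a message from the termination callback. TerminationLog, values(log:), and consume() are hypothetical names invented for this illustration; in real code the callback body would remove an observer or delete a temporary file instead of writing to a log.

```swift
import Foundation

// Hypothetical thread-safe log so the @Sendable callback can record events.
final class TerminationLog: @unchecked Sendable {
    private let lock = NSLock()
    private var entries: [String] = []

    func record(_ entry: String) {
        lock.lock()
        entries.append(entry)
        lock.unlock()
    }

    var snapshot: [String] {
        lock.lock()
        defer { lock.unlock() }
        return entries
    }
}

func values(log: TerminationLog) -> AsyncStream<Int> {
    AsyncStream { continuation in
        // Register the cleanup before yielding anything.
        continuation.onTermination = { @Sendable termination in
            if case .cancelled = termination {
                log.record("cleanup after cancellation")
            } else {
                log.record("cleanup after finish")
            }
        }
        continuation.yield(1)
        continuation.yield(2)
        continuation.finish()
    }
}

// Consumes the stream and returns what was received plus the recorded cleanup.
func consume() async -> (received: [Int], log: [String]) {
    let log = TerminationLog()
    var received: [Int] = []
    for await value in values(log: log) {
        received.append(value)
    }
    return (received, log.snapshot)
}
```

The callback has to be @Sendable, which is why this sketch guards its shared state with a lock rather than mutating a plain captured variable.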

Cancel an AsyncStream

An AsyncStream or AsyncThrowingStream can be cancelled due to the cancellation of an enclosing task. An example could be as follows:

    let task = Task.detached {
        do {
            for try await status in download(url) {
                switch status {
                case .downloading(let progress):
                    print("Downloading progress: \(progress)")
                case .finished(let data):
                    print("Downloading completed with data: \(data)")
                }
            }
        } catch {
            print("Download failed with \(error)")
        }
    }
    task.cancel()

A stream is cancelled when it goes out of scope or when the enclosing task is cancelled. As mentioned before, cancellation will trigger the onTermination callback accordingly.
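To make the effect of task cancellation visible, here is a minimal sketch with a stream that deliberately never finishes. The endless() and cancelDemo() functions are hypothetical and exist only for this illustration; the loop can end only because the enclosing task is cancelled.

```swift
// Hypothetical stream that yields one value and never calls finish().
func endless() -> AsyncStream<Int> {
    AsyncStream { continuation in
        continuation.yield(1)
        // Deliberately no finish(): without cancellation, a consumer would wait forever.
    }
}

// Starts a consumer task, cancels it, and reports whether the loop ended
// while the task was in a cancelled state.
func cancelDemo() async -> Bool {
    let task = Task { () -> Bool in
        for await _ in endless() {
            // Ignore the values; we only care about when the loop ends.
        }
        return Task.isCancelled
    }
    task.cancel()
    return await task.value
}
```

If you remove the task.cancel() call in this sketch, awaiting task.value never returns, which is the unfinished-stream situation described in the debugging section.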

Conclusion

AsyncThrowingStream and AsyncStream are a great way to rewrite existing closure-based code to async-await-enabled alternatives. You can provide a continuous stream of values and finish a stream on success or failure. You can iterate over the values at the call site using a for loop based on the AsyncSequence APIs.

References

[1] SE-314: https://github.com/apple/swift-evolution/blob/main/proposals/0314-async-stream.md
