bearjaw.dev

Doing swifty things 1-bit at a time 🚀

Exploring AsyncStreams – Part 1

I hadn't used AsyncStream much since it was first introduced in Swift 5.5.
With Swift 6 now out in the wild, it felt like enough time had passed to see whether it could complement, or even replace, Combine in my daily dev life.

Small disclaimer

Personally, I like writing simple code. Ideally, it should be testable, easy to read, and easy to reason about. The code examples below are simplified to quickly try out some concepts. This post is mostly about exploring how AsyncStream could fit into the apps I write or work on.

Reactive programming with Combine

Combine offers reactive programming in a very neat way. Quite often, you can simply mark a property with the @Published property wrapper, access its publisher with the $ prefix, and subscribe with sink(receiveValue:):

import Combine
import UIKit

@MainActor
final class BookStore {
    @Published
    private(set) var books: [Book] = []
}

final class ViewController: UIViewController {
    private let store = BookStore()
    private var disposables = Set<AnyCancellable>()
    
    override func viewDidLoad() {
        super.viewDidLoad()
        store.$books.sink { _ in
            // Update the dataSource
        }.store(in: &disposables)
    }
}

Writing reactive code with Combine feels very easy and straightforward. Combine also offers a lot of powerful operators to modify the stream. It even bridges to Swift concurrency: every publisher exposes a values property, an AsyncSequence you can iterate with for await. Plus, the Apple developer community has added even more convenient open-source extensions and custom Publishers.

Streams

AsyncStream can offer an alternative that feels almost like an extension to the Swift language itself, making it a natural companion for Swift's concurrency features.
Let's look at a simple example of observing a stream of values over time:

final class ViewController: UIViewController {
    private let store = BookStore()
    
    override func viewDidLoad() {
        super.viewDidLoad()
        Task { await observeBooks() }
    }
    
    private func observeBooks() async {
        for await books in store.stream {
            // Update snapshot
        }
    }
}

Observing books from the store now reads like the simple for-in loop we already know. No need to write self explicitly anywhere or keep track of a cancellable (yet). Keep in mind, though, that the Task closure still captures self implicitly.

However, the downside is that we had to introduce a Task, which takes away some of that simplicity. In SwiftUI this is much easier, as you can use the .task view modifier.

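The code above, while looking clean, is actually quite dangerous. With AsyncStream, it is important to finish the stream or cancel the task that iterates it.

This makes sense because for await is a suspension point: the task stays suspended there, holding a strong reference to the view controller, until the stream finishes or the task is cancelled. If neither ever happens, the view controller will never be deallocated.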

Thus, we need to adjust our subscription strategy. We'll now resubscribe once the view is added to the view hierarchy and cancel the subscription once the view is removed.

By calling cancel() on the task, we can end the stream. Additionally, you can stop observing once a condition is met by leaving the loop with either return or break.
The difference is:

  • break executes any code following the for await loop.
  • return ends the function immediately.

final class UniverseBookViewController: UIViewController {
    private let store = BookStore()
    private var task: Task<Void, Never>?
    
    override func viewDidAppear(_ animated: Bool) {
        super.viewDidAppear(animated)
        view.backgroundColor = .white
        task = Task { await observeBooks() }
    }
    
    override func viewDidDisappear(_ animated: Bool) {
        super.viewDidDisappear(animated)
        task?.cancel()
    }
    
    private func observeBooks() async {
        for await books in store.stream {
            if let book = books.first(where: { $0.title == "The wonders of the universe" }) {
                // Update the view with our book
                print(book)
                // Leaving the loop with return (or break) also stops observing the stream.
                return
            } else {
                // Keep waiting
                print(books)
            }
        }
    }
}
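
To make the difference between break and return a bit more tangible, here is a minimal sketch that runs outside of UIKit. The names numbers(), firstEvenUsingBreak() and firstEvenUsingReturn() are made up for this illustration, and the stream is finite so the example ends on its own.

```swift
// Hypothetical helper: a finite stream of integers that finishes by itself.
func numbers() -> AsyncStream<Int> {
    AsyncStream { continuation in
        for n in 1...5 { continuation.yield(n) }
        continuation.finish()
    }
}

// `break` leaves only the loop, so the code after the loop still runs.
func firstEvenUsingBreak() async -> [String] {
    var log: [String] = []
    for await n in numbers() {
        if n % 2 == 0 {
            log.append("found \(n)")
            break
        }
    }
    log.append("after loop")
    return log
}

// `return` leaves the whole function, so nothing after the loop runs.
func firstEvenUsingReturn() async -> [String] {
    var log: [String] = []
    for await n in numbers() {
        if n % 2 == 0 {
            log.append("found \(n)")
            return log
        }
    }
    log.append("after loop")
    return log
}
```

In a view controller, that "after loop" spot is where cleanup such as hiding a loading indicator would go if you choose break.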

Another important thing to remember is that finishing a stream doesn't necessarily end the fetching of new data. You'll need to explicitly stop any work you're doing. You can implement an onTermination handler and stop any polling or timers there.

continuation.onTermination = { [weak self] reason in
    self?.stopPolling()
}
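
If you want to see when the handler actually fires, the following sketch might help. It is only an experiment under my own assumptions: terminationReason(cancelConsumer:) is a hypothetical helper, and the second stream exists purely to report the reason back to the caller. It uses makeStream, which requires Swift 5.9.

```swift
// Hypothetical helper: reports why a stream terminated.
func terminationReason(cancelConsumer: Bool) async -> String {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
    // A second stream, used only to hand the reason back to this function.
    let (reasons, reasonsContinuation) = AsyncStream.makeStream(of: String.self)

    continuation.onTermination = { reason in
        // This is the place to stop polling, invalidate timers, and so on.
        if case .cancelled = reason {
            reasonsContinuation.yield("cancelled")
        } else {
            reasonsContinuation.yield("finished")
        }
        reasonsContinuation.finish()
    }

    // The consumer stands in for the view controller's observing task.
    let consumer = Task {
        for await _ in stream {}
    }

    if cancelConsumer {
        consumer.cancel()      // the consuming task goes away
    } else {
        continuation.finish()  // the producer ends the stream
    }

    for await reason in reasons { return reason }
    return "none"
}
```

Cancelling the consuming task reports "cancelled", while calling finish() on the continuation reports "finished". Either way the handler runs once, so it is a reasonable single place for cleanup.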

Another major difference compared to using @Published is that when you subscribe to an AsyncStream, you don't receive any initial values by default.

So let's look at how to create a stream in the first place.

Bring your own stream!

One important thing to keep in mind about AsyncStream is that a stream is meant to have only one subscriber. So, you need to return a new instance for each new subscriber and keep its continuation around, which makes explicitly finishing a stream a bit difficult.

@MainActor
final class BookStore {
    ...

    private var continuations: [AsyncStream<[Book]>.Continuation] = []

    var stream: AsyncStream<[Book]> {
        AsyncStream { continuation in
            self.continuations.append(continuation)
        }
    }
    ...
}

To emit values over time, you work with the Continuation object. In this case, we store the continuation so that we can yield new values when needed.

Swift 5.9 introduced a nice little convenience factory method:

var stream: AsyncStream<[Book]> {
    let container = AsyncStream.makeStream(of: [Book].self)
    self.continuations.append(container.continuation)
    return container.stream
}

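With this handy makeStream method we can get rid of the closure, which improves readability, at least for my taste. It does require a Swift 5.9 toolchain (Xcode 15 or later). The method is marked as back-deployed in the standard library, so as far as I can tell it is not limited to iOS 17 and its sibling OS releases.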

To emit values, you use yield(_:):

func newBook(_ title: String, author: String) {
    let book = Book(title: title, author: author)
    books.append(book)
    for continuation in continuations {
        continuation.yield(books)
    }
}
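
One gap in the snippets above is that continuations are only ever appended and never removed. Here is a rough sketch of how cleanup could look, assuming a generic helper I'm calling Broadcaster (not part of any library). Each subscriber gets an ID, and onTermination hops back to the main actor to drop the matching continuation.

```swift
// Hypothetical helper: fans values out to several single-subscriber streams.
@MainActor
final class Broadcaster<Value: Sendable> {
    private var continuations: [Int: AsyncStream<Value>.Continuation] = [:]
    private var nextID = 0

    // Returns a fresh stream for each subscriber.
    func subscribe() -> AsyncStream<Value> {
        let id = nextID
        nextID += 1
        let (stream, continuation) = AsyncStream.makeStream(of: Value.self)
        continuations[id] = continuation
        // onTermination can run on any thread, so hop back to the main actor.
        continuation.onTermination = { [weak self] _ in
            Task { @MainActor [weak self] in self?.remove(id) }
        }
        return stream
    }

    func send(_ value: Value) {
        for continuation in continuations.values { continuation.yield(value) }
    }

    // Explicitly ends every stream, for example when the store goes away.
    func finishAll() {
        for continuation in continuations.values { continuation.finish() }
        continuations.removeAll()
    }

    private func remove(_ id: Int) {
        continuations[id] = nil
    }
}

// Usage: two subscribers both receive every value sent after they subscribed.
@MainActor
func broadcastDemo() async -> [[Int]] {
    let broadcaster = Broadcaster<Int>()
    let first = broadcaster.subscribe()
    let second = broadcaster.subscribe()
    broadcaster.send(1)
    broadcaster.send(2)
    broadcaster.finishAll()

    var a: [Int] = []
    for await value in first { a.append(value) }
    var b: [Int] = []
    for await value in second { b.append(value) }
    return [a, b]
}
```

The values are buffered until each subscriber iterates, which is why the demo can send first and read afterwards. A dictionary keyed by ID is just one option; anything that lets you find and remove a single continuation would do.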

You can't access the current value of an AsyncStream. You can only await its next value. If you need to access the latest data at any time, you'll have to keep a separate buffer yourself.

If you want to emit a value immediately after someone subscribes, you can call yield(_:) right after creating the stream.
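
As a small sketch of both points, the hypothetical Counter below keeps the latest value in a regular property (the "separate buffer") and replays it to every new subscriber by yielding right after creating the stream. Counter and replayDemo() are names I made up for this example.

```swift
// Hypothetical store that replays its current value to new subscribers.
@MainActor
final class Counter {
    // The latest value stays readable at any time, independent of the streams.
    private(set) var value = 0
    private var continuations: [AsyncStream<Int>.Continuation] = []

    var stream: AsyncStream<Int> {
        let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
        // Yield immediately so the subscriber starts with the current value.
        continuation.yield(value)
        continuations.append(continuation)
        return stream
    }

    func increment() {
        value += 1
        for continuation in continuations { continuation.yield(value) }
    }

    func finish() {
        for continuation in continuations { continuation.finish() }
        continuations.removeAll()
    }
}

@MainActor
func replayDemo() async -> [Int] {
    let counter = Counter()
    counter.increment()          // value is 1 before anyone subscribes
    let stream = counter.stream  // the new subscriber is handed 1 right away
    counter.increment()          // and then receives 2
    counter.finish()

    var received: [Int] = []
    for await value in stream { received.append(value) }
    return received
}
```

With this setup a late subscriber behaves much more like a @Published subscriber: it sees the current state first and updates afterwards.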

Some observations

I'd say AsyncStream is definitely an interesting tool to have, especially when paired with Swift Async Algorithms.
However, I don't think it currently serves as a full replacement for Combine.

First of all, it's very easy to shoot yourself in the foot by accidentally forgetting to finish the stream.
In SwiftUI, you can easily tie an AsyncStream’s lifecycle to the SwiftUI view’s lifecycle, making it much easier to use.

In UIKit, however, you need to manually manage subscriptions and cancellations using UIViewController lifecycle methods.

You also need to ensure your code still behaves correctly even when the view controller presents another view controller on top.

I'm still only scratching the surface, but I can see myself finding uses for streams going forward.