Designing and writing a custom, generic Combine Publisher in Swift

The Shareup iOS app makes heavy use of Apple’s Combine framework, which is Apple’s recommended approach to handling asynchronous programming. Combine makes it easy to process and transform asynchronous events over time. It’s similar in many ways to earlier “reactive programming” third-party frameworks like ReactiveSwift and RxSwift.

Two of the most important types in Combine are publishers and subscribers. The Publisher protocol declares a type that can produce a sequence of values over time. Correspondingly, the [Subscriber protocol] declares a type that can receive those values from a Publisher. A third type—the Subscription protocol—declares a type representing the connection of a subscriber to a publisher. Most of Combine’s magic happens inside of the Subscription, as you’ll see later.

How Shareup uses Combine

As I said earlier, Shareup makes heavy use of Combine. Generally, we like to enforce a unidirectional data flow in our apps, which means the application’s UI is a simple reflection of an immutable snapshot of application state. Shareup’s application state can only be updated in limited, predefined ways, and, importantly, the UI cannot directly change the application state. Rather, the UI layer describes actions the user or environment takes and sends those actions to the core business logic of the application. The core business logic of the application modifies the application state in response to those actions and delivers a new immutable snapshot of the application state to the UI, which re-renders itself.

The difference between Shareup’s architecture and similar unidirectional data flow architectures is Shareup does not hold its entire application state in memory. Instead, most of the application state lives inside tables in a SQLite database. Shareup’s different components, including the UI and networking layers, use Combine to subscribe to specific subsets of the entire application state. Any time a change is committed to the database, the application components who subscribed to tables modified by the change receive an immutable version of the new state. We’ve open-sourced the framework that powers this functionality: SQLite. You can see some examples of it at work in its test suite.

The problem

Some components in Shareup only need to subscribe to state changes corresponding to a single SQLite table, which makes it easy to create a Combine Publisher using a single SQL SELECT statement. For example, our WebSocket component responsible for uploading new shares is only interested in reading the application state contained in our upload_queue SQLite table. Creating its Publisher is a matter of writing the correct SELECT query to fetch the unprocessed uploads in the upload_queue table.

UI components, on the other hand, usually need state composed of different data from multiple SQLite tables. For example, when showing the user a list of his/her shares, we need to draw from multiple tables to populate the view:

  1. The items table provides the name, kind, size, and thumbnail of a shared item.
  2. The downloads table provides the status of the download of the item, if it was shared by someone else with the user.
  3. The uploads table provides the status of the upload of the item, if it is in the process of being shared with someone else.

Although it would be possible to combine these different pieces of data together in the UI, that would leak a lot of business logic into the UI, which we want to avoid. Instead, for each custom UI element, we create a view model holding just the required information. We initialize these view models by combining publishers pulling from the necessary SQLite tables. (It would be possible, in many cases, to write a single, complicated SQL SELECT statement joining the data in the tables together, but those tend to be fiddly and difficult to maintain. We prefer to compose simpler publishers together.)

Combine provides some wonderful built-in publishers to simplify the task of combining the results of multiple publishers into one: CombineLatest and Merge. Unfortunately, neither of them does exactly what we want.

CombineLatest combines the output of multiple upstream publishers, but it only does so after each upstream publisher has published at least once. In our example, it’s possible for any of the upstream publishers (items, downloads, or uploads) to publish once, many times, or never. In the case where one of the publishers never publishes, our combined publisher would also never publish, regardless of how often the other upstream publishers published.

Merge publishes whenever an upstream publisher publishes, but it requires each upstream publisher to have the same type of output. In our case, each upstream publisher outputs different data, which means we can’t use Merge.

What we need is a publisher that allows for heterogeneous upstream publishers and sends its own output as soon as they publish.

The solution

ReduceLatest3

We will create a new Publisher called ReduceLatest3 because its purpose matches that of the reduce() function, which is to produce a single value from multiple different values. We suffix the publisher’s name with “3” because this is how Apple names publishers that take three upstream publishers, such as CombineLatest3.

When creating a custom generic Publisher, it’s important to think carefully about the generic requirements of the new publisher. Each Publisher needs to define its Output and Failure types. In our case, we want our publisher to accept three different kinds of output from its upstream publishers. Our new publisher, in turn, would produce a different kind of output. So, we’ll need four different generic types to represent the outputs of the upstream publishers and our new publisher’s output. However, we’ll only need a single generic type to represent the failure type because we want to limit all the upstream publishers to the same failure type, which is often Error.

Typically, you create custom publishers inside of the Publishers namespace. Given the requirements we laid out above, we can define our custom publisher like so:

public extension Publishers {
  struct ReduceLatest3<A: Publisher, B: Publisher, C: Publisher, Output>: Publisher
    where A.Failure == B.Failure, A.Failure == C.Failure
  {
    public typealias Failure = A.Failure
  }
}

The next step is to think about how to initialize the Publisher. Since we need to combine the results of three upstream publishers, the initializer should accept three publishers matching the generic types A, B, and C. In addition, our publisher will need a way to tranform the output of A, B, and C into its own Output. So, we’ll need the caller to specify a closure that takes the output of the three upstream publishers and returns a value corresponding to our Output type. The straightforward way to define this closure would be:

(A.Output, B.Output, C.Output) -> Output

However, that would require each upstream publisher to publish a value or produce a default value before we could call this closure. Instead, we’ll define each parameter as an Optional, which means we’ll be able to call this function as soon as a single upstream publisher publishes.

private let a: A
private let b: B
private let c: C
private let transform: (A.Output?, B.Output?, C.Output?) -> Output

public init(
  _ a: A,
  _ b: B,
  _ c: C,
  _ transform: @escaping (A.Output?, B.Output?, C.Output?) -> Output
) {
  self.a = a
  self.b = b
  self.c = c
  self.transform = transform
}

Publisher only has one more method for us to implement because, as you’ll soon see, the intelligence of a Combine Publisher lives in the Subscription, not the publisher. We need to implement receive(subscriber:), which attaches a subscriber to the publisher. Our implementation is fairly short and relies on ReduceLatestSubscription, which we will write below. We need to initialize an instance of ReduceLatestSubscription, passing it the subscriber, upstream publishers, and transform function. After that, we call receive(subscription:) on the Subscriber with the newly-created instance of ReduceLatestSubscription.

public func receive<S: Subscriber>(subscriber: S)
  where S.Failure == Failure, S.Input == Output
{
  let subscription = ReduceLatestSubscription(
    subscriber: subscriber,
    a: a,
    b: b,
    c: c,
    transform: transform
  )
  subscriber.receive(subscription: subscription)
}

ReduceLatestSubscription

ReduceLatestSubscription is where most of the work happens, but its work is considered private implementation details. Consumers of ReduceLatest3 never need to access ReduceLatestSubscription, which means we’ll make it private. It will need to be a class because, in Apple’s words, they have “identity, defined by the moment in time a particular subscriber attached to a publisher”. Additionally, canceling a Subscription must be thread-safe.

Definition

The definition of ReduceLatestSubscription is extensive because we need generic types for the subscriber, the three upstream publishers, the subscription’s output, and the subscription’s failure type.

private final class ReduceLatestSubscription<Subscriber, A, B, C, Output, Failure>: Subscription
  where
  Subscriber: Combine.Subscriber,
  Subscriber.Input == Output,
  Subscriber.Failure == Failure,
  A: Publisher,
  B: Publisher,
  C: Publisher,
  A.Failure == Failure,
  B.Failure == Failure,
  C.Failure == Failure
{}

Responsibilities

The main responsibilities of ReduceLatestSubscription are:

  1. Subscribe to upstream publishers
  2. Store the last published value of each upstream publisher
  3. Publish to the subscriber when any of the upstream publishers publish
  4. Publish a completion to the subscriber after all of the upstream publishers have finished
  5. Publish a failure completion to the subscriber after any of the upstream publishers have failed
  6. Cancel all upstream publishers and stop publishing to the subscriber after cancel() is called

Instance properties

Our subscription needs to retain the Subscriber and transform function. Additionally, it needs to store optional values holding the latest output of the upstream publishers. However, there are some additional properties we need to add to keep track of the subscription’s internal state. We’ll want to use a lock to prevent concurrency issues because we don’t know which thread an upstream publisher will publish on. We’ll also need to retain the AnyCancellable tokens for the unfinished upstream publishers in order to prevent them from being automatically canceled. Along those lines, we’ll also want to keep track of which of the upstream publishers have finished because, once the last upstream publisher has finished, we’ll want to complete our publisher and notify the subscriber. The AnyCancellable tokens and completion status are stored separately because certain publishers (e.g, Just) complete immediately, which doesn’t even give you enough time to store their token. Last, we’ll need to keep track of the downstream Demand. We can only publish to downstream subscribers when they demand more items. In total, we have to create nine instance properties for ReduceLatestSubscription:

private enum Key: String {
  case a, b, c
}

private let lock = RecursiveLock()
private var tokens = [Key: AnyCancellable]()
private var completions = [Key.a: false, .b: false, .c: false]
private var demand = Subscribers.Demand.none

private var subscriber: Subscriber?
private let transform: (A.Output?, B.Output?, C.Output?) -> Output

private var latestA: A.Output?
private var latestB: B.Output?
private var latestC: C.Output?

Key is a convenience type to keep track of specific upstream publisher’s AnyCancellable tokens and completion status. Using it helps prevent typing or copy-paste mistakes.

Initialization

We already know the form of ReduceLatestSubscription's initializer because we called it inside of Publisher.receive(subscriber:). However, the initializer’s implementation is more complicated than simply maintaining references to each of the passed-in arguments. Our subscription’s initializer also needs to subscribe to each of the upstream publishers and hold on to the AnyCancellable tokens returned by the calls to Publisher.sink()

init(
  subscriber: Subscriber,
  a: A,
  b: B,
  c: C,
  transform: @escaping (A.Output?, B.Output?, C.Output?) -> Output
) {
  self.subscriber = subscriber
  self.transform = transform
  
  let tokenA = a.sink(receiveCompletion: completeA, receiveValue: receiveA)
  let tokenB = b.sink(receiveCompletion: completeB, receiveValue: receiveB)
  let tokenC = c.sink(receiveCompletion: completeC, receiveValue: receiveC)
  
  [Key.a: tokenA, .b: tokenB, .c: tokenC]
    .forEach { tokens[$0] = $1 }
}

Subscription conformance

Conforming to the Subscription protocol is straightforward. We have to implement request(_ demand: Subscribers.Demand), adding the new demand to our current demand. Then, if we previously had a demand of .none, we need to publish whatever data we have available. We also need to implement cancel() to cancel the subscription and prevent the subscriber from receiving any more messages from our publisher.

func request(_ demand: Subscribers.Demand) {
  guard demand != .none else { return }
  let shouldPublish: Bool = lock.locked {
    let shouldPublish = self.demand == .none
    self.demand += demand
    return shouldPublish
  }
  guard shouldPublish else { return }
  publish()
}

func cancel() {
  lock.locked { subscriber = nil }
  complete(with: .finished)
}

Two things to note. First, we need lock all access to mutable instance properties of ReduceLatestSubscription because, as we mentioned before, our subscription’s methods could be called from any thread. Second, there were bugs in Apple’s implementation of Demand that would cause a crash when trying to add or subtract Demand.none from another Demand. So, throughout the app, we always ensure a new demand is not equal to .none before trying to add it to or subtract it from our saved demand.

Publishing values

Each time one of the upstream publishers publish, we need to send the new combined value to the subscriber. We do this by saving the newly published value, checking to make sure the subscriber has requested to receive a value, and then sending the transformed value to the subscriber.

private func receiveA(_ input: A.Output) {
  lock.locked { latestA = input }
  publish()
}

private func receiveB(_ input: B.Output) {
  lock.locked { latestB = input }
  publish()
}

private func receiveC(_ input: C.Output) {
  lock.locked { latestC = input }
  publish()
}

private func publish() {
  lock.locked {
    guard demand > .none else { return }
    guard let subscriber = self.subscriber else { return }
    demand -= .max(1)
    let newDemand = subscriber.receive(transform(latestA, latestB, latestC))
    guard newDemand != .none else { return }
    demand += newDemand
  }
}

Completion

In addition to sending new values to our subscriber any time an upstream publisher publishes, we also need to send Subscribers.Completion.finished after the last upstream publisher finishes or send Subscribers.Completion.failure(_:) when any upstream publisher fails.

Whenever we receive a completion from one of our upstream publishers, we first determine if it was a failure or not. If it was a failure, we call our private complete(with:) function. This function removes all of our upstream publishers’ AnyCancellable tokens, which cancels all of the upstream publishers. Then, we send the failure to our downstream subscriber. Afterwards, we delete our reference to the subscriber, which will prevent ReduceLatestSubscription from sending any more messages to the subscriber. If the completion was not a failure, we check to see if there are any more unfinished upstream publishers remaining. If there are none remaining, we call complete(with:) just as we did in the case of a failure.

private func completeA(_ completion: Subscribers.Completion<A.Failure>) {
  completeUpstream(.a, completion)
}

private func completeB(_ completion: Subscribers.Completion<B.Failure>) {
  completeUpstream(.b, completion)
}

private func completeC(_ completion: Subscribers.Completion<C.Failure>) {
  completeUpstream(.c, completion)
}

private func completeUpstream(
  _ key: Key,
  _ completion: Subscribers.Completion<Failure>
) {
  guard case .finished = completion else {
    complete(with: completion)
    return
  }
  
  let shouldComplete: Bool = lock.locked {
    tokens.removeValue(forKey: key)
    completions[key] = true
    return completions.reduce(true) { $0 && $1.value }
  }
  
  guard shouldComplete else { return }
  complete(with: completion)
}

private func complete(with completion: Subscribers.Completion<Failure>) {
  let subscriber = lock.locked { () -> Subscriber? in
    let sub = self.subscriber
    self.subscriber = nil
    completions.removeAll()
    tokens.removeAll()
    return sub
  }
  subscriber?.receive(completion: completion)
}

Creating a custom operator

The last step when creating a custom Combine Publisher is to add a corresponding operator to the Publisher namespace. This allows your publisher to be included in other Combine event-processing pipelines. You need to define a function in an extension of Publisher that accepts as arguments the parameters you need to create an instance of your new Publisher. Then, you return the Publisher from the function.

func reduceLatest<B: Publisher, C: Publisher, Output>(
  _ b: B,
  _ c: C,
  _ transform: @escaping (Self.Output?, B.Output?, C.Output?) -> Output
) -> Publishers.ReduceLatest3<Self, B, C, Output> {
  Publishers.ReduceLatest3(self, b, c, transform)
}

Testing ReduceLatest3

The easiest way to test custom publishers is to create one or more instances of PassthroughSubject to act as upstream publishers. Then, send values to the subjects and see what your custom publisher publishes.

func receive(value: (Int?, Character?, Bool?)) {
  print(value)
}

let pub1 = PassthroughSubject<Int, Never>()
let pub2 = PassthroughSubject<Character, Never>()
let pub3 = PassthroughSubject<Bool, Never>()

let token = pub1
  .reduceLatest(pub2, pub3) { ($0, $1, $2) } 
  .sink(receiveValue: receive(value:))
defer { token.cancel() }

pub1.send(1) // (1, <nil>, <nil>)
pub1.send(2) // (2, <nil>, <nil>)
pub2.send("A") // (2, A, <nil>)
pub3.send(false) // (2, A, false)
pub3.send(completion: .finished)
pub3.send(true) // (2, A, false)
pub2.send("B") // (2, B, false)

Feedback

I’ve made all the code for ReduceLatest public on GitHub. I encourage you to check it out, play around with it, and use it in your apps. Be sure to let me know if you have any thoughts or use ReduceLatest to build something cool.