Combine #2:发布者与订阅者

发布: (2025年12月6日 GMT+8 00:59)
5 min read
原文: Dev.to

Source: Dev.to

Publisher

一个 发射器Publisher)可以向一个或多个订阅者发送零个或多个值,并且只能发送一次结束事件,该事件可以是成功也可以是错误。
一旦发送了结束事件,发射器就不能再发送任何事件。

示例:从 NotificationCenter 的通知创建一个发射器。

let myNotification = Notification.Name("MyNotification")
let center = NotificationCenter.default
let publisher = center.publisher(for: myNotification)

注意: 上面的代码在至少有一个订阅者存在之前不会产生任何效果。

Subscriber

一个 订阅者Subscriber)定义了在发射器的下游(“downstream”)输出时应采取的行为。

继续上面的示例:

let subscription = publisher
    .sink { value in
        print("Notification received from a publisher with value: \(value)")
    }

center.post(name: myNotification, object: nil)
center.post(name: myNotification, object: nil)

subscription.cancel()
  • sink 创建一个订阅,该订阅会接收发射器发出的 所有 值(无限需求)。
  • sink 也可以处理结束事件:
let just = Just("Hello world!")

_ = just.sink(
    receiveCompletion: { print("Received completion", $0) },
    receiveValue: { print("Received value", $0) }
)

_ = just.sink(
    receiveCompletion: { print("Received completion (another)", $0) },
    receiveValue: { print("Received value (another)", $0) }
)

// Output:
// Received value Hello world!
// Received completion finished
// Received value (another) Hello world!
// Received completion (another) finished

Just 是一种发射器,它会向每个订阅者发送单个值,然后结束。

常用操作符

assign(to:on:)

将收到的值赋给对象的一个支持 KVO 的属性。

class SomeObject {
    var value: String = "" {
        didSet { print(value) }
    }
}

let object = SomeObject()
let publisher = ["Hello", "world!"].publisher

_ = publisher.assign(to: \.value, on: object)

assign(to:)(配合 @Published

将收到的值重新通过另一个发射器发送,常用于属性标记了 @Published 的情况。

class SomeObject {
    @Published var value = 0
}

let object = SomeObject()
object.$value
    .sink { print($0) }

(0..()

    init() {
        ["A", "B", "C"].publisher
            .assign(to: \.word, on: self)
            .store(in: &subscriptions)
    }
}

订阅管理

订阅(Publisher + 操作符 + Subscriber)会返回一个 AnyCancellable 实例,它充当取消订阅和释放资源的令牌。

  • 如果不保持对订阅的引用,订阅会在作用域结束时自动取消。
  • 资源会在失去内存引用或显式调用 cancel() 时被释放。

基础协议

public protocol Publisher {
    associatedtype Output
    associatedtype Failure: Error

    func receive<S>(subscriber: S) where S: Subscriber,
                                        Self.Failure == S.Failure,
                                        Self.Output == S.Input
}

extension Publisher {
    public func subscribe<S>(_ subscriber: S) where S: Subscriber,
                                                   Self.Failure == S.Failure,
                                                   Self.Output == S.Input
}
public protocol Subscriber: CustomCombineIdentifierConvertible {
    associatedtype Input
    associatedtype Failure: Error

    func receive(subscription: Subscription)
    func receive(_ input: Input) -> Subscribers.Demand
    func receive(completion: Subscribers.Completion)
}
public protocol Subscription: Cancellable,
                               CustomCombineIdentifierConvertible {
    func request(_ demand: Subscribers.Demand)
}
  • subscription.request(_:) 用来设定订阅者愿意接收的最大值数量(backpressure 管理)。
  • 订阅者可以在每次收到值后调整需求。

自定义订阅者:IntSubscriber

下面示例展示了一个接收 Int 且永不失败的订阅者。

class IntSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    func receive(subscription: Subscription) {
        subscription.request(.max(3))   // 请求最多 3 个值
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        print("Received value", input)
        return .none                     // 不调整需求
    }

    func receive(completion: Subscribers.Completion) {
        print("Received completion", completion)
    }
}

使用方式

let subscriber = IntSubscriber()
let publisher = (1...6).publisher

publisher.subscribe(subscriber)

// Output:
// Received value 1
// Received value 2
// Received value 3

如果 receive(_:) 返回 .unlimited(或每次返回 .max(1)),发射器会发送所有值并结束:

func receive(_ input: Int) -> Subscribers.Demand {
    print("Received value", input)
    return .unlimited   // 或 .max(1)
}

预期输出:

Received value 1
Received value 2
Received value 3
Received value 4
Received value 5
Received value 6
Received completion finished

Future 与异步示例

Future 会产生一个值并结束,但它是异步完成的。

// 1️⃣  返回一个 Future 的函数,该函数在延迟后对整数加 1
func futureIncrement(integer: Int, afterDelay delay: TimeInterval) -> Future<Int, Never> {
    Future { promise in
        print("Original")
        DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
            print("Terminé")
            promise(.success(integer + 1))
        }
    }
}

// 2️⃣  创建 Future
let future = futureIncrement(integer: 1, afterDelay: 1)

// 3️⃣  第一个订阅者
future
    .sink(receiveCompletion: { print("First", $0) },
          receiveValue: { print("First", $0) })
    .store(in: &subscriptions)

// 4️⃣  第二个订阅者(可以多次订阅)
future
    .sink(receiveCompletion: { print("Second", $0) },
          receiveValue: { print("Second", $0) })
    .store(in: &subscriptions)

在此示例中:

  • 每个订阅者都会收到同一个结果值(2),且只会收到一次,即使 Future 在订阅之前已经创建。
  • 代码展示了如何将 Futuresink 结合使用,并将订阅存入 AnyCancellable 集合中。
Back to Blog

相关文章

阅读更多 »

Swift 和 UIKit 中的自定义字体

介绍 在本教程中,我将演示如何在 Swift 与 UIKit 中使用自定义字体。我将使用 Montserrat 和 Hind 字体,它们可以从…下载。