Combine #2:发布者与订阅者
Source: Dev.to
Publisher
一个 发射器(Publisher)可以向一个或多个订阅者发送零个或多个值,并且只能发送一次结束事件,该事件可以是成功也可以是错误。
一旦发送了结束事件,发射器就不能再发送任何事件。
示例:从 NotificationCenter 的通知创建一个发射器。
let myNotification = Notification.Name("MyNotification")
let center = NotificationCenter.default
let publisher = center.publisher(for: myNotification)
注意: 上面的代码在至少有一个订阅者存在之前不会产生任何效果。
Subscriber
一个 订阅者(Subscriber)定义了在发射器的下游(“downstream”)输出时应采取的行为。
继续上面的示例:
let subscription = publisher
.sink { value in
print("Notification received from a publisher with value: \(value)")
}
center.post(name: myNotification, object: nil)
center.post(name: myNotification, object: nil)
subscription.cancel()
sink创建一个订阅,该订阅会接收发射器发出的 所有 值(无限需求)。sink也可以处理结束事件:
let just = Just("Hello world!")
_ = just.sink(
receiveCompletion: { print("Received completion", $0) },
receiveValue: { print("Received value", $0) }
)
_ = just.sink(
receiveCompletion: { print("Received completion (another)", $0) },
receiveValue: { print("Received value (another)", $0) }
)
// Output:
// Received value Hello world!
// Received completion finished
// Received value (another) Hello world!
// Received completion (another) finished
Just 是一种发射器,它会向每个订阅者发送单个值,然后结束。
常用操作符
assign(to:on:)
将收到的值赋给对象的一个支持 KVO 的属性。
class SomeObject {
var value: String = "" {
didSet { print(value) }
}
}
let object = SomeObject()
let publisher = ["Hello", "world!"].publisher
_ = publisher.assign(to: \.value, on: object)
assign(to:)(配合 @Published)
将收到的值重新通过另一个发射器发送,常用于属性标记了 @Published 的情况。
class SomeObject {
@Published var value = 0
}
let object = SomeObject()
object.$value
.sink { print($0) }
(0..()
init() {
["A", "B", "C"].publisher
.assign(to: \.word, on: self)
.store(in: &subscriptions)
}
}
订阅管理
订阅(Publisher + 操作符 + Subscriber)会返回一个 AnyCancellable 实例,它充当取消订阅和释放资源的令牌。
- 如果不保持对订阅的引用,订阅会在作用域结束时自动取消。
- 资源会在失去内存引用或显式调用
cancel()时被释放。
基础协议
public protocol Publisher {
associatedtype Output
associatedtype Failure: Error
func receive<S>(subscriber: S) where S: Subscriber,
Self.Failure == S.Failure,
Self.Output == S.Input
}
extension Publisher {
public func subscribe<S>(_ subscriber: S) where S: Subscriber,
Self.Failure == S.Failure,
Self.Output == S.Input
}
public protocol Subscriber: CustomCombineIdentifierConvertible {
associatedtype Input
associatedtype Failure: Error
func receive(subscription: Subscription)
func receive(_ input: Input) -> Subscribers.Demand
func receive(completion: Subscribers.Completion)
}
public protocol Subscription: Cancellable,
CustomCombineIdentifierConvertible {
func request(_ demand: Subscribers.Demand)
}
subscription.request(_:)用来设定订阅者愿意接收的最大值数量(backpressure 管理)。- 订阅者可以在每次收到值后调整需求。
自定义订阅者:IntSubscriber
下面示例展示了一个接收 Int 且永不失败的订阅者。
class IntSubscriber: Subscriber {
typealias Input = Int
typealias Failure = Never
func receive(subscription: Subscription) {
subscription.request(.max(3)) // 请求最多 3 个值
}
func receive(_ input: Int) -> Subscribers.Demand {
print("Received value", input)
return .none // 不调整需求
}
func receive(completion: Subscribers.Completion) {
print("Received completion", completion)
}
}
使用方式
let subscriber = IntSubscriber()
let publisher = (1...6).publisher
publisher.subscribe(subscriber)
// Output:
// Received value 1
// Received value 2
// Received value 3
如果 receive(_:) 返回 .unlimited(或每次返回 .max(1)),发射器会发送所有值并结束:
func receive(_ input: Int) -> Subscribers.Demand {
print("Received value", input)
return .unlimited // 或 .max(1)
}
预期输出:
Received value 1
Received value 2
Received value 3
Received value 4
Received value 5
Received value 6
Received completion finished
Future 与异步示例
Future 会产生一个值并结束,但它是异步完成的。
// 1️⃣ 返回一个 Future 的函数,该函数在延迟后对整数加 1
func futureIncrement(integer: Int, afterDelay delay: TimeInterval) -> Future<Int, Never> {
Future { promise in
print("Original")
DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
print("Terminé")
promise(.success(integer + 1))
}
}
}
// 2️⃣ 创建 Future
let future = futureIncrement(integer: 1, afterDelay: 1)
// 3️⃣ 第一个订阅者
future
.sink(receiveCompletion: { print("First", $0) },
receiveValue: { print("First", $0) })
.store(in: &subscriptions)
// 4️⃣ 第二个订阅者(可以多次订阅)
future
.sink(receiveCompletion: { print("Second", $0) },
receiveValue: { print("Second", $0) })
.store(in: &subscriptions)
在此示例中:
- 每个订阅者都会收到同一个结果值(
2),且只会收到一次,即使Future在订阅之前已经创建。 - 代码展示了如何将
Future与sink结合使用,并将订阅存入AnyCancellable集合中。