Combine #2: Emitters & Subscribers
Source: Dev.to
Publisher
Publisher인 emisor는 하나 이상의 구독자에게 0개 이상의 값을 전송할 수 있으며, 성공 또는 오류일 수 있는 단일 종료 이벤트를 전송합니다.
종료 이벤트가 전송된 후에는 발행자는 더 이상 이벤트를 전송할 수 없습니다.
예시: NotificationCenter의 알림으로부터 발행자를 생성하기.
let myNotification = Notification.Name("MyNotification")
let center = NotificationCenter.default
let publisher = center.publisher(for: myNotification)
Nota: 위 코드는 최소 하나의 구독자가 존재하기 전까지는 아무 일도 일어나지 않습니다.
Subscriber
Subscriber인 suscriptor는 발행자의 “downstream”(하류)에서 나오는 값을 어떻게 처리할지 정의합니다.
앞 예시를 이어서:
let subscription = publisher
.sink { value in
print("Notification received from a publisher with value: \(value)")
}
center.post(name: myNotification, object: nil)
center.post(name: myNotification, object: nil)
subscription.cancel()
sink는 발행자가 전송하는 모든 값을 받는 구독을 생성합니다(무제한 수요).sink는 종료 이벤트도 처리할 수 있습니다:
let just = Just("Hello world!")
_ = just.sink(
receiveCompletion: { print("Received completion", $0) },
receiveValue: { print("Received value", $0) }
)
_ = just.sink(
receiveCompletion: { print("Received completion (another)", $0) },
receiveValue: { print("Received value (another)", $0) }
)
// Output:
// Received value Hello world!
// Received completion finished
// Received value (another) Hello world!
// Received completion (another) finished
Just는 각 구독자에게 단일 값을 전송하고 바로 종료하는 발행자입니다.
Operadores comunes
assign(to:on:)
받은 값을 객체의 KVO를 지원하는 프로퍼티에 할당합니다.
class SomeObject {
var value: String = "" {
didSet { print(value) }
}
}
let object = SomeObject()
let publisher = ["Hello", "world!"].publisher
_ = publisher.assign(to: \.value, on: object)
assign(to:) (with @Published)
받은 값을 다른 발행자를 통해 다시 전송합니다. 프로퍼티가 @Published로 선언된 경우에 유용합니다.
class SomeObject {
@Published var value = 0
}
let object = SomeObject()
object.$value
.sink { print($0) }
(0..()
init() {
["A", "B", "C"].publisher
.assign(to: \.word, on: self)
.store(in: &subscriptions)
}
}
Gestión de suscripciones
구독(Publisher + 연산자 + Subscriber)은 AnyCancellable 인스턴스를 반환합니다. 이 인스턴스는 구독을 취소하고 리소스를 해제하는 토큰 역할을 합니다.
- 구독에 대한 참조를 유지하지 않으면 범위를 벗어날 때 자동으로 취소됩니다.
- 메모리에서 참조가 사라지거나
cancel()을 명시적으로 호출하면 리소스가 해제됩니다.
Protocolos básicos
public protocol Publisher {
associatedtype Output
associatedtype Failure: Error
func receive<S>(subscriber: S) where S: Subscriber,
Self.Failure == S.Failure,
Self.Output == S.Input
}
extension Publisher {
public func subscribe<S>(_ subscriber: S) where S: Subscriber,
Self.Failure == S.Failure,
Self.Output == S.Input
}
public protocol Subscriber: CustomCombineIdentifierConvertible {
associatedtype Input
associatedtype Failure: Error
func receive(subscription: Subscription)
func receive(_ input: Input) -> Subscribers.Demand
func receive(completion: Subscribers.Completion)
}
public protocol Subscription: Cancellable,
CustomCombineIdentifierConvertible {
func request(_ demand: Subscribers.Demand)
}
subscription.request(_:)는 구독자가 받을 수 있는 최대 값의 수를 지정합니다(backpressure 관리).- 구독자는 각 값을 받은 후 수요를 조정할 수 있습니다.
Suscriptor personalizado: IntSubscriber
Int를 받고 절대 실패하지 않는 구독자의 예시입니다.
class IntSubscriber: Subscriber {
typealias Input = Int
typealias Failure = Never
func receive(subscription: Subscription) {
subscription.request(.max(3)) // 최대 3개의 값 요청
}
func receive(_ input: Int) -> Subscribers.Demand {
print("Received value", input)
return .none // 수요를 조정하지 않음
}
func receive(completion: Subscribers.Completion) {
print("Received completion", completion)
}
}
Uso
let subscriber = IntSubscriber()
let publisher = (1...6).publisher
publisher.subscribe(subscriber)
// Output:
// Received value 1
// Received value 2
// Received value 3
receive(_:)가 .unlimited(또는 각 호출마다 .max(1))를 반환하면 발행자는 모든 값을 전송하고 종료합니다:
func receive(_ input: Int) -> Subscribers.Demand {
print("Received value", input)
return .unlimited // 또는 .max(1)
}
예상 출력:
Received value 1
Received value 2
Received value 3
Received value 4
Received value 5
Received value 6
Received completion finished
Future y ejemplos asíncronos
Future는 값을 생성하고 종료하지만, 비동기적으로 수행됩니다.
// 1️⃣ Función que devuelve un Future que incrementa un entero después de un retardo
func futureIncrement(integer: Int, afterDelay delay: TimeInterval) -> Future<Int, Never> {
Future { promise in
print("Original")
DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
print("Terminé")
promise(.success(integer + 1))
}
}
}
// 2️⃣ Crear el Future
let future = futureIncrement(integer: 1, afterDelay: 1)
// 3️⃣ Primer suscriptor
future
.sink(receiveCompletion: { print("First", $0) },
receiveValue: { print("First", $0) })
.store(in: &subscriptions)
// 4️⃣ Segundo suscriptor (se puede suscribir varias veces)
future
.sink(receiveCompletion: { print("Second", $0) },
receiveValue: { print("Second", $0) })
.store(in: &subscriptions)
이 예시에서:
- 각 구독자는 동일한 결과값(
2)을 한 번만 받으며,Future는 구독 전에 이미 생성되었습니다. - 코드는
Future를sink와 결합하고 구독을AnyCancellable집합에 저장하는 방법을 보여줍니다.