Combine #2: Emisores & Suscriptores
Source: Dev.to
Publisher
Un emisor (Publisher) puede transmitir cero o más valores a uno o más suscriptores, y un solo evento de fin que puede ser éxito o error.
Una vez emitido el evento de fin, el emisor no puede volver a transmitir eventos.
Ejemplo: crear un emisor a partir de una notificación del NotificationCenter.
let myNotification = Notification.Name("MyNotification")
let center = NotificationCenter.default
let publisher = center.publisher(for: myNotification)
Nota: El código anterior no hará nada hasta que exista al menos un suscriptor.
Subscriber
Un suscriptor (Subscriber) define el comportamiento a aplicar ante la salida (“downstream”) del emisor.
Continuando el ejemplo anterior:
let subscription = publisher
.sink { value in
print("Notification received from a publisher with value: \(value)")
}
center.post(name: myNotification, object: nil)
center.post(name: myNotification, object: nil)
subscription.cancel()
sinkcrea una suscripción que recibe todos los valores emitidos por el emisor (demanda ilimitada).sinktambién puede procesar los eventos de fin:
let just = Just("Hello world!")
_ = just.sink(
receiveCompletion: { print("Received completion", $0) },
receiveValue: { print("Received value", $0) }
)
_ = just.sink(
receiveCompletion: { print("Received completion (another)", $0) },
receiveValue: { print("Received value (another)", $0) }
)
// Output:
// Received value Hello world!
// Received completion finished
// Received value (another) Hello world!
// Received completion (another) finished
Just es un emisor que emite un solo valor a cada suscriptor y luego finaliza.
Operadores comunes
assign(to:on:)
Asigna el valor recibido a una propiedad que admite KVO de un objeto.
class SomeObject {
var value: String = "" {
didSet { print(value) }
}
}
let object = SomeObject()
let publisher = ["Hello", "world!"].publisher
_ = publisher.assign(to: \.value, on: object)
assign(to:) (con @Published)
Re‑emite el valor recibido a través de otro emisor, útil cuando la propiedad está marcada con @Published.
class SomeObject {
@Published var value = 0
}
let object = SomeObject()
object.$value
.sink { print($0) }
(0..()
init() {
["A", "B", "C"].publisher
.assign(to: \.word, on: self)
.store(in: &subscriptions)
}
}
Gestión de suscripciones
Las suscripciones (Publisher + operadores + Subscriber) devuelven una instancia de AnyCancellable, que actúa como token para cancelar la suscripción y liberar recursos.
- Si no se mantiene una referencia a la suscripción, ésta se cancela automáticamente al salir del ámbito.
- Los recursos se liberan tanto al perder la referencia en memoria como al llamar explícitamente a
cancel().
Protocolos básicos
public protocol Publisher {
associatedtype Output
associatedtype Failure: Error
func receive<S>(subscriber: S) where S: Subscriber,
Self.Failure == S.Failure,
Self.Output == S.Input
}
extension Publisher {
public func subscribe<S>(_ subscriber: S) where S: Subscriber,
Self.Failure == S.Failure,
Self.Output == S.Input
}
public protocol Subscriber: CustomCombineIdentifierConvertible {
associatedtype Input
associatedtype Failure: Error
func receive(subscription: Subscription)
func receive(_ input: Input) -> Subscribers.Demand
func receive(completion: Subscribers.Completion)
}
public protocol Subscription: Cancellable,
CustomCombineIdentifierConvertible {
func request(_ demand: Subscribers.Demand)
}
subscription.request(_:)fija el número máximo de valores que el suscriptor está dispuesto a recibir (gestión de backpressure).- El suscriptor puede ajustar la demanda después de recibir cada valor.
Suscriptor personalizado: IntSubscriber
Ejemplo de un suscriptor que recibe Int y nunca falla.
class IntSubscriber: Subscriber {
typealias Input = Int
typealias Failure = Never
func receive(subscription: Subscription) {
subscription.request(.max(3)) // solicita hasta 3 valores
}
func receive(_ input: Int) -> Subscribers.Demand {
print("Received value", input)
return .none // no ajusta la demanda
}
func receive(completion: Subscribers.Completion) {
print("Received completion", completion)
}
}
Uso
let subscriber = IntSubscriber()
let publisher = (1...6).publisher
publisher.subscribe(subscriber)
// Output:
// Received value 1
// Received value 2
// Received value 3
Si receive(_:) devuelve .unlimited (o .max(1) en cada llamada), el emisor enviará todos los valores y terminará:
func receive(_ input: Int) -> Subscribers.Demand {
print("Received value", input)
return .unlimited // o .max(1)
}
Salida esperada:
Received value 1
Received value 2
Received value 3
Received value 4
Received value 5
Received value 6
Received completion finished
Future y ejemplos asíncronos
Future produce un valor y termina, pero lo hace de forma asíncrona.
// 1️⃣ Función que devuelve un Future que incrementa un entero después de un retardo
func futureIncrement(integer: Int, afterDelay delay: TimeInterval) -> Future<Int, Never> {
Future { promise in
print("Original")
DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
print("Terminé")
promise(.success(integer + 1))
}
}
}
// 2️⃣ Crear el Future
let future = futureIncrement(integer: 1, afterDelay: 1)
// 3️⃣ Primer suscriptor
future
.sink(receiveCompletion: { print("First", $0) },
receiveValue: { print("First", $0) })
.store(in: &subscriptions)
// 4️⃣ Segundo suscriptor (se puede suscribir varias veces)
future
.sink(receiveCompletion: { print("Second", $0) },
receiveValue: { print("Second", $0) })
.store(in: &subscriptions)
En este ejemplo:
- Cada suscriptor recibe el mismo valor resultante (
2) una sola vez, aunque elFuturese haya creado antes de suscribirse. - El código muestra cómo combinar
Futureconsinky almacenar la suscripción en un conjunto deAnyCancellable.