Combine #6:时间操控操作员

发布: (2025年12月28日 GMT+8 02:48)
11 min read
原文: Dev.to

Source: Dev.to

请提供您希望翻译的正文内容,我将按照要求将其翻译成简体中文并保留原有的格式、Markdown 语法以及技术术语。谢谢!

时间位移

delay(for:tolerance:scheduler:options:) 接收一个输入事件流,将其在参数 intervalSchedulerTimeType.Stride)指定的时间内存储,然后在指定的 scheduler 上逐个重新发出,保持与接收时相同的时间间隔。

// Publisher que será alimentado por el Timer.
let sourcePublisher = PassthroughSubject<Int, Never>()

// Publisher retrasado (delay).
// Se especifica el retraso con .seconds() y se emiten los valores
// en el scheduler .main para mostrarlos en pantalla.
let delayedPublisher = sourcePublisher.delay(
    for: .seconds(delayInSeconds),
    scheduler: DispatchQueue.main
)

// Timer que publica valores en el RunLoop.main.
Timer
    .publish(every: 1.0 / valuesPerSecond, on: .main, in: .common)
    .autoconnect()                 // Inicia el timer inmediatamente.
    .subscribe(sourcePublisher)    // Conecta el timer al publisher de origen.
    .store(in: &subscriptions)

在上面的示例中使用了 Foundation 的 TimerPublisher 版本。
实例 Timer.TimerPublisher 遵循 ConnectablePublisher 协议,这意味着它只有在调用 .connect() 方法后才会发出元素。
操作符 autoconnect() 会在第一个订阅者订阅时自动调用 connect()

Source:

累计值

collect(_:options:) 接收一个输入事件流,在 strategy 参数(Publishers.TimeGroupingStrategy)定义的时间内进行缓存,然后在指定的 scheduler 上发出包含已收集事件的数组。

// 创建一个 publisher,Timer 将在其上发出值。
let sourcePublisher = PassthroughSubject<Int, Never>()

// 创建一个收集(collect)publisher。
// • 指定收集窗口(.byTime 或 .byTimeOrCount)
// • 在 scheduler .main 上发出值,以便在屏幕上显示
let collectedPublisher = sourcePublisher
    .collect(
        .byTime(DispatchQueue.main, .seconds(collectTimeStride))
    )
// 创建一个在 RunLoop.main 上发出值的定时器
Timer
    .publish(every: 1.0 / valuesPerSecond, on: .main, in: .common)
    .autoconnect()
    .subscribe(sourcePublisher)
    .store(in: &subscriptions)

分组策略

Publishers.TimeGroupingStrategy 可以是:

策略描述
.byTime仅按时间进行分组。
.byTimeOrCount按时间 按最大事件数量进行分组。

对于 .byTimeOrCount,如果在设定的时间结束前已经累计了达到上限的事件,则数组会立即发出。

let collectTimeStride = 4          // 窗口时长(秒)
let collectMaxCount   = 2          // 每个窗口的最大事件数

let collectedPublisher = sourcePublisher
    .collect(
        .byTimeOrCount(
            DispatchQueue.main,
            .seconds(collectTimeStride),
            collectMaxCount
        )
    )

在此示例中,当达到 4 秒的时间阈值 2 条事件的数量阈值 时,收集的事件数组将被发出,以先到者为准。

丢弃事件

debounce(for:scheduler:options:)

debounce(for:scheduler:options:) 在输入发布者发出 最后 一个元素后,等待由参数 dueTimeSchedulerTimeType.Stride)定义的时间,然后重新发出该最后的值。

重要:如果输入发布者在 debounce 的等待时间到期之前发送完成事件,则该操作符 不会 重新发出该事件。

import Combine

let subject = PassthroughSubject<Int, Never>()
var subscriptions = Set<AnyCancellable>()

// Debounce con ventana de 1 s, re‑emite en DispatchQueue.main
let debounced = subject
    .debounce(for: .seconds(1.0), scheduler: DispatchQueue.main)
    .share()                     // Un único punto de suscripción

// Suscripción para imprimir los eventos
debounced
    .sink { value in
        print("Debounced value: \(value)")
    }
    .store(in: &subscriptions)

在上面的示例中使用 share(),使多个订阅者能够接收相同的 debounce 结果,而不会创建该操作符的多个实例。

throttle(for:scheduler:latest:)

throttle(for:scheduler:latest:) 也会降低输入流产生的事件数量,但其行为与 debounce 不同:

特性debouncethrottle
窗口在收到 最后 一个事件 之后 打开。在收到 第一个 事件 之后 打开。
发出仅窗口中的 最后 一个事件。窗口中的 第一个latest = false)或 最后latest = true)事件。
第一个值仅在窗口结束时发出。在收到第一个事件时立即发出。
import Combine

let subject = PassthroughSubject<Int, Never>()
var subscriptions = Set<AnyCancellable>()

// Throttle con ventana de 1 s, emite el último evento de cada ventana
let throttled = subject
    .throttle(for: .seconds(1.0), scheduler: DispatchQueue.main, latest: true)

throttled
    .sink { value in
        print("Throttled value: \(value)")
    }
    .store(in: &subscriptions)
  • 使用 latest: false 将在每个 1 秒间隔中获取 第一个 值。
  • 使用 latest: true 将在同一间隔内获取 最后 接收到的值。

节流

let throttled = subject
    .throttle(for: .seconds(throttleDelay),
               scheduler: DispatchQueue.main,
               latest: true)

// Se crea una suscripción sobre `throttled` para imprimir los eventos
throttled
    .sink {  }
    .store(in: &subscriptions)

Timeout

timeout(_:scheduler:options:customError:) 如果上游(upstream)在未发出任何事件的情况下超过 interval 定义的时间,则发布一个结束事件(successfailure)。

  • 如果 customErrornil,将发出成功的结束事件。
  • 否则,将发出该闭包中定义的错误。
enum TimeoutError: Swift.Error {
    case timedOut
}

// Se crea un subject para emitir eventos.
let subject = PassthroughSubject<String, Never>()

// Se aplica el operador `timeout` para emitir un evento de fin (en este caso,
// un error `.timedOut`) si no se recibe ningún evento en 5 segundos.
let timeoutSubject = subject
    .timeout(.seconds(5), scheduler: DispatchQueue.main) {
        TimeoutError.timedOut
    }

测量时间

measureInterval(using:options:) 测量并发出两个从输入流接收到的事件之间的时间间隔。

Publisher 发出的值类型是通过参数传入的 SchedulerTimeInterval

Scheduler单位
DispatchQueue纳秒
RunLoop
cancellable = Timer.publish(every: 1, on: .main, in: .default)
    .autoconnect()
    .measureInterval(using: RunLoop.main)
    .sink { interval in
        print(interval)   // → Stride(magnitude: 1.0013610124588013)
                           // → Stride(magnitude: 0.9992760419845581)
    }

在上面的示例中,有一个每秒发出一次事件的 Timer
请记得调用 autoconnect() 在第一次订阅时启动 Timer
发出的值是 ,因为 SchedulerRunLoop.main

问卷

  1. 简要说明 delay(for:scheduler:) 的作用以及它如何保持事件的时间间隔。

  2. Timer.TimerPublisherautoconnect() 操作符的作用是什么?

  3. collect 操作符中 .byTime.byTimeOrCount 策略有什么区别?

  4. debouncethrottle 有何区别?

  5. measureInterval(using:) 发出什么类型的值,其计量单位取决于什么?

  6. 如果带有 debounce(for:)Publisher 在等待时间结束前完成,会发生什么?

    • 仍然会发出最后一个值
    • 不发出任何值
    • 完成被延迟
    • 订阅被取消
  7. throttle(for:scheduler:latest:) 中,如果 latest = false,每个窗口会发出哪个值?

    • 收到的第一个值
    • 收到的最后一个值
    • 所有收到的值
    • 直到结束都不发出
  8. timeout(_:scheduler:options:customError:) 操作符在以下情况下会发出错误或成功完成…

    • Publisher 在间隔之前结束
    • Publisher 在间隔期间未发出任何值
    • Publisher 发出多个值
    • 手动取消订阅
  9. measureInterval(using:) 中,RunLoop.main 发出的值的计量单位是:

    • 纳秒
    • 毫秒
    • 分钟
  10. 使用 collect(.byTimeOrCount) 时,在收集窗口结束前发生了发射。这可能是什么原因?

    • 累计了参数定义的事件数量
    • 输入的 Publisher 发送了结束事件
    • 这是误报
    • 订阅出现错误

解决方案

  1. delay(for:scheduler:)
    将上游接收到的事件保存在内存中,并在参数 interval 定义的延迟时间后逐个重新发出,保持原始的时间间隔。

  2. autoconnect()Timer.TimerPublisher
    TimerPublisher可连接的,这意味着必须显式调用 .connect() 才会开始发出事件。autoconnect() 会在收到第一个订阅时自动建立连接。

  3. .byTime.byTimeOrCountcollect 中的区别

    • .byTime 仅按时间分组,在指定的时间段结束时发出累计的数组。
    • .byTimeOrCount 同时按时间和数量累计;在时间段结束 达到指定数量时(先到者)发出数组。
  4. debouncethrottle 的区别

    • debounce 在“防抖”时间结束后发出 最后 一个收到的值。
    • throttle 周期性 发出值,并且可以根据 latest 参数在每个时间窗口内发出 第一个最后一个 收到的值。
  5. measureInterval(using:) 发出的值
    发出一个 Stride 类型的值(具体为 SchedulerTimeType.Stride),表示两个时间点之间的间隔。度量单位取决于所使用的 Scheduler(例如 DispatchQueue 为纳秒,RunLoop 为秒等)。

  6. 如果使用 debounce(for:) 的 Publisher 在等待时间结束前完成
    将不会发出任何值。上游完成时,未发出的最后一个值会被丢弃。

  7. throttle(for:scheduler:latest:)latest = false
    在每个时间窗口内发出 第一个 收到的值。

  8. timeout(_:scheduler:options:customError:)
    当 Publisher 在指定的时间间隔内 未发出任何值 时,会发出错误(如果 customErrornil 则正常完成)。

  9. measureInterval(using:)RunLoop.main 上的度量单位
    值以 为单位。

  10. 使用 collect(.byTimeOrCount) 时的提前发射
    因为上游 Publisher 发送了 结束事件,导致在时间窗口尚未到期时就强制发出累计的数组。

问答

7. 在 throttle(for:scheduler:latest:) 中,如果 latest = false,每个窗口会发出什么值?

  • [✅] 第一个收到的值

8. 操作符 timeout(_:scheduler:options:customError:) 在以下情况下会发出错误或成功完成…

  • [✅] 发布者在该时间间隔内未发出任何内容

9. 在 measureInterval(using:) 中,RunLoop.main 发出的值的计量单位是:

  • [✅] 秒

10. 使用 collect(.byTimeOrCount) 时,在收集窗口结束前发生了发射。可能是什么原因?

  • [✅] 达到了参数定义的事件数量 (注:原答案指出是完成事件;此处保持原文)
Back to Blog

相关文章

阅读更多 »

Combine #13:资源管理

share 和 multicast_: share 大多数 Combine 的 Publisher 是 struct,只是描述一个 pipeline,而不保存共享状态。不会创建一个 i...

Swift Combine 中的热与冷发布者

什么是 Hot 和 Cold Publisher? Cold Publisher Cold Publisher 为每个订阅者创建一个新的执行。当你订阅时,工作会重新开始。swift...

Swift #28:Foundation

抱歉,我没有看到需要翻译的具体文字内容。请提供要翻译的摘录或摘要,我会帮您翻译成简体中文。