运行时适配器热切换与 Ports & Adapters — Alistair Cockburn 未记录的模式

发布: (2026年2月13日 GMT+8 05:07)
11 分钟阅读
原文: Dev.to

Source: Dev.to

请提供您希望翻译的正文内容,我将为您翻译成简体中文。

当 Alistair Cockburn 在 2005 年描述六边形架构时,他将适配器的可互换性记录为该模式的核心属性。在其著作的第 5 章中,他写道:

“对于每个端口,可能会有多个适配器,以适配各种可以插入该端口的技术。”

原始模式留下了一个未解之谜:当需要在运行时、跨分布式系统、因故障而切换适配器,并且在其他实例遇到同样问题之前该怎么办?

本文展示了使用以下技术实现 紧急适配器热切换 的具体方案:

  • Spring Boot 4
  • Spring Cloud Bus
  • Resilience4j

当某个服务实例检测到适配器故障时,它会通过消息总线广播该故障。所有其他实例会在自己遭遇超时之前 切换到备用适配器

Cockburn 在 LinkedIn 上的回应:
“太棒了!我已经着手记录长期运行系统的实时切换,但从未敢想紧急热切换——谢谢!”

Source:

问题

在典型的 Ports & Adapters 架构中,领域服务通过适配器与上游 API 通信。当该 API 开始超时时,每个服务实例都会独立发现故障并耗尽各自的超时窗口。

  • 10 个实例,每个 3 秒超时至少 30 秒 的累计降级体验。
  • 实际上,重试、线程池饱和以及级联故障会使影响更加严重。

经典的断路器模式(Hystrix、Resilience4j)是 按实例 工作的:每个服务在检测到故障后打开自己的断路器。没有协调机制——实例 7 并不知道实例 1 已在两秒前触发了超时。

Source:

解决方案:广播驱动的适配器切换

关键洞见: 第一个检测到故障的实例应当向所有其他实例发出警告。

Instance 1: timeout detected → broadcast "switch to fallback"
Instance 2: receives event → switches adapter (avoids timeout)
Instance 3: receives event → switches adapter (avoids timeout)
...
Instance N: receives event → switches adapter (avoids timeout)
  • 这是一种 应用于基础设施的最终一致性
  • 适配器状态会异步地在集群中传播。
  • 所有实例在第一次检测到故障后的毫秒级时间内,都会收敛到相同的回退适配器。

实现

完整源代码

(文章后面提供仓库链接。)

端口

public interface AnimalPort {
    String getAnimal();
    String name();
}

适配器注册表

注册表保存可用的适配器以及指向 活动 适配器的 AtomicReference。交换该引用是一次无锁、线程安全的操作。

@Component
public class AdapterConfig {

    private final Map<String, AnimalPort> adapters;
    private final AtomicReference<AnimalPort> activeAdapter;

    public AdapterConfig(
            @Qualifier("primaryAdapter")   final AnimalPort primary,
            @Qualifier("fallbackAdapter") final AnimalPort fallback) {

        this.adapters = Map.of(
                "primary",  primary,
                "fallback", fallback
        );
        this.activeAdapter = new AtomicReference<>(primary);
    }

    /** 返回当前使用的适配器。 */
    public AnimalPort getActiveAdapter() {
        return activeAdapter.get();
    }

    /**
     * 切换活动适配器。
     *
     * @param adapterName {@code adapters} 中的一个键
     * @return 若适配器实际发生了更改则返回 {@code true}
     */
    public boolean switchTo(final String adapterName) {
        final AnimalPort target   = adapters.get(adapterName);
        final AnimalPort previous = activeAdapter.getAndSet(target);
        return !previous.name().equals(adapterName);
    }
}

注意 – 已经在使用 适配器执行请求的线程会正常完成;新请求会立即看到新适配器。无需 synchronized 块。

超时检测 + 广播

服务层使用 Resilience4j TimeLimiter 包装每一次适配器调用。如果调用超时,首先检测到失败的实例会发布自定义 Spring Cloud Bus 事件,促使所有节点切换到回退适配器。

@Service
public class AnimalService {

    private final AdapterConfig               adapterConfig;
    private final ApplicationEventPublisher   publisher;
    private final BusProperties               busProperties;
    private final TimeLimiter                 timeLimiter;
    private final ExecutorService             executor;

    public AnimalService(final AdapterConfig adapterConfig,
                         final ApplicationEventPublisher publisher,
                         final BusProperties busProperties) {

        this.adapterConfig = adapterConfig;
        this.publisher     = publisher;
        this.busProperties = busProperties;

        this.timeLimiter = TimeLimiter.of(
                TimeLimiterConfig.custom()
                        .timeoutDuration(Duration.ofSeconds(3))
                        .cancelRunningFuture(true)
                        .build()
        );

        // Java 25 虚拟线程使超时包装保持轻量。
        this.executor = Executors.newVirtualThreadPerTaskExecutor();
    }

    public String getAnimal() {
        final AnimalPort adapter = adapterConfig.getActiveAdapter();

        try {
            // 在虚拟线程上运行可能耗时的调用。
            final CompletableFuture<String> future =
                    CompletableFuture.supplyAsync(adapter::getAnimal, executor);

            // 应用超时。
            return timeLimiter.executeFutureSupplier(() -> future);
        } catch (final Exception e) {
            // 第一个检测到失败的实例 → 向所有实例广播。
            publisher.publishEvent(new AdapterSwitchEvent(
                    this,
                    busProperties.getId(),
                    "**",               // 源标识(可选)
                    "fallback"
            ));

            // 若广播延迟,则在本地回退。
            return adapterConfig.getActiveAdapter().getAnimal();
        }
    }
}

关键决策

| 决策 |

| 原因 | |--------------------------------------|--------| | Virtual threads (Executors.newVirtualThreadPerTaskExecutor()) | Java 25 的虚拟线程避免阻塞操作系统线程,使超时包装保持轻量。 | | AtomicReference for the active adapter | 保证无锁、线程安全的切换。 | | Spring Cloud Bus event publishing | 提供一个简单的分布式消息总线,用于向所有实例广播切换。 | | Resilience4j TimeLimiter | 在不与熔断器耦合的情况下处理每次调用的超时检测。 |

端到端流程

  1. 实例 1 调用 AnimalService.getAnimal()
  2. 调用超过 3 秒的 TimeLimiter
  3. catch 块在总线上发布 AdapterSwitchEvent
  4. 所有实例(包括检测到超时的实例)收到事件并调用 AdapterConfig.switchTo("fallback")
  5. 之后每个实例的调用都使用回退适配器 而不会再触发超时

要点

  • 快速故障传播 在集群中显著降低上游故障期间的累计延迟。
  • 无锁适配器切换 避免竞争并保持请求延迟低。
  • 虚拟线程 使得超时包装器足够轻量,能够在每个请求中使用。
  • 该模式可以推广到任何需要热插拔的端口/适配器组合。

随意探索完整的示例仓库,并将此方法应用到您自己的六边形架构项目中!

Overview

  • 目标 – 总线事件面向 所有 服务,而不是特定实例。
  • 即时本地回退 – 广播后,当前请求会在本地回退,而不等待总线往返。

总线事件

一个携带目标适配器名称的自定义 RemoteApplicationEvent

public class AdapterSwitchEvent extends RemoteApplicationEvent {

    private final String targetAdapter;

    public AdapterSwitchEvent(final Object source,
                              final String originService,
                              final String destinationService,
                              final String targetAdapter) {
        super(source != null ? source : new Object(),
              originService,
              () -> destinationService != null ? destinationService : "**");
        this.targetAdapter = targetAdapter;
    }

    // getter
    public String getTargetAdapter() {
        return targetAdapter;
    }
}

Spring Cloud Bus 将此事件序列化为 JSON,推送到 RabbitMQ,所有已连接的实例都会收到该事件。每个实例上的监听器执行适配器切换:

@EventListener
public void onAdapterSwitch(final AdapterSwitchEvent event) {
    adapterConfig.switchTo(event.getTargetAdapter());
}

恢复检测

计划的健康检查器会 ping 主服务,并在其恢复时广播 switch‑back 事件:

@Scheduled(fixedDelayString = "${health.check.interval:5000}")
public void checkPrimaryHealth() {
    // If we are already using the primary adapter, no need to check.
    if ("primary".equals(adapterConfig.getActiveAdapterName())) {
        return;
    }

    try {
        String response = restClient.get()
                                   .uri("/health")
                                   .retrieve()
                                   .body(String.class);

        // When the primary reports it is up, publish a switch‑back event.
        if ("UP".equals(response)) {
            publisher.publishEvent(new AdapterSwitchEvent(
                this,
                busProperties.getId(),
                "**",          // placeholder for correlation ID or similar
                "primary"
            ));
        }
    } catch (Exception e) {
        // Primary is still down – ignore and retry on the next schedule.
    }
}

运行演示

git clone https://github.com/charles-hornick/adapter-hotswap-spring.git
cd adapter-hotswap-spring
docker-compose up --build

该演示运行两个 hotswap 服务实例,一个不稳定的主服务(循环超时),以及一个稳定的后备服务。请关注日志输出:

Response: Chien          ← primary adapter
Response: Chien
EVENT: Primary adapter timeout — broadcasting switch to fallback
EVENT: Switching adapter from 'primary' to 'fallback'
(instance 2) EVENT RECEIVED: Switch to 'fallback'
Response: Chat            ← fallback adapter
HEALTHCHECK: Primary adapter responding again
EVENT: Switching adapter from 'fallback' to 'primary'
Response: Chien          ← back to primary

实例 2 在从未经历超时的情况下完成切换。

限制与权衡

这只是一个演示,而非可投入生产的框架。实际使用时需考虑以下因素:

问题描述缓解措施
分脑风险如果 RabbitMQ 被分区,实例可能在活跃适配器上出现分歧。使用共识机制,或通过健康检查器接受最终收敛。
恢复时的惊群效应当广播“切换回”时,所有实例会同时访问主节点。为健康检查间隔添加抖动,或采用金丝雀方式,让单个实例先行切换回。
单点决策第一个检测到故障的实例为整个集群做决定;误报会导致不必要的切换。在广播前要求 N 次连续失败,或使用基于法定人数的决策。
总线延迟在广播与接收之间存在一个窗口期,期间其他实例仍可能访问故障适配器。接受最终一致性;RabbitMQ 的传播通常足够快,能满足大多数使用场景。

技术栈: Java 25, Spring Boot 4.0.2, Spring Cloud 2025.1.0, Resilience4j 2.3.0, RabbitMQ

作者: Charles Hornick – 专注于智能重构和遗留应用救援的 Java 顾问。

charles-hornick.be

0 浏览
Back to Blog

相关文章

阅读更多 »