我用 Django、Redis 和 WebSockets 构建了一个实时股票价格跟踪器

发布: (2026年2月15日 GMT+8 06:46)
6 分钟阅读
原文: Dev.to

Source: Dev.to

Ebenezer Lamptey

我想在后端工程领域找到一个细分方向,并被实时系统所吸引。我想了解实时系统在底层是如何工作的——不仅仅是使用它们,而是自己构建一个。因此,我构建了一个 stock‑price tracker,它:

  • 每 60 秒获取一次实时价格,
  • 计算 SMA(简单移动平均线),
  • 检测交叉警报,且
  • 通过 WebSocket 将所有信息推送给已连接的客户端。

以下是我学到的内容。

它的功能(每 60 秒)

  • 获取 来自 Finnhub API 的 15 只股票的实时价格。
  • 保存 它们到 PostgreSQL。
  • 缓存 每只股票最近 5 个价格到 Redis。
  • 计算 来自缓存的 5 期 SMA。
  • 检测 看涨/看跌交叉警报。
  • 广播 所有信息给已连接的 WebSocket 客户端,使用单条消息。

The stack

  • Django + DRF – API层。
  • Celery + Celery Beat – 任务调度。
  • Redis – 缓存 Channels 后端。
  • Django Channels – WebSocket 支持。
  • Uvicorn – ASGI 服务器。
  • Finnhub API – 市场数据。
  • SQLite – 用于演示数据库(我在 Mac 上使用 PostgreSQL 时遇到问题)。

让我恍然大悟的部分

Redis 可以用于很多场景。虽然我之前把它当作 Celery broker 使用过,但这个项目让我看到了它更广泛的能力——尤其是作为滚动窗口数据存储。

单个 Redis 实例的三大职责

#角色描述
1Celery broker在 Celery Beat 与 worker 之间传递任务。
2Price cache将每只股票最近的 5 条价格存储为 Redis List
3Channel Layer backend让 Celery 与 Django Channels 通信,以广播 WebSocket 消息。

看到同一个服务能够承担三种完全不同的用途,真是灵光一现。

缓存工作原理

每只股票都有一个 Redis List 用来保存最近的 5 个价格。每次更新时我们执行两个命令:

RPUSH stock:AAPL:prices 255.78   # add new price to the end
LTRIM stock:AAPL:prices -5 -1   # keep only the newest 5 items

列表永远不会超过五个元素;最旧的价格会自动被移除。

我还使用 Redis pipelines 将这些命令批处理。与每只股票一次往返(共 15 次)相比,我把所有命令排队并在一次往返中执行——把 60 次往返压缩到仅 2 次。

SMA 与警报的工作原理

一旦缓存了五个价格,SMA 就是一个简单的平均值:

sma = sum(last_5_prices) / 5

交叉检测

# Bullish: price was below SMA, now above
if previous_price < previous_sma and current_price > current_sma:
    alert = "bullish"

# Bearish: price was above SMA, now below
if previous_price > previous_sma and current_price < current_sma:
    alert = "bearish"

我们需要前一个值来检测交叉,这也是缓存 SMA 很重要的原因。

实时广播是如何工作的

最棘手的部分是将后台 Celery 任务与 WebSocket 客户端连接起来。答案就在 Channel Layer

Celery 任务完成处理

向 Channel Layer(Redis)发布消息

Django Channels 接收消息

推送给所有已连接的 WebSocket 客户端

Redis 充当这两个进程之间的桥梁。

示例 WebSocket 负载

{
  "type": "stock_update",
  "timestamp": "2026-02-14T21:38:22+00:00",
  "stocks": [
    { "ticker": "AAPL", "price": 255.78, "sma": 254.32, "alert": null },
    { "ticker": "MSFT", "price": 401.32, "sma": 399.80, "alert": "bullish" },
    { "ticker": "TSLA", "price": 417.44, "sma": 419.10, "alert": "bearish" }
  ]
}

所有 15 支股票会在一条消息中发送,每 60 秒自动推送一次。

在开发环境中运行两个服务器

manage.py runserver 是一个 WSGI 服务器,只处理请求/响应周期。WebSocket 需要持久连接,因此需要 ASGI 服务器。

# DRF browsable API (WSGI)
python manage.py runserver        # → http://localhost:8000

# WebSocket server (ASGI)
uvicorn core.asgi:application --port 8001   # → ws://localhost:8001

REST 端点位于 8000 端口,WebSocket 连接位于 8001 端口。

我实际学到的内容

  • 了解 Celery Beat 如何进行后台任务调度。
  • 了解 Redis 列表 在滚动窗口数据中的完美用途。
  • 了解为什么 Redis 管道 对批量命令很重要。
  • 了解 WSGIASGI 的区别。
  • 了解 Django Channels 如何使用通道层在异步和同步代码之间桥接。
  • 了解如何端到端构建实时数据管道。

构建这个系统让实时系统不再神秘。它并不是魔法——只不过是一个生产者、一个通道和一个消费者。

源代码

未来计划

  • 简单的前端展示其工作原理
  • 确保在市场关闭时(自动)不进行 API 调用
  • 将数据库迁移到 PostgreSQL
0 浏览
Back to Blog

相关文章

阅读更多 »

在 Python 中使用 Zig 函数

概述 本文档展示了如何在 Zig 中编写一个小型 HTTP 客户端,将其暴露给 C,然后从 Python 调用它。重点在于:使用 Zig 的 :0const u8 nul…

硬核模拟二十一点

我构建了一个二十一点模拟器。那句话低估了它。实际上,我构建的是一个 28,000 行的 Python 框架,用于使用事件驱动的方式模拟纸牌游戏……