我用 Django、Redis 和 WebSockets 构建了一个实时股票价格跟踪器
Source: Dev.to
我想在后端工程领域找到一个细分方向,并被实时系统所吸引。我想了解实时系统在底层是如何工作的——不仅仅是使用它们,而是自己构建一个。因此,我构建了一个 stock‑price tracker,它:
- 每 60 秒获取一次实时价格,
- 计算 SMA(简单移动平均线),
- 检测交叉警报,且
- 通过 WebSocket 将所有信息推送给已连接的客户端。
以下是我学到的内容。
它的功能(每 60 秒)
- 获取 来自 Finnhub API 的 15 只股票的实时价格。
- 保存 它们到 PostgreSQL。
- 缓存 每只股票最近 5 个价格到 Redis。
- 计算 来自缓存的 5 期 SMA。
- 检测 看涨/看跌交叉警报。
- 广播 所有信息给已连接的 WebSocket 客户端,使用单条消息。
The stack
- Django + DRF – API层。
- Celery + Celery Beat – 任务调度。
- Redis – 缓存 和 Channels 后端。
- Django Channels – WebSocket 支持。
- Uvicorn – ASGI 服务器。
- Finnhub API – 市场数据。
- SQLite – 用于演示数据库(我在 Mac 上使用 PostgreSQL 时遇到问题)。
让我恍然大悟的部分
Redis 可以用于很多场景。虽然我之前把它当作 Celery broker 使用过,但这个项目让我看到了它更广泛的能力——尤其是作为滚动窗口数据存储。
单个 Redis 实例的三大职责
| # | 角色 | 描述 |
|---|---|---|
| 1 | Celery broker | 在 Celery Beat 与 worker 之间传递任务。 |
| 2 | Price cache | 将每只股票最近的 5 条价格存储为 Redis List。 |
| 3 | Channel Layer backend | 让 Celery 与 Django Channels 通信,以广播 WebSocket 消息。 |
看到同一个服务能够承担三种完全不同的用途,真是灵光一现。
缓存工作原理
每只股票都有一个 Redis List 用来保存最近的 5 个价格。每次更新时我们执行两个命令:
RPUSH stock:AAPL:prices 255.78 # add new price to the end
LTRIM stock:AAPL:prices -5 -1 # keep only the newest 5 items
列表永远不会超过五个元素;最旧的价格会自动被移除。
我还使用 Redis pipelines 将这些命令批处理。与每只股票一次往返(共 15 次)相比,我把所有命令排队并在一次往返中执行——把 60 次往返压缩到仅 2 次。
SMA 与警报的工作原理
一旦缓存了五个价格,SMA 就是一个简单的平均值:
sma = sum(last_5_prices) / 5
交叉检测
# Bullish: price was below SMA, now above
if previous_price < previous_sma and current_price > current_sma:
alert = "bullish"
# Bearish: price was above SMA, now below
if previous_price > previous_sma and current_price < current_sma:
alert = "bearish"
我们需要前一个值来检测交叉,这也是缓存 SMA 很重要的原因。
实时广播是如何工作的
最棘手的部分是将后台 Celery 任务与 WebSocket 客户端连接起来。答案就在 Channel Layer:
Celery 任务完成处理
↓
向 Channel Layer(Redis)发布消息
↓
Django Channels 接收消息
↓
推送给所有已连接的 WebSocket 客户端
Redis 充当这两个进程之间的桥梁。
示例 WebSocket 负载
{
"type": "stock_update",
"timestamp": "2026-02-14T21:38:22+00:00",
"stocks": [
{ "ticker": "AAPL", "price": 255.78, "sma": 254.32, "alert": null },
{ "ticker": "MSFT", "price": 401.32, "sma": 399.80, "alert": "bullish" },
{ "ticker": "TSLA", "price": 417.44, "sma": 419.10, "alert": "bearish" }
]
}
所有 15 支股票会在一条消息中发送,每 60 秒自动推送一次。
在开发环境中运行两个服务器
manage.py runserver 是一个 WSGI 服务器,只处理请求/响应周期。WebSocket 需要持久连接,因此需要 ASGI 服务器。
# DRF browsable API (WSGI)
python manage.py runserver # → http://localhost:8000
# WebSocket server (ASGI)
uvicorn core.asgi:application --port 8001 # → ws://localhost:8001
REST 端点位于 8000 端口,WebSocket 连接位于 8001 端口。
我实际学到的内容
- 了解 Celery Beat 如何进行后台任务调度。
- 了解 Redis 列表 在滚动窗口数据中的完美用途。
- 了解为什么 Redis 管道 对批量命令很重要。
- 了解 WSGI 与 ASGI 的区别。
- 了解 Django Channels 如何使用通道层在异步和同步代码之间桥接。
- 了解如何端到端构建实时数据管道。
构建这个系统让实时系统不再神秘。它并不是魔法——只不过是一个生产者、一个通道和一个消费者。
源代码
- GitHub: Stock-Price-Tracker-API
- 股票价格跟踪器 API
未来计划
- 简单的前端展示其工作原理
- 确保在市场关闭时(自动)不进行 API 调用
- 将数据库迁移到 PostgreSQL
