大容量负载的 JSON 解析:在速度、内存和可扩展性之间的平衡
Source: Dev.to
介绍
想象一下,你为黑色星期五策划的营销活动取得了巨大的成功,客户开始涌入你的网站。你的 Mixpanel 设置通常每小时大约会收到 1 000 条客户事件,但现在在同一个小时内收到了 数百万 条事件。因此,你的数据管道现在需要解析大量的 JSON 数据并将其存入数据库。
你常用的 JSON 解析库跟不上突如其来的数据激增,近实时分析报告也开始延迟。这时你会意识到 高效的 JSON 解析库 有多么重要。除了能够处理大容量负载之外,一个好的库还应能够序列化和反序列化高度嵌套的 JSON 结构。
在本文中,我们将探讨用于大负载的 Python 解析库。我们重点考察 ujson、orjson 和 ijson 的功能,并对标准库 (json)、ujson 和 orjson 在序列化和反序列化性能上进行基准测试。
序列化 = 将 Python 对象转换为 JSON 字符串。
反序列化 = 从 JSON 字符串重建 Python 对象。
稍后展示的决策流图可以帮助你为工作流选择合适的解析器。我们还会介绍 NDJSON 以及能够解析 NDJSON 负载的库。让我们开始吧。
标准库 json
标准库支持对所有基本的 Python 数据类型(dict、list、tuple 等)进行序列化。当你调用 json.loads() 时,整个 JSON 文档会一次性加载到内存中。这在处理小型负载时没问题,但对于大型负载可能导致:
- 内存不足错误
- 下游工作流阻塞
import json
with open("large_payload.json", "r") as f:
json_data = json.loads(f) # 将整个文件一次性加载到内存中,所有 token 同时读取
ijson
对于 数百兆字节 规模的负载,ijson(全称 iterative json)一次读取 一个 token,避免了加载整个文档所带来的内存开销。
import ijson
with open("json_data.json", "r") as f:
# 每次从数组中获取一个 dict
for record in ijson.items(f, "items.item"):
process(record) # ijson 库一次读取一个 token
ijson 因此会流式处理每个元素,将其转换为 Python dict,并交给你的处理函数 (process(record))。

ujson

ujson 长期以来因其 C 基 实现并提供 Python 绑定而成为处理大 JSON 负载的热门选择,这使其比纯 Python 的 json 模块快得多。
注意: 维护者已将
ujson设为 仅维护 模式,因此新项目通常更倾向于使用orjson。
import ujson
taxonomy_data = (
'{"id":1, "genus":"Thylacinus", "species":"cynocephalus", "extinct": true}'
)
# Deserialize
data_dict = ujson.loads(taxonomy_data)
# Serialize
with open("taxonomy_data.json", "w") as fh:
ujson.dump(data_dict, fh)
# Deserialize again
with open("taxonomy_data.json", "r") as fh:
data = ujson.load(fh)
print(data)
orjson
orjson 是用 Rust 编写的,因而具备速度快和内存安全保证,而基于 C 的库(如 ujson)缺乏这些特性。它还支持序列化额外的 Python 类型,如 dataclass 和 datetime。
一个关键区别在于:orjson.dumps() 返回 bytes,而其他库返回字符串。返回 bytes 消除了额外的编码步骤,提升了 orjson 的高吞吐量。
import json
import orjson
# Example payload
book_payload = (
'{"id":1,"name":"The Great Gatsby","author":"F. Scott Fitzgerald"}'
)
# Serialize to bytes
json_bytes = orjson.dumps(json.loads(book_payload))
# Deserialize back to a Python object
obj = orjson.loads(json_bytes)
print(obj)
决策流程图
以下是一个简化的流程,帮助您挑选合适的解析器:
+-------------------+
| Payload size? |
+--------+----------+
|
+-------------+-------------+
| |
100 MB)** – stream with `ijson`.
NDJSON(Newline‑Delimited JSON)
当处理日志式数据时,NDJSON 通常更合适,因为每一行都是有效的 JSON 文档。您可以使用以下方式解析 NDJSON:
- Standard
json– 按行读取。 orjson– 快速的按行反序列化(orjson.loads(line))。ijson– 也能工作,但按行处理的方式通常更简单。
import orjson
with open("events.ndjson", "r") as f:
for line in f:
event = orjson.loads(line)
process(event)
摘要
| Library | Language | Speed | Memory usage | Streaming support | Extra features |
|---|---|---|---|---|---|
json (stdlib) | Python (C) | 基准 | 高(加载整个文档) | 否 | 无 |
ujson | C | 快速 | 中等(加载整个文档) | 否 | 仅维护 |
orjson | Rust | 最快 | 低(字节输出) | 否 | Dataclass, datetime, UUID, 等 |
ijson | Python (C) | 中等(流式) | 非常低 | 是 | 基于事件的解析 |
对于大多数新项目:
- 当负载适合内存时,使用
orjson以获得更快的速度和额外的类型支持。 - 对于真正巨大的负载或需要增量处理数据时,切换到
ijson。
祝解析愉快!
使用 json、ujson 和 orjson 进行 JSON 解析与序列化
import json
import ujson
import orjson
# Sample JSON payload
book_payload = '{"Title":"The Great Gatsby","Author":"F. Scott Fitzgerald","Publishing House":"Charles Scribner\'s Sons"}'
# Deserialize with orjson
data_dict = orjson.loads(book_payload)
print(data_dict)
# Serialize to a file
with open("book_data.json", "wb") as f:
f.write(orjson.dumps(data_dict)) # Returns a bytes object
# Deserialize from the file
with open("book_data.json", "rb") as f:
book_data = orjson.loads(f.read())
print(book_data)
测试 json、ujson 和 orjson 的序列化能力
我们创建一个包含整数、字符串和 datetime 值的示例 dataclass 对象。
from dataclasses import dataclass
from datetime import datetime
@dataclass
class User:
id: int
name: str
created: datetime
u = User(id=1, name="Thomas", created=datetime.now())
1. 标准库 json
import json
try:
print("json:", json.dumps(u))
except TypeError as e:
print("json error:", e)
结果: json 抛出 TypeError,因为它无法序列化 dataclass 实例或 datetime 对象。
2. ujson
import ujson
try:
print("ujson:", ujson.dumps(u))
except TypeError as e:
print("ujson error:", e)
结果: ujson 同样无法序列化 dataclass 和 datetime 值。
3. orjson
import orjson
try:
print("orjson:", orjson.dumps(u))
except TypeError as e:
print("orjson error:", e)
结果: orjson 能够成功序列化 dataclass 和 datetime 对象。
使用 NDJSON(换行分隔的 JSON)
NDJSON 是一种每行都是单独 JSON 对象的格式,例如:
{"id": "A13434", "name": "Ella"}
{"id": "A13455", "name": "Charmont"}
{"id": "B32434", "name": "Areida"}
它常用于日志和流式数据。下面展示了在 Python 中处理 NDJSON 的三种方法。
使用标准库 json 处理 NDJSON
import json
ndjson_payload = """{"id": "A13434", "name": "Ella"}
{"id": "A13455", "name": "Charmont"}
{"id": "B32434", "name": "Areida"}"""
# 将负载写入文件
with open("json_lib.ndjson", "w", encoding="utf-8") as fh:
for line in ndjson_payload.splitlines():
fh.write(line.strip() + "\n")
# 按行读取并处理
with open("json_lib.ndjson", "r", encoding="utf-8") as fh:
for line in fh:
if line.strip(): # 跳过空行
item = json.loads(line) # 反序列化
print(item) # 或传递给调用方函数
使用 ijson(流式解析器)处理 NDJSON
import ijson
ndjson_payload = """{"id": "A13434", "name": "Ella"}
{"id": "A13455", "name": "Charmont"}
{"id": "B32434", "name": "Areida"}"""
# 将负载写入文件
with open("ijson_lib.ndjson", "w", encoding="utf-8") as fh:
fh.write(ndjson_payload)
# 迭代解析
with open("ijson_lib.ndjson", "r", encoding="utf-8") as fh:
for item in ijson.items(fh, "", multiple_values=True):
print(item)
解释: ijson.items(fh, "", multiple_values=True) 将每个根元素(即每行)视为单独的 JSON 对象,并一次返回一个。
使用专用的 ndjson 库处理 NDJSON
import ndjson
ndjson_payload = """{"id": "A13434", "name": "Ella"}
{"id": "A13455", "name": "Charmont"}
{"id": "B32434", "name": "Areida"}"""
# 将负载写入文件
with open("ndjson_lib.ndjson", "w", encoding="utf-8") as fh:
fh.write(ndjson_payload)
# 加载文件 – 返回字典列表
with open("ndjson_lib.ndjson", "r", encoding="utf-8") as fh:
ndjson_data = ndjson.load(fh)
print(ndjson_data)
要点总结
- 对于小到中等规模的 NDJSON 负载,使用标准的
json模块逐行读取即可很好地工作。 - 对于非常大的负载,
ijson是最佳选择,因为它采用流式处理,内存占用极低。 - 如果需要 生成 NDJSON(即从 Python 对象输出),
ndjson库非常方便(ndjson.dumps()会自动完成转换)。
为什么在基准测试中不包括 ijson
ijson 是一个 流式解析器,在本质上与我们基准测试的批量解析器(json、ujson、orjson)不同。将流式解析器与批量解析器进行比较相当于“苹果对橙子”的比较:
- 批量解析器 将整个 JSON 文档加载到内存中,优化的是速度。
ijson则增量处理文档,优化的是内存效率。
如果在仅关注速度的基准测试中加入 ijson,它会被误标为最慢的选手,却忽略了它的主要优势——在处理大规模 JSON 流时的低内存消耗。因此,当主要关注内存使用时,ijson 会单独进行评估。
用于基准测试的合成 JSON 负载生成
我们使用 mimesis 库生成一个包含 100 万条记录 的大型合成 JSON 负载。该数据可用于对 JSON 库进行基准测试。下面的代码会创建该负载;生成的文件大小约为 100 – 150 MB,足以进行有意义的性能测试。
from mimesis import Person, Address
import json
person_name = Person("en")
complete_address = Address("en")
with open("large_payload.json", "w") as fh: # Streaming to a file
fh.write("[") # JSON array start
for i in range(1_000_000):
payload = {
"id": person_name.identifier(),
"name": person_name.full_name(),
"email": person_name.email(),
"address": {
"street": complete_address.street_name(),
"city": complete_address.city(),
"postal_code": complete_address.postal_code()
}
}
json.dump(payload, fh)
# Add a comma after every element except the last one
if i < 999_999:
fh.write(",")
fh.write("]") # JSON array end
示例输出
[
{
"id": "8177",
"name": "Willia Hays",
"email": "showers1819@yandex.com",
"address": {
"street": "Emerald Cove",
"city": "Crown Point",
"postal_code": "58293"
}
},
{
"id": "5931",
"name": "Quinn Greer",
"email": "professional2038@outlook.com",
"address": {
"street": "Ohlone",
"city": "Bridgeport",
"postal_code": "92982"
}
}
]
Source: …
让我们从基准测试开始
基准测试前置条件
我们将 JSON 文件读取为字符串,然后使用每个库的 loads() 函数进行反序列化。
with open("large_payload1.json", "r") as fh:
payload_str = fh.read() # 原始 JSON 文本
一个辅助函数会将给定的 loads 实现运行三次,并返回总耗时。
import time
def benchmark_load(func, payload_str):
start = time.perf_counter()
for _ in range(3):
func(payload_str)
end = time.perf_counter()
return end - start
基准测试反序列化速度
import json, ujson, orjson
results = {
"json.loads": benchmark_load(json.loads, payload_str),
"ujson.loads": benchmark_load(ujson.loads, payload_str),
"orjson.loads": benchmark_load(orjson.loads, payload_str),
}
for lib, t in results.items():
print(f"{lib}: {t:.4f} seconds")
结果: orjson 在反序列化方面最快。
基准测试序列化速度
import json, ujson, orjson
def benchmark_dump(func, obj):
start = time.perf_counter()
for _ in range(3):
func(obj)
end = time.perf_counter()
return end - start
# 示例对象(已加载)
example_obj = json.loads(payload_str)
ser_results = {
"json.dumps": benchmark_dump(json.dumps, example_obj),
"ujson.dumps": benchmark_dump(ujson.dumps, example_obj),
"orjson.dumps": benchmark_dump(orjson.dumps, example_obj),
}
for lib, t in ser_results.items():
print(f"{lib}: {t:.4f} seconds")