面向数据工程师的 REST API 调用:实用指南与示例
Source: Dev.to
引言
作为一名 数据工程师,你很少只与数据库打交道。现代数据管道经常从 REST API 中摄取数据——无论是从 SaaS 工具(Salesforce、Jira、Google Analytics)获取数据,还是内部微服务或第三方提供商。
了解 REST API 的工作原理以及如何高效地与其交互是 核心数据工程技能。
本指南涵盖:
- 什么是 REST API(简要、实用)
- 从数据工程角度看常见的 REST 方法
- 认证模式
- 分页、过滤和速率限制
- 使用 Python 的真实案例
- 生产数据管道的最佳实践
什么是 REST API(数据工程师视角)
REST(表述性状态转移)API 允许系统通过 HTTP 使用标准方法进行通信。
从数据工程师的角度来看:
- REST API 是 数据源
- JSON 是最常见的 数据格式
- API 通常是 增量的、分页的、受速率限制的
- API 为 数据湖、数据仓库或流处理系统 提供数据
核心 REST HTTP 方法
| 方法 | 数据工程师的使用场景 |
|---|---|
| GET | 获取数据(最常用) |
| POST | 提交参数、创建资源、复杂查询 |
| PUT | 更新已有资源 |
| DELETE | 在管道中很少使用 |
在数据工程中,GET 和 POST 占约 90 % 的使用比例。
REST API 请求的结构
一个典型的 REST API 调用包括:
GET https://api.example.com/v1/orders?start_date=2025-01-01&limit=100
组成部分
- 基础 URL:
https://api.example.com - 端点:
/v1/orders - 查询参数:
start_date、limit - 请求头:认证、内容类型
- HTTP 方法:GET / POST
示例 1:简单 GET 请求(获取数据)
使用场景
从外部系统获取每日销售数据。
API 请求
GET https://api.company.com/v1/sales
Python 示例(requests 库)
import requests
url = "https://api.company.com/v1/sales"
headers = {
"Authorization": "Bearer YOUR_API_TOKEN",
"Accept": "application/json"
}
response = requests.get(url, headers=headers)
data = response.json()
print(data)
典型 JSON 响应
{
"sales": [
{
"order_id": 101,
"amount": 250.50,
"currency": "USD",
"order_date": "2025-01-10"
}
]
}
随后会对 JSON 进行:
- 扁平化
- 转换
- 存入数据湖或数据仓库
示例 2:查询参数(过滤数据)
使用场景
拉取 增量数据,避免重新处理历史记录。
GET /v1/sales?start_date=2025-01-01&end_date=2025-01-31
Python 代码
params = {
"start_date": "2025-01-01",
"end_date": "2025-01-31"
}
response = requests.get(url, headers=headers, params=params)
sales_data = response.json()
✅ 最佳实践: 始终将管道设计为 增量。
示例 3:POST 请求(复杂查询)
某些 API 在过滤条件复杂时需要 POST。
API 调用
POST /v1/sales/search
请求体
{
"region": ["US", "EU"],
"min_amount": 100,
"date_range": {
"from": "2025-01-01",
"to": "2025-01-31"
}
}
Python 示例
payload = {
"region": ["US", "EU"],
"min_amount": 100,
"date_range": {
"from": "2025-01-01",
"to": "2025-01-31"
}
}
response = requests.post(url, headers=headers, json=payload)
data = response.json()
认证方式(非常重要)
1. API Key 认证
Authorization: ApiKey abc123
2. Bearer Token(OAuth 2.0)
Authorization: Bearer eyJhbGciOi...
3. Basic Auth(安全性较低)
requests.get(url, auth=("username", "password"))
🔐 数据工程提示
将凭证存放在:
- 环境变量
- 密钥管理服务(AWS Secrets Manager、Azure Key Vault)
示例 4:分页(API 中非常常见)
大多数 API 对每次请求返回的结果数量有限制。
带分页的 API 响应
{
"data": [...],
"page": 1,
"total_pages": 10
}
Python 分页逻辑
all_data = []
page = 1
while True:
params = {"page": page, "limit": 100}
response = requests.get(url, headers=headers, params=params)
result = response.json()
all_data.extend(result["data"])
if page >= result["total_pages"]:
break
page += 1
✅ 务必处理分页,否则会悄悄漏掉数据。
示例 5:处理速率限制
API 常常限制请求频率:
429 Too Many Requests
重试逻辑示例
import time
response = requests.get(url, headers=headers)
if response.status_code == 429:
time.sleep(60) # 简单的退避
response = requests.get(url, headers=headers)
📌 生产管道 应使用:
- 指数退避
- 重试次数上限
示例 6:错误处理(管道关键)
response = requests.get(url, headers=headers)
if response.status_code != 200:
raise Exception(
f"API failed with status {response.status_code}: {response.text}"
)
常见 HTTP 状态码
200– 成功400– 错误请求401– 未授权404– 未找到500– 服务器错误
REST API 在数据管道中的数据流
REST API
↓
Python / Spark 作业
↓
原始区(JSON)
↓
转换(扁平化、清洗)
↓
数据仓库(Snowflake / BigQuery / Redshift)
数据工程师的最佳实践
- ✔ 始终设计 幂等 的管道
- ✔ 记录请求/响应的元数据
- ✔ 保存原始 API 响应以便重新处理
- ✔ 使用增量加载(时间戳、ID)
- ✔ 监控失败和延迟
- ✔ 尊重 API 速率限制
结论
REST API 是数据工程师 主要的数据摄取机制。掌握 REST 调用——包括认证、分页、重试和错误处理——能让你的管道 可靠、可扩展、具备生产就绪能力。对 REST API 的扎实理解会简化任何新数据源的集成过程。