从 Parquet 到 Snowflake:智能查询,快速加载
发布: (2026年1月7日 GMT+8 02:05)
3 min read
原文: Dev.to
Source: Dev.to

问题
在处理大量金融数据时,高效查询并将结果加载到像 Snowflake 这样的数据仓库至关重要。任务是从存储在 AWS S3 上 3 TB 的 Parquet 数据中生成每日指标(总交易额、活跃客户数、平均余额)。数据按 transaction_date 分区,但较旧的分区列名不一致。结果必须加载到 Snowflake 以便进一步分析。
方法
高效查询数据
通过分区裁剪只读取最近 30 天的数据,从而节省时间和成本。
处理模式演进
使用 COALESCE 等 SQL 函数处理缺失或不同命名的列,确保跨分区的模式一致。
汇总指标
按地区聚合数据,计算总交易额、活跃客户数以及平均账户余额。
将数据加载到 Snowflake
处理完成后,使用 Snowflake 的 COPY INTO 方法进行高效的大规模写入,将 CSV 文件中的结果导入仓库。
为什么可行
- 分区裁剪 将查询限制在相关数据上,使查询快速且成本低。
- 使用
COALESCE的模式处理 能在不断演进的分区数据之间实现无缝集成。 - Snowflake 的优化加载机制 能实现快速可靠的数据传输。
该方法使在云存储中处理大型分区数据集变得可管理,同时确保高效处理并加载到 Snowflake。
PySQL 解决方案
使用分区裁剪读取最近 30 天的 Parquet
import duckdb
import datetime
end = datetime.date.today()
start = end - datetime.timedelta(days=30)
con = duckdb.connect()
df = con.execute(f"""
SELECT *
FROM read_parquet('s3://bank-lake/transactions/transaction_date>= {start} AND transaction_date <= {end}/*.parquet')
""").fetchdf()
汇总指标,处理模式差异
SELECT
region,
SUM(transaction_amount) AS total_tx,
COUNT(DISTINCT customer_id) AS active_customers,
AVG(COALESCE(account_balance, acct_balance)) AS avg_balance
FROM df
GROUP BY region
将结果加载到 Snowflake
import snowflake.connector
# Export the aggregated DataFrame to CSV
result.to_csv("daily.csv", index=False)
# Connect to Snowflake
conn = snowflake.connector.connect(
user='YOUR_USER',
password='YOUR_PASSWORD',
account='YOUR_ACCOUNT'
)
# Upload and copy the CSV into Snowflake
conn.cursor().execute("""
PUT file://daily.csv @%DAILY_REGION_METRICS;
COPY INTO DAILY_REGION_METRICS
FROM @%DAILY_REGION_METRICS
FILE_FORMAT=(TYPE=CSV FIELD_OPTIONALLY_ENCLOSED_BY='"');
""")