从 Parquet 到 Snowflake:智能查询,快速加载

发布: (2026年1月7日 GMT+8 02:05)
3 min read
原文: Dev.to

Source: Dev.to

Cover image for From Parquet to Snowflake: Query Smart, Load Fast

问题

在处理大量金融数据时,高效查询并将结果加载到像 Snowflake 这样的数据仓库至关重要。任务是从存储在 AWS S3 上 3 TB 的 Parquet 数据中生成每日指标(总交易额、活跃客户数、平均余额)。数据按 transaction_date 分区,但较旧的分区列名不一致。结果必须加载到 Snowflake 以便进一步分析。

方法

高效查询数据

通过分区裁剪只读取最近 30 天的数据,从而节省时间和成本。

处理模式演进

使用 COALESCE 等 SQL 函数处理缺失或不同命名的列,确保跨分区的模式一致。

汇总指标

按地区聚合数据,计算总交易额、活跃客户数以及平均账户余额。

将数据加载到 Snowflake

处理完成后,使用 Snowflake 的 COPY INTO 方法进行高效的大规模写入,将 CSV 文件中的结果导入仓库。

为什么可行

  • 分区裁剪 将查询限制在相关数据上,使查询快速且成本低。
  • 使用 COALESCE 的模式处理 能在不断演进的分区数据之间实现无缝集成。
  • Snowflake 的优化加载机制 能实现快速可靠的数据传输。

该方法使在云存储中处理大型分区数据集变得可管理,同时确保高效处理并加载到 Snowflake。

PySQL 解决方案

使用分区裁剪读取最近 30 天的 Parquet

import duckdb
import datetime

end = datetime.date.today()
start = end - datetime.timedelta(days=30)

con = duckdb.connect()
df = con.execute(f"""
    SELECT *
    FROM read_parquet('s3://bank-lake/transactions/transaction_date>= {start} AND transaction_date <= {end}/*.parquet')
""").fetchdf()

汇总指标,处理模式差异

SELECT
    region,
    SUM(transaction_amount) AS total_tx,
    COUNT(DISTINCT customer_id) AS active_customers,
    AVG(COALESCE(account_balance, acct_balance)) AS avg_balance
FROM df
GROUP BY region

将结果加载到 Snowflake

import snowflake.connector

# Export the aggregated DataFrame to CSV
result.to_csv("daily.csv", index=False)

# Connect to Snowflake
conn = snowflake.connector.connect(
    user='YOUR_USER',
    password='YOUR_PASSWORD',
    account='YOUR_ACCOUNT'
)

# Upload and copy the CSV into Snowflake
conn.cursor().execute("""
    PUT file://daily.csv @%DAILY_REGION_METRICS;
    COPY INTO DAILY_REGION_METRICS
    FROM @%DAILY_REGION_METRICS
    FILE_FORMAT=(TYPE=CSV FIELD_OPTIONALLY_ENCLOSED_BY='"');
""")
Back to Blog

相关文章

阅读更多 »

构建 Write-Once 发布流水线

问题在于,撰写内容是最容易的部分。将其在各平台上持续发布——使用正确的 metadata、图片、标签、canonical URLs 和更新……