一个用于清理、验证和查询 CSV/TSV/Excel/Parquet 文件的简约 Go 工具包

发布: (2025年12月10日 GMT+8 16:20)
5 min read
原文: Dev.to

Source: Dev.to

在许多软件系统中,并非所有数据都存放在数据库里。
有时它们存储在结构化文件中,例如 CSV、TSV 或电子表格,而实际上这些文件往往并不干净。数据可能是手动录入的,值可能缺失,行中可能出现导致下游处理失败的不一致。当文件很大时,尝试在 Excel 中“修复”会导致崩溃或难以定位无效记录。

我在实际项目中屡次遇到这些问题,于是构建了三个小型 Go 库,专注于结构化文件数据的预处理、验证和轻量分析。所有库都使用标准的 io.Reader 接口,允许你使用熟悉的 Go 原语而无需自定义数据结构。

  • fileprep – 预处理 + 基于结构体标签的字段级验证
  • fileframe – 一个小巧、不可变的 DataFrame,用于过滤和检查
  • filesql – 通过嵌入式 SQLite 数据库直接在 CSV/TSV/LTSV/Excel/Parquet 上运行 SQL

这些库相互独立,但共享同一套基于 io.Reader 的 API。

fileprep

功能

  • 预处理:去除空格、替换、Unicode 正规化、类型强制转换等
  • 清晰的错误报告:指明是哪一行、哪一列导致了失败
  • 支持复合验证(跨列规则)
  • 兼容 CSV、TSV、LTSV、Parquet、Excel —— 任何通过 io.Reader 提供的输入

示例

type User struct {
    Name  string `prep:"trim" validate:"required"`
    Email string `prep:"trim,lowercase"`
    Age   string
}

func main() {
    csvData := `name,email,age
  John Doe  ,JOHN@EXAMPLE.COM,30
Jane Smith,jane@example.com,25
`

    processor := fileprep.NewProcessor(fileprep.FileTypeCSV)
    var users []User

    reader, result, err := processor.Process(strings.NewReader(csvData), &users)
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        return
    }

    fmt.Printf("Processed %d rows, %d valid\n", result.RowCount, result.ValidRowCount)

    for _, user := range users {
        fmt.Printf("Name: %q, Email: %q\n", user.Name, user.Email)
    }

    // `reader` can be passed directly to filesql
    _ = reader
}

输出

Processed 2 rows, 2 valid
Name: "John Doe", Email: "john@example.com"
Name: "Jane Smith", Email: "jane@example.com"

fileframe

功能

  • 不可变 DataFrame
  • 过滤、映射、分组
  • 适用于对小型/中型 CSV/TSV 数据集进行“一次性转换”

示例

// Sample sales data
csvData := `product,amount,category
Apple,100,Fruit
Banana,150,Fruit
Carrot,80,Vegetable
Orange,120,Fruit
Broccoli,90,Vegetable`

df, err := fileframe.NewDataFrame(strings.NewReader(csvData), fileframe.CSV)
if err != nil {
    fmt.Println("Error:", err)
    return
}

fmt.Printf("Total rows: %d\n", df.Len())
fmt.Printf("Columns: %v\n", df.Columns())

// Filter
filtered := df.Filter(func(row map[string]any) bool {
    amount, ok := row["amount"].(int64)
    return ok && amount > 100
})
fmt.Printf("Rows with amount > 100: %d\n", filtered.Len())

// GroupBy + Sum
groupedDf, err := df.GroupBy("category")
if err != nil {
    fmt.Println("Error:", err)
    return
}
grouped, err := groupedDf.Sum("amount")
if err != nil {
    fmt.Println("Error:", err)
    return
}

for _, row := range grouped.ToRecords() {
    fmt.Printf("  %s: %.0f\n", row["category"], row["sum_amount"])
}

输出

Total rows: 5
Columns: [product amount category]
Rows with amount > 100: 2
Fruit: 370
Vegetable: 170

filesql

filesql 并不是字面意义上的“文件上的 SQL”。它内部会把你的数据加载到一个临时的 SQLite 数据库中,从而让你在无需自行管理数据库的情况下拥有完整的 SQL 能力。

示例

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    db, err := filesql.OpenContext(ctx, "data.csv")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    rows, err := db.QueryContext(ctx, "SELECT * FROM data WHERE age > 25")
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()

    for rows.Next() {
        var name string
        var age int
        if err := rows.Scan(&name, &age); err != nil {
            log.Fatal(err)
        }
        fmt.Printf("Name: %s, Age: %d\n", name, age)
    }
}

如果你是 Go 开发者且不需要完整的 Apache 或 Python ETL 生态系统,这三个轻量库可能非常适合你的工作流。用户基数仍然较小,可能还有未被发现的 bug——欢迎提供反馈和提交 issue。

Back to Blog

相关文章

阅读更多 »