8 Python 数据库优化技术,让你的应用性能提升10倍

发布: (2025年12月19日 GMT+8 20:55)
12 min read
原文: Dev.to

Source: Dev.to

(请提供需要翻译的正文内容,我将为您翻译成简体中文。)

📚 关于作者

作为畅销书作者,我邀请您在 Amazon 上探索我的书籍。
别忘了在 Medium 关注我并表达您的支持。谢谢!您的支持意义非凡!

🚀 加速 Python 数据库访问

在使用 Python 操作数据库的过程中,我发现速度不仅是锦上添花,而是必不可少的。当应用变慢时,往往是数据库调用导致的。随着时间的推移,我汇总了一套实用的方法,能够真正提升性能。这些并非空洞的理论,而是我在实际项目中经常使用的技巧,帮助应用在规模扩大时仍保持响应迅速。下面列出 条最有效的做法。

1️⃣ 检查查询计划

首先,了解数据库到底在做什么。在 PostgreSQL 中可以在查询前加上 EXPLAIN ANALYZE。这不会真正执行查询,而是显示执行计划和成本估算。

import psycopg2

# Connect to your database
conn = psycopg2.connect(database="myapp", user="app_user", password="secret")
cur = conn.cursor()

# Ask the database to explain its plan for a query
query = "SELECT * FROM user_orders WHERE user_id = 456;"
cur.execute(f"EXPLAIN ANALYZE {query}")
execution_plan = cur.fetchall()

for line in execution_plan:
    print(line[0])

# Look for lines about "Seq Scan" (slow) vs "Index Scan" (fast)
# Also check the estimated cost; a lower number is better.

如果看到 “Seq Scan on user_orders”,说明数据库在对整张表进行顺序扫描——在大表上非常慢。你希望看到 “Index Scan”。这一步是排查性能问题的起点。

2️⃣ 添加合适的索引

慢查询最常见的解决办法就是创建索引。索引就像书的目录:可以直接跳到相关页,而不必遍历每一页。

from sqlalchemy import create_engine, text

engine = create_engine('postgresql://user:pass@localhost/myapp')
with engine.connect() as conn:
    # Single‑column index
    conn.execute(text("CREATE INDEX idx_user_email ON users(email);"))

    # Composite index for queries that filter by city and status
    conn.execute(
        text(
            "CREATE INDEX idx_city_active "
            "ON customers(city, account_status) "
            "WHERE account_status = 'active';"
        )
    )

    print("Indexes created.")

Note: 索引可以加快读取,但会减慢写入,因为每次 INSERT/UPDATE 都需要更新索引。只在经常出现在 WHEREORDER BYJOIN 子句中的列上创建索引。

3️⃣ 使用连接池

当大量用户频繁打开和关闭连接时,容易耗尽连接数或产生额外开销。连接池维护一组已打开的连接以供复用。

from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

engine = create_engine(
    'postgresql://user:pass@localhost/myapp',
    poolclass=QueuePool,
    pool_size=10,        # 10 connections always ready
    max_overflow=20,     # Allow up to 20 extra if needed
    pool_timeout=30,     # Wait 30 seconds for a free connection
    pool_recycle=1800    # Recycle connections after 30 minutes
)

# Using the pool is the same as usual
with engine.connect() as conn:
    result = conn.execute(text("SELECT name FROM products"))
    for row in result:
        print(row[0])

在应用启动时配置一次即可。连接池可以防止繁忙的 Web 应用出现 “too many connections” 错误。

4️⃣ 批量插入 / 更新

一次只插入一行会极大拖慢性能,因为每次插入都要往返一次数据库。应使用批量操作。

import psycopg2

conn = psycopg2.connect(database="myapp", user="app_user", password="secret")
cur = conn.cursor()

# Data to insert
new_logs = [
    ('error',   '2023-10-26 10:00:00', 'Payment failed'),
    ('info',    '2023-10-26 10:00:01', 'User logged in'),
    ('warning', '2023-10-26 10:00:02', 'Cache nearly full'),
]

# Insert all rows in one round‑trip
cur.executemany(
    "INSERT INTO app_logs (level, timestamp, message) VALUES (%s, %s, %s)",
    new_logs
)
conn.commit()
print(f"Inserted {cur.rowcount} log entries efficiently.")

Batching can turn a minutes‑long operation into a few seconds. The same idea works for bulk updates (e.g., using a CASE statement).

5️⃣ Materialized Views for Heavy Queries

如果一个复杂查询需要连接许多表并进行大量计算,但底层数据并不是每秒都在变化,物化视图是完美的选择。它将查询结果存储为真实的表,并可以定期刷新。

from sqlalchemy import create_engine, text
from datetime import date

engine = create_engine('postgresql://user:pass@localhost/myapp')

with engine.connect() as conn:
    # Create a materialized view for a weekly sales report
    conn.execute(text("""
        CREATE MATERIALIZED VIEW weekly_sales_report AS
        SELECT
            o.order_id,
            o.order_date,
            c.customer_name,
            SUM(oi.quantity * oi.unit_price) AS total_amount
        FROM orders o
        JOIN order_items oi ON o.order_id = oi.order_id
        JOIN customers c ON o.customer_id = c.customer_id
        WHERE o.order_date >= (CURRENT_DATE - INTERVAL '7 days')
        GROUP BY o.order_id, o.order_date, c.customer_name
    """))
    print("Materialized view created.")

Refresh it when the source data changes:

REFRESH MATERIALIZED VIEW weekly_sales_report;

6️⃣ Use SELECT Only What You Need

获取不必要的列或行会浪费带宽和内存。始终将结果集限制在应用实际需要的范围内。

# Bad: selects all columns
cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))

# Good: select only needed columns
cur.execute(
    "SELECT username, email, created_at FROM users WHERE id = %s",
    (user_id,)
)

7️⃣ Leverage Server‑Side Cursors for Large Result Sets

当需要处理数百万行时,一次性将它们全部拉入 Python 会耗尽内存。服务器端(具名)游标可以增量流式读取行。

import psycopg2

conn = psycopg2.connect(database="myapp", user="app_user", password="secret")
cur = conn.cursor(name="large_fetch")   # Named cursor → server‑side

cur.execute("SELECT id, data FROM big_table")
for row in cur:
    process(row)   # Handle one row at a time

8️⃣ Cache Frequently Used Data

对于很少变化的数据(例如查找表、配置),可以将其缓存到内存或外部缓存(Redis、Memcached)。这可以消除重复的数据库访问。

import redis
import json

r = redis.Redis(host='localhost', port=6379, db=0)

def get_country_name(country_code):
    # Try cache first
    cached = r.get(f"country:{country_code}")
    if cached:
        return cached.decode('utf-8')

    # Fallback to DB
    cur.execute(
        "SELECT name FROM countries WHERE code = %s",
        (country_code,)
    )
    name = cur.fetchone()[0]

    # Store in cache for next time (TTL = 1 hour)
    r.setex(f"country:{country_code}", 3600, name)
    return name

🎯 要点

性能调优是一个迭代过程:测量 → 识别 → 修复 → 重新测量。通过定期检查查询计划、添加适当的索引、使用连接池、批量操作、使用物化视图、限制 SELECT、流式传输大结果集以及缓存静态数据,您可以保持 Python 应用程序的快速和可扩展性。

祝编码愉快! 🚀

优化数据库查询与缓存

物化视图示例

SELECT
    product_id,
    SUM(quantity) AS total_units,
    SUM(quantity * unit_price) AS total_revenue
FROM order_details
WHERE order_date > CURRENT_DATE - 7
GROUP BY product_id
ORDER BY total_revenue DESC;
# 刷新视图(例如通过调度器每小时一次)
conn.execute(text("REFRESH MATERIALIZED VIEW weekly_sales_report;"))

# 查询视图 —— 立即返回结果
result = conn.execute(text("SELECT * FROM weekly_sales_report LIMIT 5;"))
for row in result:
    print(f"Product {row[0]}: ${row[2]:.2f} revenue")

第一次创建以及每次刷新都会运行慢查询,但对物化视图的每一次 SELECT 都像读取普通表一样快。我把它用于仪表盘和报告。

简单的 Redis 缓存用于高频读取数据

import redis
import json
import hashlib
import psycopg2

# 连接 Redis
cache = redis.Redis(host='localhost', port=6379, db=0)

# 连接 PostgreSQL
db_conn = psycopg2.connect(database="myapp", user="user", password="pass")
db_cur = db_conn.cursor()

def get_top_products(limit=10, cache_seconds=300):
    """返回热销商品,缓存 `cache_seconds` 秒。"""
    # 1️⃣ 构建唯一缓存键
    query_signature = f"top_products_{limit}"
    cache_key = hashlib.md5(query_signature.encode()).hexdigest()

    # 2️⃣ 先尝试从缓存读取
    cached_result = cache.get(cache_key)
    if cached_result is not None:
        print("Result loaded from cache.")
        return json.loads(cached_result)

    # 3️⃣ 缓存未命中 → 查询数据库
    db_cur.execute("""
        SELECT product_id, product_name, COUNT(*) AS order_count
        FROM order_items
        GROUP BY product_id, product_name
        ORDER BY order_count DESC
        LIMIT %s
    """, (limit,))
    result = db_cur.fetchall()

    # 4️⃣ 将新结果存入 Redis
    cache.setex(cache_key, cache_seconds, json.dumps(result))
    print("Result queried from database and cached.")
    return result

# 使用示例
products = get_top_products(limit=5)
for prod_id, name, count in products:
    print(f"{name}: ordered {count} times")

设置 TTL(生存时间)可以防止陈旧数据永久存在。该模式非常适合首页列表、排行榜或任何不会即时变化的公共数据。

查询重写技巧

通过编写更清晰的 SQL 来避免不必要的工作。

-- 慢版本:IN 子查询
SELECT *
FROM employees
WHERE department_id IN (
    SELECT id FROM departments WHERE location = 'NYC'
);
-- 快速版本:JOIN(通常优化更好)
SELECT e.*
FROM employees e
JOIN departments d ON e.department_id = d.id
WHERE d.location = 'NYC';
-- 在 SELECT 中只取需要的列
SELECT id, first_name, email
FROM users;

监控查询性能

import time
import logging
from contextlib import contextmanager

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@contextmanager
def monitor_query(query_tag):
    """计时数据库操作并记录持续时间。"""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        logger.info(f"Query '{query_tag}' took {elapsed:.4f} seconds")
        if elapsed > 0.5:  # 对慢查询发出警告
            logger.warning(f"Slow query alert: '{query_tag}'")

# 示例用法
with monitor_query("fetch_recent_orders"):
    cur.execute(
        "SELECT * FROM orders WHERE order_date > NOW() - INTERVAL '1 day'"
    )
    orders = cur.fetchall()

print(f"Fetched {len(orders)} orders.")

将这些时间记录到文件或监控系统中。随着时间推移,你会看到趋势,及早捕获回归,将性能从未知变为可管理的纪律。

综合运用

数据库性能关乎 有意性

  1. 测量 —— 找出瓶颈。
  2. 定位 —— 应用修复(索引、物化…)

views,缓存)。

  1. Scale – 根据需要使用连接池、分片或其他模式。
  2. Watch – 持续监控以确保系统保持高速。

你不需要在每个项目中使用所有技术,但将它们放在工具箱中可以让你应对几乎所有的性能下降。
从小处着手:今天挑选一个慢查询,使用 EXPLAIN 分析,并尝试添加索引或重写查询。第一次成功就能展示这些方法的强大威力。

📘 在我的频道免费获取我的最新电子书
👍 点赞分享评论订阅以获取最新动态。

101 本书

101 本书 是一家由作者 Aarav Joshi 共同创立的 AI 驱动出版公司。通过利用先进的 AI 技术,我们将出版成本保持在极低水平——有些书的售价低至 $4——让高质量的知识对每个人都可获取。

探索我们的目录 – Golang Clean Code (链接占位符)

[w.amazon.com/dp/B0DQQF9K3Z](https://w.amazon.com/dp/B0DQQF9K3Z) **available on Amazon.**

Stay tuned for updates and exciting news. When shopping for books, search for **Aarav Joshi** to find more of our titles. Use the provided link to enjoy **special discounts**!

保持关注以获取最新动态和精彩消息。购买图书时,搜索 Aarav Joshi 可找到我们的更多作品。使用提供的链接即可享受 特别折扣

我们的创作

请务必查看我们的创作:

Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | Java Elite Dev | Golang Elite Dev | Python Elite Dev | JS Elite Dev | JS Schools

我们在 Medium

Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva

Back to Blog

相关文章

阅读更多 »

第11周:Prisma!

涵盖主题✅ - 理解 ORM 实际解决了什么 - 使用 Prisma 定义数据库模式 - 生成类型安全的数据库客户端 - 执行 CRUD…