**8 Python Database Optimization Techniques to 10x Your Application Performance**
Source: Dev.to
About the Author
As a best-selling author, I invite you to explore my books on Amazon.
Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!
Making Python Database Access Faster
Working with databases in Python, I've learned that speed isn't just a nice-to-have; it's essential. When an application slows down, the database calls are often the culprit. Over time I've gathered a set of practical methods that make a real difference. These aren't just theories; they are techniques I use regularly to keep applications responsive, even as they grow. Below are eight of the most effective ones.
1️⃣ Inspect the Query Plan
First, see what the database is actually doing. In PostgreSQL you can prepend a query with EXPLAIN ANALYZE, which runs the query and reports the actual execution plan with real timings. Plain EXPLAIN shows the plan and cost estimates without executing the statement.
import psycopg2
# Connect to your database
conn = psycopg2.connect(database="myapp", user="app_user", password="secret")
cur = conn.cursor()
# Ask the database to explain its plan for a query
query = "SELECT * FROM user_orders WHERE user_id = 456;"
cur.execute(f"EXPLAIN ANALYZE {query}")
execution_plan = cur.fetchall()
for line in execution_plan:
    print(line[0])
# Look for lines about "Seq Scan" (slow) vs "Index Scan" (fast)
# Also check the estimated cost; a lower number is better.
If you see "Seq Scan on user_orders", the database is reading every row, which is slow for large tables. You want to see "Index Scan" instead. This simple check is the starting point for any performance issue.
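If you want the estimated plan without actually running the statement (useful for expensive queries or for writes), plain EXPLAIN is enough. A minimal sketch reusing the cursor above:
# EXPLAIN alone returns the planner's estimates without executing the query
cur.execute("EXPLAIN SELECT * FROM user_orders WHERE user_id = 456;")
for line in cur.fetchall():
    print(line[0])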
2️⃣ Add the Right Indexes
The most common fix for a slow query is adding an index. Think of an index like a book's index: you jump straight to the relevant page instead of scanning every page.
from sqlalchemy import create_engine, text
engine = create_engine('postgresql://user:pass@localhost/myapp')
with engine.connect() as conn:
    # Single-column index
    conn.execute(text("CREATE INDEX idx_user_email ON users(email);"))
    # Composite (and partial) index for queries that filter active customers by city
    conn.execute(
        text(
            "CREATE INDEX idx_city_active "
            "ON customers(city, account_status) "
            "WHERE account_status = 'active';"
        )
    )
    conn.commit()  # SQLAlchemy 2.x connections do not autocommit DDL
print("Indexes created.")
Note: Indexes speed up reads but slow down writes because the index must be updated on every INSERT/UPDATE. Add indexes only on columns frequently used in WHERE, ORDER BY, or JOIN clauses.
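To confirm a new index is actually being used, you can re-run the EXPLAIN ANALYZE check from technique 1 against the indexed column. A minimal sketch (the email value is illustrative):
with engine.connect() as conn:
    plan = conn.execute(
        text("EXPLAIN ANALYZE SELECT id FROM users WHERE email = :email"),
        {"email": "someone@example.com"}
    )
    for row in plan:
        print(row[0])  # Look for "Index Scan using idx_user_email"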
3️⃣ Use a Connection Pool
When many users open and close connections, you can run out of connections or suffer overhead. A connection pool keeps a set of open connections ready for reuse.
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
engine = create_engine(
    'postgresql://user:pass@localhost/myapp',
    poolclass=QueuePool,
    pool_size=10,       # 10 connections always ready
    max_overflow=20,    # Allow up to 20 extra if needed
    pool_timeout=30,    # Wait 30 seconds for a free connection
    pool_recycle=1800   # Recycle connections after 30 minutes
)
# Using the pool is the same as usual
with engine.connect() as conn:
    result = conn.execute(text("SELECT name FROM products"))
    for row in result:
        print(row[0])
Set this up once when the application starts. The pool prevents "too many connections" errors in a busy web app.
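One common way to "set it up once" is to build the engine at import time in its own module and reuse it everywhere. A sketch, where the db.py file name is illustrative:
# db.py - created once when the process starts
from sqlalchemy import create_engine

engine = create_engine('postgresql://user:pass@localhost/myapp', pool_size=10)

# any other module
# from db import engine
# with engine.connect() as conn:
#     ...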
4️⃣ Batch Inserts / Updates
Inserting rows one at a time is a disaster for performance. Each insert is a round-trip to the DB. Use batch operations instead.
import psycopg2
conn = psycopg2.connect(database="myapp", user="app_user", password="secret")
cur = conn.cursor()
# Data to insert
new_logs = [
    ('error', '2023-10-26 10:00:00', 'Payment failed'),
    ('info', '2023-10-26 10:00:01', 'User logged in'),
    ('warning', '2023-10-26 10:00:02', 'Cache nearly full'),
]
# Send all rows in a single executemany() call instead of one execute() per row
cur.executemany(
    "INSERT INTO app_logs (level, timestamp, message) VALUES (%s, %s, %s)",
    new_logs
)
conn.commit()
print(f"Inserted {cur.rowcount} log entries efficiently.")
Batching can turn a minutes-long operation into a few seconds. The same idea works for bulk updates, e.g., collapsing many updates into a single statement with a CASE expression, as sketched below.
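A minimal sketch of that bulk-update idea, assuming a hypothetical orders table with order_id and status columns:
# Collapse many single-row UPDATEs into one statement with a CASE expression
status_changes = [(101, 'shipped'), (102, 'cancelled'), (103, 'shipped')]

case_branches = " ".join("WHEN %s THEN %s" for _ in status_changes)
params = [value for pair in status_changes for value in pair]
ids = [order_id for order_id, _ in status_changes]

cur.execute(
    f"UPDATE orders SET status = CASE order_id {case_branches} END "
    "WHERE order_id = ANY(%s)",
    params + [ids],  # psycopg2 adapts the Python list to a PostgreSQL array
)
conn.commit()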
5️⃣ Materialized Views for Heavy Queries
If a complex query joins many tables and performs heavy calculations, but the underlying data doesn't change every second, a materialized view is perfect. It stores the query result as a real table that can be refreshed periodically.
from sqlalchemy import create_engine, text

engine = create_engine('postgresql://user:pass@localhost/myapp')
with engine.connect() as conn:
    # Create a materialized view for a weekly sales report
    conn.execute(text("""
        CREATE MATERIALIZED VIEW weekly_sales_report AS
        SELECT
            o.order_id,
            o.order_date,
            c.customer_name,
            SUM(oi.quantity * oi.unit_price) AS total_amount
        FROM orders o
        JOIN order_items oi ON o.order_id = oi.order_id
        JOIN customers c ON o.customer_id = c.customer_id
        WHERE o.order_date >= (CURRENT_DATE - INTERVAL '7 days')
        GROUP BY o.order_id, o.order_date, c.customer_name
    """))
    conn.commit()  # Persist the DDL (SQLAlchemy 2.x does not autocommit)
print("Materialized view created.")
Refresh it when the source data changes:
REFRESH MATERIALIZED VIEW weekly_sales_report;
6️⃣ SELECT Only What You Need
Fetching unnecessary columns or rows wastes bandwidth and memory. Always limit the result set to what the application actually uses.
# Bad: selects all columns
cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
# Good: select only needed columns
cur.execute(
"SELECT username, email, created_at FROM users WHERE id = %s",
(user_id,)
)
7️⃣ Leverage Server-Side Cursors for Large Result Sets
When you need to process millions of rows, pulling them all into Python at once can exhaust memory. Server-side (named) cursors stream rows incrementally.
import psycopg2
conn = psycopg2.connect(database="myapp", user="app_user", password="secret")
cur = conn.cursor(name="large_fetch")  # A named cursor is server-side
cur.itersize = 2000  # Rows pulled from the server per network round-trip
cur.execute("SELECT id, data FROM big_table")
for row in cur:
    process(row)  # process() is a placeholder for your own row handler
cur.close()
8️⃣ Cache Frequently Used Data
For data that rarely changes (e.g., lookup tables, configuration), cache it in memory or an external cache (Redis, Memcached). This eliminates repeated DB hits.
import redis
import json
r = redis.Redis(host='localhost', port=6379, db=0)
def get_country_name(country_code):
    # Try cache first
    cached = r.get(f"country:{country_code}")
    if cached:
        return cached.decode('utf-8')
    # Fall back to the database (cur is the psycopg2 cursor from earlier examples)
    cur.execute(
        "SELECT name FROM countries WHERE code = %s",
        (country_code,)
    )
    name = cur.fetchone()[0]
    # Store in cache for next time (TTL = 1 hour)
    r.setex(f"country:{country_code}", 3600, name)
    return name
Takeaway
Performance tuning is an iterative process: measure → identify → fix → re-measure. By regularly inspecting query plans, adding appropriate indexes, pooling connections, batching operations, using materialized views, limiting SELECTs, streaming large results, and caching static data, you'll keep your Python applications fast and scalable.
Happy coding!
Optimizing Database Queries & Caching
Materialized View Example
CREATE MATERIALIZED VIEW weekly_sales_report AS
SELECT
    product_id,
    SUM(quantity) AS total_units,
    SUM(quantity * unit_price) AS total_revenue
FROM order_details
WHERE order_date > CURRENT_DATE - 7
GROUP BY product_id
ORDER BY total_revenue DESC;
# Refresh the view (e.g., hourly via a scheduler)
conn.execute(text("REFRESH MATERIALIZED VIEW weekly_sales_report;"))
# Query the view: instant results
result = conn.execute(text("SELECT * FROM weekly_sales_report LIMIT 5;"))
for row in result:
    print(f"Product {row[0]}: ${row[2]:.2f} revenue")
The first creation and each refresh run the slow query, but every SELECT from the materialized view is as fast as reading from a regular table. I use this for dashboards and reports.
Simple Redis Cache for Frequently-Read Data
import redis
import json
import hashlib
import psycopg2
# Connect to Redis
cache = redis.Redis(host='localhost', port=6379, db=0)
# Connect to PostgreSQL
db_conn = psycopg2.connect(database="myapp", user="user", password="pass")
db_cur = db_conn.cursor()
def get_top_products(limit=10, cache_seconds=300):
    """Return top-selling products, cached for `cache_seconds`."""
    # 1️⃣ Build a unique cache key
    query_signature = f"top_products_{limit}"
    cache_key = hashlib.md5(query_signature.encode()).hexdigest()
    # 2️⃣ Try the cache first
    cached_result = cache.get(cache_key)
    if cached_result is not None:
        print("Result loaded from cache.")
        return json.loads(cached_result)
    # 3️⃣ Cache miss: query the DB
    db_cur.execute("""
        SELECT product_id, product_name, COUNT(*) AS order_count
        FROM order_items
        GROUP BY product_id, product_name
        ORDER BY order_count DESC
        LIMIT %s
    """, (limit,))
    result = db_cur.fetchall()
    # 4️⃣ Store the fresh result in Redis
    cache.setex(cache_key, cache_seconds, json.dumps(result))
    print("Result queried from database and cached.")
    return result
# Usage
products = get_top_products(limit=5)
for prod_id, name, count in products:
    print(f"{name}: ordered {count} times")
Setting a TTL (time-to-live) prevents stale data from persisting forever. This pattern works well for homepage listings, leaderboards, or any public data that doesn't change instantly.
Query-Rewrite Tips
Avoid unnecessary work by writing clearer SQL.
-- Slow version: IN subquery
SELECT *
FROM employees
WHERE department_id IN (
SELECT id FROM departments WHERE location = 'NYC'
);
-- Faster version: JOIN (often better optimized)
SELECT e.*
FROM employees e
JOIN departments d ON e.department_id = d.id
WHERE d.location = 'NYC';
-- Be specific in SELECT
SELECT id, first_name, email
FROM users;
Monitoring Query Performance
import time
import logging
from contextlib import contextmanager
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@contextmanager
def monitor_query(query_tag):
    """Time a database operation and log the duration."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        logger.info(f"Query '{query_tag}' took {elapsed:.4f} seconds")
        if elapsed > 0.5:  # Warn on slow queries
            logger.warning(f"Slow query alert: '{query_tag}'")
# Example usage
with monitor_query("fetch_recent_orders"):
    cur.execute(
        "SELECT * FROM orders WHERE order_date > NOW() - INTERVAL '1 day'"
    )
    orders = cur.fetchall()
    print(f"Fetched {len(orders)} orders.")
Log these timings to a file or a monitoring system. Over time you'll see trends, catch regressions early, and turn performance from a mystery into a manageable discipline.
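For example, a minimal sketch of sending the same timings to a log file (the file name is illustrative):
# Attach a file handler so the timing logs survive process restarts
file_handler = logging.FileHandler("query_timings.log")
file_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
logger.addHandler(file_handler)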
Putting It All Together
Database performance is about intentionality:
- Measure: find the bottleneck.
- Target: apply fixes (indexes, materialized views, caching).
- Scale: use pooling, sharding, or other patterns as needed.
- Watch: continuously monitor to ensure things stay fast.
You don't need every technique on every project, but having them in your toolbox lets you handle almost any slowdown. Start small: pick one slow query today, explain it, and test an index or rewrite. That first win shows how powerful these methods can be.
Check out my latest ebook for free on my channel!
Like, share, comment, and subscribe to stay updated.
101 Books
101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep publishing costs incredibly low; some books are priced as low as $4, making quality knowledge accessible to everyone.
Explore our catalog: Golang Clean Code (link placeholder)
[w.amazon.com/dp/B0DQQF9K3Z](https://w.amazon.com/dp/B0DQQF9K3Z) **available on Amazon.**
Stay tuned for updates and exciting news. When shopping for books, search for **Aarav Joshi** to find more of our titles. Use the provided link to enjoy **special discounts**!
Our Creations
Be sure to check out our creations:
Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | Java Elite Dev | Golang Elite Dev | Python Elite Dev | JS Elite Dev | JS Schools
We are on Medium
Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva