停止编写 try/except 地狱:使用 SQLAlchemy 与 Unit Of Work 清理数据库事务
发布: (2025年12月11日 GMT+8 08:37)
4 min read
原文: Dev.to
Source: Dev.to
async def create_order(user_id: int, items_in_basket: list[dict]) -> Order:
session = get_session()
try:
user: User = await session.get(User, user_id)
if not user:
await session.rollback()
raise ValueError("User not found")
order: Order = Order(user_id=user_id)
session.add(order)
for item in items_in_basket:
line: OrderLineItem = OrderLineItem(**item, order=order)
session.add(line)
await session.commit()
return order
except DuplicateError:
await session.rollback()
raise
except Exception:
await session.rollback()
raise
finally:
await session.close()
上述模式把 rollback 调用散落在各处,需要 finally 块,并且把业务逻辑和样板代码混在一起。更简洁的做法是使用 工作单元(Unit of Work,UoW)来自动管理会话生命周期。
什么是工作单元?
Martin Fowler 对其定义如下:
“工作单元会跟踪在一次业务事务中对数据库可能产生影响的所有操作。当事务结束时,它会计算出需要对数据库进行的所有更改。”
可以把它想象成在线购物车:你可以添加、删除或修改商品,但在点击 购买 之前,任何更改都不会持久化。如果你放弃购物车,则不会保存任何更改。数据库操作同理:
- 在内存中跟踪 更改
- 一次性提交 所有更改,或
- 如果出现错误则回滚 所有更改
实现方式
from typing import Self
from types import TracebackType
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
class UnitOfWork:
"""Async context manager for database transactions.
Commits on success, rolls back on exception, always cleans up.
"""
def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None:
self._session_factory = session_factory
async def __aenter__(self) -> Self:
self.session = self._session_factory()
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
if exc_type is not None:
await self.rollback()
await self.session.close()
async def commit(self) -> None:
await self.session.commit()
async def rollback(self) -> None:
await self.session.rollback()
工作原理
__aenter__为每个async with块创建一个全新的AsyncSession,让每次事务都有独立的会话。__aexit__在块外抛出异常时自动回滚,然后关闭会话。- 显式的
commit()让你自行决定何时持久化更改。
显式优于隐式 —— 手动调用
commit()可以避免意外写入(例如提前返回或只执行只读查询时)。
使用方法
在应用启动时设置会话工厂(一次性)
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncEngine
engine: AsyncEngine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db"
)
SessionFactory = async_sessionmaker(engine, expire_on_commit=False)
创建工作单元并执行操作
async def main() -> None:
uow = UnitOfWork(session_factory=SessionFactory)
async with uow:
uow.session.add(User(name="Alice", email="alice@example.com"))
await uow.commit()
print(f"Created user with ID: {user.id}")
场景示例
正常路径 —— 提交成功
async with uow:
uow.session.add(User(name="Bob"))
await uow.commit() # changes are saved
提交前出现异常 —— 自动回滚
async with uow:
uow.session.add(User(name="Charlie"))
raise ValueError("Something went wrong")
# No commit called; __aexit__ rolls back automatically
在服务函数中使用工作单元
async def create_order(user_id: int, items_in_basket: list[dict], uow: UnitOfWork) -> Order:
async with uow:
user = await uow.session.get(User, user_id)
if not user:
raise ValueError("User not found")
order = Order(user_id=user_id)
uow.session.add(order)
for item in items_in_basket:
uow.session.add(OrderLine(**item, order=order))
await uow.commit()
return order
这种模式消除了分散的 rollback 调用,去掉了 finally 块的需求,并且将业务逻辑与持久化关注点清晰分离。