Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .python-version

This file was deleted.

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ lint:
flake8 .

test:
pytest --cov context_async_sqlalchemy exmaples/fastapi_example/tests --cov-report=term-missing
pytest --cov context_async_sqlalchemy examples/fastapi_example/tests --cov-report=term-missing

uv:
uv sync
Expand Down
293 changes: 41 additions & 252 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,283 +2,72 @@

[![PyPI](https://img.shields.io/pypi/v/context-async-sqlalchemy.svg)](https://pypi.org/project/context-async-sqlalchemy/)

[DOCUMENTATION](https://krylosov-aa.github.io/context-async-sqlalchemy/)

ContextVar + async sqlalchemy = happiness.
Provides a super convenient way to work with sqlalchemy in asynchronous
applications. It takes care of the issues of managing the lifecycle of engine,
session, and transactions without being a wrapper.

The main task is to get quick and easy access to the session and not worry
about when to open and when to close it.

The key features are:

- Super easy to use
- Automatically manages the lifecycle of engine, session, and transaction
(autocommit/autorollback)
- It doesn't interfere with manually opening and closing sessions and
transactions when needed.
- Does not depend on the web framework
- It is not a wrapper over sqlalchemy
- It is convenient to test
- Host switching in runtime
- It can manage multiple databases and multiple sessions to a single database
- Provides tools for concurrent sql queries
- Lazy initialization is everywhere

A convenient way to configure and interact with async sqlalchemy session
through context in asynchronous applications.

## What does usage look like?

```python
from context_async_sqlalchemy import db_session
from sqlalchemy import insert

from database import master # your configured connection to the database
from database import connection # your configured connection to the database
from models import ExampleTable # just some model for example

async def some_func() -> None:
# Created a session (no connection to the database yet)
# If you call db_session again, it will return the same session
# even in child coroutines.
session = await db_session(master)
session = await db_session(connection)

stmt = insert(ExampleTable).values(text="example_with_db_session")

# On the first request, a connection and transaction were opened
await session.execute(stmt)

# The commit and closing of the session will occur automatically
```


## How to use

The repository includes an example integration with FastAPI,
which describes numerous workflows.
[FastAPI example](https://github.com/krylosov-aa/context-async-sqlalchemy/tree/main/exmaples/fastapi_example/routes)


It also includes two types of test setups you can use in your projects.
The library currently has 90% test coverage. The tests are in the
examples, as we want to test not in the abstract but in the context of a real
asynchronous web application.

[FastAPI tests example](https://github.com/krylosov-aa/context-async-sqlalchemy/tree/main/exmaples/fastapi_example/tests)

### The most basic example

#### 1. Configure the connection to the database

for example for PostgreSQL database.py:
```python
from sqlalchemy.ext.asyncio import (
async_sessionmaker,
AsyncEngine,
AsyncSession,
create_async_engine,
)

from context_async_sqlalchemy import DBConnect


def create_engine(host: str) -> AsyncEngine:
"""
database connection parameters.
"""
# In production code, you will probably take these parameters from env
pg_user = "krylosov-aa"
pg_password = ""
pg_port = 6432
pg_db = "test"
return create_async_engine(
f"postgresql+asyncpg://"
f"{pg_user}:{pg_password}"
f"@{host}:{pg_port}"
f"/{pg_db}",
future=True,
pool_pre_ping=True,
)


def create_session_maker(
engine: AsyncEngine,
) -> async_sessionmaker[AsyncSession]:
"""session parameters"""
return async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)


master = DBConnect(
host="127.0.0.1",
engine_creator=create_engine,
session_maker_creator=create_session_maker,
)

```

#### 2. Manage Database connection lifecycle
Configure the connection to the database at the begin of your application's life.
Close the resources at the end of your application's life


Example for FastAPI:

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator
from fastapi import FastAPI

from database import master


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
"""Database connection lifecycle management"""
yield
await master.close() # Close the engine if it was open
```


#### 3. Setup context lifetime

For a contextual session to work, a context needs to be set. This assumes some
kind of middleware.


You can use ready-made FastAPI middleware:
```python
from fastapi import FastAPI
from context_async_sqlalchemy import add_fastapi_db_session_middleware

app = FastAPI()

add_fastapi_db_session_middleware(app)
```


I'll use FastAPI middleware as an example:
```python
from fastapi import Request
from starlette.middleware.base import ( # type: ignore[attr-defined]
Response,
RequestResponseEndpoint,
)

from context_async_sqlalchemy import (
init_db_session_ctx,
is_context_initiated,
reset_db_session_ctx,
auto_commit_by_status_code,
rollback_all_sessions,
)


async def fastapi_db_session_middleware(
request: Request, call_next: RequestResponseEndpoint
) -> Response:
"""
Database session lifecycle management.
The session itself is created on demand in db_session().

Transaction auto-commit is implemented if there is no exception and
the response status is < 400. Otherwise, a rollback is performed.

But you can commit or rollback manually in the handler.
"""
# Tests have different session management rules
# so if the context variable is already set, we do nothing
if is_context_initiated():
return await call_next(request)

# We set the context here, meaning all child coroutines will receive the
# same context. And even if a child coroutine requests the
# session first, the dictionary itself is shared, and this coroutine will
# add the session to dictionary = shared context.
token = init_db_session_ctx()
try:
response = await call_next(request)
await auto_commit_by_status_code(response.status_code)
return response
except Exception:
await rollback_all_sessions()
raise
finally:
await reset_db_session_ctx(token)
```


#### 4. Write a function that will work with the session

```python
from sqlalchemy import insert

from context_async_sqlalchemy import db_session

from database import master
from models import ExampleTable


async def handler_with_db_session() -> None:
"""
An example of a typical handle that uses a context session to work with
a database.
Autocommit or autorollback occurs automatically at the end of a request
(in middleware).
"""
# Created a session (no connection to the database yet)

# If you call db_session again, it will return the same session
# even in child coroutines.
session = await db_session(master)

stmt = insert(ExampleTable).values(text="example_with_db_session")

# On the first request, a connection and transaction were opened
session = await db_session(connection)

# The second request will use the same connection and the same transaction
await session.execute(stmt)
```


## Master/Replica or several databases at the same time

This is why `db_session` and other functions accept `DBConnect` as input.
This way, you can work with multiple hosts simultaneously,
for example, with the master and the replica.

libpq can detect the master and replica to create an engine. However, it only
does this once during creation. This handler helps change the host on the fly
if the master or replica changes. Let's imagine that you have a third-party
functionality that helps determine the master or replica.

In this example, the host is not set from the very beginning, but will be
calculated during the first call to create a session.

```python
from context_async_sqlalchemy import DBConnect

from master_replica_helper import get_master, get_replica


async def renew_master_connect(connect: DBConnect) -> None:
"""Updates the host if the master has changed"""
master_host = await get_master()
if master_host != connect.host:
await connect.change_host(master_host)


master = DBConnect(
engine_creator=create_engine,
session_maker_creator=create_session_maker,
before_create_session_handler=renew_master_connect,
)


async def renew_replica_connect(connect: DBConnect) -> None:
"""Updates the host if the replica has changed"""
replica_host = await get_replica()
if replica_host != connect.host:
await connect.change_host(replica_host)


replica = DBConnect(
engine_creator=create_engine,
session_maker_creator=create_session_maker,
before_create_session_handler=renew_replica_connect,
)
# The commit and closing of the session will occur automatically
```

## Testing

The library provides several ready-made utils that can be used in tests,
for example in fixtures. It helps write tests that share a common transaction
between the test and the application, so data isolation between tests is
achieved through fast transaction rollback.

## How it works

You can see the capabilities in the examples:
Here is a very simplified diagram of how everything works:

[Here are tests with a common transaction between the
application and the tests.](https://github.com/krylosov-aa/context-async-sqlalchemy/blob/main/exmaples/fastapi_example/tests/transactional/__init__.py)
![basic schema.png](docs_sources/docs/img/basic_schema.png)

1. Before executing your code, the middleware will prepare a container in
which the sessions required by your code will be stored.
The container is saved in contextvars
2. Your code accesses the library to create new sessions and retrieve
existing ones
3. After your code, middleware will automatically commit or roll back open
transactions. Closes open sessions and clears the context.

[And here's an example with different transactions.](https://github.com/krylosov-aa/context-async-sqlalchemy/blob/main/exmaples/fastapi_example/tests/non_transactional/__init__.py)
The library also provides the ability to commit, rollback, and close at any
time, without waiting for the end of the request, without any problems.
17 changes: 12 additions & 5 deletions context_async_sqlalchemy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
get_db_session_from_context,
put_db_session_to_context,
pop_db_session_from_context,
run_in_new_ctx,
)
from .connect import DBConnect
from .session import (
Expand All @@ -23,9 +22,15 @@
rollback_all_sessions,
close_all_sessions,
)
from .run_in_new_context import run_in_new_ctx
from .starlette_utils import (
add_starlette_http_db_session_middleware,
starlette_http_db_session_middleware,
)

from .fastapi_utils import (
fastapi_db_session_middleware,
add_fastapi_db_session_middleware,
fastapi_http_db_session_middleware,
add_fastapi_http_db_session_middleware,
)

__all__ = [
Expand All @@ -48,6 +53,8 @@
"commit_all_sessions",
"rollback_all_sessions",
"close_all_sessions",
"fastapi_db_session_middleware",
"add_fastapi_db_session_middleware",
"add_starlette_http_db_session_middleware",
"starlette_http_db_session_middleware",
"fastapi_http_db_session_middleware",
"add_fastapi_http_db_session_middleware",
]
4 changes: 2 additions & 2 deletions context_async_sqlalchemy/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ async def change_host(self, host: str) -> None:

async def create_session(self) -> AsyncSession:
"""Creates a new session"""
maker = await self.get_session_maker()
maker = await self.session_maker()
return maker()

async def get_session_maker(self) -> async_sessionmaker[AsyncSession]:
async def session_maker(self) -> async_sessionmaker[AsyncSession]:
"""Gets the session maker"""
if self._before_create_session_handler:
await self._before_create_session_handler(self)
Expand Down
Loading