Skip to content

Support multiple databases #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions fastapi_asyncpg/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from __future__ import annotations

from fastapi import FastAPI

import asyncpg
Expand Down Expand Up @@ -46,27 +45,31 @@ async def on_connect(self):
the db"""
# if the pool is comming from outside (tests), don't connect it
if self._pool:
self.app.state.pool = self._pool
self._get_pool_manager_from_app().put(self.dsn, self._pool)
return
pool = await asyncpg.create_pool(dsn=self.dsn, **self.con_opts)
async with pool.acquire() as db:
await self.init_db(db)
self.app.state.pool = pool

self._get_pool_manager_from_app().put(self.dsn, pool)

async def on_disconnect(self):
# if the pool is comming from outside, don't desconnect it
# someone else will do (usualy a pytest fixture)
if self._pool:
return
await self.app.state.pool.close()
await self.pool.close()

def on_init(self, func):
self.init_db = func
return func

@property
def pool(self):
return self.app.state.pool
"""Fetch the connection pool associated with our DSN from the
pool manager stashed within app.state.
"""
return self._get_pool_manager_from_app().get(self.dsn)

async def connection(self):
"""
Expand Down Expand Up @@ -103,9 +106,35 @@ async def get_content(db = Depens(db.transaction)):
else:
await txn.commit()

def _get_pool_manager_from_app(self):
"""Find or create singleton AppPoolManager instance within self.app.state"""
if not hasattr(self.app.state, "fastapi_asyncpg_pool_manager"):
self.app.state.fastapi_asyncpg_pool_manager = AppPoolManager()

return self.app.state.fastapi_asyncpg_pool_manager

atomic = transaction


class AppPoolManager:
"""Object placed into fastapi app.state to manage one or more
asyncpg.pool.Pool instances within the fastapi app.

If the app uses more than one asyncpg database, then there
will be more than one pool. We separate them by the
connection DSN.
"""

def __init__(self):
self._pool_by_dsn = {}

def get(self, dsn):
return self._pool_by_dsn[dsn]

def put(self, dsn, pool):
self._pool_by_dsn[dsn] = pool


class SingleConnectionTestingPool:
"""A fake pool that simulates pooling, but runs on
a single transaction that it's rolled back after
Expand Down