-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathdatabase.py
58 lines (46 loc) · 1.64 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
"""Functionality to interact with the database."""
import asyncio
import os
from configparser import ConfigParser
from pathlib import Path
from typing import Any
from psycopg_pool import AsyncConnectionPool
def get_db_pool(
filename: str = "zeno_backend/database/database.ini",
section: str = "postgresql",
) -> AsyncConnectionPool:
"""Get the configuration of the database.
Args:
filename (str, optional): the path to the database.ini.
Defaults to "zeno_backend/database/database.ini".
section (str, optional): which section in the database.ini to read.
Defaults to "postgresql".
Raises:
Exception: reading the configuration failed.
Returns:
dict[str, Any]: the database configuration.
"""
if Path(filename).exists():
parser = ConfigParser()
parser.read(filename)
db: dict[str, Any] = {}
if parser.has_section(section):
params = parser.items(section)
for param in params:
db[param[0]] = param[1]
else:
raise Exception(f"Section {section} not found in the {filename} file")
else:
db: dict[str, Any] = {}
db["host"] = os.environ["DB_HOST"]
db["port"] = os.environ["DB_PORT"]
db["dbname"] = os.environ["DB_NAME"]
db["user"] = os.environ["DB_USER"]
db["password"] = os.environ["DB_PASSWORD"]
pool = AsyncConnectionPool(
" ".join([f"{k}={v}" for k, v in db.items()]), min_size=8, max_size=16
)
loop = asyncio.get_running_loop()
asyncio.run_coroutine_threadsafe(pool.open(), loop)
return pool
db_pool = get_db_pool()