1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
import aiomysql
from typing import Optional, List, Dict, Any
import config
class Database:
    """MariaDB connection pool manager for async operations.

    All state lives on the class: one lazily created ``aiomysql`` pool is
    shared by every caller, and each query helper borrows a connection from
    that pool for the duration of a single statement. The pool is created
    with ``autocommit=True``.
    """

    # Shared pool; ``None`` until first use and again after ``close()``.
    _pool: Optional[aiomysql.Pool] = None

    @classmethod
    async def get_pool(cls) -> aiomysql.Pool:
        """Get or create the connection pool.

        Returns:
            The shared pool. When no open pool exists, a new one is created
            from the ``config.DB_*`` settings (1 to 10 connections).
        """
        existing = cls._pool
        if existing is not None and not existing.closed:
            return existing

        created = await aiomysql.create_pool(
            host=config.DB_HOST,
            port=config.DB_PORT,
            user=config.DB_USER,
            password=config.DB_PASSWORD,
            db=config.DB_NAME,
            autocommit=True,
            minsize=1,
            maxsize=10,
        )

        # Other coroutines may have run while create_pool() was awaited. If
        # one of them already published an open pool, keep theirs and dispose
        # of ours instead of overwriting (and thereby leaking) it.
        current = cls._pool
        if current is None or current.closed:
            cls._pool = created
            return created

        created.close()
        await created.wait_closed()
        return current

    @classmethod
    async def close(cls) -> None:
        """Close the connection pool.

        Does nothing when there is no pool or it is already closed. In every
        case the class forgets the pool, so the next ``get_pool()`` call
        creates a new one.
        """
        # Detach before awaiting: a get_pool() call that runs while we wait
        # must build a fresh pool, and we must not reset a pool that was
        # created by someone else in the meantime.
        pool, cls._pool = cls._pool, None
        if pool is not None and not pool.closed:
            pool.close()
            await pool.wait_closed()

    @classmethod
    async def execute(cls, query: str, args: tuple = ()) -> int:
        """Execute a query and return the number of affected rows.

        Args:
            query: SQL statement, with driver placeholders for ``args``.
            args: Values bound to the placeholders in ``query``.

        Returns:
            The cursor's ``rowcount`` after the statement has run.
        """
        pool = await cls.get_pool()
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(query, args)
                return cur.rowcount

    @classmethod
    async def execute_returning_id(cls, query: str, args: tuple = ()) -> int:
        """Execute an INSERT query and return the last inserted ID.

        Args:
            query: SQL statement, with driver placeholders for ``args``.
            args: Values bound to the placeholders in ``query``.

        Returns:
            The cursor's ``lastrowid`` after the statement has run.
        """
        pool = await cls.get_pool()
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(query, args)
                return cur.lastrowid

    @classmethod
    async def fetchone(cls, query: str, args: tuple = ()) -> Optional[Dict[str, Any]]:
        """Fetch a single row as a dictionary.

        Args:
            query: SQL statement, with driver placeholders for ``args``.
            args: Values bound to the placeholders in ``query``.

        Returns:
            The result of the cursor's ``fetchone()``: the next row as a
            dictionary, or ``None`` when the query produced no row.
        """
        pool = await cls.get_pool()
        async with pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(query, args)
                return await cur.fetchone()

    @classmethod
    async def fetchall(cls, query: str, args: tuple = ()) -> List[Dict[str, Any]]:
        """Fetch all rows as a list of dictionaries.

        Args:
            query: SQL statement, with driver placeholders for ``args``.
            args: Values bound to the placeholders in ``query``.

        Returns:
            Every row of the result as a dictionary; an empty list when the
            query matched nothing.
        """
        pool = await cls.get_pool()
        async with pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(query, args)
                rows = await cur.fetchall()
                # Always hand back a real list so the value matches the
                # annotation whatever sequence type the driver returns.
                return list(rows)

    @classmethod
    async def init_tables(cls) -> None:
        """Initialize database tables if they don't exist.

        Runs a ``CREATE TABLE IF NOT EXISTS`` statement for the
        ``commissions`` table.
        """
        create_commissions = """
CREATE TABLE IF NOT EXISTS commissions (
id INT AUTO_INCREMENT PRIMARY KEY,
discord_user_id BIGINT,
email VARCHAR(255) NOT NULL,
name VARCHAR(255) NOT NULL,
commission_type ENUM('art', 'design', 'other', 'unsure') NOT NULL,
description TEXT NOT NULL,
budget VARCHAR(100),
status ENUM('pending', 'accepted', 'in_progress', 'completed', 'rejected') DEFAULT 'pending',
rejection_reason TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
"""
        await cls.execute(create_commissions)
|