diff options
Diffstat (limited to 'utils/database.py')
| -rw-r--r-- | utils/database.py | 89 |
1 files changed, 89 insertions, 0 deletions
diff --git a/utils/database.py b/utils/database.py new file mode 100644 index 0000000..e234331 --- /dev/null +++ b/utils/database.py @@ -0,0 +1,89 @@ +import aiomysql +from typing import Optional, List, Dict, Any +import config + + +class Database: + """MariaDB connection pool manager for async operations.""" + + _pool: Optional[aiomysql.Pool] = None + + @classmethod + async def get_pool(cls) -> aiomysql.Pool: + """Get or create the connection pool.""" + if cls._pool is None or cls._pool.closed: + cls._pool = await aiomysql.create_pool( + host=config.DB_HOST, + port=config.DB_PORT, + user=config.DB_USER, + password=config.DB_PASSWORD, + db=config.DB_NAME, + autocommit=True, + minsize=1, + maxsize=10, + ) + return cls._pool + + @classmethod + async def close(cls): + """Close the connection pool.""" + if cls._pool is not None and not cls._pool.closed: + cls._pool.close() + await cls._pool.wait_closed() + cls._pool = None + + @classmethod + async def execute(cls, query: str, args: tuple = ()) -> int: + """Execute a query and return the number of affected rows.""" + pool = await cls.get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute(query, args) + return cur.rowcount + + @classmethod + async def execute_returning_id(cls, query: str, args: tuple = ()) -> int: + """Execute an INSERT query and return the last inserted ID.""" + pool = await cls.get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute(query, args) + return cur.lastrowid + + @classmethod + async def fetchone(cls, query: str, args: tuple = ()) -> Optional[Dict[str, Any]]: + """Fetch a single row as a dictionary.""" + pool = await cls.get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(query, args) + return await cur.fetchone() + + @classmethod + async def fetchall(cls, query: str, args: tuple = ()) -> List[Dict[str, Any]]: + """Fetch all rows as a list of dictionaries.""" + pool = await cls.get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(query, args) + return await cur.fetchall() + + @classmethod + async def init_tables(cls): + """Initialize database tables if they don't exist.""" + create_commissions = """ + CREATE TABLE IF NOT EXISTS commissions ( + id INT AUTO_INCREMENT PRIMARY KEY, + discord_user_id BIGINT, + email VARCHAR(255) NOT NULL, + name VARCHAR(255) NOT NULL, + commission_type ENUM('art', 'design', 'other', 'unsure') NOT NULL, + description TEXT NOT NULL, + budget VARCHAR(100), + status ENUM('pending', 'accepted', 'in_progress', 'completed', 'rejected') DEFAULT 'pending', + rejection_reason TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP + ) + """ + await cls.execute(create_commissions) |
