aboutsummaryrefslogtreecommitdiff
path: root/utils/database.py
diff options
context:
space:
mode:
Diffstat (limited to 'utils/database.py')
-rw-r--r--utils/database.py89
1 files changed, 89 insertions, 0 deletions
diff --git a/utils/database.py b/utils/database.py
new file mode 100644
index 0000000..e234331
--- /dev/null
+++ b/utils/database.py
@@ -0,0 +1,89 @@
+import aiomysql
+from typing import Optional, List, Dict, Any
+import config
+
+
+class Database:
+ """MariaDB connection pool manager for async operations."""
+
+ _pool: Optional[aiomysql.Pool] = None
+
+ @classmethod
+ async def get_pool(cls) -> aiomysql.Pool:
+ """Get or create the connection pool."""
+ if cls._pool is None or cls._pool.closed:
+ cls._pool = await aiomysql.create_pool(
+ host=config.DB_HOST,
+ port=config.DB_PORT,
+ user=config.DB_USER,
+ password=config.DB_PASSWORD,
+ db=config.DB_NAME,
+ autocommit=True,
+ minsize=1,
+ maxsize=10,
+ )
+ return cls._pool
+
+ @classmethod
+ async def close(cls):
+ """Close the connection pool."""
+ if cls._pool is not None and not cls._pool.closed:
+ cls._pool.close()
+ await cls._pool.wait_closed()
+ cls._pool = None
+
+ @classmethod
+ async def execute(cls, query: str, args: tuple = ()) -> int:
+ """Execute a query and return the number of affected rows."""
+ pool = await cls.get_pool()
+ async with pool.acquire() as conn:
+ async with conn.cursor() as cur:
+ await cur.execute(query, args)
+ return cur.rowcount
+
+ @classmethod
+ async def execute_returning_id(cls, query: str, args: tuple = ()) -> int:
+ """Execute an INSERT query and return the last inserted ID."""
+ pool = await cls.get_pool()
+ async with pool.acquire() as conn:
+ async with conn.cursor() as cur:
+ await cur.execute(query, args)
+ return cur.lastrowid
+
+ @classmethod
+ async def fetchone(cls, query: str, args: tuple = ()) -> Optional[Dict[str, Any]]:
+ """Fetch a single row as a dictionary."""
+ pool = await cls.get_pool()
+ async with pool.acquire() as conn:
+ async with conn.cursor(aiomysql.DictCursor) as cur:
+ await cur.execute(query, args)
+ return await cur.fetchone()
+
+ @classmethod
+ async def fetchall(cls, query: str, args: tuple = ()) -> List[Dict[str, Any]]:
+ """Fetch all rows as a list of dictionaries."""
+ pool = await cls.get_pool()
+ async with pool.acquire() as conn:
+ async with conn.cursor(aiomysql.DictCursor) as cur:
+ await cur.execute(query, args)
+ return await cur.fetchall()
+
+ @classmethod
+ async def init_tables(cls):
+ """Initialize database tables if they don't exist."""
+ create_commissions = """
+ CREATE TABLE IF NOT EXISTS commissions (
+ id INT AUTO_INCREMENT PRIMARY KEY,
+ discord_user_id BIGINT,
+ email VARCHAR(255) NOT NULL,
+ name VARCHAR(255) NOT NULL,
+ commission_type ENUM('art', 'design', 'other', 'unsure') NOT NULL,
+ description TEXT NOT NULL,
+ budget VARCHAR(100),
+ status ENUM('pending', 'accepted', 'in_progress', 'completed', 'rejected') DEFAULT 'pending',
+ rejection_reason TEXT,
+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
+ )
+ """
+ await cls.execute(create_commissions)