aboutsummaryrefslogtreecommitdiff
path: root/utils/database.py
blob: e234331848ca2d3f4929dac0042a3352d80eda77 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import asyncio
from typing import Optional, List, Dict, Any

import aiomysql

import config


class Database:
    """MariaDB connection pool manager for async operations.

    All state is kept on the class, so every caller shares one lazily
    created ``aiomysql`` pool. The pool is opened with ``autocommit=True``
    and none of the helpers below issue an explicit commit.
    """

    # Shared pool; stays None until the first get_pool() call.
    _pool: Optional[aiomysql.Pool] = None
    # Guards pool creation so concurrent first callers build only one pool.
    # Created lazily inside get_pool() rather than at import time.
    _pool_lock: Optional[asyncio.Lock] = None

    @classmethod
    async def get_pool(cls) -> aiomysql.Pool:
        """Return the shared connection pool, creating it when needed.

        A pool is created on first use and again whenever the stored pool
        reports ``closed``. Connection settings come from the ``config``
        module (``DB_HOST``, ``DB_PORT``, ``DB_USER``, ``DB_PASSWORD``,
        ``DB_NAME``).

        Returns:
            The pool shared by every helper on this class.
        """
        pool = cls._pool
        if pool is not None and not pool.closed:
            return pool

        # There is no await between this check and the assignment, so two
        # tasks cannot both end up creating a lock.
        if cls._pool_lock is None:
            cls._pool_lock = asyncio.Lock()

        async with cls._pool_lock:
            # Check again now that the lock is held: another task may have
            # finished creating the pool while this one was waiting. Without
            # this, each waiting task would open its own pool and all but
            # the last one assigned would be left open and unreachable.
            if cls._pool is None or cls._pool.closed:
                cls._pool = await aiomysql.create_pool(
                    host=config.DB_HOST,
                    port=config.DB_PORT,
                    user=config.DB_USER,
                    password=config.DB_PASSWORD,
                    db=config.DB_NAME,
                    autocommit=True,
                    minsize=1,
                    maxsize=10,
                )
            return cls._pool

    @classmethod
    async def close(cls):
        """Close the connection pool and forget it.

        Does nothing when no pool exists or the stored pool already reports
        ``closed``. After a successful close, the next ``get_pool()`` call
        creates a fresh pool.
        """
        if cls._pool is not None and not cls._pool.closed:
            cls._pool.close()
            await cls._pool.wait_closed()
            cls._pool = None

    @classmethod
    async def execute(cls, query: str, args: tuple = ()) -> int:
        """Execute a query and return the number of affected rows.

        Args:
            query: SQL statement, using placeholders for any values.
            args: Values passed to the cursor alongside ``query``.

        Returns:
            The cursor's ``rowcount`` after the statement has run.
        """
        pool = await cls.get_pool()
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(query, args)
                return cur.rowcount

    @classmethod
    async def execute_returning_id(cls, query: str, args: tuple = ()) -> int:
        """Execute an INSERT query and return the last inserted ID.

        Args:
            query: SQL statement, using placeholders for any values.
            args: Values passed to the cursor alongside ``query``.

        Returns:
            The cursor's ``lastrowid`` after the statement has run.
        """
        pool = await cls.get_pool()
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(query, args)
                return cur.lastrowid

    @classmethod
    async def fetchone(cls, query: str, args: tuple = ()) -> Optional[Dict[str, Any]]:
        """Fetch a single row as a dictionary.

        Args:
            query: SQL statement, using placeholders for any values.
            args: Values passed to the cursor alongside ``query``.

        Returns:
            Whatever the ``DictCursor`` yields for the first row; the
            annotation expects a dict, or ``None`` when there is no row.
        """
        pool = await cls.get_pool()
        async with pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(query, args)
                return await cur.fetchone()

    @classmethod
    async def fetchall(cls, query: str, args: tuple = ()) -> List[Dict[str, Any]]:
        """Fetch all rows as a list of dictionaries.

        Args:
            query: SQL statement, using placeholders for any values.
            args: Values passed to the cursor alongside ``query``.

        Returns:
            A new list holding every row the ``DictCursor`` produced; empty
            when the query matched nothing.
        """
        pool = await cls.get_pool()
        async with pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(query, args)
                rows = await cur.fetchall()
                # NOTE(review): the driver appears to return an empty tuple
                # rather than a list when no rows match - confirm against
                # the aiomysql version in use. Normalising here keeps the
                # declared List return type true in either case.
                return list(rows)

    @classmethod
    async def init_tables(cls):
        """Initialize database tables if they don't exist.

        Issues a single ``CREATE TABLE IF NOT EXISTS`` statement for the
        ``commissions`` table through :meth:`execute`.
        """
        create_commissions = """
        CREATE TABLE IF NOT EXISTS commissions (
            id INT AUTO_INCREMENT PRIMARY KEY,
            discord_user_id BIGINT,
            email VARCHAR(255) NOT NULL,
            name VARCHAR(255) NOT NULL,
            commission_type ENUM('art', 'design', 'other', 'unsure') NOT NULL,
            description TEXT NOT NULL,
            budget VARCHAR(100),
            status ENUM('pending', 'accepted', 'in_progress', 'completed', 'rejected') DEFAULT 'pending',
            rejection_reason TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
        )
        """
        await cls.execute(create_commissions)