diff options
Diffstat (limited to 'utils')
| -rw-r--r-- | utils/__init__.py | 4 | ||||
| -rw-r--r-- | utils/cooldowns.py | 40 | ||||
| -rw-r--r-- | utils/database.py | 89 |
3 files changed, 133 insertions, 0 deletions
diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..6a69aa0 --- /dev/null +++ b/utils/__init__.py @@ -0,0 +1,4 @@ +from .database import Database +from .cooldowns import CooldownManager + +__all__ = ["Database", "CooldownManager"] diff --git a/utils/cooldowns.py b/utils/cooldowns.py new file mode 100644 index 0000000..a1dc153 --- /dev/null +++ b/utils/cooldowns.py @@ -0,0 +1,40 @@ +import time +from typing import Dict, Optional +import config + + +class CooldownManager: + """In-memory cooldown tracking for anti-spam.""" + + def __init__(self, cooldown_seconds: Optional[int] = None): + self._cooldowns: Dict[int, float] = {} + self.cooldown_seconds = cooldown_seconds or config.COMMISSION_COOLDOWN_SECONDS + + def check(self, user_id: int) -> Optional[int]: + """ + Check if a user is on cooldown. + + Returns: + None if not on cooldown, otherwise seconds remaining. + """ + if user_id not in self._cooldowns: + return None + + elapsed = time.time() - self._cooldowns[user_id] + if elapsed >= self.cooldown_seconds: + del self._cooldowns[user_id] + return None + + return int(self.cooldown_seconds - elapsed) + + def set(self, user_id: int): + """Set cooldown for a user.""" + self._cooldowns[user_id] = time.time() + + def clear(self, user_id: int): + """Clear cooldown for a user.""" + self._cooldowns.pop(user_id, None) + + def is_on_cooldown(self, user_id: int) -> bool: + """Check if user is currently on cooldown.""" + return self.check(user_id) is not None diff --git a/utils/database.py b/utils/database.py new file mode 100644 index 0000000..e234331 --- /dev/null +++ b/utils/database.py @@ -0,0 +1,89 @@ +import aiomysql +from typing import Optional, List, Dict, Any +import config + + +class Database: + """MariaDB connection pool manager for async operations.""" + + _pool: Optional[aiomysql.Pool] = None + + @classmethod + async def get_pool(cls) -> aiomysql.Pool: + """Get or create the connection pool.""" + if cls._pool is None or cls._pool.closed: + cls._pool = await aiomysql.create_pool( + host=config.DB_HOST, + port=config.DB_PORT, + user=config.DB_USER, + password=config.DB_PASSWORD, + db=config.DB_NAME, + autocommit=True, + minsize=1, + maxsize=10, + ) + return cls._pool + + @classmethod + async def close(cls): + """Close the connection pool.""" + if cls._pool is not None and not cls._pool.closed: + cls._pool.close() + await cls._pool.wait_closed() + cls._pool = None + + @classmethod + async def execute(cls, query: str, args: tuple = ()) -> int: + """Execute a query and return the number of affected rows.""" + pool = await cls.get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute(query, args) + return cur.rowcount + + @classmethod + async def execute_returning_id(cls, query: str, args: tuple = ()) -> int: + """Execute an INSERT query and return the last inserted ID.""" + pool = await cls.get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute(query, args) + return cur.lastrowid + + @classmethod + async def fetchone(cls, query: str, args: tuple = ()) -> Optional[Dict[str, Any]]: + """Fetch a single row as a dictionary.""" + pool = await cls.get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(query, args) + return await cur.fetchone() + + @classmethod + async def fetchall(cls, query: str, args: tuple = ()) -> List[Dict[str, Any]]: + """Fetch all rows as a list of dictionaries.""" + pool = await cls.get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(query, args) + return await cur.fetchall() + + @classmethod + async def init_tables(cls): + """Initialize database tables if they don't exist.""" + create_commissions = """ + CREATE TABLE IF NOT EXISTS commissions ( + id INT AUTO_INCREMENT PRIMARY KEY, + discord_user_id BIGINT, + email VARCHAR(255) NOT NULL, + name VARCHAR(255) NOT NULL, + commission_type ENUM('art', 'design', 'other', 'unsure') NOT NULL, + description TEXT NOT NULL, + budget VARCHAR(100), + status ENUM('pending', 'accepted', 'in_progress', 'completed', 'rejected') DEFAULT 'pending', + rejection_reason TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP + ) + """ + await cls.execute(create_commissions) |
