Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4ef4bdf309 | |||
| 7c6249b120 | |||
| 09fa7988f1 | |||
| b3d618b337 |
67
.env_example
Normal file
67
.env_example
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
# Groovy-Zilean Configuration
|
||||||
|
# Copy this file to .env and fill in your values
|
||||||
|
# NEVER commit .env to git!
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Environment Selection
|
||||||
|
# ===================================
|
||||||
|
# Set to "dev" for development bot, "live" for production
|
||||||
|
ENVIRONMENT=dev
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Discord Bot Tokens
|
||||||
|
# ===================================
|
||||||
|
DISCORD_TOKEN_DEV=
|
||||||
|
DISCORD_TOKEN_LIVE=
|
||||||
|
|
||||||
|
# Bot settings
|
||||||
|
DISCORD_PREFIX=
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Spotify Integration
|
||||||
|
# ===================================
|
||||||
|
SPOTIFY_CLIENT_ID=
|
||||||
|
SPOTIFY_CLIENT_SECRET=
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Database
|
||||||
|
# ===================================
|
||||||
|
# For now (SQLite)
|
||||||
|
DB_PATH=./data/music.db
|
||||||
|
|
||||||
|
# For future (PostgreSQL)
|
||||||
|
# DB_HOST=localhost
|
||||||
|
# DB_PORT=5432
|
||||||
|
# DB_NAME=groovy_zilean
|
||||||
|
# DB_USER=groovy
|
||||||
|
# DB_PASSWORD=your_db_password
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Bot Status/Presence
|
||||||
|
# ===================================
|
||||||
|
# Types: playing, listening, watching, streaming, competing
|
||||||
|
STATUS_TYPE=listening
|
||||||
|
STATUS_TEXT==help | /help
|
||||||
|
# STATUS_URL= # Only needed for streaming type
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Color Scheme (hex colors)
|
||||||
|
# ===================================
|
||||||
|
COLOR_PRIMARY=#7289DA
|
||||||
|
COLOR_SUCCESS=#43B581
|
||||||
|
COLOR_ERROR=#F04747
|
||||||
|
COLOR_WARNING=#FAA61A
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Web Dashboard (Future)
|
||||||
|
# ===================================
|
||||||
|
# DISCORD_CLIENT_ID=your_oauth_client_id
|
||||||
|
# DISCORD_CLIENT_SECRET=your_oauth_secret
|
||||||
|
# DISCORD_REDIRECT_URI=http://localhost:8000/callback
|
||||||
|
# WEB_SECRET_KEY=random_secret_for_sessions
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Logging
|
||||||
|
# ===================================
|
||||||
|
LOG_LEVEL=INFO
|
||||||
|
# Options: DEBUG, INFO, WARNING, ERROR, CRITICAL
|
||||||
2
.gitignore
vendored
2
.gitignore
vendored
@@ -160,4 +160,4 @@ cython_debug/
|
|||||||
#.idea/
|
#.idea/
|
||||||
|
|
||||||
# My stuff
|
# My stuff
|
||||||
data/
|
data/*.db
|
||||||
|
|||||||
201
SETUP.md
Normal file
201
SETUP.md
Normal file
@@ -0,0 +1,201 @@
|
|||||||
|
# Groovy-Zilean Setup Guide
|
||||||
|
|
||||||
|
Quick start guide for getting groovy-zilean running locally or in production.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Prerequisites
|
||||||
|
|
||||||
|
- **Python 3.11+** (3.9 deprecated by yt-dlp)
|
||||||
|
- **FFmpeg** installed on your system
|
||||||
|
- Discord Bot Token ([Discord Developer Portal](https://discord.com/developers/applications))
|
||||||
|
- Spotify API Credentials ([Spotify Developer Dashboard](https://developer.spotify.com/dashboard))
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 1. Clone & Setup Environment
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Clone the repository
|
||||||
|
git clone <your-repo-url>
|
||||||
|
cd groovy-zilean
|
||||||
|
|
||||||
|
# Create virtual environment (keeps dependencies isolated)
|
||||||
|
python3.12 -m venv venv
|
||||||
|
|
||||||
|
# Activate virtual environment
|
||||||
|
source venv/bin/activate # Linux/Mac
|
||||||
|
# OR
|
||||||
|
venv\Scripts\activate # Windows
|
||||||
|
|
||||||
|
# Install dependencies
|
||||||
|
pip install --upgrade pip
|
||||||
|
pip install -r requirements.txt
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 2. Configuration
|
||||||
|
|
||||||
|
### Create `.env` file
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Copy the example file
|
||||||
|
cp .env.example .env
|
||||||
|
|
||||||
|
# Edit with your favorite editor
|
||||||
|
nano .env
|
||||||
|
# OR
|
||||||
|
vim .env
|
||||||
|
# OR
|
||||||
|
code .env # VS Code
|
||||||
|
```
|
||||||
|
|
||||||
|
### Fill in Required Values
|
||||||
|
|
||||||
|
At minimum, you need:
|
||||||
|
|
||||||
|
```env
|
||||||
|
# Choose environment: "dev" or "live"
|
||||||
|
ENVIRONMENT=dev
|
||||||
|
|
||||||
|
# Discord bot tokens (get from Discord Developer Portal)
|
||||||
|
DISCORD_TOKEN_DEV=your_dev_bot_token_here
|
||||||
|
DISCORD_TOKEN_LIVE=your_live_bot_token_here
|
||||||
|
|
||||||
|
# Spotify credentials (get from Spotify Developer Dashboard)
|
||||||
|
SPOTIFY_CLIENT_ID=your_spotify_client_id
|
||||||
|
SPOTIFY_CLIENT_SECRET=your_spotify_secret
|
||||||
|
```
|
||||||
|
|
||||||
|
**Optional but recommended:**
|
||||||
|
```env
|
||||||
|
# Bot settings
|
||||||
|
DISCORD_PREFIX==
|
||||||
|
STATUS_TYPE=listening
|
||||||
|
STATUS_TEXT=Zilean's Theme
|
||||||
|
|
||||||
|
# Colors (hex format)
|
||||||
|
COLOR_PRIMARY=#7289DA
|
||||||
|
COLOR_SUCCESS=#43B581
|
||||||
|
COLOR_ERROR=#F04747
|
||||||
|
COLOR_WARNING=#FAA61A
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 3. Create Data Directory
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Create directory for database
|
||||||
|
mkdir -p data
|
||||||
|
|
||||||
|
# The database file (music.db) will be created automatically on first run
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 4. Run the Bot
|
||||||
|
|
||||||
|
### Development Mode
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Make sure .env has ENVIRONMENT=dev
|
||||||
|
source venv/bin/activate # If not already activated
|
||||||
|
python main.py
|
||||||
|
```
|
||||||
|
|
||||||
|
### Production Mode
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Change .env to ENVIRONMENT=live
|
||||||
|
source venv/bin/activate
|
||||||
|
python main.py
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 5. Switching Between Dev and Live
|
||||||
|
|
||||||
|
**Super easy!** Just change one line in `.env`:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# For development bot
|
||||||
|
ENVIRONMENT=dev
|
||||||
|
|
||||||
|
# For production bot
|
||||||
|
ENVIRONMENT=live
|
||||||
|
```
|
||||||
|
|
||||||
|
The bot will automatically use the correct token!
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Troubleshooting
|
||||||
|
|
||||||
|
### "Configuration Error: DISCORD_TOKEN_DEV not found"
|
||||||
|
- Make sure you copied `.env.example` to `.env`
|
||||||
|
- Check that `.env` has the token values filled in
|
||||||
|
- Token should NOT have quotes around it
|
||||||
|
|
||||||
|
### "No module named 'dotenv'"
|
||||||
|
```bash
|
||||||
|
pip install python-dotenv
|
||||||
|
```
|
||||||
|
|
||||||
|
### "FFmpeg not found"
|
||||||
|
```bash
|
||||||
|
# Debian/Ubuntu
|
||||||
|
sudo apt install ffmpeg
|
||||||
|
|
||||||
|
# macOS
|
||||||
|
brew install ffmpeg
|
||||||
|
|
||||||
|
# Arch Linux
|
||||||
|
sudo pacman -S ffmpeg
|
||||||
|
```
|
||||||
|
|
||||||
|
### Python version issues
|
||||||
|
yt-dlp requires Python 3.10+. Check your version:
|
||||||
|
```bash
|
||||||
|
python --version
|
||||||
|
```
|
||||||
|
|
||||||
|
If too old, install newer Python and recreate venv:
|
||||||
|
```bash
|
||||||
|
python3.12 -m venv venv
|
||||||
|
source venv/bin/activate
|
||||||
|
pip install -r requirements.txt
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Project Structure
|
||||||
|
|
||||||
|
```
|
||||||
|
groovy-zilean/
|
||||||
|
├── main.py # Entry point (run this!)
|
||||||
|
├── bot.py # Bot class definition
|
||||||
|
├── config.py # Configuration management
|
||||||
|
├── .env # YOUR secrets (never commit!)
|
||||||
|
├── .env.example # Template (safe to commit)
|
||||||
|
├── requirements.txt # Python dependencies
|
||||||
|
├── cogs/
|
||||||
|
│ └── music/ # Music functionality
|
||||||
|
│ ├── main.py # Commands
|
||||||
|
│ ├── queue.py # Queue management
|
||||||
|
│ ├── util.py # Utilities
|
||||||
|
│ └── translate.py # URL/search handling
|
||||||
|
└── data/
|
||||||
|
└── music.db # SQLite database (auto-created)
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Next Steps
|
||||||
|
|
||||||
|
- Check out `PRODUCTION_ROADMAP.md` for the full development plan
|
||||||
|
- See `README.md` for feature list and usage
|
||||||
|
- Join your test server and try commands!
|
||||||
|
|
||||||
|
**Happy coding!** 🎵⏱️
|
||||||
25
bot.py
25
bot.py
@@ -1,20 +1,30 @@
|
|||||||
|
"""
|
||||||
|
Groovy-Zilean Bot Class
|
||||||
|
Main bot implementation with cog loading and background tasks
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
from discord.ext import commands
|
from discord.ext import commands
|
||||||
from discord.ext import tasks
|
from discord.ext import tasks
|
||||||
from cogs.music.main import music
|
from cogs.music.main import music
|
||||||
from help import GroovyHelp # Import the new Help Cog
|
from help import GroovyHelp
|
||||||
|
|
||||||
cogs = [
|
# List of cogs to load on startup
|
||||||
|
cogs: list[type[commands.Cog]] = [
|
||||||
music,
|
music,
|
||||||
GroovyHelp
|
GroovyHelp
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
class Groovy(commands.Bot):
|
class Groovy(commands.Bot):
|
||||||
def __init__(self, *args, **kwargs):
|
"""Custom bot class with automatic cog loading and inactivity checking"""
|
||||||
|
|
||||||
|
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||||
# We force help_command to None because we are using a custom Cog for it
|
# We force help_command to None because we are using a custom Cog for it
|
||||||
# But we pass all other args (like command_prefix) to the parent
|
# But we pass all other args (like command_prefix) to the parent
|
||||||
super().__init__(*args, help_command=None, **kwargs)
|
super().__init__(*args, help_command=None, **kwargs)
|
||||||
|
|
||||||
async def on_ready(self):
|
async def on_ready(self) -> None:
|
||||||
import config # Imported here to avoid circular dependencies if any
|
import config # Imported here to avoid circular dependencies if any
|
||||||
|
|
||||||
# Set status
|
# Set status
|
||||||
@@ -47,11 +57,12 @@ class Groovy(commands.Bot):
|
|||||||
print(f"✅ {self.user} is ready and online!")
|
print(f"✅ {self.user} is ready and online!")
|
||||||
|
|
||||||
@tasks.loop(seconds=30)
|
@tasks.loop(seconds=30)
|
||||||
async def inactivity_checker(self):
|
async def inactivity_checker(self) -> None:
|
||||||
"""Check for inactive voice connections"""
|
"""Check for inactive voice connections every 30 seconds"""
|
||||||
from cogs.music import util
|
from cogs.music import util
|
||||||
await util.check_inactivity(self)
|
await util.check_inactivity(self)
|
||||||
|
|
||||||
@inactivity_checker.before_loop
|
@inactivity_checker.before_loop
|
||||||
async def before_inactivity_checker(self):
|
async def before_inactivity_checker(self) -> None:
|
||||||
|
"""Wait for bot to be ready before starting inactivity checker"""
|
||||||
await self.wait_until_ready()
|
await self.wait_until_ready()
|
||||||
|
|||||||
315
cogs/music/db_manager.py
Normal file
315
cogs/music/db_manager.py
Normal file
@@ -0,0 +1,315 @@
|
|||||||
|
"""
|
||||||
|
Database Manager for Groovy-Zilean
|
||||||
|
Centralizes all database operations and provides a clean interface.
|
||||||
|
Makes future PostgreSQL migration much easier.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sqlite3
|
||||||
|
from contextlib import contextmanager
|
||||||
|
from typing import Optional, List, Tuple, Any
|
||||||
|
import config
|
||||||
|
|
||||||
|
|
||||||
|
class DatabaseManager:
|
||||||
|
"""Manages database connections and operations"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.db_path = config.get_db_path()
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def get_connection(self):
|
||||||
|
"""
|
||||||
|
Context manager for database connections.
|
||||||
|
Automatically handles commit/rollback and closing.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
with db.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute(...)
|
||||||
|
"""
|
||||||
|
conn = sqlite3.connect(self.db_path)
|
||||||
|
try:
|
||||||
|
yield conn
|
||||||
|
conn.commit()
|
||||||
|
except Exception as e:
|
||||||
|
conn.rollback()
|
||||||
|
raise e
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def initialize_tables(self):
|
||||||
|
"""Create database tables if they don't exist"""
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
|
||||||
|
# Create servers table
|
||||||
|
cursor.execute('''CREATE TABLE IF NOT EXISTS servers (
|
||||||
|
server_id TEXT PRIMARY KEY,
|
||||||
|
is_playing INTEGER DEFAULT 0,
|
||||||
|
song_name TEXT,
|
||||||
|
song_url TEXT,
|
||||||
|
song_thumbnail TEXT,
|
||||||
|
loop_mode TEXT DEFAULT 'off',
|
||||||
|
volume INTEGER DEFAULT 100,
|
||||||
|
effect TEXT DEFAULT 'none',
|
||||||
|
song_start_time REAL DEFAULT 0,
|
||||||
|
song_duration INTEGER DEFAULT 0
|
||||||
|
);''')
|
||||||
|
|
||||||
|
# Set all to not playing on startup
|
||||||
|
cursor.execute("UPDATE servers SET is_playing = 0;")
|
||||||
|
|
||||||
|
# Migrations for existing databases - add columns if missing
|
||||||
|
migrations = [
|
||||||
|
("loop_mode", "TEXT DEFAULT 'off'"),
|
||||||
|
("volume", "INTEGER DEFAULT 100"),
|
||||||
|
("effect", "TEXT DEFAULT 'none'"),
|
||||||
|
("song_start_time", "REAL DEFAULT 0"),
|
||||||
|
("song_duration", "INTEGER DEFAULT 0"),
|
||||||
|
("song_thumbnail", "TEXT DEFAULT ''"),
|
||||||
|
("song_url", "TEXT DEFAULT ''")
|
||||||
|
]
|
||||||
|
|
||||||
|
for col_name, col_type in migrations:
|
||||||
|
try:
|
||||||
|
cursor.execute(f"ALTER TABLE servers ADD COLUMN {col_name} {col_type};")
|
||||||
|
except sqlite3.OperationalError:
|
||||||
|
# Column already exists, skip
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Create songs/queue table
|
||||||
|
cursor.execute('''CREATE TABLE IF NOT EXISTS songs (
|
||||||
|
server_id TEXT NOT NULL,
|
||||||
|
song_link TEXT,
|
||||||
|
queued_by TEXT,
|
||||||
|
position INTEGER NOT NULL,
|
||||||
|
title TEXT,
|
||||||
|
thumbnail TEXT,
|
||||||
|
duration INTEGER,
|
||||||
|
PRIMARY KEY (position),
|
||||||
|
FOREIGN KEY (server_id) REFERENCES servers(server_id)
|
||||||
|
);''')
|
||||||
|
|
||||||
|
# Clear all songs on startup
|
||||||
|
cursor.execute("DELETE FROM songs;")
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Server Operations
|
||||||
|
# ===================================
|
||||||
|
|
||||||
|
def ensure_server_exists(self, server_id: str) -> None:
|
||||||
|
"""Add server to database if it doesn't exist"""
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute('SELECT COUNT(*) FROM servers WHERE server_id = ?', (server_id,))
|
||||||
|
if cursor.fetchone()[0] == 0:
|
||||||
|
cursor.execute('''INSERT INTO servers (server_id, loop_mode, volume, effect, song_thumbnail, song_url)
|
||||||
|
VALUES (?, 'off', 100, 'none', '', '')''', (server_id,))
|
||||||
|
|
||||||
|
def set_server_playing(self, server_id: str, playing: bool) -> None:
|
||||||
|
"""Update server playing status"""
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
self.ensure_server_exists(server_id)
|
||||||
|
val = 1 if playing else 0
|
||||||
|
cursor.execute("UPDATE servers SET is_playing = ? WHERE server_id = ?", (val, server_id))
|
||||||
|
|
||||||
|
def is_server_playing(self, server_id: str) -> bool:
|
||||||
|
"""Check if server is currently playing"""
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
self.ensure_server_exists(server_id)
|
||||||
|
cursor.execute("SELECT is_playing FROM servers WHERE server_id = ?", (server_id,))
|
||||||
|
res = cursor.fetchone()
|
||||||
|
return True if res and res[0] == 1 else False
|
||||||
|
|
||||||
|
def set_current_song(self, server_id: str, title: str, url: str, thumbnail: str = "", duration: int = 0, start_time: float = 0) -> None:
|
||||||
|
"""Update currently playing song information"""
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
# Ensure duration is an integer
|
||||||
|
try:
|
||||||
|
duration = int(duration)
|
||||||
|
except:
|
||||||
|
duration = 0
|
||||||
|
|
||||||
|
cursor.execute(''' UPDATE servers
|
||||||
|
SET song_name = ?, song_url = ?, song_thumbnail = ?, song_start_time = ?, song_duration = ?
|
||||||
|
WHERE server_id = ?''',
|
||||||
|
(title, url, thumbnail, start_time, duration, server_id))
|
||||||
|
|
||||||
|
def get_current_song(self, server_id: str) -> dict:
|
||||||
|
"""Get current song info"""
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute(''' SELECT song_name, song_thumbnail, song_url FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
|
||||||
|
result = cursor.fetchone()
|
||||||
|
|
||||||
|
if result:
|
||||||
|
return {'title': result[0], 'thumbnail': result[1], 'url': result[2]}
|
||||||
|
return {'title': "Nothing", 'thumbnail': None, 'url': ''}
|
||||||
|
|
||||||
|
def get_current_progress(self, server_id: str) -> Tuple[int, int, float]:
|
||||||
|
"""Get playback progress (elapsed, duration, percentage)"""
|
||||||
|
import time
|
||||||
|
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute('''SELECT song_start_time, song_duration, is_playing FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
|
||||||
|
result = cursor.fetchone()
|
||||||
|
|
||||||
|
if not result or result[2] == 0:
|
||||||
|
return 0, 0, 0.0
|
||||||
|
|
||||||
|
start_time, duration, _ = result
|
||||||
|
|
||||||
|
if duration is None or duration == 0:
|
||||||
|
return 0, 0, 0.0
|
||||||
|
|
||||||
|
elapsed = int(time.time() - start_time)
|
||||||
|
elapsed = min(elapsed, duration)
|
||||||
|
percentage = (elapsed / duration) * 100 if duration > 0 else 0
|
||||||
|
|
||||||
|
return elapsed, duration, percentage
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Queue Operations
|
||||||
|
# ===================================
|
||||||
|
|
||||||
|
def add_song(self, server_id: str, song_link: str, queued_by: str, title: str, thumbnail: str = "", duration: int = 0, position: Optional[int] = None) -> int:
|
||||||
|
"""Add song to queue, returns position"""
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
self.ensure_server_exists(server_id)
|
||||||
|
|
||||||
|
if position is None:
|
||||||
|
# Add to end
|
||||||
|
cursor.execute("SELECT MAX(position) FROM songs WHERE server_id = ?", (server_id,))
|
||||||
|
max_pos = cursor.fetchone()[0]
|
||||||
|
position = (max_pos + 1) if max_pos is not None else 0
|
||||||
|
else:
|
||||||
|
# Insert at specific position (shift others down)
|
||||||
|
cursor.execute("UPDATE songs SET position = position + 1 WHERE server_id = ? AND position >= ?",
|
||||||
|
(server_id, position))
|
||||||
|
|
||||||
|
cursor.execute("""INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
||||||
|
(server_id, song_link, queued_by, position, title, thumbnail, duration))
|
||||||
|
|
||||||
|
return position
|
||||||
|
|
||||||
|
def get_next_song(self, server_id: str) -> Optional[Tuple]:
|
||||||
|
"""Get the next song in queue (doesn't remove it)"""
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute('''SELECT * FROM songs WHERE server_id = ? ORDER BY position LIMIT 1;''', (server_id,))
|
||||||
|
return cursor.fetchone()
|
||||||
|
|
||||||
|
def remove_song(self, server_id: str, position: int) -> None:
|
||||||
|
"""Remove song at position from queue"""
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute('''DELETE FROM songs WHERE server_id = ? AND position = ?''', (server_id, position))
|
||||||
|
|
||||||
|
def get_queue(self, server_id: str, limit: int = 10) -> Tuple[int, List[Tuple]]:
|
||||||
|
"""Get songs in queue (returns max_position, list of songs)"""
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
self.ensure_server_exists(server_id)
|
||||||
|
|
||||||
|
cursor.execute("SELECT title, duration, queued_by FROM songs WHERE server_id = ? ORDER BY position LIMIT ?",
|
||||||
|
(server_id, limit))
|
||||||
|
songs = cursor.fetchall()
|
||||||
|
|
||||||
|
cursor.execute("SELECT MAX(position) FROM songs WHERE server_id = ?", (server_id,))
|
||||||
|
max_pos = cursor.fetchone()[0]
|
||||||
|
max_pos = max_pos if max_pos is not None else -1
|
||||||
|
|
||||||
|
return max_pos, songs
|
||||||
|
|
||||||
|
def clear_queue(self, server_id: str) -> None:
|
||||||
|
"""Clear all songs from queue"""
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
self.ensure_server_exists(server_id)
|
||||||
|
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
|
||||||
|
|
||||||
|
def shuffle_queue(self, server_id: str) -> bool:
|
||||||
|
"""Shuffle the queue randomly, returns success"""
|
||||||
|
import random
|
||||||
|
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
self.ensure_server_exists(server_id)
|
||||||
|
|
||||||
|
cursor.execute("SELECT position, song_link, queued_by, title, thumbnail, duration FROM songs WHERE server_id = ? ORDER BY position",
|
||||||
|
(server_id,))
|
||||||
|
songs = cursor.fetchall()
|
||||||
|
|
||||||
|
if len(songs) <= 1:
|
||||||
|
return False
|
||||||
|
|
||||||
|
random.shuffle(songs)
|
||||||
|
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
|
||||||
|
|
||||||
|
for i, s in enumerate(songs):
|
||||||
|
cursor.execute("INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||||
|
(server_id, s[1], s[2], i, s[3], s[4], s[5]))
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Settings Operations
|
||||||
|
# ===================================
|
||||||
|
|
||||||
|
def get_loop_mode(self, server_id: str) -> str:
|
||||||
|
"""Get loop mode: 'off', 'song', or 'queue'"""
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
self.ensure_server_exists(server_id)
|
||||||
|
cursor.execute("SELECT loop_mode FROM servers WHERE server_id = ?", (server_id,))
|
||||||
|
res = cursor.fetchone()
|
||||||
|
return res[0] if res else 'off'
|
||||||
|
|
||||||
|
def set_loop_mode(self, server_id: str, mode: str) -> None:
|
||||||
|
"""Set loop mode: 'off', 'song', or 'queue'"""
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
self.ensure_server_exists(server_id)
|
||||||
|
cursor.execute("UPDATE servers SET loop_mode = ? WHERE server_id = ?", (mode, server_id))
|
||||||
|
|
||||||
|
def get_volume(self, server_id: str) -> int:
|
||||||
|
"""Get volume (0-200)"""
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
self.ensure_server_exists(server_id)
|
||||||
|
cursor.execute("SELECT volume FROM servers WHERE server_id = ?", (server_id,))
|
||||||
|
res = cursor.fetchone()
|
||||||
|
return res[0] if res else 100
|
||||||
|
|
||||||
|
def set_volume(self, server_id: str, volume: int) -> int:
|
||||||
|
"""Set volume (0-200), returns the set volume"""
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
self.ensure_server_exists(server_id)
|
||||||
|
cursor.execute("UPDATE servers SET volume = ? WHERE server_id = ?", (volume, server_id))
|
||||||
|
return volume
|
||||||
|
|
||||||
|
def get_effect(self, server_id: str) -> str:
|
||||||
|
"""Get current audio effect"""
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
self.ensure_server_exists(server_id)
|
||||||
|
cursor.execute("SELECT effect FROM servers WHERE server_id = ?", (server_id,))
|
||||||
|
res = cursor.fetchone()
|
||||||
|
return res[0] if res else 'none'
|
||||||
|
|
||||||
|
def set_effect(self, server_id: str, effect: str) -> None:
|
||||||
|
"""Set audio effect"""
|
||||||
|
with self.get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
self.ensure_server_exists(server_id)
|
||||||
|
cursor.execute("UPDATE servers SET effect = ? WHERE server_id = ?", (effect, server_id))
|
||||||
|
|
||||||
|
|
||||||
|
# Global instance
|
||||||
|
db = DatabaseManager()
|
||||||
@@ -12,28 +12,7 @@ from cogs.music.help import music_help
|
|||||||
|
|
||||||
import spotipy
|
import spotipy
|
||||||
from spotipy.oauth2 import SpotifyClientCredentials
|
from spotipy.oauth2 import SpotifyClientCredentials
|
||||||
|
import config # Use centralized config
|
||||||
|
|
||||||
# Fix this pls
|
|
||||||
|
|
||||||
import json
|
|
||||||
#from .. import config
|
|
||||||
# Read data from JSON file in ./data/config.json
|
|
||||||
def read_data():
|
|
||||||
with open("./data/config.json", "r") as file:
|
|
||||||
return json.load(file)
|
|
||||||
|
|
||||||
raise Exception("Could not load config data")
|
|
||||||
|
|
||||||
|
|
||||||
def get_spotify_creds():
|
|
||||||
data = read_data()
|
|
||||||
data = data.get("spotify")
|
|
||||||
|
|
||||||
SCID = data.get("SCID")
|
|
||||||
secret = data.get("SECRET")
|
|
||||||
|
|
||||||
return SCID, secret
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -51,10 +30,13 @@ class music(commands.Cog):
|
|||||||
help_command.cog = self
|
help_command.cog = self
|
||||||
self.help_command = help_command
|
self.help_command = help_command
|
||||||
|
|
||||||
SCID, secret = get_spotify_creds()
|
# Get Spotify credentials from centralized config
|
||||||
|
spotify_id, spotify_secret = config.get_spotify_creds()
|
||||||
# Authentication - without user
|
# Authentication - without user
|
||||||
client_credentials_manager = SpotifyClientCredentials(client_id=SCID,
|
client_credentials_manager = SpotifyClientCredentials(
|
||||||
client_secret=secret)
|
client_id=spotify_id,
|
||||||
|
client_secret=spotify_secret
|
||||||
|
)
|
||||||
|
|
||||||
self.sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
|
self.sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
|
||||||
|
|
||||||
@@ -346,7 +328,7 @@ class music(commands.Cog):
|
|||||||
app_commands.Choice(name="Song", value="song"),
|
app_commands.Choice(name="Song", value="song"),
|
||||||
app_commands.Choice(name="Queue", value="queue")
|
app_commands.Choice(name="Queue", value="queue")
|
||||||
])
|
])
|
||||||
async def loop(self, ctx: Context, mode: str = None):
|
async def loop(self, ctx: Context, mode: str | None = None):
|
||||||
"""Toggle between loop modes or set a specific mode"""
|
"""Toggle between loop modes or set a specific mode"""
|
||||||
server = ctx.guild
|
server = ctx.guild
|
||||||
|
|
||||||
@@ -409,7 +391,7 @@ class music(commands.Cog):
|
|||||||
description="Set playback volume",
|
description="Set playback volume",
|
||||||
aliases=['vol', 'v'])
|
aliases=['vol', 'v'])
|
||||||
@app_commands.describe(level="Volume level (0-200%, default shows current)")
|
@app_commands.describe(level="Volume level (0-200%, default shows current)")
|
||||||
async def volume(self, ctx: Context, level: int = None):
|
async def volume(self, ctx: Context, level: int | None = None):
|
||||||
"""Set or display the current volume"""
|
"""Set or display the current volume"""
|
||||||
server = ctx.guild
|
server = ctx.guild
|
||||||
|
|
||||||
@@ -453,7 +435,7 @@ class music(commands.Cog):
|
|||||||
description="Apply audio effects to playback",
|
description="Apply audio effects to playback",
|
||||||
aliases=['fx', 'filter'])
|
aliases=['fx', 'filter'])
|
||||||
@app_commands.describe(effect_name="The audio effect to apply (leave empty to see list)")
|
@app_commands.describe(effect_name="The audio effect to apply (leave empty to see list)")
|
||||||
async def effect(self, ctx: Context, effect_name: str = None):
|
async def effect(self, ctx: Context, effect_name: str | None = None):
|
||||||
"""Apply or list audio effects"""
|
"""Apply or list audio effects"""
|
||||||
server = ctx.guild
|
server = ctx.guild
|
||||||
|
|
||||||
|
|||||||
@@ -1,14 +1,14 @@
|
|||||||
from http import server
|
"""
|
||||||
import sqlite3
|
Queue management for Groovy-Zilean music bot
|
||||||
import random
|
Now using centralized database manager for cleaner code
|
||||||
import time
|
"""
|
||||||
|
|
||||||
import discord
|
import discord
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import time
|
||||||
|
|
||||||
from .translate import search_song
|
from .translate import search_song
|
||||||
|
from .db_manager import db
|
||||||
db_path = "./data/music.db"
|
|
||||||
|
|
||||||
# Base FFmpeg options (will be modified by effects)
|
# Base FFmpeg options (will be modified by effects)
|
||||||
BASE_FFMPEG_OPTS = {
|
BASE_FFMPEG_OPTS = {
|
||||||
@@ -103,342 +103,224 @@ def get_effect_options(effect_name):
|
|||||||
return effects.get(effect_name, effects['none'])
|
return effects.get(effect_name, effects['none'])
|
||||||
|
|
||||||
|
|
||||||
# Creates the tables if they don't exist
|
# ===================================
|
||||||
|
# Initialization
|
||||||
|
# ===================================
|
||||||
|
|
||||||
def initialize_tables():
|
def initialize_tables():
|
||||||
# Connect to the database
|
"""Initialize database tables"""
|
||||||
conn = sqlite3.connect(db_path)
|
db.initialize_tables()
|
||||||
cursor = conn.cursor()
|
|
||||||
|
|
||||||
# Create servers table if it doesn't exist
|
|
||||||
cursor.execute('''CREATE TABLE IF NOT EXISTS servers (
|
|
||||||
server_id TEXT PRIMARY KEY,
|
|
||||||
is_playing INTEGER DEFAULT 0,
|
|
||||||
song_name TEXT,
|
|
||||||
song_url TEXT,
|
|
||||||
song_thumbnail TEXT,
|
|
||||||
loop_mode TEXT DEFAULT 'off',
|
|
||||||
volume INTEGER DEFAULT 100,
|
|
||||||
effect TEXT DEFAULT 'none',
|
|
||||||
song_start_time REAL DEFAULT 0,
|
|
||||||
song_duration INTEGER DEFAULT 0
|
|
||||||
);''')
|
|
||||||
|
|
||||||
# Set all to not playing
|
# ===================================
|
||||||
cursor.execute("UPDATE servers SET is_playing = 0;")
|
# Queue Management
|
||||||
|
# ===================================
|
||||||
|
|
||||||
# Add new columns if they don't exist (for existing databases)
|
async def add_song(server_id, details, queued_by, position=None):
|
||||||
# Migrations for existing databases
|
"""
|
||||||
columns = [
|
Add a song to the queue
|
||||||
("loop_mode", "TEXT DEFAULT 'off'"),
|
|
||||||
("volume", "INTEGER DEFAULT 100"),
|
|
||||||
("effect", "TEXT DEFAULT 'none'"),
|
|
||||||
("song_start_time", "REAL DEFAULT 0"),
|
|
||||||
("song_duration", "INTEGER DEFAULT 0"),
|
|
||||||
("song_thumbnail", "TEXT DEFAULT ''"),
|
|
||||||
("song_url", "TEXT DEFAULT ''") # NEW
|
|
||||||
]
|
|
||||||
|
|
||||||
for col_name, col_type in columns:
|
Args:
|
||||||
try:
|
server_id: Discord server ID
|
||||||
cursor.execute(f"ALTER TABLE servers ADD COLUMN {col_name} {col_type};")
|
details: Dictionary with song info (url, title, thumbnail, duration) or string
|
||||||
except sqlite3.OperationalError:
|
queued_by: Username who queued the song
|
||||||
pass
|
position: Optional position in queue (None = end of queue)
|
||||||
|
|
||||||
cursor.execute('''CREATE TABLE IF NOT EXISTS songs (
|
|
||||||
server_id TEXT NOT NULL,
|
|
||||||
song_link TEXT,
|
|
||||||
queued_by TEXT,
|
|
||||||
position INTEGER NOT NULL,
|
|
||||||
title TEXT,
|
|
||||||
thumbnail TEXT,
|
|
||||||
duration INTEGER,
|
|
||||||
PRIMARY KEY (position),
|
|
||||||
FOREIGN KEY (server_id) REFERENCES servers(server_id)
|
|
||||||
);''')
|
|
||||||
|
|
||||||
cursor.execute("DELETE FROM songs;")
|
|
||||||
conn.commit()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
# Queue a song in the db
|
|
||||||
async def add_song(server_id, details, queued_by):
|
|
||||||
# Connect to db
|
|
||||||
conn = sqlite3.connect(db_path)
|
|
||||||
cursor = conn.cursor()
|
|
||||||
|
|
||||||
await add_server(server_id, cursor, conn)
|
|
||||||
|
|
||||||
max_order_num = await get_max(server_id, cursor) + 1
|
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Position in queue
|
||||||
|
"""
|
||||||
if isinstance(details, str):
|
if isinstance(details, str):
|
||||||
# Fallback for raw strings
|
# Fallback for raw strings (legacy support)
|
||||||
cursor.execute("""INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
pos = db.add_song(
|
||||||
(server_id, "Not grabbed", queued_by, max_order_num, details, "Unknown", 0))
|
server_id=str(server_id),
|
||||||
|
song_link="Not grabbed",
|
||||||
|
queued_by=queued_by,
|
||||||
|
title=details,
|
||||||
|
thumbnail="Unknown",
|
||||||
|
duration=0,
|
||||||
|
position=position
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
# Save exact duration and thumbnail from the start
|
# Standard dictionary format
|
||||||
cursor.execute("""INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
pos = db.add_song(
|
||||||
(server_id, details['url'], queued_by, max_order_num, details['title'], details['thumbnail'], details['duration']))
|
server_id=str(server_id),
|
||||||
|
song_link=details['url'],
|
||||||
|
queued_by=queued_by,
|
||||||
|
title=details['title'],
|
||||||
|
thumbnail=details.get('thumbnail', ''),
|
||||||
|
duration=details.get('duration', 0),
|
||||||
|
position=position
|
||||||
|
)
|
||||||
|
|
||||||
conn.commit()
|
return pos
|
||||||
conn.close()
|
|
||||||
|
|
||||||
return max_order_num
|
|
||||||
|
|
||||||
|
|
||||||
# Pop song from server (respects loop mode)
|
|
||||||
async def pop(server_id, ignore=False, skip_mode=False):
|
async def pop(server_id, ignore=False, skip_mode=False):
|
||||||
"""
|
"""
|
||||||
Pop next song from queue
|
Pop next song from queue
|
||||||
ignore: Skip the song without returning URL
|
|
||||||
skip_mode: True when called from skip command (affects loop song behavior)
|
Args:
|
||||||
|
server_id: Discord server ID
|
||||||
|
ignore: Skip the song without returning URL
|
||||||
|
skip_mode: True when called from skip command (affects loop song behavior)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Song URL or None
|
||||||
"""
|
"""
|
||||||
# Connect to db
|
result = db.get_next_song(str(server_id))
|
||||||
conn = sqlite3.connect(db_path)
|
|
||||||
cursor = conn.cursor()
|
|
||||||
|
|
||||||
# JUST INCASE!
|
|
||||||
await add_server(server_id, cursor, conn)
|
|
||||||
|
|
||||||
# Fetch info: link(1), title(4), thumbnail(5), duration(6)
|
|
||||||
cursor.execute('''SELECT * FROM songs WHERE server_id = ? ORDER BY position LIMIT 1;''', (server_id,))
|
|
||||||
result = cursor.fetchone()
|
|
||||||
conn.commit()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
if result is None:
|
if result is None:
|
||||||
return None
|
return None
|
||||||
elif ignore:
|
|
||||||
await mark_song_as_finished(server_id, result[3])
|
|
||||||
return None
|
|
||||||
elif result[1] == "Not grabbed":
|
|
||||||
# Lazy load logic
|
|
||||||
song_list = await search_song(result[4])
|
|
||||||
if not song_list:
|
|
||||||
return None
|
|
||||||
song = song_list[0]
|
|
||||||
|
|
||||||
await set_current_song(server_id, song['title'], song.get('thumbnail', ''), song.get('duration', 0))
|
# result format: (server_id, song_link, queued_by, position, title, thumbnail, duration)
|
||||||
|
server_id_str, song_link, queued_by, position, title, thumbnail, duration = result
|
||||||
|
|
||||||
|
if ignore:
|
||||||
|
db.remove_song(str(server_id), position)
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Handle lazy-loaded songs (not yet fetched from YouTube)
|
||||||
|
if song_link == "Not grabbed":
|
||||||
|
song_list = await search_song(title)
|
||||||
|
if not song_list:
|
||||||
|
db.remove_song(str(server_id), position)
|
||||||
|
return None
|
||||||
|
|
||||||
|
song = song_list[0]
|
||||||
|
await set_current_song(
|
||||||
|
server_id,
|
||||||
|
song['title'],
|
||||||
|
song['url'],
|
||||||
|
song.get('thumbnail', ''),
|
||||||
|
song.get('duration', 0)
|
||||||
|
)
|
||||||
|
|
||||||
# Check loop mode before removing
|
# Check loop mode before removing
|
||||||
loop_mode = await get_loop_mode(server_id)
|
loop_mode = await get_loop_mode(server_id)
|
||||||
if loop_mode != 'song': # Only remove if not looping song
|
if loop_mode != 'song':
|
||||||
await mark_song_as_finished(server_id, result[3])
|
db.remove_song(str(server_id), position)
|
||||||
|
|
||||||
return song['url']
|
return song['url']
|
||||||
|
|
||||||
# Pre-grabbed logic (Standard)
|
# Standard pre-fetched song
|
||||||
# result[1] is url, result[5] is thumbnail, result[6] is duration
|
await set_current_song(server_id, title, song_link, thumbnail, duration)
|
||||||
await set_current_song(server_id, result[4], result[1], result[5], result[6])
|
|
||||||
|
|
||||||
# Check loop mode before removing
|
# Check loop mode before removing
|
||||||
loop_mode = await get_loop_mode(server_id)
|
loop_mode = await get_loop_mode(server_id)
|
||||||
if loop_mode != 'song': # Only remove if not looping song
|
if loop_mode != 'song':
|
||||||
await mark_song_as_finished(server_id, result[3])
|
db.remove_song(str(server_id), position)
|
||||||
|
|
||||||
return result[1]
|
return song_link
|
||||||
|
|
||||||
# Add server to db if first time queuing
|
|
||||||
async def add_server(server_id, cursor, conn):
|
|
||||||
cursor.execute('SELECT COUNT(*) FROM servers WHERE server_id = ?', (server_id,))
|
|
||||||
if cursor.fetchone()[0] == 0:
|
|
||||||
cursor.execute('''INSERT INTO servers (server_id, loop_mode, volume, effect, song_thumbnail, song_url)
|
|
||||||
VALUES (?, 'off', 100, 'none', '', '')''', (server_id,))
|
|
||||||
conn.commit()
|
|
||||||
|
|
||||||
|
|
||||||
# set song as played and update indexes
|
|
||||||
async def mark_song_as_finished(server_id, order_num):
|
|
||||||
# Connect to the database
|
|
||||||
conn = sqlite3.connect(db_path)
|
|
||||||
cursor = conn.cursor()
|
|
||||||
|
|
||||||
# Update the song as finished
|
|
||||||
cursor.execute('''DELETE FROM songs
|
|
||||||
WHERE server_id = ? AND position = ?''',
|
|
||||||
(server_id, order_num))
|
|
||||||
|
|
||||||
# Close connection
|
|
||||||
conn.commit()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
|
|
||||||
# set the current playing song of the server
|
|
||||||
async def set_current_song(server_id, title, url, thumbnail="", duration=0):
|
|
||||||
conn = sqlite3.connect(db_path)
|
|
||||||
cursor = conn.cursor()
|
|
||||||
start_time = time.time()
|
|
||||||
|
|
||||||
# Ensure duration is an integer
|
|
||||||
try:
|
|
||||||
duration = int(duration)
|
|
||||||
except:
|
|
||||||
duration = 0
|
|
||||||
|
|
||||||
cursor.execute(''' UPDATE servers
|
|
||||||
SET song_name = ?, song_url = ?, song_thumbnail = ?, song_start_time = ?, song_duration = ?
|
|
||||||
WHERE server_id = ?''',
|
|
||||||
(title, url, thumbnail, start_time, duration, server_id))
|
|
||||||
|
|
||||||
conn.commit()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
# Returns dictionary with title and thumbnail
|
|
||||||
async def get_current_song(server_id):
|
|
||||||
conn = sqlite3.connect(db_path)
|
|
||||||
cursor = conn.cursor()
|
|
||||||
|
|
||||||
cursor.execute(''' SELECT song_name, song_thumbnail, song_url FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
|
|
||||||
result = cursor.fetchone()
|
|
||||||
conn.commit()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
if result:
|
|
||||||
return {'title': result[0], 'thumbnail': result[1], 'url': result[2]}
|
|
||||||
return {'title': "Nothing", 'thumbnail': None, 'url': ''}
|
|
||||||
|
|
||||||
async def get_current_progress(server_id):
|
|
||||||
conn = sqlite3.connect(db_path)
|
|
||||||
cursor = conn.cursor()
|
|
||||||
cursor.execute('''SELECT song_start_time, song_duration, is_playing FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
|
|
||||||
result = cursor.fetchone()
|
|
||||||
conn.close() # Close quickly
|
|
||||||
|
|
||||||
if not result or result[2] == 0:
|
|
||||||
return 0, 0, 0.0
|
|
||||||
|
|
||||||
start_time, duration, _ = result
|
|
||||||
|
|
||||||
if duration is None or duration == 0:
|
|
||||||
return 0, 0, 0.0
|
|
||||||
|
|
||||||
elapsed = int(time.time() - start_time)
|
|
||||||
elapsed = min(elapsed, duration)
|
|
||||||
percentage = (elapsed / duration) * 100 if duration > 0 else 0
|
|
||||||
|
|
||||||
return elapsed, duration, percentage
|
|
||||||
|
|
||||||
async def get_max(server_id, cursor):
|
|
||||||
cursor.execute("SELECT MAX(position) FROM songs WHERE server_id = ?", (server_id,))
|
|
||||||
result = cursor.fetchone()
|
|
||||||
return result[0] if result[0] is not None else -1
|
|
||||||
|
|
||||||
async def update_server(server_id, playing):
|
|
||||||
conn = sqlite3.connect(db_path)
|
|
||||||
cursor = conn.cursor()
|
|
||||||
await add_server(server_id, cursor, conn)
|
|
||||||
val = 1 if playing else 0
|
|
||||||
cursor.execute("UPDATE servers SET is_playing = ? WHERE server_id = ?", (val, server_id))
|
|
||||||
conn.commit()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
async def is_server_playing(server_id):
|
|
||||||
conn = sqlite3.connect(db_path)
|
|
||||||
cursor = conn.cursor()
|
|
||||||
await add_server(server_id, cursor, conn)
|
|
||||||
cursor.execute("SELECT is_playing FROM servers WHERE server_id = ?", (server_id,))
|
|
||||||
res = cursor.fetchone()
|
|
||||||
conn.close()
|
|
||||||
return True if res[0] == 1 else False
|
|
||||||
|
|
||||||
async def clear(server_id):
|
|
||||||
conn = sqlite3.connect(db_path)
|
|
||||||
cursor = conn.cursor()
|
|
||||||
await add_server(server_id, cursor, conn)
|
|
||||||
await update_server(server_id, False)
|
|
||||||
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
|
|
||||||
conn.commit()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
async def grab_songs(server_id):
|
async def grab_songs(server_id):
|
||||||
conn = sqlite3.connect(db_path)
|
"""
|
||||||
cursor = conn.cursor()
|
Get current queue
|
||||||
await add_server(server_id, cursor, conn)
|
|
||||||
cursor.execute("SELECT title, duration, queued_by FROM songs WHERE server_id = ? ORDER BY position LIMIT 10", (server_id,))
|
|
||||||
songs = cursor.fetchall()
|
|
||||||
max_pos = await get_max(server_id, cursor)
|
|
||||||
conn.close()
|
|
||||||
return max_pos, songs
|
|
||||||
|
|
||||||
# --- Effects/Loop/Shuffle/Volume (Simplified Paste) ---
|
Returns:
|
||||||
async def get_loop_mode(server_id):
|
Tuple of (max_position, list_of_songs)
|
||||||
conn = sqlite3.connect(db_path)
|
"""
|
||||||
cursor = conn.cursor()
|
return db.get_queue(str(server_id), limit=10)
|
||||||
await add_server(server_id, cursor, conn)
|
|
||||||
cursor.execute("SELECT loop_mode FROM servers WHERE server_id = ?", (server_id,))
|
|
||||||
res = cursor.fetchone()
|
|
||||||
conn.close()
|
|
||||||
return res[0] if res else 'off'
|
|
||||||
|
|
||||||
async def set_loop_mode(server_id, mode):
|
|
||||||
conn = sqlite3.connect(db_path)
|
|
||||||
cursor = conn.cursor()
|
|
||||||
await add_server(server_id, cursor, conn)
|
|
||||||
cursor.execute("UPDATE servers SET loop_mode = ? WHERE server_id = ?", (mode, server_id))
|
|
||||||
conn.commit()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
async def get_volume(server_id):
|
async def clear(server_id):
|
||||||
conn = sqlite3.connect(db_path)
|
"""Clear the queue for a server"""
|
||||||
cursor = conn.cursor()
|
db.clear_queue(str(server_id))
|
||||||
await add_server(server_id, cursor, conn)
|
await update_server(server_id, False)
|
||||||
cursor.execute("SELECT volume FROM servers WHERE server_id = ?", (server_id,))
|
|
||||||
res = cursor.fetchone()
|
|
||||||
conn.close()
|
|
||||||
return res[0] if res else 100
|
|
||||||
|
|
||||||
async def set_volume(server_id, vol):
|
|
||||||
conn = sqlite3.connect(db_path)
|
|
||||||
cursor = conn.cursor()
|
|
||||||
await add_server(server_id, cursor, conn)
|
|
||||||
cursor.execute("UPDATE servers SET volume = ? WHERE server_id = ?", (vol, server_id))
|
|
||||||
conn.commit()
|
|
||||||
conn.close()
|
|
||||||
return vol
|
|
||||||
|
|
||||||
async def shuffle_queue(server_id):
|
async def shuffle_queue(server_id):
|
||||||
conn = sqlite3.connect(db_path)
|
"""Shuffle the queue randomly"""
|
||||||
cursor = conn.cursor()
|
return db.shuffle_queue(str(server_id))
|
||||||
await add_server(server_id, cursor, conn)
|
|
||||||
cursor.execute("SELECT position, song_link, queued_by, title, thumbnail, duration FROM songs WHERE server_id = ? ORDER BY position", (server_id,))
|
|
||||||
songs = cursor.fetchall()
|
# ===================================
|
||||||
if len(songs) <= 1:
|
# Server State Management
|
||||||
conn.close()
|
# ===================================
|
||||||
return False
|
|
||||||
random.shuffle(songs)
|
async def update_server(server_id, playing):
|
||||||
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
|
"""Update server playing status"""
|
||||||
for i, s in enumerate(songs):
|
db.set_server_playing(str(server_id), playing)
|
||||||
cursor.execute("INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)", (server_id, s[1], s[2], i, s[3], s[4], s[5]))
|
|
||||||
conn.commit()
|
|
||||||
conn.close()
|
async def is_server_playing(server_id):
|
||||||
return True
|
"""Check if server is currently playing"""
|
||||||
|
return db.is_server_playing(str(server_id))
|
||||||
|
|
||||||
|
|
||||||
|
async def set_current_song(server_id, title, url, thumbnail="", duration=0):
|
||||||
|
"""Set the currently playing song"""
|
||||||
|
db.set_current_song(
|
||||||
|
str(server_id),
|
||||||
|
title,
|
||||||
|
url,
|
||||||
|
thumbnail,
|
||||||
|
duration,
|
||||||
|
time.time() # start_time
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_current_song(server_id):
|
||||||
|
"""Get current song info"""
|
||||||
|
return db.get_current_song(str(server_id))
|
||||||
|
|
||||||
|
|
||||||
|
async def get_current_progress(server_id):
|
||||||
|
"""Get playback progress (elapsed, duration, percentage)"""
|
||||||
|
return db.get_current_progress(str(server_id))
|
||||||
|
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Settings Management
|
||||||
|
# ===================================
|
||||||
|
|
||||||
|
async def get_loop_mode(server_id):
|
||||||
|
"""Get loop mode: 'off', 'song', or 'queue'"""
|
||||||
|
return db.get_loop_mode(str(server_id))
|
||||||
|
|
||||||
|
|
||||||
|
async def set_loop_mode(server_id, mode):
|
||||||
|
"""Set loop mode: 'off', 'song', or 'queue'"""
|
||||||
|
db.set_loop_mode(str(server_id), mode)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_volume(server_id):
|
||||||
|
"""Get volume (0-200)"""
|
||||||
|
return db.get_volume(str(server_id))
|
||||||
|
|
||||||
|
|
||||||
|
async def set_volume(server_id, vol):
|
||||||
|
"""Set volume (0-200)"""
|
||||||
|
return db.set_volume(str(server_id), vol)
|
||||||
|
|
||||||
|
|
||||||
async def get_effect(server_id):
|
async def get_effect(server_id):
|
||||||
conn = sqlite3.connect(db_path)
|
"""Get current audio effect"""
|
||||||
cursor = conn.cursor()
|
return db.get_effect(str(server_id))
|
||||||
await add_server(server_id, cursor, conn)
|
|
||||||
cursor.execute("SELECT effect FROM servers WHERE server_id = ?", (server_id,))
|
|
||||||
res = cursor.fetchone()
|
|
||||||
conn.close()
|
|
||||||
return res[0] if res else 'none'
|
|
||||||
|
|
||||||
async def set_effect(server_id, fx):
|
async def set_effect(server_id, fx):
|
||||||
conn = sqlite3.connect(db_path)
|
"""Set audio effect"""
|
||||||
cursor = conn.cursor()
|
db.set_effect(str(server_id), fx)
|
||||||
await add_server(server_id, cursor, conn)
|
|
||||||
cursor.execute("UPDATE servers SET effect = ? WHERE server_id = ?", (fx, server_id))
|
|
||||||
conn.commit()
|
# ===================================
|
||||||
conn.close()
|
# Effect Metadata
|
||||||
|
# ===================================
|
||||||
|
|
||||||
def list_all_effects():
|
def list_all_effects():
|
||||||
|
"""List all available audio effects"""
|
||||||
return [
|
return [
|
||||||
'none', 'bassboost', 'nightcore', 'slowed', 'earrape', 'deepfry', 'distortion',
|
'none', 'bassboost', 'nightcore', 'slowed', 'earrape', 'deepfry', 'distortion',
|
||||||
'reverse', 'chipmunk', 'demonic', 'underwater', 'robot',
|
'reverse', 'chipmunk', 'demonic', 'underwater', 'robot',
|
||||||
'8d', 'vibrato', 'tremolo', 'echo', 'phone', 'megaphone'
|
'8d', 'vibrato', 'tremolo', 'echo', 'phone', 'megaphone'
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def get_effect_emoji(effect_name):
|
def get_effect_emoji(effect_name):
|
||||||
# Short list of emoji mappings
|
"""Get emoji for effect"""
|
||||||
emojis = {
|
emojis = {
|
||||||
'none': '✨', # Changed to generic Sparkles
|
'none': '✨',
|
||||||
'bassboost': '💥',
|
'bassboost': '💥',
|
||||||
'nightcore': '⚡',
|
'nightcore': '⚡',
|
||||||
'slowed': '🐢',
|
'slowed': '🐢',
|
||||||
@@ -459,7 +341,9 @@ def get_effect_emoji(effect_name):
|
|||||||
}
|
}
|
||||||
return emojis.get(effect_name, '✨')
|
return emojis.get(effect_name, '✨')
|
||||||
|
|
||||||
|
|
||||||
def get_effect_description(effect_name):
|
def get_effect_description(effect_name):
|
||||||
|
"""Get description for effect"""
|
||||||
descriptions = {
|
descriptions = {
|
||||||
'none': 'Normal audio',
|
'none': 'Normal audio',
|
||||||
'bassboost': 'MAXIMUM BASS 🔊',
|
'bassboost': 'MAXIMUM BASS 🔊',
|
||||||
@@ -482,43 +366,60 @@ def get_effect_description(effect_name):
|
|||||||
}
|
}
|
||||||
return descriptions.get(effect_name, 'Unknown effect')
|
return descriptions.get(effect_name, 'Unknown effect')
|
||||||
|
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Playback
|
||||||
|
# ===================================
|
||||||
|
|
||||||
async def play(ctx):
|
async def play(ctx):
|
||||||
|
"""
|
||||||
|
Main playback loop - plays songs from queue
|
||||||
|
"""
|
||||||
server_id = ctx.guild.id
|
server_id = ctx.guild.id
|
||||||
voice_client = ctx.voice_client
|
voice_client = ctx.voice_client
|
||||||
|
|
||||||
if voice_client is None:
|
if voice_client is None:
|
||||||
await update_server(server_id, False)
|
await update_server(server_id, False)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Wait for current song to finish
|
||||||
while voice_client.is_playing():
|
while voice_client.is_playing():
|
||||||
await asyncio.sleep(0.5)
|
await asyncio.sleep(0.5)
|
||||||
|
|
||||||
|
# Get next song
|
||||||
url = await pop(server_id)
|
url = await pop(server_id)
|
||||||
if url is None:
|
if url is None:
|
||||||
await update_server(server_id, False)
|
await update_server(server_id, False)
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Scale volume down to prevent earrape
|
# Get volume and effect settings
|
||||||
# User sees 0-200%, but internally we scale by 0.25
|
vol = await get_volume(server_id) / 100.0 * 0.25 # Scale down by 0.25
|
||||||
# So user's 100% = 0.25 actual volume (25%)
|
|
||||||
vol = await get_volume(server_id) / 100.0 * 0.25
|
|
||||||
fx = await get_effect(server_id)
|
fx = await get_effect(server_id)
|
||||||
opts = get_effect_options(fx)
|
opts = get_effect_options(fx)
|
||||||
|
|
||||||
|
# Create audio source
|
||||||
src = discord.FFmpegPCMAudio(url, **opts)
|
src = discord.FFmpegPCMAudio(url, **opts)
|
||||||
src = discord.PCMVolumeTransformer(src, volume=vol)
|
src = discord.PCMVolumeTransformer(src, volume=vol)
|
||||||
|
|
||||||
|
# After callback - play next song
|
||||||
def after(e):
|
def after(e):
|
||||||
if e: print(e)
|
if e:
|
||||||
if voice_client and not voice_client.is_connected(): return
|
print(f"Playback error: {e}")
|
||||||
|
if voice_client and not voice_client.is_connected():
|
||||||
|
return
|
||||||
|
|
||||||
|
# Schedule next song
|
||||||
coro = play(ctx)
|
coro = play(ctx)
|
||||||
fut = asyncio.run_coroutine_threadsafe(coro, ctx.bot.loop)
|
fut = asyncio.run_coroutine_threadsafe(coro, ctx.bot.loop)
|
||||||
try: fut.result()
|
try:
|
||||||
except: pass
|
fut.result()
|
||||||
|
except Exception as ex:
|
||||||
|
print(f"Error in after callback: {ex}")
|
||||||
|
|
||||||
voice_client.play(src, after=after)
|
voice_client.play(src, after=after)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Play error: {e}")
|
print(f"Play error: {e}")
|
||||||
|
# Try next song on error
|
||||||
await play(ctx)
|
await play(ctx)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,9 @@
|
|||||||
# Handles translating urls and search terms
|
"""
|
||||||
|
URL and search query handling for Groovy-Zilean
|
||||||
|
Translates YouTube, Spotify, SoundCloud URLs and search queries into playable audio
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
import yt_dlp as ytdlp
|
import yt_dlp as ytdlp
|
||||||
import spotipy
|
import spotipy
|
||||||
|
|
||||||
@@ -22,7 +26,7 @@ ydl_opts = {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
async def main(url, sp):
|
async def main(url: str, sp: spotipy.Spotify) -> list[dict[str, Any] | str]:
|
||||||
|
|
||||||
#url = url.lower()
|
#url = url.lower()
|
||||||
|
|
||||||
@@ -60,7 +64,7 @@ async def main(url, sp):
|
|||||||
return []
|
return []
|
||||||
|
|
||||||
|
|
||||||
async def search_song(search):
|
async def search_song(search: str) -> list[dict[str, Any]]:
|
||||||
with ytdlp.YoutubeDL(ydl_opts) as ydl:
|
with ytdlp.YoutubeDL(ydl_opts) as ydl:
|
||||||
try:
|
try:
|
||||||
info = ydl.extract_info(f"ytsearch1:{search}", download=False)
|
info = ydl.extract_info(f"ytsearch1:{search}", download=False)
|
||||||
@@ -89,7 +93,7 @@ async def search_song(search):
|
|||||||
return [data]
|
return [data]
|
||||||
|
|
||||||
|
|
||||||
async def spotify_song(url, sp):
|
async def spotify_song(url: str, sp: spotipy.Spotify) -> list[dict[str, Any]]:
|
||||||
track = sp.track(url.split("/")[-1].split("?")[0])
|
track = sp.track(url.split("/")[-1].split("?")[0])
|
||||||
search = ""
|
search = ""
|
||||||
|
|
||||||
@@ -106,7 +110,11 @@ async def spotify_song(url, sp):
|
|||||||
return await search_song(query)
|
return await search_song(query)
|
||||||
|
|
||||||
|
|
||||||
async def spotify_playlist(url, sp):
|
async def spotify_playlist(url: str, sp: spotipy.Spotify) -> list[str | dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Get songs from a Spotify playlist
|
||||||
|
Returns a mixed list where first item is dict, rest are search strings
|
||||||
|
"""
|
||||||
# Get the playlist uri code
|
# Get the playlist uri code
|
||||||
code = url.split("/")[-1].split("?")[0]
|
code = url.split("/")[-1].split("?")[0]
|
||||||
|
|
||||||
@@ -116,41 +124,35 @@ async def spotify_playlist(url, sp):
|
|||||||
except spotipy.exceptions.SpotifyException:
|
except spotipy.exceptions.SpotifyException:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# Go through the tracks
|
# Go through the tracks and build search queries
|
||||||
songs = []
|
songs: list[str | dict[str, Any]] = [] # Explicit type for mypy
|
||||||
for track in results:
|
for track in results:
|
||||||
search = ""
|
search = ""
|
||||||
|
|
||||||
# Fetch all artists
|
# Fetch all artists
|
||||||
for artist in track['track']['artists']:
|
for artist in track['track']['artists']:
|
||||||
|
|
||||||
# Add all artists to search
|
# Add all artists to search
|
||||||
search += f"{artist['name']}, "
|
search += f"{artist['name']}, "
|
||||||
|
|
||||||
# Remove last column
|
# Remove last comma
|
||||||
search = search[:-2]
|
search = search[:-2]
|
||||||
search += f" - {track['track']['name']}"
|
search += f" - {track['track']['name']}"
|
||||||
songs.append(search)
|
songs.append(search)
|
||||||
|
|
||||||
#searched_result = search_song(search)
|
# Fetch first song's full data
|
||||||
#if searched_result == []:
|
|
||||||
#continue
|
|
||||||
|
|
||||||
#songs.append(searched_result[0])
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
search_result = await search_song(songs[0])
|
search_result = await search_song(songs[0]) # type: ignore
|
||||||
if search_result == []:
|
if search_result == []:
|
||||||
songs.pop(0)
|
songs.pop(0)
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
songs[0] = search_result[0]
|
songs[0] = search_result[0] # Replace string with dict
|
||||||
break
|
break
|
||||||
|
|
||||||
return songs
|
return songs
|
||||||
|
|
||||||
|
|
||||||
async def song_download(url):
|
async def song_download(url: str) -> list[dict[str, Any]]:
|
||||||
with ytdlp.YoutubeDL(ydl_opts) as ydl:
|
with ytdlp.YoutubeDL(ydl_opts) as ydl:
|
||||||
try:
|
try:
|
||||||
info = ydl.extract_info(url, download=False)
|
info = ydl.extract_info(url, download=False)
|
||||||
@@ -180,7 +182,7 @@ async def song_download(url):
|
|||||||
return [data]
|
return [data]
|
||||||
|
|
||||||
|
|
||||||
async def playlist_download(url):
|
async def playlist_download(url: str) -> list[dict[str, Any]]:
|
||||||
with ytdlp.YoutubeDL(ydl_opts) as ydl:
|
with ytdlp.YoutubeDL(ydl_opts) as ydl:
|
||||||
try:
|
try:
|
||||||
info = ydl.extract_info(url, download=False)
|
info = ydl.extract_info(url, download=False)
|
||||||
|
|||||||
@@ -1,3 +1,9 @@
|
|||||||
|
"""
|
||||||
|
Utility functions for Groovy-Zilean music bot
|
||||||
|
Handles voice channel operations, queue display, and inactivity tracking
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
import discord
|
import discord
|
||||||
from discord.ext.commands.context import Context
|
from discord.ext.commands.context import Context
|
||||||
from discord.ext.commands.converter import CommandError
|
from discord.ext.commands.converter import CommandError
|
||||||
@@ -7,15 +13,29 @@ from . import queue
|
|||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
# Track last activity time for each server
|
# Track last activity time for each server
|
||||||
last_activity = {}
|
last_activity: dict[int, float] = {}
|
||||||
|
|
||||||
# Joining/moving to the user's vc in a guild
|
|
||||||
async def join_vc(ctx: Context):
|
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Voice Channel Management
|
||||||
|
# ===================================
|
||||||
|
|
||||||
|
async def join_vc(ctx: Context) -> discord.VoiceClient:
|
||||||
|
"""
|
||||||
|
Join or move to the user's voice channel
|
||||||
|
|
||||||
|
Args:
|
||||||
|
ctx: Command context
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The voice client connection
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
CommandError: If user is not in a voice channel
|
||||||
|
"""
|
||||||
# Get the user's vc
|
# Get the user's vc
|
||||||
author_voice = getattr(ctx.author, "voice")
|
author_voice = getattr(ctx.author, "voice")
|
||||||
if author_voice is None:
|
if author_voice is None:
|
||||||
# Raise exception if user is not in vc
|
|
||||||
raise CommandError("User is not in voice channel")
|
raise CommandError("User is not in voice channel")
|
||||||
|
|
||||||
# Get user's vc
|
# Get user's vc
|
||||||
@@ -25,19 +45,26 @@ async def join_vc(ctx: Context):
|
|||||||
|
|
||||||
# Join or move to the user's vc
|
# Join or move to the user's vc
|
||||||
if ctx.voice_client is None:
|
if ctx.voice_client is None:
|
||||||
vc = await vc.connect()
|
vc_client = await vc.connect()
|
||||||
else:
|
else:
|
||||||
# Safe to ignore type error for now
|
vc_client = await ctx.voice_client.move_to(vc)
|
||||||
vc = await ctx.voice_client.move_to(vc)
|
|
||||||
|
|
||||||
# Update last activity
|
# Update last activity
|
||||||
last_activity[ctx.guild.id] = asyncio.get_event_loop().time()
|
last_activity[ctx.guild.id] = asyncio.get_event_loop().time()
|
||||||
|
|
||||||
return vc
|
return vc_client
|
||||||
|
|
||||||
|
|
||||||
# Leaving the voice channel of a user
|
async def leave_vc(ctx: Context) -> None:
|
||||||
async def leave_vc(ctx: Context):
|
"""
|
||||||
|
Leave the voice channel and clean up
|
||||||
|
|
||||||
|
Args:
|
||||||
|
ctx: Command context
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
CommandError: If bot is not in VC or user is not in same VC
|
||||||
|
"""
|
||||||
# If the bot is not in a vc of this server
|
# If the bot is not in a vc of this server
|
||||||
if ctx.voice_client is None:
|
if ctx.voice_client is None:
|
||||||
raise CommandError("I am not in a voice channel")
|
raise CommandError("I am not in a voice channel")
|
||||||
@@ -73,9 +100,18 @@ async def leave_vc(ctx: Context):
|
|||||||
del last_activity[ctx.guild.id]
|
del last_activity[ctx.guild.id]
|
||||||
|
|
||||||
|
|
||||||
# Auto-disconnect if inactive
|
# ===================================
|
||||||
async def check_inactivity(bot):
|
# Inactivity Management
|
||||||
"""Background task to check for inactive voice connections"""
|
# ===================================
|
||||||
|
|
||||||
|
async def check_inactivity(bot: discord.Client) -> None:
|
||||||
|
"""
|
||||||
|
Background task to check for inactive voice connections
|
||||||
|
Auto-disconnects after 5 minutes of inactivity
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bot: The Discord bot instance
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
current_time = asyncio.get_event_loop().time()
|
current_time = asyncio.get_event_loop().time()
|
||||||
|
|
||||||
@@ -98,20 +134,34 @@ async def check_inactivity(bot):
|
|||||||
print(f"Error in inactivity checker: {e}")
|
print(f"Error in inactivity checker: {e}")
|
||||||
|
|
||||||
|
|
||||||
# Update activity timestamp when playing
|
def update_activity(guild_id: int) -> None:
|
||||||
def update_activity(guild_id):
|
"""
|
||||||
"""Call this when a song starts playing"""
|
Update activity timestamp when a song starts playing
|
||||||
|
|
||||||
|
Args:
|
||||||
|
guild_id: Discord guild/server ID
|
||||||
|
"""
|
||||||
last_activity[guild_id] = asyncio.get_event_loop().time()
|
last_activity[guild_id] = asyncio.get_event_loop().time()
|
||||||
|
|
||||||
|
|
||||||
# Interactive buttons for queue control
|
# ===================================
|
||||||
|
# Queue Display & Controls
|
||||||
|
# ===================================
|
||||||
|
|
||||||
class QueueControls(View):
|
class QueueControls(View):
|
||||||
def __init__(self, ctx):
|
"""Interactive buttons for queue control"""
|
||||||
super().__init__(timeout=None) # No timeout allows buttons to stay active longer
|
|
||||||
|
def __init__(self, ctx: Context) -> None:
|
||||||
|
super().__init__(timeout=None) # No timeout allows buttons to stay active longer
|
||||||
self.ctx = ctx
|
self.ctx = ctx
|
||||||
|
|
||||||
async def refresh_message(self, interaction: discord.Interaction):
|
async def refresh_message(self, interaction: discord.Interaction) -> None:
|
||||||
"""Helper to regenerate the embed and edit the message"""
|
"""
|
||||||
|
Helper to regenerate the embed and edit the message
|
||||||
|
|
||||||
|
Args:
|
||||||
|
interaction: Discord interaction from button press
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
# Generate new embed
|
# Generate new embed
|
||||||
embed, view = await generate_queue_ui(self.ctx)
|
embed, view = await generate_queue_ui(self.ctx)
|
||||||
@@ -119,10 +169,13 @@ class QueueControls(View):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Fallback if edit fails
|
# Fallback if edit fails
|
||||||
if not interaction.response.is_done():
|
if not interaction.response.is_done():
|
||||||
await interaction.response.send_message("Refreshed, but something went wrong updating the display.", ephemeral=True)
|
await interaction.response.send_message(
|
||||||
|
"Refreshed, but something went wrong updating the display.",
|
||||||
|
ephemeral=True
|
||||||
|
)
|
||||||
|
|
||||||
@discord.ui.button(label="⏭️ Skip", style=discord.ButtonStyle.primary)
|
@discord.ui.button(label="⏭️ Skip", style=discord.ButtonStyle.primary)
|
||||||
async def skip_button(self, interaction: discord.Interaction, button: Button):
|
async def skip_button(self, interaction: discord.Interaction, button: Button) -> None:
|
||||||
if interaction.user not in self.ctx.voice_client.channel.members:
|
if interaction.user not in self.ctx.voice_client.channel.members:
|
||||||
await interaction.response.send_message("❌ You must be in the voice channel!", ephemeral=True)
|
await interaction.response.send_message("❌ You must be in the voice channel!", ephemeral=True)
|
||||||
return
|
return
|
||||||
@@ -130,11 +183,6 @@ class QueueControls(View):
|
|||||||
# Loop logic check
|
# Loop logic check
|
||||||
loop_mode = await queue.get_loop_mode(self.ctx.guild.id)
|
loop_mode = await queue.get_loop_mode(self.ctx.guild.id)
|
||||||
|
|
||||||
# Logic mimics the command
|
|
||||||
if loop_mode == 'song':
|
|
||||||
# Just restart current song effectively but here we assume standard skip behavior for button
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Perform the skip
|
# Perform the skip
|
||||||
await queue.pop(self.ctx.guild.id, True, skip_mode=True)
|
await queue.pop(self.ctx.guild.id, True, skip_mode=True)
|
||||||
if self.ctx.voice_client:
|
if self.ctx.voice_client:
|
||||||
@@ -144,35 +192,45 @@ class QueueControls(View):
|
|||||||
await self.refresh_message(interaction)
|
await self.refresh_message(interaction)
|
||||||
|
|
||||||
@discord.ui.button(label="🔀 Shuffle", style=discord.ButtonStyle.secondary)
|
@discord.ui.button(label="🔀 Shuffle", style=discord.ButtonStyle.secondary)
|
||||||
async def shuffle_button(self, interaction: discord.Interaction, button: Button):
|
async def shuffle_button(self, interaction: discord.Interaction, button: Button) -> None:
|
||||||
await queue.shuffle_queue(self.ctx.guild.id)
|
await queue.shuffle_queue(self.ctx.guild.id)
|
||||||
await self.refresh_message(interaction)
|
await self.refresh_message(interaction)
|
||||||
|
|
||||||
@discord.ui.button(label="🔁 Loop", style=discord.ButtonStyle.secondary)
|
@discord.ui.button(label="🔁 Loop", style=discord.ButtonStyle.secondary)
|
||||||
async def loop_button(self, interaction: discord.Interaction, button: Button):
|
async def loop_button(self, interaction: discord.Interaction, button: Button) -> None:
|
||||||
current_mode = await queue.get_loop_mode(self.ctx.guild.id)
|
current_mode = await queue.get_loop_mode(self.ctx.guild.id)
|
||||||
new_mode = 'song' if current_mode == 'off' else ('queue' if current_mode == 'song' else 'off')
|
new_mode = 'song' if current_mode == 'off' else ('queue' if current_mode == 'song' else 'off')
|
||||||
await queue.set_loop_mode(self.ctx.guild.id, new_mode)
|
await queue.set_loop_mode(self.ctx.guild.id, new_mode)
|
||||||
await self.refresh_message(interaction)
|
await self.refresh_message(interaction)
|
||||||
|
|
||||||
@discord.ui.button(label="🗑️ Clear", style=discord.ButtonStyle.danger)
|
@discord.ui.button(label="🗑️ Clear", style=discord.ButtonStyle.danger)
|
||||||
async def clear_button(self, interaction: discord.Interaction, button: Button):
|
async def clear_button(self, interaction: discord.Interaction, button: Button) -> None:
|
||||||
await queue.clear(self.ctx.guild.id)
|
await queue.clear(self.ctx.guild.id)
|
||||||
if self.ctx.voice_client and self.ctx.voice_client.is_playing():
|
if self.ctx.voice_client and self.ctx.voice_client.is_playing():
|
||||||
self.ctx.voice_client.stop()
|
self.ctx.voice_client.stop()
|
||||||
await self.refresh_message(interaction)
|
await self.refresh_message(interaction)
|
||||||
|
|
||||||
@discord.ui.button(label="🔄 Refresh", style=discord.ButtonStyle.gray)
|
@discord.ui.button(label="🔄 Refresh", style=discord.ButtonStyle.gray)
|
||||||
async def refresh_button(self, interaction: discord.Interaction, button: Button):
|
async def refresh_button(self, interaction: discord.Interaction, button: Button) -> None:
|
||||||
await self.refresh_message(interaction)
|
await self.refresh_message(interaction)
|
||||||
|
|
||||||
async def generate_queue_ui(ctx: Context):
|
|
||||||
|
async def generate_queue_ui(ctx: Context) -> tuple[discord.Embed, QueueControls]:
|
||||||
|
"""
|
||||||
|
Generate the queue embed and controls
|
||||||
|
|
||||||
|
Args:
|
||||||
|
ctx: Command context
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (embed, view) for displaying queue
|
||||||
|
"""
|
||||||
guild_id = ctx.guild.id
|
guild_id = ctx.guild.id
|
||||||
server = ctx.guild
|
server = ctx.guild
|
||||||
|
|
||||||
# Fetch all data
|
# Fetch all data
|
||||||
n, songs = await queue.grab_songs(guild_id)
|
n, songs = await queue.grab_songs(guild_id)
|
||||||
current = await queue.get_current_song(guild_id) # Returns title, thumbnail, url
|
current = await queue.get_current_song(guild_id)
|
||||||
loop_mode = await queue.get_loop_mode(guild_id)
|
loop_mode = await queue.get_loop_mode(guild_id)
|
||||||
volume = await queue.get_volume(guild_id)
|
volume = await queue.get_volume(guild_id)
|
||||||
effect = await queue.get_effect(guild_id)
|
effect = await queue.get_effect(guild_id)
|
||||||
@@ -183,10 +241,10 @@ async def generate_queue_ui(ctx: Context):
|
|||||||
|
|
||||||
# Map loop mode to nicer text
|
# Map loop mode to nicer text
|
||||||
loop_map = {
|
loop_map = {
|
||||||
'off': {'emoji': '⏹️', 'text': 'Off'},
|
'off': {'emoji': '⏹️', 'text': 'Off'},
|
||||||
'song': {'emoji': '🔂', 'text': 'Song'},
|
'song': {'emoji': '🔂', 'text': 'Song'},
|
||||||
'queue': {'emoji': '🔁', 'text': 'Queue'}
|
'queue': {'emoji': '🔁', 'text': 'Queue'}
|
||||||
}
|
}
|
||||||
loop_info = loop_map.get(loop_mode, loop_map['off'])
|
loop_info = loop_map.get(loop_mode, loop_map['off'])
|
||||||
loop_emoji = loop_info['emoji']
|
loop_emoji = loop_info['emoji']
|
||||||
loop_text = loop_info['text']
|
loop_text = loop_info['text']
|
||||||
@@ -197,11 +255,9 @@ async def generate_queue_ui(ctx: Context):
|
|||||||
|
|
||||||
# Progress Bar Logic
|
# Progress Bar Logic
|
||||||
progress_bar = ""
|
progress_bar = ""
|
||||||
# Only show bar if duration > 0 (prevents weird 00:00 bars)
|
|
||||||
if duration > 0:
|
if duration > 0:
|
||||||
bar_length = 16
|
bar_length = 16
|
||||||
filled = int((percentage / 100) * bar_length)
|
filled = int((percentage / 100) * bar_length)
|
||||||
# Ensure filled isn't bigger than length
|
|
||||||
filled = min(filled, bar_length)
|
filled = min(filled, bar_length)
|
||||||
bar_str = '▬' * filled + '🔘' + '▬' * (bar_length - filled)
|
bar_str = '▬' * filled + '🔘' + '▬' * (bar_length - filled)
|
||||||
progress_bar = f"\n`{format_time(elapsed)}` {bar_str} `{format_time(duration)}`"
|
progress_bar = f"\n`{format_time(elapsed)}` {bar_str} `{format_time(duration)}`"
|
||||||
@@ -215,14 +271,11 @@ async def generate_queue_ui(ctx: Context):
|
|||||||
description = "## 💤 Nothing is playing\nUse `/play` to start the party!"
|
description = "## 💤 Nothing is playing\nUse `/play` to start the party!"
|
||||||
else:
|
else:
|
||||||
# Create Hyperlink [Title](URL)
|
# Create Hyperlink [Title](URL)
|
||||||
# If no URL exists, link to Discord homepage as fallback or just bold
|
|
||||||
if url and url.startswith("http"):
|
if url and url.startswith("http"):
|
||||||
song_link = f"[{title}]({url})"
|
song_link = f"[{title}]({url})"
|
||||||
else:
|
else:
|
||||||
song_link = f"**{title}**"
|
song_link = f"**{title}**"
|
||||||
|
|
||||||
# CLEARER STATUS LINE:
|
|
||||||
# Loop: Mode | Effect: Name | Vol: %
|
|
||||||
description = (
|
description = (
|
||||||
f"## 💿 Now Playing\n"
|
f"## 💿 Now Playing\n"
|
||||||
f"### {song_link}\n"
|
f"### {song_link}\n"
|
||||||
@@ -241,7 +294,7 @@ async def generate_queue_ui(ctx: Context):
|
|||||||
|
|
||||||
embed.add_field(name="⏳ Up Next", value=queue_text, inline=False)
|
embed.add_field(name="⏳ Up Next", value=queue_text, inline=False)
|
||||||
|
|
||||||
remaining = (n) - 9 # Approx calculation based on your grabbing logic
|
remaining = n - 9
|
||||||
if remaining > 0:
|
if remaining > 0:
|
||||||
embed.set_footer(text=f"Waitlist: {remaining} more songs...")
|
embed.set_footer(text=f"Waitlist: {remaining} more songs...")
|
||||||
else:
|
else:
|
||||||
@@ -251,23 +304,38 @@ async def generate_queue_ui(ctx: Context):
|
|||||||
if thumb and isinstance(thumb, str) and thumb.startswith("http"):
|
if thumb and isinstance(thumb, str) and thumb.startswith("http"):
|
||||||
embed.set_thumbnail(url=thumb)
|
embed.set_thumbnail(url=thumb)
|
||||||
elif server.icon:
|
elif server.icon:
|
||||||
# Fallback to server icon
|
|
||||||
embed.set_thumbnail(url=server.icon.url)
|
embed.set_thumbnail(url=server.icon.url)
|
||||||
|
|
||||||
view = QueueControls(ctx)
|
view = QueueControls(ctx)
|
||||||
return embed, view
|
return embed, view
|
||||||
|
|
||||||
# The command entry point calls this
|
|
||||||
async def display_server_queue(ctx: Context, songs, n):
|
async def display_server_queue(ctx: Context, songs: list, n: int) -> None:
|
||||||
|
"""
|
||||||
|
Display the server's queue with interactive controls
|
||||||
|
|
||||||
|
Args:
|
||||||
|
ctx: Command context
|
||||||
|
songs: List of songs in queue
|
||||||
|
n: Total number of songs
|
||||||
|
"""
|
||||||
embed, view = await generate_queue_ui(ctx)
|
embed, view = await generate_queue_ui(ctx)
|
||||||
await ctx.send(embed=embed, view=view)
|
await ctx.send(embed=embed, view=view)
|
||||||
|
|
||||||
# Build a display message for queuing a new song
|
|
||||||
async def queue_message(ctx: Context, data: dict):
|
async def queue_message(ctx: Context, data: dict[str, Any]) -> None:
|
||||||
|
"""
|
||||||
|
Display a message when a song is queued
|
||||||
|
|
||||||
|
Args:
|
||||||
|
ctx: Command context
|
||||||
|
data: Song data dictionary
|
||||||
|
"""
|
||||||
msg = discord.Embed(
|
msg = discord.Embed(
|
||||||
title="🎵 Song Queued",
|
title="🎵 Song Queued",
|
||||||
description=f"**{data['title']}**",
|
description=f"**{data['title']}**",
|
||||||
color=discord.Color.green())
|
color=discord.Color.green()
|
||||||
|
)
|
||||||
|
|
||||||
msg.set_thumbnail(url=data['thumbnail'])
|
msg.set_thumbnail(url=data['thumbnail'])
|
||||||
msg.add_field(name="⏱️ Duration", value=format_time(data['duration']), inline=True)
|
msg.add_field(name="⏱️ Duration", value=format_time(data['duration']), inline=True)
|
||||||
@@ -276,9 +344,23 @@ async def queue_message(ctx: Context, data: dict):
|
|||||||
|
|
||||||
await ctx.send(embed=msg)
|
await ctx.send(embed=msg)
|
||||||
|
|
||||||
# Converts seconds into more readable format
|
|
||||||
def format_time(seconds):
|
# ===================================
|
||||||
|
# Utility Functions
|
||||||
|
# ===================================
|
||||||
|
|
||||||
|
def format_time(seconds: int | float) -> str:
|
||||||
|
"""
|
||||||
|
Convert seconds into readable time format (MM:SS or HH:MM:SS)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
seconds: Time in seconds
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Formatted time string
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
|
seconds = int(seconds)
|
||||||
minutes, seconds = divmod(seconds, 60)
|
minutes, seconds = divmod(seconds, 60)
|
||||||
hours, minutes = divmod(minutes, 60)
|
hours, minutes = divmod(minutes, 60)
|
||||||
|
|
||||||
|
|||||||
297
config.py
297
config.py
@@ -1,115 +1,198 @@
|
|||||||
# config.py
|
# config.py
|
||||||
# This file should parse all configurations within the bot
|
# Modern configuration management using environment variables
|
||||||
|
|
||||||
|
import os
|
||||||
import discord
|
import discord
|
||||||
from discord import Color
|
from discord import Color
|
||||||
import json
|
from dotenv import load_dotenv
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
# Read data from JSON file in ./data/config.json
|
# Load environment variables from .env file
|
||||||
def read_data():
|
load_dotenv()
|
||||||
with open("./data/config.json", "r") as file:
|
|
||||||
return json.load(file)
|
|
||||||
|
|
||||||
raise Exception("Could not load config data")
|
# ===================================
|
||||||
|
# Environment Detection
|
||||||
|
# ===================================
|
||||||
def get_spotify_creds():
|
ENVIRONMENT = os.getenv("ENVIRONMENT", "dev").lower()
|
||||||
data = read_data()
|
IS_PRODUCTION = ENVIRONMENT == "live"
|
||||||
data = data.get("spotify")
|
IS_DEVELOPMENT = ENVIRONMENT == "dev"
|
||||||
|
|
||||||
SCID = data.get("SCID")
|
|
||||||
secret = data.get("SECRET")
|
|
||||||
|
|
||||||
return SCID, secret
|
|
||||||
|
|
||||||
|
|
||||||
# Reading prefix
|
|
||||||
def get_prefix():
|
|
||||||
data = read_data()
|
|
||||||
|
|
||||||
prefix = data.get('prefix')
|
|
||||||
if prefix:
|
|
||||||
return prefix
|
|
||||||
|
|
||||||
raise Exception("Missing config data: prefix")
|
|
||||||
|
|
||||||
|
|
||||||
# Fetch the bot secret token
|
|
||||||
def get_login(bot):
|
|
||||||
data = read_data()
|
|
||||||
if data is False or data.get(f"{bot}bot") is False:
|
|
||||||
raise Exception(f"Missing config data: {bot}bot")
|
|
||||||
|
|
||||||
data = data.get(f"{bot}bot")
|
|
||||||
return data.get("secret")
|
|
||||||
|
|
||||||
|
|
||||||
# Read the status and text data
|
|
||||||
def get_status():
|
|
||||||
data = read_data()
|
|
||||||
|
|
||||||
if data is False or data.get('status') is False:
|
|
||||||
raise Exception("Missing config data: status")
|
|
||||||
|
|
||||||
# Find type
|
|
||||||
data = data.get('status')
|
|
||||||
return translate_status(
|
|
||||||
data.get('type'),
|
|
||||||
data.get('text'),
|
|
||||||
data.get('link')
|
|
||||||
)
|
|
||||||
|
|
||||||
# Get colors from colorscheme
|
|
||||||
def get_color(color):
|
|
||||||
data = read_data()
|
|
||||||
|
|
||||||
if data is False or data.get('status') is False:
|
|
||||||
raise Exception("Missing config data: color")
|
|
||||||
|
|
||||||
# Grab color
|
|
||||||
string_value = data.get("colorscheme").get(color)
|
|
||||||
hex_value = Color.from_str(string_value)
|
|
||||||
return hex_value
|
|
||||||
|
|
||||||
|
|
||||||
# Taking JSON variables and converting them into a presence
|
|
||||||
# Use None url incase not provided
|
|
||||||
def translate_status(status_type, status_text, status_url=None):
|
|
||||||
if status_type == "playing":
|
|
||||||
return discord.Activity(
|
|
||||||
type=discord.ActivityType.playing,
|
|
||||||
name=status_text
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
elif status_type == "streaming":
|
|
||||||
return discord.Activity(
|
|
||||||
type=discord.ActivityType.streaming,
|
|
||||||
name=status_text,
|
|
||||||
url=status_url
|
|
||||||
)
|
|
||||||
|
|
||||||
elif status_type == "listening":
|
|
||||||
return discord.Activity(
|
|
||||||
type=discord.ActivityType.listening,
|
|
||||||
name=status_text
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
elif status_type == "watching":
|
|
||||||
return discord.Activity(
|
|
||||||
type=discord.ActivityType.watching,
|
|
||||||
name=status_text
|
|
||||||
)
|
|
||||||
|
|
||||||
elif status_type == "competing":
|
|
||||||
return discord.Activity(
|
|
||||||
type=discord.ActivityType.competing,
|
|
||||||
name=status_text
|
|
||||||
)
|
|
||||||
|
|
||||||
#TODO
|
|
||||||
# Implement custom status type
|
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Discord Configuration
|
||||||
|
# ===================================
|
||||||
|
def get_discord_token() -> str:
|
||||||
|
"""Get the appropriate Discord token based on environment"""
|
||||||
|
if IS_PRODUCTION:
|
||||||
|
token = os.getenv("DISCORD_TOKEN_LIVE")
|
||||||
|
if not token:
|
||||||
|
raise ValueError("DISCORD_TOKEN_LIVE not found in environment!")
|
||||||
|
return token
|
||||||
else:
|
else:
|
||||||
raise Exception(f"Invalid status type: {status_type}")
|
token = os.getenv("DISCORD_TOKEN_DEV")
|
||||||
|
if not token:
|
||||||
|
raise ValueError("DISCORD_TOKEN_DEV not found in environment!")
|
||||||
|
return token
|
||||||
|
|
||||||
|
|
||||||
|
def get_prefix() -> str:
|
||||||
|
"""Get command prefix (default: =)"""
|
||||||
|
return os.getenv("DISCORD_PREFIX", "=")
|
||||||
|
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Spotify Configuration
|
||||||
|
# ===================================
|
||||||
|
def get_spotify_creds() -> tuple[str, str]:
|
||||||
|
"""Get Spotify API credentials"""
|
||||||
|
client_id = os.getenv("SPOTIFY_CLIENT_ID")
|
||||||
|
client_secret = os.getenv("SPOTIFY_CLIENT_SECRET")
|
||||||
|
|
||||||
|
if not client_id or not client_secret:
|
||||||
|
raise ValueError("Spotify credentials not found in environment!")
|
||||||
|
|
||||||
|
return client_id, client_secret
|
||||||
|
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Database Configuration
|
||||||
|
# ===================================
|
||||||
|
def get_db_path() -> str:
|
||||||
|
"""Get SQLite database path"""
|
||||||
|
return os.getenv("DB_PATH", "./data/music.db")
|
||||||
|
|
||||||
|
|
||||||
|
# Future PostgreSQL config
|
||||||
|
def get_postgres_url() -> Optional[str]:
|
||||||
|
"""Get PostgreSQL connection URL (for future migration)"""
|
||||||
|
host = os.getenv("DB_HOST")
|
||||||
|
port = os.getenv("DB_PORT", "5432")
|
||||||
|
name = os.getenv("DB_NAME")
|
||||||
|
user = os.getenv("DB_USER")
|
||||||
|
password = os.getenv("DB_PASSWORD")
|
||||||
|
|
||||||
|
if all([host, name, user, password]):
|
||||||
|
return f"postgresql://{user}:{password}@{host}:{port}/{name}"
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Bot Status/Presence
|
||||||
|
# ===================================
|
||||||
|
def get_status() -> discord.Activity:
|
||||||
|
"""Get bot status/presence"""
|
||||||
|
status_type = os.getenv("STATUS_TYPE", "listening").lower()
|
||||||
|
status_text = os.getenv("STATUS_TEXT", "Zilean's Theme")
|
||||||
|
status_url = os.getenv("STATUS_URL")
|
||||||
|
|
||||||
|
return translate_status(status_type, status_text, status_url)
|
||||||
|
|
||||||
|
|
||||||
|
def translate_status(
|
||||||
|
status_type: str,
|
||||||
|
status_text: str,
|
||||||
|
status_url: Optional[str] = None
|
||||||
|
) -> discord.Activity:
|
||||||
|
"""Convert status type string to Discord Activity"""
|
||||||
|
|
||||||
|
status_map = {
|
||||||
|
"playing": discord.ActivityType.playing,
|
||||||
|
"streaming": discord.ActivityType.streaming,
|
||||||
|
"listening": discord.ActivityType.listening,
|
||||||
|
"watching": discord.ActivityType.watching,
|
||||||
|
"competing": discord.ActivityType.competing,
|
||||||
|
}
|
||||||
|
|
||||||
|
activity_type = status_map.get(status_type)
|
||||||
|
if not activity_type:
|
||||||
|
raise ValueError(f"Invalid status type: {status_type}")
|
||||||
|
|
||||||
|
# Streaming requires URL
|
||||||
|
if status_type == "streaming":
|
||||||
|
if not status_url:
|
||||||
|
raise ValueError("Streaming status requires STATUS_URL")
|
||||||
|
return discord.Activity(
|
||||||
|
type=activity_type,
|
||||||
|
name=status_text,
|
||||||
|
url=status_url
|
||||||
|
)
|
||||||
|
|
||||||
|
return discord.Activity(type=activity_type, name=status_text)
|
||||||
|
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Color Scheme
|
||||||
|
# ===================================
|
||||||
|
def get_color(color_name: str) -> Color:
|
||||||
|
"""Get color from environment (hex format)"""
|
||||||
|
color_map = {
|
||||||
|
"primary": os.getenv("COLOR_PRIMARY", "#7289DA"),
|
||||||
|
"success": os.getenv("COLOR_SUCCESS", "#43B581"),
|
||||||
|
"error": os.getenv("COLOR_ERROR", "#F04747"),
|
||||||
|
"warning": os.getenv("COLOR_WARNING", "#FAA61A"),
|
||||||
|
}
|
||||||
|
|
||||||
|
hex_value = color_map.get(color_name.lower())
|
||||||
|
if not hex_value:
|
||||||
|
# Default to Discord blurple
|
||||||
|
hex_value = "#7289DA"
|
||||||
|
|
||||||
|
return Color.from_str(hex_value)
|
||||||
|
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Logging Configuration
|
||||||
|
# ===================================
|
||||||
|
def get_log_level() -> str:
|
||||||
|
"""Get logging level from environment"""
|
||||||
|
return os.getenv("LOG_LEVEL", "INFO").upper()
|
||||||
|
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Legacy Support (for backward compatibility)
|
||||||
|
# ===================================
|
||||||
|
def get_login(bot: str) -> str:
|
||||||
|
"""Legacy function - maps to new get_discord_token()"""
|
||||||
|
# Ignore the 'bot' parameter, use ENVIRONMENT instead
|
||||||
|
return get_discord_token()
|
||||||
|
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Validation
|
||||||
|
# ===================================
|
||||||
|
def validate_config():
|
||||||
|
"""Validate that all required config is present"""
|
||||||
|
errors = []
|
||||||
|
|
||||||
|
# Check Discord token
|
||||||
|
try:
|
||||||
|
get_discord_token()
|
||||||
|
except ValueError as e:
|
||||||
|
errors.append(str(e))
|
||||||
|
|
||||||
|
# Check Spotify creds
|
||||||
|
try:
|
||||||
|
get_spotify_creds()
|
||||||
|
except ValueError as e:
|
||||||
|
errors.append(str(e))
|
||||||
|
|
||||||
|
if errors:
|
||||||
|
error_msg = "\n".join(errors)
|
||||||
|
raise ValueError(f"Configuration errors:\n{error_msg}")
|
||||||
|
|
||||||
|
print(f"✅ Configuration validated (Environment: {ENVIRONMENT})")
|
||||||
|
|
||||||
|
|
||||||
|
# ===================================
|
||||||
|
# Startup Info
|
||||||
|
# ===================================
|
||||||
|
def print_config_info():
|
||||||
|
"""Print configuration summary (without secrets!)"""
|
||||||
|
print("=" * 50)
|
||||||
|
print("🎵 Groovy-Zilean Configuration")
|
||||||
|
print("=" * 50)
|
||||||
|
print(f"Environment: {ENVIRONMENT.upper()}")
|
||||||
|
print(f"Prefix: {get_prefix()}")
|
||||||
|
print(f"Database: {get_db_path()}")
|
||||||
|
print(f"Log Level: {get_log_level()}")
|
||||||
|
print(f"Spotify: {'Configured ✅' if os.getenv('SPOTIFY_CLIENT_ID') else 'Not configured ❌'}")
|
||||||
|
print("=" * 50)
|
||||||
|
|||||||
13
main.py
13
main.py
@@ -3,6 +3,16 @@ from bot import Groovy
|
|||||||
import config
|
import config
|
||||||
import help
|
import help
|
||||||
|
|
||||||
|
# Validate configuration before starting
|
||||||
|
try:
|
||||||
|
config.validate_config()
|
||||||
|
config.print_config_info()
|
||||||
|
except ValueError as e:
|
||||||
|
print(f"❌ Configuration Error:\n{e}")
|
||||||
|
print("\n💡 Tip: Copy .env.example to .env and fill in your values")
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
# Initialize bot with validated config
|
||||||
client = Groovy(command_prefix=config.get_prefix(), intents=discord.Intents.all())
|
client = Groovy(command_prefix=config.get_prefix(), intents=discord.Intents.all())
|
||||||
|
|
||||||
@client.event
|
@client.event
|
||||||
@@ -25,4 +35,5 @@ async def on_voice_state_update(member, before, after):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error auto-disconnecting: {e}")
|
print(f"Error auto-disconnecting: {e}")
|
||||||
|
|
||||||
client.run(config.get_login("live"))
|
# Run bot with environment-appropriate token
|
||||||
|
client.run(config.get_discord_token())
|
||||||
|
|||||||
31
mypy.ini
Normal file
31
mypy.ini
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
# mypy configuration for groovy-zilean
|
||||||
|
# Type checking configuration that's practical for a Discord bot
|
||||||
|
|
||||||
|
[mypy]
|
||||||
|
# Python version
|
||||||
|
python_version = 3.13
|
||||||
|
|
||||||
|
# Ignore missing imports for libraries without type stubs
|
||||||
|
# Discord.py, spotipy, yt-dlp don't have complete type stubs
|
||||||
|
ignore_missing_imports = True
|
||||||
|
|
||||||
|
# Be strict about our own code
|
||||||
|
# Start lenient, can tighten later
|
||||||
|
disallow_untyped_defs = False
|
||||||
|
check_untyped_defs = True
|
||||||
|
# Too noisy with discord.py
|
||||||
|
warn_return_any = False
|
||||||
|
warn_unused_configs = True
|
||||||
|
|
||||||
|
# Exclude patterns
|
||||||
|
exclude = venv/
|
||||||
|
|
||||||
|
# Per-module overrides
|
||||||
|
[mypy-discord.*]
|
||||||
|
ignore_missing_imports = True
|
||||||
|
|
||||||
|
[mypy-spotipy.*]
|
||||||
|
ignore_missing_imports = True
|
||||||
|
|
||||||
|
[mypy-yt_dlp.*]
|
||||||
|
ignore_missing_imports = True
|
||||||
@@ -1,12 +1,14 @@
|
|||||||
# Core bot framework
|
# Core bot framework
|
||||||
discord.py==2.6.4
|
discord.py>=2.6.4
|
||||||
aiohttp==3.8.4
|
aiohttp>=3.9.0
|
||||||
PyNaCl==1.5.0
|
PyNaCl>=1.5.0
|
||||||
spotipy==2.23.0
|
spotipy>=2.23.0
|
||||||
|
|
||||||
|
# Configuration management
|
||||||
|
python-dotenv>=1.0.0
|
||||||
|
|
||||||
# YouTube extractor
|
# YouTube extractor
|
||||||
yt-dlp>=2025.10.14
|
yt-dlp>=2025.10.14
|
||||||
|
|
||||||
# System dependencies
|
# Audio metadata (if needed by yt-dlp)
|
||||||
PyAudio==0.2.13
|
mutagen>=1.47.0
|
||||||
mutagen==1.46.0
|
|
||||||
|
|||||||
Reference in New Issue
Block a user