Compare commits

4 Commits

13 changed files with 1215 additions and 527 deletions

67
.env_example Normal file
View File

@@ -0,0 +1,67 @@
# Groovy-Zilean Configuration
# Copy this file to .env and fill in your values
# NEVER commit .env to git!
# ===================================
# Environment Selection
# ===================================
# Set to "dev" for development bot, "live" for production
ENVIRONMENT=dev
# ===================================
# Discord Bot Tokens
# ===================================
DISCORD_TOKEN_DEV=
DISCORD_TOKEN_LIVE=
# Bot settings
DISCORD_PREFIX=
# ===================================
# Spotify Integration
# ===================================
SPOTIFY_CLIENT_ID=
SPOTIFY_CLIENT_SECRET=
# ===================================
# Database
# ===================================
# For now (SQLite)
DB_PATH=./data/music.db
# For future (PostgreSQL)
# DB_HOST=localhost
# DB_PORT=5432
# DB_NAME=groovy_zilean
# DB_USER=groovy
# DB_PASSWORD=your_db_password
# ===================================
# Bot Status/Presence
# ===================================
# Types: playing, listening, watching, streaming, competing
STATUS_TYPE=listening
STATUS_TEXT==help | /help
# STATUS_URL= # Only needed for streaming type
# ===================================
# Color Scheme (hex colors)
# ===================================
COLOR_PRIMARY=#7289DA
COLOR_SUCCESS=#43B581
COLOR_ERROR=#F04747
COLOR_WARNING=#FAA61A
# ===================================
# Web Dashboard (Future)
# ===================================
# DISCORD_CLIENT_ID=your_oauth_client_id
# DISCORD_CLIENT_SECRET=your_oauth_secret
# DISCORD_REDIRECT_URI=http://localhost:8000/callback
# WEB_SECRET_KEY=random_secret_for_sessions
# ===================================
# Logging
# ===================================
LOG_LEVEL=INFO
# Options: DEBUG, INFO, WARNING, ERROR, CRITICAL

2
.gitignore vendored
View File

@@ -160,4 +160,4 @@ cython_debug/
#.idea/
# My stuff
data/
data/*.db

201
SETUP.md Normal file
View File

@@ -0,0 +1,201 @@
# Groovy-Zilean Setup Guide
Quick start guide for getting groovy-zilean running locally or in production.
---
## Prerequisites
- **Python 3.11+** (3.9 deprecated by yt-dlp)
- **FFmpeg** installed on your system
- Discord Bot Token ([Discord Developer Portal](https://discord.com/developers/applications))
- Spotify API Credentials ([Spotify Developer Dashboard](https://developer.spotify.com/dashboard))
---
## 1. Clone & Setup Environment
```bash
# Clone the repository
git clone <your-repo-url>
cd groovy-zilean
# Create virtual environment (keeps dependencies isolated)
python3.12 -m venv venv
# Activate virtual environment
source venv/bin/activate # Linux/Mac
# OR
venv\Scripts\activate # Windows
# Install dependencies
pip install --upgrade pip
pip install -r requirements.txt
```
---
## 2. Configuration
### Create `.env` file
```bash
# Copy the example file
cp .env.example .env
# Edit with your favorite editor
nano .env
# OR
vim .env
# OR
code .env # VS Code
```
### Fill in Required Values
At minimum, you need:
```env
# Choose environment: "dev" or "live"
ENVIRONMENT=dev
# Discord bot tokens (get from Discord Developer Portal)
DISCORD_TOKEN_DEV=your_dev_bot_token_here
DISCORD_TOKEN_LIVE=your_live_bot_token_here
# Spotify credentials (get from Spotify Developer Dashboard)
SPOTIFY_CLIENT_ID=your_spotify_client_id
SPOTIFY_CLIENT_SECRET=your_spotify_secret
```
**Optional but recommended:**
```env
# Bot settings
DISCORD_PREFIX==
STATUS_TYPE=listening
STATUS_TEXT=Zilean's Theme
# Colors (hex format)
COLOR_PRIMARY=#7289DA
COLOR_SUCCESS=#43B581
COLOR_ERROR=#F04747
COLOR_WARNING=#FAA61A
```
---
## 3. Create Data Directory
```bash
# Create directory for database
mkdir -p data
# The database file (music.db) will be created automatically on first run
```
---
## 4. Run the Bot
### Development Mode
```bash
# Make sure .env has ENVIRONMENT=dev
source venv/bin/activate # If not already activated
python main.py
```
### Production Mode
```bash
# Change .env to ENVIRONMENT=live
source venv/bin/activate
python main.py
```
---
## 5. Switching Between Dev and Live
**Super easy!** Just change one line in `.env`:
```bash
# For development bot
ENVIRONMENT=dev
# For production bot
ENVIRONMENT=live
```
The bot will automatically use the correct token!
---
## Troubleshooting
### "Configuration Error: DISCORD_TOKEN_DEV not found"
- Make sure you copied `.env.example` to `.env`
- Check that `.env` has the token values filled in
- Token should NOT have quotes around it
### "No module named 'dotenv'"
```bash
pip install python-dotenv
```
### "FFmpeg not found"
```bash
# Debian/Ubuntu
sudo apt install ffmpeg
# macOS
brew install ffmpeg
# Arch Linux
sudo pacman -S ffmpeg
```
### Python version issues
yt-dlp requires Python 3.10+. Check your version:
```bash
python --version
```
If too old, install newer Python and recreate venv:
```bash
python3.12 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```
---
## Project Structure
```
groovy-zilean/
├── main.py # Entry point (run this!)
├── bot.py # Bot class definition
├── config.py # Configuration management
├── .env # YOUR secrets (never commit!)
├── .env.example # Template (safe to commit)
├── requirements.txt # Python dependencies
├── cogs/
│ └── music/ # Music functionality
│ ├── main.py # Commands
│ ├── queue.py # Queue management
│ ├── util.py # Utilities
│ └── translate.py # URL/search handling
└── data/
└── music.db # SQLite database (auto-created)
```
---
## Next Steps
- Check out `PRODUCTION_ROADMAP.md` for the full development plan
- See `README.md` for feature list and usage
- Join your test server and try commands!
**Happy coding!** 🎵⏱️

25
bot.py
View File

@@ -1,20 +1,30 @@
"""
Groovy-Zilean Bot Class
Main bot implementation with cog loading and background tasks
"""
from typing import Any
from discord.ext import commands
from discord.ext import tasks
from cogs.music.main import music
from help import GroovyHelp # Import the new Help Cog
from help import GroovyHelp
cogs = [
# List of cogs to load on startup
cogs: list[type[commands.Cog]] = [
music,
GroovyHelp
]
class Groovy(commands.Bot):
def __init__(self, *args, **kwargs):
"""Custom bot class with automatic cog loading and inactivity checking"""
def __init__(self, *args: Any, **kwargs: Any) -> None:
# We force help_command to None because we are using a custom Cog for it
# But we pass all other args (like command_prefix) to the parent
super().__init__(*args, help_command=None, **kwargs)
async def on_ready(self):
async def on_ready(self) -> None:
import config # Imported here to avoid circular dependencies if any
# Set status
@@ -47,11 +57,12 @@ class Groovy(commands.Bot):
print(f"{self.user} is ready and online!")
@tasks.loop(seconds=30)
async def inactivity_checker(self):
"""Check for inactive voice connections"""
async def inactivity_checker(self) -> None:
"""Check for inactive voice connections every 30 seconds"""
from cogs.music import util
await util.check_inactivity(self)
@inactivity_checker.before_loop
async def before_inactivity_checker(self):
async def before_inactivity_checker(self) -> None:
"""Wait for bot to be ready before starting inactivity checker"""
await self.wait_until_ready()

315
cogs/music/db_manager.py Normal file
View File

@@ -0,0 +1,315 @@
"""
Database Manager for Groovy-Zilean
Centralizes all database operations and provides a clean interface.
Makes future PostgreSQL migration much easier.
"""
import sqlite3
from contextlib import contextmanager
from typing import Optional, List, Tuple, Any
import config
class DatabaseManager:
"""Manages database connections and operations"""
def __init__(self):
self.db_path = config.get_db_path()
@contextmanager
def get_connection(self):
"""
Context manager for database connections.
Automatically handles commit/rollback and closing.
Usage:
with db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(...)
"""
conn = sqlite3.connect(self.db_path)
try:
yield conn
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
def initialize_tables(self):
"""Create database tables if they don't exist"""
with self.get_connection() as conn:
cursor = conn.cursor()
# Create servers table
cursor.execute('''CREATE TABLE IF NOT EXISTS servers (
server_id TEXT PRIMARY KEY,
is_playing INTEGER DEFAULT 0,
song_name TEXT,
song_url TEXT,
song_thumbnail TEXT,
loop_mode TEXT DEFAULT 'off',
volume INTEGER DEFAULT 100,
effect TEXT DEFAULT 'none',
song_start_time REAL DEFAULT 0,
song_duration INTEGER DEFAULT 0
);''')
# Set all to not playing on startup
cursor.execute("UPDATE servers SET is_playing = 0;")
# Migrations for existing databases - add columns if missing
migrations = [
("loop_mode", "TEXT DEFAULT 'off'"),
("volume", "INTEGER DEFAULT 100"),
("effect", "TEXT DEFAULT 'none'"),
("song_start_time", "REAL DEFAULT 0"),
("song_duration", "INTEGER DEFAULT 0"),
("song_thumbnail", "TEXT DEFAULT ''"),
("song_url", "TEXT DEFAULT ''")
]
for col_name, col_type in migrations:
try:
cursor.execute(f"ALTER TABLE servers ADD COLUMN {col_name} {col_type};")
except sqlite3.OperationalError:
# Column already exists, skip
pass
# Create songs/queue table
cursor.execute('''CREATE TABLE IF NOT EXISTS songs (
server_id TEXT NOT NULL,
song_link TEXT,
queued_by TEXT,
position INTEGER NOT NULL,
title TEXT,
thumbnail TEXT,
duration INTEGER,
PRIMARY KEY (position),
FOREIGN KEY (server_id) REFERENCES servers(server_id)
);''')
# Clear all songs on startup
cursor.execute("DELETE FROM songs;")
# ===================================
# Server Operations
# ===================================
def ensure_server_exists(self, server_id: str) -> None:
"""Add server to database if it doesn't exist"""
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM servers WHERE server_id = ?', (server_id,))
if cursor.fetchone()[0] == 0:
cursor.execute('''INSERT INTO servers (server_id, loop_mode, volume, effect, song_thumbnail, song_url)
VALUES (?, 'off', 100, 'none', '', '')''', (server_id,))
def set_server_playing(self, server_id: str, playing: bool) -> None:
"""Update server playing status"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
val = 1 if playing else 0
cursor.execute("UPDATE servers SET is_playing = ? WHERE server_id = ?", (val, server_id))
def is_server_playing(self, server_id: str) -> bool:
"""Check if server is currently playing"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT is_playing FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
return True if res and res[0] == 1 else False
def set_current_song(self, server_id: str, title: str, url: str, thumbnail: str = "", duration: int = 0, start_time: float = 0) -> None:
"""Update currently playing song information"""
with self.get_connection() as conn:
cursor = conn.cursor()
# Ensure duration is an integer
try:
duration = int(duration)
except:
duration = 0
cursor.execute(''' UPDATE servers
SET song_name = ?, song_url = ?, song_thumbnail = ?, song_start_time = ?, song_duration = ?
WHERE server_id = ?''',
(title, url, thumbnail, start_time, duration, server_id))
def get_current_song(self, server_id: str) -> dict:
"""Get current song info"""
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(''' SELECT song_name, song_thumbnail, song_url FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
result = cursor.fetchone()
if result:
return {'title': result[0], 'thumbnail': result[1], 'url': result[2]}
return {'title': "Nothing", 'thumbnail': None, 'url': ''}
def get_current_progress(self, server_id: str) -> Tuple[int, int, float]:
"""Get playback progress (elapsed, duration, percentage)"""
import time
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''SELECT song_start_time, song_duration, is_playing FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
result = cursor.fetchone()
if not result or result[2] == 0:
return 0, 0, 0.0
start_time, duration, _ = result
if duration is None or duration == 0:
return 0, 0, 0.0
elapsed = int(time.time() - start_time)
elapsed = min(elapsed, duration)
percentage = (elapsed / duration) * 100 if duration > 0 else 0
return elapsed, duration, percentage
# ===================================
# Queue Operations
# ===================================
def add_song(self, server_id: str, song_link: str, queued_by: str, title: str, thumbnail: str = "", duration: int = 0, position: Optional[int] = None) -> int:
"""Add song to queue, returns position"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
if position is None:
# Add to end
cursor.execute("SELECT MAX(position) FROM songs WHERE server_id = ?", (server_id,))
max_pos = cursor.fetchone()[0]
position = (max_pos + 1) if max_pos is not None else 0
else:
# Insert at specific position (shift others down)
cursor.execute("UPDATE songs SET position = position + 1 WHERE server_id = ? AND position >= ?",
(server_id, position))
cursor.execute("""INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)""",
(server_id, song_link, queued_by, position, title, thumbnail, duration))
return position
def get_next_song(self, server_id: str) -> Optional[Tuple]:
"""Get the next song in queue (doesn't remove it)"""
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''SELECT * FROM songs WHERE server_id = ? ORDER BY position LIMIT 1;''', (server_id,))
return cursor.fetchone()
def remove_song(self, server_id: str, position: int) -> None:
"""Remove song at position from queue"""
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''DELETE FROM songs WHERE server_id = ? AND position = ?''', (server_id, position))
def get_queue(self, server_id: str, limit: int = 10) -> Tuple[int, List[Tuple]]:
"""Get songs in queue (returns max_position, list of songs)"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT title, duration, queued_by FROM songs WHERE server_id = ? ORDER BY position LIMIT ?",
(server_id, limit))
songs = cursor.fetchall()
cursor.execute("SELECT MAX(position) FROM songs WHERE server_id = ?", (server_id,))
max_pos = cursor.fetchone()[0]
max_pos = max_pos if max_pos is not None else -1
return max_pos, songs
def clear_queue(self, server_id: str) -> None:
"""Clear all songs from queue"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
def shuffle_queue(self, server_id: str) -> bool:
"""Shuffle the queue randomly, returns success"""
import random
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT position, song_link, queued_by, title, thumbnail, duration FROM songs WHERE server_id = ? ORDER BY position",
(server_id,))
songs = cursor.fetchall()
if len(songs) <= 1:
return False
random.shuffle(songs)
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
for i, s in enumerate(songs):
cursor.execute("INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)",
(server_id, s[1], s[2], i, s[3], s[4], s[5]))
return True
# ===================================
# Settings Operations
# ===================================
def get_loop_mode(self, server_id: str) -> str:
"""Get loop mode: 'off', 'song', or 'queue'"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT loop_mode FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
return res[0] if res else 'off'
def set_loop_mode(self, server_id: str, mode: str) -> None:
"""Set loop mode: 'off', 'song', or 'queue'"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("UPDATE servers SET loop_mode = ? WHERE server_id = ?", (mode, server_id))
def get_volume(self, server_id: str) -> int:
"""Get volume (0-200)"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT volume FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
return res[0] if res else 100
def set_volume(self, server_id: str, volume: int) -> int:
"""Set volume (0-200), returns the set volume"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("UPDATE servers SET volume = ? WHERE server_id = ?", (volume, server_id))
return volume
def get_effect(self, server_id: str) -> str:
"""Get current audio effect"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT effect FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
return res[0] if res else 'none'
def set_effect(self, server_id: str, effect: str) -> None:
"""Set audio effect"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("UPDATE servers SET effect = ? WHERE server_id = ?", (effect, server_id))
# Global instance
db = DatabaseManager()

View File

@@ -12,28 +12,7 @@ from cogs.music.help import music_help
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
# Fix this pls
import json
#from .. import config
# Read data from JSON file in ./data/config.json
def read_data():
with open("./data/config.json", "r") as file:
return json.load(file)
raise Exception("Could not load config data")
def get_spotify_creds():
data = read_data()
data = data.get("spotify")
SCID = data.get("SCID")
secret = data.get("SECRET")
return SCID, secret
import config # Use centralized config
@@ -51,10 +30,13 @@ class music(commands.Cog):
help_command.cog = self
self.help_command = help_command
SCID, secret = get_spotify_creds()
# Get Spotify credentials from centralized config
spotify_id, spotify_secret = config.get_spotify_creds()
# Authentication - without user
client_credentials_manager = SpotifyClientCredentials(client_id=SCID,
client_secret=secret)
client_credentials_manager = SpotifyClientCredentials(
client_id=spotify_id,
client_secret=spotify_secret
)
self.sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
@@ -346,7 +328,7 @@ class music(commands.Cog):
app_commands.Choice(name="Song", value="song"),
app_commands.Choice(name="Queue", value="queue")
])
async def loop(self, ctx: Context, mode: str = None):
async def loop(self, ctx: Context, mode: str | None = None):
"""Toggle between loop modes or set a specific mode"""
server = ctx.guild
@@ -409,7 +391,7 @@ class music(commands.Cog):
description="Set playback volume",
aliases=['vol', 'v'])
@app_commands.describe(level="Volume level (0-200%, default shows current)")
async def volume(self, ctx: Context, level: int = None):
async def volume(self, ctx: Context, level: int | None = None):
"""Set or display the current volume"""
server = ctx.guild
@@ -453,7 +435,7 @@ class music(commands.Cog):
description="Apply audio effects to playback",
aliases=['fx', 'filter'])
@app_commands.describe(effect_name="The audio effect to apply (leave empty to see list)")
async def effect(self, ctx: Context, effect_name: str = None):
async def effect(self, ctx: Context, effect_name: str | None = None):
"""Apply or list audio effects"""
server = ctx.guild

View File

@@ -1,14 +1,14 @@
from http import server
import sqlite3
import random
import time
"""
Queue management for Groovy-Zilean music bot
Now using centralized database manager for cleaner code
"""
import discord
import asyncio
import time
from .translate import search_song
db_path = "./data/music.db"
from .db_manager import db
# Base FFmpeg options (will be modified by effects)
BASE_FFMPEG_OPTS = {
@@ -103,342 +103,224 @@ def get_effect_options(effect_name):
return effects.get(effect_name, effects['none'])
# Creates the tables if they don't exist
# ===================================
# Initialization
# ===================================
def initialize_tables():
# Connect to the database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
"""Initialize database tables"""
db.initialize_tables()
# Create servers table if it doesn't exist
cursor.execute('''CREATE TABLE IF NOT EXISTS servers (
server_id TEXT PRIMARY KEY,
is_playing INTEGER DEFAULT 0,
song_name TEXT,
song_url TEXT,
song_thumbnail TEXT,
loop_mode TEXT DEFAULT 'off',
volume INTEGER DEFAULT 100,
effect TEXT DEFAULT 'none',
song_start_time REAL DEFAULT 0,
song_duration INTEGER DEFAULT 0
);''')
# Set all to not playing
cursor.execute("UPDATE servers SET is_playing = 0;")
# ===================================
# Queue Management
# ===================================
# Add new columns if they don't exist (for existing databases)
# Migrations for existing databases
columns = [
("loop_mode", "TEXT DEFAULT 'off'"),
("volume", "INTEGER DEFAULT 100"),
("effect", "TEXT DEFAULT 'none'"),
("song_start_time", "REAL DEFAULT 0"),
("song_duration", "INTEGER DEFAULT 0"),
("song_thumbnail", "TEXT DEFAULT ''"),
("song_url", "TEXT DEFAULT ''") # NEW
]
async def add_song(server_id, details, queued_by, position=None):
"""
Add a song to the queue
for col_name, col_type in columns:
try:
cursor.execute(f"ALTER TABLE servers ADD COLUMN {col_name} {col_type};")
except sqlite3.OperationalError:
pass
cursor.execute('''CREATE TABLE IF NOT EXISTS songs (
server_id TEXT NOT NULL,
song_link TEXT,
queued_by TEXT,
position INTEGER NOT NULL,
title TEXT,
thumbnail TEXT,
duration INTEGER,
PRIMARY KEY (position),
FOREIGN KEY (server_id) REFERENCES servers(server_id)
);''')
cursor.execute("DELETE FROM songs;")
conn.commit()
conn.close()
# Queue a song in the db
async def add_song(server_id, details, queued_by):
# Connect to db
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
max_order_num = await get_max(server_id, cursor) + 1
Args:
server_id: Discord server ID
details: Dictionary with song info (url, title, thumbnail, duration) or string
queued_by: Username who queued the song
position: Optional position in queue (None = end of queue)
Returns:
Position in queue
"""
if isinstance(details, str):
# Fallback for raw strings
cursor.execute("""INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)""",
(server_id, "Not grabbed", queued_by, max_order_num, details, "Unknown", 0))
# Fallback for raw strings (legacy support)
pos = db.add_song(
server_id=str(server_id),
song_link="Not grabbed",
queued_by=queued_by,
title=details,
thumbnail="Unknown",
duration=0,
position=position
)
else:
# Save exact duration and thumbnail from the start
cursor.execute("""INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)""",
(server_id, details['url'], queued_by, max_order_num, details['title'], details['thumbnail'], details['duration']))
# Standard dictionary format
pos = db.add_song(
server_id=str(server_id),
song_link=details['url'],
queued_by=queued_by,
title=details['title'],
thumbnail=details.get('thumbnail', ''),
duration=details.get('duration', 0),
position=position
)
conn.commit()
conn.close()
return max_order_num
return pos
# Pop song from server (respects loop mode)
async def pop(server_id, ignore=False, skip_mode=False):
"""
Pop next song from queue
ignore: Skip the song without returning URL
skip_mode: True when called from skip command (affects loop song behavior)
Args:
server_id: Discord server ID
ignore: Skip the song without returning URL
skip_mode: True when called from skip command (affects loop song behavior)
Returns:
Song URL or None
"""
# Connect to db
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# JUST INCASE!
await add_server(server_id, cursor, conn)
# Fetch info: link(1), title(4), thumbnail(5), duration(6)
cursor.execute('''SELECT * FROM songs WHERE server_id = ? ORDER BY position LIMIT 1;''', (server_id,))
result = cursor.fetchone()
conn.commit()
conn.close()
result = db.get_next_song(str(server_id))
if result is None:
return None
elif ignore:
await mark_song_as_finished(server_id, result[3])
return None
elif result[1] == "Not grabbed":
# Lazy load logic
song_list = await search_song(result[4])
if not song_list:
return None
song = song_list[0]
await set_current_song(server_id, song['title'], song.get('thumbnail', ''), song.get('duration', 0))
# result format: (server_id, song_link, queued_by, position, title, thumbnail, duration)
server_id_str, song_link, queued_by, position, title, thumbnail, duration = result
if ignore:
db.remove_song(str(server_id), position)
return None
# Handle lazy-loaded songs (not yet fetched from YouTube)
if song_link == "Not grabbed":
song_list = await search_song(title)
if not song_list:
db.remove_song(str(server_id), position)
return None
song = song_list[0]
await set_current_song(
server_id,
song['title'],
song['url'],
song.get('thumbnail', ''),
song.get('duration', 0)
)
# Check loop mode before removing
loop_mode = await get_loop_mode(server_id)
if loop_mode != 'song': # Only remove if not looping song
await mark_song_as_finished(server_id, result[3])
if loop_mode != 'song':
db.remove_song(str(server_id), position)
return song['url']
# Pre-grabbed logic (Standard)
# result[1] is url, result[5] is thumbnail, result[6] is duration
await set_current_song(server_id, result[4], result[1], result[5], result[6])
# Standard pre-fetched song
await set_current_song(server_id, title, song_link, thumbnail, duration)
# Check loop mode before removing
loop_mode = await get_loop_mode(server_id)
if loop_mode != 'song': # Only remove if not looping song
await mark_song_as_finished(server_id, result[3])
if loop_mode != 'song':
db.remove_song(str(server_id), position)
return result[1]
return song_link
# Add server to db if first time queuing
async def add_server(server_id, cursor, conn):
cursor.execute('SELECT COUNT(*) FROM servers WHERE server_id = ?', (server_id,))
if cursor.fetchone()[0] == 0:
cursor.execute('''INSERT INTO servers (server_id, loop_mode, volume, effect, song_thumbnail, song_url)
VALUES (?, 'off', 100, 'none', '', '')''', (server_id,))
conn.commit()
# set song as played and update indexes
async def mark_song_as_finished(server_id, order_num):
# Connect to the database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Update the song as finished
cursor.execute('''DELETE FROM songs
WHERE server_id = ? AND position = ?''',
(server_id, order_num))
# Close connection
conn.commit()
conn.close()
# set the current playing song of the server
async def set_current_song(server_id, title, url, thumbnail="", duration=0):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
start_time = time.time()
# Ensure duration is an integer
try:
duration = int(duration)
except:
duration = 0
cursor.execute(''' UPDATE servers
SET song_name = ?, song_url = ?, song_thumbnail = ?, song_start_time = ?, song_duration = ?
WHERE server_id = ?''',
(title, url, thumbnail, start_time, duration, server_id))
conn.commit()
conn.close()
# Returns dictionary with title and thumbnail
async def get_current_song(server_id):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(''' SELECT song_name, song_thumbnail, song_url FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
result = cursor.fetchone()
conn.commit()
conn.close()
if result:
return {'title': result[0], 'thumbnail': result[1], 'url': result[2]}
return {'title': "Nothing", 'thumbnail': None, 'url': ''}
async def get_current_progress(server_id):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute('''SELECT song_start_time, song_duration, is_playing FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
result = cursor.fetchone()
conn.close() # Close quickly
if not result or result[2] == 0:
return 0, 0, 0.0
start_time, duration, _ = result
if duration is None or duration == 0:
return 0, 0, 0.0
elapsed = int(time.time() - start_time)
elapsed = min(elapsed, duration)
percentage = (elapsed / duration) * 100 if duration > 0 else 0
return elapsed, duration, percentage
async def get_max(server_id, cursor):
cursor.execute("SELECT MAX(position) FROM songs WHERE server_id = ?", (server_id,))
result = cursor.fetchone()
return result[0] if result[0] is not None else -1
async def update_server(server_id, playing):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
val = 1 if playing else 0
cursor.execute("UPDATE servers SET is_playing = ? WHERE server_id = ?", (val, server_id))
conn.commit()
conn.close()
async def is_server_playing(server_id):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("SELECT is_playing FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
conn.close()
return True if res[0] == 1 else False
async def clear(server_id):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
await update_server(server_id, False)
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
conn.commit()
conn.close()
async def grab_songs(server_id):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("SELECT title, duration, queued_by FROM songs WHERE server_id = ? ORDER BY position LIMIT 10", (server_id,))
songs = cursor.fetchall()
max_pos = await get_max(server_id, cursor)
conn.close()
return max_pos, songs
"""
Get current queue
# --- Effects/Loop/Shuffle/Volume (Simplified Paste) ---
async def get_loop_mode(server_id):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("SELECT loop_mode FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
conn.close()
return res[0] if res else 'off'
Returns:
Tuple of (max_position, list_of_songs)
"""
return db.get_queue(str(server_id), limit=10)
async def set_loop_mode(server_id, mode):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("UPDATE servers SET loop_mode = ? WHERE server_id = ?", (mode, server_id))
conn.commit()
conn.close()
async def get_volume(server_id):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("SELECT volume FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
conn.close()
return res[0] if res else 100
async def clear(server_id):
"""Clear the queue for a server"""
db.clear_queue(str(server_id))
await update_server(server_id, False)
async def set_volume(server_id, vol):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("UPDATE servers SET volume = ? WHERE server_id = ?", (vol, server_id))
conn.commit()
conn.close()
return vol
async def shuffle_queue(server_id):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("SELECT position, song_link, queued_by, title, thumbnail, duration FROM songs WHERE server_id = ? ORDER BY position", (server_id,))
songs = cursor.fetchall()
if len(songs) <= 1:
conn.close()
return False
random.shuffle(songs)
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
for i, s in enumerate(songs):
cursor.execute("INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)", (server_id, s[1], s[2], i, s[3], s[4], s[5]))
conn.commit()
conn.close()
return True
"""Shuffle the queue randomly"""
return db.shuffle_queue(str(server_id))
# ===================================
# Server State Management
# ===================================
async def update_server(server_id, playing):
"""Update server playing status"""
db.set_server_playing(str(server_id), playing)
async def is_server_playing(server_id):
"""Check if server is currently playing"""
return db.is_server_playing(str(server_id))
async def set_current_song(server_id, title, url, thumbnail="", duration=0):
"""Set the currently playing song"""
db.set_current_song(
str(server_id),
title,
url,
thumbnail,
duration,
time.time() # start_time
)
async def get_current_song(server_id):
"""Get current song info"""
return db.get_current_song(str(server_id))
async def get_current_progress(server_id):
"""Get playback progress (elapsed, duration, percentage)"""
return db.get_current_progress(str(server_id))
# ===================================
# Settings Management
# ===================================
async def get_loop_mode(server_id):
"""Get loop mode: 'off', 'song', or 'queue'"""
return db.get_loop_mode(str(server_id))
async def set_loop_mode(server_id, mode):
"""Set loop mode: 'off', 'song', or 'queue'"""
db.set_loop_mode(str(server_id), mode)
async def get_volume(server_id):
"""Get volume (0-200)"""
return db.get_volume(str(server_id))
async def set_volume(server_id, vol):
"""Set volume (0-200)"""
return db.set_volume(str(server_id), vol)
async def get_effect(server_id):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("SELECT effect FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
conn.close()
return res[0] if res else 'none'
"""Get current audio effect"""
return db.get_effect(str(server_id))
async def set_effect(server_id, fx):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("UPDATE servers SET effect = ? WHERE server_id = ?", (fx, server_id))
conn.commit()
conn.close()
"""Set audio effect"""
db.set_effect(str(server_id), fx)
# ===================================
# Effect Metadata
# ===================================
def list_all_effects():
"""List all available audio effects"""
return [
'none', 'bassboost', 'nightcore', 'slowed', 'earrape', 'deepfry', 'distortion',
'reverse', 'chipmunk', 'demonic', 'underwater', 'robot',
'8d', 'vibrato', 'tremolo', 'echo', 'phone', 'megaphone'
]
def get_effect_emoji(effect_name):
# Short list of emoji mappings
"""Get emoji for effect"""
emojis = {
'none': '', # Changed to generic Sparkles
'none': '',
'bassboost': '💥',
'nightcore': '',
'slowed': '🐢',
@@ -459,7 +341,9 @@ def get_effect_emoji(effect_name):
}
return emojis.get(effect_name, '')
def get_effect_description(effect_name):
"""Get description for effect"""
descriptions = {
'none': 'Normal audio',
'bassboost': 'MAXIMUM BASS 🔊',
@@ -482,43 +366,60 @@ def get_effect_description(effect_name):
}
return descriptions.get(effect_name, 'Unknown effect')
# ===================================
# Playback
# ===================================
async def play(ctx):
"""
Main playback loop - plays songs from queue
"""
server_id = ctx.guild.id
voice_client = ctx.voice_client
if voice_client is None:
await update_server(server_id, False)
return
# Wait for current song to finish
while voice_client.is_playing():
await asyncio.sleep(0.5)
# Get next song
url = await pop(server_id)
if url is None:
await update_server(server_id, False)
return
try:
# Scale volume down to prevent earrape
# User sees 0-200%, but internally we scale by 0.25
# So user's 100% = 0.25 actual volume (25%)
vol = await get_volume(server_id) / 100.0 * 0.25
# Get volume and effect settings
vol = await get_volume(server_id) / 100.0 * 0.25 # Scale down by 0.25
fx = await get_effect(server_id)
opts = get_effect_options(fx)
# Create audio source
src = discord.FFmpegPCMAudio(url, **opts)
src = discord.PCMVolumeTransformer(src, volume=vol)
# After callback - play next song
def after(e):
if e: print(e)
if voice_client and not voice_client.is_connected(): return
if e:
print(f"Playback error: {e}")
if voice_client and not voice_client.is_connected():
return
# Schedule next song
coro = play(ctx)
fut = asyncio.run_coroutine_threadsafe(coro, ctx.bot.loop)
try: fut.result()
except: pass
try:
fut.result()
except Exception as ex:
print(f"Error in after callback: {ex}")
voice_client.play(src, after=after)
except Exception as e:
print(f"Play error: {e}")
# Try next song on error
await play(ctx)

View File

@@ -1,5 +1,9 @@
# Handles translating urls and search terms
"""
URL and search query handling for Groovy-Zilean
Translates YouTube, Spotify, SoundCloud URLs and search queries into playable audio
"""
from typing import Any
import yt_dlp as ytdlp
import spotipy
@@ -22,7 +26,7 @@ ydl_opts = {
},
}
async def main(url, sp):
async def main(url: str, sp: spotipy.Spotify) -> list[dict[str, Any] | str]:
#url = url.lower()
@@ -60,7 +64,7 @@ async def main(url, sp):
return []
async def search_song(search):
async def search_song(search: str) -> list[dict[str, Any]]:
with ytdlp.YoutubeDL(ydl_opts) as ydl:
try:
info = ydl.extract_info(f"ytsearch1:{search}", download=False)
@@ -89,7 +93,7 @@ async def search_song(search):
return [data]
async def spotify_song(url, sp):
async def spotify_song(url: str, sp: spotipy.Spotify) -> list[dict[str, Any]]:
track = sp.track(url.split("/")[-1].split("?")[0])
search = ""
@@ -106,7 +110,11 @@ async def spotify_song(url, sp):
return await search_song(query)
async def spotify_playlist(url, sp):
async def spotify_playlist(url: str, sp: spotipy.Spotify) -> list[str | dict[str, Any]]:
"""
Get songs from a Spotify playlist
Returns a mixed list where first item is dict, rest are search strings
"""
# Get the playlist uri code
code = url.split("/")[-1].split("?")[0]
@@ -115,42 +123,36 @@ async def spotify_playlist(url, sp):
results = sp.playlist_tracks(code)['items']
except spotipy.exceptions.SpotifyException:
return []
# Go through the tracks
songs = []
# Go through the tracks and build search queries
songs: list[str | dict[str, Any]] = [] # Explicit type for mypy
for track in results:
search = ""
# Fetch all artists
for artist in track['track']['artists']:
# Add all artists to search
search += f"{artist['name']}, "
# Remove last column
# Remove last comma
search = search[:-2]
search += f" - {track['track']['name']}"
songs.append(search)
#searched_result = search_song(search)
#if searched_result == []:
#continue
#songs.append(searched_result[0])
# Fetch first song's full data
while True:
search_result = await search_song(songs[0])
search_result = await search_song(songs[0]) # type: ignore
if search_result == []:
songs.pop(0)
continue
else:
songs[0] = search_result[0]
songs[0] = search_result[0] # Replace string with dict
break
return songs
async def song_download(url):
async def song_download(url: str) -> list[dict[str, Any]]:
with ytdlp.YoutubeDL(ydl_opts) as ydl:
try:
info = ydl.extract_info(url, download=False)
@@ -180,7 +182,7 @@ async def song_download(url):
return [data]
async def playlist_download(url):
async def playlist_download(url: str) -> list[dict[str, Any]]:
with ytdlp.YoutubeDL(ydl_opts) as ydl:
try:
info = ydl.extract_info(url, download=False)

View File

@@ -1,3 +1,9 @@
"""
Utility functions for Groovy-Zilean music bot
Handles voice channel operations, queue display, and inactivity tracking
"""
from typing import Any
import discord
from discord.ext.commands.context import Context
from discord.ext.commands.converter import CommandError
@@ -7,15 +13,29 @@ from . import queue
import asyncio
# Track last activity time for each server
last_activity = {}
last_activity: dict[int, float] = {}
# Joining/moving to the user's vc in a guild
async def join_vc(ctx: Context):
# ===================================
# Voice Channel Management
# ===================================
async def join_vc(ctx: Context) -> discord.VoiceClient:
"""
Join or move to the user's voice channel
Args:
ctx: Command context
Returns:
The voice client connection
Raises:
CommandError: If user is not in a voice channel
"""
# Get the user's vc
author_voice = getattr(ctx.author, "voice")
if author_voice is None:
# Raise exception if user is not in vc
raise CommandError("User is not in voice channel")
# Get user's vc
@@ -25,19 +45,26 @@ async def join_vc(ctx: Context):
# Join or move to the user's vc
if ctx.voice_client is None:
vc = await vc.connect()
vc_client = await vc.connect()
else:
# Safe to ignore type error for now
vc = await ctx.voice_client.move_to(vc)
vc_client = await ctx.voice_client.move_to(vc)
# Update last activity
last_activity[ctx.guild.id] = asyncio.get_event_loop().time()
return vc
return vc_client
# Leaving the voice channel of a user
async def leave_vc(ctx: Context):
async def leave_vc(ctx: Context) -> None:
"""
Leave the voice channel and clean up
Args:
ctx: Command context
Raises:
CommandError: If bot is not in VC or user is not in same VC
"""
# If the bot is not in a vc of this server
if ctx.voice_client is None:
raise CommandError("I am not in a voice channel")
@@ -73,9 +100,18 @@ async def leave_vc(ctx: Context):
del last_activity[ctx.guild.id]
# Auto-disconnect if inactive
async def check_inactivity(bot):
"""Background task to check for inactive voice connections"""
# ===================================
# Inactivity Management
# ===================================
async def check_inactivity(bot: discord.Client) -> None:
"""
Background task to check for inactive voice connections
Auto-disconnects after 5 minutes of inactivity
Args:
bot: The Discord bot instance
"""
try:
current_time = asyncio.get_event_loop().time()
@@ -98,20 +134,34 @@ async def check_inactivity(bot):
print(f"Error in inactivity checker: {e}")
# Update activity timestamp when playing
def update_activity(guild_id):
"""Call this when a song starts playing"""
def update_activity(guild_id: int) -> None:
"""
Update activity timestamp when a song starts playing
Args:
guild_id: Discord guild/server ID
"""
last_activity[guild_id] = asyncio.get_event_loop().time()
# Interactive buttons for queue control
# ===================================
# Queue Display & Controls
# ===================================
class QueueControls(View):
def __init__(self, ctx):
super().__init__(timeout=None) # No timeout allows buttons to stay active longer
"""Interactive buttons for queue control"""
def __init__(self, ctx: Context) -> None:
super().__init__(timeout=None) # No timeout allows buttons to stay active longer
self.ctx = ctx
async def refresh_message(self, interaction: discord.Interaction):
"""Helper to regenerate the embed and edit the message"""
async def refresh_message(self, interaction: discord.Interaction) -> None:
"""
Helper to regenerate the embed and edit the message
Args:
interaction: Discord interaction from button press
"""
try:
# Generate new embed
embed, view = await generate_queue_ui(self.ctx)
@@ -119,10 +169,13 @@ class QueueControls(View):
except Exception as e:
# Fallback if edit fails
if not interaction.response.is_done():
await interaction.response.send_message("Refreshed, but something went wrong updating the display.", ephemeral=True)
await interaction.response.send_message(
"Refreshed, but something went wrong updating the display.",
ephemeral=True
)
@discord.ui.button(label="⏭️ Skip", style=discord.ButtonStyle.primary)
async def skip_button(self, interaction: discord.Interaction, button: Button):
async def skip_button(self, interaction: discord.Interaction, button: Button) -> None:
if interaction.user not in self.ctx.voice_client.channel.members:
await interaction.response.send_message("❌ You must be in the voice channel!", ephemeral=True)
return
@@ -130,11 +183,6 @@ class QueueControls(View):
# Loop logic check
loop_mode = await queue.get_loop_mode(self.ctx.guild.id)
# Logic mimics the command
if loop_mode == 'song':
# Just restart current song effectively but here we assume standard skip behavior for button
pass
# Perform the skip
await queue.pop(self.ctx.guild.id, True, skip_mode=True)
if self.ctx.voice_client:
@@ -144,35 +192,45 @@ class QueueControls(View):
await self.refresh_message(interaction)
@discord.ui.button(label="🔀 Shuffle", style=discord.ButtonStyle.secondary)
async def shuffle_button(self, interaction: discord.Interaction, button: Button):
async def shuffle_button(self, interaction: discord.Interaction, button: Button) -> None:
await queue.shuffle_queue(self.ctx.guild.id)
await self.refresh_message(interaction)
@discord.ui.button(label="🔁 Loop", style=discord.ButtonStyle.secondary)
async def loop_button(self, interaction: discord.Interaction, button: Button):
async def loop_button(self, interaction: discord.Interaction, button: Button) -> None:
current_mode = await queue.get_loop_mode(self.ctx.guild.id)
new_mode = 'song' if current_mode == 'off' else ('queue' if current_mode == 'song' else 'off')
await queue.set_loop_mode(self.ctx.guild.id, new_mode)
await self.refresh_message(interaction)
@discord.ui.button(label="🗑️ Clear", style=discord.ButtonStyle.danger)
async def clear_button(self, interaction: discord.Interaction, button: Button):
async def clear_button(self, interaction: discord.Interaction, button: Button) -> None:
await queue.clear(self.ctx.guild.id)
if self.ctx.voice_client and self.ctx.voice_client.is_playing():
self.ctx.voice_client.stop()
await self.refresh_message(interaction)
@discord.ui.button(label="🔄 Refresh", style=discord.ButtonStyle.gray)
async def refresh_button(self, interaction: discord.Interaction, button: Button):
async def refresh_button(self, interaction: discord.Interaction, button: Button) -> None:
await self.refresh_message(interaction)
async def generate_queue_ui(ctx: Context):
async def generate_queue_ui(ctx: Context) -> tuple[discord.Embed, QueueControls]:
"""
Generate the queue embed and controls
Args:
ctx: Command context
Returns:
Tuple of (embed, view) for displaying queue
"""
guild_id = ctx.guild.id
server = ctx.guild
# Fetch all data
n, songs = await queue.grab_songs(guild_id)
current = await queue.get_current_song(guild_id) # Returns title, thumbnail, url
current = await queue.get_current_song(guild_id)
loop_mode = await queue.get_loop_mode(guild_id)
volume = await queue.get_volume(guild_id)
effect = await queue.get_effect(guild_id)
@@ -183,10 +241,10 @@ async def generate_queue_ui(ctx: Context):
# Map loop mode to nicer text
loop_map = {
'off': {'emoji': '⏹️', 'text': 'Off'},
'song': {'emoji': '🔂', 'text': 'Song'},
'queue': {'emoji': '🔁', 'text': 'Queue'}
}
'off': {'emoji': '⏹️', 'text': 'Off'},
'song': {'emoji': '🔂', 'text': 'Song'},
'queue': {'emoji': '🔁', 'text': 'Queue'}
}
loop_info = loop_map.get(loop_mode, loop_map['off'])
loop_emoji = loop_info['emoji']
loop_text = loop_info['text']
@@ -197,11 +255,9 @@ async def generate_queue_ui(ctx: Context):
# Progress Bar Logic
progress_bar = ""
# Only show bar if duration > 0 (prevents weird 00:00 bars)
if duration > 0:
bar_length = 16
filled = int((percentage / 100) * bar_length)
# Ensure filled isn't bigger than length
filled = min(filled, bar_length)
bar_str = '' * filled + '🔘' + '' * (bar_length - filled)
progress_bar = f"\n`{format_time(elapsed)}` {bar_str} `{format_time(duration)}`"
@@ -215,14 +271,11 @@ async def generate_queue_ui(ctx: Context):
description = "## 💤 Nothing is playing\nUse `/play` to start the party!"
else:
# Create Hyperlink [Title](URL)
# If no URL exists, link to Discord homepage as fallback or just bold
if url and url.startswith("http"):
song_link = f"[{title}]({url})"
else:
song_link = f"**{title}**"
# CLEARER STATUS LINE:
# Loop: Mode | Effect: Name | Vol: %
description = (
f"## 💿 Now Playing\n"
f"### {song_link}\n"
@@ -241,7 +294,7 @@ async def generate_queue_ui(ctx: Context):
embed.add_field(name="⏳ Up Next", value=queue_text, inline=False)
remaining = (n) - 9 # Approx calculation based on your grabbing logic
remaining = n - 9
if remaining > 0:
embed.set_footer(text=f"Waitlist: {remaining} more songs...")
else:
@@ -251,23 +304,38 @@ async def generate_queue_ui(ctx: Context):
if thumb and isinstance(thumb, str) and thumb.startswith("http"):
embed.set_thumbnail(url=thumb)
elif server.icon:
# Fallback to server icon
embed.set_thumbnail(url=server.icon.url)
view = QueueControls(ctx)
return embed, view
# The command entry point calls this
async def display_server_queue(ctx: Context, songs, n):
async def display_server_queue(ctx: Context, songs: list, n: int) -> None:
"""
Display the server's queue with interactive controls
Args:
ctx: Command context
songs: List of songs in queue
n: Total number of songs
"""
embed, view = await generate_queue_ui(ctx)
await ctx.send(embed=embed, view=view)
# Build a display message for queuing a new song
async def queue_message(ctx: Context, data: dict):
async def queue_message(ctx: Context, data: dict[str, Any]) -> None:
"""
Display a message when a song is queued
Args:
ctx: Command context
data: Song data dictionary
"""
msg = discord.Embed(
title="🎵 Song Queued",
description=f"**{data['title']}**",
color=discord.Color.green())
title="🎵 Song Queued",
description=f"**{data['title']}**",
color=discord.Color.green()
)
msg.set_thumbnail(url=data['thumbnail'])
msg.add_field(name="⏱️ Duration", value=format_time(data['duration']), inline=True)
@@ -276,9 +344,23 @@ async def queue_message(ctx: Context, data: dict):
await ctx.send(embed=msg)
# Converts seconds into more readable format
def format_time(seconds):
# ===================================
# Utility Functions
# ===================================
def format_time(seconds: int | float) -> str:
"""
Convert seconds into readable time format (MM:SS or HH:MM:SS)
Args:
seconds: Time in seconds
Returns:
Formatted time string
"""
try:
seconds = int(seconds)
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)

297
config.py
View File

@@ -1,115 +1,198 @@
# config.py
# This file should parse all configurations within the bot
# Modern configuration management using environment variables
import os
import discord
from discord import Color
import json
from dotenv import load_dotenv
from typing import Optional
# Read data from JSON file in ./data/config.json
def read_data():
with open("./data/config.json", "r") as file:
return json.load(file)
# Load environment variables from .env file
load_dotenv()
raise Exception("Could not load config data")
def get_spotify_creds():
data = read_data()
data = data.get("spotify")
SCID = data.get("SCID")
secret = data.get("SECRET")
return SCID, secret
# Reading prefix
def get_prefix():
data = read_data()
prefix = data.get('prefix')
if prefix:
return prefix
raise Exception("Missing config data: prefix")
# Fetch the bot secret token
def get_login(bot):
data = read_data()
if data is False or data.get(f"{bot}bot") is False:
raise Exception(f"Missing config data: {bot}bot")
data = data.get(f"{bot}bot")
return data.get("secret")
# Read the status and text data
def get_status():
data = read_data()
if data is False or data.get('status') is False:
raise Exception("Missing config data: status")
# Find type
data = data.get('status')
return translate_status(
data.get('type'),
data.get('text'),
data.get('link')
)
# Get colors from colorscheme
def get_color(color):
data = read_data()
if data is False or data.get('status') is False:
raise Exception("Missing config data: color")
# Grab color
string_value = data.get("colorscheme").get(color)
hex_value = Color.from_str(string_value)
return hex_value
# Taking JSON variables and converting them into a presence
# Use None url incase not provided
def translate_status(status_type, status_text, status_url=None):
if status_type == "playing":
return discord.Activity(
type=discord.ActivityType.playing,
name=status_text
)
elif status_type == "streaming":
return discord.Activity(
type=discord.ActivityType.streaming,
name=status_text,
url=status_url
)
elif status_type == "listening":
return discord.Activity(
type=discord.ActivityType.listening,
name=status_text
)
elif status_type == "watching":
return discord.Activity(
type=discord.ActivityType.watching,
name=status_text
)
elif status_type == "competing":
return discord.Activity(
type=discord.ActivityType.competing,
name=status_text
)
#TODO
# Implement custom status type
# ===================================
# Environment Detection
# ===================================
ENVIRONMENT = os.getenv("ENVIRONMENT", "dev").lower()
IS_PRODUCTION = ENVIRONMENT == "live"
IS_DEVELOPMENT = ENVIRONMENT == "dev"
# ===================================
# Discord Configuration
# ===================================
def get_discord_token() -> str:
"""Get the appropriate Discord token based on environment"""
if IS_PRODUCTION:
token = os.getenv("DISCORD_TOKEN_LIVE")
if not token:
raise ValueError("DISCORD_TOKEN_LIVE not found in environment!")
return token
else:
raise Exception(f"Invalid status type: {status_type}")
token = os.getenv("DISCORD_TOKEN_DEV")
if not token:
raise ValueError("DISCORD_TOKEN_DEV not found in environment!")
return token
def get_prefix() -> str:
"""Get command prefix (default: =)"""
return os.getenv("DISCORD_PREFIX", "=")
# ===================================
# Spotify Configuration
# ===================================
def get_spotify_creds() -> tuple[str, str]:
"""Get Spotify API credentials"""
client_id = os.getenv("SPOTIFY_CLIENT_ID")
client_secret = os.getenv("SPOTIFY_CLIENT_SECRET")
if not client_id or not client_secret:
raise ValueError("Spotify credentials not found in environment!")
return client_id, client_secret
# ===================================
# Database Configuration
# ===================================
def get_db_path() -> str:
"""Get SQLite database path"""
return os.getenv("DB_PATH", "./data/music.db")
# Future PostgreSQL config
def get_postgres_url() -> Optional[str]:
"""Get PostgreSQL connection URL (for future migration)"""
host = os.getenv("DB_HOST")
port = os.getenv("DB_PORT", "5432")
name = os.getenv("DB_NAME")
user = os.getenv("DB_USER")
password = os.getenv("DB_PASSWORD")
if all([host, name, user, password]):
return f"postgresql://{user}:{password}@{host}:{port}/{name}"
return None
# ===================================
# Bot Status/Presence
# ===================================
def get_status() -> discord.Activity:
"""Get bot status/presence"""
status_type = os.getenv("STATUS_TYPE", "listening").lower()
status_text = os.getenv("STATUS_TEXT", "Zilean's Theme")
status_url = os.getenv("STATUS_URL")
return translate_status(status_type, status_text, status_url)
def translate_status(
status_type: str,
status_text: str,
status_url: Optional[str] = None
) -> discord.Activity:
"""Convert status type string to Discord Activity"""
status_map = {
"playing": discord.ActivityType.playing,
"streaming": discord.ActivityType.streaming,
"listening": discord.ActivityType.listening,
"watching": discord.ActivityType.watching,
"competing": discord.ActivityType.competing,
}
activity_type = status_map.get(status_type)
if not activity_type:
raise ValueError(f"Invalid status type: {status_type}")
# Streaming requires URL
if status_type == "streaming":
if not status_url:
raise ValueError("Streaming status requires STATUS_URL")
return discord.Activity(
type=activity_type,
name=status_text,
url=status_url
)
return discord.Activity(type=activity_type, name=status_text)
# ===================================
# Color Scheme
# ===================================
def get_color(color_name: str) -> Color:
"""Get color from environment (hex format)"""
color_map = {
"primary": os.getenv("COLOR_PRIMARY", "#7289DA"),
"success": os.getenv("COLOR_SUCCESS", "#43B581"),
"error": os.getenv("COLOR_ERROR", "#F04747"),
"warning": os.getenv("COLOR_WARNING", "#FAA61A"),
}
hex_value = color_map.get(color_name.lower())
if not hex_value:
# Default to Discord blurple
hex_value = "#7289DA"
return Color.from_str(hex_value)
# ===================================
# Logging Configuration
# ===================================
def get_log_level() -> str:
"""Get logging level from environment"""
return os.getenv("LOG_LEVEL", "INFO").upper()
# ===================================
# Legacy Support (for backward compatibility)
# ===================================
def get_login(bot: str) -> str:
"""Legacy function - maps to new get_discord_token()"""
# Ignore the 'bot' parameter, use ENVIRONMENT instead
return get_discord_token()
# ===================================
# Validation
# ===================================
def validate_config():
"""Validate that all required config is present"""
errors = []
# Check Discord token
try:
get_discord_token()
except ValueError as e:
errors.append(str(e))
# Check Spotify creds
try:
get_spotify_creds()
except ValueError as e:
errors.append(str(e))
if errors:
error_msg = "\n".join(errors)
raise ValueError(f"Configuration errors:\n{error_msg}")
print(f"✅ Configuration validated (Environment: {ENVIRONMENT})")
# ===================================
# Startup Info
# ===================================
def print_config_info():
"""Print configuration summary (without secrets!)"""
print("=" * 50)
print("🎵 Groovy-Zilean Configuration")
print("=" * 50)
print(f"Environment: {ENVIRONMENT.upper()}")
print(f"Prefix: {get_prefix()}")
print(f"Database: {get_db_path()}")
print(f"Log Level: {get_log_level()}")
print(f"Spotify: {'Configured ✅' if os.getenv('SPOTIFY_CLIENT_ID') else 'Not configured ❌'}")
print("=" * 50)

13
main.py
View File

@@ -3,6 +3,16 @@ from bot import Groovy
import config
import help
# Validate configuration before starting
try:
config.validate_config()
config.print_config_info()
except ValueError as e:
print(f"❌ Configuration Error:\n{e}")
print("\n💡 Tip: Copy .env.example to .env and fill in your values")
exit(1)
# Initialize bot with validated config
client = Groovy(command_prefix=config.get_prefix(), intents=discord.Intents.all())
@client.event
@@ -25,4 +35,5 @@ async def on_voice_state_update(member, before, after):
except Exception as e:
print(f"Error auto-disconnecting: {e}")
client.run(config.get_login("live"))
# Run bot with environment-appropriate token
client.run(config.get_discord_token())

31
mypy.ini Normal file
View File

@@ -0,0 +1,31 @@
# mypy configuration for groovy-zilean
# Type checking configuration that's practical for a Discord bot
[mypy]
# Python version
python_version = 3.13
# Ignore missing imports for libraries without type stubs
# Discord.py, spotipy, yt-dlp don't have complete type stubs
ignore_missing_imports = True
# Be strict about our own code
# Start lenient, can tighten later
disallow_untyped_defs = False
check_untyped_defs = True
# Too noisy with discord.py
warn_return_any = False
warn_unused_configs = True
# Exclude patterns
exclude = venv/
# Per-module overrides
[mypy-discord.*]
ignore_missing_imports = True
[mypy-spotipy.*]
ignore_missing_imports = True
[mypy-yt_dlp.*]
ignore_missing_imports = True

View File

@@ -1,12 +1,14 @@
# Core bot framework
discord.py==2.6.4
aiohttp==3.8.4
PyNaCl==1.5.0
spotipy==2.23.0
discord.py>=2.6.4
aiohttp>=3.9.0
PyNaCl>=1.5.0
spotipy>=2.23.0
# Configuration management
python-dotenv>=1.0.0
# YouTube extractor
yt-dlp>=2025.10.14
# System dependencies
PyAudio==0.2.13
mutagen==1.46.0
# Audio metadata (if needed by yt-dlp)
mutagen>=1.47.0