Compare commits

1 Commits

Author SHA1 Message Date
31309046d2 removed current roadmap 2026-02-01 18:44:36 +00:00
14 changed files with 490 additions and 2027 deletions

View File

@@ -1,67 +0,0 @@
# Groovy-Zilean Configuration
# Copy this file to .env and fill in your values
# NEVER commit .env to git!
# ===================================
# Environment Selection
# ===================================
# Set to "dev" for development bot, "live" for production
ENVIRONMENT=dev
# ===================================
# Discord Bot Tokens
# ===================================
DISCORD_TOKEN_DEV=
DISCORD_TOKEN_LIVE=
# Bot settings
DISCORD_PREFIX=
# ===================================
# Spotify Integration
# ===================================
SPOTIFY_CLIENT_ID=
SPOTIFY_CLIENT_SECRET=
# ===================================
# Database
# ===================================
# For now (SQLite)
DB_PATH=./data/music.db
# For future (PostgreSQL)
# DB_HOST=localhost
# DB_PORT=5432
# DB_NAME=groovy_zilean
# DB_USER=groovy
# DB_PASSWORD=your_db_password
# ===================================
# Bot Status/Presence
# ===================================
# Types: playing, listening, watching, streaming, competing
STATUS_TYPE=listening
STATUS_TEXT==help | /help
# STATUS_URL= # Only needed for streaming type
# ===================================
# Color Scheme (hex colors)
# ===================================
COLOR_PRIMARY=#7289DA
COLOR_SUCCESS=#43B581
COLOR_ERROR=#F04747
COLOR_WARNING=#FAA61A
# ===================================
# Web Dashboard (Future)
# ===================================
# DISCORD_CLIENT_ID=your_oauth_client_id
# DISCORD_CLIENT_SECRET=your_oauth_secret
# DISCORD_REDIRECT_URI=http://localhost:8000/callback
# WEB_SECRET_KEY=random_secret_for_sessions
# ===================================
# Logging
# ===================================
LOG_LEVEL=INFO
# Options: DEBUG, INFO, WARNING, ERROR, CRITICAL

2
.gitignore vendored
View File

@@ -160,4 +160,4 @@ cython_debug/
#.idea/ #.idea/
# My stuff # My stuff
data/*.db data/

View File

@@ -1,849 +0,0 @@
# Groovy-Zilean Production Roadmap
**Goal:** Transform groovy-zilean from a personal project into a production-ready Discord music bot with a web dashboard.
**Philosophy:** Production-quality architecture with manageable complexity for a solo developer. No overkill, no shortcuts.
---
## Table of Contents
1. [Architecture Overview](#architecture-overview)
2. [Tech Stack Decisions](#tech-stack-decisions)
3. [Development Phases](#development-phases)
4. [Python Environment Setup](#python-environment-setup)
5. [Why This Approach](#why-this-approach)
6. [Quick Reference](#quick-reference)
---
## Architecture Overview
### The Winning Design: Database-Mediated Architecture
```
┌─────────────┐ HTTP/SSE ┌──────────────┐
│ Browser │◄────────────►│ FastAPI │
│ (HTMX HTML) │ │ Backend │
└─────────────┘ └──────┬───────┘
┌──────▼───────┐ ┌─────────────┐
│ PostgreSQL │◄────────│ Discord Bot │
│ Database │ │ (discord.py)│
└──────────────┘ └─────────────┘
```
### Key Principles
1. **Decoupled Services**
- Bot and web can restart independently
- No tight coupling via IPC
- Database is the single source of truth
2. **Simple Frontend (Initially)**
- HTMX for interactivity (no build step)
- Jinja2 templates (server-side rendering)
- Tailwind CSS via CDN (beautiful, no npm)
- **Optional:** Upgrade to React later when ready
3. **Production-Ready Backend**
- PostgreSQL for reliability
- FastAPI for modern Python web
- Discord OAuth2 for authentication
- Connection pooling, proper error handling
---
## Tech Stack Decisions
### Backend
| Component | Choice | Why |
|-----------|--------|-----|
| **Bot Framework** | discord.py 2.6.4+ | Industry standard, hybrid commands |
| **Web Framework** | FastAPI | Modern, async, auto-docs, large community |
| **Database** | PostgreSQL 14+ | Production-ready, ACID compliance, better than SQLite |
| **Music Extraction** | yt-dlp | Actively maintained, multi-platform support |
| **Auth** | Discord OAuth2 | Native Discord integration |
| **ORM** | SQLAlchemy (optional) | Or use asyncpg directly for simplicity |
### Frontend (Phase 1)
| Component | Choice | Why |
|-----------|--------|-----|
| **Templates** | Jinja2 | Server-side rendering, no build step |
| **Interactivity** | HTMX | Modern interactivity without React complexity |
| **Styling** | Tailwind CSS (CDN) | Beautiful UI, no build process |
| **Real-time** | Server-Sent Events | Simple polling/updates, no WebSocket complexity |
### Frontend (Phase 2 - Optional)
| Component | Choice | Why |
|-----------|--------|-----|
| **Framework** | React 18 | If you need more complex UI later |
| **Real-time** | WebSocket | For true real-time when needed |
| **State** | Zustand/Context | Simpler than Redux |
### Infrastructure
| Component | Choice | Why |
|-----------|--------|-----|
| **Python Version** | 3.11 or 3.12 | Modern features, better performance |
| **Environment** | venv | Isolated dependencies, no PATH conflicts |
| **Process Manager** | systemd | Reliable, built into Linux |
| **Reverse Proxy** | nginx | Standard, handles SSL, static files |
---
## Development Phases
### Phase 0: Current State ✅
- Working Discord bot with music playback
- SQLite database
- Hybrid commands (slash + prefix)
- 17 audio effects
- Queue, loop, shuffle functionality
- Spotify integration
### Phase 1: Quick Wins (1-2 hours)
**Goal:** Immediate improvements for better UX
Tasks:
1. **Lower default volume from 100% to 25%**
- Change default in `queue.py:119`
- Scale volume command display (user sees 0-200%, internally 0-50%)
- Prevents earrape for new users
2. **Add .mp3 and .mp4 file support**
- Extend `translate.py` to detect direct file URLs
- Support HTTP/HTTPS links to audio files
- Validate file type before processing
**Files to modify:**
- `cogs/music/queue.py`
- `cogs/music/translate.py`
---
### Phase 2: Code Refactoring (4-6 hours)
**Goal:** Clean, maintainable, documented codebase
#### 2.1 Database Abstraction
Create `cogs/music/db_manager.py`:
- Database connection class with context manager
- Connection pooling preparation
- Centralize all SQL queries
- Remove scattered `sqlite3.connect()` calls
#### 2.2 Configuration Management
Update `config.py`:
- Use environment variables for secrets
- Create `.env.example` template
- Remove hardcoded credentials
- Add validation for required config
#### 2.3 Code Organization
```
groovy-zilean/
├── bot/
│ ├── __init__.py
│ ├── bot.py # Main bot class
│ └── cogs/
│ └── music/
│ ├── __init__.py
│ ├── commands.py # User-facing commands
│ ├── player.py # Playback logic
│ ├── queue.py # Queue management
│ ├── effects.py # Audio effects
│ ├── db_manager.py # Database abstraction
│ └── translate.py # URL/playlist parsing
├── web/
│ ├── __init__.py # (Future web app)
├── shared/
│ ├── __init__.py
│ ├── config.py # Shared configuration
│ └── models.py # Data models
├── main.py # Entry point
├── requirements.txt
├── .env.example
└── README.md
```
#### 2.4 Error Handling & Logging
- Wrap all commands in try/except
- User-friendly error messages
- Proper logging setup (rotating file logs)
- Debug mode toggle
#### 2.5 Type Hints & Documentation
- Add type hints to all functions
- Docstrings for all classes/methods
- Inline comments for complex logic
**Expected outcome:**
- Easy to navigate codebase
- No secrets in code
- Consistent patterns throughout
---
### Phase 3: PostgreSQL Migration (3-4 hours)
**Goal:** Production-ready database layer
#### 3.1 Local PostgreSQL Setup
```bash
# Install PostgreSQL
sudo apt update
sudo apt install postgresql postgresql-contrib
# Create database and user
sudo -u postgres psql
CREATE DATABASE groovy_zilean;
CREATE USER groovy WITH PASSWORD 'your_password';
GRANT ALL PRIVILEGES ON DATABASE groovy_zilean TO groovy;
```
#### 3.2 Database Schema Design
```sql
-- servers table
CREATE TABLE servers (
server_id BIGINT PRIMARY KEY,
is_playing BOOLEAN DEFAULT FALSE,
song_name TEXT,
song_url TEXT,
song_thumbnail TEXT,
loop_mode VARCHAR(10) DEFAULT 'off',
volume INTEGER DEFAULT 25, -- NEW default!
effect VARCHAR(20) DEFAULT 'none',
song_start_time DOUBLE PRECISION DEFAULT 0,
song_duration INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- songs/queue table
CREATE TABLE songs (
id SERIAL PRIMARY KEY,
server_id BIGINT NOT NULL REFERENCES servers(server_id) ON DELETE CASCADE,
song_link TEXT,
queued_by TEXT,
position INTEGER NOT NULL,
title TEXT,
thumbnail TEXT,
duration INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(server_id, position)
);
-- Future: users table for web auth
CREATE TABLE users (
discord_id BIGINT PRIMARY KEY,
username TEXT,
avatar TEXT,
access_token TEXT,
refresh_token TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_login TIMESTAMP
);
-- Future: permissions table
CREATE TABLE permissions (
id SERIAL PRIMARY KEY,
server_id BIGINT REFERENCES servers(server_id),
user_id BIGINT,
role_id BIGINT,
can_play BOOLEAN DEFAULT TRUE,
can_skip BOOLEAN DEFAULT FALSE,
can_clear BOOLEAN DEFAULT FALSE,
can_modify_settings BOOLEAN DEFAULT FALSE
);
```
#### 3.3 Migration Script
Create `scripts/migrate_to_postgres.py`:
- Read all data from SQLite
- Insert into PostgreSQL
- Validate migration
- Backup SQLite file
#### 3.4 Update Database Code
Replace all `sqlite3` calls with `asyncpg` or `psycopg3`:
```python
# Old (SQLite)
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# New (PostgreSQL with asyncpg)
async with pool.acquire() as conn:
result = await conn.fetch("SELECT * FROM servers WHERE server_id = $1", server_id)
```
#### 3.5 Connection Pooling
```python
# In bot startup
self.db_pool = await asyncpg.create_pool(
host='localhost',
database='groovy_zilean',
user='groovy',
password=os.getenv('DB_PASSWORD'),
min_size=5,
max_size=20
)
```
**Expected outcome:**
- Reliable database with ACID guarantees
- Better concurrent access handling
- Ready for multi-server production load
---
### Phase 4: Web Dashboard (20-30 hours)
**Goal:** User-friendly web interface for bot control
#### 4.1 FastAPI Backend Setup
**Project structure:**
```
web/
├── __init__.py
├── main.py # FastAPI app
├── routes/
│ ├── __init__.py
│ ├── auth.py # Discord OAuth2
│ ├── servers.py # Server list/select
│ └── playback.py # Queue/controls
├── templates/
│ ├── base.html
│ ├── index.html
│ ├── dashboard.html
│ └── components/
│ ├── queue.html
│ └── controls.html
├── static/
│ ├── css/
│ └── js/
└── dependencies.py # Auth dependencies
```
**Core dependencies:**
```bash
pip install fastapi uvicorn jinja2 python-multipart httpx
```
#### 4.2 Discord OAuth2 Authentication
**Flow:**
1. User clicks "Login with Discord"
2. Redirect to Discord OAuth
3. Discord redirects back with code
4. Exchange code for token
5. Fetch user info + guilds
6. Create session
7. Show dashboard with user's servers
**Implementation:**
```python
# routes/auth.py
@router.get("/login")
async def login():
# Redirect to Discord OAuth
discord_auth_url = (
f"https://discord.com/api/oauth2/authorize"
f"?client_id={DISCORD_CLIENT_ID}"
f"&redirect_uri={REDIRECT_URI}"
f"&response_type=code"
f"&scope=identify guilds"
)
return RedirectResponse(discord_auth_url)
@router.get("/callback")
async def callback(code: str):
# Exchange code for token
# Fetch user info
# Create session
# Redirect to dashboard
```
#### 4.3 HTMX Frontend
**Example dashboard with HTMX:**
```html
<!-- templates/dashboard.html -->
<div class="container">
<!-- Server Selector -->
<select hx-get="/api/servers/{value}/queue"
hx-target="#queue-container"
hx-trigger="change">
{% for server in user_servers %}
<option value="{{ server.id }}">{{ server.name }}</option>
{% endfor %}
</select>
<!-- Now Playing (auto-updates every 5s) -->
<div id="now-playing"
hx-get="/api/now-playing"
hx-trigger="every 5s">
<!-- Server renders this -->
</div>
<!-- Queue (auto-updates) -->
<div id="queue-container"
hx-get="/api/queue"
hx-trigger="every 3s">
<!-- Queue items here -->
</div>
<!-- Controls -->
<div class="controls">
<button hx-post="/api/skip"
hx-target="#queue-container">
⏭️ Skip
</button>
<input type="range"
min="0" max="200"
hx-post="/api/volume"
hx-trigger="change"
hx-vals='{"volume": this.value}'>
</div>
<!-- Add Song Form -->
<form hx-post="/api/play"
hx-target="#queue-container">
<input name="query" placeholder="YouTube URL or search">
<button type="submit">Add to Queue</button>
</form>
</div>
```
**Why HTMX is perfect here:**
- No JavaScript needed for interactivity
- Server renders everything (simpler)
- Auto-updates with `hx-trigger="every Xs"`
- Progressive enhancement (works without JS)
#### 4.4 API Endpoints
**Read endpoints:**
```python
GET /api/servers # User's servers (with bot)
GET /api/servers/{id}/queue # Current queue
GET /api/servers/{id}/status # Now playing, volume, etc.
```
**Write endpoints:**
```python
POST /api/servers/{id}/play # Add song (body: {query: "..."})
POST /api/servers/{id}/skip # Skip current song
POST /api/servers/{id}/volume # Set volume (body: {volume: 150})
POST /api/servers/{id}/effect # Set effect (body: {effect: "nightcore"})
POST /api/servers/{id}/loop # Set loop mode (body: {mode: "queue"})
POST /api/servers/{id}/shuffle # Shuffle queue
POST /api/servers/{id}/clear # Clear queue
```
**Implementation example:**
```python
# routes/playback.py
@router.post("/api/servers/{server_id}/skip")
async def skip_song(
server_id: int,
user: User = Depends(get_current_user),
db = Depends(get_db)
):
# 1. Check user is in server
if server_id not in user.guild_ids:
raise HTTPException(403, "Not in this server")
# 2. Check permissions (future)
# if not has_permission(user, server_id, "can_skip"):
# raise HTTPException(403, "No permission")
# 3. Write command to database
await db.execute(
"INSERT INTO commands (server_id, action, user_id) VALUES ($1, $2, $3)",
server_id, "skip", user.id
)
# 4. Return updated queue
return await get_queue(server_id, db)
```
#### 4.5 Bot Integration (Command Processing)
**Add to bot:**
```python
# In bot.py or new cogs/web_commands.py
@tasks.loop(seconds=1)
async def process_web_commands(self):
"""Process commands from web dashboard"""
async with self.db_pool.acquire() as conn:
# Fetch unprocessed commands
commands = await conn.fetch(
"SELECT * FROM commands WHERE processed = FALSE"
)
for cmd in commands:
server_id = cmd['server_id']
action = cmd['action']
data = cmd['data']
guild = self.get_guild(server_id)
if not guild or not guild.voice_client:
continue
# Execute command
if action == "skip":
guild.voice_client.stop()
elif action == "volume":
# Set volume in database, next song picks it up
await queue.set_volume(server_id, data['volume'])
elif action == "play":
# Queue song from web
# ... (use existing play logic)
# Mark as processed
await conn.execute(
"UPDATE commands SET processed = TRUE WHERE id = $1",
cmd['id']
)
```
#### 4.6 Permissions System
**Basic implementation:**
```python
# Check if user can control bot
async def can_control_bot(user_id: int, server_id: int, action: str) -> bool:
# Check if user is in voice channel with bot
# Check if user has DJ role (configurable per server)
# Check specific permission for action
# Default: anyone in VC can control
pass
```
**Advanced (Phase 5):**
- Role-based permissions
- User-specific permissions
- Configurable via web dashboard
**Expected outcome:**
- Beautiful, functional web dashboard
- Discord OAuth login
- Real-time queue display
- Full playback control from browser
- Works on mobile
---
### Phase 5: Permissions & Production (4-6 hours)
**Goal:** Production-ready deployment
#### 5.1 Permission System
- DJ role configuration
- Per-server permission settings
- Web UI for permission management
#### 5.2 Rate Limiting
```python
from slowapi import Limiter
limiter = Limiter(key_func=get_remote_address)
@app.post("/api/play")
@limiter.limit("10/minute") # Max 10 songs per minute
async def play_song(...):
...
```
#### 5.3 Logging & Monitoring
```python
import logging
from logging.handlers import RotatingFileHandler
# Setup logging
handler = RotatingFileHandler(
'logs/bot.log',
maxBytes=10_000_000, # 10MB
backupCount=5
)
logging.basicConfig(
handlers=[handler],
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
```
#### 5.4 Systemd Services
```ini
# /etc/systemd/system/groovy-bot.service
[Unit]
Description=Groovy Zilean Discord Bot
After=network.target postgresql.service
[Service]
Type=simple
User=groovy
WorkingDirectory=/home/groovy/groovy-zilean
Environment="PATH=/home/groovy/groovy-zilean/venv/bin"
ExecStart=/home/groovy/groovy-zilean/venv/bin/python main.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
```
```ini
# /etc/systemd/system/groovy-web.service
[Unit]
Description=Groovy Zilean Web Dashboard
After=network.target postgresql.service
[Service]
Type=simple
User=groovy
WorkingDirectory=/home/groovy/groovy-zilean
Environment="PATH=/home/groovy/groovy-zilean/venv/bin"
ExecStart=/home/groovy/groovy-zilean/venv/bin/uvicorn web.main:app --host 0.0.0.0 --port 8000
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
```
#### 5.5 Nginx Configuration
```nginx
server {
listen 80;
server_name groovy.yourdomain.com;
# Redirect to HTTPS
return 301 https://$host$request_uri;
}
server {
listen 443 ssl http2;
server_name groovy.yourdomain.com;
ssl_certificate /etc/letsencrypt/live/groovy.yourdomain.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/groovy.yourdomain.com/privkey.pem;
location / {
proxy_pass http://localhost:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
location /static {
alias /home/groovy/groovy-zilean/web/static;
expires 30d;
}
}
```
#### 5.6 Database Backups
```bash
#!/bin/bash
# scripts/backup_db.sh
BACKUP_DIR="/home/groovy/backups"
DATE=$(date +%Y%m%d_%H%M%S)
pg_dump groovy_zilean > "$BACKUP_DIR/groovy_zilean_$DATE.sql"
# Keep only last 7 days
find $BACKUP_DIR -name "groovy_zilean_*.sql" -mtime +7 -delete
```
Add to crontab:
```bash
0 2 * * * /home/groovy/groovy-zilean/scripts/backup_db.sh
```
#### 5.7 Environment Variables
```bash
# .env (NEVER commit this!)
DISCORD_TOKEN=your_bot_token_here
DISCORD_CLIENT_ID=your_client_id
DISCORD_CLIENT_SECRET=your_client_secret
SPOTIFY_CLIENT_ID=your_spotify_id
SPOTIFY_CLIENT_SECRET=your_spotify_secret
DB_PASSWORD=your_db_password
SECRET_KEY=random_secret_for_sessions
ENVIRONMENT=production
```
**Expected outcome:**
- Production-ready deployment
- Automatic restarts on failure
- HTTPS enabled
- Automated backups
- Proper logging
---
## Python Environment Setup
### Avoiding Python Version Hell
**Step 1: Install Python 3.12**
```bash
# On Debian/Ubuntu
sudo apt update
sudo apt install python3.12 python3.12-venv python3.12-dev
# Verify installation
python3.12 --version
```
**Step 2: Create Virtual Environment**
```bash
cd ~/coding/groovy-zilean
# Create venv (only once)
python3.12 -m venv venv
# Activate (every time you work on project)
source venv/bin/activate
# Your prompt should change to show (venv)
```
**Step 3: Install Dependencies**
```bash
# Make sure venv is activated!
pip install --upgrade pip
pip install -r requirements.txt
```
**Step 4: Add to .gitignore**
```
venv/
__pycache__/
*.pyc
.env
*.db
logs/
```
**Helpful Aliases (add to ~/.bashrc)**
```bash
alias groovy='cd ~/coding/groovy-zilean && source venv/bin/activate'
```
Then just type `groovy` to activate your environment!
---
## Why This Approach?
### Rejected: discord-ext-ipc + Quart
**discord-ext-ipc is unmaintained** (last update 2+ years ago)
❌ Tight coupling between bot and web
❌ Both processes must run together
❌ Quart less popular than FastAPI
❌ Still need polling for real-time updates
### Rejected: FastAPI + React + Redis + WebSocket
**Overkill for solo developer**
❌ Too many moving parts (4+ services)
❌ npm/node_modules complexity
❌ Requires learning React well
❌ WebSocket complexity for minimal gain
### Chosen: FastAPI + PostgreSQL + HTMX ✅
**Production-quality architecture**
✅ Decoupled services (independent restarts)
✅ Modern, well-maintained tools
✅ No frontend build step (initially)
✅ Easy upgrade path to React later
✅ Manageable complexity for solo dev
✅ Database as source of truth
✅ All Python backend
---
## Quick Reference
### Daily Development Workflow
```bash
# Activate environment
cd ~/coding/groovy-zilean
source venv/bin/activate
# Run bot
python main.py
# Run web (in another terminal)
source venv/bin/activate
uvicorn web.main:app --reload --port 8000
# Run tests (future)
pytest
# Database migrations (future)
alembic upgrade head
```
### Project Commands
```bash
# Install new package
pip install package_name
pip freeze > requirements.txt
# Database
psql groovy_zilean # Connect to database
pg_dump groovy_zilean > backup.sql # Backup
# Systemd
sudo systemctl start groovy-bot
sudo systemctl status groovy-bot
sudo journalctl -u groovy-bot -f # View logs
```
### File Locations
- **Bot entry point:** `main.py`
- **Config:** `shared/config.py` + `.env`
- **Database:** PostgreSQL (not file-based)
- **Logs:** `logs/bot.log`, `logs/web.log`
- **Web templates:** `web/templates/`
---
## Success Metrics
By the end of this roadmap, you'll have:
✅ Clean, maintainable codebase
✅ Production-ready database
✅ Web dashboard with Discord auth
✅ Mobile-friendly interface
✅ Automated deployment
✅ Backup system
✅ Proper error handling & logging
✅ Permission system
✅ Rate limiting
✅ No earrape (25% default volume!)
✅ .mp3/.mp4 file support
**Most importantly:** A bot that real servers can use, that you can maintain solo, and that you're proud of!
---
## Next Steps
1. **Today:** Quick wins (volume + file support)
2. **This week:** Refactoring
3. **Next week:** PostgreSQL migration
4. **Week after:** Web dashboard MVP
5. **Final week:** Production deployment
Let's build something awesome! 🎵⏱️

201
SETUP.md
View File

@@ -1,201 +0,0 @@
# Groovy-Zilean Setup Guide
Quick start guide for getting groovy-zilean running locally or in production.
---
## Prerequisites
- **Python 3.11+** (3.9 deprecated by yt-dlp)
- **FFmpeg** installed on your system
- Discord Bot Token ([Discord Developer Portal](https://discord.com/developers/applications))
- Spotify API Credentials ([Spotify Developer Dashboard](https://developer.spotify.com/dashboard))
---
## 1. Clone & Setup Environment
```bash
# Clone the repository
git clone <your-repo-url>
cd groovy-zilean
# Create virtual environment (keeps dependencies isolated)
python3.12 -m venv venv
# Activate virtual environment
source venv/bin/activate # Linux/Mac
# OR
venv\Scripts\activate # Windows
# Install dependencies
pip install --upgrade pip
pip install -r requirements.txt
```
---
## 2. Configuration
### Create `.env` file
```bash
# Copy the example file
cp .env.example .env
# Edit with your favorite editor
nano .env
# OR
vim .env
# OR
code .env # VS Code
```
### Fill in Required Values
At minimum, you need:
```env
# Choose environment: "dev" or "live"
ENVIRONMENT=dev
# Discord bot tokens (get from Discord Developer Portal)
DISCORD_TOKEN_DEV=your_dev_bot_token_here
DISCORD_TOKEN_LIVE=your_live_bot_token_here
# Spotify credentials (get from Spotify Developer Dashboard)
SPOTIFY_CLIENT_ID=your_spotify_client_id
SPOTIFY_CLIENT_SECRET=your_spotify_secret
```
**Optional but recommended:**
```env
# Bot settings
DISCORD_PREFIX==
STATUS_TYPE=listening
STATUS_TEXT=Zilean's Theme
# Colors (hex format)
COLOR_PRIMARY=#7289DA
COLOR_SUCCESS=#43B581
COLOR_ERROR=#F04747
COLOR_WARNING=#FAA61A
```
---
## 3. Create Data Directory
```bash
# Create directory for database
mkdir -p data
# The database file (music.db) will be created automatically on first run
```
---
## 4. Run the Bot
### Development Mode
```bash
# Make sure .env has ENVIRONMENT=dev
source venv/bin/activate # If not already activated
python main.py
```
### Production Mode
```bash
# Change .env to ENVIRONMENT=live
source venv/bin/activate
python main.py
```
---
## 5. Switching Between Dev and Live
**Super easy!** Just change one line in `.env`:
```bash
# For development bot
ENVIRONMENT=dev
# For production bot
ENVIRONMENT=live
```
The bot will automatically use the correct token!
---
## Troubleshooting
### "Configuration Error: DISCORD_TOKEN_DEV not found"
- Make sure you copied `.env.example` to `.env`
- Check that `.env` has the token values filled in
- Token should NOT have quotes around it
### "No module named 'dotenv'"
```bash
pip install python-dotenv
```
### "FFmpeg not found"
```bash
# Debian/Ubuntu
sudo apt install ffmpeg
# macOS
brew install ffmpeg
# Arch Linux
sudo pacman -S ffmpeg
```
### Python version issues
yt-dlp requires Python 3.10+. Check your version:
```bash
python --version
```
If too old, install newer Python and recreate venv:
```bash
python3.12 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```
---
## Project Structure
```
groovy-zilean/
├── main.py # Entry point (run this!)
├── bot.py # Bot class definition
├── config.py # Configuration management
├── .env # YOUR secrets (never commit!)
├── .env.example # Template (safe to commit)
├── requirements.txt # Python dependencies
├── cogs/
│ └── music/ # Music functionality
│ ├── main.py # Commands
│ ├── queue.py # Queue management
│ ├── util.py # Utilities
│ └── translate.py # URL/search handling
└── data/
└── music.db # SQLite database (auto-created)
```
---
## Next Steps
- Check out `PRODUCTION_ROADMAP.md` for the full development plan
- See `README.md` for feature list and usage
- Join your test server and try commands!
**Happy coding!** 🎵⏱️

25
bot.py
View File

@@ -1,30 +1,20 @@
"""
Groovy-Zilean Bot Class
Main bot implementation with cog loading and background tasks
"""
from typing import Any
from discord.ext import commands from discord.ext import commands
from discord.ext import tasks from discord.ext import tasks
from cogs.music.main import music from cogs.music.main import music
from help import GroovyHelp from help import GroovyHelp # Import the new Help Cog
# List of cogs to load on startup cogs = [
cogs: list[type[commands.Cog]] = [
music, music,
GroovyHelp GroovyHelp
] ]
class Groovy(commands.Bot): class Groovy(commands.Bot):
"""Custom bot class with automatic cog loading and inactivity checking""" def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
# We force help_command to None because we are using a custom Cog for it # We force help_command to None because we are using a custom Cog for it
# But we pass all other args (like command_prefix) to the parent # But we pass all other args (like command_prefix) to the parent
super().__init__(*args, help_command=None, **kwargs) super().__init__(*args, help_command=None, **kwargs)
async def on_ready(self) -> None: async def on_ready(self):
import config # Imported here to avoid circular dependencies if any import config # Imported here to avoid circular dependencies if any
# Set status # Set status
@@ -57,12 +47,11 @@ class Groovy(commands.Bot):
print(f"{self.user} is ready and online!") print(f"{self.user} is ready and online!")
@tasks.loop(seconds=30) @tasks.loop(seconds=30)
async def inactivity_checker(self) -> None: async def inactivity_checker(self):
"""Check for inactive voice connections every 30 seconds""" """Check for inactive voice connections"""
from cogs.music import util from cogs.music import util
await util.check_inactivity(self) await util.check_inactivity(self)
@inactivity_checker.before_loop @inactivity_checker.before_loop
async def before_inactivity_checker(self) -> None: async def before_inactivity_checker(self):
"""Wait for bot to be ready before starting inactivity checker"""
await self.wait_until_ready() await self.wait_until_ready()

View File

@@ -1,315 +0,0 @@
"""
Database Manager for Groovy-Zilean
Centralizes all database operations and provides a clean interface.
Makes future PostgreSQL migration much easier.
"""
import sqlite3
from contextlib import contextmanager
from typing import Optional, List, Tuple, Any
import config
class DatabaseManager:
"""Manages database connections and operations"""
def __init__(self):
self.db_path = config.get_db_path()
@contextmanager
def get_connection(self):
"""
Context manager for database connections.
Automatically handles commit/rollback and closing.
Usage:
with db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(...)
"""
conn = sqlite3.connect(self.db_path)
try:
yield conn
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
def initialize_tables(self):
"""Create database tables if they don't exist"""
with self.get_connection() as conn:
cursor = conn.cursor()
# Create servers table
cursor.execute('''CREATE TABLE IF NOT EXISTS servers (
server_id TEXT PRIMARY KEY,
is_playing INTEGER DEFAULT 0,
song_name TEXT,
song_url TEXT,
song_thumbnail TEXT,
loop_mode TEXT DEFAULT 'off',
volume INTEGER DEFAULT 100,
effect TEXT DEFAULT 'none',
song_start_time REAL DEFAULT 0,
song_duration INTEGER DEFAULT 0
);''')
# Set all to not playing on startup
cursor.execute("UPDATE servers SET is_playing = 0;")
# Migrations for existing databases - add columns if missing
migrations = [
("loop_mode", "TEXT DEFAULT 'off'"),
("volume", "INTEGER DEFAULT 100"),
("effect", "TEXT DEFAULT 'none'"),
("song_start_time", "REAL DEFAULT 0"),
("song_duration", "INTEGER DEFAULT 0"),
("song_thumbnail", "TEXT DEFAULT ''"),
("song_url", "TEXT DEFAULT ''")
]
for col_name, col_type in migrations:
try:
cursor.execute(f"ALTER TABLE servers ADD COLUMN {col_name} {col_type};")
except sqlite3.OperationalError:
# Column already exists, skip
pass
# Create songs/queue table
cursor.execute('''CREATE TABLE IF NOT EXISTS songs (
server_id TEXT NOT NULL,
song_link TEXT,
queued_by TEXT,
position INTEGER NOT NULL,
title TEXT,
thumbnail TEXT,
duration INTEGER,
PRIMARY KEY (position),
FOREIGN KEY (server_id) REFERENCES servers(server_id)
);''')
# Clear all songs on startup
cursor.execute("DELETE FROM songs;")
# ===================================
# Server Operations
# ===================================
def ensure_server_exists(self, server_id: str) -> None:
"""Add server to database if it doesn't exist"""
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM servers WHERE server_id = ?', (server_id,))
if cursor.fetchone()[0] == 0:
cursor.execute('''INSERT INTO servers (server_id, loop_mode, volume, effect, song_thumbnail, song_url)
VALUES (?, 'off', 100, 'none', '', '')''', (server_id,))
def set_server_playing(self, server_id: str, playing: bool) -> None:
"""Update server playing status"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
val = 1 if playing else 0
cursor.execute("UPDATE servers SET is_playing = ? WHERE server_id = ?", (val, server_id))
def is_server_playing(self, server_id: str) -> bool:
"""Check if server is currently playing"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT is_playing FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
return True if res and res[0] == 1 else False
def set_current_song(self, server_id: str, title: str, url: str, thumbnail: str = "", duration: int = 0, start_time: float = 0) -> None:
"""Update currently playing song information"""
with self.get_connection() as conn:
cursor = conn.cursor()
# Ensure duration is an integer
try:
duration = int(duration)
except:
duration = 0
cursor.execute(''' UPDATE servers
SET song_name = ?, song_url = ?, song_thumbnail = ?, song_start_time = ?, song_duration = ?
WHERE server_id = ?''',
(title, url, thumbnail, start_time, duration, server_id))
def get_current_song(self, server_id: str) -> dict:
"""Get current song info"""
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(''' SELECT song_name, song_thumbnail, song_url FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
result = cursor.fetchone()
if result:
return {'title': result[0], 'thumbnail': result[1], 'url': result[2]}
return {'title': "Nothing", 'thumbnail': None, 'url': ''}
def get_current_progress(self, server_id: str) -> Tuple[int, int, float]:
"""Get playback progress (elapsed, duration, percentage)"""
import time
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''SELECT song_start_time, song_duration, is_playing FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
result = cursor.fetchone()
if not result or result[2] == 0:
return 0, 0, 0.0
start_time, duration, _ = result
if duration is None or duration == 0:
return 0, 0, 0.0
elapsed = int(time.time() - start_time)
elapsed = min(elapsed, duration)
percentage = (elapsed / duration) * 100 if duration > 0 else 0
return elapsed, duration, percentage
# ===================================
# Queue Operations
# ===================================
def add_song(self, server_id: str, song_link: str, queued_by: str, title: str, thumbnail: str = "", duration: int = 0, position: Optional[int] = None) -> int:
"""Add song to queue, returns position"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
if position is None:
# Add to end
cursor.execute("SELECT MAX(position) FROM songs WHERE server_id = ?", (server_id,))
max_pos = cursor.fetchone()[0]
position = (max_pos + 1) if max_pos is not None else 0
else:
# Insert at specific position (shift others down)
cursor.execute("UPDATE songs SET position = position + 1 WHERE server_id = ? AND position >= ?",
(server_id, position))
cursor.execute("""INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)""",
(server_id, song_link, queued_by, position, title, thumbnail, duration))
return position
def get_next_song(self, server_id: str) -> Optional[Tuple]:
"""Get the next song in queue (doesn't remove it)"""
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''SELECT * FROM songs WHERE server_id = ? ORDER BY position LIMIT 1;''', (server_id,))
return cursor.fetchone()
def remove_song(self, server_id: str, position: int) -> None:
"""Remove song at position from queue"""
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''DELETE FROM songs WHERE server_id = ? AND position = ?''', (server_id, position))
def get_queue(self, server_id: str, limit: int = 10) -> Tuple[int, List[Tuple]]:
"""Get songs in queue (returns max_position, list of songs)"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT title, duration, queued_by FROM songs WHERE server_id = ? ORDER BY position LIMIT ?",
(server_id, limit))
songs = cursor.fetchall()
cursor.execute("SELECT MAX(position) FROM songs WHERE server_id = ?", (server_id,))
max_pos = cursor.fetchone()[0]
max_pos = max_pos if max_pos is not None else -1
return max_pos, songs
def clear_queue(self, server_id: str) -> None:
"""Clear all songs from queue"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
def shuffle_queue(self, server_id: str) -> bool:
"""Shuffle the queue randomly, returns success"""
import random
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT position, song_link, queued_by, title, thumbnail, duration FROM songs WHERE server_id = ? ORDER BY position",
(server_id,))
songs = cursor.fetchall()
if len(songs) <= 1:
return False
random.shuffle(songs)
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
for i, s in enumerate(songs):
cursor.execute("INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)",
(server_id, s[1], s[2], i, s[3], s[4], s[5]))
return True
# ===================================
# Settings Operations
# ===================================
def get_loop_mode(self, server_id: str) -> str:
"""Get loop mode: 'off', 'song', or 'queue'"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT loop_mode FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
return res[0] if res else 'off'
def set_loop_mode(self, server_id: str, mode: str) -> None:
"""Set loop mode: 'off', 'song', or 'queue'"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("UPDATE servers SET loop_mode = ? WHERE server_id = ?", (mode, server_id))
def get_volume(self, server_id: str) -> int:
"""Get volume (0-200)"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT volume FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
return res[0] if res else 100
def set_volume(self, server_id: str, volume: int) -> int:
"""Set volume (0-200), returns the set volume"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("UPDATE servers SET volume = ? WHERE server_id = ?", (volume, server_id))
return volume
def get_effect(self, server_id: str) -> str:
"""Get current audio effect"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT effect FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
return res[0] if res else 'none'
def set_effect(self, server_id: str, effect: str) -> None:
"""Set audio effect"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("UPDATE servers SET effect = ? WHERE server_id = ?", (effect, server_id))
# Global instance
db = DatabaseManager()

View File

@@ -12,7 +12,28 @@ from cogs.music.help import music_help
import spotipy import spotipy
from spotipy.oauth2 import SpotifyClientCredentials from spotipy.oauth2 import SpotifyClientCredentials
import config # Use centralized config
# Fix this pls
import json
#from .. import config
# Read data from JSON file in ./data/config.json
def read_data():
with open("./data/config.json", "r") as file:
return json.load(file)
raise Exception("Could not load config data")
def get_spotify_creds():
data = read_data()
data = data.get("spotify")
SCID = data.get("SCID")
secret = data.get("SECRET")
return SCID, secret
@@ -30,13 +51,10 @@ class music(commands.Cog):
help_command.cog = self help_command.cog = self
self.help_command = help_command self.help_command = help_command
# Get Spotify credentials from centralized config SCID, secret = get_spotify_creds()
spotify_id, spotify_secret = config.get_spotify_creds()
# Authentication - without user # Authentication - without user
client_credentials_manager = SpotifyClientCredentials( client_credentials_manager = SpotifyClientCredentials(client_id=SCID,
client_id=spotify_id, client_secret=secret)
client_secret=spotify_secret
)
self.sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager) self.sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
@@ -328,7 +346,7 @@ class music(commands.Cog):
app_commands.Choice(name="Song", value="song"), app_commands.Choice(name="Song", value="song"),
app_commands.Choice(name="Queue", value="queue") app_commands.Choice(name="Queue", value="queue")
]) ])
async def loop(self, ctx: Context, mode: str | None = None): async def loop(self, ctx: Context, mode: str = None):
"""Toggle between loop modes or set a specific mode""" """Toggle between loop modes or set a specific mode"""
server = ctx.guild server = ctx.guild
@@ -391,7 +409,7 @@ class music(commands.Cog):
description="Set playback volume", description="Set playback volume",
aliases=['vol', 'v']) aliases=['vol', 'v'])
@app_commands.describe(level="Volume level (0-200%, default shows current)") @app_commands.describe(level="Volume level (0-200%, default shows current)")
async def volume(self, ctx: Context, level: int | None = None): async def volume(self, ctx: Context, level: int = None):
"""Set or display the current volume""" """Set or display the current volume"""
server = ctx.guild server = ctx.guild
@@ -435,7 +453,7 @@ class music(commands.Cog):
description="Apply audio effects to playback", description="Apply audio effects to playback",
aliases=['fx', 'filter']) aliases=['fx', 'filter'])
@app_commands.describe(effect_name="The audio effect to apply (leave empty to see list)") @app_commands.describe(effect_name="The audio effect to apply (leave empty to see list)")
async def effect(self, ctx: Context, effect_name: str | None = None): async def effect(self, ctx: Context, effect_name: str = None):
"""Apply or list audio effects""" """Apply or list audio effects"""
server = ctx.guild server = ctx.guild

View File

@@ -1,14 +1,14 @@
""" from http import server
Queue management for Groovy-Zilean music bot import sqlite3
Now using centralized database manager for cleaner code import random
""" import time
import discord import discord
import asyncio import asyncio
import time
from .translate import search_song from .translate import search_song
from .db_manager import db
db_path = "./data/music.db"
# Base FFmpeg options (will be modified by effects) # Base FFmpeg options (will be modified by effects)
BASE_FFMPEG_OPTS = { BASE_FFMPEG_OPTS = {
@@ -103,224 +103,342 @@ def get_effect_options(effect_name):
return effects.get(effect_name, effects['none']) return effects.get(effect_name, effects['none'])
# =================================== # Creates the tables if they don't exist
# Initialization
# ===================================
def initialize_tables(): def initialize_tables():
"""Initialize database tables""" # Connect to the database
db.initialize_tables() conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Create servers table if it doesn't exist
cursor.execute('''CREATE TABLE IF NOT EXISTS servers (
server_id TEXT PRIMARY KEY,
is_playing INTEGER DEFAULT 0,
song_name TEXT,
song_url TEXT,
song_thumbnail TEXT,
loop_mode TEXT DEFAULT 'off',
volume INTEGER DEFAULT 100,
effect TEXT DEFAULT 'none',
song_start_time REAL DEFAULT 0,
song_duration INTEGER DEFAULT 0
);''')
# =================================== # Set all to not playing
# Queue Management cursor.execute("UPDATE servers SET is_playing = 0;")
# ===================================
async def add_song(server_id, details, queued_by, position=None): # Add new columns if they don't exist (for existing databases)
""" # Migrations for existing databases
Add a song to the queue columns = [
("loop_mode", "TEXT DEFAULT 'off'"),
("volume", "INTEGER DEFAULT 100"),
("effect", "TEXT DEFAULT 'none'"),
("song_start_time", "REAL DEFAULT 0"),
("song_duration", "INTEGER DEFAULT 0"),
("song_thumbnail", "TEXT DEFAULT ''"),
("song_url", "TEXT DEFAULT ''") # NEW
]
Args: for col_name, col_type in columns:
server_id: Discord server ID try:
details: Dictionary with song info (url, title, thumbnail, duration) or string cursor.execute(f"ALTER TABLE servers ADD COLUMN {col_name} {col_type};")
queued_by: Username who queued the song except sqlite3.OperationalError:
position: Optional position in queue (None = end of queue) pass
cursor.execute('''CREATE TABLE IF NOT EXISTS songs (
server_id TEXT NOT NULL,
song_link TEXT,
queued_by TEXT,
position INTEGER NOT NULL,
title TEXT,
thumbnail TEXT,
duration INTEGER,
PRIMARY KEY (position),
FOREIGN KEY (server_id) REFERENCES servers(server_id)
);''')
cursor.execute("DELETE FROM songs;")
conn.commit()
conn.close()
# Queue a song in the db
async def add_song(server_id, details, queued_by):
# Connect to db
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
max_order_num = await get_max(server_id, cursor) + 1
Returns:
Position in queue
"""
if isinstance(details, str): if isinstance(details, str):
# Fallback for raw strings (legacy support) # Fallback for raw strings
pos = db.add_song( cursor.execute("""INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)""",
server_id=str(server_id), (server_id, "Not grabbed", queued_by, max_order_num, details, "Unknown", 0))
song_link="Not grabbed",
queued_by=queued_by,
title=details,
thumbnail="Unknown",
duration=0,
position=position
)
else: else:
# Standard dictionary format # Save exact duration and thumbnail from the start
pos = db.add_song( cursor.execute("""INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)""",
server_id=str(server_id), (server_id, details['url'], queued_by, max_order_num, details['title'], details['thumbnail'], details['duration']))
song_link=details['url'],
queued_by=queued_by,
title=details['title'],
thumbnail=details.get('thumbnail', ''),
duration=details.get('duration', 0),
position=position
)
return pos conn.commit()
conn.close()
return max_order_num
# Pop song from server (respects loop mode)
async def pop(server_id, ignore=False, skip_mode=False): async def pop(server_id, ignore=False, skip_mode=False):
""" """
Pop next song from queue Pop next song from queue
Args:
server_id: Discord server ID
ignore: Skip the song without returning URL ignore: Skip the song without returning URL
skip_mode: True when called from skip command (affects loop song behavior) skip_mode: True when called from skip command (affects loop song behavior)
Returns:
Song URL or None
""" """
result = db.get_next_song(str(server_id)) # Connect to db
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# JUST INCASE!
await add_server(server_id, cursor, conn)
# Fetch info: link(1), title(4), thumbnail(5), duration(6)
cursor.execute('''SELECT * FROM songs WHERE server_id = ? ORDER BY position LIMIT 1;''', (server_id,))
result = cursor.fetchone()
conn.commit()
conn.close()
if result is None: if result is None:
return None return None
elif ignore:
# result format: (server_id, song_link, queued_by, position, title, thumbnail, duration) await mark_song_as_finished(server_id, result[3])
server_id_str, song_link, queued_by, position, title, thumbnail, duration = result
if ignore:
db.remove_song(str(server_id), position)
return None return None
elif result[1] == "Not grabbed":
# Handle lazy-loaded songs (not yet fetched from YouTube) # Lazy load logic
if song_link == "Not grabbed": song_list = await search_song(result[4])
song_list = await search_song(title)
if not song_list: if not song_list:
db.remove_song(str(server_id), position)
return None return None
song = song_list[0] song = song_list[0]
await set_current_song(
server_id, await set_current_song(server_id, song['title'], song.get('thumbnail', ''), song.get('duration', 0))
song['title'],
song['url'],
song.get('thumbnail', ''),
song.get('duration', 0)
)
# Check loop mode before removing # Check loop mode before removing
loop_mode = await get_loop_mode(server_id) loop_mode = await get_loop_mode(server_id)
if loop_mode != 'song': if loop_mode != 'song': # Only remove if not looping song
db.remove_song(str(server_id), position) await mark_song_as_finished(server_id, result[3])
return song['url'] return song['url']
# Standard pre-fetched song # Pre-grabbed logic (Standard)
await set_current_song(server_id, title, song_link, thumbnail, duration) # result[1] is url, result[5] is thumbnail, result[6] is duration
await set_current_song(server_id, result[4], result[1], result[5], result[6])
# Check loop mode before removing # Check loop mode before removing
loop_mode = await get_loop_mode(server_id) loop_mode = await get_loop_mode(server_id)
if loop_mode != 'song': if loop_mode != 'song': # Only remove if not looping song
db.remove_song(str(server_id), position) await mark_song_as_finished(server_id, result[3])
return song_link return result[1]
# Add server to db if first time queuing
async def add_server(server_id, cursor, conn):
cursor.execute('SELECT COUNT(*) FROM servers WHERE server_id = ?', (server_id,))
if cursor.fetchone()[0] == 0:
cursor.execute('''INSERT INTO servers (server_id, loop_mode, volume, effect, song_thumbnail, song_url)
VALUES (?, 'off', 100, 'none', '', '')''', (server_id,))
conn.commit()
async def grab_songs(server_id): # set song as played and update indexes
""" async def mark_song_as_finished(server_id, order_num):
Get current queue # Connect to the database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
Returns: # Update the song as finished
Tuple of (max_position, list_of_songs) cursor.execute('''DELETE FROM songs
""" WHERE server_id = ? AND position = ?''',
return db.get_queue(str(server_id), limit=10) (server_id, order_num))
# Close connection
async def clear(server_id): conn.commit()
"""Clear the queue for a server""" conn.close()
db.clear_queue(str(server_id))
await update_server(server_id, False)
async def shuffle_queue(server_id):
"""Shuffle the queue randomly"""
return db.shuffle_queue(str(server_id))
# ===================================
# Server State Management
# ===================================
async def update_server(server_id, playing):
"""Update server playing status"""
db.set_server_playing(str(server_id), playing)
async def is_server_playing(server_id):
"""Check if server is currently playing"""
return db.is_server_playing(str(server_id))
# set the current playing song of the server
async def set_current_song(server_id, title, url, thumbnail="", duration=0): async def set_current_song(server_id, title, url, thumbnail="", duration=0):
"""Set the currently playing song""" conn = sqlite3.connect(db_path)
db.set_current_song( cursor = conn.cursor()
str(server_id), start_time = time.time()
title,
url,
thumbnail,
duration,
time.time() # start_time
)
# Ensure duration is an integer
try:
duration = int(duration)
except:
duration = 0
cursor.execute(''' UPDATE servers
SET song_name = ?, song_url = ?, song_thumbnail = ?, song_start_time = ?, song_duration = ?
WHERE server_id = ?''',
(title, url, thumbnail, start_time, duration, server_id))
conn.commit()
conn.close()
# Returns dictionary with title and thumbnail
async def get_current_song(server_id): async def get_current_song(server_id):
"""Get current song info""" conn = sqlite3.connect(db_path)
return db.get_current_song(str(server_id)) cursor = conn.cursor()
cursor.execute(''' SELECT song_name, song_thumbnail, song_url FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
result = cursor.fetchone()
conn.commit()
conn.close()
if result:
return {'title': result[0], 'thumbnail': result[1], 'url': result[2]}
return {'title': "Nothing", 'thumbnail': None, 'url': ''}
async def get_current_progress(server_id): async def get_current_progress(server_id):
"""Get playback progress (elapsed, duration, percentage)""" conn = sqlite3.connect(db_path)
return db.get_current_progress(str(server_id)) cursor = conn.cursor()
cursor.execute('''SELECT song_start_time, song_duration, is_playing FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
result = cursor.fetchone()
conn.close() # Close quickly
if not result or result[2] == 0:
return 0, 0, 0.0
# =================================== start_time, duration, _ = result
# Settings Management
# ===================================
if duration is None or duration == 0:
return 0, 0, 0.0
elapsed = int(time.time() - start_time)
elapsed = min(elapsed, duration)
percentage = (elapsed / duration) * 100 if duration > 0 else 0
return elapsed, duration, percentage
async def get_max(server_id, cursor):
cursor.execute("SELECT MAX(position) FROM songs WHERE server_id = ?", (server_id,))
result = cursor.fetchone()
return result[0] if result[0] is not None else -1
async def update_server(server_id, playing):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
val = 1 if playing else 0
cursor.execute("UPDATE servers SET is_playing = ? WHERE server_id = ?", (val, server_id))
conn.commit()
conn.close()
async def is_server_playing(server_id):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("SELECT is_playing FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
conn.close()
return True if res[0] == 1 else False
async def clear(server_id):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
await update_server(server_id, False)
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
conn.commit()
conn.close()
async def grab_songs(server_id):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("SELECT title, duration, queued_by FROM songs WHERE server_id = ? ORDER BY position LIMIT 10", (server_id,))
songs = cursor.fetchall()
max_pos = await get_max(server_id, cursor)
conn.close()
return max_pos, songs
# --- Effects/Loop/Shuffle/Volume (Simplified Paste) ---
async def get_loop_mode(server_id): async def get_loop_mode(server_id):
"""Get loop mode: 'off', 'song', or 'queue'""" conn = sqlite3.connect(db_path)
return db.get_loop_mode(str(server_id)) cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("SELECT loop_mode FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
conn.close()
return res[0] if res else 'off'
async def set_loop_mode(server_id, mode): async def set_loop_mode(server_id, mode):
"""Set loop mode: 'off', 'song', or 'queue'""" conn = sqlite3.connect(db_path)
db.set_loop_mode(str(server_id), mode) cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("UPDATE servers SET loop_mode = ? WHERE server_id = ?", (mode, server_id))
conn.commit()
conn.close()
async def get_volume(server_id): async def get_volume(server_id):
"""Get volume (0-200)""" conn = sqlite3.connect(db_path)
return db.get_volume(str(server_id)) cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("SELECT volume FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
conn.close()
return res[0] if res else 100
async def set_volume(server_id, vol): async def set_volume(server_id, vol):
"""Set volume (0-200)""" conn = sqlite3.connect(db_path)
return db.set_volume(str(server_id), vol) cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("UPDATE servers SET volume = ? WHERE server_id = ?", (vol, server_id))
conn.commit()
conn.close()
return vol
async def shuffle_queue(server_id):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("SELECT position, song_link, queued_by, title, thumbnail, duration FROM songs WHERE server_id = ? ORDER BY position", (server_id,))
songs = cursor.fetchall()
if len(songs) <= 1:
conn.close()
return False
random.shuffle(songs)
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
for i, s in enumerate(songs):
cursor.execute("INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)", (server_id, s[1], s[2], i, s[3], s[4], s[5]))
conn.commit()
conn.close()
return True
async def get_effect(server_id): async def get_effect(server_id):
"""Get current audio effect""" conn = sqlite3.connect(db_path)
return db.get_effect(str(server_id)) cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("SELECT effect FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
conn.close()
return res[0] if res else 'none'
async def set_effect(server_id, fx): async def set_effect(server_id, fx):
"""Set audio effect""" conn = sqlite3.connect(db_path)
db.set_effect(str(server_id), fx) cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("UPDATE servers SET effect = ? WHERE server_id = ?", (fx, server_id))
# =================================== conn.commit()
# Effect Metadata conn.close()
# ===================================
def list_all_effects(): def list_all_effects():
"""List all available audio effects"""
return [ return [
'none', 'bassboost', 'nightcore', 'slowed', 'earrape', 'deepfry', 'distortion', 'none', 'bassboost', 'nightcore', 'slowed', 'earrape', 'deepfry', 'distortion',
'reverse', 'chipmunk', 'demonic', 'underwater', 'robot', 'reverse', 'chipmunk', 'demonic', 'underwater', 'robot',
'8d', 'vibrato', 'tremolo', 'echo', 'phone', 'megaphone' '8d', 'vibrato', 'tremolo', 'echo', 'phone', 'megaphone'
] ]
def get_effect_emoji(effect_name): def get_effect_emoji(effect_name):
"""Get emoji for effect""" # Short list of emoji mappings
emojis = { emojis = {
'none': '', 'none': '', # Changed to generic Sparkles
'bassboost': '💥', 'bassboost': '💥',
'nightcore': '', 'nightcore': '',
'slowed': '🐢', 'slowed': '🐢',
@@ -341,9 +459,7 @@ def get_effect_emoji(effect_name):
} }
return emojis.get(effect_name, '') return emojis.get(effect_name, '')
def get_effect_description(effect_name): def get_effect_description(effect_name):
"""Get description for effect"""
descriptions = { descriptions = {
'none': 'Normal audio', 'none': 'Normal audio',
'bassboost': 'MAXIMUM BASS 🔊', 'bassboost': 'MAXIMUM BASS 🔊',
@@ -366,60 +482,43 @@ def get_effect_description(effect_name):
} }
return descriptions.get(effect_name, 'Unknown effect') return descriptions.get(effect_name, 'Unknown effect')
# ===================================
# Playback
# ===================================
async def play(ctx): async def play(ctx):
"""
Main playback loop - plays songs from queue
"""
server_id = ctx.guild.id server_id = ctx.guild.id
voice_client = ctx.voice_client voice_client = ctx.voice_client
if voice_client is None: if voice_client is None:
await update_server(server_id, False) await update_server(server_id, False)
return return
# Wait for current song to finish
while voice_client.is_playing(): while voice_client.is_playing():
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
# Get next song
url = await pop(server_id) url = await pop(server_id)
if url is None: if url is None:
await update_server(server_id, False) await update_server(server_id, False)
return return
try: try:
# Get volume and effect settings # Scale volume down to prevent earrape
vol = await get_volume(server_id) / 100.0 * 0.25 # Scale down by 0.25 # User sees 0-200%, but internally we scale by 0.25
# So user's 100% = 0.25 actual volume (25%)
vol = await get_volume(server_id) / 100.0 * 0.25
fx = await get_effect(server_id) fx = await get_effect(server_id)
opts = get_effect_options(fx) opts = get_effect_options(fx)
# Create audio source
src = discord.FFmpegPCMAudio(url, **opts) src = discord.FFmpegPCMAudio(url, **opts)
src = discord.PCMVolumeTransformer(src, volume=vol) src = discord.PCMVolumeTransformer(src, volume=vol)
# After callback - play next song
def after(e): def after(e):
if e: if e: print(e)
print(f"Playback error: {e}") if voice_client and not voice_client.is_connected(): return
if voice_client and not voice_client.is_connected():
return
# Schedule next song
coro = play(ctx) coro = play(ctx)
fut = asyncio.run_coroutine_threadsafe(coro, ctx.bot.loop) fut = asyncio.run_coroutine_threadsafe(coro, ctx.bot.loop)
try: try: fut.result()
fut.result() except: pass
except Exception as ex:
print(f"Error in after callback: {ex}")
voice_client.play(src, after=after) voice_client.play(src, after=after)
except Exception as e: except Exception as e:
print(f"Play error: {e}") print(f"Play error: {e}")
# Try next song on error
await play(ctx) await play(ctx)

View File

@@ -1,9 +1,5 @@
""" # Handles translating urls and search terms
URL and search query handling for Groovy-Zilean
Translates YouTube, Spotify, SoundCloud URLs and search queries into playable audio
"""
from typing import Any
import yt_dlp as ytdlp import yt_dlp as ytdlp
import spotipy import spotipy
@@ -26,7 +22,7 @@ ydl_opts = {
}, },
} }
async def main(url: str, sp: spotipy.Spotify) -> list[dict[str, Any] | str]: async def main(url, sp):
#url = url.lower() #url = url.lower()
@@ -64,7 +60,7 @@ async def main(url: str, sp: spotipy.Spotify) -> list[dict[str, Any] | str]:
return [] return []
async def search_song(search: str) -> list[dict[str, Any]]: async def search_song(search):
with ytdlp.YoutubeDL(ydl_opts) as ydl: with ytdlp.YoutubeDL(ydl_opts) as ydl:
try: try:
info = ydl.extract_info(f"ytsearch1:{search}", download=False) info = ydl.extract_info(f"ytsearch1:{search}", download=False)
@@ -93,7 +89,7 @@ async def search_song(search: str) -> list[dict[str, Any]]:
return [data] return [data]
async def spotify_song(url: str, sp: spotipy.Spotify) -> list[dict[str, Any]]: async def spotify_song(url, sp):
track = sp.track(url.split("/")[-1].split("?")[0]) track = sp.track(url.split("/")[-1].split("?")[0])
search = "" search = ""
@@ -110,11 +106,7 @@ async def spotify_song(url: str, sp: spotipy.Spotify) -> list[dict[str, Any]]:
return await search_song(query) return await search_song(query)
async def spotify_playlist(url: str, sp: spotipy.Spotify) -> list[str | dict[str, Any]]: async def spotify_playlist(url, sp):
"""
Get songs from a Spotify playlist
Returns a mixed list where first item is dict, rest are search strings
"""
# Get the playlist uri code # Get the playlist uri code
code = url.split("/")[-1].split("?")[0] code = url.split("/")[-1].split("?")[0]
@@ -124,35 +116,41 @@ async def spotify_playlist(url: str, sp: spotipy.Spotify) -> list[str | dict[str
except spotipy.exceptions.SpotifyException: except spotipy.exceptions.SpotifyException:
return [] return []
# Go through the tracks and build search queries # Go through the tracks
songs: list[str | dict[str, Any]] = [] # Explicit type for mypy songs = []
for track in results: for track in results:
search = "" search = ""
# Fetch all artists # Fetch all artists
for artist in track['track']['artists']: for artist in track['track']['artists']:
# Add all artists to search # Add all artists to search
search += f"{artist['name']}, " search += f"{artist['name']}, "
# Remove last comma # Remove last column
search = search[:-2] search = search[:-2]
search += f" - {track['track']['name']}" search += f" - {track['track']['name']}"
songs.append(search) songs.append(search)
# Fetch first song's full data #searched_result = search_song(search)
#if searched_result == []:
#continue
#songs.append(searched_result[0])
while True: while True:
search_result = await search_song(songs[0]) # type: ignore search_result = await search_song(songs[0])
if search_result == []: if search_result == []:
songs.pop(0) songs.pop(0)
continue continue
else: else:
songs[0] = search_result[0] # Replace string with dict songs[0] = search_result[0]
break break
return songs return songs
async def song_download(url: str) -> list[dict[str, Any]]: async def song_download(url):
with ytdlp.YoutubeDL(ydl_opts) as ydl: with ytdlp.YoutubeDL(ydl_opts) as ydl:
try: try:
info = ydl.extract_info(url, download=False) info = ydl.extract_info(url, download=False)
@@ -182,7 +180,7 @@ async def song_download(url: str) -> list[dict[str, Any]]:
return [data] return [data]
async def playlist_download(url: str) -> list[dict[str, Any]]: async def playlist_download(url):
with ytdlp.YoutubeDL(ydl_opts) as ydl: with ytdlp.YoutubeDL(ydl_opts) as ydl:
try: try:
info = ydl.extract_info(url, download=False) info = ydl.extract_info(url, download=False)

View File

@@ -1,9 +1,3 @@
"""
Utility functions for Groovy-Zilean music bot
Handles voice channel operations, queue display, and inactivity tracking
"""
from typing import Any
import discord import discord
from discord.ext.commands.context import Context from discord.ext.commands.context import Context
from discord.ext.commands.converter import CommandError from discord.ext.commands.converter import CommandError
@@ -13,29 +7,15 @@ from . import queue
import asyncio import asyncio
# Track last activity time for each server # Track last activity time for each server
last_activity: dict[int, float] = {} last_activity = {}
# Joining/moving to the user's vc in a guild
async def join_vc(ctx: Context):
# ===================================
# Voice Channel Management
# ===================================
async def join_vc(ctx: Context) -> discord.VoiceClient:
"""
Join or move to the user's voice channel
Args:
ctx: Command context
Returns:
The voice client connection
Raises:
CommandError: If user is not in a voice channel
"""
# Get the user's vc # Get the user's vc
author_voice = getattr(ctx.author, "voice") author_voice = getattr(ctx.author, "voice")
if author_voice is None: if author_voice is None:
# Raise exception if user is not in vc
raise CommandError("User is not in voice channel") raise CommandError("User is not in voice channel")
# Get user's vc # Get user's vc
@@ -45,26 +25,19 @@ async def join_vc(ctx: Context) -> discord.VoiceClient:
# Join or move to the user's vc # Join or move to the user's vc
if ctx.voice_client is None: if ctx.voice_client is None:
vc_client = await vc.connect() vc = await vc.connect()
else: else:
vc_client = await ctx.voice_client.move_to(vc) # Safe to ignore type error for now
vc = await ctx.voice_client.move_to(vc)
# Update last activity # Update last activity
last_activity[ctx.guild.id] = asyncio.get_event_loop().time() last_activity[ctx.guild.id] = asyncio.get_event_loop().time()
return vc_client return vc
async def leave_vc(ctx: Context) -> None: # Leaving the voice channel of a user
""" async def leave_vc(ctx: Context):
Leave the voice channel and clean up
Args:
ctx: Command context
Raises:
CommandError: If bot is not in VC or user is not in same VC
"""
# If the bot is not in a vc of this server # If the bot is not in a vc of this server
if ctx.voice_client is None: if ctx.voice_client is None:
raise CommandError("I am not in a voice channel") raise CommandError("I am not in a voice channel")
@@ -100,18 +73,9 @@ async def leave_vc(ctx: Context) -> None:
del last_activity[ctx.guild.id] del last_activity[ctx.guild.id]
# =================================== # Auto-disconnect if inactive
# Inactivity Management async def check_inactivity(bot):
# =================================== """Background task to check for inactive voice connections"""
async def check_inactivity(bot: discord.Client) -> None:
"""
Background task to check for inactive voice connections
Auto-disconnects after 5 minutes of inactivity
Args:
bot: The Discord bot instance
"""
try: try:
current_time = asyncio.get_event_loop().time() current_time = asyncio.get_event_loop().time()
@@ -134,34 +98,20 @@ async def check_inactivity(bot: discord.Client) -> None:
print(f"Error in inactivity checker: {e}") print(f"Error in inactivity checker: {e}")
def update_activity(guild_id: int) -> None: # Update activity timestamp when playing
""" def update_activity(guild_id):
Update activity timestamp when a song starts playing """Call this when a song starts playing"""
Args:
guild_id: Discord guild/server ID
"""
last_activity[guild_id] = asyncio.get_event_loop().time() last_activity[guild_id] = asyncio.get_event_loop().time()
# =================================== # Interactive buttons for queue control
# Queue Display & Controls
# ===================================
class QueueControls(View): class QueueControls(View):
"""Interactive buttons for queue control""" def __init__(self, ctx):
def __init__(self, ctx: Context) -> None:
super().__init__(timeout=None) # No timeout allows buttons to stay active longer super().__init__(timeout=None) # No timeout allows buttons to stay active longer
self.ctx = ctx self.ctx = ctx
async def refresh_message(self, interaction: discord.Interaction) -> None: async def refresh_message(self, interaction: discord.Interaction):
""" """Helper to regenerate the embed and edit the message"""
Helper to regenerate the embed and edit the message
Args:
interaction: Discord interaction from button press
"""
try: try:
# Generate new embed # Generate new embed
embed, view = await generate_queue_ui(self.ctx) embed, view = await generate_queue_ui(self.ctx)
@@ -169,13 +119,10 @@ class QueueControls(View):
except Exception as e: except Exception as e:
# Fallback if edit fails # Fallback if edit fails
if not interaction.response.is_done(): if not interaction.response.is_done():
await interaction.response.send_message( await interaction.response.send_message("Refreshed, but something went wrong updating the display.", ephemeral=True)
"Refreshed, but something went wrong updating the display.",
ephemeral=True
)
@discord.ui.button(label="⏭️ Skip", style=discord.ButtonStyle.primary) @discord.ui.button(label="⏭️ Skip", style=discord.ButtonStyle.primary)
async def skip_button(self, interaction: discord.Interaction, button: Button) -> None: async def skip_button(self, interaction: discord.Interaction, button: Button):
if interaction.user not in self.ctx.voice_client.channel.members: if interaction.user not in self.ctx.voice_client.channel.members:
await interaction.response.send_message("❌ You must be in the voice channel!", ephemeral=True) await interaction.response.send_message("❌ You must be in the voice channel!", ephemeral=True)
return return
@@ -183,6 +130,11 @@ class QueueControls(View):
# Loop logic check # Loop logic check
loop_mode = await queue.get_loop_mode(self.ctx.guild.id) loop_mode = await queue.get_loop_mode(self.ctx.guild.id)
# Logic mimics the command
if loop_mode == 'song':
# Just restart current song effectively but here we assume standard skip behavior for button
pass
# Perform the skip # Perform the skip
await queue.pop(self.ctx.guild.id, True, skip_mode=True) await queue.pop(self.ctx.guild.id, True, skip_mode=True)
if self.ctx.voice_client: if self.ctx.voice_client:
@@ -192,45 +144,35 @@ class QueueControls(View):
await self.refresh_message(interaction) await self.refresh_message(interaction)
@discord.ui.button(label="🔀 Shuffle", style=discord.ButtonStyle.secondary) @discord.ui.button(label="🔀 Shuffle", style=discord.ButtonStyle.secondary)
async def shuffle_button(self, interaction: discord.Interaction, button: Button) -> None: async def shuffle_button(self, interaction: discord.Interaction, button: Button):
await queue.shuffle_queue(self.ctx.guild.id) await queue.shuffle_queue(self.ctx.guild.id)
await self.refresh_message(interaction) await self.refresh_message(interaction)
@discord.ui.button(label="🔁 Loop", style=discord.ButtonStyle.secondary) @discord.ui.button(label="🔁 Loop", style=discord.ButtonStyle.secondary)
async def loop_button(self, interaction: discord.Interaction, button: Button) -> None: async def loop_button(self, interaction: discord.Interaction, button: Button):
current_mode = await queue.get_loop_mode(self.ctx.guild.id) current_mode = await queue.get_loop_mode(self.ctx.guild.id)
new_mode = 'song' if current_mode == 'off' else ('queue' if current_mode == 'song' else 'off') new_mode = 'song' if current_mode == 'off' else ('queue' if current_mode == 'song' else 'off')
await queue.set_loop_mode(self.ctx.guild.id, new_mode) await queue.set_loop_mode(self.ctx.guild.id, new_mode)
await self.refresh_message(interaction) await self.refresh_message(interaction)
@discord.ui.button(label="🗑️ Clear", style=discord.ButtonStyle.danger) @discord.ui.button(label="🗑️ Clear", style=discord.ButtonStyle.danger)
async def clear_button(self, interaction: discord.Interaction, button: Button) -> None: async def clear_button(self, interaction: discord.Interaction, button: Button):
await queue.clear(self.ctx.guild.id) await queue.clear(self.ctx.guild.id)
if self.ctx.voice_client and self.ctx.voice_client.is_playing(): if self.ctx.voice_client and self.ctx.voice_client.is_playing():
self.ctx.voice_client.stop() self.ctx.voice_client.stop()
await self.refresh_message(interaction) await self.refresh_message(interaction)
@discord.ui.button(label="🔄 Refresh", style=discord.ButtonStyle.gray) @discord.ui.button(label="🔄 Refresh", style=discord.ButtonStyle.gray)
async def refresh_button(self, interaction: discord.Interaction, button: Button) -> None: async def refresh_button(self, interaction: discord.Interaction, button: Button):
await self.refresh_message(interaction) await self.refresh_message(interaction)
async def generate_queue_ui(ctx: Context):
async def generate_queue_ui(ctx: Context) -> tuple[discord.Embed, QueueControls]:
"""
Generate the queue embed and controls
Args:
ctx: Command context
Returns:
Tuple of (embed, view) for displaying queue
"""
guild_id = ctx.guild.id guild_id = ctx.guild.id
server = ctx.guild server = ctx.guild
# Fetch all data # Fetch all data
n, songs = await queue.grab_songs(guild_id) n, songs = await queue.grab_songs(guild_id)
current = await queue.get_current_song(guild_id) current = await queue.get_current_song(guild_id) # Returns title, thumbnail, url
loop_mode = await queue.get_loop_mode(guild_id) loop_mode = await queue.get_loop_mode(guild_id)
volume = await queue.get_volume(guild_id) volume = await queue.get_volume(guild_id)
effect = await queue.get_effect(guild_id) effect = await queue.get_effect(guild_id)
@@ -255,9 +197,11 @@ async def generate_queue_ui(ctx: Context) -> tuple[discord.Embed, QueueControls]
# Progress Bar Logic # Progress Bar Logic
progress_bar = "" progress_bar = ""
# Only show bar if duration > 0 (prevents weird 00:00 bars)
if duration > 0: if duration > 0:
bar_length = 16 bar_length = 16
filled = int((percentage / 100) * bar_length) filled = int((percentage / 100) * bar_length)
# Ensure filled isn't bigger than length
filled = min(filled, bar_length) filled = min(filled, bar_length)
bar_str = '' * filled + '🔘' + '' * (bar_length - filled) bar_str = '' * filled + '🔘' + '' * (bar_length - filled)
progress_bar = f"\n`{format_time(elapsed)}` {bar_str} `{format_time(duration)}`" progress_bar = f"\n`{format_time(elapsed)}` {bar_str} `{format_time(duration)}`"
@@ -271,11 +215,14 @@ async def generate_queue_ui(ctx: Context) -> tuple[discord.Embed, QueueControls]
description = "## 💤 Nothing is playing\nUse `/play` to start the party!" description = "## 💤 Nothing is playing\nUse `/play` to start the party!"
else: else:
# Create Hyperlink [Title](URL) # Create Hyperlink [Title](URL)
# If no URL exists, link to Discord homepage as fallback or just bold
if url and url.startswith("http"): if url and url.startswith("http"):
song_link = f"[{title}]({url})" song_link = f"[{title}]({url})"
else: else:
song_link = f"**{title}**" song_link = f"**{title}**"
# CLEARER STATUS LINE:
# Loop: Mode | Effect: Name | Vol: %
description = ( description = (
f"## 💿 Now Playing\n" f"## 💿 Now Playing\n"
f"### {song_link}\n" f"### {song_link}\n"
@@ -294,7 +241,7 @@ async def generate_queue_ui(ctx: Context) -> tuple[discord.Embed, QueueControls]
embed.add_field(name="⏳ Up Next", value=queue_text, inline=False) embed.add_field(name="⏳ Up Next", value=queue_text, inline=False)
remaining = n - 9 remaining = (n) - 9 # Approx calculation based on your grabbing logic
if remaining > 0: if remaining > 0:
embed.set_footer(text=f"Waitlist: {remaining} more songs...") embed.set_footer(text=f"Waitlist: {remaining} more songs...")
else: else:
@@ -304,38 +251,23 @@ async def generate_queue_ui(ctx: Context) -> tuple[discord.Embed, QueueControls]
if thumb and isinstance(thumb, str) and thumb.startswith("http"): if thumb and isinstance(thumb, str) and thumb.startswith("http"):
embed.set_thumbnail(url=thumb) embed.set_thumbnail(url=thumb)
elif server.icon: elif server.icon:
# Fallback to server icon
embed.set_thumbnail(url=server.icon.url) embed.set_thumbnail(url=server.icon.url)
view = QueueControls(ctx) view = QueueControls(ctx)
return embed, view return embed, view
# The command entry point calls this
async def display_server_queue(ctx: Context, songs: list, n: int) -> None: async def display_server_queue(ctx: Context, songs, n):
"""
Display the server's queue with interactive controls
Args:
ctx: Command context
songs: List of songs in queue
n: Total number of songs
"""
embed, view = await generate_queue_ui(ctx) embed, view = await generate_queue_ui(ctx)
await ctx.send(embed=embed, view=view) await ctx.send(embed=embed, view=view)
# Build a display message for queuing a new song
async def queue_message(ctx: Context, data: dict[str, Any]) -> None: async def queue_message(ctx: Context, data: dict):
"""
Display a message when a song is queued
Args:
ctx: Command context
data: Song data dictionary
"""
msg = discord.Embed( msg = discord.Embed(
title="🎵 Song Queued", title="🎵 Song Queued",
description=f"**{data['title']}**", description=f"**{data['title']}**",
color=discord.Color.green() color=discord.Color.green())
)
msg.set_thumbnail(url=data['thumbnail']) msg.set_thumbnail(url=data['thumbnail'])
msg.add_field(name="⏱️ Duration", value=format_time(data['duration']), inline=True) msg.add_field(name="⏱️ Duration", value=format_time(data['duration']), inline=True)
@@ -344,23 +276,9 @@ async def queue_message(ctx: Context, data: dict[str, Any]) -> None:
await ctx.send(embed=msg) await ctx.send(embed=msg)
# Converts seconds into more readable format
# =================================== def format_time(seconds):
# Utility Functions
# ===================================
def format_time(seconds: int | float) -> str:
"""
Convert seconds into readable time format (MM:SS or HH:MM:SS)
Args:
seconds: Time in seconds
Returns:
Formatted time string
"""
try: try:
seconds = int(seconds)
minutes, seconds = divmod(seconds, 60) minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60) hours, minutes = divmod(minutes, 60)

255
config.py
View File

@@ -1,198 +1,115 @@
# config.py # config.py
# Modern configuration management using environment variables # This file should parse all configurations within the bot
import os
import discord import discord
from discord import Color from discord import Color
from dotenv import load_dotenv import json
from typing import Optional
# Load environment variables from .env file # Read data from JSON file in ./data/config.json
load_dotenv() def read_data():
with open("./data/config.json", "r") as file:
return json.load(file)
# =================================== raise Exception("Could not load config data")
# Environment Detection
# ===================================
ENVIRONMENT = os.getenv("ENVIRONMENT", "dev").lower()
IS_PRODUCTION = ENVIRONMENT == "live"
IS_DEVELOPMENT = ENVIRONMENT == "dev"
# ===================================
# Discord Configuration
# ===================================
def get_discord_token() -> str:
"""Get the appropriate Discord token based on environment"""
if IS_PRODUCTION:
token = os.getenv("DISCORD_TOKEN_LIVE")
if not token:
raise ValueError("DISCORD_TOKEN_LIVE not found in environment!")
return token
else:
token = os.getenv("DISCORD_TOKEN_DEV")
if not token:
raise ValueError("DISCORD_TOKEN_DEV not found in environment!")
return token
def get_prefix() -> str: def get_spotify_creds():
"""Get command prefix (default: =)""" data = read_data()
return os.getenv("DISCORD_PREFIX", "=") data = data.get("spotify")
SCID = data.get("SCID")
secret = data.get("SECRET")
return SCID, secret
# =================================== # Reading prefix
# Spotify Configuration def get_prefix():
# =================================== data = read_data()
def get_spotify_creds() -> tuple[str, str]:
"""Get Spotify API credentials"""
client_id = os.getenv("SPOTIFY_CLIENT_ID")
client_secret = os.getenv("SPOTIFY_CLIENT_SECRET")
if not client_id or not client_secret: prefix = data.get('prefix')
raise ValueError("Spotify credentials not found in environment!") if prefix:
return prefix
return client_id, client_secret raise Exception("Missing config data: prefix")
# =================================== # Fetch the bot secret token
# Database Configuration def get_login(bot):
# =================================== data = read_data()
def get_db_path() -> str: if data is False or data.get(f"{bot}bot") is False:
"""Get SQLite database path""" raise Exception(f"Missing config data: {bot}bot")
return os.getenv("DB_PATH", "./data/music.db")
data = data.get(f"{bot}bot")
return data.get("secret")
# Future PostgreSQL config # Read the status and text data
def get_postgres_url() -> Optional[str]: def get_status():
"""Get PostgreSQL connection URL (for future migration)""" data = read_data()
host = os.getenv("DB_HOST")
port = os.getenv("DB_PORT", "5432")
name = os.getenv("DB_NAME")
user = os.getenv("DB_USER")
password = os.getenv("DB_PASSWORD")
if all([host, name, user, password]): if data is False or data.get('status') is False:
return f"postgresql://{user}:{password}@{host}:{port}/{name}" raise Exception("Missing config data: status")
return None
# Find type
data = data.get('status')
return translate_status(
data.get('type'),
data.get('text'),
data.get('link')
)
# Get colors from colorscheme
def get_color(color):
data = read_data()
if data is False or data.get('status') is False:
raise Exception("Missing config data: color")
# Grab color
string_value = data.get("colorscheme").get(color)
hex_value = Color.from_str(string_value)
return hex_value
# =================================== # Taking JSON variables and converting them into a presence
# Bot Status/Presence # Use None url incase not provided
# =================================== def translate_status(status_type, status_text, status_url=None):
def get_status() -> discord.Activity: if status_type == "playing":
"""Get bot status/presence"""
status_type = os.getenv("STATUS_TYPE", "listening").lower()
status_text = os.getenv("STATUS_TEXT", "Zilean's Theme")
status_url = os.getenv("STATUS_URL")
return translate_status(status_type, status_text, status_url)
def translate_status(
status_type: str,
status_text: str,
status_url: Optional[str] = None
) -> discord.Activity:
"""Convert status type string to Discord Activity"""
status_map = {
"playing": discord.ActivityType.playing,
"streaming": discord.ActivityType.streaming,
"listening": discord.ActivityType.listening,
"watching": discord.ActivityType.watching,
"competing": discord.ActivityType.competing,
}
activity_type = status_map.get(status_type)
if not activity_type:
raise ValueError(f"Invalid status type: {status_type}")
# Streaming requires URL
if status_type == "streaming":
if not status_url:
raise ValueError("Streaming status requires STATUS_URL")
return discord.Activity( return discord.Activity(
type=activity_type, type=discord.ActivityType.playing,
name=status_text
)
elif status_type == "streaming":
return discord.Activity(
type=discord.ActivityType.streaming,
name=status_text, name=status_text,
url=status_url url=status_url
) )
return discord.Activity(type=activity_type, name=status_text) elif status_type == "listening":
return discord.Activity(
type=discord.ActivityType.listening,
name=status_text
)
# =================================== elif status_type == "watching":
# Color Scheme return discord.Activity(
# =================================== type=discord.ActivityType.watching,
def get_color(color_name: str) -> Color: name=status_text
"""Get color from environment (hex format)""" )
color_map = {
"primary": os.getenv("COLOR_PRIMARY", "#7289DA"),
"success": os.getenv("COLOR_SUCCESS", "#43B581"),
"error": os.getenv("COLOR_ERROR", "#F04747"),
"warning": os.getenv("COLOR_WARNING", "#FAA61A"),
}
hex_value = color_map.get(color_name.lower()) elif status_type == "competing":
if not hex_value: return discord.Activity(
# Default to Discord blurple type=discord.ActivityType.competing,
hex_value = "#7289DA" name=status_text
)
return Color.from_str(hex_value) #TODO
# Implement custom status type
else:
# =================================== raise Exception(f"Invalid status type: {status_type}")
# Logging Configuration
# ===================================
def get_log_level() -> str:
"""Get logging level from environment"""
return os.getenv("LOG_LEVEL", "INFO").upper()
# ===================================
# Legacy Support (for backward compatibility)
# ===================================
def get_login(bot: str) -> str:
"""Legacy function - maps to new get_discord_token()"""
# Ignore the 'bot' parameter, use ENVIRONMENT instead
return get_discord_token()
# ===================================
# Validation
# ===================================
def validate_config():
"""Validate that all required config is present"""
errors = []
# Check Discord token
try:
get_discord_token()
except ValueError as e:
errors.append(str(e))
# Check Spotify creds
try:
get_spotify_creds()
except ValueError as e:
errors.append(str(e))
if errors:
error_msg = "\n".join(errors)
raise ValueError(f"Configuration errors:\n{error_msg}")
print(f"✅ Configuration validated (Environment: {ENVIRONMENT})")
# ===================================
# Startup Info
# ===================================
def print_config_info():
"""Print configuration summary (without secrets!)"""
print("=" * 50)
print("🎵 Groovy-Zilean Configuration")
print("=" * 50)
print(f"Environment: {ENVIRONMENT.upper()}")
print(f"Prefix: {get_prefix()}")
print(f"Database: {get_db_path()}")
print(f"Log Level: {get_log_level()}")
print(f"Spotify: {'Configured ✅' if os.getenv('SPOTIFY_CLIENT_ID') else 'Not configured ❌'}")
print("=" * 50)

13
main.py
View File

@@ -3,16 +3,6 @@ from bot import Groovy
import config import config
import help import help
# Validate configuration before starting
try:
config.validate_config()
config.print_config_info()
except ValueError as e:
print(f"❌ Configuration Error:\n{e}")
print("\n💡 Tip: Copy .env.example to .env and fill in your values")
exit(1)
# Initialize bot with validated config
client = Groovy(command_prefix=config.get_prefix(), intents=discord.Intents.all()) client = Groovy(command_prefix=config.get_prefix(), intents=discord.Intents.all())
@client.event @client.event
@@ -35,5 +25,4 @@ async def on_voice_state_update(member, before, after):
except Exception as e: except Exception as e:
print(f"Error auto-disconnecting: {e}") print(f"Error auto-disconnecting: {e}")
# Run bot with environment-appropriate token client.run(config.get_login("live"))
client.run(config.get_discord_token())

View File

@@ -1,31 +0,0 @@
# mypy configuration for groovy-zilean
# Type checking configuration that's practical for a Discord bot
[mypy]
# Python version
python_version = 3.13
# Ignore missing imports for libraries without type stubs
# Discord.py, spotipy, yt-dlp don't have complete type stubs
ignore_missing_imports = True
# Be strict about our own code
# Start lenient, can tighten later
disallow_untyped_defs = False
check_untyped_defs = True
# Too noisy with discord.py
warn_return_any = False
warn_unused_configs = True
# Exclude patterns
exclude = venv/
# Per-module overrides
[mypy-discord.*]
ignore_missing_imports = True
[mypy-spotipy.*]
ignore_missing_imports = True
[mypy-yt_dlp.*]
ignore_missing_imports = True

View File

@@ -1,14 +1,12 @@
# Core bot framework # Core bot framework
discord.py>=2.6.4 discord.py==2.6.4
aiohttp>=3.9.0 aiohttp==3.8.4
PyNaCl>=1.5.0 PyNaCl==1.5.0
spotipy>=2.23.0 spotipy==2.23.0
# Configuration management
python-dotenv>=1.0.0
# YouTube extractor # YouTube extractor
yt-dlp>=2025.10.14 yt-dlp>=2025.10.14
# Audio metadata (if needed by yt-dlp) # System dependencies
mutagen>=1.47.0 PyAudio==0.2.13
mutagen==1.46.0