"""Voice-channel helpers, inactivity tracking and queue embeds for the bot."""

import asyncio

import discord
from discord.ext.commands.context import Context
from discord.ext.commands.converter import CommandError
from discord.ui import Button, View

import config
from . import queue

# Seconds without activity before an idle voice connection is dropped.
_INACTIVITY_TIMEOUT_SECONDS = 300  # 5 minutes

# Seconds the queue control buttons stay interactive.
_CONTROLS_TIMEOUT_SECONDS = 300  # 5 minutes

# Maximum number of upcoming songs listed in the queue embed.
_MAX_DISPLAYED_SONGS = 10

# Discord rejects embeds whose field value is longer than this.
_EMBED_FIELD_LIMIT = 1024

# Width, in characters, of the "now playing" progress bar.
_PROGRESS_BAR_LENGTH = 15

# Loop button cycle: off -> song -> queue -> off (anything else -> off).
_NEXT_LOOP_MODE = {'off': 'song', 'song': 'queue'}

# Track last activity time for each server (guild id -> event-loop time).
last_activity = {}


def _now() -> float:
    """Return the event-loop clock reading used for activity timestamps."""
    return asyncio.get_event_loop().time()


# Joining/moving to the user's vc in a guild
async def join_vc(ctx: Context):
    """Connect to, or move to, the voice channel of the command author.

    Records the current time as the guild's last activity.

    Returns:
        The voice client that is connected after the call.

    Raises:
        CommandError: If the author is not in a voice channel, or the
            channel cannot be determined.
    """
    author_voice = getattr(ctx.author, "voice", None)
    if author_voice is None:
        raise CommandError("User is not in voice channel")

    channel = getattr(author_voice, "channel", None)
    if channel is None:
        raise CommandError("Unable to find voice channel")

    voice_client = ctx.voice_client
    if voice_client is None:
        voice_client = await channel.connect()
    else:
        # move_to() returns None, so its result must not replace the client;
        # the existing client stays valid after the move.
        await voice_client.move_to(channel)

    last_activity[ctx.guild.id] = _now()
    return voice_client


# Leaving the voice channel of a user
async def leave_vc(ctx: Context):
    """Clear the guild's queue and disconnect the bot from voice.

    The author must be in the same voice channel as the bot.

    Raises:
        CommandError: If the bot is not connected, the author is not in a
            voice channel, or the author is in a different channel.
    """
    # Hold our own reference: ctx.voice_client may no longer resolve once
    # the disconnect has started.
    voice_client = ctx.voice_client
    if voice_client is None:
        raise CommandError("I am not in a voice channel")

    author_voice = getattr(ctx.author, "voice", None)
    if author_voice is None:
        raise CommandError("You are not in a voice channel")

    author_channel = getattr(author_voice, "channel", None)
    if author_channel is None or voice_client.channel != author_channel:
        raise CommandError("You are not in this voice channel")

    guild_id = ctx.guild.id
    await queue.clear(guild_id)

    if voice_client.is_playing():
        voice_client.stop()

    try:
        # Force the disconnect so the bot leaves even in a bad state.
        await voice_client.disconnect(force=True)
    except Exception as e:
        print(f"Error disconnecting: {e}")
        # cleanup() is a plain (non-async) method and must not be awaited.
        voice_client.cleanup()
    finally:
        # Always stop tracking the guild, even if cleanup itself failed.
        last_activity.pop(guild_id, None)


async def _disconnect_if_idle(bot, guild_id) -> None:
    """Disconnect from one guild if it is connected but not playing."""
    guild = bot.get_guild(guild_id)
    voice_client = guild.voice_client if guild else None
    if voice_client is None or voice_client.is_playing():
        return

    print(f"Auto-disconnecting from {guild.name} due to inactivity")
    await queue.clear(guild_id)
    try:
        await voice_client.disconnect(force=True)
    except Exception as e:
        # Best effort: report the failure but still drop the tracker entry.
        print(f"Error auto-disconnecting from {guild.name}: {e}")
    # pop() rather than del: the entry may have been removed by leave_vc
    # while this coroutine was suspended on an await.
    last_activity.pop(guild_id, None)


# Auto-disconnect if inactive
async def check_inactivity(bot):
    """Run one pass over tracked guilds and disconnect the idle ones.

    A guild is idle when its last recorded activity is older than the
    inactivity timeout and its voice client is not playing. Errors are
    printed per guild so one failure does not stop the rest of the pass.
    NOTE(review): presumably scheduled periodically by the caller - confirm.
    """
    current_time = _now()
    # Iterate over a snapshot because entries are removed during the pass.
    for guild_id, last_time in list(last_activity.items()):
        if current_time - last_time <= _INACTIVITY_TIMEOUT_SECONDS:
            continue
        try:
            await _disconnect_if_idle(bot, guild_id)
        except Exception as e:
            print(f"Error in inactivity checker: {e}")


# Update activity timestamp when playing
def update_activity(guild_id):
    """Call this when a song starts playing."""
    last_activity[guild_id] = _now()


# Interactive buttons for queue control
class QueueControls(View):
    """Skip / shuffle / loop / clear buttons attached to the queue embed.

    Only the author of the originating command may use the buttons.
    """

    def __init__(self, ctx):
        super().__init__(timeout=_CONTROLS_TIMEOUT_SECONDS)
        self.ctx = ctx

    async def interaction_check(self, interaction: discord.Interaction) -> bool:
        """Allow the interaction only for the user who requested the queue.

        Runs before every button callback; returning False skips the
        callback, so the callbacks need no ownership check of their own.
        """
        if interaction.user != self.ctx.author:
            await interaction.response.send_message(
                "❌ Only the person who requested the queue can use these buttons!",
                ephemeral=True,
            )
            return False
        return True

    @discord.ui.button(label="⏭️ Skip", style=discord.ButtonStyle.primary)
    async def skip_button(self, interaction: discord.Interaction, button: Button):
        """Stop the current track, if one is playing."""
        voice_client = self.ctx.voice_client
        if voice_client and voice_client.is_playing():
            voice_client.stop()
            await interaction.response.send_message("⏭️ Skipped!", ephemeral=True)
        else:
            await interaction.response.send_message("❌ Nothing is playing!", ephemeral=True)

    @discord.ui.button(label="🔀 Shuffle", style=discord.ButtonStyle.secondary)
    async def shuffle_button(self, interaction: discord.Interaction, button: Button):
        """Shuffle the guild's queue and report whether it succeeded."""
        shuffled = await queue.shuffle_queue(self.ctx.guild.id)
        if shuffled:
            await interaction.response.send_message("🔀 Queue shuffled!", ephemeral=True)
        else:
            await interaction.response.send_message("❌ Not enough songs to shuffle!", ephemeral=True)

    @discord.ui.button(label="🔁 Loop", style=discord.ButtonStyle.secondary)
    async def loop_button(self, interaction: discord.Interaction, button: Button):
        """Advance the loop mode: off -> song -> queue -> off."""
        guild_id = self.ctx.guild.id
        current_mode = await queue.get_loop_mode(guild_id)
        new_mode = _NEXT_LOOP_MODE.get(current_mode, 'off')
        await queue.set_loop_mode(guild_id, new_mode)

        emojis = {'off': '⏹️', 'song': '🔂', 'queue': '🔁'}
        messages = {
            'off': 'Loop disabled',
            'song': 'Looping current song 🔂',
            'queue': 'Looping entire queue 🔁',
        }
        await interaction.response.send_message(
            f"{emojis[new_mode]} {messages[new_mode]}", ephemeral=True
        )

    @discord.ui.button(label="🗑️ Clear", style=discord.ButtonStyle.danger)
    async def clear_button(self, interaction: discord.Interaction, button: Button):
        """Empty the guild's queue and stop playback."""
        await queue.clear(self.ctx.guild.id)
        voice_client = self.ctx.voice_client
        if voice_client and voice_client.is_playing():
            voice_client.stop()
        await interaction.response.send_message("🗑️ Queue cleared!", ephemeral=True)


# Build a display message for queuing a new song
async def queue_message(ctx: Context, data: dict):
    """Send an embed announcing that a song was queued.

    Reads 'title', 'duration' and 'position' from ``data``; 'thumbnail' is
    optional and the thumbnail is omitted when it is missing or empty.
    """
    msg = discord.Embed(
        title="🎵 Song Queued",
        description=f"**{data['title']}**",
        color=discord.Color.green(),
    )
    thumbnail = data.get('thumbnail')
    if thumbnail:
        msg.set_thumbnail(url=thumbnail)
    msg.add_field(name="⏱️ Duration", value=format_time(data['duration']), inline=True)
    msg.add_field(name="📍 Position", value=f"#{data['position']}", inline=True)
    msg.set_footer(
        text=f"Queued by {ctx.author.display_name}",
        icon_url=ctx.author.display_avatar.url,
    )
    await ctx.send(embed=msg)


def _build_queue_text(songs):
    """Render the "Up Next" listing, kept within the embed field limit.

    Each song is indexed as (title, duration, requester); a string duration
    is shown as blank. Returns ``(text, shown)`` where ``shown`` is the
    number of songs that made it into ``text``.
    """
    entries = []
    remaining = _EMBED_FIELD_LIMIT
    for position, song in enumerate(songs[:_MAX_DISPLAYED_SONGS], start=1):
        length = '' if isinstance(song[1], str) else format_time(song[1])
        entry = f"`{position}.` **{song[0]}**\n└ ⏱️ {length} • Queued by {song[2]}\n"
        if len(entry) > remaining:
            if not entries:
                # A single oversized entry: cut it rather than send nothing.
                entries.append(entry[:remaining])
            break
        entries.append(entry)
        remaining -= len(entry)
    return "".join(entries), len(entries)


# Build an embed message that shows the queue
async def display_server_queue(ctx: Context, songs, n):
    """Send the guild's queue as an embed with interactive controls.

    Args:
        ctx: Invocation context; its guild selects the queue state.
        songs: Sequence of upcoming songs, each indexed as
            (title, duration, requester).
        n: Song count used for the "...and N more" footer.
            NOTE(review): presumably the total queue length - confirm.
    """
    server = ctx.guild
    guild_id = server.id

    # Get current settings
    current_song = await queue.get_current_song(guild_id)
    loop_mode = await queue.get_loop_mode(guild_id)
    volume = await queue.get_volume(guild_id)
    effect = await queue.get_effect(guild_id)
    elapsed, duration, percentage = await queue.get_current_progress(guild_id)

    embed = discord.Embed(
        title=f"🎵 {server.name}'s Queue",
        color=discord.Color.blue(),
    )

    # Add loop emoji based on mode
    loop_emojis = {'off': '', 'song': '🔂', 'queue': '🔁'}
    loop_emoji = loop_emojis.get(loop_mode, '')
    effect_emoji = queue.get_effect_emoji(effect)

    # Progress bar (━ for filled, ─ for empty)
    progress_bar = ""
    if duration and duration > 0:
        filled = int((percentage / 100) * _PROGRESS_BAR_LENGTH)
        # Clamp so an out-of-range percentage cannot distort the bar width.
        filled = max(0, min(_PROGRESS_BAR_LENGTH, filled))
        empty = _PROGRESS_BAR_LENGTH - filled
        progress_bar = f"\n{'━' * filled}{'─' * empty} `{format_time(elapsed)} / {format_time(duration)}`"

    # Now playing section
    now_playing = f"### 🔊 Now Playing\n**{current_song}** {loop_emoji}{progress_bar}\n"
    embed.add_field(name="", value=now_playing, inline=False)

    # Settings section
    settings = f"🔊 Volume: **{volume}%** | 🔁 Loop: **{loop_mode}** | {effect_emoji} Effect: **{effect}**"
    embed.add_field(name="⚙️ Settings", value=settings, inline=False)

    # Queue section
    if len(songs) > 0:
        queue_text, shown = _build_queue_text(songs)
        embed.add_field(name="📜 Up Next", value=queue_text, inline=False)

        # Mention hidden songs when the queue is longer than the display cap,
        # or when entries were dropped to respect the field length limit.
        truncated = shown < min(len(songs), _MAX_DISPLAYED_SONGS)
        hidden = n - shown
        if hidden > 0 and (n > _MAX_DISPLAYED_SONGS or truncated):
            embed.set_footer(text=f"...and {hidden} more songs in queue")
    else:
        embed.add_field(name="📜 Queue", value="*Queue is empty*", inline=False)

    # Add thumbnail
    embed.set_thumbnail(url=server.icon.url if server.icon else None)

    # Send with interactive buttons
    view = QueueControls(ctx)
    await ctx.send(embed=embed, view=view)


# Converts seconds into more readable format
def format_time(seconds):
    """Format a duration in seconds as ``M:SS`` or ``H:MM:SS``.

    Fractional seconds are truncated. Returns "Unknown" for values that are
    not numbers (including strings and None) and for negative durations.
    """
    if isinstance(seconds, (str, bytes)):
        return "Unknown"
    try:
        total = int(seconds)
    except (TypeError, ValueError, OverflowError):
        # None and other non-numbers, NaN, and infinity respectively.
        return "Unknown"
    if total < 0:
        return "Unknown"

    minutes, secs = divmod(total, 60)
    hours, minutes = divmod(minutes, 60)
    if hours > 0:
        return f"{hours}:{minutes:02d}:{secs:02d}"
    return f"{minutes}:{secs:02d}"