"""Voice-channel helpers and the interactive queue display for the music bot.

Contents:
    * ``join_vc`` / ``leave_vc`` -- connect to / disconnect from a voice channel.
    * ``check_inactivity`` / ``update_activity`` -- idle auto-disconnect tracking.
    * ``QueueControls`` / ``generate_queue_ui`` -- embed + buttons for the queue.
    * ``queue_message`` / ``format_time`` -- small presentation helpers.
"""

import asyncio

import discord
from discord.ext.commands.context import Context
from discord.ext.commands.converter import CommandError
from discord.ui import Button, View

import config
from . import queue

# Track last activity time for each server (guild id -> event-loop timestamp)
last_activity = {}

# Seconds without activity after which an idle voice connection is dropped.
_INACTIVITY_TIMEOUT_SECONDS = 300  # 5 minutes

# Discord rejects embeds whose field value is longer than this.
_EMBED_FIELD_VALUE_LIMIT = 1024

# Titles that mean "no song is currently loaded". 'Nothing Playing' is the
# local default below; 'Nothing' is what the original check compared against
# (presumably what queue.get_current_song returns when idle -- TODO confirm).
_IDLE_TITLES = ("Nothing", "Nothing Playing")


def _now():
    """Return the event loop's clock reading used for all activity stamps."""
    return asyncio.get_event_loop().time()


# Joining/moving to the user's vc in a guild
async def join_vc(ctx: Context):
    """Connect to, or move to, the voice channel of the command author.

    Returns:
        The voice client that is connected after the call.

    Raises:
        CommandError: if the author is not in a voice channel or the channel
            cannot be resolved.
    """
    # A default of None keeps authors without a ``voice`` attribute on the
    # CommandError path instead of raising AttributeError.
    author_voice = getattr(ctx.author, "voice", None)
    if author_voice is None:
        raise CommandError("User is not in voice channel")

    channel = getattr(author_voice, "channel", None)
    if channel is None:
        raise CommandError("Unable to find voice channel")

    voice_client = ctx.voice_client
    if voice_client is None:
        voice_client = await channel.connect()
    else:
        # move_to() returns None, so keep the existing client object instead
        # of overwriting the return value with the call's result.
        await voice_client.move_to(channel)

    # Update last activity
    last_activity[ctx.guild.id] = _now()
    return voice_client


# Leaving the voice channel of a user
async def leave_vc(ctx: Context):
    """Clear the guild's queue and disconnect the bot from voice.

    The author must be in the same voice channel as the bot.

    Raises:
        CommandError: if the bot is not connected, the author is not in a
            voice channel, or the two are in different channels.
    """
    # Hold a local reference: ctx.voice_client may become None once the
    # disconnect starts, and the fallback below still needs the object.
    voice_client = ctx.voice_client
    if voice_client is None:
        raise CommandError("I am not in a voice channel")

    author_voice = getattr(ctx.author, "voice", None)
    if author_voice is None:
        raise CommandError("You are not in a voice channel")

    # Make sure both bot and user are in the same vc
    author_vc = getattr(author_voice, "channel", None)
    if author_vc is None or voice_client.channel != author_vc:
        raise CommandError("You are not in this voice channel")

    # Clear the queue for this server
    await queue.clear(ctx.guild.id)

    # Stop any currently playing audio
    if voice_client.is_playing():
        voice_client.stop()

    # Disconnect with force to ensure it actually leaves
    try:
        await voice_client.disconnect(force=True)
    except Exception as e:
        print(f"Error disconnecting: {e}")
        # cleanup() is a plain (non-async) method; awaiting its None result
        # would raise TypeError inside this handler.
        voice_client.cleanup()

    # Remove from activity tracker
    last_activity.pop(ctx.guild.id, None)


# Auto-disconnect if inactive
async def check_inactivity(bot):
    """Disconnect from guilds whose voice connection has been idle too long.

    Performs a single pass over ``last_activity``; it contains no loop of its
    own, so it is presumably scheduled periodically by the caller. Errors are
    printed rather than raised.
    """
    try:
        current_time = _now()
        # Iterate over a snapshot because entries are removed during the pass.
        for guild_id, last_time in list(last_activity.items()):
            if current_time - last_time <= _INACTIVITY_TIMEOUT_SECONDS:
                continue

            guild = bot.get_guild(guild_id)
            voice_client = guild.voice_client if guild is not None else None
            if voice_client is None:
                # Nothing left to disconnect; drop the stale entry so it is
                # not re-examined forever.
                last_activity.pop(guild_id, None)
                continue

            if voice_client.is_playing():
                continue

            print(f"Auto-disconnecting from {guild.name} due to inactivity")
            await queue.clear(guild_id)
            try:
                await voice_client.disconnect(force=True)
            except Exception as e:
                # Best effort: report and still forget the guild below.
                print(f"Error auto-disconnecting from {guild.name}: {e}")
            # pop() instead of del: the awaits above may have let another
            # coroutine remove the key already.
            last_activity.pop(guild_id, None)
    except Exception as e:
        print(f"Error in inactivity checker: {e}")


# Update activity timestamp when playing
def update_activity(guild_id):
    """Call this when a song starts playing"""
    last_activity[guild_id] = _now()


# Interactive buttons for queue control
class QueueControls(View):
    """Button row attached to the queue embed (skip/shuffle/loop/clear/refresh)."""

    def __init__(self, ctx):
        super().__init__(timeout=None)  # No timeout allows buttons to stay active longer
        self.ctx = ctx

    async def refresh_message(self, interaction: discord.Interaction):
        """Helper to regenerate the embed and edit the message"""
        try:
            embed, view = await generate_queue_ui(self.ctx)
            await interaction.response.edit_message(embed=embed, view=view)
        except Exception as e:
            # Report the cause instead of discarding it, then fall back to an
            # ephemeral notice if the interaction has not been answered yet.
            print(f"Error refreshing queue display: {e}")
            if not interaction.response.is_done():
                await interaction.response.send_message(
                    "Refreshed, but something went wrong updating the display.",
                    ephemeral=True,
                )

    @discord.ui.button(label="⏭️ Skip", style=discord.ButtonStyle.primary)
    async def skip_button(self, interaction: discord.Interaction, button: Button):
        """Skip the current song; only users in the bot's voice channel may do so."""
        voice_client = self.ctx.voice_client
        # Guard against the bot having left voice since the message was sent.
        if voice_client is None or interaction.user not in voice_client.channel.members:
            await interaction.response.send_message("❌ You must be in the voice channel!", ephemeral=True)
            return

        # The button always performs a standard skip, regardless of loop mode.
        await queue.pop(self.ctx.guild.id, True, skip_mode=True)

        # Re-read the client: it may have disconnected during the await above.
        if self.ctx.voice_client:
            self.ctx.voice_client.stop()

        # Refresh UI
        await self.refresh_message(interaction)

    @discord.ui.button(label="🔀 Shuffle", style=discord.ButtonStyle.secondary)
    async def shuffle_button(self, interaction: discord.Interaction, button: Button):
        """Shuffle the guild's queue and redraw the embed."""
        await queue.shuffle_queue(self.ctx.guild.id)
        await self.refresh_message(interaction)

    @discord.ui.button(label="🔁 Loop", style=discord.ButtonStyle.secondary)
    async def loop_button(self, interaction: discord.Interaction, button: Button):
        """Cycle the loop mode off -> song -> queue -> off and redraw the embed."""
        current_mode = await queue.get_loop_mode(self.ctx.guild.id)
        # Any mode other than 'off' or 'song' falls back to 'off'.
        next_mode = {'off': 'song', 'song': 'queue'}.get(current_mode, 'off')
        await queue.set_loop_mode(self.ctx.guild.id, next_mode)
        await self.refresh_message(interaction)

    @discord.ui.button(label="🗑️ Clear", style=discord.ButtonStyle.danger)
    async def clear_button(self, interaction: discord.Interaction, button: Button):
        """Clear the guild's queue, stop playback if any, and redraw the embed."""
        await queue.clear(self.ctx.guild.id)
        if self.ctx.voice_client and self.ctx.voice_client.is_playing():
            self.ctx.voice_client.stop()
        await self.refresh_message(interaction)

    @discord.ui.button(label="🔄 Refresh", style=discord.ButtonStyle.gray)
    async def refresh_button(self, interaction: discord.Interaction, button: Button):
        """Redraw the embed with current data."""
        await self.refresh_message(interaction)


async def generate_queue_ui(ctx: Context):
    """Build the queue embed and its control view for ``ctx.guild``.

    Returns:
        A ``(embed, view)`` tuple: the ``discord.Embed`` describing the current
        song and upcoming songs, and a fresh ``QueueControls`` view.
    """
    guild_id = ctx.guild.id
    server = ctx.guild

    # Fetch all data
    n, songs = await queue.grab_songs(guild_id)
    # Accessed with .get() below, so a missing song is treated as an empty dict.
    current = (await queue.get_current_song(guild_id)) or {}
    loop_mode = await queue.get_loop_mode(guild_id)
    volume = await queue.get_volume(guild_id)
    effect = await queue.get_effect(guild_id)
    elapsed, duration, percentage = await queue.get_current_progress(guild_id)

    # Configs
    effect_emoji = queue.get_effect_emoji(effect)

    # Map loop mode to nicer text
    loop_map = {
        'off': {'emoji': '⏹️', 'text': 'Off'},
        'song': {'emoji': '🔂', 'text': 'Song'},
        'queue': {'emoji': '🔁', 'text': 'Queue'},
    }
    loop_info = loop_map.get(loop_mode, loop_map['off'])
    loop_emoji = loop_info['emoji']
    loop_text = loop_info['text']

    # Build Embed
    embed = discord.Embed(color=discord.Color.from_rgb(43, 45, 49))
    embed.set_author(name=f"{server.name}'s Queue", icon_url=server.icon.url if server.icon else None)

    # Progress Bar Logic
    progress_bar = ""
    # Only show bar if duration is a positive number (prevents weird 00:00
    # bars and avoids comparing None/str with 0).
    if isinstance(duration, (int, float)) and duration > 0:
        bar_length = 16
        fraction = (percentage if isinstance(percentage, (int, float)) else 0) / 100
        # Clamp on both sides so a bad percentage cannot stretch the bar.
        filled = max(0, min(int(fraction * bar_length), bar_length))
        bar_str = '▬' * filled + '🔘' + '▬' * (bar_length - filled)
        progress_bar = f"\n`{format_time(elapsed)}` {bar_str} `{format_time(duration)}`"

    # Now Playing Header
    title = current.get('title') or 'Nothing Playing'
    thumb = current.get('thumbnail')
    url = current.get('url', '')

    if title in _IDLE_TITLES:
        description = "## 💤 Nothing is playing\nUse `/play` to start the party!"
    else:
        # Create Hyperlink [Title](URL); fall back to bold text without a URL
        if url and url.startswith("http"):
            song_link = f"[{title}]({url})"
        else:
            song_link = f"**{title}**"

        # Status line: Loop: Mode | Effect: Name | Vol: %
        description = (
            f"## 💿 Now Playing\n"
            f"### {song_link}\n"
            f"{loop_emoji} **Loop: {loop_text}** | {effect_emoji} **Effect: {effect}** | 🔊 **{volume}%**"
            f"{progress_bar}"
        )

    embed.description = description

    # Queue List
    if songs:
        lines = []
        for position, song in enumerate(songs[:10], start=1):
            # song[1] is either a duration in seconds or a string; strings
            # are not rendered as a time.
            dur = '' if isinstance(song[1], str) else f" | `{format_time(song[1])}`"
            lines.append(f"**{position}.** {song[0]}{dur}\n")
        queue_text = "".join(lines)
        # Long titles could push the field past Discord's limit, which would
        # make the whole send/edit fail.
        if len(queue_text) > _EMBED_FIELD_VALUE_LIMIT:
            queue_text = queue_text[:_EMBED_FIELD_VALUE_LIMIT - 3] + "..."

        embed.add_field(name="⏳ Up Next", value=queue_text, inline=False)

        # NOTE(review): up to 10 songs are listed above but 9 is subtracted
        # here; kept as-is because the meaning of ``n`` depends on
        # queue.grab_songs -- confirm and adjust if it is the total count.
        remaining = n - 9
        if remaining > 0:
            embed.set_footer(text=f"Waitlist: {remaining} more songs...")
    else:
        embed.add_field(name="⏳ Up Next", value="*The queue is empty.*")

    # Set Thumbnail safely
    if thumb and isinstance(thumb, str) and thumb.startswith("http"):
        embed.set_thumbnail(url=thumb)
    elif server.icon:
        # Fallback to server icon
        embed.set_thumbnail(url=server.icon.url)

    view = QueueControls(ctx)
    return embed, view


# The command entry point calls this
async def display_server_queue(ctx: Context, songs, n):
    """Send the queue embed with its controls to ``ctx``.

    ``songs`` and ``n`` are accepted for caller compatibility but unused: the
    data is fetched again inside ``generate_queue_ui``.
    """
    embed, view = await generate_queue_ui(ctx)
    await ctx.send(embed=embed, view=view)


# Build a display message for queuing a new song
async def queue_message(ctx: Context, data: dict):
    """Send a "Song Queued" embed.

    ``data`` must provide ``title``, ``duration`` and ``position``;
    ``thumbnail`` is optional.
    """
    msg = discord.Embed(
        title="🎵 Song Queued",
        description=f"**{data['title']}**",
        color=discord.Color.green(),
    )

    # A missing thumbnail should not prevent the confirmation from being sent.
    thumbnail = data.get('thumbnail')
    if thumbnail:
        msg.set_thumbnail(url=thumbnail)

    msg.add_field(name="⏱️ Duration", value=format_time(data['duration']), inline=True)
    msg.add_field(name="📍 Position", value=f"#{data['position']}", inline=True)
    msg.set_footer(text=f"Queued by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url)

    await ctx.send(embed=msg)


# Converts seconds into more readable format
def format_time(seconds):
    """Format a duration in seconds as ``M:SS`` or ``H:MM:SS``.

    Fractional seconds are truncated and negative values are shown as
    ``0:00``. Strings, ``None`` and non-finite numbers yield ``"Unknown"``.
    """
    # Strings have always been reported as unknown (divmod rejected them).
    if isinstance(seconds, str):
        return "Unknown"
    try:
        # int() truncates floats, which the ':02d' format spec cannot handle.
        total = int(seconds)
    except (TypeError, ValueError, OverflowError):
        return "Unknown"

    total = max(total, 0)
    minutes, secs = divmod(total, 60)
    hours, minutes = divmod(minutes, 60)
    if hours > 0:
        return f"{hours}:{minutes:02d}:{secs:02d}"
    return f"{minutes}:{secs:02d}"