Files
groovy-zilean/cogs/music/util.py

166 lines
5.3 KiB
Python

import discord
from discord.ext.commands.context import Context
from discord.ext.commands.converter import CommandError
import config
from . import queue
import asyncio
# Track last activity time for each server.
# Keys are guild ids; values are event-loop clock readings (loop.time())
# written by join_vc() and update_activity(), removed again by leave_vc()
# and by the check_inactivity() background task.
last_activity = {}
# Joining/moving to the user's vc in a guild
async def join_vc(ctx: Context):
    """Connect to, or move to, the voice channel the command author is in.

    Also stamps the guild's entry in ``last_activity`` with the current
    event-loop time.

    Returns:
        The voice client for this guild: the freshly connected one, or the
        already existing one after it has been moved.

    Raises:
        CommandError: if the author has no voice state or no voice channel.
    """
    # Default to None so an author object without a ``voice`` attribute is
    # reported as "not in a voice channel" rather than an AttributeError.
    author_voice = getattr(ctx.author, "voice", None)
    if author_voice is None:
        raise CommandError("User is not in voice channel")

    channel = getattr(author_voice, "channel", None)
    if channel is None:
        raise CommandError("Unable to find voice channel")

    voice_client = ctx.voice_client
    if voice_client is None:
        voice_client = await channel.connect(reconnect=True, timeout=60.0)
    else:
        # move_to() returns None, so keep returning the existing client
        # instead of the call's result.
        await voice_client.move_to(channel)

    # Update last activity
    last_activity[ctx.guild.id] = asyncio.get_event_loop().time()

    return voice_client
# Leaving the voice channel of a user
async def leave_vc(ctx: Context):
    """Disconnect the bot from the voice channel it shares with the author.

    Clears the guild's queue, stops playback, disconnects, and removes the
    guild from ``last_activity``.

    Raises:
        CommandError: if the bot is not connected in this guild, the author
            is not in a voice channel, or the two are in different channels.
    """
    # Hold on to the client: ctx.voice_client is looked up again on every
    # access and may no longer be available after a failed disconnect.
    voice_client = ctx.voice_client
    if voice_client is None:
        raise CommandError("I am not in a voice channel")

    author_voice = getattr(ctx.author, "voice", None)
    if author_voice is None:
        raise CommandError("You are not in a voice channel")

    # Make sure both bot and user are in the same vc
    author_vc = getattr(author_voice, "channel", None)
    if author_vc is None or voice_client.channel != author_vc:
        raise CommandError("You are not in this voice channel")

    # Clear the queue for this server
    await queue.clear(ctx.guild.id)

    # Stop any currently playing audio
    if voice_client.is_playing():
        voice_client.stop()

    # Disconnect with force to ensure it actually leaves
    try:
        await voice_client.disconnect(force=True)
    except Exception as e:
        print(f"Error disconnecting: {e}")
        # Best-effort fallback. cleanup() is a plain (non-async) method in
        # discord.py, so only await its result if it really is a coroutine.
        leftover = voice_client.cleanup()
        if asyncio.iscoroutine(leftover):
            await leftover

    # Remove from activity tracker (no error if it was never recorded)
    last_activity.pop(ctx.guild.id, None)
# Auto-disconnect if inactive
async def check_inactivity(bot, timeout=300, interval=30):
    """Background task to check for inactive voice connections.

    Runs forever. On every pass, each guild in ``last_activity`` whose
    timestamp is older than ``timeout`` seconds and whose voice client is
    not playing gets its queue cleared and its voice client disconnected.

    Args:
        bot: object providing ``get_guild(guild_id)``.
        timeout: seconds of inactivity before disconnecting (default 300,
            i.e. 5 minutes).
        interval: seconds to sleep between passes (default 30).
    """
    while True:
        try:
            current_time = asyncio.get_event_loop().time()

            # Iterate over a snapshot: entries are removed inside the loop
            # and other coroutines may modify the dict while we await.
            for guild_id, last_time in list(last_activity.items()):
                if current_time - last_time <= timeout:
                    continue

                guild = bot.get_guild(guild_id)
                voice_client = guild.voice_client if guild is not None else None
                if voice_client is None:
                    # Nothing left to disconnect; drop the stale entry so it
                    # is not re-examined on every pass.
                    last_activity.pop(guild_id, None)
                    continue

                if voice_client.is_playing():
                    continue

                print(f"Auto-disconnecting from {guild.name} due to inactivity")
                await queue.clear(guild_id)
                try:
                    await voice_client.disconnect(force=True)
                except Exception as e:
                    # Deliberately not a bare ``except``: that would also
                    # swallow asyncio.CancelledError and make this task
                    # impossible to cancel while it is disconnecting.
                    print(f"Error auto-disconnecting from {guild.name}: {e}")
                # pop() rather than del: the entry may already have been
                # removed elsewhere while this coroutine was awaiting.
                last_activity.pop(guild_id, None)
        except Exception as e:
            print(f"Error in inactivity checker: {e}")

        # Wait before the next pass, whether or not this one failed
        await asyncio.sleep(interval)
# Update activity timestamp when playing
def update_activity(guild_id):
    """Record the current event-loop time as the guild's latest activity.

    Meant to be called when a song starts playing.
    """
    loop = asyncio.get_event_loop()
    last_activity[guild_id] = loop.time()
# Build a display message for queuing a new song
async def queue_message(ctx: Context, data: dict):
    """Send an embed announcing the song the command author just queued.

    Reads the 'thumbnail', 'title', 'duration' and 'position' keys of
    ``data``; the duration is rendered through ``format_time``.
    """
    embed = discord.Embed(
        title=f"{ctx.author.display_name} queued a song!",
        color=config.get_color("main"),
    )
    embed.set_thumbnail(url=data['thumbnail'])

    # Single field: song title as the name, duration and position as the value
    song_title = data['title']
    duration_text = format_time(data['duration'])
    details = f"Duration: {duration_text}\nPosition: {data['position']}"
    embed.add_field(name=song_title, value=details)

    await ctx.send(embed=embed)
# Build an embed message that shows the queue
async def display_server_queue(ctx: Context, songs, n):
    """Send an embed listing the current song and the queued songs.

    Args:
        ctx: invocation context; its guild supplies the embed title and the
            id used to look up the current song.
        songs: iterable of entries indexed as [0] title, [1] duration and
            [2] who queued it. A ``str`` duration is shown as blank.
        n: song count used for the "and N more" footer, shown when it is
            above 10 (presumably the total queue length; confirm with the
            caller).
    """
    # Discord rejects embeds whose field value is longer than this
    field_value_limit = 1024

    server = ctx.guild
    msg = discord.Embed(
        title=f"{server.name}'s Queue!",
        color=config.get_color("main"))

    current_song = await queue.get_current_song(ctx.guild.id)

    lines = [f"🔊 Currently playing: ``{current_song}``\n\n"]
    for position, song in enumerate(songs, start=1):
        # If the duration is not available do not display it
        duration = '' if isinstance(song[1], str) else format_time(song[1])
        lines.append(
            f"``{position}.`` {song[0]} - {duration} Queued by {song[2]}\n")
    display = "".join(lines)

    # Long titles can push the text past the limit, which would make the
    # whole send fail; truncate instead of erroring out.
    if len(display) > field_value_limit:
        display = display[:field_value_limit - 3] + "..."

    msg.add_field(name="Songs:",
                  value=display,
                  inline=True)

    if n > 10:
        msg.set_footer(text=f"and {n - 10} more!..")

    await ctx.send(embed=msg)
# Converts seconds into more readable format
def format_time(seconds):
    """Format a duration in seconds as ``H:MM:SS``, ``M:SS`` or ``N seconds``.

    Float durations are truncated to whole seconds so they format the same
    way as ints (previously a float of 60 s or more produced "").

    Returns:
        The formatted string, or "" when ``seconds`` is not a usable number
        (e.g. ``None``, a string, NaN or infinity).
    """
    try:
        # The ':02d' format spec only accepts integers
        total = int(seconds) if isinstance(seconds, float) else seconds
        minutes, secs = divmod(total, 60)
        hours, minutes = divmod(minutes, 60)

        if hours > 0:
            return f"{hours}:{minutes:02d}:{secs:02d}"
        if minutes > 0:
            return f"{minutes}:{secs:02d}"
        return f"{secs} seconds"
    except (TypeError, ValueError, OverflowError):
        # Non-numeric input, NaN (ValueError) or infinity (OverflowError)
        return ""