Code source de lgrez.blocs.console
"""lg-rez / blocs / Console d'administration (avancé)
Permet de contrôler le bot depuis la console le faisant tourner.
"""
import sys
import asyncio
import aioconsole
import asyncode
import discord
from lgrez import config
[docs]class CaptureLogs():
"""Context manager: capture the messages posted to a Discord channel.
Only capture messages posted by the bot itself: these messages
are not sent at all to Discord.
Args:
chan (discord.TextChannel): the channel to capture.
"""
def __init__(self):
"""Initialize self."""
self._chan = config.Channel.logs
self._calls = []
async def _cor(text, *args, **kwargs):
self._calls.append(text)
class _ProxyChannel(discord.TextChannel):
def __init__(self):
# We inherit TextChannel for type checks, but we
# do NOT want to create a new TextChannel object
pass
def __getattribute__(slf, attr):
if attr == "send":
return _cor
return getattr(self._chan, attr)
self.proxy = _ProxyChannel()
def __enter__(self):
"""Enter the context: start capturing."""
config.Channel.logs = self.proxy
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit the context: stop capturing."""
config.Channel.logs = self._chan
@property
def text(self):
"""str: The messages sent in the channel, joined with newlines."""
return "\n".join(self._calls)
[docs]class AdminConsole(asyncode.AsyncInteractiveConsole):
"""Python console to control LGBot from the running shell.
(subclass of :class:`asyncode.AsyncInteractiveConsole`)
Args:
locals, filename: see :class:`asyncode.AsyncInteractiveConsole`.
"""
def __init__(self, locals={}, filename="__console__"):
"""Initialize self"""
locals["_console"] = self
super().__init__(locals, filename)
[docs] async def interact(self, banner=None, exitmsg="Byebye!"):
"""Launchs the console.
(see :meth:`asyncode.AsyncInteractiveConsole.interact`)
"""
await aioconsole.aprint("\nLaunching interactive console...")
if banner is None:
base = ('Type "help", "copyright", "credits" or "license" for '
'more information.\nasync REPL: use "await" directly '
'instead of asyncio.run().')
warn = ('WARNING: Sending KeyboardInterrupt (^C) will stop '
'the whole LGBot.')
banner = f"Python {sys.version} on {sys.platform}\n{base}\n{warn}"
try:
await super().interact(banner, exitmsg)
except (SystemExit, KeyboardInterrupt, EOFError) as exc:
yn = await aioconsole.ainput("Exit LG-Rez? (y/N) ")
if yn.lower().strip() in ("o", "y", "oui", "yes"):
raise KeyboardInterrupt from None
[docs] async def write(self, data):
"""Method called on each print / repr / traceback...
Relies on :meth:`aioconsole.aprint`.
Args:
data (str): text to display.
"""
if not hasattr(sys, "ps1"):
sys.ps1 = '>>> '
if not hasattr(sys, "ps2"):
sys.ps2 = '... '
await aioconsole.aprint(data, end="")
[docs]async def run_admin_console(locals):
"""Launch the Admin console and restart it if crashing.
Args:
locals (dict): the objects accessible from the console.
"""
while True:
await asyncio.sleep(1)
cons = AdminConsole(locals=locals)
await cons.interact()
await aioconsole.aprint("Aborted, restarting admin console...")
[docs]async def execute_command(entree):
"""Execute a command.
Create a fake message in the #logs channel by the server owner,
and process it while capturing the output in the channel.
Args:
entree (str): The command to execute (must begin with the
command prefix)
Returns:
str: The messages get in response to the command execution,
joined with newlines.
"""
message = (await config.Channel.logs.history(limit=1).flatten())[0]
# On a besoin de récupérer un message, ici le dernier de #logs
message.author = config.guild.owner
message.content = entree
ctx = await config.bot.get_context(message)
_calls = []
async def _cor(text, *args, **kwargs):
_calls.append(text)
ctx.send = _cor
ctx.reply = _cor
with CaptureLogs() as cc:
await config.bot.invoke(ctx)
text = "\n".join(_calls)
if cc.text:
text += f"\n[LOGGED] > {cc.text}"
text = text.replace('"""', '\\"\\"\\"')
return f'print("""{text}""")'