Code source de discord.utils

# -*- coding: utf-8 -*-

"""
The MIT License (MIT)

Copyright (c) 2015-present Rapptz

Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""

import array
import asyncio
import collections.abc
import unicodedata
from base64 import b64encode
from bisect import bisect_left
import datetime
import functools
from inspect import isawaitable as _isawaitable, signature as _signature
from operator import attrgetter
import json
import re
import warnings

from .errors import InvalidArgument

DISCORD_EPOCH = 1420070400000
MAX_ASYNCIO_SECONDS = 3456000

class cached_property:
    def __init__(self, function):
        self.function = function
        self.__doc__ = getattr(function, '__doc__')

    def __get__(self, instance, owner):
        if instance is None:
            return self

        value = self.function(instance)
        setattr(instance, self.function.__name__, value)

        return value

class CachedSlotProperty:
    def __init__(self, name, function):
        self.name = name
        self.function = function
        self.__doc__ = getattr(function, '__doc__')

    def __get__(self, instance, owner):
        if instance is None:
            return self

        try:
            return getattr(instance, self.name)
        except AttributeError:
            value = self.function(instance)
            setattr(instance, self.name, value)
            return value

def cached_slot_property(name):
    def decorator(func):
        return CachedSlotProperty(name, func)
    return decorator

class SequenceProxy(collections.abc.Sequence):
    """Read-only proxy of a Sequence."""
    def __init__(self, proxied):
        self.__proxied = proxied

    def __getitem__(self, idx):
        return self.__proxied[idx]

    def __len__(self):
        return len(self.__proxied)

    def __contains__(self, item):
        return item in self.__proxied

    def __iter__(self):
        return iter(self.__proxied)

    def __reversed__(self):
        return reversed(self.__proxied)

    def index(self, value, *args, **kwargs):
        return self.__proxied.index(value, *args, **kwargs)

    def count(self, value):
        return self.__proxied.count(value)

def parse_time(timestamp):
    if timestamp:
        return datetime.datetime(*map(int, re.split(r'[^\d]', timestamp.replace('+00:00', ''))))
    return None

def copy_doc(original):
    def decorator(overriden):
        overriden.__doc__ = original.__doc__
        overriden.__signature__ = _signature(original)
        return overriden
    return decorator

def deprecated(instead=None):
    def actual_decorator(func):
        @functools.wraps(func)
        def decorated(*args, **kwargs):
            warnings.simplefilter('always', DeprecationWarning) # turn off filter
            if instead:
                fmt = "{0.__name__} is deprecated, use {1} instead."
            else:
                fmt = '{0.__name__} is deprecated.'

            warnings.warn(fmt.format(func, instead), stacklevel=3, category=DeprecationWarning)
            warnings.simplefilter('default', DeprecationWarning) # reset filter
            return func(*args, **kwargs)
        return decorated
    return actual_decorator

def oauth_url(client_id, permissions=None, guild=None, redirect_uri=None, scopes=None):
    """A helper function that returns the OAuth2 URL for inviting the bot
    into guilds.

    Parameters
    -----------
    client_id: :class:`str`
        The client ID for your bot.
    permissions: :class:`~discord.Permissions`
        The permissions you're requesting. If not given then you won't be requesting any
        permissions.
    guild: :class:`~discord.Guild`
        The guild to pre-select in the authorization screen, if available.
    redirect_uri: :class:`str`
        An optional valid redirect URI.
    scopes: Iterable[:class:`str`]
        An optional valid list of scopes. Defaults to ``('bot',)``.

        .. versionadded:: 1.7

    Returns
    --------
    :class:`str`
        The OAuth2 URL for inviting the bot into guilds.
    """
    url = 'https://discord.com/oauth2/authorize?client_id={}'.format(client_id)
    url = url + '&scope=' + '+'.join(scopes or ('bot',))
    if permissions is not None:
        url = url + '&permissions=' + str(permissions.value)
    if guild is not None:
        url = url + "&guild_id=" + str(guild.id)
    if redirect_uri is not None:
        from urllib.parse import urlencode
        url = url + "&response_type=code&" + urlencode({'redirect_uri': redirect_uri})
    return url


def snowflake_time(id):
    """
    Parameters
    -----------
    id: :class:`int`
        The snowflake ID.

    Returns
    --------
    :class:`datetime.datetime`
        The creation date in UTC of a Discord snowflake ID."""
    return datetime.datetime.utcfromtimestamp(((id >> 22) + DISCORD_EPOCH) / 1000)

def time_snowflake(datetime_obj, high=False):
    """Returns a numeric snowflake pretending to be created at the given date.

    When using as the lower end of a range, use ``time_snowflake(high=False) - 1`` to be inclusive, ``high=True`` to be exclusive
    When using as the higher end of a range, use ``time_snowflake(high=True)`` + 1 to be inclusive, ``high=False`` to be exclusive

    Parameters
    -----------
    datetime_obj: :class:`datetime.datetime`
        A timezone-naive datetime object representing UTC time.
    high: :class:`bool`
        Whether or not to set the lower 22 bit to high or low.
    """
    unix_seconds = (datetime_obj - type(datetime_obj)(1970, 1, 1)).total_seconds()
    discord_millis = int(unix_seconds * 1000 - DISCORD_EPOCH)

    return (discord_millis << 22) + (2**22-1 if high else 0)

def find(predicate, seq):
    """A helper to return the first element found in the sequence
    that meets the predicate. For example: ::

        member = discord.utils.find(lambda m: m.name == 'Mighty', channel.guild.members)

    would find the first :class:`~discord.Member` whose name is 'Mighty' and return it.
    If an entry is not found, then ``None`` is returned.

    This is different from :func:`py:filter` due to the fact it stops the moment it finds
    a valid entry.

    Parameters
    -----------
    predicate
        A function that returns a boolean-like result.
    seq: iterable
        The iterable to search through.
    """

    for element in seq:
        if predicate(element):
            return element
    return None

[docs]def get(iterable, **attrs): r"""A helper that returns the first element in the iterable that meets all the traits passed in ``attrs``. This is an alternative for :func:`~discord.utils.find`. When multiple attributes are specified, they are checked using logical AND, not logical OR. Meaning they have to meet every attribute passed in and not one of them. To have a nested attribute search (i.e. search by ``x.y``) then pass in ``x__y`` as the keyword argument. If nothing is found that matches the attributes passed, then ``None`` is returned. Examples --------- Basic usage: .. code-block:: python3 member = discord.utils.get(message.guild.members, name='Foo') Multiple attribute matching: .. code-block:: python3 channel = discord.utils.get(guild.voice_channels, name='Foo', bitrate=64000) Nested attribute matching: .. code-block:: python3 channel = discord.utils.get(client.get_all_channels(), guild__name='Cool', name='general') Parameters ----------- iterable An iterable to search through. \*\*attrs Keyword arguments that denote attributes to search with. """ # global -> local _all = all attrget = attrgetter # Special case the single element call if len(attrs) == 1: k, v = attrs.popitem() pred = attrget(k.replace('__', '.')) for elem in iterable: if pred(elem) == v: return elem return None converted = [ (attrget(attr.replace('__', '.')), value) for attr, value in attrs.items() ] for elem in iterable: if _all(pred(elem) == value for pred, value in converted): return elem return None
def _unique(iterable): seen = set() adder = seen.add return [x for x in iterable if not (x in seen or adder(x))] def _get_as_snowflake(data, key): try: value = data[key] except KeyError: return None else: return value and int(value) def _get_mime_type_for_image(data): if data.startswith(b'\x89\x50\x4E\x47\x0D\x0A\x1A\x0A'): return 'image/png' elif data[0:3] == b'\xff\xd8\xff' or data[6:10] in (b'JFIF', b'Exif'): return 'image/jpeg' elif data.startswith((b'\x47\x49\x46\x38\x37\x61', b'\x47\x49\x46\x38\x39\x61')): return 'image/gif' elif data.startswith(b'RIFF') and data[8:12] == b'WEBP': return 'image/webp' else: raise InvalidArgument('Unsupported image type given') def _bytes_to_base64_data(data): fmt = 'data:{mime};base64,{data}' mime = _get_mime_type_for_image(data) b64 = b64encode(data).decode('ascii') return fmt.format(mime=mime, data=b64) def to_json(obj): return json.dumps(obj, separators=(',', ':'), ensure_ascii=True) def _parse_ratelimit_header(request, *, use_clock=False): reset_after = request.headers.get('X-Ratelimit-Reset-After') if use_clock or not reset_after: utc = datetime.timezone.utc now = datetime.datetime.now(utc) reset = datetime.datetime.fromtimestamp(float(request.headers['X-Ratelimit-Reset']), utc) return (reset - now).total_seconds() else: return float(reset_after) async def maybe_coroutine(f, *args, **kwargs): value = f(*args, **kwargs) if _isawaitable(value): return await value else: return value async def async_all(gen, *, check=_isawaitable): for elem in gen: if check(elem): elem = await elem if not elem: return False return True async def sane_wait_for(futures, *, timeout): ensured = [ asyncio.ensure_future(fut) for fut in futures ] done, pending = await asyncio.wait(ensured, timeout=timeout, return_when=asyncio.ALL_COMPLETED) if len(pending) != 0: raise asyncio.TimeoutError() return done async def sleep_until(when, result=None): """|coro| Sleep until a specified time. If the time supplied is in the past this function will yield instantly. .. versionadded:: 1.3 Parameters ----------- when: :class:`datetime.datetime` The timestamp in which to sleep until. If the datetime is naive then it is assumed to be in UTC. result: Any If provided is returned to the caller when the coroutine completes. """ if when.tzinfo is None: when = when.replace(tzinfo=datetime.timezone.utc) now = datetime.datetime.now(datetime.timezone.utc) delta = (when - now).total_seconds() while delta > MAX_ASYNCIO_SECONDS: await asyncio.sleep(MAX_ASYNCIO_SECONDS) delta -= MAX_ASYNCIO_SECONDS return await asyncio.sleep(max(delta, 0), result) def valid_icon_size(size): """Icons must be power of 2 within [16, 4096].""" return not size & (size - 1) and size in range(16, 4097) class SnowflakeList(array.array): """Internal data storage class to efficiently store a list of snowflakes. This should have the following characteristics: - Low memory usage - O(n) iteration (obviously) - O(n log n) initial creation if data is unsorted - O(log n) search and indexing - O(n) insertion """ __slots__ = () def __new__(cls, data, *, is_sorted=False): return array.array.__new__(cls, 'Q', data if is_sorted else sorted(data)) def add(self, element): i = bisect_left(self, element) self.insert(i, element) def get(self, element): i = bisect_left(self, element) return self[i] if i != len(self) and self[i] == element else None def has(self, element): i = bisect_left(self, element) return i != len(self) and self[i] == element _IS_ASCII = re.compile(r'^[\x00-\x7f]+$') def _string_width(string, *, _IS_ASCII=_IS_ASCII): """Returns string's width.""" match = _IS_ASCII.match(string) if match: return match.endpos UNICODE_WIDE_CHAR_TYPE = 'WFA' func = unicodedata.east_asian_width return sum(2 if func(char) in UNICODE_WIDE_CHAR_TYPE else 1 for char in string) def resolve_invite(invite): """ Resolves an invite from a :class:`~discord.Invite`, URL or code. Parameters ----------- invite: Union[:class:`~discord.Invite`, :class:`str`] The invite. Returns -------- :class:`str` The invite code. """ from .invite import Invite # circular import if isinstance(invite, Invite): return invite.code else: rx = r'(?:https?\:\/\/)?discord(?:\.gg|(?:app)?\.com\/invite)\/(.+)' m = re.match(rx, invite) if m: return m.group(1) return invite def resolve_template(code): """ Resolves a template code from a :class:`~discord.Template`, URL or code. .. versionadded:: 1.4 Parameters ----------- code: Union[:class:`~discord.Template`, :class:`str`] The code. Returns -------- :class:`str` The template code. """ from .template import Template # circular import if isinstance(code, Template): return code.code else: rx = r'(?:https?\:\/\/)?discord(?:\.new|(?:app)?\.com\/template)\/(.+)' m = re.match(rx, code) if m: return m.group(1) return code _MARKDOWN_ESCAPE_SUBREGEX = '|'.join(r'\{0}(?=([\s\S]*((?<!\{0})\{0})))'.format(c) for c in ('*', '`', '_', '~', '|')) _MARKDOWN_ESCAPE_COMMON = r'^>(?:>>)?\s|\[.+\]\(.+\)' _MARKDOWN_ESCAPE_REGEX = re.compile(r'(?P<markdown>%s|%s)' % (_MARKDOWN_ESCAPE_SUBREGEX, _MARKDOWN_ESCAPE_COMMON), re.MULTILINE) _URL_REGEX = r'(?P<url><[^: >]+:\/[^ >]+>|(?:https?|steam):\/\/[^\s<]+[^<.,:;\"\'\]\s])' _MARKDOWN_STOCK_REGEX = r'(?P<markdown>[_\\~|\*`]|%s)' % _MARKDOWN_ESCAPE_COMMON def remove_markdown(text, *, ignore_links=True): """A helper function that removes markdown characters. .. versionadded:: 1.7 .. note:: This function is not markdown aware and may remove meaning from the original text. For example, if the input contains ``10 * 5`` then it will be converted into ``10 5``. Parameters ----------- text: :class:`str` The text to remove markdown from. ignore_links: :class:`bool` Whether to leave links alone when removing markdown. For example, if a URL in the text contains characters such as ``_`` then it will be left alone. Defaults to ``True``. Returns -------- :class:`str` The text with the markdown special characters removed. """ def replacement(match): groupdict = match.groupdict() return groupdict.get('url', '') regex = _MARKDOWN_STOCK_REGEX if ignore_links: regex = '(?:%s|%s)' % (_URL_REGEX, regex) return re.sub(regex, replacement, text, 0, re.MULTILINE) def escape_markdown(text, *, as_needed=False, ignore_links=True): r"""A helper function that escapes Discord's markdown. Parameters ----------- text: :class:`str` The text to escape markdown from. as_needed: :class:`bool` Whether to escape the markdown characters as needed. This means that it does not escape extraneous characters if it's not necessary, e.g. ``**hello**`` is escaped into ``\*\*hello**`` instead of ``\*\*hello\*\*``. Note however that this can open you up to some clever syntax abuse. Defaults to ``False``. ignore_links: :class:`bool` Whether to leave links alone when escaping markdown. For example, if a URL in the text contains characters such as ``_`` then it will be left alone. This option is not supported with ``as_needed``. Defaults to ``True``. Returns -------- :class:`str` The text with the markdown special characters escaped with a slash. """ if not as_needed: def replacement(match): groupdict = match.groupdict() is_url = groupdict.get('url') if is_url: return is_url return '\\' + groupdict['markdown'] regex = _MARKDOWN_STOCK_REGEX if ignore_links: regex = '(?:%s|%s)' % (_URL_REGEX, regex) return re.sub(regex, replacement, text, 0, re.MULTILINE) else: text = re.sub(r'\\', r'\\\\', text) return _MARKDOWN_ESCAPE_REGEX.sub(r'\\\1', text) def escape_mentions(text): """A helper function that escapes everyone, here, role, and user mentions. .. note:: This does not include channel mentions. .. note:: For more granular control over what mentions should be escaped within messages, refer to the :class:`~discord.AllowedMentions` class. Parameters ----------- text: :class:`str` The text to escape mentions from. Returns -------- :class:`str` The text with the mentions removed. """ return re.sub(r'@(everyone|here|[!&]?[0-9]{17,20})', '@\u200b\\1', text)