Source code for circleguard.loader

import base64
import logging
from lzma import LZMAError
from functools import lru_cache
from pathlib import Path
import sqlite3
import wtc

import osrparse
from ossapi import Ossapi, ReplayUnavailableException

from circleguard.utils import TRACE
from circleguard.span import Span


class NoInfoAvailableException(Exception):
    """
    Raised when the api returned no info for the arguments it was given.

    The exception takes no arguments; it always carries the same fixed
    message.
    """

    def __init__(self):
        message = ("No info was available from the api for the given "
                   "arguments.")
        super().__init__(message)
def check_cache(function):
    """
    A decorator that checks if the passed :class:`~.ReplayInfo` has its replay
    cached. If so, returns a :class:`~circleguard.loadables.Replay` instance
    from the cached data. Otherwise, calls and returns the `function` as
    normal.

    Parameters
    ----------
    function: callable
        The function to wrap.

    Notes
    -----
    ``self`` and ``replay_info`` **MUST** be the first and second parameters
    of the function, respectively. ``self`` must be passed positionally;
    ``replay_info`` may be passed either positionally or by keyword.

    The returned wrapper preserves the name and docstring of ``function``.

    Returns
    -------
    :class:`~circleguard.loadables.Replay` or Unknown:
        A :class:`~circleguard.loadables.Replay` instance from the cached data
        if it was cached, or the return value of the function if not.
    """
    # imported locally because the module only pulls ``lru_cache`` from
    # functools at the top of the file.
    from functools import wraps

    @wraps(function)
    def wrapper(*args, **kwargs):
        self = args[0]
        # tolerate ``replay_info`` being passed as a keyword argument instead
        # of failing with an IndexError on ``args[1]``.
        replay_info = args[1] if len(args) > 1 else kwargs["replay_info"]

        decompressed_lzma = self._check_cache(replay_info)
        if not decompressed_lzma:
            # cache miss (or caching disabled): defer to the wrapped function
            return function(*args, **kwargs)
        return osrparse.parse_replay_data(decompressed_lzma, decompressed=True)

    return wrapper
class Loader:
    """
    Manages interactions with the osu api, using the :mod:`ossapi` wrapper.

    Parameters
    ----------
    key: str
        A valid api key. Can be retrieved from https://osu.ppy.sh/p/api/.
    cache_path: str
        The path to the database to use for caching. A new database will be
        created at this location if one doesn't exist already. |br|
        If ``None``, no cache will be used or created.
    write_to_cache: bool
        Whether newly loaded replays may be written to the cache. Only has an
        effect if ``cache_path`` is passed.

    Notes
    -----
    If the api ratelimits the key, we wait until our ratelimits are refreshed
    and retry the request. Because the api does not provide the time until the
    next refresh (and we do not use exponential backoff or another retry
    strategy), if the key is ratelimited because of an interaction not managed
    by this class, the class may wait more time than necessary for the key to
    refresh.
    """

    # the maximum number of replay info available through the respective api
    # calls. Note that osu! stores at least the top 1000 replays, but does not
    # make these discoverable unless you know the exact user id, map id, and
    # mods of the replay.
    MAX_MAP_SPAN = Span("1-100")
    MAX_USER_SPAN = Span("1-100")

    def __init__(self, key, cache_path=None, write_to_cache=True):
        self.api = Ossapi(key)
        self.log = logging.getLogger(__name__)
        self._conn = None
        self._cursor = None
        # both reading and writing require a cache path to be configured
        self.write_to_cache = write_to_cache and bool(cache_path)
        self.read_from_cache = bool(cache_path)

        if cache_path:
            cache_path = Path(cache_path)
            if not cache_path.is_file():
                self._create_cache(cache_path)
            self._conn = sqlite3.connect(str(cache_path))
            self._cursor = self._conn.cursor()

    def replay_info(self, beatmap_id, span=None, user_id=None, mods=None,
                    limit=True):
        """
        Retrieves replay infos from a map's leaderboard.

        Parameters
        ----------
        beatmap_id: int
            The map id to retrieve replay info for.
        span: Span
            A comma separated list of ranges of top replays on the map to
            retrieve. ``span="1-3,6,2-4"`` -> replays in the range
            ``[1,2,3,4,6]``.
        user_id: int
            If passed, only retrieve replay info on ``beatmap_id`` for this
            user. Note that this is not necessarily limited to just the user's
            top score on the map. See ``limit``.
        mods: :class:`~.ModCombination`
            If passed, will only retrieve replay infos for scores that were
            played with the given mods.
        limit: bool
            Whether to limit to only one response. Only has an effect if
            ``user_id`` is passed. If ``limit`` is ``True``, will only return
            the top scoring replay info by ``user_id``. If ``False``, will
            return all scores by ``user_id``.

        Returns
        -------
        list[:class:`~.ReplayInfo`]
            The replay infos representing the map's leaderboard.
        :class:`~.ReplayInfo`
            If ``limit`` is ``True`` and ``user_id`` is passed.

        Notes
        -----
        One of ``user_id`` or ``span`` must be passed.

        Raises
        ------
        ValueError
            If neither ``user_id`` nor ``span`` is passed.
        NoInfoAvailableException
            If there is no info available for the given parameters.
        """
        options = {
            "beatmap_id": beatmap_id,
            "span": span,
            "user_id": user_id,
            "mods": mods,
            "limit": limit,
        }
        self.log.log(TRACE, "Loading replay info on map %d with options %s",
                     beatmap_id, options)

        if not (span or user_id):
            # NOTE(review): the message says "but not both", yet the check
            # above accepts both being passed. Message left unchanged for
            # backwards compatibility.
            raise ValueError("One of user_id or span must be passed, but not "
                             "both")

        api_limit = None
        if span:
            api_limit = max(span)
        mods = None if mods is None else mods.value

        scores = self.api.get_scores(beatmap_id, mode=0, limit=api_limit,
                                     user=user_id, mods=mods)
        if scores == []:
            # The logic below allows us to load eg
            # ``Map(221777, mods=Mod.SO + Mod.PF + Mod.HT)`` or some equally
            # absurd mod combination for which there are no replays, and have
            # that loading not throw ``NoInfoAvailableException``. Instead,
            # the map's replays list will just be empty.
            # However, we only want to apply this if we're loading a map, ie
            # ``span`` has been passed. If ``user_id`` was passed instead,
            # raise the exception as usual.
            if user_id:
                raise NoInfoAvailableException()

            # the osu! api doesn't distinguish between a map not existing, and
            # no scores having been set on that map for a particular mod
            # combination - both are empty responses which will trigger a no
            # info available exception. We need to figure out which case has
            # occurred here to determine if we should raise or not.
            beatmap_response = self.api.get_beatmaps(beatmap_id=beatmap_id)
            # If the beatmap does not exist, this response will be empty.
            if not beatmap_response:
                raise NoInfoAvailableException()
            # else, the empty response is ok.

        if span:
            # important: if we iterated over ``span`` instead, we would change
            # the order of the scores returned, since ``Span`` is an
            # (unordered) set. Iterate over the scores instead, which have a
            # guaranteed order.
            scores = [score for (i, score) in enumerate(scores, 1)
                      if i in span]

        # limit only applies if user_id was set
        if limit and user_id:
            # span filtering may have removed every score; raise the
            # documented exception instead of an IndexError on ``scores[0]``.
            if not scores:
                raise NoInfoAvailableException()
            return scores[0]
        return scores

    def get_user_best(self, user_id, span, mods=None):
        """
        Retrieves replay infos from a user's top plays.

        Parameters
        ----------
        user_id: int
            The user id to get best plays of.
        span: Span
            A comma separated list of ranges of top plays to retrieve.
            ``span="1-3,6,2-4"`` -> replays in the range ``[1,2,3,4,6]``.
        mods: :class:`~.ModCombination`
            If passed, will only retrieve replay infos for scores that were
            played with the given mods.

        Returns
        -------
        list[:class:`~.ReplayInfo`]
            The replay infos representing the user's top plays.

        Raises
        ------
        NoInfoAvailableException
            If the api returned no top plays for ``user_id``.
        """
        options = {"user_id": user_id, "span": span, "mods": mods}
        self.log.log(TRACE, "Loading user best of %s with options %s",
                     user_id, options)

        scores = self.api.get_user_best(user_id, mode=0, limit=max(span))
        if scores == []:
            raise NoInfoAvailableException()

        if mods:
            scores = [score for score in scores if score.mods == mods]

        # Keep only the (1-indexed) positions requested by ``span``. We
        # iterate over the scores rather than over ``span`` so that the api's
        # ordering is preserved (``Span`` is an unordered set), and so that
        # span entries beyond the number of returned scores are ignored
        # instead of causing an index error.
        return [score for (i, score) in enumerate(scores, 1) if i in span]

    def load_replay_data(self, beatmap_id, user_id, mods=None):
        """
        Retrieves replay data from the api.

        Parameters
        ----------
        beatmap_id: int
            The map the replay was played on.
        user_id: int
            The user that played the replay.
        mods: :class:`~.ModCombination`
            The mods the replay was played with, or ``None`` for the highest
            scoring replay, regardless of mods.

        Returns
        -------
        bytes
            The lzma-encoded data, decoded from the base 64 api response,
            representing the replay.
        None
            If the api returned no content.

        Notes
        -----
        This is the low level implementation of :func:`~.replay_data`,
        handling the actual api request.
        """
        self.log.log(TRACE, "Requesting replay data by user %d on map %d with "
                     "mods %s", user_id, beatmap_id, mods)
        mods = None if mods is None else mods.value
        content = self.api.get_replay(beatmap_id=beatmap_id, user=user_id,
                                      mods=mods, mode=0)
        if content is None:
            # ``base64.b64decode(None)`` would raise a TypeError; honor the
            # documented ``None`` return instead.
            return None
        return base64.b64decode(content)

    @check_cache
    def replay_data(self, replay_info, cache=None):
        """
        Retrieves replay data from the api, or from the cache if it is already
        cached.

        Parameters
        ----------
        replay_info: :class:`~.ReplayInfo`
            The replay info representing the replay to retrieve.
        cache: bool
            Whether to write the replay to the cache after loading it from the
            api. Writing only happens if this loader has ``write_to_cache``
            enabled.

        Returns
        -------
        list[:class:`osrparse.replay.ReplayEvent`]
            The replay events with attributes ``x``, ``y``, ``time_delta``,
            and ``keys``.
        None
            If no replay data was available, or the api returned a corrupt
            replay.

        Raises
        ------
        ReplayUnavailableException
            If ``replay_info.replay_available`` was truthy, but we did not
            receive replay data from the api.
        """
        user_id = replay_info.user_id
        beatmap_id = replay_info.beatmap_id
        mods = replay_info.mods

        if not replay_info.replay_available:
            self.log.debug("Replay data by user %d on map %d with mods %s not "
                           "available", user_id, beatmap_id, mods)
            return None

        lzma_bytes = self.load_replay_data(beatmap_id, user_id, mods)
        if lzma_bytes is None:
            raise ReplayUnavailableException("The api guaranteed there "
                "would be a replay available, but we did not receive any data.")

        try:
            replay_data = osrparse.parse_replay_data(lzma_bytes, decoded=True)
        # see https://github.com/circleguard/circlecore/issues/61
        # api sometimes returns corrupt replays
        except LZMAError:
            self.log.warning("lzma from %r could not be decompressed, api "
                             "returned corrupt replay", replay_info)
            return None

        if cache:
            self._cache(lzma_bytes, replay_info)
        return replay_data

    # TODO make this check cache for the replay
    def replay_data_from_id(self, replay_id, _cache):
        """
        Retrieves replay data from the api, given a replay id.

        Parameters
        ----------
        replay_id: int
            The id of the replay to retrieve data for.
        _cache: bool
            Currently unused; replays loaded by id are not cached (see the
            TODO below).
        """
        content = self.api.get_replay(score_id=replay_id)
        replay_data = osrparse.parse_replay_data(content)
        # TODO cache the replay here, might require some restructuring/double
        # checking everything will work because we only have its id, not map
        # or user id. In fact I think our db asserts map and user id are nonull
        # so insertion into old dbs probably won't work (and we'd have to
        # change the schema).
        # TODO include a db version in the db for future scenarios like this?
        # look into how that's typically done, maybe just a `VERSION` table
        # with a single row
        return replay_data

    @lru_cache()
    def beatmap_id(self, beatmap_hash):
        """
        Retrieves a beatmap id from a corresponding beatmap hash through the
        api.

        Parameters
        ----------
        beatmap_hash: str
            The md5 hash of the map to get the id of.

        Returns
        -------
        int
            The map id that corresponds to ``beatmap_hash``, or ``0`` if
            ``beatmap_hash`` doesn't match any map.

        Notes
        -----
        This function is wrapped in a :func:`functools.lru_cache` to prevent
        duplicate api calls.
        """
        beatmaps = self.api.get_beatmaps(beatmap_hash=beatmap_hash)
        if beatmaps == []:
            return 0
        return beatmaps[0].beatmap_id

    # TODO remove in core 6.0.0
    map_id = beatmap_id

    @lru_cache()
    def user_id(self, username):
        """
        Retrieves a user id from a corresponding username through the api.

        Parameters
        ----------
        username: str
            The username of the user to get the user id of.

        Returns
        -------
        int
            The user id that corresponds to ``username``, or ``0`` if
            ``username`` doesn't match any user.

        Notes
        -----
        The api redirects name changes to the current username. For instance,
        ``user_id("cookiezi")`` will return ``124493``, despite shige's current
        osu! username being ``chocomint``. However, I am not sure if this
        behavior is as well defined when someone else takes the previous name
        of a user.

        This function is case insensitive.

        This function is wrapped in a :func:`functools.lru_cache` to prevent
        duplicate api calls.
        """
        user = self.api.get_user(username, user_type="string")
        if user == []:
            return 0
        return user.user_id

    @lru_cache()
    def username(self, user_id):
        """
        Retrieves the username from a corresponding user id through the api.

        Parameters
        ----------
        user_id: int
            The user id of the user to get the username of.

        Returns
        -------
        str
            The username that corresponds to ``user_id``, or an empty string
            if ``user_id`` doesn't match any user.

        Notes
        -----
        This function is the inverse of
        :meth:`~circleguard.loader.Loader.user_id`.

        This function is wrapped in a :func:`functools.lru_cache` to prevent
        duplicate api calls.
        """
        user = self.api.get_user(user_id, user_type="id")
        if user == []:
            return ""
        return user.username

    def _create_cache(self, path):
        """
        Creates a database with the necessary tables at the given path.

        Parameters
        ----------
        path: str or :class:`pathlib.Path`
            The absolute path to where the database should be created.

        Notes
        -----
        This function will create directories specified in the path if they
        don't already exist.
        """
        self.log.info("Cache not found at path %s, creating cache", path)
        path = Path(path)
        # create dir if nonexistent
        path.parent.mkdir(parents=True, exist_ok=True)

        conn = sqlite3.connect(str(path))
        try:
            c = conn.cursor()
            c.execute(
                """
                CREATE TABLE "REPLAYS" (
                    `MAP_ID` INTEGER NOT NULL,
                    `USER_ID` INTEGER NOT NULL,
                    `REPLAY_DATA` MEDIUMTEXT NOT NULL,
                    `REPLAY_ID` INTEGER NOT NULL,
                    `MODS` INTEGER NOT NULL,
                    PRIMARY KEY(`REPLAY_ID`)
                )""")
            # create our index - this does unfortunately add some size (and
            # insertion time) to the db, but it's worth it to get fast lookups
            # on a map, user, or mods, which are all common operations.
            c.execute(
                """
                CREATE INDEX `lookup_index` ON `REPLAYS` (
                    `MAP_ID`, `USER_ID`, `MODS`
                )
                """)
            conn.commit()
        finally:
            # always release the connection, even if table creation failed
            conn.close()

    def _cache(self, lzma_bytes, replay_info):
        """
        Compresses and caches the given lzma_bytes to the database, linking it
        to the given replay_info. If an entry with the given replay info
        already exists, it is overwritten.

        Does nothing if this loader was not configured to write to the cache.

        Parameters
        ----------
        lzma_bytes: bytes
            The lzma stream to compress and insert into the db.
        replay_info: :class:`~circleguard.loader.ReplayInfo`
            The ReplayInfo object representing this replay.
        """
        if not self.write_to_cache:
            return

        compressed_bytes = wtc.compress(lzma_bytes)
        beatmap_id = replay_info.beatmap_id
        user_id = replay_info.user_id
        mods = replay_info.mods.value
        replay_id = replay_info.replay_id

        self.log.log(TRACE, "Writing compressed lzma to db")
        # ``OR REPLACE`` so that an existing row with the same (primary key)
        # replay id is overwritten, as documented, instead of raising an
        # IntegrityError.
        self._cursor.execute(
            "INSERT OR REPLACE INTO replays VALUES(?, ?, ?, ?, ?)",
            [beatmap_id, user_id, compressed_bytes, replay_id, mods])
        self._conn.commit()

    def _check_cache(self, replay_info):
        """
        Checks the cache for a replay matching ``replay_info``.

        Parameters
        ----------
        replay_info: :class:`~circleguard.loader.ReplayInfo`
            The replay info to search for a matching replay with.

        Returns
        -------
        str or None
            The replay data in decompressed lzma form if the cache contains
            the replay, or None if not (or if this loader has no cache to
            read from).
        """
        if not self.read_from_cache:
            return None

        replay_id = replay_info.replay_id
        self.log.log(TRACE, "Checking cache for replay info %s", replay_info)
        result = self._cursor.execute("SELECT replay_data FROM replays WHERE "
                                      "replay_id=?", [replay_id]).fetchone()
        if result:
            self.log.debug("Loading replay for replay info %s from cache",
                           replay_info)
            return wtc.decompress(result[0], decompressed_lzma=True)

        self.log.log(TRACE, "No replay found in cache")
        return None