from datetime import timedelta
import numpy as np
from scipy import signal
from slider.beatmap import Circle, Slider, Spinner as SliderSpinner
from circleguard.mod import Mod
from circleguard.utils import KEY_MASK
from circleguard import utils
from circleguard.game_version import GameVersion
from circleguard.hitobjects import Hitobject, Spinner
from circleguard.judgment import JudgmentType, Miss, Hit
class Investigations:
# https://osu.ppy.sh/home/changelog/stable40/20190207.2
VERSION_SLIDERBUG_FIXED_STABLE = GameVersion(20190207, concrete=True)
# https://osu.ppy.sh/home/changelog/cuttingedge/20190111
VERSION_SLIDERBUG_FIXED_CUTTING_EDGE = GameVersion(20190111, concrete=True)
@staticmethod
def ur(replay, beatmap, adjusted):
"""
Calculates the ur of ``replay`` when played against ``beatmap``.
Parameters
----------
replay: :class:`~.Replay`
The replay to calculate the ur of.
beatmap: :class:`slider.beatmap.Beatmap`
The beatmap to calculate ``replay``'s ur with.
adjusted: boolean
Whether to filter outlier hits before calculating ur.
"""
# TODO cache hits in replay so we don't recalculate hits for both ur
# and hits / judgments?
hits = Investigations.hits(replay, beatmap)
diffs = [hit.error() for hit in hits]
if adjusted:
diffs = utils.filter_outliers(diffs)
return np.std(diffs) * 10
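# rough worked example (made-up hit errors, not from any real replay):
# errors of [-10, 10, -10, 10] ms have a standard deviation of 10, which
# gives a ur of 100.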
@staticmethod
def snaps(replay, max_angle, min_distance, beatmap):
"""
Calculates the angle between each set of three points (a,b,c) and finds
points where this angle is extremely acute and neither ``|ab|`` nor
``|bc|`` is small.
Parameters
----------
replay: :class:`~.Replay`
The replay to investigate for aim correction.
max_angle: float
Consider only (a,b,c) where ``∠abc < max_angle``.
min_distance: float
Consider only (a,b,c) where ``|ab| > min_distance`` and
``|bc| > min_distance``.
beatmap: :class:`slider.beatmap.Beatmap`
If passed, only the snaps that occur on a hitobject in this beatmap
will be returned.
Returns
-------
list[:class:`~.Snap`]
Hits where the angle was less than ``max_angle`` and the distance
was more than ``min_distance``.
Notes
-----
This does not detect correction where multiple datapoints are placed
at the correction site (which creates a small ``min_distance``).
Another possible method is to look at the ratio between the angle
and distance.
"""
# when we leave multiple frames with the same time values, they
# sometimes get detected (falsely) as aim correction.
# TODO Worth looking into a bit more to see if we can avoid it without
# removing the frames entirely.
t, xy = Investigations.remove_duplicate_t(replay.t, replay.xy)
t = t[1:-1]
# label three consecutive points (a b c) and the vectors between them
# (ab, bc, ac)
a = xy[:-2]
b = xy[1:-1]
c = xy[2:]
ab = b - a
bc = c - b
ac = c - a
# Distance a to b, b to c, and a to c
AB = np.linalg.norm(ab, axis=1)
BC = np.linalg.norm(bc, axis=1)
AC = np.linalg.norm(ac, axis=1)
# Law of cosines, solve for beta
# AC^2 = AB^2 + BC^2 - 2 * AB * BC * cos(beta)
# cos(beta) = -(AC^2 - AB^2 - BC^2) / (2 * AB * BC)
num = -(AC ** 2 - AB ** 2 - BC ** 2)
denom = (2 * AB * BC)
# use true_divide for handling division by zero
cos_beta = np.true_divide(num, denom, out=np.full_like(num, np.nan),
where=denom!=0)
# rounding issues makes cos_beta go out of arccos' domain, so restrict
# it
cos_beta = np.clip(cos_beta, -1, 1)
beta = np.rad2deg(np.arccos(cos_beta))
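# rough worked example (made-up frames): a = (0, 0), b = (100, 0),
# c = (0, 10) gives AB = 100, BC ≈ 100.5, AC = 10, so
# cos(beta) = 20000 / 20100 ≈ 0.995 and beta ≈ 5.7 degrees. The cursor
# shoots out and snaps almost straight back, which is the shape this
# method is looking for.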
min_AB_BC = np.minimum(AB, BC)
dist_mask = min_AB_BC > min_distance
# use less to avoid comparing to nan
angle_mask = np.less(beta, max_angle, where=~np.isnan(beta))
# datapoints where both distance and angle requirements are met
mask = dist_mask & angle_mask
snaps = []
for (t, xy, b, d) in zip(t[mask], b[mask], beta[mask], min_AB_BC[mask]):
# can't discard any snaps if we don't know the beatmap, so count all
# of them
if not beatmap:
snaps.append(Snap(t, b, d))
continue
hitobj = beatmap.closest_hitobject(timedelta(milliseconds=int(t)))
hitobj = Hitobject.from_slider_hitobj(hitobj, replay, beatmap)
# ignore snaps on spinners
if isinstance(hitobj, Spinner):
continue
easy = Mod.EZ in replay.mods
hard_rock = Mod.HR in replay.mods
OD = beatmap.od(easy=easy, hard_rock=hard_rock)
hitwindow = utils.hitwindow(OD)
# only count snaps that occur inside hitobjects
inside_hitobj_pos = np.linalg.norm(xy - hitobj.xy) <= hitobj.radius
inside_hitobj_t = (hitobj.t - hitwindow) < t < (hitobj.t + hitwindow)
if inside_hitobj_pos and inside_hitobj_t:
snaps.append(Snap(t, b, d))
return snaps
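# usage sketch (hypothetical replay/beatmap objects, thresholds are
# illustrative only):
#   snaps = Investigations.snaps(replay, max_angle=10, min_distance=8,
#       beatmap=beatmap)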
@staticmethod
def snaps_sam(replay_data, num_jerks, min_jerk):
"""
Calculates the jerk at each moment in the Replay, counts the number of
times it exceeds min_jerk and reports a positive if that number is over
num_jerks. Also reports all suspicious jerks and their timestamps.
WARNING
-------
Unused function. Kept for historical purposes and ease of viewing in
case we want to switch to this track of aim correction in the future,
or provide it as an alternative.
"""
# get all replay data as an array of type [(t, x, y, k)]
txyk = np.array(replay_data)
# drop keypresses
txy = txyk[:, :3]
# separate time and space
t = txy[:, 0]
xy = txy[:, 1:]
# j_x = (d/dt)^3 x
# calculated as (d/dT dT/dt)^3 x = (dT/dt)^3 (d/dT)^3 x
# (d/dT)^3 x = d(d(dx/dT)/dT)/dT
# (dT/dt)^3 = 1/(dt/dT)^3
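# in discrete terms this approximates jerk as the third difference of the
# positions divided by the cube of the local frame spacing (a rough
# approximation which assumes locally even spacing between frames).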
dtdT = np.diff(t)
d3xy = np.diff(xy, axis=0, n=3)
# safely calculate the division and replace with zero if the divisor is
# zero. dtdT is sliced with 2: because differentiating drops one element
# for each order (slice (n - 1,) to (n - 3,)).
# d3xy is of shape (n - 3, 2) so dtdT is also reshaped from (n - 3,) to
# (n - 3, 1) to align the axes.
jerk = np.divide(d3xy, dtdT[2:, None] ** 3, out=np.zeros_like(d3xy),
where=dtdT[2:,None]!=0)
# take the absolute value of the jerk
jerk = np.linalg.norm(jerk, axis=1)
# create a mask of where the jerk reaches suspicious values
anomalous = jerk > min_jerk
# and retrieve and store the timestamps and the values themselves
timestamps = t[3:][anomalous]
values = jerk[anomalous]
# reshape to an array of type [(t, j)]
jerks = np.vstack((timestamps, values)).T
# count the anomalies
ischeat = anomalous.sum() > num_jerks
return [jerks, ischeat]
@staticmethod
def frametime(replay):
"""
Calculates the median time between the frames of ``replay``.
Parameters
----------
replay: :class:`~.Replay`
The replay to calculate the frametime of.
Notes
-----
Median is used instead of mean to lessen the effect of outliers.
"""
frametimes = Investigations.frametimes(replay)
return np.median(frametimes)
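# rough worked example (made-up times): frames at t = [0, 17, 33, 50, 66]
# give frametimes [17, 16, 17, 16] and a median frametime of 16.5 ms,
# roughly what an unmodified replay records. Timewarped replays tend to
# have a noticeably lower median frametime.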
@staticmethod
def frametimes(replay):
"""
Returns the time between each two consecutive frames in ``replay``.
Parameters
----------
replay: :class:`~.Replay`
The replay to get the frametimes of.
"""
return np.diff(replay.t)
@staticmethod
def keydown_frames(replay):
"""
Get the frames of ``replay`` which had a keydown event and which we
should consider eligible to hit a hitobject.
Parameters
----------
replay: :class:`~.Replay`
The replay to get the keydown frames of.
Returns
-------
list[[float, ndarray]]
The keydown frames for the replay. Each entry is ``[t, xy]``, where
``t`` is the time of that frame and ``xy`` is the x and y position of
the cursor at that frame.
"""
keydown_frames = []
# the keydowns for each frame. Frames are "keydown" frames if an
# additional key was pressed from the previous frame. If keys pressed
# remained the same or decreased (a key previously pressed is no longer
# pressed) from the previous frame, ``keydowns`` is zero for that frame.
keydowns = replay.keydowns
for i, keydown in enumerate(keydowns):
if keydown != 0:
keydown_frames.append([replay.t[i], replay.xy[i]])
# add a duplicate frame when 2 keys are pressed at the same time
keydowns = keydowns[keydowns != 0]
i = 0
for j in np.where(keydowns == KEY_MASK)[0]:
keydown_frames.insert(j + i + 1, keydown_frames[j + i])
i += 1
return keydown_frames
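# rough illustration (made-up key data): if the keys held on consecutive
# frames are [none, K1, K1, K1+K2], then frames 1 and 3 are keydown
# frames, since K1 and then K2 are newly pressed there. If both keys
# instead go down on the same frame, that frame is duplicated so it can
# hit two hitobjects.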
@staticmethod
def hits(replay, beatmap):
judgment = Investigations.judgments(replay, beatmap)
judgments = [j for j in judgment if isinstance(j, Hit)]
return judgments
# TODO add exception for 2b objects (>1 object at the same time) for current
# version of notelock
@staticmethod
def judgments(replay, beatmap):
"""
Determines the judgments (where hitobjs were hit or missed, and if hit
then what type of hit - a 300, 100, or 50) of the given ``replay``
played against the given ``beatmap``.
Parameters
----------
replay: :class:`~.Replay`
The replay to determine the judgments of when played against
``beatmap``.
beatmap: :class:`slider.beatmap.Beatmap`
The beatmap to determine the judgments of when ``replay`` is played
against it.
"""
game_version = replay.game_version
if not game_version.available():
# if we have no information about the version, assume it was played
# after sliderbug was fixed.
sliderbug_fixed = True
if not game_version.concrete:
# if we're only estimating the version, assume the replay was played
# on stable. if we used the cutting edge version instead, we would
# be incorrectly using logic for sliderbug being fixed for all
# replays between the cutting edge version and the stable version
# which were played on stable.
# This is wrong for cutting edge replays between those two versions
# which do not have a concrete version, but that's better than being
# wrong for stable replays between those two versions.
sliderbug_fixed = game_version >= Investigations.VERSION_SLIDERBUG_FIXED_STABLE
else:
sliderbug_fixed = game_version >= Investigations.VERSION_SLIDERBUG_FIXED_CUTTING_EDGE
easy = Mod.EZ in replay.mods
hard_rock = Mod.HR in replay.mods
hitobjs = beatmap.hit_objects(easy=easy, hard_rock=hard_rock)
OD = beatmap.od(easy=easy, hard_rock=hard_rock)
CS = beatmap.cs(easy=easy, hard_rock=hard_rock)
keydowns = Investigations.keydown_frames(replay)
judgments = []
(hw_50, hw_100, hw_300) = utils.hitwindows(OD)
hitradius = utils.hitradius(CS)
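# as a rough point of reference: higher OD shrinks the hitwindows (the
# 300 window is roughly ±20 ms at OD 10) and higher CS shrinks the
# hitradius.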
# keep track of the indices of which hitobjs have been hit so far (none
# have been hit to start). At the end we'll look through this array and
# if any index is still ``False``, we'll mark that as a miss.
hitobj_hit = np.zeros(len(hitobjs), dtype=bool)
hitobj_i = 0
keydown_i = 0
while hitobj_i < len(hitobjs) and keydown_i < len(keydowns):
hitobj = hitobjs[hitobj_i]
hitobj_t = hitobj.time.total_seconds() * 1000
hitobj_xy = [hitobj.position.x, hitobj.position.y]
keydown_t = keydowns[keydown_i][0]
keydown_xy = keydowns[keydown_i][1]
if isinstance(hitobj, Circle):
hitobj_type = 0
hitobj_end_time = hitobj_t + hw_50
elif isinstance(hitobj, Slider):
hitobj_type = 1
hitobj_end_time = hitobj.end_time.total_seconds() * 1000
else:
hitobj_type = 2
hitobj_end_time = hitobj.end_time.total_seconds() * 1000
# before sliderbug fix, notelock ended after hitwindow50
if not sliderbug_fixed:
notelock_end_time = hitobj_t + hw_50
# exception for sliders/spinners, where notelock ends after
# hitobject end time if it's earlier
if hitobj_type != 0:
notelock_end_time = min(notelock_end_time, hitobj_end_time)
# after sliderbug fix, notelock ends after hitobject end time
else:
notelock_end_time = hitobj_end_time
# apparently notelock was increased by 1ms for circles
# (from testing)
if hitobj_type == 0:
notelock_end_time += 1
# can't press on hitobjects before hitwindowmiss
if keydown_t < hitobj_t - 400:
keydown_i += 1
continue
if keydown_t <= hitobj_t - hw_50:
# pressing on a circle or slider during hitwindowmiss will cause
# a miss
if np.linalg.norm(keydown_xy - hitobj_xy) <= hitradius and hitobj_type != 2:
# sliders don't disappear after missing
# so we skip to the press_i that is after notelock_end_time
if hitobj_type == 1 and sliderbug_fixed:
while keydowns[keydown_i][0] < notelock_end_time:
keydown_i += 1
if keydown_i >= len(keydowns):
break
else:
keydown_i += 1
hitobj_i += 1
# keypress not on object, so we move to the next keypress
else:
keydown_i += 1
elif keydown_t >= notelock_end_time:
# can no longer interact with hitobject after notelock_end_time
# so we move to the next object
hitobj_i += 1
else:
if (keydown_t < hitobj_t + hw_50 and
np.linalg.norm(keydown_xy - hitobj_xy) <= hitradius and
hitobj_type != 2):
# sliderheads are always 300s even if you click early or
# late
if hitobj_type == 1:
hit_type = JudgmentType.Hit300
# TODO: should these ranges be inclusive?
elif abs(keydown_t - hitobj_t) < hw_300:
hit_type = JudgmentType.Hit300
elif abs(keydown_t - hitobj_t) < hw_100:
hit_type = JudgmentType.Hit100
elif abs(keydown_t - hitobj_t) < hw_50:
hit_type = JudgmentType.Hit50
judgment = Hit(hitobj, keydown_t, keydown_xy,
replay, beatmap, hit_type)
judgments.append(judgment)
hitobj_hit[hitobj_i] = True
# sliders don't disappear after clicking
# so we skip to the press_i that is after notelock_end_time
if hitobj_type == 1 and sliderbug_fixed:
while keydowns[keydown_i][0] < notelock_end_time:
keydown_i += 1
if keydown_i >= len(keydowns):
break
else:
keydown_i += 1
hitobj_i += 1
# keypress not on object, so we move to the next keypress
else:
keydown_i += 1
# add a Miss for each hitobj that was never hit
for i, hitobj_hit_ in enumerate(hitobj_hit):
# ignore if the hitobj is a spinner, we don't calculate judgments
# for spinners yet
if not hitobj_hit_ and not isinstance(hitobjs[i], SliderSpinner):
judgment = Miss(hitobjs[i], replay, beatmap)
judgments.append(judgment)
return judgments
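# usage sketch (hypothetical replay/beatmap objects):
#   judgments = Investigations.judgments(replay, beatmap)
#   misses = [j for j in judgments if isinstance(j, Miss)]
#   hits = [j for j in judgments if isinstance(j, Hit)]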
@staticmethod
def similarity(replay1, replay2, method, num_chunks, mods_unknown):
"""
Compares two :class:`~.replay.Replay`\s.
Parameters
----------
replay1: :class:`~.replay.Replay`
The first replay to compare.
replay2: :class:`~.replay.Replay`
The second replay to compare.
method: str
The comparison method to use, either ``"similarity"`` (mean distance
between the two cursors) or ``"correlation"`` (cross-correlation of the
two cursor paths).
num_chunks: int
How many chunks to split the replays into when using the
``"correlation"`` method.
mods_unknown: str
How to handle the comparison when either replay's mods are unknown:
``"best"`` returns whichever of the two possible results (HR applied to
``replay1`` or not) indicates the replays are more similar, and
``"both"`` returns both results as a tuple.
Returns
-------
float or (float, float)
The similarity or correlation of ``replay1`` and ``replay2``. A tuple of
both possible results is returned when either replay's mods are unknown
and ``mods_unknown`` is ``"both"``.
"""
# perform preprocessing here as an optimization, so it is not repeated
# within different comparison algorithms. This will likely need to
# become more advanced if we add more (and different) algorithms.
# interpolation breaks when multiple frames have the same time values
# (which occurs semi-frequently in replays), so we filter them out.
t1, xy1 = Investigations.remove_duplicate_t(replay1.t, replay1.xy)
t2, xy2 = Investigations.remove_duplicate_t(replay2.t, replay2.xy)
xy1, xy2 = Investigations.interpolate(t1, t2, xy1, xy2)
xy1, xy2 = Investigations.clean(xy1, xy2)
# kind of a dirty function with all the switching between similarity
# and correlation, but I'm not sure I can make it any cleaner
if not replay1.mods or not replay2.mods:
# first compute with no modifications
if method == "similarity":
sim1 = Investigations.compute_similarity(xy1, xy2)
if method == "correlation":
sim1 = Investigations.compute_correlation(xy1, xy2, num_chunks)
# then compute with HR applied to ``replay1``
xy1[:, 1] = 384 - xy1[:, 1]
if method == "similarity":
sim2 = Investigations.compute_similarity(xy1, xy2)
if method == "correlation":
sim2 = Investigations.compute_correlation(xy1, xy2, num_chunks)
if mods_unknown == "best":
if method == "similarity":
return min(sim1, sim2)
if method == "correlation":
return max(sim1, sim2)
if mods_unknown == "both":
return (sim1, sim2)
# flip if one but not both has HR
if (Mod.HR in replay1.mods) ^ (Mod.HR in replay2.mods):
xy1[:, 1] = 384 - xy1[:, 1]
if method == "similarity":
return Investigations.compute_similarity(xy1, xy2)
if method == "correlation":
return Investigations.compute_correlation(xy1, xy2, num_chunks)
@staticmethod
def compute_similarity(xy1, xy2):
"""
Calculates the average distance between two sets of cursor position
data.
Parameters
----------
xy1: ndarray
The first xy data to compare.
xy2: ndarray
The second xy data to compare.
Returns
-------
float
The mean distance between the two datasets.
"""
# euclidean distance
distance = xy1 - xy2
distance = (distance ** 2).sum(axis=1) ** 0.5
return distance.mean()
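# rough worked example (made-up coordinates): xy1 = [[0, 0], [3, 4]] and
# xy2 = [[0, 0], [0, 0]] give per-frame distances [0, 5] and a mean (the
# similarity value) of 2.5 osu!pixels.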
@staticmethod
def compute_correlation(xy1, xy2, num_chunks):
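"""
Calculates the correlation between two sets of cursor position data.
The data is split into ``num_chunks`` chunks; for each chunk the maximum
normalized cross-correlation over all time shifts is taken, and the
median over all chunks is returned to reduce the effect of outliers.
Parameters
----------
xy1: ndarray
The first xy data to compare.
xy2: ndarray
The second xy data to compare.
num_chunks: int
How many chunks to split the data into.
Returns
-------
float
The correlation between the two datasets. Higher values indicate more
similar cursor movement.
"""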
xy1 = xy1.T
xy2 = xy2.T
# section into chunks, used to reduce the effect of outlier data
# (eg. cheater inserts replay data during breaks that places them
# far away from the actual replay)
horizontal_length = xy1.shape[1] - xy1.shape[1] % num_chunks
xy1_parts = np.hsplit(xy1[:,:horizontal_length], num_chunks)
xy2_parts = np.hsplit(xy2[:,:horizontal_length], num_chunks)
correlations = []
for (xy1_part, xy2_part) in zip(xy1_parts, xy2_parts):
xy1_part -= np.mean(xy1_part)
xy2_part -= np.mean(xy2_part)
norm = np.std(xy1_part) * np.std(xy2_part) * xy1_part.size
# matrix of correlations between xy1 and xy2 at different time
# shifts
cross_corr_matrix = signal.correlate(xy1_part, xy2_part) / norm
# pick the maximum correlation, which will probably be at 0
# time shift, unless the replays have been intentionally shifted in
# time
max_corr = np.max(cross_corr_matrix)
correlations.append(max_corr)
# take the median of all the chunks to reduce the effect of outliers
return np.median(correlations)
@staticmethod
def remove_duplicate_t(t, data):
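"""
Removes frames with duplicate time values, keeping only the first frame
at each time value. Returns the deduplicated (and sorted) ``t`` along
with the corresponding rows of ``data``.
"""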
t, t_sort = np.unique(t, return_index=True)
data = data[t_sort]
return (t, data)
@staticmethod
def interpolate(t1, t2, xy1, xy2):
"""
Interpolates the xy data of the shorter replay to the longer replay.
Returns
-------
(ndarray, ndarray)
The interpolated replay data of the first and second replay
respectively.
Notes
-----
The length of the two returned arrays will be equal. This is a
(desirable) side effect of interpolating.
"""
if len(t1) > len(t2):
xy2x = np.interp(t1, t2, xy2[:, 0])
xy2y = np.interp(t1, t2, xy2[:, 1])
xy2 = np.array([xy2x, xy2y]).T
else:
xy1x = np.interp(t2, t1, xy1[:, 0])
xy1y = np.interp(t2, t1, xy1[:, 1])
xy1 = np.array([xy1x, xy1y]).T
return (xy1, xy2)
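# rough worked example (made-up times): with t1 = [0, 10, 20] and
# t2 = [0, 20], xy2 is linearly resampled at t = 0, 10, 20, so both
# returned arrays end up with three rows.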
@staticmethod
def clean(xy1, xy2):
"""
Cleans the given xy data to only include indices where both coordinates
are inside the osu gameplay window (a 512 by 384 osu!pixel window).
Warnings
--------
The length of the two passed arrays must be equal.
"""
valid = np.all(([0, 0] <= xy1) & (xy1 <= [512, 384]), axis=1) & \
np.all(([0, 0] <= xy2) & (xy2 <= [512, 384]), axis=1)
xy1 = xy1[valid]
xy2 = xy2[valid]
return (xy1, xy2)
class Snap:
"""
A suspicious hit in a replay, specifically so because it snaps away from
the otherwise normal path. Snaps represent the middle frame in a set of
three replay frames (so for example ``time`` is the time of the middle
frame).
Parameters
----------
time: int
The time value of the middle datapoint, in ms. 0 represents the
beginning of the replay.
angle: float
The angle between the three datapoints.
distance: float
``min(dist_a_b, dist_b_c)`` if ``a``, ``b``, and ``c`` are three
datapoints with ``b`` being the middle one.
"""
def __init__(self, time, angle, distance):
self.time = time
self.angle = angle
self.distance = distance
def __eq__(self, other):
return (self.time == other.time and self.angle == other.angle
and self.distance == other.distance)
def __hash__(self):
return hash((self.time, self.angle, self.distance))