From f2ddb0e6ccb771095f07642a287d6e72fb081ddb Mon Sep 17 00:00:00 2001 From: TeRacksito Date: Tue, 25 Apr 2023 20:46:23 +0200 Subject: [PATCH 1/4] Adding TriggerCooldown --- cooldowns/__init__.py | 3 +- cooldowns/cooldown.py | 225 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 227 insertions(+), 1 deletion(-) diff --git a/cooldowns/__init__.py b/cooldowns/__init__.py index 7f4d2b4..311b559 100644 --- a/cooldowns/__init__.py +++ b/cooldowns/__init__.py @@ -2,7 +2,7 @@ from .buckets import CooldownBucket, SlashBucket from .protocols import CooldownBucketProtocol -from .cooldown import Cooldown, cooldown, shared_cooldown +from .cooldown import Cooldown, cooldown, shared_cooldown, TriggerCooldown from .static_cooldown import StaticCooldown, static_cooldown from .cooldown_times_per import CooldownTimesPer from .static_times_per import StaticTimesPer @@ -46,6 +46,7 @@ "StaticCooldown", "static_cooldown", "define_shared_static_cooldown", + "TriggerCooldown", ) __version__ = "1.7.0" diff --git a/cooldowns/cooldown.py b/cooldowns/cooldown.py index f59f359..65270a6 100644 --- a/cooldowns/cooldown.py +++ b/cooldowns/cooldown.py @@ -3,6 +3,7 @@ import asyncio import datetime import functools +import inspect from logging import getLogger from typing import Callable, Optional, TypeVar, Dict, Union, Type @@ -438,3 +439,227 @@ def bucket(self) -> CooldownBucketProtocol: def func(self) -> Optional[Callable]: """Returns the wrapped function.""" return self._func + + +class TriggerCooldown: + """ + Creates a trigger cooldown. + + This is useful if you want to be able to trigger a specific time_period cooldown + inside the command itself. + + TriggerCooldown creates two cooldonws in one instance: + + - Normal cooldown. The same cooldown as @cooldowns.cooldown() + - Trigger cooldown. A secondary cooldown that can only be activate + with `.trigger()` + + Parameters + ---------- + limit : `int` + How many call's can be made in the time + period specified by ``time_period``. + + time_period : `Union[float, datetime.timedelta]` + The time period related to ``limit``. This is seconds. + + bucket : `Optional[CooldownBucketProtocol], optional` + The :class:`Bucket` implementation to use + as a bucket to separate cooldown buckets. + + check : `Optional[MaybeCoro], optional` + A Callable which dictates whether + to apply the cooldown on current invoke. + + If this Callable returns a truthy value, + then the cooldown will be used for the current call. + + I.e. If you wished to bypass cooldowns, you + would return False if you invoked the Callable. + + cooldown_id: Optional[Union[int, str]] + Useful for resetting individual stacked cooldowns. + This should be unique globally, + behaviour is not guaranteed if not unique. + + .. note:: + + This check will be given the same arguments as + the item you are applying the cooldown to. + + Usage + ----- + - First create an instance of TriggerCooldown() with + the desired parameters. + + ``` + trigger_cooldown = cooldowns.TriggerCooldown(1, 5, cooldowns.SlashBucket.author) + ``` + + - Then add the instance as a decorator to your command! + + ``` + @nextcord.slash_command() + @trigger_cooldown + async def command(): + ``` + + The instance has to be defined in the same scope as the decorator! + Now, `command()` has applied a normal cooldown of `1 limit` and + `5 time_period`, as we defined it. + + - Finally, inside your command, you can `trigger` the trigger cooldown: + + ``` + async def command(): + # Do things + trigger_cooldown.trigger(30) + # You can still do things after this. + # Even you can `interaction.send()`. + ``` + + From the moment when the cooldown was triggered by `.trigger(30)`, every + single call to this command within 30 seconds will raise CallableOnCooldown! + + Raises + ------ + `RuntimeError` + Expected the decorated function to be a coroutine. + `CallableOnCooldown` + This call resulted in a cooldown being put into effect. + + + + """ + def __init__( + self, + limit: int, + time_period: Union[float, datetime.timedelta], + bucket: Optional[CooldownBucketProtocol] = None, + *, + cooldown_id: Optional[Union[int, str]] = None, + check: Optional[MaybeCoro] = default_check, + ): + + self.triggered = False + + self.limit = limit + self.time_period = time_period + self.bucket = bucket + self.cooldown_id = cooldown_id + self.check = check + + # Normal Cooldown + self.cooldown = Cooldown( + limit= self.limit, + time_period= self.time_period, + bucket= self.bucket, + cooldown_id= self.cooldown_id, + check= self.check + ) + + # Trigger Cooldown + self.trigger_cooldown = Cooldown( + limit= 1, + time_period= self.time_period, + bucket= self.bucket, + cooldown_id= self.cooldown_id, + check= self.check + ) + + if cooldown_id: + utils.shared_cooldown_refs[cooldown_id] = self.cooldown + + async def trigger(self, time_period: Union[float, datetime.timedelta]) -> None: + """|coro| + + Trigger the Trigger Cooldown instantly. Has to be awaited. + + Parameters + ---------- + time_period : `Union[float, datetime.timedelta]` + The time period that cooldwon will remain triggered. + """ + self.triggered = True + self.trigger_cooldown.time_period = ( + time_period + if isinstance(time_period, (float, int)) + else time_period.total_seconds() + ) + + # Triggers the Cooldown leaving bucket.current = 0 + frame = inspect.currentframe().f_back + _, _, _, values = inspect.getargvalues(frame) + args = tuple(values.values()) + + async with self.trigger_cooldown(*args): + return None + + + def __call__(self, func: Callable) -> Callable: + """ + + Called as a decorator. + + Parameters + ---------- + func : `Callable` + The function being decorated. + + Returns + ------- + `Callable` + Decorator + + Raises + ------ + `RuntimeError` + When given function is not coroutine. + """ + + if not asyncio.iscoroutinefunction(func): + raise RuntimeError( + f"Expected `func` to be a coroutine, " + f"found {func} of type {func.__class__.__name__!r} instead" # noqa + ) + # Links the cooldowns to the given function. + self.cooldown._func = func + self.trigger_cooldown._func = func + + @functools.wraps(func) + async def inner(*args, **kwargs): + use_cooldown = await maybe_coro(self.check, *args, **kwargs) + if not use_cooldown: + return await maybe_coro(func, *args, **kwargs) + + self_arg = None + if "self" in kwargs: + self_arg = kwargs.pop("self") + + # If the cooldown is triggered... + if self.triggered: + # If still on triggered cooldown... + if self.trigger_cooldown.remaining_calls(*args, **kwargs) < 1: + # Runs the Trigger Cooldown. + async with self.trigger_cooldown(*args, **kwargs): + if self_arg: + kwargs["self"] = self_arg + result = await func(*args, **kwargs) + else: + result = await func(*args, **kwargs) + return result + # If not, untrigger the cooldown. + else: + self.triggered = False + # If the cooldown is not triggered. + # Runs the normal Cooldown. + async with self.cooldown(*args, **kwargs): + if self_arg: + kwargs["self"] = self_arg + result = await func(*args, **kwargs) + else: + result = await func(*args, **kwargs) + return result + # Return the decorator. + return inner + From 5f43942357b3977c1177afeffff74a3d677644f3 Mon Sep 17 00:00:00 2001 From: TeRacksito Date: Fri, 28 Apr 2023 14:48:29 +0200 Subject: [PATCH 2/4] Allowing shared trigger cooldowns --- cooldowns/__init__.py | 3 +- cooldowns/cooldown.py | 225 ------------------ cooldowns/trigger_cooldown.py | 269 ++++++++++++++++++++++ docs/modules/examples.rst | 25 ++ docs/modules/objects/trigger_cooldown.rst | 9 + tests/test_trigger_cooldown.py | 75 ++++++ 6 files changed, 380 insertions(+), 226 deletions(-) create mode 100644 cooldowns/trigger_cooldown.py create mode 100644 docs/modules/objects/trigger_cooldown.rst create mode 100644 tests/test_trigger_cooldown.py diff --git a/cooldowns/__init__.py b/cooldowns/__init__.py index 311b559..157754e 100644 --- a/cooldowns/__init__.py +++ b/cooldowns/__init__.py @@ -2,7 +2,8 @@ from .buckets import CooldownBucket, SlashBucket from .protocols import CooldownBucketProtocol -from .cooldown import Cooldown, cooldown, shared_cooldown, TriggerCooldown +from .cooldown import Cooldown, cooldown, shared_cooldown +from .trigger_cooldown import TriggerCooldown from .static_cooldown import StaticCooldown, static_cooldown from .cooldown_times_per import CooldownTimesPer from .static_times_per import StaticTimesPer diff --git a/cooldowns/cooldown.py b/cooldowns/cooldown.py index 65270a6..f59f359 100644 --- a/cooldowns/cooldown.py +++ b/cooldowns/cooldown.py @@ -3,7 +3,6 @@ import asyncio import datetime import functools -import inspect from logging import getLogger from typing import Callable, Optional, TypeVar, Dict, Union, Type @@ -439,227 +438,3 @@ def bucket(self) -> CooldownBucketProtocol: def func(self) -> Optional[Callable]: """Returns the wrapped function.""" return self._func - - -class TriggerCooldown: - """ - Creates a trigger cooldown. - - This is useful if you want to be able to trigger a specific time_period cooldown - inside the command itself. - - TriggerCooldown creates two cooldonws in one instance: - - - Normal cooldown. The same cooldown as @cooldowns.cooldown() - - Trigger cooldown. A secondary cooldown that can only be activate - with `.trigger()` - - Parameters - ---------- - limit : `int` - How many call's can be made in the time - period specified by ``time_period``. - - time_period : `Union[float, datetime.timedelta]` - The time period related to ``limit``. This is seconds. - - bucket : `Optional[CooldownBucketProtocol], optional` - The :class:`Bucket` implementation to use - as a bucket to separate cooldown buckets. - - check : `Optional[MaybeCoro], optional` - A Callable which dictates whether - to apply the cooldown on current invoke. - - If this Callable returns a truthy value, - then the cooldown will be used for the current call. - - I.e. If you wished to bypass cooldowns, you - would return False if you invoked the Callable. - - cooldown_id: Optional[Union[int, str]] - Useful for resetting individual stacked cooldowns. - This should be unique globally, - behaviour is not guaranteed if not unique. - - .. note:: - - This check will be given the same arguments as - the item you are applying the cooldown to. - - Usage - ----- - - First create an instance of TriggerCooldown() with - the desired parameters. - - ``` - trigger_cooldown = cooldowns.TriggerCooldown(1, 5, cooldowns.SlashBucket.author) - ``` - - - Then add the instance as a decorator to your command! - - ``` - @nextcord.slash_command() - @trigger_cooldown - async def command(): - ``` - - The instance has to be defined in the same scope as the decorator! - Now, `command()` has applied a normal cooldown of `1 limit` and - `5 time_period`, as we defined it. - - - Finally, inside your command, you can `trigger` the trigger cooldown: - - ``` - async def command(): - # Do things - trigger_cooldown.trigger(30) - # You can still do things after this. - # Even you can `interaction.send()`. - ``` - - From the moment when the cooldown was triggered by `.trigger(30)`, every - single call to this command within 30 seconds will raise CallableOnCooldown! - - Raises - ------ - `RuntimeError` - Expected the decorated function to be a coroutine. - `CallableOnCooldown` - This call resulted in a cooldown being put into effect. - - - - """ - def __init__( - self, - limit: int, - time_period: Union[float, datetime.timedelta], - bucket: Optional[CooldownBucketProtocol] = None, - *, - cooldown_id: Optional[Union[int, str]] = None, - check: Optional[MaybeCoro] = default_check, - ): - - self.triggered = False - - self.limit = limit - self.time_period = time_period - self.bucket = bucket - self.cooldown_id = cooldown_id - self.check = check - - # Normal Cooldown - self.cooldown = Cooldown( - limit= self.limit, - time_period= self.time_period, - bucket= self.bucket, - cooldown_id= self.cooldown_id, - check= self.check - ) - - # Trigger Cooldown - self.trigger_cooldown = Cooldown( - limit= 1, - time_period= self.time_period, - bucket= self.bucket, - cooldown_id= self.cooldown_id, - check= self.check - ) - - if cooldown_id: - utils.shared_cooldown_refs[cooldown_id] = self.cooldown - - async def trigger(self, time_period: Union[float, datetime.timedelta]) -> None: - """|coro| - - Trigger the Trigger Cooldown instantly. Has to be awaited. - - Parameters - ---------- - time_period : `Union[float, datetime.timedelta]` - The time period that cooldwon will remain triggered. - """ - self.triggered = True - self.trigger_cooldown.time_period = ( - time_period - if isinstance(time_period, (float, int)) - else time_period.total_seconds() - ) - - # Triggers the Cooldown leaving bucket.current = 0 - frame = inspect.currentframe().f_back - _, _, _, values = inspect.getargvalues(frame) - args = tuple(values.values()) - - async with self.trigger_cooldown(*args): - return None - - - def __call__(self, func: Callable) -> Callable: - """ - - Called as a decorator. - - Parameters - ---------- - func : `Callable` - The function being decorated. - - Returns - ------- - `Callable` - Decorator - - Raises - ------ - `RuntimeError` - When given function is not coroutine. - """ - - if not asyncio.iscoroutinefunction(func): - raise RuntimeError( - f"Expected `func` to be a coroutine, " - f"found {func} of type {func.__class__.__name__!r} instead" # noqa - ) - # Links the cooldowns to the given function. - self.cooldown._func = func - self.trigger_cooldown._func = func - - @functools.wraps(func) - async def inner(*args, **kwargs): - use_cooldown = await maybe_coro(self.check, *args, **kwargs) - if not use_cooldown: - return await maybe_coro(func, *args, **kwargs) - - self_arg = None - if "self" in kwargs: - self_arg = kwargs.pop("self") - - # If the cooldown is triggered... - if self.triggered: - # If still on triggered cooldown... - if self.trigger_cooldown.remaining_calls(*args, **kwargs) < 1: - # Runs the Trigger Cooldown. - async with self.trigger_cooldown(*args, **kwargs): - if self_arg: - kwargs["self"] = self_arg - result = await func(*args, **kwargs) - else: - result = await func(*args, **kwargs) - return result - # If not, untrigger the cooldown. - else: - self.triggered = False - # If the cooldown is not triggered. - # Runs the normal Cooldown. - async with self.cooldown(*args, **kwargs): - if self_arg: - kwargs["self"] = self_arg - result = await func(*args, **kwargs) - else: - result = await func(*args, **kwargs) - return result - # Return the decorator. - return inner - diff --git a/cooldowns/trigger_cooldown.py b/cooldowns/trigger_cooldown.py new file mode 100644 index 0000000..de92c5c --- /dev/null +++ b/cooldowns/trigger_cooldown.py @@ -0,0 +1,269 @@ + +import asyncio +import datetime +import inspect +import functools +from typing import Callable, Optional, Union +from .cooldown import Cooldown + +from .utils import ( + MaybeCoro, + maybe_coro, + default_check, +) + +from . import utils +from .protocols import CooldownBucketProtocol + +class TriggerCooldown: + def __init__( + self, + limit: int, + time_period: Union[float, datetime.timedelta], + bucket: Optional[CooldownBucketProtocol] = None, + *, + cooldown_id: Optional[Union[int, str]] = None, + trigger_cooldown_id: Optional[Union[int, str]] = None, + check: Optional[MaybeCoro] = default_check, + ): + """ + Creates a trigger cooldown. + + This is useful if you want to be able to trigger a specific time_period cooldown + inside the command itself. + + TriggerCooldown creates two cooldonws in one instance: + + - Normal cooldown. The same cooldown as @cooldowns.cooldown() + - Trigger cooldown. A secondary cooldown that can only be activate + with `.trigger()` + + Parameters + ---------- + limit : `int` + How many call's can be made in the time + period specified by ``time_period``. + + time_period : `Union[float, datetime.timedelta]` + The time period related to ``limit``. This is seconds. + + bucket : `Optional[CooldownBucketProtocol], optional` + The :class:`Bucket` implementation to use + as a bucket to separate cooldown buckets. + + check : `Optional[MaybeCoro], optional` + A Callable which dictates whether + to apply the cooldown on current invoke. + + If this Callable returns a truthy value, + then the cooldown will be used for the current call. + + I.e. If you wished to bypass cooldowns, you + would return False if you invoked the Callable. + + cooldown_id: Optional[Union[int, str]] + Useful for resetting individual stacked cooldowns. + This should be unique globally, + behaviour is not guaranteed if not unique. + + .. note:: + + This check will be given the same arguments as + the item you are applying the cooldown to. + + Usage + ----- + - First create an instance of TriggerCooldown() with + the desired parameters. + + ``` + trigger_cooldown = cooldowns.TriggerCooldown(1, 5, cooldowns.SlashBucket.author) + ``` + + - Then add the instance as a decorator to your command! + + ``` + @nextcord.slash_command() + @trigger_cooldown + async def command(): + ``` + + The instance has to be defined in the same scope as the decorator! + Now, `command()` has applied a normal cooldown of `1 limit` and + `5 time_period`, as we defined it. + + - Finally, inside your command, you can `trigger` the trigger cooldown: + + ``` + async def command(): + # Do things + trigger_cooldown.trigger(30) + # You can still do things after this. + # Even you can `interaction.send()`. + ``` + + From the moment when the cooldown was triggered by `.trigger(30)`, every + single call to this command within 30 seconds will raise CallableOnCooldown! + + Raises + ------ + `RuntimeError` + Expected the decorated function to be a coroutine. + `CallableOnCooldown` + This call resulted in a cooldown being put into effect. + """ + + self.triggered = False + + self.limit = limit + self.time_period = time_period + self.bucket = bucket + self.cooldown_id = cooldown_id + self.trigger_cooldown_id = trigger_cooldown_id + self.check = check + + # Normal Cooldown + self.cooldown = Cooldown( + limit= self.limit, + time_period= self.time_period, + bucket= self.bucket, + cooldown_id= self.cooldown_id, + check= self.check + ) + + # Trigger Cooldown + self.trigger_cooldown = Cooldown( + limit= 1, + time_period= self.time_period, + bucket= self.bucket, + cooldown_id= self.trigger_cooldown_id, + check= self.check + ) + + if cooldown_id: + utils.shared_cooldown_refs[cooldown_id] = self.cooldown + + else: + current_cooldowns = utils.shared_cooldown_refs.keys() + for i in range(10_000): + generated_id = f"normal_cooldown_{i:02}" + if generated_id not in current_cooldowns: + utils.shared_cooldown_refs[generated_id] = self.cooldown + self.cooldown_id = generated_id + + if trigger_cooldown_id: + utils.shared_cooldown_refs[trigger_cooldown_id] = self.trigger_cooldown + + else: + current_cooldowns = utils.shared_cooldown_refs.keys() + for i in range(10_000): + generated_id = f"trigger_cooldown_{i:02}" + if generated_id not in current_cooldowns: + utils.shared_cooldown_refs[generated_id] = self.trigger_cooldown + self.trigger_cooldown_id = generated_id + + async def trigger(self, time_period: Union[float, datetime.timedelta]) -> None: + """|coro| + + Trigger the Trigger Cooldown instantly. Has to be awaited. + + Parameters + ---------- + time_period : `Union[float, datetime.timedelta]` + The time period that cooldwon will remain triggered. + """ + self.triggered = True + self.trigger_cooldown.time_period = ( + time_period + if isinstance(time_period, (float, int)) + else time_period.total_seconds() + ) + + # Triggers the Cooldown leaving bucket.current = 0 + frame = inspect.currentframe().f_back + _, _, _, values = inspect.getargvalues(frame) + args = tuple(values.values()) + + async with self.trigger_cooldown(*args): + return None + + + def __call__(self, func: Callable) -> Callable: + """ + + Called as a decorator. + + Parameters + ---------- + func : `Callable` + The function being decorated. + + Returns + ------- + `Callable` + Decorator + + Raises + ------ + `RuntimeError` + When given function is not coroutine. + """ + + _cooldown: Cooldown = utils.shared_cooldown_refs[self.cooldown_id] + _trigger_cooldown: Cooldown = utils.shared_cooldown_refs[self.trigger_cooldown_id] + + if not asyncio.iscoroutinefunction(func): + raise RuntimeError( + f"Expected `func` to be a coroutine, " + f"found {func} of type {func.__class__.__name__!r} instead" # noqa + ) + # Links the cooldowns to the given function. + _cooldown._func = func + _trigger_cooldown._func = func + + attached_cooldowns = getattr(func, "_cooldowns", []) + + if _cooldown not in attached_cooldowns: + attached_cooldowns.append(_cooldown) + + if _trigger_cooldown not in attached_cooldowns: + attached_cooldowns.append(_trigger_cooldown) + + setattr(func, "_cooldowns", attached_cooldowns) + + @functools.wraps(func) + async def inner(*args, **kwargs): + use_cooldown = await maybe_coro(self.check, *args, **kwargs) + if not use_cooldown: + return await maybe_coro(func, *args, **kwargs) + + self_arg = None + if "self" in kwargs: + self_arg = kwargs.pop("self") + + # If the cooldown is triggered... + # if self.triggered: + # If still on triggered cooldown... + if _trigger_cooldown.remaining_calls(*args, **kwargs) < 1: + # Runs the Trigger Cooldown. + async with _trigger_cooldown(*args, **kwargs): + if self_arg: + kwargs["self"] = self_arg + result = await func(*args, **kwargs) + else: + result = await func(*args, **kwargs) + return result + # If not, untrigger the cooldown. + # else: + # self.triggered = False + # If the cooldown is not triggered. + # Runs the normal Cooldown. + async with _cooldown(*args, **kwargs): + if self_arg: + kwargs["self"] = self_arg + result = await func(*args, **kwargs) + else: + result = await func(*args, **kwargs) + return result + # Return the decorator. + return inner diff --git a/docs/modules/examples.rst b/docs/modules/examples.rst index 23a9727..2cc515c 100644 --- a/docs/modules/examples.rst +++ b/docs/modules/examples.rst @@ -259,3 +259,28 @@ How to use the Cooldown object without a decorator. # This will apply the cooldown ... # Do things + +TriggerCooldown usage +------------------------ + +This is useful if you want to be able to trigger a specific +time_period cooldown inside the command itself. + +.. code-block:: python + :linenos: + + + from cooldowns import TriggerCooldown, CooldownBucket + + my_trigger_cooldown = TriggerCooldown(1, 5, CooldownBucket.all) + + @my_trigger_cooldown + async def test_1(*args, **kwargs): + # Your command. + # Do things.. + + # Apply the trigger cooldown instantly. + await my_trigger_cooldown.trigger(20) + + # You can still do things.. + # But command cannot be called again within 20 seconds. \ No newline at end of file diff --git a/docs/modules/objects/trigger_cooldown.rst b/docs/modules/objects/trigger_cooldown.rst new file mode 100644 index 0000000..dfff512 --- /dev/null +++ b/docs/modules/objects/trigger_cooldown.rst @@ -0,0 +1,9 @@ +TriggerCooldown Reference +================== + +.. currentmodule:: cooldowns + +.. autoclass:: TriggerCooldown + :members: + :undoc-members: + :special-members: __init__ diff --git a/tests/test_trigger_cooldown.py b/tests/test_trigger_cooldown.py new file mode 100644 index 0000000..9ae9939 --- /dev/null +++ b/tests/test_trigger_cooldown.py @@ -0,0 +1,75 @@ +import asyncio + +import pytest + +from cooldowns import ( + CooldownBucket, + TriggerCooldown, + get_cooldown +) +from cooldowns.exceptions import CallableOnCooldown + + +@pytest.mark.asyncio +async def test_trigger_cooldown(): + my_trigger_cooldown = TriggerCooldown(1, 0.3, CooldownBucket.all) + + @my_trigger_cooldown + async def test_1(trigger_test = False): + if trigger_test: + await my_trigger_cooldown.trigger(20) + return 1 + + assert await test_1() == 1 + + with pytest.raises(CallableOnCooldown): + await test_1() + + await asyncio.sleep(0.4) + + assert await test_1(trigger_test = True) + + await asyncio.sleep(0.4) + with pytest.raises(CallableOnCooldown): + await test_1() + +@pytest.mark.asyncio +async def test_shared_trigger_cooldown(): + my_shared_trigger_cooldown = TriggerCooldown(1, 0.3, CooldownBucket.all) + + @my_shared_trigger_cooldown + async def test_1(*args, **kwargs): + return 1 + + @my_shared_trigger_cooldown + async def test_2(*args, **kwargs): + return 2 + + assert await test_1() == 1 + assert await test_2() == 2 + + with pytest.raises(CallableOnCooldown): + await test_1() + + with pytest.raises(CallableOnCooldown): + await test_2() + +@pytest.mark.asyncio +async def test_trigger_cooldown_with_id(): + my_trigger_cooldown = TriggerCooldown(1, 0.3, CooldownBucket.all, + cooldown_id= "normal_cooldown_id", + trigger_cooldown_id= "trigger_cooldown_id") + + @my_trigger_cooldown + async def test_1(*args, **kwargs): + try: + get_cooldown(test_1, "normal_cooldown_id") + get_cooldown(test_1, "trigger_cooldown_id") + return 1 + except Exception: + return 0 + + assert await test_1() == 1 + + with pytest.raises(CallableOnCooldown): + await test_1() From db1c5519e5e26ea831b70232d443ec86e43cafe6 Mon Sep 17 00:00:00 2001 From: TeRacksito Date: Thu, 4 May 2023 14:47:47 +0200 Subject: [PATCH 3/4] Solve minor misprints. --- cooldowns/trigger_cooldown.py | 9 +++------ tests/test_trigger_cooldown.py | 1 - 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/cooldowns/trigger_cooldown.py b/cooldowns/trigger_cooldown.py index de92c5c..e8711d9 100644 --- a/cooldowns/trigger_cooldown.py +++ b/cooldowns/trigger_cooldown.py @@ -113,8 +113,6 @@ async def command(): This call resulted in a cooldown being put into effect. """ - self.triggered = False - self.limit = limit self.time_period = time_period self.bucket = bucket @@ -172,7 +170,6 @@ async def trigger(self, time_period: Union[float, datetime.timedelta]) -> None: time_period : `Union[float, datetime.timedelta]` The time period that cooldwon will remain triggered. """ - self.triggered = True self.trigger_cooldown.time_period = ( time_period if isinstance(time_period, (float, int)) @@ -217,6 +214,7 @@ def __call__(self, func: Callable) -> Callable: f"Expected `func` to be a coroutine, " f"found {func} of type {func.__class__.__name__!r} instead" # noqa ) + # Links the cooldowns to the given function. _cooldown._func = func _trigger_cooldown._func = func @@ -253,9 +251,7 @@ async def inner(*args, **kwargs): else: result = await func(*args, **kwargs) return result - # If not, untrigger the cooldown. - # else: - # self.triggered = False + # If the cooldown is not triggered. # Runs the normal Cooldown. async with _cooldown(*args, **kwargs): @@ -265,5 +261,6 @@ async def inner(*args, **kwargs): else: result = await func(*args, **kwargs) return result + # Return the decorator. return inner diff --git a/tests/test_trigger_cooldown.py b/tests/test_trigger_cooldown.py index 9ae9939..966fa0f 100644 --- a/tests/test_trigger_cooldown.py +++ b/tests/test_trigger_cooldown.py @@ -46,7 +46,6 @@ async def test_2(*args, **kwargs): return 2 assert await test_1() == 1 - assert await test_2() == 2 with pytest.raises(CallableOnCooldown): await test_1() From 49f0b850e80bbd9be993c54ea7270238346c6ce9 Mon Sep 17 00:00:00 2001 From: TeRacksito Date: Tue, 9 May 2023 13:24:04 +0200 Subject: [PATCH 4/4] Trigger cooldown fixed test --- tests/test_trigger_cooldown.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/tests/test_trigger_cooldown.py b/tests/test_trigger_cooldown.py index 966fa0f..0437122 100644 --- a/tests/test_trigger_cooldown.py +++ b/tests/test_trigger_cooldown.py @@ -15,9 +15,7 @@ async def test_trigger_cooldown(): my_trigger_cooldown = TriggerCooldown(1, 0.3, CooldownBucket.all) @my_trigger_cooldown - async def test_1(trigger_test = False): - if trigger_test: - await my_trigger_cooldown.trigger(20) + async def test_1(): return 1 assert await test_1() == 1 @@ -25,9 +23,17 @@ async def test_1(trigger_test = False): with pytest.raises(CallableOnCooldown): await test_1() - await asyncio.sleep(0.4) - assert await test_1(trigger_test = True) +@pytest.mark.asyncio +async def test_trigger_cooldown_triggering(): + my_trigger_cooldown = TriggerCooldown(1, 0.3, CooldownBucket.all) + + @my_trigger_cooldown + async def test_1(): + await my_trigger_cooldown.trigger(20) + return 1 + + assert await test_1() == 1 await asyncio.sleep(0.4) with pytest.raises(CallableOnCooldown):