diff --git a/cooldowns/__init__.py b/cooldowns/__init__.py index 7f4d2b4..157754e 100644 --- a/cooldowns/__init__.py +++ b/cooldowns/__init__.py @@ -3,6 +3,7 @@ from .buckets import CooldownBucket, SlashBucket from .protocols import CooldownBucketProtocol from .cooldown import Cooldown, cooldown, shared_cooldown +from .trigger_cooldown import TriggerCooldown from .static_cooldown import StaticCooldown, static_cooldown from .cooldown_times_per import CooldownTimesPer from .static_times_per import StaticTimesPer @@ -46,6 +47,7 @@ "StaticCooldown", "static_cooldown", "define_shared_static_cooldown", + "TriggerCooldown", ) __version__ = "1.7.0" diff --git a/cooldowns/trigger_cooldown.py b/cooldowns/trigger_cooldown.py new file mode 100644 index 0000000..e8711d9 --- /dev/null +++ b/cooldowns/trigger_cooldown.py @@ -0,0 +1,266 @@ + +import asyncio +import datetime +import inspect +import functools +from typing import Callable, Optional, Union +from .cooldown import Cooldown + +from .utils import ( + MaybeCoro, + maybe_coro, + default_check, +) + +from . import utils +from .protocols import CooldownBucketProtocol + +class TriggerCooldown: + def __init__( + self, + limit: int, + time_period: Union[float, datetime.timedelta], + bucket: Optional[CooldownBucketProtocol] = None, + *, + cooldown_id: Optional[Union[int, str]] = None, + trigger_cooldown_id: Optional[Union[int, str]] = None, + check: Optional[MaybeCoro] = default_check, + ): + """ + Creates a trigger cooldown. + + This is useful if you want to be able to trigger a specific time_period cooldown + inside the command itself. + + TriggerCooldown creates two cooldonws in one instance: + + - Normal cooldown. The same cooldown as @cooldowns.cooldown() + - Trigger cooldown. A secondary cooldown that can only be activate + with `.trigger()` + + Parameters + ---------- + limit : `int` + How many call's can be made in the time + period specified by ``time_period``. + + time_period : `Union[float, datetime.timedelta]` + The time period related to ``limit``. This is seconds. + + bucket : `Optional[CooldownBucketProtocol], optional` + The :class:`Bucket` implementation to use + as a bucket to separate cooldown buckets. + + check : `Optional[MaybeCoro], optional` + A Callable which dictates whether + to apply the cooldown on current invoke. + + If this Callable returns a truthy value, + then the cooldown will be used for the current call. + + I.e. If you wished to bypass cooldowns, you + would return False if you invoked the Callable. + + cooldown_id: Optional[Union[int, str]] + Useful for resetting individual stacked cooldowns. + This should be unique globally, + behaviour is not guaranteed if not unique. + + .. note:: + + This check will be given the same arguments as + the item you are applying the cooldown to. + + Usage + ----- + - First create an instance of TriggerCooldown() with + the desired parameters. + + ``` + trigger_cooldown = cooldowns.TriggerCooldown(1, 5, cooldowns.SlashBucket.author) + ``` + + - Then add the instance as a decorator to your command! + + ``` + @nextcord.slash_command() + @trigger_cooldown + async def command(): + ``` + + The instance has to be defined in the same scope as the decorator! + Now, `command()` has applied a normal cooldown of `1 limit` and + `5 time_period`, as we defined it. + + - Finally, inside your command, you can `trigger` the trigger cooldown: + + ``` + async def command(): + # Do things + trigger_cooldown.trigger(30) + # You can still do things after this. + # Even you can `interaction.send()`. + ``` + + From the moment when the cooldown was triggered by `.trigger(30)`, every + single call to this command within 30 seconds will raise CallableOnCooldown! + + Raises + ------ + `RuntimeError` + Expected the decorated function to be a coroutine. + `CallableOnCooldown` + This call resulted in a cooldown being put into effect. + """ + + self.limit = limit + self.time_period = time_period + self.bucket = bucket + self.cooldown_id = cooldown_id + self.trigger_cooldown_id = trigger_cooldown_id + self.check = check + + # Normal Cooldown + self.cooldown = Cooldown( + limit= self.limit, + time_period= self.time_period, + bucket= self.bucket, + cooldown_id= self.cooldown_id, + check= self.check + ) + + # Trigger Cooldown + self.trigger_cooldown = Cooldown( + limit= 1, + time_period= self.time_period, + bucket= self.bucket, + cooldown_id= self.trigger_cooldown_id, + check= self.check + ) + + if cooldown_id: + utils.shared_cooldown_refs[cooldown_id] = self.cooldown + + else: + current_cooldowns = utils.shared_cooldown_refs.keys() + for i in range(10_000): + generated_id = f"normal_cooldown_{i:02}" + if generated_id not in current_cooldowns: + utils.shared_cooldown_refs[generated_id] = self.cooldown + self.cooldown_id = generated_id + + if trigger_cooldown_id: + utils.shared_cooldown_refs[trigger_cooldown_id] = self.trigger_cooldown + + else: + current_cooldowns = utils.shared_cooldown_refs.keys() + for i in range(10_000): + generated_id = f"trigger_cooldown_{i:02}" + if generated_id not in current_cooldowns: + utils.shared_cooldown_refs[generated_id] = self.trigger_cooldown + self.trigger_cooldown_id = generated_id + + async def trigger(self, time_period: Union[float, datetime.timedelta]) -> None: + """|coro| + + Trigger the Trigger Cooldown instantly. Has to be awaited. + + Parameters + ---------- + time_period : `Union[float, datetime.timedelta]` + The time period that cooldwon will remain triggered. + """ + self.trigger_cooldown.time_period = ( + time_period + if isinstance(time_period, (float, int)) + else time_period.total_seconds() + ) + + # Triggers the Cooldown leaving bucket.current = 0 + frame = inspect.currentframe().f_back + _, _, _, values = inspect.getargvalues(frame) + args = tuple(values.values()) + + async with self.trigger_cooldown(*args): + return None + + + def __call__(self, func: Callable) -> Callable: + """ + + Called as a decorator. + + Parameters + ---------- + func : `Callable` + The function being decorated. + + Returns + ------- + `Callable` + Decorator + + Raises + ------ + `RuntimeError` + When given function is not coroutine. + """ + + _cooldown: Cooldown = utils.shared_cooldown_refs[self.cooldown_id] + _trigger_cooldown: Cooldown = utils.shared_cooldown_refs[self.trigger_cooldown_id] + + if not asyncio.iscoroutinefunction(func): + raise RuntimeError( + f"Expected `func` to be a coroutine, " + f"found {func} of type {func.__class__.__name__!r} instead" # noqa + ) + + # Links the cooldowns to the given function. + _cooldown._func = func + _trigger_cooldown._func = func + + attached_cooldowns = getattr(func, "_cooldowns", []) + + if _cooldown not in attached_cooldowns: + attached_cooldowns.append(_cooldown) + + if _trigger_cooldown not in attached_cooldowns: + attached_cooldowns.append(_trigger_cooldown) + + setattr(func, "_cooldowns", attached_cooldowns) + + @functools.wraps(func) + async def inner(*args, **kwargs): + use_cooldown = await maybe_coro(self.check, *args, **kwargs) + if not use_cooldown: + return await maybe_coro(func, *args, **kwargs) + + self_arg = None + if "self" in kwargs: + self_arg = kwargs.pop("self") + + # If the cooldown is triggered... + # if self.triggered: + # If still on triggered cooldown... + if _trigger_cooldown.remaining_calls(*args, **kwargs) < 1: + # Runs the Trigger Cooldown. + async with _trigger_cooldown(*args, **kwargs): + if self_arg: + kwargs["self"] = self_arg + result = await func(*args, **kwargs) + else: + result = await func(*args, **kwargs) + return result + + # If the cooldown is not triggered. + # Runs the normal Cooldown. + async with _cooldown(*args, **kwargs): + if self_arg: + kwargs["self"] = self_arg + result = await func(*args, **kwargs) + else: + result = await func(*args, **kwargs) + return result + + # Return the decorator. + return inner diff --git a/docs/modules/examples.rst b/docs/modules/examples.rst index 23a9727..2cc515c 100644 --- a/docs/modules/examples.rst +++ b/docs/modules/examples.rst @@ -259,3 +259,28 @@ How to use the Cooldown object without a decorator. # This will apply the cooldown ... # Do things + +TriggerCooldown usage +------------------------ + +This is useful if you want to be able to trigger a specific +time_period cooldown inside the command itself. + +.. code-block:: python + :linenos: + + + from cooldowns import TriggerCooldown, CooldownBucket + + my_trigger_cooldown = TriggerCooldown(1, 5, CooldownBucket.all) + + @my_trigger_cooldown + async def test_1(*args, **kwargs): + # Your command. + # Do things.. + + # Apply the trigger cooldown instantly. + await my_trigger_cooldown.trigger(20) + + # You can still do things.. + # But command cannot be called again within 20 seconds. \ No newline at end of file diff --git a/docs/modules/objects/trigger_cooldown.rst b/docs/modules/objects/trigger_cooldown.rst new file mode 100644 index 0000000..dfff512 --- /dev/null +++ b/docs/modules/objects/trigger_cooldown.rst @@ -0,0 +1,9 @@ +TriggerCooldown Reference +================== + +.. currentmodule:: cooldowns + +.. autoclass:: TriggerCooldown + :members: + :undoc-members: + :special-members: __init__ diff --git a/tests/test_trigger_cooldown.py b/tests/test_trigger_cooldown.py new file mode 100644 index 0000000..0437122 --- /dev/null +++ b/tests/test_trigger_cooldown.py @@ -0,0 +1,80 @@ +import asyncio + +import pytest + +from cooldowns import ( + CooldownBucket, + TriggerCooldown, + get_cooldown +) +from cooldowns.exceptions import CallableOnCooldown + + +@pytest.mark.asyncio +async def test_trigger_cooldown(): + my_trigger_cooldown = TriggerCooldown(1, 0.3, CooldownBucket.all) + + @my_trigger_cooldown + async def test_1(): + return 1 + + assert await test_1() == 1 + + with pytest.raises(CallableOnCooldown): + await test_1() + + +@pytest.mark.asyncio +async def test_trigger_cooldown_triggering(): + my_trigger_cooldown = TriggerCooldown(1, 0.3, CooldownBucket.all) + + @my_trigger_cooldown + async def test_1(): + await my_trigger_cooldown.trigger(20) + return 1 + + assert await test_1() == 1 + + await asyncio.sleep(0.4) + with pytest.raises(CallableOnCooldown): + await test_1() + +@pytest.mark.asyncio +async def test_shared_trigger_cooldown(): + my_shared_trigger_cooldown = TriggerCooldown(1, 0.3, CooldownBucket.all) + + @my_shared_trigger_cooldown + async def test_1(*args, **kwargs): + return 1 + + @my_shared_trigger_cooldown + async def test_2(*args, **kwargs): + return 2 + + assert await test_1() == 1 + + with pytest.raises(CallableOnCooldown): + await test_1() + + with pytest.raises(CallableOnCooldown): + await test_2() + +@pytest.mark.asyncio +async def test_trigger_cooldown_with_id(): + my_trigger_cooldown = TriggerCooldown(1, 0.3, CooldownBucket.all, + cooldown_id= "normal_cooldown_id", + trigger_cooldown_id= "trigger_cooldown_id") + + @my_trigger_cooldown + async def test_1(*args, **kwargs): + try: + get_cooldown(test_1, "normal_cooldown_id") + get_cooldown(test_1, "trigger_cooldown_id") + return 1 + except Exception: + return 0 + + assert await test_1() == 1 + + with pytest.raises(CallableOnCooldown): + await test_1()