From 64881f7038b007fb52e797787abe1e05d0570765 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Tue, 5 May 2026 23:22:43 +0200 Subject: [PATCH] Made tasks independent from brokers. --- pyproject.toml | 57 +++++++++++----------- taskiq/__init__.py | 3 ++ taskiq/abc/broker.py | 69 ++++++--------------------- taskiq/decor.py | 6 ++- taskiq/kicker.py | 4 +- taskiq/task_gen.py | 110 +++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 165 insertions(+), 84 deletions(-) create mode 100644 taskiq/task_gen.py diff --git a/pyproject.toml b/pyproject.toml index ae49ea5f..64bc91fe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,23 @@ dependencies = [ ] dynamic = ["version"] +[project.urls] +"Bug Tracker" = "https://github.com/taskiq-python/taskiq/issues" +Changelog = "https://github.com/taskiq-python/taskiq/releases" +Documentation = "https://taskiq-python.github.io/" +Homepage = "https://taskiq-python.github.io/" +Repository = "https://github.com/taskiq-python/taskiq" + +[project.scripts] +taskiq = "taskiq.__main__:main" + +[project.entry-points.opentelemetry_instrumentor] +taskiq = "taskiq.instrumentation:TaskiqInstrumentor" + +[project.entry-points.taskiq_cli] +worker = "taskiq.cli.worker.cmd:WorkerCMD" +scheduler = "taskiq.cli.scheduler.cmd:SchedulerCMD" + [project.optional-dependencies] cbor = ["cbor2>=5"] metrics = ["prometheus_client>=0"] @@ -49,23 +66,6 @@ reload = ["watchdog>=4", "gitignore-parser>=0"] uv = ["uvloop>=0.16.0,<1; sys_platform != 'win32'"] zmq = ["pyzmq>=26"] -[project.entry-points.opentelemetry_instrumentor] -taskiq = "taskiq.instrumentation:TaskiqInstrumentor" - -[project.entry-points.taskiq_cli] -worker = "taskiq.cli.worker.cmd:WorkerCMD" -scheduler = "taskiq.cli.scheduler.cmd:SchedulerCMD" - -[project.scripts] -taskiq = "taskiq.__main__:main" - -[project.urls] -"Bug Tracker" = "https://github.com/taskiq-python/taskiq/issues" -Changelog = "https://github.com/taskiq-python/taskiq/releases" -Documentation = "https://taskiq-python.github.io/" -Homepage = "https://taskiq-python.github.io/" -Repository = "https://github.com/taskiq-python/taskiq" - [dependency-groups] dev = [ "black>=25.11.0", @@ -171,7 +171,8 @@ lint.ignore = [ "D100", # Missing docstring in public module "ANN401", # typing.Any are disallowed in `**kwargs "PLR0913", # Too many arguments for function call - "D106" # Missing docstring in public nested class + "D106", # Missing docstring in public nested class + "UP037" # Remove quotes from a type def ] lint.mccabe = { max-complexity = 10 } @@ -198,6 +199,16 @@ lint.mccabe = { max-complexity = 10 } "T201" # print found ] +[tool.ruff.lint.flake8-bugbear] +extend-immutable-calls = ["taskiq_dependencies.Depends", "taskiq.TaskiqDepends"] + +[tool.ruff.lint.pydocstyle] +convention = "pep257" +ignore-decorators = ["typing.overload"] + +[tool.ruff.lint.pylint] +allow-magic-value-types = ["int", "str", "float"] + [tool.tox] requires = ["tox>=4"] isolated_build = true @@ -213,13 +224,3 @@ commands = [["pytest", "-vv", "-n", "auto"]] extend-exclude = [ "docs/README.md" # because of identifier in head section ] - -[tool.ruff.lint.flake8-bugbear] -extend-immutable-calls = ["taskiq_dependencies.Depends", "taskiq.TaskiqDepends"] - -[tool.ruff.lint.pydocstyle] -convention = "pep257" -ignore-decorators = ["typing.overload"] - -[tool.ruff.lint.pylint] -allow-magic-value-types = ["int", "str", "float"] diff --git a/taskiq/__init__.py b/taskiq/__init__.py index 2414754f..3c5d3279 100644 --- a/taskiq/__init__.py +++ b/taskiq/__init__.py @@ -36,6 +36,7 @@ from taskiq.scheduler.scheduler import TaskiqScheduler from taskiq.state import TaskiqState from taskiq.task import AsyncTaskiqTask +from taskiq.task_gen import TaskiqTaskGenerator, task_gen __version__ = version("taskiq") @@ -68,8 +69,10 @@ "TaskiqResultTimeoutError", "TaskiqScheduler", "TaskiqState", + "TaskiqTaskGenerator", "ZeroMQBroker", "__version__", "async_shared_broker", "gather", + "task_gen", ] diff --git a/taskiq/abc/broker.py b/taskiq/abc/broker.py index ea2e86c0..c3b90187 100644 --- a/taskiq/abc/broker.py +++ b/taskiq/abc/broker.py @@ -20,6 +20,7 @@ from taskiq.abc.middleware import TaskiqMiddleware from taskiq.abc.serializer import TaskiqSerializer +from taskiq.task_gen import TaskiqTaskGenerator from taskiq.acks import AckableMessage from taskiq.decor import AsyncTaskiqDecoratedTask from taskiq.events import TaskiqEvents @@ -301,64 +302,23 @@ def task( # type: ignore[misc] :returns: decorator function or AsyncTaskiqDecoratedTask. """ - - def make_decorated_task( - inner_labels: dict[str, str | int], - inner_task_name: str | None = None, - ) -> Callable[ - [Callable[_FuncParams, _ReturnType]], - AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType], - ]: - def inner( - func: Callable[_FuncParams, _ReturnType], - ) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]: - nonlocal inner_task_name - if inner_task_name is None: - fmodule = func.__module__ - if fmodule == "__main__": # pragma: no cover - fmodule = ".".join( - os.path.normpath(sys.argv[0]) - .removesuffix(".py") - .split(os.path.sep), - ) - fname = func.__name__ - if fname == "": - fname = f"lambda_{uuid4().hex}" - inner_task_name = f"{fmodule}:{fname}" - wrapper = wraps(func) - - sign = get_type_hints(func) - return_type = None - if "return" in sign: - return_type = sign["return"] - - decorated_task = wrapper( - self.decorator_class( - broker=self, - original_func=func, - labels=inner_labels, - task_name=inner_task_name, - return_type=return_type, # type: ignore - ), - ) - - self._register_task(decorated_task.task_name, decorated_task) # type: ignore - - return decorated_task # type: ignore - - return inner + warnings.warn( + "Tasks are not independent from brokers. " + "Use `taskiq.task` as a decorator instead.", + TaskiqDeprecationWarning, + stacklevel=2, + ) + generator = TaskiqTaskGenerator().labels(**labels).broker(self) if callable(task_name): # This is an edge case, # when decorator called without parameters. - return make_decorated_task( - inner_labels=labels or {}, - )(task_name) + return generator(task_name) - return make_decorated_task( - inner_task_name=task_name, - inner_labels=labels or {}, - ) + if task_name: + generator = generator.name(task_name) + + return generator def register_task( self, @@ -534,9 +494,10 @@ def _register_task( raise TaskBrokerMismatchError(broker=task.broker) self.local_task_registry[task_name] = task - async def __aenter__(self) -> None: + async def __aenter__(self) -> "Self": """Starts the broker as ctx manager.""" await self.startup() + return self async def __aexit__(self, *args: object, **kwargs: Any) -> None: """Shuts down the broker as ctx manager.""" diff --git a/taskiq/decor.py b/taskiq/decor.py index 8f90541c..c1bd3965 100644 --- a/taskiq/decor.py +++ b/taskiq/decor.py @@ -45,10 +45,10 @@ class AsyncTaskiqDecoratedTask(Generic[_FuncParams, _ReturnType]): def __init__( self, - broker: "AsyncBroker", task_name: str, original_func: Callable[_FuncParams, _ReturnType], labels: dict[str, Any], + broker: "AsyncBroker | None" = None, return_type: type[_ReturnType] | None = None, ) -> None: self.broker = broker @@ -230,5 +230,9 @@ def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]: return_type=self.return_type, ) + def set_broker(self, broker: "AsyncBroker") -> None: + """Set broker for the task.""" + self.broker = broker + def __repr__(self) -> str: return f"AsyncTaskiqDecoratedTask({self.task_name})" diff --git a/taskiq/kicker.py b/taskiq/kicker.py index dc113a7e..82f088de 100644 --- a/taskiq/kicker.py +++ b/taskiq/kicker.py @@ -42,10 +42,12 @@ class AsyncKicker(Generic[_FuncParams, _ReturnType]): def __init__( self, task_name: str, - broker: "AsyncBroker", + broker: "AsyncBroker | None", labels: dict[str, Any], return_type: type[_ReturnType] | None = None, ) -> None: + if broker is None: + raise RuntimeError("Broker cannot be None for AsyncKicker.") self.task_name = task_name self.broker = broker self.labels = labels diff --git a/taskiq/task_gen.py b/taskiq/task_gen.py new file mode 100644 index 00000000..3ed528c9 --- /dev/null +++ b/taskiq/task_gen.py @@ -0,0 +1,110 @@ +import os +import sys +from collections.abc import Callable +from copy import copy +from functools import wraps +from typing import TYPE_CHECKING, Any, get_type_hints +from uuid import uuid4 + +from typing_extensions import ParamSpec, Self, TypeVar + +from taskiq.decor import AsyncTaskiqDecoratedTask + +if TYPE_CHECKING: + from taskiq.abc.broker import AsyncBroker + +_FuncParams = ParamSpec("_FuncParams") +_ReturnType = TypeVar("_ReturnType") + + +class TaskiqTaskGenerator: + """Class used for task generation.""" + + def __init__(self) -> None: + self._labels: dict[str, Any] = {} + self._name: str | None = None + self._broker: "AsyncBroker | None" = None + + def name(self, name: str) -> Self: + """Set task name.""" + inst = copy(self) + self._name = name + return inst + + def labels(self, **labels: Any) -> Self: + """Set task's static labels.""" + inst = copy(self) + self._labels = labels + return inst + + def broker(self, broker: "AsyncBroker") -> Self: + """Set a broker.""" + inst = copy(self) + self._broker = broker + return inst + + @classmethod + def make_task( + cls, + task_name: str, + broker: "AsyncBroker | None", + original_func: Callable[_FuncParams, _ReturnType], + labels: dict[str, Any], + return_type: type[_ReturnType] | None = None, + ) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]: + """ + Create a task out of given inputs. + + This method can be overridden to create custom task classes + with custom arguments and logic. + """ + return AsyncTaskiqDecoratedTask( + broker=broker, + task_name=task_name, + original_func=original_func, + labels=labels, + return_type=return_type, + ) + + def __call__( + self, + func: Callable[_FuncParams, _ReturnType], + ) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]: + """ + Make a decorated task. + + This function is the main point for creating a task + from a raw function. + """ + task_name = self._name + if task_name is None: + fmodule = func.__module__ + if fmodule == "__main__": # pragma: no cover + fmodule = ".".join( + os.path.normpath(sys.argv[0]) + .removesuffix(".py") + .split(os.path.sep), + ) + fname = func.__name__ + if fname == "": + fname = f"lambda_{uuid4().hex}" + task_name = f"{fmodule}:{fname}" + wrapper = wraps(func) + + sign = get_type_hints(func) + return_type = None + if "return" in sign: + return_type = sign["return"] + + return wrapper( + self.make_task( + original_func=func, + labels=self._labels, + task_name=task_name, + broker=self._broker, + return_type=return_type, # type: ignore + ), + ) + + +task_gen = TaskiqTaskGenerator()