[docs]classCeleryWorker(BaseWorkerBackend):"""Backend to start a celery worker."""def__init__(self,**server_config:Any)->None:celery_app_str=server_config.get("APP")self.app=import_string(celery_app_str)super().__init__(**server_config)
[docs]defstart_server(self,*args:str)->None:"""Start Celery Worker via the ``worker`` subcommand on the Celery app."""self.app.start(argv=["worker",*args])
[docs]classCeleryBeat(CeleryWorker):"""Backend to start a celery beat process."""
[docs]defstart_server(self,*args:str)->None:"""Start Celery beat via the ``beat`` subcommand on the Celery app."""self.app.start(argv=["beat",*args])