Procházet zdrojové kódy

Work with django-celery again

Ask Solem před 13 roky
rodič
revize
1262b22216

+ 4 - 2
celery/task/trace.py

@@ -28,7 +28,7 @@ from kombu.utils import kwdict
 
 from celery import current_app
 from celery import states, signals
-from celery.state import _task_stack
+from celery.state import _task_stack, default_app
 from celery.app.task import BaseTask, Context
 from celery.datastructures import ExceptionInfo
 from celery.exceptions import RetryTaskError
@@ -49,6 +49,8 @@ RETRY = states.RETRY
 FAILURE = states.FAILURE
 EXCEPTION_STATES = states.EXCEPTION_STATES
 
+_tasks = default_app._tasks
+
 
 def mro_lookup(cls, attr, stop=()):
     """Returns the first node by MRO order that defines an attribute.
@@ -290,7 +292,7 @@ def trace_task(task, uuid, args, kwargs, request=None, **opts):
 
 
 def trace_task_ret(task, uuid, args, kwargs, request):
-    return task.__tracer__(uuid, args, kwargs, request)[0]
+    return _tasks[task].__tracer__(uuid, args, kwargs, request)[0]
 
 
 def eager_trace_task(task, uuid, args, kwargs, request=None, **opts):

+ 2 - 0
celery/worker/__init__.py

@@ -33,6 +33,7 @@ from celery import platforms
 from celery.app import app_or_default, set_default_app
 from celery.app.abstract import configurated, from_config
 from celery.exceptions import SystemTerminate
+from celery.task import trace
 from celery.utils.functional import noop
 from celery.utils.imports import qualname, reload_from_cwd
 from celery.utils.log import get_logger
@@ -302,6 +303,7 @@ class WorkController(configurated):
         # and means that only a single app can be used for workers
         # running in the same process.
         set_default_app(self.app)
+        trace._tasks = self.app.tasks
 
         self._shutdown_complete = Event()
         self.setup_defaults(kwargs, namespace="celeryd")

+ 2 - 1
celery/worker/abstract.py

@@ -70,7 +70,8 @@ class Namespace(object):
         self._debug("Building boot step graph.")
         self.boot_steps = [self.bind_component(name, parent, **kwargs)
                                 for name in self._finalize_boot_steps()]
-        self._debug("New boot order: %r", [c.name for c in self.boot_steps])
+        self._debug("New boot order: {%s}",
+                ', '.join(c.name for c in self.boot_steps))
 
         for component in self.boot_steps:
             component.include(parent)

+ 1 - 1
celery/worker/job.py

@@ -222,7 +222,7 @@ class Request(object):
         request.update({"hostname": hostname, "is_eager": False,
                         "delivery_info": self.delivery_info})
         result = pool.apply_async(trace_task_ret,
-                                  args=(task, self.id,
+                                  args=(self.name, self.id,
                                         self.args, kwargs, request),
                                   accept_callback=self.on_accepted,
                                   timeout_callback=self.on_timeout,

+ 0 - 1
celery/worker/mediator.py

@@ -85,4 +85,3 @@ class Mediator(bgThread):
                          extra={"data": {"id": task.id,
                                          "name": task.name,
                                          "hostname": task.hostname}})
-    move = body   # XXX compat