Browse Source

Merge branch '3.0'

Conflicts:
	celery/worker/__init__.py
Ask Solem 12 years ago
parent
commit
23f1a7fc17
2 changed files with 16 additions and 5 deletions
  1. 8 4
      celery/app/task.py
  2. 8 1
      celery/worker/__init__.py

+ 8 - 4
celery/app/task.py

@@ -196,11 +196,11 @@ class Task(object):
     serializer = None
 
     #: Hard time limit.
-    #: Defaults to the :setting:`CELERY_TASK_TIME_LIMIT` setting.
+    #: Defaults to the :setting:`CELERYD_TASK_TIME_LIMIT` setting.
     time_limit = None
 
     #: Soft time limit.
-    #: Defaults to the :setting:`CELERY_TASK_SOFT_TIME_LIMIT` setting.
+    #: Defaults to the :setting:`CELERYD_TASK_SOFT_TIME_LIMIT` setting.
     soft_time_limit = None
 
     #: The result store backend used for this task.
@@ -461,7 +461,8 @@ class Task(object):
             args = (self.__self__, ) + tuple(args)
 
         if conf.CELERY_ALWAYS_EAGER:
-            return self.apply(args, kwargs, task_id=task_id, **options)
+            return self.apply(args, kwargs, task_id=task_id,
+                              link=link, link_error=link_error, **options)
         options = dict(extract_exec_options(self), **options)
         options = router.route(options, self.name, args, kwargs)
 
@@ -585,7 +586,8 @@ class Task(object):
             raise ret
         return ret
 
-    def apply(self, args=None, kwargs=None, **options):
+    def apply(self, args=None, kwargs=None,
+              link=None, link_error=None, **options):
         """Execute this task locally, by blocking until the task returns.
 
         :param args: positional arguments passed on to the task.
@@ -619,6 +621,8 @@ class Task(object):
                    'is_eager': True,
                    'logfile': options.get('logfile'),
                    'loglevel': options.get('loglevel', 0),
+                   'callbacks': maybe_list(link),
+                   'errbacks': maybe_list(link_error),
                    'delivery_info': {'is_eager': True}}
         if self.accept_magic_kwargs:
             default_kwargs = {'task_name': task.name,

+ 8 - 1
celery/worker/__init__.py

@@ -104,7 +104,10 @@ class WorkController(configurated):
         self.app.loader.init_worker()
         self.on_before_init(**kwargs)
 
-        self._finalize = Finalize(self, self.stop, exitpriority=1)
+        self._finalize = [
+            Finalize(self, self.stop, exitpriority=1),
+            Finalize(self, self._send_worker_shutdown, exitpriority=10),
+        ]
         self.setup_instance(**self.prepare_args(**kwargs))
 
     def setup_instance(self, queues=None, ready_callback=None, pidfile=None,
@@ -124,6 +127,7 @@ class WorkController(configurated):
         # Options
         self.loglevel = mlevel(self.loglevel)
         self.ready_callback = ready_callback or self.on_consumer_ready
+
         # this connection is not established, only used for params
         self._conninfo = self.app.connection()
         self.use_eventloop = (
@@ -195,6 +199,9 @@ class WorkController(configurated):
     def prepare_args(self, **kwargs):
         return kwargs
 
+    def _send_worker_shutdown(self):
+        signals.worker_shutdown.send(sender=self)
+
     def start(self):
         """Starts the workers main loop."""
         try: