Ask Solem 13 years ago
parent
commit
c50789350d
7 changed files with 35 additions and 56 deletions
  1. + 3 - 43   celery/app/task.py
  2. + 1 - 1    celery/concurrency/processes/__init__.py
  3. + 22 - 3   celery/task/base.py
  4. + 4 - 4    celery/task/trace.py
  5. + 1 - 1    celery/tests/tasks/test_trace.py
  6. + 1 - 1    celery/worker/consumer.py
  7. + 3 - 3    celery/worker/job.py

+ 3 - 43
celery/app/task.py

@@ -16,7 +16,6 @@ import logging
 import os
 import sys
 
-from kombu import Exchange
 from kombu.utils import cached_property
 
 from celery import current_app
@@ -168,7 +167,7 @@ class Task(object):
 
     """
     __metaclass__ = TaskType
-    __tracer__ = None
+    __trace__ = None
 
     ErrorMail = ErrorMail
     MaxRetriesExceededError = MaxRetriesExceededError
@@ -192,36 +191,6 @@ class Task(object):
     #: Deprecated and scheduled for removal in v3.0.
     accept_magic_kwargs = None
 
-    #: Destination queue.  The queue needs to exist
-    #: in :setting:`CELERY_QUEUES`.  The `routing_key`, `exchange` and
-    #: `exchange_type` attributes will be ignored if this is set.
-    queue = None
-
-    #: Overrides the apps default `routing_key` for this task.
-    routing_key = None
-
-    #: Overrides the apps default `exchange` for this task.
-    exchange = None
-
-    #: Overrides the apps default exchange type for this task.
-    exchange_type = None
-
-    #: Override the apps default delivery mode for this task.  Default is
-    #: `"persistent"`, but you can change this to `"transient"`, which means
-    #: messages will be lost if the broker is restarted.  Consult your broker
-    #: manual for any additional delivery modes.
-    delivery_mode = None
-
-    #: Mandatory message routing.
-    mandatory = False
-
-    #: Request immediate delivery.
-    immediate = False
-
-    #: Default message priority.  A number between 0 to 9, where 0 is the
-    #: highest.  Note that RabbitMQ does not support priorities.
-    priority = None
-
     #: Maximum number of retries before giving up.  If set to :const:`None`,
     #: it will **never** stop retrying.
     max_retries = 3
@@ -247,10 +216,6 @@ class Task(object):
     #: If enabled an email will be sent to :setting:`ADMINS` whenever a task
     #: of this type fails.
     send_error_emails = False
-    disable_error_emails = False                            # FIXME
-
-    #: List of exception types to send error emails for.
-    error_whitelist = ()
 
     #: The name of a serializer that are registered with
     #: :mod:`kombu.serialization.registry`.  Default is `"pickle"`.
@@ -298,16 +263,10 @@ class Task(object):
     #: Default task expiry time.
     expires = None
 
-    #: The type of task *(no longer used)*.
-    type = "regular"
-
     __bound__ = False
 
     from_config = (
-        ("exchange_type", "CELERY_DEFAULT_EXCHANGE_TYPE"),
-        ("delivery_mode", "CELERY_DEFAULT_DELIVERY_MODE"),
         ("send_error_emails", "CELERY_SEND_TASK_ERROR_EMAILS"),
-        ("error_whitelist", "CELERY_TASK_ERROR_WHITELIST"),
         ("serializer", "CELERY_TASK_SERIALIZER"),
         ("rate_limit", "CELERY_DEFAULT_RATE_LIMIT"),
         ("track_started", "CELERY_TRACK_STARTED"),
@@ -808,7 +767,8 @@ class Task(object):
         pass
 
     def send_error_email(self, context, exc, **kwargs):
-        if self.send_error_emails and not self.disable_error_emails:
+        if self.send_error_emails and \
+                not getattr(self, "disable_error_emails", None):
             self.ErrorMail(self, **kwargs).send(context, exc)
 
     def execute(self, request, pool, loglevel, logfile, **kwargs):

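The send_error_email() change above swaps a required class attribute for a defensive getattr() lookup, so the new-style Task no longer needs to define disable_error_emails at all; only the compat class in celery/task/base.py keeps it. A minimal sketch of that pattern, using illustrative names (NewStyleTask, CompatTask, should_send_error_email are not Celery's API):

    class NewStyleTask(object):
        send_error_emails = True

    class CompatTask(NewStyleTask):
        # only the old-style compat subclass still carries the deprecated flag
        disable_error_emails = True

    def should_send_error_email(task):
        # mirrors the new check: a missing attribute simply means "not disabled"
        return bool(task.send_error_emails and
                    not getattr(task, "disable_error_emails", None))

    assert should_send_error_email(NewStyleTask())
    assert not should_send_error_email(CompatTask())
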
+ 1 - 1
celery/concurrency/processes/__init__.py

@@ -52,7 +52,7 @@ def process_initializer(app, hostname):
 
     from celery.task.trace import build_tracer
     for name, task in app.tasks.iteritems():
-        task.__tracer__ = build_tracer(name, task, app.loader, hostname)
+        task.__trace__ = build_tracer(name, task, app.loader, hostname)
     signals.worker_process_init.send(sender=None)
 
 

+ 22 - 3
celery/task/base.py

@@ -13,6 +13,8 @@
 """
 from __future__ import absolute_import
 
+from kombu import Exchange
+
 from celery import current_app
 from celery.__compat__ import class_property, reclassmethod
 from celery.app.task import Context, TaskType, Task as BaseTask  # noqa
@@ -33,6 +35,26 @@ class Task(BaseTask):
     abstract = True
     __bound__ = False
 
+    #- Deprecated compat. attributes -:
+
+    queue = None
+    routing_key = None
+    exchange = None
+    exchange_type = None
+    delivery_mode = None
+    mandatory = False
+    immediate = False
+    priority = None
+    type = "regular"
+    error_whitelist = ()
+    disable_error_emails = False
+
+    from_config = BaseTask.from_config + (
+        ("exchange_type", "CELERY_DEFAULT_EXCHANGE_TYPE"),
+        ("delivery_mode", "CELERY_DEFAULT_DELIVERY_MODE"),
+        ("error_whitelist", "CELERY_TASK_ERROR_WHITELIST"),
+    )
+
     # In old Celery the @task decorator didn't exist, so one would create
     # classes instead and use them directly (e.g. MyTask.apply_async()).
     # the use of classmethods was a hack so that it was not necessary
@@ -69,7 +91,6 @@ class Task(BaseTask):
         return self._get_app().broker_connection(
                 connect_timeout=connect_timeout)
 
-
     def get_publisher(self, connection=None, exchange=None,
             connect_timeout=None, exchange_type=None, **options):
         """Deprecated method to get the task publisher (now called producer).
@@ -91,7 +112,6 @@ class Task(BaseTask):
                 exchange=exchange and Exchange(exchange, exchange_type),
                 routing_key=self.routing_key, **options)
 
-
     @classmethod
     def get_consumer(self, connection=None, queues=None, **kwargs):
         """Deprecated method used to get consumer for the queue
@@ -107,7 +127,6 @@ class Task(BaseTask):
         return Q.TaskConsumer(connection, queues, **kwargs)
 
 
-
 class PeriodicTask(Task):
     """A periodic task is a task that adds itself to the
     :setting:`CELERYBEAT_SCHEDULE` setting."""

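The deprecated routing attributes now live only on the compat Task in celery/task/base.py, and the class extends BaseTask.from_config so its settings are still filled in from the app configuration when the task is bound. A rough sketch of how such an (attribute, setting) table can be applied; bind_defaults below is an illustrative helper under that assumption, not Celery's actual binding code:

    class BaseTask(object):
        serializer = None
        from_config = (
            ("serializer", "CELERY_TASK_SERIALIZER"),
        )

    class CompatTask(BaseTask):
        exchange_type = None
        delivery_mode = None
        from_config = BaseTask.from_config + (
            ("exchange_type", "CELERY_DEFAULT_EXCHANGE_TYPE"),
            ("delivery_mode", "CELERY_DEFAULT_DELIVERY_MODE"),
        )

    def bind_defaults(task_cls, conf):
        # any attribute still left at None falls back to the configured value
        for attr_name, config_key in task_cls.from_config:
            if getattr(task_cls, attr_name, None) is None:
                setattr(task_cls, attr_name, conf[config_key])

    bind_defaults(CompatTask, {
        "CELERY_TASK_SERIALIZER": "pickle",
        "CELERY_DEFAULT_EXCHANGE_TYPE": "direct",
        "CELERY_DEFAULT_DELIVERY_MODE": 2,
    })
    assert CompatTask.exchange_type == "direct"
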
+ 4 - 4
celery/task/trace.py

@@ -284,15 +284,15 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
 
 def trace_task(task, uuid, args, kwargs, request=None, **opts):
     try:
-        if task.__tracer__ is None:
-            task.__tracer__ = build_tracer(task.name, task, **opts)
-        return task.__tracer__(uuid, args, kwargs, request)
+        if task.__trace__ is None:
+            task.__trace__ = build_tracer(task.name, task, **opts)
+        return task.__trace__(uuid, args, kwargs, request)
     except Exception, exc:
         return report_internal_error(task, exc), None
 
 
 def trace_task_ret(task, uuid, args, kwargs, request):
-    return _tasks[task].__tracer__(uuid, args, kwargs, request)[0]
+    return _tasks[task].__trace__(uuid, args, kwargs, request)[0]
 
 
 def eager_trace_task(task, uuid, args, kwargs, request=None, **opts):

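The renamed __trace__ attribute is a per-task cache for the built tracer: worker processes prime it eagerly at init time (see the processes/__init__.py and consumer.py hunks), while trace_task falls back to building it lazily on first use. A self-contained sketch of that lazy-cache pattern, with stand-in names (make_tracer, run_task, DemoTask) rather than Celery's real signatures:

    def make_tracer(task):
        # stand-in for build_tracer(): returns the callable cached on the task
        def tracer(uuid, args, kwargs, request=None):
            return task.run(*args, **kwargs), None
        return tracer

    class DemoTask(object):
        __trace__ = None        # cache slot, filled once per process
        name = "demo"

        def run(self, x, y):
            return x + y

    def run_task(task, uuid, args, kwargs, request=None):
        if task.__trace__ is None:              # build on first call only
            task.__trace__ = make_tracer(task)
        return task.__trace__(uuid, args, kwargs, request)

    task = DemoTask()
    assert run_task(task, "uuid-1", (2, 3), {}) == (5, None)
    assert run_task(task, "uuid-2", (1, 1), {}) == (2, None)   # cached tracer reused
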
+ 1 - 1
celery/tests/tasks/test_trace.py

@@ -70,7 +70,7 @@ class test_trace(Case):
 
         trace_task(xtask, "uuid", (), {})
         self.assertTrue(report_internal_error.call_count)
-        self.assertIs(xtask.__tracer__, tracer)
+        self.assertIs(xtask.__trace__, tracer)
 
 
 class test_TraceInfo(Case):

+ 1 - 1
celery/worker/consumer.py

@@ -348,7 +348,7 @@ class Consumer(object):
         hostname = self.hostname
         for name, task in self.app.tasks.iteritems():
             S[name] = task.start_strategy(app, self)
-            task.__tracer__ = build_tracer(name, task, loader, hostname)
+            task.__trace__ = build_tracer(name, task, loader, hostname)
 
     def start(self):
         """Start the consumer.

+ 3 - 3
celery/worker/job.py

@@ -73,9 +73,9 @@ def execute_and_trace(name, uuid, args, kwargs, request=None, **opts):
         hostname = opts.get("hostname")
         setps("celeryd", name, hostname, rate_limit=True)
         try:
-            if task.__tracer__ is None:
-                task.__tracer__ = build_tracer(name, task, **opts)
-            return task.__tracer__(uuid, args, kwargs, request)[0]
+            if task.__trace__ is None:
+                task.__trace__ = build_tracer(name, task, **opts)
+            return task.__trace__(uuid, args, kwargs, request)[0]
         finally:
             setps("celeryd", "-idle-", hostname, rate_limit=True)
     except Exception, exc: