
Merge branch 'release21-maint'

Conflicts:
	Changelog
	celery/apps/beat.py
	celery/worker/job.py
	docs/community.rst
Ask Solem 14 years ago
Parent
Commit
26f81734d2
7 changed files with 202 additions and 15 deletions
  1. Changelog (+ 160 - 1)
  2. celery/events/state.py (+ 5 - 4)
  3. celery/states.py (+ 1 - 0)
  4. celery/worker/job.py (+ 21 - 5)
  5. docs/community.rst (+ 12 - 0)
  6. docs/internals/events.rst (+ 2 - 3)
  7. docs/userguide/monitoring.rst (+ 1 - 2)

+ 160 - 1
Changelog

@@ -11,7 +11,166 @@
 =====
 :release-date: TBA
 :status: in-progress
-:branch: app
+:branch: master
+
+
+.. _version-2.1.2:
+
+2.1.2
+=====
+:release-date: TBA
+
+.. _v212-fixes:
+
+Fixes
+-----
+
+* celeryd: Now sends the ``task-retried`` event for retried tasks.
+
+* celeryd: Now honors ``ignore_result`` for
+  :exc:`~celery.exceptions.WorkerLostError` and timeout errors.
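+
+    For illustration only, a minimal hypothetical task definition (not
+    part of this release) showing the ``ignore_result`` flag that the
+    worker now also respects for these errors:
+
+    .. code-block:: python
+
+        from celery.decorators import task
+
+        @task(ignore_result=True)
+        def fire_and_forget(x):
+            # No result is stored, even if the pool process is lost
+            # or the task hits a (soft) time limit.
+            return x * 2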
+
+* celerybeat: Fixed :exc:`UnboundLocalError` in celerybeat logging
+  when using logging setup signals.
+
+* celeryd: All log messages now include ``exc_info``.
+
+.. _version-2.1.1:
+
+2.1.1
+=====
+:release-date: 2010-10-14 14:00 PM CEST
+
+.. _v211-fixes:
+
+Fixes
+-----
+
+* Now working on Windows again.
+
+    Removed dependency on the pwd/grp modules.
+
+* snapshots: Fixed race condition leading to loss of events.
+
+* celeryd: Reject tasks with an eta that cannot be converted to a timestamp.
+
+    See issue #209
+
+* concurrency.processes.pool: The semaphore was released twice for each task
+  (both at ACK and result ready).
+
+    This has been fixed, and it is now released only once per task.
+
+* docs/configuration: Fixed typo ``CELERYD_SOFT_TASK_TIME_LIMIT`` ->
+  :setting:`CELERYD_TASK_SOFT_TIME_LIMIT`.
+
+    See issue #214
+
+* Control command ``dump_scheduled``: was using the old ``.info`` attribute.
+
+* :program:`celeryd-multi`: Fixed ``set changed size during iteration`` bug
+  occurring in the restart command.
+
+* celeryd: Accidentally tried to use additional command line arguments.
+
+   This would lead to an error like:
+
+    ``got multiple values for keyword argument 'concurrency'``.
+
+    Additional command line arguments are now ignored, and do not
+    produce this error.  However -- we do reserve the right to use
+    positional arguments in the future, so please do not depend on this
+    behavior.
+
+* celerybeat: Now respects routers and task execution options again.
+
+* celerybeat: Now reuses the publisher instead of the connection.
+
+* Cache result backend: Using :class:`float` as the expires argument
+  to ``cache.set`` is deprecated by the memcached libraries,
+  so we now automatically cast to :class:`int`.
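+
+    A minimal, hypothetical illustration of the cast (not the backend's
+    actual code):
+
+    .. code-block:: python
+
+        def expires_to_int(expires):
+            # The memcached libraries deprecate float TTLs, so coerce to int.
+            if isinstance(expires, float):
+                expires = int(expires)
+            return expires
+
+        print(expires_to_int(30.0))   # -> 30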
+
+* unittests: No longer emits logging and warnings in test output.
+
+.. _v211-news:
+
+News
+----
+
+* Now depends on carrot version 0.10.7.
+
+* Added :setting:`CELERY_REDIRECT_STDOUTS`, and
+  :setting:`CELERY_REDIRECT_STDOUTS_LEVEL` settings.
+
+    :setting:`CELERY_REDIRECT_STDOUTS` is used by :program:`celeryd` and
+    :program:`celerybeat`.  All output to ``stdout`` and ``stderr`` will be
+    redirected to the current logger if enabled.
+
+    :setting:`CELERY_REDIRECT_STDOUTS_LEVEL` decides the loglevel used and is
+    :const:`WARNING` by default.
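+
+    Example configuration (illustrative values only):
+
+    .. code-block:: python
+
+        CELERY_REDIRECT_STDOUTS = True
+        CELERY_REDIRECT_STDOUTS_LEVEL = "WARNING"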
+
+* Added :setting:`CELERYBEAT_SCHEDULER` setting.
+
+    This setting is used to define the default for the -S option to
+    :program:`celerybeat`.
+
+    Example:
+
+    .. code-block:: python
+
+        CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
+
+* Added ``Task.expires``: Used to set a default expiry time for tasks.
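+
+    For example, a hypothetical task class using the new attribute
+    (``CleanupTask`` and its body are made up for illustration):
+
+    .. code-block:: python
+
+        from celery.task import Task
+
+        class CleanupTask(Task):
+            # Discard the task if it is not executed within one hour.
+            expires = 60 * 60
+
+            def run(self, **kwargs):
+                return "cleaned"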
+
+* New remote control commands: ``add_consumer`` and ``cancel_consumer``.
+
+    .. method:: add_consumer(queue, exchange, exchange_type, routing_key,
+                             **options)
+        :module:
+
+        Tells the worker to declare the specified queue, exchange and
+        binding, and start consuming from it.
+
+    .. method:: cancel_consumer(queue_name)
+        :module:
+
+        Tells the worker to stop consuming from the queue (by queue name).
+
+
+    Commands also added to :program:`celeryctl` and
+    :class:`~celery.task.control.inspect`.
+
+
+    Example using celeryctl to start consuming from queue "queue", in 
+    exchange "exchange", of type "direct" using binding key "key"::
+
+        $ celeryctl inspect add_consumer queue exchange direct key
+        $ celeryctl inspect cancel_consumer queue
+
+    See :ref:`monitoring-celeryctl` for more information about the
+    :program:`celeryctl` program.
+
+
+    Another example using :class:`~celery.task.control.inspect`:
+
+    .. code-block:: python
+
+        >>> from celery.task.control import inspect
+        >>> inspect.add_consumer(queue="queue", exchange="exchange",
+        ...                      exchange_type="direct",
+        ...                      routing_key="key",
+        ...                      durable=False,
+        ...                      auto_delete=True)
+
+        >>> inspect.cancel_consumer("queue")
+
+* celerybeat: Now logs the traceback if a message can't be sent.
+
+* celerybeat: Now enables a default socket timeout of 30 seconds.
+
+* README/introduction/homepage: Added link to `Flask-Celery`_.
+
+.. _`Flask-Celery`: http://github.com/ask/flask-celery
 
 .. _version-2.1.0:
 

+ 5 - 4
celery/events/state.py

@@ -59,8 +59,8 @@ class Task(Element):
                     "result", "eta", "runtime", "expires",
                     "result", "eta", "runtime", "expires",
                     "exception")
                     "exception")
 
 
-    _merge_rules = {states.RECEIVED: ("name", "args", "kwargs",
-                                      "retries", "eta", "expires")}
+    merge_rules = {states.RECEIVED: ("name", "args", "kwargs",
+                                     "retries", "eta", "expires")}
 
 
     _defaults = dict(uuid=None,
     _defaults = dict(uuid=None,
                      name=None,
                      name=None,
@@ -97,7 +97,8 @@ class Task(Element):
     def update(self, state, timestamp, fields):
         if self.worker:
             self.worker.on_heartbeat(timestamp=timestamp)
-        if states.state(state) < states.state(self.state):
+        if state != states.RETRY and self.state != states.RETRY and \
+                states.state(state) < states.state(self.state):
             self.merge(state, timestamp, fields)
         else:
             self.state = state
@@ -105,7 +106,7 @@ class Task(Element):
             super(Task, self).update(fields)
 
     def merge(self, state, timestamp, fields):
-        keep = self._merge_rules.get(state)
+        keep = self.merge_rules.get(state)
         if keep is not None:
             fields = dict((key, fields[key]) for key in keep)
             super(Task, self).update(fields)

+ 1 - 0
celery/states.py

@@ -57,6 +57,7 @@ PRECEDENCE = ["SUCCESS",
               "REVOKED",
               "REVOKED",
               "STARTED",
               "STARTED",
               "RECEIVED",
               "RECEIVED",
+              "RETRY",
               "PENDING"]
               "PENDING"]
 
 
 
 

+ 21 - 5
celery/worker/job.py

@@ -10,7 +10,7 @@ from celery import platforms
 from celery.app import app_or_default
 from celery.datastructures import ExceptionInfo
 from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
-from celery.exceptions import WorkerLostError
+from celery.exceptions import WorkerLostError, RetryTaskError
 from celery.execute.trace import TaskTrace
 from celery.registry import tasks
 from celery.utils import noop, kwdict, fun_takes_kwargs
@@ -121,7 +121,7 @@ class WorkerTaskTrace(TaskTrace):
         message, orig_exc = exc.args
         if self._store_errors:
             self.task.backend.mark_as_retry(self.task_id, orig_exc, strtb)
-        self.super.handle_retry(exc, type_, tb, strtb)
+        return self.super.handle_retry(exc, type_, tb, strtb)
 
     def handle_failure(self, exc, type_, tb, strtb):
         """Handle exception."""
@@ -201,6 +201,9 @@ class TaskRequest(object):
     error_msg = """
     error_msg = """
         Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
         Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
     """
     """
+    retry_msg = """
+        Task %(name)s[%(id)s] retry: %(exc)s
+    """
 
 
     # E-mails
     # E-mails
     email_subject = """
     email_subject = """
@@ -447,6 +450,16 @@ class TaskRequest(object):
         #     "the quick brown fox jumps over the lazy dog" :)
         #     "the quick brown fox jumps over the lazy dog" :)
         return truncate_text(repr(result), maxlen)
         return truncate_text(repr(result), maxlen)
 
 
+    def on_retry(self, exc_info):
+        self.send_event("task-retried", uuid=self.task_id,
+                                        exception=repr(exc_info.exception.exc),
+                                        traceback=repr(exc_info.traceback))
+        msg = self.retry_msg.strip() % {
+                "id": self.task_id,
+                "name": self.task_name,
+                "exc": repr(exc_info.exception.exc)}
+        self.logger.info(msg)
+
     def on_failure(self, exc_info):
     def on_failure(self, exc_info):
         """The handler used if the task raised an exception."""
         """The handler used if the task raised an exception."""
         state.task_ready(self)
         state.task_ready(self)
@@ -454,9 +467,8 @@ class TaskRequest(object):
         if self.task.acks_late:
             self.acknowledge()
 
-        self.send_event("task-failed", uuid=self.task_id,
-                                       exception=repr(exc_info.exception),
-                                       traceback=exc_info.traceback)
+        if isinstance(exc_info.exception, RetryTaskError):
+            return self.on_retry(exc_info)
 
         # This is a special case as the process would not have had
         # time to write the result.
@@ -465,6 +477,10 @@ class TaskRequest(object):
                 self.task.backend.mark_as_failure(self.task_id,
                                                   exc_info.exception)
 
+        self.send_event("task-failed", uuid=self.task_id,
+                                       exception=repr(exc_info.exception),
+                                       traceback=exc_info.traceback)
+
         context = {"hostname": self.hostname,
                    "id": self.task_id,
                    "name": self.task_name,

+ 12 - 0
docs/community.rst

@@ -60,6 +60,18 @@ Celery, RabbitMQ and sending messages directly.
 -----------------------------------------------
 http://blog.timc3.com/2010/10/17/celery-rabbitmq-and-sending-messages-directly/
 
+Cron dentro do Django com Celery (Portuguese)
+---------------------------------------------
+http://blog.avelino.us/2010/10/cron-dentro-do-django-com-celery.html
+
+RabbitMQとCeleryを使ってDjangoでジョブキューしてみる (Japanese)
+---------------------------------------------------------------
+http://d.hatena.ne.jp/yuku_t/
+
+Celery - Eine asynchrone Task Queue (nicht nur) für Django (German)
+-------------------------------------------------------------------
+http://www.scribd.com/doc/39203296/Celery-Eine-asynchrone-Task-Queue-nicht-nur-fur-Django
+
 Asynchronous Processing Using Celery (historio.us)
 --------------------------------------------------
 http://blog.historio.us/asynchronous-processing-using-celery

+ 2 - 3
docs/internals/events.rst

@@ -36,10 +36,9 @@ Task Events
     Sent if the task has been revoked (Note that this is likely
     to be sent by more than one worker)
 
-* task-retried(uuid, exception, traceback, hostname, delay, timestamp)
+* task-retried(uuid, exception, traceback, hostname, timestamp)
 
-    Sent if the task failed, but will be retried in the future.
-    (**NOT IMPLEMENTED**)
+    Sent if the task failed, but will be retried.
 
 Worker Events
 =============

+ 1 - 2
docs/userguide/monitoring.rst

@@ -507,10 +507,9 @@ Task Events
     Sent if the task has been revoked (Note that this is likely
     to be sent by more than one worker).
 
-* ``task-retried(uuid, exception, traceback, hostname, delay, timestamp)``
+* ``task-retried(uuid, exception, traceback, hostname, timestamp)``
 
     Sent if the task failed, but will be retried in the future.
-    (**NOT IMPLEMENTED**)
 
 .. _event-reference-worker: