
Merge branch '3.0'

Conflicts:
	Changelog
	README.rst
	celery/__init__.py
	celery/apps/worker.py
	celery/utils/mail.py
	docs/includes/introduction.txt
	requirements/default.txt
	setup.cfg
Ask Solem · 12 years ago
commit 2f5f0e3416

+ 44 - 4
Changelog

@@ -20,6 +20,46 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
 - `Task.apply_async` now supports timeout and soft_timeout arguments (Issue #802)
 - `App.control.Inspect.conf` can be used for inspecting worker configuration

+.. _version-3.0.8:
+
+3.0.8
+=====
+:release-date: 2012-08-29 05:00 P.M BST
+
+- Now depends on Kombu 2.4.4
+
+- Fixed problem with amqplib and receiving larger message payloads
+  (Issue #922).
+
+    The problem would manifest itself as either the worker hanging,
+    or occasionally a ``Framing error`` exception appearing.
+
+    Users of the new ``pyamqp://`` transport must upgrade to
+    :mod:`amqp` 0.9.3.
+
+- Beat: Fixed another timezone bug with interval and crontab schedules
+  (Issue #943).
+
+- Beat: The schedule file is now automatically cleared if the timezone
+  is changed.
+
+    The schedule is also cleared when you upgrade to 3.0.8 from an earlier
+    version, so that the initial timezone info is registered.
+
+- Events: The :event:`worker-heartbeat` event now includes processed and active
+  count fields.
+
+    Contributed by Mher Movsisyan.
+
+- Fixed a problem with error emails and new task classes (Issue #931).
+
+- ``BaseTask.__call__`` is no longer optimized away if it has been monkey
+  patched.
+
+- Fixed shutdown issue when using gevent (Issue #911 & Issue #936).
+
+    Fix contributed by Thomas Meson.
+
 .. _version-3.0.7:

 3.0.7
@@ -35,7 +75,7 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
     - Fixes an infinite loop that could happen when retrying establishing
       the broker connection.

-- Daemons now redirect standard file descriptors to /dev/null
+- Daemons now redirect standard file descriptors to :file:`/dev/null`

     Though by default the standard outs are also redirected
     to the logger instead, but you can disable this by changing
@@ -60,7 +100,7 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.

 - Celery command: Extensions are now sorted by name.

-- A regression caused the ``task-failed`` event to be sent
+- A regression caused the :event:`task-failed` event to be sent
   with the exception object instead of its string representation.

 - The worker daemon would try to create the pid file before daemonizing
@@ -122,7 +162,7 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.

 - ``AsyncResult.revoke`` now accepts ``terminate`` and ``signal`` arguments.

-- The ``task-revoked`` event now includes new fields: ``terminated``,
+- The :event:`task-revoked` event now includes new fields: ``terminated``,
   ``signum``, and ``expired``.

 - The argument to :class:`~celery.exceptions.TaskRevokedError` is now one
@@ -429,7 +469,7 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
     - :func:`~celery.contrib.migrate.move_tasks`
     - :func:`~celery.contrib.migrate.move_task_by_id`

-- The task-sent event now contains ``exchange`` and ``routing_key``
+- The :event:`task-sent` event now contains ``exchange`` and ``routing_key``
   fields.

 - Fixes bug with installing on Python 3.
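The two entries carried over at the top of this Changelog hunk (the new
`Task.apply_async` timeout arguments and `App.control.Inspect.conf`) translate
to roughly the following usage. This is a hedged sketch: the ``add`` task,
project name and broker URL are placeholders, not part of this commit.

.. code-block:: python

    from celery import Celery

    app = Celery('proj', broker='amqp://guest@localhost//')

    @app.task
    def add(x, y):
        return x + y

    # Per-call time limits (Issue #802): hard and soft timeout in seconds.
    result = add.apply_async((2, 2), timeout=60, soft_timeout=30)

    # Inspect the configuration of online workers via the new Inspect.conf.
    print(app.control.inspect().conf())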

+ 12 - 0
celery/app/base.py

@@ -99,6 +99,15 @@ class Celery(object):
     def set_current(self):
         _tls.current_app = self

+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc_info):
+        self.close()
+
+    def close(self):
+        self._maybe_close_pool()
+
     def on_init(self):
         """Optional callback called at init."""
         pass
@@ -319,6 +328,9 @@ class Celery(object):
         return s

     def _after_fork(self, obj_):
+        self._maybe_close_pool()
+
+    def _maybe_close_pool(self):
         if self._pool:
             self._pool.force_close_all()
             self._pool = None
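A minimal usage sketch of the context-manager support added above: leaving the
``with`` block calls ``close()``, which force-closes the broker connection pool
through ``_maybe_close_pool()``. The project name and broker URL below are
placeholders.

.. code-block:: python

    from celery import Celery

    with Celery('proj', broker='amqp://guest@localhost//') as app:

        @app.task
        def ping():
            return 'pong'

        # ... use the app; it may lazily create a connection pool ...

    # __exit__ has called app.close(), so any pool has been force-closed.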

+ 4 - 3
celery/apps/worker.py

@@ -259,7 +259,7 @@ def _clone_current_worker():

 def install_worker_restart_handler(worker, sig='SIGHUP'):

-    def restart_worker_sig_handler(signum, frame):
+    def restart_worker_sig_handler(*args):
         """Signal handler restarting the current python program."""
         """Signal handler restarting the current python program."""
         set_in_sighandler(True)
         set_in_sighandler(True)
         safe_say('Restarting celeryd ({0})'.format(' '.join(sys.argv)))
         safe_say('Restarting celeryd ({0})'.format(' '.join(sys.argv)))
@@ -275,7 +275,7 @@ def install_cry_handler():
     if is_jython or is_pypy:  # pragma: no cover
         return

-    def cry_handler(signum, frame):
+    def cry_handler(*args):
         """Signal handler logging the stacktrace of all active threads."""
         """Signal handler logging the stacktrace of all active threads."""
         with in_sighandler():
         with in_sighandler():
             safe_say(cry())
             safe_say(cry())
@@ -285,9 +285,10 @@ def install_cry_handler():
 def install_rdb_handler(envvar='CELERY_RDBSIG',
                         sig='SIGUSR2'):  # pragma: no cover

-    def rdb_handler(signum, frame):
+    def rdb_handler(*args):
         """Signal handler setting a rdb breakpoint at the current frame."""
         """Signal handler setting a rdb breakpoint at the current frame."""
         with in_sighandler():
         with in_sighandler():
+            _, frame = args
             from celery.contrib import rdb
             rdb.set_trace(frame)
     if os.environ.get(envvar):

+ 7 - 3
celery/beat.py

@@ -342,12 +342,16 @@ class PersistentScheduler(Scheduler):
         else:
             if '__version__' not in self._store:
                 self._store.clear()   # remove schedule at 2.2.2 upgrade.
-            if 'utc' not in self._store:
-                self._store.clear()   # remove schedule at 3.0.1 upgrade.
+            if 'tz' not in self._store:
+                self._store.clear()   # remove schedule at 3.0.8 upgrade
+        tz = self.app.conf.CELERY_TIMEZONE
+        current_tz = self._store.get('tz')
+        if current_tz is not None and current_tz != tz:
+            self._store.clear()   # Timezone changed, reset db!
         entries = self._store.setdefault('entries', {})
         self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE)
         self.install_default_entries(self.schedule)
-        self._store.update(__version__=__version__, utc=True)
+        self._store.update(__version__=__version__, utc=True, tz=tz)
         self.sync()
         debug('Current schedule:\n' + '\n'.join(repr(entry)
                                     for entry in entries.itervalues()))
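A standalone sketch (plain :mod:`shelve`, not Celery's actual scheduler class)
of the guard added above: the configured time zone is stored next to the
entries, and a mismatch on startup resets the schedule database.

.. code-block:: python

    import shelve

    def open_schedule(path, configured_tz):
        """Open a beat-style schedule db, resetting it if the time zone changed."""
        store = shelve.open(path, writeback=True)
        stored_tz = store.get('tz')
        if stored_tz is not None and stored_tz != configured_tz:
            store.clear()                # time zone changed: reset the schedule
        store['tz'] = configured_tz      # remember the active time zone
        store.setdefault('entries', {})
        return store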

+ 8 - 1
celery/concurrency/gevent.py

@@ -13,8 +13,15 @@ import os
 PATCHED = [0]
 if not os.environ.get('GEVENT_NOPATCH') and not PATCHED[0]:
     PATCHED[0] += 1
-    from gevent import monkey
+    from gevent import monkey, version_info
     monkey.patch_all()
+    if version_info[0] == 0:
+        # Signals do not work with gevent versions prior to 1.0,
+        # and they are not monkey patched by monkey.patch_all().
+        from gevent import signal as _gevent_signal
+        _signal = __import__('signal')
+        _signal.signal = _gevent_signal
+

 from time import time


+ 12 - 12
celery/events/state.py

@@ -54,17 +54,17 @@ class Worker(Element):
         self.heartbeats = []

     def on_online(self, timestamp=None, **kwargs):
-        """Callback for the `worker-online` event."""
+        """Callback for the :event:`worker-online` event."""
         self.update(**kwargs)
         self._heartpush(timestamp)

     def on_offline(self, **kwargs):
-        """Callback for the `worker-offline` event."""
+        """Callback for the :event:`worker-offline` event."""
         self.update(**kwargs)
         self.heartbeats = []

     def on_heartbeat(self, timestamp=None, **kwargs):
-        """Callback for the `worker-heartbeat` event."""
+        """Callback for the :event:`worker-heartbeat` event."""
         self.update(**kwargs)
         self._heartpush(timestamp)

@@ -95,8 +95,8 @@ class Task(Element):
     """Task State."""
     """Task State."""
 
 
     #: How to merge out of order events.
     #: How to merge out of order events.
-    #: Disorder is detected by logical ordering (e.g. task-received must have
-    #: happened before a task-failed event).
+    #: Disorder is detected by logical ordering (e.g. :event:`task-received`
+    #: must have happened before a :event:`task-failed` event).
     #:
     #: A merge rule consists of a state and a list of fields to keep from
     #: that state. ``(RECEIVED, ('name', 'args')``, means the name and args
@@ -149,37 +149,37 @@ class Task(Element):
             super(Task, self).update(fields)

     def on_sent(self, timestamp=None, **fields):
-        """Callback for the ``task-sent`` event."""
+        """Callback for the :event:`task-sent` event."""
         self.sent = timestamp
         self.update(states.PENDING, timestamp, fields)

     def on_received(self, timestamp=None, **fields):
-        """Callback for the ``task-received`` event."""
+        """Callback for the :event:`task-received` event."""
         self.received = timestamp
         self.update(states.RECEIVED, timestamp, fields)

     def on_started(self, timestamp=None, **fields):
-        """Callback for the ``task-started`` event."""
+        """Callback for the :event:`task-started` event."""
         self.started = timestamp
         self.update(states.STARTED, timestamp, fields)

     def on_failed(self, timestamp=None, **fields):
-        """Callback for the ``task-failed`` event."""
+        """Callback for the :event:`task-failed` event."""
         self.failed = timestamp
         self.update(states.FAILURE, timestamp, fields)

     def on_retried(self, timestamp=None, **fields):
-        """Callback for the ``task-retried`` event."""
+        """Callback for the :event:`task-retried` event."""
         self.retried = timestamp
         self.update(states.RETRY, timestamp, fields)

     def on_succeeded(self, timestamp=None, **fields):
-        """Callback for the ``task-succeeded`` event."""
+        """Callback for the :event:`task-succeeded` event."""
         self.succeeded = timestamp
         self.update(states.SUCCESS, timestamp, fields)

     def on_revoked(self, timestamp=None, **fields):
-        """Callback for the ``task-revoked`` event."""
+        """Callback for the :event:`task-revoked` event."""
         self.revoked = timestamp
         self.update(states.REVOKED, timestamp, fields)


+ 1 - 2
celery/schedules.py

@@ -90,7 +90,7 @@ class schedule(object):

     def maybe_make_aware(self, dt):
         if self.app.conf.CELERY_ENABLE_UTC:
-            return maybe_make_aware(dt)
+            return maybe_make_aware(dt, self.tz)
         return dt

     def __repr__(self):
@@ -490,7 +490,6 @@ class crontab(schedule):
         See :meth:`celery.schedules.schedule.is_due` for more information.

         """
-        last_run_at = self.maybe_make_aware(last_run_at)
         rem_delta = self.remaining_estimate(last_run_at)
         rem = timedelta_seconds(rem_delta)
         due = rem == 0

+ 14 - 2
celery/task/trace.py

@@ -53,16 +53,27 @@ except AttributeError:
     pass


-def mro_lookup(cls, attr, stop=()):
+def mro_lookup(cls, attr, stop=(), monkey_patched=[]):
     """Returns the first node by MRO order that defines an attribute.
     """Returns the first node by MRO order that defines an attribute.
 
 
     :keyword stop: A list of types that if reached will stop the search.
     :keyword stop: A list of types that if reached will stop the search.
+    :keyword monkey_patched: Use one of the stop classes if the attr's
+        module origin is not in this list, so that monkey patched
+        attributes can be detected.

     :returns None: if the attribute was not found.

     """
     for node in cls.mro():
         if node in stop:
+            try:
+                attr = node.__dict__[attr]
+                module_origin = attr.__module__
+            except (AttributeError, KeyError):
+                pass
+            else:
+                if module_origin not in monkey_patched:
+                    return node
             return
         if attr in node.__dict__:
             return node
@@ -71,7 +82,8 @@ def mro_lookup(cls, attr, stop=()):
 def task_has_custom(task, attr):
     """Returns true if the task or one of its bases
     defines ``attr`` (excluding the one in BaseTask)."""
-    return mro_lookup(task.__class__, attr, stop=(BaseTask, object))
+    return mro_lookup(task.__class__, attr, stop=(BaseTask, object),
+                      monkey_patched=['celery.app.task'])


 class TraceInfo(object):
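A standalone toy (no Celery imports) showing the effect of the new
``monkey_patched`` argument: a ``__call__`` whose ``__module__`` lies outside
the whitelisted module makes ``mro_lookup`` report the stop class, so
``task_has_custom`` treats it as custom and it is no longer optimized away.
The class names here are illustrative only.

.. code-block:: python

    def mro_lookup(cls, attr, stop=(), monkey_patched=[]):
        """Simplified copy of the lookup above, for illustration only."""
        for node in cls.mro():
            if node in stop:
                try:
                    value = node.__dict__[attr]
                    module_origin = value.__module__
                except (AttributeError, KeyError):
                    pass
                else:
                    if module_origin not in monkey_patched:
                        return node      # attr was replaced by foreign code
                return None
            if attr in node.__dict__:
                return node
        return None

    def _stock_call(self):
        return 'stock'
    _stock_call.__module__ = 'celery.app.task'   # pretend it ships with Celery

    class BaseTask(object):
        __call__ = _stock_call

    class MyTask(BaseTask):
        pass

    # Stock __call__ from the whitelisted module: reported as "not custom".
    assert mro_lookup(MyTask, '__call__', stop=(BaseTask, object),
                      monkey_patched=['celery.app.task']) is None

    def patched_call(self):                      # simulate a third-party patch
        return 'patched'
    BaseTask.__call__ = patched_call             # __module__ no longer whitelisted

    assert mro_lookup(MyTask, '__call__', stop=(BaseTask, object),
                      monkey_patched=['celery.app.task']) is BaseTask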

+ 2 - 0
celery/tests/concurrency/test_gevent.py

@@ -38,7 +38,9 @@ class test_gevent_patch(GeventCase):
     def test_is_patched(self):
         with mock_module(*gevent_modules):
             monkey_patched = []
+            import gevent
             from gevent import monkey
+            gevent.version_info = (1, 0, 0)
             prev_monkey_patch = monkey.patch_all
             monkey.patch_all = lambda: monkey_patched.append(True)
             prev_gevent = sys.modules.pop('celery.concurrency.gevent', None)

+ 3 - 2
celery/utils/timeutils.py

@@ -230,5 +230,6 @@ def to_utc(dt):

 def maybe_make_aware(dt, tz=None):
     if is_naive(dt):
-        return to_utc(dt)
-    return localize(dt, timezone.utc if tz is None else tz)
+        dt = to_utc(dt)
+    return localize(dt,
+        timezone.utc if tz is None else timezone.tz_or_local(tz))
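An approximate, standalone sketch (pytz only, not the Celery helper itself) of
what the patched ``maybe_make_aware`` now does: naive datetimes are first
treated as UTC and then converted to the requested zone, instead of always
being localized to UTC.

.. code-block:: python

    from datetime import datetime

    import pytz

    def maybe_make_aware(dt, tz=None):
        if dt.tzinfo is None:            # naive: interpret as UTC first
            dt = pytz.utc.localize(dt)
        target = pytz.utc if tz is None else pytz.timezone(tz)
        return dt.astimezone(target)

    print(maybe_make_aware(datetime(2012, 8, 29, 16, 0), 'Europe/London'))
    # -> 2012-08-29 17:00:00+01:00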

+ 5 - 0
docs/_ext/celerydocs.py

@@ -147,3 +147,8 @@ def setup(app):
         rolename="signal",
         rolename="signal",
         indextemplate="pair: %s; signal",
         indextemplate="pair: %s; signal",
     )
     )
+    app.add_crossref_type(
+        directivename="event",
+        rolename="event",
+        indextemplate="pair: %s; event",
+    )

+ 1 - 1
docs/configuration.rst

@@ -1245,7 +1245,7 @@ CELERY_SEND_TASK_SENT_EVENT

 .. versionadded:: 2.2

-If enabled, a `task-sent` event will be sent for every task so tasks can be
+If enabled, a :event:`task-sent` event will be sent for every task so tasks can be
 tracked before they are consumed by a worker.

 Disabled by default.

+ 2 - 2
docs/history/changelog-1.0.rst

@@ -813,9 +813,9 @@ News
     Excellent for monitoring tools, one is already in the making
     (http://github.com/celery/celerymon).

-    Current events include: worker-heartbeat,
+    Current events include: :event:`worker-heartbeat`,
     task-[received/succeeded/failed/retried],
-    worker-online, worker-offline.
+    :event:`worker-online`, :event:`worker-offline`.

 * You can now delete (revoke) tasks that has already been applied.


+ 1 - 1
docs/history/changelog-2.1.rst

@@ -101,7 +101,7 @@ Documentation
 Fixes
 -----

-* celeryd: Now sends the `task-retried` event for retried tasks.
+* celeryd: Now sends the :event:`task-retried` event for retried tasks.

 * celeryd: Now honors ignore result for
   :exc:`~@WorkerLostError` and timeout errors.

+ 1 - 1
docs/history/changelog-2.2.rst

@@ -881,7 +881,7 @@ News


 * The PID of the child worker process accepting a task is now sent as a field
-  with the `task-started` event.
+  with the :event:`task-started` event.

 * The following fields have been added to all events in the worker class:


+ 1 - 1
docs/history/changelog-2.3.rst

@@ -100,7 +100,7 @@ Fixes
 * Fixes case where the worker could become unresponsive because of tasks
   exceeding the hard time limit.

-* The ``task-sent`` event was missing from the event reference.
+* The :event:`task-sent` event was missing from the event reference.

 * ``ResultSet.iterate`` now returns results as they finish (Issue #459).


+ 1 - 1
docs/userguide/canvas.rst

@@ -16,7 +16,7 @@ Subtasks
 .. versionadded:: 2.0

 You just learned how to call a task using the tasks ``delay`` method
-in the :ref:`calling <calling>` guide, and this is often all you need,
+in the :ref:`calling <guide-calling>` guide, and this is often all you need,
 but sometimes you may want to pass the signature of a task invocation to
 another process or as an argument to another function, for this Celery uses
 something called *subtasks*.

+ 99 - 39
docs/userguide/monitoring.rst

@@ -773,77 +773,137 @@ You can listen to specific events by specifying the handlers:
 .. _event-reference:

 Event Reference
----------------
+===============

 This list contains the events sent by the worker, and their arguments.

 .. _event-reference-task:

 Task Events
-~~~~~~~~~~~
+-----------
+
+.. event:: task-sent
+
+task-sent
+~~~~~~~~~
+
+:signature: ``task-sent(uuid, name, args, kwargs, retries, eta, expires,
+              queue, exchange, routing_key)``
+
+Sent when a task message is published and
+the :setting:`CELERY_SEND_TASK_SENT_EVENT` setting is enabled.
+
+.. event:: task-received
+
+task-received
+~~~~~~~~~~~~~
+
+:signature: ``task-received(uuid, name, args, kwargs, retries, eta, hostname,
+              timestamp)``
+
+Sent when the worker receives a task.
+
+.. event:: task-started
+
+task-started
+~~~~~~~~~~~~
 
-* ``task-sent(uuid, name, args, kwargs, retries, eta, expires,
-  queue, exchange, routing_key)``
+:signature: ``task-started(uuid, hostname, timestamp, pid)``
 
-   Sent when a task message is published and
-   the :setting:`CELERY_SEND_TASK_SENT_EVENT` setting is enabled.
+Sent just before the worker executes the task.
 
-* ``task-received(uuid, name, args, kwargs, retries, eta, hostname,
-  timestamp)``
+.. event:: task-succeeded
 
-    Sent when the worker receives a task.
+task-succeeded
+~~~~~~~~~~~~~~
 
-* ``task-started(uuid, hostname, timestamp, pid)``
+:signature: ``task-succeeded(uuid, result, runtime, hostname, timestamp)``
 
-    Sent just before the worker executes the task.
+Sent if the task executed successfully.
 
-* ``task-succeeded(uuid, result, runtime, hostname, timestamp)``
+Runtime is the time it took to execute the task using the pool.
+(Starting from when the task is sent to the worker pool, and ending when the
+pool result handler callback is called).
 
-    Sent if the task executed successfully.
+.. event:: task-failed
 
-    Runtime is the time it took to execute the task using the pool.
-    (Starting from the task is sent to the worker pool, and ending when the
-    pool result handler callback is called).
+task-failed
+~~~~~~~~~~~
+
+:signature: ``task-failed(uuid, exception, traceback, hostname, timestamp)``
+
+Sent if the execution of the task failed.
 
-* ``task-failed(uuid, exception, traceback, hostname, timestamp)``
+.. event:: task-revoked
 
-    Sent if the execution of the task failed.
+task-revoked
+~~~~~~~~~~~~
 
-* ``task-revoked(uuid, terminated, signum, expired)``
+:signature: ``task-revoked(uuid, terminated, signum, expired)``
 
-    Sent if the task has been revoked (Note that this is likely
-    to be sent by more than one worker).
+Sent if the task has been revoked (Note that this is likely
+to be sent by more than one worker).
 
-    - ``terminated`` is set to true if the task process was terminated,
-      and the ``signum`` field set to the signal used.
+- ``terminated`` is set to true if the task process was terminated,
+  and the ``signum`` field set to the signal used.
 
-    - ``expired`` is set to true if the task expired.
+- ``expired`` is set to true if the task expired.
 
-* ``task-retried(uuid, exception, traceback, hostname, timestamp)``
+.. event:: task-retried
 
-    Sent if the task failed, but will be retried in the future.
+task-retried
+~~~~~~~~~~~~
+
+:signature: ``task-retried(uuid, exception, traceback, hostname, timestamp)``
+
+Sent if the task failed, but will be retried in the future.

 .. _event-reference-worker:

 Worker Events
+-------------
+
+.. event:: worker-online
+
+worker-online
 ~~~~~~~~~~~~~

-* ``worker-online(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)``
+:signature: ``worker-online(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)``
+
+The worker has connected to the broker and is online.
+
+- `hostname`: Hostname of the worker.
+- `timestamp`: Event timestamp.
+- `freq`: Heartbeat frequency in seconds (float).
+- `sw_ident`: Name of worker software (e.g. ``py-celery``).
+- `sw_ver`: Software version (e.g. 2.2.0).
+- `sw_sys`: Operating System (e.g. Linux, Windows, Darwin).
+
+.. event:: worker-heartbeat
+
+worker-heartbeat
+~~~~~~~~~~~~~~~~
+
+:signature: ``worker-heartbeat(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys,
+              active, processed)``
 
-    The worker has connected to the broker and is online.
+Sent every minute; if the worker has not sent a heartbeat in 2 minutes,
+it is considered to be offline.
 
-    * `hostname`: Hostname of the worker.
-    * `timestamp`: Event timestamp.
-    * `freq`: Heartbeat frequency in seconds (float).
-    * `sw_ident`: Name of worker software (e.g. ``py-celery``).
-    * `sw_ver`: Software version (e.g. 2.2.0).
-    * `sw_sys`: Operating System (e.g. Linux, Windows, Darwin).
+- `hostname`: Hostname of the worker.
+- `timestamp`: Event timestamp.
+- `freq`: Heartbeat frequency in seconds (float).
+- `sw_ident`: Name of worker software (e.g. ``py-celery``).
+- `sw_ver`: Software version (e.g. 2.2.0).
+- `sw_sys`: Operating System (e.g. Linux, Windows, Darwin).
+- `active`: Number of currently executing tasks.
+- `processed`: Total number of tasks processed by this worker.
 
-* ``worker-heartbeat(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)``
+.. event:: worker-offline
 
-    Sent every minute, if the worker has not sent a heartbeat in 2 minutes,
-    it is considered to be offline.
+worker-offline
+~~~~~~~~~~~~~~
 
-* ``worker-offline(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)``
+:signature: ``worker-offline(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)``
 
-    The worker has disconnected from the broker.
+The worker has disconnected from the broker.
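A hedged sketch of consuming the events documented above with the Celery 3.0
event API (the broker URL is a placeholder); the ``worker-heartbeat`` handler
reads the ``active`` and ``processed`` fields this release adds.

.. code-block:: python

    from celery import Celery

    app = Celery(broker='amqp://guest@localhost//')

    def on_heartbeat(event):
        print('%s: %s active, %s processed' % (
            event['hostname'], event.get('active'), event.get('processed')))

    def on_task_failed(event):
        print('task %s failed: %s' % (event['uuid'], event['exception']))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            'worker-heartbeat': on_heartbeat,
            'task-failed': on_task_failed,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)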

+ 34 - 0
docs/userguide/periodic-tasks.rst

@@ -22,6 +22,38 @@ at a time, otherwise you would end up with duplicate tasks.  Using
 a centralized approach means the schedule does not have to be synchronized,
 and the service can operate without using locks.

+.. _beat-timezones:
+
+Time Zones
+==========
+
+Periodic task schedules use the UTC time zone by default,
+but you can change the time zone used with the :setting:`CELERY_TIMEZONE`
+setting.
+
+If you use a time zone other than UTC it's recommended to install the
+:mod:`pytz` library as this can improve the accuracy and keep your timezone
+specifications up to date:
+
+.. code-block:: bash
+
+    $ pip install -U pytz
+
+
+An example time zone could be `Europe/London`:
+
+.. code-block:: python
+
+    CELERY_TIMEZONE = 'Europe/London'
+
+.. admonition:: Changing the time zone
+
+    The default scheduler (storing the schedule in the :file:`celerybeat-schedule`
+    file) will automatically detect that the timezone has changed, and so will
+    reset the schedule itself, but other schedulers may not be so smart (e.g. the
+    Django database scheduler) and in that case you will have to reset the
+    schedule manually.
+
 .. _beat-entries:

 Entries
@@ -44,6 +76,8 @@ Example: Run the `tasks.add` task every 30 seconds.
         },
     }

+    CELERY_TIMEZONE = 'UTC'
+

 Using a :class:`~datetime.timedelta` for the schedule means the task will
 be executed 30 seconds after `celery beat` starts, and then every 30 seconds
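A hedged configuration sketch tying the new time zone section to a crontab
entry (the task name is hypothetical); with the beat fixes in this release the
entry fires at 7:30 a.m. Europe/London time.

.. code-block:: python

    from celery.schedules import crontab

    CELERY_TIMEZONE = 'Europe/London'

    CELERYBEAT_SCHEDULE = {
        'morning-report': {
            'task': 'tasks.send_report',           # hypothetical task
            'schedule': crontab(hour=7, minute=30),
        },
    }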

+ 2 - 2
docs/whatsnew-3.0.rst

@@ -701,7 +701,7 @@ In Other News
         class Worker(celery.Worker):
             ...

-- New signal: :signal:`task-success`.
+- New signal: :signal:`task_success`.

 - Multiprocessing logs are now only emitted if the :envvar:`MP_LOG`
   environment variable is set.
@@ -774,7 +774,7 @@ In Other News

 - Module ``celery.task.control`` moved to :mod:`celery.app.control`.

-- New signal: :signal:`task-revoked`
+- New signal: :signal:`task_revoked`

     Sent in the main process when the task is revoked or terminated.


+ 1 - 1
requirements/default-py3k.txt

@@ -1,4 +1,4 @@
 billiard>=2.7.3.12
 python-dateutil>=2.1
 pytz
-kombu>=2.4.2,<3.0
+kombu>=2.4.4,<3.0

+ 1 - 1
requirements/default.txt

@@ -1,3 +1,3 @@
 billiard>=2.7.3.12
 python-dateutil>=2.1
-kombu>=2.4.3,<3.0
+kombu>=2.4.4,<3.0

+ 1 - 1
setup.cfg

@@ -18,5 +18,5 @@ requires = uuid
            importlib
            billiard >= 2.7.3.12
            python-dateutil >= 2.1
-           kombu >= 2.4.3
+           kombu >= 2.4.4
            ordereddict