
Merge branch 'events'

Conflicts:
	celery/bin/celerybeat.py
	celery/bin/celeryd.py
	celery/messaging.py
	celery/worker/__init__.py
Ask Solem 15 years ago
parent commit d4b6c00585
51 files changed with 2435 additions and 651 deletions
  1. README.rst (+0 -4)
  2. bin/celeryinit (+0 -0)
  3. celery/bin/celerybeat.py (+3 -3)
  4. celery/bin/celeryd.py (+15 -21)
  5. celery/conf.py (+63 -12)
  6. celery/events.py (+93 -0)
  7. celery/execute.py (+30 -47)
  8. celery/management/commands/celerystats.py (+0 -23)
  9. celery/messaging.py (+39 -23)
  10. celery/monitoring.py (+0 -238)
  11. celery/platform.py (+9 -0)
  12. celery/task/__init__.py (+19 -1)
  13. celery/tests/test_monitoring.py (+0 -96)
  14. celery/tests/test_task.py (+1 -1)
  15. celery/tests/test_worker.py (+20 -4)
  16. celery/worker/__init__.py (+17 -162)
  17. celery/worker/controllers.py (+8 -1)
  18. celery/worker/heartbeat.py (+50 -0)
  19. celery/worker/job.py (+25 -11)
  20. celery/worker/listener.py (+211 -0)
  21. celery/worker/revoke.py (+43 -0)
  22. docs/internals/events.rst (+49 -0)
  23. docs/internals/index.rst (+1 -0)
  24. docs/internals/reference/celery.patch.rst (+8 -0)
  25. docs/internals/reference/celery.worker.heartbeat.rst (+8 -0)
  26. docs/internals/reference/celery.worker.listener.rst (+8 -0)
  27. docs/internals/reference/celery.worker.revoke.rst (+8 -0)
  28. docs/internals/reference/index.rst (+4 -0)
  29. docs/introduction.rst (+0 -4)
  30. docs/reference/celery.events.rst (+8 -0)
  31. docs/reference/index.rst (+1 -0)
  32. examples/django/demoproject/__init__.py (+0 -0)
  33. examples/django/demoproject/demoapp/__init__.py (+0 -0)
  34. examples/django/demoproject/demoapp/models.py (+3 -0)
  35. examples/django/demoproject/demoapp/tasks.py (+0 -0)
  36. examples/django/demoproject/demoapp/tests.py (+23 -0)
  37. examples/django/demoproject/demoapp/views.py (+1 -0)
  38. examples/django/demoproject/manage.py (+11 -0)
  39. examples/django/demoproject/settings.py (+71 -0)
  40. examples/django/demoproject/twitterfollow/__init__.py (+0 -0)
  41. examples/django/demoproject/twitterfollow/models.py (+27 -0)
  42. examples/django/demoproject/twitterfollow/tasks.py (+0 -0)
  43. examples/django/demoproject/twitterfollow/tests.py (+23 -0)
  44. examples/django/demoproject/twitterfollow/views.py (+1 -0)
  45. examples/django/demoproject/urls.py (+17 -0)
  46. examples/pythonproject/demoapp/__init__.py (+0 -0)
  47. examples/pythonproject/demoapp/celery.db (BIN)
  48. examples/pythonproject/demoapp/celeryconfig.py (+12 -0)
  49. examples/pythonproject/demoapp/erl_crash.dump (+1478 -0)
  50. examples/pythonproject/demoapp/tasks.py (+11 -0)
  51. examples/pythonproject/demoapp/test.py (+16 -0)

+ 0 - 4
README.rst

@@ -94,10 +94,6 @@ Features
      returns a JSON-serialized data structure containing the task status,
      and the return value if completed, or exception on failure.
 
-    * The worker can collect statistics, like, how many tasks has been
-      executed by type, and the time it took to process them. Very useful
-      for monitoring and profiling.
-
    * Pool workers are supervised, so if for some reason a worker crashes
        it is automatically replaced by a new worker.
 

+ 0 - 0
bin/celeryinit


+ 3 - 3
celery/bin/celerybeat.py

@@ -56,9 +56,9 @@ from celery.messaging import get_connection_info
 
 STARTUP_INFO_FMT = """
 Configuration ->
-    * Broker -> %(conninfo)s
-    * Exchange -> %(exchange)s (%(exchange_type)s)
-    * Consumer -> Queue:%(consumer_queue)s Binding:%(consumer_rkey)s
+    . broker -> %(conninfo)s
+    . exchange -> %(exchange)s (%(exchange_type)s)
+    . consumer -> queue:%(consumer_queue)s binding:%(consumer_rkey)s
 """.strip()
 
 OPTION_LIST = (

+ 15 - 21
celery/bin/celeryd.py

@@ -26,10 +26,9 @@
     Also run the ``celerybeat`` periodic task scheduler. Please note that
     there must only be one instance of this service.
 
-.. cmdoption:: -s, --statistics
+.. cmdoption:: -E, --events
 
-    Turn on reporting of statistics (remember to flush the statistics message
-    queue from time to time).
+    Send events that can be captured by monitors like ``celerymon``.
 
 .. cmdoption:: -d, --detach, --daemon
 
@@ -80,18 +79,15 @@ from celery.loaders import current_loader, settings
 from celery.loaders import settings
 from celery.messaging import get_connection_info
 
-USE_STATISTICS = getattr(settings, "CELERY_STATISTICS", False)
-# Make sure the setting exists.
-settings.CELERY_STATISTICS = USE_STATISTICS
-
 STARTUP_INFO_FMT = """
 Configuration ->
-    * Broker -> %(conninfo)s
-    * Exchange -> %(exchange)s (%(exchange_type)s)
-    * Consumer -> Queue:%(consumer_queue)s Binding:%(consumer_rkey)s
-    * Concurrency -> %(concurrency)s
-    * Statistics -> %(statistics)s
-    * Celerybeat -> %(celerybeat)s
+    . broker -> %(conninfo)s
+    . exchange -> %(exchange)s (%(exchange_type)s)
+    . consumer -> queue:%(consumer_queue)s binding:%(consumer_rkey)s
+    . concurrency -> %(concurrency)s
+    . events -> %(events)s
+    . beat -> %(celerybeat)s
+>>>>>>> events
 """.strip()
 
 OPTION_LIST = (
@@ -104,9 +100,6 @@ OPTION_LIST = (
             help="Discard all waiting tasks before the server is started. "
                  "WARNING: This is unrecoverable, and the tasks will be "
                  "deleted from the messaging server."),
-    optparse.make_option('-s', '--statistics', default=USE_STATISTICS,
-            action="store_true", dest="statistics",
-            help="Collect statistics."),
     optparse.make_option('-f', '--logfile', default=conf.DAEMON_LOG_FILE,
             action="store", dest="logfile",
             help="Path to log file."),
@@ -120,6 +113,9 @@ OPTION_LIST = (
             action="store_true", dest="run_clockservice",
             help="Also run the celerybeat periodic task scheduler. \
                   Please note that only one instance must be running."),
+    optparse.make_option('-E', '--events', default=conf.CELERY_SEND_EVENTS,
+            action="store_true", dest="events",
+            help="Send events so celery can be monitored by e.g. celerymon."),
     optparse.make_option('-d', '--detach', '--daemon', default=False,
             action="store_true", dest="detach",
             help="Run in the background as a daemon."),
@@ -145,14 +141,11 @@ def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
         loglevel=conf.DAEMON_LOG_LEVEL, logfile=conf.DAEMON_LOG_FILE,
         discard=False, pidfile=conf.DAEMON_PID_FILE, umask=0,
         uid=None, gid=None, working_directory=None,
-        chroot=None, statistics=None, run_clockservice=False, **kwargs):
+        chroot=None, run_clockservice=False, events=False, **kwargs):
     """Starts the celery worker server."""
 
     print("Celery %s is starting." % __version__)
 
-    if statistics is not None:
-        settings.CELERY_STATISTICS = statistics
-
     if not concurrency:
         concurrency = multiprocessing.cpu_count()
 
@@ -190,8 +183,8 @@ def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
             "concurrency": concurrency,
             "loglevel": loglevel,
             "pidfile": pidfile,
-            "statistics": settings.CELERY_STATISTICS and "ON" or "OFF",
             "celerybeat": run_clockservice and "ON" or "OFF",
+            "events": events and "ON" or "OFF",
     })
 
     print("Celery has started.")
@@ -216,6 +209,7 @@ def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
                                 loglevel=loglevel,
                                 logfile=logfile,
                                 embed_clockservice=run_clockservice,
+                                send_events=events,
                                 is_detached=detach)
 
         # Install signal handler that restarts celeryd on SIGHUP,

+ 63 - 12
celery/conf.py

@@ -15,7 +15,6 @@ DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
 DEFAULT_DAEMON_LOG_LEVEL = "WARN"
 DEFAULT_DAEMON_LOG_FILE = "celeryd.log"
 DEFAULT_AMQP_CONNECTION_TIMEOUT = 4
-DEFAULT_STATISTICS = False
 DEFAULT_ALWAYS_EAGER = False
 DEFAULT_TASK_RESULT_EXPIRES = timedelta(days=5)
 DEFAULT_AMQP_CONNECTION_RETRY = True
@@ -27,6 +26,10 @@ DEFAULT_CELERYBEAT_PID_FILE = "celerybeat.pid"
 DEFAULT_CELERYBEAT_LOG_LEVEL = "INFO"
 DEFAULT_CELERYBEAT_LOG_FILE = "celerybeat.log"
 DEFAULT_CELERYBEAT_SCHEDULE_FILENAME = "celerybeat-schedule"
+DEFAULT_CELERYMON_PID_FILE = "celerymon.pid"
+DEFAULT_CELERYMON_LOG_LEVEL = "INFO"
+DEFAULT_CELERYMON_LOG_FILE = "celerymon.log"
+DEFAULT_SEND_EVENTS = False
 
 
 """
@@ -258,17 +261,6 @@ cache backend in ``CACHE_BACKEND`` will be used.
 """
 CELERY_CACHE_BACKEND = getattr(settings, "CELERY_CACHE_BACKEND", None)
 
-"""
-
-.. data:: CELERYBEAT_PID_FILE
-
-Name of celerybeats pid file.
-Default is: ``celerybeat.pid``.
-
-"""
-CELERYBEAT_PID_FILE = getattr(settings, "CELERYBEAT_PID_FILE",
-                              DEFAULT_CELERYBEAT_PID_FILE)
-
 
 """
 
@@ -293,6 +285,18 @@ DISABLE_RATE_LIMITS = getattr(settings, "CELERY_DISABLE_RATE_LIMITS",
 
 """
 
+.. data:: CELERYBEAT_PID_FILE
+
+Name of celerybeats pid file.
+Default is: ``celerybeat.pid``.
+
+"""
+CELERYBEAT_PID_FILE = getattr(settings, "CELERYBEAT_PID_FILE",
+                              DEFAULT_CELERYBEAT_PID_FILE)
+
+
+"""
+
 .. data:: CELERYBEAT_LOG_LEVEL
 
 Default log level for celerybeat.
@@ -324,3 +328,50 @@ Default is: ``celerybeat-schedule``.
 CELERYBEAT_SCHEDULE_FILENAME = getattr(settings,
                                        "CELERYBEAT_SCHEDULE_FILENAME",
                                        DEFAULT_CELERYBEAT_SCHEDULE_FILENAME)
+
+"""
+
+.. data:: CELERYMON_PID_FILE
+
+Name of celerymons pid file.
+Default is: ``celerymon.pid``.
+
+"""
+CELERYMON_PID_FILE = getattr(settings, "CELERYMON_PID_FILE",
+                              DEFAULT_CELERYMON_PID_FILE)
+
+
+"""
+
+.. data:: CELERYMON_LOG_LEVEL
+
+Default log level for celerymon.
+Default is: ``INFO``.
+
+"""
+CELERYMON_LOG_LEVEL = getattr(settings, "CELERYMON_LOG_LEVEL",
+                               DEFAULT_CELERYMON_LOG_LEVEL)
+
+"""
+
+.. data:: CELERYMON_LOG_FILE
+
+Default log file for celerymon.
+Default is: ``celerymon.log``.
+
+"""
+CELERYMON_LOG_FILE = getattr(settings, "CELERYMON_LOG_FILE",
+                              DEFAULT_CELERYMON_LOG_FILE)
+
+
+"""
+
+.. data:: CELERY_SEND_EVENTS
+
+If set, celery will send events that can be captured by monitors like
+``celerymon``.
+Default is: ``False``.
+
+"""
+CELERY_SEND_EVENTS = getattr(settings, "CELERY_SEND_EVENTS",
+                             DEFAULT_SEND_EVENTS)
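
A sketch of how these new settings might be declared in a project's Django
``settings.py`` (the values below are illustrative, not defaults taken from
this commit)::

    # settings.py (hypothetical example values)
    CELERY_SEND_EVENTS = True             # let monitors like celerymon observe the worker
    CELERYMON_PID_FILE = "celerymon.pid"
    CELERYMON_LOG_LEVEL = "INFO"
    CELERYMON_LOG_FILE = "celerymon.log"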

+ 93 - 0
celery/events.py

@@ -0,0 +1,93 @@
+import time
+import socket
+import threading
+
+from celery.messaging import EventPublisher, EventConsumer
+
+
+def Event(type, **fields):
+    return dict(fields, type=type, timestamp=time.time())
+
+
+class EventDispatcher(object):
+    """Send events as messages.
+
+    :param connection: Carrot connection.
+
+    :keyword hostname: Hostname to identify ourselves as,
+        by default uses the hostname returned by :func:`socket.gethostname`.
+
+    :keyword enabled: Set to ``False`` to not actually publish any events,
+        making :meth:`send` a noop operation.
+
+    You need to :meth:`close` this after use.
+
+    """
+
+    def __init__(self, connection, hostname=None, enabled=True):
+        self.connection = connection
+        self.publisher = EventPublisher(self.connection)
+        self.hostname = hostname or socket.gethostname()
+        self.enabled = enabled
+        self._lock = threading.Lock()
+
+    def send(self, type, **fields):
+        """Send event.
+
+        :param type: Kind of event.
+        :keyword \*\*fields: Event arguments.
+
+        """
+        if not self.enabled:
+            return
+        self._lock.acquire()
+        try:
+            fields["timestamp"] = time.time()
+            fields["hostname"] = self.hostname
+            self.publisher.send(Event(type, **fields))
+        finally:
+            self._lock.release()
+
+    def close(self):
+        """Close the event dispatcher."""
+        self._lock.locked() and self._lock.release()
+        self.publisher and self.publisher.close()
+
+
+class EventReceiver(object):
+    """Capture events.
+
+    :param connection: Carrot connection.
+    :keyword handlers: Event handlers.
+
+    :attr:`handlers` is a dict of event types and their handlers,
+    the special handler ``"*"`` captures all events that don't have a
+    handler.
+
+    """
+    handlers = {}
+
+    def __init__(self, connection, handlers=None):
+        self.connection = connection
+        if handlers is not None:
+            self.handlers = handlers
+
+    def process(self, type, event):
+        """Process the received event by dispatching it to the appropriate
+        handler."""
+        print("Received event: %s" % event)
+        handler = self.handlers.get(type) or self.handlers.get("*")
+        handler and handler(event)
+
+    def capture(self, limit=None):
+        """Open up a consumer capturing events. This has to be running
+        in the main process, and it will never stop unless forced."""
+        consumer = EventConsumer(self.connection)
+        consumer.register_callback(self._receive)
+        it = consumer.iterconsume(limit=limit)
+        while True:
+            it.next()
+
+    def _receive(self, message_data, message):
+        type = message_data.pop("type").lower()
+        self.process(type, Event(type, **message_data))
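
For orientation, a minimal usage sketch of the two classes above, assuming an
already-open carrot ``DjangoBrokerConnection`` bound to ``connection`` (the
handler names are hypothetical)::

    from celery.events import EventDispatcher, EventReceiver

    # Producer side: publish an event, then release the channel.
    dispatcher = EventDispatcher(connection)
    dispatcher.send("worker-online")     # hostname/timestamp are filled in
    dispatcher.close()

    # Consumer side: route event types to handlers; "*" catches the rest.
    def on_heartbeat(event):
        print("heartbeat from %s" % event["hostname"])

    receiver = EventReceiver(connection, handlers={
        "worker-heartbeat": on_heartbeat,
        "*": lambda event: None,
    })
    receiver.capture(limit=10)           # blocks while consuming 10 events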

+ 30 - 47
celery/execute.py

@@ -11,29 +11,33 @@ from celery.conf import AMQP_CONNECTION_TIMEOUT
 from celery.utils import gen_unique_id, noop, fun_takes_kwargs
 from celery.result import AsyncResult, EagerResult
 from celery.registry import tasks
-from celery.messaging import TaskPublisher
+from celery.messaging import TaskPublisher, with_connection
 from celery.exceptions import RetryTaskError
 from celery.datastructures import ExceptionInfo
 
+TASK_EXEC_OPTIONS = ("routing_key", "exchange",
+                     "immediate", "mandatory",
+                     "priority", "serializer")
+
 
 def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
-        routing_key=None, exchange=None, task_id=None,
-        immediate=None, mandatory=None, priority=None, connection=None,
-        connect_timeout=AMQP_CONNECTION_TIMEOUT, serializer=None, **opts):
+        task_id=None, publisher=None, connection=None, connect_timeout=None,
+        **options):
     """Run a task asynchronously by the celery daemon(s).
 
     :param task: The task to run (a callable object, or a :class:`Task`
         instance
 
-    :param args: The positional arguments to pass on to the task (a ``list``).
+    :keyword args: The positional arguments to pass on to the
+        task (a ``list``).
 
-    :param kwargs: The keyword arguments to pass on to the task (a ``dict``)
+    :keyword kwargs: The keyword arguments to pass on to the task (a ``dict``)
 
-    :param countdown: Number of seconds into the future that the task should
+    :keyword countdown: Number of seconds into the future that the task should
         execute. Defaults to immediate delivery (Do not confuse that with
         the ``immediate`` setting, they are unrelated).
 
-    :param eta: A :class:`datetime.datetime` object that describes the
+    :keyword eta: A :class:`datetime.datetime` object that describes the
         absolute time when the task should execute. May not be specified
         if ``countdown`` is also supplied. (Do not confuse this with the
         ``immediate`` setting, they are unrelated).
@@ -70,50 +74,29 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
     replaced by a local :func:`apply` call instead.
 
     """
-    args = args or []
-    kwargs = kwargs or {}
-    routing_key = routing_key or getattr(task, "routing_key", None)
-    exchange = exchange or getattr(task, "exchange", None)
-    if immediate is None:
-        immediate = getattr(task, "immediate", None)
-    if mandatory is None:
-        mandatory = getattr(task, "mandatory", None)
-    if priority is None:
-        priority = getattr(task, "priority", None)
-    serializer = serializer or getattr(task, "serializer", None)
-    taskset_id = opts.get("taskset_id")
-    publisher = opts.get("publisher")
-    retries = opts.get("retries", 0)
-    if countdown:
-        eta = datetime.now() + timedelta(seconds=countdown)
-
     from celery.conf import ALWAYS_EAGER
     if ALWAYS_EAGER:
         return apply(task, args, kwargs)
 
-    need_to_close_connection = False
-    if not publisher:
-        if not connection:
-            connection = DjangoBrokerConnection(
-                            connect_timeout=connect_timeout)
-            need_to_close_connection = True
-        publisher = TaskPublisher(connection=connection)
-
-    delay_task = publisher.delay_task
-    if taskset_id:
-        delay_task = curry(publisher.delay_task_in_set, taskset_id)
-
-    task_id = delay_task(task.name, args, kwargs,
-                         task_id=task_id, retries=retries,
-                         routing_key=routing_key, exchange=exchange,
-                         mandatory=mandatory, immediate=immediate,
-                         serializer=serializer, priority=priority,
-                         eta=eta)
-
-    if need_to_close_connection:
-        publisher.close()
-        connection.close()
+    for option_name in TASK_EXEC_OPTIONS:
+        if option_name not in options:
+            options[option_name] = getattr(task, option_name, None)
+
+    if countdown: # Convert countdown to ETA.
+        eta = datetime.now() + timedelta(seconds=countdown)
 
+    def _delay_task(connection):
+        publish = publisher or TaskPublisher(connection)
+        try:
+            return publish.delay_task(task.name, args or [], kwargs or {},
+                                      task_id=task_id,
+                                      eta=eta,
+                                      **options)
+        finally:
+            publisher or publish.close()
+
+    task_id = with_connection(_delay_task, connection=connection,
+                                           connect_timeout=connect_timeout)
     return AsyncResult(task_id)
 

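The net effect of the rewrite: any option named in ``TASK_EXEC_OPTIONS`` that
isn't passed explicitly now falls back to the attribute of the same name on
the task class. A usage sketch (``MyTask`` is a hypothetical task class)::

    from celery.execute import apply_async

    # priority travels through **options; routing_key, serializer etc.
    # default to MyTask.routing_key, MyTask.serializer and friends.
    result = apply_async(MyTask, args=[2, 2], countdown=10, priority=9)
    print(result.task_id)    # an AsyncResult is returned immediately
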
+ 0 - 23
celery/management/commands/celerystats.py

@@ -1,23 +0,0 @@
-"""
-
-Start the celery daemon from the Django management command.
-
-"""
-from django.core.management.base import BaseCommand
-
-from celery.monitoring import StatsCollector
-
-
-class Command(BaseCommand):
-    """Collect/flush and dump a report from the currently available
-    statistics."""
-    option_list = BaseCommand.option_list
-    help = "Collect/flush and dump a report from the currently available " + \
-            "statistics"
-
-    def handle(self, *args, **options):
-        """Handle the management command."""
-        stats = StatsCollector()
-        print("* Gathering statistics...")
-        stats.collect()
-        stats.report()

+ 39 - 23
celery/messaging.py

@@ -32,14 +32,7 @@ class TaskPublisher(Publisher):
         return self._delay_task(task_name=task_name, task_args=task_args,
                                 task_kwargs=task_kwargs, **kwargs)
 
-    def delay_task_in_set(self, taskset_id, task_name, task_args, task_kwargs,
-            **kwargs):
-        """Delay a task which part of a task set."""
-        return self._delay_task(task_name=task_name, part_of_set=taskset_id,
-                                task_args=task_args, task_kwargs=task_kwargs,
-                                **kwargs)
-
-    def _delay_task(self, task_name, task_id=None, part_of_set=None,
+    def _delay_task(self, task_name, task_id=None, taskset_id=None,
             task_args=None, task_kwargs=None, **kwargs):
         """INTERNAL"""
 
@@ -56,8 +49,8 @@
             "eta": eta,
         }
 
-        if part_of_set:
-            message_data["taskset"] = part_of_set
+        if taskset_id:
+            message_data["taskset"] = taskset_id
 
         self.send(message_data, **extract_msg_options(kwargs))
         signals.task_sent.send(sender=task_name, **message_data)
@@ -79,19 +72,6 @@ class TaskConsumer(Consumer):
     no_ack = False
 
 
-class StatsPublisher(Publisher):
-    exchange = "celerygraph"
-    routing_key = "stats"
-
-
-class StatsConsumer(Consumer):
-    queue = "celerygraph"
-    exchange = "celerygraph"
-    routing_key = "stats"
-    exchange_type = "direct"
-    no_ack = True
-
-
 class EventPublisher(Publisher):
     exchange = "celeryevent"
     routing_key = "event"
@@ -105,6 +85,42 @@ class EventConsumer(Consumer):
     no_ack = True
 
 
+class BroadcastPublisher(Publisher):
+    exchange = "celeryctl"
+    exchange_type = "fanout"
+    routing_key = ""
+
+    def revoke(self, task_id):
+        self.send("revoke", dict(task_id=task_id))
+
+    def send(self, type, data):
+        data["command"] = type
+        super(BroadcastPublisher, self).send({"control": data})
+
+
+class BroadcastConsumer(Consumer):
+    queue = "celeryctl"
+    exchange = "celeryctl"
+    routing_key = ""
+    exchange_type = "fanout"
+    no_ack = True
+
+
+def establish_connection(connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
+    return DjangoBrokerConnection(connect_timeout=connect_timeout)
+
+
+def with_connection(fun, connection=None,
+        connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
+    conn = connection or establish_connection()
+    close_connection = not connection and conn.close or noop
+
+    try:
+        return fun(conn)
+    finally:
+        close_connection()
+
+
 def get_connection_info():
     broker_connection = DjangoBrokerConnection()
     carrot_backend = broker_connection.backend_cls
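
``with_connection`` centralizes the open/close dance that used to be inlined
in ``apply_async``: it only opens a connection when the caller didn't supply
one, and only closes what it opened. A sketch of the pattern (the callback
name is illustrative)::

    from celery.messaging import TaskPublisher, with_connection

    def _publish(connection):
        publisher = TaskPublisher(connection)
        try:
            # delay_task returns the new task's id.
            return publisher.delay_task("tasks.add", [2, 2], {})
        finally:
            publisher.close()

    task_id = with_connection(_publish)   # connection managed for us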

+ 0 - 238
celery/monitoring.py

@@ -1,238 +0,0 @@
-"""
-
-    Publishing Statistics and Monitoring Celery.
-
-"""
-import time
-
-from django.core.cache import cache
-from carrot.connection import DjangoBrokerConnection
-
-from celery.loaders import settings
-from celery.messaging import StatsPublisher, StatsConsumer
-
-DEFAULT_CACHE_KEY_PREFIX = "celery-statistics"
-
-
-class Statistics(object):
-    """Base class for classes publishing celery statistics.
-
-    .. attribute:: type
-
-        **REQUIRED** The type of statistics this class handles.
-
-    **Required handlers**
-
-        * on_start()
-
-        * on_stop()
-
-    """
-    type = None
-
-    def __init__(self, **kwargs):
-        self.enabled = getattr(settings, "CELERY_STATISTICS", False)
-        if not self.type:
-            raise NotImplementedError(
-                "Statistic classes must define their type.")
-
-    def publish(self, **data):
-        """Publish statistics to be collected later by
-        :class:`StatsCollector`.
-
-        :param data: An arbitrary Python object containing the statistics
-            to be published.
-
-        """
-        if not self.enabled:
-            return
-        connection = DjangoBrokerConnection()
-        publisher = StatsPublisher(connection=connection)
-        publisher.send({"type": self.type, "data": data})
-        publisher.close()
-        connection.close()
-
-    @classmethod
-    def start(cls, *args, **kwargs):
-        """Convenience method instantiating and running :meth:`run` in
-        one swoop."""
-        stat = cls()
-        stat.run(*args, **kwargs)
-        return stat
-
-    def run(self, *args, **kwargs):
-        """Start producing statistics."""
-        if self.enabled:
-            return self.on_start(*args, **kwargs)
-
-    def stop(self, *args, **kwargs):
-        """Stop producing and publish statistics."""
-        if self.enabled:
-            return self.on_finish(*args, **kwargs)
-
-    def on_start(self, *args, **kwargs):
-        """What to do when the :meth:`run` method is called."""
-        raise NotImplementedError(
-                "Statistics classes must define a on_start handler.")
-
-    def on_stop(self, *args, **kwargs):
-        """What to do when the :meth:`stop` method is called."""
-        raise NotImplementedError(
-                "Statistics classes must define a on_stop handler.")
-
-
-class TimerStats(Statistics):
-    """A generic timer producing ``celery`` statistics.
-
-    .. attribute:: time_start
-
-        The time when this class was instantiated (in :func:`time.time`
-        format.)
-
-    """
-    time_start = None
-
-    def on_start(self, task_id, task_name, args, kwargs):
-        """What to do when the timers :meth:`run` method is called."""
-        self.task_id = task_id
-        self.task_name = task_name
-        self.args = args
-        self.kwargs = kwargs
-        self.time_start = time.time()
-
-    def on_finish(self):
-        """What to do when the timers :meth:`stop` method is called.
-
-        :returns: the time in seconds it took between calling :meth:`start` on
-            this class and :meth:`stop`.
-        """
-        nsecs = time.time() - self.time_start
-        self.publish(task_id=self.task_id,
-                     task_name=self.task_name,
-                     args=self.args,
-                     kwargs=self.kwargs,
-                     nsecs=str(nsecs))
-        return nsecs
-
-
-class TaskTimerStats(TimerStats):
-    """Time a running :class:`celery.task.Task`."""
-    type = "task_time_running"
-
-
-class StatsCollector(object):
-    """Collect and report Celery statistics.
-
-    **NOTE**: Please run only one collector at any time, or your stats
-        will be skewed.
-
-    .. attribute:: total_tasks_processed
-
-        The number of tasks executed in total since the first time
-        :meth:`collect` was executed on this class instance.
-
-    .. attribute:: total_tasks_processed_by_type
-
-        A dictionary of task names and how many times they have been
-        executed in total since the first time :meth:`collect` was executed
-        on this class instance.
-
-    .. attribute:: total_task_time_running
-
-        The total time, in seconds, it took to process all the tasks executed
-        since the first time :meth:`collect` was executed on this class
-        instance.
-
-    .. attribute:: total_task_time_running_by_type
-
-        A dictionary of task names and their total running time in seconds,
-        counting all the tasks that has been run since the first time
-        :meth:`collect` was executed on this class instance.
-
-    **NOTE**: You have to run :meth:`collect` for these attributes
-        to be filled.
-
-
-    """
-
-    allowed_types = ["task_time_running"]
-
-    def __init__(self):
-        self.total_tasks_processed = 0
-        self.total_tasks_processed_by_type = {}
-        self.total_task_time_running = 0.0
-        self.total_task_time_running_by_type = {}
-
-    def collect(self):
-        """Collect any new statistics available since the last time
-        :meth:`collect` was executed."""
-        connection = DjangoBrokerConnection()
-        consumer = StatsConsumer(connection=connection)
-        it = consumer.iterqueue(infinite=False)
-        for message in it:
-            stats_entry = message.decode()
-            stat_type = stats_entry["type"]
-            if stat_type in self.allowed_types:
-                # Decode keys to unicode for use as kwargs.
-                data = dict((key.encode("utf-8"), value)
-                                for key, value in stats_entry["data"].items())
-                handler = getattr(self, stat_type)
-                handler(**data)
-
-    def dump_to_cache(self, cache_key_prefix=DEFAULT_CACHE_KEY_PREFIX):
-        """Store collected statistics in the cache."""
-        cache.set("%s-total_tasks_processed" % cache_key_prefix,
-                self.total_tasks_processed)
-        cache.set("%s-total_tasks_processed_by_type" % cache_key_prefix,
-                    self.total_tasks_processed_by_type)
-        cache.set("%s-total_task_time_running" % cache_key_prefix,
-                    self.total_task_time_running)
-        cache.set("%s-total_task_time_running_by_type" % cache_key_prefix,
-                    self.total_task_time_running_by_type)
-
-    def task_time_running(self, task_id, task_name, args, kwargs, nsecs):
-        """Process statistics regarding how long a task has been running
-        (the :class:TaskTimerStats` class is responsible for sending these).
-
-        :param task_id: The UUID of the task.
-        :param task_name: The name of task.
-        :param args: The tasks positional arguments.
-        :param kwargs: The tasks keyword arguments.
-        :param nsecs: The number of seconds (in :func:`time.time` format)
-            it took to execute the task.
-
-        """
-        nsecs = float(nsecs)
-        self.total_tasks_processed += 1
-        self.total_task_time_running += nsecs
-        if task_name not in self.total_task_time_running_by_type:
-            self.total_task_time_running_by_type[task_name] = nsecs
-        else:
-            self.total_task_time_running_by_type[task_name] += nsecs
-        if task_name not in self.total_tasks_processed_by_type:
-            self.total_tasks_processed_by_type[task_name] = 1
-        else:
-            self.total_tasks_processed_by_type[task_name] += 1
-
-    def report(self):
-        """Dump a nice statistics report from the data collected since
-        the first time :meth:`collect` was executed on this instance.
-
-        It outputs the following information:
-
-            * Total processing time by task type and how many times each
-                task has been excuted.
-
-            * Total task processing time.
-
-            * Total number of tasks executed
-
-        """
-        print("Total processing time by task type:")
-        for task_name, nsecs in self.total_task_time_running_by_type.items():
-            print("\t%s: %s secs. (for a total of %d executed.)" % (
-                    task_name, nsecs,
-                    self.total_tasks_processed_by_type.get(task_name)))
-        print("Total task processing time: %s secs." % (
-            self.total_task_time_running))
-        print("Total tasks processed: %d" % self.total_tasks_processed)

+ 9 - 0
celery/platform.py

@@ -1,6 +1,10 @@
 import os
 import sys
 import signal
+try:
+    from setproctitle import setproctitle as _setproctitle
+except ImportError:
+    _setproctitle = None
 
 
 CAN_DETACH = True
@@ -90,3 +94,8 @@ def install_signal_handler(signal_name, handler):
 
     signum = getattr(signal, signal_name)
     signal.signal(signum, handler)
+
+
+def set_process_title(title):
+    if _setproctitle is not None:
+        _setproctitle(title)
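
``set_process_title`` degrades gracefully: when the optional ``setproctitle``
package isn't installed it silently does nothing. A one-line usage sketch::

    from celery.platform import set_process_title

    set_process_title("celeryd [running]")   # visible in ps/top if supported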

+ 19 - 1
celery/task/__init__.py

@@ -10,7 +10,7 @@ from celery.conf import AMQP_CONNECTION_TIMEOUT
 from celery.execute import apply_async
 from celery.registry import tasks
 from celery.backends import default_backend
-from celery.messaging import TaskConsumer
+from celery.messaging import TaskConsumer, BroadcastPublisher, with_connection
 from celery.task.base import Task, TaskSet, PeriodicTask
 from celery.task.base import ExecuteRemoteTask, AsynchronousMapTask
 from celery.task.rest import RESTProxyTask
@@ -35,6 +35,24 @@ def discard_all(connect_timeout=AMQP_CONNECTION_TIMEOUT):
     return discarded_count
 
 
+def revoke(task_id, connection=None, connect_timeout=None):
+    """Revoke a task by id.
+
+    Revoked tasks will not be executed after all.
+
+    """
+
+    def _revoke(connection):
+        broadcast = BroadcastPublisher(connection)
+        try:
+            broadcast.revoke(task_id)
+        finally:
+            broadcast.close()
+
+    return with_connection(_revoke, connection=connection,
+                           connect_timeout=connect_timeout)
+
+
 def is_successful(task_id):
     """Returns ``True`` if task with ``task_id`` has been executed.
 

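Revocation rides on the new fanout ``celeryctl`` exchange, so every worker's
broadcast consumer sees the message and can add the id to its local revoked
set; the Mediator then skips the task if it ever arrives. A sketch with a
made-up task id::

    from celery.task import revoke

    revoke("d9078da5-9915-40a0-bfa1-392c7bde42ed")
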
+ 0 - 96
celery/tests/test_monitoring.py

@@ -1,96 +0,0 @@
-from __future__ import with_statement
-import unittest
-import time
-from celery.monitoring import TaskTimerStats, Statistics, StatsCollector
-from carrot.connection import DjangoBrokerConnection
-from celery.messaging import StatsConsumer
-from celery.tests.utils import override_stdouts
-
-
-class PartialStatistics(Statistics):
-    type = "c.u.partial"
-
-
-class TestStatisticsInterface(unittest.TestCase):
-
-    def test_must_have_type(self):
-        self.assertRaises(NotImplementedError, Statistics)
-
-    def test_must_have_on_start(self):
-        self.assertRaises(NotImplementedError, PartialStatistics().on_start)
-
-    def test_must_have_on_stop(self):
-        self.assertRaises(NotImplementedError, PartialStatistics().on_stop)
-
-
-class TestTaskTimerStats(unittest.TestCase):
-
-    def test_time(self):
-        self.assertTimeElapsed(0.5, 1, 0, "0.5")
-        self.assertTimeElapsed(0.002, 0.05, 0, "0.0")
-        self.assertTimeElapsed(0.1, 0.5, 0, "0.1")
-
-    def test_not_enabled(self):
-        t = TaskTimerStats()
-        t.enabled = False
-        self.assertFalse(t.publish(isnot="enabled"))
-        self.assertFalse(getattr(t, "time_start", None))
-        t.run("foo", "bar", [], {})
-        t.stop()
-
-    def assertTimeElapsed(self, time_sleep, max_appx, min_appx, appx):
-        t = TaskTimerStats()
-        t.enabled = True
-        t.run("foo", "bar", [], {})
-        self.assertTrue(t.time_start)
-        time.sleep(time_sleep)
-        time_stop = t.stop()
-        self.assertTrue(time_stop)
-        self.assertFalse(time_stop > max_appx)
-        self.assertFalse(time_stop <= min_appx)
-
-        strstop = str(time_stop)[0:3]
-        # Time elapsed is approximately 0.1 seconds.
-        self.assertTrue(strstop == appx)
-
-
-class TestStatsCollector(unittest.TestCase):
-
-    def setUp(self):
-        conn = DjangoBrokerConnection()
-        consumer = StatsConsumer(connection=conn)
-        consumer.discard_all()
-        conn.close()
-        consumer.close()
-        self.s = StatsCollector()
-        self.assertEquals(self.s.total_tasks_processed, 0)
-        self.assertEquals(self.s.total_tasks_processed_by_type, {})
-        self.assertEquals(self.s.total_task_time_running, 0.0)
-        self.assertEquals(self.s.total_task_time_running_by_type, {})
-
-    def test_collect_report_dump(self):
-        timer1 = TaskTimerStats()
-        timer1.enabled = True
-        timer1.run("foo", "bar", [], {})
-        timer2 = TaskTimerStats()
-        timer2.enabled = True
-        timer2.run("foo", "bar", [], {})
-        timer3 = TaskTimerStats()
-        timer3.enabled = True
-        timer3.run("foo", "bar", [], {})
-        for timer in (timer1, timer2, timer3):
-            timer.stop()
-
-        # Collect
-        self.s.collect()
-        self.assertEquals(self.s.total_tasks_processed, 3)
-
-        # Report
-        with override_stdouts() as outs:
-            stdout, stderr = outs
-            self.s.report()
-            self.assertTrue(
-                "Total processing time by task type:" in stdout.getvalue())
-
-        # Dump to cache
-        self.s.dump_to_cache()

+ 1 - 1
celery/tests/test_task.py

@@ -8,7 +8,7 @@ from celery.result import EagerResult
 from celery.backends import default_backend
 from datetime import datetime, timedelta
 from celery.decorators import task as task_dec
-from celery.worker import parse_iso8601
+from celery.worker.listener import parse_iso8601
 
 def return_True(*args, **kwargs):
     # Task run functions can't be closures/lambdas, as they're pickled.

+ 20 - 4
celery/tests/test_worker.py

@@ -15,6 +15,15 @@ from celery.worker.scheduler import Scheduler
 from celery.decorators import task as task_dec
 
 
+class MockEventDispatcher(object):
+
+    def send(self, *args, **kwargs):
+        pass
+
+    def close(self):
+        pass
+
+
 @task_dec()
 def foo_task(x, y, z, **kwargs):
     return x * y * z
@@ -92,7 +101,8 @@ class TestCarrotListener(unittest.TestCase):
         self.logger.setLevel(0)
 
     def test_connection(self):
-        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger)
+        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger,
+                           send_events=False)
 
         c = l.reset_connection()
         self.assertTrue(isinstance(l.amqp_connection, BrokerConnection))
@@ -109,11 +119,13 @@ class TestCarrotListener(unittest.TestCase):
         self.assertTrue(l.task_consumer is None)
 
     def test_receieve_message(self):
-        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger)
+        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger,
+                           send_events=False)
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name,
                            args=[2, 4, 8], kwargs={})
 
+        l.event_dispatcher = MockEventDispatcher()
         l.receive_message(m.decode(), m)
 
         in_bucket = self.ready_queue.get_nowait()
@@ -123,22 +135,26 @@ class TestCarrotListener(unittest.TestCase):
         self.assertTrue(self.eta_scheduler.empty())
 
     def test_receieve_message_not_registered(self):
-        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger)
+        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger,
+                          send_events=False)
         backend = MockBackend()
         m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
 
+        l.event_dispatcher = MockEventDispatcher()
         self.assertFalse(l.receive_message(m.decode(), m))
         self.assertRaises(Empty, self.ready_queue.get_nowait)
         self.assertTrue(self.eta_scheduler.empty())
 
     def test_receieve_message_eta(self):
-        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger)
+        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger,
+                          send_events=False)
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name,
                            args=[2, 4, 8], kwargs={},
                            eta=(datetime.now() +
                                timedelta(days=1)).isoformat())
 
+        l.reset_connection()
         l.receive_message(m.decode(), m)
 
         in_hold = self.eta_scheduler.queue[0]

+ 17 - 162
celery/worker/__init__.py

@@ -5,176 +5,17 @@ The Multiprocessing Worker Server
 """
 import traceback
 import logging
-import socket
 from Queue import Queue
-from datetime import datetime
-
-from dateutil.parser import parse as parse_iso8601
-from carrot.connection import DjangoBrokerConnection, AMQPConnectionException
 
 from celery import conf
 from celery import registry
 from celery.log import setup_logger
 from celery.beat import ClockServiceThread
-from celery.utils import retry_over_time
 from celery.worker.pool import TaskPool
-from celery.worker.job import TaskWrapper
+from celery.worker.buckets import TaskBucket
+from celery.worker.listener import CarrotListener
 from celery.worker.scheduler import Scheduler
 from celery.worker.controllers import Mediator, ScheduleController
-from celery.worker.buckets import TaskBucket
-from celery.messaging import get_consumer_set
-from celery.exceptions import NotRegistered
-from celery.datastructures import SharedCounter
-
-
-class CarrotListener(object):
-    """Listen for messages received from the AMQP broker and
-    move them the the bucket queue for task processing.
-
-    :param ready_queue: See :attr:`ready_queue`.
-    :param eta_scheduler: See :attr:`eta_scheduler`.
-
-    .. attribute:: ready_queue
-
-        The queue that holds tasks ready for processing immediately.
-
-    .. attribute:: eta_scheduler
-
-        Scheduler for paused tasks. Reasons for being paused include
-        a countdown/eta or that it's waiting for retry.
-
-    .. attribute:: logger
-
-        The logger used.
-
-    """
-
-    def __init__(self, ready_queue, eta_scheduler, logger,
-            initial_prefetch_count=2):
-        self.amqp_connection = None
-        self.task_consumer = None
-        self.ready_queue = ready_queue
-        self.eta_scheduler = eta_scheduler
-        self.logger = logger
-        self.prefetch_count = SharedCounter(initial_prefetch_count)
-
-    def start(self):
-        """Start the consumer.
-
-        If the connection is lost, it tries to re-establish the connection
-        over time and restart consuming messages.
-
-        """
-
-        while True:
-            self.reset_connection()
-            try:
-                self.consume_messages()
-            except (socket.error, AMQPConnectionException, IOError):
-                self.logger.error("CarrotListener: Connection to broker lost."
-                                + " Trying to re-establish connection...")
-
-    def consume_messages(self):
-        """Consume messages forever (or until an exception is raised)."""
-        task_consumer = self.task_consumer
-
-        self.logger.debug("CarrotListener: Starting message consumer...")
-        it = task_consumer.iterconsume(limit=None)
-
-        self.logger.debug("CarrotListener: Ready to accept tasks!")
-
-        prev_pcount = None
-        while True:
-            if not prev_pcount or int(self.prefetch_count) != prev_pcount:
-                self.task_consumer.qos(prefetch_count=int(self.prefetch_count))
-                prev_pcount = int(self.prefetch_count)
-            it.next()
-
-    def stop(self):
-        """Stop processing AMQP messages and close the connection
-        to the broker."""
-        self.close_connection()
-
-    def receive_message(self, message_data, message):
-        """The callback called when a new message is received.
-
-        If the message has an ``eta`` we move it to the hold queue,
-        otherwise we move it the bucket queue for immediate processing.
-
-        """
-        try:
-            task = TaskWrapper.from_message(message, message_data,
-                                            logger=self.logger)
-        except NotRegistered, exc:
-            self.logger.error("Unknown task ignored: %s" % (exc))
-            return
-
-        eta = message_data.get("eta")
-        if eta:
-            if not isinstance(eta, datetime):
-                eta = parse_iso8601(eta)
-            self.prefetch_count.increment()
-            self.logger.info("Got task from broker: %s[%s] eta:[%s]" % (
-                    task.task_name, task.task_id, eta))
-            self.eta_scheduler.enter(task,
-                                     eta=eta,
-                                     callback=self.prefetch_count.decrement)
-        else:
-            self.logger.info("Got task from broker: %s[%s]" % (
-                    task.task_name, task.task_id))
-            self.ready_queue.put(task)
-
-    def close_connection(self):
-        """Close the AMQP connection."""
-        if self.task_consumer:
-            self.task_consumer.close()
-            self.task_consumer = None
-        if self.amqp_connection:
-            self.logger.debug(
-                    "CarrotListener: Closing connection to the broker...")
-            self.amqp_connection.close()
-            self.amqp_connection = None
-
-    def reset_connection(self):
-        """Reset the AMQP connection, and reinitialize the
-        :class:`carrot.messaging.ConsumerSet` instance.
-
-        Resets the task consumer in :attr:`task_consumer`.
-
-        """
-        self.logger.debug(
-                "CarrotListener: Re-establishing connection to the broker...")
-        self.close_connection()
-        self.amqp_connection = self._open_connection()
-        self.task_consumer = get_consumer_set(connection=self.amqp_connection)
-        self.task_consumer.register_callback(self.receive_message)
-
-    def _open_connection(self):
-        """Retries connecting to the AMQP broker over time.
-
-        See :func:`celery.utils.retry_over_time`.
-
-        """
-
-        def _connection_error_handler(exc, interval):
-            """Callback handler for connection errors."""
-            self.logger.error("AMQP Listener: Connection Error: %s. " % exc
-                     + "Trying again in %d seconds..." % interval)
-
-        def _establish_connection():
-            """Establish a connection to the AMQP broker."""
-            conn = DjangoBrokerConnection()
-            connected = conn.connection # Connection is established lazily.
-            return conn
-
-        if not conf.AMQP_CONNECTION_RETRY:
-            return _establish_connection()
-
-        conn = retry_over_time(_establish_connection, (socket.error, IOError),
-                               errback=_connection_error_handler,
-                               max_retries=conf.AMQP_CONNECTION_MAX_RETRIES)
-        self.logger.debug("CarrotListener: Connection Established.")
-        return conn
 
 
 
 
 class WorkController(object):
 class WorkController(object):
@@ -183,6 +24,8 @@ class WorkController(object):
     :param concurrency: see :attr:`concurrency`.
     :param logfile: see :attr:`logfile`.
     :param loglevel: see :attr:`loglevel`.
+    :param embed_clockservice: see :attr:`run_clockservice`.
+    :param send_events: see :attr:`send_events`.
 
 
     .. attribute:: concurrency
@@ -199,6 +42,16 @@ class WorkController(object):
         The logfile used, if no logfile is specified it uses ``stderr``
         (default: :const:`celery.conf.DAEMON_LOG_FILE`).
 
+    .. attribute:: embed_clockservice
+
+        If ``True``, celerybeat is embedded, running in the main worker
+        process as a thread.
+
+    .. attribute:: send_events
+
+        Enable the sending of monitoring events, these events can be captured
+        by monitors (celerymon).
+
     .. attribute:: logger
 
         The :class:`logging.Logger` instance used for logging.
@@ -241,6 +94,7 @@ class WorkController(object):
     _state = None
 
     def __init__(self, concurrency=None, logfile=None, loglevel=None,
+            send_events=conf.CELERY_SEND_EVENTS,
             is_detached=False, embed_clockservice=False):
 
         # Options
@@ -250,6 +104,7 @@ class WorkController(object):
         self.is_detached = is_detached
         self.logger = setup_logger(loglevel, logfile)
         self.embed_clockservice = embed_clockservice
+        self.send_events = send_events
 
         # Queues
         if conf.DISABLE_RATE_LIMITS:
@@ -266,6 +121,7 @@ class WorkController(object):
         self.broker_listener = CarrotListener(self.ready_queue,
                                         self.eta_scheduler,
                                         logger=self.logger,
+                                        send_events=send_events,
                                         initial_prefetch_count=concurrency)
         self.mediator = Mediator(self.ready_queue, self.safe_process_task)
 
@@ -314,7 +170,6 @@ class WorkController(object):
 
     def stop(self):
         """Gracefully shutdown the worker server."""
-        # shut down the periodic work controller thread
        if self._state != "RUN":
             return
 

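Putting the pieces together, a minimal sketch of starting a worker with event
sending enabled in-process (roughly what ``celeryd -E`` arranges; assumes the
usual ``start()`` entry point)::

    import logging

    from celery.worker import WorkController

    worker = WorkController(concurrency=4, loglevel=logging.INFO,
                            send_events=True)   # flag reaches CarrotListener
    worker.start()   # blocks, running listener, mediator and pool
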
+ 8 - 1
celery/worker/controllers.py

@@ -9,6 +9,7 @@ from Queue import Empty as QueueEmpty
 from datetime import datetime
 
 from celery.log import get_default_logger
+from celery.worker.revoke import revoked
 
 
 class BackgroundThread(threading.Thread):
@@ -91,9 +92,15 @@ class Mediator(BackgroundThread):
         except QueueEmpty:
             time.sleep(1)
         else:
+            if task.task_id in revoked: # task revoked
+                task.on_ack()
+                logger.warn("Mediator: Skipping revoked task: %s[%s]" % (
+                    task.task_name, task.task_id))
+                return
+
             logger.debug("Mediator: Running callback for task: %s[%s]" % (
                 task.task_name, task.task_id))
-            self.callback(task)
+            self.callback(task) # execute
 
 
 class ScheduleController(BackgroundThread):

+ 50 - 0
celery/worker/heartbeat.py

@@ -0,0 +1,50 @@
+import threading
+from time import time, sleep
+
+
+class Heart(threading.Thread):
+    interval = 60
+
+    def __init__(self, eventer, interval=None):
+        super(Heart, self).__init__()
+        self.eventer = eventer
+        self.interval = interval or self.interval
+        self._shutdown = threading.Event()
+        self._stopped = threading.Event()
+        self.setDaemon(True)
+        self._state = None
+
+    def run(self):
+        self._state = "RUN"
+        interval = self.interval
+        dispatch = self.eventer.send
+
+        dispatch("worker-online")
+
+        # We can't sleep the whole interval, because then it would take
+        # up to ``interval`` seconds (60 by default) to shut down the thread.
+
+        last_beat = None
+        while 1:
+            if self._shutdown.isSet():
+                break
+            now = time()
+            if not last_beat or now > last_beat + interval:
+                last_beat = now
+                dispatch("worker-heartbeat")
+            sleep(1)
+
+        try:
+            dispatch("worker-offline")
+        finally:
+            self._stopped.set()
+
+    def stop(self):
+        """Gracefully shut down the thread."""
+        if self._state != "RUN":
+            return
+        self._state = "CLOSE"
+        self._shutdown.set()
+        self._stopped.wait() # block until this thread is done
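
Heart only assumes its eventer has a ``send(type, **fields)`` method, so it
can be exercised in isolation. A quick smoke test; StubEventer is invented
here purely for illustration:

    import time

    from celery.worker.heartbeat import Heart

    class StubEventer(object):
        """Stand-in dispatcher recording events instead of sending them."""

        def __init__(self):
            self.sent = []

        def send(self, type, **fields):
            self.sent.append(type)

    eventer = StubEventer()
    heart = Heart(eventer, interval=2)   # beat every 2s instead of every 60s
    heart.start()
    time.sleep(5)
    heart.stop()   # blocks until the thread has finished

    # Expect: worker-online, a few worker-heartbeats, then worker-offline.
    print(eventer.sent)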

+ 25 - 11
celery/worker/job.py

@@ -4,6 +4,7 @@ Jobs Executable by the Worker Server.

 """
 import sys
+import time
 import socket
 import warnings

@@ -15,7 +16,6 @@ from celery.loaders import current_loader
 from celery.execute import TaskTrace
 from celery.registry import tasks
 from celery.exceptions import NotRegistered
-from celery.monitoring import TaskTimerStats
 from celery.datastructures import ExceptionInfo

 # pep8.py borks on an inline signature separator and
@@ -88,12 +88,7 @@ class WorkerTaskTrace(TaskTrace):
         # Backend process cleanup
         self.task.backend.process_cleanup()

-        timer_stat = TaskTimerStats.start(self.task_id, self.task_name,
-                                          self.args, self.kwargs)
-        try:
-            return self._trace()
-        finally:
-            timer_stat.stop()
+        return self._trace()

     def handle_success(self, retval, *args):
         """Handle successful execution.
@@ -181,9 +176,11 @@ class TaskWrapper(object):
         self.retries = retries
         self.args = args
         self.kwargs = kwargs
-        self.logger = kwargs.get("logger")
+        self.logger = opts.get("logger")
+        self.eventer = opts.get("eventer")
         self.on_ack = on_ack
         self.executed = False
+        self.time_start = None
         for opt in ("success_msg", "fail_msg", "fail_email_subject",
                 "fail_email_body"):
             setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
@@ -200,7 +197,7 @@ class TaskWrapper(object):
                 self.args, self.kwargs)

     @classmethod
-    def from_message(cls, message, message_data, logger=None):
+    def from_message(cls, message, message_data, logger=None, eventer=None):
         """Create a :class:`TaskWrapper` from a task message sent by
         :class:`celery.messaging.TaskPublisher`.

@@ -221,7 +218,8 @@ class TaskWrapper(object):
                         for key, value in kwargs.items())

         return cls(task_name, task_id, args, kwargs,
-                    retries=retries, on_ack=message.ack, logger=logger)
+                    retries=retries, on_ack=message.ack,
+                    logger=logger, eventer=eventer)

     def extend_with_default_kwargs(self, loglevel, logfile):
         """Extend the task's keyword arguments with standard task arguments.
@@ -275,6 +273,10 @@ class TaskWrapper(object):
         tracer = WorkerTaskTrace(*self._get_tracer_args(loglevel, logfile))
         return tracer.execute()

+    def send_event(self, type, **fields):
+        if self.eventer:
+            self.eventer.send(type, **fields)
+
     def execute_using_pool(self, pool, loglevel=None, logfile=None):
         """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.

@@ -290,7 +292,10 @@ class TaskWrapper(object):
         # Make sure task has not already been executed.
         self._set_executed_bit()

+        self.send_event("task-accepted", uuid=self.task_id)
+
         args = self._get_tracer_args(loglevel, logfile)
+        self.time_start = time.time()
         return pool.apply_async(execute_and_trace, args=args,
                 callbacks=[self.on_success], errbacks=[self.on_failure],
                 on_ack=self.on_ack)
@@ -298,6 +303,11 @@ class TaskWrapper(object):
     def on_success(self, ret_value):
         """The handler used if the task was successfully processed (
         without raising an exception)."""
+
+        runtime = time.time() - self.time_start
+        self.send_event("task-succeeded", uuid=self.task_id,
+                        result=ret_value, runtime=runtime)
+
         msg = self.success_msg.strip() % {
                 "id": self.task_id,
                 "name": self.task_name,
@@ -308,6 +318,10 @@ class TaskWrapper(object):
         """The handler used if the task raised an exception."""
         """The handler used if the task raised an exception."""
         from celery.conf import SEND_CELERY_TASK_ERROR_EMAILS
         from celery.conf import SEND_CELERY_TASK_ERROR_EMAILS
 
 
+        self.send_event("task-failed", uuid=self.task_id,
+                                       exception=exc_info.exception,
+                                       traceback=exc_info.traceback)
+
         context = {
         context = {
             "hostname": socket.gethostname(),
             "hostname": socket.gethostname(),
             "id": self.task_id,
             "id": self.task_id,
@@ -321,7 +335,7 @@ class TaskWrapper(object):

         task_obj = tasks.get(self.task_name, object)
         send_error_email = SEND_CELERY_TASK_ERROR_EMAILS and not \
-                getattr(task_obj, "disable_error_emails", False)
+                                task_obj.disable_error_emails
         if send_error_email:
             subject = self.fail_email_subject.strip() % context
             body = self.fail_email_body.strip() % context

+ 211 - 0
celery/worker/listener.py

@@ -0,0 +1,211 @@
+import socket
+from datetime import datetime
+
+from dateutil.parser import parse as parse_iso8601
+from carrot.connection import DjangoBrokerConnection, AMQPConnectionException
+
+from celery import conf
+from celery.utils import retry_over_time
+from celery.worker.job import TaskWrapper
+from celery.worker.revoke import revoked
+from celery.worker.heartbeat import Heart
+from celery.events import EventDispatcher
+from celery.messaging import get_consumer_set, BroadcastConsumer
+from celery.exceptions import NotRegistered
+from celery.datastructures import SharedCounter
+
+RUN = 0x0
+CLOSE = 0x1
+
+
+class CarrotListener(object):
+    """Listen for messages received from the broker and
+    move them to the ready queue for task processing.
+
+    :param ready_queue: See :attr:`ready_queue`.
+    :param eta_scheduler: See :attr:`eta_scheduler`.
+
+    .. attribute:: ready_queue
+
+        The queue that holds tasks ready for processing immediately.
+
+    .. attribute:: eta_scheduler
+
+        Scheduler for paused tasks. Reasons for being paused include
+        a countdown/eta or that it's waiting for retry.
+
+    .. attribute:: logger
+
+        The logger used.
+
+    """
+
+    def __init__(self, ready_queue, eta_scheduler, logger,
+            send_events=False, initial_prefetch_count=2):
+        self.amqp_connection = None
+        self.task_consumer = None
+        self.ready_queue = ready_queue
+        self.eta_scheduler = eta_scheduler
+        self.send_events = send_events
+        self.logger = logger
+        self.prefetch_count = SharedCounter(initial_prefetch_count)
+        self.event_dispatcher = None
+        self.heart = None
+        self._state = None
+
+    def start(self):
+        """Start the consumer.
+
+        If the connection is lost, it tries to re-establish the connection
+        over time and restart consuming messages.
+
+        """
+
+        while 1:
+            self.reset_connection()
+            try:
+                self.consume_messages()
+            except (socket.error, AMQPConnectionException, IOError):
+                self.logger.error("CarrotListener: Connection to broker lost."
+                                + " Trying to re-establish connection...")
+
+    def consume_messages(self):
+        """Consume messages forever (or until an exception is raised)."""
+        task_consumer = self.task_consumer
+
+        self.logger.debug("CarrotListener: Starting message consumer...")
+        wait_for_message = task_consumer.iterconsume(limit=None).next
+        self.logger.debug("CarrotListener: Ready to accept tasks!")
+
+        prev_pcount = None
+        while 1:
+            pcount = int(self.prefetch_count) # Convert SharedCounter to int
+            if not prev_pcount or pcount != prev_pcount:
+                task_consumer.qos(prefetch_count=pcount)
+                prev_pcount = pcount
+
+            wait_for_message()
+
+    def on_control_command(self, command):
+        if command["command"] == "revoke":
+            revoke_uuid = command["task_id"]
+            revoked.add(revoke_uuid)
+            self.logger.warn("Task %s marked as revoked." % revoke_uuid)
+            return
+
+    def on_task(self, task, eta=None):
+        """Handle received task.
+
+        If the task has an ``eta`` we enter it into the ETA schedule,
+        otherwise we move it to the ready queue for immediate processing.
+
+        """
+
+        if task.task_id in revoked:
+            self.logger.warn("Got revoked task from broker: %s[%s]" % (
+                task.task_name, task.task_id))
+            return task.on_ack()
+
+        self.event_dispatcher.send("task-received", uuid=task.task_id,
+                name=task.task_name, args=task.args, kwargs=task.kwargs,
+                retries=task.retries, eta=eta)
+
+        if eta:
+            if not isinstance(eta, datetime):
+                eta = parse_iso8601(eta)
+            self.prefetch_count.increment()
+            self.logger.info("Got task from broker: %s[%s] eta:[%s]" % (
+                    task.task_name, task.task_id, eta))
+            self.eta_scheduler.enter(task, eta=eta,
+                                     callback=self.prefetch_count.decrement)
+        else:
+            self.logger.info("Got task from broker: %s[%s]" % (
+                    task.task_name, task.task_id))
+            self.ready_queue.put(task)
+
+    def receive_message(self, message_data, message):
+        """The callback called when a new message is received. """
+
+        # Handle task
+        if message_data.get("task"):
+            try:
+                task = TaskWrapper.from_message(message, message_data,
+                                                logger=self.logger,
+                                                eventer=self.event_dispatcher)
+            except NotRegistered, exc:
+                self.logger.error("Unknown task ignored: %s" % (exc))
+            else:
+                self.on_task(task, eta=message_data.get("eta"))
+            return
+
+        # Handle control command
+        control = message_data.get("control")
+        if control:
+            self.on_control_command(control)
+        return
+
+    def close_connection(self):
+        if not self._state == RUN:
+            return
+        self._state = CLOSE
+
+        self.logger.debug("Heart: Going into cardiac arrest...")
+        self.heart = self.heart and self.heart.stop()
+
+        self.logger.debug("TaskConsumer: Shutting down...")
+        self.task_consumer = self.task_consumer and self.task_consumer.close()
+
+        self.logger.debug("EventDispatcher: Shutting down...")
+        self.event_dispatcher = self.event_dispatcher and \
+                                    self.event_dispatcher.close()
+        self.logger.debug(
+                "CarrotListener: Closing connection to broker...")
+        self.amqp_connection = self.amqp_connection and \
+                                    self.amqp_connection.close()
+
+    def reset_connection(self):
+        self.logger.debug(
+                "CarrotListener: Re-establishing connection to the broker...")
+        self.close_connection()
+        self.amqp_connection = self._open_connection()
+        self.logger.debug("CarrotListener: Connection Established.")
+        self.task_consumer = get_consumer_set(connection=self.amqp_connection)
+        self.broadcast_consumer = BroadcastConsumer(self.amqp_connection)
+        self.task_consumer.add_consumer(self.broadcast_consumer)
+        self.task_consumer.register_callback(self.receive_message)
+        self.event_dispatcher = EventDispatcher(self.amqp_connection,
+                                                enabled=self.send_events)
+        self.heart = Heart(self.event_dispatcher)
+        self.heart.start()
+
+        self._state = RUN
+
+    def _open_connection(self):
+        """Retries connecting to the AMQP broker over time.
+
+        See :func:`celery.utils.retry_over_time`.
+
+        """
+
+        def _connection_error_handler(exc, interval):
+            """Callback handler for connection errors."""
+            self.logger.error("AMQP Listener: Connection Error: %s. " % exc
+                     + "Trying again in %d seconds..." % interval)
+
+        def _establish_connection():
+            """Establish a connection to the AMQP broker."""
+            conn = DjangoBrokerConnection()
+            connected = conn.connection # Connection is established lazily.
+            return conn
+
+        if not conf.AMQP_CONNECTION_RETRY:
+            return _establish_connection()
+
+        conn = retry_over_time(_establish_connection, (socket.error, IOError),
+                               errback=_connection_error_handler,
+                               max_retries=conf.AMQP_CONNECTION_MAX_RETRIES)
+        return conn
+
+    def stop(self):
+        self.close_connection()
+

+ 43 - 0
celery/worker/revoke.py

@@ -0,0 +1,43 @@
+import time
+from UserDict import UserDict
+
+from carrot.connection import DjangoBrokerConnection
+
+from celery.messaging import BroadcastPublisher
+from celery.utils import noop
+
+REVOKES_MAX = 1000
+REVOKE_EXPIRES = 60 * 60 # one hour.
+
+
+class RevokeRegistry(UserDict):
+
+    def __init__(self, maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES):
+        self.maxlen = maxlen
+        self.expires = expires
+        self.data = {}
+
+    def add(self, uuid):
+        self._expire_item()
+        self[uuid] = time.time()
+
+    def _expire_item(self):
+        while 1:
+            if len(self) > self.maxlen:
+                uuid, when = self.oldest
+                if time.time() > when + self.expires:
+                    try:
+                        self.pop(uuid, None)
+                    except TypeError:
+                        continue
+            break
+
+    @property
+    def oldest(self):
+        return sorted(self.items(), key=lambda (uuid, when): when)[0]
+
+
+
+revoked = RevokeRegistry()
+
+
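
The registry is just a bounded mapping of uuid -> revocation time, so the
membership tests in the listener and mediator stay cheap. A quick sanity
check of the eviction rule (expires=0 forces immediate expiry, purely for
the demo):

    from celery.worker.revoke import RevokeRegistry

    registry = RevokeRegistry(maxlen=2, expires=0)
    for uuid in ("a", "b", "c", "d"):
        registry.add(uuid)

    # add() evicts at most one expired entry per call once maxlen is
    # exceeded, so "a" was dropped when "d" arrived.
    print("a" in registry)          # False
    print(sorted(registry.keys()))  # ['b', 'c', 'd']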

+ 49 - 0
docs/internals/events.rst

@@ -0,0 +1,49 @@
+=======================
+ List of Worker Events
+=======================
+
+This is the list of events sent by the worker.
+The monitor uses these to visualize the state of the cluster.
+
+Task Events
+-----------
+
+* task-received(uuid, name, args, kwargs, retries, eta, hostname, timestamp)
+
+    Sent when the worker receives a task.
+
+* task-accepted(uuid, hostname, timestamp)
+
+    Sent just before the worker executes the task.
+
+* task-succeeded(uuid, result, runtime, hostname, timestamp)
+
+    Sent if the task executed successfully.
+    Runtime is the time it took to execute the task using the pool,
+    measured from when the task is sent to the pool until the pool's
+    result handler callback is called.
+
+* task-failed(uuid, exception, traceback, hostname, timestamp)
+
+    Sent if the execution of the task failed.
+
+* task-retried(uuid, exception, traceback, hostname, delay, timestamp)
+
+    Sent if the task failed, but will be retried in the future.
+    (**NOT IMPLEMENTED**)
+
+Worker Events
+-------------
+
+* worker-online(hostname, timestamp)
+
+    The worker has connected to the broker and is online.
+
+* worker-heartbeat(hostname, timestamp)
+
+    Sent every minute. If the worker has not sent a heartbeat in
+    two minutes, it is considered to be offline.
+
+* worker-offline(hostname, timestamp)
+
+    The worker has disconnected from the broker.
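
On the wire each event is a mapping of the fields listed above; hostname and
timestamp are filled in by the dispatcher. A hedged sketch of what a monitor
callback might do with them (assuming the event type is included in the
payload; mark_alive and update_task_state are hypothetical helpers, not part
of celery):

    def on_event(event):
        # e.g. {"type": "task-succeeded", "uuid": "...", "result": "4",
        #       "runtime": 0.31, "hostname": "worker1", "timestamp": ...}
        if event["type"] == "worker-heartbeat":
            mark_alive(event["hostname"], event["timestamp"])
        elif event["type"].startswith("task-"):
            update_task_state(event["uuid"], event)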

+ 1 - 0
docs/internals/index.rst

@@ -10,4 +10,5 @@

     worker
     protocol
+    events
     reference/index

+ 8 - 0
docs/internals/reference/celery.patch.rst

@@ -0,0 +1,8 @@
+======================================
+ Compatibility Patches - celery.patch
+======================================
+
+.. currentmodule:: celery.patch
+
+.. automodule:: celery.patch
+    :members:

+ 8 - 0
docs/internals/reference/celery.worker.heartbeat.rst

@@ -0,0 +1,8 @@
+=============================================
+ Worker Heartbeats - celery.worker.heartbeat
+=============================================
+
+.. currentmodule:: celery.worker.heartbeat
+
+.. automodule:: celery.worker.heartbeat
+    :members:

+ 8 - 0
docs/internals/reference/celery.worker.listener.rst

@@ -0,0 +1,8 @@
+==================================================
+ Worker Message Listener - celery.worker.listener
+==================================================
+
+.. currentmodule:: celery.worker.listener
+
+.. automodule:: celery.worker.listener
+    :members:

+ 8 - 0
docs/internals/reference/celery.worker.revoke.rst

@@ -0,0 +1,8 @@
+==============================================
+ Worker Revoked Tasks - celery.worker.revoke
+==============================================
+
+.. currentmodule:: celery.worker.revoke
+
+.. automodule:: celery.worker.revoke
+    :members:

+ 4 - 0
docs/internals/reference/index.rst

@@ -9,11 +9,14 @@
    :maxdepth: 2

     celery.worker
+    celery.worker.listener
     celery.worker.job
     celery.worker.controllers
     celery.worker.buckets
     celery.worker.scheduler
     celery.worker.pool
+    celery.worker.heartbeat
+    celery.worker.revoke
     celery.beat
     celery.backends
     celery.backends.base
@@ -30,3 +33,4 @@
     celery.platform
     celery.managers
     celery.models
+    celery.patch

+ 0 - 4
docs/introduction.rst

@@ -94,10 +94,6 @@ Features
       returns a JSON-serialized data structure containing the task status,
       and the return value if completed, or exception on failure.

-    * The worker can collect statistics, like, how many tasks has been
-      executed by type, and the time it took to process them. Very useful
-      for monitoring and profiling.
-
     * Pool workers are supervised, so if for some reason a worker crashes
         it is automatically replaced by a new worker.


+ 8 - 0
docs/reference/celery.events.rst

@@ -0,0 +1,8 @@
+========================
+ Events - celery.events
+========================
+
+.. currentmodule:: celery.events
+
+.. automodule:: celery.events
+    :members:

+ 1 - 0
docs/reference/index.rst

@@ -28,6 +28,7 @@
     celery.messaging
     celery.contrib.test_runner
     celery.views
+    celery.events
     celery.bin.celeryd
     celery.bin.celerybeat
     celery.bin.celeryinit

+ 0 - 0
examples/django/demoproject/__init__.py


+ 0 - 0
examples/django/demoproject/demoapp/__init__.py


+ 3 - 0
examples/django/demoproject/demoapp/models.py

@@ -0,0 +1,3 @@
+from django.db import models
+
+# Create your models here.

+ 0 - 0
examples/django/demoproject/demoapp/tasks.py


+ 23 - 0
examples/django/demoproject/demoapp/tests.py

@@ -0,0 +1,23 @@
+"""
+This file demonstrates two different styles of tests (one doctest and one
+unittest). These will both pass when you run "manage.py test".
+
+Replace these with more appropriate tests for your application.
+"""
+
+from django.test import TestCase
+
+class SimpleTest(TestCase):
+    def test_basic_addition(self):
+        """
+        Tests that 1 + 1 always equals 2.
+        """
+        self.failUnlessEqual(1 + 1, 2)
+
+__test__ = {"doctest": """
+Another way to test that 1 + 1 is equal to 2.
+
+>>> 1 + 1 == 2
+True
+"""}
+

+ 1 - 0
examples/django/demoproject/demoapp/views.py

@@ -0,0 +1 @@
+# Create your views here.

+ 11 - 0
examples/django/demoproject/manage.py

@@ -0,0 +1,11 @@
+#!/usr/bin/env python
+from django.core.management import execute_manager
+try:
+    import settings # Assumed to be in the same directory.
+except ImportError:
+    import sys
+    sys.stderr.write("Error: Can't find the file 'settings.py' in the directory containing %r. It appears you've customized things.\nYou'll have to run django-admin.py, passing it your settings module.\n(If the file settings.py does indeed exist, it's causing an ImportError somehow.)\n" % __file__)
+    sys.exit(1)
+
+if __name__ == "__main__":
+    execute_manager(settings)

+ 71 - 0
examples/django/demoproject/settings.py

@@ -0,0 +1,71 @@
+# Django settings for demoproject project.
+
+DEBUG = True
+TEMPLATE_DEBUG = DEBUG
+
+ADMINS = (
+    # ('Your Name', 'your_email@domain.com'),
+)
+
+MANAGERS = ADMINS
+
+DATABASE_ENGINE = ''           # 'postgresql_psycopg2', 'postgresql', 'mysql', 'sqlite3' or 'oracle'.
+DATABASE_NAME = ''             # Or path to database file if using sqlite3.
+DATABASE_USER = ''             # Not used with sqlite3.
+DATABASE_PASSWORD = ''         # Not used with sqlite3.
+DATABASE_HOST = ''             # Set to empty string for localhost. Not used with sqlite3.
+DATABASE_PORT = ''             # Set to empty string for default. Not used with sqlite3.
+
+INSTALLED_APPS = (
+    'django.contrib.auth',
+    'django.contrib.contenttypes',
+    'django.contrib.sessions',
+    'django.contrib.sites',
+    'celery',
+    'demoapp',
+    'twitterfollow',
+)
+
+TIME_ZONE = 'America/Chicago'
+LANGUAGE_CODE = 'en-us'
+SITE_ID = 1
+USE_I18N = True
+
+# Absolute path to the directory that holds media.
+# Example: "/home/media/media.lawrence.com/"
+MEDIA_ROOT = ''
+
+# URL that handles the media served from MEDIA_ROOT. Make sure to use a
+# trailing slash if there is a path component (optional in other cases).
+# Examples: "http://media.lawrence.com", "http://example.com/media/"
+MEDIA_URL = ''
+
+# URL prefix for admin media -- CSS, JavaScript and images. Make sure to use a
+# trailing slash.
+# Examples: "http://foo.com/media/", "/media/".
+ADMIN_MEDIA_PREFIX = '/media/'
+
+# Make this unique, and don't share it with anybody.
+SECRET_KEY = '!j@j8x^6hr&ue7#n1w@d8zr@#=xqb#br4tjcjy50wuv_5rs7r('
+
+# List of callables that know how to import templates from various sources.
+TEMPLATE_LOADERS = (
+    'django.template.loaders.filesystem.load_template_source',
+    'django.template.loaders.app_directories.load_template_source',
+#     'django.template.loaders.eggs.load_template_source',
+)
+
+MIDDLEWARE_CLASSES = (
+    'django.middleware.common.CommonMiddleware',
+    'django.contrib.sessions.middleware.SessionMiddleware',
+    'django.contrib.auth.middleware.AuthenticationMiddleware',
+)
+
+ROOT_URLCONF = 'demoproject.urls'
+
+TEMPLATE_DIRS = (
+    # Put strings here, like "/home/html/django_templates" or "C:/www/django/templates".
+    # Always use forward slashes, even on Windows.
+    # Don't forget to use absolute paths, not relative paths.
+)
+

+ 0 - 0
examples/django/demoproject/twitterfollow/__init__.py


+ 27 - 0
examples/django/demoproject/twitterfollow/models.py

@@ -0,0 +1,27 @@
+from django.db import models
+
+
+class User(models.Model):
+    userid = models.PositiveIntegerField(unique=True)
+    screen_name = models.CharField(max_length=200)
+    name = models.CharField(max_length=200)
+    description = models.CharField(max_length=200)
+    favourites_count = models.PositiveIntegerField(default=0)
+    followers_count = models.PositiveIntegerField(default=0)
+    location = models.CharField(max_length=200)
+    statuses_count = models.PositiveIntegerField(default=0)
+    url = models.URLField(verify_exists=False)
+
+
+class Status(models.Model):
+    status_id = models.PositiveIntegerField()
+    screen_name = models.CharField(max_length=200)
+    created_at = models.DateTimeField()
+    text = models.CharField(max_length=200)
+
+
+
+
+
+
+

+ 0 - 0
examples/django/demoproject/twitterfollow/tasks.py


+ 23 - 0
examples/django/demoproject/twitterfollow/tests.py

@@ -0,0 +1,23 @@
+"""
+This file demonstrates two different styles of tests (one doctest and one
+unittest). These will both pass when you run "manage.py test".
+
+Replace these with more appropriate tests for your application.
+"""
+
+from django.test import TestCase
+
+class SimpleTest(TestCase):
+    def test_basic_addition(self):
+        """
+        Tests that 1 + 1 always equals 2.
+        """
+        self.failUnlessEqual(1 + 1, 2)
+
+__test__ = {"doctest": """
+Another way to test that 1 + 1 is equal to 2.
+
+>>> 1 + 1 == 2
+True
+"""}
+

+ 1 - 0
examples/django/demoproject/twitterfollow/views.py

@@ -0,0 +1 @@
+# Create your views here.

+ 17 - 0
examples/django/demoproject/urls.py

@@ -0,0 +1,17 @@
+from django.conf.urls.defaults import *
+
+# Uncomment the next two lines to enable the admin:
+# from django.contrib import admin
+# admin.autodiscover()
+
+urlpatterns = patterns('',
+    # Example:
+    # (r'^demoproject/', include('demoproject.foo.urls')),
+
+    # Uncomment the admin/doc line below and add 'django.contrib.admindocs' 
+    # to INSTALLED_APPS to enable admin documentation:
+    # (r'^admin/doc/', include('django.contrib.admindocs.urls')),
+
+    # Uncomment the next line to enable the admin:
+    # (r'^admin/', include(admin.site.urls)),
+)

+ 0 - 0
examples/pythonproject/demoapp/__init__.py


BIN
examples/pythonproject/demoapp/celery.db


+ 12 - 0
examples/pythonproject/demoapp/celeryconfig.py

@@ -0,0 +1,12 @@
+import os
+import sys
+sys.path.insert(0, os.getcwd())
+
+DATABASE_ENGINE = "sqlite3"
+DATABASE_NAME = "celery.db"
+BROKER_HOST = "localhost"
+BROKER_USER = "guest"
+BROKER_PASSWORD = "guest"
+BROKER_VHOST = "/"
+CELERY_BACKEND = "amqp"
+CELERY_IMPORTS = ("tasks", )
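
With this config in place the worker is started from the same directory, so
that ``celeryconfig.py`` and ``tasks.py`` are importable, e.g. with
``celeryd --loglevel=INFO``.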

These changes were hidden because the diff is too large
+ 1478 - 0
examples/pythonproject/demoapp/erl_crash.dump


+ 11 - 0
examples/pythonproject/demoapp/tasks.py

@@ -0,0 +1,11 @@
+from celery.decorators import task
+
+
+@task()
+def add(x, y):
+    return x + y
+
+
+@task()
+def mul(x, y):
+    return x * y

+ 16 - 0
examples/pythonproject/demoapp/test.py

@@ -0,0 +1,16 @@
+from tasks import add
+
+
+print(">>> from tasks import add")
+print(">>> add(4, 4)")
+res = add(4, 4)
+print(repr(res))
+
+print(">>> add.delay(4, 4)")
+res = add.delay(4, 4)
+print(repr(res))
+
+print(">>> add.delay(4, 4).wait()")
+res = add.delay(4, 4).wait()
+print(repr(res))
+

Some files were not shown because too many files changed in this diff