
Merge branch 'master' into statistics

Conflicts:
	celery/bin/celeryd.py
Ask Solem, 16 years ago
commit dbfbd2fe3b
8 changed files with 119 additions and 54 deletions
  1. AUTHORS (+1 -0)
  2. README (+39 -7)
  3. celery/backends/base.py (+3 -0)
  4. celery/bin/celeryd.py (+2 -1)
  5. celery/fields.py (+8 -0)
  6. celery/log.py (+1 -1)
  7. celery/registry.py (+19 -18)
  8. celery/worker.py (+46 -27)

AUTHORS (+1 -0)

@@ -1,3 +1,4 @@
 Ask Solem <askh@opera.com>
 Grégoire Cachet <gregoire@audacy.fr>
 Vitaly Babiy <vbabiy86@gmail.com>
+Brian Rosner <brosner@gmail.com>

README (+39 -7)

@@ -81,12 +81,28 @@ To install using ``easy_install``,::
 
     $ easy_install celery
 
-If you have downloaded a source tarball you can install it
-by doing the following,::
+Downloading and installing from source
+--------------------------------------
 
+Download the latest version of ``celery`` from
+http://pypi.python.org/pypi/celery/
+
+You can install it by doing the following,::
+
+    $ tar xvfz celery-0.0.0.tar.gz
+    $ cd celery-0.0.0
     $ python setup.py build
     # python setup.py install # as root
 
+Using the development version
+------------------------------
+
+
+You can clone the repository by doing the following::
+
+    $ git clone git@github.com:ask/celery.git celery
+
+
 Usage
 =====
 
@@ -146,10 +162,10 @@ available, please consult the `API Reference`_
 ``celeryd`` will only be able to process one task at a time, this is
 because SQLite doesn't allow concurrent writes.
 
-Running the celery worker daemon
+Running the celery worker server
 --------------------------------
 
-To test this we'll be running the worker daemon in the foreground, so we can
+To test this we'll be running the worker server in the foreground, so we can
 see what's going on without consulting the logfile::
 
     $ python manage.py celeryd
@@ -158,10 +174,10 @@ see what's going on without consulting the logfile::
 However, in production you'll probably want to run the worker in the
 background as a daemon instead::
 
-    $ python manage.py celeryd --daemon
+    $ python manage.py celeryd --detach
 
 
-For help on command line arguments to the worker daemon, you can execute the
+For help on command line arguments to the worker server, you can execute the
 help command::
 
     $ python manage.py help celeryd
@@ -175,7 +191,7 @@ be defined in the python shell or ipython/bpython. This is because the celery
 worker server needs access to the task function to be able to run it.
 So while it looks like we use the python shell to define the tasks in these
 examples, you can't do it this way. Put them in the ``tasks`` module of your
-Django application. The worker daemon will automatically load any ``tasks.py``
+Django application. The worker server will automatically load any ``tasks.py``
 file for all of the applications listed in ``settings.INSTALLED_APPS``.
 Executing tasks using ``delay`` and ``apply_async`` can be done from the
 python shell, but keep in mind that since arguments are pickled, you can't
@@ -261,6 +277,22 @@ Here's an example of a periodic task:
 **Note:** Periodic tasks does not support arguments, as this doesn't
 really make sense.
 
+
+Bug tracker
+===========
+
+If you have any suggestions, bug reports or annoyances please report them
+to our issue tracker at http://github.com/ask/celery/issues/
+
+Contributing
+============
+
+Development of ``celery`` happens at Github: http://github.com/ask/celery
+
+You are highly encouraged to participate in the development
+of ``celery``. If you don't like Github (for some reason) you're welcome
+to send regular patches.
+
 License
 =======
 

celery/backends/base.py (+3 -0)

@@ -1,4 +1,6 @@
 """celery.backends.base"""
 """celery.backends.base"""
+import time
+
 from celery.timer import TimeoutTimer
 from celery.timer import TimeoutTimer
 try:
 try:
     import cPickle as pickle
     import cPickle as pickle
@@ -173,6 +175,7 @@ class BaseBackend(object):
                 return self.get_result(task_id)
             elif status == "FAILURE":
                 raise self.get_result(task_id)
+            time.sleep(0.5) # avoid hammering the CPU checking status.
             timeout_timer.tick()
 
     def process_cleanup(self):

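The ``time.sleep(0.5)`` added above throttles what was previously a busy-wait loop in ``BaseBackend.wait_for``. A minimal sketch of the polling pattern, assuming a backend that exposes ``get_status()``/``get_result()`` as in the hunk (everything else here is a hypothetical name, not the actual celery code)::

    import time

    def wait_for(backend, task_id, timeout=None, interval=0.5):
        """Poll a result backend until the task finishes or times out."""
        start = time.time()
        while True:
            status = backend.get_status(task_id)
            if status == "DONE":
                return backend.get_result(task_id)
            elif status == "FAILURE":
                # failed tasks store the exception as their result
                raise backend.get_result(task_id)
            # sleep between polls so we don't hammer the CPU (the fix above)
            time.sleep(interval)
            if timeout and time.time() - start > timeout:
                raise TimeoutError("The operation timed out.")
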
celery/bin/celeryd.py (+2 -1)

@@ -82,6 +82,7 @@ import optparse
 import atexit
 from daemon import DaemonContext
 from daemon.pidlockfile import PIDLockFile
+import errno
 
 USE_STATISTICS = getattr(settings, "CELERY_STATISTICS", False)
 settings.CELERY_STATISTICS = USE_STATISTICS
@@ -183,7 +184,7 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
         emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
                             e.__class__, e, traceback.format_exc()))
     except:
-        if context:
+        if daemon:
             context.close()
         raise
 

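Note on the ``if context:`` → ``if daemon:`` fix above: ``context`` is only created when ``celeryd`` detaches, so guarding the cleanup on ``daemon`` avoids referencing a variable that may never have been bound during foreground runs. A hedged sketch of that control flow (``start_worker`` is a hypothetical stand-in for the real startup code)::

    from daemon import DaemonContext
    from daemon.pidlockfile import PIDLockFile

    def run_worker(daemon=False, pidfile="celeryd.pid"):
        context = None
        if daemon:
            # only bound when detaching, hence the guard below
            context = DaemonContext(pidfile=PIDLockFile(pidfile))
            context.open()
        try:
            start_worker()  # hypothetical main entry point
        except:
            if daemon:
                context.close()  # close the daemon context before re-raising
            raise
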
celery/fields.py (+8 -0)

@@ -4,6 +4,7 @@ Custom Django Model Fields.
 
 """
 from django.db import models
+from django.conf import settings
 
 try:
     import cPickle as pickle
@@ -18,6 +19,13 @@ class PickledObject(str):
     pass
 
 
+if settings.DATABASE_ENGINE == "postgresql_psycopg2":
+    import psycopg2.extensions
+    # register PickledObject as a QuotedString otherwise we will see
+    # can't adapt errors from psycopg2.
+    psycopg2.extensions.register_adapter(PickledObject, psycopg2.extensions.QuotedString)
+
+
 class PickledObjectField(models.Field):
     """A field that automatically pickles/unpickles its value."""
     __metaclass__ = models.SubfieldBase

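The ``register_adapter`` hunk above teaches psycopg2 how to quote ``PickledObject`` (a ``str`` subclass), which otherwise fails with "can't adapt" errors when saving pickled fields on PostgreSQL. The mechanism in isolation, assuming psycopg2 is installed::

    import psycopg2.extensions

    class PickledObject(str):
        # marker subclass, as in celery/fields.py
        pass

    # QuotedString serializes the value as an escaped SQL string literal,
    # so psycopg2 no longer rejects the unknown type.
    psycopg2.extensions.register_adapter(
        PickledObject, psycopg2.extensions.QuotedString)
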
celery/log.py (+1 -1)

@@ -1,5 +1,4 @@
 """celery.log"""
 """celery.log"""
-import multiprocessing
 import os
 import os
 import sys
 import sys
 import time
 import time
@@ -14,6 +13,7 @@ def setup_logger(loglevel=DAEMON_LOG_LEVEL, logfile=None, format=LOG_FORMAT,
 
     Returns logger object.
     """
+    import multiprocessing
     logger = multiprocessing.get_logger()
     for handler in logger.handlers:
         if hasattr(handler, "close"):

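Moving ``import multiprocessing`` from module level into ``setup_logger`` defers the import until a logger is actually requested. The commit doesn't state the motivation, but the usual reason for a function-local import is to keep importing the module itself cheap and side-effect free; a minimal sketch of the shape::

    def setup_logger(loglevel=10, logfile=None):
        # deferred import: only pulled in when a logger is actually built
        import multiprocessing
        logger = multiprocessing.get_logger()
        logger.setLevel(loglevel)
        return logger
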
celery/registry.py (+19 -18)

@@ -24,13 +24,13 @@ class TaskRegistry(UserDict):
         """Autodiscovers tasks using :func:`celery.discovery.autodiscover`."""
         """Autodiscovers tasks using :func:`celery.discovery.autodiscover`."""
         discovery.autodiscover()
         discovery.autodiscover()
 
 
-    def register(self, task, task_name=None):
+    def register(self, task, name=None):
         """Register a task in the task registry.
         """Register a task in the task registry.
 
 
         Task can either be a regular function, or a class inheriting
         Task can either be a regular function, or a class inheriting
         from :class:`celery.task.Task`.
         from :class:`celery.task.Task`.
 
 
-        :keyword task_name: By default the :attr:`Task.name` attribute on the
+        :keyword name: By default the :attr:`Task.name` attribute on the
             task is used as the name of the task, but you can override it
             task is used as the name of the task, but you can override it
             using this option.
             using this option.
 
 
@@ -39,34 +39,35 @@ class TaskRegistry(UserDict):
         """
         """
         is_class = hasattr(task, "run")
         is_class = hasattr(task, "run")
 
 
-        if not task_name:
-            task_name = getattr(task, "name")
+        if not name:
+            name = getattr(task, "name")
 
 
-        if task_name in self.data:
+        if name in self.data:
             raise self.AlreadyRegistered(
             raise self.AlreadyRegistered(
-                    "Task with name %s is already registered." % task_name)
+                    "Task with name %s is already registered." % name)
 
 
         if is_class:
         if is_class:
-            self.data[task_name] = task() # instantiate Task class
+            self.data[name] = task() # instantiate Task class
         else:
         else:
-            task.name = task_name
+            task.name = name
             task.type = "regular"
             task.type = "regular"
-            self.data[task_name] = task
+            self.data[name] = task
 
 
-    def unregister(self, task_name):
+    def unregister(self, name):
         """Unregister task by name.
         """Unregister task by name.
 
 
-        :param task_name: name of the task to unregister.
+        :param name: name of the task to unregister, or a
+        :class:`celery.task.Task` class with a valid ``name`` attribute.
 
 
         :raises NotRegistered: if the task has not been registered.
         :raises NotRegistered: if the task has not been registered.
 
 
         """
         """
-        if hasattr(task_name, "run"):
-            task_name = task_name.name
-        if task_name not in self.data:
+        if hasattr(name, "run"):
+            name = name.name
+        if name not in self.data:
             raise self.NotRegistered(
             raise self.NotRegistered(
-                    "Task with name %s is not registered." % task_name)
-        del self.data[task_name]
+                    "Task with name %s is not registered." % name)
+        del self.data[name]
 
 
     def get_all(self):
     def get_all(self):
         """Get all task types."""
         """Get all task types."""
@@ -86,9 +87,9 @@ class TaskRegistry(UserDict):
         """Get all periodic task types."""
         """Get all periodic task types."""
         return self.filter_types(type="periodic")
         return self.filter_types(type="periodic")
 
 
-    def get_task(self, task_name):
+    def get_task(self, name):
         """Get task by name."""
         """Get task by name."""
-        return self.data[task_name]
+        return self.data[name]
 
 
 """
 """
 .. data:: tasks
 .. data:: tasks

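With the keyword renamed from ``task_name`` to ``name``, registering and looking up tasks goes roughly as below. This is a usage sketch, assuming ``tasks`` is the module-level ``TaskRegistry`` instance documented in the ``.. data:: tasks`` block above::

    from celery.registry import tasks

    def add(x, y):
        return x + y

    tasks.register(add, name="math.add")  # keyword was ``task_name`` before
    assert tasks.get_task("math.add") is add

    # unregister() accepts the name or, per the updated docstring, a task
    # class with a valid ``name`` attribute.
    tasks.unregister("math.add")
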
celery/worker.py (+46 -27)

@@ -281,12 +281,24 @@ class PeriodicWorkController(threading.Thread):
         >>> PeriodicWorkController().start()
 
     """
-
+    
+    def __init__(self):
+        super(PeriodicWorkController, self).__init__()
+        self._shutdown = threading.Event()
+        self._stopped = threading.Event()
+    
     def run(self):
         """Don't use :meth:`run`. use :meth:`start`."""
         while True:
+            if self._shutdown.isSet():
+                break
             default_periodic_status_backend.run_periodic_tasks()
             time.sleep(1)
+        self._stopped.set() # indicate that we are stopped
+    
+    def stop(self):
+        self._shutdown.set()
+        self._stopped.wait() # block until this thread is done
 
 
 class WorkController(object):
@@ -355,6 +367,7 @@ class WorkController(object):
                                     self.queue_wakeup_after
         self.logger = setup_logger(loglevel, logfile)
         self.pool = TaskPool(self.concurrency, logger=self.logger)
+        self.periodicworkcontroller = PeriodicWorkController()
         self.task_consumer = None
         self.task_consumer_it = None
         self.is_detached = is_detached
@@ -426,15 +439,19 @@ class WorkController(object):
     def schedule_retry_tasks(self):
         """Reschedule all requeued tasks waiting for retry."""
         pass
-
+    
+    def shutdown(self):
+        # shut down the periodic work controller thread
+        self.periodicworkcontroller.stop()
+        self.pool.terminate()
 
     def run(self):
         """Starts the workers main loop."""
-        log_wait = lambda: self.logger.info("Waiting for queue...")
+        log_wait = lambda: self.logger.debug("Waiting for queue...")
         ev_msg_waiting = EventTimer(log_wait, self.empty_msg_emit_every)
 
         self.pool.run()
-        PeriodicWorkController().start()
+        self.periodicworkcontroller.start()
 
         # If not running as daemon, and DEBUG logging level is enabled,
         # print pool PIDs and sleep for a second before we start.
@@ -443,26 +460,28 @@ class WorkController(object):
                 "|".join(map(str, self.pool.get_worker_pids()))))
                 "|".join(map(str, self.pool.get_worker_pids()))))
             if not self.is_detached:
             if not self.is_detached:
                 time.sleep(1)
                 time.sleep(1)
-
-        while True:
-            try:
-                self.execute_next_task()
-            except ValueError:
-                # execute_next_task didn't return a r/name/id tuple,
-                # probably because it got an exception.
-                continue
-            except EmptyQueue:
-                ev_msg_waiting.tick()
-                time.sleep(self.queue_wakeup_after)
-                continue
-            except UnknownTask, exc:
-                self.logger.info("Unknown task ignored: %s" % (exc))
-                continue
-            except Exception, exc:
-                self.logger.critical("Message queue raised %s: %s\n%s" % (
-                             exc.__class__, exc, traceback.format_exc()))
-                continue
-            except:
-                self.pool.terminate()
-                raise
-
+        
+        try:
+            while True:
+                try:
+                    self.execute_next_task()
+                except ValueError:
+                    # execute_next_task didn't return a r/name/id tuple,
+                    # probably because it got an exception.
+                    continue
+                except EmptyQueue:
+                    ev_msg_waiting.tick()
+                    time.sleep(self.queue_wakeup_after)
+                    continue
+                except UnknownTask, exc:
+                    self.logger.info("Unknown task ignored: %s" % (exc))
+                    continue
+                except Exception, exc:
+                    self.logger.critical("Message queue raised %s: %s\n%s" % (
+                                 exc.__class__, exc, traceback.format_exc()))
+                    continue
+                except:
+                    self.shutdown()
+                    raise
+        except (SystemExit, KeyboardInterrupt):
+            self.shutdown()
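
The rewritten ``PeriodicWorkController`` above replaces a fire-and-forget loop with a two-event shutdown handshake, and ``WorkController.run`` now funnels ``SystemExit``/``KeyboardInterrupt`` through ``shutdown()`` so the thread and the pool stop cleanly. The threading pattern in isolation (``do_periodic_work`` is a hypothetical stand-in for ``run_periodic_tasks``)::

    import threading
    import time

    class StoppableThread(threading.Thread):

        def __init__(self):
            super(StoppableThread, self).__init__()
            self._shutdown = threading.Event()  # set by stop() to request exit
            self._stopped = threading.Event()   # set by run() to confirm exit

        def run(self):
            while not self._shutdown.is_set():
                do_periodic_work()  # hypothetical periodic work
                time.sleep(1)
            self._stopped.set()  # confirm the loop has exited

        def stop(self):
            self._shutdown.set()
            self._stopped.wait()  # block until run() has actually returned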