Ask Solem 14 年之前
父節點
當前提交
4fbdffdfbc

+ 8 - 55
FAQ

@@ -367,65 +367,18 @@ Results can also be disabled globally using the
 Can I use Celery with ActiveMQ/STOMP?
 -------------------------------------
 
-**Answer**: Yes, but this is somewhat experimental for now.
-It is working ok in a test configuration, but it has not
-been tested in production. If you have any problems
-using STOMP with Celery, please report an issue here::
+**Answer**: No.  It used to be supported by Carrot,
+but is not currently supported in Kombu.
 
-    http://github.com/ask/celery/issues/
+.. _faq-non-amqp-missing-features:
 
-The STOMP carrot backend requires the `stompy`_ library::
-
-    $ pip install stompy
-    $ cd python-stomp
-    $ sudo python setup.py install
-    $ cd ..
-
-.. _`stompy`: http://pypi.python.org/pypi/stompy
-
-In this example we will use a queue called `celery` which we created in
-the ActiveMQ web admin interface.
-
-**Note**: When using ActiveMQ the queue name needs to have `"/queue/"`
-prepended to it. i.e. the queue `celery` becomes `/queue/celery`.
-
-Since STOMP doesn't have exchanges and the routing capabilities of AMQP,
-you need to set `exchange` name to the same as the queue name. This is
-a minor inconvenience since carrot needs to maintain the same interface
-for both AMQP and STOMP.
-
-Use the following settings in your :file:`celeryconfig.py`/
-django :file:`settings.py`:
-
-.. code-block:: python
-
-    # Use the stomp carrot backend.
-    CARROT_BACKEND = "stomp"
-
-    # STOMP hostname and port settings.
-    BROKER_HOST = "localhost"
-    BROKER_PORT = 61613
-
-    # The queue name to use (the exchange *must* be set to the
-    # same as the queue name when using STOMP)
-    CELERY_DEFAULT_QUEUE = "/queue/celery"
-    CELERY_DEFAULT_EXCHANGE = "/queue/celery" 
-
-    CELERY_QUEUES = {
-        "/queue/celery": {"exchange": "/queue/celery"}
-    }
-
-.. _faq-stomp-missing-features:
-
-What features are not supported when using ghettoq/STOMP?
----------------------------------------------------------
-
-This is a (possible incomplete) list of features not available when
-using the STOMP backend:
+What features are not supported when not using an AMQP broker?
+--------------------------------------------------------------
 
-    * routing keys
+This is an incomplete list of features not available when
+using the virtual transports:
 
-    * exchange types (direct, topic, headers, etc)
+    * The `header` exchange type.
 
     * immediate
 

+ 1 - 1
celery/apps/worker.py

@@ -6,7 +6,7 @@ import socket
 import sys
 import warnings
 
-from carrot.utils import partition
+from kombu.utils import partition
 
 from celery import __version__
 from celery import platforms

+ 2 - 2
celery/bin/camqadm.py

@@ -131,8 +131,8 @@ class AMQShell(cmd.Cmd):
     :keyword connect: Function used to connect to the server, must return
         connection object.
 
-    :keyword silent: If :const:`True`, the commands won't have annoying output not
-        relevant when running in non-shell mode.
+    :keyword silent: If :const:`True`, the commands won't have annoying
+                     output not relevant when running in non-shell mode.
 
 
     .. attribute: builtins

+ 1 - 1
celery/bin/celeryctl.py

@@ -177,7 +177,7 @@ class inspect(Command):
                "disable_events": 1.0,
                "ping": 0.2,
                "add_consumer": 1.0,
-               "cancel_consumer": 1.0,}
+               "cancel_consumer": 1.0}
     option_list = Command.option_list + (
                 Option("--timeout", "-t", type="float", dest="timeout",
                     default=None,

+ 0 - 1
celery/concurrency/processes/pool.py

@@ -108,7 +108,6 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
     if SIG_SOFT_TIMEOUT is not None:
         signal.signal(SIG_SOFT_TIMEOUT, soft_timeout_sighandler)
 
-
     completed = 0
     while maxtasks is None or (maxtasks and completed < maxtasks):
         try:

+ 2 - 2
celery/events/__init__.py

@@ -28,7 +28,7 @@ def Event(type, **fields):
 class EventDispatcher(object):
     """Send events as messages.
 
-    :param connection: Carrot connection.
+    :param connection: Connection to the broker.
 
     :keyword hostname: Hostname to identify ourselves as,
         by default uses the hostname returned by :func:`socket.gethostname`.
@@ -101,7 +101,7 @@ class EventDispatcher(object):
 class EventReceiver(object):
     """Capture events.
 
-    :param connection: Carrot connection.
+    :param connection: Connection to the broker.
     :keyword handlers: Event handlers.
 
     :attr:`handlers` is a dict of event types and their handlers,

+ 3 - 3
celery/loaders/base.py

@@ -114,11 +114,11 @@ class BaseLoader(object):
         return dict(map(getarg, args))
 
     def mail_admins(self, subject, message, fail_silently=False,
-            sender=None, to=None, host=None, port=None, 
+            sender=None, to=None, host=None, port=None,
             user=None, password=None):
         from celery.utils import mail
-        message = mail.Message(sender=sender,
-                               to=to, subject=subject, body=message)
+        message = mail.Message(sender=sender, to=to,
+                               subject=subject, body=message)
         mailer = mail.Mailer(host, port, user, password)
         mailer.send(message, fail_silently=fail_silently)
 

+ 3 - 2
celery/log.py

@@ -35,14 +35,15 @@ class ColorFormatter(logging.Formatter):
     def formatException(self, ei):
         r = logging.Formatter.formatException(self, ei)
         if type(r) in [types.StringType]:
-            r = r.decode("utf-8", "replace") # Convert to unicode
+            r = r.decode("utf-8", "replace")    # Convert to unicode
         return r
 
     def format(self, record):
         levelname = record.levelname
 
         if self.use_color and levelname in COLORS:
-            record.msg = unicode(colored().names[COLORS[levelname]](record.msg))
+            record.msg = unicode(colored().names[COLORS[levelname]](
+                            record.msg))
 
         # Very ugly, but have to make sure processName is supported
         # by foreign logger instances.

+ 2 - 1
celery/result.py

@@ -107,7 +107,8 @@ class BaseAsyncResult(object):
     @property
     def result(self):
         """When the task has been executed, this contains the return value.
-        If the task raised an exception, this will be the exception instance."""
+        If the task raised an exception, this will be the exception
+        instance."""
         return self.backend.get_result(self.task_id)
 
     @property

+ 1 - 1
celery/schedules.py

@@ -54,7 +54,7 @@ class schedule(object):
 
 class crontab_parser(object):
     """Parser for crontab expressions. Any expression of the form 'groups'
-    (see BNF grammar below) is accepted and expanded to a set of numbers. 
+    (see BNF grammar below) is accepted and expanded to a set of numbers.
     These numbers represent the units of time that the crontab needs to
     run on::
 

+ 0 - 1
celery/tests/test_routes.py

@@ -95,7 +95,6 @@ class test_lookup_route(unittest.TestCase):
                                        route)
         self.assertNotIn("queue", route)
 
-
     @with_queues(foo=a_queue, bar=b_queue)
     def test_lookup_paths_traversed(self):
         R = routes.prepare(({"celery.xaza": {"queue": "bar"}},

+ 7 - 8
celery/tests/test_task.py

@@ -761,7 +761,7 @@ class test_crontab_is_due(unittest.TestCase):
     def test_every_hour_execution_is_due(self):
         due, remaining = HourlyPeriodic().is_due(datetime(2010, 5, 10, 6, 30))
         self.assertTrue(due)
-        self.assertEquals(remaining, 60*60)
+        self.assertEquals(remaining, 60 * 60)
 
     @patch_crontab_nowfun(HourlyPeriodic, datetime(2010, 5, 10, 10, 29))
     def test_every_hour_execution_is_not_due(self):
@@ -774,14 +774,14 @@ class test_crontab_is_due(unittest.TestCase):
         due, remaining = QuarterlyPeriodic().is_due(
                             datetime(2010, 5, 10, 6, 30))
         self.assertTrue(due)
-        self.assertEquals(remaining, 15*60)
+        self.assertEquals(remaining, 15 * 60)
 
     @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 30))
     def test_second_quarter_execution_is_due(self):
         due, remaining = QuarterlyPeriodic().is_due(
                             datetime(2010, 5, 10, 6, 30))
         self.assertTrue(due)
-        self.assertEquals(remaining, 15*60)
+        self.assertEquals(remaining, 15 * 60)
 
     @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 14))
     def test_first_quarter_execution_is_not_due(self):
@@ -801,23 +801,22 @@ class test_crontab_is_due(unittest.TestCase):
     def test_daily_execution_is_due(self):
         due, remaining = DailyPeriodic().is_due(datetime(2010, 5, 9, 7, 30))
         self.assertTrue(due)
-        self.assertEquals(remaining, 24*60*60)
+        self.assertEquals(remaining, 24 * 60 * 60)
 
     @patch_crontab_nowfun(DailyPeriodic, datetime(2010, 5, 10, 10, 30))
     def test_daily_execution_is_not_due(self):
         due, remaining = DailyPeriodic().is_due(datetime(2010, 5, 10, 7, 30))
         self.assertFalse(due)
-        self.assertEquals(remaining, 21*60*60)
+        self.assertEquals(remaining, 21 * 60 * 60)
 
     @patch_crontab_nowfun(WeeklyPeriodic, datetime(2010, 5, 6, 7, 30))
     def test_weekly_execution_is_due(self):
         due, remaining = WeeklyPeriodic().is_due(datetime(2010, 4, 30, 7, 30))
         self.assertTrue(due)
-        self.assertEquals(remaining, 7*24*60*60)
+        self.assertEquals(remaining, 7 * 24 * 60 * 60)
 
     @patch_crontab_nowfun(WeeklyPeriodic, datetime(2010, 5, 7, 10, 30))
     def test_weekly_execution_is_not_due(self):
         due, remaining = WeeklyPeriodic().is_due(datetime(2010, 5, 6, 7, 30))
         self.assertFalse(due)
-        self.assertEquals(remaining, 6*24*60*60 - 3*60*60)
-
+        self.assertEquals(remaining, 6 * 24 * 60 * 60 - 3 * 60 * 60)

+ 0 - 1
celery/tests/test_task_control.py

@@ -26,7 +26,6 @@ class Control(control.Control):
     Mailbox = MockMailbox
 
 
-
 def with_mock_broadcast(fun):
 
     @wraps(fun)

+ 0 - 1
celery/utils/__init__.py

@@ -320,7 +320,6 @@ def textindent(t, indent=0):
         return "\n".join(" " * indent + p for p in t.split("\n"))
 
 
-
 def import_from_cwd(module, imp=None):
     """Import module, but make sure it finds modules
     located in the current directory.

+ 3 - 2
celery/worker/consumer.py

@@ -214,7 +214,7 @@ class Consumer(object):
         pidbox_state = AttributeDict(app=self.app,
                                      logger=logger,
                                      hostname=self.hostname,
-                                     listener=self, # pre 2.2
+                                     listener=self,     # pre 2.2
                                      consumer=self)
         self.pidbox_node = self.app.control.mailbox.Node(self.hostname,
                                                          state=pidbox_state,
@@ -279,7 +279,8 @@ class Consumer(object):
             except OverflowError, exc:
                 self.logger.error(
                     "Couldn't convert eta %s to timestamp: %r. Task: %r" % (
-                        task.eta, exc, task.info(safe=True)), exc_info=sys.exc_info())
+                        task.eta, exc, task.info(safe=True)),
+                    exc_info=sys.exc_info())
                 task.acknowledge()
             else:
                 self.qos.increment()

+ 1 - 0
celery/worker/control/builtins.py

@@ -172,6 +172,7 @@ def pool_grow(panel, n=1, **kwargs):
     panel.listener.pool.grow(n)
     return {"ok": "spawned worker processes"}
 
+
 @Panel.register
 def pool_shrink(panel, n=1, **kwargs):
     panel.listener.pool.shrink(n)

+ 0 - 1
celery/worker/job.py

@@ -553,4 +553,3 @@ class TaskRequest(object):
                    "Task %s[%s] has already been executed" % (
                        self.task_name, self.task_id))
         self.executed = True
-

+ 1 - 1
contrib/requirements/default.txt

@@ -1,5 +1,5 @@
 python-dateutil
 sqlalchemy
 anyjson
-kombu>=0.9.0
+kombu>=0.9.1
 pyparsing

+ 12 - 12
docs/_ext/celerydocs.py

@@ -1,21 +1,21 @@
 def setup(app):
     app.add_crossref_type(
-        directivename = "setting",
-        rolename      = "setting",
-        indextemplate = "pair: %s; setting",
+        directivename="setting",
+        rolename="setting",
+        indextemplate="pair: %s; setting",
     )
     app.add_crossref_type(
-        directivename = "sig",
-        rolename      = "sig",
-        indextemplate = "pair: %s; sig",
+        directivename="sig",
+        rolename="sig",
+        indextemplate="pair: %s; sig",
     )
     app.add_crossref_type(
-        directivename = "state",
-        rolename      = "state",
-        indextemplate = "pair: %s; state",
+        directivename="state",
+        rolename="state",
+        indextemplate="pair: %s; state",
     )
     app.add_crossref_type(
-        directivename = "control",
-        rolename      = "control",
-        indextemplate = "pair: %s; control",
+        directivename="control",
+        rolename="control",
+        indextemplate="pair: %s; control",
     )

+ 1 - 2
docs/userguide/executing.rst

@@ -123,8 +123,7 @@ The default serializer is :mod:`pickle`, but you can
 change this globally or for each individual task.
 There is built-in support for :mod:`pickle`, `JSON`, `YAML`
 and `msgpack`, and you can also add your own custom serializers by registering
-them into the Carrot serializer registry (see
-`Kombu: Serialization of Data`_).
+them into the Kombu serializer registry (see `Kombu: Serialization of Data`_).
 
 .. _`Kombu: Serialization of Data`:
     http://packages.python.org/kombu/introduction.html#serialization-of-data

+ 1 - 0
examples/app/myapp.py

@@ -16,6 +16,7 @@ from celery import Celery
 celery = Celery("myapp")
 celery.conf.update(BROKER_HOST="localhost")
 
+
 @celery.task(accept_magic_kwargs=False)
 def add(x, y, **kwargs):
     print("add id: %r %r %r" % (add.request.id, add.request.args,

+ 0 - 110
examples/ghetto-queue/README.rst

@@ -1,110 +0,0 @@
-=======================================================
- Example Celery project using a database message queue
-=======================================================
-
-Short instructions
-==================
-
-Quick rundown of the tutorial::
-
-1. Install the `ghettoq`_ plugin.
-
-    $ pip install ghettoq
-
-    $ celeryinit
-
-2. Open up two terminals. In the first, run:
-
-    $ celeryd --loglevel=INFO
-
-  In the second you run the test program:
-
-    $ python ./test.py
-
-Voila, you've executed some tasks!
-
-Instructions
-============
-
-This example uses the database as a message queue (commonly called a "ghetto
-queue"). Excellent for testing, but not suitable for production
-installations.
-
-To try it out you have to install the `GhettoQ`_ package first::
-
-    $ pip install ghettoq
-
-This package is an add-on to `Carrot`_; the messaging abstraction celery
-uses. The add-on enables the use of databases as message queues. Currently it
-supports `Redis`_ and relational databases via the Django ORM.
-
-.. _`ghettoq`: http://pypi.python.org/pypi/ghettoq
-.. _`Carrot`: http://pypi.python.org/pypi/carrot
-.. _`Redis`: http://code.google.com/p/redis/
-
-
-The provided `celeryconfig.py` configures the settings used to drive celery.
-
-Next we have to create the database tables by issuing the `celeryinit`
-command::
-
-    $ celeryinit
-
-We're using SQLite3, so this creates a database file (`celery.db` as
-specified in the config file). SQLite is great, but when used in combination
-with Django it doesn't handle concurrency well. To protect your program from
-lock problems, celeryd will only spawn one worker process. With
-other database drivers you can specify as many worker processes as you want.
-
-
-With the setup done, let's run the worker::
-
-    $ celeryd --loglevel=INFO
-
-
-You should see the worker starting up. As it will continue running in
-the foreground, we have to open up another terminal to run our test program::
-
-    $ python test.py
-
-
-The test program simply runs the `add` task, which is a simple task adding
-numbers. You can also run the task manually if you want::
-
-    >>> from tasks import add
-    >>> result = add.delay(4, 4)
-    >>> result.wait()
-    8
-
-Using Redis instead
-===================
-
-To use redis instead, you have to configure the following directives in 
-`celeryconfig.py`::
-
-    CARROT_BACKEND = "ghettoq.taproot.Redis"
-    BROKER_HOST = "localhost"
-    BROKER_PORT = 6379
-
-Modules
-=======
-
-    * celeryconfig.py
-
-        The celery configuration module.
-
-    * tasks.py
-
-        Tasks are defined in this module. This module is automatically
-        imported by the worker because it's listed in
-        celeryconfig's `CELERY_IMPORTS` directive.
-
-    * test.py
-
-        Simple test program running tasks.
-
-
-More information
-================
-
-http://celeryproject.org

+ 0 - 0
examples/ghetto-queue/__init__.py


+ 0 - 21
examples/ghetto-queue/celeryconfig.py

@@ -1,21 +0,0 @@
-import os
-import sys
-sys.path.insert(0, os.getcwd())
-
-DATABASE_ENGINE = "sqlite3"
-DATABASE_NAME = "celery.db"
-
-# or for Redis use ghettoq.taproot.Redis
-CARROT_BACKEND = "ghettoq.taproot.Database"
-
-# not needed for database.
-# BROKER_HOST = "localhost"
-# BROKER_USER = "guest"
-# BROKER_PASSWORD = "guest"
-# BROKER_VHOST = "/"
-
-# Need to add ghettoq when using the database backend, so
-# the database tables are created at syncdb.
-INSTALLED_APPS = ("celery", "ghettoq")
-
-CELERY_IMPORTS = ("tasks", )

+ 0 - 11
examples/ghetto-queue/tasks.py

@@ -1,11 +0,0 @@
-from celery.decorators import task
-
-
-@task()
-def add(x, y):
-    return x + y
-
-
-@task()
-def mul(x, y):
-    return x * y

+ 0 - 15
examples/ghetto-queue/test.py

@@ -1,15 +0,0 @@
-from tasks import add
-
-
-print(">>> from tasks import add")
-print(">>> add(4, 4)")
-res = add(4, 4)
-print(repr(res))
-
-print(">>> add.delay(4, 4)")
-res = add.delay(4, 4)
-print(repr(res))
-
-print(">>> add.delay(4, 4).wait()")
-res = add.delay(4, 4).wait()
-print(repr(res))

+ 1 - 1
funtests/config.py

@@ -1,7 +1,7 @@
 import atexit
 import os
 
-CARROT_BACKEND = os.environ.get("CARROT_BACKEND") or "amqplib"
+BROKER_BACKEND = os.environ.get("BROKER_BACKEND") or "amqplib"
 
 BROKER_HOST = os.environ.get("BROKER_HOST") or "localhost"
 BROKER_USER = os.environ.get("BROKER_USER") or "guest"

+ 1 - 1
setup.cfg

@@ -44,6 +44,6 @@ requires = uuid
            multiprocessing == 2.6.2.1
            python-dateutil
            anyjson
-           carrot >= 0.10.5
+           kombu >= 0.9.1
            SQLAlchemy
            pyparsing

+ 1 - 1
setup.py

@@ -129,7 +129,7 @@ except ImportError:
 install_requires.extend([
     "python-dateutil",
     "anyjson",
-    "kombu>=0.9.0",
+    "kombu>=0.9.1",
     "pyparsing",
 ])