Ask Solem 15 年之前
父节点
当前提交
a3c2c539ef

+ 2 - 1
celery/backends/base.py

@@ -181,7 +181,8 @@ class BaseDictBackend(BaseBackend):
         self._cache[task_id] = self.get_task_meta(task_id, cache=False)
         self._cache[task_id] = self.get_task_meta(task_id, cache=False)
 
 
     def reload_taskset_result(self, taskset_id):
     def reload_taskset_result(self, taskset_id):
-        self._cache[taskset_id] = self.get_taskset_meta(taskset_id, cache=False)
+        self._cache[taskset_id] = self.get_taskset_meta(taskset_id,
+                                                        cache=False)
 
 
     def get_taskset_meta(self, taskset_id, cache=True):
     def get_taskset_meta(self, taskset_id, cache=True):
         if cache and taskset_id in self._cache:
         if cache and taskset_id in self._cache:

+ 6 - 18
celery/bin/camqadm.py

@@ -3,18 +3,11 @@
 
 
 .. program:: camqadm
 .. program:: camqadm
 
 
-.. cmdoption:: -X, --x
-
-    Description
-
-
 """
 """
-import os
 import cmd
 import cmd
 import sys
 import sys
 import shlex
 import shlex
 import pprint
 import pprint
-import readline
 import optparse
 import optparse
 from itertools import count
 from itertools import count
 
 
@@ -25,7 +18,6 @@ from celery.utils import info
 from celery.utils import padlist
 from celery.utils import padlist
 from celery.messaging import establish_connection
 from celery.messaging import establish_connection
 
 
-
 # Valid string -> bool coercions.
 # Valid string -> bool coercions.
 BOOLS = {"1": True, "0": False,
 BOOLS = {"1": True, "0": False,
          "on": True, "off": False,
          "on": True, "off": False,
@@ -35,12 +27,7 @@ BOOLS = {"1": True, "0": False,
 # Map to coerce strings to other types.
 # Map to coerce strings to other types.
 COERCE = {bool: lambda value: BOOLS[value.lower()]}
 COERCE = {bool: lambda value: BOOLS[value.lower()]}
 
 
-OPTION_LIST = (
-    #optparse.make_option('-c', '--concurrency',
-    #    default=conf.CELERYD_CONCURRENCY,
-    #        action="store", dest="concurrency", type="int",
-    #        help="Number of child processes processing the queue."),
-)
+OPTION_LIST = ()
 
 
 HELP_HEADER = """
 HELP_HEADER = """
 Commands
 Commands
@@ -134,6 +121,7 @@ def dump_message(message):
             "properties": message.properties,
             "properties": message.properties,
             "delivery_info": message.delivery_info}
             "delivery_info": message.delivery_info}
 
 
+
 def format_declare_queue(ret):
 def format_declare_queue(ret):
     return "ok. queue:%s messages:%s consumers:%s." % ret
     return "ok. queue:%s messages:%s consumers:%s." % ret
 
 
@@ -165,9 +153,9 @@ class AMQShell(cmd.Cmd):
     counter = 1
     counter = 1
     inc_counter = count(2).next
     inc_counter = count(2).next
 
 
-    builtins = {"exit": "do_exit",
-                "EOF": "do_exit",
-                "help": "do_help",}
+    builtins = {"EOF": "do_exit",
+                "exit": "do_exit",
+                "help": "do_help"}
 
 
     amqp = {
     amqp = {
         "exchange.declare": Spec(("exchange", str),
         "exchange.declare": Spec(("exchange", str),
@@ -381,7 +369,7 @@ def camqadm(*args, **options):
 
 
 def main():
 def main():
     options, values = parse_options(sys.argv[1:])
     options, values = parse_options(sys.argv[1:])
-    return run_worker(*values, **vars(options))
+    return camqadm(*values, **vars(options))
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
     main()
     main()

+ 2 - 1
celery/task/base.py

@@ -211,7 +211,8 @@ class Task(object):
 
 
     @classmethod
     @classmethod
     def get_publisher(self, connection=None, exchange=None,
     def get_publisher(self, connection=None, exchange=None,
-            connect_timeout=conf.BROKER_CONNECTION_TIMEOUT,exchange_type=None):
+            connect_timeout=conf.BROKER_CONNECTION_TIMEOUT,
+            exchange_type=None):
         """Get a celery task message publisher.
         """Get a celery task message publisher.
 
 
         :rtype: :class:`celery.messaging.TaskPublisher`.
         :rtype: :class:`celery.messaging.TaskPublisher`.

+ 0 - 1
celery/tests/test_backends/test_amqp.py

@@ -1,5 +1,4 @@
 import sys
 import sys
-import errno
 import unittest2 as unittest
 import unittest2 as unittest
 
 
 from celery.exceptions import ImproperlyConfigured
 from celery.exceptions import ImproperlyConfigured

+ 0 - 1
celery/tests/test_backends/test_redis.py

@@ -1,5 +1,4 @@
 import sys
 import sys
-import errno
 import socket
 import socket
 import unittest2 as unittest
 import unittest2 as unittest
 
 

+ 0 - 1
celery/tests/test_backends/test_tyrant.py

@@ -1,5 +1,4 @@
 import sys
 import sys
-import errno
 import socket
 import socket
 import unittest2 as unittest
 import unittest2 as unittest
 
 

+ 2 - 6
celery/tests/test_loaders.py

@@ -2,8 +2,6 @@ import os
 import sys
 import sys
 import unittest2 as unittest
 import unittest2 as unittest
 
 
-from billiard.utils.functional import wraps
-
 from celery import loaders
 from celery import loaders
 from celery.loaders import base
 from celery.loaders import base
 from celery.loaders import djangoapp
 from celery.loaders import djangoapp
@@ -53,7 +51,6 @@ class TestLoaderBase(unittest.TestCase):
         self.loader.on_worker_init()
         self.loader.on_worker_init()
 
 
     def test_import_task_module(self):
     def test_import_task_module(self):
-        import sys
         self.assertEqual(sys, self.loader.import_task_module("sys"))
         self.assertEqual(sys, self.loader.import_task_module("sys"))
 
 
     def test_conf_property(self):
     def test_conf_property(self):
@@ -62,9 +59,8 @@ class TestLoaderBase(unittest.TestCase):
         self.assertEqual(self.loader.conf.foo, "bar")
         self.assertEqual(self.loader.conf.foo, "bar")
 
 
     def test_import_default_modules(self):
     def test_import_default_modules(self):
-        import os
-        import sys
-        self.assertSameElements(self.loader.import_default_modules(), [os, sys])
+        self.assertSameElements(self.loader.import_default_modules(),
+                                [os, sys])
 
 
 
 
 class TestDjangoLoader(unittest.TestCase):
 class TestDjangoLoader(unittest.TestCase):

+ 0 - 1
celery/tests/test_task_http.py

@@ -1,7 +1,6 @@
 # -*- coding: utf-8 -*-
 # -*- coding: utf-8 -*-
 from __future__ import generators
 from __future__ import generators
 
 
-import sys
 import logging
 import logging
 import unittest2 as unittest
 import unittest2 as unittest
 from urllib import addinfourl
 from urllib import addinfourl

+ 0 - 2
celery/tests/test_utils.py

@@ -2,8 +2,6 @@ import sys
 import socket
 import socket
 import unittest2 as unittest
 import unittest2 as unittest
 
 
-from billiard.utils.functional import wraps
-
 from celery import utils
 from celery import utils
 
 
 from celery.tests.utils import sleepdeprived, execute_context
 from celery.tests.utils import sleepdeprived, execute_context

+ 1 - 3
celery/tests/test_views.py

@@ -1,8 +1,6 @@
 import sys
 import sys
-import unittest
 
 
 from django.http import HttpResponse
 from django.http import HttpResponse
-from django.test.client import Client
 from django.test.testcases import TestCase as DjangoTestCase
 from django.test.testcases import TestCase as DjangoTestCase
 from django.core.urlresolvers import reverse
 from django.core.urlresolvers import reverse
 from django.template import TemplateDoesNotExist
 from django.template import TemplateDoesNotExist
@@ -59,7 +57,7 @@ class TestTaskApply(ViewTestCase):
     def test_apply(self):
     def test_apply(self):
         conf.ALWAYS_EAGER = True
         conf.ALWAYS_EAGER = True
         try:
         try:
-            ret = self.client.get(task_apply(kwargs={"task_name":
+            self.client.get(task_apply(kwargs={"task_name":
                 mytask.name}) + "?x=4&y=4")
                 mytask.name}) + "?x=4&y=4")
             self.assertEqual(scratch["result"], 16)
             self.assertEqual(scratch["result"], 16)
         finally:
         finally:

+ 2 - 2
celery/tests/test_worker.py

@@ -8,9 +8,9 @@ from carrot.backends.base import BaseMessage
 from billiard.serialization import pickle
 from billiard.serialization import pickle
 
 
 from celery import conf
 from celery import conf
-from celery.utils import gen_unique_id, noop
+from celery.utils import gen_unique_id
 from celery.worker import WorkController
 from celery.worker import WorkController
-from celery.worker.listener import CarrotListener, RUN, CLOSE
+from celery.worker.listener import CarrotListener, RUN
 from celery.worker.job import TaskWrapper
 from celery.worker.job import TaskWrapper
 from celery.worker.scheduler import Scheduler
 from celery.worker.scheduler import Scheduler
 from celery.decorators import task as task_dec
 from celery.decorators import task as task_dec

+ 12 - 11
celery/tests/test_worker_job.py

@@ -197,7 +197,7 @@ class TestTaskWrapper(unittest.TestCase):
             tw = TaskWrapper(mytask.name, gen_unique_id(), [1], {"f": "x"})
             tw = TaskWrapper(mytask.name, gen_unique_id(), [1], {"f": "x"})
             try:
             try:
                 raise KeyError("foo")
                 raise KeyError("foo")
-            except KeyError, exc:
+            except KeyError:
                 einfo = ExceptionInfo(sys.exc_info())
                 einfo = ExceptionInfo(sys.exc_info())
 
 
             tw.on_failure(einfo)
             tw.on_failure(einfo)
@@ -242,7 +242,7 @@ class TestTaskWrapper(unittest.TestCase):
     def create_exception(self, exc):
     def create_exception(self, exc):
         try:
         try:
             raise exc
             raise exc
-        except exc.__class__, thrown:
+        except exc.__class__:
             return sys.exc_info()
             return sys.exc_info()
 
 
     def test_worker_task_trace_handle_retry(self):
     def test_worker_task_trace_handle_retry(self):
@@ -376,15 +376,16 @@ class TestTaskWrapper(unittest.TestCase):
     def test_default_kwargs(self):
     def test_default_kwargs(self):
         tid = gen_unique_id()
         tid = gen_unique_id()
         tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
         tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
-        self.assertDictEqual(tw.extend_with_default_kwargs(10, "some_logfile"), {
-            "f": "x",
-            "logfile": "some_logfile",
-            "loglevel": 10,
-            "task_id": tw.task_id,
-            "task_retries": 0,
-            "task_is_eager": False,
-            "delivery_info": {},
-            "task_name": tw.task_name})
+        self.assertDictEqual(
+                tw.extend_with_default_kwargs(10, "some_logfile"), {
+                    "f": "x",
+                    "logfile": "some_logfile",
+                    "loglevel": 10,
+                    "task_id": tw.task_id,
+                    "task_retries": 0,
+                    "task_is_eager": False,
+                    "delivery_info": {},
+                    "task_name": tw.task_name})
 
 
     def test_on_failure(self):
     def test_on_failure(self):
         tid = gen_unique_id()
         tid = gen_unique_id()

+ 2 - 0
celery/utils/info.py

@@ -39,6 +39,7 @@ def format_routing_table(table=None, indent=0):
                             for name, route in table.items())
                             for name, route in table.items())
     return textindent(routes, indent=indent)
     return textindent(routes, indent=indent)
 
 
+
 def get_broker_info():
 def get_broker_info():
     broker_connection = establish_connection()
     broker_connection = establish_connection()
 
 
@@ -61,6 +62,7 @@ def get_broker_info():
             "port": port,
             "port": port,
             "vhost": vhost}
             "vhost": vhost}
 
 
+
 def format_broker_info(info=None):
 def format_broker_info(info=None):
     """Get message broker connection info string for log dumps."""
     """Get message broker connection info string for log dumps."""
     return BROKER_FORMAT % get_broker_info()
     return BROKER_FORMAT % get_broker_info()

+ 2 - 2
celery/worker/buckets.py

@@ -174,8 +174,8 @@ class TaskBucket(object):
             try:
             try:
                 bucket.clear()
                 bucket.clear()
             except AttributeError:
             except AttributeError:
-                # Probably a Queue, not a TokenBucketQueue. Clear the underlying
-                # deque instead.
+                # Probably a Queue, not a TokenBucketQueue, so clear the
+                # underlying deque instead.
                 bucket.queue.clear()
                 bucket.queue.clear()
 
 
 
 

+ 8 - 0
docs/reference/celery.bin.camqadm.rst

@@ -0,0 +1,8 @@
+===========================================================
+ caqmadm: AMQP API Command-line Shell - celery.bin.camqadm
+===========================================================
+
+.. currentmodule:: celery.bin.camqadm
+
+.. automodule:: celery.bin.camqadm
+    :members:

+ 1 - 0
docs/reference/index.rst

@@ -32,3 +32,4 @@
     celery.bin.celeryd
     celery.bin.celeryd
     celery.bin.celerybeat
     celery.bin.celerybeat
     celery.bin.celeryinit
     celery.bin.celeryinit
+    celery.bin.camqadm

+ 1 - 0
docs/userguide/index.rst

@@ -11,3 +11,4 @@
     tasks
     tasks
     executing
     executing
     remote-tasks
     remote-tasks
+    routing

+ 1 - 1
setup.py

@@ -112,7 +112,7 @@ setup(
             'celeryinit = celery.bin.celeryinit:main',
             'celeryinit = celery.bin.celeryinit:main',
             'celerybeat = celery.bin.celerybeat:main',
             'celerybeat = celery.bin.celerybeat:main',
             'camqadm = celery.bin.camqadm:main',
             'camqadm = celery.bin.camqadm:main',
-            ]
+            ],
     },
     },
     long_description=long_description,
     long_description=long_description,
 )
 )