Ask Solem 11 yıl önce
ebeveyn
işleme
46c92025cd

+ 0 - 1
celery/_state.py

@@ -32,7 +32,6 @@ _task_join_will_block = False
 
 
 def _set_task_join_will_block(blocks):
-    print('SET TASK JOIN WILL BLOCK: %r' % (blocks, ))
     global _task_join_will_block
     _task_join_will_block = True
 

+ 5 - 2
celery/app/task.py

@@ -602,8 +602,11 @@ class Task(object):
 
         .. code-block:: python
 
-            >>> @task()
-            >>> def tweet(auth, message):
+            >>> from imaginary_twitter_lib import Twitter
+            >>> from proj.celery import app
+
+            >>> @app.task()
+            ... def tweet(auth, message):
             ...     twitter = Twitter(oauth=auth)
             ...     try:
             ...         twitter.post_status_update(message)

+ 3 - 1
celery/app/utils.py

@@ -86,6 +86,7 @@ class Settings(ConfigurationView):
 
         Will return ``(namespace, key, type)`` tuple, e.g.::
 
+            >>> from proj.celery import app
             >>> app.conf.find_option('disable_rate_limits')
             ('CELERY', 'DISABLE_RATE_LIMITS',
              <Option: type->bool default->False>))
@@ -105,7 +106,8 @@ class Settings(ConfigurationView):
 
         Example::
 
-            >>> celery.conf.get_by_parts('CELERY', 'DISABLE_RATE_LIMITS')
+            >>> from proj.celery import app
+            >>> app.conf.get_by_parts('CELERY', 'DISABLE_RATE_LIMITS')
             False
 
         """

+ 0 - 12
celery/bin/amqp.py

@@ -212,13 +212,6 @@ class AMQShell(cmd.Cmd):
 
         :returns: tuple of `(method, processed_args)`.
 
-        Example:
-
-            >>> get_amqp_api_command('queue.delete', ['pobox', 'yes', 'no'])
-            (<bound method Channel.queue_delete of
-             <amqp.channel.Channel object at 0x...>>,
-             ('testfoo', True, False))
-
         """
         spec = self.amqp[cmd]
         args = spec.str_args_to_python(arglist)
@@ -279,11 +272,6 @@ class AMQShell(cmd.Cmd):
         :returns: tuple of three items:
             `(command_name, arglist, original_line)`
 
-        E.g::
-
-            >>> parseline('queue.delete A 'B' C')
-            ('queue.delete', 'A 'B' C', 'queue.delete A 'B' C')
-
         """
         parts = line.split()
         if parts:

+ 0 - 12
celery/bin/celery.py

@@ -513,18 +513,6 @@ class shell(Command):  # pragma: no cover
           xmap, xstarmap subtask, Task
         - all registered tasks.
 
-    Example Session:
-
-    .. code-block:: bash
-
-        $ celery shell
-
-        >>> celery
-        <Celery default:0x1012d9fd0>
-        >>> add
-        <@task: tasks.add>
-        >>> add.delay(2, 2)
-        <AsyncResult: 537b48c7-d6d3-427a-a24a-d1b4414035be>
     """
     option_list = Command.option_list + (
         Option('--ipython', '-I',

+ 7 - 5
celery/canvas.py

@@ -38,11 +38,13 @@ class _getitem_property(object):
 
     Example:
 
-        class Me(dict):
-            deep = defaultdict(dict)
+        >>> from collections import defaultdict
 
-            foo = _getitem_property('foo')
-            deep_thing = _getitem_property('deep.thing')
+        >>> class Me(dict):
+        ...     deep = defaultdict(dict)
+        ...
+        ...     foo = _getitem_property('foo')
+        ...     deep_thing = _getitem_property('deep.thing')
 
 
         >>> me = Me()
@@ -58,7 +60,7 @@ class _getitem_property(object):
         >>> me.deep_thing = 42
         >>> me.deep_thing
         42
-        >>> me.deep:
+        >>> me.deep
         defaultdict(<type 'dict'>, {'thing': 42})
 
     """

+ 0 - 1
celery/concurrency/asynpool.py

@@ -206,7 +206,6 @@ class ResultHandler(_pool.ResultHandler):
         else:
             buf = bufv = BytesIO()
 
-
         while Br < body_size:
             try:
                 n = __read__(

+ 7 - 7
celery/contrib/migrate.py

@@ -321,10 +321,10 @@ def move_by_idmap(map, **kwargs):
 
     Example::
 
-        >>> reroute_idmap({
-        ...     '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue(...),
-        ...     'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue(...),
-        ...     '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue(...)},
+        >>> move_by_idmap({
+        ...     '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue('name'),
+        ...     'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue('name'),
+        ...     '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue('name')},
         ...   queues=['hipri'])
 
     """
@@ -342,9 +342,9 @@ def move_by_taskmap(map, **kwargs):
 
     Example::
 
-        >>> reroute_idmap({
-        ...     'tasks.add': Queue(...),
-        ...     'tasks.mul': Queue(...),
+        >>> move_by_taskmap({
+        ...     'tasks.add': Queue('name'),
+        ...     'tasks.mul': Queue('name'),
         ... })
 
     """

+ 2 - 1
celery/fixups/django.py

@@ -181,7 +181,8 @@ class DjangoFixup(object):
             if hasattr(self._db, 'close_old_connections'):  # django 1.6
                 funs = [self._db.close_old_connections]
             else:
-                funs = [self._db.close_connection]  # pre multidb, pending deprication in django 1.6
+                # pre multidb, pending deprication in django 1.6
+                funs = [self._db.close_connection]
 
         for close in funs:
             try:

+ 4 - 2
celery/platforms.py

@@ -547,6 +547,7 @@ class Signals(object):
 
         >>> from celery.platforms import signals
 
+        >>> from proj.handlers import my_handler
         >>> signals['INT'] = my_handler
 
         >>> signals['INT']
@@ -566,6 +567,7 @@ class Signals(object):
         >>> signals['USR1'] == signals.default
         True
 
+        >>> from proj.handlers import exit_handler, hup_handler
         >>> signals.update(INT=exit_handler,
         ...                TERM=exit_handler,
         ...                HUP=hup_handler)
@@ -710,8 +712,8 @@ def ignore_errno(*errnos, **kwargs):
     the name of the code, or the code integer itself::
 
         >>> with ignore_errno('ENOENT'):
-        ...     with open('foo', 'r'):
-        ...         return r.read()
+        ...     with open('foo', 'r') as fh:
+        ...         return fh.read()
 
         >>> with ignore_errno(errno.ENOENT, errno.EPERM):
         ...    pass

+ 5 - 2
celery/result.py

@@ -167,6 +167,8 @@ class AsyncResult(ResultBase):
 
         .. code-block:: python
 
+            >>> from proj.tasks import A
+
             >>> result = A.delay(10)
             >>> list(result.collect())
             [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
@@ -658,8 +660,9 @@ class GroupResult(ResultSet):
 
         Example::
 
-            >>> result.save()
-            >>> result = GroupResult.restore(group_id)
+            >>> def save_and_restore(result):
+            ...     result.save()
+            ...     result = GroupResult.restore(result.id)
 
         """
         return (backend or self.app.backend).save_group(self.id, self)

+ 2 - 59
celery/task/base.py

@@ -159,68 +159,11 @@ class PeriodicTask(Task):
 
 
 def task(*args, **kwargs):
-    """Decorator to create a task class out of any callable.
-
-    **Examples**
-
-    .. code-block:: python
-
-        @task()
-        def refresh_feed(url):
-            return Feed.objects.get(url=url).refresh()
-
-    With setting extra options and using retry.
-
-    .. code-block:: python
-
-        @task(max_retries=10)
-        def refresh_feed(url):
-            try:
-                return Feed.objects.get(url=url).refresh()
-            except socket.error as exc:
-                refresh_feed.retry(exc=exc)
-
-    Calling the resulting task:
-
-            >>> refresh_feed('http://example.com/rss') # Regular
-            <Feed: http://example.com/rss>
-            >>> refresh_feed.delay('http://example.com/rss') # Async
-            <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
-    """
+    """Deprecated decorators, please use :meth:`~@task`."""
     return current_app.task(*args, **dict({'accept_magic_kwargs': False,
                                            'base': Task}, **kwargs))
 
 
 def periodic_task(*args, **options):
-    """Decorator to create a task class out of any callable.
-
-        .. admonition:: Examples
-
-            .. code-block:: python
-
-                @task()
-                def refresh_feed(url):
-                    return Feed.objects.get(url=url).refresh()
-
-            With setting extra options and using retry.
-
-            .. code-block:: python
-
-                from celery.task import current
-
-                @task(exchange='feeds')
-                def refresh_feed(url):
-                    try:
-                        return Feed.objects.get(url=url).refresh()
-                    except socket.error as exc:
-                        current.retry(exc=exc)
-
-            Calling the resulting task:
-
-                >>> refresh_feed('http://example.com/rss') # Regular
-                <Feed: http://example.com/rss>
-                >>> refresh_feed.delay('http://example.com/rss') # Async
-                <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
-
-    """
+    """Deprecated decorator, please use :setting:`CELERYBEAT_SCHEDULE`."""
     return task(**dict({'base': PeriodicTask}, **options))

+ 2 - 0
celery/task/sets.py

@@ -31,6 +31,8 @@ class TaskSet(list):
 
     Example::
 
+        >>> from myproj.tasks import refresh_feed
+
         >>> urls = ('http://cnn.com/rss', 'http://bbc.co.uk/rss')
         >>> s = TaskSet(refresh_feed.s(url) for url in urls)
         >>> taskset_result = s.apply_async()

+ 0 - 3
celery/tests/bin/test_worker.py

@@ -277,9 +277,6 @@ class test_Worker(WorkerAppCase):
         if app.IS_WINDOWS:
             raise SkipTest('Not applicable on Windows')
 
-        def getuid():
-            return 0
-
         with patch('os.getuid') as getuid:
             getuid.return_value = 0
             self.app.conf.CELERY_ACCEPT_CONTENT = ['pickle']

+ 2 - 2
celery/tests/case.py

@@ -507,14 +507,14 @@ def mask_modules(*modnames):
 
     For example:
 
-        >>> with missing_modules('sys'):
+        >>> with mask_modules('sys'):
         ...     try:
         ...         import sys
         ...     except ImportError:
         ...         print 'sys not found'
         sys not found
 
-        >>> import sys
+        >>> import sys  # noqa
         >>> sys.version
         (2, 5, 2, 'final', 0)
 

+ 3 - 26
celery/utils/__init__.py

@@ -130,27 +130,7 @@ def is_iterable(obj):
 
 
 def fun_takes_kwargs(fun, kwlist=[]):
-    """With a function, and a list of keyword arguments, returns arguments
-    in the list which the function takes.
-
-    If the object has an `argspec` attribute that is used instead
-    of using the :meth:`inspect.getargspec` introspection.
-
-    :param fun: The function to inspect arguments of.
-    :param kwlist: The list of keyword arguments.
-
-    Examples
-
-        >>> def foo(self, x, y, logfile=None, loglevel=None):
-        ...     return x * y
-        >>> fun_takes_kwargs(foo, ['logfile', 'loglevel', 'task_id'])
-        ['logfile', 'loglevel']
-
-        >>> def foo(self, x, y, **kwargs):
-        >>> fun_takes_kwargs(foo, ['logfile', 'loglevel', 'task_id'])
-        ['logfile', 'loglevel', 'task_id']
-
-    """
+    # deprecated
     S = getattr(fun, 'argspec', getargspec(fun))
     if S.keywords is not None:
         return kwlist
@@ -165,11 +145,8 @@ def isatty(fh):
 
 
 def cry(out=None, sepchr='=', seplen=49):  # pragma: no cover
-    """Return stacktrace of all active threads.
-
-    From https://gist.github.com/737056
-
-    """
+    """Return stacktrace of all active threads,
+    taken from https://gist.github.com/737056."""
     import threading
 
     out = StringIO() if out is None else out

+ 1 - 3
celery/utils/dispatch/saferef.py

@@ -243,9 +243,7 @@ class BoundNonDescriptorMethodWeakref(BoundMethodWeakref):  # pragma: no cover
             which will be passed a pointer to this object.
 
         """
-        assert getattr(target.__self__, target.__name__) == target, \
-            "method %s isn't available as the attribute %s of %s" % (
-            target, target.__name__, target.__self__)
+        assert getattr(target.__self__, target.__name__) == target
         super(BoundNonDescriptorMethodWeakref, self).__init__(target,
                                                               on_delete)
 

+ 3 - 2
celery/utils/functional.py

@@ -250,8 +250,9 @@ def padlist(container, size, default=None):
         ('George', 'Costanza', 'NYC')
         >>> first, last, city = padlist(['George', 'Costanza'], 3)
         ('George', 'Costanza', None)
-        >>> first, last, city, planet = padlist(['George', 'Costanza',
-                                                 'NYC'], 4, default='Earth')
+        >>> first, last, city, planet = padlist(
+        ...     ['George', 'Costanza', 'NYC'], 4, default='Earth',
+        ... )
         ('George', 'Costanza', 'NYC', 'Earth')
 
     """

+ 0 - 1
celery/utils/log.py

@@ -53,7 +53,6 @@ def set_in_sighandler(value):
 
 def iter_open_logger_fds():
     seen = set()
-    loggers = logging.Logger.manager.loggerDict
     for logger in values(logging.Logger.manager.loggerDict):
         try:
             for handler in logger.handlers:

+ 10 - 7
celery/utils/serialization.py

@@ -85,13 +85,16 @@ class UnpickleableExceptionWrapper(Exception):
 
     .. code-block:: python
 
-        >>> try:
-        ...     something_raising_unpickleable_exc()
-        >>> except Exception as e:
-        ...     exc = UnpickleableException(e.__class__.__module__,
-        ...                                 e.__class__.__name__,
-        ...                                 e.args)
-        ...     pickle.dumps(exc) # Works fine.
+        >>> def pickle_it(raising_function):
+        ...     try:
+        ...         raising_function()
+        ...     except Exception as e:
+        ...         exc = UnpickleableExceptionWrapper(
+        ...             e.__class__.__module__,
+        ...             e.__class__.__name__,
+        ...             e.args,
+        ...         )
+        ...         pickle.dumps(exc)  # Works fine.
 
     """
 

+ 1 - 1
celery/worker/consumer.py

@@ -250,7 +250,7 @@ class Consumer(object):
     def _update_qos_eventually(self, index):
         return (self.qos.decrement_eventually if index < 0
                 else self.qos.increment_eventually)(
-                    abs(index) * self.prefetch_multiplier)
+            abs(index) * self.prefetch_multiplier)
 
     def _limit_task(self, request, bucket, tokens):
         if not bucket.can_consume(tokens):

+ 1 - 1
celery/worker/control.py

@@ -64,7 +64,7 @@ def query_task(state, ids, **kwargs):
         )
     ))
 
-    return req
+    return reqs
 
 
 @Panel.register