Selaa lähdekoodia

Fixes dump_request example. thanks to ojii

Ask Solem 12 vuotta sitten
vanhempi
commit
1b28f3e6e0
6 muutettua tiedostoa jossa 61 lisäystä ja 30 poistoa
  1. 8 24
      celery/bin/base.py
  2. 8 3
      celery/bin/celery.py
  3. 3 0
      celery/bin/celeryd.py
  4. 24 1
      celery/platforms.py
  5. 2 1
      celery/utils/__init__.py
  6. 16 1
      docs/userguide/tasks.rst

+ 8 - 24
celery/bin/base.py

@@ -69,7 +69,7 @@ from types import ModuleType
 
 import celery
 from celery.exceptions import CDeprecationWarning, CPendingDeprecationWarning
-from celery.platforms import EX_FAILURE, EX_USAGE
+from celery.platforms import EX_FAILURE, EX_USAGE, maybe_patch_concurrency
 from celery.utils import text
 from celery.utils.imports import symbol_by_name, import_from_cwd
 
@@ -165,9 +165,7 @@ class Command(object):
         if argv is None:
             argv = list(sys.argv)
         # Should we load any special concurrency environment?
-        pool_option = self.with_pool_option(argv)
-        if pool_option:
-            self.maybe_patch_concurrency(argv, *pool_option)
+        self.maybe_patch_concurrency(argv)
         self.on_concurrency_setup()
 
         # Dump version and exit if '--version' arg set.
@@ -176,26 +174,12 @@ class Command(object):
         prog_name = os.path.basename(argv[0])
         return self.handle_argv(prog_name, argv[1:])
 
-    def _find_option_with_arg(self, argv, short_opts=None, long_opts=None):
-        for i, arg in enumerate(argv):
-            if arg.startswith('-'):
-                if long_opts and arg.startswith('--'):
-                    name, _, val = arg.partition('=')
-                    if name in long_opts:
-                        return val
-                if short_opts and arg in short_opts:
-                    return argv[i + 1]
-        raise KeyError('|'.join(short_opts or [] + long_opts or []))
-
-    def maybe_patch_concurrency(self, argv, short_opts=None, long_opts=None):
-        try:
-            pool = self._find_option_with_arg(argv, short_opts, long_opts)
-        except KeyError:
-            pass
-        else:
-            from celery import concurrency
-            # set up eventlet/gevent environments ASAP.
-            concurrency.get_implementation(pool)
+    def maybe_patch_concurrency(self, argv=None):
+        argv = argv or sys.argv
+        pool_option = self.with_pool_option(argv)
+        if pool_option:
+            maybe_patch_concurrency(argv, *pool_option)
+            short_opts, long_opts = pool_option
 
     def on_concurrency_setup(self):
         pass

+ 8 - 3
celery/bin/celery.py

@@ -9,11 +9,13 @@ The :program:`celery` umbrella command.
 from __future__ import absolute_import
 from __future__ import with_statement
 
-import anyjson
 import sys
+from celery.platforms import maybe_patch_concurrency
+maybe_patch_concurrency(sys.argv, ['-P'], ['--pool'])
+
+import anyjson
 import warnings
 
-from billiard import freeze_support
 from importlib import import_module
 from pprint import pformat
 
@@ -933,8 +935,11 @@ def main():
     try:
         if __name__ != '__main__':  # pragma: no cover
             sys.modules['__main__'] = sys.modules[__name__]
+        cmd = CeleryCommand()
+        cmd.maybe_patch_concurrency()
+        from billiard import freeze_support
         freeze_support()
-        CeleryCommand().execute_from_commandline()
+        cmd.execute_from_commandline()
     except KeyboardInterrupt:
         pass
 

+ 3 - 0
celery/bin/celeryd.py

@@ -116,6 +116,9 @@ The :program:`celery worker` command (previously known as ``celeryd``)
 from __future__ import absolute_import
 
 import sys
+import sys
+from celery.platforms import maybe_patch_concurrency
+maybe_patch_concurrency(sys.argv, ['-P'], ['--pool'])
 
 from billiard import freeze_support
 

+ 24 - 1
celery/platforms.py

@@ -22,7 +22,6 @@ from contextlib import contextmanager
 
 from .local import try_import
 
-from billiard import current_process
 from kombu.utils.limits import TokenBucket
 
 _setproctitle = try_import('setproctitle')
@@ -67,6 +66,29 @@ def pyimplementation():
         return 'CPython'
 
 
+def _find_option_with_arg(argv, short_opts=None, long_opts=None):
+    for i, arg in enumerate(argv):
+        if arg.startswith('-'):
+            if long_opts and arg.startswith('--'):
+                name, _, val = arg.partition('=')
+                if name in long_opts:
+                    return val
+            if short_opts and arg in short_opts:
+                return argv[i + 1]
+    raise KeyError('|'.join(short_opts or [] + long_opts or []))
+
+
+def maybe_patch_concurrency(argv, short_opts=None, long_opts=None):
+    try:
+        pool = _find_option_with_arg(argv, short_opts, long_opts)
+    except KeyError:
+        pass
+    else:
+        # set up eventlet/gevent environments ASAP.
+        from celery import concurrency
+        concurrency.get_implementation(pool)
+
+
 class LockFailed(Exception):
     """Raised if a pidlock can't be acquired."""
     pass
@@ -594,6 +616,7 @@ else:
 
         """
         if not rate_limit or _setps_bucket.can_consume(1):
+            from billiard import current_process
             if hostname:
                 progname = '%s@%s' % (progname, hostname.split('.')[0])
             return set_process_title(

+ 2 - 1
celery/utils/__init__.py

@@ -12,7 +12,6 @@ from __future__ import with_statement
 import operator
 import os
 import sys
-import threading
 import traceback
 import warnings
 
@@ -147,6 +146,8 @@ def cry():  # pragma: no cover
     From https://gist.github.com/737056
 
     """
+    import threading
+
     tmap = {}
     main_thread = None
     # get a map of threads by their ID so we can print their names

+ 16 - 1
docs/userguide/tasks.rst

@@ -228,9 +228,24 @@ An example task accessing information in the context is:
 .. code-block:: python
 
     @celery.task()
-    def dump_context(x, y):
+    def add(x, y):
         print('Executing task id %r, args: %r kwargs: %r' % (
             add.request.id, add.request.args, add.request.kwargs))
+        return x + y
+
+
+:data:`~celery.current_task` can also be used:
+
+.. code-block:: python
+
+    from celery import current_task
+
+    @celery.task()
+    def add(x, y):
+        request = current_task.request
+        print('Executing task id %r, args: %r kwargs: %r' % (
+            request.id, request.args, request.kwargs))
+        return x + y
 
 .. _task-logging: