Browse Source

Various fixes

Ask Solem 11 years ago
parent
commit
6b8b3aea75
4 changed files with 32 additions and 21 deletions
  1. 1 1
      celery/app/base.py
  2. 19 19
      celery/app/task.py
  3. 0 1
      celery/backends/amqp.py
  4. 12 0
      funtests/stress/stress/app.py

+ 1 - 1
celery/app/base.py

@@ -599,7 +599,7 @@ class Celery(object):
         conf = self.conf
         conf = self.conf
         tz = conf.CELERY_TIMEZONE
         tz = conf.CELERY_TIMEZONE
         if not tz:
         if not tz:
-            return (timezone.get_timezone('UTC') if conf.CELERY_USE_UTC
+            return (timezone.get_timezone('UTC') if conf.CELERY_ENABLE_UTC
                     else timezone.local)
                     else timezone.local)
         return timezone.get_timezone(self.conf.CELERY_TIMEZONE)
         return timezone.get_timezone(self.conf.CELERY_TIMEZONE)
 App = Celery  # compat
 App = Celery  # compat

+ 19 - 19
celery/app/task.py

@@ -578,31 +578,31 @@ class Task(object):
             countdown = self.default_retry_delay
             countdown = self.default_retry_delay
 
 
         is_eager = request.is_eager
         is_eager = request.is_eager
+        S = self.subtask_from_request(
+            request, args, kwargs,
+            countdown=countdown, eta=eta, retries=retries,
+            **options
+        )
+
+        if max_retries is not None and retries > max_retries:
+            if exc:
+                maybe_reraise()
+            raise self.MaxRetriesExceededError(
+                "Can't retry {0}[{1}] args:{2} kwargs:{3}".format(
+                    self.name, request.id, S.args, S.kwargs))
+
+        # If task was executed eagerly using apply(),
+        # then the retry must also be executed eagerly.
         try:
         try:
-            S = self.subtask_from_request(
-                request, args, kwargs,
-                countdown=countdown, eta=eta, retries=retries,
-                **options
-            )
-
-            if max_retries is not None and retries > max_retries:
-                if exc:
-                    maybe_reraise()
-                raise self.MaxRetriesExceededError(
-                    "Can't retry {0}[{1}] args:{2} kwargs:{3}".format(
-                        self.name, request.id, S.args, S.kwargs))
-
-            # If task was executed eagerly using apply(),
-            # then the retry must also be executed eagerly.
             S.apply().get() if is_eager else S.apply_async()
             S.apply().get() if is_eager else S.apply_async()
-            ret = Retry(exc=exc, when=eta or countdown)
-            if throw:
-                raise ret
-            return ret
         except Exception as exc:
         except Exception as exc:
             if is_eager:
             if is_eager:
                 raise
                 raise
             raise Reject(exc, requeue=True)
             raise Reject(exc, requeue=True)
+        ret = Retry(exc=exc, when=eta or countdown)
+        if throw:
+            raise ret
+        return ret
 
 
     def apply(self, args=None, kwargs=None,
     def apply(self, args=None, kwargs=None,
               link=None, link_error=None, **options):
               link=None, link_error=None, **options):

+ 0 - 1
celery/backends/amqp.py

@@ -152,7 +152,6 @@ class AMQPBackend(BaseBackend):
             binding.declare()
             binding.declare()
 
 
             prev = latest = acc = None
             prev = latest = acc = None
-            print('binding.get: %r' % (binding.get, ))
             for i in range(backlog_limit):  # spool ffwd
             for i in range(backlog_limit):  # spool ffwd
                 prev, latest, acc = latest, acc, binding.get(
                 prev, latest, acc = latest, acc, binding.get(
                     accept=self.accept, no_ack=False,
                     accept=self.accept, no_ack=False,

+ 12 - 0
funtests/stress/stress/app.py

@@ -41,6 +41,11 @@ def add(x, y):
     return x + y
     return x + y
 
 
 
 
+@app.task
+def xsum(x):
+    return sum(x)
+
+
 @app.task
 @app.task
 def any_(*args, **kwargs):
 def any_(*args, **kwargs):
     wait = kwargs.get('sleep')
     wait = kwargs.get('sleep')
@@ -71,6 +76,13 @@ def sleeping_ignore_limits(i):
         sleep(i)
         sleep(i)
 
 
 
 
+@app.task(bind=True)
+def retries(self):
+    if not self.request.retries:
+        raise self.retry(countdown=1)
+    return 10
+
+
 @app.task
 @app.task
 def segfault():
 def segfault():
     import ctypes
     import ctypes