Ask Solem %!s(int64=13) %!d(string=hai) anos
pai
achega
632d80f86c

+ 1 - 1
celery/concurrency/processes/__init__.py

@@ -111,7 +111,6 @@ class TaskPool(BasePool):
     def set_on_process_started(self, callback):
         self._pool.on_process_created
 
-
     def _get_on_process_started(self):
         return self._pool.on_process_started
 
@@ -119,6 +118,7 @@ class TaskPool(BasePool):
         self._pool.on_process_started = fun
     on_process_started = property(_get_on_process_started,
                                   _set_on_process_started)
+
     def _get_on_process_down(self):
         return self._pool.on_process_down
 

+ 2 - 2
celery/task/trace.py

@@ -128,7 +128,7 @@ class TraceInfo(object):
             del(tb)
 
 
-def execute_bare(task, uuid, args, kwargs, request=None):
+def execute_bare(task, uuid, args, kwargs, request=None, Info=TraceInfo):
     R = I = None
     kwargs = kwdict(kwargs)
     try:
@@ -146,7 +146,7 @@ def execute_bare(task, uuid, args, kwargs, request=None):
             # (but deprecated)
             I = Info(FAILURE, None)
             state, retval = I.state, I.retval
-            R = I.handle_error_state(task, eager=eager)
+            R = I.handle_error_state(task)
     except Exception, exc:
         R = report_internal_error(task, exc)
     return R

+ 4 - 4
celery/tests/tasks/test_tasks.py

@@ -114,18 +114,18 @@ class test_task_retries(Case):
     def test_retry(self):
         retry_task.__class__.max_retries = 3
         retry_task.iterations = 0
-        result = retry_task.apply([0xFF, 0xFFFF])
+        retry_task.apply([0xFF, 0xFFFF])
         self.assertEqual(retry_task.iterations, 4)
 
         retry_task.__class__.max_retries = 3
         retry_task.iterations = 0
-        result = retry_task.apply([0xFF, 0xFFFF], {"max_retries": 10})
+        retry_task.apply([0xFF, 0xFFFF], {"max_retries": 10})
         self.assertEqual(retry_task.iterations, 11)
 
     def test_retry_no_args(self):
         retry_task_noargs.__class__.max_retries = 3
         retry_task_noargs.iterations = 0
-        result = retry_task_noargs.apply()
+        retry_task_noargs.apply()
         self.assertEqual(retry_task_noargs.iterations, 4)
 
     def test_retry_kwargs_can_be_empty(self):
@@ -154,7 +154,7 @@ class test_task_retries(Case):
     def test_retry_with_kwargs(self):
         retry_task_customexc.__class__.max_retries = 3
         retry_task_customexc.iterations = 0
-        result = retry_task_customexc.apply([0xFF, 0xFFFF], {"kwarg": 0xF})
+        retry_task_customexc.apply([0xFF, 0xFFFF], {"kwarg": 0xF})
         self.assertEqual(retry_task_customexc.iterations, 4)
 
     def test_retry_with_custom_exception(self):

+ 1 - 2
celery/worker/consumer.py

@@ -366,7 +366,6 @@ class Consumer(object):
 
         with self.hub as hub:
             qos = self.qos
-            concurrency = self.pool.num_processes
             update_qos = qos.update
             update_readers = hub.update_readers
             fdmap = hub.fdmap
@@ -444,7 +443,7 @@ class Consumer(object):
                             except Empty:
                                 break
                             except socket.error:
-                                if self._state != CLOSE:        # pragma: no cover
+                                if self._state != CLOSE:  # pragma: no cover
                                     raise
                         if buffer:
                             flush_buffer()

+ 5 - 8
celery/worker/hub.py

@@ -1,9 +1,7 @@
 from __future__ import absolute_import
 
-from collections import deque
-
 from kombu.utils import cached_property
-from kombu.utils.eventio import poll, POLL_READ, POLL_ERR, POLL_WRITE
+from kombu.utils.eventio import poll, READ, WRITE, ERR
 
 from celery.utils.timer2 import Schedule
 
@@ -40,7 +38,7 @@ class BoundedSemaphore(object):
 
 
 class Hub(object):
-    eventflags = POLL_READ | POLL_ERR
+    READ, WRITE, ERR = READ, WRITE, ERR
 
     def __init__(self, timer=None):
         self.fdmap = {}
@@ -63,8 +61,7 @@ class Hub(object):
                 self.timer.apply_entry(entry)
         return min(max(delay, min_delay), max_delay)
 
-    def add(self, fd, callback, flags=None):
-        flags = self.eventflags if flags is None else flags
+    def add(self, fd, callback, flags):
         self.poller.register(fd, flags)
         try:
             fileno = fd.fileno()
@@ -73,10 +70,10 @@ class Hub(object):
         self.fdmap[fileno] = callback
 
     def add_reader(self, fd, callback):
-        return self.add(fd, callback, POLL_READ|POLL_ERR)
+        return self.add(fd, callback, READ | ERR)
 
     def add_writer(self, fd, callback):
-        return self.add(fd, callback, POLL_WRITE)
+        return self.add(fd, callback, WRITE)
 
     def update_readers(self, *maps):
         [self.add_reader(*x) for row in maps for x in row.iteritems()]