Ask Solem 13 gadi atpakaļ
vecāks
revīzija
a9d898ddef

+ 2 - 1
celery/platforms.py

@@ -567,7 +567,8 @@ if os.environ.get("NOSETPS"):
         pass
 else:
 
-    def set_mp_process_title(progname, info=None, hostname=None, rate_limit=False):
+    def set_mp_process_title(progname, info=None, hostname=None,  # noqa
+            rate_limit=False):
         """Set the ps name using the multiprocessing process name.
 
         Only works if :mod:`setproctitle` is installed.

+ 2 - 2
celery/tests/test_worker/test_worker_control.py

@@ -39,8 +39,8 @@ class Consumer(object):
 
     def __init__(self):
         self.ready_queue = FastQueue()
-        self.ready_queue.put(TaskRequest(task_name=mytask.name,
-                                         task_id=uuid(),
+        self.ready_queue.put(TaskRequest(mytask.name,
+                                         uuid(),
                                          args=(2, 2),
                                          kwargs={}))
         self.eta_schedule = Timer()

+ 3 - 0
celery/tests/test_worker/test_worker_heartbeat.py

@@ -10,6 +10,9 @@ class MockDispatcher(object):
 
     def __init__(self):
         self.sent = []
+        self.on_enabled = set()
+        self.on_disabled = set()
+        self.enabled = True
 
     def send(self, msg, **_fields):
         self.sent.append(msg)

+ 7 - 6
celery/tests/test_worker/test_worker_job.py

@@ -45,7 +45,7 @@ def jail(task_id, name, args, kwargs):
     return eager_trace_task(tasks[name], task_id, args, kwargs, eager=False)[0]
 
 
-def on_ack():
+def on_ack(*args, **kwargs):
     scratch["ACK"] = True
 
 
@@ -213,10 +213,10 @@ class test_TaskRequest(unittest.TestCase):
         mytask.ignore_result = True
         try:
             tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
-            self.assertFalse(tw._store_errors)
+            self.assertFalse(tw.store_errors)
             mytask.store_errors_even_if_ignored = True
             tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
-            self.assertTrue(tw._store_errors)
+            self.assertTrue(tw.store_errors)
         finally:
             mytask.ignore_result = False
             mytask.store_errors_even_if_ignored = False
@@ -462,10 +462,10 @@ class test_TaskRequest(unittest.TestCase):
             tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
             tw.logger = MockLogger()
         finally:
-            mytask.ignore_result = False
             tw.on_timeout(soft=True, timeout=1336)
             self.assertEqual(mytask.backend.get_status(tw.task_id),
                              states.PENDING)
+            mytask.ignore_result = False
 
     def test_execute_and_trace(self):
         res = execute_and_trace(mytask.name, uuid(), [4], {})
@@ -551,8 +551,9 @@ class test_TaskRequest(unittest.TestCase):
         self.assertEqual(tw.task_id, body["id"])
         self.assertEqual(tw.args, body["args"])
         us = from_utf8(us)
-        self.assertEqual(tw.kwargs.keys()[0], us)
-        self.assertIsInstance(tw.kwargs.keys()[0], str)
+        if sys.version_info < (2, 6):
+            self.assertEqual(tw.kwargs.keys()[0], us)
+            self.assertIsInstance(tw.kwargs.keys()[0], str)
         self.assertTrue(tw.logger)
 
     def test_from_message_empty_args(self):

+ 3 - 3
celery/worker/consumer.py

@@ -416,7 +416,7 @@ class Consumer(object):
                 "Received and deleted unknown message. Wrong destination?!? \
                 the full contents of the message body was: %s" % (
                  self._message_report(body, message), )))
-            ack()
+            message.ack_log_error(self.logger, self.connection_errors)
             return
 
         try:
@@ -424,11 +424,11 @@ class Consumer(object):
         except KeyError, exc:
             self.logger.error(UNKNOWN_TASK_ERROR, exc, safe_repr(body),
                               exc_info=sys.exc_info())
-            ack()
+            message.ack_log_error(self.logger, self.connection_errors)
         except InvalidTaskError, exc:
             self.logger.error(INVALID_TASK_ERROR, str(exc), safe_repr(body),
                               exc_info=sys.exc_info())
-            ack()
+            message.ack_log_error(self.logger, self.connection_errors)
 
     def maybe_conn_error(self, fun):
         """Applies function but ignores any connection or channel

+ 5 - 0
celery/worker/job.py

@@ -135,6 +135,11 @@ class TaskRequest(object):
             logger=None, eventer=None, eta=None, expires=None, app=None,
             taskset=None, chord=None, utc=False, connection_errors=None,
             **opts):
+        try:
+            kwargs.items
+        except AttributeError:
+            raise exceptions.InvalidTaskError(
+                    "Task keyword arguments is not a mapping")
         self.app = app or app_or_default(app)
         self.name = task
         self.id = id

+ 1 - 1
celery/worker/state.py

@@ -55,7 +55,7 @@ task_reserved = reserved_requests.add
 def task_accepted(request):
     """Updates global state when a task has been accepted."""
     active_requests.add(request)
-    #total_count[request.task_name] += 1
+    total_count[request.task_name] += 1
 
 
 def task_ready(request):