Ask Solem vor 14 Jahren
Ursprung
Commit
2726f9db69
2 geänderte Dateien mit 33 neuen und 37 gelöschten Zeilen
  1. 2 3
      celery/backends/cassandra.py
  2. 31 34
      celery/worker/consumer.py

+ 2 - 3
celery/backends/cassandra.py

@@ -14,7 +14,7 @@ from datetime import datetime
 from celery.backends.base import BaseDictBackend
 from celery.exceptions import ImproperlyConfigured
 from celery.utils.serialization import pickle
-from celery.utils.timeutils import maybe_timedelta
+from celery.utils.timeutils import maybe_timedelta, timedelta_seconds
 from celery import states
 
 
@@ -126,12 +126,11 @@ class CassandraBackend(BaseDictBackend):
         """Store return value and status of an executed task."""
         cf = self._get_column_family()
         date_done = datetime.utcnow()
-        lifetime = self.result_expires.days * 86400 + self.result_expires.seconds
         meta = {"status": status,
                 "result": pickle.dumps(result),
                 "date_done": date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
                 "traceback": pickle.dumps(traceback)}
-        cf.insert(task_id, meta, ttl=lifetime)
+        cf.insert(task_id, meta, ttl=timedelta_seconds(self.result_expires))
 
     @_retry_on_error
     def _get_task_meta_for(self, task_id):

+ 31 - 34
celery/worker/consumer.py

@@ -255,7 +255,7 @@ class Consumer(object):
     #: The current worker pool instance.
     pool = None
 
-    #: A timer used for high-priority internal tasks, such 
+    #: A timer used for high-priority internal tasks, such
     #: as sending heartbeats.
     priority_timer = None
 
@@ -393,42 +393,40 @@ class Consumer(object):
         :param message: The kombu message object.
 
         """
-
-        # Handle task
-        if body.get("task"):
-            # need to guard against errors occuring while acking the message.
-            def ack():
-                try:
-                    message.ack()
-                except self.connection_errors + (AttributeError, ), exc:
-                    self.logger.critical(
-                        "Couldn't ack %r: body:%r reason:%r" % (
-                            message.delivery_tag, safe_str(body), exc))
-
+        # need to guard against errors occuring while acking the message.
+        def ack():
             try:
-                task = TaskRequest.from_message(message, body, ack,
-                                                app=self.app,
-                                                logger=self.logger,
-                                                hostname=self.hostname,
-                                                eventer=self.event_dispatcher)
-
-            except NotRegistered, exc:
-                self.logger.error(UNKNOWN_TASK_ERROR % (
-                        exc, safe_str(body)), exc_info=sys.exc_info())
-                message.ack()
-            except InvalidTaskError, exc:
-                self.logger.error(INVALID_TASK_ERROR % (
-                        str(exc), safe_str(body)), exc_info=sys.exc_info())
                 message.ack()
-            else:
-                self.on_task(task)
+            except self.connection_errors + (AttributeError, ), exc:
+                self.logger.critical(
+                    "Couldn't ack %r: body:%r reason:%r" % (
+                        message.delivery_tag, safe_str(body), exc))
+
+        if not body.get("task"):
+            warnings.warn(RuntimeWarning(
+                "Received and deleted unknown message. Wrong destination?!? \
+                the full contents of the message body was: %s" % (
+                 safe_str(body), )))
+            ack()
             return
 
-        warnings.warn(RuntimeWarning(
-            "Received and deleted unknown message. Wrong destination?!? \
-             the full contents of the message body was: %s" % (
-                 safe_str(body), )))
-        message.ack()
+        try:
+            task = TaskRequest.from_message(message, body, ack,
+                                            app=self.app,
+                                            logger=self.logger,
+                                            hostname=self.hostname,
+                                            eventer=self.event_dispatcher)
+
+        except NotRegistered, exc:
+            self.logger.error(UNKNOWN_TASK_ERROR % (
+                    exc, safe_str(body)), exc_info=sys.exc_info())
+            ack()
+        except InvalidTaskError, exc:
+            self.logger.error(INVALID_TASK_ERROR % (
+                    str(exc), safe_str(body)), exc_info=sys.exc_info())
+            ack()
+        else:
+            self.on_task(task)
 
     def maybe_conn_error(self, fun):
         """Applies function but ignores any connection or channel
@@ -644,4 +642,3 @@ class Consumer(object):
 
     def _debug(self, msg, **kwargs):
         self.logger.debug("Consumer: %s" % (msg, ), **kwargs)
-