Browse Source

celeryd: Reject tasks with an eta that cannot be converted to timestamp. Closes #209. Thanks to rlotun

Ask Solem 14 years ago
parent
commit
5c593be0f2
3 changed files with 55 additions and 10 deletions
  1. 27 1
      celery/tests/test_worker.py
  2. 16 6
      celery/utils/timer2.py
  3. 12 3
      celery/worker/listener.py

+ 27 - 1
celery/tests/test_worker.py

@@ -16,6 +16,7 @@ from celery.utils import gen_unique_id
 from celery.worker import WorkController
 from celery.worker.buckets import FastQueue
 from celery.worker.job import TaskRequest
+from celery.worker import listener
 from celery.worker.listener import CarrotListener, QoS, RUN
 
 from celery.tests.compat import catch_warnings
@@ -278,6 +279,31 @@ class test_CarrotListener(unittest.TestCase):
         context = catch_warnings(record=True)
         execute_context(context, with_catch_warnings)
 
+    def test_receive_message_eta_OverflowError(self):
+        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+                             send_events=False)
+        backend = MockBackend()
+        called = [False]
+
+        def to_timestamp(d):
+            called[0] = True
+            raise OverflowError()
+
+        m = create_message(backend, task=foo_task.name,
+                                    args=("2, 2"),
+                                    kwargs={},
+                                    eta=datetime.now().isoformat())
+        l.event_dispatcher = MockEventDispatcher()
+        l.control_dispatch = MockControlDispatch()
+
+        prev, listener.to_timestamp = listener.to_timestamp, to_timestamp
+        try:
+            l.receive_message(m.decode(), m)
+            self.assertTrue(m.acknowledged)
+            self.assertTrue(called[0])
+        finally:
+            listener.to_timestamp = prev
+
     def test_receive_message_InvalidTaskError(self):
         logger = MockLogger()
         l = MyCarrotListener(self.ready_queue, self.eta_schedule, logger,
@@ -335,7 +361,7 @@ class test_CarrotListener(unittest.TestCase):
                 self.prefetch_count_incremented = True
 
         l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
-                           send_events=False)
+                             send_events=False)
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name,
                            eta=datetime.now().isoformat(),

+ 16 - 6
celery/utils/timer2.py

@@ -44,6 +44,12 @@ class Entry(object):
         self.tref.cancelled = True
 
 
+def to_timestamp(d):
+    if isinstance(d, datetime):
+        return mktime(d.timetuple())
+    return d
+
+
 class Schedule(object):
     """ETA scheduler."""
     on_error = None
@@ -66,12 +72,16 @@ class Schedule(object):
         :keyword priority: Unused.
 
         """
-        if isinstance(eta, datetime):
-            try:
-                eta = mktime(eta.timetuple())
-            except OverflowError:
-                self.handle_error(sys.exc_info())
-        eta = eta or time()
+        try:
+            eta = to_timestamp(eta)
+        except OverflowError:
+            self.handle_error(sys.exc_info())
+            return
+
+        if eta is None:
+            # schedule now.
+            eta = time()
+
         heapq.heappush(self._queue, (eta, priority, entry))
         return entry
 

+ 12 - 3
celery/worker/listener.py

@@ -93,6 +93,7 @@ from celery.worker.heartbeat import Heart
 from celery.events import EventDispatcher
 from celery.exceptions import NotRegistered
 from celery.datastructures import SharedCounter
+from celery.utils.timer2 import to_timestamp
 
 RUN = 0x1
 CLOSE = 0x2
@@ -275,9 +276,17 @@ class CarrotListener(object):
                 expires=task.expires and task.expires.isoformat())
 
         if task.eta:
-            self.qos.increment()
-            self.eta_schedule.apply_at(task.eta,
-                                       self.apply_eta_task, (task, ))
+            try:
+                eta = to_timestamp(task.eta)
+            except OverflowError, exc:
+                self.logger.error(
+                    "Couldn't convert eta %s to timestamp: %r. Task: %r" % (
+                        task.eta, exc, task.info(safe=True)))
+                task.acknowledge()
+            else:
+                self.qos.increment()
+                self.eta_schedule.apply_at(eta,
+                                           self.apply_eta_task, (task, ))
         else:
             self.ready_queue.put(task)