Ver código fonte

Countdown + Eta seems to be working correctly now.

    # Run 10 seconds into the future.
    >>> res = apply_async(MyTask, countdown=10);

    # Run 1 day from now
    >>> res = apply_async(MyTask, eta=datetime.now() + timedelta(days=1)
Ask Solem 16 anos atrás
pai
commit
f2b4da3915
1 arquivos alterados com 36 adições e 10 exclusões
  1. 36 10
      celery/worker.py

+ 36 - 10
celery/worker.py

@@ -296,10 +296,12 @@ class PeriodicWorkController(threading.Thread):
     :class:`celery.task.PeriodicTask` tasks waiting for execution,
     and executes them."""
 
-    def __init__(self):
+    def __init__(self, bucket_queue, hold_queue):
         super(PeriodicWorkController, self).__init__()
         self._shutdown = threading.Event()
         self._stopped = threading.Event()
+        self.hold_queue = hold_queue
+        self.bucket_queue = bucket_queue
 
     def run(self):
         """Run when you use :meth:`Thread.start`"""
@@ -307,9 +309,20 @@ class PeriodicWorkController(threading.Thread):
             if self._shutdown.isSet():
                 break
             default_periodic_status_backend.run_periodic_tasks()
+            self.process_hold_queue()
             time.sleep(1)
         self._stopped.set() # indicate that we are stopped
 
+    def process_hold_queue(self):
+        try:
+            task, eta = self.hold_queue.get_nowait()
+        except QueueEmpty:
+            return
+        if datetime.now() >= eta:
+            self.bucket_queue.put(task)
+        else:
+            self.hold_queue.put((task, eta))
+
     def stop(self):
         """Shutdown the thread."""
         self._shutdown.set()
@@ -360,18 +373,27 @@ class WorkController(object):
 
     def __init__(self, concurrency=None, logfile=None, loglevel=None,
             is_detached=False):
+
+        # Options
         self.loglevel = loglevel or self.loglevel
         self.concurrency = concurrency or self.concurrency
         self.logfile = logfile or self.logfile
-        self.logger = setup_logger(loglevel, logfile)
-        self.pool = TaskPool(self.concurrency, logger=self.logger)
-        self.periodicworkcontroller = PeriodicWorkController()
         self.is_detached = is_detached
-        self.bucket_queue = Queue()
-        self.mediator = Mediator(self.bucket_queue, self.process_task)
+        self.logger = setup_logger(loglevel, logfile)
         self.amqp_connection = None
         self.task_consumer = None
-    
+
+        # Queues
+        self.bucket_queue = Queue()
+        self.hold_queue = Queue()
+
+        # Threads+Pool
+        self.periodicworkcontroller = PeriodicWorkController(
+                                                    self.bucket_queue,
+                                                    self.hold_queue)
+        self.pool = TaskPool(self.concurrency, logger=self.logger)
+        self.mediator = Mediator(self.bucket_queue, self.process_task)
+
     def run(self):
         """Starts the workers main loop."""
         self._state = "RUN"
@@ -409,10 +431,14 @@ class WorkController(object):
         except (SystemExit, KeyboardInterrupt):
             self.shutdown()
 
-    def add_to_bucket(self, message_data, message):
+    def receive_message(self, message_data, message):
         task = TaskWrapper.from_message(message, message_data,
                                         logger=self.logger)
-        self.bucket_queue.put(task)
+        eta = message_data.get("eta")
+        if eta:
+           self.hold_queue.put((task, eta))
+        else:
+            self.bucket_queue.put(task)
 
     def close_connection(self):
         """Close the AMQP connection."""
@@ -431,7 +457,7 @@ class WorkController(object):
         self.close_connection()
         self.amqp_connection = DjangoAMQPConnection()
         self.task_consumer = TaskConsumer(connection=self.amqp_connection)
-        self.task_consumer.register_callback(self.add_to_bucket)
+        self.task_consumer.register_callback(self.receive_message)
         return self.task_consumer
 
     def shutdown(self):