|
@@ -11,10 +11,6 @@ from celery.backends import default_backend, default_periodic_status_backend
|
|
|
from celery.timer import EventTimer
|
|
|
from django.core.mail import mail_admins
|
|
|
from celery.monitoring import TaskTimerStats
|
|
|
-from datetime import datetime, timedelta
|
|
|
-from Queue import Queue
|
|
|
-from Queue import Empty as QueueEmpty
|
|
|
-from multiprocessing import TimeoutError
|
|
|
import multiprocessing
|
|
|
import traceback
|
|
|
import threading
|
|
@@ -349,7 +345,6 @@ class WorkController(object):
|
|
|
self.is_detached = is_detached
|
|
|
self.amqp_connection = None
|
|
|
self.task_consumer = None
|
|
|
- self.bucket_queue = Queue()
|
|
|
|
|
|
def close_connection(self):
|
|
|
"""Close the AMQP connection."""
|
|
@@ -400,15 +395,6 @@ class WorkController(object):
|
|
|
|
|
|
def process_task(self, message_data, message):
|
|
|
"""Process task message by passing it to the pool of workers."""
|
|
|
-
|
|
|
- countdown = message_data.get("countdown")
|
|
|
- eta = message_data.get("eta")
|
|
|
- if countdown:
|
|
|
- eta = datetime.now() + timedelta(seconds=int(countdown))
|
|
|
- if eta:
|
|
|
- self.bucket_queue.put((message, message_data, eta))
|
|
|
- return
|
|
|
-
|
|
|
task = TaskWrapper.from_message(message, message_data,
|
|
|
logger=self.logger)
|
|
|
self.logger.info("Got task from broker: %s[%s]" % (
|
|
@@ -420,7 +406,7 @@ class WorkController(object):
|
|
|
|
|
|
self.logger.debug("Task %s has been executed asynchronously." % task)
|
|
|
|
|
|
- return
|
|
|
+ return result
|
|
|
|
|
|
def shutdown(self):
|
|
|
"""Make sure ``celeryd`` exits cleanly."""
|
|
@@ -451,31 +437,6 @@ class WorkController(object):
|
|
|
|
|
|
try:
|
|
|
while True:
|
|
|
- try:
|
|
|
- self.process_bucket()
|
|
|
- self.process_next(it, timeout=1)
|
|
|
- except TimeoutError:
|
|
|
- pass
|
|
|
+ it.next()
|
|
|
except (SystemExit, KeyboardInterrupt):
|
|
|
self.shutdown()
|
|
|
-
|
|
|
- def process_next(self, it, timeout=1):
|
|
|
- def on_timeout():
|
|
|
- raise TimeoutError()
|
|
|
- timer = threading.Timer(timeout, on_timeout)
|
|
|
- timer.start()
|
|
|
- try:
|
|
|
- it.next()
|
|
|
- finally:
|
|
|
- timer.cancel()
|
|
|
-
|
|
|
- def process_bucket(self):
|
|
|
- try:
|
|
|
- message, msg_data, eta = self.bucket_queue.get_nowait()
|
|
|
- except QueueEmpty:
|
|
|
- pass
|
|
|
- else:
|
|
|
- if datetime.now() >= eta:
|
|
|
- self.process_task(message, msg_data)
|
|
|
- else:
|
|
|
- self.bucket_queue.put((message, msg_data, eta))
|