|
@@ -2,7 +2,6 @@
|
|
|
from carrot.connection import DjangoAMQPConnection
|
|
|
from celery.messaging import TaskConsumer
|
|
|
from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
|
|
|
-from celery.conf import QUEUE_WAKEUP_AFTER, EMPTY_MSG_EMIT_EVERY
|
|
|
from celery.conf import SEND_CELERY_TASK_ERROR_EMAILS
|
|
|
from celery.log import setup_logger
|
|
|
from celery.registry import tasks
|
|
@@ -16,6 +15,7 @@ import multiprocessing
|
|
|
import traceback
|
|
|
import threading
|
|
|
import logging
|
|
|
+import signal
|
|
|
import socket
|
|
|
import time
|
|
|
import sys
|
|
@@ -31,16 +31,12 @@ The contents of the full traceback was:
|
|
|
|
|
|
%%(traceback)s
|
|
|
|
|
|
-%%(EMAIL_SIGNATURE_SEP)s
|
|
|
+%(EMAIL_SIGNATURE_SEP)s
|
|
|
Just thought I'd let you know!
|
|
|
celeryd at %%(hostname)s.
|
|
|
""" % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
|
|
|
|
|
|
|
|
|
-class EmptyQueue(Exception):
|
|
|
- """The message queue is currently empty."""
|
|
|
-
|
|
|
-
|
|
|
class UnknownTask(Exception):
|
|
|
"""Got an unknown task in the queue. The message is requeued and
|
|
|
ignored."""
|
|
@@ -93,9 +89,6 @@ def jail(task_id, task_name, func, args, kwargs):
|
|
|
# Backend process cleanup
|
|
|
default_backend.process_cleanup()
|
|
|
|
|
|
- # Convert any unicode keys in the keyword arguments to ascii.
|
|
|
- kwargs = dict([(k.encode("utf-8"), v)
|
|
|
- for k, v in kwargs.items()])
|
|
|
try:
|
|
|
result = func(*args, **kwargs)
|
|
|
except (SystemExit, KeyboardInterrupt):
|
|
@@ -191,6 +184,11 @@ class TaskWrapper(object):
|
|
|
task_id = message_data["id"]
|
|
|
args = message_data["args"]
|
|
|
kwargs = message_data["kwargs"]
|
|
|
+
|
|
|
+ # Convert any unicode keys in the keyword arguments to ascii.
|
|
|
+ kwargs = dict([(key.encode("utf-8"), value)
|
|
|
+ for key, value in kwargs.items()])
|
|
|
+
|
|
|
if task_name not in tasks:
|
|
|
raise UnknownTask(task_name)
|
|
|
task_func = tasks[task_name]
|
|
@@ -286,7 +284,7 @@ class PeriodicWorkController(threading.Thread):
|
|
|
super(PeriodicWorkController, self).__init__()
|
|
|
self._shutdown = threading.Event()
|
|
|
self._stopped = threading.Event()
|
|
|
-
|
|
|
+
|
|
|
def run(self):
|
|
|
"""Don't use :meth:`run`. use :meth:`start`."""
|
|
|
while True:
|
|
@@ -310,8 +308,6 @@ class WorkController(object):
|
|
|
|
|
|
:param loglevel: see :attr:`loglevel`.
|
|
|
|
|
|
- :param queue_wakeup_after: see :attr:`queue_wakeup_after`.
|
|
|
-
|
|
|
|
|
|
.. attribute:: concurrency
|
|
|
|
|
@@ -327,18 +323,6 @@ class WorkController(object):
|
|
|
The logfile used, if no logfile is specified it uses ``stderr``
|
|
|
(default: :const:`celery.conf.DAEMON_LOG_FILE`).
|
|
|
|
|
|
- .. attribute:: queue_wakeup_after
|
|
|
-
|
|
|
- The time it takes for the daemon to wake up after the queue is empty,
|
|
|
- so it can check for more work
|
|
|
- (default: :const:`celery.conf.QUEUE_WAKEUP_AFTER`).
|
|
|
-
|
|
|
- .. attribute:: empty_msg_emit_every
|
|
|
-
|
|
|
- How often the daemon emits the ``"Waiting for queue..."`` message.
|
|
|
- If this is ``None``, the message will never be logged.
|
|
|
- (default: :const:`celery.conf.EMPTY_MSG_EMIT_EVERY`)
|
|
|
-
|
|
|
.. attribute:: logger
|
|
|
|
|
|
The :class:`logging.Logger` instance used for logging.
|
|
@@ -355,23 +339,24 @@ class WorkController(object):
|
|
|
loglevel = logging.ERROR
|
|
|
concurrency = DAEMON_CONCURRENCY
|
|
|
logfile = DAEMON_LOG_FILE
|
|
|
- queue_wakeup_after = QUEUE_WAKEUP_AFTER
|
|
|
- empty_msg_emit_every = EMPTY_MSG_EMIT_EVERY
|
|
|
|
|
|
def __init__(self, concurrency=None, logfile=None, loglevel=None,
|
|
|
- queue_wakeup_after=None, is_detached=False):
|
|
|
+ is_detached=False):
|
|
|
self.loglevel = loglevel or self.loglevel
|
|
|
self.concurrency = concurrency or self.concurrency
|
|
|
self.logfile = logfile or self.logfile
|
|
|
- self.queue_wakeup_after = queue_wakeup_after or \
|
|
|
- self.queue_wakeup_after
|
|
|
self.logger = setup_logger(loglevel, logfile)
|
|
|
self.pool = TaskPool(self.concurrency, logger=self.logger)
|
|
|
self.periodicworkcontroller = PeriodicWorkController()
|
|
|
- self.task_consumer = None
|
|
|
- self.task_consumer_it = None
|
|
|
self.is_detached = is_detached
|
|
|
- self.reset_connection()
|
|
|
+ self.amqp_connection = None
|
|
|
+ self.task_consumer = None
|
|
|
+
|
|
|
+ def close_connection(self):
|
|
|
+ if self.task_consumer:
|
|
|
+ self.task_consumer.close()
|
|
|
+ if self.amqp_connection:
|
|
|
+ self.amqp_connection.close()
|
|
|
|
|
|
def reset_connection(self):
|
|
|
"""Reset the AMQP connection, and reinitialize the
|
|
@@ -380,11 +365,10 @@ class WorkController(object):
|
|
|
Resets the task consumer in :attr:`task_consumer`.
|
|
|
|
|
|
"""
|
|
|
- if self.task_consumer:
|
|
|
- self.task_consumer.connection.close()
|
|
|
- amqp_connection = DjangoAMQPConnection()
|
|
|
- self.task_consumer = TaskConsumer(connection=amqp_connection)
|
|
|
- self.task_consumer_it = self.task_consumer.iterqueue(infinite=True)
|
|
|
+ self.close_connection()
|
|
|
+ self.amqp_connection = DjangoAMQPConnection()
|
|
|
+ self.task_consumer = TaskConsumer(connection=self.amqp_connection)
|
|
|
+ return self.task_consumer
|
|
|
|
|
|
def connection_diagnostics(self):
|
|
|
"""Diagnose the AMQP connection, and reset connection if
|
|
@@ -396,19 +380,24 @@ class WorkController(object):
|
|
|
"AMQP Connection has died, restoring connection.")
|
|
|
self.reset_connection()
|
|
|
|
|
|
- def receive_message(self):
|
|
|
- """Receive the next message from the message broker.
|
|
|
-
|
|
|
- Tries to reset the AMQP connection if not available.
|
|
|
- Returns ``None`` if no message is waiting on the queue.
|
|
|
-
|
|
|
- :rtype: :class:`carrot.messaging.Message` instance.
|
|
|
-
|
|
|
- """
|
|
|
- message = self.task_consumer_it.next()
|
|
|
- if not message:
|
|
|
- raise EmptyQueue()
|
|
|
- return message
|
|
|
+ def _message_callback(self, message_data, message):
|
|
|
+ try:
|
|
|
+ try:
|
|
|
+ self.process_task(message)
|
|
|
+ except ValueError:
|
|
|
+ # execute_next_task didn't return a r/name/id tuple,
|
|
|
+ # probably because it got an exception.
|
|
|
+ pass
|
|
|
+ except UnknownTask, exc:
|
|
|
+ self.logger.info("Unknown task ignored: %s" % (exc))
|
|
|
+ except Exception, exc:
|
|
|
+ self.logger.critical("Message queue raised %s: %s\n%s" % (
|
|
|
+ exc.__class__, exc, traceback.format_exc()))
|
|
|
+ except:
|
|
|
+ self.shutdown()
|
|
|
+ raise
|
|
|
+ except (SystemExit, KeyboardInterrupt):
|
|
|
+ self.shutdown()
|
|
|
|
|
|
def process_task(self, message):
|
|
|
"""Process task message by passing it to the pool of workers."""
|
|
@@ -424,31 +413,17 @@ class WorkController(object):
|
|
|
|
|
|
return result
|
|
|
|
|
|
- def execute_next_task(self):
|
|
|
- """Execute the next task on the queue using the multiprocessing pool.
|
|
|
-
|
|
|
- Catches all exceptions and logs them with level
|
|
|
- :const:`logging.CRITICAL`.
|
|
|
-
|
|
|
- Raises :exc:`EmptyQueue` exception if there is no message
|
|
|
- waiting on the queue.
|
|
|
-
|
|
|
- """
|
|
|
- self.process_task(self.receive_message())
|
|
|
-
|
|
|
- def schedule_retry_tasks(self):
|
|
|
- """Reschedule all requeued tasks waiting for retry."""
|
|
|
- pass
|
|
|
-
|
|
|
def shutdown(self):
|
|
|
# shut down the periodic work controller thread
|
|
|
self.periodicworkcontroller.stop()
|
|
|
self.pool.terminate()
|
|
|
+ self.close_connection()
|
|
|
|
|
|
def run(self):
|
|
|
"""Starts the workers main loop."""
|
|
|
- log_wait = lambda: self.logger.debug("Waiting for queue...")
|
|
|
- ev_msg_waiting = EventTimer(log_wait, self.empty_msg_emit_every)
|
|
|
+ task_consumer = self.reset_connection()
|
|
|
+ task_consumer.register_callback(self._message_callback)
|
|
|
+ it = task_consumer.iterconsume(limit=None)
|
|
|
|
|
|
self.pool.run()
|
|
|
self.periodicworkcontroller.start()
|
|
@@ -462,26 +437,7 @@ class WorkController(object):
|
|
|
time.sleep(1)
|
|
|
|
|
|
try:
|
|
|
- while True:
|
|
|
- try:
|
|
|
- self.execute_next_task()
|
|
|
- except ValueError:
|
|
|
- # execute_next_task didn't return a r/name/id tuple,
|
|
|
- # probably because it got an exception.
|
|
|
- continue
|
|
|
- except EmptyQueue:
|
|
|
- ev_msg_waiting.tick()
|
|
|
- time.sleep(self.queue_wakeup_after)
|
|
|
- continue
|
|
|
- except UnknownTask, exc:
|
|
|
- self.logger.info("Unknown task ignored: %s" % (exc))
|
|
|
- continue
|
|
|
- except Exception, exc:
|
|
|
- self.logger.critical("Message queue raised %s: %s\n%s" % (
|
|
|
- exc.__class__, exc, traceback.format_exc()))
|
|
|
- continue
|
|
|
- except:
|
|
|
- self.shutdown()
|
|
|
- raise
|
|
|
+ while True:
|
|
|
+ it.next()
|
|
|
except (SystemExit, KeyboardInterrupt):
|
|
|
self.shutdown()
|