
Raise our pylint score to 8.24/10

Ask Solem, 16 years ago
commit ced0b64ad2

+ 3 - 1
celery/backends/base.py

@@ -4,7 +4,6 @@ try:
     import cPickle as pickle
 except ImportError:
     import pickle
-import sys


 def find_nearest_pickleable_exception(exc):
@@ -97,11 +96,13 @@ class BaseBackend(object):
         return self.store_result(task_id, exc, status="FAILURE")

     def create_exception_cls(self, name, module, parent=None):
+        """Dynamically create an exception class."""
         if not parent:
             parent = Exception
         return type(name, (parent, ), {"__module__": module})

     def prepare_exception(self, exc):
+        """Prepare exception for serialization."""
         nearest = find_nearest_pickleable_exception(exc)
         if nearest:
             return nearest
@@ -118,6 +119,7 @@ class BaseBackend(object):
             return exc

     def exception_to_python(self, exc):
+        """Convert serialized exception to Python exception."""
         if isinstance(exc, UnpickleableExceptionWrapper):
             exc_cls = self.create_exception_cls(exc.exc_cls_name,
                                                 exc.exc_module)

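Note: the newly documented create_exception_cls builds exception types at runtime with type(). A minimal standalone sketch of how the result behaves; the QueueTimeout name and module string are made up for illustration:

    def create_exception_cls(name, module, parent=None):
        # Same logic as the BaseBackend method above, minus self.
        if not parent:
            parent = Exception
        return type(name, (parent, ), {"__module__": module})

    QueueTimeout = create_exception_cls("QueueTimeout", "myapp.exceptions")
    assert issubclass(QueueTimeout, Exception)
    assert QueueTimeout.__module__ == "myapp.exceptions"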
+ 2 - 0
celery/backends/cache.py

@@ -17,6 +17,7 @@ class Backend(BaseBackend):
         self._cache = {}

     def _cache_key(self, task_id):
+        """Get the cache key for a task by id."""
         return "celery-task-meta-%s" % task_id

     def store_result(self, task_id, result, status):
@@ -45,6 +46,7 @@ class Backend(BaseBackend):
         return self.get_status(task_id) == "DONE"

     def _get_task_meta_for(self, task_id):
+        """Get the task metadata for a task by id."""
         if task_id in self._cache:
             return self._cache[task_id]
         meta = cache.get(self._cache_key(task_id))

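Note: _get_task_meta_for reads through an instance-local self._cache dict before hitting the Django cache. A self-contained sketch of that memoization pattern, with a plain dict standing in for the cache module; the real method has extra logic not shown in this hunk:

    REMOTE = {"celery-task-meta-abc123": {"status": "DONE"}}  # stand-in store

    class MemoizedLookup(object):
        def __init__(self):
            self._cache = {}

        def _cache_key(self, task_id):
            return "celery-task-meta-%s" % task_id

        def _get_task_meta_for(self, task_id):
            if task_id in self._cache:
                return self._cache[task_id]      # local hit: no remote call
            meta = REMOTE.get(self._cache_key(task_id))
            self._cache[task_id] = meta          # remember for next time
            return meta

    lookup = MemoizedLookup()
    assert lookup._get_task_meta_for("abc123") == {"status": "DONE"}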
+ 2 - 0
celery/backends/database.py

@@ -13,6 +13,7 @@ class Backend(BaseBackend):
         self._cache = {}

     def run_periodic_tasks(self):
+        """Run all waiting periodic tasks."""
         waiting_tasks = PeriodicTaskMeta.objects.get_waiting_tasks()
         for waiting_task in waiting_tasks:
             waiting_task.delay()
@@ -42,6 +43,7 @@ class Backend(BaseBackend):
             return meta.result

     def _get_task_meta_for(self, task_id):
+        """Get task metadata for a task by id."""
         if task_id in self._cache:
             return self._cache[task_id]
         meta = TaskMeta.objects.get_task(task_id)

+ 2 - 0
celery/backends/tyrant.py

@@ -57,6 +57,7 @@ class Backend(BaseBackend):
         return pytyrant.PyTyrant.open(self.tyrant_host, self.tyrant_port)

     def _cache_key(self, task_id):
+        """Get the cache key for a task by id."""
         return "celery-task-meta-%s" % task_id

     def store_result(self, task_id, result, status):
@@ -85,6 +86,7 @@ class Backend(BaseBackend):
         return self.get_status(task_id) == "DONE"

     def _get_task_meta_for(self, task_id):
+        """Get task metadata for a task by id."""
         if task_id in self._cache:
             return self._cache[task_id]
         meta = self.get_server().get(self._cache_key(task_id))

+ 5 - 3
celery/bin/celeryd.py

@@ -40,7 +40,7 @@ if django_project_dir:

 from django.conf import settings
 from celery.platform import PIDFile, daemonize, remove_pidfile
-from celery.log import setup_logger, emergency_error
+from celery.log import emergency_error
 from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
 from celery.conf import QUEUE_WAKEUP_AFTER
@@ -54,6 +54,7 @@ import atexit
 def main(concurrency=DAEMON_CONCURRENCY, daemon=False,
         loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE,
         pidfile=DAEMON_PID_FILE, queue_wakeup_after=QUEUE_WAKEUP_AFTER):
+    """Run the celery daemon."""
     if settings.DATABASE_ENGINE == "sqlite3" and concurrency > 1:
         import warnings
         warnings.warn("The sqlite3 database engine doesn't support "
@@ -82,7 +83,7 @@ def main(concurrency=DAEMON_CONCURRENCY, daemon=False,
                             e.__class__, e, traceback.format_exc()))


-option_list = (
+OPTION_LIST = (
     optparse.make_option('-c', '--concurrency', default=DAEMON_CONCURRENCY,
             action="store", dest="concurrency", type="int",
             help="Number of child processes processing the queue."),
@@ -107,7 +108,8 @@ option_list = (


 def parse_options(arguments):
-    parser = optparse.OptionParser(option_list=option_list)
+    """Option parsers for the available options to ``celeryd``."""
+    parser = optparse.OptionParser(option_list=OPTION_LIST)
     options, values = parser.parse_args(arguments)
     if not isinstance(options.loglevel, int):
         options.loglevel = LOG_LEVELS[options.loglevel.upper()]

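Note: OPTION_LIST is the old option_list renamed to pylint's constant convention, and parse_options is plain optparse machinery. A runnable sketch of the same pattern, trimmed to a single illustrative option (the default of 1 here is made up, not celeryd's real DAEMON_CONCURRENCY):

    import optparse

    OPTION_LIST = (
        optparse.make_option('-c', '--concurrency', default=1,
                action="store", dest="concurrency", type="int",
                help="Number of child processes processing the queue."),
    )

    def parse_options(arguments):
        # optparse accepts the tuple directly, as the real code does.
        parser = optparse.OptionParser(option_list=OPTION_LIST)
        options, values = parser.parse_args(arguments)
        return options

    assert parse_options(["-c", "4"]).concurrency == 4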
+ 11 - 1
celery/fields.py

@@ -1,3 +1,8 @@
+"""
+
+Custom Django Model Fields.
+
+"""
 from django.db import models

 try:
@@ -14,9 +19,11 @@ class PickledObject(str):


 class PickledObjectField(models.Field):
+    """A field that automatically pickles/unpickles its value."""
     __metaclass__ = models.SubfieldBase

     def to_python(self, value):
+        """Convert the database value to a python value."""
         if isinstance(value, PickledObject):
             # If the value is a definite pickle; and an error is
             # raised in de-pickling it should be allowed to propogate.
@@ -24,19 +31,22 @@ class PickledObjectField(models.Field):
         else:
             try:
                 return pickle.loads(str(value))
-            except:
+            except pickle.PickleError:
                 # If an error was raised, just return the plain value
                 return value

     def get_db_prep_save(self, value):
+        """get_db_prep_save"""
         if value is not None and not isinstance(value, PickledObject):
             value = PickledObject(pickle.dumps(value))
         return value

     def get_internal_type(self):
+        """The database field type used by this field."""
         return 'TextField'

     def get_db_prep_lookup(self, lookup_type, value):
+        """get_db_prep_lookup"""
         if lookup_type == 'exact':
             value = self.get_db_prep_save(value)
             return super(PickledObjectField, self).get_db_prep_lookup(

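Note: the narrowed except clause now only swallows pickle.PickleError instead of every exception, so unrelated errors propagate. A quick sketch of the PickledObject round-trip the field relies on, under Python 2 semantics to match the cPickle import in this codebase:

    import pickle

    class PickledObject(str):
        """Marks a string as an already-pickled value (as in fields.py)."""

    # get_db_prep_save wraps the pickle; to_python unwraps it:
    stored = PickledObject(pickle.dumps({"retries": 3}))
    assert pickle.loads(str(stored)) == {"retries": 3}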
+ 9 - 3
celery/management/commands/celeryd.py

@@ -1,14 +1,20 @@
-from django.core.management.base import BaseCommand
+"""
+
+Start the celery daemon from the Django management command.

-from celery.bin.celeryd import main, option_list
+"""
+from django.core.management.base import BaseCommand
+from celery.bin.celeryd import main, OPTION_LIST
 from celery.conf import LOG_LEVELS


 class Command(BaseCommand):
-    option_list = BaseCommand.option_list + option_list
+    """Run the celery daemon."""
+    option_list = BaseCommand.option_list + OPTION_LIST
     help = 'Run the celery daemon'

     def handle(self, *args, **options):
+        """Handle the management command."""
         if not isinstance(options.get('loglevel'), int):
             options['loglevel'] = LOG_LEVELS[options.get('loglevel').upper()]
         main(concurrency=options.get('concurrency'),

+ 1 - 0
celery/messaging.py

@@ -40,6 +40,7 @@ class TaskPublisher(Publisher):

     def _delay_task(self, task_name, task_id=None, part_of_set=None,
             task_args=None, task_kwargs=None, **kwargs):
+        """INTERNAL"""
         priority = kwargs.get("priority")
         immediate = kwargs.get("immediate")
         mandatory = kwargs.get("mandatory")

+ 11 - 34
celery/models.py

@@ -1,42 +1,13 @@
+"""
+
+Django Models.
+
+"""
 from django.db import models
 from celery.registry import tasks
 from celery.managers import TaskManager, PeriodicTaskManager
 from celery.fields import PickledObjectField
 from django.utils.translation import ugettext_lazy as _
-from Queue import Queue
-
-
-class RetryQueue(object):
-    queue = Queue()
-
-    class Item(object):
-
-        def __init__(self, task_name, task_id, args, kwargs):
-            self.task_name = task_name
-            self.task_id = task_id
-            self.args = args
-            self.kwargs = kwargs
-            self.retry_count = 0
-
-        def retry(self):
-            self.task.requeue(self.task_id, self.args, self.kwargs)
-            self.retry_count += 1
-            self.last_retry_at = time.time()
-            return self.retry_count
-
-        @property
-        def task(self):
-            return tasks[self.task_name]
-
-    def put(self, task_name, task_id, args, kwargs):
-        self.queue.put(self.Item(task_name, task_id, args, kwargs))
-
-    def get(self):
-        if not self.queue.qsize():
-            return None
-        return self.queue.get()
-retry_queue = RetryQueue()
-

 TASK_STATUS_PENDING = "PENDING"
 TASK_STATUS_RETRY = "RETRY"
@@ -48,6 +19,7 @@ TASK_STATUSES_CHOICES = zip(TASK_STATUSES, TASK_STATUSES)


 class TaskMeta(models.Model):
+    """Task result/status."""
     task_id = models.CharField(_(u"task id"), max_length=255, unique=True)
     status = models.CharField(_(u"task status"), max_length=50,
             default=TASK_STATUS_PENDING, choices=TASK_STATUSES_CHOICES)
@@ -57,6 +29,7 @@ class TaskMeta(models.Model):
     objects = TaskManager()

     class Meta:
+        """Model meta-data."""
         verbose_name = _(u"task meta")
         verbose_name_plural = _(u"task meta")

@@ -65,6 +38,7 @@ class TaskMeta(models.Model):


 class PeriodicTaskMeta(models.Model):
+    """Information about a Periodic Task."""
     name = models.CharField(_(u"name"), max_length=255, unique=True)
     last_run_at = models.DateTimeField(_(u"last time run"),
                                        auto_now=True, blank=True)
@@ -74,6 +48,7 @@ class PeriodicTaskMeta(models.Model):
     objects = PeriodicTaskManager()

     class Meta:
+        """Model meta-data."""
         verbose_name = _(u"periodic task")
         verbose_name_plural = _(u"periodic tasks")

@@ -82,10 +57,12 @@ class PeriodicTaskMeta(models.Model):
                 self.name, self.last_run_at, self.total_run_count)

     def delay(self, *args, **kwargs):
+        """Apply the periodic task immediately."""
         self.task.delay()
         self.total_run_count = self.total_run_count + 1
         self.save()

     @property
     def task(self):
+        """The entry registered in the task registry for this task."""
         return tasks[self.name]

+ 35 - 0
celery/pool.py

@@ -1,3 +1,8 @@
+"""
+
+Process Pools.
+
+"""
 import multiprocessing
 import itertools
 import threading
@@ -33,15 +38,25 @@ class TaskPool(object):
         self.reap_timeout = reap_timeout
         self._process_counter = itertools.count(1)
         self._processed_total = 0
+        self._pool = None
+        self._processes = None

     def run(self):
+        """Run the task pool.
+
+        Will launch all worker processes so they are ready
+        for processing tasks.
+
+        """
         self._start()

     def _start(self):
+        """INTERNAL: Starts the pool. Used by :meth:`run`."""
         self._processes = {}
         self._pool = multiprocessing.Pool(processes=self.limit)

     def _terminate_and_restart(self):
+        """INTERNAL: Terminate and restart the pool."""
         try:
             self._pool.terminate()
         except OSError:
@@ -49,6 +64,7 @@ class TaskPool(object):
         self._start()

     def _restart(self):
+        """INTERNAL: Close and restart the pool."""
         self.logger.info("Closing and restarting the pool...")
         self._pool.close()
         timeout_thread = threading.Timer(30.0, self._terminate_and_restart)
@@ -58,10 +74,21 @@ class TaskPool(object):
         self._start()

     def _pool_is_running(self):
+        """Check if the pool is in the run state.
+
+        :returns: ``True`` if the pool is running.
+
+        """
         return self._pool._state == POOL_STATE_RUN

     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
             errbacks=None, meta=None):
+        """Equivalent of the :func:``apply`` built-in function.
+
+        All ``callbacks`` and ``errbacks`` should complete immediately since
+        otherwise the thread which handles the result will get blocked.
+
+        """
         args = args or []
         kwargs = kwargs or {}
         callbacks = callbacks or []
@@ -84,6 +111,7 @@ class TaskPool(object):
         return result

     def on_return(self, ret_val, tid, callbacks, errbacks, meta):
+        """What to do when the process returns."""
         try:
             del(self._processes[tid])
         except KeyError:
@@ -119,6 +147,12 @@ class TaskPool(object):
             self.wait_for_result()

     def full(self):
+        """Is the pool full?
+
+        :returns: ``True`` if the maximum number of concurrent processes
+            has been reached.
+
+        """
         return len(self._processes.values()) >= self.limit

     def wait_for_result(self):
@@ -132,6 +166,7 @@ class TaskPool(object):
                 break

     def reap(self):
+        """Reap finished tasks."""
         self.logger.debug("Reaping processes...")
         processes_reaped = 0
         for process_no, entry in enumerate(self._processes.items()):

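Note: the new apply_async docstring warns that callbacks must return quickly, because multiprocessing invokes them on the pool's single result-handler thread. A self-contained illustration of that constraint using the stdlib pool directly (toy work function, not celery code):

    import multiprocessing

    def work(x):
        return x * 2

    results = []

    def quick_callback(result):
        # Runs on the pool's result-handler thread: keep it fast,
        # or every other pending result is held up behind it.
        results.append(result)

    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=2)
        pool.apply_async(work, [21], callback=quick_callback)
        pool.close()
        pool.join()   # joins the handler threads, so the callback has run
        assert results == [42]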
+ 1 - 0
celery/registry.py

@@ -1,3 +1,4 @@
+"""celery.registry"""
 from celery import discovery
 from UserDict import UserDict


+ 2 - 8
celery/task.py

@@ -239,14 +239,6 @@ class Task(object):
         """
         return TaskConsumer(connection=DjangoAMQPConnection())

-    def requeue(self, task_id, args, kwargs):
-        publisher = self.get_publisher()
-        publisher.requeue_task(self.name, task_id, args, kwargs)
-        publisher.connection.close()
-
-    def retry(self, task_id, args, kwargs):
-        retry_queue.put(self.name, task_id, args, kwargs)
-
     @classmethod
     def delay(cls, *args, **kwargs):
         """Delay this task for execution by the ``celery`` daemon(s).
@@ -441,6 +433,7 @@ class AsynchronousMapTask(Task):
     name = "celery.map_async"

     def run(self, serfunc, args, **kwargs):
+        """The method run by ``celeryd``."""
         timeout = kwargs.get("timeout")
         return TaskSet.map(pickle.loads(serfunc), args, timeout=timeout)
 tasks.register(AsynchronousMapTask)
@@ -566,6 +559,7 @@ class DeleteExpiredTaskMetaTask(PeriodicTask):
     run_every = timedelta(days=1)

     def run(self, **kwargs):
+        """The method run by ``celeryd``."""
         logger = self.get_logger(**kwargs)
         logger.info("Deleting expired task meta objects...")
         default_backend.cleanup()

+ 5 - 0
celery/urls.py

@@ -1,3 +1,8 @@
+"""
+
+URLs defined for celery.
+
+"""
 from django.conf.urls.defaults import patterns, url
 from celery import views


+ 3 - 7
celery/views.py

@@ -1,14 +1,10 @@
 """celery.views"""
-from django.http import Http404, HttpResponse
+from django.http import HttpResponse
 from celery.task import is_done, delay_task
 from celery.result import AsyncResult
 import simplejson


-def apply_async(request, task_name, *args, **kwargs):
-    res = delay_task(task_name, args, kwargs)
-
-
 def is_task_done(request, task_id):
     """Returns task execute status in JSON format."""
     response_data = {"task": {"id": task_id, "executed": is_done(task_id)}}
@@ -20,7 +16,7 @@ def task_status(request, task_id):
     async_result = AsyncResult(task_id)
     response_data = {"task": {
                         "id": task_id,
-                        "status": async_result.get_status(),
-                        "result": async_result.get_result(),
+                        "status": async_result.status,
+                        "result": async_result.result,
     }}
     return HttpResponse(simplejson.dumps(response_data))

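Note: task_status now reads status and result as attributes, reflecting that AsyncResult exposes them as properties rather than get_status()/get_result() methods. A toy version of that property pattern, not celery's actual AsyncResult:

    class ToyResult(object):
        def __init__(self, backend, task_id):
            self.backend = backend
            self.task_id = task_id

        @property
        def status(self):
            """Computed on attribute access, like AsyncResult.status."""
            return self.backend.get_status(self.task_id)

Callers then write result.status instead of result.get_status(), which is what the view above switches to.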
+ 4 - 1
celery/worker.py

@@ -133,8 +133,9 @@ class TaskWrapper(object):
         self.task_func = task_func
         self.args = args
         self.kwargs = kwargs
+        self.logger = kwargs.get("logger")
         for opt in ("success_msg", "fail_msg", "fail_email_subject",
-                "fail_email_body", "logger"):
+                "fail_email_body"):
             setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
         if not self.logger:
             self.logger = multiprocessing.get_logger()
@@ -308,6 +309,7 @@ class WorkController(object):
         self.logger = setup_logger(loglevel, logfile)
         self.pool = TaskPool(self.concurrency, logger=self.logger)
         self.task_consumer = None
+        self.task_consumer_it = None
         self.is_detached = is_detached
         self.reset_connection()

@@ -349,6 +351,7 @@ class WorkController(object):
         return message

     def process_task(self, message):
+        """Process task message by passing it to the pool of workers."""
         task = TaskWrapper.from_message(message, logger=self.logger)
         self.logger.info("Got task from broker: %s[%s]" % (
             task.task_name, task.task_id))