Browse Source

Docstringimprovements

Ask Solem 14 years ago
parent
commit
c34097d89c
2 changed files with 76 additions and 70 deletions
  1. 60 48
      celery/worker/job.py
  2. 16 22
      celery/worker/state.py

+ 60 - 48
celery/worker/job.py

@@ -23,6 +23,8 @@ from celery.worker import state
 # pep8.py borks on a inline signature separator and
 # says "trailing whitespace" ;)
 EMAIL_SIGNATURE_SEP = "-- "
+
+#: format string for the body of an error e-mail.
 TASK_ERROR_EMAIL_BODY = """
 Task %%(name)s with id %%(id)s raised exception:\n%%(exc)s
 
@@ -39,16 +41,20 @@ celeryd at %%(hostname)s.
 """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
 
 
+#: keys to keep from the message delivery info.  The values
+#: of these keys must be pickleable.
 WANTED_DELIVERY_INFO = ("exchange", "routing_key", "consumer_tag", )
 
 
 class InvalidTaskError(Exception):
     """The task has invalid data or is not properly constructed."""
+    pass
 
 
 class AlreadyExecutedError(Exception):
     """Tasks can only be executed once, as they might change
     world-wide state."""
+    pass
 
 
 class WorkerTaskTrace(TaskTrace):
@@ -76,6 +82,12 @@ class WorkerTaskTrace(TaskTrace):
 
     """
 
+    #: current loader.
+    loader = None
+
+    #: hostname to report as.
+    hostname = None
+
     def __init__(self, *args, **kwargs):
         self.loader = kwargs.get("loader") or app_or_default().loader
         self.hostname = kwargs.get("hostname") or socket.gethostname()
@@ -152,71 +164,70 @@ def execute_and_trace(task_name, *args, **kwargs):
 
 
 class TaskRequest(object):
-    """A request for task execution.
-
-    :param task_name: see :attr:`task_name`.
-    :param task_id: see :attr:`task_id`.
-    :param args: see :attr:`args`
-    :param kwargs: see :attr:`kwargs`.
-
-    .. attribute:: task_name
-
-        Kind of task. Must be a name registered in the task registry.
-
-    .. attribute:: task_id
-
-        UUID of the task.
+    """A request for task execution."""
 
-    .. attribute:: args
+    #: kind of task.  Must be a name registered in the task registry.
+    name = None
 
-        List of positional arguments to apply to the task.
+    #: the task class (set by constructor using :attr:`task_name`).
+    task = None
 
-    .. attribute:: kwargs
+    #: UUID of the task.
+    task_id = None
 
-        Mapping of keyword arguments to apply to the task.
+    #: List of positional arguments to apply to the task.
+    args = None
 
-    .. attribute:: on_ack
+    #: Mapping of keyword arguments to apply to the task.
+    kwargs = None
 
-        Callback called when the task should be acknowledged.
+    #: number of times the task has been retried.
+    retries = 0
 
-    .. attribute:: message
+    #: the tasks eta (for information only).
+    eta = None
 
-        The original message sent. Used for acknowledging the message.
+    #: when the task expires.
+    expires = None
 
-    .. attribute:: executed
+    #: callback called when the task should be acknowledged.
+    on_ack = None
 
-        Set to :const:`True` if the task has been executed.
-        A task should only be executed once.
+    #: the message object.  Used to acknowledge the message.
+    message = None
 
-    .. attribute:: delivery_info
-
-        Additional delivery info, e.g. the contains the path
-        from producer to consumer.
+    #: flag set when the task has been executed.
+    executed = False
 
-    .. attribute:: acknowledged
+    #: additional delivery info, e.g. contains the path from
+    #: producer to consumer.
+    delivery_info = None
 
-        Set to :const:`True` if the task has been acknowledged.
+    #: flag set when the task has been acknowledged.
+    acknowledged = False
 
-    """
-    # Logging output
+    #: format string used to log task success.
     success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
-    error_msg = """
+
+    #: format string used to log task failure.
+    error_msg = """\
         Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
     """
-    retry_msg = """
-        Task %(name)s[%(id)s] retry: %(exc)s
-    """
 
-    # E-mails
-    email_subject = """
+    #: format string used to log task retry.
+    retry_msg = """Task %(name)s[%(id)s] retry: %(exc)s"""
+
+    #: format string used to generate error e-mail subjects.
+    email_subject = """\
         [celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
     """
+
+    #: format string used to generate error e-mail content.
     email_body = TASK_ERROR_EMAIL_BODY
 
-    # Internal flags
-    executed = False
-    acknowledged = False
+    #: timestamp set when the task is started.
     time_start = None
+
     _already_revoked = False
 
     def __init__(self, task_name, task_id, args, kwargs,
@@ -513,12 +524,6 @@ class TaskRequest(object):
             body = self.email_body.strip() % context
             self.app.mail_admins(subject, body, fail_silently=fail_silently)
 
-    def __repr__(self):
-        return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
-                self.__class__.__name__,
-                self.task_name, self.task_id,
-                self.args, self.kwargs)
-
     def info(self, safe=False):
         args = self.args
         kwargs = self.kwargs
@@ -541,3 +546,10 @@ class TaskRequest(object):
                     self.task_id,
                     self.eta and " eta:[%s]" % (self.eta, ) or "",
                     self.expires and " expires:[%s]" % (self.expires, ) or "")
+
+    def __repr__(self):
+        return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
+                self.__class__.__name__,
+                self.task_name, self.task_id,
+                self.args, self.kwargs)
+

+ 16 - 22
celery/worker/state.py

@@ -3,34 +3,28 @@ import shelve
 from celery.utils.compat import defaultdict
 from celery.datastructures import LimitedSet
 
-# Maximum number of revokes to keep in memory.
+#: maximum number of revokes to keep in memory.
 REVOKES_MAX = 10000
 
-# How many seconds a revoke will be active before
-# being expired when the max limit has been exceeded.
-REVOKE_EXPIRES = 3600                       # One hour.
+#: how many seconds a revoke will be active before
+#: being expired when the max limit has been exceeded.
+REVOKE_EXPIRES = 3600
 
-"""
-.. data:: active_requests
-
-Set of currently active :class:`~celery.worker.job.TaskRequest`'s.
-
-.. data:: total_count
-
-Count of tasks executed by the worker, sorted by type.
-
-.. data:: revoked
-
-The list of currently revoked tasks. (PERSISTENT if statedb set).
-
-"""
+#: set of all reserved :class:`~celery.worker.job.TaskRequest`'s.
 reserved_requests = set()
+
+#: set of currently active :class:`~celery.worker.job.TaskRequest`'s.
 active_requests = set()
+
+#: count of tasks executed by the worker, sorted by type.
 total_count = defaultdict(lambda: 0)
+
+#: the list of currently revoked tasks.  Persistent if statedb set.
 revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)
 
 
 def task_reserved(request):
+    """Updates global state when a task has been reserved."""
     reserved_requests.add(request)
 
 
@@ -54,10 +48,6 @@ class Persistent(object):
         self.filename = filename
         self._load()
 
-    def _load(self):
-        self.merge(self.db)
-        self.close()
-
     def save(self):
         self.sync(self.db).sync()
         self.close()
@@ -80,6 +70,10 @@ class Persistent(object):
             self._open.close()
             self._open = None
 
+    def _load(self):
+        self.merge(self.db)
+        self.close()
+
     @property
     def db(self):
         if self._open is None: