Browse Source

Cosmetics

Ask Solem 14 years ago
parent
commit
377f1cc358
5 changed files with 16 additions and 49 deletions
  1. 0 5
      celery/__init__.py
  2. 2 32
      celery/conf.py
  3. 5 5
      celery/worker/buckets.py
  4. 7 5
      celery/worker/job.py
  5. 2 2
      docs/userguide/monitoring.rst

+ 0 - 5
celery/__init__.py

@@ -14,11 +14,6 @@ def Celery(*args, **kwargs):
     from celery.app import App
     return App(*args, **kwargs)
 
-
-def CompatCelery(*args, **kwargs):
-    return Celery(loader=os.environ.get("CELERY_LOADER", "default"))
-
-
 if not os.environ.get("CELERY_NO_EVAL", False):
     from celery.utils import LocalProxy, instantiate
     current_app = LocalProxy(lambda: instantiate("celery.app.current_app"))

+ 2 - 32
celery/conf.py

@@ -6,12 +6,11 @@ Use :mod:`celery.defaults` instead.
 
 
 """
-from celery.app import app_or_default
+from celery import current_app
 from celery.app import defaults
 
 _DEFAULTS = defaults.DEFAULTS
-
-conf = app_or_default().conf
+conf = current_app.conf
 
 ALWAYS_EAGER = conf.CELERY_ALWAYS_EAGER
 EAGER_PROPAGATES_EXCEPTIONS = conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS
@@ -23,21 +22,11 @@ TASK_RESULT_EXPIRES = conf.CELERY_TASK_RESULT_EXPIRES
 IGNORE_RESULT = conf.CELERY_IGNORE_RESULT
 TRACK_STARTED = conf.CELERY_TRACK_STARTED
 ACKS_LATE = conf.CELERY_ACKS_LATE
-
 REDIRECT_STDOUTS = conf.CELERY_REDIRECT_STDOUTS
 REDIRECT_STDOUTS_LEVEL = conf.CELERY_REDIRECT_STDOUTS_LEVEL
-
-# <--- SQLAlchemy                                  <-   --   --- - ----- -- #
-
 RESULT_DBURI = conf.CELERY_RESULT_DBURI
 RESULT_ENGINE_OPTIONS = conf.CELERY_RESULT_ENGINE_OPTIONS
-
-# <--- Client                                      <-   --   --- - ----- -- #
-
 MAX_CACHED_RESULTS = conf.CELERY_MAX_CACHED_RESULTS
-
-# <--- Worker                                      <-   --   --- - ----- -- #
-
 SEND_EVENTS = conf.CELERY_SEND_EVENTS
 DEFAULT_RATE_LIMIT = conf.CELERY_DEFAULT_RATE_LIMIT
 DISABLE_RATE_LIMITS = conf.CELERY_DISABLE_RATE_LIMITS
@@ -56,22 +45,17 @@ CELERYD_STATE_DB = conf.CELERYD_STATE_DB
 CELERYD_CONCURRENCY = conf.CELERYD_CONCURRENCY
 CELERYD_PREFETCH_MULTIPLIER = conf.CELERYD_PREFETCH_MULTIPLIER
 CELERYD_POOL_PUTLOCKS = conf.CELERYD_POOL_PUTLOCKS
-
 CELERYD_POOL = conf.CELERYD_POOL
 CELERYD_LISTENER = conf.CELERYD_CONSUMER
 CELERYD_MEDIATOR = conf.CELERYD_MEDIATOR
 CELERYD_ETA_SCHEDULER = conf.CELERYD_ETA_SCHEDULER
 CELERYD_ETA_SCHEDULER_PRECISION = conf.CELERYD_ETA_SCHEDULER_PRECISION
-
-# :--- Email settings                               <-   --   --- - ----- -- #
 ADMINS = conf.ADMINS
 SERVER_EMAIL = conf.SERVER_EMAIL
 EMAIL_HOST = conf.EMAIL_HOST
 EMAIL_HOST_USER = conf.EMAIL_HOST_USER
 EMAIL_HOST_PASSWORD = conf.EMAIL_HOST_PASSWORD
 EMAIL_PORT = conf.EMAIL_PORT
-
-# :--- Broker connections                           <-   --   --- - ----- -- #
 BROKER_HOST = conf.BROKER_HOST
 BROKER_PORT = conf.BROKER_PORT
 BROKER_USER = conf.BROKER_USER
@@ -83,8 +67,6 @@ BROKER_CONNECTION_TIMEOUT = conf.BROKER_CONNECTION_TIMEOUT
 BROKER_CONNECTION_RETRY = conf.BROKER_CONNECTION_RETRY
 BROKER_CONNECTION_MAX_RETRIES = conf.BROKER_CONNECTION_MAX_RETRIES
 BROKER_BACKEND = conf.BROKER_BACKEND
-
-# <--- Message routing                             <-   --   --- - ----- -- #
 DEFAULT_QUEUE = conf.CELERY_DEFAULT_QUEUE
 DEFAULT_ROUTING_KEY = conf.CELERY_DEFAULT_ROUTING_KEY
 DEFAULT_EXCHANGE = conf.CELERY_DEFAULT_EXCHANGE
@@ -93,31 +75,19 @@ DEFAULT_DELIVERY_MODE = conf.CELERY_DEFAULT_DELIVERY_MODE
 QUEUES = conf.CELERY_QUEUES
 CREATE_MISSING_QUEUES = conf.CELERY_CREATE_MISSING_QUEUES
 ROUTES = conf.CELERY_ROUTES
-# :--- Broadcast queue settings                     <-   --   --- - ----- -- #
-
 BROADCAST_QUEUE = conf.CELERY_BROADCAST_QUEUE
 BROADCAST_EXCHANGE = conf.CELERY_BROADCAST_EXCHANGE
 BROADCAST_EXCHANGE_TYPE = conf.CELERY_BROADCAST_EXCHANGE_TYPE
-
-# :--- Event queue settings                         <-   --   --- - ----- -- #
-
 EVENT_SERIALIZER = conf.CELERY_EVENT_SERIALIZER
-
-# :--- AMQP Backend settings                        <-   --   --- - ----- -- #
-
 RESULT_EXCHANGE = conf.CELERY_RESULT_EXCHANGE
 RESULT_EXCHANGE_TYPE = conf.CELERY_RESULT_EXCHANGE_TYPE
 RESULT_SERIALIZER = conf.CELERY_RESULT_SERIALIZER
 RESULT_PERSISTENT = conf.CELERY_RESULT_PERSISTENT
-
-# :--- Celery Beat                                  <-   --   --- - ----- -- #
 CELERYBEAT_LOG_LEVEL = conf.CELERYBEAT_LOG_LEVEL
 CELERYBEAT_LOG_FILE = conf.CELERYBEAT_LOG_FILE
 CELERYBEAT_SCHEDULER = conf.CELERYBEAT_SCHEDULER
 CELERYBEAT_SCHEDULE = conf.CELERYBEAT_SCHEDULE
 CELERYBEAT_SCHEDULE_FILENAME = conf.CELERYBEAT_SCHEDULE_FILENAME
 CELERYBEAT_MAX_LOOP_INTERVAL = conf.CELERYBEAT_MAX_LOOP_INTERVAL
-
-# :--- Celery Monitor                               <-   --   --- - ----- -- #
 CELERYMON_LOG_LEVEL = conf.CELERYMON_LOG_LEVEL
 CELERYMON_LOG_FILE = conf.CELERYMON_LOG_FILE

+ 5 - 5
celery/worker/buckets.py

@@ -1,7 +1,7 @@
 import threading
-import time
 
 from collections import deque
+from time import time, sleep
 from Queue import Queue, Empty
 
 from celery.datastructures import TokenBucket
@@ -110,8 +110,8 @@ class TaskBucket(object):
         consume tokens from it.
 
         """
-        time_start = time.time()
-        did_timeout = lambda: timeout and time.time() - time_start > timeout
+        time_start = time()
+        did_timeout = lambda: timeout and time() - time_start > timeout
 
         self.not_empty.acquire()
         try:
@@ -126,7 +126,7 @@ class TaskBucket(object):
                 if remaining_time:
                     if not block or did_timeout():
                         raise Empty()
-                    time.sleep(min(remaining_time, timeout or 1))
+                    sleep(min(remaining_time, timeout or 1))
                 else:
                     return item
         finally:
@@ -302,7 +302,7 @@ class TokenBucketQueue(object):
             remaining = self.expected_time()
             if not remaining:
                 return self.get(block=block)
-            time.sleep(remaining)
+            sleep(remaining)
 
     def expected_time(self, tokens=1):
         """Returns the expected time in seconds of when a new token should be

+ 7 - 5
celery/worker/job.py

@@ -48,13 +48,11 @@ WANTED_DELIVERY_INFO = ("exchange", "routing_key", "consumer_tag", )
 
 class InvalidTaskError(Exception):
     """The task has invalid data or is not properly constructed."""
-    pass
 
 
 class AlreadyExecutedError(Exception):
     """Tasks can only be executed once, as they might change
     world-wide state."""
-    pass
 
 
 class WorkerTaskTrace(TaskTrace):
@@ -77,6 +75,11 @@ class WorkerTaskTrace(TaskTrace):
     :param args: List of positional args to pass on to the function.
     :param kwargs: Keyword arguments mapping to pass on to the function.
 
+    :keyword loader: Custom loader to use, if not specified the current app
+      loader will be used.
+    :keyword hostname: Custom hostname to use, if not specified the system
+      hostname will be used.
+
     :returns: the evaluated functions return value on success, or
         the exception instance on failure.
 
@@ -155,12 +158,11 @@ def execute_and_trace(task_name, *args, **kwargs):
 
     """
     hostname = kwargs.get("hostname")
-    platforms.set_mp_process_title("celeryd", info=task_name,
-                                   hostname=hostname)
+    platforms.set_mp_process_title("celeryd", task_name, hostname=hostname)
     try:
         return WorkerTaskTrace(task_name, *args, **kwargs).execute_safe()
     finally:
-        platforms.set_mp_process_title("celeryd", hostname=hostname)
+        platforms.set_mp_process_title("celeryd", "-idle-", hostname)
 
 
 class TaskRequest(object):

+ 2 - 2
docs/userguide/monitoring.rst

@@ -533,11 +533,11 @@ Worker Events
     * `sw_ver`: Software version (e.g. 2.2.0).
     * `sw_sys`: Operating System (e.g. Linux, Windows, Darwin).
 
-* `worker-heartbeat(hostname, timestamp)`
+* `worker-heartbeat(hostname, timestamp, sw_ident, sw_ver, sw_sys)`
 
     Sent every minute, if the worker has not sent a heartbeat in 2 minutes,
     it is considered to be offline.
 
-* `worker-offline(hostname, timestamp)`
+* `worker-offline(hostname, timestamp, sw_ident, sw_ver, sw_sys)`
 
     The worker has disconnected from the broker.