瀏覽代碼

Merge commit 'ask/master' into fixes

Brian Rosner 16 年之前
父節點
當前提交
00c7a578b0
共有 3 個文件被更改,包括 41 次插入10 次删除
  1. 1 1
      README
  2. 27 5
      celery/bin/celeryd.py
  3. 13 4
      celery/task.py

+ 1 - 1
README

@@ -144,7 +144,7 @@ You only need three simple steps to use celery with your Django project.
     3. Configure celery to use the AMQP user and virtual host we created
     3. Configure celery to use the AMQP user and virtual host we created
         before, by adding the following to your ``settings.py``::
         before, by adding the following to your ``settings.py``::
 
 
-            AMQP_HOST = "localhost"
+            AMQP_SERVER = "localhost"
             AMQP_PORT = 5672
             AMQP_PORT = 5672
             AMQP_USER = "myuser"
             AMQP_USER = "myuser"
             AMQP_PASSWORD = "mypassword"
             AMQP_PASSWORD = "mypassword"

+ 27 - 5
celery/bin/celeryd.py

@@ -62,6 +62,7 @@ from django.conf import settings
 from celery.log import emergency_error
 from celery.log import emergency_error
 from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
 from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
+from celery import conf
 from celery import discovery
 from celery import discovery
 from celery.task import discard_all
 from celery.task import discard_all
 from celery.worker import WorkController
 from celery.worker import WorkController
@@ -72,6 +73,15 @@ from daemon import DaemonContext
 from daemon.pidlockfile import PIDLockFile
 from daemon.pidlockfile import PIDLockFile
 import errno
 import errno
 
 
+STARTUP_INFO_FMT = """
+    * Celery loading with the following configuration
+        * Broker -> amqp://%(vhost)s@%(host)s:%(port)s 
+        * Exchange -> %(exchange)s (%(exchange_type)s)
+        * Consumer -> Queue:%(consumer_queue)s Routing:%(consumer_rkey)s
+        * Concurrency:%(concurrency)s
+""".strip()
+
+
 def acquire_pidlock(pidfile):
 def acquire_pidlock(pidfile):
     """Get the :class:`daemon.pidlockfile.PIDLockFile` handler for
     """Get the :class:`daemon.pidlockfile.PIDLockFile` handler for
     ``pidfile``.
     ``pidfile``.
@@ -99,7 +109,7 @@ def acquire_pidlock(pidfile):
                 "ERROR: Pidfile (%s) already exists.\n"
                 "ERROR: Pidfile (%s) already exists.\n"
                 "Seems celeryd is already running? (PID: %d)" % (
                 "Seems celeryd is already running? (PID: %d)" % (
                     pidfile, pid))
                     pidfile, pid))
-    return pidlock        
+    return pidlock
 
 
 
 
 def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
 def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
@@ -119,11 +129,23 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
 
 
     if discard:
     if discard:
         discarded_count = discard_all()
         discarded_count = discard_all()
-        what = "message"
-        if discarded_count > 1:
-            what = "messages"
+        what = discard_count > 1 and "messages" or "message"
         sys.stderr.write("Discard: Erased %d %s from the queue.\n" % (
         sys.stderr.write("Discard: Erased %d %s from the queue.\n" % (
             discarded_count, what))
             discarded_count, what))
+    startup_info = STARTUP_INFO_FMT % {
+            "vhost": settings.AMQP_VHOST,
+            "host": settings.AMQP_SERVER,
+            "port": settings.AMQP_PORT,
+            "exchange": conf.AMQP_EXCHANGE,
+            "exchange_type": conf.AMQP_EXCHANGE_TYPE,
+            "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
+            "consumer_rkey": conf.AMQP_CONSUMER_ROUTING_KEY,
+            "publisher_rkey": conf.AMQP_PUBLISHER_ROUTING_KEY,
+            "concurrency": concurrency,
+            "loglevel": loglevel,
+            "pidfile": pidfile,
+    }
+    sys.stderr.write(startup_info + "\n")
     if daemon:
     if daemon:
         # Since without stderr any errors will be silently suppressed,
         # Since without stderr any errors will be silently suppressed,
         # we need to know that we have access to the logfile
         # we need to know that we have access to the logfile
@@ -135,7 +157,7 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
         uid = uid and int(uid) or os.geteuid()
         uid = uid and int(uid) or os.geteuid()
         gid = gid and int(gid) or os.getegid()
         gid = gid and int(gid) or os.getegid()
         working_directory = working_directory or os.getcwd()
         working_directory = working_directory or os.getcwd()
-        sys.stderr.write("Launching celeryd in the background...\n")
+        sys.stderr.write("* Launching celeryd in the background...\n")
         context = DaemonContext(chroot_directory=chroot,
         context = DaemonContext(chroot_directory=chroot,
                                 working_directory=working_directory,
                                 working_directory=working_directory,
                                 umask=umask,
                                 umask=umask,

+ 13 - 4
celery/task.py

@@ -15,7 +15,8 @@ import pickle
 
 
 
 
 def apply_async(task, args=None, kwargs=None, routing_key=None,
 def apply_async(task, args=None, kwargs=None, routing_key=None,
-        immediate=None, mandatory=None, connect_timeout=None, priority=None):
+        immediate=None, mandatory=None, connection=None,
+        connect_timeout=None, priority=None):
     """Run a task asynchronously by the celery daemon(s).
     """Run a task asynchronously by the celery daemon(s).
 
 
     :param task: The task to run (a callable object, or a :class:`Task`
     :param task: The task to run (a callable object, or a :class:`Task`
@@ -35,6 +36,9 @@ def apply_async(task, args=None, kwargs=None, routing_key=None,
     :keyword mandatory: Mandatory routing. Raises an exception if there's
     :keyword mandatory: Mandatory routing. Raises an exception if there's
         no running workers able to take on this task.
         no running workers able to take on this task.
 
 
+    :keyword connection: Re-use existing AMQP connection.
+        The ``connect_timeout`` argument is not respected if this is set.
+
     :keyword connect_timeout: The timeout in seconds, before we give up
     :keyword connect_timeout: The timeout in seconds, before we give up
         on establishing a connection to the AMQP server.
         on establishing a connection to the AMQP server.
 
 
@@ -52,11 +56,16 @@ def apply_async(task, args=None, kwargs=None, routing_key=None,
     for option_name, option_value in message_opts.items():
     for option_name, option_value in message_opts.items():
         message_opts[option_name] = getattr(task, option_name, option_value)
         message_opts[option_name] = getattr(task, option_name, option_value)
 
 
-    conn = DjangoAMQPConnection(connect_timeout=connect_timeout)
-    publisher = TaskPublisher(connection=conn)
+    need_to_close_connection = False
+    if not connection:
+        connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
+        need_to_close_connection = True
+
+    publisher = TaskPublisher(connection=connection)
     task_id = publisher.delay_task(task.name, args, kwargs, **message_opts)
     task_id = publisher.delay_task(task.name, args, kwargs, **message_opts)
     publisher.close()
     publisher.close()
-    conn.close()
+    if need_to_close_connection:
+        connection.close()
     return AsyncResult(task_id)
     return AsyncResult(task_id)