فهرست منبع

Merge branch '3.0'

Conflicts:
	celery/app/base.py
	celery/app/utils.py
	celery/backends/base.py
	celery/worker/consumer.py
Ask Solem 12 سال پیش
والد
کامیت
2734c1ef70
6فایلهای تغییر یافته به همراه26 افزوده شده و 10 حذف شده
  1. 1 0
      celery/app/amqp.py
  2. 4 2
      celery/app/base.py
  3. 1 0
      celery/app/defaults.py
  4. 2 1
      celery/worker/consumer.py
  5. 3 6
      celery/worker/loops.py
  6. 15 1
      docs/configuration.rst

+ 1 - 0
celery/app/amqp.py

@@ -163,6 +163,7 @@ class TaskProducer(Producer):
     auto_declare = False
     retry = False
     retry_policy = None
+    utc = True
 
     def __init__(self, channel=None, exchange=None, *args, **kwargs):
         self.retry = kwargs.pop('retry', self.retry)

+ 4 - 2
celery/app/base.py

@@ -78,8 +78,8 @@ class Celery(object):
     def __init__(self, main=None, loader=None, backend=None,
                  amqp=None, events=None, log=None, control=None,
                  set_as_current=True, accept_magic_kwargs=False,
-                 tasks=None, broker=None, include=None, fixups=None,
-                 changes=None, **kwargs):
+                 tasks=None, broker=None, include=None, changes=None,
+                 fixups=None, **kwargs):
         self.clock = LamportClock()
         self.main = main
         self.amqp_cls = amqp or self.amqp_cls
@@ -194,6 +194,8 @@ class Celery(object):
     def _task_from_fun(self, fun, **options):
         base = options.pop('base', None) or self.Task
 
+        print('%r base is: %r' % (fun, base, ))
+
         T = type(fun.__name__, (base, ), dict({
             'app': self,
             'accept_magic_kwargs': False,

+ 1 - 0
celery/app/defaults.py

@@ -70,6 +70,7 @@ NAMESPACES = {
         'CONNECTION_RETRY': Option(True, type='bool'),
         'CONNECTION_MAX_RETRIES': Option(100, type='int'),
         'HEARTBEAT': Option(10, type='int'),
+        'HEARTBEAT_CHECKRATE': Option(2.0, type='int'),
         'POOL_LIMIT': Option(10, type='int'),
         'USE_SSL': Option(False, type='bool'),
         'TRANSPORT': Option(type='string'),

+ 2 - 1
celery/worker/consumer.py

@@ -157,6 +157,7 @@ class Consumer(object):
 
         self._does_info = logger.isEnabledFor(logging.INFO)
         self._quick_put = self.ready_queue.put
+        self.amqheartbeat_rate = self.app.conf.BROKER_HEARTBEAT_CHECKRATE
 
         if hub:
             self.amqheartbeat = amqheartbeat
@@ -216,7 +217,7 @@ class Consumer(object):
                 self.strategies, self.namespace, self.hub, self.qos,
                 self.amqheartbeat, self.handle_unknown_message,
                 self.handle_unknown_task, self.handle_invalid_task,
-                self.app.clock)
+                self.app.clock, self.amqheartbeat_rate)
 
     def on_poll_init(self, hub):
         hub.update_readers(self.connection.eventmap)

+ 3 - 6
celery/worker/loops.py

@@ -19,14 +19,11 @@ from celery.five import Empty
 
 from . import state
 
-#: Heartbeat check is called every heartbeat_seconds' / rate'.
-AMQHEARTBEAT_RATE = 2.0
-
 
 def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
              heartbeat, handle_unknown_message, handle_unknown_task,
-             handle_invalid_task, clock, sleep=sleep, min=min, Empty=Empty,
-             hbrate=AMQHEARTBEAT_RATE):
+             handle_invalid_task, clock, hbrate=2.0,
+             sleep=sleep, min=min, Empty=Empty):
     """Non-blocking eventloop consuming messages until connection is lost,
     or shutdown is requested."""
 
@@ -123,7 +120,7 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
 
 def synloop(obj, connection, consumer, strategies, ns, hub, qos,
             heartbeat, handle_unknown_message, handle_unknown_task,
-            handle_invalid_task, clock, **kwargs):
+            handle_invalid_task, clock, hbrate=2.0, **kwargs):
     """Fallback blocking eventloop for transports that doesn't support AIO."""
 
     def on_task_received(body, message):

+ 15 - 1
docs/configuration.rst

@@ -785,9 +785,23 @@ and this requires the :mod:`amqp` module:
     $ pip install amqp
 
 The default heartbeat value is 10 seconds,
-the heartbeat will then be monitored at double the rate of the heartbeat value
+the heartbeat will then be monitored at the interval specified
+by the :setting:`BROKER_HEARTBEAT_CHECKRATE` setting, which by default is
+double the rate of the heartbeat value
 (so for the default 10 seconds, the heartbeat is checked every 5 seconds).
 
+.. setting:: BROKER_HEARTBEAT_CHECKRATE
+
+BROKER_HEARTBEAT_CHECKRATE
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+:transports supported: ``pyamqp``
+
+At intervals the worker will monitor that the broker has not missed
+too many heartbeats.  The rate at which this is checked is calculated
+by dividing the :setting:`BROKER_HEARTBEAT` value with this value,
+so if the heartbeat is 10.0 and the rate is the default 2.0, the check
+will be performed every 5 seconds (twice the heartbeat sending rate).
+
 .. setting:: BROKER_USE_SSL
 
 BROKER_USE_SSL