Преглед на файлове

Merge branch 'master' into gossip2

Conflicts:
	celery/events/__init__.py
Ask Solem преди 12 години
родител
ревизия
2ce242d6ef

+ 6 - 6
celery/__init__.py

@@ -45,14 +45,14 @@ if STATICA_HACK:
     # This is never executed, but tricks static analyzers (PyDev, PyCharm,
     # This is never executed, but tricks static analyzers (PyDev, PyCharm,
     # pylint, etc.) into knowing the types of these symbols, and what
     # pylint, etc.) into knowing the types of these symbols, and what
     # they contain.
     # they contain.
-    from celery.app.base import Celery
-    from celery.app.utils import bugreport
-    from celery.app.task import Task
-    from celery._state import current_app, current_task
-    from celery.canvas import (
+    from celery.app.base import Celery                  # noqa
+    from celery.app.utils import bugreport              # noqa
+    from celery.app.task import Task                    # noqa
+    from celery._state import current_app, current_task # noqa
+    from celery.canvas import (                         # noqa
         chain, chord, chunks, group, subtask, xmap, xstarmap,
         chain, chord, chunks, group, subtask, xmap, xstarmap,
     )
     )
-    from celery.utils import uuid
+    from celery.utils import uuid                       # noqa
 
 
 # Lazy loading
 # Lazy loading
 from .__compat__ import recreate_module
 from .__compat__ import recreate_module

+ 5 - 1
celery/app/control.py

@@ -55,6 +55,9 @@ class Inspect(object):
     def report(self):
     def report(self):
         return self._request('report')
         return self._request('report')
 
 
+    def clock(self):
+        return self._request('clock')
+
     def active(self, safe=False):
     def active(self, safe=False):
         return self._request('dump_active', safe=safe)
         return self._request('dump_active', safe=safe)
 
 
@@ -89,7 +92,8 @@ class Control(object):
 
 
     def __init__(self, app=None):
     def __init__(self, app=None):
         self.app = app_or_default(app)
         self.app = app_or_default(app)
-        self.mailbox = self.Mailbox('celery', type='fanout')
+        self.mailbox = self.Mailbox('celery',
+                type='fanout', clock=self.app.clock)
 
 
     @cached_property
     @cached_property
     def inspect(self):
     def inspect(self):

+ 2 - 2
celery/app/task.py

@@ -444,7 +444,7 @@ class Task(object):
         :keyword publisher: Deprecated alias to ``producer``.
         :keyword publisher: Deprecated alias to ``producer``.
 
 
         Also supports all keyword arguments supported by
         Also supports all keyword arguments supported by
-        :meth:`kombu.messaging.Producer.publish`.
+        :meth:`kombu.Producer.publish`.
 
 
         .. note::
         .. note::
             If the :setting:`CELERY_ALWAYS_EAGER` setting is set, it will
             If the :setting:`CELERY_ALWAYS_EAGER` setting is set, it will
@@ -474,7 +474,7 @@ class Task(object):
                 evd = app.events.Dispatcher(channel=P.channel,
                 evd = app.events.Dispatcher(channel=P.channel,
                                             buffer_while_offline=False)
                                             buffer_while_offline=False)
 
 
-            extra_properties = self.backend.on_task_call(producer, task_id)
+            extra_properties = self.backend.on_task_call(P, task_id)
             task_id = P.publish_task(self.name, args, kwargs,
             task_id = P.publish_task(self.name, args, kwargs,
                                      task_id=task_id,
                                      task_id=task_id,
                                      event_dispatcher=evd,
                                      event_dispatcher=evd,

+ 4 - 1
celery/backends/amqp.py

@@ -115,9 +115,12 @@ class AMQPBackend(BaseBackend):
                             routing_key=self._routing_key(task_id),
                             routing_key=self._routing_key(task_id),
                             serializer=self.serializer,
                             serializer=self.serializer,
                             retry=True, retry_policy=self.retry_policy,
                             retry=True, retry_policy=self.retry_policy,
-                            declare=[self._create_binding(task_id)])
+                            declare=self.on_reply_declare(task_id))
         return result
         return result
 
 
+    def on_reply_declare(self, task_id):
+        return [self._create_binding(task_id)]
+
     def wait_for(self, task_id, timeout=None, cache=True, propagate=True,
     def wait_for(self, task_id, timeout=None, cache=True, propagate=True,
             **kwargs):
             **kwargs):
         cached_meta = self._cache.get(task_id)
         cached_meta = self._cache.get(task_id)

+ 7 - 17
celery/backends/amqrpc.py

@@ -7,26 +7,15 @@
 
 
 """
 """
 from __future__ import absolute_import
 from __future__ import absolute_import
-from __future__ import with_statement
 
 
 import kombu
 import kombu
-import os
-import uuid
 
 
 from threading import local
 from threading import local
 
 
-from kombu.common import maybe_declare
-from celery.backends import amqp
-
-try:
-    from thread import get_ident            # noqa
-except ImportError:                         # pragma: no cover
-    try:
-        from dummy_thread import get_ident  # noqa
-    except ImportError:                     # pragma: no cover
-        from _thread import get_ident       # noqa
+from kombu.common import maybe_declare, oid_from
 
 
-_nodeid = uuid.getnode()
+from celery import current_task
+from celery.backends import amqp
 
 
 
 
 class AMQRPCBackend(amqp.AMQPBackend):
 class AMQRPCBackend(amqp.AMQPBackend):
@@ -50,9 +39,11 @@ class AMQRPCBackend(amqp.AMQPBackend):
         return [self.binding]
         return [self.binding]
 
 
     def _routing_key(self, task_id):
     def _routing_key(self, task_id):
-        from celery import current_task
         return current_task.request.reply_to
         return current_task.request.reply_to
 
 
+    def on_reply_declare(self, task_id):
+        pass
+
     @property
     @property
     def binding(self):
     def binding(self):
         return self.Queue(self.oid, self.exchange, self.oid,
         return self.Queue(self.oid, self.exchange, self.oid,
@@ -63,6 +54,5 @@ class AMQRPCBackend(amqp.AMQPBackend):
         try:
         try:
             return self._tls.OID
             return self._tls.OID
         except AttributeError:
         except AttributeError:
-            ent = '%x-%x-%x' % (_nodeid, os.getpid(), get_ident())
-            oid = self._tls.OID = str(uuid.uuid3(uuid.NAMESPACE_OID, ent))
+            oid = self._tls.OID = oid_from(self)
             return oid
             return oid

+ 0 - 1
celery/beat.py

@@ -7,7 +7,6 @@
 
 
 """
 """
 from __future__ import absolute_import
 from __future__ import absolute_import
-from __future__ import with_statement
 
 
 import errno
 import errno
 import os
 import os

+ 1 - 0
celery/bin/celery.py

@@ -603,6 +603,7 @@ class inspect(_RemoteControl):
         'revoked': (1.0, 'dump of revoked task ids'),
         'revoked': (1.0, 'dump of revoked task ids'),
         'registered': (1.0, 'dump of registered tasks'),
         'registered': (1.0, 'dump of registered tasks'),
         'ping': (0.2, 'ping worker(s)'),
         'ping': (0.2, 'ping worker(s)'),
+        'clock': (1.0, 'get value of logical clock'),
         'report': (1.0, 'get bugreport info')
         'report': (1.0, 'get bugreport info')
     }
     }
 
 

+ 0 - 1
celery/concurrency/gevent.py

@@ -7,7 +7,6 @@
 
 
 """
 """
 from __future__ import absolute_import
 from __future__ import absolute_import
-from __future__ import with_statement
 
 
 import os
 import os
 
 

+ 4 - 4
celery/events/__init__.py

@@ -99,6 +99,8 @@ class EventDispatcher(object):
         self.on_enabled = set()
         self.on_enabled = set()
         self.on_disabled = set()
         self.on_disabled = set()
         self.groups = set(groups or [])
         self.groups = set(groups or [])
+        self.tzoffset = [-time.timezone, -time.altzone]
+        self.clock = self.app.clock
         if not connection and channel:
         if not connection and channel:
             self.connection = channel.connection.client
             self.connection = channel.connection.client
         self.enabled = enabled
         self.enabled = enabled
@@ -149,7 +151,7 @@ class EventDispatcher(object):
 
 
             with self.mutex:
             with self.mutex:
                 event = Event(type, hostname=self.hostname,
                 event = Event(type, hostname=self.hostname,
-                                    clock=self.app.clock.forward(),
+                                    clock=self.clock.forward(),
                                     utcoffset=utcoffset(), **fields)
                                     utcoffset=utcoffset(), **fields)
                 try:
                 try:
                     self.publisher.publish(event,
                     self.publisher.publish(event,
@@ -241,9 +243,7 @@ class EventReceiver(ConsumerMixin):
 
 
     def event_from_message(self, body, localize=True):
     def event_from_message(self, body, localize=True):
         type = body.get('type', '').lower()
         type = body.get('type', '').lower()
-        clock = body.get('clock')
-        if clock:
-            self.adjust_clock(body.get('clock') or 0)
+        self.adjust_clock(body.get('clock') or 0)
         if localize:
         if localize:
             try:
             try:
                 offset, timestamp = _TZGETTER(body)
                 offset, timestamp = _TZGETTER(body)

+ 5 - 5
celery/events/state.py

@@ -343,9 +343,9 @@ class State(object):
         Returns a list of `(uuid, task)` tuples.
         Returns a list of `(uuid, task)` tuples.
 
 
         """
         """
-        sorted_tasks = self._sort_tasks_by_time([(uuid, task)
-                                                 for uuid, task in self.tasks.iteritems()
-                                                 if task.name == name])
+        sorted_tasks = self._sort_tasks_by_time((uuid, task)
+                for uuid, task in self.tasks.iteritems()
+                    if task.name == name)
 
 
         return sorted_tasks[0:limit or None]
         return sorted_tasks[0:limit or None]
 
 
@@ -355,9 +355,9 @@ class State(object):
         Returns a list of `(uuid, task)` tuples.
         Returns a list of `(uuid, task)` tuples.
 
 
         """
         """
-        return self._sort_tasks_by_time([(uuid, task)
+        return self._sort_tasks_by_time((uuid, task)
                 for uuid, task in self.itertasks(limit)
                 for uuid, task in self.itertasks(limit)
-                    if task.worker.hostname == hostname])
+                    if task.worker.hostname == hostname)
 
 
     def task_types(self):
     def task_types(self):
         """Returns a list of all seen task types."""
         """Returns a list of all seen task types."""

+ 2 - 2
celery/task/__init__.py

@@ -27,7 +27,7 @@ if STATICA_HACK:
     # This is never executed, but tricks static analyzers (PyDev, PyCharm,
     # This is never executed, but tricks static analyzers (PyDev, PyCharm,
     # pylint, etc.) into knowing the types of these symbols, and what
     # pylint, etc.) into knowing the types of these symbols, and what
     # they contain.
     # they contain.
-    from celery.canvas import chain, group, chord, subtask
+    from celery.canvas import group, chord, subtask
     from .base import BaseTask, Task, PeriodicTask, task, periodic_task
     from .base import BaseTask, Task, PeriodicTask, task, periodic_task
     from .sets import TaskSet
     from .sets import TaskSet
 
 
@@ -42,7 +42,7 @@ old_module, new_module = recreate_module(__name__,  # pragma: no cover
     by_module={
     by_module={
         'celery.task.base':   ['BaseTask', 'Task', 'PeriodicTask',
         'celery.task.base':   ['BaseTask', 'Task', 'PeriodicTask',
                                'task', 'periodic_task'],
                                'task', 'periodic_task'],
-        'celery.canvas':      ['chain', 'group', 'chord', 'subtask'],
+        'celery.canvas':      ['group', 'chord', 'subtask'],
         'celery.task.sets':   ['TaskSet'],
         'celery.task.sets':   ['TaskSet'],
     },
     },
     base=module,
     base=module,

+ 7 - 9
celery/tests/backends/test_amqp.py

@@ -5,6 +5,8 @@ import socket
 from datetime import timedelta
 from datetime import timedelta
 from Queue import Empty, Queue
 from Queue import Empty, Queue
 
 
+from mock import patch
+
 from celery import current_app
 from celery import current_app
 from celery import states
 from celery import states
 from celery.app import app_or_default
 from celery.app import app_or_default
@@ -246,15 +248,11 @@ class test_AMQPBackend(AppCase):
             next(b.get_many(['id1']))
             next(b.get_many(['id1']))
 
 
     def test_test_get_many_raises_inner_block(self):
     def test_test_get_many_raises_inner_block(self):
-
-        class Backend(AMQPBackend):
-
-            def drain_events(self, *args, **kwargs):
-                raise KeyError('foo')
-
-        b = Backend()
-        with self.assertRaises(KeyError):
-            next(b.get_many(['id1']))
+        with patch('kombu.connection.Connection.drain_events') as drain:
+            drain.side_effect = KeyError('foo')
+            b = AMQPBackend()
+            with self.assertRaises(KeyError):
+                next(b.get_many(['id1']))
 
 
     def test_no_expires(self):
     def test_no_expires(self):
         b = self.create_backend(expires=None)
         b = self.create_backend(expires=None)

+ 7 - 1
celery/worker/control.py

@@ -188,7 +188,13 @@ def stats(panel, **kwargs):
             'consumer': panel.consumer.info,
             'consumer': panel.consumer.info,
             'pool': panel.consumer.pool.info,
             'pool': panel.consumer.pool.info,
             'autoscaler': asinfo,
             'autoscaler': asinfo,
-            'pid': os.getpid()}
+            'pid': os.getpid(),
+            'clock': str(panel.app.clock)}
+
+
+@Panel.register
+def clock(panel, **kwargs):
+    return {'clock': panel.app.clock.value}
 
 
 
 
 @Panel.register
 @Panel.register

+ 1 - 1
docs/reference/celery.app.amqp.rst

@@ -13,7 +13,7 @@
         .. attribute:: Connection
         .. attribute:: Connection
 
 
             Broker connection class used.  Default is
             Broker connection class used.  Default is
-            :class:`kombu.connection.Connection`.
+            :class:`kombu.Connection`.
 
 
         .. attribute:: Consumer
         .. attribute:: Consumer
 
 

+ 1 - 1
docs/reference/celery.rst

@@ -264,7 +264,7 @@ Application
         :keyword transport: defaults to the :setting:`BROKER_TRANSPORT`
         :keyword transport: defaults to the :setting:`BROKER_TRANSPORT`
                  setting.
                  setting.
 
 
-        :returns :class:`kombu.connection.Connection`:
+        :returns :class:`kombu.Connection`:
 
 
     .. method:: Celery.connection_or_acquire(connection=None)
     .. method:: Celery.connection_or_acquire(connection=None)