Pārlūkot izejas kodu

task-received event working

Ask Solem 15 gadi atpakaļ
vecāks
revīzija
8779b8416c
3 mainītis faili ar 29 papildinājumiem un 9 dzēšanām
  1. 4 4
      celery/events.py
  2. 17 5
      celery/messaging.py
  3. 8 0
      celery/worker/__init__.py

+ 4 - 4
celery/events.py

@@ -1,6 +1,6 @@
 from celery.messaging import EventPublisher, EventConsumer
 from celery.messaging import EventPublisher, EventConsumer
-from datetime import datetime
 from UserDict import UserDict
 from UserDict import UserDict
+import time
 
 
 """
 """
 Events
 Events
@@ -18,17 +18,17 @@ WORKER-HEARTBEAT hostname timestamp
 """
 """
 
 
 def Event(type, **fields):
 def Event(type, **fields):
-    return dict(fields, type=event, timestamp=datetime.now())
+    return dict(fields, type=type, timestamp=time.time())
 
 
 
 
-class Dispatcher(object):
+class EventDispatcher(object):
     """
     """
 
 
     dispatcher.send("worker-heartbeat", hostname="h8.opera.com")
     dispatcher.send("worker-heartbeat", hostname="h8.opera.com")
 
 
     """
     """
     def __init__(self, connection):
     def __init__(self, connection):
-        self.connection = None
+        self.connection = connection
         self.publisher = EventPublisher(self.connection)
         self.publisher = EventPublisher(self.connection)
 
 
     def send(self, type, **fields):
     def send(self, type, **fields):

+ 17 - 5
celery/messaging.py

@@ -26,7 +26,6 @@ class TaskPublisher(Publisher):
     exchange_type = conf.AMQP_EXCHANGE_TYPE
     exchange_type = conf.AMQP_EXCHANGE_TYPE
     routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
     routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
     serializer = conf.TASK_SERIALIZER
     serializer = conf.TASK_SERIALIZER
-    encoder = pickle.dumps
 
 
     def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
     def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
         """Delay task for execution by the celery nodes."""
         """Delay task for execution by the celery nodes."""
@@ -74,7 +73,6 @@ class TaskConsumer(Consumer):
     exchange = conf.AMQP_EXCHANGE
     exchange = conf.AMQP_EXCHANGE
     routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
     routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
     exchange_type = conf.AMQP_EXCHANGE_TYPE
     exchange_type = conf.AMQP_EXCHANGE_TYPE
-    decoder = pickle.loads
     auto_ack = False
     auto_ack = False
     no_ack = False
     no_ack = False
 
 
@@ -82,7 +80,6 @@ class TaskConsumer(Consumer):
 class StatsPublisher(Publisher):
 class StatsPublisher(Publisher):
     exchange = "celerygraph"
     exchange = "celerygraph"
     routing_key = "stats"
     routing_key = "stats"
-    encoder = pickle.dumps
 
 
 
 
 class StatsConsumer(Consumer):
 class StatsConsumer(Consumer):
@@ -90,5 +87,20 @@ class StatsConsumer(Consumer):
     exchange = "celerygraph"
     exchange = "celerygraph"
     routing_key = "stats"
     routing_key = "stats"
     exchange_type = "direct"
     exchange_type = "direct"
-    decoder = pickle.loads
-    no_ack=True
+    no_ack = True
+
+
+class EventPublisher(Publisher):
+    exchange = "celeryevent"
+    routing_key = "event"
+
+
+class EventConsumer(Consumer):
+    queue = "celeryevent"
+    exchange = "celeryevent"
+    routing_key = "event"
+    exchange_type = "direct"
+    no_ack = True
+
+
+

+ 8 - 0
celery/worker/__init__.py

@@ -14,6 +14,7 @@ from celery.log import setup_logger
 from celery.pool import TaskPool
 from celery.pool import TaskPool
 from celery.utils import retry_over_time
 from celery.utils import retry_over_time
 from celery.datastructures import SharedCounter
 from celery.datastructures import SharedCounter
+from celery.events import EventDispatcher
 from Queue import Queue
 from Queue import Queue
 import traceback
 import traceback
 import logging
 import logging
@@ -50,6 +51,7 @@ class AMQPListener(object):
         self.hold_queue = hold_queue
         self.hold_queue = hold_queue
         self.logger = logger
         self.logger = logger
         self.prefetch_count = SharedCounter(initial_prefetch_count)
         self.prefetch_count = SharedCounter(initial_prefetch_count)
+        self.event_dispatcher = None
 
 
     def start(self):
     def start(self):
         """Start the consumer.
         """Start the consumer.
@@ -100,6 +102,10 @@ class AMQPListener(object):
             return
             return
 
 
         eta = message_data.get("eta")
         eta = message_data.get("eta")
+
+        print(message_data)
+        self.event_dispatcher.send("task-received", **message_data)
+
         if eta:
         if eta:
             self.prefetch_count.increment()
             self.prefetch_count.increment()
             self.logger.info("Got task from broker: %s[%s] eta:[%s]" % (
             self.logger.info("Got task from broker: %s[%s] eta:[%s]" % (
@@ -120,6 +126,7 @@ class AMQPListener(object):
                     "AMQPListener: Closing connection to the broker...")
                     "AMQPListener: Closing connection to the broker...")
             self.amqp_connection.close()
             self.amqp_connection.close()
             self.amqp_connection = None
             self.amqp_connection = None
+        self.event_dispatcher = None
 
 
     def reset_connection(self):
     def reset_connection(self):
         """Reset the AMQP connection, and reinitialize the
         """Reset the AMQP connection, and reinitialize the
@@ -134,6 +141,7 @@ class AMQPListener(object):
         self.amqp_connection = self._open_connection()
         self.amqp_connection = self._open_connection()
         self.task_consumer = get_consumer_set(connection=self.amqp_connection)
         self.task_consumer = get_consumer_set(connection=self.amqp_connection)
         self.task_consumer.register_callback(self.receive_message)
         self.task_consumer.register_callback(self.receive_message)
+        self.event_dispatcher = EventDispatcher(self.amqp_connection)
 
 
     def _open_connection(self):
     def _open_connection(self):
         """Retries connecting to the AMQP broker over time.
         """Retries connecting to the AMQP broker over time.