from celery import current_app, task, uuid from celery.worker.consumer import Consumer #from celery.worker.job import Request from celery.app.task import Context from celery.concurrency.solo import TaskPool from celery.app.amqp import TASK_BARE from time import time from Queue import Queue from librabbitmq import Message from celery.utils.functional import noop from celery.worker.job import NEEDS_KWDICT from celery.datastructures import AttributeDict import socket import sys @task(accept_magic_kwargs=False) def T(): pass class Request(object): #__slots__ = ("app", "name", "id", "args", "kwargs", # "on_ack", "delivery_info", "hostname", # "eventer", "connection_errors", # "task", "eta", "expires", "flags", # "request_dict", "acknowledged", # "worker_pid", "started", # "_already_revoked", "_terminate_on_ack", "_tzlocal") eta = None started = False acknowledged = _already_revoked = False worker_pid = _terminate_on_ack = None _tzlocal = None expires = None delivery_info = {} flags = 0 args = () def __init__(self, body, on_ack=noop, hostname=None, eventer=None, app=None, connection_errors=None, request_dict=None, delivery_info=None, task=None, Context=Context, **opts): self.app = app self.name = body["task"] self.id = body["id"] self.args = body["args"] try: self.kwargs = body["kwargs"] if NEEDS_KWDICT: self.kwargs = kwdict(self.kwargs) except KeyError: self.kwargs = {} try: self.flags = body["flags"] except KeyError: pass self.on_ack = on_ack self.hostname = hostname self.eventer = eventer self.connection_errors = connection_errors or () self.task = task or self.app._tasks[self.name] if "eta" in body: eta = body["eta"] tz = tz_utc if utc else self.tzlocal self.eta = tz_to_local(maybe_iso8601(eta), self.tzlocal, tz) if "expires" in body: expires = body["expires"] tz = tz_utc if utc else self.tzlocal self.expires = tz_to_local(maybe_iso8601(expires), self.tzlocal, tz) if delivery_info: self.delivery_info = { "exchange": delivery_info.get("exchange"), "routing_key": delivery_info.get("routing_key"), } self.request_dict = AttributeDict( {"called_directly": False, "callbacks": [], "errbacks": [], "chord": None}, **body) tid = uuid() hostname = socket.gethostname() task = {"task": T.name, "args": (), "kwargs": {}, "id": tid, "flags": 0} app = current_app._get_current_object() m = Message(None, {}, {}, task) ts = time() for i in xrange(1000000): x = Request(task, hostname=hostname, app=app, task=task) print(time() - ts)