123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- from celery import current_app, task, uuid
- from celery.worker.consumer import Consumer
- #from celery.worker.job import Request
- from celery.app.task import Context
- from celery.concurrency.solo import TaskPool
- from celery.app.amqp import TASK_BARE
- from time import time
- from Queue import Queue
- from librabbitmq import Message
- from celery.utils.functional import noop
- from celery.worker.job import NEEDS_KWDICT
- from celery.datastructures import AttributeDict
- import socket
- import sys
- @task(accept_magic_kwargs=False)
- def T():
- pass
- class Request(object):
- #__slots__ = ("app", "name", "id", "args", "kwargs",
- # "on_ack", "delivery_info", "hostname",
- # "eventer", "connection_errors",
- # "task", "eta", "expires", "flags",
- # "request_dict", "acknowledged",
- # "worker_pid", "started",
- # "_already_revoked", "_terminate_on_ack", "_tzlocal")
- eta = None
- started = False
- acknowledged = _already_revoked = False
- worker_pid = _terminate_on_ack = None
- _tzlocal = None
- expires = None
- delivery_info = {}
- flags = 0
- args = ()
- def __init__(self, body, on_ack=noop,
- hostname=None, eventer=None, app=None,
- connection_errors=None, request_dict=None,
- delivery_info=None, task=None, Context=Context, **opts):
- self.app = app
- self.name = body["task"]
- self.id = body["id"]
- self.args = body["args"]
- try:
- self.kwargs = body["kwargs"]
- if NEEDS_KWDICT:
- self.kwargs = kwdict(self.kwargs)
- except KeyError:
- self.kwargs = {}
- try:
- self.flags = body["flags"]
- except KeyError:
- pass
- self.on_ack = on_ack
- self.hostname = hostname
- self.eventer = eventer
- self.connection_errors = connection_errors or ()
- self.task = task or self.app._tasks[self.name]
- if "eta" in body:
- eta = body["eta"]
- tz = tz_utc if utc else self.tzlocal
- self.eta = tz_to_local(maybe_iso8601(eta), self.tzlocal, tz)
- if "expires" in body:
- expires = body["expires"]
- tz = tz_utc if utc else self.tzlocal
- self.expires = tz_to_local(maybe_iso8601(expires),
- self.tzlocal, tz)
- if delivery_info:
- self.delivery_info = {
- "exchange": delivery_info.get("exchange"),
- "routing_key": delivery_info.get("routing_key"),
- }
- self.request_dict = AttributeDict(
- {"called_directly": False,
- "callbacks": [],
- "errbacks": [],
- "chord": None}, **body)
- tid = uuid()
- hostname = socket.gethostname()
- task = {"task": T.name, "args": (), "kwargs": {}, "id": tid, "flags": 0}
- app = current_app._get_current_object()
- m = Message(None, {}, {}, task)
- ts = time()
- for i in xrange(1000000):
- x = Request(task, hostname=hostname, app=app, task=task)
- print(time() - ts)
|