# reqi.py -- micro-benchmark: time the construction of Request objects.
  1. from celery import current_app, task, uuid
  2. from celery.worker.consumer import Consumer
  3. #from celery.worker.job import Request
  4. from celery.app.task import Context
  5. from celery.concurrency.solo import TaskPool
  6. from celery.app.amqp import TASK_BARE
  7. from time import time
  8. from Queue import Queue
  9. from librabbitmq import Message
  10. from celery.utils.functional import noop
  11. from celery.worker.job import NEEDS_KWDICT
  12. from celery.datastructures import AttributeDict
  13. import socket
  14. import sys
  15. @task(accept_magic_kwargs=False)
  16. def T():
  17. pass
  18. class Request(object):
  19. #__slots__ = ("app", "name", "id", "args", "kwargs",
  20. # "on_ack", "delivery_info", "hostname",
  21. # "eventer", "connection_errors",
  22. # "task", "eta", "expires", "flags",
  23. # "request_dict", "acknowledged",
  24. # "worker_pid", "started",
  25. # "_already_revoked", "_terminate_on_ack", "_tzlocal")
  26. eta = None
  27. started = False
  28. acknowledged = _already_revoked = False
  29. worker_pid = _terminate_on_ack = None
  30. _tzlocal = None
  31. expires = None
  32. delivery_info = {}
  33. flags = 0
  34. args = ()
  35. def __init__(self, body, on_ack=noop,
  36. hostname=None, eventer=None, app=None,
  37. connection_errors=None, request_dict=None,
  38. delivery_info=None, task=None, Context=Context, **opts):
  39. self.app = app
  40. self.name = body["task"]
  41. self.id = body["id"]
  42. self.args = body["args"]
  43. try:
  44. self.kwargs = body["kwargs"]
  45. if NEEDS_KWDICT:
  46. self.kwargs = kwdict(self.kwargs)
  47. except KeyError:
  48. self.kwargs = {}
  49. try:
  50. self.flags = body["flags"]
  51. except KeyError:
  52. pass
  53. self.on_ack = on_ack
  54. self.hostname = hostname
  55. self.eventer = eventer
  56. self.connection_errors = connection_errors or ()
  57. self.task = task or self.app._tasks[self.name]
  58. if "eta" in body:
  59. eta = body["eta"]
  60. tz = tz_utc if utc else self.tzlocal
  61. self.eta = tz_to_local(maybe_iso8601(eta), self.tzlocal, tz)
  62. if "expires" in body:
  63. expires = body["expires"]
  64. tz = tz_utc if utc else self.tzlocal
  65. self.expires = tz_to_local(maybe_iso8601(expires),
  66. self.tzlocal, tz)
  67. if delivery_info:
  68. self.delivery_info = {
  69. "exchange": delivery_info.get("exchange"),
  70. "routing_key": delivery_info.get("routing_key"),
  71. }
  72. self.request_dict = AttributeDict(
  73. {"called_directly": False,
  74. "callbacks": [],
  75. "errbacks": [],
  76. "chord": None}, **body)
  77. tid = uuid()
  78. hostname = socket.gethostname()
  79. task = {"task": T.name, "args": (), "kwargs": {}, "id": tid, "flags": 0}
  80. app = current_app._get_current_object()
  81. m = Message(None, {}, {}, task)
  82. ts = time()
  83. for i in xrange(1000000):
  84. x = Request(task, hostname=hostname, app=app, task=task)
  85. print(time() - ts)