reqi.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. from celery import current_app, task, uuid
  2. from celery.five import range
  3. from celery.worker.consumer import Consumer
  4. #from celery.worker.job import Request
  5. from celery.app.task import Context
  6. from celery.concurrency.solo import TaskPool
  7. from celery.app.amqp import TASK_BARE
  8. from time import time
  9. from Queue import Queue
  10. from librabbitmq import Message
  11. from celery.utils.functional import noop
  12. from celery.worker.job import NEEDS_KWDICT
  13. from celery.datastructures import AttributeDict
  14. import socket
  15. import sys
  16. @task(accept_magic_kwargs=False)
  17. def T():
  18. pass
  19. class Request(object):
  20. #__slots__ = ('app', 'name', 'id', 'args', 'kwargs',
  21. # 'on_ack', 'delivery_info', 'hostname',
  22. # 'eventer', 'connection_errors',
  23. # 'task', 'eta', 'expires', 'flags',
  24. # 'request_dict', 'acknowledged',
  25. # 'worker_pid', 'started',
  26. # '_already_revoked', '_terminate_on_ack', '_tzlocal')
  27. eta = None
  28. started = False
  29. acknowledged = _already_revoked = False
  30. worker_pid = _terminate_on_ack = None
  31. _tzlocal = None
  32. expires = None
  33. delivery_info = {}
  34. flags = 0
  35. args = ()
  36. def __init__(self, body, on_ack=noop,
  37. hostname=None, eventer=None, app=None,
  38. connection_errors=None, request_dict=None,
  39. delivery_info=None, task=None, Context=Context, **opts):
  40. self.app = app
  41. self.name = body['task']
  42. self.id = body['id']
  43. self.args = body['args']
  44. try:
  45. self.kwargs = body['kwargs']
  46. if NEEDS_KWDICT:
  47. self.kwargs = kwdict(self.kwargs)
  48. except KeyError:
  49. self.kwargs = {}
  50. try:
  51. self.flags = body['flags']
  52. except KeyError:
  53. pass
  54. self.on_ack = on_ack
  55. self.hostname = hostname
  56. self.eventer = eventer
  57. self.connection_errors = connection_errors or ()
  58. self.task = task or self.app._tasks[self.name]
  59. if 'eta' in body:
  60. eta = body['eta']
  61. tz = tz_utc if utc else self.tzlocal
  62. self.eta = tz_to_local(maybe_iso8601(eta), self.tzlocal, tz)
  63. if 'expires' in body:
  64. expires = body['expires']
  65. tz = tz_utc if utc else self.tzlocal
  66. self.expires = tz_to_local(maybe_iso8601(expires),
  67. self.tzlocal, tz)
  68. if delivery_info:
  69. self.delivery_info = {
  70. 'exchange': delivery_info.get('exchange'),
  71. 'routing_key': delivery_info.get('routing_key'),
  72. }
  73. self.request_dict = AttributeDict(
  74. {'called_directly': False,
  75. 'callbacks': [],
  76. 'errbacks': [],
  77. 'chord': None}, **body)
  78. tid = uuid()
  79. hostname = socket.gethostname()
  80. task = {'task': T.name, 'args': (), 'kwargs': {}, 'id': tid, 'flags': 0}
  81. app = current_app._get_current_object()
  82. m = Message(None, {}, {}, task)
  83. ts = time()
  84. for i in range(1000000):
  85. x = Request(task, hostname=hostname, app=app, task=task)
  86. print(time() - ts)