strategy.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.strategy
  4. ~~~~~~~~~~~~~~~~~~~~~~
  5. Task execution strategy (optimization).
  6. """
  7. from __future__ import absolute_import
  8. import logging
  9. from kombu.async.timer import to_timestamp
  10. from kombu.utils.encoding import safe_repr
  11. from celery.utils.log import get_logger
  12. from celery.utils.timeutils import timezone
  13. from .job import Request
  14. from .state import task_reserved
  15. __all__ = ['default']
  16. logger = get_logger(__name__)
  17. def default(task, app, consumer,
  18. info=logger.info, error=logger.error, task_reserved=task_reserved,
  19. to_system_tz=timezone.to_system):
  20. hostname = consumer.hostname
  21. eventer = consumer.event_dispatcher
  22. Req = Request
  23. connection_errors = consumer.connection_errors
  24. _does_info = logger.isEnabledFor(logging.INFO)
  25. events = eventer and eventer.enabled
  26. send_event = eventer.send
  27. call_at = consumer.timer.call_at
  28. apply_eta_task = consumer.apply_eta_task
  29. rate_limits_enabled = not consumer.disable_rate_limits
  30. get_bucket = consumer.task_buckets.__getitem__
  31. handle = consumer.on_task_request
  32. limit_task = consumer._limit_task
  33. def task_message_handler(message, body, ack, reject, callbacks,
  34. to_timestamp=to_timestamp):
  35. req = Req(body, on_ack=ack, on_reject=reject,
  36. app=app, hostname=hostname,
  37. eventer=eventer, task=task,
  38. connection_errors=connection_errors,
  39. message=message)
  40. if req.revoked():
  41. return
  42. if _does_info:
  43. info('Received task: %s', req)
  44. if events:
  45. send_event(
  46. 'task-received',
  47. uuid=req.id, name=req.name,
  48. args=safe_repr(req.args), kwargs=safe_repr(req.kwargs),
  49. retries=req.request_dict.get('retries', 0),
  50. eta=req.eta and req.eta.isoformat(),
  51. expires=req.expires and req.expires.isoformat(),
  52. )
  53. if req.eta:
  54. try:
  55. if req.utc:
  56. eta = to_timestamp(to_system_tz(req.eta))
  57. else:
  58. eta = to_timestamp(req.eta, timezone.local)
  59. except OverflowError as exc:
  60. error("Couldn't convert eta %s to timestamp: %r. Task: %r",
  61. req.eta, exc, req.info(safe=True), exc_info=True)
  62. req.acknowledge()
  63. else:
  64. consumer.qos.increment_eventually()
  65. call_at(eta, apply_eta_task, (req, ), priority=6)
  66. else:
  67. if rate_limits_enabled:
  68. bucket = get_bucket(task.name)
  69. if bucket:
  70. return limit_task(req, bucket, 1)
  71. task_reserved(req)
  72. if callbacks:
  73. [callback() for callback in callbacks]
  74. handle(req)
  75. return task_message_handler