123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- from __future__ import absolute_import
- import atexit
- import logging
- import os
- import signal
- import socket
- import sys
- import traceback
- from itertools import count
- from time import time
- from celery.exceptions import TimeoutError
- from celery.task.control import ping, flatten_reply, inspect
- from celery.utils.imports import qualname
- from celery.tests.utils import Case
# Name of the local machine, resolved once at import time; used below as
# the default base for worker node names.
HOSTNAME = socket.gethostname()
def say(msg):
    """Write *msg*, followed by a newline, to standard error."""
    line = '%s\n' % msg
    sys.stderr.write(line)
def try_while(fun, reason='Timed out', timeout=10, interval=0.5):
    """Call *fun* repeatedly until it returns a truthy value.

    :param fun: Zero-argument callable to poll.
    :param reason: Message attached to the :exc:`TimeoutError` raised
        when the deadline passes.
    :param timeout: Maximum number of seconds to keep polling.  The
        deadline is checked *before* each call, so a timeout of ``0``
        raises without ever calling *fun*.
    :param interval: Accepted for interface compatibility.  This function
        does not sleep between attempts; the callers visible in this
        module pass the interval to the polled callable instead, which
        is expected to block for that long.

    :returns: The first truthy value returned by *fun*.
    :raises TimeoutError: If *fun* has not returned a truthy value
        within *timeout* seconds.
    """
    time_start = time()
    while True:
        if time() - time_start >= timeout:
            # Surface the caller-supplied reason instead of raising a
            # bare, message-less exception.
            raise TimeoutError(reason)
        ret = fun()
        if ret:
            return ret
class Worker(object):
    """Handle on a celeryd worker forked off the current process.

    The worker is started with :func:`os.fork`, probed with remote-control
    ``ping`` and stopped by sending it ``SIGTERM``.
    """

    # Becomes True on the instance once :meth:`start` has forked the child.
    started = False
    # Iterator shared by the class; feeds :meth:`next_worker_id`.
    _worker_ids = count(1)
    # Set by :meth:`ensure_shutdown`; checked by the atexit hook installed
    # in :meth:`managed` so a worker is not stopped twice.
    _shutdown_called = False

    def __init__(self, hostname, loglevel='error'):
        """Store the node name and requested log level.

        :param hostname: Node name passed to the worker via ``-n`` and
            used as the ping destination.
        :param loglevel: Stored on the instance.
        """
        self.hostname = hostname
        self.loglevel = loglevel

    @classmethod
    def next_worker_id(cls):
        """Return the next number from the shared worker-id sequence.

        Uses the :func:`next` builtin rather than the iterator's ``.next``
        method, which only exists on Python 2.
        """
        return next(cls._worker_ids)

    def start(self):
        """Fork the worker process unless this instance already did so."""
        if not self.started:
            self._fork_and_exec()
            self.started = True

    def _fork_and_exec(self):
        """Fork; run the worker in the child, record its pid in the parent."""
        pid = os.fork()
        if pid == 0:
            # Child process.  It must never fall back into the parent's
            # code path (test runner, atexit hooks, ...), so whatever
            # happens -- normal return, SystemExit or a crash -- we end
            # in os._exit().
            try:
                from celery import current_app
                # NOTE(review): the log level is hard-coded to INFO here;
                # self.loglevel is stored but not consulted -- confirm
                # whether that is intentional.
                current_app.worker_main(['celeryd', '--loglevel=INFO',
                                         '-n', self.hostname,
                                         '-P', 'solo'])
            except Exception:
                # Report the failure before the child disappears.
                traceback.print_exc()
            finally:
                os._exit(0)
        self.pid = pid

    def is_alive(self, timeout=1):
        """Ping the worker and report whether it replied.

        :param timeout: Timeout passed to ``ping``.
        :returns: True if this worker's hostname is among the flattened
            ping replies.
        """
        r = ping(destination=[self.hostname],
                 timeout=timeout)
        return self.hostname in flatten_reply(r)

    def wait_until_started(self, timeout=10, interval=0.5):
        """Poll :meth:`is_alive` until the worker answers.

        :param timeout: Polling deadline passed to ``try_while``.
        :param interval: Ping timeout used for each probe.
        """
        try_while(lambda: self.is_alive(interval),
                  "Worker won't start (after %s secs.)" % timeout,
                  interval=interval, timeout=timeout)
        say('--WORKER %s IS ONLINE--' % self.hostname)

    def ensure_shutdown(self, timeout=10, interval=0.5):
        """Send ``SIGTERM`` to the worker and wait until it stops answering.

        :param timeout: Polling deadline passed to ``try_while``.
        :param interval: Ping timeout used for each probe.
        """
        os.kill(self.pid, signal.SIGTERM)
        # Honour the caller's timeout/interval rather than fixed values.
        try_while(lambda: not self.is_alive(interval),
                  "Worker won't shutdown (after %s secs.)" % timeout,
                  timeout=timeout, interval=interval)
        say('--WORKER %s IS SHUTDOWN--' % self.hostname)
        self._shutdown_called = True

    def ensure_started(self):
        """Start the worker and block until it responds to ping."""
        self.start()
        self.wait_until_started()

    @classmethod
    def managed(cls, hostname=None, caller=None):
        """Start a worker and register an atexit hook that stops it.

        :param hostname: Base node name; defaults to the local hostname.
        :param caller: Optional object whose qualified name is prefixed to
            the node name.  Without it, a number from
            :meth:`next_worker_id` is appended instead.
        :returns: The started :class:`Worker`.
        """
        hostname = hostname or socket.gethostname()
        if caller:
            hostname = '.'.join([qualname(caller), hostname])
        else:
            hostname += str(cls.next_worker_id())
        worker = cls(hostname)
        worker.ensure_started()
        # Remember where the worker was created so the atexit message can
        # point at the code that forgot to stop it.
        stack = traceback.format_stack()

        @atexit.register
        def _ensure_shutdown_once():
            if not worker._shutdown_called:
                say('-- Found worker not stopped at shutdown: %s\n%s' % (
                    worker.hostname,
                    '\n'.join(stack)))
                worker.ensure_shutdown()

        return worker
class WorkerCase(Case):
    """Test case base class that runs a managed worker for the whole class.

    A worker is started in :meth:`setUpClass` and stopped in
    :meth:`tearDownClass`; the helper methods query it through the
    remote-control ``inspect`` API.
    """

    # Base node name handed to ``Worker.managed``.
    hostname = HOSTNAME
    # The running Worker; assigned in setUpClass.
    worker = None

    @classmethod
    def setUpClass(cls):
        """Quieten the ``amqplib`` logger and start the class-wide worker."""
        logging.getLogger('amqplib').setLevel(logging.ERROR)
        cls.worker = Worker.managed(cls.hostname, caller=cls)

    @classmethod
    def tearDownClass(cls):
        """Stop the class-wide worker."""
        cls.worker.ensure_shutdown()

    def assertWorkerAlive(self, timeout=1):
        """Assert that the worker answers a ping within *timeout*."""
        # Call is_alive(); asserting on the bound method itself would
        # always pass.
        self.assertTrue(self.worker.is_alive(timeout))

    def inspect(self, timeout=1):
        """Return an ``inspect`` object targeting only this case's worker."""
        return inspect([self.worker.hostname], timeout=timeout)

    def my_response(self, response):
        """Pick this worker's entry out of a remote-control *response*."""
        return flatten_reply(response)[self.worker.hostname]

    def is_accepted(self, task_id, interval=0.5):
        """Return True if *task_id* is listed among the worker's active tasks.

        :param interval: Timeout for the inspect request.
        """
        active = self.inspect(timeout=interval).active()
        if not active:
            return False
        return any(task['id'] == task_id
                   for task in active[self.worker.hostname])

    def is_reserved(self, task_id, interval=0.5):
        """Return True if *task_id* is listed among the worker's reserved tasks.

        :param interval: Timeout for the inspect request.
        """
        reserved = self.inspect(timeout=interval).reserved()
        if not reserved:
            return False
        return any(task['id'] == task_id
                   for task in reserved[self.worker.hostname])

    def is_scheduled(self, task_id, interval=0.5):
        """Return True if *task_id* is listed in the worker's schedule.

        :param interval: Timeout for the inspect request.
        """
        schedule = self.inspect(timeout=interval).scheduled()
        if not schedule:
            return False
        return any(item['request']['id'] == task_id
                   for item in schedule[self.worker.hostname])

    def is_received(self, task_id, interval=0.5):
        """Return True if *task_id* is reserved, scheduled or accepted."""
        return (self.is_reserved(task_id, interval) or
                self.is_scheduled(task_id, interval) or
                self.is_accepted(task_id, interval))

    def ensure_accepted(self, task_id, interval=0.5, timeout=10):
        """Poll until *task_id* is accepted.

        :param interval: Timeout for each inspect request.
        :param timeout: Polling deadline passed to ``try_while``.
        """
        return try_while(lambda: self.is_accepted(task_id, interval),
                         'Task not accepted within timeout',
                         interval=interval, timeout=timeout)

    def ensure_received(self, task_id, interval=0.5, timeout=10):
        """Poll until *task_id* is received.

        :param interval: Timeout for each inspect request.
        :param timeout: Polling deadline passed to ``try_while``.
        """
        return try_while(lambda: self.is_received(task_id, interval),
                         'Task not received within timeout',
                         interval=interval, timeout=timeout)

    def ensure_scheduled(self, task_id, interval=0.5, timeout=10):
        """Poll until *task_id* is scheduled.

        :param interval: Timeout for each inspect request.
        :param timeout: Polling deadline passed to ``try_while``.
        """
        return try_while(lambda: self.is_scheduled(task_id, interval),
                         'Task not scheduled within timeout',
                         interval=interval, timeout=timeout)
|