|
@@ -5,14 +5,16 @@ import signal
|
|
|
import socket
|
|
|
import sys
|
|
|
import traceback
|
|
|
-from celery.tests.utils import unittest
|
|
|
|
|
|
from itertools import count
|
|
|
+from time import time
|
|
|
|
|
|
from celery.exceptions import TimeoutError
|
|
|
from celery.task.control import ping, flatten_reply, inspect
|
|
|
from celery.utils import get_full_cls_name
|
|
|
|
|
|
+from celery.tests.utils import unittest
|
|
|
+
|
|
|
HOSTNAME = socket.gethostname()
|
|
|
|
|
|
|
|
@@ -21,8 +23,9 @@ def say(msg):
|
|
|
|
|
|
|
|
|
def try_while(fun, reason="Timed out", timeout=10, interval=0.5):
|
|
|
+ time_start = time()
|
|
|
for iterations in count(0):
|
|
|
- if iterations * interval >= timeout:
|
|
|
+ if time() - time_start >= timeout:
|
|
|
raise TimeoutError()
|
|
|
ret = fun()
|
|
|
if ret:
|
|
@@ -46,11 +49,9 @@ class Worker(object):
|
|
|
def _fork_and_exec(self):
|
|
|
pid = os.fork()
|
|
|
if pid == 0:
|
|
|
- os.execv(sys.executable,
|
|
|
- [sys.executable] + ["-m", "celery.bin.celeryd",
|
|
|
- "-l", self.loglevel,
|
|
|
- "-n", self.hostname])
|
|
|
- os.exit()
|
|
|
+ from celery import current_app
|
|
|
+ current_app.worker_main(["celeryd", "--loglevel=DEBUG",
|
|
|
+ "-n", self.hostname])
|
|
|
self.pid = pid
|
|
|
|
|
|
def is_alive(self, timeout=1):
|
|
@@ -58,10 +59,10 @@ class Worker(object):
|
|
|
timeout=timeout)
|
|
|
return self.hostname in flatten_reply(r)
|
|
|
|
|
|
- def wait_until_started(self, timeout=10, interval=0.2):
|
|
|
+ def wait_until_started(self, timeout=10, interval=0.5):
|
|
|
try_while(lambda: self.is_alive(interval),
|
|
|
"Worker won't start (after %s secs.)" % timeout,
|
|
|
- interval=0.2, timeout=10)
|
|
|
+ interval=interval, timeout=timeout)
|
|
|
say("--WORKER %s IS ONLINE--" % self.hostname)
|
|
|
|
|
|
def ensure_shutdown(self, timeout=10, interval=0.5):
|
|
@@ -115,7 +116,7 @@ class WorkerCase(unittest.TestCase):
|
|
|
self.assertTrue(self.worker.is_alive)
|
|
|
|
|
|
def inspect(self, timeout=1):
|
|
|
- return inspect(self.worker.hostname, timeout=timeout)
|
|
|
+ return inspect([self.worker.hostname], timeout=timeout)
|
|
|
|
|
|
def my_response(self, response):
|
|
|
return flatten_reply(response)[self.worker.hostname]
|
|
@@ -123,7 +124,7 @@ class WorkerCase(unittest.TestCase):
|
|
|
def is_accepted(self, task_id, interval=0.5):
|
|
|
active = self.inspect(timeout=interval).active()
|
|
|
if active:
|
|
|
- for task in active:
|
|
|
+ for task in active[self.worker.hostname]:
|
|
|
if task["id"] == task_id:
|
|
|
return True
|
|
|
return False
|
|
@@ -131,7 +132,7 @@ class WorkerCase(unittest.TestCase):
|
|
|
def is_reserved(self, task_id, interval=0.5):
|
|
|
reserved = self.inspect(timeout=interval).reserved()
|
|
|
if reserved:
|
|
|
- for task in reserved:
|
|
|
+ for task in reserved[self.worker.hostname]:
|
|
|
if task["id"] == task_id:
|
|
|
return True
|
|
|
return False
|
|
@@ -139,7 +140,7 @@ class WorkerCase(unittest.TestCase):
|
|
|
def is_scheduled(self, task_id, interval=0.5):
|
|
|
schedule = self.inspect(timeout=interval).scheduled()
|
|
|
if schedule:
|
|
|
- for item in schedule:
|
|
|
+ for item in schedule[self.worker.hostname]:
|
|
|
if item["request"]["id"] == task_id:
|
|
|
return True
|
|
|
return False
|