Selaa lähdekoodia

Tests passing

Ask Solem 13 vuotta sitten
vanhempi
commit
0fba468745

+ 8 - 8
celery/tests/worker/test_request.py

@@ -94,8 +94,8 @@ class MyTaskIgnoreResult(Task):
 
 
 @task_dec(accept_magic_kwargs=True)
-def mytask_some_kwargs(i, logfile):
-    some_kwargs_scratchpad["logfile"] = logfile
+def mytask_some_kwargs(i, task_id):
+    some_kwargs_scratchpad["task_id"] = task_id
     return i ** i
 
 
@@ -671,9 +671,9 @@ class test_TaskRequest(Case):
     def test_execute_success_some_kwargs(self):
         tid = uuid()
         tw = TaskRequest(mytask_some_kwargs.name, tid, [4], {})
-        self.assertEqual(tw.execute(logfile="foobaz.log"), 256)
+        self.assertEqual(tw.execute(), 256)
         meta = mytask_some_kwargs.backend.get_task_meta(tid)
-        self.assertEqual(some_kwargs_scratchpad.get("logfile"), "foobaz.log")
+        self.assertEqual(some_kwargs_scratchpad.get("task_id"), tid)
         self.assertEqual(meta["result"], 256)
         self.assertEqual(meta["status"], states.SUCCESS)
 
@@ -716,7 +716,7 @@ class test_TaskRequest(Case):
         p = MockPool()
         tw.execute_using_pool(p)
         self.assertTrue(p.target)
-        self.assertEqual(p.args[0], mytask.name)
+        self.assertEqual(p.args[0], mytask)
         self.assertEqual(p.args[1], tid)
         self.assertEqual(p.args[2], [4])
         self.assertIn("f", p.args[3])
@@ -729,10 +729,10 @@ class test_TaskRequest(Case):
         tid = uuid()
         tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
         self.assertDictEqual(
-                tw.extend_with_default_kwargs(10, "some_logfile"), {
+                tw.extend_with_default_kwargs(), {
                     "f": "x",
-                    "logfile": "some_logfile",
-                    "loglevel": 10,
+                    "logfile": None,
+                    "loglevel": None,
                     "task_id": tw.id,
                     "task_retries": 0,
                     "task_is_eager": False,

+ 5 - 5
celery/worker/job.py

@@ -168,7 +168,7 @@ class Request(object):
         return Request(body,
             delivery_info=getattr(message, "delivery_info", None), **kwargs)
 
-    def extend_with_default_kwargs(self, loglevel, logfile):
+    def extend_with_default_kwargs(self):
         """Extend the tasks keyword arguments with standard task arguments.
 
         Currently these are `logfile`, `loglevel`, `task_id`,
@@ -181,8 +181,8 @@ class Request(object):
 
         """
         kwargs = dict(self.kwargs)
-        default_kwargs = {"logfile": logfile,
-                          "loglevel": loglevel,
+        default_kwargs = {"logfile": None,   # deprecated
+                          "loglevel": None,  # deprecated
                           "task_id": self.id,
                           "task_name": self.name,
                           "task_retries": self.request_dict.get("retries", 0),
@@ -217,7 +217,7 @@ class Request(object):
         hostname = self.hostname
         kwargs = self.kwargs
         if task.accept_magic_kwargs:
-            kwargs = self.extend_with_default_kwargs(loglevel, logfile)
+            kwargs = self.extend_with_default_kwargs()
         request = self.request_dict
         request.update({"hostname": hostname, "is_eager": False,
                         "delivery_info": self.delivery_info})
@@ -248,7 +248,7 @@ class Request(object):
 
         kwargs = self.kwargs
         if self.task.accept_magic_kwargs:
-            kwargs = self.extend_with_default_kwargs(loglevel, logfile)
+            kwargs = self.extend_with_default_kwargs()
         request = self.request_dict
         request.update({"loglevel": loglevel, "logfile": logfile,
                         "hostname": self.hostname, "is_eager": False,

+ 43 - 0
funtests/benchmarks/lreq.py

@@ -0,0 +1,43 @@
+from _librabbitmq import Request
+
+from celery import task, uuid, current_app
+
+from datetime import datetime
+
+@task(accept_magic_kwargs=False)
+def add(x, y):
+    return x + y
+
+
+app = current_app._get_current_object()
+
+def on_ack(x): pass
+
+
+eta = datetime.utcnow().isoformat()
+expires = datetime.utcnow().isoformat()
+task = {"task": add.name, "id": uuid(), "args": (2, 2), "kwargs": {},
+        "eta": eta, "expires": expires}
+
+x = Request(task, app=app, on_ack=on_ack, hostname="foo",
+            delivery_info={"exchange": "celery", "routing_key": "celery"})
+
+print(x.app)
+print(x.name)
+print(x.id)
+print(x.args)
+print(x.kwargs)
+print(x.eta)
+print(x.expires)
+print(x.flags)
+print(x.on_ack)
+print(x.task)
+print(x.delivery_info)
+
+print;
+print(x.request_dict)
+
+print;
+print(x.acknowledged)
+print(x._already_revoked)
+print(x._terminate_on_ack)

+ 39 - 0
funtests/benchmarks/req.py

@@ -0,0 +1,39 @@
+from celery import current_app, task, uuid
+from celery.worker.consumer import Consumer
+from celery.worker.job import Request
+from celery.concurrency.solo import TaskPool
+from celery.app.amqp import TASK_BARE
+from time import time
+from Queue import Queue
+from librabbitmq import Message
+import socket
+import sys
+
+@task(accept_magic_kwargs=False)
+def T():
+    pass
+
+tid = uuid()
+P = TaskPool()
+hostname = socket.gethostname()
+task = {"task": T.name, "args": (), "kwargs": {}, "id": tid, "flags": 0}
+app = current_app._get_current_object()
+ready_queue = Queue()
+
+def on_put(req):
+    req.execute_using_pool(P)
+
+def on_ack(*a): pass
+
+
+m = Message(None, {}, {}, task)
+
+ready_queue.put = on_put
+x = Consumer(ready_queue, hostname=hostname, app=app)
+x.update_strategies()
+name = T.name
+ts = time()
+for i in xrange(100000):
+    x.strategies[name](m, m.body, on_ack)
+print(time() - ts)
+

+ 96 - 0
funtests/benchmarks/reqi.py

@@ -0,0 +1,96 @@
+from celery import current_app, task, uuid
+from celery.worker.consumer import Consumer
+#from celery.worker.job import Request
+from celery.app.task import Context
+from celery.concurrency.solo import TaskPool
+from celery.app.amqp import TASK_BARE
+from time import time
+from Queue import Queue
+from librabbitmq import Message
+from celery.utils.functional import noop
+from celery.worker.job import NEEDS_KWDICT
+from celery.datastructures import AttributeDict
+import socket
+import sys
+
+@task(accept_magic_kwargs=False)
+def T():
+    pass
+
+class Request(object):
+    #__slots__ = ("app", "name", "id", "args", "kwargs",
+    #             "on_ack", "delivery_info", "hostname",
+    #             "eventer", "connection_errors",
+    #             "task", "eta", "expires", "flags",
+    #             "request_dict", "acknowledged",
+    #             "worker_pid", "started",
+    #             "_already_revoked", "_terminate_on_ack", "_tzlocal")
+    eta = None
+    started = False
+    acknowledged = _already_revoked = False
+    worker_pid = _terminate_on_ack = None
+    _tzlocal = None
+    expires = None
+    delivery_info = {}
+    flags = 0
+    args = ()
+
+    def __init__(self, body, on_ack=noop,
+            hostname=None, eventer=None, app=None,
+            connection_errors=None, request_dict=None,
+            delivery_info=None, task=None, Context=Context, **opts):
+        self.app = app
+        self.name = body["task"]
+        self.id = body["id"]
+        self.args = body["args"]
+        try:
+            self.kwargs = body["kwargs"]
+            if NEEDS_KWDICT:
+                self.kwargs = kwdict(self.kwargs)
+        except KeyError:
+            self.kwargs = {}
+        try:
+            self.flags = body["flags"]
+        except KeyError:
+            pass
+        self.on_ack = on_ack
+        self.hostname = hostname
+        self.eventer = eventer
+        self.connection_errors = connection_errors or ()
+        self.task = task or self.app._tasks[self.name]
+        if "eta" in body:
+            eta = body["eta"]
+            tz = tz_utc if utc else self.tzlocal
+            self.eta = tz_to_local(maybe_iso8601(eta), self.tzlocal, tz)
+        if "expires" in body:
+            expires = body["expires"]
+            tz = tz_utc if utc else self.tzlocal
+            self.expires = tz_to_local(maybe_iso8601(expires),
+                                       self.tzlocal, tz)
+        if delivery_info:
+            self.delivery_info = {
+                "exchange": delivery_info.get("exchange"),
+                "routing_key": delivery_info.get("routing_key"),
+            }
+
+        self.request_dict = AttributeDict(
+                {"called_directly": False,
+                 "callbacks": [],
+                 "errbacks": [],
+                 "chord": None}, **body)
+
+
+
+
+tid = uuid()
+hostname = socket.gethostname()
+task = {"task": T.name, "args": (), "kwargs": {}, "id": tid, "flags": 0}
+app = current_app._get_current_object()
+
+m = Message(None, {}, {}, task)
+
+ts = time()
+for i in xrange(1000000):
+    x = Request(task, hostname=hostname, app=app, task=task)
+print(time() - ts)
+

+ 46 - 0
funtests/benchmarks/trace.py

@@ -0,0 +1,46 @@
+from celery import current_app, task, uuid
+from celery.worker.consumer import Consumer
+from celery.worker.job import Request
+from celery.concurrency.solo import TaskPool
+from celery.app.amqp import TASK_BARE
+from time import time
+from Queue import Queue
+from librabbitmq import Message
+import socket
+import sys
+
+@task(accept_magic_kwargs=False)
+def T():
+    pass
+
+tid = uuid()
+P = TaskPool()
+hostname = socket.gethostname()
+task = {"task": T.name, "args": (), "kwargs": {}, "id": tid, "flags": 0}
+app = current_app._get_current_object()
+ready_queue = Queue()
+
+def on_put(req):
+    req.execute_using_pool(P)
+
+def on_ack(*a): pass
+
+
+m = Message(None, {}, {}, task)
+
+ready_queue.put = on_put
+x = Consumer(ready_queue, hostname=hostname, app=app)
+x.update_strategies()
+name = T.name
+ts = time()
+from celery.datastructures import AttributeDict
+from celery.task.trace import trace_task_ret
+request = AttributeDict(
+                {"called_directly": False,
+                 "callbacks": [],
+                 "errbacks": [],
+                 "chord": None}, **task)
+for i in xrange(100000):
+    trace_task_ret(T, tid, (), {}, request)
+print(time() - ts)
+