Browse Source

Found some more app leaks

Ask Solem 14 years ago
parent
commit
c99de0404c

+ 3 - 2
celery/bin/celeryctl.py

@@ -247,8 +247,9 @@ class status(Command):
     option_list = inspect.option_list
 
     def run(self, *args, **kwargs):
-        replies = inspect(no_color=kwargs.get("no_color", False)) \
-                            .run("ping", **dict(kwargs, quiet=True))
+        replies = inspect(app=self.app,
+                          no_color=kwargs.get("no_color", False)) \
+                    .run("ping", **dict(kwargs, quiet=True))
         if not replies:
             raise Error("No nodes replied within time constraint")
         nodecount = len(replies)

+ 1 - 1
celery/events/cursesmon.py

@@ -389,7 +389,7 @@ def evtop(app=None):
     sys.stderr.write("-> evtop: starting capture...\n")
     app = app_or_default(app)
     state = State()
-    display = CursesMonitor(state)
+    display = CursesMonitor(state, app=app)
     display.init_screen()
     refresher = DisplayThread(display)
     refresher.start()

+ 4 - 4
celery/task/sets.py

@@ -197,10 +197,10 @@ class TaskSet(UserList):
         """Applies the taskset locally."""
         taskset_id = gen_unique_id()
 
-        # This will be filled with EagerResults.
-        return TaskSetResult(taskset_id, [task.apply(taskset_id=taskset_id)
-                                            for task in self.tasks],
-                             app=self.app)
+        # this will be filled with EagerResults.
+        results = [task.apply(taskset_id=taskset_id)
+                        for task in self.tasks]
+        return self.app.TaskSetResult(taskset_id, results)
 
     @property
     def tasks(self):

+ 0 - 10
celery/utils/__init__.py

@@ -15,7 +15,6 @@ from inspect import getargspec
 from itertools import islice
 
 from carrot.utils import rpartition
-from dateutil.parser import parse as parse_iso8601
 
 from celery.utils.compat import all, any, defaultdict
 from celery.utils.timeutils import timedelta_seconds # was here before
@@ -107,15 +106,6 @@ def noop(*args, **kwargs):
     pass
 
 
-def maybe_iso8601(dt):
-    """``Either datetime | str -> datetime or None -> None``"""
-    if not dt:
-        return
-    if isinstance(dt, datetime):
-        return dt
-    return parse_iso8601(dt)
-
-
 def kwdict(kwargs):
     """Make sure keyword arguments are not in unicode.
 

+ 10 - 0
celery/utils/timeutils.py

@@ -1,6 +1,7 @@
 import math
 
 from datetime import datetime, timedelta
+from dateutil.parser import parse as parse_iso8601
 
 from carrot.utils import partition
 
@@ -116,3 +117,12 @@ def humanize_seconds(secs, prefix=""):
     return "now"
 
 
+def maybe_iso8601(dt):
+    """``Either datetime | str -> datetime or None -> None``"""
+    if not dt:
+        return
+    if isinstance(dt, datetime):
+        return dt
+    return parse_iso8601(dt)
+
+

+ 3 - 2
celery/worker/__init__.py

@@ -137,8 +137,9 @@ class WorkController(object):
 
         self.beat = None
         if self.embed_clockservice:
-            self.beat = beat.EmbeddedService(logger=self.logger,
-                                    schedule_filename=self.schedule_filename)
+            self.beat = beat.EmbeddedService(app=self.app,
+                                logger=self.logger,
+                                schedule_filename=self.schedule_filename)
 
         prefetch_count = self.concurrency * self.prefetch_multiplier
         self.listener = instantiate(self.listener_cls,

+ 2 - 1
celery/worker/job.py

@@ -12,7 +12,8 @@ from celery.datastructures import ExceptionInfo
 from celery.execute.trace import TaskTrace
 from celery.registry import tasks
 from celery.utils import noop, kwdict, fun_takes_kwargs
-from celery.utils import truncate_text, maybe_iso8601
+from celery.utils import truncate_text
+from celery.utils.timeutils import maybe_iso8601
 from celery.worker import state
 
 # pep8.py borks on a inline signature separator and