
Must handle pool results for every task received

Ask Solem, 12 years ago
Commit b840f02326

+ 1 - 1
celery/backends/redis.py

@@ -69,7 +69,7 @@ class RedisBackend(KeyValueStoreBackend):
         uhost = uport = upass = udb = None
         if url:
             _, uhost, uport, _, upass, udb, _ = _parse_url(url)
-            udb = udb.strip('/')
+            udb = udb.strip('/') if udb else 0
         self.host = uhost or host or _get('HOST') or self.host
         self.port = int(uport or port or _get('PORT') or self.port)
         self.db = udb or db or _get('DB') or self.db
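
Reviewer note: the one-line fix guards against URLs that carry no database path. For a URL like redis://localhost:6379 the db segment apparently comes back empty from _parse_url, so the old unconditional udb.strip('/') would blow up with an AttributeError. A minimal sketch of the fixed behaviour, using Python 3's urllib.parse purely for illustration (the original code targets Python 2 and its own _parse_url helper):

    from urllib.parse import urlparse

    def parse_redis_db(url):
        # Mirrors the fixed logic: fall back to db 0 when the URL has
        # no '/<db>' path segment, instead of stripping a missing value.
        path = urlparse(url).path or None   # None when there is no path
        return path.strip('/') if path else 0

    assert parse_redis_db('redis://localhost:6379/1') == '1'
    assert parse_redis_db('redis://localhost:6379') == 0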

+ 4 - 4
celery/bin/celeryd.py

@@ -77,11 +77,11 @@ The :program:`celery worker` command (previously known as ``celeryd``)
 
 .. cmdoption:: --time-limit
 
-    Enables a hard time limit (in seconds) for tasks.
+    Enables a hard time limit in seconds (int/float) for tasks.
 
 .. cmdoption:: --soft-time-limit
 
-    Enables a soft time limit (in seconds) for tasks.
+    Enables a soft time limit in seconds (int/float) for tasks.
 
 .. cmdoption:: --maxtasksperchild
 
@@ -179,10 +179,10 @@ class WorkerCommand(Command):
                 default=conf.CELERYD_STATE_DB, dest='state_db'),
             Option('-E', '--events', default=conf.CELERY_SEND_EVENTS,
                 action='store_true', dest='send_events'),
-            Option('--time-limit', type='int', dest='task_time_limit',
+            Option('--time-limit', type='float', dest='task_time_limit',
                 default=conf.CELERYD_TASK_TIME_LIMIT),
             Option('--soft-time-limit', dest='task_soft_time_limit',
-                default=conf.CELERYD_TASK_SOFT_TIME_LIMIT, type='int'),
+                default=conf.CELERYD_TASK_SOFT_TIME_LIMIT, type='float'),
             Option('--maxtasksperchild', dest='max_tasks_per_child',
                 default=conf.CELERYD_MAX_TASKS_PER_CHILD, type='int'),
             Option('--queues', '-Q', default=[]),
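
Reviewer note: switching the optparse type from 'int' to 'float' is what makes fractional limits such as --time-limit=1.5 parse at all; with type='int', optparse rejects the value outright. A standalone sketch of the difference (not Celery's actual option setup):

    from optparse import OptionParser

    parser = OptionParser()
    # type='float' accepts whole and fractional seconds alike;
    # type='int' would abort with "invalid integer value: '1.5'"
    parser.add_option('--time-limit', type='float', dest='task_time_limit')
    opts, _ = parser.parse_args(['--time-limit', '1.5'])
    assert opts.task_time_limit == 1.5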

+ 3 - 0
celery/concurrency/base.py

@@ -83,6 +83,9 @@ class BasePool(object):
     def on_hard_timeout(self, job):
         pass
 
+    def maybe_handle_result(self, *args):
+        pass
+
     def maintain_pool(self, *args, **kwargs):
         pass
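
Reviewer note: the new no-op maybe_handle_result follows the same null-object convention as the neighbouring on_hard_timeout and maintain_pool stubs: the worker can call it unconditionally on any pool, and only pools that actually produce asynchronous results override it. A minimal sketch with hypothetical subclasses:

    class BasePool(object):
        def maybe_handle_result(self, *args):
            pass  # no-op default: e.g. a solo pool has no result channel

    class ProcessPool(BasePool):
        def maybe_handle_result(self, *args):
            print('draining one result from the pool')

    for pool in (BasePool(), ProcessPool()):
        pool.maybe_handle_result()   # safe without isinstance checks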
 

+ 8 - 13
celery/concurrency/processes/__init__.py

@@ -79,10 +79,14 @@ class TaskPool(BasePool):
         Will pre-fork all workers so they're ready to accept tasks.
 
         """
-        self._pool = self.Pool(processes=self.limit,
-                               initializer=process_initializer,
-                               **self.options)
-        self.on_apply = self._pool.apply_async
+        P = self._pool = self.Pool(processes=self.limit,
+                                   initializer=process_initializer,
+                                   **self.options)
+        self.on_apply = P.apply_async
+        self.on_soft_timeout = P._timeout_handler.on_soft_timeout
+        self.on_hard_timeout = P._timeout_handler.on_hard_timeout
+        self.maintain_pool = P.maintain_pool
+        self.maybe_handle_result = P._result_handler.handle_event
 
     def did_start_ok(self):
         return self._pool.did_start_ok()
@@ -131,15 +135,6 @@ class TaskPool(BasePool):
         if self._pool._timeout_handler:
             self._pool._timeout_handler.handle_event()
 
-    def on_soft_timeout(self, job):
-        self._pool._timeout_handler.on_soft_timeout(job)
-
-    def on_hard_timeout(self, job):
-        self._pool._timeout_handler.on_hard_timeout(job)
-
-    def maintain_pool(self, *args, **kwargs):
-        self._pool.maintain_pool(*args, **kwargs)
-
     @property
     def num_processes(self):
         return self._pool._processes
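
Reviewer note: rather than keeping thin delegating methods, on_start now rebinds the instance attributes straight to the inner pool's bound methods, which saves one Python-level call per timeout or maintenance event on the hot path. A reduced sketch of the rebinding pattern, with hypothetical classes:

    class TimeoutHandler(object):
        def on_soft_timeout(self, job):
            print('soft timeout for %r' % (job,))

    class TaskPool(object):
        def on_start(self):
            handler = TimeoutHandler()
            # attribute lookup now yields the bound method directly,
            # skipping a wrapper method on every call
            self.on_soft_timeout = handler.on_soft_timeout

    pool = TaskPool()
    pool.on_start()
    pool.on_soft_timeout('job1')   # calls TimeoutHandler.on_soft_timeout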

+ 4 - 0
celery/worker/__init__.py

@@ -106,6 +106,10 @@ class Pool(bootsteps.StartStopComponent):
         if not pool.did_start_ok():
             raise WorkerLostError('Could not start worker processes')
 
+        # need to handle pool results before every task
+        # since multiple tasks can be received in a single poll()
+        hub.on_task.append(pool.maybe_handle_result)
+
         hub.update_readers(pool.readers)
         for handler, interval in pool.timers.iteritems():
             hub.timer.apply_interval(interval * 1000.0, handler)
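
Reviewer note: this is the core of the fix named in the commit message. A single poll() wakeup can deliver several task messages, so draining pool results only once per wakeup could let results pile up; registering pool.maybe_handle_result on hub.on_task runs it once per task instead. A minimal sketch of that dispatch order, with a hypothetical Hub:

    class Hub(object):
        def __init__(self):
            self.on_task = []   # callbacks fired once per received task

        def dispatch(self, tasks):
            for task in tasks:        # several tasks from one poll()
                for callback in self.on_task:
                    callback()        # e.g. pool.maybe_handle_result
                print('executing %s' % task)

    hub = Hub()
    hub.on_task.append(lambda: print('draining pool result'))
    hub.dispatch(['task-a', 'task-b'])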

+ 1 - 0
celery/worker/consumer.py

@@ -405,6 +405,7 @@ class Consumer(object):
                     self.handle_unknown_task(body, message, exc)
                 except InvalidTaskError, exc:
                     self.handle_invalid_task(body, message, exc)
+                fire_timers()
 
             self.task_consumer.callbacks = [on_task_received]
             self.task_consumer.consume()
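
Reviewer note: calling fire_timers() at the end of on_task_received appears to serve the same goal from the timer side: during a long burst of incoming messages the loop may not return to its idle path, so firing due timers per message keeps internal work such as pool maintenance on schedule rather than deferring it until the burst ends. A standalone sketch of the idea (hypothetical Timer, not Celery's):

    import time

    class Timer(object):
        def __init__(self, interval, fun):
            self.interval, self.fun = interval, fun
            self.next_call = time.monotonic() + interval

        def fire_if_due(self):
            # Fire the callback only when its deadline has passed,
            # then schedule the next run.
            if time.monotonic() >= self.next_call:
                self.fun()
                self.next_call = time.monotonic() + self.interval

    maintenance = Timer(0.0, lambda: print('maintaining pool'))

    def on_task_received(body):
        print('received %s' % body)
        maintenance.fire_if_due()    # analogous to fire_timers()

    for msg in ('t1', 't2'):
        on_task_received(msg)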