Ask Solem 12 лет назад
Родитель
Сommit
327c37ce52
2 измененных файлов с 9 добавлено и 6 удалено
  1. 3 1
      celery/backends/rpc.py
  2. 6 5
      celery/concurrency/processes.py

+ 3 - 1
celery/backends/rpc.py

@@ -36,7 +36,9 @@ class RPCBackend(amqp.AMQPBackend):
         return [self.binding]
 
     def _routing_key(self, task_id):
-        return current_task.request.reply_to
+        task = current_task._get_current_object()
+        if task is not None:
+            return task.request.reply_to
 
     def on_reply_declare(self, task_id):
         pass

+ 6 - 5
celery/concurrency/processes.py

@@ -524,9 +524,10 @@ class TaskPool(BasePool):
         if hard:
             self._tref_for_id[job] = hub.timer.apply_at(
                 now() + (hard - soft),
-                self._on_hard_timeout, (job, soft, hard, hub))
+                self._on_hard_timeout, (job, ),
+            )
         try:
-            result = self._pool.cache[job]
+            result = self._pool._cache[job]
         except KeyError:
             pass  # job ready
         else:
@@ -534,18 +535,18 @@ class TaskPool(BasePool):
         finally:
             if not hard:
                 # remove tref
-                self._pool.on_timeout_cancel(job)
+                self._discard_tref(job)
 
     def _on_hard_timeout(self, job):
         try:
-            result = self._pool.cache[job]
+            result = self._pool._cache[job]
         except KeyError:
             pass  # job ready
         else:
             self.on_hard_timeout(result)
         finally:
             # remove tref
-            self._pool.on_timeout_cancel(job)
+            self._discard_tref(job)
 
     def _create_timelimit_handlers(self, hub, now=time):
         apply_after = hub.timer.apply_after