|
@@ -187,36 +187,36 @@ class BaseBackend(object):
|
|
|
raise NotImplementedError(
|
|
|
"get_traceback is not supported by this backend.")
|
|
|
|
|
|
- def save_taskset(self, taskset_id, result):
|
|
|
+ def save_group(self, group_id, result):
|
|
|
"""Store the result and status of a task."""
|
|
|
raise NotImplementedError(
|
|
|
- "save_taskset is not supported by this backend.")
|
|
|
+ "save_group is not supported by this backend.")
|
|
|
|
|
|
- def restore_taskset(self, taskset_id, cache=True):
|
|
|
- """Get the result of a taskset."""
|
|
|
+ def restore_group(self, group_id, cache=True):
|
|
|
+ """Get the result of a group."""
|
|
|
raise NotImplementedError(
|
|
|
- "restore_taskset is not supported by this backend.")
|
|
|
+ "restore_group is not supported by this backend.")
|
|
|
|
|
|
- def delete_taskset(self, taskset_id):
|
|
|
+ def delete_group(self, group_id):
|
|
|
raise NotImplementedError(
|
|
|
- "delete_taskset is not supported by this backend.")
|
|
|
+ "delete_group is not supported by this backend.")
|
|
|
|
|
|
def reload_task_result(self, task_id):
|
|
|
"""Reload task result, even if it has been previously fetched."""
|
|
|
raise NotImplementedError(
|
|
|
"reload_task_result is not supported by this backend.")
|
|
|
|
|
|
- def reload_taskset_result(self, task_id):
|
|
|
- """Reload taskset result, even if it has been previously fetched."""
|
|
|
+ def reload_group_result(self, task_id):
|
|
|
+ """Reload group result, even if it has been previously fetched."""
|
|
|
raise NotImplementedError(
|
|
|
- "reload_taskset_result is not supported by this backend.")
|
|
|
+ "reload_group_result is not supported by this backend.")
|
|
|
|
|
|
def on_chord_part_return(self, task, propagate=False):
|
|
|
pass
|
|
|
|
|
|
- def fallback_chord_unlock(self, setid, body, result=None, **kwargs):
|
|
|
+ def fallback_chord_unlock(self, group_id, body, result=None, **kwargs):
|
|
|
kwargs["result"] = [r.id for r in result]
|
|
|
- self.app.tasks["celery.chord_unlock"].apply_async((setid, body, ),
|
|
|
+ self.app.tasks["celery.chord_unlock"].apply_async((group_id, body, ),
|
|
|
kwargs, countdown=1)
|
|
|
on_chord_apply = fallback_chord_unlock
|
|
|
|
|
@@ -287,40 +287,40 @@ class BaseDictBackend(BaseBackend):
|
|
|
def reload_task_result(self, task_id):
|
|
|
self._cache[task_id] = self.get_task_meta(task_id, cache=False)
|
|
|
|
|
|
- def reload_taskset_result(self, taskset_id):
|
|
|
- self._cache[taskset_id] = self.get_taskset_meta(taskset_id,
|
|
|
- cache=False)
|
|
|
+ def reload_group_result(self, group_id):
|
|
|
+ self._cache[group_id] = self.get_group_meta(group_id,
|
|
|
+ cache=False)
|
|
|
|
|
|
- def get_taskset_meta(self, taskset_id, cache=True):
|
|
|
+ def get_group_meta(self, group_id, cache=True):
|
|
|
if cache:
|
|
|
try:
|
|
|
- return self._cache[taskset_id]
|
|
|
+ return self._cache[group_id]
|
|
|
except KeyError:
|
|
|
pass
|
|
|
|
|
|
- meta = self._restore_taskset(taskset_id)
|
|
|
+ meta = self._restore_group(group_id)
|
|
|
if cache and meta is not None:
|
|
|
- self._cache[taskset_id] = meta
|
|
|
+ self._cache[group_id] = meta
|
|
|
return meta
|
|
|
|
|
|
- def restore_taskset(self, taskset_id, cache=True):
|
|
|
- """Get the result for a taskset."""
|
|
|
- meta = self.get_taskset_meta(taskset_id, cache=cache)
|
|
|
+ def restore_group(self, group_id, cache=True):
|
|
|
+ """Get the result for a group."""
|
|
|
+ meta = self.get_group_meta(group_id, cache=cache)
|
|
|
if meta:
|
|
|
return meta["result"]
|
|
|
|
|
|
- def save_taskset(self, taskset_id, result):
|
|
|
- """Store the result of an executed taskset."""
|
|
|
- return self._save_taskset(taskset_id, result)
|
|
|
+ def save_group(self, group_id, result):
|
|
|
+ """Store the result of an executed group."""
|
|
|
+ return self._save_group(group_id, result)
|
|
|
|
|
|
- def delete_taskset(self, taskset_id):
|
|
|
- self._cache.pop(taskset_id, None)
|
|
|
- return self._delete_taskset(taskset_id)
|
|
|
+ def delete_group(self, group_id):
|
|
|
+ self._cache.pop(group_id, None)
|
|
|
+ return self._delete_group(group_id)
|
|
|
|
|
|
|
|
|
class KeyValueStoreBackend(BaseDictBackend):
|
|
|
task_keyprefix = ensure_bytes("celery-task-meta-")
|
|
|
- taskset_keyprefix = ensure_bytes("celery-taskset-meta-")
|
|
|
+ group_keyprefix = ensure_bytes("celery-taskset-meta-")
|
|
|
chord_keyprefix = ensure_bytes("chord-unlock-")
|
|
|
implements_incr = False
|
|
|
|
|
@@ -346,18 +346,18 @@ class KeyValueStoreBackend(BaseDictBackend):
|
|
|
"""Get the cache key for a task by id."""
|
|
|
return self.task_keyprefix + ensure_bytes(task_id)
|
|
|
|
|
|
- def get_key_for_taskset(self, taskset_id):
|
|
|
- """Get the cache key for a taskset by id."""
|
|
|
- return self.taskset_keyprefix + ensure_bytes(taskset_id)
|
|
|
+ def get_key_for_group(self, group_id):
|
|
|
+ """Get the cache key for a group by id."""
|
|
|
+ return self.group_keyprefix + ensure_bytes(group_id)
|
|
|
|
|
|
- def get_key_for_chord(self, taskset_id):
|
|
|
- """Get the cache key for the chord waiting on taskset with given id."""
|
|
|
- return self.chord_keyprefix + ensure_bytes(taskset_id)
|
|
|
+ def get_key_for_chord(self, group_id):
|
|
|
+ """Get the cache key for the chord waiting on group with given id."""
|
|
|
+ return self.chord_keyprefix + ensure_bytes(group_id)
|
|
|
|
|
|
def _strip_prefix(self, key):
|
|
|
"""Takes bytes, emits string."""
|
|
|
key = ensure_bytes(key)
|
|
|
- for prefix in self.task_keyprefix, self.taskset_keyprefix:
|
|
|
+ for prefix in self.task_keyprefix, self.group_keyprefix:
|
|
|
if key.startswith(prefix):
|
|
|
return bytes_to_str(key[len(prefix):])
|
|
|
return bytes_to_str(key)
|
|
@@ -411,13 +411,13 @@ class KeyValueStoreBackend(BaseDictBackend):
|
|
|
self.set(self.get_key_for_task(task_id), self.encode(meta))
|
|
|
return result
|
|
|
|
|
|
- def _save_taskset(self, taskset_id, result):
|
|
|
- self.set(self.get_key_for_taskset(taskset_id),
|
|
|
+ def _save_group(self, group_id, result):
|
|
|
+ self.set(self.get_key_for_group(group_id),
|
|
|
self.encode({"result": result.serializable()}))
|
|
|
return result
|
|
|
|
|
|
- def _delete_taskset(self, taskset_id):
|
|
|
- self.delete(self.get_key_for_taskset(taskset_id))
|
|
|
+ def _delete_group(self, group_id):
|
|
|
+ self.delete(self.get_key_for_group(group_id))
|
|
|
|
|
|
def _get_task_meta_for(self, task_id):
|
|
|
"""Get task metadata for a task by id."""
|
|
@@ -426,9 +426,9 @@ class KeyValueStoreBackend(BaseDictBackend):
|
|
|
return {"status": states.PENDING, "result": None}
|
|
|
return self.decode(meta)
|
|
|
|
|
|
- def _restore_taskset(self, taskset_id):
|
|
|
+ def _restore_group(self, group_id):
|
|
|
"""Get task metadata for a task by id."""
|
|
|
- meta = self.get(self.get_key_for_taskset(taskset_id))
|
|
|
+ meta = self.get(self.get_key_for_group(group_id))
|
|
|
|
|
|
|
|
|
|
|
@@ -439,22 +439,22 @@ class KeyValueStoreBackend(BaseDictBackend):
|
|
|
return {"result": from_serializable(result)}
|
|
|
return meta
|
|
|
|
|
|
- def on_chord_apply(self, setid, body, result=None, **kwargs):
|
|
|
+ def on_chord_apply(self, group_id, body, result=None, **kwargs):
|
|
|
if self.implements_incr:
|
|
|
- self.app.TaskSetResult(setid, result).save()
|
|
|
+ self.app.GroupResult(group_id, result).save()
|
|
|
else:
|
|
|
- self.fallback_chord_unlock(setid, body, result, **kwargs)
|
|
|
+ self.fallback_chord_unlock(group_id, body, result, **kwargs)
|
|
|
|
|
|
def on_chord_part_return(self, task, propagate=False):
|
|
|
if not self.implements_incr:
|
|
|
return
|
|
|
from celery import subtask
|
|
|
- from celery.result import TaskSetResult
|
|
|
- setid = task.request.taskset
|
|
|
- if not setid:
|
|
|
+ from celery.result import GroupResult
|
|
|
+ gid = task.request.group
|
|
|
+ if not gid:
|
|
|
return
|
|
|
- key = self.get_key_for_chord(setid)
|
|
|
- deps = TaskSetResult.restore(setid, backend=task.backend)
|
|
|
+ key = self.get_key_for_chord(gid)
|
|
|
+ deps = GroupResult.restore(gid, backend=task.backend)
|
|
|
val = self.incr(key)
|
|
|
if val >= deps.total:
|
|
|
subtask(task.request.chord).delay(deps.join(propagate=propagate))
|