Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

# -*- coding: utf-8 -*- 

""" 

    celery.app.registry 

    ~~~~~~~~~~~~~~~~~~~ 

 

    Registry of available tasks. 

 

""" 

from __future__ import absolute_import 

 

import inspect 

 

from importlib import import_module 

 

from celery._state import get_current_app 

from celery.exceptions import NotRegistered 

from celery.five import items 

 

 

class TaskRegistry(dict):
    """Dictionary mapping task names to registered tasks.

    Looking up a name that is not in the registry raises
    :attr:`NotRegistered`.

    """

    # Exposed as a class attribute so the exception can be reached
    # through the registry itself (``registry.NotRegistered``).
    NotRegistered = NotRegistered

    def __missing__(self, key):
        # Invoked by ``dict.__getitem__`` when `key` is absent.
        raise self.NotRegistered(key)

    def register(self, task):
        """Register a task in the task registry.

        The task will be automatically instantiated if not already an
        instance.

        :param task: task class or task instance; its `name` attribute
            is used as the registry key.

        """
        # A conditional expression is used rather than ``cond and a or b``:
        # with the and/or form an instance that evaluates as false would
        # be discarded and the class itself stored instead.
        self[task.name] = task() if inspect.isclass(task) else task

    def unregister(self, name):
        """Unregister task by name.

        :param name: name of the task to unregister, or a
            :class:`celery.task.base.Task` with a valid `name` attribute.

        :raises celery.exceptions.NotRegistered: if the task has not
            been registered.

        """
        try:
            # Accept either a plain name or an object carrying `name`.
            self.pop(getattr(name, 'name', name))
        except KeyError:
            raise self.NotRegistered(name)

    # -- these methods are irrelevant now and will be removed in 4.0
    def regular(self):
        """Return the tasks whose type is ``'regular'``."""
        return self.filter_types('regular')

    def periodic(self):
        """Return the tasks whose type is ``'periodic'``."""
        return self.filter_types('periodic')

    def filter_types(self, type):
        """Return a new dict of the tasks whose `type` attribute equals `type`.

        Tasks without a `type` attribute are treated as ``'regular'``.

        """
        return dict((name, task) for name, task in items(self)
                    if getattr(task, 'type', 'regular') == type)

 

 

def _unpickle_task(name):
    """Return the task registered as `name` in the current app."""
    registry = get_current_app().tasks
    return registry[name]

 

 

def _unpickle_task_v2(name, module=None): 

    if module: 

        import_module(module) 

    return get_current_app().tasks[name]