Parcourir la source

Adds app.add_periodic_task

Ask Solem il y a 10 ans
Parent
commit
bc18d0859c
4 fichiers modifiés avec 100 ajouts et 29 suppressions
  1. 59 24
      celery/app/base.py
  2. 1 0
      docs/_ext/celerydocs.py
  3. 36 5
      docs/userguide/periodic-tasks.rst
  4. 4 0
      funtests/stress/stress/app.py

+ 59 - 24
celery/app/base.py

@@ -118,6 +118,7 @@ class Celery(object):
     registry_cls = TaskRegistry
     _fixups = None
     _pool = None
+    _conf = None
     builtin_fixups = BUILTIN_FIXUPS
 
     #: Signal sent when app is loading configuration.
@@ -154,6 +155,7 @@ class Celery(object):
         self.configured = False
         self._config_source = config_source
         self._pending_defaults = deque()
+        self._pending_periodic_tasks = deque()
 
         self.finalized = False
         self._finalize_mutex = threading.Lock()
@@ -311,13 +313,13 @@ class Celery(object):
         if not callable(fun):
             d, fun = fun, lambda: d
         if self.configured:
-            return self.conf.add_defaults(fun())
+            return self._conf.add_defaults(fun())
         self._pending_defaults.append(fun)
 
     def config_from_object(self, obj, silent=False, force=False):
         self._config_source = obj
         if force or self.configured:
-            del(self.conf)
+            self._conf = None
             return self.loader.config_from_object(obj, silent=silent)
 
     def config_from_envvar(self, variable_name, silent=False, force=False):
@@ -330,7 +332,9 @@ class Celery(object):
         return self.config_from_object(module_name, silent=silent, force=force)
 
     def config_from_cmdline(self, argv, namespace='celery'):
-        self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
+        (self._conf if self.configured else self.conf).update(
+            self.loader.cmdline_config_parser(argv, namespace)
+        )
 
     def setup_security(self, allowed_serializers=None, key=None, cert=None,
                        store=None, digest='sha1', serializer='json'):
@@ -440,18 +444,19 @@ class Celery(object):
         return self.loader.now(utc=self.conf.CELERY_ENABLE_UTC)
 
     def mail_admins(self, subject, body, fail_silently=False):
-        if self.conf.ADMINS:
-            to = [admin_email for _, admin_email in self.conf.ADMINS]
+        conf = self.conf
+        if conf.ADMINS:
+            to = [admin_email for _, admin_email in conf.ADMINS]
             return self.loader.mail_admins(
                 subject, body, fail_silently, to=to,
-                sender=self.conf.SERVER_EMAIL,
-                host=self.conf.EMAIL_HOST,
-                port=self.conf.EMAIL_PORT,
-                user=self.conf.EMAIL_HOST_USER,
-                password=self.conf.EMAIL_HOST_PASSWORD,
-                timeout=self.conf.EMAIL_TIMEOUT,
-                use_ssl=self.conf.EMAIL_USE_SSL,
-                use_tls=self.conf.EMAIL_USE_TLS,
+                sender=conf.SERVER_EMAIL,
+                host=conf.EMAIL_HOST,
+                port=conf.EMAIL_PORT,
+                user=conf.EMAIL_HOST_USER,
+                password=conf.EMAIL_HOST_PASSWORD,
+                timeout=conf.EMAIL_TIMEOUT,
+                use_ssl=conf.EMAIL_USE_SSL,
+                use_tls=conf.EMAIL_USE_TLS,
             )
 
     def select_queues(self, queues=None):
@@ -472,7 +477,7 @@ class Celery(object):
             self.loader)
         return backend(app=self, url=url)
 
-    def _get_config(self):
+    def _load_config(self):
         if isinstance(self.on_configure, Signal):
             self.on_configure.send(sender=self)
         else:
@@ -482,12 +487,19 @@ class Celery(object):
             self.loader.config_from_object(self._config_source)
         defaults = dict(deepcopy(DEFAULTS), **self._preconf)
         self.configured = True
-        s = Settings({}, [self.prepare_config(self.loader.conf),
-                          defaults])
+        s = self._conf = Settings(
+            {}, [self.prepare_config(self.loader.conf), defaults],
+        )
         # load lazy config dict initializers.
-        pending = self._pending_defaults
-        while pending:
-            s.add_defaults(maybe_evaluate(pending.popleft()()))
+        pending_def = self._pending_defaults
+        while pending_def:
+            s.add_defaults(maybe_evaluate(pending_def.popleft()()))
+
+        # load lazy periodic tasks
+        pending_beat = self._pending_periodic_tasks
+        while pending_beat:
+            pargs, pkwargs = pending_beat.popleft()
+            self._add_periodic_task(*pargs, **pkwargs)
         self.on_after_configure.send(sender=self, source=s)
         return s
 
@@ -507,6 +519,27 @@ class Celery(object):
         kwargs['app'] = self
         return self.canvas.signature(*args, **kwargs)
 
+    def add_periodic_task(self, *args, **kwargs):
+        if not self.configured:
+            return self._pending_periodic_tasks.append((args, kwargs))
+        return self._add_periodic_task(*args, **kwargs)
+
+    def _add_periodic_task(self, schedule, sig,
+                           args=(), kwargs={}, name=None, **opts):
+        from .task import Task
+
+        sig = (self.signature(sig.name, args, kwargs)
+               if isinstance(sig, Task) else sig.clone(args, kwargs))
+
+        name = name or ':'.join([sig.name, ','.join(map(str, sig.args))])
+        self._conf.CELERYBEAT_SCHEDULE[name] = {
+            'schedule': schedule,
+            'task': sig.name,
+            'args': sig.args,
+            'kwargs': sig.kwargs,
+            'options': dict(sig.options, **opts),
+        }
+
     def create_task_cls(self):
         """Creates a base task class using default configuration
         taken from this app."""
@@ -568,7 +601,7 @@ class Celery(object):
         when unpickling."""
         return {
             'main': self.main,
-            'changes': self.conf.changes,
+            'changes': self._conf.changes if self._conf else {},
             'loader': self.loader_cls,
             'backend': self.backend_cls,
             'amqp': self.amqp_cls,
@@ -582,7 +615,7 @@ class Celery(object):
 
     def __reduce_args__(self):
         """Deprecated method, please use :meth:`__reduce_keys__` instead."""
-        return (self.main, self.conf.changes,
+        return (self.main, self._conf.changes if self._conf else {},
                 self.loader_cls, self.backend_cls, self.amqp_cls,
                 self.events_cls, self.log_cls, self.control_cls,
                 False, self._config_source)
@@ -653,9 +686,11 @@ class Celery(object):
     def backend(self):
         return self._get_backend()
 
-    @cached_property
+    @property
     def conf(self):
-        return self._get_config()
+        if self._conf is None:
+            self._load_config()
+        return self._conf
 
     @cached_property
     def control(self):
@@ -691,5 +726,5 @@ class Celery(object):
         if not tz:
             return (timezone.get_timezone('UTC') if conf.CELERY_ENABLE_UTC
                     else timezone.local)
-        return timezone.get_timezone(self.conf.CELERY_TIMEZONE)
+        return timezone.get_timezone(conf.CELERY_TIMEZONE)
 App = Celery  # compat

+ 1 - 0
docs/_ext/celerydocs.py

@@ -34,6 +34,7 @@ APPDIRECT = {
     'select_queues', 'either', 'bugreport', 'create_task_cls',
     'subclass_with_self', 'annotations', 'current_task', 'oid',
     'timezone', '__reduce_keys__', 'fixups', 'finalized', 'configured',
+    'add_periodic_task',
     'autofinalize', 'steps', 'user_options', 'main', 'clock',
 }
 

+ 36 - 5
docs/userguide/periodic-tasks.rst

@@ -74,19 +74,50 @@ schedule manually.
 Entries
 =======
 
-To schedule a task periodically you have to add an entry to the
-:setting:`CELERYBEAT_SCHEDULE` setting.
+To call a task periodically you have to add an entry to the
+beat schedule list.
+
+.. code-block:: python
+
+    from celery import Celery
+    from celery.schedules import crontab
+
+    app = Celery()
+
+    @app.on_after_configure.connect
+    def setup_periodic_tasks(sender, **kwargs):
+        # Calls test('hello') every 10 seconds.
+        sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
+
+        # Calls test('world') every 30 seconds
+        sender.add_periodic_task(30.0, test.s('world'), expires=10)
+
+        # Executes every Monday morning at 7:30 A.M
+        sender.add_periodic_task(
+            crontab(hour=7, minute=30, day_of_week=1),
+            test.s('Happy Mondays!'),
+        )
+
+    @app.task
+    def test(arg):
+        print(arg)
+
+
+Setting these up from within the ``on_after_configure`` handler means
+that we will not evaluate the app at module level when using ``test.s()``.
+
+The `@add_periodic_task` function will add the entry to the
+:setting:`CELERYBEAT_SCHEDULE` setting behind the scenes, which also
+can be used to set up periodic tasks manually:
 
 Example: Run the `tasks.add` task every 30 seconds.
 
 .. code-block:: python
 
-    from datetime import timedelta
-
     CELERYBEAT_SCHEDULE = {
         'add-every-30-seconds': {
             'task': 'tasks.add',
-            'schedule': timedelta(seconds=30),
+            'schedule': 30.0,
             'args': (16, 16)
         },
     }

+ 4 - 0
funtests/stress/stress/app.py

@@ -149,3 +149,7 @@ def marker(s, sep='-'):
             return _marker.delay(s, sep)
         except Exception as exc:
             print("Retrying marker.delay(). It failed to start: %s" % exc)
+
+@app.on_after_configure.connect
+def setup_periodic_tasks(sender, **kwargs):
+    sender.add_periodic_task(10, add.s(2, 2), expires=10)