
Merge branch '3.0'

Conflicts:
	Changelog
	README.rst
	celery/__init__.py
	celery/app/base.py
	celery/backends/base.py
	celery/events/__init__.py
	celery/result.py
	celery/tests/__init__.py
	celery/tests/worker/test_worker.py
	celery/worker/__init__.py
	celery/worker/job.py
	docs/includes/introduction.txt
	examples/app/myapp.py
	requirements/default.txt
	setup.cfg
Ask Solem, 12 years ago
Commit 15992f8af4

+ 2 - 2
celery/app/builtins.py

@@ -89,7 +89,7 @@ def add_unlock_chord_task(app):
         # check if the task group is ready, and if so apply the callback.
         deps = GroupResult(
             group_id,
-            [from_serializable(r, Result=Result) for r in result],
+            [from_serializable(r, app=app) for r in result],
         )
         j = deps.join_native if deps.supports_native_join else deps.join
 
@@ -168,7 +168,7 @@ def add_group_task(app):
 
         def run(self, tasks, result, group_id, partial_args):
             app = self.app
-            result = from_serializable(result)
+            result = from_serializable(result, app)
             # any partial args are added to all tasks in the group
             taskit = (subtask(task).clone(partial_args)
                       for i, task in enumerate(tasks))

+ 2 - 2
celery/app/task.py

@@ -640,14 +640,14 @@ class Task(object):
         state = states.SUCCESS if info is None else info.state
         return EagerResult(task_id, retval, state, traceback=tb)
 
-    def AsyncResult(self, task_id):
+    def AsyncResult(self, task_id, **kwargs):
         """Get AsyncResult instance for this kind of task.
 
         :param task_id: Task id to get result for.
 
         """
         return self._get_app().AsyncResult(task_id, backend=self.backend,
-                                           task_name=self.name)
+                                           task_name=self.name, **kwargs)
 
     def subtask(self, *args, **kwargs):
         """Returns :class:`~celery.subtask` object for

+ 5 - 2
celery/app/utils.py

@@ -18,7 +18,7 @@ try:
 except ImportError:  # pragma: no cover
     _forking = None  # noqa
 
-from celery import platforms
+from celery.platforms import pyimplementation, IS_WINDOWS
 from celery.five import items
 from celery.datastructures import ConfigurationView, DictAttribute
 from celery.utils.text import pretty
@@ -42,6 +42,7 @@ HIDDEN_SETTINGS = re.compile(
     re.IGNORECASE,
 )
 
+
 class Settings(ConfigurationView):
     """Celery settings object."""
 
@@ -81,6 +82,8 @@ class Settings(ConfigurationView):
         # is enabled.  There may be a better way to do this, but attempts
         # at forcing the subprocess to import the modules did not work out,
         # because of some sys.path problem.  More at Issue #1126.
+        if IS_WINDOWS:
+            return {}  # Django Settings object is not pickleable
         if _forking and _forking._forking_is_enabled:
             return self.changes
         R = {}
@@ -195,7 +198,7 @@ def bugreport(app):
     return BUGREPORT_INFO.format(
         system=_platform.system(),
         arch=', '.join(x for x in _platform.architecture() if x),
-        py_i=platforms.pyimplementation(),
+        py_i=pyimplementation(),
         celery_v=celery.VERSION_BANNER,
         kombu_v=kombu.__version__,
         billiard_v=billiard.__version__,

+ 2 - 2
celery/backends/base.py

@@ -414,12 +414,12 @@ class KeyValueStoreBackend(BaseBackend):
             meta = self.decode(meta)
             result = meta['result']
             if isinstance(result, (list, tuple)):
-                return {'result': from_serializable(result)}
+                return {'result': from_serializable(result, self.app)}
             return meta
 
     def on_chord_apply(self, group_id, body, result=None, **kwargs):
         if self.implements_incr:
-            self.app.GroupResult(group_id, result).save()
+            self.save_group(group_id, self.app.GroupResult(group_id, result))
         else:
             self.fallback_chord_unlock(group_id, body, result, **kwargs)
 

+ 1 - 1
celery/backends/cache.py

@@ -115,7 +115,7 @@ class CacheBackend(KeyValueStoreBackend):
 
     def on_chord_apply(self, group_id, body, result=None, **kwargs):
         self.client.set(self.get_key_for_chord(group_id), '0', time=86400)
-        self.app.GroupResult(group_id, result).save()
+        self.save_group(group_id, self.app.GroupResult(group_id, result))
 
     def incr(self, key):
         return self.client.incr(key)
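Both backends now funnel the chord header through save_group() instead of calling GroupResult.save() directly, so a backend that customises how groups are stored only has to override one method. A hedged sketch of such a subclass (the class name and in-memory store are made up for illustration):

    from celery.backends.cache import CacheBackend

    class InMemoryGroups(CacheBackend):
        """Illustrative only: keep saved groups in a plain dict."""
        groups = {}

        def save_group(self, group_id, result):
            # now reached from on_chord_apply() as well as GroupResult.save()
            self.groups[group_id] = result
            return result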

+ 1 - 2
celery/canvas.py

@@ -473,8 +473,7 @@ class chord(Signature):
         if _chord.app.conf.CELERY_ALWAYS_EAGER:
             return self.apply((), kwargs)
         callback_id = body.options.setdefault('task_id', uuid())
-        _chord(**kwargs)
-        return _chord.AsyncResult(callback_id)
+        return _chord.AsyncResult(callback_id, parent=_chord(**kwargs))
 
     def clone(self, *args, **kwargs):
         s = Signature.clone(self, *args, **kwargs)
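Because the callback's AsyncResult now carries the header GroupResult as its parent, the header can be inspected or revoked from the single result a chord returns, mirroring how chain links results. A usage sketch, assuming add as above and a hypothetical tsum task that sums a sequence:

    from celery import chord

    header = [add.s(i, i) for i in range(10)]
    res = chord(header)(tsum.s())          # tsum is assumed to be a registered task

    group_res = res.parent                 # the header GroupResult
    print(group_res.completed_count())
    # revoking the whole header, e.g. group_res.revoke(terminate=True),
    # is now reachable from the chord's own result.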

+ 2 - 2
celery/concurrency/processes.py

@@ -42,12 +42,12 @@ def process_initializer(app, hostname):
     # This is for Windows and other platforms not supporting
     # fork(). Note that init_worker makes sure it's only
     # run once per process.
+    app.loader.init_worker()
+    app.loader.init_worker_process()
     app.log.setup(int(os.environ.get('CELERY_LOG_LEVEL', 0)),
                   os.environ.get('CELERY_LOG_FILE') or None,
                   bool(os.environ.get('CELERY_LOG_REDIRECT', False)),
                   str(os.environ.get('CELERY_LOG_REDIRECT_LEVEL')))
-    app.loader.init_worker()
-    app.loader.init_worker_process()
     if os.environ.get('FORKED_BY_MULTIPROCESSING'):
         # pool did execv after fork
         trace.setup_worker_optimizations(app)
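Running the loader's init_worker()/init_worker_process() before app.log.setup() means handlers connected to logging signals at import time in task modules are registered before the child process configures its logger. A sketch of the kind of handler this ordering makes reliable (the file name is illustrative):

    import logging

    from celery.signals import after_setup_logger

    @after_setup_logger.connect
    def add_worker_file_handler(logger=None, loglevel=None, **kwargs):
        handler = logging.FileHandler('worker-extra.log')   # illustrative path
        handler.setLevel(loglevel or logging.INFO)
        logger.addHandler(handler)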

+ 19 - 8
celery/result.py

@@ -236,7 +236,7 @@ class AsyncResult(ResultBase):
     def children(self):
         children = self.backend.get_children(self.id)
         if children:
-            return [from_serializable(child) for child in children]
+            return [from_serializable(child, self.app) for child in children]
 
     @property
     def result(self):
@@ -401,11 +401,20 @@ class ResultSet(ResultBase):
         for result in self.results:
             result.forget()
 
-    def revoke(self, connection=None):
-        """Revoke all tasks in the set."""
+    def revoke(self, connection=None, terminate=False, signal=None):
+        """Send revoke signal to all workers for all tasks in the set.
+
+        :keyword terminate: Also terminate the process currently working
+            on the task (if any).
+        :keyword signal: Name of signal to send to process if terminate.
+            Default is TERM.
+
+        """
         with self.app.connection_or_acquire(connection) as conn:
             for result in self.results:
-                result.revoke(connection=conn)
+                result.revoke(
+                    connection=conn, terminate=terminate, signal=signal,
+                )
 
     def __iter__(self):
         return self.iterate()
@@ -697,7 +706,7 @@ class EagerResult(AsyncResult):
     def forget(self):
         pass
 
-    def revoke(self):
+    def revoke(self, *args, **kwargs):
         self._state = states.REVOKED
 
     def __repr__(self):
@@ -724,15 +733,17 @@ class EagerResult(AsyncResult):
         return False
 
 
-def from_serializable(r, Result=AsyncResult):
+def from_serializable(r, app=None):
     # earlier backends may just pickle, so check if
     # result is already prepared.
+    app = app_or_default(app)
+    Result = app.AsyncResult
     if not isinstance(r, ResultBase):
         id = parent = None
         res, nodes = r
         if nodes:
-            return GroupResult(
-                res, [from_serializable(child, Result) for child in nodes],
+            return app.GroupResult(
+                res, [from_serializable(child, app) for child in nodes],
             )
         if isinstance(res, (list, tuple)):
             id, parent = res[0], res[1]
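The app now travels with from_serializable() so nested group results are rebuilt with app-aware classes, and ResultSet.revoke() forwards terminate/signal to every member. A minimal sketch with made-up ids (the revoke call only has an effect with a real broker and workers):

    from celery import Celery
    from celery.result import from_serializable

    app = Celery('sketch', broker='memory://')

    res = app.AsyncResult('1111-aaaa')
    rebuilt = from_serializable(res.serializable(), app)    # app passed explicitly
    assert rebuilt == res

    group = app.GroupResult('gggg-0001', [res])
    group.revoke(terminate=True, signal='SIGKILL')          # broadcast to all members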

+ 12 - 11
celery/tests/__init__.py

@@ -14,21 +14,22 @@ except NameError:
     class WindowsError(Exception):
         pass
 
+config_module = os.environ.setdefault(
+    'CELERY_TEST_CONFIG_MODULE', 'celery.tests.config',
+)
 
-def setup():
-    config_module = os.environ.setdefault(
-        'CELERY_TEST_CONFIG_MODULE', 'celery.tests.config',
-    )
+os.environ.setdefault('CELERY_CONFIG_MODULE', config_module)
+os.environ['CELERY_LOADER'] = 'default'
+os.environ['EVENTLET_NOPATCH'] = 'yes'
+os.environ['GEVENT_NOPATCH'] = 'yes'
+os.environ['KOMBU_DISABLE_LIMIT_PROTECTION'] = 'yes'
+os.environ['CELERY_BROKER_URL'] = 'memory://'
 
-    os.environ.setdefault('CELERY_CONFIG_MODULE', config_module)
-    os.environ['CELERY_LOADER'] = 'default'
-    os.environ['EVENTLET_NOPATCH'] = 'yes'
-    os.environ['GEVENT_NOPATCH'] = 'yes'
-    os.environ['KOMBU_DISABLE_LIMIT_PROTECTION'] = 'yes'
-    os.environ['CELERY_BROKER_URL'] = 'memory://'
 
+def setup():
     if os.environ.get('COVER_ALL_MODULES') or '--with-coverage3' in sys.argv:
-        with warnings.catch_warnings(record=True):
+        from celery.tests.utils import catch_warnings
+        with catch_warnings(record=True):
             import_all_modules()
         warnings.resetwarnings()
 

+ 4 - 4
celery/tests/tasks/test_result.py

@@ -579,10 +579,10 @@ class test_serializable(AppCase):
 
     def test_AsyncResult(self):
         x = AsyncResult(uuid())
-        self.assertEqual(x, from_serializable(x.serializable()))
-        self.assertEqual(x, from_serializable(x))
+        self.assertEqual(x, from_serializable(x.serializable(), self.app))
+        self.assertEqual(x, from_serializable(x, self.app))
 
     def test_GroupResult(self):
         x = GroupResult(uuid(), [AsyncResult(uuid()) for _ in range(10)])
-        self.assertEqual(x, from_serializable(x.serializable()))
-        self.assertEqual(x, from_serializable(x))
+        self.assertEqual(x, from_serializable(x.serializable(), self.app))
+        self.assertEqual(x, from_serializable(x, self.app))

+ 15 - 3
celery/tests/worker/test_worker.py

@@ -26,7 +26,7 @@ from celery.task import periodic_task as periodic_task_dec
 from celery.utils import uuid
 from celery.worker import WorkController
 from celery.worker import components
-from celery.worker.buckets import FastQueue
+from celery.worker.buckets import FastQueue, AsyncTaskBucket
 from celery.worker.job import Request
 from celery.worker import consumer
 from celery.worker.consumer import Consumer as __Consumer
@@ -1030,15 +1030,27 @@ class test_WorkController(AppCase):
         self.assertIsNone(worker.mediator)
         self.assertEqual(worker.ready_queue.put, worker.process_task)
 
+    def test_enable_rate_limits_eventloop(self):
+        try:
+            worker = self.create_worker(disable_rate_limits=False,
+                                        use_eventloop=True,
+                                        pool_cls='processes')
+        except ImportError:
+            raise SkipTest('multiprocessing not supported')
+        self.assertIsInstance(worker.ready_queue, AsyncTaskBucket)
+        self.assertFalse(worker.mediator)
+        self.assertNotEqual(worker.ready_queue.put, worker.process_task)
+
     def test_disable_rate_limits_processes(self):
         try:
             worker = self.create_worker(disable_rate_limits=True,
+                                        use_eventloop=False,
                                         pool_cls='processes')
         except ImportError:
             raise SkipTest('multiprocessing not supported')
         self.assertIsInstance(worker.ready_queue, FastQueue)
-        self.assertTrue(worker.mediator)
-        self.assertNotEqual(worker.ready_queue.put, worker.process_task)
+        self.assertFalse(worker.mediator)
+        self.assertEqual(worker.ready_queue.put, worker.process_task)
 
     def test_process_task_sem(self):
         worker = self.worker

+ 18 - 10
celery/utils/timer2.py

@@ -18,6 +18,7 @@ from datetime import datetime
 from functools import wraps
 from itertools import count
 from time import time, sleep
+from weakref import proxy as weakrefproxy
 
 from celery.five import THREAD_TIMEOUT_MAX, map
 from celery.utils.timeutils import timedelta_seconds, timezone
@@ -33,24 +34,34 @@ __docformat__ = 'restructuredtext'
 DEFAULT_MAX_INTERVAL = 2
 TIMER_DEBUG = os.environ.get('TIMER_DEBUG')
 EPOCH = datetime.utcfromtimestamp(0).replace(tzinfo=timezone.utc)
+IS_PYPY = hasattr(sys, 'pypy_version_info')
 
 logger = get_logger('timer2')
 
 
 class Entry(object):
-    cancelled = False
+    if not IS_PYPY:
+        __slots__ = (
+            'fun', 'args', 'kwargs', 'tref', 'cancelled',
+            '_last_run', '__weakref__',
+        )
 
     def __init__(self, fun, args=None, kwargs=None):
         self.fun = fun
         self.args = args or []
         self.kwargs = kwargs or {}
-        self.tref = self
+        self.tref = weakrefproxy(self)
+        self._last_run = None
+        self.cancelled = False
 
     def __call__(self):
         return self.fun(*self.args, **self.kwargs)
 
     def cancel(self):
-        self.tref.cancelled = True
+        try:
+            self.tref.cancelled = True
+        except ReferenceError:
+            pass
 
     def __repr__(self):
         return '<TimerEntry: {0}(*{1!r}, **{2!r})'.format(
@@ -131,7 +142,7 @@ class Schedule(object):
     def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
         return self.enter(self.Entry(fun, args, kwargs), eta, priority)
 
-    def enter_after(self, msecs, entry, priority=0):
+    def enter_after(self, msecs, entry, priority=0, time=time):
         return self.enter(entry, time() + (msecs / 1000.0), priority)
 
     def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0):
@@ -163,12 +174,9 @@ class Schedule(object):
     def schedule(self):
         return self
 
-    def __iter__(self):
+    def __iter__(self, min=min, nowfun=time, pop=heapq.heappop,
+                 push=heapq.heappush):
         """The iterator yields the time to sleep for between runs."""
-
-        # localize variable access
-        nowfun = time
-        pop = heapq.heappop
         max_interval = self.max_interval
         queue = self._queue
 
@@ -187,7 +195,7 @@ class Schedule(object):
                             yield None, entry
                         continue
                     else:
-                        heapq.heappush(queue, event)
+                        push(queue, event)
             else:
                 yield None, None
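Entry now keeps a weak reference proxy to itself in tref, and cancel() tolerates the referent having been collected, so cancelled entries no longer keep themselves alive through a strong self-reference. A small self-contained sketch of the pattern (the class is illustrative, not the Celery one):

    import weakref

    class Handle(object):
        __slots__ = ('cancelled', 'tref', '__weakref__')

        def __init__(self):
            self.cancelled = False
            self.tref = weakref.proxy(self)      # no strong self-reference

        def cancel(self):
            try:
                self.tref.cancelled = True
            except ReferenceError:
                pass                             # already garbage collected

    h = Handle()
    h.cancel()
    assert h.cancelled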
 

+ 6 - 3
celery/worker/__init__.py

@@ -111,8 +111,8 @@ class WorkController(configurated):
         self._finalize = Finalize(self, self.stop, exitpriority=1)
         self.setup_instance(**self.prepare_args(**kwargs))
 
-    def setup_instance(self, queues=None, ready_callback=None,
-                       pidfile=None, include=None, **kwargs):
+    def setup_instance(self, queues=None, ready_callback=None, pidfile=None,
+                       include=None, use_eventloop=None, **kwargs):
         self.pidfile = pidfile
         self.setup_defaults(kwargs, namespace='celeryd')
         self.setup_queues(queues)
@@ -130,7 +130,10 @@ class WorkController(configurated):
         self.ready_callback = ready_callback or self.on_consumer_ready
         # this connection is not established, only used for params
         self._conninfo = self.app.connection()
-        self.use_eventloop = self.should_use_eventloop()
+        self.use_eventloop = (
+            self.should_use_eventloop() if use_eventloop is None
+            else use_eventloop
+        )
         self.options = kwargs
 
         signals.worker_init.send(sender=self)

+ 50 - 1
celery/worker/buckets.py

@@ -30,6 +30,53 @@ class RateLimitExceeded(Exception):
     """The token buckets rate limit has been exceeded."""
 
 
+class AsyncTaskBucket(object):
+
+    def __init__(self, task_registry, callback=None, worker=None):
+        self.task_registry = task_registry
+        self.callback = callback
+        self.worker = worker
+        self.buckets = {}
+        self.refresh()
+
+    def cont(self, request, bucket, tokens):
+        if not bucket.can_consume(tokens):
+            hold = bucket.expected_time(tokens)
+            self.worker.timer.apply_after(
+                hold * 1000.0, self.cont, (request, bucket, tokens),
+            )
+        else:
+            self.callback(request)
+
+    def put(self, request):
+        name = request.name
+        try:
+            bucket = self.buckets[name]
+        except KeyError:
+            bucket = self.add_bucket_for_type(name)
+        if not bucket:
+            return self.callback(request)
+        return self.cont(request, bucket, 1)
+
+    def add_task_type(self, name):
+        task_type = self.task_registry[name]
+        limit = getattr(task_type, 'rate_limit', None)
+        limit = timeutils.rate(limit)
+        bucket = self.buckets[name] = (
+            TokenBucket(limit, capacity=1) if limit else None
+        )
+        return bucket
+
+    def clear(self):
+        # called by the worker when the connection is lost,
+        # but this also clears out the timer so we be good.
+        pass
+
+    def refresh(self):
+        for name in self.task_registry:
+            self.add_task_type(name)
+
+
 class TaskBucket(object):
     """This is a collection of token buckets, each task type having
     its own token bucket.  If the task type doesn't have a rate limit,
@@ -57,13 +104,15 @@ class TaskBucket(object):
 
     """
 
-    def __init__(self, task_registry):
+    def __init__(self, task_registry, callback=None, worker=None):
         self.task_registry = task_registry
         self.buckets = {}
         self.init_with_registry()
         self.immediate = deque()
         self.mutex = threading.Lock()
         self.not_empty = threading.Condition(self.mutex)
+        self.callback = callback
+        self.worker = worker
 
     def put(self, request):
         """Put a :class:`~celery.worker.job.Request` into

+ 11 - 12
celery/worker/components.py

@@ -21,7 +21,7 @@ from celery.utils.log import worker_logger as logger
 from celery.utils.timer2 import Schedule
 
 from . import hub
-from .buckets import TaskBucket, FastQueue
+from .buckets import AsyncTaskBucket, TaskBucket, FastQueue
 
 
 class Hub(bootsteps.StartStopStep):
@@ -48,23 +48,22 @@ class Queues(bootsteps.Step):
         w.start_mediator = False
 
     def create(self, w):
+        BucketType = TaskBucket
         w.start_mediator = True
         if not w.pool_cls.rlimit_safe:
             w.disable_rate_limits = True
+        process_task = w.process_task
+        if w.use_eventloop:
+            BucketType = AsyncTaskBucket
+            if w.pool_putlocks and w.pool_cls.uses_semaphore:
+                process_task = w.process_task_sem
         if w.disable_rate_limits:
             w.ready_queue = FastQueue()
-            if w.use_eventloop:
-                w.start_mediator = False
-                if w.pool_putlocks and w.pool_cls.uses_semaphore:
-                    w.ready_queue.put = w.process_task_sem
-                else:
-                    w.ready_queue.put = w.process_task
-            elif not w.pool_cls.requires_mediator:
-                # just send task directly to pool, skip the mediator.
-                w.ready_queue.put = w.process_task
-                w.start_mediator = False
+            w.ready_queue.put = process_task
         else:
-            w.ready_queue = TaskBucket(task_registry=w.app.tasks)
+            w.ready_queue = BucketType(
+                task_registry=w.app.tasks, callback=process_task, worker=w,
+            )
 
 
 class Pool(bootsteps.StartStopStep):

+ 10 - 7
celery/worker/job.py

@@ -39,6 +39,8 @@ from celery.utils.timeutils import maybe_iso8601, timezone, maybe_make_aware
 
 from . import state
 
+IS_PYPY = hasattr(sys, 'pypy_version_info')
+
 logger = get_logger(__name__)
 debug, info, warn, error = (logger.debug, logger.info,
                             logger.warning, logger.error)
@@ -67,13 +69,14 @@ NEEDS_KWDICT = sys.version_info <= (2, 6)
 
 class Request(object):
     """A request for task execution."""
-    __slots__ = ('app', 'name', 'id', 'args', 'kwargs',
-                 'on_ack', 'delivery_info', 'hostname',
-                 'eventer', 'connection_errors',
-                 'task', 'eta', 'expires',
-                 'request_dict', 'acknowledged', 'utc',
-                 'time_start', 'worker_pid', '_already_revoked',
-                 '_terminate_on_ack', '_tzlocal')
+    if not IS_PYPY:
+        __slots__ = (
+            'app', 'name', 'id', 'args', 'kwargs', 'on_ack', 'delivery_info',
+            'hostname', 'eventer', 'connection_errors', 'task', 'eta',
+            'expires', 'request_dict', 'acknowledged', 'utc', 'time_start',
+            'worker_pid', '_already_revoked', '_terminate_on_ack', '_tzlocal',
+            '__weakref__',
+        )
 
     #: Format string used to log task success.
     success_msg = """\

+ 1 - 1
celery/worker/mediator.py

@@ -37,7 +37,7 @@ class WorkerComponent(StartStopStep):
         w.mediator = None
 
     def include_if(self, w):
-        return w.start_mediator
+        return w.start_mediator and not w.use_eventloop
 
     def create(self, w):
         m = w.mediator = self.instantiate(w.mediator_cls, w.ready_queue,

+ 59 - 0
docs/history/changelog-3.0.rst

@@ -9,6 +9,65 @@
 
 If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
 
+.. _version-3.0.17:
+
+3.0.17
+======
+:release-date: 2013-03-22 04:00:00 P.M UTC
+
+- Now depends on kombu 2.5.8
+
+- Now depends on billiard 2.7.3.23
+
+- RabbitMQ/Redis: thread-less and lock-free rate-limit implementation.
+
+    This means that rate limits pose minimal overhead when used with
+    RabbitMQ/Redis or future transports using the eventloop,
+    and that the rate-limit implementation is now thread-less and lock-free.
+
+    The thread-based transports will still use the old implementation for
+    now, but the plan is to use the timer also for other
+    broker transports in Celery 3.1.
+
+- Rate limits now works with eventlet/gevent if using RabbitMQ/Redis as the
+  broker.
+
+- A regression caused ``task.retry`` to ignore additional keyword arguments.
+
+    Extra keyword arguments are now used as execution options again.
+    Fix contributed by Simon Engledew.
+
+- Windows: Fixed problem with the worker trying to pickle the Django settings
+  module at worker startup.
+
+- generic-init.d:  No longer double quotes ``$CELERYD_CHDIR`` (Issue #1235).
+
+- generic-init.d: Removes bash-specific syntax.
+
+    Fix contributed by Pär Wieslander.
+
+- Cassandra Result Backend: Now handles the
+  :exc:`~pycassa.AllServersUnavailable` error (Issue #1010).
+
+    Fix contributed by Jared Biel.
+
+- Result: Now properly forwards apps to GroupResults when deserializing
+  (Issue #1249).
+
+    Fix contributed by Charles-Axel Dein.
+
+- ``GroupResult.revoke`` now supports the ``terminate`` and ``signal``
+  keyword arguments.
+
+- Worker: Multiprocessing pool workers now import task modules/configuration
+  before setting up the logging system so that logging signals can be
+  connected before they're dispatched.
+
+- chord:  The ``AsyncResult`` instance returned now has its ``parent``
+  attribute set to the header ``GroupResult``.
+
+    This is consistent with how ``chain`` works.
+
 .. _version-3.0.16:
 
 3.0.16

+ 3 - 2
examples/app/myapp.py

@@ -10,9 +10,10 @@ Usage:
    32
 
 
-You can also specify the app to use with the celery command:
+You can also specify the app to use with the `celery` command,
+using the `-A` / `--app` option::
 
-    $ celery worker -l info --app=myapp
+    $ celery -A myapp worker -l info
 
 """
 from celery import Celery

+ 1 - 1
extra/generic-init.d/celeryd

@@ -75,7 +75,7 @@ if [ -n "$CELERYD_GROUP" ]; then
 fi
 
 if [ -n "$CELERYD_CHDIR" ]; then
-    DAEMON_OPTS="$DAEMON_OPTS --workdir=\"$CELERYD_CHDIR\""
+    DAEMON_OPTS="$DAEMON_OPTS --workdir=$CELERYD_CHDIR"
 fi
 
 

+ 35 - 0
funtests/benchmarks/timer.py

@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import
+
+import sys
+
+from time import sleep
+from celery.utils import timer2 as timer
+
+def noop(*args, **kwargs):
+    return
+
+
+def insert(s, n=100000):
+    for i in xrange(n):
+        s.apply_after(1 + (i and i / 10.0), noop, (i, ))
+
+
+def slurp(s, n=100000):
+    i = 0
+    it = iter(s)
+    while i < n:
+        delay, entry = next(it)
+        if entry:
+            i += 1
+            s.apply_entry(entry)
+        #else:
+            #if delay:
+            #    sleep(delay)
+
+if __name__ == '__main__':
+    s = timer.Schedule()
+    insert(s)
+    if '--insert-only' not in sys.argv:
+        slurp(s)
+

+ 2 - 2
requirements/default.txt

@@ -1,3 +1,3 @@
 pytz
-billiard>=2.7.3.22
-kombu>=2.5.7
+billiard>=2.7.3.23
+kombu>=2.5.8

+ 2 - 2
setup.cfg

@@ -15,5 +15,5 @@ upload-dir = docs/.build/html
 
 [bdist_rpm]
 requires = pytz
-           billiard >= 2.7.3.22
-           kombu >= 2.5.7
+           billiard >= 2.7.3.23
+           kombu >= 2.5.8