Browse Source

Clonedigger refactorings

Ask Solem 13 years ago
parent
commit
cbb791b176

+ 1 - 110
celery/app/base.py

@@ -43,23 +43,6 @@ def _unpickle_appattr(reverse_name, args):
 
 
 class Celery(object):
-    """Celery Application.
-
-    :param main: Name of the main module if running as `__main__`.
-    :keyword broker: URL of the default broker used.
-    :keyword loader: The loader class, or the name of the loader class to use.
-                     Default is :class:`celery.loaders.app.AppLoader`.
-    :keyword backend: The result store backend class, or the name of the
-                      backend class to use. Default is the value of the
-                      :setting:`CELERY_RESULT_BACKEND` setting.
-    :keyword amqp: AMQP object or class name.
-    :keyword events: Events object or class name.
-    :keyword log: Log object or class name.
-    :keyword control: Control object or class name.
-    :keyword set_as_current:  Make this the global current app.
-    :keyword tasks: A task registry or the name of a registry class.
-
-    """
     Pickler = AppPickler
 
     SYSTEM = platforms.SYSTEM
@@ -105,7 +88,6 @@ class Celery(object):
         self.on_init()
 
     def set_current(self):
-        """Make this the current app for this thread."""
         _tls.current_app = self
 
     def on_init(self):
@@ -113,47 +95,15 @@ class Celery(object):
         pass
 
     def start(self, argv=None):
-        """Run :program:`celery` using `argv`.  Uses :data:`sys.argv`
-        if `argv` is not specified."""
         return instantiate("celery.bin.celery:CeleryCommand", app=self) \
                     .execute_from_commandline(argv)
 
     def worker_main(self, argv=None):
-        """Run :program:`celeryd` using `argv`.  Uses :data:`sys.argv`
-        if `argv` is not specified."""
         return instantiate("celery.bin.celeryd:WorkerCommand", app=self) \
                     .execute_from_commandline(argv)
 
     def task(self, *args, **options):
-        """Decorator to create a task class out of any callable.
-
-        **Examples:**
-
-        .. code-block:: python
-
-            @task
-            def refresh_feed(url):
-                return ...
-
-        with setting extra options:
-
-        .. code-block:: python
-
-            @task(exchange="feeds")
-            def refresh_feed(url):
-                return ...
-
-        .. admonition:: App Binding
-
-            For custom apps the task decorator returns proxy
-            objects, so that the act of creating the task is not performed
-            until the task is used or the task registry is accessed.
-
-            If you are depending on binding to be deferred, then you must
-            not access any attributes on the returned object until the
-            application is fully set up (finalized).
-
-        """
+        """Creates new task class from any callable."""
 
         def inner_create_task_cls(**options):
 
@@ -186,8 +136,6 @@ class Celery(object):
         return task
 
     def finalize(self):
-        """Finalizes the app by loading built-in tasks,
-        and evaluating pending task decorators."""
         if not self.finalized:
             load_builtin_tasks(self)
 
@@ -197,49 +145,20 @@ class Celery(object):
             self.finalized = True
 
     def config_from_object(self, obj, silent=False):
-        """Read configuration from object, where object is either
-        a object, or the name of a module to import.
-
-            >>> celery.config_from_object("myapp.celeryconfig")
-
-            >>> from myapp import celeryconfig
-            >>> celery.config_from_object(celeryconfig)
-
-        """
         del(self.conf)
         return self.loader.config_from_object(obj, silent=silent)
 
     def config_from_envvar(self, variable_name, silent=False):
-        """Read configuration from environment variable.
-
-        The value of the environment variable must be the name
-        of a module to import.
-
-            >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
-            >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
-
-        """
         del(self.conf)
         return self.loader.config_from_envvar(variable_name, silent=silent)
 
     def config_from_cmdline(self, argv, namespace="celery"):
-        """Read configuration from argv."""
         self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
 
     def send_task(self, name, args=None, kwargs=None, countdown=None,
             eta=None, task_id=None, publisher=None, connection=None,
             connect_timeout=None, result_cls=None, expires=None,
             queues=None, **options):
-        """Send task by name.
-
-        :param name: Name of task to execute (e.g. `"tasks.add"`).
-        :keyword result_cls: Specify custom result class. Default is
-            using :meth:`AsyncResult`.
-
-        Supports the same arguments as
-        :meth:`~celery.app.task.BaseTask.apply_async`.
-
-        """
         if self.conf.CELERY_ALWAYS_EAGER:
             warnings.warn(AlwaysEagerIgnored(
                 "CELERY_ALWAYS_EAGER has no effect on send_task"))
@@ -270,23 +189,6 @@ class Celery(object):
             password=None, virtual_host=None, port=None, ssl=None,
             insist=None, connect_timeout=None, transport=None,
             transport_options=None, **kwargs):
-        """Establish a connection to the message broker.
-
-        :keyword hostname: defaults to the :setting:`BROKER_HOST` setting.
-        :keyword userid: defaults to the :setting:`BROKER_USER` setting.
-        :keyword password: defaults to the :setting:`BROKER_PASSWORD` setting.
-        :keyword virtual_host: defaults to the :setting:`BROKER_VHOST` setting.
-        :keyword port: defaults to the :setting:`BROKER_PORT` setting.
-        :keyword ssl: defaults to the :setting:`BROKER_USE_SSL` setting.
-        :keyword insist: defaults to the :setting:`BROKER_INSIST` setting.
-        :keyword connect_timeout: defaults to the
-            :setting:`BROKER_CONNECTION_TIMEOUT` setting.
-        :keyword backend_cls: defaults to the :setting:`BROKER_TRANSPORT`
-            setting.
-
-        :returns :class:`kombu.connection.BrokerConnection`:
-
-        """
         conf = self.conf
         return self.amqp.BrokerConnection(
                     hostname or conf.BROKER_HOST,
@@ -304,14 +206,6 @@ class Celery(object):
 
     @contextmanager
     def default_connection(self, connection=None, connect_timeout=None):
-        """For use within a with-statement to get a connection from the pool
-        if one is not already provided.
-
-        :keyword connection: If not provided, then a connection will be
-                             acquired from the connection pool.
-        :keyword connect_timeout: *No longer used.*
-
-        """
         if connection:
             yield connection
         else:
@@ -349,7 +243,6 @@ class Celery(object):
         return self.loader.now(utc=self.conf.CELERY_ENABLE_UTC)
 
     def mail_admins(self, subject, body, fail_silently=False):
-        """Send an email to the admins in the :setting:`ADMINS` setting."""
         if self.conf.ADMINS:
             to = [admin_email for _, admin_email in self.conf.ADMINS]
             return self.loader.mail_admins(subject, body, fail_silently, to=to,
@@ -372,8 +265,6 @@ class Celery(object):
         return first(None, values) or self.conf.get(default_key)
 
     def bugreport(self):
-        """Returns a string with information useful for the Celery core
-        developers when reporting a bug."""
         return bugreport(self)
 
     def _get_backend(self):

+ 36 - 60
celery/apps/worker.py

@@ -8,6 +8,8 @@ import socket
 import sys
 import warnings
 
+from functools import partial
+
 from billiard import cpu_count, current_process
 
 from celery import __version__, platforms, signals
@@ -256,66 +258,43 @@ class Worker(configurated):
         sys.exit(exitcode)
 
 
-def install_worker_int_handler(worker):
-
-    def _stop(signum, frame):
-        process_name = current_process()._name
-        if not process_name or process_name == "MainProcess":
-            print("celeryd: Hitting Ctrl+C again will terminate "
-                  "all running tasks!")
-            install_worker_int_again_handler(worker)
-            print("celeryd: Warm shutdown (%s)" % (process_name, ))
-            worker.stop(in_sighandler=True)
-        raise SystemExit()
-
-    platforms.signals["SIGINT"] = _stop
-
-
-def install_worker_int_again_handler(worker):
-
-    def _stop(signum, frame):
-        process_name = current_process()._name
-        if not process_name or process_name == "MainProcess":
-            print("celeryd: Cold shutdown (%s)" % (process_name, ))
-            worker.terminate(in_sighandler=True)
-        raise SystemTerminate()
-
-    platforms.signals["SIGINT"] = _stop
-
-
-def install_worker_term_handler(worker):
+def _shutdown_handler(worker, sig="TERM", how="stop", exc=SystemExit,
+        callback=None):
+    types = {"terminate": "Cold", "stop": "Warm"}
 
-    def _stop(signum, frame):
+    def _handle_request(signum, frame):
         process_name = current_process()._name
         if not process_name or process_name == "MainProcess":
-            print("celeryd: Warm shutdown (%s)" % (process_name, ))
-            worker.stop(in_sighandler=True)
-        raise SystemExit()
-
-    platforms.signals["SIGTERM"] = _stop
-
-
-def install_worker_term_hard_handler(worker):
-
-    def _stop(signum, frame):
-        process_name = current_process()._name
-        if not process_name or process_name == "MainProcess":
-            print("celeryd: Cold shutdown (%s)" % (process_name, ))
-            worker.terminate(in_sighandler=True)
-        raise SystemTerminate()
-
-    platforms.signals["SIGQUIT"] = _stop
-
-
-def install_worker_restart_handler(worker):
+            if callback:
+                callback(worker)
+            print("celeryd: %s shutdown (%s)" % (types[how], process_name, ))
+            getattr(worker, how)(in_sighandler=True)
+        raise exc()
+    _handle_request.__name__ = "worker_" + how
+    platforms.signals[sig] = _handle_request
+install_worker_term_handler = partial(
+    _shutdown_handler, sig="SIGTERM", how="stop", exc=SystemExit,
+)
+install_worker_term_hard_handler = partial(
+    _shutdown_handler, sig="SIGQUIT", how="terminate", exc=SystemTerminate,
+)
+
+def on_SIGINT(worker):
+    print("celeryd: Hitting Ctrl+C again will terminate all running tasks!")
+    install_worker_term_hard_handler(worker, sig="SIGINT")
+install_worker_int_handler = partial(
+    _shutdown_handler, sig="SIGINT", callback=on_SIGINT
+)
+
+
+def install_worker_restart_handler(worker, sig="SIGHUP"):
 
     def restart_worker_sig_handler(signum, frame):
         """Signal handler restarting the current python program."""
         print("Restarting celeryd (%s)" % (" ".join(sys.argv), ))
         worker.stop(in_sighandler=True)
         os.execv(sys.executable, [sys.executable] + sys.argv)
-
-    platforms.signals["SIGHUP"] = restart_worker_sig_handler
+    platforms.signals[sig] = restart_worker_sig_handler
 
 
 def install_cry_handler():
@@ -327,25 +306,22 @@ def install_cry_handler():
         def cry_handler(signum, frame):
             """Signal handler logging the stacktrace of all active threads."""
             logger.error("\n" + cry())
-
         platforms.signals["SIGUSR1"] = cry_handler
 
 
-def install_rdb_handler(envvar="CELERY_RDBSIG"):  # pragma: no cover
+def install_rdb_handler(envvar="CELERY_RDBSIG", sig="SIGUSR2"):
 
     def rdb_handler(signum, frame):
         """Signal handler setting a rdb breakpoint at the current frame."""
         from celery.contrib import rdb
         rdb.set_trace(frame)
-
     if os.environ.get(envvar):
-        platforms.signals["SIGUSR2"] = rdb_handler
+        platforms.signals[sig] = rdb_handler
 
 
-def install_HUP_not_supported_handler(worker):
+def install_HUP_not_supported_handler(worker, sig="SIGHUP"):
 
     def warn_on_HUP_handler(signum, frame):
-        logger.error("SIGHUP not supported: "
-            "Restarting with HUP is unstable on this platform!")
-
-    platforms.signals["SIGHUP"] = warn_on_HUP_handler
+        logger.error("%(sig)s not supported: Restarting with %(sig)s is "
+            "unstable on this platform!" % {"sig": sig})
+    platforms.signals[sig] = warn_on_HUP_handler

+ 32 - 1
celery/backends/base.py

@@ -214,10 +214,11 @@ class BaseBackend(object):
     def on_chord_part_return(self, task, propagate=False):
         pass
 
-    def on_chord_apply(self, setid, body, result=None, **kwargs):
+    def fallback_chord_unlock(self, setid, body, result=None, **kwargs):
         kwargs["result"] = [r.id for r in result]
         self.app.tasks["celery.chord_unlock"].apply_async((setid, body, ),
                                                           kwargs, countdown=1)
+    on_chord_apply = fallback_chord_unlock
 
     def current_task_children(self):
         current = current_task()
@@ -321,6 +322,7 @@ class KeyValueStoreBackend(BaseDictBackend):
     task_keyprefix = ensure_bytes("celery-task-meta-")
     taskset_keyprefix = ensure_bytes("celery-taskset-meta-")
     chord_keyprefix = ensure_bytes("chord-unlock-")
+    implements_incr = False
 
     def get(self, key):
         raise NotImplementedError("Must implement the get method.")
@@ -334,6 +336,12 @@ class KeyValueStoreBackend(BaseDictBackend):
     def delete(self, key):
         raise NotImplementedError("Must implement the delete method")
 
+    def incr(self, key):
+        raise NotImplementedError("Does not implement incr")
+
+    def expire(self, key, value):
+        pass
+
     def get_key_for_task(self, task_id):
         """Get the cache key for a task by id."""
         return self.task_keyprefix + ensure_bytes(task_id)
@@ -430,6 +438,29 @@ class KeyValueStoreBackend(BaseDictBackend):
                 return {"result": from_serializable(result)}
             return meta
 
+    def on_chord_apply(self, setid, body, result=None, **kwargs):
+        if self.implements_incr:
+            self.app.TaskSetResult(setid, result).save()
+        else:
+            self.fallback_chord_unlock(setid, body, result, **kwargs)
+
+    def on_chord_part_return(self, task, propagate=False):
+        if not self.implements_incr:
+            return
+        from celery import subtask
+        from celery.result import TaskSetResult
+        setid = task.request.taskset
+        if not setid:
+            return
+        key = self.get_key_for_chord(setid)
+        deps = TaskSetResult.restore(setid, backend=task.backend)
+        if self.incr(key) >= deps.total:
+            subtask(task.request.chord).delay(deps.join(propagate=propagate))
+            deps.delete()
+            self.client.delete(key)
+        else:
+            self.expire(key, 86400)
+
 
 class DisabledBackend(BaseBackend):
     _cache = {}   # need this attribute to reset cache in tests.

+ 5 - 15
celery/backends/cache.py

@@ -67,6 +67,7 @@ backends = {"memcache": lambda: get_best_memcache,
 class CacheBackend(KeyValueStoreBackend):
     servers = None
     supports_native_join = True
+    implements_incr = True
 
     def __init__(self, expires=None, backend=None, options={}, **kwargs):
         super(CacheBackend, self).__init__(self, **kwargs)
@@ -100,21 +101,10 @@ class CacheBackend(KeyValueStoreBackend):
         return self.client.delete(key)
 
     def on_chord_apply(self, setid, body, result=None, **kwargs):
-        key = self.get_key_for_chord(setid)
-        self.client.set(key, '0', time=86400)
-
-    def on_chord_part_return(self, task, propagate=False):
-        from celery import subtask
-        from celery.result import TaskSetResult
-        setid = task.request.taskset
-        if not setid:
-            return
-        key = self.get_key_for_chord(setid)
-        deps = TaskSetResult.restore(setid, backend=task.backend)
-        if self.client.incr(key) >= deps.total:
-            subtask(task.request.chord).delay(deps.join(propagate=propagate))
-            deps.delete()
-            self.client.delete(key)
+        self.client.set(self.get_key_for_chord(setid), '0', time=86400)
+
+    def incr(self, key):
+        return self.client.incr(key)
 
     @cached_property
     def client(self):

+ 7 - 15
celery/backends/redis.py

@@ -38,6 +38,7 @@ class RedisBackend(KeyValueStoreBackend):
     max_connections = None
 
     supports_native_join = True
+    implements_incr = True
 
     def __init__(self, host=None, port=None, db=None, password=None,
             expires=None, max_connections=None, url=None, **kwargs):
@@ -87,24 +88,15 @@ class RedisBackend(KeyValueStoreBackend):
     def delete(self, key):
         self.client.delete(key)
 
+    def incr(self, key):
+        return self.client.incr(key)
+
+    def expire(self, key, value):
+        return self.client.expire(key, value)
+
     def on_chord_apply(self, setid, body, result=None, **kwargs):
         self.app.TaskSetResult(setid, result).save()
 
-    def on_chord_part_return(self, task, propagate=False):
-        from celery import subtask
-        from celery.result import TaskSetResult
-        setid = task.request.taskset
-        if not setid:
-            return
-        key = self.get_key_for_chord(setid)
-        deps = TaskSetResult.restore(setid, backend=task.backend)
-        if self.client.incr(key) >= deps.total:
-            subtask(task.request.chord).delay(deps.join(propagate=propagate))
-            deps.delete()
-            self.client.delete(key)
-        else:
-            self.client.expire(key, 86400)
-
     @cached_property
     def client(self):
         pool = self.redis.ConnectionPool(host=self.host, port=self.port,

+ 8 - 8
celery/canvas.py

@@ -137,21 +137,21 @@ class Signature(dict):
         args, kwargs, options = self._merge(args, kwargs, options)
         return self.type.apply_async(args, kwargs, **options)
 
+    def append_to_list_option(self, key, value):
+        items = self.options.setdefault(key, [])
+        if value not in items:
+            items.append(value)
+        return value
+
     def link(self, callback):
         """Add a callback task to be applied if this task
         executes successfully."""
-        linked = self.options.setdefault("link", [])
-        if callback not in linked:
-            linked.append(callback)
-        return callback
+        return self.append_to_list_option("link", callback)
 
     def link_error(self, errback):
         """Add a callback task to be applied if an error occurs
         while executing this task."""
-        linked = self.options.setdefault("link_error", [])
-        if errback not in linked:
-            linked.append(errback)
-        return errback
+        return self.append_to_list_option("link_error", errback)
 
     def flatten_links(self):
         """Gives a recursive list of dependencies (unchain if you will,

+ 2 - 13
celery/concurrency/eventlet.py

@@ -35,18 +35,8 @@ class Schedule(timer2.Schedule):
         self._spawn_after = spawn_after
         self._queue = set()
 
-    def enter(self, entry, eta=None, priority=0):
-        try:
-            eta = timer2.to_timestamp(eta)
-        except OverflowError:
-            if not self.handle_error(sys.exc_info()):
-                raise
-
-        now = time()
-        if eta is None:
-            eta = now
-        secs = max(eta - now, 0)
-
+    def _enter(self, eta, priority, entry):
+        secs = max(eta - time(), 0)
         g = self._spawn_after(secs, entry)
         self._queue.add(g)
         g.link(self._entry_exit, entry)
@@ -54,7 +44,6 @@ class Schedule(timer2.Schedule):
         g.eta = eta
         g.priority = priority
         g.cancelled = False
-
         return g
 
     def _entry_exit(self, g, entry):

+ 2 - 13
celery/concurrency/gevent.py

@@ -30,18 +30,8 @@ class Schedule(timer2.Schedule):
         super(Schedule, self).__init__(*args, **kwargs)
         self._queue = set()
 
-    def enter(self, entry, eta=None, priority=0):
-        try:
-            eta = timer2.to_timestamp(eta)
-        except OverflowError:
-            if not self.handle_error(sys.exc_info()):
-                raise
-
-        now = time()
-        if eta is None:
-            eta = now
-        secs = max(eta - now, 0)
-
+    def _enter(self, eta, priority, entry):
+        secs = max(eta - time(), 0)
         g = self._Greenlet.spawn_later(secs, entry)
         self._queue.add(g)
         g.link(self._entry_exit)
@@ -49,7 +39,6 @@ class Schedule(timer2.Schedule):
         g.eta = eta
         g.priority = priority
         g.cancelled = False
-
         return g
 
     def _entry_exit(self, g):

+ 7 - 10
celery/concurrency/processes/_win.py

@@ -39,22 +39,19 @@ def CreateToolhelp32Snapshot(dwFlags=2, th32ProcessID=0):
     return hSnapshot
 
 
-def Process32First(hSnapshot):
-    pe = PROCESSENTRY32()
-    pe.dwSize = sizeof(PROCESSENTRY32)
-    success = windll.kernel32.Process32First(hSnapshot, byref(pe))
-    if not success:
-        if windll.kernel32.GetLastError() == ERROR_NO_MORE_FILES:
-            return
-        raise WinError()
-    return pe
+def Process32First(hSnapshot, pe=None):
+    return _Process32n(windll.kernel32.Process32First, hSnapshot, pe)
 
 
 def Process32Next(hSnapshot, pe=None):
+    return _Process32n(windll.kernel32.Process32Next, hSnapshot, pe)
+
+
+def _Process32n(fun, hSnapshot, pe=None):
     if pe is None:
         pe = PROCESSENTRY32()
     pe.dwSize = sizeof(PROCESSENTRY32)
-    success = windll.kernel32.Process32Next(hSnapshot, byref(pe))
+    success = fun(hSnapshot, byref(pe))
     if not success:
         if windll.kernel32.GetLastError() == ERROR_NO_MORE_FILES:
             return

+ 12 - 4
celery/platforms.py

@@ -20,6 +20,8 @@ import shlex
 import signal as _signal
 import sys
 
+from contextlib import contextmanager
+
 from .local import try_import
 
 from billiard import current_process
@@ -244,11 +246,8 @@ class DaemonContext(object):
             os.umask(self.umask)
 
             for fd in reversed(range(get_fdmax(default=2048))):
-                try:
+                with ignore_EBADF():
                     os.close(fd)
-                except OSError, exc:
-                    if exc.errno != errno.EBADF:
-                        raise
 
             os.open(DAEMON_REDIRECT_TO, os.O_RDWR)
             os.dup2(0, 1)
@@ -610,3 +609,12 @@ def shellsplit(s, posix=True):
     lexer.whitespace_split = True
     lexer.commenters = ''
     return list(lexer)
+
+
+@contextmanager
+def ignore_EBADF():
+    try:
+        yield
+    except OSError, exc:
+        if exc.errno != errno.EBADF:
+            raise

+ 4 - 14
celery/security/certificate.py

@@ -3,26 +3,19 @@ from __future__ import with_statement
 
 import glob
 import os
-import sys
-
-try:
-    from OpenSSL import crypto
-except ImportError:
-    crypto = None  # noqa
 
 from celery.exceptions import SecurityError
 
+from .utils import crypto, reraise_errors
+
 
 class Certificate(object):
     """X.509 certificate."""
 
     def __init__(self, cert):
         assert crypto is not None
-        try:
+        with reraise_errors("Invalid certificate: %r"):
             self._cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
-        except crypto.Error, exc:
-            raise SecurityError, SecurityError(
-                    "Invalid certificate: %r" % (exc, )), sys.exc_info()[2]
 
     def has_expired(self):
         """Check if the certificate has expired."""
@@ -43,11 +36,8 @@ class Certificate(object):
 
     def verify(self, data, signature, digest):
         """Verifies the signature for string containing data."""
-        try:
+        with reraise_errors("Bad signature: %r"):
             crypto.verify(self._cert, signature, data, digest)
-        except crypto.Error, exc:
-            raise SecurityError, SecurityError(
-                    "Bad signature: %r" % (exc, )), sys.exc_info()[2]
 
 
 class CertStore(object):

+ 4 - 17
celery/security/key.py

@@ -1,29 +1,16 @@
 from __future__ import absolute_import
+from __future__ import with_statement
 
-import sys
-
-try:
-    from OpenSSL import crypto
-except ImportError:
-    crypto = None  # noqa
-
-from celery.exceptions import SecurityError
+from .utils import crypto, reraise_errors
 
 
 class PrivateKey(object):
 
     def __init__(self, key):
-        assert crypto is not None
-        try:
+        with reraise_errors("Invalid private key: %r"):
             self._key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
-        except crypto.Error, exc:
-            raise SecurityError, SecurityError(
-                    "Invalid private key: %r" % (exc, )), sys.exc_info()[2]
 
     def sign(self, data, digest):
         """sign string containing data."""
-        try:
+        with reraise_errors("Unable to sign data: %r"):
             return crypto.sign(self._key, data, digest)
-        except crypto.Error, exc:
-            raise SecurityError, SecurityError(
-                    "Unable to sign data: %r" % (exc, )), sys.exc_info()[2]

+ 4 - 14
celery/security/serialization.py

@@ -1,15 +1,13 @@
 from __future__ import absolute_import
 
 import base64
-import sys
 
 from kombu.serialization import registry, encode, decode
 from kombu.utils.encoding import bytes_to_str, str_to_bytes
 
-from celery.exceptions import SecurityError
-
 from .certificate import Certificate, FSCertStore
 from .key import PrivateKey
+from .utils import reraise_errors
 
 
 def b64encode(s):
@@ -34,7 +32,7 @@ class SecureSerializer(object):
         """serialize data structure into string"""
         assert self._key is not None
         assert self._cert is not None
-        try:
+        with reraise_errors("Unable to serialize: %r", (Exception, )):
             content_type, content_encoding, body = encode(
                     data, serializer=self._serializer)
             # What we sign is the serialized body, not the body itself.
@@ -44,24 +42,16 @@ class SecureSerializer(object):
             return self._pack(body, content_type, content_encoding,
                               signature=self._key.sign(body, self._digest),
                               signer=self._cert.get_id())
-        except Exception, exc:
-            raise SecurityError, SecurityError(
-                    "Unable to serialize: %r" % (exc, )), sys.exc_info()[2]
 
     def deserialize(self, data):
         """deserialize data structure from string"""
         assert self._cert_store is not None
-        try:
+        with reraise_errors("Unable to deserialize: %r", (Exception, )):
             payload = self._unpack(data)
             signature, signer, body = (payload["signature"],
                                        payload["signer"],
                                        payload["body"])
-            self._cert_store[signer].verify(body,
-                                            signature, self._digest)
-        except Exception, exc:
-            raise SecurityError, SecurityError(
-                    "Unable to deserialize: %r" % (exc, )), sys.exc_info()[2]
-
+            self._cert_store[signer].verify(body, signature, self._digest)
         return decode(body, payload["content_type"],
                             payload["content_encoding"], force=True)
 

+ 22 - 0
celery/security/utils.py

@@ -0,0 +1,22 @@
+from __future__ import absolute_import
+
+import sys
+
+from contextlib import contextmanager
+
+from celery.exceptions import SecurityError
+
+try:
+    from OpenSSL import crypto
+except ImportError:
+    crypto = None  # noqa
+
+
+@contextmanager
+def reraise_errors(msg="%r", errors=None):
+    assert crypto is not None
+    errors = (crypto.Error, ) if errors is None else errors
+    try:
+        yield
+    except errors, exc:
+        raise SecurityError, SecurityError(msg % (exc, )), sys.exc_info()[2]

+ 1 - 17
celery/tests/test_bin/test_celerybeat.py

@@ -15,7 +15,7 @@ from celery.app import app_or_default
 from celery.bin import celerybeat as celerybeat_bin
 from celery.apps import beat as beatapp
 
-from celery.tests.utils import AppCase
+from celery.tests.utils import AppCase, create_pidlock
 
 
 class MockedShelveModule(object):
@@ -130,22 +130,6 @@ class test_Beat(AppCase):
     def test_use_pidfile(self, stdout, stderr):
         from celery import platforms
 
-        class create_pidlock(object):
-            instance = [None]
-
-            def __init__(self, file):
-                self.file = file
-                self.instance[0] = self
-
-            def acquire(self):
-                self.acquired = True
-
-                class Object(object):
-                    def release(self):
-                        pass
-
-                return Object()
-
         prev, platforms.create_pidlock = platforms.create_pidlock, \
                                          create_pidlock
         try:

+ 8 - 21
celery/tests/test_bin/test_celeryd.py

@@ -20,8 +20,11 @@ from celery.apps import worker as cd
 from celery.bin.celeryd import WorkerCommand, main as celeryd_main
 from celery.exceptions import ImproperlyConfigured, SystemTerminate
 
-from celery.tests.utils import (AppCase, WhateverIO, mask_modules,
-                                reset_modules, skip_unless_module)
+from celery.tests.utils import (
+    AppCase, WhateverIO, mask_modules,
+    reset_modules, skip_unless_module,
+    create_pidlock,
+)
 
 
 from celery.utils.log import ensure_process_aware_logger
@@ -221,22 +224,6 @@ class test_Worker(AppCase):
     def test_use_pidfile(self):
         from celery import platforms
 
-        class create_pidlock(object):
-            instance = [None]
-
-            def __init__(self, file):
-                self.file = file
-                self.instance[0] = self
-
-            def acquire(self):
-                self.acquired = True
-
-                class Object(object):
-                    def release(self):
-                        pass
-
-                return Object()
-
         prev, platforms.create_pidlock = platforms.create_pidlock, \
                                          create_pidlock
         try:
@@ -450,7 +437,7 @@ class test_signal_handlers(AppCase):
         handlers["SIGHUP"]("SIGHUP", object())
 
     @disable_stdouts
-    def test_worker_int_again_handler_only_stop_MainProcess(self):
+    def test_worker_term_hard_handler_only_stop_MainProcess(self):
         try:
             import _multiprocessing
         except ImportError:
@@ -459,9 +446,9 @@ class test_signal_handlers(AppCase):
         name, process.name = process.name, "OtherProcess"
         try:
             worker = self._Worker()
-            handlers = self.psig(cd.install_worker_int_again_handler, worker)
+            handlers = self.psig(cd.install_worker_term_hard_handler, worker)
             with self.assertRaises(SystemExit):
-                handlers["SIGINT"]("SIGINT", object())
+                handlers["SIGQUIT"]("SIGQUIT", object())
             self.assertFalse(worker.terminated)
         finally:
             process.name = name

+ 4 - 0
celery/tests/test_utils/test_timer2.py

@@ -48,10 +48,14 @@ class test_Schedule(Case):
 
         timer2.mktime = _overflow
         try:
+            print("+S1")
             s.enter(timer2.Entry(lambda: None, (), {}),
                     eta=datetime.now())
+            print("-S1")
+            print("+S2")
             s.enter(timer2.Entry(lambda: None, (), {}),
                     eta=None)
+            print("-S2")
             s.on_error = None
             with self.assertRaises(OverflowError):
                 s.enter(timer2.Entry(lambda: None, (), {}),

+ 34 - 31
celery/tests/utils.py

@@ -11,6 +11,7 @@ except AttributeError:
 import importlib
 import logging
 import os
+import platform
 import re
 import sys
 import time
@@ -20,7 +21,7 @@ try:
 except ImportError:  # py3k
     import builtins  # noqa
 
-from functools import wraps
+from functools import partial, wraps
 from contextlib import contextmanager
 
 import mock
@@ -407,25 +408,31 @@ def patch(module, name, mocked):
 
 
 @contextmanager
-def platform_pyimp(replace=None):
-    import platform
-    has_prev = hasattr(platform, "python_implementation")
-    prev = getattr(platform, "python_implementation", None)
-    if replace:
-        platform.python_implementation = replace
+def replace_module_value(module, name, value=None):
+    has_prev = hasattr(module, name)
+    prev = getattr(module, name, None)
+    if value:
+        setattr(module, name, value)
     else:
         try:
-            delattr(platform, "python_implementation")
+            delattr(module, name)
         except AttributeError:
             pass
     yield
     if prev is not None:
-        platform.python_implementation = prev
+        setattr(sys, name, prev)
     if not has_prev:
         try:
-            delattr(platform, "python_implementation")
+            delattr(module, name)
         except AttributeError:
             pass
+pypy_version = partial(
+    replace_module_value, sys, "pypy_version_info",
+)
+platform_pyimp = partial(
+    replace_module_value, platform, "python_implementation",
+)
+
 
 
 @contextmanager
@@ -435,27 +442,6 @@ def sys_platform(value):
     sys.platform = prev
 
 
-@contextmanager
-def pypy_version(value=None):
-    has_prev = hasattr(sys, "pypy_version_info")
-    prev = getattr(sys, "pypy_version_info", None)
-    if value:
-        sys.pypy_version_info = value
-    else:
-        try:
-            delattr(sys, "pypy_version_info")
-        except AttributeError:
-            pass
-    yield
-    if prev is not None:
-        sys.pypy_version_info = prev
-    if not has_prev:
-        try:
-            delattr(sys, "pypy_version_info")
-        except AttributeError:
-            pass
-
-
 @contextmanager
 def reset_modules(*modules):
     prev = dict((k, sys.modules.pop(k)) for k in modules if k in sys.modules)
@@ -476,3 +462,20 @@ def patch_modules(*modules):
             sys.modules.pop(name, None)
         else:
             sys.modules[name] = mod
+
+
+class create_pidlock(object):
+    instance = [None]
+
+    def __init__(self, file):
+        self.file = file
+        self.instance[0] = self
+
+    def acquire(self):
+        self.acquired = True
+
+        class Object(object):
+            def release(self):
+                pass
+
+        return Object()

+ 4 - 6
celery/utils/timer2.py

@@ -99,19 +99,17 @@ class Schedule(object):
         :keyword priority: Unused.
 
         """
-        if eta is None:  # schedule now
+        if eta is None:
             eta = datetime.now()
-
         try:
             eta = to_timestamp(eta)
         except OverflowError:
             if not self.handle_error(sys.exc_info()):
                 raise
+            return
+        return self._enter(eta, priority, entry)
 
-        if eta is None:
-            # schedule now.
-            eta = time()
-
+    def _enter(self, eta, priority, entry):
         heapq.heappush(self._queue, (eta, priority, entry))
         return entry
 

+ 2 - 4
celery/worker/autoreload.py

@@ -17,6 +17,7 @@ import time
 
 from collections import defaultdict
 
+from celery.platforms import ignore_EBADF
 from celery.utils.imports import module_file
 from celery.utils.log import get_logger
 from celery.utils.threads import bgThread, Event
@@ -136,11 +137,8 @@ class KQueueMonitor(BaseMonitor):
     def stop(self):
         self._kq.close()
         for fd in filter(None, self.filemap.values()):
-            try:
+            with ignore_EBADF:
                 os.close(fd)
-            except OSError, exc:
-                if exc != errno.EBADF:
-                    raise
             self.filemap[fd] = None
         self.filemap.clear()