瀏覽代碼

Merge branch 'master' of github.com:celery/celery

Ask Solem 10 年之前
父節點
當前提交
83ef2fb2ac

+ 3 - 0
CONTRIBUTORS.txt

@@ -180,3 +180,6 @@ Bert Vanderbauwhede, 2014/12/18
 John Anderson, 2014/12/27
 John Anderson, 2014/12/27
 Luke Burden, 2015/01/24
 Luke Burden, 2015/01/24
 Mickaël Penhard, 2015/02/15
 Mickaël Penhard, 2015/02/15
+Mark Parncutt, 2015/02/16
+Samuel Jaillet, 2015/03/24
+Ilya Georgievsky, 2015/03/31

+ 1 - 1
README.rst

@@ -185,7 +185,7 @@ development easier, and sometimes they add important hooks like closing
 database connections at ``fork``.
 database connections at ``fork``.
 
 
 .. _`Django`: http://djangoproject.com/
 .. _`Django`: http://djangoproject.com/
-.. _`Pylons`: http://pylonshq.com/
+.. _`Pylons`: http://www.pylonsproject.org/
 .. _`Flask`: http://flask.pocoo.org/
 .. _`Flask`: http://flask.pocoo.org/
 .. _`web2py`: http://web2py.com/
 .. _`web2py`: http://web2py.com/
 .. _`Bottle`: http://bottlepy.org/
 .. _`Bottle`: http://bottlepy.org/

+ 1 - 1
celery/app/task.py

@@ -319,7 +319,7 @@ class Task(object):
 
 
     def __call__(self, *args, **kwargs):
     def __call__(self, *args, **kwargs):
         _task_stack.push(self)
         _task_stack.push(self)
-        self.push_request()
+        self.push_request(args=args, kwargs=kwargs)
         try:
         try:
             # add self if this is a bound task
             # add self if this is a bound task
             if self.__self__ is not None:
             if self.__self__ is not None:

+ 5 - 3
celery/backends/amqp.py

@@ -231,7 +231,7 @@ class AMQPBackend(BaseBackend):
     def _many_bindings(self, ids):
     def _many_bindings(self, ids):
         return [self._create_binding(task_id) for task_id in ids]
         return [self._create_binding(task_id) for task_id in ids]
 
 
-    def get_many(self, task_ids, timeout=None, no_ack=True,
+    def get_many(self, task_ids, timeout=None, no_ack=True, on_message=None,
                  now=monotonic, getfields=itemgetter('status', 'task_id'),
                  now=monotonic, getfields=itemgetter('status', 'task_id'),
                  READY_STATES=states.READY_STATES,
                  READY_STATES=states.READY_STATES,
                  PROPAGATE_STATES=states.PROPAGATE_STATES, **kwargs):
                  PROPAGATE_STATES=states.PROPAGATE_STATES, **kwargs):
@@ -254,15 +254,17 @@ class AMQPBackend(BaseBackend):
             push_cache = self._cache.__setitem__
             push_cache = self._cache.__setitem__
             decode_result = self.meta_from_decoded
             decode_result = self.meta_from_decoded
 
 
-            def on_message(message):
+            def _on_message(message):
                 body = decode_result(message.decode())
                 body = decode_result(message.decode())
+                if on_message is not None:
+                    on_message(body)
                 state, uid = getfields(body)
                 state, uid = getfields(body)
                 if state in READY_STATES:
                 if state in READY_STATES:
                     push_result(body) \
                     push_result(body) \
                         if uid in task_ids else push_cache(uid, body)
                         if uid in task_ids else push_cache(uid, body)
 
 
             bindings = self._many_bindings(task_ids)
             bindings = self._many_bindings(task_ids)
-            with self.Consumer(channel, bindings, on_message=on_message,
+            with self.Consumer(channel, bindings, on_message=_on_message,
                                accept=self.accept, no_ack=no_ack):
                                accept=self.accept, no_ack=no_ack):
                 wait = conn.drain_events
                 wait = conn.drain_events
                 popleft = results.popleft
                 popleft = results.popleft

+ 54 - 25
celery/backends/mongodb.py

@@ -45,6 +45,7 @@ class Bunch(object):
 
 
 class MongoBackend(BaseBackend):
 class MongoBackend(BaseBackend):
 
 
+    mongo_host = None
     host = 'localhost'
     host = 'localhost'
     port = 27017
     port = 27017
     user = None
     user = None
@@ -75,6 +76,28 @@ class MongoBackend(BaseBackend):
                 'You need to install the pymongo library to use the '
                 'You need to install the pymongo library to use the '
                 'MongoDB backend.')
                 'MongoDB backend.')
 
 
+        self.url = url
+
+        # default options
+        self.options.setdefault('max_pool_size', self.max_pool_size)
+        self.options.setdefault('auto_start_request', False)
+
+        # update conf with mongo uri data, only if uri was given
+        if self.url:
+            uri_data = pymongo.uri_parser.parse_uri(self.url)
+            # build the hosts list to create a mongo connection
+            make_host_str = lambda x: "{0}:{1}".format(x[0], x[1])
+            hostslist = map(make_host_str, uri_data['nodelist'])
+            self.user = uri_data['username']
+            self.password = uri_data['password']
+            self.mongo_host = hostslist
+            if uri_data['database']:
+                # if no database is provided in the uri, use default
+                self.database_name = uri_data['database']
+
+            self.options.update(uri_data['options'])
+
+        # update conf with specific settings
         config = self.app.conf.get('CELERY_MONGODB_BACKEND_SETTINGS')
         config = self.app.conf.get('CELERY_MONGODB_BACKEND_SETTINGS')
         if config is not None:
         if config is not None:
             if not isinstance(config, dict):
             if not isinstance(config, dict):
@@ -82,8 +105,13 @@ class MongoBackend(BaseBackend):
                     'MongoDB backend settings should be grouped in a dict')
                     'MongoDB backend settings should be grouped in a dict')
             config = dict(config)  # do not modify original
             config = dict(config)  # do not modify original
 
 
+            if 'host' in config or 'port' in config:
+                # these should take over uri conf
+                self.mongo_host = None
+
             self.host = config.pop('host', self.host)
             self.host = config.pop('host', self.host)
-            self.port = int(config.pop('port', self.port))
+            self.port = config.pop('port', self.port)
+            self.mongo_host = config.pop('mongo_host', self.mongo_host)
             self.user = config.pop('user', self.user)
             self.user = config.pop('user', self.user)
             self.password = config.pop('password', self.password)
             self.password = config.pop('password', self.password)
             self.database_name = config.pop('database', self.database_name)
             self.database_name = config.pop('database', self.database_name)
@@ -94,37 +122,38 @@ class MongoBackend(BaseBackend):
                 'groupmeta_collection', self.groupmeta_collection,
                 'groupmeta_collection', self.groupmeta_collection,
             )
             )
 
 
-            self.options = dict(config, **config.pop('options', None) or {})
-
-            # Set option defaults
-            self.options.setdefault('max_pool_size', self.max_pool_size)
-            self.options.setdefault('auto_start_request', False)
-
-        self.url = url
-        if self.url:
-            # Specifying backend as an URL
-            self.host = self.url
+            self.options.update(config.pop('options', {}))
+            self.options.update(config)
 
 
     def _get_connection(self):
     def _get_connection(self):
         """Connect to the MongoDB server."""
         """Connect to the MongoDB server."""
         if self._connection is None:
         if self._connection is None:
             from pymongo import MongoClient
             from pymongo import MongoClient
 
 
-            # The first pymongo.Connection() argument (host) can be
-            # a list of ['host:port'] elements or a mongodb connection
-            # URI. If this is the case, don't use self.port
-            # but let pymongo get the port(s) from the URI instead.
-            # This enables the use of replica sets and sharding.
-            # See pymongo.Connection() for more info.
-            url = self.host
-            if isinstance(url, string_t) \
-                    and not url.startswith('mongodb://'):
-                url = 'mongodb://{0}:{1}'.format(url, self.port)
-            if url == 'mongodb://':
-                url = url + 'localhost'
+            host = self.mongo_host
+            if not host:
+                # The first pymongo.Connection() argument (host) can be
+                # a list of ['host:port'] elements or a mongodb connection
+                # URI. If this is the case, don't use self.port
+                # but let pymongo get the port(s) from the URI instead.
+                # This enables the use of replica sets and sharding.
+                # See pymongo.Connection() for more info.
+                host = self.host
+                if isinstance(host, string_t) \
+                   and not host.startswith('mongodb://'):
+                    host = 'mongodb://{0}:{1}'.format(host, self.port)
+
+                if host == 'mongodb://':
+                    host += 'localhost'
+
+            # don't change self.options
+            conf = dict(self.options)
+            conf['host'] = host
+
             if detect_environment() != 'default':
             if detect_environment() != 'default':
-                self.options['use_greenlets'] = True
-            self._connection = MongoClient(host=url, **self.options)
+                conf['use_greenlets'] = True
+
+            self._connection = MongoClient(**conf)
 
 
         return self._connection
         return self._connection
 
 

+ 14 - 14
celery/contrib/batches.py

@@ -197,20 +197,20 @@ class Batches(Task):
         flush_buffer = self._do_flush
         flush_buffer = self._do_flush
 
 
         def task_message_handler(message, body, ack, reject, callbacks, **kw):
         def task_message_handler(message, body, ack, reject, callbacks, **kw):
-            if body is None:                                                                                                                        31513 ?        S    125:09 /usr/bin/python -m celery worker --without-heartbeat -c 50 --pool=eventlet -n celery6@ns326150.ip-37-187-158.eu --app=mai
-                body, headers, decoded, utc = (                                                                                                     n -Q rss --without-gossip --logfile=/home/logs/rss.log --pidfile=celery6.pid
-                    message.body, message.headers, False, True,                                                                                     31528 ?        R    128:34 /usr/bin/python -m celery worker --without-heartbeat -c 50 --pool=eventlet -n celery7@ns326150.ip-37-187-158.eu --app=mai
-                )                                                                                                                                   n -Q rss --without-gossip --logfile=/home/logs/rss.log --pidfile=celery7.pid
-                if not body_can_be_buffer:                                                                                                          31543 ?        S    124:32 /usr/bin/python -m celery worker --without-heartbeat -c 50 --pool=eventlet -n celery8@ns326150.ip-37-187-158.eu --app=mai
-                    body = bytes(body) if isinstance(body, buffer_t) else body                                                                      n -Q rss --without-gossip --logfile=/home/logs/rss.log --pidfile=celery8.pid
-            else:                                                                                                                                   26150 ?        S      0:50 /usr/bin/python -m celery worker --without-heartbeat -c 2 --pool=eventlet -n engines@ns326150.ip-37-187-158.eu --app=main
-                body, headers, decoded, utc = proto1_to_proto2(message, body)                                                                        -Q engines --without-gossip --logfile=/home/logs/engines.log --pidfile=/home/logs/pid-engines.pid
-                                                                                                                                                    22409 ?        S      0:00 /usr/bin/python -m celery worker --without-heartbeat -c 1 -n elasticsearch_bulk_actions@ns326150.ip-37-187-158.eu --app=m
-            request = Req(                                                                                                                          ain -Q elasticsearch_bulk_actions --without-gossip --logfile=/home/logs/elasticsearch_bulk_actions.log --pidfile=elasticsearch_bulk_actions.pid
-                message,                                                                                                                            22459 ?        S      0:00  \_ /usr/bin/python -m celery worker --without-heartbeat -c 1 -n elasticsearch_bulk_actions@ns326150.ip-37-187-158.eu --a
-                on_ack=ack, on_reject=reject, app=app, hostname=hostname,                                                                           pp=main -Q elasticsearch_bulk_actions --without-gossip --logfile=/home/logs/elasticsearch_bulk_actions.log --pidfile=elasticsearch_bulk_actions.pid
-                eventer=eventer, task=task, connection_errors=connection_errors,                                                                    22419 ?        S      0:00 /usr/bin/python -m celery worker --without-heartbeat -c 1 -n celery@ns326150.ip-37-187-158.eu --app=main -Q elasticsearch
-                body=body, headers=headers, decoded=decoded, utc=utc,                                                                               _bulk_actions --without-gossip --logfile=/home/logs/elasticsearch_bulk_actions.log --pidfile=celery.pid
+            if body is None:
+                body, headers, decoded, utc = (
+                    message.body, message.headers, False, True,
+                )
+                if not body_can_be_buffer:
+                    body = bytes(body) if isinstance(body, buffer_t) else body
+            else:
+                body, headers, decoded, utc = proto1_to_proto2(message, body)
+
+            request = Req(
+                message,
+                on_ack=ack, on_reject=reject, app=app, hostname=hostname,
+                eventer=eventer, task=task, connection_errors=connection_errors,
+                body=body, headers=headers, decoded=decoded, utc=utc,
             )
             )
             put_buffer(request)
             put_buffer(request)
 
 

+ 8 - 1
celery/fixups/django.py

@@ -158,7 +158,14 @@ class DjangoWorkerFixup(object):
         except ImportError:
         except ImportError:
             from django.core.management.base import BaseCommand
             from django.core.management.base import BaseCommand
             cmd = BaseCommand()
             cmd = BaseCommand()
-            cmd.stdout, cmd.stderr = sys.stdout, sys.stderr
+            try:
+                # since django 1.5
+                from django.core.management.base import OutputWrapper
+                cmd.stdout = OutputWrapper(sys.stdout)
+                cmd.stderr = OutputWrapper(sys.stderr)
+            except ImportError:
+                cmd.stdout, cmd.stderr = sys.stdout, sys.stderr
+
             cmd.check()
             cmd.check()
         else:
         else:
             num_errors = get_validation_errors(s, None)
             num_errors = get_validation_errors(s, None)

+ 13 - 6
celery/result.py

@@ -567,7 +567,7 @@ class ResultSet(ResultBase):
                 raise TimeoutError('The operation timed out')
                 raise TimeoutError('The operation timed out')
 
 
     def get(self, timeout=None, propagate=True, interval=0.5,
     def get(self, timeout=None, propagate=True, interval=0.5,
-            callback=None, no_ack=True):
+            callback=None, no_ack=True, on_message=None):
         """See :meth:`join`
         """See :meth:`join`
 
 
         This is here for API compatibility with :class:`AsyncResult`,
         This is here for API compatibility with :class:`AsyncResult`,
@@ -577,10 +577,10 @@ class ResultSet(ResultBase):
         """
         """
         return (self.join_native if self.supports_native_join else self.join)(
         return (self.join_native if self.supports_native_join else self.join)(
             timeout=timeout, propagate=propagate,
             timeout=timeout, propagate=propagate,
-            interval=interval, callback=callback, no_ack=no_ack)
+            interval=interval, callback=callback, no_ack=no_ack, on_message=on_message)
 
 
     def join(self, timeout=None, propagate=True, interval=0.5,
     def join(self, timeout=None, propagate=True, interval=0.5,
-             callback=None, no_ack=True):
+             callback=None, no_ack=True, on_message=None):
         """Gathers the results of all tasks as a list in order.
         """Gathers the results of all tasks as a list in order.
 
 
         .. note::
         .. note::
@@ -632,6 +632,9 @@ class ResultSet(ResultBase):
         time_start = monotonic()
         time_start = monotonic()
         remaining = None
         remaining = None
 
 
+        if on_message is not None:
+            raise Exception('Your backend not suppored on_message callback')
+
         results = []
         results = []
         for result in self.results:
         for result in self.results:
             remaining = None
             remaining = None
@@ -649,7 +652,8 @@ class ResultSet(ResultBase):
                 results.append(value)
                 results.append(value)
         return results
         return results
 
 
-    def iter_native(self, timeout=None, interval=0.5, no_ack=True):
+    def iter_native(self, timeout=None, interval=0.5, no_ack=True,
+                    on_message=None):
         """Backend optimized version of :meth:`iterate`.
         """Backend optimized version of :meth:`iterate`.
 
 
         .. versionadded:: 2.2
         .. versionadded:: 2.2
@@ -667,10 +671,12 @@ class ResultSet(ResultBase):
         return self.backend.get_many(
         return self.backend.get_many(
             set(r.id for r in results),
             set(r.id for r in results),
             timeout=timeout, interval=interval, no_ack=no_ack,
             timeout=timeout, interval=interval, no_ack=no_ack,
+            on_message=on_message,
         )
         )
 
 
     def join_native(self, timeout=None, propagate=True,
     def join_native(self, timeout=None, propagate=True,
-                    interval=0.5, callback=None, no_ack=True):
+                    interval=0.5, callback=None, no_ack=True,
+                    on_message=None):
         """Backend optimized version of :meth:`join`.
         """Backend optimized version of :meth:`join`.
 
 
         .. versionadded:: 2.2
         .. versionadded:: 2.2
@@ -687,7 +693,8 @@ class ResultSet(ResultBase):
             result.id: i for i, result in enumerate(self.results)
             result.id: i for i, result in enumerate(self.results)
         }
         }
         acc = None if callback else [None for _ in range(len(self))]
         acc = None if callback else [None for _ in range(len(self))]
-        for task_id, meta in self.iter_native(timeout, interval, no_ack):
+        for task_id, meta in self.iter_native(timeout, interval, no_ack,
+                                              on_message):
             value = meta['result']
             value = meta['result']
             if propagate and meta['status'] in states.PROPAGATE_STATES:
             if propagate and meta['status'] in states.PROPAGATE_STATES:
                 raise value
                 raise value

+ 147 - 2
celery/schedules.py

@@ -22,12 +22,12 @@ from .five import range, string_t
 from .utils import is_iterable
 from .utils import is_iterable
 from .utils.timeutils import (
 from .utils.timeutils import (
     weekday, maybe_timedelta, remaining, humanize_seconds,
     weekday, maybe_timedelta, remaining, humanize_seconds,
-    timezone, maybe_make_aware, ffwd
+    timezone, maybe_make_aware, ffwd, localize
 )
 )
 from .datastructures import AttributeDict
 from .datastructures import AttributeDict
 
 
 __all__ = ['ParseException', 'schedule', 'crontab', 'crontab_parser',
 __all__ = ['ParseException', 'schedule', 'crontab', 'crontab_parser',
-           'maybe_schedule']
+           'maybe_schedule', 'solar']
 
 
 schedstate = namedtuple('schedstate', ('is_due', 'next'))
 schedstate = namedtuple('schedstate', ('is_due', 'next'))
 
 
@@ -591,3 +591,148 @@ def maybe_schedule(s, relative=False, app=None):
         else:
         else:
             s.app = app
             s.app = app
     return s
     return s
+
+SOLAR_INVALID_LATITUDE = """\
+Argument latitude {lat} is invalid, must be between -90 and 90.\
+"""
+
+SOLAR_INVALID_LONGITUDE = """\
+Argument longitude {lon} is invalid, must be between -180 and 180.\
+"""
+
+SOLAR_INVALID_EVENT = """\
+Argument event \"{event}\" is invalid, must be one of {all_events}.\
+"""
+
+class solar(schedule):
+    """A solar event can be used as the `run_every` value of a
+    :class:`PeriodicTask` to schedule based on certain solar events.
+
+    :param event: Solar event that triggers this task. Available
+        values are: dawn_astronomical, dawn_nautical, dawn_civil,
+        sunrise, solar_noon, sunset, dusk_civil, dusk_nautical,
+        dusk_astronomical
+    :param lat: The latitude of the observer.
+    :param lon: The longitude of the observer.
+    :param nowfun: Function returning the current date and time
+        (class:`~datetime.datetime`).
+    :param app: Celery app instance.
+    """
+
+
+    _all_events = ['dawn_astronomical',
+        'dawn_nautical',
+        'dawn_civil',
+        'sunrise',
+        'solar_noon',
+        'sunset',
+        'dusk_civil',
+        'dusk_nautical',
+        'dusk_astronomical']
+    _horizons = {'dawn_astronomical': '-18',
+        'dawn_nautical': '-12',
+        'dawn_civil': '-6',
+        'sunrise': '-0:34',
+        'solar_noon': '0',
+        'sunset': '-0:34',
+        'dusk_civil': '-6',
+        'dusk_nautical': '-12',
+        'dusk_astronomical': '18'}
+    _methods = {'dawn_astronomical': 'next_rising',
+        'dawn_nautical': 'next_rising',
+        'dawn_civil': 'next_rising',
+        'sunrise': 'next_rising',
+        'solar_noon': 'next_transit',
+        'sunset': 'next_setting',
+        'dusk_civil': 'next_setting',
+        'dusk_nautical': 'next_setting',
+        'dusk_astronomical': 'next_setting'}
+    _use_center_l = {'dawn_astronomical': True,
+        'dawn_nautical': True,
+        'dawn_civil': True,
+        'sunrise': False,
+        'solar_noon': True,
+        'sunset': False,
+        'dusk_civil': True,
+        'dusk_nautical': True,
+        'dusk_astronomical': True}
+
+    def __init__(self, event, lat, lon, nowfun=None, app=None):
+        self.ephem = __import__('ephem')
+        self.event = event
+        self.lat = lat
+        self.lon = lon
+        self.nowfun = nowfun
+        self._app = app
+
+        if event not in self._all_events:
+            raise ValueError(SOLAR_INVALID_EVENT.format(event=event, all_events=', '.join(self._all_events)))
+        if lat < -90 or lat > 90:
+            raise ValueError(SOLAR_INVALID_LATITUDE.format(lat=lat))
+        if lon < -180 or lon > 180:
+            raise ValueError(SOLAR_INVALID_LONGITUDE.format(lon=lon))
+
+        cal = self.ephem.Observer()
+        cal.lat = str(lat)
+        cal.lon = str(lon)
+        cal.elev = 0
+        cal.horizon = self._horizons[event]
+        cal.pressure = 0
+        self.cal = cal
+
+        self.method = self._methods[event]
+        self.use_center = self._use_center_l[event]
+
+    def now(self):
+        return (self.nowfun or self.app.now)()
+
+    def __reduce__(self):
+        return (self.__class__, (self.event,
+                                 self.lat,
+                                 self.lon), None)
+
+    def __repr__(self):
+        return "<solar: " + self.event + " at latitude " + str(self.lat) + ", longitude " + str(self.lon) + ">"
+
+    def remaining_estimate(self, last_run_at):
+        """Returns when the periodic task should run next as a timedelta,
+        or if it shouldn't run today (e.g. the sun does not rise today),
+        returns the time when the next check should take place."""
+        last_run_at = self.maybe_make_aware(last_run_at)
+        last_run_at_utc = localize(last_run_at, timezone.utc)
+        self.cal.date = last_run_at_utc
+        try:
+            next_utc = getattr(self.cal, self.method)(self.ephem.Sun(), start=last_run_at_utc, use_center=self.use_center)
+        except self.ephem.CircumpolarError:
+            """Sun will not rise/set today. Check again tomorrow
+            (specifically, after the next anti-transit)."""
+            next_utc = self.cal.next_antitransit(self.ephem.Sun()) + timedelta(minutes=1)
+        next = self.maybe_make_aware(next_utc.datetime())
+        now = self.maybe_make_aware(self.now())
+        delta = next - now
+        return delta
+
+    def is_due(self, last_run_at):
+        """Returns tuple of two items `(is_due, next_time_to_run)`,
+        where next time to run is in seconds.
+
+        See :meth:`celery.schedules.schedule.is_due` for more information.
+
+        """
+        rem_delta = self.remaining_estimate(last_run_at)
+        rem = max(rem_delta.total_seconds(), 0)
+        due = rem == 0
+        if due:
+            rem_delta = self.remaining_estimate(self.now())
+            rem = max(rem_delta.total_seconds(), 0)
+        return schedstate(due, rem)
+
+    def __eq__(self, other):
+        if isinstance(other, solar):
+            return (other.event == self.event and
+                    other.lat == self.lat and
+                    other.lon == self.lon)
+        return NotImplemented
+
+    def __ne__(self, other):
+        return not self.__eq__(other)

+ 30 - 0
celery/tests/backends/test_amqp.py

@@ -294,6 +294,36 @@ class test_AMQPBackend(AppCase):
             b.store_result(tids[0], i, states.PENDING)
             b.store_result(tids[0], i, states.PENDING)
             list(b.get_many(tids, timeout=0.01))
             list(b.get_many(tids, timeout=0.01))
 
 
+    def test_get_many_on_message(self):
+        b = self.create_backend(max_cached_results=10)
+
+        tids = []
+        for i in range(10):
+            tid = uuid()
+            b.store_result(tid, '', states.PENDING)
+            b.store_result(tid, 'comment_%i_1' % i, states.STARTED)
+            b.store_result(tid, 'comment_%i_2' % i, states.STARTED)
+            b.store_result(tid, 'final result %i' % i, states.SUCCESS)
+            tids.append(tid)
+
+
+        expected_messages = {}
+        for i, _tid in enumerate(tids):
+            expected_messages[_tid] = []
+            expected_messages[_tid].append( (states.PENDING, '') )
+            expected_messages[_tid].append( (states.STARTED, 'comment_%i_1' % i) )
+            expected_messages[_tid].append( (states.STARTED, 'comment_%i_2' % i) )
+            expected_messages[_tid].append( (states.SUCCESS, 'final result %i' % i) )
+
+        on_message_results = {}
+        def on_message(body):
+            if not body['task_id'] in on_message_results:
+                on_message_results[body['task_id']] = []
+            on_message_results[body['task_id']].append( (body['status'], body['result']) )
+
+        res = list(b.get_many(tids, timeout=1, on_message=on_message))
+        self.assertEqual(sorted(on_message_results), sorted(expected_messages))
+
     def test_get_many_raises_outer_block(self):
     def test_get_many_raises_outer_block(self):
 
 
         class Backend(AMQPBackend):
         class Backend(AMQPBackend):

+ 57 - 0
celery/tests/backends/test_mongodb.py

@@ -67,6 +67,63 @@ class test_MongoBackend(AppCase):
         self.app.conf.CELERY_MONGODB_BACKEND_SETTINGS = None
         self.app.conf.CELERY_MONGODB_BACKEND_SETTINGS = None
         MongoBackend(app=self.app)
         MongoBackend(app=self.app)
 
 
+    def test_init_with_settings(self):
+        self.app.conf.CELERY_MONGODB_BACKEND_SETTINGS = None
+        # empty settings
+        mb = MongoBackend(app=self.app)
+
+        # uri
+        uri = 'mongodb://localhost:27017'
+        mb = MongoBackend(app=self.app, url=uri)
+        self.assertEqual(mb.mongo_host, ['localhost:27017'])
+        self.assertEqual(mb.options, {'auto_start_request': False,
+                                      'max_pool_size': 10})
+        self.assertEqual(mb.database_name, 'celery')
+
+        # uri with database name
+        uri = 'mongodb://localhost:27017/celerydb'
+        mb = MongoBackend(app=self.app, url=uri)
+        self.assertEqual(mb.database_name, 'celerydb')
+
+        # uri with user, password, database name, replica set
+        uri = ('mongodb://'
+               'celeryuser:celerypassword@'
+               'mongo1.example.com:27017,'
+               'mongo2.example.com:27017,'
+               'mongo3.example.com:27017/'
+               'celerydatabase?replicaSet=rs0')
+        mb = MongoBackend(app=self.app, url=uri)
+        self.assertEqual(mb.mongo_host, ['mongo1.example.com:27017',
+                                         'mongo2.example.com:27017',
+                                         'mongo3.example.com:27017'])
+        self.assertEqual(mb.options, {'auto_start_request': False,
+                                      'max_pool_size': 10,
+                                      'replicaset': 'rs0'})
+        self.assertEqual(mb.user, 'celeryuser')
+        self.assertEqual(mb.password, 'celerypassword')
+        self.assertEqual(mb.database_name, 'celerydatabase')
+
+        # same uri, change some parameters in backend settings
+        self.app.conf.CELERY_MONGODB_BACKEND_SETTINGS = {
+            'replicaset': 'rs1',
+            'user': 'backenduser',
+            'database': 'another_db',
+            'options': {
+                'socketKeepAlive': True,
+            },
+        }
+        mb = MongoBackend(app=self.app, url=uri)
+        self.assertEqual(mb.mongo_host, ['mongo1.example.com:27017',
+                                         'mongo2.example.com:27017',
+                                         'mongo3.example.com:27017'])
+        self.assertEqual(mb.options, {'auto_start_request': False,
+                                      'max_pool_size': 10,
+                                      'replicaset': 'rs1',
+                                      'socketKeepAlive': True})
+        self.assertEqual(mb.user, 'backenduser')
+        self.assertEqual(mb.password, 'celerypassword')
+        self.assertEqual(mb.database_name, 'another_db')
+
     @depends_on_current_app
     @depends_on_current_app
     def test_reduce(self):
     def test_reduce(self):
         x = MongoBackend(app=self.app)
         x = MongoBackend(app=self.app)

+ 3 - 6
celery/worker/request.py

@@ -285,8 +285,8 @@ class Request(object):
         task_ready(self)
         task_ready(self)
         if soft:
         if soft:
             warn('Soft time limit (%ss) exceeded for %s[%s]',
             warn('Soft time limit (%ss) exceeded for %s[%s]',
-                 timeout, self.name, self.id)
-            exc = SoftTimeLimitExceeded(timeout)
+                 soft, self.name, self.id)
+            exc = SoftTimeLimitExceeded(soft)
         else:
         else:
             error('Hard time limit (%ss) exceeded for %s[%s]',
             error('Hard time limit (%ss) exceeded for %s[%s]',
                   timeout, self.name, self.id)
                   timeout, self.name, self.id)
@@ -310,10 +310,7 @@ class Request(object):
         if self.task.acks_late:
         if self.task.acks_late:
             self.acknowledge()
             self.acknowledge()
 
 
-        if self.eventer and self.eventer.enabled:
-            self.send_event(
-                'task-succeeded', result=retval, runtime=runtime,
-            )
+        self.send_event('task-succeeded', result=retval, runtime=runtime)
 
 
     def on_retry(self, exc_info):
     def on_retry(self, exc_info):
         """Handler called if the task should be retried."""
         """Handler called if the task should be retried."""

+ 1 - 2
celery/worker/state.py

@@ -27,8 +27,7 @@ from celery.five import Counter
 
 
 __all__ = ['SOFTWARE_INFO', 'reserved_requests', 'active_requests',
 __all__ = ['SOFTWARE_INFO', 'reserved_requests', 'active_requests',
            'total_count', 'revoked', 'task_reserved', 'maybe_shutdown',
            'total_count', 'revoked', 'task_reserved', 'maybe_shutdown',
-           'task_accepted', 'task_ready', 'task_reserved', 'task_ready',
-           'Persistent']
+           'task_accepted', 'task_reserved', 'task_ready', 'Persistent']
 
 
 #: Worker software/platform information.
 #: Worker software/platform information.
 SOFTWARE_INFO = {'sw_ident': 'py-celery',
 SOFTWARE_INFO = {'sw_ident': 'py-celery',

+ 1 - 0
docs/AUTHORS.txt

@@ -89,6 +89,7 @@ Marcin Kuźmiński <marcin@python-works.com>
 Marcin Lulek <info@webreactor.eu>
 Marcin Lulek <info@webreactor.eu>
 Mark Hellewell <mark.hellewell@gmail.com>
 Mark Hellewell <mark.hellewell@gmail.com>
 Mark Lavin <mlavin@caktusgroup.com>
 Mark Lavin <mlavin@caktusgroup.com>
+Mark Parncutt <me@markparncutt.com>
 Mark Stover <stovenator@gmail.com>
 Mark Stover <stovenator@gmail.com>
 Mark Thurman <mthurman@gmail.com>
 Mark Thurman <mthurman@gmail.com>
 Martin Galpin <m@66laps.com>
 Martin Galpin <m@66laps.com>

+ 1 - 1
docs/configuration.rst

@@ -1061,7 +1061,7 @@ manner using TCP/IP alone, so AMQP defines something called heartbeats
 that's is used both by the client and the broker to detect if
 that's is used both by the client and the broker to detect if
 a connection was closed.
 a connection was closed.
 
 
-Hartbeats are disabled by default.
+Heartbeats are disabled by default.
 
 
 If the heartbeat value is 10 seconds, then
 If the heartbeat value is 10 seconds, then
 the heartbeat will be monitored at the interval specified
 the heartbeat will be monitored at the interval specified

+ 3 - 2
docs/faq.rst

@@ -281,8 +281,9 @@ When using the RabbitMQ (AMQP) and Redis transports it should work
 out of the box.
 out of the box.
 
 
 For other transports the compatibility prefork pool is
 For other transports the compatibility prefork pool is
-used which requires a working POSIX semaphore implementation, and this isn't
-enabled in FreeBSD by default. You have to enable
+used which requires a working POSIX semaphore implementation,
+this is enabled in FreeBSD by default since FreeBSD 8.x.
+For older version of FreeBSD, you have to enable
 POSIX semaphores in the kernel and manually recompile billiard.
 POSIX semaphores in the kernel and manually recompile billiard.
 
 
 Luckily, Viktor Petersson has written a tutorial to get you started with
 Luckily, Viktor Petersson has written a tutorial to get you started with

+ 102 - 0
docs/userguide/periodic-tasks.rst

@@ -269,6 +269,108 @@ The syntax of these crontab expressions are very flexible.  Some examples:
 
 
 See :class:`celery.schedules.crontab` for more documentation.
 See :class:`celery.schedules.crontab` for more documentation.
 
 
+.. _beat-solar:
+
+Solar schedules
+=================
+
+If you have a task that should be executed according to sunrise,
+sunset, dawn or dusk, you can use the
+:class:`~celery.schedules.solar` schedule type:
+
+.. code-block:: python
+
+    from celery.schedules import solar
+
+    CELERYBEAT_SCHEDULE = {
+    	# Executes at sunset in Melbourne
+    	'add-at-melbourne-sunset': {
+    		'task': 'tasks.add',
+    		'schedule': solar('sunset', -37.81753, 144.96715),
+    		'args': (16, 16),
+    	},
+    }
+
+The arguments are simply: ``solar(event, latitude, longitude)``
+
+Be sure to use the correct sign for latitude and longitude:
+
++---------------+-------------------+----------------------+
+| **Sign**      | **Argument**      | **Meaning**          |
++---------------+-------------------+----------------------+
+| ``+``         | ``latitude``      | North                |
++---------------+-------------------+----------------------+
+| ``-``         | ``latitude``      | South                |
++---------------+-------------------+----------------------+
+| ``+``         | ``longitude``     | East                 |
++---------------+-------------------+----------------------+
+| ``-``         | ``longitude``     | West                 |
++---------------+-------------------+----------------------+
+
+Possible event types are:
+
++-----------------------------------------+--------------------------------------------+
+| **Event**                               | **Meaning**                                |
++-----------------------------------------+--------------------------------------------+
+| ``dawn_astronomical``                   | Execute at the moment after which the sky  |
+|                                         | is no longer completely dark. This is when |
+|                                         | the sun is 18 degrees below the horizon.   |
++-----------------------------------------+--------------------------------------------+
+| ``dawn_nautical``                       | Execute when there is enough sunlight for  |
+|                                         | the horizon and some objects to be         |
+|                                         | distinguishable; formally, when the sun is |
+|                                         | 12 degrees below the horizon.              |
++-----------------------------------------+--------------------------------------------+
+| ``dawn_civil``                          | Execute when there is enough light for     |
+|                                         | objects to be distinguishable so that      |
+|                                         | outdoor activities can commence;           |
+|                                         | formally, when the Sun is 6 degrees below  |
+|                                         | the horizon.                               |
++-----------------------------------------+--------------------------------------------+
+| ``sunrise``                             | Execute when the upper edge of the sun     |
+|                                         | appears over the eastern horizon in the    |
+|                                         | morning.                                   |
++-----------------------------------------+--------------------------------------------+
+| ``solar_noon``                          | Execute when the sun is highest above the  |
+|                                         | horizon on that day.                       |
++-----------------------------------------+--------------------------------------------+
+| ``sunset``                              | Execute when the trailing edge of the sun  |
+|                                         | disappears over the western horizon in the |
+|                                         | evening.                                   |
++-----------------------------------------+--------------------------------------------+
+| ``dusk_civil``                          | Execute at the end of civil twilight, when |
+|                                         | objects are still distinguishable and some |
+|                                         | stars and planets are visible. Formally,   |
+|                                         | when the sun is 6 degrees below the        |
+|                                         | horizon.                                   |
++-----------------------------------------+--------------------------------------------+
+| ``dusk_nautical``                       | Execute when the sun is 12 degrees below   |
+|                                         | the horizon. Objects are no longer         |
+|                                         | distinguishable, and the horizon is no     |
+|                                         | longer visible to the naked eye.           |
++-----------------------------------------+--------------------------------------------+
+| ``dusk_astronomical``                   | Execute at the moment after which the sky  |
+|                                         | becomes completely dark; formally, when    |
+|                                         | the sun is 18 degrees below the horizon.   |
++-----------------------------------------+--------------------------------------------+
+
+All solar events are calculated using UTC, and are therefore
+unaffected by your timezone setting.
+
+In polar regions, the sun may not rise or set every day. The scheduler
+is able to handle these cases, i.e. a ``sunrise`` event won't run on a day
+when the sun doesn't rise. The one exception is ``solar_noon``, which is
+formally defined as the moment the sun transits the celestial meridian,
+and will occur every day even if the sun is below the horizon.
+
+Twilight is defined as the period between dawn and sunrise, and between
+sunset and dusk. You can schedule an event according to "twilight"
+depending on your definition of twilight (civil, nautical or astronomical),
+and whether you want the event to take place at the beginning or end
+of twilight, using the appropriate event from the list above.
+
+See :class:`celery.schedules.solar` for more documentation.
+
 .. _beat-starting:
 .. _beat-starting:
 
 
 Starting the Scheduler
 Starting the Scheduler

+ 1 - 1
docs/userguide/tasks.rst

@@ -1164,7 +1164,7 @@ base class for new task types.
         abstract = True
         abstract = True
 
 
         def after_return(self, *args, **kwargs):
         def after_return(self, *args, **kwargs):
-            print('Task returned: {0!r}'.format(self.request)
+            print('Task returned: {0!r}'.format(self.request))
 
 
 
 
     @app.task(base=DebugTask)
     @app.task(base=DebugTask)

+ 5 - 2
extra/generic-init.d/celerybeat

@@ -202,14 +202,17 @@ create_paths () {
     create_default_dir "$CELERYBEAT_PID_DIR"
     create_default_dir "$CELERYBEAT_PID_DIR"
 }
 }
 
 
+is_running() {
+    pid=$1
+    ps $pid > /dev/null 2>&1
+}
 
 
 wait_pid () {
 wait_pid () {
     pid=$1
     pid=$1
     forever=1
     forever=1
     i=0
     i=0
     while [ $forever -gt 0 ]; do
     while [ $forever -gt 0 ]; do
-        kill -0 $pid 1>/dev/null 2>&1
-        if [ $? -eq 1 ]; then
+        if ! is_running $pid; then
             echo "OK"
             echo "OK"
             forever=0
             forever=0
         else
         else