Ask Solem 10 years ago
parent
commit
0731623271

+ 2 - 2
celery/app/builtins.py

@@ -84,8 +84,8 @@ def add_unlock_chord_task(app):
             ready = deps.ready()
         except Exception as exc:
             raise self.retry(
-                    exc=exc, countdown=interval, max_retries=max_retries,
-                )
+                exc=exc, countdown=interval, max_retries=max_retries,
+            )
         else:
             if not ready:
                 raise self.retry(countdown=interval, max_retries=max_retries)

+ 1 - 1
celery/backends/mongodb.py

@@ -129,7 +129,7 @@ class MongoBackend(BaseBackend):
             if pymongo.version_tuple >= (3, ):
                 return {'maxPoolSize': self.max_pool_size}
             else:  # pragma: no cover
-                return {'max_pool_size': max_pool_size,
+                return {'max_pool_size': self.max_pool_size,
                         'auto_start_request': False}
 
     def _get_connection(self):

+ 1 - 1
celery/bin/base.py

@@ -509,7 +509,7 @@ class Command(object):
         default = opt.default or []
 
         if opt.dest not in acc:
-           acc[opt.dest] = default
+            acc[opt.dest] = default
 
         acc[opt.dest].append(value)
 

+ 5 - 1
celery/contrib/batches.py

@@ -85,6 +85,8 @@ from __future__ import absolute_import
 
 from itertools import count
 
+from kombu.five import buffer_t
+
 from celery.task import Task
 from celery.five import Empty, Queue
 from celery.utils.log import get_logger
@@ -195,6 +197,7 @@ class Batches(Task):
         timer = consumer.timer
         put_buffer = self._buffer.put
         flush_buffer = self._do_flush
+        body_can_be_buffer = consumer.pool.body_can_be_buffer
 
         def task_message_handler(message, body, ack, reject, callbacks, **kw):
             if body is None:
@@ -209,8 +212,9 @@ class Batches(Task):
             request = Req(
                 message,
                 on_ack=ack, on_reject=reject, app=app, hostname=hostname,
-                eventer=eventer, task=task, connection_errors=connection_errors,
+                eventer=eventer, task=task,
                 body=body, headers=headers, decoded=decoded, utc=utc,
+                connection_errors=connection_errors,
             )
             put_buffer(request)
 

+ 3 - 1
celery/result.py

@@ -577,7 +577,9 @@ class ResultSet(ResultBase):
         """
         return (self.join_native if self.supports_native_join else self.join)(
             timeout=timeout, propagate=propagate,
-            interval=interval, callback=callback, no_ack=no_ack, on_message=on_message)
+            interval=interval, callback=callback, no_ack=no_ack,
+            on_message=on_message,
+        )
 
     def join(self, timeout=None, propagate=True, interval=0.5,
              callback=None, no_ack=True, on_message=None):

+ 44 - 27
celery/schedules.py

@@ -47,6 +47,18 @@ CRON_REPR = """\
 {0._orig_day_of_month} {0._orig_month_of_year} (m/h/d/dM/MY)>\
 """
 
+SOLAR_INVALID_LATITUDE = """\
+Argument latitude {lat} is invalid, must be between -90 and 90.\
+"""
+
+SOLAR_INVALID_LONGITUDE = """\
+Argument longitude {lon} is invalid, must be between -180 and 180.\
+"""
+
+SOLAR_INVALID_EVENT = """\
+Argument event "{event}" is invalid, must be one of {all_events}.\
+"""
+
 
 def cronfield(s):
     return '*' if s is None else s
@@ -592,17 +604,6 @@ def maybe_schedule(s, relative=False, app=None):
             s.app = app
     return s
 
-SOLAR_INVALID_LATITUDE = """\
-Argument latitude {lat} is invalid, must be between -90 and 90.\
-"""
-
-SOLAR_INVALID_LONGITUDE = """\
-Argument longitude {lon} is invalid, must be between -180 and 180.\
-"""
-
-SOLAR_INVALID_EVENT = """\
-Argument event \"{event}\" is invalid, must be one of {all_events}.\
-"""
 
 class solar(schedule):
     """A solar event can be used as the `run_every` value of a
@@ -619,8 +620,8 @@ class solar(schedule):
     :param app: Celery app instance.
     """
 
-
-    _all_events = ['dawn_astronomical',
+    _all_events = [
+        'dawn_astronomical',
         'dawn_nautical',
         'dawn_civil',
         'sunrise',
@@ -628,8 +629,10 @@ class solar(schedule):
         'sunset',
         'dusk_civil',
         'dusk_nautical',
-        'dusk_astronomical']
-    _horizons = {'dawn_astronomical': '-18',
+        'dusk_astronomical',
+    ]
+    _horizons = {
+        'dawn_astronomical': '-18',
         'dawn_nautical': '-12',
         'dawn_civil': '-6',
         'sunrise': '-0:34',
@@ -637,8 +640,10 @@ class solar(schedule):
         'sunset': '-0:34',
         'dusk_civil': '-6',
         'dusk_nautical': '-12',
-        'dusk_astronomical': '18'}
-    _methods = {'dawn_astronomical': 'next_rising',
+        'dusk_astronomical': '18',
+    }
+    _methods = {
+        'dawn_astronomical': 'next_rising',
         'dawn_nautical': 'next_rising',
         'dawn_civil': 'next_rising',
         'sunrise': 'next_rising',
@@ -646,8 +651,10 @@ class solar(schedule):
         'sunset': 'next_setting',
         'dusk_civil': 'next_setting',
         'dusk_nautical': 'next_setting',
-        'dusk_astronomical': 'next_setting'}
-    _use_center_l = {'dawn_astronomical': True,
+        'dusk_astronomical': 'next_setting',
+    }
+    _use_center_l = {
+        'dawn_astronomical': True,
         'dawn_nautical': True,
         'dawn_civil': True,
         'sunrise': False,
@@ -655,7 +662,8 @@ class solar(schedule):
         'sunset': False,
         'dusk_civil': True,
         'dusk_nautical': True,
-        'dusk_astronomical': True}
+        'dusk_astronomical': True,
+    }
 
     def __init__(self, event, lat, lon, nowfun=None, app=None):
         self.ephem = __import__('ephem')
@@ -666,7 +674,9 @@ class solar(schedule):
         self._app = app
 
         if event not in self._all_events:
-            raise ValueError(SOLAR_INVALID_EVENT.format(event=event, all_events=', '.join(self._all_events)))
+            raise ValueError(SOLAR_INVALID_EVENT.format(
+                event=event, all_events=', '.join(self._all_events),
+            ))
         if lat < -90 or lat > 90:
             raise ValueError(SOLAR_INVALID_LATITUDE.format(lat=lat))
         if lon < -180 or lon > 180:
@@ -687,12 +697,13 @@ class solar(schedule):
         return (self.nowfun or self.app.now)()
 
     def __reduce__(self):
-        return (self.__class__, (self.event,
-                                 self.lat,
-                                 self.lon), None)
+        return (self.__class__, (
+            self.event, self.lat, self.lon), None)
 
     def __repr__(self):
-        return "<solar: " + self.event + " at latitude " + str(self.lat) + ", longitude " + str(self.lon) + ">"
+        return '<solar: {0} at latitude {1}, longitude: {2}>'.format(
+            self.event, self.lat, self.lon,
+        )
 
     def remaining_estimate(self, last_run_at):
         """Returns when the periodic task should run next as a timedelta,
@@ -702,11 +713,17 @@ class solar(schedule):
         last_run_at_utc = localize(last_run_at, timezone.utc)
         self.cal.date = last_run_at_utc
         try:
-            next_utc = getattr(self.cal, self.method)(self.ephem.Sun(), start=last_run_at_utc, use_center=self.use_center)
+            next_utc = getattr(self.cal, self.method)(
+                self.ephem.Sun(),
+                start=last_run_at_utc, use_center=self.use_center,
+            )
         except self.ephem.CircumpolarError:
             """Sun will not rise/set today. Check again tomorrow
             (specifically, after the next anti-transit)."""
-            next_utc = self.cal.next_antitransit(self.ephem.Sun()) + timedelta(minutes=1)
+            next_utc = (
+                self.cal.next_antitransit(self.ephem.Sun()) +
+                timedelta(minutes=1)
+            )
         next = self.maybe_make_aware(next_utc.datetime())
         now = self.maybe_make_aware(self.now())
         delta = next - now

+ 15 - 7
celery/tests/backends/test_amqp.py

@@ -306,22 +306,30 @@ class test_AMQPBackend(AppCase):
             b.store_result(tid, 'final result %i' % i, states.SUCCESS)
             tids.append(tid)
 
-
         expected_messages = {}
         for i, _tid in enumerate(tids):
             expected_messages[_tid] = []
-            expected_messages[_tid].append( (states.PENDING, '') )
-            expected_messages[_tid].append( (states.STARTED, 'comment_%i_1' % i) )
-            expected_messages[_tid].append( (states.STARTED, 'comment_%i_2' % i) )
-            expected_messages[_tid].append( (states.SUCCESS, 'final result %i' % i) )
+            expected_messages[_tid].append((states.PENDING, ''))
+            expected_messages[_tid].append(
+                (states.STARTED, 'comment_%i_1' % i),
+            )
+            expected_messages[_tid].append(
+                (states.STARTED, 'comment_%i_2' % i),
+            )
+            expected_messages[_tid].append(
+                (states.SUCCESS, 'final result %i' % i),
+            )
 
         on_message_results = {}
+
         def on_message(body):
             if not body['task_id'] in on_message_results:
                 on_message_results[body['task_id']] = []
-            on_message_results[body['task_id']].append( (body['status'], body['result']) )
+            on_message_results[body['task_id']].append(
+                (body['status'], body['result']),
+            )
 
-        res = list(b.get_many(tids, timeout=1, on_message=on_message))
+        list(b.get_many(tids, timeout=1, on_message=on_message))
         self.assertEqual(sorted(on_message_results), sorted(expected_messages))
 
     def test_get_many_raises_outer_block(self):

+ 0 - 1
celery/tests/concurrency/test_prefork.py

@@ -1,7 +1,6 @@
 from __future__ import absolute_import
 
 import errno
-import select
 import socket
 import time
 

+ 1 - 1
celery/tests/fixups/test_django.py

@@ -209,7 +209,7 @@ class test_DjangoWorkerFixup(FixupCase):
             conns[1].close.side_effect = KeyError('already closed')
             f.database_errors = (KeyError, )
 
-            f._db.connections = Mock() # ConnectionHandler
+            f._db.connections = Mock()  # ConnectionHandler
             f._db.connections.all.side_effect = lambda: conns
 
             f._close_database()