Ask Solem преди 15 години
родител
ревизия
bb97451bf6
променени са 7 файла, в които са добавени 69 реда и са изтрити 51 реда
  1. 0 1
      celery/bin/celeryd.py
  2. 6 7
      celery/bin/celeryev.py
  3. 9 5
      celery/concurrency/processes/pool.py
  4. 0 3
      celery/concurrency/threads.py
  5. 12 9
      celery/task/schedules.py
  6. 41 26
      celery/tests/test_task.py
  7. 1 0
      celery/worker/__init__.py

+ 0 - 1
celery/bin/celeryd.py

@@ -245,7 +245,6 @@ class Worker(object):
                                   logfile=self.logfile)
         log.redirect_stdouts_to_logger(logger, loglevel=logging.WARNING)
 
-
     def purge_messages(self):
         discarded_count = discard_all()
         what = discarded_count > 1 and "messages" or "message"

+ 6 - 7
celery/bin/celeryev.py

@@ -20,9 +20,11 @@ from celery.messaging import establish_connection
 from celery.datastructures import LocalCache
 
 TASK_NAMES = LocalCache(0xFFF)
+
 HUMAN_TYPES = {"worker-offline": "shutdown",
                "worker-online": "started",
                "worker-heartbeat": "heartbeat"}
+
 OPTION_LIST = (
     optparse.make_option('-d', '--DUMP',
         action="store_true", dest="dump",
@@ -30,7 +32,6 @@ OPTION_LIST = (
 )
 
 
-
 def humanize_type(type):
     try:
         return HUMAN_TYPES[type.lower()]
@@ -177,7 +178,8 @@ class CursesMonitor(object):
             self.win.addstr(y(), 3, title, curses.A_BOLD | curses.A_UNDERLINE)
             blank_line()
         callback(my, mx, y())
-        self.win.addstr(my - 1, 0, "Press any key to continue...", curses.A_BOLD)
+        self.win.addstr(my - 1, 0, "Press any key to continue...",
+                        curses.A_BOLD)
         self.win.refresh()
         while 1:
             try:
@@ -333,7 +335,8 @@ class CursesMonitor(object):
                     attr = curses.A_NORMAL
                     if task.uuid == self.selected_task:
                         attr = curses.A_STANDOUT
-                    timestamp = datetime.fromtimestamp(task.timestamp or time.time())
+                    timestamp = datetime.fromtimestamp(
+                                    task.timestamp or time.time())
                     timef = timestamp.strftime("%H:%M:%S")
                     line = self.format_row(uuid, task.name,
                                            task.worker.hostname,
@@ -510,9 +513,5 @@ def main():
     options = parse_options(sys.argv[1:])
     return run_celeryev(**vars(options))
 
-
-
-
-
 if __name__ == "__main__":
     main()

+ 9 - 5
celery/concurrency/processes/pool.py

@@ -50,9 +50,11 @@ def mapstar(args):
 # Code run by worker processes
 #
 
+
 def soft_timeout_sighandler(signum, frame):
     raise SoftTimeLimitExceeded()
 
+
 def worker(inqueue, outqueue, ackqueue, initializer=None, initargs=(),
         maxtasks=None):
     assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
@@ -482,14 +484,15 @@ class Pool(object):
                   self._worker_handler, self._task_handler,
                   self._result_handler, self._cache,
                   self._timeout_handler),
-            exitpriority=15
+            exitpriority=15,
             )
 
     def _create_worker_process(self):
         w = self.Process(
             target=worker,
             args=(self._inqueue, self._outqueue, self._ackqueue,
-                    self._initializer, self._initargs, self._maxtasksperchild)
+                    self._initializer, self._initargs,
+                    self._maxtasksperchild),
             )
         self._pool.append(w)
         w.name = w.name.replace('Process', 'PoolWorker')
@@ -551,7 +554,8 @@ class Pool(object):
 
     def imap(self, func, iterable, chunksize=1):
         '''
-        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
+        Equivalent of `itertools.imap()` -- can be MUCH slower
+        than `Pool.map()`
         '''
         assert self._state == RUN
         if chunksize == 1:
@@ -643,8 +647,8 @@ class Pool(object):
 
     def __reduce__(self):
         raise NotImplementedError(
-              'pool objects cannot be passed between processes or pickled'
-              )
+              'pool objects cannot be passed between '
+              'processes or pickled')
 
     def close(self):
         debug('closing pool')

+ 0 - 3
celery/concurrency/threads.py

@@ -66,6 +66,3 @@ class TaskPool(object):
             [errback(ret_value) for errback in errbacks]
         else:
             [callback(ret_value) for callback in callbacks]
-
-
-

+ 12 - 9
celery/task/schedules.py

@@ -1,5 +1,6 @@
 from datetime import datetime
-from pyparsing import Word, Literal, ZeroOrMore, Optional, Group, StringEnd, alphas
+from pyparsing import (Word, Literal, ZeroOrMore, Optional,
+                       Group, StringEnd, alphas)
 
 from celery.utils import is_iterable
 from celery.utils.timeutils import timedelta_seconds, weekday, remaining
@@ -90,7 +91,7 @@ class crontab_parser(object):
     @staticmethod
     def _expand_range(toks):
         if len(toks) > 1:
-            return range(toks[0], int(toks[2])+1)
+            return range(toks[0], int(toks[2]) + 1)
         else:
             return toks[0]
 
@@ -188,15 +189,17 @@ class crontab(schedule):
         elif is_iterable(cronspec):
             result = set(cronspec)
         else:
-            raise TypeError("Argument cronspec needs to be of any of the " + \
-                    "following types: int, basestring, or an iterable type. " + \
+            raise TypeError(
+                    "Argument cronspec needs to be of any of the "
+                    "following types: int, basestring, or an iterable type. "
                     "'%s' was given." % type(cronspec))
 
         # assure the result does not exceed the max
         for number in result:
             if number >= max_:
-                raise ValueError("Invalid crontab pattern. Valid " + \
-                "range is 0-%d. '%d' was found." % (max_, number))
+                raise ValueError(
+                        "Invalid crontab pattern. Valid "
+                        "range is 0-%d. '%d' was found." % (max_, number))
 
         return result
 
@@ -217,7 +220,7 @@ class crontab(schedule):
         last = now - last_run_at
         due, when = False, 1
         if last.days > 0 or last.seconds > 60:
-            due = now.isoweekday() % 7 in self.day_of_week and \
-                  now.hour in self.hour and \
-                  now.minute in self.minute
+            due = (now.isoweekday() % 7 in self.day_of_week and
+                   now.hour in self.hour and
+                   now.minute in self.minute)
         return due, when

+ 41 - 26
celery/tests/test_task.py

@@ -467,7 +467,8 @@ class TestPeriodicTask(unittest.TestCase):
     def test_is_due_not_due(self):
         due, remaining = MyPeriodic().is_due(datetime.now())
         self.assertFalse(due)
-        # TODO: This assertion may fail if executed in the first minute of an hour
+        # TODO This assertion may fail if executed in the
+        # first minute of an hour
         self.assertGreater(remaining, 60)
 
     def test_is_due(self):
@@ -524,25 +525,35 @@ class test_crontab_parser(unittest.TestCase):
         self.assertEquals(crontab_parser(7).parse('*'), set(range(7)))
 
     def test_parse_range(self):
-        self.assertEquals(crontab_parser(60).parse('1-10'), set(range(1,10+1)))
-        self.assertEquals(crontab_parser(24).parse('0-20'), set(range(0,20+1)))
-        self.assertEquals(crontab_parser().parse('2-10'), set(range(2,10+1)))
+        self.assertEquals(crontab_parser(60).parse('1-10'),
+                          set(range(1, 10 + 1)))
+        self.assertEquals(crontab_parser(24).parse('0-20'),
+                          set(range(0, 20 + 1)))
+        self.assertEquals(crontab_parser().parse('2-10'),
+                          set(range(2, 10 + 1)))
 
     def test_parse_groups(self):
-        self.assertEquals(crontab_parser().parse('1,2,3,4'), set([1,2,3,4]))
-        self.assertEquals(crontab_parser().parse('0,15,30,45'), set([0,15,30,45]))
+        self.assertEquals(crontab_parser().parse('1,2,3,4'),
+                          set([1, 2, 3, 4]))
+        self.assertEquals(crontab_parser().parse('0,15,30,45'),
+                          set([0, 15, 30, 45]))
 
     def test_parse_steps(self):
-        self.assertEquals(crontab_parser(8).parse('*/2'), set([0,2,4,6]))
-        self.assertEquals(crontab_parser().parse('*/2'), set([ i*2 for i in xrange(30) ]))
-        self.assertEquals(crontab_parser().parse('*/3'), set([ i*3 for i in xrange(20) ]))
+        self.assertEquals(crontab_parser(8).parse('*/2'),
+                          set([0, 2, 4, 6]))
+        self.assertEquals(crontab_parser().parse('*/2'),
+                          set(i * 2 for i in xrange(30)))
+        self.assertEquals(crontab_parser().parse('*/3'),
+                          set(i * 3 for i in xrange(20)))
 
     def test_parse_composite(self):
-        self.assertEquals(crontab_parser(8).parse('*/2'), set([0,2,4,6]))
+        self.assertEquals(crontab_parser(8).parse('*/2'), set([0, 2, 4, 6]))
         self.assertEquals(crontab_parser().parse('2-9/5'), set([5]))
-        self.assertEquals(crontab_parser().parse('2-10/5'), set([5,10]))
-        self.assertEquals(crontab_parser().parse('2-11/5,3'), set([3,5,10]))
-        self.assertEquals(crontab_parser().parse('2-4/3,*/5,0-21/4'), set([0,3,4,5,8,10,12,15,16,20,25,30,35,40,45,50,55]))
+        self.assertEquals(crontab_parser().parse('2-10/5'), set([5, 10]))
+        self.assertEquals(crontab_parser().parse('2-11/5,3'), set([3, 5, 10]))
+        self.assertEquals(crontab_parser().parse('2-4/3,*/5,0-21/4'),
+                set([0, 3, 4, 5, 8, 10, 12, 15, 16,
+                    20, 25, 30, 35, 40, 45, 50, 55]))
 
     def test_parse_errors_on_empty_string(self):
         self.assertRaises(ParseException, crontab_parser(60).parse, '')
@@ -576,10 +587,10 @@ class test_crontab_is_due(unittest.TestCase):
         self.assertEquals(c.minute, set([30]))
         c = crontab(minute='30')
         self.assertEquals(c.minute, set([30]))
-        c = crontab(minute=(30,40,50))
-        self.assertEquals(c.minute, set([30,40,50]))
-        c = crontab(minute=set([30,40,50]))
-        self.assertEquals(c.minute, set([30,40,50]))
+        c = crontab(minute=(30, 40, 50))
+        self.assertEquals(c.minute, set([30, 40, 50]))
+        c = crontab(minute=set([30, 40, 50]))
+        self.assertEquals(c.minute, set([30, 40, 50]))
 
     def test_crontab_spec_invalid_minute(self):
         self.assertRaises(ValueError, crontab, minute=60)
@@ -590,8 +601,8 @@ class test_crontab_is_due(unittest.TestCase):
         self.assertEquals(c.hour, set([6]))
         c = crontab(hour='5')
         self.assertEquals(c.hour, set([5]))
-        c = crontab(hour=(4,8,12))
-        self.assertEquals(c.hour, set([4,8,12]))
+        c = crontab(hour=(4, 8, 12))
+        self.assertEquals(c.hour, set([4, 8, 12]))
 
     def test_crontab_spec_invalid_hour(self):
         self.assertRaises(ValueError, crontab, hour=24)
@@ -605,11 +616,11 @@ class test_crontab_is_due(unittest.TestCase):
         c = crontab(day_of_week='fri')
         self.assertEquals(c.day_of_week, set([5]))
         c = crontab(day_of_week='tuesday,sunday,fri')
-        self.assertEquals(c.day_of_week, set([0,2,5]))
+        self.assertEquals(c.day_of_week, set([0, 2, 5]))
         c = crontab(day_of_week='mon-fri')
-        self.assertEquals(c.day_of_week, set([1,2,3,4,5]))
+        self.assertEquals(c.day_of_week, set([1, 2, 3, 4, 5]))
         c = crontab(day_of_week='*/2')
-        self.assertEquals(c.day_of_week, set([0,2,4,6]))
+        self.assertEquals(c.day_of_week, set([0, 2, 4, 6]))
 
     def test_crontab_spec_invalid_dow(self):
         self.assertRaises(ValueError, crontab, day_of_week='fooday-barday')
@@ -667,25 +678,29 @@ class test_crontab_is_due(unittest.TestCase):
 
     @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 15))
     def test_first_quarter_execution_is_due(self):
-        due, remaining = QuarterlyPeriodic().is_due(datetime(2010, 5, 10, 6, 30))
+        due, remaining = QuarterlyPeriodic().is_due(
+                            datetime(2010, 5, 10, 6, 30))
         self.assertTrue(due)
         self.assertEquals(remaining, 1)
 
     @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 30))
     def test_second_quarter_execution_is_due(self):
-        due, remaining = QuarterlyPeriodic().is_due(datetime(2010, 5, 10, 6, 30))
+        due, remaining = QuarterlyPeriodic().is_due(
+                            datetime(2010, 5, 10, 6, 30))
         self.assertTrue(due)
         self.assertEquals(remaining, 1)
 
     @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 14))
     def test_first_quarter_execution_is_not_due(self):
-        due, remaining = QuarterlyPeriodic().is_due(datetime(2010, 5, 10, 6, 30))
+        due, remaining = QuarterlyPeriodic().is_due(
+                            datetime(2010, 5, 10, 6, 30))
         self.assertFalse(due)
         self.assertEquals(remaining, 1)
 
     @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 29))
     def test_second_quarter_execution_is_not_due(self):
-        due, remaining = QuarterlyPeriodic().is_due(datetime(2010, 5, 10, 6, 30))
+        due, remaining = QuarterlyPeriodic().is_due(
+                            datetime(2010, 5, 10, 6, 30))
         self.assertFalse(due)
         self.assertEquals(remaining, 1)
 

+ 1 - 0
celery/worker/__init__.py

@@ -192,6 +192,7 @@ class WorkController(object):
                 component.start()
         finally:
             self.stop()
+
     def process_task(self, wrapper):
         """Process task by sending it to the pool of workers."""
         try: