Przeglądaj źródła

Merge branch '3.0'

Conflicts:
	celery/bin/celery.py
	celery/contrib/migrate.py
	celery/platforms.py
Ask Solem 12 lat temu
rodzic
commit
beb99819a8

+ 2 - 1
celery/canvas.py

@@ -281,7 +281,8 @@ class Signature(dict):
 class chain(Signature):
 
     def __init__(self, *tasks, **options):
-        tasks = tasks[0] if len(tasks) == 1 and is_list(tasks[0]) else tasks
+        tasks = (regen(tasks[0]) if len(tasks) == 1 and is_list(tasks[0])
+                else tasks)
         Signature.__init__(
             self, 'celery.chain', (), {'tasks': tasks}, **options
         )

+ 1 - 0
celery/contrib/migrate.py

@@ -242,6 +242,7 @@ def start_filter(app, conn, filter, limit=None, timeout=1.0,
                  consume_from=None, state=None, **kwargs):
     state = state or State()
     queues = prepare_queues(queues)
+    consume_from = [_maybe_queue(app, q) for q in consume_from or queues.keys()]
     if isinstance(tasks, string_t):
         tasks = set(tasks.split(','))
     if tasks is None:

+ 13 - 4
celery/platforms.py

@@ -258,17 +258,26 @@ def _create_pidlock(pidfile):
 
 
 def fileno(f):
-    """Get object fileno, or :const:`None` if not defined."""
-    if isinstance(f, int):
+    if isinstance(f, (int, long)):
         return f
+    return f.fileno()
+
+
+def maybe_fileno(f):
+    """Get object fileno, or :const:`None` if not defined."""
     try:
+<<<<<<< HEAD
         return f.fileno()
     except FILENO_ERRORS:
+=======
+        return fileno(f)
+    except AttributeError:
+>>>>>>> 3.0
         pass
 
 
 def close_open_fds(keep=None):
-    keep = [fileno(f) for f in keep if fileno(f)] if keep else []
+    keep = [maybe_fileno(f) for f in keep if maybe_fileno(f)] if keep else []
     for fd in reversed(range(get_fdmax(default=2048))):
         if fd not in keep:
             with ignore_errno(errno.EBADF):
@@ -304,7 +313,7 @@ class DaemonContext(object):
 
             close_open_fds(self.stdfds)
             for fd in self.stdfds:
-                self.redirect_to_null(fileno(fd))
+                self.redirect_to_null(maybe_fileno(fd))
 
             self._is_open = True
     __enter__ = open

+ 5 - 0
celery/tests/tasks/test_canvas.py

@@ -133,6 +133,11 @@ class test_chain(Case):
         self.assertEqual(res.parent.parent.get(), 8)
         self.assertIsNone(res.parent.parent.parent)
 
+    def test_accepts_generator_argument(self):
+        x = chain(add.s(i) for i in range(10))
+        self.assertTrue(x.tasks[0].type, add)
+        self.assertTrue(x.type)
+
 
 class test_group(Case):
 

+ 1 - 0
celery/worker/consumer.py

@@ -271,6 +271,7 @@ class Consumer(object):
         # They can't be acked anyway, as a delivery tag is specific
         # to the current channel.
         self.timer.clear()
+        state.reserved_requests.clear()
 
     def connect(self):
         """Establish the broker connection.