Ask Solem 15 vuotta sitten
vanhempi
commit
5ab8f6f9fd

+ 0 - 2
celery/bin/celeryd.py

@@ -68,14 +68,12 @@ import socket
 import logging
 import optparse
 import warnings
-import traceback
 import multiprocessing
 
 import celery
 from celery import conf
 from celery import signals
 from celery import platform
-from celery.log import emergency_error
 from celery.task import discard_all
 from celery.utils import info
 from celery.utils import get_full_cls_name

+ 1 - 3
celery/bin/celeryd_multi.py

@@ -1,9 +1,7 @@
 import sys
-import shlex
 import socket
 
 from celery.utils.compat import defaultdict
-from carrot.utils import rpartition
 
 EXAMPLES = """
 Some examples:
@@ -203,7 +201,7 @@ class MultiTool(object):
 
         try:
             return self.commands[argv[0]](argv[1:], cmd)
-        except KeyError, exc:
+        except KeyError:
             say("Invalid command: %s" % argv[0])
             self.usage()
             sys.exit(1)

+ 6 - 6
celery/bin/celeryev.py

@@ -1,16 +1,16 @@
 import sys
 import time
 import curses
-import atexit
 import socket
 import optparse
 import threading
 
-from pprint import pformat
 from datetime import datetime
 from textwrap import wrap
 from itertools import count
 
+from carrot.utils import rpartition
+
 import celery
 from celery import states
 from celery.task import control
@@ -117,7 +117,7 @@ class CursesMonitor(object):
                           "L": self.selection_rate_limit}
         self.keymap = dict(default_keymap, **self.keymap)
 
-    def format_row(self, uuid, worker, task, time, state):
+    def format_row(self, uuid, worker, task, timestamp, state):
         my, mx = self.win.getmaxyx()
         mx = mx - 3
         uuid_max = 36
@@ -127,8 +127,8 @@ class CursesMonitor(object):
         worker = abbr(worker, 16).ljust(16)
         task = abbrtask(task, 16).ljust(16)
         state = abbr(state, 8).ljust(8)
-        time = time.ljust(8)
-        row = "%s %s %s %s %s " % (uuid, worker, task, time, state)
+        timestamp = timestamp.ljust(8)
+        row = "%s %s %s %s %s " % (uuid, worker, task, timestamp, state)
         if self.screen_width is None:
             self.screen_width = len(row[:mx])
         return row[:mx]
@@ -473,7 +473,7 @@ def eventtop():
                 conn.connection.drain_events()
             except socket.timeout:
                 pass
-    except Exception, exc:
+    except Exception:
         refresher.shutdown = True
         refresher.join()
         display.resetscreen()

+ 2 - 3
celery/concurrency/processes/pool.py

@@ -189,7 +189,6 @@ class AckHandler(PoolThread):
 
     def run(self):
         debug('ack handler starting')
-        ackqueue = self.ackqueue
         get = self.get
         cache = self.cache
 
@@ -241,7 +240,6 @@ class AckHandler(PoolThread):
 class TimeoutHandler(PoolThread):
 
     def __init__(self, processes, sentinel_event, cache, t_soft, t_hard):
-        self.pool = pool
         self.sentinel_event = sentinel_event
         self.cache = cache
         self.t_soft = t_soft
@@ -946,7 +944,8 @@ class IMapUnorderedIterator(IMapIterator):
 
 class ThreadPool(Pool):
 
-    from multiprocessing.dummy import Process
+    from multiprocessing.dummy import Process as DummyProcess
+    Process = DummyProcess
 
     def __init__(self, processes=None, initializer=None, initargs=()):
         Pool.__init__(self, processes, initializer, initargs)

+ 0 - 2
celery/db/session.py

@@ -1,5 +1,3 @@
-import os
-
 from sqlalchemy import create_engine
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy.ext.declarative import declarative_base

+ 1 - 1
celery/routes.py

@@ -16,7 +16,7 @@ def expand_destination(route, routing_table):
     if isinstance(route, basestring):
         try:
             dest = dict(routing_table[route])
-        except KeyError, exc:
+        except KeyError:
             raise RouteNotFound(
                 "Route %s does not exist in the routing table "
                 "(CELERY_QUEUES)" % route)

+ 0 - 1
celery/serialization.py

@@ -1,6 +1,5 @@
 import sys
 import types
-import operator
 try:
     import cPickle as pickle
 except ImportError:

+ 0 - 1
celery/tests/test_worker_job.py

@@ -13,7 +13,6 @@ from celery.task.base import Task
 from celery.utils import gen_unique_id
 from celery.result import AsyncResult
 from celery.worker.job import WorkerTaskTrace, TaskRequest
-from celery.concurrency.processes import TaskPool
 from celery.backends import default_backend
 from celery.exceptions import RetryTaskError, NotRegistered
 from celery.decorators import task as task_dec

+ 5 - 2
celery/worker/__init__.py

@@ -3,7 +3,6 @@
 The Multiprocessing Worker Server
 
 """
-import time
 import socket
 import logging
 import traceback
@@ -26,6 +25,11 @@ TERMINATE = 0x3
 
 
 def process_initializer():
+    """Initializes the process so it can be used to process tasks.
+
+    Used for multiprocessing environments.
+
+    """
     # There seems to a bug in multiprocessing (backport?)
     # when detached, where the worker gets EOFErrors from time to time
     # and the logger is left from the parent process causing a crash.
@@ -53,7 +57,6 @@ class WorkController(object):
     :param embed_clockservice: see :attr:`run_clockservice`.
     :param send_events: see :attr:`send_events`.
 
-
     .. attribute:: concurrency
 
         The number of simultaneous processes doing work (default:

+ 0 - 2
celery/worker/listener.py

@@ -239,8 +239,6 @@ class CarrotListener(object):
 
     def consume_messages(self):
         """Consume messages forever (or until an exception is raised)."""
-        task_consumer = self.task_consumer
-
         self.logger.debug("CarrotListener: Starting message consumer...")
         wait_for_message = self._detect_wait_method()(limit=None).next
         self.logger.debug("CarrotListener: Ready to accept tasks!")