Browse Source

Don't import from threading module

Ask Solem 12 years ago
parent
commit
39bf13ab84

+ 2 - 2
celery/app/base.py

@@ -9,13 +9,13 @@
 from __future__ import absolute_import
 from __future__ import absolute_import
 from __future__ import with_statement
 from __future__ import with_statement
 
 
+import threading
 import warnings
 import warnings
 
 
 from collections import deque
 from collections import deque
 from contextlib import contextmanager
 from contextlib import contextmanager
 from copy import deepcopy
 from copy import deepcopy
 from functools import wraps
 from functools import wraps
-from threading import Lock
 
 
 from billiard.util import register_after_fork
 from billiard.util import register_after_fork
 from kombu.clocks import LamportClock
 from kombu.clocks import LamportClock
@@ -77,7 +77,7 @@ class Celery(object):
         self._pending_defaults = deque()
         self._pending_defaults = deque()
 
 
         self.finalized = False
         self.finalized = False
-        self._finalize_mutex = Lock()
+        self._finalize_mutex = threading.Lock()
         self._pending = deque()
         self._pending = deque()
         self._tasks = tasks
         self._tasks = tasks
         if not isinstance(self._tasks, TaskRegistry):
         if not isinstance(self._tasks, TaskRegistry):

+ 2 - 2
celery/events/state.py

@@ -20,8 +20,8 @@ from __future__ import absolute_import
 from __future__ import with_statement
 from __future__ import with_statement
 
 
 import heapq
 import heapq
+import threading
 
 
-from threading import Lock
 from time import time
 from time import time
 
 
 from kombu.utils import kwdict
 from kombu.utils import kwdict
@@ -216,7 +216,7 @@ class State(object):
         self.event_callback = callback
         self.event_callback = callback
         self.group_handlers = {'worker': self.worker_event,
         self.group_handlers = {'worker': self.worker_event,
                                'task': self.task_event}
                                'task': self.task_event}
-        self._mutex = Lock()
+        self._mutex = threading.Lock()
 
 
     def freeze_while(self, fun, *args, **kwargs):
     def freeze_while(self, fun, *args, **kwargs):
         clear_after = kwargs.pop('clear_after', False)
         clear_after = kwargs.pop('clear_after', False)

+ 0 - 2
celery/tests/worker/test_control.py

@@ -156,8 +156,6 @@ class test_ControlPanel(Case):
         self.panel.handle('report')
         self.panel.handle('report')
 
 
     def test_active(self):
     def test_active(self):
-        from celery.worker.job import TaskRequest
-
         r = TaskRequest(mytask.name, 'do re mi', (), {})
         r = TaskRequest(mytask.name, 'do re mi', (), {})
         state.active_requests.add(r)
         state.active_requests.add(r)
         try:
         try:

+ 3 - 3
celery/utils/functional.py

@@ -10,10 +10,10 @@ from __future__ import absolute_import
 from __future__ import with_statement
 from __future__ import with_statement
 
 
 import operator
 import operator
+import threading
 
 
 from functools import partial, wraps
 from functools import partial, wraps
 from itertools import islice
 from itertools import islice
-from threading import Lock, RLock
 
 
 from kombu.utils import cached_property
 from kombu.utils import cached_property
 from kombu.utils.functional import promise, maybe_promise
 from kombu.utils.functional import promise, maybe_promise
@@ -36,7 +36,7 @@ class LRUCache(UserDict):
 
 
     def __init__(self, limit=None):
     def __init__(self, limit=None):
         self.limit = limit
         self.limit = limit
-        self.mutex = RLock()
+        self.mutex = threading.RLock()
         self.data = OrderedDict()
         self.data = OrderedDict()
 
 
     def __getitem__(self, key):
     def __getitem__(self, key):
@@ -102,7 +102,7 @@ def maybe_list(l):
 def memoize(maxsize=None, Cache=LRUCache):
 def memoize(maxsize=None, Cache=LRUCache):
 
 
     def _memoize(fun):
     def _memoize(fun):
-        mutex = Lock()
+        mutex = threading.Lock()
         cache = Cache(limit=maxsize)
         cache = Cache(limit=maxsize)
 
 
         @wraps(fun)
         @wraps(fun)

+ 7 - 7
celery/utils/timer2.py

@@ -13,10 +13,10 @@ import atexit
 import heapq
 import heapq
 import os
 import os
 import sys
 import sys
+import threading
 
 
 from functools import wraps
 from functools import wraps
 from itertools import count
 from itertools import count
-from threading import Condition, Event, Lock, Thread
 from time import time, sleep, mktime
 from time import time, sleep, mktime
 
 
 from datetime import datetime, timedelta
 from datetime import datetime, timedelta
@@ -210,7 +210,7 @@ class Schedule(object):
         return map(heapq.heappop, [events] * len(events))
         return map(heapq.heappop, [events] * len(events))
 
 
 
 
-class Timer(Thread):
+class Timer(threading.Thread):
     Entry = Entry
     Entry = Entry
     Schedule = Schedule
     Schedule = Schedule
 
 
@@ -231,11 +231,11 @@ class Timer(Thread):
                                                   max_interval=max_interval)
                                                   max_interval=max_interval)
         self.on_tick = on_tick or self.on_tick
         self.on_tick = on_tick or self.on_tick
 
 
-        Thread.__init__(self)
-        self._is_shutdown = Event()
-        self._is_stopped = Event()
-        self.mutex = Lock()
-        self.not_empty = Condition(self.mutex)
+        threading.Thread.__init__(self)
+        self._is_shutdown = threading.Event()
+        self._is_stopped = threading.Event()
+        self.mutex = threading.Lock()
+        self.not_empty = threading.Condition(self.mutex)
         self.setDaemon(True)
         self.setDaemon(True)
         self.setName('Timer-%s' % (self._timer_count(), ))
         self.setName('Timer-%s' % (self._timer_count(), ))