Browse Source

Merge branch 'master' of github.com:celery/celery

Ask Solem 12 years ago
parent
commit
e9a81a6062
2 changed files with 13 additions and 4 deletions
  1. 10 1
      celery/events/state.py
  2. 3 3
      celery/worker/buckets.py

+ 10 - 1
celery/events/state.py

@@ -277,7 +277,7 @@ class State(object):
     def __init__(self, callback=None,
                  max_workers_in_memory=5000, max_tasks_in_memory=10000):
         self.max_workers_in_memory = max_workers_in_memory
-        self.max_tasks_in_memory = 10000
+        self.max_tasks_in_memory = max_tasks_in_memory
         self.workers = LRUCache(limit=self.max_workers_in_memory)
         self.tasks = LRUCache(limit=self.max_tasks_in_memory)
         self._taskheap = []
@@ -445,5 +445,14 @@ class State(object):
         return '<State: events={0.event_count} tasks={0.task_count}>' \
             .format(self)
 
+    def __getstate__(self):
+        d = dict(vars(self))
+        d.pop('_mutex')
+        return d
+
+    def __setstate__(self, state):
+        self.__dict__ = state
+        self._mutex = threading.Lock()
+
 
 state = State()

+ 3 - 3
celery/worker/buckets.py

@@ -6,7 +6,7 @@
     This module implements the rate limiting of tasks,
     by having a token bucket queue for each task type.
     When a task is allowed to be processed it's moved
-    over the the ``ready_queue``
+    over the ``ready_queue``
 
     The :mod:`celery.worker.mediator` is then responsible
     for moving tasks from the ``ready_queue`` to the worker pool.
@@ -67,7 +67,7 @@ class TaskBucket(object):
 
     def put(self, request):
         """Put a :class:`~celery.worker.job.Request` into
-        the appropiate bucket."""
+        the appropriate bucket."""
         if request.name not in self.buckets:
             self.add_bucket_for_type(request.name)
         self.buckets[request.name].put_nowait(request)
@@ -118,7 +118,7 @@ class TaskBucket(object):
             return min(remaining_times), None
 
     def get(self, block=True, timeout=None):
-        """Retrive the task from the first available bucket.
+        """Retrieve the task from the first available bucket.
 
         Available as in, there is an item in the queue and you can
         consume tokens from it.