Ask Solem преди 15 години
родител
ревизия
b4d5019c5e
променени са 4 файла, в които са добавени 84 реда и са изтрити 9 реда
  1. 57 0
      celery/contrib/batches.py
  2. 13 0
      celery/task/base.py
  3. 3 3
      celery/worker/__init__.py
  4. 11 6
      celery/worker/job.py

+ 57 - 0
celery/contrib/batches.py

@@ -0,0 +1,57 @@
+from itertools import count
+from collections import deque, defaultdict
+
+from celery.task.base import Task
+
+
+class Batches(Task):
+    """Abstract task that buffers requests and executes them in batches.
+
+    Each call to :meth:`execute` stores the request in an internal buffer.
+    On every :attr:`flush_every`-th call the whole buffer is passed to
+    :meth:`flush` and then emptied.
+
+    """
+    abstract = True
+
+    #: Number of calls to :meth:`execute` between flushes.
+    flush_every = 10
+
+    def __init__(self):
+        self._buffer = deque()
+        # Count from 1: with the default start of 0 the very first request
+        # satisfies ``0 % flush_every == 0`` and would be flushed at once
+        # as a batch of one instead of waiting for a full batch.
+        self._count = count(1).next
+
+    def execute(self, wrapper, pool, loglevel, logfile):
+        """Buffer the request; flush on every ``flush_every``-th call.
+
+        :param wrapper: Object exposing ``execute_using_pool``.
+        :param pool: Pool later passed to ``execute_using_pool``.
+        :param loglevel: Log level later passed to ``execute_using_pool``.
+        :param logfile: Log file later passed to ``execute_using_pool``.
+
+        """
+        self._buffer.append((wrapper, pool, loglevel, logfile))
+
+        if not self._count() % self.flush_every:
+            self.flush(self._buffer)
+            self._buffer.clear()
+
+    def flush(self, tasks):
+        """Execute every buffered request using its own pool.
+
+        :param tasks: Iterable of ``(wrapper, pool, loglevel, logfile)``
+            tuples as stored by :meth:`execute`.
+
+        """
+        for wrapper, pool, loglevel, logfile in tasks:
+            wrapper.execute_using_pool(pool, loglevel, logfile)
+
+
+class Counter(Task):
+    """Abstract task that collects call arguments and hands them over in bulk.
+
+    Only the ``args`` and ``kwargs`` of each request are buffered.
+    On every :attr:`flush_every`-th call the buffer is passed to
+    :meth:`flush`, which subclasses must implement, and then emptied.
+
+    """
+    abstract = True
+
+    #: Number of calls to :meth:`execute` between flushes.
+    flush_every = 10
+
+    def __init__(self):
+        self._buffer = deque()
+        # Count from 1: with the default start of 0 the very first request
+        # satisfies ``0 % flush_every == 0`` and would be flushed at once
+        # as a batch of one instead of waiting for a full batch.
+        self._count = count(1).next
+
+    def execute(self, wrapper, pool, loglevel, logfile):
+        """Buffer the request's arguments; flush on every n-th call.
+
+        :param wrapper: Object exposing ``args`` and ``kwargs``.
+        :param pool: Unused by this implementation.
+        :param loglevel: Unused by this implementation.
+        :param logfile: Unused by this implementation.
+
+        """
+        self._buffer.append((wrapper.args, wrapper.kwargs))
+
+        if not self._count() % self.flush_every:
+            self.flush(self._buffer)
+            self._buffer.clear()
+
+    def flush(self, buffer):
+        """Process the buffered ``(args, kwargs)`` pairs.
+
+        :param buffer: Iterable of ``(args, kwargs)`` tuples.
+        :raises NotImplementedError: Always; subclasses must override.
+
+        """
+        raise NotImplementedError("Counters must implement 'flush'")
+
+
+
+class ClickCounter(Counter):
+    """Example counter that tallies clicks per URL and prints the totals.
+
+    Derives from :class:`Counter` so that requests are actually buffered
+    and :meth:`flush` is invoked; the plain ``Task.execute`` never calls it.
+
+    """
+    flush_every = 1000
+
+    def flush(self, buffer):
+        """Print the number of buffered clicks for each URL.
+
+        :param buffer: Iterable of ``(args, kwargs)`` tuples; every
+            ``kwargs`` must contain a ``"url"`` key.
+        :raises KeyError: If a buffered ``kwargs`` has no ``"url"`` entry.
+
+        """
+        urlcount = defaultdict(int)
+        for args, kwargs in buffer:
+            urlcount[kwargs["url"]] += 1
+
+        # Named ``clicks`` so the module-level ``itertools.count`` import
+        # is not shadowed inside this method.
+        for url, clicks in urlcount.items():
+            print(">>> Clicks: %s -> %s" % (url, clicks))
+            # A real implementation would persist the tally here,
+            # e.g. increment_in_db(url, n=clicks).

+ 13 - 0
celery/task/base.py

@@ -401,6 +401,19 @@ class Task(object):
         """
         pass
 
+    def execute(self, wrapper, pool, loglevel, logfile):
+        """Entry point the worker uses to run this task.
+
+        The default behaviour simply asks the wrapper to execute itself
+        using the given pool.
+
+        :param wrapper: A :class:`celery.worker.job.TaskWrapper`.
+        :param pool: A :class:`celery.worker.pool.TaskPool` object.
+        :param loglevel: Current loglevel.
+        :param logfile: Name of the currently used logfile.
+
+        """
+        wrapper.execute_using_pool(pool, loglevel, logfile)
+
+
+
 
 class ExecuteRemoteTask(Task):
     """Execute an arbitrary function or object.

+ 3 - 3
celery/worker/__init__.py

@@ -168,12 +168,12 @@ class WorkController(object):
         finally:
             self.stop()
 
-    def process_task(self, task):
+    def process_task(self, wrapper):
         """Process task by sending it to the pool of workers."""
         try:
             try:
-                task.execute_using_pool(self.pool, self.loglevel,
-                                        self.logfile)
+                wrapper.task.execute(wrapper, self.pool,
+                                     self.loglevel, self.logfile)
             except Exception, exc:
                 self.logger.critical("Internal error %s: %s\n%s" % (
                                 exc.__class__, exc, traceback.format_exc()))

+ 11 - 6
celery/worker/job.py

@@ -175,6 +175,9 @@ class TaskWrapper(object):
         [celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
     """
     fail_email_body = TASK_FAIL_EMAIL_BODY
+    _type = None # cached task class; filled in lazily by the `task` property
+    executed = False
+    time_start = None
 
     def __init__(self, task_name, task_id, args, kwargs,
             on_ack=noop, retries=0, **opts):
@@ -183,17 +186,13 @@ class TaskWrapper(object):
         self.retries = retries
         self.args = args
         self.kwargs = kwargs
-        self.logger = opts.get("logger")
-        self.eventer = opts.get("eventer")
         self.on_ack = on_ack
-        self.executed = False
-        self.time_start = None
+
         for opt in ("success_msg", "fail_msg", "fail_email_subject",
-                "fail_email_body"):
+                "fail_email_body", "logger", "eventer"):
             setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
         if not self.logger:
             self.logger = get_default_logger()
-        self.task = tasks[self.task_name]
 
     def __repr__(self):
         return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
@@ -347,3 +346,9 @@ class TaskWrapper(object):
             subject = self.fail_email_subject.strip() % context
             body = self.fail_email_body.strip() % context
             mail_admins(subject, body, fail_silently=True)
+
+    @property
+    def task(self):
+        """The task registered under ``self.task_name``.
+
+        Looked up in ``tasks`` on first access and cached in ``_type``.
+
+        """
+        cached = self._type
+        if cached is not None:
+            return cached
+        cached = self._type = tasks[self.task_name]
+        return cached