|
@@ -3,31 +3,32 @@ import sys
|
|
|
|
|
|
from time import time
|
|
from time import time
|
|
|
|
|
|
-import eventlet
|
|
|
|
-import eventlet.debug
|
|
|
|
|
|
|
|
if not os.environ.get("EVENTLET_NOPATCH"):
|
|
if not os.environ.get("EVENTLET_NOPATCH"):
|
|
|
|
+ import eventlet
|
|
|
|
+ import eventlet.debug
|
|
eventlet.monkey_patch()
|
|
eventlet.monkey_patch()
|
|
eventlet.debug.hub_prevent_multiple_readers(False)
|
|
eventlet.debug.hub_prevent_multiple_readers(False)
|
|
|
|
|
|
-from eventlet import GreenPool
|
|
|
|
-from eventlet.greenthread import getcurrent, spawn_after_local
|
|
|
|
-from greenlet import GreenletExit
|
|
|
|
-
|
|
|
|
from celery.concurrency import base
|
|
from celery.concurrency import base
|
|
from celery.utils import timer2
|
|
from celery.utils import timer2
|
|
|
|
|
|
|
|
|
|
def apply_target(target, args=(), kwargs={}, callback=None,
|
|
def apply_target(target, args=(), kwargs={}, callback=None,
|
|
- accept_callback=None):
|
|
|
|
|
|
+ accept_callback=None, getpid=None):
|
|
return base.apply_target(target, args, kwargs, callback, accept_callback,
|
|
return base.apply_target(target, args, kwargs, callback, accept_callback,
|
|
- pid=getcurrent())
|
|
|
|
|
|
+ pid=getpid())
|
|
|
|
|
|
|
|
|
|
class Schedule(timer2.Schedule):
|
|
class Schedule(timer2.Schedule):
|
|
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
def __init__(self, *args, **kwargs):
|
|
|
|
+ from eventlet.greenthread import spawn_after_local
|
|
|
|
+ from greenlet import GreenletExit
|
|
super(Schedule, self).__init__(*args, **kwargs)
|
|
super(Schedule, self).__init__(*args, **kwargs)
|
|
|
|
+
|
|
|
|
+ self.GreenletExit = GreenletExit
|
|
|
|
+ self._spawn_after_local = spawn_after_local
|
|
self._queue = set()
|
|
self._queue = set()
|
|
|
|
|
|
def enter(self, entry, eta=None, priority=0):
|
|
def enter(self, entry, eta=None, priority=0):
|
|
@@ -42,7 +43,7 @@ class Schedule(timer2.Schedule):
|
|
eta = now
|
|
eta = now
|
|
secs = eta - now
|
|
secs = eta - now
|
|
|
|
|
|
- g = spawn_after_local(secs, entry)
|
|
|
|
|
|
+ g = self._spawn_after_local(secs, entry)
|
|
self._queue.add(g)
|
|
self._queue.add(g)
|
|
g.link(self._entry_exit, entry)
|
|
g.link(self._entry_exit, entry)
|
|
g.entry = entry
|
|
g.entry = entry
|
|
@@ -56,7 +57,7 @@ class Schedule(timer2.Schedule):
|
|
try:
|
|
try:
|
|
try:
|
|
try:
|
|
g.wait()
|
|
g.wait()
|
|
- except GreenletExit:
|
|
|
|
|
|
+ except self.GreenletExit:
|
|
entry.cancel()
|
|
entry.cancel()
|
|
g.cancelled = True
|
|
g.cancelled = True
|
|
finally:
|
|
finally:
|
|
@@ -89,11 +90,18 @@ class Timer(timer2.Timer):
|
|
|
|
|
|
|
|
|
|
class TaskPool(base.BasePool):
|
|
class TaskPool(base.BasePool):
|
|
- Pool = GreenPool
|
|
|
|
Timer = Timer
|
|
Timer = Timer
|
|
|
|
|
|
signal_safe = False
|
|
signal_safe = False
|
|
|
|
|
|
|
|
+ def __init__(self, *args, **kwargs):
|
|
|
|
+ from eventlet import greenthread
|
|
|
|
+ from eventlet.greenpool import GreenPool
|
|
|
|
+ self.Pool = GreenPool
|
|
|
|
+ self.greenthread = greenthread
|
|
|
|
+
|
|
|
|
+ super(TaskPool, self).__init__(*args, **kwargs)
|
|
|
|
+
|
|
def on_start(self):
|
|
def on_start(self):
|
|
self._pool = self.Pool(self.limit)
|
|
self._pool = self.Pool(self.limit)
|
|
|
|
|
|
@@ -104,4 +112,5 @@ class TaskPool(base.BasePool):
|
|
def on_apply(self, target, args=None, kwargs=None, callback=None,
|
|
def on_apply(self, target, args=None, kwargs=None, callback=None,
|
|
accept_callback=None, **_):
|
|
accept_callback=None, **_):
|
|
self._pool.spawn_n(apply_target, target, args, kwargs,
|
|
self._pool.spawn_n(apply_target, target, args, kwargs,
|
|
- callback, accept_callback)
|
|
|
|
|
|
+ callback, accept_callback,
|
|
|
|
+ self.greenthread.getcurrent)
|