Przeglądaj źródła

Make sure we don't shutdown twice.

Ask Solem 16 lat temu
rodzic
commit
faaa58ca71
1 zmienionych plików z 6 dodań i 5 usunięć
  1. 6 5
      celery/worker.py

+ 6 - 5
celery/worker.py

@@ -177,8 +177,6 @@ class TaskWrapper(object):
         :returns: :class:`TaskWrapper` instance.
 
         """
-        # TODO This doesn't really make sense anymore, since the message
-        # is now part of the TaskWrapper instance itself. Needs refactoring.
         task_name = message_data["task"]
         task_id = message_data["id"]
         args = message_data["args"]
@@ -342,6 +340,7 @@ class WorkController(object):
     loglevel = logging.ERROR
     concurrency = DAEMON_CONCURRENCY
     logfile = DAEMON_LOG_FILE
+    _state = None
 
     def __init__(self, concurrency=None, logfile=None, loglevel=None,
             is_detached=False):
@@ -397,9 +396,6 @@ class WorkController(object):
             except Exception, exc:
                 self.logger.critical("Message queue raised %s: %s\n%s" % (
                                 exc.__class__, exc, traceback.format_exc()))
-            except:
-                self.shutdown()
-                raise
         except (SystemExit, KeyboardInterrupt):
             self.shutdown()
 
@@ -419,13 +415,18 @@ class WorkController(object):
         return result
 
     def shutdown(self):
+        """Make sure ``celeryd`` exits cleanly."""
         # shut down the periodic work controller thread
+        if self._state != "RUN":
+            return
+        self._state = "TERMINATE"
         self.periodicworkcontroller.stop()
         self.pool.terminate()
         self.close_connection()
 
     def run(self):
         """Starts the workers main loop."""
+        self._state = "RUN"
         task_consumer = self.reset_connection()
         it = task_consumer.iterconsume(limit=None)