**** CubicPower OpenStack Study ****
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Generic Node base class for all workers that run on hosts."""
import errno
import logging as std_logging
import os
import random
import signal
import sys
import time
import eventlet
from eventlet import event
from oslo.config import cfg
from keystone.openstack.common import eventlet_backdoor
from keystone.openstack.common.gettextutils import _ # noqa
from keystone.openstack.common import importutils
from keystone.openstack.common import log as logging
from keystone.openstack.common import threadgroup
rpc = importutils.try_import('keystone.openstack.common.rpc')
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
**** CubicPower OpenStack Study ****
def _sighup_supported():
return hasattr(signal, 'SIGHUP')
**** CubicPower OpenStack Study ****
def _is_sighup(signo):
return _sighup_supported() and signo == signal.SIGHUP
**** CubicPower OpenStack Study ****
def _signo_to_signame(signo):
signals = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}
if _sighup_supported():
signals[signal.SIGHUP] = 'SIGHUP'
return signals[signo]
**** CubicPower OpenStack Study ****
def _set_signals_handler(handler):
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)
if _sighup_supported():
signal.signal(signal.SIGHUP, handler)
**** CubicPower OpenStack Study ****
class Launcher(object):
"""Launch one or more services and wait for them to complete."""
**** CubicPower OpenStack Study ****
def __init__(self):
"""Initialize the service launcher.
:returns: None
"""
self.services = Services()
self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
**** CubicPower OpenStack Study ****
def launch_service(self, service):
"""Load and start the given service.
:param service: The service you would like to start.
:returns: None
"""
service.backdoor_port = self.backdoor_port
self.services.add(service)
**** CubicPower OpenStack Study ****
def stop(self):
"""Stop all services which are currently running.
:returns: None
"""
self.services.stop()
**** CubicPower OpenStack Study ****
def wait(self):
"""Waits until all services have been stopped, and then returns.
:returns: None
"""
self.services.wait()
**** CubicPower OpenStack Study ****
def restart(self):
"""Reload config files and restart service.
:returns: None
"""
cfg.CONF.reload_config_files()
self.services.restart()
**** CubicPower OpenStack Study ****
class SignalExit(SystemExit):
**** CubicPower OpenStack Study ****
def __init__(self, signo, exccode=1):
super(SignalExit, self).__init__(exccode)
self.signo = signo
**** CubicPower OpenStack Study ****
class ServiceLauncher(Launcher):
**** CubicPower OpenStack Study ****
def _handle_signal(self, signo, frame):
# Allow the process to be killed again and die from natural causes
_set_signals_handler(signal.SIG_DFL)
raise SignalExit(signo)
**** CubicPower OpenStack Study ****
def handle_signal(self):
_set_signals_handler(self._handle_signal)
**** CubicPower OpenStack Study ****
def _wait_for_exit_or_signal(self, ready_callback=None):
status = None
signo = 0
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
try:
if ready_callback:
ready_callback()
super(ServiceLauncher, self).wait()
except SignalExit as exc:
signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
finally:
self.stop()
if rpc:
try:
rpc.cleanup()
except Exception:
# We're shutting down, so it doesn't matter at this point.
LOG.exception(_('Exception during rpc cleanup.'))
return status, signo
**** CubicPower OpenStack Study ****
def wait(self, ready_callback=None):
while True:
self.handle_signal()
status, signo = self._wait_for_exit_or_signal(ready_callback)
if not _is_sighup(signo):
return status
self.restart()
**** CubicPower OpenStack Study ****
class ServiceWrapper(object):
**** CubicPower OpenStack Study ****
def __init__(self, service, workers):
self.service = service
self.workers = workers
self.children = set()
self.forktimes = []
**** CubicPower OpenStack Study ****
class ProcessLauncher(object):
**** CubicPower OpenStack Study ****
def __init__(self):
self.children = {}
self.sigcaught = None
self.running = True
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
self.handle_signal()
**** CubicPower OpenStack Study ****
def handle_signal(self):
_set_signals_handler(self._handle_signal)
**** CubicPower OpenStack Study ****
def _handle_signal(self, signo, frame):
self.sigcaught = signo
self.running = False
# Allow the process to be killed again and die from natural causes
_set_signals_handler(signal.SIG_DFL)
**** CubicPower OpenStack Study ****
def _pipe_watcher(self):
# This will block until the write end is closed when the parent
# dies unexpectedly
self.readpipe.read()
LOG.info(_('Parent process has died unexpectedly, exiting'))
sys.exit(1)
**** CubicPower OpenStack Study ****
def _child_process_handle_signal(self):
# Setup child signal handlers differently
def _sigterm(*args):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
raise SignalExit(signal.SIGTERM)
def _sighup(*args):
signal.signal(signal.SIGHUP, signal.SIG_DFL)
raise SignalExit(signal.SIGHUP)
signal.signal(signal.SIGTERM, _sigterm)
if _sighup_supported():
signal.signal(signal.SIGHUP, _sighup)
# Block SIGINT and let the parent send us a SIGTERM
signal.signal(signal.SIGINT, signal.SIG_IGN)
**** CubicPower OpenStack Study ****
def _sigterm(*args):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
raise SignalExit(signal.SIGTERM)
**** CubicPower OpenStack Study ****
def _sighup(*args):
signal.signal(signal.SIGHUP, signal.SIG_DFL)
raise SignalExit(signal.SIGHUP)
signal.signal(signal.SIGTERM, _sigterm)
if _sighup_supported():
signal.signal(signal.SIGHUP, _sighup)
# Block SIGINT and let the parent send us a SIGTERM
signal.signal(signal.SIGINT, signal.SIG_IGN)
**** CubicPower OpenStack Study ****
def _child_wait_for_exit_or_signal(self, launcher):
status = 0
signo = 0
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fallback into the loop spawning children. It would
# be bad for a child to spawn more children.
try:
launcher.wait()
except SignalExit as exc:
signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_('Unhandled exception'))
status = 2
finally:
launcher.stop()
return status, signo
**** CubicPower OpenStack Study ****
def _child_process(self, service):
self._child_process_handle_signal()
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
# Close write to ensure only parent has it open
os.close(self.writepipe)
# Create greenthread to watch for parent to close pipe
eventlet.spawn_n(self._pipe_watcher)
# Reseed random number generator
random.seed()
launcher = Launcher()
launcher.launch_service(service)
return launcher
**** CubicPower OpenStack Study ****
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
# Limit ourselves to one process a second (over the period of
# number of workers * 1 second). This will allow workers to
# start up quickly but ensure we don't fork off children that
# die instantly too quickly.
if time.time() - wrap.forktimes[0] < wrap.workers:
LOG.info(_('Forking too fast, sleeping'))
time.sleep(1)
wrap.forktimes.pop(0)
wrap.forktimes.append(time.time())
pid = os.fork()
if pid == 0:
launcher = self._child_process(wrap.service)
while True:
self._child_process_handle_signal()
status, signo = self._child_wait_for_exit_or_signal(launcher)
if not _is_sighup(signo):
break
launcher.restart()
os._exit(status)
LOG.info(_('Started child %d'), pid)
wrap.children.add(pid)
self.children[pid] = wrap
return pid
**** CubicPower OpenStack Study ****
def launch_service(self, service, workers=1):
wrap = ServiceWrapper(service, workers)
LOG.info(_('Starting %d workers'), wrap.workers)
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
**** CubicPower OpenStack Study ****
def _wait_child(self):
try:
# Don't block if no child processes have exited
pid, status = os.waitpid(0, os.WNOHANG)
if not pid:
return None
except OSError as exc:
if exc.errno not in (errno.EINTR, errno.ECHILD):
raise
return None
if os.WIFSIGNALED(status):
sig = os.WTERMSIG(status)
LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
dict(pid=pid, sig=sig))
else:
code = os.WEXITSTATUS(status)
LOG.info(_('Child %(pid)s exited with status %(code)d'),
dict(pid=pid, code=code))
if pid not in self.children:
LOG.warning(_('pid %d not in child list'), pid)
return None
wrap = self.children.pop(pid)
wrap.children.remove(pid)
return wrap
**** CubicPower OpenStack Study ****
def _respawn_children(self):
while self.running:
wrap = self._wait_child()
if not wrap:
# Yield to other threads if no children have exited
# Sleep for a short time to avoid excessive CPU usage
# (see bug #1095346)
eventlet.greenthread.sleep(.01)
continue
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
**** CubicPower OpenStack Study ****
def wait(self):
"""Loop waiting on children to die and respawning as necessary."""
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
while True:
self.handle_signal()
self._respawn_children()
if self.sigcaught:
signame = _signo_to_signame(self.sigcaught)
LOG.info(_('Caught %s, stopping children'), signame)
if not _is_sighup(self.sigcaught):
break
for pid in self.children:
os.kill(pid, signal.SIGHUP)
self.running = True
self.sigcaught = None
for pid in self.children:
try:
os.kill(pid, signal.SIGTERM)
except OSError as exc:
if exc.errno != errno.ESRCH:
raise
# Wait for children to die
if self.children:
LOG.info(_('Waiting on %d children to exit'), len(self.children))
while self.children:
self._wait_child()
**** CubicPower OpenStack Study ****
class Service(object):
"""Service object for binaries running on hosts."""
**** CubicPower OpenStack Study ****
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup(threads)
# signal that the service is done shutting itself down:
self._done = event.Event()
**** CubicPower OpenStack Study ****
def reset(self):
# NOTE(Fengqian): docs for Event.reset() recommend against using it
self._done = event.Event()
**** CubicPower OpenStack Study ****
def start(self):
pass
**** CubicPower OpenStack Study ****
def stop(self):
self.tg.stop()
self.tg.wait()
# Signal that service cleanup is done:
if not self._done.ready():
self._done.send()
**** CubicPower OpenStack Study ****
def wait(self):
self._done.wait()
**** CubicPower OpenStack Study ****
class Services(object):
**** CubicPower OpenStack Study ****
def __init__(self):
self.services = []
self.tg = threadgroup.ThreadGroup()
self.done = event.Event()
**** CubicPower OpenStack Study ****
def add(self, service):
self.services.append(service)
self.tg.add_thread(self.run_service, service, self.done)
**** CubicPower OpenStack Study ****
def stop(self):
# wait for graceful shutdown of services:
for service in self.services:
service.stop()
service.wait()
# Each service has performed cleanup, now signal that the run_service
# wrapper threads can now die:
if not self.done.ready():
self.done.send()
# reap threads:
self.tg.stop()
**** CubicPower OpenStack Study ****
def wait(self):
self.tg.wait()
**** CubicPower OpenStack Study ****
def restart(self):
self.stop()
self.done = event.Event()
for restart_service in self.services:
restart_service.reset()
self.tg.add_thread(self.run_service, restart_service, self.done)
@staticmethod
**** CubicPower OpenStack Study ****
def run_service(service, done):
"""Service start wrapper.
:param service: service to run
:param done: event to wait on until a shutdown is triggered
:returns: None
"""
service.start()
done.wait()
def launch(service, workers=None):
if workers:
launcher = ProcessLauncher()
launcher.launch_service(service, workers=workers)
else:
launcher = ServiceLauncher()
launcher.launch_service(service)
return launcher
**** CubicPower OpenStack Study ****
def launch(service, workers=None):
if workers:
launcher = ProcessLauncher()
launcher.launch_service(service, workers=workers)
else:
launcher = ServiceLauncher()
launcher.launch_service(service)
return launcher