
OpenStack Study: service.py


# Copyright 2010 United States Government as represented by the

# Administrator of the National Aeronautics and Space Administration.

# Copyright 2011 Justin Santa Barbara

# All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

"""Generic Node base class for all workers that run on hosts."""

import errno

import logging as std_logging

import os

import random

import signal

import sys

import time

try:
    # Importing just the symbol here because the io module does not
    # exist in Python 2.6.
    from io import UnsupportedOperation  # noqa
except ImportError:
    # Python 2.6
    UnsupportedOperation = None

import eventlet

from eventlet import event

from oslo.config import cfg

from neutron.openstack.common import eventlet_backdoor

from neutron.openstack.common.gettextutils import _

from neutron.openstack.common import importutils

from neutron.openstack.common import log as logging

from neutron.openstack.common import threadgroup

rpc = importutils.try_import('neutron.openstack.common.rpc')

CONF = cfg.CONF

LOG = logging.getLogger(__name__)


def _sighup_supported():

    return hasattr(signal, 'SIGHUP')


def _is_daemon():

    # The process group for a foreground process will match the

    # process group of the controlling terminal. If those values do

    # not match, or ioctl() fails on the stdout file handle, we assume

    # the process is running in the background as a daemon.

    # http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics

    try:

        is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())

    except OSError as err:

        if err.errno == errno.ENOTTY:

            # Assume we are a daemon because there is no terminal.

            is_daemon = True

        else:

            raise

    except UnsupportedOperation:

        # Could not get the fileno for stdout, so we must be a daemon.

        is_daemon = True

    return is_daemon


def _is_sighup_and_daemon(signo):

    if not (_sighup_supported() and signo == signal.SIGHUP):

        # Avoid checking if we are a daemon, because the signal isn't

        # SIGHUP.

        return False

    return _is_daemon()


def _signo_to_signame(signo):

    signals = {signal.SIGTERM: 'SIGTERM',

               signal.SIGINT: 'SIGINT'}

    if _sighup_supported():

        signals[signal.SIGHUP] = 'SIGHUP'

    return signals[signo]


def _set_signals_handler(handler):

    signal.signal(signal.SIGTERM, handler)

    signal.signal(signal.SIGINT, handler)

    if _sighup_supported():

        signal.signal(signal.SIGHUP, handler)
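
A minimal sketch (not part of the module) of how these signal helpers combine: _set_signals_handler() registers one callback for SIGTERM, SIGINT and, where the platform supports it, SIGHUP, while _signo_to_signame() maps the numeric signal back to a readable name inside the callback. The handler name below is hypothetical.

    def _example_handler(signo, frame):
        # Translate the numeric signal into 'SIGTERM', 'SIGINT' or 'SIGHUP'
        LOG.info('Caught %s', _signo_to_signame(signo))

    # Install the same callback for every supported termination signal
    _set_signals_handler(_example_handler)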


class Launcher(object):

"""Launch one or more services and wait for them to complete."""


    def __init__(self):

        """Initialize the service launcher.

        :returns: None

        """

        self.services = Services()

        self.backdoor_port = eventlet_backdoor.initialize_if_enabled()


    def launch_service(self, service):

        """Load and start the given service.

        :param service: The service you would like to start.

        :returns: None

        """

        service.backdoor_port = self.backdoor_port

        self.services.add(service)


    def stop(self):

        """Stop all services which are currently running.

        :returns: None

        """

        self.services.stop()


    def wait(self):

        """Waits until all services have been stopped, and then returns.

        :returns: None

        """

        self.services.wait()


    def restart(self):

        """Reload config files and restart service.

        :returns: None

        """

        cfg.CONF.reload_config_files()

        self.services.restart()
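
A minimal usage sketch for Launcher, assuming a hypothetical Service subclass named MyService (not part of this module):

    launcher = Launcher()
    launcher.launch_service(MyService())  # added to the internal Services group and started
    launcher.wait()                       # blocks until every service has been stopped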


class SignalExit(SystemExit):


    def __init__(self, signo, exccode=1):

        super(SignalExit, self).__init__(exccode)

        self.signo = signo


class ServiceLauncher(Launcher):


    def _handle_signal(self, signo, frame):

        # Allow the process to be killed again and die from natural causes

        _set_signals_handler(signal.SIG_DFL)

        raise SignalExit(signo)


    def handle_signal(self):

        _set_signals_handler(self._handle_signal)


    def _wait_for_exit_or_signal(self, ready_callback=None):

        status = None

        signo = 0

        LOG.debug(_('Full set of CONF:'))

        CONF.log_opt_values(LOG, std_logging.DEBUG)

        try:

            if ready_callback:

                ready_callback()

            super(ServiceLauncher, self).wait()

        except SignalExit as exc:

            signame = _signo_to_signame(exc.signo)

            LOG.info(_('Caught %s, exiting'), signame)

            status = exc.code

            signo = exc.signo

        except SystemExit as exc:

            status = exc.code

        finally:

            self.stop()

            if rpc:

                try:

                    rpc.cleanup()

                except Exception:

                    # We're shutting down, so it doesn't matter at this point.

                    LOG.exception(_('Exception during rpc cleanup.'))

        return status, signo


    def wait(self, ready_callback=None):

        while True:

            self.handle_signal()

            status, signo = self._wait_for_exit_or_signal(ready_callback)

            if not _is_sighup_and_daemon(signo):

                return status

            self.restart()
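
In the single-process case this wait() loop is what a binary blocks in. A short sketch, assuming a Service instance named service (hypothetical): SIGTERM, SIGINT or a SystemExit ends the loop and yields the exit status, while SIGHUP in a daemonized process reloads the config files and restarts the services.

    launcher = ServiceLauncher()
    launcher.launch_service(service)
    status = launcher.wait()  # returns once a non-SIGHUP signal or SystemExit is caught
    sys.exit(status)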


class ServiceWrapper(object):


    def __init__(self, service, workers):

        self.service = service

        self.workers = workers

        self.children = set()

        self.forktimes = []


class ProcessLauncher(object):


    def __init__(self, wait_interval=0.01):

        """Constructor.

        :param wait_interval: The interval to sleep for between checks

                              of child process exit.

        """

        self.children = {}

        self.sigcaught = None

        self.running = True

        self.wait_interval = wait_interval

        rfd, self.writepipe = os.pipe()

        self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')

        self.handle_signal()


    def handle_signal(self):

        _set_signals_handler(self._handle_signal)


    def _handle_signal(self, signo, frame):

        self.sigcaught = signo

        self.running = False

        # Allow the process to be killed again and die from natural causes

        _set_signals_handler(signal.SIG_DFL)


    def _pipe_watcher(self):

        # This will block until the write end is closed when the parent

        # dies unexpectedly

        self.readpipe.read()

        LOG.info(_('Parent process has died unexpectedly, exiting'))

        sys.exit(1)


    def _child_process_handle_signal(self):

        # Setup child signal handlers differently

        def _sigterm(*args):

            signal.signal(signal.SIGTERM, signal.SIG_DFL)

            raise SignalExit(signal.SIGTERM)

        def _sighup(*args):

            signal.signal(signal.SIGHUP, signal.SIG_DFL)

            raise SignalExit(signal.SIGHUP)

        signal.signal(signal.SIGTERM, _sigterm)

        if _sighup_supported():

            signal.signal(signal.SIGHUP, _sighup)

        # Block SIGINT and let the parent send us a SIGTERM

        signal.signal(signal.SIGINT, signal.SIG_IGN)



    def _child_wait_for_exit_or_signal(self, launcher):

        status = 0

        signo = 0

        # NOTE(johannes): All exceptions are caught to ensure this

        # doesn't fallback into the loop spawning children. It would

        # be bad for a child to spawn more children.

        try:

            launcher.wait()

        except SignalExit as exc:

            signame = _signo_to_signame(exc.signo)

            LOG.info(_('Caught %s, exiting'), signame)

            status = exc.code

            signo = exc.signo

        except SystemExit as exc:

            status = exc.code

        except BaseException:

            LOG.exception(_('Unhandled exception'))

            status = 2

        finally:

            launcher.stop()

        return status, signo


    def _child_process(self, service):

        self._child_process_handle_signal()

        # Reopen the eventlet hub to make sure we don't share an epoll

        # fd with parent and/or siblings, which would be bad

        eventlet.hubs.use_hub()

        # Close write to ensure only parent has it open

        os.close(self.writepipe)

        # Create greenthread to watch for parent to close pipe

        eventlet.spawn_n(self._pipe_watcher)

        # Reseed random number generator

        random.seed()

        launcher = Launcher()

        launcher.launch_service(service)

        return launcher


    def _start_child(self, wrap):

        if len(wrap.forktimes) > wrap.workers:

            # Limit ourselves to one process a second (over the period of

            # number of workers * 1 second). This will allow workers to

            # start up quickly but ensure we don't fork off children that

            # die too quickly.

            if time.time() - wrap.forktimes[0] < wrap.workers:

                LOG.info(_('Forking too fast, sleeping'))

                time.sleep(1)

            wrap.forktimes.pop(0)

        wrap.forktimes.append(time.time())

        pid = os.fork()

        if pid == 0:

            launcher = self._child_process(wrap.service)

            while True:

                self._child_process_handle_signal()

                status, signo = self._child_wait_for_exit_or_signal(launcher)

                if not _is_sighup_and_daemon(signo):

                    break

                launcher.restart()

            os._exit(status)

        LOG.info(_('Started child %d'), pid)

        wrap.children.add(pid)

        self.children[pid] = wrap

        return pid


    def launch_service(self, service, workers=1):

        wrap = ServiceWrapper(service, workers)

        LOG.info(_('Starting %d workers'), wrap.workers)

        while self.running and len(wrap.children) < wrap.workers:

            self._start_child(wrap)


    def _wait_child(self):

        try:

            # Don't block if no child processes have exited

            pid, status = os.waitpid(0, os.WNOHANG)

            if not pid:

                return None

        except OSError as exc:

            if exc.errno not in (errno.EINTR, errno.ECHILD):

                raise

            return None

        if os.WIFSIGNALED(status):

            sig = os.WTERMSIG(status)

            LOG.info(_('Child %(pid)d killed by signal %(sig)d'),

                     dict(pid=pid, sig=sig))

        else:

            code = os.WEXITSTATUS(status)

            LOG.info(_('Child %(pid)s exited with status %(code)d'),

                     dict(pid=pid, code=code))

        if pid not in self.children:

            LOG.warning(_('pid %d not in child list'), pid)

            return None

        wrap = self.children.pop(pid)

        wrap.children.remove(pid)

        return wrap


    def _respawn_children(self):

        while self.running:

            wrap = self._wait_child()

            if not wrap:

                # Yield to other threads if no children have exited

                # Sleep for a short time to avoid excessive CPU usage

                # (see bug #1095346)

                eventlet.greenthread.sleep(self.wait_interval)

                continue

            while self.running and len(wrap.children) < wrap.workers:

                self._start_child(wrap)


    def wait(self):

        """Loop waiting on children to die and respawning as necessary."""

        LOG.debug(_('Full set of CONF:'))

        CONF.log_opt_values(LOG, std_logging.DEBUG)

        while True:

            self.handle_signal()

            self._respawn_children()

            if self.sigcaught:

                signame = _signo_to_signame(self.sigcaught)

                LOG.info(_('Caught %s, stopping children'), signame)

            if not _is_sighup_and_daemon(self.sigcaught):

                break

            for pid in self.children:

                os.kill(pid, signal.SIGHUP)

            self.running = True

            self.sigcaught = None

        for pid in self.children:

            try:

                os.kill(pid, signal.SIGTERM)

            except OSError as exc:

                if exc.errno != errno.ESRCH:

                    raise

        # Wait for children to die

        if self.children:

            LOG.info(_('Waiting on %d children to exit'), len(self.children))

            while self.children:

                self._wait_child()
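
A minimal sketch of the multi-process path, assuming a Service instance named service (hypothetical): the parent forks the requested number of workers, respawns any that die, and finally sends SIGTERM to the survivors and reaps them.

    launcher = ProcessLauncher()
    launcher.launch_service(service, workers=4)  # fork four children, each running the service
    launcher.wait()                              # respawn children until a terminating signal arrives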


class Service(object):

"""Service object for binaries running on hosts."""


    def __init__(self, threads=1000):

        self.tg = threadgroup.ThreadGroup(threads)

        # signal that the service is done shutting itself down:

        self._done = event.Event()


    def reset(self):

        # NOTE(Fengqian): docs for Event.reset() recommend against using it

        self._done = event.Event()


    def start(self):

        pass


    def stop(self):

        self.tg.stop()

        self.tg.wait()

        # Signal that service cleanup is done:

        if not self._done.ready():

            self._done.send()


    def wait(self):

        self._done.wait()
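
A minimal sketch of a concrete service (the class and method names are hypothetical): subclasses override start() and put their long-running work on self.tg, the thread group created in __init__(), so that stop() can tear everything down. add_timer() is assumed to behave as in the companion threadgroup module, calling the function at a fixed interval.

    class HeartbeatService(Service):
        """Hypothetical service that logs a heartbeat every ten seconds."""

        def start(self):
            super(HeartbeatService, self).start()
            # Schedule _beat() on the service's thread group
            self.tg.add_timer(10, self._beat)

        def _beat(self):
            LOG.info('heartbeat')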


class Services(object):


    def __init__(self):

        self.services = []

        self.tg = threadgroup.ThreadGroup()

        self.done = event.Event()


    def add(self, service):

        self.services.append(service)

        self.tg.add_thread(self.run_service, service, self.done)


    def stop(self):

        # wait for graceful shutdown of services:

        for service in self.services:

            service.stop()

            service.wait()

        # Each service has performed cleanup, now signal that the run_service

        # wrapper threads can now die:

        if not self.done.ready():

            self.done.send()

        # reap threads:

        self.tg.stop()


    def wait(self):

        self.tg.wait()


    def restart(self):

        self.stop()

        self.done = event.Event()

        for restart_service in self.services:

            restart_service.reset()

            self.tg.add_thread(self.run_service, restart_service, self.done)

    @staticmethod


    def run_service(service, done):

        """Service start wrapper.

        :param service: service to run

        :param done: event to wait on until a shutdown is triggered

        :returns: None

        """

        service.start()

        done.wait()

def launch(service, workers=None):

    if workers:

        launcher = ProcessLauncher()

        launcher.launch_service(service, workers=workers)

    else:

        launcher = ServiceLauncher()

        launcher.launch_service(service)

    return launcher
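
A closing sketch of how a binary would typically use this module, reusing the hypothetical HeartbeatService from above: launch() returns a ServiceLauncher when no worker count is given and a ProcessLauncher otherwise, and the caller simply waits on whichever it got.

    # Single process: run the service in this process
    launcher = launch(HeartbeatService())
    launcher.wait()

    # Or fork four worker processes, each running its own copy
    launcher = launch(HeartbeatService(), workers=4)
    launcher.wait()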
