

# OpenStack Study: tasks.py
#
# OpenStack Index
#
# **** CubicPower OpenStack Study ****

# Copyright 2013 VMware, Inc.

# All Rights Reserved


# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at


# http://www.apache.org/licenses/LICENSE-2.0


# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import collections

import uuid

from eventlet import event

from eventlet import greenthread

from neutron.common import exceptions

from neutron.openstack.common import log as logging

from neutron.openstack.common import loopingcall

from neutron.plugins.vmware.vshield.tasks.constants import TaskState

from neutron.plugins.vmware.vshield.tasks.constants import TaskStatus


LOG = logging.getLogger(__name__)

**** CubicPower OpenStack Study ****

def nop(task):
    """Default callback that does nothing.

    :param task: the task the callback was invoked for; it is not used.
    :returns: ``TaskStatus.COMPLETED``.
    """
    completed = TaskStatus.COMPLETED
    return completed

**** CubicPower OpenStack Study ****

class TaskException(exceptions.NeutronException):
    """Base class for the exceptions defined in this module.

    :param message: optional message; when given it is stored on the
        instance as ``message`` before the base initializer runs.
    :param kwargs: forwarded unchanged to ``NeutronException.__init__``.
    """

    def __init__(self, message=None, **kwargs):
        # Only override ``message`` when the caller supplied one, so that
        # subclasses declaring a class-level ``message`` keep theirs.
        if message is not None:
            self.message = message
        super(TaskException, self).__init__(**kwargs)

**** CubicPower OpenStack Study ****

class InvalidState(TaskException):
    """Raised by ``Task.wait`` when asked to wait on an unsupported state."""

    # The attribute must live inside the class body to act as the
    # exception's message template.
    message = _("Invalid state %(state)d")

**** CubicPower OpenStack Study ****

class TaskStateSkipped(TaskException):
    """Reports that ``state`` was skipped; ``current`` is the task's state."""

    # The attribute must live inside the class body to act as the
    # exception's message template.
    message = _("State %(state)d skipped. Current state %(current)d")

**** CubicPower OpenStack Study ****

class Task():

**** CubicPower OpenStack Study ****

    def __init__(self, name, resource_id, execute_callback,

                 status_callback=nop, result_callback=nop, userdata=None):

        self.name = name

        self.resource_id = resource_id

        self._execute_callback = execute_callback

        self._status_callback = status_callback

        self._result_callback = result_callback

        self.userdata = userdata

        self.id = None

        self.status = None

        self._monitors = {

            TaskState.START: [],

            TaskState.EXECUTED: [],

            TaskState.RESULT: []


        self._states = [None, None, None, None]

        self._state = TaskState.NONE

**** CubicPower OpenStack Study ****

    def _add_monitor(self, action, func):


        return self

**** CubicPower OpenStack Study ****

    def _move_state(self, state):

        self._state = state

        if self._states[state] is not None:

            e = self._states[state]

            self._states[state] = None


        for s in range(state):

            if self._states[s] is not None:

                e = self._states[s]

                self._states[s] = None


                    TaskStateSkipped(state=s, current=self._state))

**** CubicPower OpenStack Study ****

    def _invoke_monitor(self, state):

        for func in self._monitors[state]:



            except Exception:

                msg = _("Task %(task)s encountered exception in %(func)s "

                        "at state %(state)s") % {

                            'task': str(self),

                            'func': str(func),

                            'state': state}



        return self

**** CubicPower OpenStack Study ****

    def _start(self):

        return self._invoke_monitor(TaskState.START)

**** CubicPower OpenStack Study ****

    def _executed(self):

        return self._invoke_monitor(TaskState.EXECUTED)

**** CubicPower OpenStack Study ****

    def _update_status(self, status):

        if self.status == status:

            return self

        self.status = status

**** CubicPower OpenStack Study ****

    def _finished(self):

        return self._invoke_monitor(TaskState.RESULT)

**** CubicPower OpenStack Study ****

    def add_start_monitor(self, func):

        return self._add_monitor(TaskState.START, func)

**** CubicPower OpenStack Study ****

    def add_executed_monitor(self, func):

        return self._add_monitor(TaskState.EXECUTED, func)

**** CubicPower OpenStack Study ****

    def add_result_monitor(self, func):

        return self._add_monitor(TaskState.RESULT, func)

**** CubicPower OpenStack Study ****

    def wait(self, state):

        if (state < TaskState.START or

            state > TaskState.RESULT or

            state == TaskState.STATUS):

            raise InvalidState(state=state)

        if state <= self._state:

            # we already passed this current state, so no wait


        e = event.Event()

        self._states[state] = e


**** CubicPower OpenStack Study ****

    def __repr__(self):

        return "Task-%s-%s-%s" % (

            self.name, self.resource_id, self.id)

**** CubicPower OpenStack Study ****

class TaskManager():
    """Keeps a queue of tasks per resource and drives their execution."""

    # Class-level attribute; it has to be indented into the class body.
    # NOTE(review): ``_default_interval`` is read by ``__init__`` but its
    # class-level definition is not visible in this file - confirm it is
    # defined before the first instance is created.
    _instance = None


**** CubicPower OpenStack Study ****

    def __init__(self, interval=None):

        self._interval = interval or TaskManager._default_interval

        # A queue to pass tasks from other threads

        self._tasks_queue = collections.deque()

        # A dict to store resource -> resource's tasks

        self._tasks = {}

        # Current task being executed in main thread

        self._main_thread_exec_task = None

        # New request event

        self._req = event.Event()

        # TaskHandler stopped event

        self._stopped = False

        # Periodic function trigger

        self._monitor = None

        self._monitor_busy = False

        # Thread handling the task request

        self._thread = None

**** CubicPower OpenStack Study ****

    def _execute(self, task):

        """Execute task."""

        msg = _("Start task %s") % str(task)




            status = task._execute_callback(task)

        except Exception:

            msg = _("Task %(task)s encountered exception in %(cb)s") % {

                'task': str(task),

                'cb': str(task._execute_callback)}


            status = TaskStatus.ERROR

        LOG.debug(_("Task %(task)s return %(status)s"), {

            'task': str(task),

            'status': status})



        return status

**** CubicPower OpenStack Study ****

    def _result(self, task):

        """Notify task execution result."""



        except Exception:

            msg = _("Task %(task)s encountered exception in %(cb)s") % {

                'task': str(task),

                'cb': str(task._result_callback)}


        LOG.debug(_("Task %(task)s return %(status)s"),

                  {'task': str(task), 'status': task.status})


**** CubicPower OpenStack Study ****

    def _check_pending_tasks(self):

        """Check all pending tasks status."""

        for resource_id in self._tasks.keys():

            if self._stopped:

                # Task manager is stopped, return now


            tasks = self._tasks[resource_id]

            # only the first task is executed and pending

            task = tasks[0]


                status = task._status_callback(task)

            except Exception:

                msg = _("Task %(task)s encountered exception in %(cb)s") % {

                    'task': str(task),

                    'cb': str(task._status_callback)}


                status = TaskStatus.ERROR


            if status != TaskStatus.PENDING:

                self._dequeue(task, True)

**** CubicPower OpenStack Study ****

    def _enqueue(self, task):

        if task.resource_id in self._tasks:

            # append to existing resource queue for ordered processing



            # put the task to a new resource queue

            tasks = collections.deque()


            self._tasks[task.resource_id] = tasks

**** CubicPower OpenStack Study ****

    def _dequeue(self, task, run_next):


        tasks = self._tasks[task.resource_id]


        if not tasks:

            # no more tasks for this resource

            del self._tasks[task.resource_id]


        if run_next:

            # process next task for this resource

            while tasks:

                task = tasks[0]

                status = self._execute(task)

                if status == TaskStatus.PENDING:


                self._dequeue(task, False)

**** CubicPower OpenStack Study ****

    def _abort(self):

        """Abort all tasks.

        Walks the not-yet-received tasks in ``_tasks_queue`` and then every
        task queued per resource, dequeuing each without starting the next.
        """

        # Tasks that have not been received by the main thread yet are
        # meant to be moved to the per-resource queues first,

        # so the following abort handling can cover them

        for t in self._tasks_queue:

        # NOTE(review): the body of the loop above is missing from this
        # file, so the block does not parse as-is - restore from upstream.


        for resource_id in self._tasks.keys():

            # Copy the queue: _dequeue() modifies it while we iterate.
            tasks = list(self._tasks[resource_id])

            for task in tasks:

                # NOTE(review): a statement appears to be missing here
                # (presumably marking the task as aborted) - confirm.

                self._dequeue(task, False)

**** CubicPower OpenStack Study ****

    def _get_task(self):

        """Get task request."""

        while True:

            for t in self._tasks_queue:

                return self._tasks_queue.popleft()



**** CubicPower OpenStack Study ****

    def run(self):

        # Main loop of the manager: repeatedly fetches a task with
        # _get_task() and processes it until the manager is stopped or an
        # exception escapes.
        # NOTE(review): several lines of this method are missing from the
        # file (at least a ``try:`` matching the ``except`` below, the
        # statements that leave the loop, and the bodies of the branches
        # marked underneath). It does not parse as-is - restore from
        # upstream rather than guessing.

        while True:


                if self._stopped:

                    # Gracefully terminate this thread if the _stopped

                    # attribute was set to true

                    LOG.info(_("Stopping TaskManager"))

                    # NOTE(review): presumably the loop is left here.


                # get a task from queue, or timeout for periodic status check

                task = self._get_task()

                if task.resource_id in self._tasks:

                    # this resource already has some tasks under processing,

                    # append the task to same queue for ordered processing

                    # NOTE(review): branch body missing.




                    # Remember which task is being executed so that
                    # has_pending_task()/show_pending_tasks() can see it.
                    self._main_thread_exec_task = task



                    self._main_thread_exec_task = None

                    if task.status is None:

                        # The thread is killed during _execute(). To guarantee

                        # the task been aborted correctly, put it to the queue.

                        # NOTE(review): branch body missing.


                    elif task.status != TaskStatus.PENDING:

                        # NOTE(review): branch body missing.




            except Exception:

                LOG.exception(_("TaskManager terminating because "

                                "of an exception"))


**** CubicPower OpenStack Study ****

    def add(self, task):

        task.id = uuid.uuid1()


        if not self._req.ready():


        return task.id

**** CubicPower OpenStack Study ****

    def stop(self):

        """Stop the manager.

        Sets ``_stopped``, clears ``_thread`` and logs the termination.
        """

        # NOTE(review): roughly half of this method's statements are
        # missing from the file (the bodies of both ``if`` statements and
        # the calls the comment below refers to). It does not parse as-is -
        # restore from upstream.

        if self._thread is None:

            # NOTE(review): body missing; presumably nothing to stop.


        self._stopped = True


        self._thread = None

        # Stop looping call and abort running tasks


        if self._monitor_busy:

            # NOTE(review): body missing.



        LOG.info(_("TaskManager terminated"))

**** CubicPower OpenStack Study ****

    def has_pending_task(self):

        if self._tasks_queue or self._tasks or self._main_thread_exec_task:

            return True


            return False

**** CubicPower OpenStack Study ****

    def show_pending_tasks(self):

        # Visits every task the manager knows about: those in the request
        # queue, those queued per resource, and the one being executed.
        # NOTE(review): the statement inside each loop/branch is missing
        # from this file, so the method does not parse as-is and what is
        # done with each task cannot be told from here - restore from
        # upstream.

        for task in self._tasks_queue:


        for resource, tasks in self._tasks.iteritems():

            for task in tasks:


        if self._main_thread_exec_task:


**** CubicPower OpenStack Study ****

    def count(self):

        count = 0

        for resource_id, tasks in self._tasks.iteritems():

            count += len(tasks)

        return count

**** CubicPower OpenStack Study ****

    def start(self, interval=None):

        def _inner():


        def _loopingcall_callback():

            self._monitor_busy = True



            except Exception:

                LOG.exception(_("Exception in _check_pending_tasks"))

            self._monitor_busy = False

        if self._thread is not None:

            return self

        if interval is None or interval == 0:

            interval = self._interval

        self._stopped = False

        self._thread = greenthread.spawn(_inner)

        self._monitor = loopingcall.FixedIntervalLoopingCall(


        self._monitor.start(interval / 1000.0,

                            interval / 1000.0)

        # To allow the created thread start running


        return self


**** CubicPower OpenStack Study ****

        def _inner():


**** CubicPower OpenStack Study ****

        def _loopingcall_callback():

            self._monitor_busy = True



            except Exception:

                LOG.exception(_("Exception in _check_pending_tasks"))

            self._monitor_busy = False

        if self._thread is not None:

            return self

        if interval is None or interval == 0:

            interval = self._interval

        self._stopped = False

        self._thread = greenthread.spawn(_inner)

        self._monitor = loopingcall.FixedIntervalLoopingCall(


        self._monitor.start(interval / 1000.0,

                            interval / 1000.0)

        # To allow the created thread start running


        return self


**** CubicPower OpenStack Study ****

    def set_default_interval(cls, interval):

        cls._default_interval = interval