# Home
#
# OpenStack Study: test_brick_linuxfc.py
#
# OpenStack Index
#
# **** CubicPower OpenStack Study ****

# (c) Copyright 2013 Hewlett-Packard Development Company, L.P.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import os.path

import string

from cinder.brick.initiator import linuxfc

from cinder.openstack.common import log as logging

from cinder import test

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# **** CubicPower OpenStack Study ****

class LinuxFCTestCase(test.TestCase):
    """Unit tests for linuxfc.LinuxFibreChannel.

    Each test runs against a LinuxFibreChannel that was handed
    ``fake_execute`` as its executor and with ``os.path.exists`` stubbed to
    always return True.
    """

    def setUp(self):
        """Build the object under test with a recording fake executor."""
        super(LinuxFCTestCase, self).setUp()
        # Command lines captured by fake_execute, in call order.
        self.cmds = []
        self.stubs.Set(os.path, 'exists', lambda x: True)
        self.lfc = linuxfc.LinuxFibreChannel(None, execute=self.fake_execute)

    def fake_execute(self, *cmd, **kwargs):
        """Record a command line instead of running it.

        :param cmd: the command words; they are joined with single spaces
                    and appended to ``self.cmds``.
        :param kwargs: accepted and ignored.
        :returns: the tuple ``("", None)``.
        """
        # str.join behaves the same on Python 2 and 3, whereas the
        # string.join() function no longer exists on Python 3.
        self.cmds.append(' '.join(cmd))
        return "", None

    def _stub_execute_with_systool_output(self):
        """Make ``self.lfc._execute`` return the canned SYSTOOL_FC listing."""
        # The explicit signature (four positional words plus the two keyword
        # arguments) is kept so a call with a different shape still fails.
        def fake_exec(a, b, c, d, run_as_root=True, root_helper='sudo'):
            return SYSTOOL_FC, None

        self.stubs.Set(self.lfc, "_execute", fake_exec)

    def test_rescan_hosts(self):
        """rescan_hosts() issues one scan command per HBA, in order."""
        hbas = [{'host_device': 'foo'},
                {'host_device': 'bar'}, ]
        self.lfc.rescan_hosts(hbas)
        expected_commands = ['tee -a /sys/class/scsi_host/foo/scan',
                             'tee -a /sys/class/scsi_host/bar/scan']
        self.assertEqual(expected_commands, self.cmds)

    def test_get_fc_hbas_fail(self):
        """get_fc_hbas() yields nothing when the command fails.

        Two failure shapes are covered: the executor raising OSError, and
        the executor returning no stdout together with an error string.
        """
        def fake_exec1(a, b, c, d, run_as_root=True, root_helper='sudo'):
            raise OSError

        def fake_exec2(a, b, c, d, run_as_root=True, root_helper='sudo'):
            return None, 'None found'

        self.stubs.Set(self.lfc, "_execute", fake_exec1)
        hbas = self.lfc.get_fc_hbas()
        self.assertEqual(0, len(hbas))

        self.stubs.Set(self.lfc, "_execute", fake_exec2)
        hbas = self.lfc.get_fc_hbas()
        self.assertEqual(0, len(hbas))

    def test_get_fc_hbas(self):
        """get_fc_hbas() returns one entry per host in the listing."""
        self._stub_execute_with_systool_output()
        hbas = self.lfc.get_fc_hbas()
        # Expected value first, matching the other assertions in this class.
        self.assertEqual(2, len(hbas))
        self.assertEqual("host0", hbas[0]["ClassDevice"])
        self.assertEqual("host2", hbas[1]["ClassDevice"])

    def test_get_fc_hbas_info(self):
        """get_fc_hbas_info() condenses each host to four fields."""
        self._stub_execute_with_systool_output()
        hbas_info = self.lfc.get_fc_hbas_info()
        expected_info = [{'device_path': '/sys/devices/pci0000:20/'
                                         '0000:20:03.0/0000:21:00.0/'
                                         'host0/fc_host/host0',
                          'host_device': 'host0',
                          'node_name': '50014380242b9751',
                          'port_name': '50014380242b9750'},
                         {'device_path': '/sys/devices/pci0000:20/'
                                         '0000:20:03.0/0000:21:00.1/'
                                         'host2/fc_host/host2',
                          'host_device': 'host2',
                          'node_name': '50014380242b9753',
                          'port_name': '50014380242b9752'}, ]
        self.assertEqual(expected_info, hbas_info)

    def test_get_fc_wwpns(self):
        """get_fc_wwpns() returns the port names without the 0x prefix."""
        self._stub_execute_with_systool_output()
        wwpns = self.lfc.get_fc_wwpns()
        expected_wwpns = ['50014380242b9750', '50014380242b9752']
        self.assertEqual(expected_wwpns, wwpns)

    def test_get_fc_wwnns(self):
        """get_fc_wwnns() returns the node names without the 0x prefix."""
        self._stub_execute_with_systool_output()
        # This test previously called get_fc_wwpns() and compared against the
        # port names, so the node-name accessor was never exercised.  The
        # expected values are the node_name fields of the canned listing.
        wwnns = self.lfc.get_fc_wwnns()
        expected_wwnns = ['50014380242b9751', '50014380242b9753']
        self.assertEqual(expected_wwnns, wwnns)

# Canned command output describing two fc_host class devices (host0 and
# host2); the tests hand it to LinuxFibreChannel in place of real output.
#
# The backslash at the end of each "Class Device path" line must be followed
# directly by the rest of the path so the literal holds the path on a single
# line, which is what the device_path values expected by the tests require.
#
# NOTE(review): the blank lines kept below (one after the Class line, one
# before each nested Device section, two after each host stanza) are meant to
# mirror the layout of `systool -c fc_host -v`; confirm they match what the
# linuxfc parser expects.
SYSTOOL_FC = """
Class = "fc_host"

  Class Device = "host0"
  Class Device path = "/sys/devices/pci0000:20/0000:20:03.0/\
0000:21:00.0/host0/fc_host/host0"
    dev_loss_tmo        = "16"
    fabric_name         = "0x100000051ea338b9"
    issue_lip           = 
    max_npiv_vports     = "0"
    node_name           = "0x50014380242b9751"
    npiv_vports_inuse   = "0"
    port_id             = "0x960d0d"
    port_name           = "0x50014380242b9750"
    port_state          = "Online"
    port_type           = "NPort (fabric via point-to-point)"
    speed               = "8 Gbit"
    supported_classes   = "Class 3"
    supported_speeds    = "1 Gbit, 2 Gbit, 4 Gbit, 8 Gbit"
    symbolic_name       = "QMH2572 FW:v4.04.04 DVR:v8.03.07.12-k"
    system_hostname     = ""
    tgtid_bind_type     = "wwpn (World Wide Port Name)"
    uevent              =
    vport_create        = 
    vport_delete        = 

    Device = "host0"
    Device path = "/sys/devices/pci0000:20/0000:20:03.0/0000:21:00.0/host0"
      edc                 = 
      optrom_ctl          = 
      reset               = 
      uevent              = "DEVTYPE=scsi_host"


  Class Device = "host2"
  Class Device path = "/sys/devices/pci0000:20/0000:20:03.0/\
0000:21:00.1/host2/fc_host/host2"
    dev_loss_tmo        = "16"
    fabric_name         = "0x100000051ea33b79"
    issue_lip           = 
    max_npiv_vports     = "0"
    node_name           = "0x50014380242b9753"
    npiv_vports_inuse   = "0"
    port_id             = "0x970e09"
    port_name           = "0x50014380242b9752"
    port_state          = "Online"
    port_type           = "NPort (fabric via point-to-point)"
    speed               = "8 Gbit"
    supported_classes   = "Class 3"
    supported_speeds    = "1 Gbit, 2 Gbit, 4 Gbit, 8 Gbit"
    symbolic_name       = "QMH2572 FW:v4.04.04 DVR:v8.03.07.12-k"
    system_hostname     = ""
    tgtid_bind_type     = "wwpn (World Wide Port Name)"
    uevent              =
    vport_create        = 
    vport_delete        = 

    Device = "host2"
    Device path = "/sys/devices/pci0000:20/0000:20:03.0/0000:21:00.1/host2"
      edc                 = 
      optrom_ctl          = 
      reset               = 
      uevent              = "DEVTYPE=scsi_host"


"""