# **** CubicPower OpenStack Study ****
# Copyright 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os.path
import shutil
import string
import tempfile
from cinder.brick.iscsi import iscsi
from cinder import test
from cinder.volume import driver
# **** CubicPower OpenStack Study ****
class TargetAdminTestCase(object):
    """Mixin with the shared fixture and checks for iSCSI target helpers.

    Concrete test cases combine this mixin with ``test.TestCase``, select a
    helper through the ``iscsi_helper`` flag and set ``script_template`` to
    the newline-separated commands the helper is expected to execute.
    """

    def setUp(self):
        """Create the fake target data and install the stubs.

        Relies on ``self.stubs`` and ``self.flags`` being provided by the
        test-case class this mixin is combined with.
        """
        self.cmds = []
        self.tid = 1
        self.target_name = 'iqn.2011-09.org.foo.bar:volume-blaa'
        self.lun = 10
        self.path = '/foo'
        self.vol_id = 'blaa'
        self.vol_name = 'volume-blaa'
        self.db = {}
        self.script_template = None

        # Replace filesystem checks and helper lookups with canned answers.
        self.stubs.Set(os.path, 'isfile', lambda _: True)
        self.stubs.Set(os, 'unlink', lambda _: '')
        self.stubs.Set(iscsi.TgtAdm, '_get_target', self.fake_get_target)
        self.stubs.Set(iscsi.LioAdm, '_get_target', self.fake_get_target)
        self.stubs.Set(iscsi.LioAdm,
                       '_verify_rtstool',
                       self.fake_verify_rtstool)
        self.stubs.Set(iscsi.TgtAdm, '_verify_backing_lun',
                       self.fake_verify_backing_lun)

        # Instantiate the driver once, after every stub is in place.
        self.driver = driver.ISCSIDriver()
        self.flags(iscsi_target_prefix='iqn.2011-09.org.foo.bar:')

    def fake_verify_backing_lun(obj, iqn, tid):
        """Stub for ``TgtAdm._verify_backing_lun``; always returns True.

        Installed as a bound method, so ``obj`` is this test case.
        """
        return True

    def fake_verify_rtstool(obj):
        """Stub for ``LioAdm._verify_rtstool``; does nothing."""
        pass

    def fake_get_target(obj, iqn):
        """Stub for the helpers' ``_get_target``; always returns 1."""
        return 1

    def get_script_params(self):
        """Return the substitution values used to render the template."""
        return {'tid': self.tid,
                'target_name': self.target_name,
                'lun': self.lun,
                'path': self.path}

    def get_script(self):
        """Render ``script_template`` with :meth:`get_script_params`."""
        return self.script_template % self.get_script_params()

    def fake_execute(self, *cmd, **kwargs):
        """Record a command line instead of running it.

        The positional arguments are joined with single spaces and appended
        to ``self.cmds``; keyword arguments are ignored.

        :returns: the tuple ``("", None)``
        """
        # str.join behaves like the Python 2-only string.join(cmd) and also
        # works on Python 3.
        self.cmds.append(' '.join(cmd))
        return "", None

    def clear_cmds(self):
        """Forget every command recorded so far."""
        self.cmds = []

    def verify_cmds(self, cmds):
        """Assert that the recorded commands match ``cmds``.

        Only the number of commands and the membership of each recorded
        command in ``cmds`` are compared; ordering is not checked.
        """
        self.assertEqual(len(cmds), len(self.cmds))
        for cmd in self.cmds:
            self.assertIn(cmd, cmds)

    def verify(self):
        """Compare the recorded commands with the rendered script."""
        script = self.get_script()
        # Blank (whitespace-only) lines in the script are not commands.
        cmds = [line for line in script.split('\n') if line.strip()]
        self.verify_cmds(cmds)

    def run_commands(self):
        """Create, show and remove a target through the driver's helper."""
        target_helper = self.driver.get_target_helper(self.db)
        target_helper.set_execute(self.fake_execute)
        target_helper.create_iscsi_target(self.target_name, self.tid,
                                          self.lun, self.path)
        target_helper.show_target(self.tid, iqn=self.target_name)
        target_helper.remove_iscsi_target(self.tid, self.lun, self.vol_id,
                                          self.vol_name)

    def test_target_admin(self):
        """Run the scenario and check the commands that were issued."""
        self.clear_cmds()
        self.run_commands()
        self.verify()
# **** CubicPower OpenStack Study ****
class TgtAdmTestCase(test.TestCase, TargetAdminTestCase):
    """Run the shared target-admin scenario with the ``tgtadm`` helper."""

    def setUp(self):
        """Select ``tgtadm`` and point ``volumes_dir`` at a scratch dir."""
        super(TgtAdmTestCase, self).setUp()
        TargetAdminTestCase.setUp(self)
        self.persist_tempdir = tempfile.mkdtemp()
        self.flags(iscsi_helper='tgtadm')
        self.flags(volumes_dir=self.persist_tempdir)
        # Commands the helper is expected to issue, one per entry.
        expected_cmds = [
            'tgt-admin --update %(target_name)s',
            'tgt-admin --delete %(target_name)s',
            'tgt-admin --force '
            '--delete %(target_name)s',
            'tgtadm --lld iscsi --op show --mode target',
        ]
        self.script_template = "\n".join(expected_cmds)

    def tearDown(self):
        """Remove the scratch directory, then run the base tearDown."""
        scratch_dir = self.persist_tempdir
        try:
            shutil.rmtree(scratch_dir)
        except OSError:
            # Best-effort cleanup: an OSError from rmtree is ignored.
            pass
        super(TgtAdmTestCase, self).tearDown()
# **** CubicPower OpenStack Study ****
class IetAdmTestCase(test.TestCase, TargetAdminTestCase):
    """Run the shared scenario with ``ietadm`` and no ``iscsi_iotype`` set.

    The expected LUN-creation command uses ``Type=fileio``.
    """

    def setUp(self):
        """Select the ``ietadm`` helper and define the expected script."""
        super(IetAdmTestCase, self).setUp()
        TargetAdminTestCase.setUp(self)
        self.flags(iscsi_helper='ietadm')
        # Commands the helper is expected to issue, one per entry.
        expected_cmds = [
            'ietadm --op new --tid=%(tid)s --params Name=%(target_name)s',
            'ietadm --op new --tid=%(tid)s --lun=%(lun)s '
            '--params Path=%(path)s,Type=fileio',
            'ietadm --op show --tid=%(tid)s',
            'ietadm --op delete --tid=%(tid)s --lun=%(lun)s',
            'ietadm --op delete --tid=%(tid)s',
        ]
        self.script_template = "\n".join(expected_cmds)
# **** CubicPower OpenStack Study ****
class IetAdmBlockIOTestCase(test.TestCase, TargetAdminTestCase):
    """Run the shared scenario with ``ietadm`` and ``iscsi_iotype=blockio``.

    The expected LUN-creation command uses ``Type=blockio``.
    """

    def setUp(self):
        """Select ``ietadm`` with block I/O and define the expected script."""
        super(IetAdmBlockIOTestCase, self).setUp()
        TargetAdminTestCase.setUp(self)
        self.flags(iscsi_helper='ietadm')
        self.flags(iscsi_iotype='blockio')
        # Commands the helper is expected to issue, one per entry.
        expected_cmds = [
            'ietadm --op new --tid=%(tid)s --params Name=%(target_name)s',
            'ietadm --op new --tid=%(tid)s --lun=%(lun)s '
            '--params Path=%(path)s,Type=blockio',
            'ietadm --op show --tid=%(tid)s',
            'ietadm --op delete --tid=%(tid)s --lun=%(lun)s',
            'ietadm --op delete --tid=%(tid)s',
        ]
        self.script_template = "\n".join(expected_cmds)
# **** CubicPower OpenStack Study ****
class IetAdmFileIOTestCase(test.TestCase, TargetAdminTestCase):
    """Run the shared scenario with ``ietadm`` and ``iscsi_iotype=fileio``.

    The expected LUN-creation command uses ``Type=fileio``.
    """

    def setUp(self):
        """Select ``ietadm`` with file I/O and define the expected script."""
        super(IetAdmFileIOTestCase, self).setUp()
        TargetAdminTestCase.setUp(self)
        self.flags(iscsi_helper='ietadm')
        self.flags(iscsi_iotype='fileio')
        # Commands the helper is expected to issue, one per entry.
        expected_cmds = [
            'ietadm --op new --tid=%(tid)s --params Name=%(target_name)s',
            'ietadm --op new --tid=%(tid)s --lun=%(lun)s '
            '--params Path=%(path)s,Type=fileio',
            'ietadm --op show --tid=%(tid)s',
            'ietadm --op delete --tid=%(tid)s --lun=%(lun)s',
            'ietadm --op delete --tid=%(tid)s',
        ]
        self.script_template = "\n".join(expected_cmds)
# **** CubicPower OpenStack Study ****
class IetAdmAutoIOTestCase(test.TestCase, TargetAdminTestCase):
    """Run the shared scenario with ``ietadm`` and ``iscsi_iotype=auto``.

    ``IetAdm._is_block`` is stubbed to return True and the expected
    LUN-creation command uses ``Type=blockio``.
    """

    def setUp(self):
        """Stub ``_is_block``, select ``ietadm``/``auto``, set the script."""
        super(IetAdmAutoIOTestCase, self).setUp()
        TargetAdminTestCase.setUp(self)
        # Make every path look like a block device to the helper.
        self.stubs.Set(iscsi.IetAdm, '_is_block', lambda a, b: True)
        self.flags(iscsi_helper='ietadm')
        self.flags(iscsi_iotype='auto')
        # Commands the helper is expected to issue, one per entry.
        expected_cmds = [
            'ietadm --op new --tid=%(tid)s --params Name=%(target_name)s',
            'ietadm --op new --tid=%(tid)s --lun=%(lun)s '
            '--params Path=%(path)s,Type=blockio',
            'ietadm --op show --tid=%(tid)s',
            'ietadm --op delete --tid=%(tid)s --lun=%(lun)s',
            'ietadm --op delete --tid=%(tid)s',
        ]
        self.script_template = "\n".join(expected_cmds)
# **** CubicPower OpenStack Study ****
class LioAdmTestCase(test.TestCase, TargetAdminTestCase):
    """Run the shared target-admin scenario with the ``lioadm`` helper."""

    def setUp(self):
        """Select ``lioadm``, create a scratch dir, set the script."""
        super(LioAdmTestCase, self).setUp()
        TargetAdminTestCase.setUp(self)
        self.persist_tempdir = tempfile.mkdtemp()
        self.flags(iscsi_helper='lioadm')
        self.script_template = "\n".join([
            'cinder-rtstool create '
            '%(path)s %(target_name)s test_id test_pass',
            'cinder-rtstool delete %(target_name)s'])

    def tearDown(self):
        """Remove the scratch directory, then run the base tearDown.

        ``tempfile.mkdtemp`` leaves deletion to the caller; without this
        the directory created in ``setUp`` would be left behind by every
        test run.
        """
        try:
            shutil.rmtree(self.persist_tempdir)
        except OSError:
            # Best-effort cleanup, same as TgtAdmTestCase.tearDown.
            pass
        super(LioAdmTestCase, self).tearDown()
# **** CubicPower OpenStack Study ****
class ISERTgtAdmTestCase(TgtAdmTestCase):
    """Repeat the ``TgtAdmTestCase`` scenario with ``iscsi_helper=iseradm``.

    The expected script and the scratch-directory handling are inherited
    unchanged from ``TgtAdmTestCase``.
    """

    def setUp(self):
        """Run the parent fixture, then switch the helper flag."""
        super(ISERTgtAdmTestCase, self).setUp()
        helper_name = 'iseradm'
        self.flags(iscsi_helper=helper_name)