# **** CubicPower OpenStack Study ****
# Copyright (c) 2012 NetApp, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests for NetApp volume driver
"""
import BaseHTTPServer
import httplib
from lxml import etree
import six
from cinder import exception
from cinder.openstack.common import log as logging
from cinder import test
from cinder.volume import configuration as conf
from cinder.volume.drivers.netapp.api import NaElement
from cinder.volume.drivers.netapp.api import NaServer
from cinder.volume.drivers.netapp import common
from cinder.volume.drivers.netapp.options import netapp_7mode_opts
from cinder.volume.drivers.netapp.options import netapp_basicauth_opts
from cinder.volume.drivers.netapp.options import netapp_cluster_opts
from cinder.volume.drivers.netapp.options import netapp_connection_opts
from cinder.volume.drivers.netapp.options import netapp_provisioning_opts
from cinder.volume.drivers.netapp.options import netapp_transport_opts
from cinder.volume.drivers.netapp import ssc_utils
# Module-level logger. It is bound to the "cinder.volume.driver" logger name
# rather than to a name derived from this test module.
LOG = logging.getLogger("cinder.volume.driver")
# **** CubicPower OpenStack Study ****
def create_configuration():
    """Build a Configuration that carries every NetApp option group.

    :returns: a ``conf.Configuration`` constructed with ``None`` to which
              the connection, transport, basic-auth, cluster, 7-mode and
              provisioning option lists have been appended, in that order.
    """
    config = conf.Configuration(None)
    option_groups = (
        netapp_connection_opts,
        netapp_transport_opts,
        netapp_basicauth_opts,
        netapp_cluster_opts,
        netapp_7mode_opts,
        netapp_provisioning_opts,
    )
    for group in option_groups:
        config.append_config_values(group)
    return config
# **** CubicPower OpenStack Study ****
class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Request handler base class whose logging hook is silenced."""

    def log_message(self, format, *args):
        """Discard the message instead of writing it to the log.

        The parameter names mirror the hook being overridden, which is why
        ``format`` shadows the builtin here.
        """
        return None
# **** CubicPower OpenStack Study ****
class FakeHttplibSocket(object):
    """A fake socket implementation for httplib.HTTPResponse.

    The socket is backed by two in-memory buffers: one pre-loaded with the
    bytes a reader should see, and one that collects whatever is written.
    When the write buffer is closed its contents are saved on
    ``self.result`` so they stay available after the close.
    """

    def __init__(self, value):
        """Create the buffers.

        :param value: data that will be readable from the socket.
        """
        self._rbuffer = six.StringIO(value)
        self._wbuffer = six.StringIO('')
        original_close = self._wbuffer.close

        def capturing_close():
            # Snapshot the written data first: the buffer's value cannot be
            # read any more once the real close() has run.
            self.result = self._wbuffer.getvalue()
            original_close()

        self._wbuffer.close = capturing_close

    def makefile(self, mode, _other=None):
        """Returns the socket's internal buffer.

        :param mode: 'r'/'rb' selects the read buffer, 'w'/'wb' the write
                     buffer.
        :param _other: ignored; accepted (and optional) so callers may pass
                       a buffer size or leave it out.
        :returns: the matching buffer, or None for any other mode.
        """
        if mode in ('r', 'rb'):
            return self._rbuffer
        if mode in ('w', 'wb'):
            return self._wbuffer
        return None
# Fragments written before and after every canned response body by the fake
# server handlers in this module.
# NOTE(review): each literal holds only a newline although the responses are
# sent as text/xml, so an XML prolog/envelope may have been lost from these
# strings -- confirm against the upstream test module.
RESPONSE_PREFIX_DIRECT_CMODE = """
"""
RESPONSE_PREFIX_DIRECT_7MODE = """
"""
RESPONSE_PREFIX_DIRECT = """
"""
RESPONSE_SUFFIX_DIRECT = """
"""
# **** CubicPower OpenStack Study ****
class FakeDirectCMODEServerHandler(FakeHTTPRequestHandler):
    """HTTP handler that fakes enough stuff to allow the driver to run.

    GET requests always get a fixed "failed" answer. POST requests are
    parsed as XML and answered with a canned body selected by the local
    name of the first child element of the request root.
    """

    def do_GET(s):
        """Respond to a GET request.

        Sends 404 when the path does not contain the filer servlet path,
        otherwise 200 with a fixed failure document.
        """
        if '/servlets/netapp.servlets.admin.XMLrequest_filer' not in s.path:
            s.send_response(404)
            s.end_headers()
            return
        s.send_response(200)
        s.send_header("Content-Type", "text/xml; charset=utf-8")
        s.end_headers()
        out = s.wfile
        # NOTE(review): this literal was garbled in the source (only the
        # status/errno attributes survived); the surrounding markup and the
        # reason text are a reconstruction -- confirm against upstream.
        out.write('<netapp version="1.15">'
                  '<results reason="Not supported method type"'
                  ' status="failed" errno="Not_Allowed"/></netapp>')

    def do_POST(s):
        """Respond to a POST request.

        Sends 404 when the path does not contain the filer servlet path and
        500 when the requested API is not one of the known names. Otherwise
        sends 200 followed by the C-mode prefix, the common prefix, the
        canned body for the API and the common suffix.
        """
        if '/servlets/netapp.servlets.admin.XMLrequest_filer' not in s.path:
            s.send_response(404)
            s.end_headers()
            return
        request_xml = s.rfile.read(int(s.headers['Content-Length']))
        root = etree.fromstring(request_xml)
        # The first child of the root element is the API call itself.
        request = list(root.iterchildren())[0]
        tag = request.tag
        api = etree.QName(tag).localname or tag
        get_child = FakeDirectCMODEServerHandler._get_child_by_name
        get_content = FakeDirectCMODEServerHandler._get_child_content
        # NOTE(review): the canned bodies below carry no element markup even
        # though the response is declared text/xml; the markup may have been
        # stripped from this copy of the file -- confirm against upstream.
        if 'lun-get-iter' == api:
            # A request without a 'tag' child gets the first canned page;
            # a request that carries one gets the follow-up page.
            if get_child(request, 'tag') is None:
                body = """ indeterminate
512
1354536362
false
true
falselinux
true/vol/navneet/lun1
0 false
2FfGI$APyN68
none20971520
0false
0
cec1f3d7-3d41-11e2-9cf4-123478563412
navneetben_vserver
<lun-get-iter-key-td> <key-0>ben_vserver</key-0>
<key-1>/vol/navneet/lun2</key-1>
<key-2>navneet</key-2>
<key-3></key-3>
<key-4>lun2</key-4>
</lun-get-iter-key-td>
1
"""
            else:
                body = """ indeterminate
512
1354536362
false
true
falselinux
true/vol/navneet/lun3
0 false
2FfGI$APyN68
none20971520
0false
0
cec1f3d7-3d41-11e2-9cf4-123478563412
navneetben_vserver
1
"""
        elif 'volume-get-iter' == api:
            if get_child(request, 'tag') is None:
                body = """ iscsi Openstack
214748364
true
falseonline
nfsvol openstack
247483648
true
falseonline
<volume-get-iter-key-td> <key-0>openstack</key-0>
<key-1>nfsvol</key-1>
</volume-get-iter-key-td>
2
"""
            else:
                body = """ iscsi Openstack
4147483648
true
falseonline
nfsvol openstack
8147483648
true
falseonline
2
"""
        elif 'lun-create-by-size' == api:
            body = """ 22020096
"""
        elif 'igroup-get-iter' == api:
            # The initiator counts as found unless the query names an
            # initiator other than the one this fake knows about.
            init_found = True
            query = get_child(request, 'query')
            if query is not None:
                igroup_info = get_child(query, 'initiator-group-info')
                if igroup_info is not None:
                    inits = get_child(igroup_info, 'initiators')
                    if inits is not None:
                        init_info = get_child(inits, 'initiator-info')
                        init_name = get_content(init_info, 'initiator-name')
                        init_found = (
                            init_name == 'iqn.1993-08.org.debian:01:10')
            if init_found:
                if get_child(request, 'tag') is None:
                    body = """ openstack-01f5297b-00f7-4170-bf30-69b1314b2118
windows
iscsi
iqn.1993-08.org.debian:01:10
openstack
<igroup-get-iter-key-td>
<key-0>openstack</key-0>
<key-1>
openstack-01f5297b-00f7-4170-bf30-69b1314b2118<
/key-1>
</igroup-get-iter-key-td>
1"""
                else:
                    body = """ openstack-01f5297b-00f7-4170-bf30-69b1314b2118
linux
iscsi
iqn.1993-08.org.debian:01:10
openstack
1"""
            else:
                body = """ 0
"""
        elif 'lun-map-get-iter' == api:
            if get_child(request, 'tag') is None:
                body = """ openstack-44c5e7e1-3306-4800-9623-259e57d56a83
948ae304-06e9-11e2
0
5587e563-06e9-11e2-9cf4-123478563412
/vol/openvol/lun1
openstack
<lun-map-get-iter-key-td>
<key-0>openstack</key-0>
<key-1>openstack-01f5297b-00f7-4170-bf30-69b1314b2118<
/key-1>
</lun-map-get-iter-key-td>
1
"""
            else:
                body = """ openstack-44c5e7e1-3306-4800-9623-259e57d56a83
948ae304-06e9-11e2
0
5587e563-06e9-11e2-9cf4-123478563412
/vol/openvol/lun1
openstack
1
"""
        elif 'lun-map' == api:
            body = """1
"""
        elif 'lun-get-geometry' == api:
            body = """256
512 3221225472
512
2147483648 256
"""
        elif 'iscsi-service-get-iter' == api:
            body = """ openstack
true
iqn.1992-08.com.netapp:sn.fa9:vs.105
openstack
1"""
        elif 'iscsi-interface-get-iter' == api:
            body = """ fas3170rre-cmode-01
e1b-1165 iscsi_data_if
10.63.165.216
3260true
5
iscsi_data_if
1038 openstack
1
"""
        elif 'system-get-ontapi-version' == api:
            body = """ 1
19
"""
        elif 'vserver-get-iter' == api:
            body = """ vserver
node
1
"""
        elif 'volume-options-list-info' == api:
            body = """
"""
        elif api in ('lun-destroy', 'igroup-create', 'igroup-add',
                     'clone-create', 'lun-unmap', 'ems-autosupport-log',
                     'lun-resize', 'lun-move'):
            # These calls are acknowledged with an empty body.
            body = """"""
        else:
            # Unknown API
            s.send_response(500)
            s.end_headers()
            return
        s.send_response(200)
        s.send_header("Content-Type", "text/xml; charset=utf-8")
        s.end_headers()
        s.wfile.write(RESPONSE_PREFIX_DIRECT_CMODE)
        s.wfile.write(RESPONSE_PREFIX_DIRECT)
        s.wfile.write(body)
        s.wfile.write(RESPONSE_SUFFIX_DIRECT)

    @staticmethod
    def _get_child_by_name(self, name):
        """Return the first child whose tag or local name equals ``name``.

        :param self: the element to search; this is a static method, so it
                     is an ordinary first argument despite its name.
        :param name: tag name to look for.
        :returns: the matching child element, or None when there is none.
        """
        for child in self.iterchildren():
            if child.tag == name or etree.QName(child.tag).localname == name:
                return child
        return None

    @staticmethod
    def _get_child_content(self, name):
        """Get the content of the child.

        :param self: the element to search (plain argument, see above).
        :param name: tag name to look for.
        :returns: the ``text`` of the first matching child, or None when no
                  child matches.
        """
        child = FakeDirectCMODEServerHandler._get_child_by_name(self, name)
        return None if child is None else child.text
# **** CubicPower OpenStack Study ****
class FakeDirectCmodeHTTPConnection(object):
    """A fake httplib.HTTPConnection for netapp tests

    Each request is rendered to raw HTTP text, handed to
    FakeDirectCMODEServerHandler through a fake socket, and whatever the
    handler wrote is wrapped in the httplib.HTTPResponse the caller expects.
    """

    def __init__(self, host, timeout=None):
        """Remember the host; ``timeout`` is accepted and ignored."""
        self.host = host

    def request(self, method, path, data=None, headers=None):
        """Render the request, run it through the fake handler and keep
        the resulting response for getresponse().
        """
        pieces = ['%s %s HTTP/1.1\r\n' % (method, path)]
        pieces.extend("%s: %s\r\n" % (key, value)
                      for key, value in (headers or {}).iteritems())
        if data:
            pieces.append('\r\n%s' % data)
        raw_request = ''.join(pieces)

        # NOTE(vish): normally the http transport normalizes from unicode
        request_sock = FakeHttplibSocket(
            raw_request.decode("latin-1").encode("utf-8"))
        # NOTE(vish): stop the server from trying to look up address from
        #             the fake socket
        FakeDirectCMODEServerHandler.address_string = lambda x: '127.0.0.1'
        self.app = FakeDirectCMODEServerHandler(request_sock,
                                                '127.0.0.1:80', None)

        # What the handler wrote becomes the readable side of a new socket.
        self.sock = FakeHttplibSocket(request_sock.result)
        self.http_response = httplib.HTTPResponse(self.sock)

    def set_debuglevel(self, level):
        """Accept and ignore the debug level."""
        return None

    def getresponse(self):
        """Parse the stored response's status/headers and return it."""
        response = self.http_response
        response.begin()
        return response

    def getresponsebody(self):
        """Return the ``result`` attribute of the response-side socket."""
        return self.sock.result
# **** CubicPower OpenStack Study ****
class NetAppDirectCmodeISCSIDriverTestCase(test.TestCase):
    """Test case for NetAppISCSIDriver"""

    # Shared fixtures. They live on the class, so tests must treat them as
    # read-only and copy before modifying.
    volume = {'name': 'lun1', 'size': 2, 'volume_name': 'lun1',
              'os_type': 'linux', 'provider_location': 'lun1',
              'id': 'lun1', 'provider_auth': None, 'project_id': 'project',
              'display_name': None, 'display_description': 'lun1',
              'volume_type_id': None}
    snapshot = {'name': 'snapshot1', 'size': 2, 'volume_name': 'lun1',
                'volume_size': 2, 'project_id': 'project',
                'display_name': None, 'display_description': 'lun1',
                'volume_type_id': None}
    snapshot_fail = {'name': 'snapshot2', 'size': 2, 'volume_name': 'lun1',
                     'volume_size': 1, 'project_id': 'project'}
    volume_sec = {'name': 'vol_snapshot', 'size': 2, 'volume_name': 'lun1',
                  'os_type': 'linux', 'provider_location': 'lun1',
                  'id': 'lun1', 'provider_auth': None, 'project_id': 'project',
                  'display_name': None, 'display_description': 'lun1',
                  'volume_type_id': None}
    volume_clone = {'name': 'cl_sm', 'size': 3, 'volume_name': 'lun1',
                    'os_type': 'linux', 'provider_location': 'cl_sm',
                    'id': 'lun1', 'provider_auth': None,
                    'project_id': 'project', 'display_name': None,
                    'display_description': 'lun1',
                    'volume_type_id': None}
    volume_clone_large = {'name': 'cl_lg', 'size': 6, 'volume_name': 'lun1',
                          'os_type': 'linux', 'provider_location': 'cl_lg',
                          'id': 'lun1', 'provider_auth': None,
                          'project_id': 'project', 'display_name': None,
                          'display_description': 'lun1',
                          'volume_type_id': None}
    connector = {'initiator': 'iqn.1993-08.org.debian:01:10'}
    vol_fail = {'name': 'lun_fail', 'size': 10000, 'volume_name': 'lun1',
                'os_type': 'linux', 'provider_location': 'lun1',
                'id': 'lun1', 'provider_auth': None, 'project_id': 'project',
                'display_name': None, 'display_description': 'lun1',
                'volume_type_id': None}
    vol1 = ssc_utils.NetAppVolume('lun1', 'openstack')
    vol1.state['vserver_root'] = False
    vol1.state['status'] = 'online'
    vol1.state['junction_active'] = True
    vol1.space['size_avl_bytes'] = '4000000000'
    vol1.space['size_total_bytes'] = '5000000000'
    vol1.space['space-guarantee-enabled'] = False
    vol1.space['space-guarantee'] = 'file'
    vol1.space['thin_provisioned'] = True
    vol1.mirror['mirrored'] = True
    vol1.qos['qos_policy_group'] = None
    vol1.aggr['name'] = 'aggr1'
    vol1.aggr['junction'] = '/vola'
    vol1.sis['dedup'] = True
    vol1.sis['compression'] = True
    vol1.aggr['raid_type'] = 'raiddp'
    vol1.aggr['ha_policy'] = 'cfo'
    vol1.aggr['disk_type'] = 'SSD'
    ssc_map = {'mirrored': set([vol1]), 'dedup': set([vol1]),
               'compression': set([vol1]),
               'thin': set([vol1]), 'all': set([vol1])}

    def setUp(self):
        super(NetAppDirectCmodeISCSIDriverTestCase, self).setUp()
        self._custom_setup()

    def _custom_setup(self):
        """Create the driver under test against the fake C-mode server."""
        # Replace the cluster SSC refresh with a no-op for these tests.
        self.stubs.Set(
            ssc_utils, 'refresh_cluster_ssc',
            lambda a, b, c, synchronous: None)
        configuration = self._set_config(create_configuration())
        driver = common.NetAppDriver(configuration=configuration)
        # Route every HTTP connection to the in-process fake.
        self.stubs.Set(httplib, 'HTTPConnection',
                       FakeDirectCmodeHTTPConnection)
        driver.do_setup(context='')
        driver.client.set_api_version(1, 15)
        self.driver = driver
        self.driver.ssc_vols = self.ssc_map

    def _set_config(self, configuration):
        """Fill in the connection settings used by these tests."""
        configuration.netapp_storage_protocol = 'iscsi'
        configuration.netapp_login = 'admin'
        configuration.netapp_password = 'pass'
        configuration.netapp_server_hostname = '127.0.0.1'
        configuration.netapp_transport_type = 'http'
        configuration.netapp_server_port = '80'
        configuration.netapp_vserver = 'openstack'
        return configuration

    def test_connect(self):
        self.driver.check_for_setup_error()

    def test_create_destroy(self):
        self.driver.create_volume(self.volume)
        self.driver.delete_volume(self.volume)

    def test_create_vol_snapshot_destroy(self):
        self.driver.create_volume(self.volume)
        self.driver.create_snapshot(self.snapshot)
        self.driver.create_volume_from_snapshot(self.volume_sec, self.snapshot)
        self.driver.delete_snapshot(self.snapshot)
        self.driver.delete_volume(self.volume)

    def test_map_unmap(self):
        # Work on a private copy: the class-level dict is shared by every
        # test (and subclass), so it must not be mutated here.
        volume = dict(self.volume)
        self.driver.create_volume(volume)
        updates = self.driver.create_export(None, volume)
        self.assertTrue(updates['provider_location'])
        volume['provider_location'] = updates['provider_location']
        connection_info = self.driver.initialize_connection(volume,
                                                            self.connector)
        self.assertEqual(connection_info['driver_volume_type'], 'iscsi')
        self.assertTrue(connection_info['data'], 'Target portal is none')
        self.driver.terminate_connection(volume, self.connector)
        self.driver.delete_volume(volume)

    def test_cloned_volume_destroy(self):
        self.driver.create_volume(self.volume)
        self.driver.create_cloned_volume(self.snapshot, self.volume)
        self.driver.delete_volume(self.snapshot)
        self.driver.delete_volume(self.volume)

    def test_map_by_creating_igroup(self):
        # Private copy for the same reason as in test_map_unmap.
        volume = dict(self.volume)
        self.driver.create_volume(volume)
        updates = self.driver.create_export(None, volume)
        self.assertTrue(updates['provider_location'])
        volume['provider_location'] = updates['provider_location']
        # An initiator different from the shared connector fixture.
        connector_new = {'initiator': 'iqn.1993-08.org.debian:01:1001'}
        connection_info = self.driver.initialize_connection(volume,
                                                            connector_new)
        self.assertEqual(connection_info['driver_volume_type'], 'iscsi')
        self.assertTrue(connection_info['data'], 'Target portal is none')

    def test_fail_create_vol(self):
        self.assertRaises(exception.VolumeBackendAPIException,
                          self.driver.create_volume, self.vol_fail)

    def test_vol_stats(self):
        self.driver.get_volume_stats(refresh=True)

    def test_create_vol_snapshot_diff_size_resize(self):
        self.driver.create_volume(self.volume)
        self.driver.create_snapshot(self.snapshot)
        self.driver.create_volume_from_snapshot(
            self.volume_clone, self.snapshot)
        self.driver.delete_snapshot(self.snapshot)
        self.driver.delete_volume(self.volume)

    def test_create_vol_snapshot_diff_size_subclone(self):
        self.driver.create_volume(self.volume)
        self.driver.create_snapshot(self.snapshot)
        self.driver.create_volume_from_snapshot(
            self.volume_clone_large, self.snapshot)
        self.driver.delete_snapshot(self.snapshot)
        self.driver.delete_volume(self.volume)

    def test_extend_vol_same_size(self):
        self.driver.create_volume(self.volume)
        self.driver.extend_volume(self.volume, self.volume['size'])

    def test_extend_vol_direct_resize(self):
        self.driver.create_volume(self.volume)
        self.driver.extend_volume(self.volume, 3)

    def test_extend_vol_sub_lun_clone(self):
        self.driver.create_volume(self.volume)
        self.driver.extend_volume(self.volume, 4)
# **** CubicPower OpenStack Study ****
class NetAppDriverNegativeTestCase(test.TestCase):
    """Test case for NetAppDriver"""

    def setUp(self):
        super(NetAppDriverNegativeTestCase, self).setUp()

    def test_incorrect_family(self):
        """An unknown storage family must be rejected with InvalidInput."""
        config = create_configuration()
        config.netapp_storage_family = 'xyz_abc'
        try:
            common.NetAppDriver(configuration=config)
        except exception.InvalidInput:
            pass
        else:
            self.fail('Wrong storage family is getting accepted.')

    def test_incorrect_protocol(self):
        """An unknown storage protocol must be rejected with InvalidInput."""
        config = create_configuration()
        config.netapp_storage_family = 'ontap'
        config.netapp_storage_protocol = 'ontap'
        try:
            common.NetAppDriver(configuration=config)
        except exception.InvalidInput:
            pass
        else:
            self.fail('Wrong storage protocol is getting accepted.')

    def test_non_netapp_driver(self):
        """A registry entry pointing at a foreign driver must be rejected."""
        config = create_configuration()
        registry = common.netapp_unified_plugin_registry
        registry['test_family'] = {
            'iscsi': 'cinder.volume.drivers.arbitrary.IscsiDriver'}
        config.netapp_storage_family = 'test_family'
        config.netapp_storage_protocol = 'iscsi'
        try:
            common.NetAppDriver(configuration=config)
        except exception.InvalidInput:
            pass
        else:
            self.fail('Non NetApp driver is getting instantiated.')
        finally:
            # Always take the temporary family back out of the registry.
            registry.pop('test_family')
# **** CubicPower OpenStack Study ****
class FakeDirect7MODEServerHandler(FakeHTTPRequestHandler):
    """HTTP handler that fakes enough stuff to allow the driver to run.

    GET requests always get a fixed "failed" answer. POST requests are
    parsed as XML and answered with a canned body selected by the local
    name of the first child element of the request root.
    """

    def do_GET(s):
        """Respond to a GET request.

        Sends 404 when the path does not contain the filer servlet path,
        otherwise 200 with a fixed failure document.
        """
        if '/servlets/netapp.servlets.admin.XMLrequest_filer' not in s.path:
            s.send_response(404)
            s.end_headers()
            return
        s.send_response(200)
        s.send_header("Content-Type", "text/xml; charset=utf-8")
        s.end_headers()
        out = s.wfile
        # NOTE(review): this literal was garbled in the source (only the
        # status/errno attributes survived); the surrounding markup and the
        # reason text are a reconstruction -- confirm against upstream.
        out.write('<netapp version="1.15">'
                  '<results reason="Not supported method type"'
                  ' status="failed" errno="Not_Allowed"/></netapp>')

    def do_POST(s):
        """Respond to a POST request.

        Sends 404 when the path does not contain the filer servlet path and
        500 when the requested API is not one of the known names. Otherwise
        sends 200 followed by the 7-mode prefix, the common prefix, the
        canned body for the API and the common suffix.
        """
        if '/servlets/netapp.servlets.admin.XMLrequest_filer' not in s.path:
            s.send_response(404)
            s.end_headers()
            return
        request_xml = s.rfile.read(int(s.headers['Content-Length']))
        root = etree.fromstring(request_xml)
        # The first child of the root element is the API call itself.
        request = list(root.iterchildren())[0]
        tag = request.tag
        api = etree.QName(tag).localname or tag
        # NOTE(review): the canned bodies below carry no element markup even
        # though the response is declared text/xml; the markup may have been
        # stripped from this copy of the file -- confirm against upstream.
        if 'lun-list-info' == api:
            body = """ false
false
/vol/vol1/lun1
20971520
true
false
false
false
none
linux
e867d844-c2c0-11e0-9282-00a09825b3b5
P3lgP4eTyaNl
512
true
0
indeterminate
/vol/vol1/lun1
20971520
true
false
false
false
none
linux
8e1e9284-c288-11e0-9282-00a09825b3b5
P3lgP4eTc3lp
512
true
0
indeterminate
"""
        elif 'volume-list-info' == api:
            body = """ vol0
019c8f7a-9243-11e0-9281-00a09825b3b5
flex
32_bit
online
576914493440
13820354560
563094110208
2
20
140848264
0
0
0
0
20907162
7010
518
31142
31142
0
false
aggr0
disabled
idle
regular
sun-sat@0
Mon Aug 8 09:34:15 EST 2011
Mon Aug 8 09:34:15 EST 2011
0
0
0
0
0
0
0
0
0
0
false
volume
true
14
raid_dp,sis
block
true
false
false
false
false
unmirrored
3
1
/aggr0/plex0
true
false
vol1
2d50ecf4-c288-11e0-9282-00a09825b3b5
flex
32_bit
online
42949672960
44089344
42905583616
0
20
10485760
8192
8192
0
0
1556480
110
504
31142
31142
0
false
aggr1
disabled
idle
regular
sun-sat@0
Sun Aug 7 14:51:00 EST 2011
Sun Aug 7 14:51:00 EST 2011
0
0
0
0
0
0
0
0
0
0
false
volume
true
7
raid4,sis
block
true
false
false
false
false
unmirrored
2
1
/aggr1/plex0
true
false
"""
        elif 'volume-options-list-info' == api:
            body = """ snapmirrored
off
root
false
ha_policy
cfo
striping
not_striped
compression
off
"""
        elif 'lun-create-by-size' == api:
            body = """ 22020096
"""
        elif 'igroup-list-info' == api:
            body = """ openstack-8bc96490
iscsi
b8e1d274-c378-11e0
linux
0
false
false
false
true
iqn.1993-08.org.debian:01:10
iscsi_group
iscsi
ccb8cbe4-c36f
linux
0
false
false
false
true
iqn.1993-08.org.debian:01:10ca
"""
        elif 'lun-map-list-info' == api:
            body = """
"""
        elif 'lun-map' == api:
            body = """1
"""
        elif 'iscsi-node-get-name' == api:
            body = """ iqn.1992-08.com.netapp:sn.135093938
"""
        elif 'iscsi-portal-list-info' == api:
            body = """ 10.61.176.156
3260
1000
e0a
"""
        elif 'clone-start' == api:
            body = """ 2d50ecf4-c288-11e0-9282-00a09825b3b5
11
"""
        elif 'clone-list-status' == api:
            body = """ completed
"""
        elif 'system-get-ontapi-version' == api:
            body = """ 1
8
"""
        elif 'lun-get-geometry' == api:
            body = """ 1
2
8
2
4
5
"""
        elif api in ('lun-destroy', 'igroup-create', 'igroup-add',
                     'lun-unmap', 'lun-set-space-reservation-info',
                     'ems-autosupport-log', 'lun-resize', 'lun-move'):
            # These calls are acknowledged with an empty body.
            body = """"""
        else:
            # Unknown API
            s.send_response(500)
            s.end_headers()
            return
        s.send_response(200)
        s.send_header("Content-Type", "text/xml; charset=utf-8")
        s.end_headers()
        s.wfile.write(RESPONSE_PREFIX_DIRECT_7MODE)
        s.wfile.write(RESPONSE_PREFIX_DIRECT)
        s.wfile.write(body)
        s.wfile.write(RESPONSE_SUFFIX_DIRECT)
# **** CubicPower OpenStack Study ****
class FakeDirect7modeHTTPConnection(object):
    """A fake httplib.HTTPConnection for netapp tests

    Each request is rendered to raw HTTP text, handed to
    FakeDirect7MODEServerHandler through a fake socket, and whatever the
    handler wrote is wrapped in the httplib.HTTPResponse the caller expects.
    """

    def __init__(self, host, timeout=None):
        """Remember the host; ``timeout`` is accepted and ignored."""
        self.host = host

    def request(self, method, path, data=None, headers=None):
        """Render the request, run it through the fake handler and keep
        the resulting response for getresponse().
        """
        pieces = ['%s %s HTTP/1.1\r\n' % (method, path)]
        pieces.extend("%s: %s\r\n" % (key, value)
                      for key, value in (headers or {}).iteritems())
        if data:
            pieces.append('\r\n%s' % data)
        raw_request = ''.join(pieces)

        # NOTE(vish): normally the http transport normalizes from unicode
        request_sock = FakeHttplibSocket(
            raw_request.decode("latin-1").encode("utf-8"))
        # NOTE(vish): stop the server from trying to look up address from
        #             the fake socket
        FakeDirect7MODEServerHandler.address_string = lambda x: '127.0.0.1'
        self.app = FakeDirect7MODEServerHandler(request_sock,
                                                '127.0.0.1:80', None)

        # What the handler wrote becomes the readable side of a new socket.
        self.sock = FakeHttplibSocket(request_sock.result)
        self.http_response = httplib.HTTPResponse(self.sock)

    def set_debuglevel(self, level):
        """Accept and ignore the debug level."""
        return None

    def getresponse(self):
        """Parse the stored response's status/headers and return it."""
        response = self.http_response
        response.begin()
        return response

    def getresponsebody(self):
        """Return the ``result`` attribute of the response-side socket."""
        return self.sock.result
# **** CubicPower OpenStack Study ****
class NetAppDirect7modeISCSIDriverTestCase_NV(
        NetAppDirectCmodeISCSIDriverTestCase):
    """Test case for NetAppISCSIDriver

    No vfiler
    """

    def setUp(self):
        super(NetAppDirect7modeISCSIDriverTestCase_NV, self).setUp()

    def _custom_setup(self):
        """Create the driver under test against the fake 7-mode server."""
        config = self._set_config(create_configuration())
        netapp_driver = common.NetAppDriver(configuration=config)
        # Route every HTTP connection to the in-process 7-mode fake.
        self.stubs.Set(httplib, 'HTTPConnection',
                       FakeDirect7modeHTTPConnection)
        netapp_driver.do_setup(context='')
        netapp_driver.client.set_api_version(1, 9)
        self.driver = netapp_driver

    def _set_config(self, configuration):
        """Fill in the 7-mode connection settings used by these tests."""
        settings = (
            ('netapp_storage_family', 'ontap_7mode'),
            ('netapp_storage_protocol', 'iscsi'),
            ('netapp_login', 'admin'),
            ('netapp_password', 'pass'),
            ('netapp_server_hostname', '127.0.0.1'),
            ('netapp_transport_type', 'http'),
            ('netapp_server_port', '80'),
        )
        for option, value in settings:
            setattr(configuration, option, value)
        return configuration

    def test_create_on_select_vol(self):
        self.driver.volume_list = ['vol0', 'vol1']
        self.driver.create_volume(self.volume)
        self.driver.delete_volume(self.volume)
        self.driver.volume_list = []

    def test_create_fail_on_select_vol(self):
        self.driver.volume_list = ['vol2', 'vol3']
        try:
            self.driver.create_volume(self.volume)
        except exception.VolumeBackendAPIException:
            rejected = True
        else:
            rejected = False
        finally:
            # Reset the selection whatever happened above.
            self.driver.volume_list = []
        if not rejected:
            raise AssertionError('Failed creating on selected volumes')

    def test_check_for_setup_error_version(self):
        drv = self.driver
        # check exception raises when version not found
        del drv.client._api_version
        self.assertRaises(exception.VolumeBackendAPIException,
                          drv.check_for_setup_error)
        # check exception raises when not supported version
        drv.client.set_api_version(1, 8)
        self.assertRaises(exception.VolumeBackendAPIException,
                          drv.check_for_setup_error)
# **** CubicPower OpenStack Study ****
class NetAppDirect7modeISCSIDriverTestCase_WV(
        NetAppDirect7modeISCSIDriverTestCase_NV):
    """Test case for NetAppISCSIDriver

    With vfiler
    """

    def setUp(self):
        super(NetAppDirect7modeISCSIDriverTestCase_WV, self).setUp()

    def _custom_setup(self):
        """Create the driver exactly as the no-vfiler case does.

        The parent implementation calls ``self._set_config``, so the
        vfiler-aware configuration below is the one that gets used.
        """
        super(NetAppDirect7modeISCSIDriverTestCase_WV, self)._custom_setup()

    def _set_config(self, configuration):
        """Apply the no-vfiler settings, then add the vfiler name."""
        parent = super(NetAppDirect7modeISCSIDriverTestCase_WV, self)
        configuration = parent._set_config(configuration)
        configuration.netapp_vfiler = 'openstack'
        return configuration
# **** CubicPower OpenStack Study ****
class NetAppApiElementTransTests(test.TestCase):
"""Test case for NetApp api element translations."""
**** CubicPower OpenStack Study ****
def setUp(self):
super(NetAppApiElementTransTests, self).setUp()
**** CubicPower OpenStack Study ****
def test_translate_struct_dict_unique_key(self):
"""Tests if dict gets properly converted to NaElements."""
root = NaElement('root')
child = {'e1': 'v1', 'e2': 'v2', 'e3': 'v3'}
root.translate_struct(child)
self.assertEqual(len(root.get_children()), 3)
self.assertEqual(root.get_child_content('e1'), 'v1')
self.assertEqual(root.get_child_content('e2'), 'v2')
self.assertEqual(root.get_child_content('e3'), 'v3')
**** CubicPower OpenStack Study ****
def test_translate_struct_dict_nonunique_key(self):
"""Tests if list/dict gets properly converted to NaElements."""
root = NaElement('root')
child = [{'e1': 'v1', 'e2': 'v2'}, {'e1': 'v3'}]
root.translate_struct(child)
self.assertEqual(len(root.get_children()), 3)
children = root.get_children()
for c in children:
if c.get_name() == 'e1':
self.assertIn(c.get_content(), ['v1', 'v3'])
else:
self.assertEqual(c.get_content(), 'v2')
**** CubicPower OpenStack Study ****
def test_translate_struct_list(self):
"""Tests if list gets properly converted to NaElements."""
root = NaElement('root')
child = ['e1', 'e2']
root.translate_struct(child)
self.assertEqual(len(root.get_children()), 2)
self.assertIsNone(root.get_child_content('e1'))
self.assertIsNone(root.get_child_content('e2'))
**** CubicPower OpenStack Study ****
def test_translate_struct_tuple(self):
"""Tests if tuple gets properly converted to NaElements."""
root = NaElement('root')
child = ('e1', 'e2')
root.translate_struct(child)
self.assertEqual(len(root.get_children()), 2)
self.assertIsNone(root.get_child_content('e1'))
self.assertIsNone(root.get_child_content('e2'))
**** CubicPower OpenStack Study ****
def test_translate_invalid_struct(self):
"""Tests if invalid data structure raises exception."""
root = NaElement('root')
child = 'random child element'
self.assertRaises(ValueError, root.translate_struct, child)
**** CubicPower OpenStack Study ****
def test_setter_builtin_types(self):
"""Tests str, int, float get converted to NaElement."""
root = NaElement('root')
root['e1'] = 'v1'
root['e2'] = 1
root['e3'] = 2.0
root['e4'] = 8l
self.assertEqual(len(root.get_children()), 4)
self.assertEqual(root.get_child_content('e1'), 'v1')
self.assertEqual(root.get_child_content('e2'), '1')
self.assertEqual(root.get_child_content('e3'), '2.0')
self.assertEqual(root.get_child_content('e4'), '8')
**** CubicPower OpenStack Study ****
def test_setter_na_element(self):
"""Tests na_element gets appended as child."""
root = NaElement('root')
root['e1'] = NaElement('nested')
self.assertEqual(len(root.get_children()), 1)
e1 = root.get_child_by_name('e1')
self.assertIsInstance(e1, NaElement)
self.assertIsInstance(e1.get_child_by_name('nested'), NaElement)
**** CubicPower OpenStack Study ****
def test_setter_child_dict(self):
"""Tests dict is appended as child to root."""
root = NaElement('root')
root['d'] = {'e1': 'v1', 'e2': 'v2'}
e1 = root.get_child_by_name('d')
self.assertIsInstance(e1, NaElement)
sub_ch = e1.get_children()
self.assertEqual(len(sub_ch), 2)
for c in sub_ch:
self.assertIn(c.get_name(), ['e1', 'e2'])
if c.get_name() == 'e1':
self.assertEqual(c.get_content(), 'v1')
else:
self.assertEqual(c.get_content(), 'v2')
**** CubicPower OpenStack Study ****
def test_setter_child_list_tuple(self):
"""Tests list/tuple are appended as child to root."""
root = NaElement('root')
root['l'] = ['l1', 'l2']
root['t'] = ('t1', 't2')
l = root.get_child_by_name('l')
self.assertIsInstance(l, NaElement)
t = root.get_child_by_name('t')
self.assertIsInstance(t, NaElement)
for le in l.get_children():
self.assertIn(le.get_name(), ['l1', 'l2'])
for te in t.get_children():
self.assertIn(te.get_name(), ['t1', 't2'])
**** CubicPower OpenStack Study ****
def test_setter_no_value(self):
"""Tests key with None value."""
root = NaElement('root')
root['k'] = None
self.assertIsNone(root.get_child_content('k'))
**** CubicPower OpenStack Study ****
def test_setter_invalid_value(self):
"""Tests invalid value raises exception."""
root = NaElement('root')
try:
root['k'] = NaServer('localhost')
except Exception as e:
if not isinstance(e, TypeError):
self.fail(_('Error not a TypeError.'))
**** CubicPower OpenStack Study ****
def test_setter_invalid_key(self):
"""Tests invalid value raises exception."""
root = NaElement('root')
try:
root[None] = 'value'
except Exception as e:
if not isinstance(e, KeyError):
self.fail(_('Error not a KeyError.'))