# **** CubicPower OpenStack Study ****
# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import six
from keystone.common import extension
from keystone import config
from keystone.tests import rest
CONF = config.CONF
# **** CubicPower OpenStack Study ****
class CoreApiTests(object):
    """Content-type agnostic tests of the v2.0 core API.

    This class is written as a mixin. The methods below rely on the
    concrete test case to supply the request helpers (``public_request``,
    ``admin_request``, ``get_scoped_token``), the fixture attributes
    (``user_foo``, ``tenant_bar``, ``tenant_service``, ``role_service``,
    ``assignment_api``, ``config_fixture``) and the content-type specific
    ``assertValid*Response`` validators.
    """

    def assertValidError(self, error):
        """Applicable to XML and JSON."""
        self.assertIsNotNone(error.get('code'))
        self.assertIsNotNone(error.get('title'))
        self.assertIsNotNone(error.get('message'))

    def assertValidVersion(self, version):
        """Applicable to XML and JSON.

        However, navigating links and media-types differs between content
        types so they need to be validated separately.
        """
        self.assertIsNotNone(version)
        self.assertIsNotNone(version.get('id'))
        self.assertIsNotNone(version.get('status'))
        self.assertIsNotNone(version.get('updated'))

    def assertValidExtension(self, extension):
        """Applicable to XML and JSON.

        However, navigating extension links differs between content types.
        They need to be validated separately with assertValidExtensionLink.
        """
        self.assertIsNotNone(extension)
        self.assertIsNotNone(extension.get('name'))
        self.assertIsNotNone(extension.get('namespace'))
        self.assertIsNotNone(extension.get('alias'))
        self.assertIsNotNone(extension.get('updated'))

    def assertValidExtensionLink(self, link):
        """Applicable to XML and JSON."""
        self.assertIsNotNone(link.get('rel'))
        self.assertIsNotNone(link.get('type'))
        self.assertIsNotNone(link.get('href'))

    def assertValidTenant(self, tenant):
        """Applicable to XML and JSON."""
        self.assertIsNotNone(tenant.get('id'))
        self.assertIsNotNone(tenant.get('name'))

    def assertValidUser(self, user):
        """Applicable to XML and JSON."""
        self.assertIsNotNone(user.get('id'))
        self.assertIsNotNone(user.get('name'))

    def assertValidRole(self, tenant):
        """Applicable to XML and JSON.

        NOTE: the parameter is historically named ``tenant`` but the value
        being validated is a role; the name is kept so that keyword callers
        keep working.
        """
        self.assertIsNotNone(tenant.get('id'))
        self.assertIsNotNone(tenant.get('name'))

    def test_public_not_found(self):
        """A random path on the public API yields a valid 404 error."""
        r = self.public_request(
            path='/%s' % uuid.uuid4().hex,
            expected_status=404)
        self.assertValidErrorResponse(r)

    def test_admin_not_found(self):
        """A random path on the admin API yields a valid 404 error."""
        r = self.admin_request(
            path='/%s' % uuid.uuid4().hex,
            expected_status=404)
        self.assertValidErrorResponse(r)

    def test_public_multiple_choice(self):
        """The public API root answers 300 with a version list."""
        r = self.public_request(path='/', expected_status=300)
        self.assertValidMultipleChoiceResponse(r)

    def test_admin_multiple_choice(self):
        """The admin API root answers 300 with a version list."""
        r = self.admin_request(path='/', expected_status=300)
        self.assertValidMultipleChoiceResponse(r)

    def test_public_version(self):
        """The public ``/v2.0/`` document is a valid version response."""
        r = self.public_request(path='/v2.0/')
        self.assertValidVersionResponse(r)

    def test_admin_version(self):
        """The admin ``/v2.0/`` document is a valid version response."""
        r = self.admin_request(path='/v2.0/')
        self.assertValidVersionResponse(r)

    def test_public_extensions(self):
        """The public extension list matches ``PUBLIC_EXTENSIONS``."""
        r = self.public_request(path='/v2.0/extensions')
        self.assertValidExtensionListResponse(r,
                                              extension.PUBLIC_EXTENSIONS)

    def test_admin_extensions(self):
        """The admin extension list matches ``ADMIN_EXTENSIONS``."""
        r = self.admin_request(path='/v2.0/extensions')
        self.assertValidExtensionListResponse(r,
                                              extension.ADMIN_EXTENSIONS)

    def test_admin_extensions_404(self):
        """An unknown extension alias on the admin API is a 404."""
        self.admin_request(path='/v2.0/extensions/invalid-extension',
                           expected_status=404)

    def test_public_osksadm_extension_404(self):
        """OS-KSADM is not exposed on the public API."""
        self.public_request(path='/v2.0/extensions/OS-KSADM',
                            expected_status=404)

    def test_admin_osksadm_extension(self):
        """OS-KSADM is described on the admin API."""
        r = self.admin_request(path='/v2.0/extensions/OS-KSADM')
        self.assertValidExtensionResponse(r,
                                          extension.ADMIN_EXTENSIONS)

    def test_authenticate(self):
        """Password authentication scoped to a tenant returns a catalog."""
        r = self.public_request(
            method='POST',
            path='/v2.0/tokens',
            body={
                'auth': {
                    'passwordCredentials': {
                        'username': self.user_foo['name'],
                        'password': self.user_foo['password'],
                    },
                    'tenantId': self.tenant_bar['id'],
                },
            },
            expected_status=200)
        self.assertValidAuthenticationResponse(r, require_service_catalog=True)

    def test_authenticate_unscoped(self):
        """Password authentication without a tenant succeeds."""
        r = self.public_request(
            method='POST',
            path='/v2.0/tokens',
            body={
                'auth': {
                    'passwordCredentials': {
                        'username': self.user_foo['name'],
                        'password': self.user_foo['password'],
                    },
                },
            },
            expected_status=200)
        self.assertValidAuthenticationResponse(r)

    def test_get_tenants_for_token(self):
        """A scoped token can list its tenants on the public API."""
        r = self.public_request(path='/v2.0/tenants',
                                token=self.get_scoped_token())
        self.assertValidTenantListResponse(r)

    def test_validate_token(self):
        """A scoped token can be validated through the admin API."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/tokens/%(token_id)s' % {
                'token_id': token,
            },
            token=token)
        self.assertValidAuthenticationResponse(r)

    def test_invalid_token_404(self):
        """Validating an unknown token id is a 404."""
        token = self.get_scoped_token()
        self.admin_request(
            path='/v2.0/tokens/%(token_id)s' % {
                'token_id': 'invalid',
            },
            token=token,
            expected_status=404)

    def test_validate_token_service_role(self):
        """A token scoped to the service tenant can be validated."""
        self.md_foobar = self.assignment_api.add_role_to_user_and_project(
            self.user_foo['id'],
            self.tenant_service['id'],
            self.role_service['id'])

        token = self.get_scoped_token(tenant_id='service')
        r = self.admin_request(
            path='/v2.0/tokens/%s' % token,
            token=token)
        self.assertValidAuthenticationResponse(r)

    def test_remove_role_revokes_token(self):
        """Removing the role a token was issued for makes it a 401."""
        self.md_foobar = self.assignment_api.add_role_to_user_and_project(
            self.user_foo['id'],
            self.tenant_service['id'],
            self.role_service['id'])

        token = self.get_scoped_token(tenant_id='service')
        r = self.admin_request(
            path='/v2.0/tokens/%s' % token,
            token=token)
        self.assertValidAuthenticationResponse(r)

        self.assignment_api.remove_role_from_user_and_project(
            self.user_foo['id'],
            self.tenant_service['id'],
            self.role_service['id'])

        # Only the status code matters here, so the response is not kept.
        self.admin_request(
            path='/v2.0/tokens/%s' % token,
            token=token,
            expected_status=401)

    def test_validate_token_belongs_to(self):
        """Validation with ``belongsTo`` returns the service catalog."""
        token = self.get_scoped_token()
        path = ('/v2.0/tokens/%s?belongsTo=%s' % (token,
                                                  self.tenant_bar['id']))
        r = self.admin_request(path=path, token=token)
        self.assertValidAuthenticationResponse(r, require_service_catalog=True)

    def test_validate_token_no_belongs_to_still_returns_catalog(self):
        """Validation without ``belongsTo`` still returns the catalog."""
        token = self.get_scoped_token()
        path = ('/v2.0/tokens/%s' % token)
        r = self.admin_request(path=path, token=token)
        self.assertValidAuthenticationResponse(r, require_service_catalog=True)

    def test_validate_token_head(self):
        """Validate a token using HEAD instead of GET.

        There's no response to validate here, but this is included for the
        sake of completely covering the core API.
        """
        token = self.get_scoped_token()
        self.admin_request(
            method='HEAD',
            path='/v2.0/tokens/%(token_id)s' % {
                'token_id': token,
            },
            token=token,
            expected_status=204)

    def test_endpoints(self):
        """The endpoints of a token can be listed."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/tokens/%(token_id)s/endpoints' % {
                'token_id': token,
            },
            token=token)
        self.assertValidEndpointListResponse(r)

    def test_get_tenant(self):
        """A tenant can be fetched by id."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/tenants/%(tenant_id)s' % {
                'tenant_id': self.tenant_bar['id'],
            },
            token=token)
        self.assertValidTenantResponse(r)

    def test_get_tenant_by_name(self):
        """A tenant can be fetched with the ``name`` query parameter."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/tenants?name=%(tenant_name)s' % {
                'tenant_name': self.tenant_bar['name'],
            },
            token=token)
        self.assertValidTenantResponse(r)

    def test_get_user_roles(self):
        """Listing a user's roles without a tenant is not implemented."""
        # The server responds with a 501 Not Implemented. See bug 933565.

        token = self.get_scoped_token()
        self.admin_request(
            path='/v2.0/users/%(user_id)s/roles' % {
                'user_id': self.user_foo['id'],
            },
            token=token, expected_status=501)

    def test_get_user_roles_with_tenant(self):
        """A user's roles on a tenant can be listed."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/tenants/%(tenant_id)s/users/%(user_id)s/roles' % {
                'tenant_id': self.tenant_bar['id'],
                'user_id': self.user_foo['id'],
            },
            token=token)
        self.assertValidRoleListResponse(r)

    def test_get_user(self):
        """A user can be fetched by id."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/users/%(user_id)s' % {
                'user_id': self.user_foo['id'],
            },
            token=token)
        self.assertValidUserResponse(r)

    def test_get_user_by_name(self):
        """A user can be fetched with the ``name`` query parameter."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/users?name=%(user_name)s' % {
                'user_name': self.user_foo['name'],
            },
            token=token)
        self.assertValidUserResponse(r)

    def test_create_update_user_invalid_enabled_type(self):
        """Non-boolean ``enabled`` values are rejected with a 400."""
        # Enforce usage of boolean for 'enabled' field in JSON and XML
        token = self.get_scoped_token()

        # Test CREATE request
        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': uuid.uuid4().hex,
                    'password': uuid.uuid4().hex,
                    # In XML, only "true|false" are converted to boolean.
                    'enabled': "False",
                },
            },
            token=token,
            expected_status=400)
        self.assertValidErrorResponse(r)

        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': uuid.uuid4().hex,
                    'password': uuid.uuid4().hex,
                    # In JSON, 0|1 are not booleans
                    'enabled': 0,
                },
            },
            token=token,
            expected_status=400)
        self.assertValidErrorResponse(r)

        # Test UPDATE request
        path = '/v2.0/users/%(user_id)s' % {
            'user_id': self.user_foo['id'],
        }

        r = self.admin_request(
            method='PUT',
            path=path,
            body={
                'user': {
                    # In XML, only "true|false" are converted to boolean.
                    'enabled': "False",
                },
            },
            token=token,
            expected_status=400)
        self.assertValidErrorResponse(r)

        r = self.admin_request(
            method='PUT',
            path=path,
            body={
                'user': {
                    # In JSON, 0|1 are not booleans
                    'enabled': 1,
                },
            },
            token=token,
            expected_status=400)
        self.assertValidErrorResponse(r)

    def test_error_response(self):
        """This triggers assertValidErrorResponse by convention."""
        self.public_request(path='/v2.0/tenants', expected_status=401)

    def test_invalid_parameter_error_response(self):
        """A body with an unexpected top-level key is rejected with 400."""
        token = self.get_scoped_token()
        bad_body = {
            'OS-KSADM:service%s' % uuid.uuid4().hex: {
                'name': uuid.uuid4().hex,
                'type': uuid.uuid4().hex,
            },
        }
        res = self.admin_request(method='POST',
                                 path='/v2.0/OS-KSADM/services',
                                 body=bad_body,
                                 token=token,
                                 expected_status=400)
        self.assertValidErrorResponse(res)
        res = self.admin_request(method='POST',
                                 path='/v2.0/users',
                                 body=bad_body,
                                 token=token,
                                 expected_status=400)
        self.assertValidErrorResponse(res)

    def _get_user_id(self, r):
        """Helper method to return user ID from a response.

        This needs to be overridden by child classes
        based on their content type.
        """
        raise NotImplementedError()

    def _get_role_id(self, r):
        """Helper method to return a role ID from a response.

        This needs to be overridden by child classes
        based on their content type.
        """
        raise NotImplementedError()

    def _get_role_name(self, r):
        """Helper method to return role NAME from a response.

        This needs to be overridden by child classes
        based on their content type.
        """
        raise NotImplementedError()

    def _get_project_id(self, r):
        """Helper method to return project ID from a response.

        This needs to be overridden by child classes
        based on their content type.
        """
        raise NotImplementedError()

    def assertNoRoles(self, r):
        """Helper method to assert that a response contains no roles.

        This needs to be overridden by child classes
        based on their content type.
        """
        raise NotImplementedError()

    def test_update_user_tenant(self):
        """Changing a user's tenant moves the member role with it."""
        token = self.get_scoped_token()

        # Create a new user
        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': uuid.uuid4().hex,
                    'password': uuid.uuid4().hex,
                    'tenantId': self.tenant_bar['id'],
                    'enabled': True,
                },
            },
            token=token,
            expected_status=200)

        user_id = self._get_user_id(r.result)

        # Check if member_role is in tenant_bar
        r = self.admin_request(
            path='/v2.0/tenants/%(project_id)s/users/%(user_id)s/roles' % {
                'project_id': self.tenant_bar['id'],
                'user_id': user_id
            },
            token=token,
            expected_status=200)
        self.assertEqual(CONF.member_role_name, self._get_role_name(r.result))

        # Create a new tenant
        r = self.admin_request(
            method='POST',
            path='/v2.0/tenants',
            body={
                'tenant': {
                    'name': 'test_update_user',
                    'description': 'A description ...',
                    'enabled': True,
                },
            },
            token=token,
            expected_status=200)

        project_id = self._get_project_id(r.result)

        # Update user's tenant (only the status code is checked)
        self.admin_request(
            method='PUT',
            path='/v2.0/users/%(user_id)s' % {
                'user_id': user_id,
            },
            body={
                'user': {
                    'tenantId': project_id,
                },
            },
            token=token,
            expected_status=200)

        # 'member_role' should be in new_tenant
        r = self.admin_request(
            path='/v2.0/tenants/%(project_id)s/users/%(user_id)s/roles' % {
                'project_id': project_id,
                'user_id': user_id
            },
            token=token,
            expected_status=200)
        # Compare against the configured name, as the first check does,
        # instead of a hard-coded literal.
        self.assertEqual(CONF.member_role_name, self._get_role_name(r.result))

        # 'member_role' should not be in tenant_bar any more
        r = self.admin_request(
            path='/v2.0/tenants/%(project_id)s/users/%(user_id)s/roles' % {
                'project_id': self.tenant_bar['id'],
                'user_id': user_id
            },
            token=token,
            expected_status=200)
        self.assertNoRoles(r.result)

    def test_update_user_with_invalid_tenant(self):
        """Moving a user to an unknown tenant is a 404."""
        token = self.get_scoped_token()

        # Create a new user
        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': 'test_invalid_tenant',
                    'password': uuid.uuid4().hex,
                    'tenantId': self.tenant_bar['id'],
                    'enabled': True,
                },
            },
            token=token,
            expected_status=200)
        user_id = self._get_user_id(r.result)

        # Update user with an invalid tenant
        self.admin_request(
            method='PUT',
            path='/v2.0/users/%(user_id)s' % {
                'user_id': user_id,
            },
            body={
                'user': {
                    'tenantId': 'abcde12345heha',
                },
            },
            token=token,
            expected_status=404)

    def test_update_user_with_invalid_tenant_no_prev_tenant(self):
        """Same as above for a user created without any tenant."""
        token = self.get_scoped_token()

        # Create a new user
        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': 'test_invalid_tenant',
                    'password': uuid.uuid4().hex,
                    'enabled': True,
                },
            },
            token=token,
            expected_status=200)
        user_id = self._get_user_id(r.result)

        # Update user with an invalid tenant
        self.admin_request(
            method='PUT',
            path='/v2.0/users/%(user_id)s' % {
                'user_id': user_id,
            },
            body={
                'user': {
                    'tenantId': 'abcde12345heha',
                },
            },
            token=token,
            expected_status=404)

    def test_update_user_with_old_tenant(self):
        """Re-submitting the current tenant keeps the member role."""
        token = self.get_scoped_token()

        # Create a new user
        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': uuid.uuid4().hex,
                    'password': uuid.uuid4().hex,
                    'tenantId': self.tenant_bar['id'],
                    'enabled': True,
                },
            },
            token=token,
            expected_status=200)

        user_id = self._get_user_id(r.result)

        # Check if member_role is in tenant_bar
        r = self.admin_request(
            path='/v2.0/tenants/%(project_id)s/users/%(user_id)s/roles' % {
                'project_id': self.tenant_bar['id'],
                'user_id': user_id
            },
            token=token,
            expected_status=200)
        self.assertEqual(CONF.member_role_name, self._get_role_name(r.result))

        # Update user's tenant with old tenant id
        self.admin_request(
            method='PUT',
            path='/v2.0/users/%(user_id)s' % {
                'user_id': user_id,
            },
            body={
                'user': {
                    'tenantId': self.tenant_bar['id'],
                },
            },
            token=token,
            expected_status=200)

        # 'member_role' should still be in tenant_bar
        r = self.admin_request(
            path='/v2.0/tenants/%(project_id)s/users/%(user_id)s/roles' % {
                'project_id': self.tenant_bar['id'],
                'user_id': user_id
            },
            token=token,
            expected_status=200)
        # Same configured name as the first check, not a literal.
        self.assertEqual(CONF.member_role_name, self._get_role_name(r.result))

    def test_authenticating_a_user_with_no_password(self):
        """A user created without a password cannot authenticate."""
        token = self.get_scoped_token()

        username = uuid.uuid4().hex

        # create the user
        self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': username,
                    'enabled': True,
                },
            },
            token=token)

        # fail to authenticate
        r = self.public_request(
            method='POST',
            path='/v2.0/tokens',
            body={
                'auth': {
                    'passwordCredentials': {
                        'username': username,
                        'password': 'password',
                    },
                },
            },
            expected_status=401)
        self.assertValidErrorResponse(r)

    def test_www_authenticate_header(self):
        """A 401 carries the default ``WWW-Authenticate`` header."""
        r = self.public_request(
            path='/v2.0/tenants',
            expected_status=401)
        self.assertEqual('Keystone uri="http://localhost"',
                         r.headers.get('WWW-Authenticate'))

    def test_www_authenticate_header_host(self):
        """``WWW-Authenticate`` reflects a configured public endpoint."""
        test_url = 'http://%s:4187' % uuid.uuid4().hex
        self.config_fixture.config(public_endpoint=test_url)
        r = self.public_request(
            path='/v2.0/tenants',
            expected_status=401)
        self.assertEqual('Keystone uri="%s"' % test_url,
                         r.headers.get('WWW-Authenticate'))
# **** CubicPower OpenStack Study ****
class LegacyV2UsernameTests(object):
    """Tests to show the broken username behavior in V2.

    The V2 API is documented to use `username` instead of `name`. The
    API used to force the use of `name` and left `username` to fall into
    the `extra` field.

    These tests ensure this behavior works so fixes to `username`/`name`
    will be backward compatible.
    """

    def create_user(self, **user_attrs):
        """Create a user and return the response object.

        :param user_attrs: attributes added to the request body (optional)
        """
        admin_token = self.get_scoped_token()

        user_ref = {'name': uuid.uuid4().hex, 'enabled': True}
        user_ref.update(user_attrs)

        return self.admin_request(
            method='POST',
            path='/v2.0/users',
            token=admin_token,
            body={'user': user_ref},
            expected_status=200)

    def test_create_with_extra_username(self):
        """The response for creating a user will contain the extra fields."""
        expected_username = uuid.uuid4().hex
        created = self.create_user(username=expected_username)

        self.assertValidUserResponse(created)

        returned_user = self.get_user_from_response(created)
        self.assertEqual(expected_username, returned_user.get('username'))

    def test_get_returns_username_from_extra(self):
        """The response for getting a user will contain the extra fields."""
        admin_token = self.get_scoped_token()

        expected_username = uuid.uuid4().hex
        created = self.create_user(username=expected_username)

        user_id = self.get_user_attribute_from_response(created, 'id')
        user_path = '/v2.0/users/%s' % user_id
        fetched = self.admin_request(path=user_path, token=admin_token)

        self.assertValidUserResponse(fetched)

        returned_user = self.get_user_from_response(fetched)
        self.assertEqual(expected_username, returned_user.get('username'))

    def test_update_returns_new_username_when_adding_username(self):
        """The response for updating a user will contain the extra fields.

        This is specifically testing for updating a username when a value
        was not previously set.
        """
        admin_token = self.get_scoped_token()

        created = self.create_user()

        user_id = self.get_user_attribute_from_response(created, 'id')
        user_name = self.get_user_attribute_from_response(created, 'name')
        is_enabled = self.get_user_attribute_from_response(created, 'enabled')
        update_body = {
            'user': {
                'name': user_name,
                'username': 'new_username',
                'enabled': is_enabled,
            },
        }
        updated = self.admin_request(
            method='PUT',
            path='/v2.0/users/%s' % user_id,
            token=admin_token,
            body=update_body,
            expected_status=200)

        self.assertValidUserResponse(updated)

        returned_user = self.get_user_from_response(updated)
        self.assertEqual('new_username', returned_user.get('username'))

    def test_update_returns_new_username_when_updating_username(self):
        """The response for updating a user will contain the extra fields.

        This tests updating a username that was previously set.
        """
        admin_token = self.get_scoped_token()

        created = self.create_user(username='original_username')

        user_id = self.get_user_attribute_from_response(created, 'id')
        user_name = self.get_user_attribute_from_response(created, 'name')
        is_enabled = self.get_user_attribute_from_response(created, 'enabled')
        update_body = {
            'user': {
                'name': user_name,
                'username': 'new_username',
                'enabled': is_enabled,
            },
        }
        updated = self.admin_request(
            method='PUT',
            path='/v2.0/users/%s' % user_id,
            token=admin_token,
            body=update_body,
            expected_status=200)

        self.assertValidUserResponse(updated)

        returned_user = self.get_user_from_response(updated)
        self.assertEqual('new_username', returned_user.get('username'))

    def test_username_is_always_returned_create(self):
        """Username is set as the value of name if no username is provided.

        This matches the v2.0 spec where we really should be using username
        and not name.
        """
        created = self.create_user()

        self.assertValidUserResponse(created)

        returned_user = self.get_user_from_response(created)
        self.assertEqual(returned_user.get('name'),
                         returned_user.get('username'))

    def test_username_is_always_returned_get(self):
        """Username is set as the value of name if no username is provided.

        This matches the v2.0 spec where we really should be using username
        and not name.
        """
        admin_token = self.get_scoped_token()

        created = self.create_user()

        user_id = self.get_user_attribute_from_response(created, 'id')
        user_path = '/v2.0/users/%s' % user_id
        fetched = self.admin_request(path=user_path, token=admin_token)

        self.assertValidUserResponse(fetched)

        returned_user = self.get_user_from_response(fetched)
        self.assertEqual(returned_user.get('name'),
                         returned_user.get('username'))

    def test_username_is_always_returned_get_by_name(self):
        """Username is set as the value of name if no username is provided.

        This matches the v2.0 spec where we really should be using username
        and not name.
        """
        admin_token = self.get_scoped_token()

        created = self.create_user()

        user_name = self.get_user_attribute_from_response(created, 'name')
        query_path = '/v2.0/users?name=%s' % user_name
        fetched = self.admin_request(path=query_path, token=admin_token)

        self.assertValidUserResponse(fetched)

        returned_user = self.get_user_from_response(fetched)
        self.assertEqual(returned_user.get('name'),
                         returned_user.get('username'))

    def test_username_is_always_returned_update_no_username_provided(self):
        """Username is set as the value of name if no username is provided.

        This matches the v2.0 spec where we really should be using username
        and not name.
        """
        admin_token = self.get_scoped_token()

        created = self.create_user()

        user_id = self.get_user_attribute_from_response(created, 'id')
        user_name = self.get_user_attribute_from_response(created, 'name')
        is_enabled = self.get_user_attribute_from_response(created, 'enabled')
        update_body = {
            'user': {
                'name': user_name,
                'enabled': is_enabled,
            },
        }
        updated = self.admin_request(
            method='PUT',
            path='/v2.0/users/%s' % user_id,
            token=admin_token,
            body=update_body,
            expected_status=200)

        self.assertValidUserResponse(updated)

        returned_user = self.get_user_from_response(updated)
        self.assertEqual(returned_user.get('name'),
                         returned_user.get('username'))

    def test_updated_username_is_returned(self):
        """Username is set as the value of name if no username is provided.

        The update request sent here carries only `name` and `enabled`;
        the returned `username` is expected to mirror `name`.
        """
        admin_token = self.get_scoped_token()

        created = self.create_user()

        user_id = self.get_user_attribute_from_response(created, 'id')
        user_name = self.get_user_attribute_from_response(created, 'name')
        is_enabled = self.get_user_attribute_from_response(created, 'enabled')
        update_body = {
            'user': {
                'name': user_name,
                'enabled': is_enabled,
            },
        }
        updated = self.admin_request(
            method='PUT',
            path='/v2.0/users/%s' % user_id,
            token=admin_token,
            body=update_body,
            expected_status=200)

        self.assertValidUserResponse(updated)

        returned_user = self.get_user_from_response(updated)
        self.assertEqual(returned_user.get('name'),
                         returned_user.get('username'))

    def test_username_can_be_used_instead_of_name_create(self):
        """Creating a user with only `username` also populates `name`."""
        admin_token = self.get_scoped_token()

        create_body = {
            'user': {
                'username': uuid.uuid4().hex,
                'enabled': True,
            },
        }
        created = self.admin_request(
            method='POST',
            path='/v2.0/users',
            token=admin_token,
            body=create_body,
            expected_status=200)

        self.assertValidUserResponse(created)

        returned_user = self.get_user_from_response(created)
        self.assertEqual(returned_user.get('name'),
                         returned_user.get('username'))

    def test_username_can_be_used_instead_of_name_update(self):
        """Updating a user with only `username` also changes `name`."""
        admin_token = self.get_scoped_token()

        created = self.create_user()

        user_id = self.get_user_attribute_from_response(created, 'id')
        replacement = uuid.uuid4().hex
        is_enabled = self.get_user_attribute_from_response(created, 'enabled')
        update_body = {
            'user': {
                'username': replacement,
                'enabled': is_enabled,
            },
        }
        updated = self.admin_request(
            method='PUT',
            path='/v2.0/users/%s' % user_id,
            token=admin_token,
            body=update_body,
            expected_status=200)

        self.assertValidUserResponse(updated)

        returned_user = self.get_user_from_response(updated)
        self.assertEqual(replacement, returned_user.get('name'))
        self.assertEqual(returned_user.get('name'),
                         returned_user.get('username'))
# **** CubicPower OpenStack Study ****
class RestfulTestCase(rest.RestfulTestCase):
    """REST test case whose ``user_foo`` holds the admin role on tenant_bar."""

    def setUp(self):
        """Run the parent set-up, then grant ``role_admin`` to ``user_foo``."""
        super(RestfulTestCase, self).setUp()

        # TODO(termie): add an admin user to the fixtures and use that user
        # override the fixtures, for now
        user_id = self.user_foo['id']
        tenant_id = self.tenant_bar['id']
        role_id = self.role_admin['id']
        self.assignment_api.add_role_to_user_and_project(
            user_id, tenant_id, role_id)
# **** CubicPower OpenStack Study ****
class JsonTestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests):
content_type = 'json'
**** CubicPower OpenStack Study ****
def _get_user_id(self, r):
return r['user']['id']
**** CubicPower OpenStack Study ****
def _get_role_name(self, r):
return r['roles'][0]['name']
**** CubicPower OpenStack Study ****
def _get_role_id(self, r):
return r['roles'][0]['id']
**** CubicPower OpenStack Study ****
def _get_project_id(self, r):
return r['tenant']['id']
**** CubicPower OpenStack Study ****
def _get_token_id(self, r):
"""Applicable only to JSON."""
return r.result['access']['token']['id']
**** CubicPower OpenStack Study ****
def assertNoRoles(self, r):
    """Assert that the ``roles`` entry of the decoded body *r* is empty."""
    roles = r['roles']
    self.assertEqual([], roles)
**** CubicPower OpenStack Study ****
def assertValidErrorResponse(self, r):
    """Validate the ``error`` document of response *r*.

    The document must be present, pass ``assertValidError`` and carry a
    ``code`` equal to the response's ``status_code``.
    """
    self.assertIsNotNone(r.result.get('error'))
    error = r.result['error']
    self.assertValidError(error)
    self.assertEqual(error['code'], r.status_code)
**** CubicPower OpenStack Study ****
def assertValidExtension(self, extension, expected):
    """Validate a JSON extension document against *expected*.

    After the parent-class checks, the extension's ``description`` must be
    one of the descriptions found in the values of *expected*, and its
    ``links`` must be a non-empty collection of valid extension links.
    """
    super(JsonTestCase, self).assertValidExtension(extension)

    known_descriptions = [
        entry['description'] for entry in six.itervalues(expected)]
    actual_description = extension.get('description')
    self.assertIsNotNone(actual_description)
    self.assertIn(actual_description, known_descriptions)

    links = extension.get('links')
    self.assertIsNotNone(links)
    self.assertNotEmpty(links)
    for ext_link in links:
        self.assertValidExtensionLink(ext_link)
**** CubicPower OpenStack Study ****
def assertValidExtensionListResponse(self, r, expected):
    """Validate every entry under ``extensions`` -> ``values`` in *r*."""
    self.assertIsNotNone(r.result.get('extensions'))
    listing = r.result['extensions']
    self.assertIsNotNone(listing.get('values'))
    self.assertNotEmpty(listing.get('values'))
    for entry in listing['values']:
        self.assertValidExtension(entry, expected)
**** CubicPower OpenStack Study ****
def assertValidExtensionResponse(self, r, expected):
    """Validate the single ``extension`` document of response *r*."""
    ext_doc = r.result.get('extension')
    self.assertValidExtension(ext_doc, expected)
**** CubicPower OpenStack Study ****
def assertValidUser(self, user):
    """Validate a JSON user document.

    On top of the parent-class checks, ``default_project_id`` must not be
    present, and ``tenantId`` must not be ``None`` when present.
    """
    super(JsonTestCase, self).assertValidUser(user)
    self.assertNotIn('default_project_id', user)
    if 'tenantId' not in user:
        return
    # NOTE(morganfainberg): tenantId should never be "None", it gets
    # filtered out of the object if it is there. This is suspenders
    # and a belt check to avoid unintended regressions.
    self.assertIsNotNone(user.get('tenantId'))
**** CubicPower OpenStack Study ****
def assertValidAuthenticationResponse(self, r,
                                      require_service_catalog=False):
    """Validate the ``access`` document of an authentication response.

    :param r: response whose ``result`` holds the decoded body
    :param require_service_catalog: when true, the user must carry named
        roles and a non-empty service catalog must be present
    """
    self.assertIsNotNone(r.result.get('access'))
    access = r.result['access']
    self.assertIsNotNone(access.get('token'))
    self.assertIsNotNone(access.get('user'))

    # validate token
    token_doc = access['token']
    for token_key in ('id', 'expires'):
        self.assertIsNotNone(token_doc.get(token_key))
    tenant = token_doc.get('tenant')
    if tenant is not None:
        # validate tenant
        for tenant_key in ('id', 'name'):
            self.assertIsNotNone(tenant.get(tenant_key))

    # validate user
    user_doc = access['user']
    for user_key in ('id', 'name'):
        self.assertIsNotNone(user_doc.get(user_key))

    if require_service_catalog:
        # roles are only provided with a service catalog
        roles = user_doc.get('roles')
        self.assertNotEmpty(roles)
        for role in roles:
            self.assertIsNotNone(role.get('name'))

    catalog = access.get('serviceCatalog')
    # validate service catalog
    if require_service_catalog:
        self.assertIsNotNone(catalog)
    if catalog is None:
        return

    self.assertIsInstance(catalog, list)
    if require_service_catalog:
        self.assertNotEmpty(catalog)
    for service in catalog:
        # validate service
        self.assertIsNotNone(service.get('name'))
        self.assertIsNotNone(service.get('type'))

        # services contain at least one endpoint
        self.assertIsNotNone(service.get('endpoints'))
        self.assertNotEmpty(service['endpoints'])
        for endpoint in service['endpoints']:
            # validate service endpoint
            self.assertIsNotNone(endpoint.get('publicURL'))
**** CubicPower OpenStack Study ****
def assertValidTenantListResponse(self, r):
    """Validate the non-empty ``tenants`` list of response *r*.

    Each tenant must pass ``assertValidTenant`` and expose an ``enabled``
    attribute that is a real boolean.
    """
    self.assertIsNotNone(r.result.get('tenants'))
    self.assertNotEmpty(r.result['tenants'])
    for tenant in r.result['tenants']:
        self.assertValidTenant(tenant)
        enabled = tenant.get('enabled')
        self.assertIsNotNone(enabled)
        # A membership test against [True, False] would also accept the
        # integers 0 and 1 (0 == False, 1 == True), so check the type.
        self.assertIsInstance(enabled, bool)
**** CubicPower OpenStack Study ****
def assertValidUserResponse(self, r):
    """Validate the ``user`` document of response *r*."""
    self.assertIsNotNone(r.result.get('user'))
    user_doc = r.result['user']
    self.assertValidUser(user_doc)
**** CubicPower OpenStack Study ****
def assertValidTenantResponse(self, r):
    """Validate the ``tenant`` document of response *r*."""
    self.assertIsNotNone(r.result.get('tenant'))
    tenant_doc = r.result['tenant']
    self.assertValidTenant(tenant_doc)
**** CubicPower OpenStack Study ****
def assertValidRoleListResponse(self, r):
    """Validate every entry of the non-empty ``roles`` list of *r*."""
    self.assertIsNotNone(r.result.get('roles'))
    self.assertNotEmpty(r.result['roles'])
    for role_doc in r.result['roles']:
        self.assertValidRole(role_doc)
**** CubicPower OpenStack Study ****
def assertValidVersion(self, version):
    """Validate a JSON version document.

    Beyond the parent-class checks, ``links`` and ``media-types`` must be
    non-empty; every link needs ``rel`` and ``href``, every media type
    needs ``base`` and ``type``.
    """
    super(JsonTestCase, self).assertValidVersion(version)

    self.assertIsNotNone(version.get('links'))
    self.assertNotEmpty(version.get('links'))
    for link in version.get('links'):
        for link_key in ('rel', 'href'):
            self.assertIsNotNone(link.get(link_key))

    self.assertIsNotNone(version.get('media-types'))
    self.assertNotEmpty(version.get('media-types'))
    for media in version.get('media-types'):
        for media_key in ('base', 'type'):
            self.assertIsNotNone(media.get(media_key))
**** CubicPower OpenStack Study ****
def assertValidMultipleChoiceResponse(self, r):
    """Validate every version under ``versions`` -> ``values`` in *r*."""
    self.assertIsNotNone(r.result.get('versions'))
    listing = r.result['versions']
    self.assertIsNotNone(listing.get('values'))
    self.assertNotEmpty(listing['values'])
    for version_doc in listing['values']:
        self.assertValidVersion(version_doc)
**** CubicPower OpenStack Study ****
def assertValidVersionResponse(self, r):
    """Validate the single ``version`` document of response *r*."""
    version_doc = r.result.get('version')
    self.assertValidVersion(version_doc)
**** CubicPower OpenStack Study ****
def assertValidEndpointListResponse(self, r):
    """Validate the non-empty ``endpoints`` list of response *r*.

    Every endpoint must define its id, name, type and the three URLs.
    """
    required_keys = (
        'id', 'name', 'type', 'publicURL', 'internalURL', 'adminURL')
    self.assertIsNotNone(r.result.get('endpoints'))
    self.assertNotEmpty(r.result['endpoints'])
    for endpoint in r.result['endpoints']:
        for key in required_keys:
            self.assertIsNotNone(endpoint.get(key))
**** CubicPower OpenStack Study ****
def get_user_from_response(self, r):
    """Return the ``user`` entry of a JSON response body, or None if absent."""
    payload = r.result
    return payload.get('user')
**** CubicPower OpenStack Study ****
def get_user_attribute_from_response(self, r, attribute_name):
    """Return one attribute of the ``user`` in a JSON response body.

    Raises KeyError when the user or the attribute is missing.
    """
    user = r.result['user']
    return user[attribute_name]
**** CubicPower OpenStack Study ****
def test_service_crud_requires_auth(self):
    """Service CRUD should 401 without an X-Auth-Token (bug 1006822)."""
    # values here don't matter because we should 401 before they're checked
    collection_path = '/v2.0/OS-KSADM/services'
    service_path = '/v2.0/OS-KSADM/services/%s' % uuid.uuid4().hex
    service_body = {
        'OS-KSADM:service': {
            'name': uuid.uuid4().hex,
            'type': uuid.uuid4().hex,
        },
    }
    # One entry per token-less request; only the POST carries a body.
    unauthenticated_requests = (
        {'method': 'GET', 'path': collection_path},
        {'method': 'POST', 'path': collection_path, 'body': service_body},
        {'method': 'GET', 'path': service_path},
        {'method': 'DELETE', 'path': service_path},
    )
    for request in unauthenticated_requests:
        r = self.admin_request(expected_status=401, **request)
        self.assertValidErrorResponse(r)
**** CubicPower OpenStack Study ****
def test_user_role_list_requires_auth(self):
    """User role list should 401 without an X-Auth-Token (bug 1006815)."""
    # values here don't matter because we should 401 before they're checked
    ids = {
        'tenant_id': uuid.uuid4().hex,
        'user_id': uuid.uuid4().hex,
    }
    path = '/v2.0/tenants/%(tenant_id)s/users/%(user_id)s/roles' % ids
    response = self.admin_request(path=path, expected_status=401)
    self.assertValidErrorResponse(response)
**** CubicPower OpenStack Study ****
def test_fetch_revocation_list_nonadmin_fails(self):
    """Requesting the revocation list without a token must return 401."""
    request = {
        'method': 'GET',
        'path': '/v2.0/tokens/revoked',
        'expected_status': 401,
    }
    self.admin_request(**request)
**** CubicPower OpenStack Study ****
def test_fetch_revocation_list_admin_200(self):
    """Requesting the revocation list with a scoped token returns 200."""
    scoped_token = self.get_scoped_token()
    response = self.admin_request(method='GET',
                                  path='/v2.0/tokens/revoked',
                                  token=scoped_token,
                                  expected_status=200)
    self.assertValidRevocationListResponse(response)
**** CubicPower OpenStack Study ****
def assertValidRevocationListResponse(self, response):
    """Check that the revocation list response has a ``signed`` payload."""
    signed = response.result['signed']
    self.assertIsNotNone(signed)
**** CubicPower OpenStack Study ****
def test_create_update_user_json_invalid_enabled_type(self):
    """Creating or updating a user with a string ``enabled`` returns 400."""
    # Enforce usage of boolean for 'enabled' field in JSON
    token = self.get_scoped_token()
    # In JSON, "true|false" are not boolean
    not_a_boolean = "true"

    # Test CREATE request
    create_body = {
        'user': {
            'name': uuid.uuid4().hex,
            'password': uuid.uuid4().hex,
            'enabled': not_a_boolean,
        },
    }
    r = self.admin_request(method='POST',
                           path='/v2.0/users',
                           body=create_body,
                           token=token,
                           expected_status=400)
    self.assertValidErrorResponse(r)

    # Test UPDATE request
    update_path = '/v2.0/users/%(user_id)s' % {
        'user_id': self.user_foo['id'],
    }
    update_body = {'user': {'enabled': not_a_boolean}}
    r = self.admin_request(method='PUT',
                           path=update_path,
                           body=update_body,
                           token=token,
                           expected_status=400)
    self.assertValidErrorResponse(r)
**** CubicPower OpenStack Study ****
def test_authenticating_a_user_with_an_OSKSADM_password(self):
    """Create a user via ``OS-KSADM:password`` and authenticate with it.

    Also verifies that the ``OS-KSADM:password`` key is not present when
    the user is fetched afterwards.
    """
    token = self.get_scoped_token()
    username = uuid.uuid4().hex
    password = uuid.uuid4().hex

    # create the user
    new_user = {
        'name': username,
        'OS-KSADM:password': password,
        'enabled': True,
    }
    created = self.admin_request(method='POST',
                                 path='/v2.0/users',
                                 body={'user': new_user},
                                 token=token)

    # successfully authenticate
    credentials = {'username': username, 'password': password}
    self.public_request(method='POST',
                        path='/v2.0/tokens',
                        body={'auth': {'passwordCredentials': credentials}},
                        expected_status=200)

    # ensure password doesn't leak
    user_id = created.result['user']['id']
    fetched = self.admin_request(method='GET',
                                 path='/v2.0/users/%s' % user_id,
                                 token=token,
                                 expected_status=200)
    self.assertNotIn('OS-KSADM:password', fetched.result['user'])
**** CubicPower OpenStack Study ****
def test_updating_a_user_with_an_OSKSADM_password(self):
    """Change a password through OS-KSADM, then authenticate with it."""
    token = self.get_scoped_token()
    user_id = self.user_foo['id']
    password = uuid.uuid4().hex

    # update the user
    password_path = '/v2.0/users/%s/OS-KSADM/password' % user_id
    self.admin_request(method='PUT',
                       path=password_path,
                       body={'user': {'password': password}},
                       token=token,
                       expected_status=200)

    # successfully authenticate
    credentials = {
        'username': self.user_foo['name'],
        'password': password,
    }
    self.public_request(method='POST',
                        path='/v2.0/tokens',
                        body={'auth': {'passwordCredentials': credentials}},
                        expected_status=200)
**** CubicPower OpenStack Study ****
class RevokeApiJsonTestCase(JsonTestCase):
    """JSON tests re-run with the KVS revoke driver and PKI token provider."""

    def config_overrides(self):
        """Apply the parent overrides, then the revoke/token settings."""
        super(RevokeApiJsonTestCase, self).config_overrides()
        # Applied in order: revoke backend first, then the token provider.
        overrides = (
            ('revoke', {
                'driver': 'keystone.contrib.revoke.backends.kvs.Revoke',
            }),
            ('token', {
                'provider': 'keystone.token.providers.pki.Provider',
                'revoke_by_id': False,
            }),
        )
        for group, options in overrides:
            self.config_fixture.config(group=group, **options)

    def test_fetch_revocation_list_admin_200(self):
        """Skip the inherited test under this configuration."""
        self.skipTest('Revoke API disables revocation_list.')
**** CubicPower OpenStack Study ****
class XmlTestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests):
xmlns = 'http://docs.openstack.org/identity/api/v2.0'
content_type = 'xml'
**** CubicPower OpenStack Study ****
def _get_user_id(self, r):
    """Return the ``id`` attribute of the given user element."""
    user_id = r.get('id')
    return user_id
**** CubicPower OpenStack Study ****
def _get_role_name(self, r):
    """Return the ``name`` attribute of the first role in ``r``."""
    first_role = r[0]
    return first_role.get('name')
**** CubicPower OpenStack Study ****
def _get_role_id(self, r):
    """Return the ``id`` attribute of the first role in ``r``."""
    first_role = r[0]
    return first_role.get('id')
**** CubicPower OpenStack Study ****
def _get_project_id(self, r):
    """Return the ``id`` attribute of the given project element."""
    project_id = r.get('id')
    return project_id
**** CubicPower OpenStack Study ****
def assertNoRoles(self, r):
    """Assert that ``r`` contains no entries."""
    role_count = len(r)
    self.assertEqual(0, role_count)
**** CubicPower OpenStack Study ****
def _get_token_id(self, r):
    """Return the ``id`` attribute of the ``token`` child in the response."""
    token_element = r.result.find(self._tag('token'))
    return token_element.get('id')
**** CubicPower OpenStack Study ****
def _tag(self, tag_name, xmlns=None):
    """Helper method to build a namespaced element name.

    Uses ``xmlns`` when given, otherwise falls back to ``self.xmlns``.
    """
    namespace = xmlns or self.xmlns
    return '{%(ns)s}%(tag)s' % {'ns': namespace, 'tag': tag_name}
**** CubicPower OpenStack Study ****
def assertValidErrorResponse(self, r):
    """Validate an XML error document.

    The root must be a namespaced ``error`` element that passes
    ``assertValidError`` and whose ``code`` attribute equals the response
    status code rendered as a string.
    """
    error_element = r.result
    expected_tag = self._tag('error')
    self.assertEqual(expected_tag, error_element.tag)
    self.assertValidError(error_element)
    expected_code = str(r.status_code)
    self.assertEqual(expected_code, error_element.get('code'))
**** CubicPower OpenStack Study ****
def assertValidExtension(self, extension, expected):
    """Validate an XML ``extension`` element against the expected set.

    After the content-type independent checks from the parent class, the
    element must have a non-blank ``description`` child that matches the
    ``'description'`` of one of the values in ``expected``, and a ``links``
    child holding at least one ``link``; every link is validated with
    ``assertValidExtensionLink``.

    :param extension: the ``extension`` element to check
    :param expected: mapping whose values each provide a ``'description'``
    """
    super(XmlTestCase, self).assertValidExtension(extension)

    description_element = extension.find(self._tag('description'))
    self.assertIsNotNone(description_element)
    self.assertTrue(description_element.text)

    links = extension.find(self._tag('links'))
    # Fail with an assertion (not an AttributeError on None) when the
    # ``links`` container is missing, as assertValidVersion does.
    self.assertIsNotNone(links)
    link_elements = links.findall(self._tag('link'))
    self.assertNotEmpty(link_elements)

    descriptions = [ext['description'] for ext in six.itervalues(expected)]
    self.assertIn(description_element.text, descriptions)

    for link in link_elements:
        self.assertValidExtensionLink(link)
**** CubicPower OpenStack Study ****
def assertValidExtensionListResponse(self, r, expected):
    """Check an XML ``extensions`` listing and validate each extension."""
    root = r.result
    self.assertEqual(self._tag('extensions'), root.tag)
    extension_elements = root.findall(self._tag('extension'))
    self.assertNotEmpty(extension_elements)
    for element in extension_elements:
        self.assertValidExtension(element, expected)
**** CubicPower OpenStack Study ****
def assertValidExtensionResponse(self, r, expected):
    """Check that the XML root is an ``extension`` element and validate it."""
    root = r.result
    expected_tag = self._tag('extension')
    self.assertEqual(expected_tag, root.tag)
    self.assertValidExtension(root, expected)
**** CubicPower OpenStack Study ****
def assertValidVersion(self, version):
    """Validate an XML version element, including links and media types.

    Runs the content-type independent checks from the parent class, then
    requires a ``links`` child with at least one ``link`` (each carrying
    ``rel`` and ``href``) and a ``media-types`` child with at least one
    ``media-type`` (each carrying ``base`` and ``type``).
    """
    super(XmlTestCase, self).assertValidVersion(version)

    links = version.find(self._tag('links'))
    self.assertIsNotNone(links)
    link_elements = links.findall(self._tag('link'))
    self.assertNotEmpty(link_elements)
    for link in link_elements:
        for attribute in ('rel', 'href'):
            self.assertIsNotNone(link.get(attribute))

    media_types = version.find(self._tag('media-types'))
    self.assertIsNotNone(media_types)
    media_elements = media_types.findall(self._tag('media-type'))
    self.assertNotEmpty(media_elements)
    for media in media_elements:
        for attribute in ('base', 'type'):
            self.assertIsNotNone(media.get(attribute))
**** CubicPower OpenStack Study ****
def assertValidMultipleChoiceResponse(self, r):
    """Check an XML ``versions`` listing and validate each ``version``."""
    root = r.result
    self.assertEqual(self._tag('versions'), root.tag)
    version_elements = root.findall(self._tag('version'))
    self.assertNotEmpty(version_elements)
    for element in version_elements:
        self.assertValidVersion(element)
**** CubicPower OpenStack Study ****
def assertValidVersionResponse(self, r):
    """Check that the XML root is a ``version`` element and validate it."""
    root = r.result
    expected_tag = self._tag('version')
    self.assertEqual(expected_tag, root.tag)
    self.assertValidVersion(root)
**** CubicPower OpenStack Study ****
def assertValidEndpointListResponse(self, r):
    """Check an XML endpoint listing: non-empty, all attributes present."""
    required_attributes = (
        'id', 'name', 'type', 'publicURL', 'internalURL', 'adminURL')
    root = r.result
    self.assertEqual(self._tag('endpoints'), root.tag)
    endpoint_elements = root.findall(self._tag('endpoint'))
    self.assertNotEmpty(endpoint_elements)
    for endpoint in endpoint_elements:
        for attribute in required_attributes:
            self.assertIsNotNone(endpoint.get(attribute))
**** CubicPower OpenStack Study ****
def assertValidTenantResponse(self, r):
    """Check that the XML root is a ``tenant`` element and validate it."""
    root = r.result
    expected_tag = self._tag('tenant')
    self.assertEqual(expected_tag, root.tag)
    self.assertValidTenant(root)
**** CubicPower OpenStack Study ****
def assertValidUserResponse(self, r):
    """Check that the XML root is a ``user`` element and validate it."""
    root = r.result
    expected_tag = self._tag('user')
    self.assertEqual(expected_tag, root.tag)
    self.assertValidUser(root)
**** CubicPower OpenStack Study ****
def assertValidRoleListResponse(self, r):
    """Check an XML ``roles`` listing and validate each ``role`` child."""
    root = r.result
    self.assertEqual(self._tag('roles'), root.tag)
    role_elements = root.findall(self._tag('role'))
    self.assertNotEmpty(role_elements)
    for element in role_elements:
        self.assertValidRole(element)
**** CubicPower OpenStack Study ****
def assertValidAuthenticationResponse(self, r,
                                      require_service_catalog=False):
    """Validate an XML ``access`` document returned by authentication.

    Always checks the ``token`` (``id``, ``expires``, plus its ``tenant``
    child when present) and the ``user`` (``id``, ``name``).

    :param r: response whose ``result`` is the parsed XML root
    :param require_service_catalog: when true, the user must list roles
        and a non-empty ``serviceCatalog`` must be present; when false a
        catalog is still validated if one happens to be included.
    """
    root = r.result
    self.assertEqual(self._tag('access'), root.tag)

    # validate token
    token = root.find(self._tag('token'))
    self.assertIsNotNone(token)
    for attribute in ('id', 'expires'):
        self.assertIsNotNone(token.get(attribute))

    tenant = token.find(self._tag('tenant'))
    if tenant is not None:
        # validate tenant
        self.assertValidTenant(tenant)
        self.assertIn(tenant.get('enabled'), ['true', 'false'])

    user = root.find(self._tag('user'))
    self.assertIsNotNone(user)
    for attribute in ('id', 'name'):
        self.assertIsNotNone(user.get(attribute))

    if require_service_catalog:
        # roles are only provided with a service catalog
        roles = user.findall(self._tag('role'))
        self.assertNotEmpty(roles)
        for role in roles:
            self.assertIsNotNone(role.get('name'))

    # validate the serviceCatalog
    catalog = root.find(self._tag('serviceCatalog'))
    if require_service_catalog:
        self.assertIsNotNone(catalog)
    if catalog is None:
        return

    services = catalog.findall(self._tag('service'))
    if require_service_catalog:
        self.assertNotEmpty(services)
    for service in services:
        # validate service
        for attribute in ('name', 'type'):
            self.assertIsNotNone(service.get(attribute))

        # services contain at least one endpoint
        endpoints = service.findall(self._tag('endpoint'))
        self.assertNotEmpty(endpoints)
        for endpoint in endpoints:
            # validate service endpoint
            self.assertIsNotNone(endpoint.get('publicURL'))
**** CubicPower OpenStack Study ****
def assertValidTenantListResponse(self, r):
    """Validate an XML ``tenants`` listing.

    The root must be a namespaced ``tenants`` element with at least one
    ``tenant`` child. Every tenant must pass ``assertValidTenant`` and have
    its ``enabled`` attribute set to the literal ``'true'`` or ``'false'``.
    """
    root = r.result
    self.assertEqual(self._tag('tenants'), root.tag)
    tenant_elements = root.findall(self._tag('tenant'))
    # Count ``tenant`` children specifically (as the role and endpoint list
    # validators do) rather than any child of the root, so the loop below
    # cannot pass without checking a single tenant.
    self.assertNotEmpty(tenant_elements)
    for tenant in tenant_elements:
        self.assertValidTenant(tenant)
        self.assertIn(tenant.get('enabled'), ['true', 'false'])
**** CubicPower OpenStack Study ****
def get_user_from_response(self, r):
    """Return the user from an XML response, i.e. the response result."""
    user_element = r.result
    return user_element
**** CubicPower OpenStack Study ****
def get_user_attribute_from_response(self, r, attribute_name):
    """Return one attribute of the user element, or None if it is unset."""
    user_element = r.result
    return user_element.get(attribute_name)
**** CubicPower OpenStack Study ****
def test_authenticate_with_invalid_xml_in_password(self):
    """Authenticating with a raw ``&`` in the XML password returns 400.

    The body is an otherwise complete ``auth`` document in the v2.0
    namespace; the unescaped ampersand in the ``password`` attribute is the
    only thing that makes it ill-formed.
    """
    # public_request would auto escape the ampersand, so the document is
    # written by hand and passed with convert=False.
    self.public_request(
        method='POST',
        path='/v2.0/tokens',
        headers={
            'Content-Type': 'application/xml'
        },
        body="""
            <auth xmlns="%(xmlns)s" tenantId="bar">
                <passwordCredentials username="FOO" password="&"/>
            </auth>
        """ % {'xmlns': self.xmlns},
        expected_status=400,
        convert=False)
**** CubicPower OpenStack Study ****
def test_add_tenant_xml(self):
    """Create a tenant from raw XML with an empty description element.

    The request is hand-written XML; the response is then decoded as JSON
    and must contain a valid tenant whose description is the empty string.
    """
    token = self.get_scoped_token()
    r = self.admin_request(
        method='POST',
        path='/v2.0/tenants',
        headers={
            'Content-Type': 'application/xml',
            'X-Auth-Token': token
        },
        # A complete, well-formed <tenant> document; the empty
        # <description/> mirrors the "description":"" sent by the JSON
        # variant of this test.
        body="""
            <tenant xmlns="%(xmlns)s"
                    enabled="true" name="ACME Corp">
                <description></description>
            </tenant>
        """ % {'xmlns': self.xmlns},
        convert=False)
    self._from_content_type(r, 'json')
    self.assertIsNotNone(r.result.get('tenant'))
    self.assertValidTenant(r.result['tenant'])
    self.assertEqual("", r.result['tenant'].get('description'))
**** CubicPower OpenStack Study ****
def test_add_tenant_json(self):
    """Create a tenant from a raw JSON body with an empty description."""
    token = self.get_scoped_token()
    request_headers = {
        'Content-Type': 'application/json',
        'X-Auth-Token': token
    }
    # Raw request text, passed with convert=False.
    raw_body = (
        '\n'
        '{"tenant":{\n'
        '"name":"test1",\n'
        '"description":"",\n'
        '"enabled":true}\n'
        '}\n'
    )
    r = self.admin_request(method='POST',
                           path='/v2.0/tenants',
                           headers=request_headers,
                           body=raw_body,
                           convert=False)
    self._from_content_type(r, 'json')
    self.assertIsNotNone(r.result.get('tenant'))
    tenant = r.result['tenant']
    self.assertValidTenant(tenant)
    self.assertEqual("", r.result['tenant'].get('description'))
**** CubicPower OpenStack Study ****
def test_create_project_invalid_enabled_type_string(self):
    """Creating a tenant with the string ``"False"`` as enabled gives 400."""
    # Forbidden usage of string for 'enabled' field in JSON and XML
    token = self.get_scoped_token()
    new_tenant = {
        'name': uuid.uuid4().hex,
        # In XML, only "true|false" are converted to boolean.
        'enabled': "False",
    }
    response = self.admin_request(method='POST',
                                  path='/v2.0/tenants',
                                  body={'tenant': new_tenant},
                                  token=token,
                                  expected_status=400)
    self.assertValidErrorResponse(response)
**** CubicPower OpenStack Study ****
def test_update_project_invalid_enabled_type_string(self):
    """Updating a tenant with the string ``"False"`` as enabled gives 400."""
    # Forbidden usage of string for 'enabled' field in JSON and XML
    token = self.get_scoped_token()
    tenant_ref = {'tenant_id': self.tenant_bar['id']}
    path = '/v2.0/tenants/%(tenant_id)s' % tenant_ref
    # In XML, only "true|false" are converted to boolean.
    update = {'tenant': {'enabled': "False"}}
    response = self.admin_request(method='PUT',
                                  path=path,
                                  body=update,
                                  token=token,
                                  expected_status=400)
    self.assertValidErrorResponse(response)
**** CubicPower OpenStack Study ****
def test_authenticating_a_user_with_an_OSKSADM_password(self):
    """Create a user via XML ``OS-KSADM:password`` and authenticate as it.

    Both requests are hand-written XML passed with ``convert=False``. The
    user is created with the password supplied in the OS-KSADM extension
    namespace, then that password is used to request a token.
    """
    token = self.get_scoped_token()
    xmlns = "http://docs.openstack.org/identity/api/ext/OS-KSADM/v1.0"
    username = uuid.uuid4().hex
    password = uuid.uuid4().hex
    # Values substituted into both XML documents below.
    substitutions = dict(username=username, password=password,
                         xmlns=xmlns, identity_xmlns=self.xmlns)

    # create the user
    self.admin_request(
        method='POST',
        path='/v2.0/users',
        headers={
            'Content-Type': 'application/xml'
        },
        body="""
            <user xmlns="%(identity_xmlns)s"
                    xmlns:OS-KSADM="%(xmlns)s" name="%(username)s"
                    OS-KSADM:password="%(password)s"
                    enabled="true"/>
        """ % substitutions,
        token=token,
        expected_status=200,
        convert=False)

    # successfully authenticate
    self.public_request(
        method='POST',
        path='/v2.0/tokens',
        headers={
            'Content-Type': 'application/xml'
        },
        body="""
            <auth xmlns="%(identity_xmlns)s"
                    xmlns:OS-KSADM="%(xmlns)s">
                <passwordCredentials
                        username="%(username)s" password="%(password)s"/>
            </auth>
        """ % substitutions,
        token=token,
        expected_status=200,
        convert=False)
**** CubicPower OpenStack Study ****
def test_remove_role_revokes_token(self):
    """Grant a role, validate a token scoped to it, then remove the role.

    The final check that the token is rejected after the role removal is
    currently disabled; see the TODO at the end.
    """
    add_role = self.assignment_api.add_role_to_user_and_project
    self.md_foobar = add_role(self.user_foo['id'],
                              self.tenant_service['id'],
                              self.role_service['id'])

    token = self.get_scoped_token(tenant_id='service')
    token_path = '/v2.0/tokens/%s' % token
    response = self.admin_request(path=token_path, token=token)
    self.assertValidAuthenticationResponse(response)

    remove_role = self.assignment_api.remove_role_from_user_and_project
    remove_role(self.user_foo['id'],
                self.tenant_service['id'],
                self.role_service['id'])

    # TODO(ayoung): test fails due to XML problem
    # r = self.admin_request(
    # path='/v2.0/tokens/%s' % token,
    # token=token,
    # expected_status=401)