¡@

Home 

OpenStack Study: test_v3_credential.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright 2013 OpenStack Foundation

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import hashlib

import json

import uuid

from keystoneclient.contrib.ec2 import utils as ec2_utils

from keystone import config

from keystone import exception

from keystone.tests import test_v3

CONF = config.CONF

**** CubicPower OpenStack Study ****

class CredentialBaseTestCase(test_v3.RestfulTestCase):
    """Base class holding helpers shared by the credential test cases."""

**** CubicPower OpenStack Study ****

    def _create_dict_blob_credential(self):

        blob = {"access": uuid.uuid4().hex,

                "secret": uuid.uuid4().hex}

        credential_id = hashlib.sha256(blob['access']).hexdigest()

        credential = self.new_credential_ref(

            user_id=self.user['id'],

            project_id=self.project_id)

        credential['id'] = credential_id

        # Store the blob as a dict *not* JSON ref bug #1259584

        # This means we can test the dict->json workaround, added

        # as part of the bugfix for backwards compatibility works.

        credential['blob'] = blob

        credential['type'] = 'ec2'

        # Create direct via the DB API to avoid validation failure

        self.credential_api.create_credential(

            credential_id,

            credential)

        expected_blob = json.dumps(blob)

        return expected_blob, credential_id

**** CubicPower OpenStack Study ****

class CredentialTestCase(CredentialBaseTestCase):

    """Test credential CRUD."""

**** CubicPower OpenStack Study ****

    def setUp(self):

        super(CredentialTestCase, self).setUp()

        self.credential_id = uuid.uuid4().hex

        self.credential = self.new_credential_ref(

            user_id=self.user['id'],

            project_id=self.project_id)

        self.credential['id'] = self.credential_id

        self.credential_api.create_credential(

            self.credential_id,

            self.credential)

**** CubicPower OpenStack Study ****

    def test_credential_api_delete_credentials_for_project(self):

        self.credential_api.delete_credentials_for_project(self.project_id)

        # Test that the credential that we created in .setUp no longer exists

        # once we delete all credentials for self.project_id

        self.assertRaises(exception.CredentialNotFound,

                          self.credential_api.get_credential,

                          credential_id=self.credential_id)

**** CubicPower OpenStack Study ****

    def test_credential_api_delete_credentials_for_user(self):

        self.credential_api.delete_credentials_for_user(self.user_id)

        # Test that the credential that we created in .setUp no longer exists

        # once we delete all credentials for self.user_id

        self.assertRaises(exception.CredentialNotFound,

                          self.credential_api.get_credential,

                          credential_id=self.credential_id)

**** CubicPower OpenStack Study ****

    def test_list_credentials(self):

        """Call ``GET /credentials``."""

        r = self.get('/credentials')

        self.assertValidCredentialListResponse(r, ref=self.credential)

**** CubicPower OpenStack Study ****

    def test_list_credentials_xml(self):

        """Call ``GET /credentials`` (xml data)."""

        r = self.get('/credentials', content_type='xml')

        self.assertValidCredentialListResponse(r, ref=self.credential)

**** CubicPower OpenStack Study ****

    def test_create_credential(self):

        """Call ``POST /credentials``."""

        ref = self.new_credential_ref(user_id=self.user['id'])

        r = self.post(

            '/credentials',

            body={'credential': ref})

        self.assertValidCredentialResponse(r, ref)

**** CubicPower OpenStack Study ****

    def test_get_credential(self):

        """Call ``GET /credentials/{credential_id}``."""

        r = self.get(

            '/credentials/%(credential_id)s' % {

                'credential_id': self.credential_id})

        self.assertValidCredentialResponse(r, self.credential)

**** CubicPower OpenStack Study ****

    def test_update_credential(self):

        """Call ``PATCH /credentials/{credential_id}``."""

        ref = self.new_credential_ref(

            user_id=self.user['id'],

            project_id=self.project_id)

        del ref['id']

        r = self.patch(

            '/credentials/%(credential_id)s' % {

                'credential_id': self.credential_id},

            body={'credential': ref})

        self.assertValidCredentialResponse(r, ref)

**** CubicPower OpenStack Study ****

    def test_delete_credential(self):

        """Call ``DELETE /credentials/{credential_id}``."""

        self.delete(

            '/credentials/%(credential_id)s' % {

                'credential_id': self.credential_id})

**** CubicPower OpenStack Study ****

    def test_create_ec2_credential(self):

        """Call ``POST /credentials`` for creating ec2 credential."""

        ref = self.new_credential_ref(user_id=self.user['id'])

        blob = {"access": uuid.uuid4().hex,

                "secret": uuid.uuid4().hex}

        ref['blob'] = json.dumps(blob)

        ref['type'] = 'ec2'

        r = self.post(

            '/credentials',

            body={'credential': ref})

        self.assertValidCredentialResponse(r, ref)

        # Assert credential id is same as hash of access key id for

        # ec2 credentials

        self.assertEqual(r.result['credential']['id'],

                         hashlib.sha256(blob['access']).hexdigest())

        # Create second ec2 credential with the same access key id and check

        # for conflict.

        self.post(

            '/credentials',

            body={'credential': ref}, expected_status=409)

**** CubicPower OpenStack Study ****

    def test_get_ec2_dict_blob(self):

        """Ensure non-JSON blob data is correctly converted."""

        expected_blob, credential_id = self._create_dict_blob_credential()

        r = self.get(

            '/credentials/%(credential_id)s' % {

                'credential_id': credential_id})

        self.assertEqual(expected_blob, r.result['credential']['blob'])

**** CubicPower OpenStack Study ****

    def test_list_ec2_dict_blob(self):

        """Ensure non-JSON blob data is correctly converted."""

        expected_blob, credential_id = self._create_dict_blob_credential()

        list_r = self.get('/credentials')

        list_creds = list_r.result['credentials']

        list_ids = [r['id'] for r in list_creds]

        self.assertIn(credential_id, list_ids)

        for r in list_creds:

            if r['id'] == credential_id:

                self.assertEqual(expected_blob, r['blob'])

**** CubicPower OpenStack Study ****

    def test_create_non_ec2_credential(self):

        """Call ``POST /credentials`` for creating non-ec2 credential."""

        ref = self.new_credential_ref(user_id=self.user['id'])

        blob = {"access": uuid.uuid4().hex,

                "secret": uuid.uuid4().hex}

        ref['blob'] = json.dumps(blob)

        r = self.post(

            '/credentials',

            body={'credential': ref})

        self.assertValidCredentialResponse(r, ref)

        # Assert credential id is not same as hash of access key id for

        # non-ec2 credentials

        self.assertNotEqual(r.result['credential']['id'],

                            hashlib.sha256(blob['access']).hexdigest())

**** CubicPower OpenStack Study ****

    def test_create_ec2_credential_with_invalid_blob(self):

        """Call ``POST /credentials`` for creating ec2

           credential with invalid blob.

        """

        ref = self.new_credential_ref(user_id=self.user['id'])

        ref['blob'] = '{"abc":"def"d}'

        ref['type'] = 'ec2'

        # Assert 400 status for bad request containing invalid

        # blob

        response = self.post(

            '/credentials',

            body={'credential': ref}, expected_status=400)

        self.assertValidErrorResponse(response)

**** CubicPower OpenStack Study ****

    def test_create_credential_with_admin_token(self):

        # Make sure we can create credential with the static admin token

        ref = self.new_credential_ref(user_id=self.user['id'])

        r = self.post(

            '/credentials',

            body={'credential': ref},

            token=CONF.admin_token)

        self.assertValidCredentialResponse(r, ref)

**** CubicPower OpenStack Study ****

class TestCredentialTrustScoped(test_v3.RestfulTestCase):

    """Test credential with trust scoped token."""

**** CubicPower OpenStack Study ****

    def setUp(self):

        super(TestCredentialTrustScoped, self).setUp()

        self.trustee_user_id = uuid.uuid4().hex

        self.trustee_user = self.new_user_ref(domain_id=self.domain_id)

        self.trustee_user['id'] = self.trustee_user_id

        self.identity_api.create_user(self.trustee_user_id, self.trustee_user)

**** CubicPower OpenStack Study ****

    def config_overrides(self):

        super(TestCredentialTrustScoped, self).config_overrides()

        self.config_fixture.config(group='trust', enabled=True)

**** CubicPower OpenStack Study ****

    def test_trust_scoped_ec2_credential(self):

        """Call ``POST /credentials`` for creating ec2 credential."""

        # Create the trust

        ref = self.new_trust_ref(

            trustor_user_id=self.user_id,

            trustee_user_id=self.trustee_user_id,

            project_id=self.project_id,

            impersonation=True,

            expires=dict(minutes=1),

            role_ids=[self.role_id])

        del ref['id']

        r = self.post('/OS-TRUST/trusts', body={'trust': ref})

        trust = self.assertValidTrustResponse(r)

        # Get a trust scoped token

        auth_data = self.build_authentication_request(

            user_id=self.trustee_user['id'],

            password=self.trustee_user['password'],

            trust_id=trust['id'])

        r = self.post('/auth/tokens', body=auth_data)

        self.assertValidProjectTrustScopedTokenResponse(r, self.user)

        trust_id = r.result['token']['OS-TRUST:trust']['id']

        token_id = r.headers.get('X-Subject-Token')

        # Create the credential with the trust scoped token

        ref = self.new_credential_ref(user_id=self.user['id'])

        blob = {"access": uuid.uuid4().hex,

                "secret": uuid.uuid4().hex}

        ref['blob'] = json.dumps(blob)

        ref['type'] = 'ec2'

        r = self.post(

            '/credentials',

            body={'credential': ref},

            token=token_id)

        # We expect the response blob to contain the trust_id

        ret_ref = ref.copy()

        ret_blob = blob.copy()

        ret_blob['trust_id'] = trust_id

        ret_ref['blob'] = json.dumps(ret_blob)

        self.assertValidCredentialResponse(r, ref=ret_ref)

        # Assert credential id is same as hash of access key id for

        # ec2 credentials

        self.assertEqual(r.result['credential']['id'],

                         hashlib.sha256(blob['access']).hexdigest())

        # Create second ec2 credential with the same access key id and check

        # for conflict.

        self.post(

            '/credentials',

            body={'credential': ref},

            token=token_id,

            expected_status=409)

**** CubicPower OpenStack Study ****

class TestCredentialEc2(CredentialBaseTestCase):

    """Test v3 credential compatibility with ec2tokens."""

**** CubicPower OpenStack Study ****

    def setUp(self):

        super(TestCredentialEc2, self).setUp()

**** CubicPower OpenStack Study ****

    def _validate_signature(self, access, secret):

        """Test signature validation with the access/secret provided."""

        signer = ec2_utils.Ec2Signer(secret)

        params = {'SignatureMethod': 'HmacSHA256',

                  'SignatureVersion': '2',

                  'AWSAccessKeyId': access}

        request = {'host': 'foo',

                   'verb': 'GET',

                   'path': '/bar',

                   'params': params}

        signature = signer.generate(request)

        # Now make a request to validate the signed dummy request via the

        # ec2tokens API.  This proves the v3 ec2 credentials actually work.

        sig_ref = {'access': access,

                   'signature': signature,

                   'host': 'foo',

                   'verb': 'GET',

                   'path': '/bar',

                   'params': params}

        r = self.post(

            '/ec2tokens',

            body={'ec2Credentials': sig_ref},

            expected_status=200)

        self.assertValidTokenResponse(r)

**** CubicPower OpenStack Study ****

    def test_ec2_credential_signature_validate(self):

        """Test signature validation with a v3 ec2 credential."""

        ref = self.new_credential_ref(

            user_id=self.user['id'],

            project_id=self.project_id)

        blob = {"access": uuid.uuid4().hex,

                "secret": uuid.uuid4().hex}

        ref['blob'] = json.dumps(blob)

        ref['type'] = 'ec2'

        r = self.post(

            '/credentials',

            body={'credential': ref})

        self.assertValidCredentialResponse(r, ref)

        # Assert credential id is same as hash of access key id

        self.assertEqual(r.result['credential']['id'],

                         hashlib.sha256(blob['access']).hexdigest())

        cred_blob = json.loads(r.result['credential']['blob'])

        self.assertEqual(blob, cred_blob)

        self._validate_signature(access=cred_blob['access'],

                                 secret=cred_blob['secret'])

**** CubicPower OpenStack Study ****

    def test_ec2_credential_signature_validate_legacy(self):

        """Test signature validation with a legacy v3 ec2 credential."""

        cred_json, credential_id = self._create_dict_blob_credential()

        cred_blob = json.loads(cred_json)

        self._validate_signature(access=cred_blob['access'],

                                 secret=cred_blob['secret'])

**** CubicPower OpenStack Study ****

    def _get_ec2_cred_uri(self):

        return '/users/%s/credentials/OS-EC2' % self.user_id

**** CubicPower OpenStack Study ****

    def _get_ec2_cred(self):

        uri = self._get_ec2_cred_uri()

        r = self.post(uri, body={'tenant_id': self.project_id})

        return r.result['credential']

**** CubicPower OpenStack Study ****

    def test_ec2_create_credential(self):

        """Test ec2 credential creation."""

        ec2_cred = self._get_ec2_cred()

        self.assertEqual(self.user_id, ec2_cred['user_id'])

        self.assertEqual(self.project_id, ec2_cred['tenant_id'])

        self.assertIsNone(ec2_cred['trust_id'])

        self._validate_signature(access=ec2_cred['access'],

                                 secret=ec2_cred['secret'])

        return ec2_cred

**** CubicPower OpenStack Study ****

    def test_ec2_get_credential(self):

        ec2_cred = self._get_ec2_cred()

        uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])

        r = self.get(uri)

        self.assertDictEqual(ec2_cred, r.result['credential'])

**** CubicPower OpenStack Study ****

    def test_ec2_list_credentials(self):

        """Test ec2 credential listing."""

        self._get_ec2_cred_uri()

        uri = self._get_ec2_cred_uri()

        r = self.get(uri)

        cred_list = r.result

        self.assertEqual(1, len(cred_list))

**** CubicPower OpenStack Study ****

    def test_ec2_delete_credential(self):

        """Test ec2 credential deletion."""

        ec2_cred = self._get_ec2_cred()

        uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])

        cred_from_credential_api = self.credential_api.list_credentials(

            user_id=self.user_id)

        self.assertEqual(1, len(cred_from_credential_api))

        self.delete(uri)

        self.assertRaises(exception.CredentialNotFound,

                          self.credential_api.get_credential,

                          cred_from_credential_api[0]['id'])