# **** CubicPower OpenStack Study ****
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import uuid
import mock
from keystone.common import dependency
from keystone import config
from keystone.contrib.revoke import model
from keystone import exception
from keystone.openstack.common import timeutils
from keystone import tests
from keystone.tests import test_backend_sql
CONF = config.CONF
# **** CubicPower OpenStack Study ****
def _new_id():
    """Return a freshly generated random identifier as a hex string."""
    generated = uuid.uuid4()
    return generated.hex
# **** CubicPower OpenStack Study ****
def _future_time():
    """Return a timestamp 1000 seconds after ``timeutils.utcnow()``."""
    return timeutils.utcnow() + datetime.timedelta(seconds=1000)
# **** CubicPower OpenStack Study ****
def _past_time():
    """Return a timestamp 1000 days before ``timeutils.utcnow()``."""
    return timeutils.utcnow() - datetime.timedelta(days=1000)
# **** CubicPower OpenStack Study ****
def _sample_blank_token():
    """Return blank token data whose issue time is two minutes in the past.

    The data comes from ``model.blank_token_data``; callers fill in the
    fields relevant to their test.
    """
    two_minutes_ago = timeutils.utcnow() - datetime.timedelta(minutes=2)
    return model.blank_token_data(two_minutes_ago)
# **** CubicPower OpenStack Study ****
def _matches(event, token_values):
    """Decide by brute force whether a revocation event covers a token.

    Used as a secondary check on the tree-based lookup exercised further
    down in this module.

    Every attribute set on the event is compared with the corresponding
    value from the token. An attribute the event leaves as None places no
    restriction on the token. As soon as one set attribute disagrees with
    the token, the token cannot match and the remaining checks are skipped.

    :param event: one revocation event to match
    :param token_values: dictionary with the set of values taken from the
                         token
    :returns: True if the token matches the revocation event, indicating
              the token has been revoked; False otherwise
    """
    # The event's user may be the token's user, trustor or trustee.
    revoked_user = event.user_id
    if revoked_user is not None:
        user_fields = ('user_id', 'trustor_id', 'trustee_id')
        if not any(revoked_user == token_values[field]
                   for field in user_fields):
            return False

    # The event's domain may be the user's domain or the project's domain.
    revoked_domain = event.domain_id
    if revoked_domain is not None:
        domain_fields = ('user_domain_id', 'project_domain_id')
        if not any(revoked_domain == token_values[field]
                   for field in domain_fields):
            return False

    # These attributes map one-to-one onto token values; a single
    # disagreement means the whole token does not match the event.
    direct_fields = ('project_id', 'expires_at', 'trust_id',
                     'consumer_id', 'access_token_id')
    for field in direct_fields:
        expected = getattr(event, field)
        if expected is not None and expected != token_values[field]:
            return False

    # A role-scoped event only covers tokens carrying that role.
    revoked_role = event.role_id
    if revoked_role is not None:
        if not any(revoked_role == role for role in token_values['roles']):
            return False

    # Tokens issued after the event's cut-off are unaffected by it.
    return not (token_values['issued_at'] > event.issued_before)
@dependency.requires('revoke_api')
class RevokeTests(object):
    """Backend-independent tests of the revoke API.

    This is a mixin: the concrete test cases in this module combine it
    with a TestCase base that supplies the ``assert*`` methods and select
    a revoke driver. ``self.revoke_api`` is presumably injected by the
    ``dependency.requires`` decorator -- confirm against
    keystone.common.dependency.
    """

    def test_list(self):
        """Each revocation adds exactly one event to the listing."""
        self.revoke_api.revoke_by_user(user_id=1)
        self.assertEqual(1, len(self.revoke_api.get_events()))

        self.revoke_api.revoke_by_user(user_id=2)
        self.assertEqual(2, len(self.revoke_api.get_events()))

    def test_list_since(self):
        """get_events(timestamp) filters events relative to the timestamp."""
        self.revoke_api.revoke_by_user(user_id=1)
        self.revoke_api.revoke_by_user(user_id=2)
        past = timeutils.utcnow() - datetime.timedelta(seconds=1000)
        self.assertEqual(2, len(self.revoke_api.get_events(past)))
        future = timeutils.utcnow() + datetime.timedelta(seconds=1000)
        self.assertEqual(0, len(self.revoke_api.get_events(future)))

    def test_past_expiry_are_removed(self):
        """An event revoked far in the past does not stay in the listing."""
        user_id = 1
        self.revoke_api.revoke_by_expiration(user_id, _future_time())
        self.assertEqual(1, len(self.revoke_api.get_events()))
        event = model.RevokeEvent()
        event.revoked_at = _past_time()
        self.revoke_api.revoke(event)
        # The count is unchanged: the stale event is not retained.
        self.assertEqual(1, len(self.revoke_api.get_events()))

    @mock.patch.object(timeutils, 'utcnow')
    def test_expired_events_removed_validate_token_success(self, mock_utcnow):
        """A token validates again once the event revoking it has expired."""
        def _sample_token_values():
            token = _sample_blank_token()
            token['expires_at'] = timeutils.isotime(_future_time(),
                                                    subsecond=True)
            return token

        now = datetime.datetime.utcnow()
        now_plus_2h = now + datetime.timedelta(hours=2)
        mock_utcnow.return_value = now

        # Build a token and validate it. This will seed the cache for the
        # future 'synchronize' call.
        token_values = _sample_token_values()

        user_id = _new_id()
        self.revoke_api.revoke_by_user(user_id)
        token_values['user_id'] = user_id
        self.assertRaises(exception.TokenNotFound,
                          self.revoke_api.check_token,
                          token_values)

        # Move our clock forward by 2h, build a new token and validate it.
        # 'synchronize' should now be exercised and remove old expired events
        mock_utcnow.return_value = now_plus_2h
        self.revoke_api.revoke_by_expiration(_new_id(), now_plus_2h)
        # should no longer throw an exception
        self.revoke_api.check_token(token_values)
# **** CubicPower OpenStack Study ****
class SqlRevokeTests(test_backend_sql.SqlTests, RevokeTests):
    """Run the shared RevokeTests against the SQL revoke driver."""

    def config_overrides(self):
        """Select the SQL revoke driver and the PKI token provider."""
        super(SqlRevokeTests, self).config_overrides()
        configure = self.config_fixture.config
        configure(group='revoke',
                  driver='keystone.contrib.revoke.backends.sql.Revoke')
        configure(group='token',
                  provider='keystone.token.providers.pki.Provider',
                  revoke_by_id=False)
# **** CubicPower OpenStack Study ****
class KvsRevokeTests(tests.TestCase, RevokeTests):
    """Run the shared RevokeTests against the KVS revoke driver."""

    def setUp(self):
        """Run the base setup, then load the backends."""
        super(KvsRevokeTests, self).setUp()
        self.load_backends()

    def config_overrides(self):
        """Select the KVS revoke driver and the PKI token provider."""
        super(KvsRevokeTests, self).config_overrides()
        configure = self.config_fixture.config
        configure(group='revoke',
                  driver='keystone.contrib.revoke.backends.kvs.Revoke')
        configure(group='token',
                  provider='keystone.token.providers.pki.Provider',
                  revoke_by_id=False)
# **** CubicPower OpenStack Study ****
class RevokeTreeTests(tests.TestCase):
    """Exercise ``model.RevokeTree`` and cross-check it with ``_matches``.

    ``self.tree`` is the tree under test. ``self.events`` is the list of
    events the brute-force ``_matches`` check is run against, so it has to
    mirror what is currently in the tree. Every ``_revoke_by_*`` helper
    records its event in ``self.events`` itself, except
    ``_revoke_by_user``, whose callers must do the recording.
    """

    def setUp(self):
        super(RevokeTreeTests, self).setUp()
        self.events = []
        self.tree = model.RevokeTree()
        self._sample_data()

    def _sample_data(self):
        """Build the ids and tokens shared by the grant tests.

        ``token_to_revoke`` carries user 0, project 0 and role 0. Each of
        the three ``project_tokens`` differs from it in exactly one of
        user, project or role.
        """
        user_ids = [_new_id() for _ in range(3)]
        project_ids = [_new_id() for _ in range(3)]
        role_ids = [_new_id() for _ in range(3)]

        # (user index, project index, role index) for each near-miss token.
        project_tokens = []
        for user, project, role in ((0, 0, 1), (1, 0, 0), (0, 1, 0)):
            token = _sample_blank_token()
            token['user_id'] = user_ids[user]
            token['project_id'] = project_ids[project]
            token['roles'] = [role_ids[role]]
            project_tokens.append(token)

        token_to_revoke = _sample_blank_token()
        token_to_revoke['user_id'] = user_ids[0]
        token_to_revoke['project_id'] = project_ids[0]
        token_to_revoke['roles'] = [role_ids[0]]

        self.project_tokens = project_tokens
        self.user_ids = user_ids
        self.project_ids = project_ids
        self.role_ids = role_ids
        self.token_to_revoke = token_to_revoke

    def _assertTokenRevoked(self, token_data):
        """Assert both the brute-force check and the tree report revoked."""
        self.assertTrue(any(_matches(e, token_data) for e in self.events))
        return self.assertTrue(self.tree.is_revoked(token_data),
                               'Token should be revoked')

    def _assertTokenNotRevoked(self, token_data):
        """Assert neither the brute-force check nor the tree report revoked."""
        self.assertFalse(any(_matches(e, token_data) for e in self.events))
        return self.assertFalse(self.tree.is_revoked(token_data),
                                'Token should not be revoked')

    def _revoke_by_user(self, user_id):
        """Add a user-only event to the tree and return it.

        Unlike the other ``_revoke_by_*`` helpers this one does NOT record
        the event in ``self.events``; the caller is responsible for that.
        """
        return self.tree.add_event(
            model.RevokeEvent(user_id=user_id))

    def _revoke_by_expiration(self, user_id, expires_at):
        """Add and record an event scoped to a user and an expiry time."""
        event = self.tree.add_event(
            model.RevokeEvent(user_id=user_id,
                              expires_at=expires_at))
        self.events.append(event)
        return event

    def _revoke_by_grant(self, role_id, user_id=None,
                         domain_id=None, project_id=None):
        """Add and record an event scoped to a role grant."""
        event = self.tree.add_event(
            model.RevokeEvent(user_id=user_id,
                              role_id=role_id,
                              domain_id=domain_id,
                              project_id=project_id))
        self.events.append(event)
        return event

    def _revoke_by_user_and_project(self, user_id, project_id):
        """Add and record an event scoped to a user within a project."""
        event = self.tree.add_event(
            model.RevokeEvent(project_id=project_id,
                              user_id=user_id))
        self.events.append(event)
        return event

    def _revoke_by_project_role_assignment(self, project_id, role_id):
        """Add and record an event scoped to a role on a project."""
        event = self.tree.add_event(
            model.RevokeEvent(project_id=project_id,
                              role_id=role_id))
        self.events.append(event)
        return event

    def _revoke_by_domain_role_assignment(self, domain_id, role_id):
        """Add and record an event scoped to a role on a domain."""
        event = self.tree.add_event(
            model.RevokeEvent(domain_id=domain_id,
                              role_id=role_id))
        self.events.append(event)
        return event

    def _user_field_test(self, field_name):
        """Check a user revocation matches a token via ``field_name``.

        :param field_name: token key that is set to the revoked user id
        """
        user_id = _new_id()
        event = self._revoke_by_user(user_id)
        self.events.append(event)

        token_data_u1 = _sample_blank_token()
        token_data_u1[field_name] = user_id
        self._assertTokenRevoked(token_data_u1)

        token_data_u2 = _sample_blank_token()
        token_data_u2[field_name] = _new_id()
        self._assertTokenNotRevoked(token_data_u2)

        self.tree.remove_event(event)
        self.events.remove(event)
        self._assertTokenNotRevoked(token_data_u1)

    def test_revoke_by_user(self):
        self._user_field_test('user_id')

    def test_revoke_by_user_matches_trustee(self):
        self._user_field_test('trustee_id')

    def test_revoke_by_user_matches_trustor(self):
        self._user_field_test('trustor_id')

    def test_by_user_expiration(self):
        """Only tokens with the revoked expiry time are affected."""
        future_time = _future_time()
        user_id = 1
        event = self._revoke_by_expiration(user_id, future_time)

        token_data_1 = _sample_blank_token()
        token_data_1['user_id'] = user_id
        token_data_1['expires_at'] = future_time
        self._assertTokenRevoked(token_data_1)

        # Same user, different expiry: must not be covered by the event.
        token_data_2 = _sample_blank_token()
        token_data_2['user_id'] = user_id
        expire_delta = datetime.timedelta(seconds=2000)
        future_time = timeutils.utcnow() + expire_delta
        token_data_2['expires_at'] = future_time
        self._assertTokenNotRevoked(token_data_2)

        self.removeEvent(event)
        self._assertTokenNotRevoked(token_data_1)

    def removeEvent(self, event):
        """Drop ``event`` from both the recorded list and the tree."""
        self.events.remove(event)
        self.tree.remove_event(event)

    def test_by_project_grant(self):
        """Grant revocations hit only tokens with matching user/project/role."""
        token_to_revoke = self.token_to_revoke
        tokens = self.project_tokens

        self._assertTokenNotRevoked(token_to_revoke)
        for token in tokens:
            self._assertTokenNotRevoked(token)

        event = self._revoke_by_grant(role_id=self.role_ids[0],
                                      user_id=self.user_ids[0],
                                      project_id=self.project_ids[0])

        self._assertTokenRevoked(token_to_revoke)
        for token in tokens:
            self._assertTokenNotRevoked(token)

        self.removeEvent(event)

        self._assertTokenNotRevoked(token_to_revoke)
        for token in tokens:
            self._assertTokenNotRevoked(token)

        # A token holding several roles is revoked by an event on any one
        # of them.
        token_to_revoke['roles'] = [self.role_ids[0],
                                    self.role_ids[1],
                                    self.role_ids[2]]

        event = self._revoke_by_grant(role_id=self.role_ids[0],
                                      user_id=self.user_ids[0],
                                      project_id=self.project_ids[0])
        self._assertTokenRevoked(token_to_revoke)
        self.removeEvent(event)
        self._assertTokenNotRevoked(token_to_revoke)

        event = self._revoke_by_grant(role_id=self.role_ids[1],
                                      user_id=self.user_ids[0],
                                      project_id=self.project_ids[0])
        self._assertTokenRevoked(token_to_revoke)
        self.removeEvent(event)
        self._assertTokenNotRevoked(token_to_revoke)

        # Several overlapping events at once still report revoked.
        self._revoke_by_grant(role_id=self.role_ids[0],
                              user_id=self.user_ids[0],
                              project_id=self.project_ids[0])
        self._revoke_by_grant(role_id=self.role_ids[1],
                              user_id=self.user_ids[0],
                              project_id=self.project_ids[0])
        self._revoke_by_grant(role_id=self.role_ids[2],
                              user_id=self.user_ids[0],
                              project_id=self.project_ids[0])
        self._assertTokenRevoked(token_to_revoke)

    def test_by_project_and_user_and_role(self):
        """A user+project event matches alongside an unrelated user event."""
        user_id1 = _new_id()
        user_id2 = _new_id()
        project_id = _new_id()
        # _revoke_by_user leaves recording to the caller ...
        self.events.append(self._revoke_by_user(user_id1))
        # ... whereas _revoke_by_user_and_project records the event itself,
        # so appending its result again would register it twice.
        self._revoke_by_user_and_project(user_id2, project_id)
        token_data = _sample_blank_token()
        token_data['user_id'] = user_id2
        token_data['project_id'] = project_id
        self._assertTokenRevoked(token_data)

    def _assertEmpty(self, collection):
        return self.assertEqual(0, len(collection), "collection not empty")

    def _assertEventsMatchIteration(self, turn):
        """Check the tree's fan-out after ``turn`` rounds of test_cleanup.

        Walks down the wildcard ('=*') branch one level at a time and
        checks how many keys each level holds.
        """
        node = self.tree.revoke_map
        self.assertEqual(1, len(node))

        node = node['trust_id=*']['consumer_id=*']['access_token_id=*']
        # one expiration event per turn, +1 for the wildcard
        self.assertEqual(turn + 1, len(node))

        node = node['expires_at=*']
        # two different functions add domain_ids, +1 for None
        self.assertEqual(2 * turn + 1, len(node))

        node = node['domain_id=*']
        # two different functions add project_ids, +1 for None
        self.assertEqual(2 * turn + 1, len(node))

        node = node['project_id=*']
        # one user-only event is added per turn
        self.assertEqual(turn, len(node))

    def test_cleanup(self):
        """Removing every added event leaves the tree empty again."""
        self._assertEmpty(self.tree.revoke_map)
        for turn in range(1, 11):
            # Only _revoke_by_user needs its event recorded here; the other
            # helpers append to self.events themselves.
            self.events.append(self._revoke_by_user(_new_id()))
            self._revoke_by_expiration(_new_id(), _future_time())
            self._revoke_by_project_role_assignment(_new_id(), _new_id())
            self._revoke_by_domain_role_assignment(_new_id(), _new_id())
            self._revoke_by_domain_role_assignment(_new_id(), _new_id())
            self._revoke_by_user_and_project(_new_id(), _new_id())
            self._assertEventsMatchIteration(turn)

        for event in self.events:
            self.tree.remove_event(event)
        self._assertEmpty(self.tree.revoke_map)