本文整理汇总了Python中st2common.persistence.auth.User类的典型用法代码示例。如果您正苦于以下问题:Python User类的具体用法?Python User怎么用?Python User使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了User类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setUp
def setUp(self):
super(PolicyTypeControllerRBACTestCase, self).setUp()
self.models = self.fixtures_loader.save_fixtures_to_db(fixtures_pack=FIXTURES_PACK,
fixtures_dict=TEST_FIXTURES)
file_name = 'fake_policy_type_1.yaml'
PolicyTypeControllerRBACTestCase.POLICY_TYPE_1 = self.fixtures_loader.load_fixtures(
fixtures_pack=FIXTURES_PACK,
fixtures_dict={'policytypes': [file_name]})['policytypes'][file_name]
file_name = 'fake_policy_type_2.yaml'
PolicyTypeControllerRBACTestCase.POLICY_TYPE_2 = self.fixtures_loader.load_fixtures(
fixtures_pack=FIXTURES_PACK,
fixtures_dict={'policytypes': [file_name]})['policytypes'][file_name]
# Insert mock users, roles and assignments
# Users
user_1_db = UserDB(name='policy_type_list')
user_1_db = User.add_or_update(user_1_db)
self.users['policy_type_list'] = user_1_db
user_2_db = UserDB(name='policy_type_view')
user_2_db = User.add_or_update(user_2_db)
self.users['policy_type_view'] = user_2_db
# Roles
# policy_type_list
grant_db = PermissionGrantDB(resource_uid=None,
resource_type=ResourceType.POLICY_TYPE,
permission_types=[PermissionType.POLICY_TYPE_LIST])
grant_db = PermissionGrant.add_or_update(grant_db)
permission_grants = [str(grant_db.id)]
role_1_db = RoleDB(name='policy_type_list', permission_grants=permission_grants)
role_1_db = Role.add_or_update(role_1_db)
self.roles['policy_type_list'] = role_1_db
# policy_type_view on timer 1
policy_type_uid = self.models['policytypes']['fake_policy_type_1.yaml'].get_uid()
grant_db = PermissionGrantDB(resource_uid=policy_type_uid,
resource_type=ResourceType.POLICY_TYPE,
permission_types=[PermissionType.POLICY_TYPE_VIEW])
grant_db = PermissionGrant.add_or_update(grant_db)
permission_grants = [str(grant_db.id)]
role_1_db = RoleDB(name='policy_type_view', permission_grants=permission_grants)
role_1_db = Role.add_or_update(role_1_db)
self.roles['policy_type_view'] = role_1_db
# Role assignments
role_assignment_db = UserRoleAssignmentDB(
user=self.users['policy_type_list'].name,
role=self.roles['policy_type_list'].name,
source='assignments/%s.yaml' % self.users['policy_type_list'].name)
UserRoleAssignment.add_or_update(role_assignment_db)
role_assignment_db = UserRoleAssignmentDB(
user=self.users['policy_type_view'].name,
role=self.roles['policy_type_view'].name,
source='assignments/%s.yaml' % self.users['policy_type_view'].name)
UserRoleAssignment.add_or_update(role_assignment_db)
开发者ID:lyandut,项目名称:st2,代码行数:60,代码来源:test_policies_rbac.py
示例2: setUp
def setUp(self):
super(APIControllerWithRBACTestCase, self).setUp()
self.users = {}
self.roles = {}
# Run RBAC migrations
run_all_rbac_migrations()
# Insert mock users with default role assignments
role_names = [SystemRole.SYSTEM_ADMIN, SystemRole.ADMIN, SystemRole.OBSERVER]
for role_name in role_names:
user_db = UserDB(name=role_name)
user_db = User.add_or_update(user_db)
self.users[role_name] = user_db
role_assignment_db = UserRoleAssignmentDB(
user=user_db.name,
role=role_name)
UserRoleAssignment.add_or_update(role_assignment_db)
# Insert a user with no permissions and role assignments
user_1_db = UserDB(name='no_permissions')
user_1_db = User.add_or_update(user_1_db)
self.users['no_permissions'] = user_1_db
开发者ID:Bala96,项目名称:st2,代码行数:25,代码来源:base.py
示例3: setUp
def setUp(self):
super(KeyValuesControllerRBACTestCase, self).setUp()
self.kvps = {}
# Insert mock users
user_1_db = UserDB(name='user1')
user_1_db = User.add_or_update(user_1_db)
self.users['user_1'] = user_1_db
user_2_db = UserDB(name='user2')
user_2_db = User.add_or_update(user_2_db)
self.users['user_2'] = user_2_db
# Insert mock kvp objects
kvp_api = KeyValuePairSetAPI(name='test_system_scope', value='value1',
scope=FULL_SYSTEM_SCOPE)
kvp_db = KeyValuePairSetAPI.to_model(kvp_api)
kvp_db = KeyValuePair.add_or_update(kvp_db)
kvp_db = KeyValuePairAPI.from_model(kvp_db)
self.kvps['kvp_1'] = kvp_db
kvp_api = KeyValuePairSetAPI(name='test_system_scope_secret', value='value_secret',
scope=FULL_SYSTEM_SCOPE, secret=True)
kvp_db = KeyValuePairSetAPI.to_model(kvp_api)
kvp_db = KeyValuePair.add_or_update(kvp_db)
kvp_db = KeyValuePairAPI.from_model(kvp_db)
self.kvps['kvp_2'] = kvp_db
name = get_key_reference(scope=FULL_USER_SCOPE, name='test_user_scope_1', user='user1')
kvp_db = KeyValuePairDB(name=name, value='valueu12', scope=FULL_USER_SCOPE)
kvp_db = KeyValuePair.add_or_update(kvp_db)
kvp_db = KeyValuePairAPI.from_model(kvp_db)
self.kvps['kvp_3'] = kvp_db
name = get_key_reference(scope=FULL_USER_SCOPE, name='test_user_scope_2', user='user1')
kvp_api = KeyValuePairSetAPI(name=name, value='user_secret', scope=FULL_USER_SCOPE,
secret=True)
kvp_db = KeyValuePairSetAPI.to_model(kvp_api)
kvp_db = KeyValuePair.add_or_update(kvp_db)
kvp_db = KeyValuePairAPI.from_model(kvp_db)
self.kvps['kvp_4'] = kvp_db
name = get_key_reference(scope=FULL_USER_SCOPE, name='test_user_scope_3', user='user2')
kvp_db = KeyValuePairDB(name=name, value='valueu21', scope=FULL_USER_SCOPE)
kvp_db = KeyValuePair.add_or_update(kvp_db)
kvp_db = KeyValuePairAPI.from_model(kvp_db)
self.kvps['kvp_5'] = kvp_db
self.system_scoped_items_count = 2
self.user_scoped_items_count = 3
self.user_scoped_items_per_user_count = {
'user1': 2,
'user2': 1
}
开发者ID:lyandut,项目名称:st2,代码行数:55,代码来源:test_kvps_rbac.py
示例4: create_token
def create_token(username, ttl=None, metadata=None, add_missing_user=True, service=False):
"""
:param username: Username of the user to create the token for. If the account for this user
doesn't exist yet it will be created.
:type username: ``str``
:param ttl: Token TTL (in seconds).
:type ttl: ``int``
:param metadata: Optional metadata to associate with the token.
:type metadata: ``dict``
:param add_missing_user: Add the user given by `username` if they don't exist
:type add_missing_user: ``bool``
:param service: True if this is a service (non-user) token.
:type service: ``bool``
"""
if ttl:
# Note: We allow arbitrary large TTLs for service tokens.
if not service and ttl > cfg.CONF.auth.token_ttl:
msg = ('TTL specified %s is greater than max allowed %s.' % (ttl,
cfg.CONF.auth.token_ttl))
raise TTLTooLargeException(msg)
else:
ttl = cfg.CONF.auth.token_ttl
if username:
try:
User.get_by_name(username)
except:
if add_missing_user:
user_db = UserDB(name=username)
User.add_or_update(user_db)
extra = {'username': username, 'user': user_db}
LOG.audit('Registered new user "%s".' % (username), extra=extra)
else:
raise UserNotFoundError()
token = uuid.uuid4().hex
expiry = date_utils.get_datetime_utc_now() + datetime.timedelta(seconds=ttl)
token = TokenDB(user=username, token=token, expiry=expiry, metadata=metadata, service=service)
Token.add_or_update(token)
username_string = username if username else 'an anonymous user'
token_expire_string = isotime.format(expiry, offset=False)
extra = {'username': username, 'token_expiration': token_expire_string}
LOG.audit('Access granted to "%s" with the token set to expire at "%s".' %
(username_string, token_expire_string), extra=extra)
return token
开发者ID:StackStorm,项目名称:st2,代码行数:54,代码来源:access.py
示例5: _get_username_for_request
def _get_username_for_request(self, username, request):
impersonate_user = getattr(request, 'user', None)
if impersonate_user is not None:
# check this is a service account
try:
if not User.get_by_name(username).is_service:
message = "Current user is not a service and cannot " \
"request impersonated tokens"
abort_request(status_code=http_client.BAD_REQUEST,
message=message)
return
username = impersonate_user
except (UserNotFoundError, StackStormDBObjectNotFoundError):
message = "Could not locate user %s" % \
(impersonate_user)
abort_request(status_code=http_client.BAD_REQUEST,
message=message)
return
else:
impersonate_user = getattr(request, 'impersonate_user', None)
nickname_origin = getattr(request, 'nickname_origin', None)
if impersonate_user is not None:
try:
# check this is a service account
if not User.get_by_name(username).is_service:
raise NotServiceUserError()
username = User.get_by_nickname(impersonate_user,
nickname_origin).name
except NotServiceUserError:
message = "Current user is not a service and cannot " \
"request impersonated tokens"
abort_request(status_code=http_client.BAD_REQUEST,
message=message)
return
except (UserNotFoundError, StackStormDBObjectNotFoundError):
message = "Could not locate user %[email protected]%s" % \
(impersonate_user, nickname_origin)
abort_request(status_code=http_client.BAD_REQUEST,
message=message)
return
except NoNicknameOriginProvidedError:
message = "Nickname origin is not provided for nickname '%s'" % \
impersonate_user
abort_request(status_code=http_client.BAD_REQUEST,
message=message)
return
except AmbiguousUserError:
message = "%[email protected]%s matched more than one username" % \
(impersonate_user, nickname_origin)
abort_request(status_code=http_client.BAD_REQUEST,
message=message)
return
return username
开发者ID:nzlosh,项目名称:st2,代码行数:54,代码来源:handlers.py
示例6: setUp
def setUp(self):
super(RunnerPermissionsResolverTestCase, self).setUp()
# Create some mock users
user_1_db = UserDB(name='custom_role_runner_view_grant')
user_1_db = User.add_or_update(user_1_db)
self.users['custom_role_runner_view_grant'] = user_1_db
user_2_db = UserDB(name='custom_role_runner_modify_grant')
user_2_db = User.add_or_update(user_2_db)
self.users['custom_role_runner_modify_grant'] = user_2_db
# Create some mock resources on which permissions can be granted
runner_1_db = RunnerTypeDB(name='runner_1')
self.resources['runner_1'] = runner_1_db
runner_2_db = RunnerTypeDB(name='runner_2')
self.resources['runner_2'] = runner_2_db
# Create some mock roles with associated permission grants
# Custom role - "runner_view" grant on runner_1
grant_db = PermissionGrantDB(resource_uid=self.resources['runner_1'].get_uid(),
resource_type=ResourceType.RUNNER,
permission_types=[PermissionType.RUNNER_VIEW])
grant_db = PermissionGrant.add_or_update(grant_db)
permission_grants = [str(grant_db.id)]
role_db = RoleDB(name='custom_role_runner_view_grant',
permission_grants=permission_grants)
role_db = Role.add_or_update(role_db)
self.roles['custom_role_runner_view_grant'] = role_db
# Custom role - "runner_modify" grant on runner_2
grant_db = PermissionGrantDB(resource_uid=self.resources['runner_2'].get_uid(),
resource_type=ResourceType.RUNNER,
permission_types=[PermissionType.RUNNER_MODIFY])
grant_db = PermissionGrant.add_or_update(grant_db)
permission_grants = [str(grant_db.id)]
role_db = RoleDB(name='custom_role_runner_modify_grant',
permission_grants=permission_grants)
role_db = Role.add_or_update(role_db)
self.roles['custom_role_runner_modify_grant'] = role_db
# Create some mock role assignments
user_db = self.users['custom_role_runner_view_grant']
role_assignment_db = UserRoleAssignmentDB(
user=user_db.name,
role=self.roles['custom_role_runner_view_grant'].name)
UserRoleAssignment.add_or_update(role_assignment_db)
user_db = self.users['custom_role_runner_modify_grant']
role_assignment_db = UserRoleAssignmentDB(
user=user_db.name,
role=self.roles['custom_role_runner_modify_grant'].name)
UserRoleAssignment.add_or_update(role_assignment_db)
开发者ID:Bala96,项目名称:st2,代码行数:54,代码来源:test_rbac_resolvers_runner.py
示例7: setUp
def setUp(self):
super(KeyValuesControllerRBACTestCase, self).setUp()
self.kvps = {}
# Insert mock users
user_1_db = UserDB(name="user1")
user_1_db = User.add_or_update(user_1_db)
self.users["user_1"] = user_1_db
user_2_db = UserDB(name="user2")
user_2_db = User.add_or_update(user_2_db)
self.users["user_2"] = user_2_db
# Insert mock kvp objects
kvp_api = KeyValuePairSetAPI(name="test_system_scope", value="value1", scope=FULL_SYSTEM_SCOPE)
kvp_db = KeyValuePairSetAPI.to_model(kvp_api)
kvp_db = KeyValuePair.add_or_update(kvp_db)
kvp_db = KeyValuePairAPI.from_model(kvp_db)
self.kvps["kvp_1"] = kvp_db
kvp_api = KeyValuePairSetAPI(
name="test_system_scope_secret", value="value_secret", scope=FULL_SYSTEM_SCOPE, secret=True
)
kvp_db = KeyValuePairSetAPI.to_model(kvp_api)
kvp_db = KeyValuePair.add_or_update(kvp_db)
kvp_db = KeyValuePairAPI.from_model(kvp_db)
self.kvps["kvp_2"] = kvp_db
name = get_key_reference(scope=FULL_USER_SCOPE, name="test_user_scope_1", user="user1")
kvp_db = KeyValuePairDB(name=name, value="valueu12", scope=FULL_USER_SCOPE)
kvp_db = KeyValuePair.add_or_update(kvp_db)
kvp_db = KeyValuePairAPI.from_model(kvp_db)
self.kvps["kvp_3"] = kvp_db
name = get_key_reference(scope=FULL_USER_SCOPE, name="test_user_scope_2", user="user1")
kvp_api = KeyValuePairSetAPI(name=name, value="user_secret", scope=FULL_USER_SCOPE, secret=True)
kvp_db = KeyValuePairSetAPI.to_model(kvp_api)
kvp_db = KeyValuePair.add_or_update(kvp_db)
kvp_db = KeyValuePairAPI.from_model(kvp_db)
self.kvps["kvp_4"] = kvp_db
name = get_key_reference(scope=FULL_USER_SCOPE, name="test_user_scope_3", user="user2")
kvp_db = KeyValuePairDB(name=name, value="valueu21", scope=FULL_USER_SCOPE)
kvp_db = KeyValuePair.add_or_update(kvp_db)
kvp_db = KeyValuePairAPI.from_model(kvp_db)
self.kvps["kvp_5"] = kvp_db
self.system_scoped_items_count = 2
self.user_scoped_items_count = 3
self.user_scoped_items_per_user_count = {"user1": 2, "user2": 1}
开发者ID:Pulsant,项目名称:st2,代码行数:51,代码来源:test_kvps_rbac.py
示例8: setUp
def setUp(self):
super(RBACDefinitionsDBSyncerTestCase, self).setUp()
self.roles = {}
self.users = {}
# Insert some mock users
user_1_db = UserDB(name='user_1')
user_1_db = User.add_or_update(user_1_db)
self.users['user_1'] = user_1_db
user_2_db = UserDB(name='user_2')
user_2_db = User.add_or_update(user_2_db)
self.users['user_2'] = user_2_db
开发者ID:agilee,项目名称:st2,代码行数:14,代码来源:test_rbac_syncer.py
示例9: setUp
def setUp(self):
super(WebhookControllerRBACTestCase, self).setUp()
# Insert mock users, roles and assignments
# Users
user_1_db = UserDB(name='webhook_list')
user_1_db = User.add_or_update(user_1_db)
self.users['webhook_list'] = user_1_db
user_2_db = UserDB(name='webhook_view')
user_2_db = User.add_or_update(user_2_db)
self.users['webhook_view'] = user_2_db
# Roles
# webhook_list
grant_db = PermissionGrantDB(resource_uid=None,
resource_type=ResourceType.WEBHOOK,
permission_types=[PermissionType.WEBHOOK_LIST])
grant_db = PermissionGrant.add_or_update(grant_db)
permission_grants = [str(grant_db.id)]
role_1_db = RoleDB(name='webhook_list', permission_grants=permission_grants)
role_1_db = Role.add_or_update(role_1_db)
self.roles['webhook_list'] = role_1_db
# webhook_view on webhook 1 (git)
name = 'git'
webhook_db = WebhookDB(name=name)
webhook_uid = webhook_db.get_uid()
grant_db = PermissionGrantDB(resource_uid=webhook_uid,
resource_type=ResourceType.WEBHOOK,
permission_types=[PermissionType.WEBHOOK_VIEW])
grant_db = PermissionGrant.add_or_update(grant_db)
permission_grants = [str(grant_db.id)]
role_1_db = RoleDB(name='webhook_view', permission_grants=permission_grants)
role_1_db = Role.add_or_update(role_1_db)
self.roles['webhook_view'] = role_1_db
# Role assignments
role_assignment_db = UserRoleAssignmentDB(
user=self.users['webhook_list'].name,
role=self.roles['webhook_list'].name,
source='assignments/%s.yaml' % self.users['webhook_list'].name)
UserRoleAssignment.add_or_update(role_assignment_db)
role_assignment_db = UserRoleAssignmentDB(
user=self.users['webhook_view'].name,
role=self.roles['webhook_view'].name,
source='assignments/%s.yaml' % self.users['webhook_view'].name)
UserRoleAssignment.add_or_update(role_assignment_db)
开发者ID:lyandut,项目名称:st2,代码行数:50,代码来源:test_webhooks_rbac.py
示例10: post
def post(self, api_key_api, requester_user):
"""
Create a new entry.
"""
permission_type = PermissionType.API_KEY_CREATE
rbac_utils = get_rbac_backend().get_utils_class()
rbac_utils.assert_user_has_resource_api_permission(user_db=requester_user,
resource_api=api_key_api,
permission_type=permission_type)
api_key_db = None
api_key = None
try:
if not getattr(api_key_api, 'user', None):
if requester_user:
api_key_api.user = requester_user.name
else:
api_key_api.user = cfg.CONF.system_user.user
try:
User.get_by_name(api_key_api.user)
except StackStormDBObjectNotFoundError:
user_db = UserDB(name=api_key_api.user)
User.add_or_update(user_db)
extra = {'username': api_key_api.user, 'user': user_db}
LOG.audit('Registered new user "%s".' % (api_key_api.user), extra=extra)
# If key_hash is provided use that and do not create a new key. The assumption
# is user already has the original api-key
if not getattr(api_key_api, 'key_hash', None):
api_key, api_key_hash = auth_util.generate_api_key_and_hash()
# store key_hash in DB
api_key_api.key_hash = api_key_hash
api_key_db = ApiKey.add_or_update(ApiKeyAPI.to_model(api_key_api))
except (ValidationError, ValueError) as e:
LOG.exception('Validation failed for api_key data=%s.', api_key_api)
abort(http_client.BAD_REQUEST, six.text_type(e))
extra = {'api_key_db': api_key_db}
LOG.audit('ApiKey created. ApiKey.id=%s' % (api_key_db.id), extra=extra)
api_key_create_response_api = ApiKeyCreateResponseAPI.from_model(api_key_db)
# Return real api_key back to user. A one-way hash of the api_key is stored in the DB
# only the real value only returned at create time. Also, no masking of key here since
# the user needs to see this value atleast once.
api_key_create_response_api.key = api_key
return Response(json=api_key_create_response_api, status=http_client.CREATED)
开发者ID:StackStorm,项目名称:st2,代码行数:50,代码来源:auth.py
示例11: test_grant_and_revoke_role
def test_grant_and_revoke_role(self):
user_db = UserDB(name='test-user-1')
user_db = User.add_or_update(user_db)
# Initial state, no roles
role_dbs = rbac_services.get_roles_for_user(user_db=user_db)
self.assertItemsEqual(role_dbs, [])
role_dbs = user_db.get_roles()
self.assertItemsEqual(role_dbs, [])
# Assign a role, should have one role assigned
rbac_services.assign_role_to_user(role_db=self.roles['custom_role_1'],
user_db=user_db)
role_dbs = rbac_services.get_roles_for_user(user_db=user_db)
self.assertItemsEqual(role_dbs, [self.roles['custom_role_1']])
role_dbs = user_db.get_roles()
self.assertItemsEqual(role_dbs, [self.roles['custom_role_1']])
# Revoke previously assigned role, should have no roles again
rbac_services.revoke_role_from_user(role_db=self.roles['custom_role_1'],
user_db=user_db)
role_dbs = rbac_services.get_roles_for_user(user_db=user_db)
self.assertItemsEqual(role_dbs, [])
role_dbs = user_db.get_roles()
self.assertItemsEqual(role_dbs, [])
开发者ID:Bala96,项目名称:st2,代码行数:29,代码来源:test_rbac.py
示例12: setUp
def setUp(self):
super(RBACRoleAssignmentsControllerRBACTestCase, self).setUp()
# Insert mock users, roles and assignments
self.role_assignments = {}
# Users
user_1_db = UserDB(name='user_foo')
user_1_db = User.add_or_update(user_1_db)
self.users['user_foo'] = user_1_db
# Roles
role_1_db = RoleDB(name='user_foo', permission_grants=[])
role_1_db = Role.add_or_update(role_1_db)
self.roles['user_foo'] = role_1_db
# Role assignments
role_assignment_db = UserRoleAssignmentDB(
user=self.users['user_foo'].name,
role=self.roles['user_foo'].name,
source='assignments/%s.yaml' % self.users['user_foo'].name)
UserRoleAssignment.add_or_update(role_assignment_db)
self.role_assignments['assignment_one'] = role_assignment_db
role_assignment_db = UserRoleAssignmentDB(
user='user_bar',
role=self.roles['user_foo'].name,
source='assignments/user_bar.yaml')
UserRoleAssignment.add_or_update(role_assignment_db)
self.role_assignments['assignment_two'] = role_assignment_db
开发者ID:lyandut,项目名称:st2,代码行数:30,代码来源:test_rbac_rbac.py
示例13: test_logging_profiling_is_disabled
def test_logging_profiling_is_disabled(self, mock_log):
disable_profiling()
queryset = User.query(name__in=['test1', 'test2'], order_by=['+aa', '-bb'], limit=1)
result = log_query_and_profile_data_for_queryset(queryset=queryset)
self.assertEqual(queryset, result)
call_args_list = mock_log.debug.call_args_list
self.assertItemsEqual(call_args_list, [])
开发者ID:StackStorm,项目名称:st2,代码行数:7,代码来源:test_model_utils_profiling.py
示例14: sync_users_role_assignments
def sync_users_role_assignments(self, role_assignment_apis):
"""
Synchronize role assignments for all the users in the database.
:param role_assignment_apis: Role assignments API objects for the assignments loaded
from the files.
:type role_assignment_apis: ``list`` of :class:`UserRoleAssignmentFileFormatAPI`
:return: Dictionary with created and removed role assignments for each user.
:rtype: ``dict``
"""
LOG.info("Synchronizing users role assignments...")
username_to_role_assignment_map = dict([(api.username, api) for api in role_assignment_apis])
user_dbs = User.get_all()
results = {}
for user_db in user_dbs:
username = user_db.name
role_assignment_api = username_to_role_assignment_map.get(username, None)
role_assignment_dbs = rbac_services.get_role_assignments_for_user(user_db=user_db)
result = self._sync_user_role_assignments(
user_db=user_db, role_assignment_dbs=role_assignment_dbs, role_assignment_api=role_assignment_api
)
results[username] = result
LOG.info("User role assignments synchronized")
return results
开发者ID:alexmakarski,项目名称:st2,代码行数:29,代码来源:syncer.py
示例15: setUp
def setUp(self):
super(ActionControllerRBACTestCase, self).setUp()
self.fixtures_loader.save_fixtures_to_db(fixtures_pack=FIXTURES_PACK,
fixtures_dict=TEST_FIXTURES)
file_name = 'action1.yaml'
ActionControllerRBACTestCase.ACTION_1 = self.fixtures_loader.load_fixtures(
fixtures_pack=FIXTURES_PACK,
fixtures_dict={'actions': [file_name]})['actions'][file_name]
# Insert mock users, roles and assignments
# Users
user_2_db = UserDB(name='action_create')
user_2_db = User.add_or_update(user_2_db)
self.users['action_create'] = user_2_db
# Roles
# action_create grant on parent pack
grant_db = PermissionGrantDB(resource_uid='pack:examples',
resource_type=ResourceType.PACK,
permission_types=[PermissionType.ACTION_CREATE])
grant_db = PermissionGrant.add_or_update(grant_db)
permission_grants = [str(grant_db.id)]
role_1_db = RoleDB(name='action_create', permission_grants=permission_grants)
role_1_db = Role.add_or_update(role_1_db)
self.roles['action_create'] = role_1_db
# Role assignments
user_db = self.users['action_create']
role_assignment_db = UserRoleAssignmentDB(
user=user_db.name,
role=self.roles['action_create'].name)
UserRoleAssignment.add_or_update(role_assignment_db)
开发者ID:Bala96,项目名称:st2,代码行数:34,代码来源:test_actions_rbac.py
示例16: setUp
def setUp(self):
super(WebhookPermissionsResolverTestCase, self).setUp()
# Create some mock users
user_1_db = UserDB(name='custom_role_webhook_grant')
user_1_db = User.add_or_update(user_1_db)
self.users['custom_role_webhook_grant'] = user_1_db
# Create some mock resources on which permissions can be granted
webhook_1_db = WebhookDB(name='st2/')
self.resources['webhook_1'] = webhook_1_db
# Create some mock roles with associated permission grants
# Custom role - "webhook_send" grant on webhook_1
grant_db = PermissionGrantDB(resource_uid=self.resources['webhook_1'].get_uid(),
resource_type=ResourceType.WEBHOOK,
permission_types=[PermissionType.WEBHOOK_SEND])
grant_db = PermissionGrant.add_or_update(grant_db)
permission_grants = [str(grant_db.id)]
role_db = RoleDB(name='custom_role_webhook_grant',
permission_grants=permission_grants)
role_db = Role.add_or_update(role_db)
self.roles['custom_role_webhook_grant'] = role_db
# Create some mock role assignments
user_db = self.users['custom_role_webhook_grant']
role_assignment_db = UserRoleAssignmentDB(
user=user_db.name, role=self.roles['custom_role_webhook_grant'].name,
source='assignments/%s.yaml' % user_db.name)
UserRoleAssignment.add_or_update(role_assignment_db)
开发者ID:lyandut,项目名称:st2,代码行数:30,代码来源:test_rbac_resolvers_webhook.py
示例17: sync_users_role_assignments
def sync_users_role_assignments(self, role_assignment_apis):
"""
Synchronize role assignments for all the users in the database.
:param role_assignment_apis: Role assignments API objects for the assignments loaded
from the files.
:type role_assignment_apis: ``list`` of :class:`UserRoleAssignmentFileFormatAPI`
:return: Dictionary with created and removed role assignments for each user.
:rtype: ``dict``
"""
LOG.info('Synchronizing users role assignments...')
user_dbs = User.get_all()
username_to_user_db_map = dict([(user_db.name, user_db) for user_db in user_dbs])
results = {}
for role_assignment_api in role_assignment_apis:
username = role_assignment_api.username
user_db = username_to_user_db_map.get(username, None)
if not user_db:
LOG.debug(('Skipping role assignments for user "%s" which doesn\'t exist in the '
'database' % (username)))
continue
role_assignment_dbs = rbac_services.get_role_assignments_for_user(user_db=user_db)
result = self._sync_user_role_assignments(user_db=user_db,
role_assignment_dbs=role_assignment_dbs,
role_assignment_api=role_assignment_api)
results[username] = result
LOG.info('User role assignments synchronized')
return results
开发者ID:agilee,项目名称:st2,代码行数:35,代码来源:syncer.py
示例18: setUp
def setUp(self):
super(ExecutionViewsFiltersControllerRBACTestCase, self).setUp()
# Insert mock users, roles and assignments
# Users
user_1_db = UserDB(name='execution_views_filters_list')
user_1_db = User.add_or_update(user_1_db)
self.users['execution_views_filters_list'] = user_1_db
# Roles
# trace_list
permission_types = [PermissionType.EXECUTION_VIEWS_FILTERS_LIST]
grant_db = PermissionGrantDB(resource_uid=None,
resource_type=ResourceType.EXECUTION,
permission_types=permission_types)
grant_db = PermissionGrant.add_or_update(grant_db)
permission_grants = [str(grant_db.id)]
role_1_db = RoleDB(name='execution_views_filters_list',
permission_grants=permission_grants)
role_1_db = Role.add_or_update(role_1_db)
self.roles['execution_views_filters_list'] = role_1_db
# Role assignments
role_assignment_db = UserRoleAssignmentDB(
user=self.users['execution_views_filters_list'].name,
role=self.roles['execution_views_filters_list'].name,
source='assignments/%s.yaml' % self.users['execution_views_filters_list'].name)
UserRoleAssignment.add_or_update(role_assignment_db)
开发者ID:lyandut,项目名称:st2,代码行数:29,代码来源:test_executions_filters_rbac.py
示例19: create_token
def create_token(username, ttl=None, metadata=None):
"""
:param username: Username of the user to create the token for. If the account for this user
doesn't exist yet it will be created.
:type username: ``str``
:param ttl: Token TTL (in seconds).
:type ttl: ``int``
:param metadata: Optional metadata to associate with the token.
:type metadata: ``dict``
"""
if ttl:
if ttl > cfg.CONF.auth.token_ttl:
msg = 'TTL specified %s is greater than max allowed %s.' % (
ttl, cfg.CONF.auth.token_ttl
)
raise TTLTooLargeException(msg)
else:
ttl = cfg.CONF.auth.token_ttl
if username:
try:
User.get_by_name(username)
except:
user = UserDB(name=username)
User.add_or_update(user)
extra = {'username': username, 'user': user}
LOG.audit('Registered new user "%s".' % (username), extra=extra)
token = uuid.uuid4().hex
expiry = datetime.datetime.utcnow() + datetime.timedelta(seconds=ttl)
expiry = isotime.add_utc_tz(expiry)
token = TokenDB(user=username, token=token, expiry=expiry, metadata=metadata)
Token.add_or_update(token)
username_string = username if username else 'an anonymous user'
token_expire_string = isotime.format(expiry, offset=False)
extra = {'username': username, 'token_expiration': token_expire_string}
LOG.audit('Access granted to "%s" with the token set to expire at "%s".' %
(username_string, token_expire_string), extra=extra)
return token
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:46,代码来源:access.py
示例20: setUp
def setUp(self):
super(HandlerTestCase, self).setUp()
cfg.CONF.auth.backend = 'mock'
self.users = {}
self.roles = {}
self.role_assignments = {}
# Insert some mock users
user_1_db = UserDB(name='auser')
user_1_db = User.add_or_update(user_1_db)
self.users['user_1'] = user_1_db
user_2_db = UserDB(name='buser')
user_2_db = User.add_or_update(user_2_db)
self.users['user_2'] = user_2_db
# Insert mock local role assignments
role_db = create_role(name='mock_local_role_1')
user_db = self.users['user_1']
source = 'assignments/%s.yaml' % user_db.name
role_assignment_db_1 = assign_role_to_user(
role_db=role_db, user_db=user_db, source=source, is_remote=False)
self.roles['mock_local_role_1'] = role_db
self.role_assignments['assignment_1'] = role_assignment_db_1
role_db = create_role(name='mock_local_role_2')
user_db = self.users['user_1']
source = 'assignments/%s.yaml' % user_db.name
role_assignment_db_2 = assign_role_to_user(
role_db=role_db, user_db=user_db, source=source, is_remote=False)
self.roles['mock_local_role_2'] = role_db
self.role_assignments['assignment_2'] = role_assignment_db_2
role_db = create_role(name='mock_role_3')
self.roles['mock_role_3'] = role_db
role_db = create_role(name='mock_role_4')
self.roles['mock_role_4'] = role_db
role_db = create_role(name='mock_role_5')
self.roles['mock_role_5'] = role_db
开发者ID:lyandut,项目名称:st2,代码行数:45,代码来源:test_handlers.py
注:本文中的st2common.persistence.auth.User类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论