本文整理汇总了 Python 中 superset.security_manager.find_role 函数的典型用法代码示例。如果您正苦于以下问题:Python find_role 函数的具体用法?Python find_role 怎么用?Python find_role 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 find_role 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: test_clean_requests_after_alpha_grant
def test_clean_requests_after_alpha_grant(self):
    """Check gamma's request is deleted once gamma is Alpha and gamma2 is granted.

    Case 2: gamma and gamma2 both file an access request on the
    ``birth_names`` table. gamma is then given the Alpha role, and gamma2's
    request is fulfilled through ``EXTEND_ROLE_REQUEST``. gamma's request
    must still exist before that call and must be gone after it.
    """
    db_session = db.session

    # Both users request access to the same table.
    gamma_request = create_access_request(
        db_session, 'table', 'birth_names', TEST_ROLE_1, 'gamma')
    create_access_request(
        db_session, 'table', 'birth_names', TEST_ROLE_2, 'gamma2')
    datasource_id = gamma_request.datasource_id

    # Promote gamma to Alpha.
    alpha = security_manager.find_role('Alpha')
    user = security_manager.find_user(username='gamma')
    user.roles.append(alpha)
    db_session.commit()

    # The role change alone must not remove the pending request.
    pending = self.get_access_requests('gamma', 'table', datasource_id)
    self.assertTrue(pending)

    # Fulfilling gamma2's request is what triggers the clean-up.
    extend_url = EXTEND_ROLE_REQUEST.format(
        'table', datasource_id, 'gamma2', TEST_ROLE_2)
    self.client.get(extend_url)

    pending = self.get_access_requests('gamma', 'table', datasource_id)
    self.assertFalse(pending)

    # Undo the Alpha grant so gamma is back to its initial roles.
    user = security_manager.find_user(username='gamma')
    user.roles.remove(security_manager.find_role('Alpha'))
    db_session.commit()
开发者ID:tan31989,项目名称:caravel,代码行数:27,代码来源:access_tests.py
示例2: test_clean_requests_after_db_grant
def test_clean_requests_after_db_grant(self):
    """Check gamma's request is deleted after gamma gets database access.

    Case 3: gamma and gamma2 both file an access request on the
    ``energy_usage`` table. gamma receives ``DB_ACCESS_ROLE`` (which is
    given ``database_access`` on the first database), and gamma2's request
    is then fulfilled. gamma's request must exist before that call and be
    gone after it.
    """
    db_session = db.session
    user = security_manager.find_user(username='gamma')

    gamma_request = create_access_request(
        db_session, 'table', 'energy_usage', TEST_ROLE_1, 'gamma')
    create_access_request(
        db_session, 'table', 'energy_usage', TEST_ROLE_2, 'gamma2')
    datasource_id = gamma_request.datasource_id

    # Give gamma database-level access through DB_ACCESS_ROLE.
    first_database = db_session.query(models.Database).first()
    security_manager.merge_perm('database_access', first_database.perm)
    db_access_pvm = security_manager.find_permission_view_menu(
        'database_access', first_database.perm)
    security_manager.add_permission_role(
        security_manager.find_role(DB_ACCESS_ROLE), db_access_pvm)
    user.roles.append(security_manager.find_role(DB_ACCESS_ROLE))
    db_session.commit()

    pending = self.get_access_requests('gamma', 'table', datasource_id)
    self.assertTrue(pending)

    # Fulfil gamma2's request; this should also clean up gamma's.
    extend_url = EXTEND_ROLE_REQUEST.format(
        'table', datasource_id, 'gamma2', TEST_ROLE_2)
    self.client.get(extend_url)

    pending = self.get_access_requests('gamma', 'table', datasource_id)
    self.assertFalse(pending)

    # Take the database-access role away from gamma again.
    user = security_manager.find_user(username='gamma')
    user.roles.remove(security_manager.find_role(DB_ACCESS_ROLE))
    db_session.commit()
开发者ID:tan31989,项目名称:caravel,代码行数:34,代码来源:access_tests.py
示例3: test_sql_json_has_access
def test_sql_json_has_access(self):
    """Run a query as a user whose role has Gamma, sql_lab and main-db perms.

    Builds an ``Astronaut`` role, creates the ``gagarin`` user with it if
    that user does not exist yet, runs ``SELECT * FROM ab_user`` as gagarin
    and expects at least one row back.
    """
    main_db = self.get_main_database(db.session)
    security_manager.add_permission_view_menu('database_access', main_db.perm)
    db.session.commit()

    pvm_query = (
        db.session.query(ab_models.PermissionView)
        .join(ab_models.ViewMenu)
        .join(ab_models.Permission)
        .filter(ab_models.ViewMenu.name == '[main].(id:1)')
        .filter(ab_models.Permission.name == 'database_access')
    )
    main_db_pvm = pvm_query.first()

    astronaut = security_manager.add_role('Astronaut')
    security_manager.add_permission_role(astronaut, main_db_pvm)

    # Astronaut = Gamma + sql_lab + main db permissions (copied in that order).
    for source_role in ('Gamma', 'sql_lab'):
        for pvm in security_manager.find_role(source_role).permissions:
            security_manager.add_permission_role(astronaut, pvm)

    if not security_manager.find_user('gagarin'):
        security_manager.add_user(
            'gagarin', 'Iurii', 'Gagarin', '[email protected]',
            astronaut,
            password='general')

    data = self.run_sql('SELECT * FROM ab_user', '3', user_name='gagarin')

    # Clear the recorded queries before asserting on the result.
    db.session.query(Query).delete()
    db.session.commit()
    self.assertLess(0, len(data['data']))
开发者ID:zhangxi123051,项目名称:incubator-superset,代码行数:30,代码来源:sqllab_tests.py
示例4: tearDownClass
def tearDownClass(cls):
    """Delete the roles used by this test class and commit.

    NOTE(review): the first parameter is ``cls`` but no ``@classmethod``
    decorator is visible in this snippet - presumably it sits just above;
    confirm against the original file.
    """
    doomed_roles = (
        'override_me',
        TEST_ROLE_1,
        TEST_ROLE_2,
        DB_ACCESS_ROLE,
        SCHEMA_ACCESS_ROLE,
    )
    for role_name in doomed_roles:
        db.session.delete(security_manager.find_role(role_name))
    db.session.commit()
开发者ID:tan31989,项目名称:caravel,代码行数:8,代码来源:access_tests.py
示例5: test_clean_requests_after_role_extend
def test_clean_requests_after_role_extend(self):
    """Check gamma's request is deleted once the role it holds is extended.

    Case 1: gamma and gamma2 both request ``TEST_ROLE_1`` access on the
    ``random_time_series`` table. gamma is granted the role, then the role
    is extended with access to the table on behalf of gamma2. gamma's
    request must exist before the grant and be gone after the extension.

    The scenario only runs when ``ENABLE_ACCESS_REQUEST`` is set in the app
    config. Every step depends on ``ds_1_id``, which is only created under
    that flag, so the whole body is guarded.
    """
    if not app.config.get('ENABLE_ACCESS_REQUEST'):
        return

    session = db.session

    # gamma2 and gamma both request the role on the same table.
    access_request1 = create_access_request(
        session, 'table', 'random_time_series', TEST_ROLE_1, 'gamma2')
    ds_1_id = access_request1.datasource_id
    create_access_request(
        session, 'table', 'random_time_series', TEST_ROLE_1, 'gamma')
    access_requests = self.get_access_requests('gamma', 'table', ds_1_id)
    self.assertTrue(access_requests)

    # gamma gets test_role1
    self.get_resp(GRANT_ROLE_REQUEST.format(
        'table', ds_1_id, 'gamma', TEST_ROLE_1))

    # Extend test_role1 with access on the table (gamma2's request).
    self.client.get(EXTEND_ROLE_REQUEST.format(
        'table', ds_1_id, 'gamma2', TEST_ROLE_1))
    access_requests = self.get_access_requests('gamma', 'table', ds_1_id)
    self.assertFalse(access_requests)

    # Undo the grant. NOTE(review): the literal 'test_role1' presumably
    # equals TEST_ROLE_1 - confirm and switch to the constant.
    gamma_user = security_manager.find_user(username='gamma')
    gamma_user.roles.remove(security_manager.find_role('test_role1'))
    # Commit the clean-up explicitly, as the sibling clean_requests tests
    # do, instead of leaving the removal pending in the shared session.
    session.commit()
开发者ID:tan31989,项目名称:caravel,代码行数:28,代码来源:access_tests.py
示例6: test_filter_druid_datasource
def test_filter_druid_datasource(self):
    """Check the Druid datasource list only shows what Gamma may access.

    Creates two datasources on the ``new_druid`` cluster, grants the Gamma
    role ``datasource_access`` on only one of them, logs in as ``gamma``
    and checks the list view includes the granted datasource and omits the
    other.
    """
    cluster_name = 'new_druid'
    druid_cluster = self.get_or_create(
        DruidCluster,
        {'cluster_name': cluster_name},
        db.session)
    db.session.merge(druid_cluster)

    visible_ds = self.get_or_create(
        DruidDatasource, {'datasource_name': 'datasource_for_gamma'},
        db.session)
    visible_ds.cluster = druid_cluster
    db.session.merge(visible_ds)

    hidden_ds = self.get_or_create(
        DruidDatasource, {'datasource_name': 'datasource_not_for_gamma'},
        db.session)
    hidden_ds.cluster = druid_cluster
    db.session.merge(hidden_ds)
    db.session.commit()

    # Register both permissions, but attach only the first to Gamma.
    security_manager.merge_perm('datasource_access', visible_ds.perm)
    security_manager.merge_perm('datasource_access', hidden_ds.perm)
    granted_pvm = security_manager.find_permission_view_menu(
        'datasource_access', visible_ds.get_perm())
    gamma_role = security_manager.find_role('Gamma')
    security_manager.add_permission_role(gamma_role, granted_pvm)
    security_manager.get_session.commit()

    self.login(username='gamma')
    page = self.get_resp('/druiddatasourcemodelview/list/')
    self.assertIn('datasource_for_gamma', page)
    self.assertNotIn('datasource_not_for_gamma', page)
开发者ID:neuroradiology,项目名称:caravel,代码行数:34,代码来源:druid_tests.py
示例7: revoke_public_access_to_table
def revoke_public_access_to_table(self, table):
    """Remove the Public role's ``datasource_access`` perms for ``table``.

    Scans every permission-view and detaches from the Public role each
    ``datasource_access`` entry whose view-menu name contains
    ``table.perm``.
    """
    public_role = security_manager.find_role('Public')
    for pvm in db.session.query(ab_models.PermissionView).all():
        if pvm.permission.name != 'datasource_access':
            continue
        # Entries without a view menu cannot refer to the table.
        if not pvm.view_menu:
            continue
        if table.perm not in pvm.view_menu.name:
            continue
        security_manager.del_permission_role(public_role, pvm)
开发者ID:liguo86,项目名称:incubator-superset,代码行数:7,代码来源:base_tests.py
示例8: assert_admin_view_menus_in
def assert_admin_view_menus_in(role_name, assert_func):
    """Apply ``assert_func`` to each admin view menu and the role's menus.

    ``assert_func`` is called as ``assert_func(view_menu_name, view_menus)``
    where ``view_menus`` is the list of view-menu names attached to the
    role named ``role_name``.
    """
    role = security_manager.find_role(role_name)
    view_menus = [pvm.view_menu.name for pvm in role.permissions]
    admin_view_menus = (
        'ResetPasswordView',
        'RoleModelView',
        'Security',
        'UserDBModelView',
        'SQL Lab',
    )
    for menu_name in admin_view_menus:
        assert_func(menu_name, view_menus)
开发者ID:zhangxi123051,项目名称:incubator-superset,代码行数:9,代码来源:core_tests.py
示例9: test_queryview_filter_owner_only
def test_queryview_filter_owner_only(self) -> None:
    """
    Test queryview api with can_only_access_owned_queries perm added to
    Admin and make sure only Admin queries show up.

    The permission is always removed from Admin again, even when an
    assertion fails, so a failure here cannot leak into other tests.
    """
    session = db.session

    # Add can_only_access_owned_queries perm to Admin user
    owned_queries_view = security_manager.find_permission_view_menu(
        'can_only_access_owned_queries',
        'can_only_access_owned_queries',
    )
    security_manager.add_permission_role(
        security_manager.find_role('Admin'),
        owned_queries_view,
    )
    session.commit()

    try:
        # Test the queryview API for Admin user
        self.run_some_queries()
        self.login('admin')

        url = '/queryview/api/read'
        data = self.get_json_resp(url)
        admin = security_manager.find_user('admin')
        self.assertEqual(2, len(data['result']))
        self.assertTrue(all(
            result.get('username') == admin.username
            for result in data['result']
        ))
    finally:
        # Remove can_only_access_owned_queries from Admin
        owned_queries_view = security_manager.find_permission_view_menu(
            'can_only_access_owned_queries',
            'can_only_access_owned_queries',
        )
        security_manager.del_permission_role(
            security_manager.find_role('Admin'),
            owned_queries_view,
        )
        session.commit()
开发者ID:tan31989,项目名称:caravel,代码行数:42,代码来源:sqllab_tests.py
示例10: test_search_query_with_owner_only_perms
def test_search_query_with_owner_only_perms(self) -> None:
    """
    Test a search query with can_only_access_owned_queries perm added to
    Admin and make sure only Admin queries show up.

    The permission is always removed from Admin again, even when an
    assertion fails, so a failure here cannot leak into other tests.
    """
    session = db.session

    # Add can_only_access_owned_queries perm to Admin user
    owned_queries_view = security_manager.find_permission_view_menu(
        'can_only_access_owned_queries',
        'can_only_access_owned_queries',
    )
    security_manager.add_permission_role(
        security_manager.find_role('Admin'),
        owned_queries_view,
    )
    session.commit()

    try:
        # Test search_queries for Admin user
        self.run_some_queries()
        self.login('admin')

        user_id = security_manager.find_user('admin').id
        data = self.get_json_resp('/superset/search_queries')
        self.assertEqual(2, len(data))
        user_ids = {k['userId'] for k in data}
        self.assertEqual({user_id}, user_ids)
    finally:
        # Remove can_only_access_owned_queries from Admin
        owned_queries_view = security_manager.find_permission_view_menu(
            'can_only_access_owned_queries',
            'can_only_access_owned_queries',
        )
        security_manager.del_permission_role(
            security_manager.find_role('Admin'),
            owned_queries_view,
        )
        session.commit()
开发者ID:tan31989,项目名称:caravel,代码行数:39,代码来源:sqllab_tests.py
示例11: test_override_role_permissions_drops_absent_perms
def test_override_role_permissions_drops_absent_perms(self):
    """Check overriding a role drops perms that are not in the payload.

    The ``override_me`` role is first given ``datasource_access`` on
    ``energy_usage``. After posting ``ROLE_TABLES_PERM_DATA`` to the
    override endpoint, the role must hold exactly one permission:
    ``datasource_access`` on ``birth_names``.
    """
    override_me = security_manager.find_role('override_me')
    override_me.permissions.append(
        security_manager.find_permission_view_menu(
            view_menu_name=self.get_table_by_name('energy_usage').perm,
            permission_name='datasource_access'),
    )
    db.session.flush()

    response = self.client.post(
        '/superset/override_role_permissions/',
        data=json.dumps(ROLE_TABLES_PERM_DATA),
        content_type='application/json')
    self.assertEqual(201, response.status_code)

    updated_override_me = security_manager.find_role('override_me')
    self.assertEqual(1, len(updated_override_me.permissions))
    birth_names = self.get_table_by_name('birth_names')
    self.assertEqual(
        birth_names.perm,
        updated_override_me.permissions[0].view_menu.name)
    self.assertEqual(
        'datasource_access',
        updated_override_me.permissions[0].permission.name)
开发者ID:tan31989,项目名称:caravel,代码行数:23,代码来源:access_tests.py
示例12: test_clean_requests_after_schema_grant
def test_clean_requests_after_schema_grant(self):
    """Check gamma's request is deleted after gamma gets schema access.

    Case 4: gamma and gamma2 both file an access request on the
    ``wb_health_population`` table. The table is moved to ``temp_schema``,
    gamma receives ``SCHEMA_ACCESS_ROLE`` with ``schema_access`` on that
    schema, and gamma2's request is then fulfilled. gamma's request must be
    gone afterwards. The role and the schema are reset at the end.
    """
    db_session = db.session
    user = security_manager.find_user(username='gamma')

    gamma_request = create_access_request(
        db_session, 'table', 'wb_health_population', TEST_ROLE_1, 'gamma')
    create_access_request(
        db_session, 'table', 'wb_health_population', TEST_ROLE_2, 'gamma2')
    datasource_id = gamma_request.datasource_id

    # Put the table in a schema and grant gamma access to that schema.
    table = db_session.query(SqlaTable).filter_by(
        table_name='wb_health_population').first()
    table.schema = 'temp_schema'
    security_manager.merge_perm('schema_access', table.schema_perm)
    schema_pvm = security_manager.find_permission_view_menu(
        'schema_access', table.schema_perm)
    security_manager.add_permission_role(
        security_manager.find_role(SCHEMA_ACCESS_ROLE), schema_pvm)
    user.roles.append(security_manager.find_role(SCHEMA_ACCESS_ROLE))
    db_session.commit()

    # Fulfil gamma2's request; this should also clean up gamma's.
    extend_url = EXTEND_ROLE_REQUEST.format(
        'table', datasource_id, 'gamma2', TEST_ROLE_2)
    self.client.get(extend_url)
    pending = self.get_access_requests('gamma', 'table', datasource_id)
    self.assertFalse(pending)

    # Restore gamma's roles and the table's schema.
    user = security_manager.find_user(username='gamma')
    user.roles.remove(security_manager.find_role(SCHEMA_ACCESS_ROLE))
    table = db_session.query(SqlaTable).filter_by(
        table_name='wb_health_population').first()
    table.schema = None
    db_session.commit()
开发者ID:tan31989,项目名称:caravel,代码行数:37,代码来源:access_tests.py
示例13: test_override_role_permissions_1_table
def test_override_role_permissions_1_table(self):
    """Check overriding a role with one table leaves exactly that perm.

    Posts ``ROLE_TABLES_PERM_DATA`` to the override endpoint, expects a 201,
    and checks the ``override_me`` role ends up with a single permission:
    ``datasource_access`` on ``birth_names``.
    """
    response = self.client.post(
        '/superset/override_role_permissions/',
        data=json.dumps(ROLE_TABLES_PERM_DATA),
        content_type='application/json')
    self.assertEqual(201, response.status_code)

    updated_override_me = security_manager.find_role('override_me')
    self.assertEqual(1, len(updated_override_me.permissions))
    birth_names = self.get_table_by_name('birth_names')
    self.assertEqual(
        birth_names.perm,
        updated_override_me.permissions[0].view_menu.name)
    self.assertEqual(
        'datasource_access',
        updated_override_me.permissions[0].permission.name)
开发者ID:tan31989,项目名称:caravel,代码行数:16,代码来源:access_tests.py
示例14: load_test_users_run
def load_test_users_run():
    """
    Loads the admin, alpha, gamma, gamma2 and gamma_sqllab users for testing
    purposes and syncs permissions for those users/roles.

    Does nothing unless ``TESTING`` is set in the config. Users that already
    exist are left untouched.
    """
    if not config.get('TESTING'):
        return

    security_manager.sync_role_definitions()

    # gamma_sqllab = Gamma perms + main database access + sql_lab perms.
    sqllab_role = security_manager.add_role('gamma_sqllab')
    for pvm in security_manager.find_role('Gamma').permissions:
        security_manager.add_permission_role(sqllab_role, pvm)

    utils.get_or_create_main_db()
    main_db_perm = utils.get_main_database(security_manager.get_session).perm
    security_manager.merge_perm('database_access', main_db_perm)
    main_db_pvm = security_manager.find_permission_view_menu(
        view_menu_name=main_db_perm, permission_name='database_access')
    sqllab_role.permissions.append(main_db_pvm)

    for pvm in security_manager.find_role('sql_lab').permissions:
        security_manager.add_permission_role(sqllab_role, pvm)

    # (username, first name, last name, email, role name). A role name of
    # None means "use the gamma_sqllab role object built above".
    test_users = (
        ('admin', 'admin', ' user', '[email protected]', 'Admin'),
        ('gamma', 'gamma', 'user', '[email protected]', 'Gamma'),
        ('gamma2', 'gamma2', 'user', '[email protected]', 'Gamma'),
        ('gamma_sqllab', 'gamma_sqllab', 'user', '[email protected]', None),
        ('alpha', 'alpha', 'user', '[email protected]', 'Alpha'),
    )
    for username, first_name, last_name, email, role_name in test_users:
        if security_manager.find_user(username):
            continue
        # The role is looked up only when the user has to be created.
        if role_name is None:
            role = sqllab_role
        else:
            role = security_manager.find_role(role_name)
        security_manager.add_user(
            username, first_name, last_name, email,
            role,
            password='general')

    security_manager.get_session.commit()
开发者ID:chenhaiyan,项目名称:incubator-superset,代码行数:54,代码来源:cli.py
示例15: test_gamma_permissions
def test_gamma_permissions(self):
    """Check the Gamma role's (permission, view menu) pairs.

    Gamma must be able to read ``TableModelView``, must not be able to
    write ``DruidColumnInlineView``, must have full access to slices and
    dashboards, and must hold a fixed list of ``Superset`` permissions.
    """
    gamma_perm_set = {
        (pvm.permission.name, pvm.view_menu.name)
        for pvm in security_manager.find_role('Gamma').permissions
    }

    def assert_can_read(view_menu):
        for action in ('can_show', 'can_list'):
            self.assertIn((action, view_menu), gamma_perm_set)

    def assert_can_write(view_menu):
        for action in ('can_add', 'can_download', 'can_delete', 'can_edit'):
            self.assertIn((action, view_menu), gamma_perm_set)

    def assert_cannot_write(view_menu):
        forbidden = (
            'can_add', 'can_download', 'can_delete', 'can_edit', 'can_save')
        for action in forbidden:
            self.assertNotIn((action, view_menu), gamma_perm_set)

    def assert_can_all(view_menu):
        assert_can_read(view_menu)
        assert_can_write(view_menu)

    # check read only perms
    assert_can_read('TableModelView')
    assert_cannot_write('DruidColumnInlineView')

    # make sure that user can create slices and dashboards
    assert_can_all('SliceModelView')
    assert_can_all('DashboardModelView')

    superset_perms = (
        'can_add_slices',
        'can_copy_dash',
        'can_created_dashboards',
        'can_created_slices',
        'can_csv',
        'can_dashboard',
        'can_explore',
        'can_explore_json',
        'can_fave_dashboards',
        'can_fave_slices',
        'can_save_dash',
        'can_slice',
    )
    for perm_name in superset_perms:
        self.assertIn((perm_name, 'Superset'), gamma_perm_set)
开发者ID:tan31989,项目名称:caravel,代码行数:46,代码来源:security_tests.py
示例16: test_slices_V2
def test_slices_V2(self):
    """Request every slice URL as a user holding the explore-v2-beta role.

    Creates the ``explore-v2-beta`` role and an ``explore_beta`` user with
    that role, logs in as that user, then GETs each slice's ``slice_url``.
    The responses are not inspected; the test only fails if a request
    raises.
    """
    # Create a dedicated user carrying the explore-v2-beta role.
    security_manager.add_role('explore-v2-beta')
    security_manager.add_user(
        'explore_beta', 'explore_beta', ' user', '[email protected]',
        security_manager.find_role('explore-v2-beta'),
        password='general')
    self.login(username='explore_beta', password='general')

    Slc = models.Slice
    urls = [
        (slc.slice_name, 'slice_url', slc.slice_url)
        for slc in db.session.query(Slc).all()
    ]
    for name, method, url in urls:
        # Name the fields explicitly rather than passing **locals().
        print('[{name}]/[{method}]: {url}'.format(
            name=name, method=method, url=url))
        self.client.get(url)
开发者ID:zhangxi123051,项目名称:incubator-superset,代码行数:20,代码来源:core_tests.py
示例17: create_access_request
def create_access_request(session, ds_type, ds_name, role_name, user_name):
    """Create and commit a datasource access request for ``user_name``.

    Looks up the datasource of type ``ds_type`` named ``ds_name``, attaches
    its ``datasource_access`` permission to the role ``role_name``, then
    stores a ``DatasourceAccessRequest`` created by ``user_name``.

    Returns the committed access request.
    """
    ds_class = ConnectorRegistry.sources[ds_type]

    # TODO: generalize datasource names
    # Tables are named by ``table_name``; other sources by ``datasource_name``.
    if ds_type == 'table':
        name_column = ds_class.table_name
    else:
        name_column = ds_class.datasource_name
    datasource = session.query(ds_class).filter(name_column == ds_name).first()

    access_pvm = security_manager.find_permission_view_menu(
        'datasource_access', datasource.perm)
    target_role = security_manager.find_role(role_name)
    security_manager.add_permission_role(target_role, access_pvm)

    requester = security_manager.find_user(username=user_name)
    access_request = models.DatasourceAccessRequest(
        datasource_id=datasource.id,
        datasource_type=ds_type,
        created_by_fk=requester.id,
    )
    session.add(access_request)
    session.commit()
    return access_request
开发者ID:tan31989,项目名称:caravel,代码行数:21,代码来源:access_tests.py
示例18: setUpClass
def setUpClass(cls):
    """Prepare the environment for the tests in this class.

    Removes the files configured as ``SQL_CELERY_DB_FILE_PATH`` and
    ``SQL_CELERY_RESULTS_DB_FILE_PATH`` (logging a warning when one cannot
    be removed), syncs role definitions, starts a ``superset worker``
    process, makes sure the ``admin`` user exists, and loads the example
    data.

    NOTE(review): the first parameter is ``cls`` but no ``@classmethod``
    decorator is visible in this snippet - presumably it sits just above;
    confirm against the original file.
    """
    # Start from clean database files; a missing file is not an error.
    for config_key in (
            'SQL_CELERY_DB_FILE_PATH', 'SQL_CELERY_RESULTS_DB_FILE_PATH'):
        try:
            os.remove(app.config.get(config_key))
        except OSError as e:
            # Logger.warn is a deprecated alias; use warning().
            app.logger.warning(str(e))

    security_manager.sync_role_definitions()

    # NOTE(review): the worker handle is discarded and its stdout pipe is
    # never read, so the process is neither waited on nor stopped here -
    # confirm that this is intended.
    worker_command = BASE_DIR + '/bin/superset worker'
    subprocess.Popen(
        worker_command, shell=True, stdout=subprocess.PIPE)

    admin = security_manager.find_user('admin')
    if not admin:
        security_manager.add_user(
            'admin', 'admin', ' user', '[email protected]',
            security_manager.find_role('Admin'),
            password='general')
    cli.load_examples(load_test_data=True)
开发者ID:liguo86,项目名称:incubator-superset,代码行数:23,代码来源:celery_tests.py
示例19: test_override_role_permissions_druid_and_table
def test_override_role_permissions_druid_and_table(self):
    """Check overriding a role with two Druid datasources and one table.

    Posts ``ROLE_ALL_PERM_DATA`` to the override endpoint, expects a 201,
    and checks the ``override_me`` role holds exactly three
    ``datasource_access`` permissions. Sorted by view-menu name, they are
    ``druid_ds_1``, ``druid_ds_2`` and ``birth_names``.
    """
    response = self.client.post(
        '/superset/override_role_permissions/',
        data=json.dumps(ROLE_ALL_PERM_DATA),
        content_type='application/json')
    self.assertEqual(201, response.status_code)

    updated_role = security_manager.find_role('override_me')
    perms = sorted(
        updated_role.permissions, key=lambda p: p.view_menu.name)
    # Check the count first so a wrong number of permissions fails with a
    # clear assertion instead of an IndexError below.
    self.assertEqual(3, len(perms))

    # Every check reads the sorted list, so the view menu and permission
    # name always come from the same entry.
    druid_ds_1 = self.get_druid_ds_by_name('druid_ds_1')
    self.assertEqual(druid_ds_1.perm, perms[0].view_menu.name)
    self.assertEqual('datasource_access', perms[0].permission.name)

    druid_ds_2 = self.get_druid_ds_by_name('druid_ds_2')
    self.assertEqual(druid_ds_2.perm, perms[1].view_menu.name)
    self.assertEqual('datasource_access', perms[1].permission.name)

    birth_names = self.get_table_by_name('birth_names')
    self.assertEqual(birth_names.perm, perms[2].view_menu.name)
    self.assertEqual('datasource_access', perms[2].permission.name)
开发者ID:tan31989,项目名称:caravel,代码行数:24,代码来源:access_tests.py
示例20: assert_admin_permission_in
def assert_admin_permission_in(role_name, assert_func):
    """Apply ``assert_func`` to each admin permission and the role's perms.

    ``assert_func`` is called as ``assert_func(permission_name, permissions)``
    where ``permissions`` is the list of permission names attached to the
    role named ``role_name``.
    """
    role = security_manager.find_role(role_name)
    permissions = [pvm.permission.name for pvm in role.permissions]
    for admin_permission in ('can_sync_druid_source', 'can_approve'):
        assert_func(admin_permission, permissions)
开发者ID:zhangxi123051,项目名称:incubator-superset,代码行数:5,代码来源:core_tests.py
注:本文中的 superset.security_manager.find_role 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论