本文整理汇总了Python中mistral.api.access_control.enforce函数的典型用法代码示例。如果您正苦于以下问题：Python enforce函数的具体用法？Python enforce怎么用？Python enforce使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了enforce函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: put
def put(self, env):
    """Update an existing environment.

    :param env: Required. Environment structure carrying the new values;
        its ``name`` identifies the environment to update.
    :return: Environment resource built from the updated DB model.
    :raises exceptions.InputException: If ``env.name`` is empty.
    """
    acl.enforce('environments:update', context.ctx())

    if not env.name:
        message = 'Name of the environment is not provided.'
        raise exceptions.InputException(message)

    LOG.debug("Update environment [name=%s, env=%s]", env.name, cut(env))

    # The raw request body, with 'name' removed, is what gets handed to
    # _validate_environment (not the parsed ``env`` resource).
    raw_body = wsme_pecan.pecan.request.body.decode()
    payload = json.loads(raw_body)
    payload.pop('name')
    fields = ['description', 'variables', 'scope']
    self._validate_environment(payload, fields)

    update_with_retry = rest_utils.rest_retry_on_db_error(
        db_api.update_environment
    )
    updated = update_with_retry(env.name, env.to_dict())

    return resources.Environment.from_db_model(updated)
开发者ID:openstack,项目名称:mistral,代码行数:27,代码来源:environment.py
示例2: put
def put(self, member_id, member_info):
    """Set the status for a resource member.

    :param member_id: Identifier of the member whose status is updated.
    :param member_info: Member structure; only its ``status`` is written.
    :return: Member resource built from the updated DB model.
    :raises exc.WorkflowException: If ``member_info.status`` is empty.
    """
    acl.enforce('members:update', context.ctx())

    LOG.debug(
        "Update resource member status. [resource_id=%s, member_id=%s, "
        "member_info=%s].",
        self.resource_id,
        member_id,
        member_info
    )

    if not member_info.status:
        raise exc.WorkflowException("Status must be provided.")

    update_with_retry = rest_utils.rest_retry_on_db_error(
        db_api.update_resource_member
    )
    new_values = {'status': member_info.status}
    updated = update_with_retry(
        self.resource_id,
        self.type,
        member_id,
        new_values
    )

    return resources.Member.from_db_model(updated)
开发者ID:openstack,项目名称:mistral,代码行数:26,代码来源:member.py
示例3: delete
def delete(self, name):
    """Delete cron trigger.

    :param name: Name of the cron trigger to delete.
    """
    acl.enforce('cron_triggers:delete', context.ctx())

    # Pass the argument to the logger instead of pre-formatting with '%',
    # so the message is only built if the record is actually emitted.
    LOG.info("Delete cron trigger [name=%s]", name)

    db_api.delete_cron_trigger(name)
开发者ID:Tesora,项目名称:tesora-mistral,代码行数:7,代码来源:cron_trigger.py
示例4: delete
def delete(self, id):
    """Delete the specified Execution.

    :param id: Identifier of the workflow execution to delete.
    :return: Whatever ``db_api.delete_workflow_execution`` returns.
    """
    acl.enforce('executions:delete', context.ctx())

    LOG.info("Delete execution [id=%s]", id)

    outcome = db_api.delete_workflow_execution(id)
    return outcome
开发者ID:Tesora,项目名称:tesora-mistral,代码行数:7,代码来源:execution.py
示例5: get_all
def get_all(self, marker=None, limit=None, sort_keys='created_at',
            sort_dirs='asc', fields='', all_projects=False, **filters):
    """Return all event triggers.

    The pagination, sorting, field-selection and filter arguments are
    forwarded unchanged to ``rest_utils.get_all``.

    :param all_projects: When truthy, the additional
        'event_triggers:list:all_projects' policy is enforced as well.
    :param filters: Extra keyword filters forwarded to the query.
    """
    acl.enforce('event_triggers:list', auth_ctx.ctx())

    if all_projects:
        acl.enforce('event_triggers:list:all_projects', auth_ctx.ctx())

    LOG.debug(
        "Fetch event triggers. marker=%s, limit=%s, sort_keys=%s, "
        "sort_dirs=%s, fields=%s, all_projects=%s, filters=%s",
        marker, limit, sort_keys, sort_dirs, fields, all_projects, filters
    )

    # Collection type, item type, and the two DB accessors, in the
    # positional order rest_utils.get_all is called with.
    positional = (
        resources.EventTriggers,
        resources.EventTrigger,
        db_api.get_event_triggers,
        db_api.get_event_trigger,
    )

    return rest_utils.get_all(
        *positional,
        resource_function=None,
        marker=marker,
        limit=limit,
        sort_keys=sort_keys,
        sort_dirs=sort_dirs,
        fields=fields,
        all_projects=all_projects,
        **filters
    )
开发者ID:openstack,项目名称:mistral,代码行数:28,代码来源:event_trigger.py
示例6: post
def post(self, member_info):
    """Share the resource with a new member.

    :param member_info: Member structure; its ``member_id`` names the
        member the resource is shared with.
    :return: Member resource built from the newly created DB record.
    :raises exc.WorkflowException: If ``member_id`` is empty, or if the
        workflow definition looked up by ``self.resource_id`` does not
        have 'private' scope.
    """
    acl.enforce('members:create', context.ctx())

    LOG.info(
        "Share resource to a member. [resource_id=%s, resource_type=%s, "
        "member_info=%s].",
        self.resource_id,
        self.type,
        member_info
    )

    if not member_info.member_id:
        raise exc.WorkflowException("Member id must be provided.")

    workflow_def = db_api.get_workflow_definition(self.resource_id)

    if workflow_def.scope != 'private':
        raise exc.WorkflowException("Only private resource could be shared.")

    # New memberships always start out in 'pending' status.
    created = db_api.create_resource_member({
        'resource_id': self.resource_id,
        'resource_type': self.type,
        'member_id': member_info.member_id,
        'status': 'pending'
    })

    return resources.Member.from_dict(created.to_dict())
开发者ID:anilyadav,项目名称:mistral,代码行数:32,代码来源:member.py
示例7: post
def post(self, wf_ex):
    """Create a new Execution.

    :param wf_ex: Execution object with input content.
    :return: Execution resource built from the engine's result.
    :raises exc.WorkflowException: If neither a workflow ID nor a workflow
        name is provided.
    """
    acl.enforce('executions:create', context.ctx())

    # Lazy logger arguments: the message is only formatted when emitted.
    LOG.info('Create execution [execution=%s]', wf_ex)

    exec_dict = wf_ex.to_dict()

    # Use `or` instead of dict.get()'s default: the default only applies
    # when the key is absent, so a present-but-empty 'workflow_id' (e.g.
    # None) would otherwise be passed to the engine even though a valid
    # 'workflow_name' was supplied.
    workflow_identifier = (
        exec_dict.get('workflow_id') or exec_dict.get('workflow_name')
    )

    if not workflow_identifier:
        raise exc.WorkflowException(
            "Workflow ID or workflow name must be provided. Workflow ID is"
            " recommended."
        )

    # Obtain the engine client only once the request has been validated.
    engine = rpc.get_engine_client()

    result = engine.start_workflow(
        workflow_identifier,
        exec_dict.get('input'),
        exec_dict.get('description', ''),
        **(exec_dict.get('params') or {})
    )

    return Execution.from_dict(result)
开发者ID:PrinceKatiyar,项目名称:mistral,代码行数:26,代码来源:execution.py
示例8: delete
def delete(self, name):
    """Delete the named workbook.

    :param name: Name of the workbook to delete.
    """
    acl.enforce('workbooks:delete', context.ctx())

    # Pass the argument to the logger instead of pre-formatting with '%',
    # so the message is only built if the record is actually emitted.
    LOG.info("Delete workbook [name=%s]", name)

    db_api.delete_workbook(name)
开发者ID:luckysamadhiya93,项目名称:mistral,代码行数:7,代码来源:workbook.py
示例9: put
def put(self, id, event_trigger):
    """Update an existing event trigger.

    The fields listed in UPDATE_NOT_ALLOWED can not be updated. The right
    way to change them is to delete the event trigger first, then create a
    new event trigger with new params.

    :param id: Identifier of the event trigger to update.
    :param event_trigger: Event trigger structure with the new values.
    :return: EventTrigger resource built from the updated DB model.
    :raises exc.EventTriggerException: If any field in UPDATE_NOT_ALLOWED
        carries a truthy value.
    """
    acl.enforce('event_triggers:update', auth_ctx.ctx())

    values = event_trigger.to_dict()

    if any(values.get(name) for name in UPDATE_NOT_ALLOWED):
        raise exc.EventTriggerException(
            "Can not update fields %s of event trigger." %
            UPDATE_NOT_ALLOWED
        )

    LOG.debug('Update event trigger: [id=%s, values=%s]', id, values)

    @rest_utils.rest_retry_on_db_error
    def _update_event_trigger():
        with db_api.transaction():
            # The lookup result is discarded; it is done only to ensure
            # that the event trigger exists.
            db_api.get_event_trigger(id)

            return triggers.update_event_trigger(id, values)

    return resources.EventTrigger.from_db_model(_update_event_trigger())
开发者ID:openstack,项目名称:mistral,代码行数:31,代码来源:event_trigger.py
示例10: post
def post(self, namespace=''):
    """Create a new workbook from the request body.

    :param namespace: Optional. The namespace to create the workbook
        in. Workbooks with the same name can be added to a given
        project if they are in two different namespaces.
    :return: JSON representation of the created workbook.
    """
    acl.enforce('workbooks:create', context.ctx())

    request = pecan.request
    definition = request.text
    scope = request.GET.get('scope', 'private')

    resources.Workbook.validate_scope(scope)

    LOG.debug("Create workbook [definition=%s]", definition)

    create_with_retry = rest_utils.rest_retry_on_db_error(
        workbooks.create_workbook_v2
    )
    created = create_with_retry(
        definition,
        namespace=namespace,
        scope=scope
    )

    # The 201 status is set only after the workbook has been created.
    pecan.response.status = 201

    workbook = resources.Workbook.from_db_model(created)
    return workbook.to_json()
开发者ID:openstack,项目名称:mistral,代码行数:26,代码来源:workbook.py
示例11: post
def post(self):
    """Create one or more actions from the request body.

    NOTE: The text is allowed to contain definitions of multiple actions,
    in which case all of them are created.

    :return: JSON representation of the created actions.
    """
    acl.enforce('actions:create', context.ctx())

    request = pecan.request
    definition = request.text
    scope = request.GET.get('scope', 'private')
    pecan.response.status = 201

    resources.Action.validate_scope(scope)

    # Public scope is guarded by an additional policy.
    if scope == 'public':
        acl.enforce('actions:publicize', context.ctx())

    LOG.debug("Create action(s) [definition=%s]", definition)

    @rest_utils.rest_retry_on_db_error
    def _create_action_definitions():
        with db_api.transaction():
            return actions.create_actions(definition, scope=scope)

    created = _create_action_definitions()
    action_list = list(map(resources.Action.from_db_model, created))

    return resources.Actions(actions=action_list).to_json()
开发者ID:openstack,项目名称:mistral,代码行数:30,代码来源:action.py
示例12: post
def post(self, event_trigger):
    """Create a new event trigger.

    :param event_trigger: Event trigger structure to create.
    :return: EventTrigger resource built from the created DB model.
    :raises exc.EventTriggerException: If any key in CREATE_MANDATORY is
        missing or carries a falsy value.
    """
    acl.enforce('event_triggers:create', auth_ctx.ctx())

    values = event_trigger.to_dict()

    # Only keys with a truthy value count as provided.
    provided = {key for key, value in values.items() if value}
    missing = CREATE_MANDATORY - provided

    if missing:
        raise exc.EventTriggerException(
            "Params %s must be provided for creating event trigger." %
            CREATE_MANDATORY
        )

    # Public scope is guarded by an additional policy.
    if values.get('scope') == 'public':
        acl.enforce('event_triggers:create:public', auth_ctx.ctx())

    LOG.debug('Create event trigger: %s', values)

    create_with_retry = rest_utils.rest_retry_on_db_error(
        triggers.create_event_trigger
    )
    created = create_with_retry(
        name=values.get('name', ''),
        exchange=values.get('exchange'),
        topic=values.get('topic'),
        event=values.get('event'),
        workflow_id=values.get('workflow_id'),
        scope=values.get('scope'),
        workflow_input=values.get('workflow_input'),
        workflow_params=values.get('workflow_params'),
    )

    return resources.EventTrigger.from_db_model(created)
开发者ID:openstack,项目名称:mistral,代码行数:32,代码来源:event_trigger.py
示例13: post
def post(self, action_ex):
    """Create a new action execution.

    :param action_ex: Action to execute; ``name`` is required, while
        ``description``, ``input`` and ``params`` are optional.
    :return: ActionExecution resource built from the engine's result.
    :raises exc.InputException: If no action name is given.
    """
    acl.enforce('action_executions:create', context.ctx())

    LOG.debug(
        "Create action_execution [action_execution=%s]",
        action_ex
    )

    # Falsy optional fields are normalised to None / empty dicts.
    action_name = action_ex.name
    action_description = action_ex.description or None
    input_values = action_ex.input or {}
    extra_params = action_ex.params or {}

    if not action_name:
        message = "Please provide at least action name to run action."
        raise exc.InputException(message)

    engine = rpc.get_engine_client()
    values = engine.start_action(
        action_name,
        input_values,
        description=action_description,
        **extra_params
    )

    return resources.ActionExecution.from_dict(values)
开发者ID:openstack,项目名称:mistral,代码行数:30,代码来源:action_execution.py
示例14: post
def post(self, namespace=''):
    """Create one or more workflows from the request body.

    The text is allowed to contain definitions of multiple workflows, in
    which case all of them are created.

    :param namespace: Optional. The namespace to create the workflow
        in. Workflows with the same name can be added to a given
        project if they are in two different namespaces.
    :return: JSON representation of the created workflows.
    """
    acl.enforce('workflows:create', context.ctx())

    request = pecan.request
    definition = request.text
    scope = request.GET.get('scope', 'private')
    pecan.response.status = 201

    resources.Workflow.validate_scope(scope)

    # Public scope is guarded by an additional policy.
    if scope == 'public':
        acl.enforce('workflows:publicize', context.ctx())

    LOG.debug("Create workflow(s) [definition=%s]", definition)

    create_with_retry = rest_utils.rest_retry_on_db_error(
        workflows.create_workflows
    )
    created = create_with_retry(
        definition,
        scope=scope,
        namespace=namespace
    )

    workflow_list = list(map(resources.Workflow.from_db_model, created))

    return resources.Workflows(workflows=workflow_list).to_json()
开发者ID:openstack,项目名称:mistral,代码行数:33,代码来源:workflow.py
示例15: post
def post(self):
    """Create a new workflow.

    NOTE: The text is allowed to have definitions
    of multiple workflows. In this case they all will be created.

    :return: JSON representation of the created workflows.
    :raises exc.InvalidModelException: If the 'scope' query parameter is
        not one of ``resources.SCOPE_TYPES.values``.
    """
    acl.enforce('workflows:create', context.ctx())

    definition = pecan.request.text
    scope = pecan.request.GET.get('scope', 'private')
    pecan.response.status = 201

    if scope not in resources.SCOPE_TYPES.values:
        raise exc.InvalidModelException(
            "Scope must be one of the following: %s; actual: "
            "%s" % (resources.SCOPE_TYPES.values, scope)
        )

    # Pass the argument to the logger instead of pre-formatting with '%',
    # so the (potentially large) definition is only rendered if the record
    # is actually emitted.
    LOG.info("Create workflow(s) [definition=%s]", definition)

    db_wfs = workflows.create_workflows(definition, scope=scope)
    models_dicts = [db_wf.to_dict() for db_wf in db_wfs]
    workflow_list = [
        resources.Workflow.from_dict(wf) for wf in models_dicts
    ]

    return resources.Workflows(workflows=workflow_list).to_json()
开发者ID:luckysamadhiya93,项目名称:mistral,代码行数:28,代码来源:workflow.py
示例16: delete
def delete(self, name):
    """Delete the named environment.

    :param name: Name of the environment to delete.
    """
    acl.enforce('environments:delete', context.ctx())

    # Pass the argument to the logger instead of pre-formatting with '%',
    # so the message is only built if the record is actually emitted.
    LOG.info("Delete environment [name=%s]", name)

    db_api.delete_environment(name)
开发者ID:luckysamadhiya93,项目名称:mistral,代码行数:7,代码来源:environment.py
示例17: get_all
def get_all(self, marker=None, limit=None, sort_keys='created_at',
            sort_dirs='asc', fields='', name=None, description=None,
            variables=None, scope=None, created_at=None, updated_at=None):
    """Return all environments.

    Where project_id is the same as the requester or
    project_id is different but the scope is public.

    :param marker: Optional. Pagination marker for large data sets.
    :param limit: Optional. Maximum number of resources to return in a
                  single result. Default value is None for backward
                  compatibility.
    :param sort_keys: Optional. Columns to sort results by.
                      Default: created_at, which is backward compatible.
    :param sort_dirs: Optional. Directions to sort corresponding to
                      sort_keys, "asc" or "desc" can be chosen.
                      Default: asc. The length of sort_dirs can be equal
                      or less than that of sort_keys.
    :param fields: Optional. A specified list of fields of the resource to
                   be returned. 'id' will be included automatically in
                   fields if it's provided, since it will be used when
                   constructing 'next' link.
    :param name: Optional. Keep only resources with a specific name.
    :param description: Optional. Keep only resources with a specific
                        description.
    :param variables: Optional. Keep only resources with specific
                      variables.
    :param scope: Optional. Keep only resources with a specific scope.
    :param created_at: Optional. Keep only resources created at a specific
                       time and date.
    :param updated_at: Optional. Keep only resources with specific latest
                       update time and date.
    :return: The result of ``rest_utils.get_all`` for environments.
    """
    acl.enforce('environments:list', context.ctx())

    # Turn the individual filter arguments into the dict that is expanded
    # into keyword arguments for rest_utils.get_all below.
    filters = rest_utils.filters_to_dict(
        created_at=created_at,
        name=name,
        updated_at=updated_at,
        description=description,
        variables=variables,
        scope=scope
    )

    LOG.info("Fetch environments. marker=%s, limit=%s, sort_keys=%s, "
             "sort_dirs=%s, filters=%s", marker, limit, sort_keys,
             sort_dirs, filters)

    return rest_utils.get_all(
        resources.Environments,
        resources.Environment,
        db_api.get_environments,
        db_api.get_environment,
        marker=marker,
        limit=limit,
        sort_keys=sort_keys,
        sort_dirs=sort_dirs,
        fields=fields,
        **filters
    )
开发者ID:anilyadav,项目名称:mistral,代码行数:60,代码来源:environment.py
示例18: put
def put(self, identifier=None):
    """Update one or more actions from the request body.

    NOTE: The text is allowed to contain definitions of multiple actions,
    in which case all of them are updated.

    :param identifier: Optional. Forwarded to ``actions.update_actions``.
    :return: JSON representation of the updated actions.
    :raises exc.InvalidModelException: If the 'scope' query parameter is
        not one of ``resources.SCOPE_TYPES.values``.
    """
    acl.enforce('actions:update', context.ctx())

    request = pecan.request
    definition = request.text

    LOG.info("Update action(s) [definition=%s]", definition)

    scope = request.GET.get('scope', 'private')

    if scope not in resources.SCOPE_TYPES.values:
        message = (
            "Scope must be one of the following: %s; actual: "
            "%s" % (resources.SCOPE_TYPES.values, scope)
        )
        raise exc.InvalidModelException(message)

    with db_api.transaction():
        updated = actions.update_actions(
            definition,
            scope=scope,
            identifier=identifier
        )

    # NOTE(review): conversion is assumed to happen after the transaction
    # block has exited - confirm against the original indentation.
    as_dicts = [db_act.to_dict() for db_act in updated]
    action_list = list(map(resources.Action.from_dict, as_dicts))

    return resources.Actions(actions=action_list).to_json()
开发者ID:Tesora,项目名称:tesora-mistral,代码行数:28,代码来源:action.py
示例19: post
def post(self, cron_trigger):
    """Create a new cron trigger.

    :param cron_trigger: Required. Cron trigger structure.
    :return: CronTrigger resource built from the created DB model.
    """
    acl.enforce('cron_triggers:create', context.ctx())

    LOG.debug('Create cron trigger: %s', cron_trigger)

    values = cron_trigger.to_dict()

    create_with_retry = rest_utils.rest_retry_on_db_error(
        triggers.create_cron_trigger
    )

    # 'name' is read with item access, so it must be present in the dict;
    # every other field falls back to None when absent.
    created = create_with_retry(
        name=values['name'],
        workflow_name=values.get('workflow_name'),
        workflow_input=values.get('workflow_input'),
        workflow_params=values.get('workflow_params'),
        pattern=values.get('pattern'),
        first_time=values.get('first_execution_time'),
        count=values.get('remaining_executions'),
        workflow_id=values.get('workflow_id')
    )

    return resources.CronTrigger.from_db_model(created)
开发者ID:openstack,项目名称:mistral,代码行数:25,代码来源:cron_trigger.py
示例20: put
def put(self, identifier=None):
    """Update one or more workflows.

    :param identifier: Optional. If provided, it's UUID of a workflow.
        Only one workflow can be updated with identifier param.

    The text is allowed to have definitions of multiple workflows. In this
    case they all will be updated.

    :return: JSON of the single updated workflow when ``identifier`` is
        given, otherwise JSON of the whole updated collection.
    :raises exc.InvalidModelException: If the 'scope' query parameter is
        not one of ``SCOPE_TYPES.values``.
    """
    acl.enforce('workflows:update', context.ctx())

    definition = pecan.request.text
    scope = pecan.request.GET.get('scope', 'private')

    if scope not in SCOPE_TYPES.values:
        raise exc.InvalidModelException(
            "Scope must be one of the following: %s; actual: "
            "%s" % (SCOPE_TYPES.values, scope)
        )

    # Pass the argument to the logger instead of pre-formatting with '%',
    # so the (potentially large) definition is only rendered if the record
    # is actually emitted.
    LOG.info("Update workflow(s) [definition=%s]", definition)

    db_wfs = workflows.update_workflows(
        definition,
        scope=scope,
        identifier=identifier
    )

    models_dicts = [db_wf.to_dict() for db_wf in db_wfs]
    workflow_list = [Workflow.from_dict(wf) for wf in models_dicts]

    return (workflow_list[0].to_json() if identifier
            else Workflows(workflows=workflow_list).to_json())
开发者ID:PrinceKatiyar,项目名称:mistral,代码行数:32,代码来源:workflow.py
注：本文中的mistral.api.access_control.enforce函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论