• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python access_control.enforce函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中mistral.api.access_control.enforce函数的典型用法代码示例。如果您正苦于以下问题:Python enforce函数的具体用法?Python enforce怎么用?Python enforce使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了enforce函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: put

    def put(self, env):
        """Update an existing environment.

        The raw request body is parsed again as JSON, its 'name' entry is
        dropped, and the remainder is handed to ``_validate_environment``
        together with the field list ['description', 'variables', 'scope'].

        :param env: Required. Environment structure to update.
        :raises exceptions.InputException: if ``env.name`` is empty.
        :return: ``resources.Environment`` built from the updated DB model.
        """
        acl.enforce('environments:update', context.ctx())

        if not env.name:
            missing_name_msg = 'Name of the environment is not provided.'
            raise exceptions.InputException(missing_name_msg)

        LOG.debug("Update environment [name=%s, env=%s]", env.name, cut(env))

        raw_body = wsme_pecan.pecan.request.body.decode()
        submitted = json.loads(raw_body)
        submitted.pop('name')

        checked_fields = ['description', 'variables', 'scope']
        self._validate_environment(submitted, checked_fields)

        update_with_retry = rest_utils.rest_retry_on_db_error(
            db_api.update_environment
        )
        updated_env = update_with_retry(env.name, env.to_dict())

        return resources.Environment.from_db_model(updated_env)
开发者ID:openstack,项目名称:mistral,代码行数:27,代码来源:environment.py


示例2: put

    def put(self, member_id, member_info):
        """Set the status of a resource member.

        :param member_id: identifier of the member whose status is updated.
        :param member_info: object carrying the new ``status`` value.
        :raises exc.WorkflowException: if ``member_info.status`` is empty.
        :return: ``resources.Member`` built from the updated DB record.
        """
        acl.enforce('members:update', context.ctx())

        LOG.debug(
            "Update resource member status. [resource_id=%s, "
            "member_id=%s, member_info=%s].",
            self.resource_id, member_id, member_info
        )

        if not member_info.status:
            raise exc.WorkflowException("Status must be provided.")

        update_with_retry = rest_utils.rest_retry_on_db_error(
            db_api.update_resource_member
        )
        status_change = {'status': member_info.status}
        updated_member = update_with_retry(
            self.resource_id, self.type, member_id, status_change
        )

        return resources.Member.from_db_model(updated_member)
开发者ID:openstack,项目名称:mistral,代码行数:26,代码来源:member.py


示例3: delete

    def delete(self, name):
        """Delete the cron trigger with the given name.

        :param name: name of the cron trigger to delete.
        """
        acl.enforce('cron_triggers:delete', context.ctx())

        # Hand the argument to the logger rather than pre-formatting with
        # '%': interpolation is then deferred until the record is emitted.
        LOG.info("Delete cron trigger [name=%s]", name)

        db_api.delete_cron_trigger(name)
开发者ID:Tesora,项目名称:tesora-mistral,代码行数:7,代码来源:cron_trigger.py


示例4: delete

    def delete(self, id):
        """Remove a workflow execution.

        :param id: identifier of the execution to delete.
        :return: whatever ``db_api.delete_workflow_execution`` returns.
        """
        acl.enforce('executions:delete', context.ctx())
        LOG.info("Delete execution [id=%s]", id)

        outcome = db_api.delete_workflow_execution(id)
        return outcome
开发者ID:Tesora,项目名称:tesora-mistral,代码行数:7,代码来源:execution.py


示例5: get_all

    def get_all(self, marker=None, limit=None, sort_keys='created_at',
                sort_dirs='asc', fields='', all_projects=False, **filters):
        """List event triggers.

        The base 'event_triggers:list' policy is always enforced; the
        'event_triggers:list:all_projects' policy is enforced in addition
        when ``all_projects`` is truthy. All paging, sorting, field and
        filter arguments are forwarded unchanged to ``rest_utils.get_all``.

        :return: the result of ``rest_utils.get_all``.
        """
        required_policies = ['event_triggers:list']
        if all_projects:
            required_policies.append('event_triggers:list:all_projects')

        for policy in required_policies:
            acl.enforce(policy, auth_ctx.ctx())

        LOG.debug(
            "Fetch event triggers. marker=%s, limit=%s, sort_keys=%s, "
            "sort_dirs=%s, fields=%s, all_projects=%s, filters=%s",
            marker, limit, sort_keys, sort_dirs, fields, all_projects,
            filters
        )

        listing = rest_utils.get_all(
            resources.EventTriggers,
            resources.EventTrigger,
            db_api.get_event_triggers,
            db_api.get_event_trigger,
            resource_function=None,
            marker=marker, limit=limit,
            sort_keys=sort_keys, sort_dirs=sort_dirs,
            fields=fields,
            all_projects=all_projects,
            **filters
        )
        return listing
开发者ID:openstack,项目名称:mistral,代码行数:28,代码来源:event_trigger.py


示例6: post

    def post(self, member_info):
        """Share the resource with a new member.

        The member record is created with status 'pending'.

        :param member_info: object carrying the ``member_id`` to share with.
        :raises exc.WorkflowException: if ``member_info.member_id`` is empty,
            or if the workflow definition's scope is not 'private'.
        :return: ``resources.Member`` built from the created DB record.
        """
        acl.enforce('members:create', context.ctx())

        LOG.info(
            "Share resource to a member. [resource_id=%s, "
            "resource_type=%s, member_info=%s].",
            self.resource_id, self.type, member_info
        )

        if not member_info.member_id:
            raise exc.WorkflowException("Member id must be provided.")

        workflow_def = db_api.get_workflow_definition(self.resource_id)

        if workflow_def.scope != 'private':
            not_private_msg = "Only private resource could be shared."
            raise exc.WorkflowException(not_private_msg)

        created_member = db_api.create_resource_member({
            'resource_id': self.resource_id,
            'resource_type': self.type,
            'member_id': member_info.member_id,
            'status': 'pending'
        })

        return resources.Member.from_dict(created_member.to_dict())
开发者ID:anilyadav,项目名称:mistral,代码行数:32,代码来源:member.py


示例7: post

    def post(self, wf_ex):
        """Create a new Execution.

        :param wf_ex: Execution object with input content.
        :raises exc.WorkflowException: if neither a workflow ID nor a
            workflow name is provided.
        :return: ``Execution`` built from the engine's result.
        """
        acl.enforce('executions:create', context.ctx())

        # Lazy logger arguments: interpolation happens only if the record
        # is actually emitted.
        LOG.info('Create execution [execution=%s]', wf_ex)

        engine = rpc.get_engine_client()
        exec_dict = wf_ex.to_dict()

        # Resolve the identifier with the same truthiness rule used for the
        # validation below. The previous ``get('workflow_id', <name>)`` form
        # only fell back to the name when the 'workflow_id' key was absent,
        # so a present-but-empty ID shadowed a valid workflow name.
        workflow_identifier = (
            exec_dict.get('workflow_id') or exec_dict.get('workflow_name')
        )

        if not workflow_identifier:
            raise exc.WorkflowException(
                "Workflow ID or workflow name must be provided. Workflow ID is"
                " recommended."
            )

        result = engine.start_workflow(
            workflow_identifier,
            exec_dict.get('input'),
            exec_dict.get('description', ''),
            **(exec_dict.get('params') or {})
        )

        return Execution.from_dict(result)
开发者ID:PrinceKatiyar,项目名称:mistral,代码行数:26,代码来源:execution.py


示例8: delete

    def delete(self, name):
        """Delete the named workbook.

        :param name: name of the workbook to delete.
        """
        acl.enforce('workbooks:delete', context.ctx())

        # Hand the argument to the logger rather than pre-formatting with
        # '%': interpolation is then deferred until the record is emitted.
        LOG.info("Delete workbook [name=%s]", name)

        db_api.delete_workbook(name)
开发者ID:luckysamadhiya93,项目名称:mistral,代码行数:7,代码来源:workbook.py


示例9: put

    def put(self, id, event_trigger):
        """Update an existing event trigger.

        Fields listed in UPDATE_NOT_ALLOWED cannot be changed here; to
        change them, delete the event trigger and create a new one with
        the new parameters.

        :param id: identifier of the event trigger to update.
        :param event_trigger: structure holding the new values.
        :raises exc.EventTriggerException: if any field listed in
            UPDATE_NOT_ALLOWED has a truthy value in the update.
        :return: ``resources.EventTrigger`` built from the updated DB model.
        """
        acl.enforce('event_triggers:update', auth_ctx.ctx())

        values = event_trigger.to_dict()

        if any(values.get(field) for field in UPDATE_NOT_ALLOWED):
            raise exc.EventTriggerException(
                "Can not update fields %s of event trigger." %
                UPDATE_NOT_ALLOWED
            )

        LOG.debug('Update event trigger: [id=%s, values=%s]', id, values)

        def _update_event_trigger():
            with db_api.transaction():
                # The lookup result is discarded; it is here to ensure
                # that the event trigger exists before updating it.
                db_api.get_event_trigger(id)

                return triggers.update_event_trigger(id, values)

        update_with_retry = rest_utils.rest_retry_on_db_error(
            _update_event_trigger
        )
        updated_trigger = update_with_retry()

        return resources.EventTrigger.from_db_model(updated_trigger)
开发者ID:openstack,项目名称:mistral,代码行数:31,代码来源:event_trigger.py


示例10: post

    def post(self, namespace=''):
        """Create a new workbook from the request body text.

        The response status is set to 201 after the workbook is created.

        :param namespace: Optional. The namespace to create the workbook
            in. Workbooks with the same name can be added to a given
            project if they are in two different namespaces.
        :return: JSON form of the created ``resources.Workbook``.
        """
        acl.enforce('workbooks:create', context.ctx())

        workbook_text = pecan.request.text
        requested_scope = pecan.request.GET.get('scope', 'private')

        resources.Workbook.validate_scope(requested_scope)

        LOG.debug("Create workbook [definition=%s]", workbook_text)

        create_with_retry = rest_utils.rest_retry_on_db_error(
            workbooks.create_workbook_v2
        )
        created_workbook = create_with_retry(
            workbook_text,
            namespace=namespace,
            scope=requested_scope
        )

        pecan.response.status = 201

        workbook_resource = resources.Workbook.from_db_model(created_workbook)

        return workbook_resource.to_json()
开发者ID:openstack,项目名称:mistral,代码行数:26,代码来源:workbook.py


示例11: post

    def post(self):
        """Create one or more actions from the request body text.

        NOTE: The text may contain definitions of multiple actions; in
            that case all of them are created.

        The 'actions:publicize' policy is enforced in addition when the
        requested scope is 'public'.

        :return: JSON form of ``resources.Actions`` holding the new actions.
        """
        acl.enforce('actions:create', context.ctx())

        actions_text = pecan.request.text
        requested_scope = pecan.request.GET.get('scope', 'private')
        pecan.response.status = 201

        resources.Action.validate_scope(requested_scope)

        if requested_scope == 'public':
            acl.enforce('actions:publicize', context.ctx())

        LOG.debug("Create action(s) [definition=%s]", actions_text)

        def _create_action_definitions():
            with db_api.transaction():
                return actions.create_actions(
                    actions_text,
                    scope=requested_scope
                )

        create_with_retry = rest_utils.rest_retry_on_db_error(
            _create_action_definitions
        )

        action_resources = []
        for created_action in create_with_retry():
            action_resources.append(
                resources.Action.from_db_model(created_action)
            )

        return resources.Actions(actions=action_resources).to_json()
开发者ID:openstack,项目名称:mistral,代码行数:30,代码来源:action.py


示例12: post

    def post(self, event_trigger):
        """Create a new event trigger.

        The 'event_triggers:create:public' policy is enforced in addition
        when the submitted scope is 'public'.

        :param event_trigger: structure describing the trigger; converted
            with ``to_dict()`` before use.
        :raises exc.EventTriggerException: if any entry of CREATE_MANDATORY
            has no truthy value in the submitted structure.
        :return: ``resources.EventTrigger`` built from the created DB model.
        """
        acl.enforce('event_triggers:create', auth_ctx.ctx())

        values = event_trigger.to_dict()
        provided_keys = {key for key, value in values.items() if value}

        missing_keys = CREATE_MANDATORY - provided_keys
        if missing_keys:
            raise exc.EventTriggerException(
                "Params %s must be provided for creating event trigger." %
                CREATE_MANDATORY
            )

        trigger_scope = values.get('scope')
        if trigger_scope == 'public':
            acl.enforce('event_triggers:create:public', auth_ctx.ctx())

        LOG.debug('Create event trigger: %s', values)

        create_with_retry = rest_utils.rest_retry_on_db_error(
            triggers.create_event_trigger
        )
        created_trigger = create_with_retry(
            name=values.get('name', ''),
            exchange=values.get('exchange'),
            topic=values.get('topic'),
            event=values.get('event'),
            workflow_id=values.get('workflow_id'),
            scope=trigger_scope,
            workflow_input=values.get('workflow_input'),
            workflow_params=values.get('workflow_params'),
        )

        return resources.EventTrigger.from_db_model(created_trigger)
开发者ID:openstack,项目名称:mistral,代码行数:32,代码来源:event_trigger.py


示例13: post

    def post(self, action_ex):
        """Create a new action execution by starting the named action.

        :param action_ex: Action to execute. Its ``name`` is required;
            ``description``, ``input`` and ``params`` fall back to None,
            {} and {} respectively when empty.
        :raises exc.InputException: if ``action_ex.name`` is empty.
        :return: ``resources.ActionExecution`` built from the engine result.
        """
        acl.enforce('action_executions:create', context.ctx())

        LOG.debug("Create action_execution [action_execution=%s]", action_ex)

        action_name = action_ex.name
        run_description = action_ex.description or None
        run_input = action_ex.input or {}
        extra_params = action_ex.params or {}

        if not action_name:
            missing_name_msg = (
                "Please provide at least action name to run action."
            )
            raise exc.InputException(missing_name_msg)

        engine = rpc.get_engine_client()
        started = engine.start_action(
            action_name,
            run_input,
            description=run_description,
            **extra_params
        )

        return resources.ActionExecution.from_dict(started)
开发者ID:openstack,项目名称:mistral,代码行数:30,代码来源:action_execution.py


示例14: post

    def post(self, namespace=''):
        """Create one or more workflows from the request body text.

        The text may contain definitions of multiple workflows; in that
        case all of them are created. The 'workflows:publicize' policy is
        enforced in addition when the requested scope is 'public'.

        :param namespace: Optional. The namespace to create the workflow
            in. Workflows with the same name can be added to a given
            project if they are in two different namespaces.
        :return: JSON form of ``resources.Workflows`` holding the new
            workflows.
        """
        acl.enforce('workflows:create', context.ctx())

        workflows_text = pecan.request.text
        requested_scope = pecan.request.GET.get('scope', 'private')
        pecan.response.status = 201

        resources.Workflow.validate_scope(requested_scope)

        if requested_scope == 'public':
            acl.enforce('workflows:publicize', context.ctx())

        LOG.debug("Create workflow(s) [definition=%s]", workflows_text)

        create_with_retry = rest_utils.rest_retry_on_db_error(
            workflows.create_workflows
        )
        created_workflows = create_with_retry(
            workflows_text,
            scope=requested_scope,
            namespace=namespace
        )

        workflow_resources = []
        for created_workflow in created_workflows:
            workflow_resources.append(
                resources.Workflow.from_db_model(created_workflow)
            )

        return resources.Workflows(workflows=workflow_resources).to_json()
开发者ID:openstack,项目名称:mistral,代码行数:33,代码来源:workflow.py


示例15: post

    def post(self):
        """Create one or more workflows from the request body text.

        NOTE: The text is allowed to have definitions
            of multiple workflows. In this case they all will be created.

        :raises exc.InvalidModelException: if the 'scope' query parameter
            is not one of ``resources.SCOPE_TYPES.values``.
        :return: JSON form of ``resources.Workflows`` holding the new
            workflows.
        """
        acl.enforce('workflows:create', context.ctx())

        definition = pecan.request.text
        scope = pecan.request.GET.get('scope', 'private')
        pecan.response.status = 201

        if scope not in resources.SCOPE_TYPES.values:
            raise exc.InvalidModelException(
                "Scope must be one of the following: %s; actual: "
                "%s" % (resources.SCOPE_TYPES.values, scope)
            )

        # Lazy logger arguments instead of eager '%' formatting, so the
        # definition text is only interpolated if the record is emitted.
        LOG.info("Create workflow(s) [definition=%s]", definition)

        db_wfs = workflows.create_workflows(definition, scope=scope)
        models_dicts = [db_wf.to_dict() for db_wf in db_wfs]

        workflow_list = [
            resources.Workflow.from_dict(wf) for wf in models_dicts
        ]

        return resources.Workflows(workflows=workflow_list).to_json()
开发者ID:luckysamadhiya93,项目名称:mistral,代码行数:28,代码来源:workflow.py


示例16: delete

    def delete(self, name):
        """Delete the named environment.

        :param name: name of the environment to delete.
        """
        acl.enforce('environments:delete', context.ctx())

        # Hand the argument to the logger rather than pre-formatting with
        # '%': interpolation is then deferred until the record is emitted.
        LOG.info("Delete environment [name=%s]", name)

        db_api.delete_environment(name)
开发者ID:luckysamadhiya93,项目名称:mistral,代码行数:7,代码来源:environment.py


示例17: get_all

    def get_all(self, marker=None, limit=None, sort_keys='created_at',
                sort_dirs='asc', fields='', name=None, description=None,
                variables=None, scope=None, created_at=None, updated_at=None):
        """List environments.

        The filter arguments are collected with
        ``rest_utils.filters_to_dict`` and forwarded, together with the
        paging and sorting arguments, to ``rest_utils.get_all``.

        :param marker: Optional. Pagination marker for large data sets.
        :param limit: Optional. Maximum number of resources to return in a
                      single result. Default: None.
        :param sort_keys: Optional. Columns to sort results by.
                          Default: created_at.
        :param sort_dirs: Optional. Directions to sort corresponding to
                          sort_keys, "asc" or "desc" can be chosen.
                          Default: asc.
        :param fields: Optional. A specified list of fields of the resource
                       to be returned.
        :param name: Optional. Keep only resources with a specific name.
        :param description: Optional. Keep only resources with a specific
                            description.
        :param variables: Optional. Keep only resources with specific
                          variables.
        :param scope: Optional. Keep only resources with a specific scope.
        :param created_at: Optional. Keep only resources created at a
                           specific time and date.
        :param updated_at: Optional. Keep only resources with specific
                           latest update time and date.
        :return: the result of ``rest_utils.get_all``.
        """
        acl.enforce('environments:list', context.ctx())

        filters = rest_utils.filters_to_dict(
            created_at=created_at, name=name, updated_at=updated_at,
            description=description, variables=variables, scope=scope
        )

        LOG.info(
            "Fetch environments. marker=%s, limit=%s, sort_keys=%s, "
            "sort_dirs=%s, filters=%s",
            marker, limit, sort_keys, sort_dirs, filters
        )

        listing = rest_utils.get_all(
            resources.Environments,
            resources.Environment,
            db_api.get_environments,
            db_api.get_environment,
            marker=marker, limit=limit,
            sort_keys=sort_keys, sort_dirs=sort_dirs,
            fields=fields,
            **filters
        )
        return listing
开发者ID:anilyadav,项目名称:mistral,代码行数:60,代码来源:environment.py


示例18: put

    def put(self, identifier=None):
        """Update one or more actions from the request body text.

        NOTE: The text may contain definitions of multiple actions; in
            that case all of them are updated.

        :param identifier: Optional. Forwarded unchanged to
            ``actions.update_actions``.
        :raises exc.InvalidModelException: if the 'scope' query parameter
            is not one of ``resources.SCOPE_TYPES.values``.
        :return: JSON form of ``resources.Actions`` holding the updated
            actions.
        """
        acl.enforce('actions:update', context.ctx())
        actions_text = pecan.request.text
        LOG.info("Update action(s) [definition=%s]", actions_text)
        requested_scope = pecan.request.GET.get('scope', 'private')

        if requested_scope not in resources.SCOPE_TYPES.values:
            scope_error = (
                "Scope must be one of the following: %s; actual: "
                "%s" % (resources.SCOPE_TYPES.values, requested_scope)
            )
            raise exc.InvalidModelException(scope_error)

        with db_api.transaction():
            updated_actions = actions.update_actions(
                actions_text,
                scope=requested_scope,
                identifier=identifier
            )

        # Convert every DB object to a dict first, then build the resources.
        action_dicts = [updated.to_dict() for updated in updated_actions]
        action_resources = [
            resources.Action.from_dict(action_dict)
            for action_dict in action_dicts
        ]

        return resources.Actions(actions=action_resources).to_json()
开发者ID:Tesora,项目名称:tesora-mistral,代码行数:28,代码来源:action.py


示例19: post

    def post(self, cron_trigger):
        """Create a new cron trigger.

        :param cron_trigger: Required. Cron trigger structure. Its 'name'
            entry is read by key; the remaining entries are read with
            ``get`` and so may be absent.
        :return: ``resources.CronTrigger`` built from the created DB model.
        """
        acl.enforce('cron_triggers:create', context.ctx())

        LOG.debug('Create cron trigger: %s', cron_trigger)

        values = cron_trigger.to_dict()

        create_with_retry = rest_utils.rest_retry_on_db_error(
            triggers.create_cron_trigger
        )
        created_trigger = create_with_retry(
            name=values['name'],
            workflow_name=values.get('workflow_name'),
            workflow_input=values.get('workflow_input'),
            workflow_params=values.get('workflow_params'),
            pattern=values.get('pattern'),
            first_time=values.get('first_execution_time'),
            count=values.get('remaining_executions'),
            workflow_id=values.get('workflow_id')
        )

        return resources.CronTrigger.from_db_model(created_trigger)
开发者ID:openstack,项目名称:mistral,代码行数:25,代码来源:cron_trigger.py


示例20: put

    def put(self, identifier=None):
        """Update one or more workflows.

        :param identifier: Optional. If provided, it's UUID of a workflow.
            Only one workflow can be updated with identifier param.
        :raises exc.InvalidModelException: if the 'scope' query parameter
            is not one of ``SCOPE_TYPES.values``.
        :return: JSON of the first updated workflow when ``identifier`` is
            given, otherwise JSON of a ``Workflows`` collection.

        The text is allowed to have definitions of multiple workflows. In this
        case they all will be updated.
        """
        acl.enforce('workflows:update', context.ctx())
        definition = pecan.request.text
        scope = pecan.request.GET.get('scope', 'private')

        if scope not in SCOPE_TYPES.values:
            raise exc.InvalidModelException(
                "Scope must be one of the following: %s; actual: "
                "%s" % (SCOPE_TYPES.values, scope)
            )

        # Lazy logger arguments instead of eager '%' formatting, so the
        # definition text is only interpolated if the record is emitted.
        LOG.info("Update workflow(s) [definition=%s]", definition)

        db_wfs = workflows.update_workflows(
            definition,
            scope=scope,
            identifier=identifier
        )

        models_dicts = [db_wf.to_dict() for db_wf in db_wfs]
        workflow_list = [Workflow.from_dict(wf) for wf in models_dicts]

        return (workflow_list[0].to_json() if identifier
                else Workflows(workflows=workflow_list).to_json())
开发者ID:PrinceKatiyar,项目名称:mistral,代码行数:32,代码来源:workflow.py



注:本文中的mistral.api.access_control.enforce函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python context.ctx函数代码示例发布时间:2022-05-27
下一篇:
Python helpers.params_from_request函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap