• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python base.model_query函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 mistral.db.sqlalchemy.base.model_query 函数的典型用法代码示例。如果您正苦于以下问题：Python model_query 函数的具体用法？Python model_query 怎么用？Python model_query 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 model_query 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: update_delayed_call

def update_delayed_call(id, values, query_filter=None, session=None):
    """Update a delayed call and report how many rows were touched.

    With a truthy ``query_filter`` the update is issued through
    ``update_on_match`` against a specimen built from ``id`` plus the filter
    fields. Without it, the object is loaded with ``get_delayed_call`` and
    updated in place.

    :param id: Identifier of the delayed call to update.
    :param values: Values to apply to the delayed call.
    :param query_filter: Optional mapping of extra fields the row must match.
    :param session: Session handed to ``get_delayed_call`` and inspected for
        its ``dirty`` collection on the unfiltered path.
    :return: ``(delayed_call, count)``; ``(None, 0)`` if no row matched the
        filter.
    """
    if not query_filter:
        # Plain update: load the object, mutate it and report the number of
        # dirty objects tracked by the session.
        call = get_delayed_call(id=id, session=session)
        call.update(values)

        return call, len(session.dirty)

    try:
        expected = models.DelayedCall(id=id, **query_filter)
        query = b.model_query(models.DelayedCall)
        call = query.update_on_match(
            specimen=expected,
            surrogate_key="id",
            values=values
        )
    except oslo_sqlalchemy.update_match.NoRowsMatched as e:
        LOG.debug(
            "No rows matched for update call [id=%s, values=%s, "
            "query_filter=%s,"
            "exception=%s]",
            id, values, query_filter, e
        )

        return None, 0
    else:
        return call, 1
开发者ID:dennybaa,项目名称:mistral,代码行数:25,代码来源:api.py


示例2: _get_collection

def _get_collection(model, insecure=False, limit=None, marker=None,
                    sort_keys=None, sort_dirs=None, fields=None, **filters):
    """Run a filtered, paginated query for ``model`` and return the result.

    :param model: Model class to query.
    :param insecure: If true, build the query with ``b.model_query``;
        otherwise build it with ``_secure_query``.
    :param limit: Forwarded to ``_paginate_query``.
    :param marker: Forwarded to ``_paginate_query``.
    :param sort_keys: Forwarded to ``_paginate_query``.
    :param sort_dirs: Forwarded to ``_paginate_query``.
    :param fields: Optional iterable of attribute names to select. Names
        that ``model`` does not have are silently skipped.
    :param filters: Forwarded to ``db_filters.apply_filters``.
    :return: Whatever ``query.all()`` returns.
    :raises exc.DBQueryEntryError: If executing the query raises any
        ``Exception``.
    """
    columns = (
        tuple(getattr(model, f) for f in fields if hasattr(model, f))
        if fields else ()
    )

    # Hand the columns to model_query as ONE sequence. Unpacking them with
    # ``*columns`` spreads the individual columns over model_query's
    # remaining positional parameters.
    # NOTE(review): assumes model_query(model, columns=(), ...) - confirm
    # against mistral.db.sqlalchemy.base.
    query = (b.model_query(model, columns) if insecure
             else _secure_query(model, *columns))
    query = db_filters.apply_filters(query, model, **filters)

    query = _paginate_query(
        model,
        limit,
        marker,
        sort_keys,
        sort_dirs,
        query
    )

    try:
        return query.all()
    except Exception as e:
        # Wrap any failure in the project's own DB error type so callers
        # only have to deal with one exception class.
        raise exc.DBQueryEntryError(
            "Failed when querying database, error type: %s, "
            "error message: %s" % (e.__class__.__name__, str(e))
        )
开发者ID:Tesora,项目名称:tesora-mistral,代码行数:27,代码来源:api.py


示例3: _secure_query

def _secure_query(model, *columns):
    """Build a query for ``model`` restricted to what the caller may see.

    For subclasses of ``mb.MistralSecureModelBase`` the query is narrowed to
    rows whose ``project_id`` equals ``security.get_project_id()`` or whose
    ``scope`` is ``"public"``. Other models are returned unfiltered.

    :param model: Model class to query.
    :param columns: Optional columns, passed to ``b.model_query`` as a tuple.
    :return: The (possibly filtered) query object.
    """
    base_query = b.model_query(model, columns)

    if not issubclass(model, mb.MistralSecureModelBase):
        return base_query

    visible = sa.or_(
        model.project_id == security.get_project_id(),
        model.scope == "public"
    )

    return base_query.filter(visible)
开发者ID:dennybaa,项目名称:mistral,代码行数:7,代码来源:api.py


示例4: get_delayed_calls_to_start

def get_delayed_calls_to_start(time, session=None):
    """Fetch delayed calls whose ``execution_time`` is earlier than ``time``.

    :param time: Upper bound (exclusive) for ``execution_time``.
    :param session: Not used directly in this function body.
    :return: Result of ``query.all()``, ordered by ``execution_time``.
    """
    execution_time = models.DelayedCall.execution_time

    return (
        b.model_query(models.DelayedCall)
        .filter(execution_time < time)
        .order_by(execution_time)
        .all()
    )
开发者ID:ainkov,项目名称:mistral,代码行数:7,代码来源:api.py


示例5: get_next_cron_triggers

def get_next_cron_triggers(time, session=None):
    """Fetch cron triggers whose ``next_execution_time`` is before ``time``.

    :param time: Upper bound (exclusive) for ``next_execution_time``.
    :param session: Not used directly in this function body.
    :return: Result of ``query.all()``, ordered by ``next_execution_time``.
    """
    next_run = models.CronTrigger.next_execution_time

    return (
        b.model_query(models.CronTrigger)
        .filter(next_run < time)
        .order_by(next_run)
        .all()
    )
开发者ID:anilyadav,项目名称:mistral,代码行数:7,代码来源:api.py


示例6: _secure_query

def _secure_query(model, *columns):
    """Build a query for ``model`` limited to rows the caller may access.

    Models that are not subclasses of ``mb.MistralSecureModelBase`` are
    returned unfiltered. For secure models a row is included when it belongs
    to the current project, has ``scope == 'public'``, or its id is among the
    resources returned by ``_get_accepted_resources`` for the model's
    resource type.

    :param model: Model class to query.
    :param columns: Optional columns, passed to ``b.model_query`` as a tuple.
    :return: The (possibly filtered) query object.
    """
    query = b.model_query(model, columns)

    if not issubclass(model, mb.MistralSecureModelBase):
        return query

    # Only models registered in RESOURCE_MAPPING can have shared resources.
    res_type = RESOURCE_MAPPING.get(model, '')
    shared_ids = (
        [res.resource_id for res in _get_accepted_resources(res_type)]
        if res_type else []
    )

    visible = sa.or_(
        model.project_id == security.get_project_id(),
        model.scope == 'public'
    )

    # NOTE(kong): The IN_ predicate is added only for a non-empty id list to
    # avoid a sqlalchemy SAWarning and a wasted db call.
    if shared_ids:
        visible = sa.or_(visible, model.id.in_(shared_ids))

    return query.filter(visible)
开发者ID:anilyadav,项目名称:mistral,代码行数:29,代码来源:api.py


示例7: update_scheduled_job

def update_scheduled_job(id, values, query_filter=None, session=None):
    """Update a scheduled job and report how many rows were touched.

    With a truthy ``query_filter`` the update is issued through
    ``update_on_match`` against a specimen built from ``id`` plus the filter
    fields. Without it, the job is loaded with ``get_scheduled_job`` and
    updated in place.

    :param id: Identifier of the scheduled job to update.
    :param values: Values to apply to the job.
    :param query_filter: Optional mapping of extra fields the row must match.
    :param session: Session handed to ``get_scheduled_job`` and inspected
        for its ``dirty`` collection on the unfiltered path.
    :return: ``(job, count)``; ``(None, 0)`` if no row matched the filter.
    """
    if not query_filter:
        scheduled_job = get_scheduled_job(id=id, session=session)
        scheduled_job.update(values)

        return scheduled_job, len(session.dirty)

    try:
        expected = models.ScheduledJob(id=id, **query_filter)
        query = b.model_query(models.ScheduledJob)
        scheduled_job = query.update_on_match(
            specimen=expected,
            surrogate_key='id',
            values=values
        )
    except oslo_sqlalchemy.update_match.NoRowsMatched as e:
        LOG.debug(
            "No rows matched for update scheduled job [id=%s, values=%s, "
            "query_filter=%s,"
            "exception=%s]", id, values, query_filter, e
        )

        return None, 0
    else:
        return scheduled_job, 1
开发者ID:openstack,项目名称:mistral,代码行数:30,代码来源:api.py


示例8: get_scheduled_jobs_to_start

def get_scheduled_jobs_to_start(time, batch_size=None, session=None):
    """Fetch scheduled jobs that are due and not currently captured.

    A job qualifies when its ``execute_at`` is earlier than ``time`` minus
    ``CONF.scheduler.pickup_job_after`` seconds, and its ``captured_at`` is
    either NULL or no later than now minus
    ``CONF.scheduler.captured_job_timeout`` seconds.

    :param time: Reference time the pickup interval is subtracted from.
    :param batch_size: Value passed to ``query.limit``.
    :param session: Not used directly in this function body.
    :return: Result of ``query.all()``, ordered by ``execute_at``.
    """
    execute_at = models.ScheduledJob.execute_at
    captured_at = models.ScheduledJob.captured_at

    query = b.model_query(models.ScheduledJob)

    # Due jobs, accounting for the configured job pickup interval.
    pickup_delay = datetime.timedelta(
        seconds=CONF.scheduler.pickup_job_after
    )
    is_due = execute_at < time - pickup_delay

    # Jobs never captured, or captured longer ago than the configured
    # captured job timeout.
    capture_deadline = utils.utc_now_sec() - datetime.timedelta(
        seconds=CONF.scheduler.captured_job_timeout
    )
    is_free = sa.or_(
        captured_at == sa.null(),
        captured_at <= capture_deadline
    )

    return (
        query
        .filter(is_due)
        .filter(is_free)
        .order_by(execute_at)
        .limit(batch_size)
        .all()
    )
开发者ID:openstack,项目名称:mistral,代码行数:29,代码来源:api.py


示例9: update_cron_trigger

def update_cron_trigger(name, values, session=None, query_filter=None):
    """Update a cron trigger looked up by name.

    The trigger is first loaded with ``get_cron_trigger(name)``. With a
    truthy ``query_filter`` the update is issued through ``update_on_match``
    against a specimen built from the trigger id plus the filter fields.
    Without it, a copy of ``values`` is applied to the loaded object.

    :param name: Name passed to ``get_cron_trigger``.
    :param values: Values to apply to the cron trigger.
    :param session: Session inspected for its ``dirty`` collection on the
        unfiltered path.
    :param query_filter: Optional mapping of extra fields the row must match.
    :return: ``(cron_trigger, count)``. When no row matched the filter, the
        originally loaded trigger is returned with a count of 0.
    """
    cron_trigger = get_cron_trigger(name)

    if query_filter:
        # Capture the id up front so the log message below reports the
        # trigger's id instead of the builtin ``id`` function.
        trigger_id = cron_trigger.id

        try:
            # Execute the UPDATE statement with the query_filter as the WHERE.
            specimen = models.CronTrigger(id=trigger_id, **query_filter)

            query = b.model_query(models.CronTrigger)

            cron_trigger = query.update_on_match(
                specimen=specimen,
                surrogate_key='id',
                values=values
            )

            return cron_trigger, 1

        except oslo_sqlalchemy.update_match.NoRowsMatched:
            LOG.debug(
                "No rows matched for cron update call "
                "[id=%s, values=%s, query_filter=%s]",
                trigger_id, values, query_filter
            )

            # cron_trigger was not reassigned because update_on_match raised,
            # so this is still the object loaded at the top.
            return cron_trigger, 0

    else:
        cron_trigger.update(values.copy())

        return cron_trigger, len(session.dirty)
开发者ID:anilyadav,项目名称:mistral,代码行数:30,代码来源:api.py


示例10: _get_db_object_by_id

def _get_db_object_by_id(model, id, insecure=False, columns=()):
    """Return the first ``model`` row with the given id.

    :param model: Model class to query.
    :param id: Value matched against the ``id`` attribute.
    :param insecure: If true, build the query with ``b.model_query``;
        otherwise build it with ``_secure_query``.
    :param columns: Optional columns to select.
    :return: Result of ``query.filter_by(id=id).first()``.
    """
    if insecure:
        query = b.model_query(model, columns=columns)
    else:
        query = _secure_query(model, *columns)

    matching = query.filter_by(id=id)

    return matching.first()
开发者ID:openstack,项目名称:mistral,代码行数:8,代码来源:api.py


示例11: _get_collection_sorted_by_name

def _get_collection_sorted_by_name(model, **kwargs):
    """Return ``model`` rows matching ``kwargs``, ordered by ``model.name``.

    :param model: Model class to query.
    :param kwargs: Equality filters passed to ``filter_by``. If they contain
        ``project_id`` the query is built with ``b.model_query`` instead of
        ``_secure_query``.
    :return: Result of ``query.all()``.
    """
    # Note(lane): Sometimes tenant_A needs to get resources of tenant_B,
    # especially in the resource sharing scenario where the resource owner
    # needs to check whether the resource is used by a member. An explicit
    # project_id filter therefore bypasses the secure query.
    if 'project_id' in kwargs:
        query = b.model_query(model)
    else:
        query = _secure_query(model)

    filtered = query.filter_by(**kwargs)

    return filtered.order_by(model.name).all()
开发者ID:gs-spadmanabhan,项目名称:mistral,代码行数:8,代码来源:api.py


示例12: get_delayed_calls_to_start

def get_delayed_calls_to_start(time, batch_size=None, session=None):
    """Fetch unprocessed delayed calls whose execution time is before ``time``.

    :param time: Upper bound (exclusive) for ``execution_time``.
    :param batch_size: Value passed to ``query.limit``.
    :param session: Not used directly in this function body.
    :return: Result of ``query.all()``, ordered by ``execution_time``.
    """
    execution_time = models.DelayedCall.execution_time

    return (
        b.model_query(models.DelayedCall)
        .filter(execution_time < time)
        .filter_by(processing=False)
        .order_by(execution_time)
        .limit(batch_size)
        .all()
    )
开发者ID:openstack,项目名称:mistral,代码行数:9,代码来源:api.py


示例13: _get_cron_triggers

def _get_cron_triggers(*columns, **kwargs):
    """Delegate to ``_get_collection`` for the ``CronTrigger`` model.

    A query built with ``b.model_query(models.CronTrigger)`` is passed as the
    ``query`` keyword argument. ``columns`` and ``kwargs`` are forwarded
    unchanged.

    :param columns: Extra positional arguments for ``_get_collection``.
    :param kwargs: Extra keyword arguments for ``_get_collection``.
    :return: Whatever ``_get_collection`` returns.
    """
    trigger_query = b.model_query(models.CronTrigger)

    return _get_collection(
        models.CronTrigger, *columns, query=trigger_query, **kwargs
    )
开发者ID:PrinceKatiyar,项目名称:mistral,代码行数:9,代码来源:api.py


示例14: get_expired_executions

def get_expired_executions(time, session=None):
    """Fetch finished top-level workflow executions last updated before ``time``.

    :param time: Upper bound (exclusive) for ``updated_at``.
    :param session: Not used directly in this function body.
    :return: Result of ``query.all()``.
    """
    wf_ex = models.WorkflowExecution

    query = b.model_query(wf_ex)

    # Only WorkflowExecution that are not a child of other WorkflowExecution.
    is_top_level = wf_ex.task_execution_id == sa.null()
    is_old = wf_ex.updated_at < time
    is_finished = sa.or_(wf_ex.state == "SUCCESS", wf_ex.state == "ERROR")

    return (
        query
        .filter(is_top_level)
        .filter(is_old)
        .filter(is_finished)
        .all()
    )
开发者ID:dennybaa,项目名称:mistral,代码行数:9,代码来源:api.py


示例15: delete_event_trigger

def delete_event_trigger(id, session=None):
    """Delete the event trigger with the given id.

    :param id: Identifier of the event trigger to delete.
    :param session: Not used directly in this function body.
    :raises exc.DBEntityNotFoundError: If the delete affected zero rows.
    """
    # An insecure (unscoped) query is used on purpose here.
    # NOTE(review): the original justification was that users can't access
    # "delayed calls" - confirm the same reasoning holds for event triggers.
    matching = b.model_query(models.EventTrigger).filter(
        models.EventTrigger.id == id
    )
    deleted = matching.delete()

    if deleted != 0:
        return

    raise exc.DBEntityNotFoundError(
        "Event trigger not found [id=%s]." % id
    )
开发者ID:openstack,项目名称:mistral,代码行数:10,代码来源:api.py


示例16: delete_scheduled_job

def delete_scheduled_job(id, session=None):
    """Delete the scheduled job with the given id.

    :param id: Identifier of the scheduled job to delete.
    :param session: Not used directly in this function body.
    :raises exc.DBEntityNotFoundError: If the delete affected zero rows.
    """
    # An insecure query is safe here because users can't access scheduled
    # jobs.
    matching = b.model_query(models.ScheduledJob).filter(
        models.ScheduledJob.id == id
    )
    deleted = matching.delete()

    if deleted != 0:
        return

    raise exc.DBEntityNotFoundError(
        "Scheduled job not found [id=%s]" % id
    )
开发者ID:openstack,项目名称:mistral,代码行数:10,代码来源:api.py


示例17: delete_delayed_call

def delete_delayed_call(id, session=None):
    """Delete the delayed call with the given id.

    :param id: Identifier of the delayed call to delete.
    :param session: Not used directly in this function body.
    :raises exc.DBEntityNotFoundError: If the delete affected zero rows.
    """
    # An insecure query is safe here because users can't access delayed
    # calls.
    matching = b.model_query(models.DelayedCall).filter(
        models.DelayedCall.id == id
    )
    deleted = matching.delete()

    if deleted != 0:
        return

    raise exc.DBEntityNotFoundError(
        "Delayed Call not found [id=%s]" % id
    )
开发者ID:openstack,项目名称:mistral,代码行数:10,代码来源:api.py


示例18: delete_workflow_execution

def delete_workflow_execution(id, session=None):
    """Delete the workflow execution with the given id.

    When ``context.ctx().is_admin`` is truthy the query is built with
    ``b.model_query``; otherwise it is built with ``_secure_query``.

    :param id: Identifier of the workflow execution to delete.
    :param session: Not used directly in this function body.
    :raises exc.DBEntityNotFoundError: If the delete affected zero rows.
    """
    wf_ex_cls = models.WorkflowExecution

    if context.ctx().is_admin:
        query = b.model_query(wf_ex_cls)
    else:
        query = _secure_query(wf_ex_cls)

    deleted = query.filter(wf_ex_cls.id == id).delete()

    if deleted != 0:
        return

    raise exc.DBEntityNotFoundError(
        "WorkflowExecution not found [id=%s]" % id
    )
开发者ID:openstack,项目名称:mistral,代码行数:12,代码来源:api.py


示例19: _get_associated_cron_triggers

def _get_associated_cron_triggers(wf_identifier):
    """Return the names of cron triggers associated with a workflow.

    :param wf_identifier: Workflow id or name. It is matched against
        ``workflow_id`` when ``uuidutils.is_uuid_like`` accepts it, and
        against ``workflow_name`` otherwise.
    :return: List with the first element of every row returned by the query,
        which selects only ``CronTrigger.name``.
    """
    if uuidutils.is_uuid_like(wf_identifier):
        criterion = {'workflow_id': wf_identifier}
    else:
        criterion = {'workflow_name': wf_identifier}

    name_query = b.model_query(
        models.CronTrigger,
        [models.CronTrigger.name]
    )
    rows = name_query.filter_by(**criterion).all()

    # Each row holds only the selected name column.
    return [row[0] for row in rows]
开发者ID:PrinceKatiyar,项目名称:mistral,代码行数:13,代码来源:api.py


示例20: _get_completed_task_executions_query

def _get_completed_task_executions_query(kwargs):
    """Build a query for task executions in a completed state.

    :param kwargs: Mapping of equality filters passed to ``filter_by``.
    :return: Query restricted to task executions whose state is
        ``states.ERROR``, ``states.CANCELLED`` or ``states.SUCCESS``. The
        query is returned without being executed.
    """
    task_ex = models.TaskExecution

    is_completed = sa.or_(
        task_ex.state == states.ERROR,
        task_ex.state == states.CANCELLED,
        task_ex.state == states.SUCCESS
    )

    return b.model_query(task_ex).filter_by(**kwargs).filter(is_completed)
开发者ID:Tesora,项目名称:tesora-mistral,代码行数:14,代码来源:api.py



注:本文中的mistral.db.sqlalchemy.base.model_query函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python api.get_action_executions函数代码示例发布时间:2022-05-27
下一篇:
Python api.tasks_get函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap