• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python db.query函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中nailgun.db.db.query函数的典型用法代码示例。如果您正苦于以下问题:Python query函数的具体用法?Python query怎么用?Python query使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了query函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: POST

    def POST(self, cluster_id):
        cluster = self.get_object_or_404(objects.Cluster, cluster_id)
        data = self.checked_data()
        node_id = data["node_id"]
        node = self.get_object_or_404(objects.Node, node_id)

        netgroups_mapping = self.get_netgroups_map(node.cluster, cluster)

        orig_roles = node.roles

        objects.Node.update_roles(node, [])  # flush
        objects.Node.update_pending_roles(node, [])  # flush

        node.replaced_deployment_info = []
        node.deployment_info = []
        node.kernel_params = None
        node.cluster_id = cluster.id
        node.group_id = None

        objects.Node.assign_group(node)  # flush
        objects.Node.update_pending_roles(node, orig_roles)  # flush

        for ip in node.ip_addrs:
            ip.network = netgroups_mapping[ip.network]

        nic_assignments = db.query(models.NetworkNICAssignment).\
            join(models.NodeNICInterface).\
            filter(models.NodeNICInterface.node_id == node.id).\
            all()
        for nic_assignment in nic_assignments:
            nic_assignment.network_id = \
                netgroups_mapping[nic_assignment.network_id]

        bond_assignments = db.query(models.NetworkBondAssignment).\
            join(models.NodeBondInterface).\
            filter(models.NodeBondInterface.node_id == node.id).\
            all()
        for bond_assignment in bond_assignments:
            bond_assignment.network_id = \
                netgroups_mapping[bond_assignment.network_id]

        objects.Node.add_pending_change(node,
                                        consts.CLUSTER_CHANGES.interfaces)

        node.pending_addition = True
        node.pending_deletion = False

        task = models.Task(name=consts.TASK_NAMES.node_deletion,
                           cluster=cluster)

        db.commit()

        self.delete_node_by_astute(task, node)
开发者ID:zhadaevfm,项目名称:octane,代码行数:53,代码来源:handlers.py


示例2: test_fails_if_there_is_task

    def test_fails_if_there_is_task(self):
        for task_name in DeploymentCheckMixin.deployment_tasks:
            task = models.Task(name=task_name, cluster_id=self.cluster.id)
            db.add(task)
            db.flush()
            self.assertRaisesWithMessage(
                errors.DeploymentAlreadyStarted,
                'Cannot perform the actions because there are '
                'running tasks {0}'.format([task]),
                DeploymentCheckMixin.check_no_running_deployment,
                self.cluster)

            db.query(models.Task).delete()
开发者ID:SmartInfrastructures,项目名称:fuel-web-dev,代码行数:13,代码来源:test_task_managers.py


示例3: validate_collection_update

 def validate_collection_update(cls, data, cluster_id=None):
     data = cls.validate_json(data)
     cls.validate_schema(data, assignment_format_schema)
     dict_data = dict((d["id"], d["roles"]) for d in data)
     received_node_ids = dict_data.keys()
     nodes = db.query(Node).filter(Node.id.in_(received_node_ids))
     cls.check_all_nodes(nodes, received_node_ids)
     cls.check_if_already_done(nodes)
     release = db.query(Cluster).get(cluster_id).release
     for node_id in received_node_ids:
         cls.validate_roles(
             release,
             dict_data[node_id]
         )
     return dict_data
开发者ID:tsipa,项目名称:fuel-web,代码行数:15,代码来源:assignment.py


示例4: check_no_running_deployment

    def check_no_running_deployment(cls, cluster):
        tasks_q = objects.TaskCollection.get_by_name_and_cluster(
            cluster, cls.deployment_tasks).filter_by(
                status=consts.TASK_STATUSES.running)

        tasks_exists = db.query(tasks_q.exists()).scalar()
        if tasks_exists:
            raise errors.DeploymentAlreadyStarted(
                'Cannot perform the actions because there are '
                'running tasks {0}'.format(tasks_q.all()))
开发者ID:ekorekin,项目名称:fuel-web,代码行数:10,代码来源:manager.py


示例5: _validate_nodes

    def _validate_nodes(cls, new_node_ids, instance):
        set_new_node_ids = set(new_node_ids)
        set_old_node_ids = set(objects.Cluster.get_nodes_ids(instance))
        nodes_to_add = set_new_node_ids - set_old_node_ids
        nodes_to_remove = set_old_node_ids - set_new_node_ids

        hostnames_to_add = [x[0] for x in db.query(Node.hostname)
                            .filter(Node.id.in_(nodes_to_add)).all()]

        duplicated = [x[0] for x in db.query(Node.hostname).filter(
            sa.and_(
                Node.hostname.in_(hostnames_to_add),
                Node.cluster_id == instance.id,
                Node.id.notin_(nodes_to_remove)
            )
        ).all()]
        if duplicated:
            raise errors.AlreadyExists(
                "Nodes with hostnames [{0}] already exist in cluster {1}."
                .format(",".join(duplicated), instance.id)
            )
开发者ID:mba811,项目名称:fuel-web,代码行数:21,代码来源:cluster.py


示例6: test_does_not_fail_if_there_is_deleted_task

    def test_does_not_fail_if_there_is_deleted_task(self):
        for task_name in DeploymentCheckMixin.deployment_tasks:
            task = models.Task(name=task_name,
                               deleted_at=datetime.datetime.now(),
                               cluster_id=self.cluster.id)
            db.add(task)
            db.flush()
            self.addCleanup(db.query(models.Task).delete)

            self.assertNotRaises(
                errors.DeploymentAlreadyStarted,
                DeploymentCheckMixin.check_no_running_deployment,
                self.cluster)
开发者ID:ekorekin,项目名称:fuel-web,代码行数:13,代码来源:test_task_managers.py


示例7: get_assigned_vips_for_controller_group

    def get_assigned_vips_for_controller_group(cls, cluster):
        """Get VIPs assigned in specified cluster's controller node group

        :param cluster: Cluster object
        :type cluster: Cluster model
        :returns: VIPs for given cluster
        """
        node_group_id = Cluster.get_controllers_group_id(cluster)
        cluster_vips = db.query(models.IPAddr).join(
            models.IPAddr.network_data).filter(
                models.IPAddr.vip_name.isnot(None) &
                (models.NetworkGroup.group_id == node_group_id))
        return cluster_vips
开发者ID:ekorekin,项目名称:fuel-web,代码行数:13,代码来源:ip_addr.py


示例8: validate_collection_update

 def validate_collection_update(cls, data, cluster_id=None):
     list_data = cls.validate_json(data)
     cls.validate_schema(list_data, unassignment_format_schema)
     node_ids_set = set(n['id'] for n in list_data)
     nodes = db.query(Node).filter(Node.id.in_(node_ids_set))
     node_id_cluster_map = dict(
         (n.id, n.cluster_id) for n in
         db.query(Node.id, Node.cluster_id).filter(
             Node.id.in_(node_ids_set)))
     other_cluster_ids_set = set(node_id_cluster_map.values()) - \
         set((int(cluster_id),))
     if other_cluster_ids_set:
         raise errors.InvalidData(
             u"Nodes [{0}] are not members of environment {1}."
             .format(
                 u", ".join(
                     str(n_id) for n_id, c_id in
                     node_id_cluster_map.iteritems()
                     if c_id in other_cluster_ids_set
                 ), cluster_id), log_message=True
         )
     cls.check_all_nodes(nodes, node_ids_set)
     return nodes
开发者ID:openstack,项目名称:fuel-web,代码行数:23,代码来源:assignment.py


示例9: _get_admin_node_network

 def _get_admin_node_network(cls, node_id):
     net = cls.get_admin_network_group()
     net_cidr = IPNetwork(net.cidr)
     node = db.query(Node).get(node_id)
     ip_addr = cls.get_admin_ip_for_node(node)
     return {
         'name': net.name,
         'vlan': net.vlan_start,
         'ip': "{0}/{1}".format(ip_addr, net_cidr.prefixlen),
         'netmask': str(net_cidr.netmask),
         'brd': str(net_cidr.broadcast),
         'gateway': net.gateway,
         'dev': node.admin_interface.name
     }
开发者ID:stamak,项目名称:fuel-web,代码行数:14,代码来源:manager.py


示例10: check_unique_hostnames

 def check_unique_hostnames(cls, nodes, cluster_id):
     hostnames = [node.hostname for node in nodes]
     conflicting_hostnames = [x[0] for x in db.query(
         Node.hostname).filter(
             sa.and_(
                 Node.hostname.in_(hostnames),
                 Node.cluster_id == cluster_id,
             )
         ).all()]
     if conflicting_hostnames:
         raise errors.AlreadyExists(
             "Nodes with hostnames [{0}] already exist in cluster {1}."
             .format(",".join(conflicting_hostnames), cluster_id)
         )
开发者ID:SmartInfrastructures,项目名称:fuel-web-dev,代码行数:14,代码来源:assignment.py


示例11: get_objects_list_or_404

    def get_objects_list_or_404(self, model, ids):
        """Get list of objects

        :param model: model object
        :param ids: list of ids

        :raises: web.notfound
        :returns: query object
        """
        node_query = db.query(model).filter(model.id.in_(ids))
        objects_count = node_query.count()

        if len(set(ids)) != objects_count:
            raise web.notfound('{0} not found'.format(model.__name__))

        return node_query
开发者ID:kdemina,项目名称:fuel-web,代码行数:16,代码来源:base.py


示例12: validate_collection_update

    def validate_collection_update(cls, data, cluster_id=None):
        data = cls.validate_json(data)
        cls.validate_schema(data, assignment_format_schema)
        dict_data = dict((d["id"], d["roles"]) for d in data)
        received_node_ids = dict_data.keys()
        nodes = db.query(Node).filter(Node.id.in_(received_node_ids))
        cls.check_all_nodes(nodes, received_node_ids)
        cls.check_if_already_done(nodes)
        cluster = objects.Cluster.get_by_uid(
            cluster_id, fail_if_not_found=True
        )

        for node_id in received_node_ids:
            cls.validate_roles(
                cluster,
                dict_data[node_id]
            )
        return dict_data
开发者ID:nebril,项目名称:fuel-web,代码行数:18,代码来源:assignment.py


示例13: get_for_model

    def get_for_model(cls, instance):
        """Get deployment graphs related to given model.

        :param instance: model that could have relation to graph
        :type instance: models.Plugin|models.Cluster|models.Release|
        :return: graph instance
        :rtype: model.DeploymentGraph
        """
        association_model = cls.single.get_association_for_model(instance)
        graphs = db.query(
            models.DeploymentGraph
        ).join(
            association_model
        ).join(
            instance.__class__
        ).filter(
            instance.__class__.id == instance.id
        )
        return graphs.all()
开发者ID:jiaolongsun,项目名称:fuel-web,代码行数:19,代码来源:deployment_graph.py


示例14: prepare_data

    def prepare_data(cls, data):
        """Prepares input data.

        Filter input data based on the fact that
        updating parameters of the fuel admin network
        is forbidden for default node group.

        Admin network cannot be updated because of:
        - sharing itself between environments;
        - having no mechanism to change its parameters on deployed Master node.
        """
        if data.get("networks"):
            default_admin = db.query(
                NetworkGroup).filter_by(group_id=None).first()
            data["networks"] = [
                n for n in data["networks"]
                if n.get("id") != default_admin.id
            ]

        return data
开发者ID:ibelikov,项目名称:fuel-web,代码行数:20,代码来源:network.py


示例15: _validate_unique_name

    def _validate_unique_name(cls, data, *filters):
        """Validate node group name to be unique.

        Validate whether node group name is unique for specific
        environment. Prevent to have duplicated node group names for
        the same environment.

        :param data: data which contains node group name and cluster_id.
        :param filters: additional filters for the query which is
                        used in the method for validation.
        :type data: dict
        :type filters: list
        :returns: None
        """
        nodegroup_query = objects.NodeGroupCollection.filter_by(None, name=data["name"], cluster_id=data["cluster_id"])
        if filters:
            nodegroup_query = nodegroup_query.filter(*filters)
        nodegroup_exists = db.query(nodegroup_query.exists()).scalar()
        if nodegroup_exists:
            raise errors.NotAllowed(
                "Node group '{0}' already exists " "in environment {1}.".format(data["name"], data["cluster_id"])
            )
开发者ID:mba811,项目名称:fuel-web,代码行数:22,代码来源:node_group.py


示例16: copy_vips

    def copy_vips(orig_cluster, new_cluster):
        orig_vips = {}
        for ng in orig_cluster.network_groups:
            vips = db.query(models.IPAddr).filter(
                models.IPAddr.network == ng.id,
                models.IPAddr.node.is_(None),
                models.IPAddr.vip_type.isnot(None),
            ).all()
            orig_vips[ng.name] = list(vips)

        new_vips = []
        for ng in new_cluster.network_groups:
            orig_ng_vips = orig_vips.get(ng.name)
            for vip in orig_ng_vips:
                ip_addr = models.IPAddr(
                    network=ng.id,
                    ip_addr=vip.ip_addr,
                    vip_type=vip.vip_type,
                )
                new_vips.append(ip_addr)
        db.add_all(new_vips)
        db.commit()
开发者ID:zhadaevfm,项目名称:octane,代码行数:22,代码来源:handlers.py


示例17: test_upgrade_node_error_msg_to_allow_long_error_msg

    def test_upgrade_node_error_msg_to_allow_long_error_msg(self):
        nodes = self.meta.tables['nodes']
        self.assertIsInstance(nodes.columns['error_msg'].type, sa.Text)

        node_uuid = '26b508d0-0d76-4159-bce9-f67ec2765480'
        long_error_msg = ''.join('a' for i in range(500))

        db.execute(
            nodes.insert(),
            [{
                'uuid': node_uuid,
                'cluster_id': None,
                'group_id': None,
                'status': 'discover',
                'meta': '{}',
                'mac': 'aa:aa:aa:aa:aa:aa',
                'error_msg': long_error_msg,
                'timestamp': datetime.datetime.utcnow(),
            }]
        )

        node = db.query(nodes).filter_by(uuid=node_uuid).first()
        self.assertEqual(long_error_msg, node.error_msg)
开发者ID:huyupeng,项目名称:fuel-web,代码行数:23,代码来源:test_migration_fuel_9_0_1.py


示例18: get_intersecting_ip

 def get_intersecting_ip(cls, instance, addr):
     """Get ip that intersects by ip_addr with given."""
     return db.query(cls.model).filter(
         cls.model.ip_addr == addr,
         cls.model.id != instance.id
     ).first()
开发者ID:dnikishov,项目名称:fuel-web,代码行数:6,代码来源:ip_addr.py


示例19: delete_by_network

 def delete_by_network(cls, ip, network):
     db.query(models.IPAddr).filter(
         models.IPAddr.ip_addr == ip,
         models.IPAddr.network == network
     ).delete(synchronize_session='fetch')
开发者ID:ekorekin,项目名称:fuel-web,代码行数:5,代码来源:ip_addr.py


示例20: _query_cluster_relations

 def _query_cluster_relations(cluster_id):
     return db.query(models.UpgradeRelation).filter(
         (models.UpgradeRelation.orig_cluster_id == cluster_id) |
         (models.UpgradeRelation.seed_cluster_id == cluster_id))
开发者ID:SmartInfrastructures,项目名称:fuel-web-dev,代码行数:4,代码来源:relations.py



注:本文中的nailgun.db.db.query函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python models.Task类代码示例发布时间:2022-05-27
下一篇:
Python db.execute函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap