• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python orm.join函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.orm.join函数的典型用法代码示例。如果您正苦于以下问题:Python join函数的具体用法?Python join怎么用?Python join使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了join函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: dataXml

 def dataXml(self,**kw):
     """Run the three Request queries over the Request/RequestItem join.

     The queries are, in order:

     * ``allOpen``      -- requests with an item whose dispense date is NULL;
     * ``allClosed``    -- requests with an item dispensed within the last day;
     * ``openByParent`` -- requests made by the current user
       (``request.identity['user']``) with an item that is not yet dispensed.

     Every result list is ordered by ``Request.request_id``.  Any exception
     raised while querying is logged through ``log.exception`` and swallowed,
     in which case the affected variables keep their initial ``None``.

     :param kw: extra keyword arguments; not used by the visible code.
     """
     allOpen = None
     allClosed = None
     openByParent = None
     try:
         # NOTE: "== None" on a SQLAlchemy column is intentional -- the
         # operator is overloaded to build an IS NULL clause.  Do not replace
         # it with "is None", which would be evaluated by Python instead.
         allOpen = (
             DBSession().query(Request)
             .select_from(join(Request, RequestItem))
             .filter(RequestItem.request_item_dispense_date==None)
             .order_by(Request.request_id)
             .all()
         )
         allClosed = (
             DBSession().query(Request)
             .select_from(join(Request, RequestItem))
             .filter(RequestItem.request_item_dispense_date > (datetime.now()-timedelta(days=1)))
             .order_by(Request.request_id)
             .all()
         )
         openByParent = (
             DBSession().query(Request)
             .select_from(join(Request, RequestItem))
             .filter(Request.requested_by_id == request.identity['user'].user_id)
             .filter(RequestItem.request_item_dispense_date == None)
             .order_by(Request.request_id)
             .all()
         )
     except Exception as e:
         # "except X as e" is valid on Python 2.6+ and Python 3; the old
         # "except X, e" spelling is a SyntaxError on Python 3.
         log.exception(e)
开发者ID:aytsai,项目名称:ricebowl,代码行数:10,代码来源:grids.py


示例2: __init__(注意:本例中的 join 实际上是 os.path.join,而不是 sqlalchemy.orm.join)

    def __init__(self, graph_view):
        """Compile the two Jinja2 templates and keep a reference to the view.

        The template files ``idea_in_synthesis.jinja2`` and
        ``synthesis.jinja2`` are read from the ``templates`` directory that
        sits next to this module's parent directory.

        :param graph_view: stored unchanged as ``self.graph_view``.
        """
        from os.path import dirname, join
        from jinja2 import Template

        templates_dir = join(dirname(dirname(__file__)), "templates")

        def load_template(file_name):
            # Read one template file and compile it while the file is open.
            with open(join(templates_dir, file_name)) as handle:
                return Template(handle.read())

        self.idea_template = load_template("idea_in_synthesis.jinja2")
        self.synthesis_template = load_template("synthesis.jinja2")
        self.graph_view = graph_view
开发者ID:rmoorman,项目名称:assembl,代码行数:10,代码来源:idea_graph_view.py


示例3: getDiagnoseByAdmin2

    def getDiagnoseByAdmin2(cls,session,hostpitalList=None,doctorName=None,pagger=Pagger(1,20) ):
        """Return one page of Diagnose rows whose status is NeedTriage.

        :param session: session used to build and run the query.
        :param hostpitalList: optional hospital ids used to restrict
            ``Diagnose.hospitalId``.
        :param doctorName: optional doctor username; when given, Diagnose is
            joined to Doctor on ``Doctor.id == Diagnose.doctorId``.
        :param pagger: supplies the offset and limit of the page.
        :return: list of Diagnose objects.
        """
        def one_page(candidates):
            # Apply the offset/limit window and fetch the rows.
            return candidates.offset(pagger.getOffset()).limit(pagger.getLimitCount()).all()

        needs_triage = Diagnose.status==DiagnoseStatus.NeedTriage

        if doctorName is None or doctorName == u'':
            # No doctor filter: query Diagnose directly.  The hospital filter
            # is applied whenever a list was passed, even an empty one.
            if hostpitalList is None:
                criteria = [needs_triage]
            else:
                criteria = [Diagnose.hospitalId.in_(hostpitalList), needs_triage]
            return one_page(session.query(Diagnose).filter(*criteria))

        # Doctor filter: go through the Doctor/Diagnose join.  Here the
        # hospital filter is applied only for a non-empty list.
        criteria = [Doctor.username==doctorName, needs_triage]
        if hostpitalList:
            criteria.append(Diagnose.hospitalId.in_(hostpitalList))
        by_doctor = session.query(Diagnose).select_from(join(Doctor,Diagnose,Doctor.id==Diagnose.doctorId))
        return one_page(by_doctor.filter(*criteria))
开发者ID:LichuanLu,项目名称:blueberry,代码行数:13,代码来源:diagnoseDocument.py


示例4: getDiagnoseByPatientUser

    def getDiagnoseByPatientUser(cls,session,userId,status=None,pagger=Pagger(1,20) ):
        """Return one page of Diagnose rows belonging to a patient user.

        :param session: session used to build and run the query.
        :param userId: matched against ``Patient.userID``; ``None`` makes the
            method return ``None`` without querying.
        :param status: when ``None`` or empty, every diagnose whose status is
            not ``DiagnoseStatus.Del`` is returned; otherwise only rows with
            exactly this status.
        :param pagger: supplies the offset and limit of the page.
        :return: list of Diagnose objects, or ``None`` when *userId* is None.
        """
        if userId is None :
            return

        # Pick the status criterion; the rest of the query is shared.
        if status is None or len(status) == 0:
            status_clause = Diagnose.status!=DiagnoseStatus.Del
        else:
            status_clause = Diagnose.status==status

        patient_diagnoses = session.query(Diagnose).select_from(
            join(Patient,Diagnose,Patient.id==Diagnose.patientId))
        page = patient_diagnoses.filter(Patient.userID==userId,status_clause) \
            .offset(pagger.getOffset()).limit(pagger.getLimitCount())
        return page.all()
开发者ID:LichuanLu,项目名称:blueberry,代码行数:14,代码来源:diagnoseDocument.py


示例5: test_create_hosts

def test_create_hosts():
    """Create NUM_HOSTS hosts and check that all of them can be queried back.

    Each host is named ``HOST_NAME + <index>`` and is built from the next
    machine produced by ``m_factory``, the 'ny-prod' branch, the
    'one-nyp.ms.com' DNS domain, the 'build' status, the generic vmhost
    personality and the first operating system returned by the query below.
    """
    br = Branch.get_unique(sess, 'ny-prod', compel=True)
    dns_dmn = DnsDomain.get_unique(sess, 'one-nyp.ms.com', compel=True)
    stat = Status.get_unique(sess, 'build', compel=True)
    # NOTE(review): this filters on Archetype without an explicit join to
    # OperatingSystem -- confirm that this is the intended query.
    # Named vmhost_os (not "os") so the standard module name is not shadowed.
    vmhost_os = sess.query(OperatingSystem).filter(Archetype.name == 'vmhost').first()
    assert vmhost_os, 'No OS in %s' % func_name()

    pers = sess.query(Personality).select_from(
        join(Personality, Archetype)).filter(
        and_(Archetype.name=='vmhost', Personality.name=='generic')).one()

    # Autoflush is off while the hosts are added; try/finally guarantees the
    # session setting is restored even if building a host fails.
    sess.autoflush = False
    try:
        for i in xrange(NUM_HOSTS):
            machine = m_factory.next()
            vm_host = Host(machine=machine, name='%s%s' % (HOST_NAME, i),
                           dns_domain=dns_dmn, branch=br, personality=pers,
                           status=stat, operating_system=vmhost_os)
            add(sess, vm_host)
    finally:
        sess.autoflush = True

    commit(sess)

    hosts = sess.query(Host).filter(
        Host.name.like(HOST_NAME+'%')).all()
    # Compare by value: "is" tests object identity, which only happens to
    # work for small integers cached by CPython.
    assert len(hosts) == NUM_HOSTS
    print('created %s hosts' % len(hosts))
开发者ID:jrha,项目名称:aquilon,代码行数:28,代码来源:test_cluster.py


示例6: test_host_in_two_clusters

def test_host_in_two_clusters():
    """
    Attach one host to two ESX clusters.

    Creates three clusters named CLUSTER_NAME + 0..2, looks up the ones
    suffixed 1 and 2, links a host to the first through a HostClusterMember
    row and then appends the same host to the second cluster's ``hosts``
    collection before committing.

    NOTE(review): the visible code makes no assertion about the outcome of
    the final commit -- presumably a host in two clusters is expected to be
    rejected; confirm against the model constraints.
    """
    # Personality shared by all clusters created below: generic / windows.
    per = sess.query(Personality).select_from(
            join(Archetype, Personality)).filter(
            and_(Archetype.name=='windows', Personality.name=='generic')).one()

    # Three clusters are created (suffixes 0, 1, 2); only 1 and 2 are used.
    for i in xrange(3):
        ec = EsxCluster(name='%s%s'% (CLUSTER_NAME, i), personality=per)
        add(sess, ec)
    commit(sess)

    c1 = sess.query(EsxCluster).filter_by(name='%s1' % (CLUSTER_NAME)).one()
    c2 = sess.query(EsxCluster).filter_by(name='%s2' % (CLUSTER_NAME)).one()

    assert c1
    assert c2
    print 'clusters in host in 2 cluster test are %s and %s'% (c1, c2)

    # Take the next host produced by the host factory.
    host = h_factory.next()


    # Autoflush stays off while the membership row is created and the host
    # is appended to the second cluster; it is re-enabled before the commit.
    sess.autoflush = False
    hcm1 = HostClusterMember(host=host, cluster=c1)
    create(sess, hcm1)
    assert host in c1.hosts
    print 'c1 hosts are %s'% (c1.hosts)

    c2.hosts.append(host)
    sess.autoflush = True
    commit(sess)
开发者ID:jrha,项目名称:aquilon,代码行数:32,代码来源:test_cluster.py


示例7: create_link_xml

def create_link_xml(xml_topology, network_name):
    """Append one link element to *xml_topology* per link of a network.

    A link is selected when its source interface belongs to a node whose
    ``network_name`` equals *network_name*.  Links lacking a source or a
    destination interface object are logged as invalid and skipped.

    :param xml_topology: parent XML element that receives the link elements.
    :param network_name: name of the network whose links are exported.
    :return: the same *xml_topology* element, for chaining.
    """
    # Sub-select of the ids of every interface on a node of the network.
    subq = join(InterFace, Node, InterFace.idNode == Node.id) \
            .select(Node.network_name == network_name)
    # Keep only the interface id column so the result can feed in_().
    subq = subq.with_only_columns([InterFace.id])

    # Find the links whose source I/F is in the network.
    link_list = Link.query.filter(Link.src_idIF.in_(subq)).group_by(Link.id).all()
    for link in link_list:
        if link.src_if is None or link.dst_if is None:
            # logger.warning replaces the deprecated logger.warn alias; the
            # message now has the colon that was missing after "dst_if".
            logger.warning('{0} is invalid link.(src_if:{1}-dst_if:{2})'
                           .format(link.id, link.src_idIF, link.dst_idIF))
            continue

        # add <link type='lan'>
        xml_link = SubElement(xml_topology, const.XML_TAG_LINK,
                              {const.XML_ATTR_TYPE: const.TYPE_LINK_LAN})

        # add (source I/F) <interface_ref client_id='xxx'>
        SubElement(xml_link, const.XML_TAG_IF_REF,
                   {const.XML_ATTR_CLIENT_ID: link.src_if.if_name})
        # add (destination I/F) <interface_ref client_id='xxx'>
        SubElement(xml_link, const.XML_TAG_IF_REF,
                   {const.XML_ATTR_CLIENT_ID: link.dst_if.if_name})

    return xml_topology
开发者ID:HalasNet,项目名称:felix,代码行数:27,代码来源:topology_rest.py


示例8: viewDataTable

def viewDataTable(data_table_id):
    """Render the community page that shows a single data table.

    Loads the table record, its tags, its data source and owner, its column
    definitions and the rows of ``values_data_table_<data_table_id>``, then
    passes everything to ``community/view_data_table.html``.

    :param data_table_id: identifier of the data table to display.
    :return: whatever ``render_template`` returns for the page.
    """
    # 1. Table record and the display metadata derived from it.
    table = DataTable.query.filter_by(data_table_id=data_table_id).first()
    source_id = table.data_table_data_source_id
    page_title = table.data_table_name
    page_caption = table.data_table_description

    # 1a. Tags attached to the table, through the Tag/TagMap join.
    tag_query = models.db.session.query(Tag).select_from(join(Tag, TagMap))
    tag_list = tag_query.filter(TagMap.data_table_id==data_table_id).all()

    # 2. Data source and the user owning it.
    source = DataSource.query.filter_by(data_source_id=source_id).first()
    owner = Users.query.filter_by(user_id=source.data_source_owner_user_id).first()

    # 3. Column definitions (left as a query) and how many there are.
    columns = DataColumn.query.filter_by(data_column_data_table_id=data_table_id)
    column_count = columns.count()

    # 4. Raw rows of the per-table values table.
    # NOTE(review): the statement is built by string concatenation; confirm
    # that data_table_id can only ever be a trusted/integer value.
    rows = models.db.session.execute("select * from values_data_table_"+str(data_table_id))

    context = dict(
        title=page_title,
        caption=page_caption,
        notes="Notes",
        values_data_table=rows,
        data_table=table,
        data_source=source,
        data_columns=columns,
        no_of_data_columns=column_count,
        data_owner=owner,
        explore_tab="active",
        data_table_id=data_table_id,
        tags=tag_list,
    )
    return render_template('community/view_data_table.html', **context)
开发者ID:arghyam,项目名称:dataview,代码行数:28,代码来源:community.py


示例9: test_valid_activation

    def test_valid_activation(self):
        """Activating a freshly registered account redirects and enables it.

        Registers the user ``alice``, fetches her RegistrationProfile,
        requests the activation URL built from its key and checks both the
        redirect to the "activation complete" page and the ``is_active``
        flag on the user.
        """
        success_redirect = self.url_reverse("registration_activation_complete")

        # Step 1: register an account through the registration view.
        registration = {
            "username": "alice",
            "email": "[email protected]",
            "password1": "swordfish",
            "password2": "swordfish",
        }
        self.client.post(reverse("registration_register"), data=registration)

        # Step 2: the profile created for the new user must exist.
        profiles = self.session.query(RegistrationProfile).select_from(
            join(RegistrationProfile, User))
        profile = profiles.filter(User.username == u"alice").first()
        self.assertIsNotNone(profile)

        # Step 3: hit the activation URL and expect the success redirect.
        activation_path = reverse(
            "registration_activate",
            kwargs={"activation_key": profile.activation_key},
        )
        self.assertRedirects(self.client.get(activation_path), success_redirect)

        # Step 4: the account itself is now active.
        alice = self.session.query(User).filter_by(username=u"alice").first()
        self.assertTrue(alice.is_active)
开发者ID:saebyn,项目名称:baph,代码行数:28,代码来源:test_views.py


示例10: search_index

    def search_index(self):
        """Query resources matching the request parameters.

        Stores the total number of matches in ``c.count`` and the (possibly
        paged) result list in ``c.resources``.

        :return: the list also stored in ``c.resources``.
        """
        parent_id = self.params.get("parent_id", None)

        # Reset the template context, then read the paging window.
        c.count = 0
        c.resources = None
        limit, offset = self._paging()

        query = meta.Session.query(self._poly_class_)

        # Restrict to children of the given parent, via the Connection table.
        if parent_id:
            child_link = join(Connection, Resource, Connection.child_id == Resource.id)
            query = query.select_from(child_link).filter(Connection.parent_id == parent_id)

        # Any controller other than the generic "resources" one filters on
        # its own resource type.
        if self._classname() != "resources":
            query = query.filter(Resource.resource_type == self._classname())

        # Every request parameter that names an attribute of the class
        # becomes an equality filter.
        for name, value in self.params.iteritems():
            if not hasattr(self._poly_class_, name):
                continue
            query = query.filter(getattr(self._poly_class_, name) == value)

        # Count before paging so c.count reflects all matches.
        c.count = query.count()
        if limit:
            query = query.limit(limit)
        if limit and offset:
            # The offset is only honoured when a limit is present.
            query = query.offset(offset)

        c.resources = query.all()
        return c.resources
开发者ID:HyFrmn,项目名称:vault,代码行数:25,代码来源:resources.py


示例11: isInRole2

def isInRole2(username, rolename):
    """Tell whether the account *username* is a member of role *rolename*.

    :param username: account username to look up.
    :param rolename: role name; ``None`` or ``''`` is rejected.
    :return: ``True`` when a Role row joined through RoleMember to Account
        matches both names; ``False`` otherwise, including when the role
        name is missing or unknown (an error line is printed in those cases).
    """
    if rolename is None or rolename == '':
        print("ERROR: no rolename specified")
        return False

    # Validate the role name before running the membership query.
    known_role = sqlalchemysetup.session.query(tableclasses.Role).filter(
        tableclasses.Role.role_name == rolename).first()
    if known_role is None:
        print("ERROR: invalid rolename specified")
        return False

    # Role -> RoleMember -> Account join, filtered on both names.
    membership = sqlalchemysetup.session.query(tableclasses.Role).select_from(
        join(join(tableclasses.Role, tableclasses.RoleMember), tableclasses.Account)
    ).filter(
        tableclasses.Role.role_name == rolename
    ).filter(
        tableclasses.Account.username == username
    ).first()
    # Identity test: "!= None" would go through the object's own __ne__.
    return membership is not None
开发者ID:hughperkins,项目名称:ailadder,代码行数:16,代码来源:roles.py


示例12: _get_map_from_user_by_id

 def _get_map_from_user_by_id(self, user, map_id):
     """Get a mapfile owned by a user from the database by map_id.

     :param user: login of the owning user (compared with ``User.login``).
     :param map_id: id of the map (compared with ``Map.id``).
     :return: the single matching Map, or ``None`` when the lookup raises
         for any reason.
     """
     req = Session.query(Map).select_from(join(Map, User))
     try:
         return req.filter(and_(User.login==user, Map.id==map_id)).one()
     except Exception:
         # Deliberately best-effort: any failure is reported as "no map".
         # The exception object was never used, so it is no longer bound;
         # this spelling is also valid on both Python 2 and Python 3.
         return None
开发者ID:camptocamp,项目名称:Studio,代码行数:8,代码来源:mapfiles.py


示例13: untrack

 def untrack(self, query, session):
     """Delete this user's UserTrack row for the given track query.

     :param query: value compared with ``Track.query``.
     :param session: session used for the lookup and the delete.
     :return: ``True`` when a row was found and marked for deletion,
         ``False`` when no matching row exists.
     """
     try:
         # Only the lookup can signal "no result", so only it is guarded.
         t = session.query(UserTrack).select_from(
             join(UserTrack, Track)).filter(
             UserTrack.user_id == self.id).filter(Track.query == query).one()
     except exc.NoResultFound:
         return False
     session.delete(t)
     return True
开发者ID:gaustin,项目名称:twitterspy,代码行数:9,代码来源:models.py


示例14: groupfinder

def groupfinder(authorname, request):
  """Return the names of the access groups the author belongs to.

  :param authorname: matched against ``Author.author_name``.
  :param request: current request; not used by this function.
  :return: list of ``group_name`` values; empty when no author has that name.
  """
  dbsession = DBSession()

  author = dbsession.query(Author).filter_by(author_name=authorname).first()
  if author is None:
    # .first() found no such author; without this guard the author.id
    # access below would raise AttributeError.
    return []

  groups = dbsession.query(AccessGroup).select_from(join(AuthorAccessGroupMap, AccessGroup, AuthorAccessGroupMap.group_id==AccessGroup.id)).filter(AuthorAccessGroupMap.author_id==author.id).all()

  return [group.group_name for group in groups]
  
开发者ID:Thisisdotme,项目名称:thisis.me,代码行数:9,代码来源:security.py


示例15: get_all_se_if

def get_all_se_if():
    """Return every InterFace whose node has type ``const.TYPE_NODE_SE``."""
    # InterFace rows joined to their Node.
    iface_on_node = join(InterFace, Node, InterFace.idNode == Node.id)

    # Sub-select narrowed to the interface id column only.
    se_if_ids = iface_on_node.select(Node.type == const.TYPE_NODE_SE)
    se_if_ids = se_if_ids.with_only_columns([InterFace.id])

    matching = InterFace.query.filter(InterFace.id.in_(se_if_ids))
    return matching.all()
开发者ID:HalasNet,项目名称:felix,代码行数:10,代码来源:topologydb.py


示例16: get_all_target_if

def get_all_target_if(node_name,port):
    """Return the InterFace rows on node *node_name* whose port is *port*."""
    # InterFace rows joined to their Node.
    iface_on_node = join(InterFace, Node, InterFace.idNode == Node.id)

    # Sub-select narrowed to the interface id column only.
    node_if_ids = iface_on_node.select(Node.node_name == node_name)
    node_if_ids = node_if_ids.with_only_columns([InterFace.id])

    # Port filter first, then membership in the sub-select.
    on_port = InterFace.query.filter(InterFace.port == port)
    return on_port.filter(InterFace.id.in_(node_if_ids)).all()
开发者ID:HalasNet,项目名称:felix,代码行数:10,代码来源:topologydb.py


示例17: bargraph

def bargraph():
    """Render the bar-graph visualization preview for a data table.

    Reads the table key and the two selected column names from the posted
    form, loads the table with its tags, source, owner and columns, turns
    the table rows into a JSON array of ``[name, value]`` pairs (preceded by
    a header pair holding the column names) and renders
    ``visualization_plugins/bargraph.html``.

    :return: whatever ``render_template`` returns for the page.
    """
    print(request.form)
    visual_plugin = Plugin.query.filter_by(plugin_key="bargraph").first()

    # ---- Form input -------------------------------------------------------
    form = request.form
    data_type = form.get('data_type', '')  # read but not used below
    data_key = form.get('data_key', '')
    name_column = form.get('name_column', '').strip()
    value_column = form.get('value_column', '').strip()

    # Only tables are supported so far: the key is the table id (0 if empty).
    data_table_id = int(data_key) if data_key != "" else 0

    # ---- Table record and metadata ----------------------------------------
    data_table = DataTable.query.filter_by(data_table_id=data_table_id).first()
    source_id = data_table.data_table_data_source_id
    # These three are computed but not passed to the template.
    title = data_table.data_table_name
    caption = data_table.data_table_description
    notes = "Notes"

    # Tags attached to the table, through the Tag/TagMap join.
    tag_query = models.db.session.query(Tag).select_from(join(Tag, TagMap))
    tags = tag_query.filter(TagMap.data_table_id==data_table_id).all()

    # Data source and the user owning it.
    data_source = DataSource.query.filter_by(data_source_id=source_id).first()
    data_owner = Users.query.filter_by(user_id=data_source.data_source_owner_user_id).first()

    # Column definitions (left as a query) and how many there are.
    data_columns = DataColumn.query.filter_by(data_column_data_table_id=data_table_id)
    no_of_data_columns = data_columns.count()

    # ---- Chart data -------------------------------------------------------
    values_data_table = models.db.session.execute(
        "select * from values_data_table_"+str(data_table_id))

    # Header pair first, then one [label, value] pair per table row.
    chart_rows = [[name_column, value_column]]
    for row in values_data_table:
        label = unicode(row[name_column])
        print(label)
        chart_rows.append([label, int(row[value_column])])

    chart_json = json.dumps(chart_rows,ensure_ascii=False)
    print(chart_json)

    # Pass everything to the template for display.
    return render_template(
        "visualization_plugins/bargraph.html",
        title=request.form['title'],
        caption="",
        notes="Please click on save to save the visualization",
        explore_tab="active",
        visual_plugin=visual_plugin,
        values_data_table=values_data_table,
        data_table=data_table,
        data_source=data_source,
        data_columns=data_columns,
        no_of_data_columns=no_of_data_columns,
        data_owner=data_owner,
        data_table_id=data_table_id,
        tags=tags,
        user_action="try",
        selected_columns=request.form,
        data_array=chart_json,
    )
开发者ID:arghyam,项目名称:dataview,代码行数:54,代码来源:visualization.py


示例18: getNeedDealDiagnoseByHospitalUser

    def getNeedDealDiagnoseByHospitalUser(cls,session,uploadUserId,patientName=None,pagger=Pagger(1,20) ):
        """Return one page of diagnoses an uploader still has to deal with.

        Only rows whose status is NeedTriage or NeedUpdate and whose
        ``uploadUserId`` equals *uploadUserId* are returned.

        :param session: session used to build and run the query.
        :param uploadUserId: uploader id; ``None`` makes the method return
            ``None`` without querying.
        :param patientName: optional patient real name; when given, Diagnose
            is joined to Patient on ``Patient.id == Diagnose.patientId``.
        :param pagger: supplies the offset and limit of the page.
        :return: list of Diagnose objects, or ``None``.
        """
        if uploadUserId is None :
            return

        pending = Diagnose.status.in_((DiagnoseStatus.NeedTriage,DiagnoseStatus.NeedUpdate))
        uploaded_by = Diagnose.uploadUserId==uploadUserId

        if patientName is None or patientName == u'':
            # No patient filter: query Diagnose on its own.
            matches = session.query(Diagnose).filter(uploaded_by,pending)
        else:
            # Patient filter: go through the Patient/Diagnose join.
            patient_diagnoses = session.query(Diagnose).select_from(
                join(Patient,Diagnose,Patient.id==Diagnose.patientId))
            matches = patient_diagnoses.filter(Patient.realname==patientName,pending,uploaded_by)

        return matches.offset(pagger.getOffset()).limit(pagger.getLimitCount()).all()
开发者ID:LichuanLu,项目名称:blueberry,代码行数:11,代码来源:diagnoseDocument.py


示例19: groupfinder

def groupfinder(authorname, request):
  """List the access-group names of the author called *authorname*.

  :param authorname: matched against ``Author.author_name``.
  :param request: current request; not used by this function.
  :return: list of ``group_name`` values; empty when no author has that name.
  """
  dbsession = DBSession()
  author = dbsession.query(Author).filter_by(author_name=authorname).first()

  # TODO: add handling for invalid author name
  if author is None:
    return []

  # AccessGroup rows reached through the author/group mapping table.
  membership = join(AuthorAccessGroupMap, AccessGroup, AuthorAccessGroupMap.group_id == AccessGroup.id)
  author_groups = dbsession.query(AccessGroup).select_from(membership)
  author_groups = author_groups.filter(AuthorAccessGroupMap.author_id == author.id).all()
  return [access_group.group_name for access_group in author_groups]
开发者ID:Thisisdotme,项目名称:thisis.me,代码行数:12,代码来源:security.py


示例20: test_no_host_threshold

def test_no_host_threshold():
    """Create an EsxCluster without passing a down_hosts_threshold.

    The original docstring states the intent as "ensure down_hosts_threshold
    must exist"; the code only adds and commits the cluster, with no
    explicit assertion.
    """
    branch = Branch.get_unique(sess, 'ny-prod', compel=True)
    building = Building.get_unique(sess, name='np', compel=True)

    # Personality named 'generic' under the 'windows' archetype.
    generic_windows = and_(Archetype.name=='windows', Personality.name=='generic')
    personality = sess.query(Personality).select_from(
        join(Archetype, Personality)).filter(generic_windows).one()

    cluster = EsxCluster(
        name=CLUSTER_NAME,
        location_constraint=building,
        personality=personality,
        branch=branch,
    )
    add(sess, cluster)
    commit(sess)
开发者ID:jrha,项目名称:aquilon,代码行数:13,代码来源:test_cluster.py



注:本文中的sqlalchemy.orm.join函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python orm.joinedload函数代码示例发布时间:2022-05-27
下一篇:
Python orm.foreign函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap