本文整理汇总了Python中sqlalchemy.orm.join函数的典型用法代码示例。如果您正苦于以下问题：Python join函数的具体用法？Python join怎么用？Python join使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了join函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: dataXml
def dataXml(self, **kw):
    """Run the three request look-ups of this view, logging any failure.

    Every look-up selects ``Request`` rows through a ``Request`` /
    ``RequestItem`` join, ordered by ``Request.request_id``:

    * ``allOpen``      -- the item has no dispense date;
    * ``allClosed``    -- the item was dispensed within the last day;
    * ``openByParent`` -- like ``allOpen``, restricted to requests made by
      the user found in ``request.identity['user']``.

    An exception raised by any look-up is passed to ``log.exception`` and
    is not re-raised; results not yet assigned stay ``None``.

    NOTE(review): the visible excerpt stops after the ``except`` clause, so
    the three results are never used here -- presumably the full method goes
    on to render them; confirm against the original file.
    """
    def joined_requests():
        # Common starting point of all three look-ups.
        return DBSession().query(Request).select_from(join(Request, RequestItem))

    allOpen = allClosed = openByParent = None
    try:
        # "== None" is deliberate: it is a column comparison, not a Python test.
        allOpen = (
            joined_requests()
            .filter(RequestItem.request_item_dispense_date == None)
            .order_by(Request.request_id)
            .all()
        )
        allClosed = (
            joined_requests()
            .filter(RequestItem.request_item_dispense_date > (datetime.now() - timedelta(days=1)))
            .order_by(Request.request_id)
            .all()
        )
        openByParent = (
            joined_requests()
            .filter(Request.requested_by_id == request.identity['user'].user_id)
            .filter(RequestItem.request_item_dispense_date == None)
            .order_by(Request.request_id)
            .all()
        )
    except Exception as e:
        log.exception(e)
开发者ID:aytsai,项目名称:ricebowl,代码行数:10,代码来源:grids.py
示例2: __init__
def __init__(self, graph_view):
    """Load the two Jinja2 templates and remember *graph_view*.

    The templates are read from the ``templates`` directory located one
    level above the directory that contains this module's file.

    Args:
        graph_view: stored unchanged as ``self.graph_view``.
    """
    from os.path import dirname, join
    from jinja2 import Template

    templates_dir = join(dirname(dirname(__file__)), "templates")

    def load_template(filename):
        # Read the whole template file and compile it.
        with open(join(templates_dir, filename)) as handle:
            return Template(handle.read())

    self.idea_template = load_template("idea_in_synthesis.jinja2")
    self.synthesis_template = load_template("synthesis.jinja2")
    self.graph_view = graph_view
开发者ID:rmoorman,项目名称:assembl,代码行数:10,代码来源:idea_graph_view.py
示例3: getDiagnoseByAdmin2
def getDiagnoseByAdmin2(cls, session, hostpitalList=None, doctorName=None, pagger=Pagger(1, 20)):
    """Return one page of ``Diagnose`` rows whose status is ``NeedTriage``.

    Args:
        session: object whose ``query`` method starts the look-up.
        hostpitalList: optional hospital ids; when given, only diagnoses
            whose ``hospitalId`` is in the list are returned.
        doctorName: optional doctor user name; when non-empty the look-up
            goes through a ``Doctor``/``Diagnose`` join and keeps only the
            diagnoses of that doctor.
        pagger: supplies the offset (``getOffset()``) and the page size
            (``getLimitCount()``).  The default instance is created once,
            when the function is defined.

    Returns:
        The list produced by ``.all()`` on the paged query.
    """
    needs_triage = Diagnose.status == DiagnoseStatus.NeedTriage
    doctor_missing = doctorName is None or doctorName == u''

    if doctor_missing:
        base = session.query(Diagnose)
        if hostpitalList is None:
            criteria = [needs_triage]
        else:
            criteria = [Diagnose.hospitalId.in_(hostpitalList), needs_triage]
    else:
        base = session.query(Diagnose).select_from(
            join(Doctor, Diagnose, Doctor.id == Diagnose.doctorId))
        criteria = [Doctor.username == doctorName, needs_triage]
        if hostpitalList:
            # An empty list is treated like "no hospital restriction" here.
            criteria.append(Diagnose.hospitalId.in_(hostpitalList))

    page = base.filter(*criteria).offset(pagger.getOffset()).limit(pagger.getLimitCount())
    return page.all()
开发者ID:LichuanLu,项目名称:blueberry,代码行数:13,代码来源:diagnoseDocument.py
示例4: getDiagnoseByPatientUser
def getDiagnoseByPatientUser(cls, session, userId, status=None, pagger=Pagger(1, 20)):
    """Return one page of the diagnoses of the patient(s) owned by a user.

    Args:
        session: object whose ``query`` method starts the look-up.
        userId: compared with ``Patient.userID``; ``None`` makes the
            function return ``None`` without querying.
        status: when ``None`` or of length 0, every diagnosis except those
            with status ``DiagnoseStatus.Del`` is returned; otherwise only
            diagnoses whose status equals *status*.
        pagger: supplies the offset (``getOffset()``) and the page size
            (``getLimitCount()``).

    Returns:
        The list produced by ``.all()``, or ``None`` when *userId* is ``None``.
    """
    if userId is None:
        return

    status_given = status is not None and len(status) != 0
    patient_diagnoses = session.query(Diagnose).select_from(
        join(Patient, Diagnose, Patient.id == Diagnose.patientId))
    if status_given:
        status_criterion = Diagnose.status == status
    else:
        status_criterion = Diagnose.status != DiagnoseStatus.Del

    page = patient_diagnoses.filter(Patient.userID == userId, status_criterion) \
        .offset(pagger.getOffset()).limit(pagger.getLimitCount())
    return page.all()
开发者ID:LichuanLu,项目名称:blueberry,代码行数:14,代码来源:diagnoseDocument.py
示例5: test_create_hosts
def test_create_hosts():
    """Create ``NUM_HOSTS`` hosts and check that they can be queried back.

    Each host is named ``HOST_NAME`` followed by its index and is built from
    the next machine of ``m_factory``.  Autoflush is switched off while the
    hosts are added and is switched back on even if adding fails.
    """
    br = Branch.get_unique(sess, 'ny-prod', compel=True)
    dns_dmn = DnsDomain.get_unique(sess, 'one-nyp.ms.com', compel=True)
    stat = Status.get_unique(sess, 'build', compel=True)

    # NOTE(review): this filters on Archetype without joining it to
    # OperatingSystem -- confirm that it selects the intended OS row.
    opsys = sess.query(OperatingSystem).filter(Archetype.name == 'vmhost').first()
    assert opsys, 'No OS in %s' % func_name()

    pers = sess.query(Personality).select_from(
        join(Personality, Archetype)).filter(
        and_(Archetype.name == 'vmhost', Personality.name == 'generic')).one()

    sess.autoflush = False
    try:
        for i in xrange(NUM_HOSTS):
            machine = m_factory.next()
            vm_host = Host(machine=machine, name='%s%s' % (HOST_NAME, i),
                           dns_domain=dns_dmn, branch=br, personality=pers,
                           status=stat, operating_system=opsys)
            add(sess, vm_host)
    finally:
        # Do not leave the shared session with autoflush disabled.
        sess.autoflush = True
    commit(sess)

    hosts = sess.query(Host).filter(
        Host.name.like(HOST_NAME + '%')).all()
    # Compare by value: "is" on integers only works by accident.
    assert len(hosts) == NUM_HOSTS
    print('created %s hosts' % len(hosts))
开发者ID:jrha,项目名称:aquilon,代码行数:28,代码来源:test_cluster.py
示例6: test_host_in_two_clusters
def test_host_in_two_clusters():
    """
    Create new clusters and add one host to two of them.

    Three clusters named ``CLUSTER_NAME`` + 0..2 are created; the clusters
    with suffix 1 and 2 are then loaded and the next host of ``h_factory``
    is attached to the first through a ``HostClusterMember`` and to the
    second through ``c2.hosts.append``.  Autoflush is disabled while the
    host is attached and is restored even if a step fails.
    """
    per = sess.query(Personality).select_from(
        join(Archetype, Personality)).filter(
        and_(Archetype.name == 'windows', Personality.name == 'generic')).one()

    for i in xrange(3):
        ec = EsxCluster(name='%s%s' % (CLUSTER_NAME, i), personality=per)
        add(sess, ec)
    commit(sess)

    c1 = sess.query(EsxCluster).filter_by(name='%s1' % (CLUSTER_NAME)).one()
    c2 = sess.query(EsxCluster).filter_by(name='%s2' % (CLUSTER_NAME)).one()

    assert c1
    assert c2
    print('clusters in host in 2 cluster test are %s and %s' % (c1, c2))

    host = h_factory.next()

    sess.autoflush = False
    try:
        hcm1 = HostClusterMember(host=host, cluster=c1)
        create(sess, hcm1)
        assert host in c1.hosts
        print('c1 hosts are %s' % (c1.hosts))

        c2.hosts.append(host)
    finally:
        # Do not leave the shared session with autoflush disabled.
        sess.autoflush = True
    commit(sess)
开发者ID:jrha,项目名称:aquilon,代码行数:32,代码来源:test_cluster.py
示例7: create_link_xml
def create_link_xml(xml_topology, network_name):
    """Append one LAN link element to *xml_topology* per link of a network.

    A link is selected when its source interface belongs to a node whose
    ``network_name`` equals *network_name*.  Links lacking a source or a
    destination interface are reported through ``logger.warn`` and skipped.

    Args:
        xml_topology: parent element that receives the link elements.
        network_name: name of the network whose links are exported.

    Returns:
        *xml_topology*, after the link elements were added.
    """
    # Sub-select: ids of the interfaces attached to nodes of the network.
    node_interfaces = join(InterFace, Node, InterFace.idNode == Node.id)
    interface_ids = node_interfaces.select(Node.network_name == network_name) \
        .with_only_columns([InterFace.id])

    # Links whose source interface is one of those interfaces.
    source_links = Link.query.filter(Link.src_idIF.in_(interface_ids)).group_by(Link.id).all()

    for link in source_links:
        has_both_ends = link.src_if is not None and link.dst_if is not None
        if not has_both_ends:
            logger.warn('{0} is invalid link.(src_if:{1}-dst_if{2})'
                        .format(link.id, link.src_idIF, link.dst_idIF))
            continue

        # The link element itself, typed as a LAN link.
        xml_link = SubElement(xml_topology, const.XML_TAG_LINK,
                              {const.XML_ATTR_TYPE: const.TYPE_LINK_LAN})

        # One interface reference per end: source first, then destination.
        for endpoint in (link.src_if, link.dst_if):
            SubElement(xml_link, const.XML_TAG_IF_REF,
                       {const.XML_ATTR_CLIENT_ID: endpoint.if_name})

    return xml_topology
开发者ID:HalasNet,项目名称:felix,代码行数:27,代码来源:topology_rest.py
示例8: viewDataTable
def viewDataTable(data_table_id):
    """Render the community page of one data table.

    Loads the table, its tags, its data source and that source's owner, its
    columns, and every row of the per-table values table, then renders
    ``community/view_data_table.html`` with them.

    Args:
        data_table_id: identifier of the data table.  It becomes part of a
            table name, so it must be convertible to ``int``.

    Raises:
        ValueError / TypeError: from ``int()`` when *data_table_id* is not
            an integer value.
    """
    # 1. get data_table
    data_table = DataTable.query.filter_by(data_table_id=data_table_id).first()
    # NOTE(review): if no row matches, data_table may be None and the
    # attribute reads below fail -- confirm how that should be reported.
    data_table_data_source_id = data_table.data_table_data_source_id
    title = data_table.data_table_name
    caption = data_table.data_table_description
    notes = "Notes"

    # 1a. get tags
    tags = models.db.session.query(Tag).select_from(join(Tag, TagMap)) \
        .filter(TagMap.data_table_id == data_table_id).all()

    # 2. get data_source and its owner
    data_source = DataSource.query.filter_by(data_source_id=data_table_data_source_id).first()
    data_source_owner_user_id = data_source.data_source_owner_user_id
    data_owner = Users.query.filter_by(user_id=data_source_owner_user_id).first()

    # 3. get data_columns
    data_columns = DataColumn.query.filter_by(data_column_data_table_id=data_table_id)
    no_of_data_columns = data_columns.count()

    # 4. get the rows of the per-table values table.  A table name cannot be
    # a bound parameter, so the id is forced through int() to keep arbitrary
    # text out of the statement.
    sql = "select * from values_data_table_" + str(int(data_table_id))
    values_data_table = models.db.session.execute(sql)

    return render_template(
        'community/view_data_table.html',
        title=title,
        caption=caption,
        notes=notes,
        values_data_table=values_data_table,
        data_table=data_table,
        data_source=data_source,
        data_columns=data_columns,
        no_of_data_columns=no_of_data_columns,
        data_owner=data_owner,
        explore_tab="active",
        data_table_id=data_table_id,
        tags=tags,
    )
开发者ID:arghyam,项目名称:dataview,代码行数:28,代码来源:community.py
示例9: test_valid_activation
def test_valid_activation(self):
    """Test that the ``activate`` view properly handles a valid activation
    (in this case, based on the default backend's activation window).

    Registers the account ``alice``, loads its registration profile, requests
    the activation URL built from the profile's key, and checks both the
    redirect and that the user ends up active.
    """
    success_redirect = self.url_reverse("registration_activation_complete")

    # First, register an account.
    self.client.post(
        reverse("registration_register"),
        data={
            "username": "alice",
            # The snippet carried the scraper placeholder "[email protected]",
            # which is not a valid address; a valid one is restored here.
            "email": "alice@example.com",
            "password1": "swordfish",
            "password2": "swordfish",
        },
    )

    profile = (
        self.session.query(RegistrationProfile)
        .select_from(join(RegistrationProfile, User))
        .filter(User.username == u"alice")
        .first()
    )
    self.assertIsNotNone(profile)

    path = reverse("registration_activate", kwargs={"activation_key": profile.activation_key})
    response = self.client.get(path)
    self.assertRedirects(response, success_redirect)

    alice = self.session.query(User).filter_by(username=u"alice").first()
    self.assertTrue(alice.is_active)
开发者ID:saebyn,项目名称:baph,代码行数:28,代码来源:test_views.py
示例10: search_index
def search_index(self):
    """Query rows of ``self._poly_class_`` and return the (paged) result.

    Reads an optional ``parent_id`` from ``self.params``, applies the filters
    below, stores the number of matches in ``c.count`` and the selected rows
    in ``c.resources``, and returns ``c.resources``.

    NOTE(review): the indentation of this snippet was lost in extraction; the
    nesting shown here (each ``if`` at the same level) is a reconstruction --
    confirm against the original file.
    """
    parent_id = self.params.get("parent_id", None)
    # Setup Paging
    c.count = 0
    c.resources = None
    limit, offset = self._paging()
    # Build Query
    q = meta.Session.query(self._poly_class_)
    if parent_id:
        # Restrict to resources linked as children of the given parent.
        q = q.select_from(join(Connection, Resource, Connection.child_id == Resource.id)).filter(
            Connection.parent_id == parent_id
        )
    if self._classname() != "resources":
        # Any class name other than "resources" also filters on resource_type.
        q = q.filter(Resource.resource_type == self._classname())
    # Every request parameter that names an attribute of the queried class
    # becomes an equality filter on that attribute.
    for k, v in self.params.iteritems():
        if hasattr(self._poly_class_, k):
            q = q.filter(getattr(self._poly_class_, k) == v)
    # Count before paging so c.count is the total number of matches.
    c.count = q.count()
    if limit:
        q = q.limit(limit)
    if offset:
        q = q.offset(offset)
    c.resources = q.all()
    return c.resources
开发者ID:HyFrmn,项目名称:vault,代码行数:25,代码来源:resources.py
示例11: isInRole2
def isInRole2(username, rolename):
    """Return whether the account *username* is a member of role *rolename*.

    Args:
        username: compared with ``Account.username``.
        rolename: compared with ``Role.role_name``.

    Returns:
        ``False`` (after printing an error line) when *rolename* is ``None``
        or empty, or when no role has that name; otherwise ``True`` exactly
        when a row joining the role, a role membership and the account exists.
    """
    if rolename is None or rolename == '':
        print("ERROR: no rolename specified")
        return False

    # validate rolename:
    role = sqlalchemysetup.session.query(tableclasses.Role) \
        .filter(tableclasses.Role.role_name == rolename).first()
    if role is None:
        print("ERROR: invalid rolename specified")
        return False

    # Role -> RoleMember -> Account: a row exists only for actual members.
    role_members = join(join(tableclasses.Role, tableclasses.RoleMember), tableclasses.Account)
    membership = sqlalchemysetup.session.query(tableclasses.Role) \
        .select_from(role_members) \
        .filter(tableclasses.Role.role_name == rolename) \
        .filter(tableclasses.Account.username == username) \
        .first()
    return membership is not None
开发者ID:hughperkins,项目名称:ailadder,代码行数:16,代码来源:roles.py
示例12: _get_map_from_user_by_id
def _get_map_from_user_by_id(self, user, map_id):
    """Fetch the map with id *map_id* that belongs to the user *user*.

    Args:
        user: compared with ``User.login``.
        map_id: compared with ``Map.id``.

    Returns:
        The single matching ``Map``, or ``None`` when the look-up raises any
        exception.
    """
    user_maps = Session.query(Map).select_from(join(Map, User))
    try:
        found = user_maps.filter(and_(User.login == user, Map.id == map_id)).one()
    except Exception:
        found = None
    return found
开发者ID:camptocamp,项目名称:Studio,代码行数:8,代码来源:mapfiles.py
示例13: untrack
def untrack(self, query, session):
    """Delete this user's tracking entry for *query*.

    Args:
        query: compared with ``Track.query``.
        session: used both to look the entry up and to delete it.

    Returns:
        ``True`` once the entry was handed to ``session.delete``; ``False``
        when ``exc.NoResultFound`` is raised.
    """
    try:
        user_tracks = session.query(UserTrack).select_from(join(UserTrack, Track))
        tracked = user_tracks.filter(UserTrack.user_id == self.id) \
            .filter(Track.query == query).one()
        session.delete(tracked)
    except exc.NoResultFound:
        return False
    return True
开发者ID:gaustin,项目名称:twitterspy,代码行数:9,代码来源:models.py
示例14: groupfinder
def groupfinder(authorname, request):
    """Return the names of the access groups the author belongs to.

    Args:
        authorname: compared with ``Author.author_name``.
        request: not used by this function.

    Returns:
        A list of ``group_name`` values; an empty list when no author has
        that name.
    """
    dbsession = DBSession()
    author = dbsession.query(Author).filter_by(author_name=authorname).first()
    if author is None:
        # Unknown author: there is no id to look groups up with.
        # NOTE(review): if this serves as an authentication callback, an
        # unknown user may need to be signalled with None rather than an
        # empty list -- confirm which is wanted.
        return []

    groups = dbsession.query(AccessGroup).select_from(
        join(AuthorAccessGroupMap, AccessGroup,
             AuthorAccessGroupMap.group_id == AccessGroup.id)
    ).filter(AuthorAccessGroupMap.author_id == author.id).all()
    return [group.group_name for group in groups]
开发者ID:Thisisdotme,项目名称:thisis.me,代码行数:9,代码来源:security.py
示例15: get_all_se_if
def get_all_se_if():
    """Return every ``InterFace`` whose node has type ``const.TYPE_NODE_SE``."""
    # Sub-select: ids of the interfaces attached to nodes of that type.
    node_interfaces = join(InterFace, Node, InterFace.idNode == Node.id)
    se_interface_ids = node_interfaces.select(Node.type == const.TYPE_NODE_SE) \
        .with_only_columns([InterFace.id])

    # Load the interfaces whose id appears in the sub-select.
    return InterFace.query.filter(InterFace.id.in_(se_interface_ids)).all()
开发者ID:HalasNet,项目名称:felix,代码行数:10,代码来源:topologydb.py
示例16: get_all_target_if
def get_all_target_if(node_name, port):
    """Return the interfaces on *port* that belong to the node *node_name*.

    Args:
        node_name: compared with ``Node.node_name``.
        port: compared with ``InterFace.port``.
    """
    # Sub-select: ids of the interfaces attached to the named node.
    node_interfaces = join(InterFace, Node, InterFace.idNode == Node.id)
    node_interface_ids = node_interfaces.select(Node.node_name == node_name) \
        .with_only_columns([InterFace.id])

    # Keep the interfaces matching both the port and the node.
    on_port = InterFace.query.filter(InterFace.port == port)
    return on_port.filter(InterFace.id.in_(node_interface_ids)).all()
开发者ID:HalasNet,项目名称:felix,代码行数:10,代码来源:topologydb.py
示例17: bargraph
def bargraph():
    """Render the bar-graph preview for a data table chosen in the form.

    Reads ``data_key``, ``name_column``, ``value_column`` and ``title`` from
    ``request.form``, loads the data table identified by ``data_key`` with
    its tags, data source, owner and columns, and builds a two-column array
    (a header row, then one ``[name, int(value)]`` row per record of the
    per-table values table) that is passed to the template as JSON.

    The submitted form, each row name and the final JSON are printed.
    """
    print(request.form)
    visual_plugin = Plugin.query.filter_by(plugin_key="bargraph").first()

    # ---------------- data related ----------------
    data_type = request.form.get('data_type', '')
    data_key = request.form.get('data_key', '')
    name_column = request.form.get('name_column', '').strip()
    value_column = request.form.get('value_column', '').strip()
    # An empty key falls back to table id 0.
    data_key = int(data_key) if data_key != "" else 0

    # only for table as of now
    data_table_id = data_key
    data_table = DataTable.query.filter_by(data_table_id=data_table_id).first()
    data_table_data_source_id = data_table.data_table_data_source_id
    title = data_table.data_table_name
    caption = data_table.data_table_description
    notes = "Notes"

    # 1a. tags attached to the table
    tags = models.db.session.query(Tag).select_from(join(Tag, TagMap)) \
        .filter(TagMap.data_table_id == data_table_id).all()

    # 2. data source and its owner
    data_source = DataSource.query.filter_by(data_source_id=data_table_data_source_id).first()
    data_source_owner_user_id = data_source.data_source_owner_user_id
    data_owner = Users.query.filter_by(user_id=data_source_owner_user_id).first()

    # 3. columns of the table
    data_columns = DataColumn.query.filter_by(data_column_data_table_id=data_table_id)
    no_of_data_columns = data_columns.count()

    # 4. rows of the per-table values table
    sql = "select * from values_data_table_" + str(data_table_id)
    values_data_table = models.db.session.execute(sql)

    # Header row first, then one [name, value] pair per record.
    data_array = [[name_column, value_column]]
    for row in values_data_table:
        label = unicode(row[name_column])
        print(label)
        data_array.append([label, int(row[value_column])])
    data_json = json.dumps(data_array, ensure_ascii=False)
    print(data_json)

    # pass the returned value to template for display
    return render_template(
        "visualization_plugins/bargraph.html",
        title=request.form['title'],
        caption="",
        notes="Please click on save to save the visualization",
        explore_tab="active",
        visual_plugin=visual_plugin,
        values_data_table=values_data_table,
        data_table=data_table,
        data_source=data_source,
        data_columns=data_columns,
        no_of_data_columns=no_of_data_columns,
        data_owner=data_owner,
        data_table_id=data_table_id,
        tags=tags,
        user_action="try",
        selected_columns=request.form,
        data_array=data_json,
    )
开发者ID:arghyam,项目名称:dataview,代码行数:54,代码来源:visualization.py
示例18: getNeedDealDiagnoseByHospitalUser
def getNeedDealDiagnoseByHospitalUser(cls, session, uploadUserId, patientName=None, pagger=Pagger(1, 20)):
    """Return one page of a user's uploaded diagnoses that still need work.

    Only diagnoses whose status is ``NeedTriage`` or ``NeedUpdate`` and whose
    ``uploadUserId`` equals *uploadUserId* are considered.

    Args:
        session: object whose ``query`` method starts the look-up.
        uploadUserId: uploader to filter on; ``None`` makes the function
            return ``None`` without querying.
        patientName: when non-empty, the look-up goes through a
            ``Patient``/``Diagnose`` join and keeps only patients whose
            ``realname`` equals it.
        pagger: supplies the offset (``getOffset()``) and the page size
            (``getLimitCount()``).

    Returns:
        The list produced by ``.all()``, or ``None`` when *uploadUserId* is
        ``None``.
    """
    if uploadUserId is None:
        return

    filter_by_patient = not (patientName is None or patientName == u'')
    if filter_by_patient:
        base = session.query(Diagnose).select_from(
            join(Patient, Diagnose, Patient.id == Diagnose.patientId))
        criteria = (
            Patient.realname == patientName,
            Diagnose.status.in_((DiagnoseStatus.NeedTriage, DiagnoseStatus.NeedUpdate)),
            Diagnose.uploadUserId == uploadUserId,
        )
    else:
        base = session.query(Diagnose)
        criteria = (
            Diagnose.uploadUserId == uploadUserId,
            Diagnose.status.in_((DiagnoseStatus.NeedTriage, DiagnoseStatus.NeedUpdate)),
        )

    page = base.filter(*criteria).offset(pagger.getOffset()).limit(pagger.getLimitCount())
    return page.all()
开发者ID:LichuanLu,项目名称:blueberry,代码行数:11,代码来源:diagnoseDocument.py
示例19: groupfinder
def groupfinder(authorname, request):
    """List the names of the access groups of the author *authorname*.

    Args:
        authorname: compared with ``Author.author_name``.
        request: not used by this function.

    Returns:
        The ``group_name`` of every matching access group, or an empty list
        when no author has that name.
    """
    session = DBSession()
    author = session.query(Author).filter_by(author_name=authorname).first()

    # TODO: add handling for invalid author name
    if author is None:
        return []

    group_query = session.query(AccessGroup).select_from(
        join(AuthorAccessGroupMap, AccessGroup,
             AuthorAccessGroupMap.group_id == AccessGroup.id))
    groups = group_query.filter(AuthorAccessGroupMap.author_id == author.id).all()

    names = []
    for group in groups:
        names.append(group.group_name)
    return names
开发者ID:Thisisdotme,项目名称:thisis.me,代码行数:12,代码来源:security.py
示例20: test_no_host_threshold
def test_no_host_threshold():
    """Create and commit an ESX cluster built without ``down_hosts_threshold``.

    The cluster is named ``CLUSTER_NAME`` and uses the 'ny-prod' branch, the
    'np' building and the windows/generic personality.

    NOTE(review): the original docstring reads "ensure down_hosts_threshold
    must exist", yet the visible code asserts no failure -- confirm the
    intended expectation.
    """
    branch = Branch.get_unique(sess, 'ny-prod', compel=True)
    building = Building.get_unique(sess, name='np', compel=True)

    personality_query = sess.query(Personality).select_from(join(Archetype, Personality))
    personality = personality_query.filter(
        and_(Archetype.name == 'windows', Personality.name == 'generic')).one()

    cluster = EsxCluster(name=CLUSTER_NAME, location_constraint=building,
                         personality=personality, branch=branch)
    add(sess, cluster)
    commit(sess)
开发者ID:jrha,项目名称:aquilon,代码行数:13,代码来源:test_cluster.py
注：本文中的sqlalchemy.orm.join函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论