本文整理汇总了Python中sqlalchemy.orm.aliased函数的典型用法代码示例。如果您正苦于以下问题：Python aliased函数的具体用法？Python aliased怎么用？Python aliased使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了aliased函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_descendants_select
def get_descendants_select(source_id):
    """Build a recursive CTE over the idea links that descend from an idea.

    The anchor member selects every ``IdeaLink`` row whose ``source_id``
    equals ``source_id``; the recursive member then follows each row's
    ``target_id`` to the links starting there. (The idea itself is not
    included yet.)

    Background on the recursive-CTE approach:
    http://explainextended.com/2009/09/24/adjacency-list-vs-nested-sets-postgresql/
    http://stackoverflow.com/questions/11994092/how-can-i-perform-this-recursive-common-table-expression-in-sqlalchemy

    :param source_id: value matched against ``IdeaLink.source_id`` in the
        anchor member.
    :returns: the ``UNION ALL`` of the anchor CTE and the recursive member.
    """
    # Anchor member: links leaving the requested idea.
    anchor = select([IdeaLink.source_id, IdeaLink.target_id])
    anchor = anchor.select_from(IdeaLink)
    anchor = anchor.where(IdeaLink.source_id == source_id)
    descendants_cte = anchor.cte(recursive=True)

    # Recursive member: links whose source is a target found so far.
    previous_level = aliased(descendants_cte)
    next_link = aliased(IdeaLink)
    follows_previous = next_link.source_id == previous_level.c.target_id
    recursive_member = select([next_link.source_id, next_link.target_id])
    recursive_member = recursive_member.select_from(next_link)
    recursive_member = recursive_member.where(follows_previous)

    return descendants_cte.union_all(recursive_member)
开发者ID:assembl,项目名称:assembl,代码行数:29,代码来源:recursion_example.py
示例2: data
def data(self,
         use_choice_labels=False,
         expand_collections=False,
         ignore_private=True):
    """Build the query listing enrollments with patient, site and study data.

    The ``use_choice_labels``, ``expand_collections`` and ``ignore_private``
    arguments are accepted but not consulted by this implementation.

    :returns: an un-executed query ordered by enrollment id, study title
        and patient pid.
    """
    session = self.db_session
    Enrollment = models.Enrollment
    Patient = models.Patient
    Study = models.Study
    # The user table is joined twice (creator and last modifier), so each
    # role needs its own alias.
    creator = aliased(datastore.User)
    modifier = aliased(datastore.User)

    columns = [
        Enrollment.id.label('id'),
        Patient.pid.label('pid'),
        models.Site.name.label('site'),
        Enrollment.id.label('enrollment_id'),  # BBB
        Study.title.label('study'),
        Patient.nurse.label('nurse'),  # BBB
        Enrollment.reference_number.label('reference_number'),
        Enrollment.consent_date.label('consent_date'),
        Enrollment.latest_consent_date.label('latest_consent_date'),
        Enrollment.termination_date.label('termination_date'),
        Enrollment.create_date,
        creator.key.label('create_user'),
        Enrollment.modify_date,
        modifier.key.label('modify_user'),
    ]

    query = session.query(*columns).select_from(Enrollment)
    query = query.join(Enrollment.patient)
    query = query.join(Enrollment.study)
    query = query.join(Patient.site)
    query = query.join(creator, Enrollment.create_user)
    query = query.join(modifier, Enrollment.modify_user)
    return query.order_by(Enrollment.id, Study.title, Patient.pid)
开发者ID:davidmote,项目名称:occams_studies,代码行数:34,代码来源:enrollment.py
示例3: get_description_predecessors
def get_description_predecessors(self):
    """Return the predecessor descriptions of this object.

    A predecessor is any other description (``description_id`` different
    from ``self.description_id``) that shares a package with this one,
    either through ``PackageVersion`` rows or, on the legacy path, through
    the ``package`` column of ``Description`` itself.

    :returns: list of ``Description`` objects; empty when nothing matches.
    """
    session = Session.object_session(self)

    # Self-join of PackageVersion on ``package``: ids on the aliased side
    # that share a package with this description but are not this one.
    other_version = aliased(PackageVersion)
    version_rows = (
        session.query(other_version.description_id)
        .join(PackageVersion, other_version.package == PackageVersion.package)
        .filter(PackageVersion.description_id == self.description_id)
        .filter(other_version.description_id != self.description_id)
        .group_by(other_version.description_id)
        .all()
    )
    description_ids = [description_id for description_id, in version_rows]

    # START REMOVE AFTER FIX
    # FIXME
    # use later only package_version_tb and not the old package field
    other_description = aliased(Description)
    legacy_rows = (
        session.query(other_description.description_id)
        .join(Description, other_description.package == Description.package)
        .filter(Description.description_id == self.description_id)
        # The exclusion must target the aliased side. Testing the
        # un-aliased Description here contradicts the filter above and
        # makes the query return nothing.
        .filter(other_description.description_id != self.description_id)
        .group_by(other_description.description_id)
        .all()
    )
    description_ids += [description_id for description_id, in legacy_rows]
    # END REMOVE AFTER FIX

    if not description_ids:
        # Nothing to look up; skip the IN () round trip.
        return []
    matches = session.query(Description).filter(
        Description.description_id.in_(description_ids))
    return matches.all()
开发者ID:fpirola,项目名称:DDTSS-Django,代码行数:31,代码来源:ddtp.py
示例4: test_containment_relations3d
def test_containment_relations3d( req ):
    """Debug service: print the 3D containment status of two bounding boxes.

    Looks up the "BoundingBox" geometry of the objects identified by
    ``req.reference_id`` and ``req.target_id``, prints both geometries as
    text and prints the result of ``SFCGAL_Contains3D`` for the pair.

    :param req: request exposing ``reference_id`` and ``target_id``.
    :returns: a fresh ``GetDirectionalRelations2DResponse``; this function
        does not fill in any of its fields.
    """
    rospy.loginfo( "SEMAP DB SRVs: test_containment_relations3d" )
    res = GetDirectionalRelations2DResponse()

    obj1 = aliased( ObjectInstance )
    geo1 = aliased( GeometryModel )
    obj2 = aliased( ObjectInstance )
    geo2 = aliased( GeometryModel )

    # Select both geometry models. The loop below unpacks each row into a
    # (reference, target) pair and reads ``.geometry`` from each, which a
    # single scalar containment column cannot provide.
    geos = db().query( geo1, geo2 ).filter(
        obj1.id == req.reference_id,
        obj1.absolute_description_id == geo1.abstraction_desc,
        geo1.type == "BoundingBox",
        obj2.id == req.target_id,
        obj2.absolute_description_id == geo2.abstraction_desc,
        geo2.type == "BoundingBox",
    ).all()
    print(geos)

    for geoI, geoII in geos:
        print('geoI %s' % (db().execute( ST_AsText( geoI.geometry ) ).scalar(),))
        print('geoII %s' % (db().execute( ST_AsText( geoII.geometry ) ).scalar(),))
        containment_status = db().execute(
            SFCGAL_Contains3D( geoI.geometry, geoII.geometry ) ).scalar()
        print('containment_status: %s' % (containment_status,))

    return res
开发者ID:hdeeken,项目名称:semap_ros,代码行数:30,代码来源:spatial_relations.py
示例5: unbind_objects_on_objects
def unbind_objects_on_objects(req):
    """Move every object contained in the reference object to the "world" frame.

    Finds the objects whose "BoundingBox" geometry is 3D-contained in the
    "BoundingBox" geometry of ``req.reference_object_id`` (optionally
    restricted to ``req.target_object_types``), stores their ids in the
    response and calls ``call_change_frame(id, "world", False)`` for each.

    :param req: request exposing ``reference_object_id``,
        ``reference_object_types``, ``target_types``,
        ``target_object_types`` and ``fully_within``.
    :returns: ``GetObjectsWithinObjectResponse`` with ``target_object_ids`` set.
    """
    # NOTE(review): both log messages below name other service functions;
    # they look copied over - confirm before relying on them in log searches.
    rospy.loginfo( "SEMAP DB SRVs: get_objects_within_object" )
    # The return value of this call is not used here.
    call_get_objects_in_objects(req.reference_object_types, req.target_types, req.fully_within)
    res = GetObjectsWithinObjectResponse()

    container = aliased( ObjectInstance )
    container_geo = aliased( GeometryModel )
    candidate = aliased( ObjectInstance )
    candidate_geo = aliased( GeometryModel )

    if req.target_object_types:
        candidate_ids = any_obj_types_ids(candidate, req.target_object_types)
    else:
        candidate_ids = any_obj_ids(candidate)

    print(req.reference_object_id)
    print(req.target_object_types)
    print(candidate_ids.all())

    rospy.loginfo("SEMAP DB SRVs: test_containment_relations3d tries to find em")
    contained_rows = db().query( candidate.id ).filter(
        container.id == req.reference_object_id,
        container.absolute_description_id == container_geo.abstraction_desc,
        container_geo.type == "BoundingBox",
        container.frame_id == FrameNode.id,
        candidate.id.in_( candidate_ids ),
        candidate.absolute_description_id == candidate_geo.abstraction_desc,
        candidate_geo.type == "BoundingBox",
        SFCGAL_Contains3D( container_geo.geometry, candidate_geo.geometry ),
    ).all()

    res.target_object_ids = [object_id for object_id, in contained_rows]
    for object_id in res.target_object_ids:
        call_change_frame(object_id, "world", False)
    return res
开发者ID:hdeeken,项目名称:semap_ros,代码行数:35,代码来源:topology_functions.py
示例6: get_objects_within_volume
def get_objects_within_volume( req ):
    """Find objects whose geometry lies in (or touches) a reference mesh.

    With ``req.fully_within`` set, only geometries 3D-contained in
    ``req.reference_mesh`` match; otherwise intersecting geometries match
    as well. Each hit becomes an ``ObjectPair`` with ``reference_id`` -1
    and the relation "contained-in-volume".

    :param req: request exposing ``reference_mesh``, ``fully_within``,
        ``target_object_types`` and ``target_object_geometry_type``.
    :returns: ``GetObjectsWithinVolumeResponse`` with ``pairs`` filled.
    """
    rospy.loginfo( "SEMAP DB SRVs: get_objects_within_area" )
    res = GetObjectsWithinVolumeResponse()

    target = aliased( ObjectInstance )
    target_geo = aliased( GeometryModel )

    if req.target_object_types:
        candidate_ids = any_obj_types_ids(target, req.target_object_types)
    else:
        candidate_ids = any_obj_ids(target)

    spatial_filter = SFCGAL_Contains3D(
        fromPolygonMesh3D(req.reference_mesh), target_geo.geometry)
    if not req.fully_within:
        # Partial overlap is acceptable: also match intersecting geometries.
        spatial_filter = or_(
            spatial_filter,
            SFCGAL_Intersects3D(
                fromPolygonMesh3D(req.reference_mesh), target_geo.geometry))

    rows = db().query( target.id ).filter(
        target.id.in_( candidate_ids ),
        target.absolute_description_id == target_geo.abstraction_desc,
        target_geo.type == req.target_object_geometry_type,
        spatial_filter,
    ).all()

    for row in rows:
        pair = ObjectPair()
        pair.reference_id = -1  # the reference is a mesh, not a stored object
        pair.target_id = row[0]
        pair.relations.append("contained-in-volume")
        res.pairs.append(pair)
    return res
'''
开发者ID:hdeeken,项目名称:semap_ros,代码行数:33,代码来源:spatial_relations.py
示例7: get_objects_within_range2d
def get_objects_within_range2d( req ):
    """Find objects whose 2D geometry is within a distance of a point.

    ``req.geometry_type`` must be one of "Position2D", "AxisAligned2D",
    "FootprintBox" or "FootprintHull"; otherwise an error is logged and the
    response is returned without ids. ``req.fully_within`` selects
    ``ST_DFullyWithin`` instead of ``ST_DWithin``.

    :param req: request exposing ``object_types``, ``point``,
        ``geometry_type``, ``distance`` and ``fully_within``.
    :returns: ``GetObjectsWithinRange2DResponse``; ``ids`` is set only for
        a valid geometry type.
    """
    rospy.loginfo( "SEMAP DB SRVs: get_objects_within_range2d" )
    res = GetObjectsWithinRange2DResponse()

    target = aliased( ObjectInstance )
    target_geo = aliased( GeometryModel )

    print('%s %s %s %s' % (req.object_types, req.point, req.geometry_type, req.distance))

    if req.geometry_type not in ["Position2D", "AxisAligned2D", "FootprintBox", "FootprintHull"]:
        rospy.logerr("SEMAP DB SRVs: get_objects_within_range2d was called with %s which is not a valid 2D geometry type" % req.geometry_type)
        return res

    rospy.loginfo("SEMAP DB SRVs: get_objects_within_range2d tries to find em")
    if req.object_types:
        candidate_ids = any_obj_types_ids(target, req.object_types)
    else:
        candidate_ids = any_obj_ids(target)

    # Both variants take (point, geometry, distance); only the predicate differs.
    within = ST_DFullyWithin if req.fully_within else ST_DWithin
    rows = db().query( target.id ).filter(
        target.id.in_( candidate_ids ),
        target.absolute_description_id == target_geo.abstraction_desc,
        target_geo.type == req.geometry_type,
        within(fromPoint2D(req.point), target_geo.geometry, req.distance),
    ).all()

    res.ids = [object_id for object_id, in rows]
    return res
开发者ID:hdeeken,项目名称:semap_ros,代码行数:32,代码来源:spatial_relations.py
示例8: _get_context_relationships
def _get_context_relationships():
    """Build the query for objects related to the contexts a user audits.

    This code handles the case when user is added as `Auditor` and should be
    able to see objects mapped to the `Program` on `My Work` page.

    Reads ``contact_id`` and ``model_names`` from the enclosing scope.

    Returns:
      An un-executed query yielding ``(id, type, None)`` rows: the union of
      relationship sources and relationship destinations whose other end is
      the object related to one of the audited contexts.
    """
    auditor_contexts = db.session.query(UserRole.context_id).join(
        Role, UserRole.role_id == Role.id
    ).filter(
        and_(UserRole.person_id == contact_id, Role.name == 'Auditor')
    )

    ctx = aliased(all_models.Context, name="c")
    rel = aliased(all_models.Relationship, name="rl")

    # Relationships pointing *at* the context's object: report the source.
    by_destination = db.session.query(
        rel.source_id.label('id'),
        rel.source_type.label('type'),
        literal(None),
    ).join(ctx, and_(
        ctx.id.in_(auditor_contexts),
        rel.destination_id == ctx.related_object_id,
        rel.destination_type == ctx.related_object_type,
        rel.source_type.in_(model_names),
    ))

    # Relationships starting *from* the context's object: report the destination.
    by_source = db.session.query(
        rel.destination_id.label('id'),
        rel.destination_type.label('type'),
        literal(None),
    ).join(ctx, and_(
        ctx.id.in_(auditor_contexts),
        rel.source_id == ctx.related_object_id,
        rel.source_type == ctx.related_object_type,
        rel.destination_type.in_(model_names),
    ))

    return by_destination.union(by_source)
开发者ID:zidarsk8,项目名称:ggrc-core,代码行数:34,代码来源:query_helpers.py
示例9: create_new_repack_notifications
def create_new_repack_notifications(date_from=None, date_to=None, user_notifications=None):
    """Notify channel owners whose videos were repacked into another channel.

    Selects video instances together with their source channel and the
    public, non-favourite channel they now live in, optionally limited to
    ``date_from <= date_added < date_to``, and adds one notification for
    the source channel's owner per row.

    :param date_from: optional inclusive lower bound on ``date_added``.
    :param date_to: optional exclusive upper bound on ``date_added``.
    :param user_notifications: optional dict; owners whose message type is
        listed in ``app.config['PUSH_NOTIFICATION_MAP']`` are registered in
        it via ``setdefault(owner, None)``.
    """
    source_alias = aliased(Channel, name="source_channel")
    source_owner_alias = aliased(User, name="packer_user")
    target_alias = aliased(Channel, name="repacker_channel")
    target_owner_alias = aliased(User, name="repacker_user")

    # ``== False`` / ``== True`` build SQL expressions here and must not be
    # rewritten as Python truth tests.
    target_is_eligible = (
        (target_alias.id == VideoInstance.channel) &
        (target_alias.favourite == False) &
        (target_alias.public == True)
    )

    repacks = readonly_session.query(
        VideoInstance, source_alias, target_alias, target_owner_alias)
    repacks = repacks.join(source_alias, source_alias.id == VideoInstance.source_channel)
    repacks = repacks.join(source_owner_alias, source_owner_alias.id == source_alias.owner)
    repacks = repacks.join(target_alias, target_is_eligible)
    repacks = repacks.join(target_owner_alias, target_owner_alias.id == target_alias.owner)

    if date_from:
        repacks = repacks.filter(VideoInstance.date_added >= date_from)
    if date_to:
        repacks = repacks.filter(VideoInstance.date_added < date_to)

    for video_instance, source_channel, target_channel, repacker in repacks:
        _user, message_type, body = repack_message(repacker, target_channel, video_instance)
        owner = source_channel.owner
        _add_user_notification(owner, video_instance.date_added, message_type, body)
        wants_push = (user_notifications is not None and
                      message_type in app.config['PUSH_NOTIFICATION_MAP'])
        if wants_push:
            user_notifications.setdefault(owner, None)
开发者ID:wonderpl,项目名称:dolly-web,代码行数:33,代码来源:commands.py
示例10: apifriends
def apifriends():
    """Return the current user's friends as JSON, paged by greeting id.

    Reads the optional ``lastid`` query argument (default 0) and returns
    the friend greetings sent by the current user whose id is greater than
    it.

    :returns: ``jsonify`` response of the form
        ``{"status": "success", "data": {"items": [...]}}`` where each item
        has the keys ``id``, ``user_id``, ``uid``, ``provider`` and
        ``created_at``.
    :raises InvalidParam: when ``lastid`` cannot be parsed as an integer.
    """
    # this is for dev
    src_user_id = set_src_user_id()
    # dev end

    try:
        lastid = int(request.args.get('lastid', 0))
    except (TypeError, ValueError):
        # Only a malformed value means "invalid lastid"; anything else
        # (KeyboardInterrupt, SystemExit, ...) must propagate untouched.
        raise InvalidParam('invalid lastid')

    friends = (
        db.session.query(Greeting.id, Greeting.dst_user_id, Account.uid,
                         Account.provider, Greeting.created_at)
        .join(Account, Account.user_id == Greeting.dst_user_id)
        .filter(Greeting.src_user_id == src_user_id)
        .filter(Greeting.is_friend == True)  # noqa: E712 - SQL expression
        .filter(Greeting.id > lastid)
        .all()
    )

    items = [
        {
            'id': greeting_id,
            'user_id': user_id,
            'uid': uid,
            'provider': provider,
            'created_at': totimestamp(created_at),
        }
        for greeting_id, user_id, uid, provider, created_at in friends
    ]
    return jsonify({
        "status": "success",
        "data": {
            "items": items
        }
    })
开发者ID:siteshen,项目名称:dianying,代码行数:26,代码来源:index.py
示例11: ak_jurnal_skpd_item_act
def ak_jurnal_skpd_item_act(self):
    """Serve the DataTables grid of journal items for one journal.

    Only the ``grid`` action (``matchdict['act'] == 'grid'``) is handled;
    any other action returns ``None``. A non-numeric ``ak_jurnal_id`` in
    the URL is replaced by 0.

    :returns: ``DataTables(...).output_result()`` for the ``grid`` action,
        otherwise ``None``.
    """
    ses = self.request.session
    req = self.request
    params = req.params
    url_dict = req.matchdict

    # Parsed (and therefore validated) but not used further in this action.
    if 'id' in params and params['id']:
        pk_id = int(params['id'])
    else:
        pk_id = 0

    if url_dict['act'] != 'grid':
        return None

    raw_jurnal_id = url_dict['ak_jurnal_id']
    ak_jurnal_id = raw_jurnal_id if raw_jurnal_id.isdigit() else 0

    columns = [
        ColumnDT('id'),
        ColumnDT('sapkd'),
        ColumnDT('sapnm'),
        ColumnDT('amount', filter=self._number_format),
        ColumnDT('notes'),
        ColumnDT('rekkd'),
        ColumnDT('reknm'),
        ColumnDT('kegiatan_sub_id'),
        ColumnDT('rekening_id'),
        ColumnDT('ak_jurnal_id'),
        ColumnDT('subkd'),
        ColumnDT('subnm'),
    ]

    rek = aliased(Rekening)
    sap = aliased(Sap)
    sub = aliased(KegiatanSub)

    query = DBSession.query(
        AkJurnalItem.id,
        sap.kode.label('sapkd'),
        sap.nama.label('sapnm'),
        AkJurnalItem.amount,
        AkJurnalItem.notes,
        rek.kode.label('rekkd'),
        rek.nama.label('reknm'),
        AkJurnalItem.kegiatan_sub_id,
        AkJurnalItem.rekening_id,
        AkJurnalItem.ak_jurnal_id,
        sub.kode.label('subkd'),
        sub.nama.label('subnm'),
    )
    query = query.join(AkJurnal)
    # Outer joins: an item is listed even when one of these links is absent.
    query = query.outerjoin(rek, AkJurnalItem.rekening_id == rek.id)
    query = query.outerjoin(sap, AkJurnalItem.sap_id == sap.id)
    query = query.outerjoin(sub, AkJurnalItem.kegiatan_sub_id == sub.id)
    query = query.filter(
        AkJurnalItem.ak_jurnal_id == ak_jurnal_id,
        AkJurnalItem.ak_jurnal_id == AkJurnal.id,
    )
    query = query.group_by(
        AkJurnalItem.id,
        sap.kode.label('sapkd'),
        sap.nama.label('sapnm'),
        AkJurnalItem.amount,
        AkJurnalItem.notes,
        rek.kode.label('rekkd'),
        rek.nama.label('reknm'),
        AkJurnalItem.kegiatan_sub_id,
        AkJurnalItem.rekening_id,
        AkJurnalItem.ak_jurnal_id,
        sub.kode.label('subkd'),
        sub.nama.label('subnm'),
    )

    rowTable = DataTables(req, AkJurnalItem, query, columns)
    return rowTable.output_result()
开发者ID:aagusti,项目名称:o-sipkd,代码行数:60,代码来源:ak_jurnal_skpd_item.py
示例12: context_relationship_query
def context_relationship_query(contexts):
    """Load the objects related to the given contexts.

    An object counts as related when a relationship connects it, in either
    direction, to the object a context refers to.

    Args:
      contexts (list(int)): A list of context ids

    Returns:
      objects (list((id, type, None))): Related objects; an empty list when
      ``contexts`` is empty.
    """
    if len(contexts) == 0:
        return []

    ctx = aliased(all_models.Context, name="c")
    rel = aliased(all_models.Relationship, name="rl")

    # Report whichever end of the relationship is NOT the context's object:
    # the source when the destination type matches the context, otherwise
    # the destination.
    related_id = case(
        [(rel.destination_type == ctx.related_object_type,
          rel.source_id.label('id'))],
        else_=rel.destination_id.label('id'),
    )
    related_type = case(
        [(rel.destination_type == ctx.related_object_type,
          rel.source_type.label('type'))],
        else_=rel.destination_type.label('type'),
    )
    headers = (related_id, related_type, literal(None))

    via_destination = db.session.query(*headers).join(ctx, and_(
        ctx.id.in_(contexts),
        rel.destination_id == ctx.related_object_id,
        rel.destination_type == ctx.related_object_type,
    ))
    via_source = db.session.query(*headers).join(ctx, and_(
        ctx.id.in_(contexts),
        rel.source_id == ctx.related_object_id,
        rel.source_type == ctx.related_object_type,
    ))
    return via_destination.union(via_source).all()
开发者ID:zidarsk8,项目名称:ggrc-core,代码行数:33,代码来源:__init__.py
示例13: removeDupes
def removeDupes(self):
    """Remove Duplicate Items

    Two ``Data`` rows are duplicates when they agree on ``datasource_key``,
    ``node_key`` and ``timestamp``. For every such pair the row with the
    larger ``data_key`` is deleted, then the session is committed.
    """
    print("Removing Dupes")
    from sqlalchemy.orm import aliased
    session = self.Session()

    kept = aliased(Data)
    duplicate = aliased(Data)
    pairs = session.query(kept, duplicate)
    pairs = pairs.filter(kept.datasource_key == duplicate.datasource_key)  # Data sources should be unique
    pairs = pairs.filter(kept.node_key == duplicate.node_key)
    pairs = pairs.filter(kept.timestamp == duplicate.timestamp)
    # Strict ordering pairs each duplicate set once and never pairs a row
    # with itself.
    pairs = pairs.filter(kept.data_key < duplicate.data_key)

    log.debug("Count of Items {0}".format(pairs.count()))

    duplicate_keys = [later.data_key for _earlier, later in pairs.all()]
    doomed = session.query(Data).filter(Data.data_key.in_(duplicate_keys))
    # Positional ``False`` is passed as the first argument of Query.delete().
    deleted_count = doomed.delete(False)
    log.debug("Total of {0} items deleted".format(deleted_count))
    session.commit()
    return
开发者ID:rwilkins87,项目名称:cogent-house,代码行数:33,代码来源:processAR.py
示例14: __correspondence_query__
def __correspondence_query__(self, chain1, chain2):
    """Look up a good correspondence from ``chain1`` to ``chain2``.

    Only the given direction is checked: ``chain1`` must map to the first
    experimental sequence of the correspondence and ``chain2`` to the
    second.

    Parameters
    ----------
    chain1 : int
        The first chain id.
    chain2 : int
        The second chain id.

    Returns
    -------
    corr_id : int
        The correspondence id of the first matching row with
        ``good_alignment == 1``, or None when there is none.
    """
    with self.session() as session:
        info = mod.CorrespondenceInfo
        # The chain mapping table is joined once per side of the correspondence.
        first_side = aliased(mod.ExpSeqChainMapping)
        second_side = aliased(mod.ExpSeqChainMapping)

        query = session.query(info.correspondence_id)
        query = query.join(first_side, first_side.exp_seq_id == info.exp_seq_id_1)
        query = query.join(second_side, second_side.exp_seq_id == info.exp_seq_id_2)
        query = query.filter(first_side.chain_id == chain1)
        query = query.filter(second_side.chain_id == chain2)
        query = query.filter(info.good_alignment == 1)

        match = query.first()
        return match.correspondence_id if match else None
开发者ID:BGSU-RNA,项目名称:RNA-3D-Hub-core,代码行数:33,代码来源:comparision.py
示例15: get_objects_within_range
def get_objects_within_range( req ):
    """Find objects whose 3D geometry is within a distance of a point.

    ``req.fully_within`` selects ``ST_3DDFullyWithin`` instead of
    ``ST_3DDWithin``. Results are ordered by 3D distance to the reference
    point and returned as ``ObjectPair`` entries with ``reference_id`` -1.

    :param req: request exposing ``reference_point``, ``distance``,
        ``fully_within``, ``target_object_types`` and
        ``target_object_geometry_type``.
    :returns: ``GetObjectsWithinRangeResponse`` with ``pairs`` filled.
    """
    rospy.loginfo( "SEMAP DB SRVs: get_objects_within_range" )
    res = GetObjectsWithinRangeResponse()

    target = aliased( ObjectInstance )
    target_geo = aliased( GeometryModel )

    if req.target_object_types:
        candidate_ids = any_obj_types_ids(target, req.target_object_types)
    else:
        candidate_ids = any_obj_ids(target)

    within = ST_3DDFullyWithin if req.fully_within else ST_3DDWithin
    range_filter = within(fromPoint3D(req.reference_point), target_geo.geometry, req.distance)

    query = db().query(
        target.id,
        ST_3DDistance( fromPoint3D(req.reference_point), target_geo.geometry ),
        ST_3DMaxDistance( fromPoint3D(req.reference_point), target_geo.geometry ),
        ST_3DClosestPoint( target_geo.geometry, fromPoint3D(req.reference_point) ),
    )
    query = query.filter(
        target.id.in_( candidate_ids ),
        target.absolute_description_id == target_geo.abstraction_desc,
        target_geo.type == req.target_object_geometry_type,
        range_filter,
    )
    query = query.order_by(
        ST_3DDistance( fromPoint3D(req.reference_point), target_geo.geometry ) )

    for object_id, nearest, farthest, closest_point in query.all():
        print('%s %s %s %s' % (object_id, nearest, farthest, closest_point))
        pair = ObjectPair()
        pair.reference_id = -1  # the reference is a point, not a stored object
        pair.target_id = object_id
        pair.max_dist = farthest
        pair.max_dist_line[0] = req.reference_point
        # NOTE(review): the max-distance line ends at the *closest* point,
        # same as the min-distance line below - confirm this is intended.
        pair.max_dist_line[1] = toPoint3D( closest_point )
        pair.min_dist = nearest
        pair.min_dist_line[0] = req.reference_point
        pair.min_dist_line[1] = toPoint3D( closest_point )
        res.pairs.append(pair)
    return res
开发者ID:hdeeken,项目名称:semap_ros,代码行数:35,代码来源:spatial_relations.py
示例16: _get_central_fip_host_routes_by_router
def _get_central_fip_host_routes_by_router(self, context, router_id,
                                           bgp_speaker_id):
    """Get floating IP host routes with the given router as nexthop.

    Selects ``(floating_ip_address, next-hop ip_address)`` tuples for the
    floating IPs of ``router_id`` and hands them to
    ``self._host_route_list_from_tuples``.
    """
    with context.session.begin(subtransactions=True):
        floating_ip = aliased(l3_db.FloatingIP, name='destination')
        gateway_ip = aliased(models_v2.IPAllocation, name='next_hop')
        binding = aliased(BgpSpeakerNetworkBinding, name='binding')
        extra_attrs = aliased(l3_attrs_db.RouterExtraAttributes,
                              name='router_attrs')
        router = l3_db.Router
        subnet = models_v2.Subnet

        conditions = [
            router.id == router_id,
            floating_ip.router_id == router.id,
            router.id == extra_attrs.router_id,
            extra_attrs.distributed == sa.sql.false(),
            router.gw_port_id == gateway_ip.port_id,
            gateway_ip.subnet_id == subnet.id,
            subnet.ip_version == 4,
            binding.network_id == subnet.network_id,
            binding.bgp_speaker_id == bgp_speaker_id,
            binding.ip_version == 4,
            BgpSpeaker.advertise_floating_ip_host_routes == sa.sql.true(),
        ]

        query = context.session.query(floating_ip.floating_ip_address,
                                      gateway_ip.ip_address)
        query = query.join(
            gateway_ip,
            gateway_ip.network_id == floating_ip.floating_network_id)
        query = query.join(router, floating_ip.router_id == router.id)
        query = query.filter(*conditions)
        # The extra-attributes alias is also outer-joined and filtered on
        # "not distributed" after the main filter; the call order is kept
        # exactly as the query was originally assembled.
        query = query.outerjoin(extra_attrs,
                                router.id == extra_attrs.router_id)
        query = query.filter(extra_attrs.distributed != sa.sql.true())
        return self._host_route_list_from_tuples(query.all())
开发者ID:MODITDC,项目名称:neutron,代码行数:35,代码来源:bgp_db.py
示例17: get_objects_within_object
def get_objects_within_object( req ):
    """Find objects whose bounding box lies in (or touches) a reference object's.

    With ``req.fully_within`` set, only bounding boxes 3D-contained in the
    reference object's bounding box match; otherwise intersecting ones
    match as well.

    :param req: request exposing ``reference_object_id``, ``fully_within``
        and ``target_object_types``.
    :returns: ``GetObjectsWithinObjectResponse`` with ``target_object_ids`` set.
    """
    rospy.loginfo( "SEMAP DB SRVs: get_objects_within_object" )
    res = GetObjectsWithinObjectResponse()

    reference = aliased( ObjectInstance )
    reference_geo = aliased( GeometryModel )
    target = aliased( ObjectInstance )
    target_geo = aliased( GeometryModel )

    if req.target_object_types:
        candidate_ids = any_obj_types_ids(target, req.target_object_types)
    else:
        candidate_ids = any_obj_ids(target)

    spatial_filter = SFCGAL_Contains3D(reference_geo.geometry, target_geo.geometry)
    if not req.fully_within:
        # Partial overlap is acceptable: also match intersecting geometries.
        spatial_filter = or_(
            spatial_filter,
            SFCGAL_Intersects3D(reference_geo.geometry, target_geo.geometry))

    rows = db().query( target.id ).filter(
        reference.id == req.reference_object_id,
        reference.absolute_description_id == reference_geo.abstraction_desc,
        reference_geo.type == "BoundingBox",
        target.id.in_( candidate_ids ),
        target.absolute_description_id == target_geo.abstraction_desc,
        target_geo.type == "BoundingBox",
        spatial_filter,
    ).all()

    res.target_object_ids = [object_id for object_id, in rows]
    return res
开发者ID:hdeeken,项目名称:semap_ros,代码行数:28,代码来源:spatial_relations.py
示例18: _tenant_networks_by_network_query
def _tenant_networks_by_network_query(self, context,
                                      network_id, bgp_speaker_id):
    """Return subquery for tenant networks by binding network ID

    Selects ``(router_id, cidr, ip_version, address_scope id)`` for router
    ports that are neither gateway nor SNAT ports, on subnets outside
    ``network_id`` whose address scope matches the IP version of the BGP
    speaker ``bgp_speaker_id``.
    """
    scope = aliased(address_scope_db.AddressScope, name='address_scope')
    extra_attrs = aliased(l3_attrs_db.RouterExtraAttributes,
                          name='router_attrs')
    router_port = l3_db.RouterPort
    allocation = models_v2.IPAllocation
    subnet = models_v2.Subnet
    subnet_pool = models_v2.SubnetPool

    conditions = [
        router_port.port_type != lib_consts.DEVICE_OWNER_ROUTER_GW,
        router_port.port_type != lib_consts.DEVICE_OWNER_ROUTER_SNAT,
        router_port.router_id == extra_attrs.router_id,
        allocation.port_id == router_port.port_id,
        allocation.subnet_id == subnet.id,
        subnet.network_id != network_id,
        subnet.subnetpool_id == subnet_pool.id,
        subnet_pool.address_scope_id == scope.id,
        BgpSpeaker.id == bgp_speaker_id,
        BgpSpeaker.ip_version == scope.ip_version,
        subnet.ip_version == scope.ip_version,
    ]

    query = context.session.query(
        router_port.router_id,
        subnet.cidr,
        subnet.ip_version,
        scope.id)
    return query.filter(*conditions)
开发者ID:MODITDC,项目名称:neutron,代码行数:25,代码来源:bgp_db.py
示例19: get_containment_pairs
def get_containment_pairs( req ):
    """Find (container, contained) object pairs by 3D bounding-box containment.

    Both sides can be restricted by object type through
    ``req.reference_object_types`` and ``req.target_object_types``; an
    empty restriction means "any object".

    :param req: request exposing ``reference_object_types`` and
        ``target_object_types``.
    :returns: the response object with ``target_object_ids`` set to the ids
        of the contained objects, one per matching pair.
    """
    rospy.loginfo( "SEMAP DB SRVs: get_containment_pairs" )
    res = GetObjectLocation()

    obj1 = aliased( ObjectInstance )
    geo1 = aliased( GeometryModel )
    obj2 = aliased( ObjectInstance )
    geo2 = aliased( GeometryModel )

    if req.reference_object_types:
        # Reads the same attribute the condition tests; the previous
        # spelling "refrence_object_types" does not match it.
        obj1_ids = any_obj_types_ids(obj1, req.reference_object_types)
    else:
        obj1_ids = any_obj_ids(obj1)

    if req.target_object_types:
        obj2_ids = any_obj_types_ids(obj2, req.target_object_types)
    else:
        obj2_ids = any_obj_ids(obj2)

    ids = db().query( obj1.id, obj2.id ).filter(
        obj1.id.in_( obj1_ids ),
        obj1.absolute_description_id == geo1.abstraction_desc,
        geo1.type == "BoundingBox",
        obj2.id.in_( obj2_ids ),
        obj2.absolute_description_id == geo2.abstraction_desc,
        geo2.type == "BoundingBox",
        SFCGAL_Contains3D( geo1.geometry, geo2.geometry ),
    ).all()

    for i1, i2 in ids:
        # NOTE(review): the pair is built but never attached to ``res``.
        # Presumably it belongs in a ``pairs`` field - confirm that the
        # response type has one before appending.
        pair = ObjectPair()
        pair.reference_id = i1
        pair.relations.append("contains")
        pair.target_id = i2

    # Rows have two columns (reference id, target id); keep the target side.
    res.target_object_ids = [target_id for _reference_id, target_id in ids]
    return res
开发者ID:hdeeken,项目名称:semap_ros,代码行数:35,代码来源:spatial_relations.py
示例20: export_aliases
def export_aliases(project, fh):
    """ Write the entity names of a project to a CSV file.

    Each output row carries the entity id, one ``name`` property value of
    the entity (column ``alias``) and the value of an active ``name``
    property (column ``canonical``).

    :param project: object whose ``id`` selects the entities to export.
    :param fh: writable file object receiving the CSV data.
    """
    writer = DictWriter(fh, ['entity_id', 'alias', 'canonical'])
    writer.writeheader()

    # The property table is used twice: once for any name, once for the
    # active name.
    any_name = aliased(EntityProperty)
    active_name = aliased(EntityProperty)

    query = db.session.query(any_name.value_string.label('alias'),
                             any_name.entity_id)
    query = query.join(Entity).join(active_name)
    query = query.filter(
        Entity.project_id == project.id,
        any_name.entity_id != None,  # noqa: E711 - SQL expression, not a Python test
        any_name.name == 'name',
        active_name.name == 'name',
        active_name.active == True,  # noqa: E712 - SQL expression, not a Python test
    )
    query = query.add_columns(active_name.value_string.label('canonical'))

    for record in query.all():
        csv_row = {
            'entity_id': str(record.entity_id),
            'alias': record.alias,
            'canonical': record.canonical
        }
        writer.writerow(csv_row)
开发者ID:ahurriyetoglu,项目名称:grano,代码行数:26,代码来源:aliases.py
注：本文中的sqlalchemy.orm.aliased函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论