本文整理汇总了Python中sqlalchemy.sql.join函数的典型用法代码示例。如果您正苦于以下问题：Python join函数的具体用法？Python join怎么用？Python join使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了join函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _search_for_join
def _search_for_join(mapper, table):
    """Find a join between the given mapper's mapped table and the given table.

    The mapper's local table is tried first for more specificity.  If no join
    can be determined from it (``sql.join`` raises ``ArgumentError``), the
    more general mapped table, which in the case of inheritance is a join,
    is tried instead.  An ``ArgumentError`` from that second attempt
    propagates to the caller.
    """
    try:
        return sql.join(mapper.local_table, table)
    except exceptions.ArgumentError:
        # The caught exception is not inspected, so no name is bound to it;
        # this form is valid on both Python 2 and Python 3.
        return sql.join(mapper.mapped_table, table)
开发者ID:Eubolist,项目名称:ankimini,代码行数:9,代码来源:properties.py
示例2: upgrade
def upgrade():
    """Add a denormalized ``payout`` column to ``request`` and back-fill it.

    The column is added as nullable, filled for every request with
    ``(base_payout + sum(absolute values)) * (1 + sum(relative values))``
    and then altered to be non-nullable.  Only modifiers whose
    ``voided_user_id`` is NULL are summed; a NULL sum counts as zero.
    """
    op.add_column(
        'request',
        sa.Column('payout', sa.Numeric(precision=15, scale=2), index=True,
                  nullable=True))
    connection = op.get_bind()

    def _live_modifier_values(value_table):
        # Aliased SELECT of (value, request_id) for one kind of modifier,
        # restricted to modifiers whose voided_user_id is NULL.
        joined = join(value_table, mod_table,
                      mod_table.c.id == value_table.c.id)
        stmt = select([value_table.c.value.label('value'),
                       mod_table.c.request_id.label('request_id')])
        stmt = stmt.select_from(joined)
        stmt = stmt.where(mod_table.c.voided_user_id == None)  # noqa: E711
        return stmt.alias()

    absolute = _live_modifier_values(abs_table)
    relative = _live_modifier_values(rel_table)

    # Per-request sum of absolute modifiers (outer join keeps requests that
    # have no modifiers at all).
    abs_source = outerjoin(request, absolute,
                           request.c.id == absolute.c.request_id)
    abs_sum = select([request.c.id.label('request_id'),
                      request.c.base_payout.label('base_payout'),
                      func.sum(absolute.c.value).label('sum')])
    abs_sum = abs_sum.select_from(abs_source).group_by(request.c.id).alias()

    # Per-request sum of relative modifiers.
    rel_source = outerjoin(request, relative,
                           request.c.id == relative.c.request_id)
    rel_sum = select([request.c.id.label('request_id'),
                      func.sum(relative.c.value).label('sum')])
    rel_sum = rel_sum.select_from(rel_source).group_by(request.c.id).alias()

    # Replace NULL sums with zero before combining them.
    abs_total = case([(abs_sum.c.sum == None, Decimal(0))],  # noqa: E711
                     else_=abs_sum.c.sum)
    rel_total = case([(rel_sum.c.sum == None, Decimal(0))],  # noqa: E711
                     else_=rel_sum.c.sum)
    payout_expr = (abs_sum.c.base_payout + abs_total) * (1 + rel_total)

    total_sum = select([abs_sum.c.request_id.label('request_id'),
                        payout_expr.label('payout')])
    total_sum = total_sum.select_from(
        join(abs_sum, rel_sum,
             abs_sum.c.request_id == rel_sum.c.request_id))

    # Write the computed payout back one request at a time.
    for request_id, payout in connection.execute(total_sum):
        stmt = update(request).where(request.c.id == request_id)
        connection.execute(stmt.values(payout=payout))

    op.alter_column('request', 'payout', nullable=False,
                    existing_type=sa.Numeric(precision=15, scale=2))
开发者ID:Acidity,项目名称:evesrp,代码行数:49,代码来源:3e5e1d3a02c_denormalize_request_payout.py
示例3: _determine_joins
def _determine_joins(self):
    """Derive ``primaryjoin``/``secondaryjoin`` when they were not specified.

    Any join condition that is still ``None`` is taken from the ``onclause``
    of ``sql.join()`` between the relevant tables: against ``self.secondary``
    when a secondary table is present, otherwise between the parent's
    unjoined table and ``self.target``.

    Raises:
        exceptions.ArgumentError: if a secondary join condition was given
            without a secondary table, or if ``sql.join()`` cannot determine
            a join condition (the nested error text is included in the
            message).
    """
    if self.secondaryjoin is not None and self.secondary is None:
        raise exceptions.ArgumentError("Property '" + self.key + "' specified with secondary join condition but no secondary argument")
    # if join conditions were not specified, figure them out based on foreign keys
    try:
        if self.secondary is not None:
            if self.secondaryjoin is None:
                self.secondaryjoin = sql.join(self.mapper.unjoined_table, self.secondary).onclause
            if self.primaryjoin is None:
                self.primaryjoin = sql.join(self.parent.unjoined_table, self.secondary).onclause
        else:
            if self.primaryjoin is None:
                self.primaryjoin = sql.join(self.parent.unjoined_table, self.target).onclause
    except exceptions.ArgumentError as e:
        # "except ... as" is accepted by Python 2.6+ and Python 3, unlike the
        # comma form, which is a syntax error on Python 3.
        raise exceptions.ArgumentError("Error determining primary and/or secondary join for relationship '%s'. If the underlying error cannot be corrected, you should specify the 'primaryjoin' (and 'secondaryjoin', if there is an association table present) keyword arguments to the relation() function (or for backrefs, by specifying the backref using the backref() function with keyword arguments) to explicitly specify the join conditions. Nested error is \"%s\"" % (str(self), str(e)))
开发者ID:nakedible,项目名称:vpnease-l2tp,代码行数:15,代码来源:properties.py
示例4: __init__
def __init__(self, left, right, onclause=None, isouter=False):
    """Build a join, resolving a string ``onclause`` as a mapped property.

    When either side is a mapped class, the left and right mappers are
    recorded on ``self._orm_mappers``.  If ``onclause`` is a string it is
    looked up as a property on the left mapper and replaced by the join
    condition(s) that property produces; when the property yields a
    secondary join condition, ``left`` is first joined to the property's
    secondary table.
    """
    if _is_mapped_class(left) or _is_mapped_class(right):
        if hasattr(left, '_orm_mappers'):
            # ``left`` is itself an ORM join: continue from its right side.
            left_mapper = left._orm_mappers[1]
            adapt_from = left.right
        else:
            left_mapper = _class_to_mapper(left)
            adapt_from = left.alias if _is_aliased_class(left) else None

        right_mapper = _class_to_mapper(right)
        self._orm_mappers = (left_mapper, right_mapper)

        if isinstance(onclause, basestring):
            prop = left_mapper.get_property(onclause)
            adapt_to = right.alias if _is_aliased_class(right) else None

            (primary_clause, secondary_clause,
             _source, _dest, _target_adapter) = prop._create_joins(
                source_selectable=adapt_from,
                dest_selectable=adapt_to,
                source_polymorphic=True,
                dest_polymorphic=True)

            if secondary_clause:
                left = sql.join(left, prop.secondary, onclause=primary_clause)
                onclause = secondary_clause
            else:
                onclause = primary_clause

    expression.Join.__init__(self, left, right, onclause, isouter)
开发者ID:Eubolist,项目名称:ankimini,代码行数:31,代码来源:util.py
示例5: query
def query(self):
    """Return a SELECT over the ``eval_type == "f"`` qualstat data.

    The qualstat data is joined to ``powa_databases`` on ``oid == dbid``,
    restricted to the database bound as ``database`` and to rows with
    ``avg_filter > 1000`` and ``filter_ratio > 0.3``, grouped per qual,
    ordered by descending ``occurences`` and limited to 200 rows.
    """
    stat_data = qualstat_getstatdata(column("eval_type") == "f")
    base = alias(stat_data)

    selected = [
        func.array_agg(column("queryid")).label("queryids"),
        "qualid",
        cast(column("quals"), JSONB).label('quals'),
        "occurences",
        "execution_count",
        func.array_agg(column("query")).label("queries"),
        "avg_filter",
        "filter_ratio",
    ]
    source = join(
        base, powa_databases,
        onclause=(powa_databases.c.oid == literal_column("dbid")))

    stmt = select(selected).select_from(source)
    stmt = stmt.where(powa_databases.c.datname == bindparam("database"))
    stmt = stmt.where(column("avg_filter") > 1000)
    stmt = stmt.where(column("filter_ratio") > 0.3)
    stmt = stmt.group_by(
        column("qualid"), column("execution_count"),
        column("occurences"),
        cast(column("quals"), JSONB),
        column("avg_filter"), column("filter_ratio"))
    stmt = stmt.order_by(column("occurences").desc())
    return stmt.limit(200)
开发者ID:girgen,项目名称:powa-web,代码行数:26,代码来源:wizard.py
示例6: messages_in_narrow_backend
def messages_in_narrow_backend(request, user_profile,
                               msg_ids=REQ(validator=check_list(check_int)),
                               narrow=REQ(converter=narrow_parameter)):
    # type: (HttpRequest, UserProfile, List[int], List[Dict[str, Any]]) -> HttpResponse
    """Return search fields for those of ``msg_ids`` that match ``narrow``.

    Only messages with a ``zerver_usermessage`` row for ``user_profile`` are
    considered, so this only works on messages the user actually received.
    """
    # TODO: We assume that the narrow is a search. For now this works because
    # the browser only ever calls this function for searches, since it can't
    # apply that narrow operator itself.
    selected_columns = [column("message_id"), column("subject"),
                        column("rendered_content")]
    belongs_to_user = and_(
        column("user_profile_id") == literal(user_profile.id),
        column("message_id").in_(msg_ids))
    usermessage_to_message = join(
        table("zerver_usermessage"), table("zerver_message"),
        literal_column("zerver_usermessage.message_id") ==
        literal_column("zerver_message.id"))
    query = select(selected_columns, belongs_to_user, usermessage_to_message)

    builder = NarrowBuilder(user_profile, column("message_id"))
    for term in narrow:
        query = builder.add_term(query, term)

    sa_conn = get_sqlalchemy_connection()
    rows = list(sa_conn.execute(query).fetchall())

    # Each row is unpacked into five values: the three selected columns plus
    # two match columns.
    search_fields = {}
    for (message_id, subject, rendered_content,
         content_matches, subject_matches) in rows:
        search_fields[message_id] = get_search_fields(
            rendered_content, subject, content_matches, subject_matches)
    return json_success({"messages": search_fields})
开发者ID:souravbadami,项目名称:zulip,代码行数:33,代码来源:messages.py
示例7: get_range_data
def get_range_data(offset_, size_):
    """Build a SELECT of at most ``size_`` rows in descending key order.

    A one-row subquery picks the primary key found at ``offset_`` in
    descending primary-key order.  The table, aliased as ``t``, is joined to
    it on ``t.<pk> <= subquery.<pk>`` for every primary-key column, and the
    result is ordered by descending primary key.  ``tbl_def``, ``pk_names``
    and ``pk_cols`` are not defined in this function and must be provided by
    the surrounding scope.
    """
    tbl_main = alias(tbl_def, 't')
    descending = ", ".join([name+" DESC" for name in pk_names])

    boundary = select(pk_cols).order_by(descending)
    boundary = boundary.offset(offset_).limit(1).alias()

    conditions = [tbl_main.c[pk_name] <= boundary.c[pk_name]
                  for pk_name in pk_names]
    # A single condition is used directly; several are AND-ed together.
    on_clause = and_(*conditions) if len(conditions) > 1 else conditions[0]
    joined = join(tbl_main, boundary, on_clause)

    stmt = select([tbl_main]).select_from(joined)
    return stmt.order_by(descending).limit(size_)
开发者ID:fengshao0907,项目名称:ElasticSphinx,代码行数:17,代码来源:db_sync.py
示例8: team_score_list
def team_score_list():
    """Render the scoring page for every team whose role is BLUE or RED.

    For each team a ``scores`` dict is attached holding the earned and
    maximum service-check points, the result of ``score_injects`` and the
    summed value of its discovered flags (stored as ``flags_lost``).
    """
    scoring_teams = []
    for team in Team.query.filter(Team.role.in_([Team.BLUE, Team.RED])):
        # Service checks: sum(success * value) and sum(value).
        check_to_service = join(
            ServiceCheck, Service, ServiceCheck.service_id == Service.id)
        result_to_check = join(
            CheckResult, check_to_service,
            CheckResult.check_id == ServiceCheck.id)
        service_totals = db.session.query(
            functions.sum(CheckResult.success * ServiceCheck.value),
            functions.sum(ServiceCheck.value))
        service_totals = service_totals.select_from(result_to_check)
        services = service_totals.filter(Service.team_id == team.id).first()
        if services[0]:
            earned, maximum = services[0], services[1]
        else:
            earned, maximum = 0, 0

        # Flags: per-flag discovery count times the flag's value.
        flag_subquery = db.session.query(
            functions.count(FlagDiscovery.id).label('solve_count'),
            Flag.value)
        flag_subquery = flag_subquery.select_from(
            join(Flag, FlagDiscovery, Flag.id == FlagDiscovery.flag_id))
        flag_subquery = flag_subquery.filter(Flag.team_id == team.id)
        flag_subquery = flag_subquery.group_by(Flag.id)
        flag_subquery = flag_subquery.subquery('flag_subquery')
        flag_row = db.session.query(
            functions.sum(
                flag_subquery.c.solve_count * flag_subquery.c.value)).first()
        flags = flag_row[0] or 0

        injects = score_injects(team)

        team.scores = {
            'services_earned': earned,
            'services_maximum': maximum,
            'injects_earned': injects,
            'flags_lost': flags
        }
        scoring_teams.append(team)
    return render_scoring_page('scoring/index.html', teams=scoring_teams)
开发者ID:grdaneault,项目名称:service-scoring-engine,代码行数:44,代码来源:controllers.py
示例9: test_mapify_with_table_object_join
def test_mapify_with_table_object_join(self):
    """Mapify over a join of two tables yields a class with the expected attributes."""
    first = Table('test_baph_mapify', self.orm.metadata, useexisting=True)
    second = Table('test_baph_mapify_join', self.orm.metadata,
                   autoload=True, useexisting=True)
    JoinObj = Mapify(self.orm, join(first, second))(MapifiableClass)
    expected_attrs = ('__table__', 'id', 'string',
                      'number_with_decimal_point', 'other_string')
    for attr_name in expected_attrs:
        self.assertHasAttr(JoinObj, attr_name)
开发者ID:alvaromartin,项目名称:baph,代码行数:11,代码来源:test_orm.py
示例10: _getPostInfo
def _getPostInfo(self, ctx, row):
    """Collect one post's fields, author login and category slugs into a dict.

    ``row`` supplies the post columns.  The author is looked up in
    ``ctx.users`` (an empty string is used, with a warning, when no user is
    found).  Categories are the slugs of the terms related to the post whose
    taxonomy is ``'category'``.  ``metadata`` is always an empty dict.
    """
    field_map = (
        ('type', 'post_type'),
        ('slug', 'post_name'),
        ('datetime', 'post_date'),
        ('title', 'post_title'),
        ('status', 'post_status'),
        ('post_id', 'ID'),
        ('post_guid', 'guid'),
        ('content', 'post_content'),
        ('excerpt', 'post_excerpt'),
    )
    post_info = {key: row[source_col] for key, source_col in field_map}

    author_query = select([ctx.users]).where(
        ctx.users.c.ID == row['post_author'])
    author_row = ctx.conn.execute(author_query).fetchone()
    if author_row:
        post_info['author'] = author_row['user_login']
    else:
        logger.warning("No author on %s" % row['post_name'])
        post_info['author'] = ''

    # TODO: This is super slow. Gotta cache this thing somehow.
    term_join = join(ctx.term_relationships,
                     join(ctx.term_taxonomy, ctx.terms))
    term_rows = ctx.conn.execute(
        term_join.select(ctx.term_relationships.c.object_id == row['ID']))
    categories = []
    for term in term_rows:
        if term['taxonomy'] == 'category':
            categories.append(term['slug'])
        else:
            logger.debug("Skipping taxonomy '%s' on: %s" %
                         (term['taxonomy'], row['post_name']))
    post_info['categories'] = categories

    post_info['metadata'] = {}
    return post_info
开发者ID:ludovicchabant,项目名称:PieCrust-WordpressSql,代码行数:39,代码来源:piecrust_wordpresssql.py
示例11: select_media
def select_media(hashtag):
    """Return the ``media_url`` of every enabled media row tagged ``hashtag``.

    Media rows are joined to hashtag rows on ``tweet_id``; the statement is
    logged at debug level before it is executed.
    """
    media_with_tags = join(TwitterMedia, TwitterHashtag,
                           TwitterMedia.tweet_id == TwitterHashtag.tweet_id)
    stmt = select([TwitterMedia.media_url, TwitterMedia.tweet_id])
    stmt = stmt.where(TwitterHashtag.hashtag == hashtag)
    stmt = stmt.where(TwitterMedia.enabled == True)  # noqa: E712
    stmt = stmt.select_from(media_with_tags)
    log.debug(stmt)
    return [row['media_url'] for row in connection.execute(stmt).fetchall()]
开发者ID:ikeda-mk,项目名称:happyday2016,代码行数:13,代码来源:disable_404_media.py
示例12: sflvault_service_put
def sflvault_service_put(self, authtok, service_id, data):
    """Update a service if the session's user shares a group with it.

    Returns the result of ``self.vault.service_put`` when at least one row
    links the session user, through a user group, to a service group that
    contains ``service_id``; otherwise returns a failure ``vaultMsg``.
    """
    # 'user_id' required in session.
    # TODO: verify I had access to the service previously.
    group_link = sql.join(servicegroups_table, usergroups_table,
                          ServiceGroup.group_id==UserGroup.group_id)
    with_users = group_link.join(users_table, User.id==UserGroup.user_id)
    access_query = with_users.select()
    access_query = access_query.where(User.id==self.sess['user_id'])
    access_query = access_query.where(ServiceGroup.service_id==service_id)

    if list(meta.Session.execute(access_query)):
        return self.vault.service_put(service_id, data)
    return vaultMsg(False, "You don't have access to that service.")
开发者ID:baloo,项目名称:sflvault-server,代码行数:14,代码来源:xmlrpc.py
示例13: __init__
def __init__(self, left, right, onclause=None,
             isouter=False, join_to_left=True):
    """Build a join whose ``onclause`` may name a mapped relationship.

    ``onclause`` may be a property name, a ``QueryableAttribute`` or a
    ``MapperProperty``; in those cases it is replaced by the join
    condition(s) produced by the property's ``_create_joins``, and
    ``self._target_adapter`` is set.  When the property yields a secondary
    join condition, ``left`` is first joined to the secondary table.  With
    ``join_to_left`` true, the left side may be used as the source
    selectable when creating those conditions.
    """
    adapt_from = None
    if hasattr(left, '_orm_mappers'):
        # ``left`` is itself an ORM join: continue from its right side.
        left_mapper = left._orm_mappers[1]
        if join_to_left:
            adapt_from = left.right
    else:
        left_mapper, left, left_is_aliased = _entity_info(left)
        if join_to_left and (left_is_aliased or not left_mapper):
            adapt_from = left

    right_mapper, right, right_is_aliased = _entity_info(right)
    adapt_to = right if right_is_aliased else None

    if left_mapper or right_mapper:
        self._orm_mappers = (left_mapper, right_mapper)

        # Resolve ``onclause`` to a mapper property where possible.
        prop = None
        if isinstance(onclause, basestring):
            prop = left_mapper.get_property(onclause)
        elif isinstance(onclause, attributes.QueryableAttribute):
            if adapt_from is None:
                adapt_from = onclause.__clause_element__()
            prop = onclause.property
        elif isinstance(onclause, MapperProperty):
            prop = onclause

        if prop:
            (primary_clause, secondary_clause, _source, _dest,
             secondary, target_adapter) = prop._create_joins(
                source_selectable=adapt_from,
                dest_selectable=adapt_to,
                source_polymorphic=True,
                dest_polymorphic=True,
                of_type=right_mapper)

            if secondary_clause is not None:
                left = sql.join(left, secondary, primary_clause, isouter)
                onclause = secondary_clause
            else:
                onclause = primary_clause
            self._target_adapter = target_adapter

    expression.Join.__init__(self, left, right, onclause, isouter)
开发者ID:AntonNguyen,项目名称:easy_api,代码行数:50,代码来源:util.py
示例14: by_is
def by_is(self, query, operand, maybe_negate):
    """Apply an ``is:`` narrow term to ``query``.

    ``private`` joins ``zerver_recipient`` and keeps personal/huddle
    recipients; ``starred`` tests the starred flag bit; ``mentioned`` and
    ``alerted`` both test the mentioned flag bit.  The condition is passed
    through ``maybe_negate`` before being added.

    Raises:
        BadNarrowOperator: for any other operand.
    """
    if operand == 'private':
        recipient_join = join(
            query.froms[0], "zerver_recipient",
            column("recipient_id") == literal_column("zerver_recipient.id"))
        cond = or_(column("type") == Recipient.PERSONAL,
                   column("type") == Recipient.HUDDLE)
        return query.select_from(recipient_join).where(maybe_negate(cond))

    if operand == 'starred':
        flag = UserMessage.flags.starred
    elif operand in ('mentioned', 'alerted'):
        flag = UserMessage.flags.mentioned
    else:
        raise BadNarrowOperator("unknown 'is' operand " + operand)

    cond = column("flags").op("&")(flag.mask) != 0
    return query.where(maybe_negate(cond))
开发者ID:danshev,项目名称:zulip,代码行数:15,代码来源:messages.py
示例15: get_album_query
def get_album_query(self):
    """Return a query over albums that contain songs owned by this user.

    A subquery computes, per album, the count and total length of the songs
    owned by ``self.id``.  The outer query selects those figures together
    with the owner's id and name and the ``dbfields['album']`` columns,
    grouped by album.
    """
    from masterapp.config.schema import dbfields

    # Number of songs available on this album subquery
    owned = Session.query(
        Album.id.label('albumid'),
        func.count(Song.id).label('Album_havesongs'),
        func.sum(Song.length).label('Album_length'))
    owned = owned.join(Album.songs, SongOwner)
    owned = owned.filter(SongOwner.uid == self.id)
    havesongs = owned.group_by(Album.id).subquery()

    album_query = Session.query(
        SongOwner.uid.label('Friend_id'),
        havesongs.c.Album_havesongs,
        havesongs.c.Album_length,
        User._name.label('Friend_name'),
        *dbfields['album'])
    album_query = album_query.select_from(
        join(Album, havesongs, Album.id == havesongs.c.albumid))
    album_query = album_query.join(Album.artist).reset_joinpoint()
    album_query = album_query.join(Album.songs, SongOwner, SongOwner.user)
    album_query = album_query.filter(SongOwner.uid == self.id)
    return album_query.group_by(Album)
开发者ID:JustinTulloss,项目名称:harmonize.fm,代码行数:19,代码来源:user.py
示例16: _filter_by_event
def _filter_by_event(self, first_name, last_name, year, from_dt, to_dt):
    """
    Build the Event query for one batter within a date range.
    :param first_name: batter first name
    :param last_name: batter last name
    :param year: season year
    :param from_dt: from date
    :param to_dt: to date
    :return: query over Event joined to Game, filtered by batter and game date
    """
    batter = self.get_player_data_one(year, first_name, last_name)
    batter_id = batter[rosters.c.PLAYER_ID.name]
    # Both range bounds are rendered through the same date format.
    range_start = self.QUERY_DATE_FORMAT.format(year=year, dt=from_dt)
    range_end = self.QUERY_DATE_FORMAT.format(year=year, dt=to_dt)

    game_events = join(Game, Event, Game.GAME_ID == Event.GAME_ID)
    events = self.session.query(Event).select_from(game_events)
    events = events.filter(Event.BAT_ID == batter_id)
    return events.filter(Game.GAME_DT.between(range_start, range_end))
开发者ID:Shinichi-Nakagawa,项目名称:hatteberg,代码行数:19,代码来源:retrosheet_controller.py
示例17: machine_list
def machine_list(self, customer_id=None):
    """Return a ``vaultMsg`` listing machines joined with their customers.

    Rows are ordered by customer id.  When ``customer_id`` is truthy, only
    that customer's machines are listed.
    """
    customer_machines = sql.join(customers_table, machines_table)
    sel = customer_machines.select(use_labels=True).order_by(Customer.id)

    # Filter also..
    if customer_id:
        sel = sel.where(Customer.id==customer_id)

    out = []
    for machine in meta.Session.execute(sel):
        out.append({
            'id': machine.machines_id,
            'name': machine.machines_name,
            'fqdn': machine.machines_fqdn,
            'ip': machine.machines_ip,
            'location': machine.machines_location,
            'notes': machine.machines_notes,
            'customer_id': machine.customers_id,
            'customer_name': machine.customers_name,
        })
    return vaultMsg(True, "Here is the machines list", {'list': out})
开发者ID:hfeeki,项目名称:sflvault,代码行数:20,代码来源:vault.py
示例18: outer_with_filter
def outer_with_filter(query, alias, relation, filter_clause):
    """Outer-join ``query`` to ``alias`` via ``relation`` with an extra filter.

    The join conditions come from the relationship property's
    ``_create_joins``.  ``filter_clause`` is AND-ed into the ON clause of
    the outer join.  When the relationship has a secondary join condition,
    the right side of the outer join is ``secondary JOIN alias``.
    """
    prop = relation.prop
    left_info = inspection.inspect(prop.parent)
    right_info = inspection.inspect(alias)

    (primary_clause, secondary_clause, _source, _dest,
     secondary, _target_adapter) = prop._create_joins(
        source_selectable=left_info.selectable,
        dest_selectable=right_info.selectable,
        source_polymorphic=True,
        dest_polymorphic=True,
        of_type=right_info.mapper)

    right = alias
    if secondary_clause is not None:
        # note this is an inner join from secondary->right
        right = sql.join(secondary, alias, secondary_clause)

    onclause = and_(_add_alias(primary_clause, relation, alias),
                    filter_clause)
    return query.outerjoin(right, onclause)
开发者ID:kolypto,项目名称:py-mongosql,代码行数:20,代码来源:utils.py
示例19: setUp
def setUp(self):
    """Create ``users``/``addresses`` tables and map a class onto their join.

    The mapped class is stored on ``self.DBTestCls1``; its ``user_id``
    property covers the ``user_id`` column of both tables.  The metadata is
    created with the ``sqlite:///:memory:`` URL.
    """
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import mapper
    from sqlalchemy.sql import join
    from sqlalchemy import Table, Column, Integer, String, ForeignKey

    self.session = tws.transactional_session()
    Base = declarative_base(metadata=sa.MetaData('sqlite:///:memory:'))
    Base.query = self.session.query_property()
    metadata = Base.metadata

    users_table = Table(
        'users', metadata,
        Column('user_id', Integer, primary_key=True),
        Column('name', String(40)),
    )
    addresses_table = Table(
        'addresses', metadata,
        Column('address_id', Integer, primary_key=True),
        Column('user_id', Integer, ForeignKey('users.user_id')),
        Column('place', String(40)),
    )

    class DBTestCls1(object):
        pass

    users_with_addresses = join(users_table, addresses_table)
    # Both tables have a user_id column; map them to a single attribute.
    shared_user_id = [users_table.c.user_id, addresses_table.c.user_id]
    mapper(DBTestCls1, users_with_addresses,
           properties={'user_id': shared_user_id})

    metadata.create_all()
    self.DBTestCls1 = DBTestCls1
    transaction.commit()
    testapi.setup()
开发者ID:LeResKP,项目名称:tw2.sqla,代码行数:37,代码来源:test_exception.py
示例20: _check_capacity_exceeded
def _check_capacity_exceeded(conn, allocs):
"""Checks to see if the supplied allocation records would result in any of
the inventories involved having their capacity exceeded.
Raises an InvalidAllocationCapacityExceeded exception if any inventory
would be exhausted by the allocation. Raises an
InvalidAllocationConstraintsViolated exception if any of the `step_size`,
`min_unit` or `max_unit` constraints in an inventory will be violated
by any one of the allocations.
If no inventories would be exceeded or violated by the allocations, the
function returns a list of `ResourceProvider` objects that contain the
generation at the time of the check.
:param conn: SQLalchemy Connection object to use
:param allocs: List of `Allocation` objects to check
"""
# The SQL generated below looks like this:
# SELECT
# rp.id,
# rp.uuid,
# rp.generation,
# inv.resource_class_id,
# inv.total,
# inv.reserved,
# inv.allocation_ratio,
# allocs.used
# FROM resource_providers AS rp
# JOIN inventories AS i1
# ON rp.id = i1.resource_provider_id
# LEFT JOIN (
# SELECT resource_provider_id, resource_class_id, SUM(used) AS used
# FROM allocations
# WHERE resource_class_id IN ($RESOURCE_CLASSES)
# GROUP BY resource_provider_id, resource_class_id
# ) AS allocs
# ON inv.resource_provider_id = allocs.resource_provider_id
# AND inv.resource_class_id = allocs.resource_class_id
# WHERE rp.uuid IN ($RESOURCE_PROVIDERS)
# AND inv.resource_class_id IN ($RESOURCE_CLASSES)
#
# We then take the results of the above and determine if any of the
# inventory will have its capacity exceeded.
rc_ids = set([_RC_CACHE.id_from_string(a.resource_class)
for a in allocs])
provider_uuids = set([a.resource_provider.uuid for a in allocs])
usage = sa.select([_ALLOC_TBL.c.resource_provider_id,
_ALLOC_TBL.c.consumer_id,
_ALLOC_TBL.c.resource_class_id,
sql.func.sum(_ALLOC_TBL.c.used).label('used')])
usage = usage.where(_ALLOC_TBL.c.resource_class_id.in_(rc_ids))
usage = usage.group_by(_ALLOC_TBL.c.resource_provider_id,
_ALLOC_TBL.c.resource_class_id)
usage = sa.alias(usage, name='usage')
inv_join = sql.join(_RP_TBL, _INV_TBL,
sql.and_(_RP_TBL.c.id == _INV_TBL.c.resource_provider_id,
_INV_TBL.c.resource_class_id.in_(rc_ids)))
primary_join = sql.outerjoin(inv_join, usage,
sql.and_(
_INV_TBL.c.resource_provider_id == usage.c.resource_provider_id,
_INV_TBL.c.resource_class_id == usage.c.resource_class_id)
)
cols_in_output = [
_RP_TBL.c.id.label('resource_provider_id'),
_RP_TBL.c.uuid,
_RP_TBL.c.generation,
_INV_TBL.c.resource_class_id,
_INV_TBL.c.total,
_INV_TBL.c.reserved,
_INV_TBL.c.allocation_ratio,
_INV_TBL.c.min_unit,
_INV_TBL.c.max_unit,
_INV_TBL.c.step_size,
usage.c.used,
]
sel = sa.select(cols_in_output).select_from(primary_join)
sel = sel.where(
sa.and_(_RP_TBL.c.uuid.in_(provider_uuids),
_INV_TBL.c.resource_class_id.in_(rc_ids)))
records = conn.execute(sel)
# Create a map keyed by (rp_uuid, res_class) for the records in the DB
usage_map = {}
provs_with_inv = set()
for record in records:
map_key = (record['uuid'], record['resource_class_id'])
if map_key in usage_map:
raise KeyError("%s already in usage_map, bad query" % str(map_key))
usage_map[map_key] = record
provs_with_inv.add(record["uuid"])
# Ensure that all providers have existing inventory
missing_provs = provider_uuids - provs_with_inv
if missing_provs:
class_str = ', '.join([_RC_CACHE.string_from_id(rc_id)
for rc_id in rc_ids])
provider_str = ', '.join(missing_provs)
raise exception.InvalidInventory(resource_class=class_str,
resource_provider=provider_str)
#.........这里部分代码省略.........
开发者ID:pshchelo,项目名称:nova,代码行数:101,代码来源:resource_provider.py
注：本文中的sqlalchemy.sql.join函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论