本文整理汇总了Python中sqlalchemy.sql.select函数的典型用法代码示例。如果您正苦于以下问题:Python select函数的具体用法?Python select怎么用?Python select使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了select函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: extractData
def extractData(self):
self.logger.info('Connecting to database')
engine = create_engine(self.config['database'])
meta = MetaData()
meta.bind = engine
self.job = Table('job', meta, autoload=True)
self.job_status = Table('job_status', meta, autoload=True)
self.job_status_type_description = Table('job_status_type_description', meta, autoload=True)
job=self.job
job_status=self.job_status
s=self.job_status_type_description.select()
self.type_db_values = s.execute().fetchall()
self.queue_db_values = self.getJobSummary(job.c.queue)
self.user_db_values = self.getJobSummary(job.c.userId, job.c.localUser)
self.node_db_values = self.getJobSummary(job.c.workerNode)
self.logger.info('Generating job list')
node_job_status = self.config['node_job_status'].split(',')
maximum = func.max(self.job_status.c.id).label('m')
s1 = select([maximum]).group_by(job_status.c.jobId).alias('a')
s2 = select([job.c.id, job.c.lrmsAbsLayerJobId, job.c.workerNode, job_status.c.type,
job_status.c.time_stamp]).select_from(job.join(job_status).join(s1,job_status.c.id==text('m'))). \
where(and_(job_status.c.type.in_(node_job_status)))
self.job_db_values = s2.execute().fetchall()
return {}
开发者ID:HappyFaceMonitoring,项目名称:HappyFaceModules,代码行数:31,代码来源:CREAMCE.py
示例2: test_sql_create_extra_fields
def test_sql_create_extra_fields(self):
"""
Test if `SqlStore` creates extra columns.
"""
# extend the db
self._make_store(self.db_url,
extra_fields={sqlalchemy.Column('extra', sqlalchemy.VARCHAR(length=128)): (lambda arg: arg.foo.value)})
# if this query does not error out, the column is defined
q = sql.select([sqlfunc.count(self.store.t_store.c.extra)]).distinct()
results = self.conn.execute(q)
# self.c.execute("select distinct count(extra) from %s" % self.store.table_name)
rows = results.fetchall()
assert_equal(len(rows), 1)
# create and save an object
obj = SimplePersistableObject('an object')
obj.foo = SimplePersistableObject('an attribute')
id_ = db.save(obj)
# check that the value has been saved
q = sql.select([self.store.t_store.c.extra]).where(self.store.t_store.c.id == id_)
# self.c.execute("select extra from %s where id=%d"
# % (self.store.table_name, id_))
results = self.conn.execute(q)
rows = results.fetchall()
assert_equal(len(rows), 1)
assert_equal(rows[0][0], obj.foo.value)
开发者ID:bringhurst,项目名称:gc3pie,代码行数:29,代码来源:test_persistence.py
示例3: test_all_aliases
def test_all_aliases(self):
orders = table('order', column('order'))
s = select([orders.c.order]).cte("regional_sales")
r1 = s.alias()
r2 = s.alias()
s2 = select([r1, r2]).where(r1.c.order > r2.c.order)
self.assert_compile(
s2,
'WITH regional_sales AS (SELECT "order"."order" '
'AS "order" FROM "order") '
'SELECT anon_1."order", anon_2."order" '
'FROM regional_sales AS anon_1, '
'regional_sales AS anon_2 WHERE anon_1."order" > anon_2."order"'
)
s3 = select([orders]).select_from(orders.join(r1, r1.c.order == orders.c.order))
self.assert_compile(
s3,
'WITH regional_sales AS '
'(SELECT "order"."order" AS "order" '
'FROM "order")'
' SELECT "order"."order" '
'FROM "order" JOIN regional_sales AS anon_1 ON anon_1."order" = "order"."order"'
)
开发者ID:biner,项目名称:sqlalchemy,代码行数:28,代码来源:test_cte.py
示例4: create_type
def create_type(sess, fqname):
u'''
クラスなり型なりを追加
'''
t = tables.Type.__table__
query = sql.select([t.c.id], t.c.fqname == fqname, t)
result = sess.execute(query).fetchall()
if result:
return result[0][0]
name = fqname.split('.')[-1]
typ = tables.Type(name=name, fqname=fqname)
with sess.begin():
sess.add(typ)
query = sql.select([t.c.id], t.c.fqname == fqname, t)
result = sess.execute(query).fetchone()[0]
return result
开发者ID:shomah4a,项目名称:sakuya,代码行数:26,代码来源:functions.py
示例5: create_bought_product
def create_bought_product(self, qty, user_uuid, product_uuid):
"""
Create new bought item given product_uuid and user_uuid
of product or user with given uuid doesnt exists returns None
Returns primary_key that has been inserted
Keyword Arguments:
qty -- amount of items bought (int),
user_uuid -- unique user uuid (str),
product_uuid -- unique product uuid (str)
"""
try:
user_id = self.conn.execute(select([users.c.user_id])\
.where(users.c.user_uuid == user_uuid)).scalar()
product_id = self.conn.execute(select([products.c.product_id])\
.where(products.c.product_uuid == product_uuid)).scalar()
except:
raise
if product_id and user_id:
ins = bought_products.insert()\
.values(quantity = qty, user_id = user_id, product_id = product_id)
trans = self.conn.begin()
try:
res = self.conn.execute(ins)
trans.commit()
return res.inserted_primary_key[0]
except Exception as e:
trans.rollback()
raise
logging.error(sys.exc_info[0])
else:
return
开发者ID:archdark,项目名称:consumption,代码行数:34,代码来源:db_base.py
示例6: grant_permission_for_group
def grant_permission_for_group(permission_name, group_name):
execute(
groups_permissions.insert().values(
group_id=select([groups.c.group_id]).where(groups.c.group_name == group_name),
permission_id=select([permissions.c.permission_id]).where(permissions.c.permission_name == permission_name)
)
)
开发者ID:knyar,项目名称:mediadrop,代码行数:7,代码来源:006-30bb0d88d139-add_view_permission.py
示例7: _get_trace_id
def _get_trace_id(self, trace):
# Get the hash.
trace_hash = self.get_hash(trace)
# Check the database.
stmt = select([traces_table.c.id]).where(
traces_table.c.trace_hash == trace_hash
)
row = self._execute(stmt).first()
if row:
return row.id
# Attempt to insert a new trace.
compressed = zlib.compress(str(trace))
stmt = traces_table.insert().from_select([
traces_table.c.trace_hash,
traces_table.c.data,
], select([
literal(trace_hash),
literal(compressed, type_=Binary),
]).where(
~exists([traces_table.c.id]).where(
traces_table.c.trace_hash == trace_hash
)
))
self._execute(stmt)
return self._get_trace_id(trace)
开发者ID:hoangt,项目名称:ms3,代码行数:29,代码来源:sql.py
示例8: tags
def tags(metadata):
conn = metadata.bind.connect()
session = DBSession()
taggings = metadata.tables["taggings"]
tags = metadata.tables["tags"]
tag_result = conn.execute(select([tags]))
for row in tag_result:
tag = Tag(id=row["id"], name=row["name"])
session.add(tag)
session.flush()
result = conn.execute(select([taggings]))
for row in result:
# get type
tag_type = row["taggable_type"]
if tag_type == "Group":
Model = Group
elif tag_type == "Person":
Model = Profile
elif tag_type == "Company":
Model = Company
# get tag id
tag = Tag.query.get(row["tag_id"])
obj = Model.query.get(row["taggable_id"])
if obj:
obj.tags.append(tag)
# get taggable id
session.flush()
开发者ID:binarydud,项目名称:ppl,代码行数:28,代码来源:migrate.py
示例9: get_tx_addresses
def get_tx_addresses(tx=None):
in_addresses = []
out_addresses = []
if tx['removed']==True:
in_addresses = ALL_VOUT.query.with_entities(ALL_VOUT.address, ALL_VOUT.value, ALL_VOUT.txin_tx_id, ALL_VOUT.txout_tx_hash).filter(ALL_VOUT.txin_tx_id==int(tx['id'])).order_by(ALL_VOUT.in_idx).all()
out_addresses = ALL_VOUT.query.with_entities(ALL_VOUT.address, ALL_VOUT.value, ALL_VOUT.txin_tx_id, ALL_VOUT.txin_tx_hash).filter(ALL_VOUT.txout_tx_id==int(tx['id'])).order_by(ALL_VOUT.out_idx).all()
return in_addresses , out_addresses
s1 = select([STXO.address, STXO.value, STXO.txin_tx_id, STXO.txout_tx_hash, STXO.in_idx]).where(STXO.txin_tx_id == int(tx['id']))
s2 = select([VTXO.address, VTXO.value, VTXO.txin_tx_id, VTXO.txout_tx_hash, VTXO.in_idx]).where(VTXO.txin_tx_id == int(tx['id']))
q = s1.union(s2).alias('in_addresses')
in_addresses=db_session.query(q).order_by('in_idx').all()
s1 = select([STXO.address, STXO.value, STXO.txin_tx_id, STXO.txout_tx_hash, STXO.out_idx]).where(STXO.txout_tx_id == tx['id'])
s2 = select([VTXO.address, VTXO.value, VTXO.txin_tx_id, VTXO.txout_tx_hash, VTXO.out_idx]).where(VTXO.txout_tx_id == tx['id'])
q = s1.union(s2).alias('out_addresses')
out_addresses=db_session.query(q).order_by('out_idx').all()
return in_addresses , out_addresses
开发者ID:haobtc,项目名称:openblockchain,代码行数:28,代码来源:explorer_api.py
示例10: _get_memory_id
def _get_memory_id(self, mem):
# Check the local cache.
mem_hash = self.get_hash(mem)
if mem_hash in self.memories:
return self.memories[mem_hash]
# Attempt to insert a new memory.
# This is the expected case.
stmt = memories_table.insert().from_select([
memories_table.c.name_hash,
memories_table.c.name,
], select([
literal(mem_hash),
literal(mem),
]).where(
~exists([memories_table.c.id]).where(
memories_table.c.name_hash == mem_hash
)
)
)
self._execute(stmt)
# Check the database.
stmt = select([memories_table.c.id]).where(
memories_table.c.name_hash == mem_hash
)
row = self._execute(stmt).first()
ident = row.id
self.memories[mem_hash] = ident
return ident
开发者ID:hoangt,项目名称:ms3,代码行数:31,代码来源:sql.py
示例11: create_conversion_batch
async def create_conversion_batch(entity_name, entity_id, format, user_id):
entity_name = entity_name.upper()
if entity_name == 'AUTHOR':
author = model.Author.__table__
q = select([case([(author.c.first_name == None, author.c.last_name)],
else_ = author.c.first_name + ' ' + author.c.last_name)])\
.where(author.c.id == entity_id)
elif entity_name == 'SERIES':
series = model.Series.__table__
q = select([series.c.title]).where(series.c.id == entity_id)
elif entity_name == 'BOOKSHELF':
shelf = model.Bookshelf.__table__
q = select([shelf.c.name]).where(shelf.c.id == entity_id)
else:
raise ValueError('Invalid entity name')
format_id = await get_format_id(format)
async with engine.acquire() as conn:
batch = model.ConversionBatch.__table__
res = await conn.execute(q)
name = await res.scalar()
name = "Books for %s %s" % (entity_name.lower(), name)
res = await conn.execute(batch.insert()\
.values(name=name, for_entity=entity_name,
entity_id=entity_id, format_id=format_id,
created_by_id = user_id,
modified_by_id = user_id, version_id =1 )\
.returning(batch.c.id))
return await res.scalar()
开发者ID:izderadicka,项目名称:mybookshelf2,代码行数:31,代码来源:dal.py
示例12: start
def start(conf):
# connect to db
db.engine = engine = engine_from_config(dict(conf.items('sqlalchemy')), prefix='')
db.metadata.bind = engine
conn = engine.connect()
Session = sessionmaker(bind=engine)
session = Session()
profiles = []
topics = []
for user in session.query(User):
for profile in user.profiles:
if profile.origin == 5:
profiles.append(profile.profile_id)
for topic in user.topics:
if topic.profile_id in profiles:
topics.append(topic.topic_id)
for topic_id in topics:
print "checking", topic_id
s = select([func.count(db.t_message.c.message_id)], and_(db.t_message.c.origin == 5, db.t_message.c.topic_id == topic_id))
(count,) = conn.execute(s).fetchone()
if count > 1000:
(m_id,) = conn.execute(select([db.t_message.c.message_id],
db.t_message.c.topic_id == topic_id).order_by(
db.t_message.c.message_id.desc()).offset(1000).limit(1)).fetchone()
print "purging", topic_id, count, m_id
conn.execute(db.t_message.delete().where(and_(db.t_message.c.message_id < m_id, db.t_message.c.topic_id == topic_id)))
开发者ID:ak1394,项目名称:pavome-server,代码行数:29,代码来源:purge.py
示例13: create_mapper
def create_mapper(tag_tbl, tag_domain_tbl, tag_predicate_tbl, tag_value_tbl,
tagging_tbl):
"Mapper factory."
m = mapper(Tag,
tag_tbl,
id_attribute='tag_id',
slug_expression=lambda cls: as_slug_expression(
func.concatenate(cls.domain, ':',
cls.predicate, '=',
cls.value)),
extension=TagMapperExtension(tag_domain_tbl,
tag_predicate_tbl, tag_value_tbl),
properties=
dict(tagged=relationship(Tagged,
secondary=tagging_tbl,
back_populates='tags'),
domain=column_property(
select([tag_domain_tbl.c.domain]) \
.where(tag_tbl.c.tag_domain_id ==
tag_domain_tbl.c.tag_domain_id)
),
predicate=column_property(
select([tag_predicate_tbl.c.predicate]) \
.where(tag_tbl.c.tag_predicate_id ==
tag_predicate_tbl.c.tag_predicate_id)
),
value=column_property(
select([tag_value_tbl.c.value]) \
.where(tag_tbl.c.tag_value_id ==
tag_value_tbl.c.tag_value_id)
),
)
)
return m
开发者ID:helixyte,项目名称:TheLMA,代码行数:34,代码来源:tag.py
示例14: testMetricDataTimeStampQueryParams
def testMetricDataTimeStampQueryParams(uid):
'''
This test makes MetricDataHandler GET calls with from and to params :
_models/<uid>/data?from=<>&to=<>
'''
with repository.engineFactory().connect() as conn:
firstMetricData = conn.execute(
sql.select([schema.metric_data])
.where(schema.metric_data.c.uid == uid)
.order_by(sql.expression.asc(schema.metric_data.c.timestamp))
.limit(1)).fetchall()
lastMetricData = conn.execute(
sql.select([schema.metric_data])
.where(schema.metric_data.c.uid == uid)
.order_by(sql.expression.desc(schema.metric_data.c.timestamp))
.limit(1)).fetchall()
firstTimeStamp = firstMetricData[0].timestamp
lastTimeStamp = lastMetricData[0].timestamp
response = self.app.get("/%s/data?from=%s&to=%s"
% (uid, firstTimeStamp, lastTimeStamp), headers=self.headers)
assertions.assertSuccess(self, response)
getAllModelsResult = utils.jsonDecode(response.body)
for metricData in getAllModelsResult['data']:
self.assertGreaterEqual(datetime.strptime(metricData[0],
'%Y-%m-%d %H:%M:%S'), firstTimeStamp)
self.assertLessEqual(datetime.strptime(metricData[0],
'%Y-%m-%d %H:%M:%S'), lastTimeStamp)
开发者ID:darian19,项目名称:numenta-apps,代码行数:28,代码来源:models_all_inferences_test.py
示例15: get_school_info
def get_school_info(connection,school_id=None):
if school_id is None:
stmt = select([school])
return connection.execute(stmt)
else:
stmt = select([school]).where(school.c.id == school_id)
return connection.execute(stmt)
开发者ID:wangyu190810,项目名称:code,代码行数:7,代码来源:school.py
示例16: check_user_bought_product
def check_user_bought_product(self, user_uuid, product_uuid):
"""
Returns Falsy value if user has not bought product with given uuid else returns
bought product\'s id
Keyword Arguments:
user_uuid -- unique user\'s uuid,
product_uuid -- unique product\'s uuid
"""
sel = select([users.c.user_id]).where(users.c.user_uuid == user_uuid)
user = self.conn.execute(sel).fetchone()
if not user:
return None
sel = select([bought_products.c.bought_id]).select_from(products.join(bought_products))\
.where(and_(products.c.product_uuid == product_uuid,\
bought_products.c.user_id == user[0] ))
try:
res = self.conn.execute(sel).scalar()
return res
except:
raise
开发者ID:archdark,项目名称:consumption,代码行数:26,代码来源:db_base.py
示例17: test_delete_from_select
def test_delete_from_select(self):
table_name = "__test_deletefromselect_table__"
uuidstrs = []
for unused in range(10):
uuidstrs.append(uuid.uuid4().hex)
conn = self.engine.connect()
test_table = Table(table_name, self.meta,
Column('id', Integer, primary_key=True,
nullable=False, autoincrement=True),
Column('uuid', String(36), nullable=False))
test_table.create()
# Add 10 rows to table
for uuidstr in uuidstrs:
ins_stmt = test_table.insert().values(uuid=uuidstr)
conn.execute(ins_stmt)
# Delete 4 rows in one chunk
column = test_table.c.id
query_delete = sql.select([column],
test_table.c.id < 5).order_by(column)
delete_statement = utils.DeleteFromSelect(test_table,
query_delete, column)
result_delete = conn.execute(delete_statement)
# Verify we delete 4 rows
self.assertEqual(result_delete.rowcount, 4)
query_all = sql.select([test_table])\
.where(test_table.c.uuid.in_(uuidstrs))
rows = conn.execute(query_all).fetchall()
# Verify we still have 6 rows in table
self.assertEqual(len(rows), 6)
开发者ID:JohnLih,项目名称:nova,代码行数:32,代码来源:test_migration_utils.py
示例18: author
def author(mit_id, conn):
"""
Returns an author object for insertion into mongo summary collection.
The format is as follows:
{"_id": {"name": <name>, "mitid": <mitid>},
"type": "author",
"size": <num docs>,
"downloads": <num downloads>,
"countries": [
{"country": <3 ltr code>, "downloads": <num downloads>},...
]
"dates": [
{"date": <YYYY-MM-DD>, "downloads": <num>},...
]}
"""
requests_to_authors = requests.join(documents)\
.join(documents_authors)\
.join(authors)
totals = select([
authors.c.mit_id,
authors.c.name,
select([func.count()])
.select_from(documents_authors.join(authors))
.where(authors.c.mit_id==bindparam('mit_id'))
.label('size'),
select([func.count()])
.select_from(requests_to_authors)
.where(authors.c.mit_id==bindparam('mit_id'))
.label('downloads')
])\
.where(authors.c.mit_id==bindparam('mit_id'))
countries = select([requests.c.country, func.count().label('downloads')])\
.select_from(requests_to_authors)\
.where(authors.c.mit_id==bindparam('mit_id'))\
.group_by(requests.c.country)
dates = select([
func.date_trunc('day', requests.c.datetime).label('date'),
func.count().label('downloads')])\
.select_from(requests_to_authors)\
.where(authors.c.mit_id==bindparam('mit_id'))\
.group_by(func.date_trunc('day', requests.c.datetime))
author_obj = {'type': 'author'}
res = conn.execute(totals, mit_id=mit_id).first()
author_obj['_id'] = {'name': res['name'], 'mitid': res['mit_id']}
author_obj['size'] = res['size']
author_obj['downloads'] = res['downloads']
res = conn.execute(countries, mit_id=mit_id)
for row in res:
author_obj.setdefault('countries', [])\
.append({'country': row['country'], 'downloads': row['downloads']})
res = conn.execute(dates, mit_id=mit_id)
for row in res:
author_obj.setdefault('dates', [])\
.append({'date': row['date'].strftime('%Y-%m-%d'),
'downloads': row['downloads']})
return author_obj
开发者ID:MITLibraries,项目名称:oastats-backend,代码行数:60,代码来源:summary.py
示例19: upgrade
def upgrade():
connection = op.get_bind()
tasks_rows = connection.execute(
select([
tasks_table.c.id,
tasks_table.c.title,
tasks_table.c.description
]))
tasks_by_id = {}
for tasks_row in tasks_rows:
tasks_by_id[tasks_row.id] = tasks_row
task_group_tasks_rows = connection.execute(
select([
task_group_tasks_table.c.id,
task_group_tasks_table.c.task_id
]))
for task_group_tasks_row in task_group_tasks_rows:
task_id = task_group_tasks_row.task_id
if task_id:
tasks_row = tasks_by_id[task_id]
op.execute(
task_group_tasks_table.update()\
.values(
title=tasks_row.title,
description=tasks_row.description
)\
.where(
task_group_tasks_table.c.id == task_id
))
开发者ID:Smotko,项目名称:ggrc-core,代码行数:32,代码来源:20140804163035_eab2ab6a0fc_copy_task_data_into_taskgrouptask_rows.py
示例20: test_recursive_union_no_alias_two
def test_recursive_union_no_alias_two(self):
"""
pg's example:
WITH RECURSIVE t(n) AS (
VALUES (1)
UNION ALL
SELECT n+1 FROM t WHERE n < 100
)
SELECT sum(n) FROM t;
"""
# I know, this is the PG VALUES keyword,
# we're cheating here. also yes we need the SELECT,
# sorry PG.
t = select([func.values(1).label("n")]).cte("t", recursive=True)
t = t.union_all(select([t.c.n + 1]).where(t.c.n < 100))
s = select([func.sum(t.c.n)])
self.assert_compile(s,
"WITH RECURSIVE t(n) AS "
"(SELECT values(:values_1) AS n "
"UNION ALL SELECT t.n + :n_1 AS anon_1 "
"FROM t "
"WHERE t.n < :n_2) "
"SELECT sum(t.n) AS sum_1 FROM t"
)
开发者ID:biner,项目名称:sqlalchemy,代码行数:28,代码来源:test_cte.py
注:本文中的sqlalchemy.sql.select函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论