本文整理汇总了Python中sqlalchemy.func.length函数的典型用法代码示例。如果您正苦于以下问题：Python length函数的具体用法？Python length怎么用？Python length使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了length函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: birth_time
def birth_time(cls):
    """Build a SQL string expression ``hour:minute`` from ``cls.birth_datetime``.

    Each part is extracted from the datetime, cast to a string and prefixed
    with "0" when its text is exactly one character long.
    """
    def _two_chars(part):
        # Extract the named part as text and pad one-character values.
        as_text = cast(func.extract(part, cls.birth_datetime), String)
        return case([(func.length(as_text) == 1, "0" + as_text)],
                    else_=as_text)

    return _two_chars("hour") + ":" + _two_chars("minute")
开发者ID:PEDSnet,项目名称:pedsnetcdm_to_pcornetcdm,代码行数:7,代码来源:demographics.py
示例2: query
def query(self):
    """Assemble the SELECT that pairs and scores rows of the two views.

    The statement selects a ``score`` column (1 minus the Levenshtein
    distance of the two keys divided by the longer key length) followed by
    every field of the left and right views. Rows are joined through two
    aliases of the link table whose fingerprints must be equal.
    """
    left, right = self.left, self.right
    left_link = self.config.linktab.alias('__left_linktab')
    right_link = self.config.linktab.alias('__right_linktab')
    sources = left.from_clause + right.from_clause
    sources += [left_link, right_link]

    # Similarity score in SQL: 1 - distance / length of the longer key.
    longest_key = func.greatest(func.length(left.key),
                                func.length(right.key))
    distance = cast(func.levenshtein(left.key, right.key), Float)
    score = 1 - (distance / longest_key)

    columns = [score.label("score")]
    columns.extend(f.column.label(f.column_ref) for f in left.fields)
    columns.extend(f.column.label(f.column_ref) for f in right.fields)

    stmt = select(columns=columns, from_obj=sources)
    stmt = left.apply_filters(stmt)
    stmt = right.apply_filters(stmt)

    # Tie each view's key to its own alias of the link table.
    stmt = (stmt
            .where(left_link.c.key == left.key)
            .where(left_link.c.view == left.name)
            .where(right_link.c.key == right.key)
            .where(right_link.c.view == right.name))

    # TODO: make this levenshteinable
    stmt = stmt.where(right_link.c.fingerprint == left_link.c.fingerprint)

    # One row more than the configured cutoff is requested.
    stmt = stmt.limit(self.config.cutoff + 1)
    stmt = stmt.order_by(score.desc())
    return stmt.distinct()
开发者ID:backgroundcheck,项目名称:linkage,代码行数:35,代码来源:model.py
示例3: randomProfile
def randomProfile(category=None):
    """Redirect to a random profile picked from the top-weighted ids of a kind.

    The candidate ids are the ``Search`` rows whose ``kind`` equals
    ``category``, minus ids with certain category-specific prefixes and
    restricted to a category-specific id length. The 100 highest-weighted
    candidates are loaded and one of them is chosen at random.

    :param category: search kind such as "bra", "cnae", "cbo", "wld", "hs",
        "course_hedu" or "course_sc"; other values apply no prefix or
        length filter.
    :returns: a redirect response to ``profiles.profiles`` for the chosen id.
    :raises: an HTTP 404 (via ``flask.abort``) when no candidate id exists.
    """
    # Local import keeps this block self-contained; the function already
    # relies on Flask's redirect/url_for.
    from flask import abort

    ids = db.session.query(Search.id).filter(Search.kind == category)

    # Exclude ids by prefix. Plain equality is used for the single-category
    # cases: `category in ("bra")` is a substring test on the string "bra"
    # (parentheses alone do not make a tuple) and raises TypeError for None.
    if category == "bra":
        ids = ids.filter(not_(Search.id.startswith("0xx")))
    elif category in ("cnae", "cbo", "wld"):
        ids = ids.filter(not_(Search.id.startswith("xx")))
    elif category == "hs":
        ids = ids.filter(not_(Search.id.startswith("22")))
    elif category in ("course_hedu", "course_sc"):
        ids = ids.filter(not_(Search.id.startswith("00")))

    # Restrict to the id length used for profiles of each category.
    if category == "bra":
        ids = ids.filter(func.length(Search.id) == 9)
    elif category in ("cnae", "hs", "course_hedu"):
        ids = ids.filter(func.length(Search.id) == 6)
    elif category in ("wld", "course_sc"):
        # Was `category == (...)`, a str-vs-tuple comparison that is never
        # true, so this length filter was silently skipped.
        ids = ids.filter(func.length(Search.id) == 5)
    elif category == "cnae":
        # NOTE(review): unreachable, "cnae" is already handled by the
        # length-6 branch above. Possibly meant for another category
        # (e.g. "cbo") -- confirm before changing.
        ids = ids.filter(func.length(Search.id) == 4)

    ids = ids.order_by(Search.weight.desc()).limit(100).all()
    if not ids:
        # random.randrange(0, 0) would raise ValueError (HTTP 500).
        abort(404)

    # Each row is a one-element result; take the id column.
    profile_id = random.choice(ids)[0]
    return redirect(url_for("profiles.profiles", category=category,
                            id=profile_id))
开发者ID:VincentVW,项目名称:dataviva-site,代码行数:27,代码来源:views.py
示例4: rename_directory
def rename_directory(db, user_id, old_api_path, new_api_path):
    """
    Rename a directory.

    The directory row itself is renamed first, then the ``name`` and
    ``parent_name`` of every descendant are rewritten by replacing the old
    path prefix with the new one.

    Raises RenameRoot when the old path is the root directory and
    DirectoryExists when the new path already exists for this user.
    """
    old_db_path = from_api_dirname(old_api_path)
    new_db_path = from_api_dirname(new_api_path)

    if old_db_path == '/':
        raise RenameRoot('Renaming the root directory is not permitted.')

    # Overwriting existing directories is disallowed.
    if _dir_exists(db, user_id, new_db_path):
        raise DirectoryExists(new_api_path)

    def _has_old_prefix(col):
        # Literal prefix comparison. startswith() compiles to LIKE, where
        # "_" and "%" in the path act as wildcards, so renaming e.g. "/a_b/"
        # would also rewrite descendants of "/axb/".
        return func.left(col, func.length(old_db_path)) == old_db_path

    def _with_new_prefix(col):
        # right(col, -n) drops the first n characters, i.e. the old prefix.
        return func.concat(
            new_db_path,
            func.right(col, -func.length(old_db_path))
        )

    # Set this foreign key constraint to deferred so it's not violated
    # when we run the first statement to update the name of the directory.
    db.execute('SET CONSTRAINTS '
               'pgcontents.directories_parent_user_id_fkey DEFERRED')

    # Update name column for the directory that's being renamed
    db.execute(
        directories.update().where(
            and_(
                directories.c.user_id == user_id,
                directories.c.name == old_db_path,
            )
        ).values(
            name=new_db_path,
        )
    )

    # Update the name and parent_name of any descendant directories.  Do
    # this in a single statement so the non-deferrable check constraint
    # is satisfied.
    db.execute(
        directories.update().where(
            and_(
                directories.c.user_id == user_id,
                _has_old_prefix(directories.c.name),
                _has_old_prefix(directories.c.parent_name),
            )
        ).values(
            name=_with_new_prefix(directories.c.name),
            parent_name=_with_new_prefix(directories.c.parent_name),
        )
    )
开发者ID:quantopian,项目名称:pgcontents,代码行数:55,代码来源:query.py
示例5: birth_date
def birth_date(cls):
    """Build a SQL string expression ``year-month-day`` from the birth columns.

    Month and day are cast to strings; an empty string becomes "01" and a
    one-character value is prefixed with "0". The year is only cast.
    """
    def _month_or_day(column):
        as_text = cast(column, String)
        # Inner case: pad one-character values with a leading zero.
        padded = case([(func.length(as_text) == 1, "0" + as_text)],
                      else_=as_text)
        # Outer case: an empty value falls back to "01".
        return case([(as_text == "", "01")], else_=padded)

    year = cast(cls.year_of_birth, String)
    return (year + "-" + _month_or_day(cls.month_of_birth)
            + "-" + _month_or_day(cls.day_of_birth))
开发者ID:PEDSnet,项目名称:pedsnetcdm_to_pcornetcdm,代码行数:11,代码来源:demographics.py
示例6: number_of_locations
def number_of_locations(self, bra_length):
    """Count ``Bra`` rows whose id has exactly ``bra_length`` characters.

    For lengths 1 and 3 every such id is counted. For length 7 only ids
    sharing the first five characters of ``self.bra_id`` are counted, and
    for any other length only ids sharing its first three characters.
    """
    criteria = []
    if bra_length not in (1, 3):
        prefix_size = 5 if bra_length == 7 else 3
        criteria.append(Bra.id.like(self.bra_id[:prefix_size] + '%'))
    criteria.append(func.length(Bra.id) == bra_length)

    count_query = db.session.query(func.count(Bra.id).label("total"))
    return count_query.filter(*criteria).first().total
开发者ID:DataViva,项目名称:dataviva-site,代码行数:16,代码来源:services.py
示例7: _add_ordering
def _add_ordering(sql_query, table, column_type, column_name, order):
    """Return ``sql_query`` with an ORDER BY for ``column_name`` added.

    The 'contig' column is ordered by three keys: the first run of digits
    in the contig name (10000 when the name starts with neither "chr" plus
    digits nor digits), then the length of the name, then the name itself.
    Any other column is cast to the type returned by
    ``vcf_type_to_sqla_type(column_type)`` and ordered by ``order``.

    ``order`` is 'asc' or 'desc'. For non-contig columns any other value
    results in ``None`` being passed to ``order_by``.
    """
    # Special case for this column, which sorts contigs correctly:
    if column_name == 'contig':
        # Raw strings: "\d" is an invalid escape sequence in a normal string
        # literal (deprecated, and a warning on current Python versions).
        # The resulting SQL text is unchanged.
        get_contig_num = cast(
            text(r"SUBSTRING({} FROM '\d+')".format(table.c.contig)),
            type_=Integer)
        starts_with_chr = (text(r"SUBSTRING({} FROM '^chr(\d+)')"
                                .format(table.c.contig)) != literal(''))
        starts_with_number = (text(r"SUBSTRING({} FROM '^\d+')"
                                   .format(table.c.contig)) != literal(''))

        # 10000 used here to mean "should be at the end of all the numbers",
        # assuming we never hit a chromosome number >= 10000.
        contig_num_col = case(
            [(starts_with_chr, get_contig_num),
             (starts_with_number, get_contig_num)],
            else_=literal(10000)
        )
        contig_len_col = func.length(table.c.contig)
        contig_col = table.c.contig
        if order == 'desc':
            # NOTE(review): only the length and name keys are reversed; the
            # numeric key stays ascending -- confirm this is intended.
            contig_len_col = desc(contig_len_col)
            contig_col = desc(contig_col)
        return sql_query.order_by(contig_num_col, contig_len_col, contig_col)

    sqla_type = vcf_type_to_sqla_type(column_type)
    column = cast(table.c[column_name], type_=sqla_type)
    column = {'asc': asc(column), 'desc': desc(column)}.get(order)
    return sql_query.order_by(column)
开发者ID:hammerlab,项目名称:cycledash,代码行数:27,代码来源:genotypes.py
示例8: test_update_returning
def test_update_returning(self):
    """UPDATE with returning() compiles to an OUTPUT clause over ``inserted``.

    Covers explicit columns, the whole table, a trailing WHERE clause and a
    function expression in the returning list.
    """
    mytable = table(
        'mytable',
        column('myid', Integer),
        column('name', String(128)),
        column('description', String(128)),
    )
    head = 'UPDATE mytable SET name=:name OUTPUT '

    # Explicit column list.
    stmt = update(mytable, values={'name': 'foo'}).returning(
        mytable.c.myid, mytable.c.name)
    self.assert_compile(stmt, head + 'inserted.myid, inserted.name')

    # Whole table expands to every column.
    stmt = update(mytable, values={'name': 'foo'}).returning(mytable)
    self.assert_compile(
        stmt,
        head + 'inserted.myid, inserted.name, inserted.description')

    # WHERE is rendered after the OUTPUT clause.
    stmt = (update(mytable, values={'name': 'foo'})
            .returning(mytable)
            .where(mytable.c.name == 'bar'))
    self.assert_compile(
        stmt,
        head + 'inserted.myid, inserted.name, '
        'inserted.description WHERE mytable.name = :name_1')

    # func.length is rendered as LEN with an anonymous label.
    stmt = update(mytable, values={'name': 'foo'}).returning(
        func.length(mytable.c.name))
    self.assert_compile(stmt, head + 'LEN(inserted.name) AS length_1')
开发者ID:CyberCollins,项目名称:sqlalchemy,代码行数:31,代码来源:test_compiler.py
示例9: __init__
def __init__(self, bra_id):
    """Initialise via ``Location`` and prepare the two ``Yb`` queries.

    ``max_year_query`` selects the greatest ``Yb.year`` for ``bra_id``.
    ``attrs_query`` selects ``Yb`` rows of that year whose ``bra_id`` has
    the same length as ``self.bra_id``.
    """
    Location.__init__(self, bra_id)

    latest_year = db.session.query(func.max(Yb.year))
    self.max_year_query = latest_year.filter(Yb.bra_id == bra_id)

    in_latest_year = Yb.year == self.max_year_query
    same_id_length = func.length(Yb.bra_id) == len(self.bra_id)
    self.attrs_query = Yb.query.filter(in_latest_year, same_id_length)
开发者ID:DataViva,项目名称:dataviva-site,代码行数:7,代码来源:services.py
示例10: legislator_of
def legislator_of(cls, region_id, assembly_id=None):
    """Find the elected candidate for a region, climbing to parent regions.

    Starting at ``region_id``, look for exactly one elected candidacy in the
    given assembly (``current_parliament_id('assembly')`` when omitted). If
    a region has none, continue with the parent whose id is longest. If a
    region has several, delegate to ``guess_legislator`` and stop.

    Returns None when ``region_id`` is falsy or no region yields a result.
    """
    if not region_id:
        return None
    assembly_id = assembly_id or current_parliament_id('assembly')

    start_region = Region.query.filter_by(id=region_id).one()
    current = start_region
    found = None
    while not found and current:
        elected = (current.candidates
                   .filter(Candidacy.assembly_id == assembly_id)
                   .filter_by(is_elected=True))
        try:
            found = elected.one()
        except MultipleResultsFound:
            # Ambiguous: let the helper choose, using the starting region.
            found = guess_legislator(elected, start_region, assembly_id)
            break
        except NoResultFound:
            # Nothing here: move up to the parent with the longest id.
            current = (current.parents
                       .order_by(False)
                       .order_by(func.length(Region.id).desc())
                       .first())
    return found
开发者ID:KangGwanwoo,项目名称:pokr.kr,代码行数:27,代码来源:region.py
示例11: test_insert_returning
def test_insert_returning(self):
    """INSERT with returning() compiles to RETURNING on the PostgreSQL dialect.

    Covers explicit columns, the whole table and a function expression.
    """
    pg = postgresql.dialect()
    mytable = table(
        "mytable",
        column("myid", Integer),
        column("name", String(128)),
        column("description", String(128)),
    )
    head = "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING "

    # Explicit column list.
    stmt = insert(mytable, values={"name": "foo"}).returning(
        mytable.c.myid, mytable.c.name)
    self.assert_compile(
        stmt, head + "mytable.myid, mytable.name", dialect=pg)

    # Whole table expands to every column.
    stmt = insert(mytable, values={"name": "foo"}).returning(mytable)
    self.assert_compile(
        stmt,
        head + "mytable.myid, mytable.name, mytable.description",
        dialect=pg)

    # Function expression gets an anonymous label.
    stmt = insert(mytable, values={"name": "foo"}).returning(
        func.length(mytable.c.name))
    self.assert_compile(
        stmt, head + "length(mytable.name) AS length_1", dialect=pg)
开发者ID:EvaSDK,项目名称:sqlalchemy,代码行数:26,代码来源:test_compiler.py
示例12: aset_kibd_act
def aset_kibd_act(self):
    """Handle the ``act`` route segment; only 'grid' produces a result.

    For 'grid', returns the DataTables output for ``AsetKib`` rows with
    ``kib == 'D'`` that are not disabled and whose unit code starts with
    the session's ``unit_kd``. Any other action returns None.
    """
    req = self.request
    ses = req.session
    params = req.params
    url_dict = req.matchdict
    # Not used below; int() still raises on a non-numeric 'id' parameter.
    pk_id = int(params['id']) if 'id' in params else 0

    if url_dict['act'] != 'grid':
        return None

    # Column definitions, in display order.
    columns = [ColumnDT(name) for name in (
        'id', 'units.kode', 'units.nama', 'kats.kode', 'no_register',
        'kats.uraian')]
    columns.append(ColumnDT('tgl_perolehan', filter=self._DTstrftime))
    columns.extend(ColumnDT(name) for name in (
        'th_beli', 'harga', 'kondisi'))

    # Unit code must begin with the session's unit code.
    unit_prefix = func.substr(Unit.kode, 1, func.length(ses['unit_kd']))
    query = (DBSession.query(AsetKib)
             .join(AsetKategori, Unit)
             .filter(AsetKib.unit_id == Unit.id,
                     AsetKib.kategori_id==AsetKategori.id,
                     AsetKib.kib=='D',
                     unit_prefix==ses['unit_kd'],
                     or_(AsetKib.disabled=='0', AsetKib.disabled==None)))

    return DataTables(req, AsetKib, query, columns).output_result()
开发者ID:aagusti,项目名称:zosipkd,代码行数:32,代码来源:kibd.py
示例13: test_insert_returning
def test_insert_returning(self):
    """INSERT with returning() compiles to an OUTPUT clause before VALUES.

    Covers explicit columns, the whole table and a function expression.
    """
    mytable = table(
        "mytable",
        column("myid", Integer),
        column("name", String(128)),
        column("description", String(128)),
    )

    # (returning arguments, expected text of the OUTPUT clause)
    cases = [
        ((mytable.c.myid, mytable.c.name),
         "inserted.myid, inserted.name"),
        ((mytable,),
         "inserted.myid, inserted.name, inserted.description"),
        ((func.length(mytable.c.name),),
         "LEN(inserted.name) AS length_1"),
    ]
    for returned, output_clause in cases:
        stmt = insert(mytable, values={"name": "foo"}).returning(*returned)
        self.assert_compile(
            stmt,
            "INSERT INTO mytable (name) OUTPUT "
            + output_clause
            + " VALUES (:name)",
        )
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:32,代码来源:test_compiler.py
示例14: get_instance
def get_instance(self, db):
    """
    Return the instance as a string, decompressing the blob when needed.

    EDACC can store compressed and uncompressed instances. A compressed
    instance is marked by the ASCII characters "LZMA" prepended to it.
    Blobs longer than 32 * 1024 * 1024 are not processed; a message string
    is returned instead.
    """
    table = db.metadata.tables['Instances']
    c_instance = table.c['instance']
    c_id = table.c['idInstance']

    def fetch_value(expression):
        # Evaluate one expression against this instance's row.
        connection = db.session.connection()
        stmt = select([expression],
                      c_id == self.idInstance).select_from(table)
        return connection.execute(stmt).first()[0]

    # The first four characters tell whether the blob is compressed.
    header = fetch_value(func.substring(c_instance, 1, 4))
    blob_size = fetch_value(func.length(c_instance))

    if blob_size > 32 * 1024 * 1024:
        return "Instance too large for processing. Please use the EDACC GUI application."

    if instance_is_plain := (header != 'LZMA'):
        return self.instance

    # Compressed: fetch everything after the "LZMA" marker and decompress.
    return utils.lzma_decompress(fetch_value(func.substring(c_instance, 5)))
开发者ID:ChunHungLiu,项目名称:edacc_web,代码行数:26,代码来源:models.py
示例15: search_regions
def search_regions():
    """Return ``(regions, options)`` for regions whose name contains the query.

    Only regions whose id is shorter than 7 characters are matched;
    ``options`` is an empty dict.
    """
    # `query` is not defined in this function; it comes from a scope not
    # visible in this snippet.
    name_pattern = u'%{0}%'.format(query)
    criteria = and_(
        Region.name.like(name_pattern),
        func.length(Region.id) < 7,
    )
    regions = Region.query.filter(criteria)
    options = {}
    return (regions, options)
开发者ID:nacyot,项目名称:pokr,代码行数:7,代码来源:search.py
示例16: test_insert_returning
def test_insert_returning(self):
    """INSERT with returning() compiles to a trailing RETURNING clause.

    Covers explicit columns, the whole table and a function expression,
    where ``func.length`` is expected to render as ``char_length``.
    """
    mytable = table(
        "mytable",
        column("myid", Integer),
        column("name", String(128)),
        column("description", String(128)),
    )
    head = "INSERT INTO mytable (name) VALUES (:name) RETURNING "

    # Explicit column list.
    stmt = insert(mytable, values={"name": "foo"}).returning(
        mytable.c.myid, mytable.c.name)
    self.assert_compile(stmt, head + "mytable.myid, mytable.name")

    # Whole table expands to every column.
    stmt = insert(mytable, values={"name": "foo"}).returning(mytable)
    self.assert_compile(
        stmt, head + "mytable.myid, mytable.name, mytable.description")

    # Function expression gets an anonymous label.
    stmt = insert(mytable, values={"name": "foo"}).returning(
        func.length(mytable.c.name))
    self.assert_compile(
        stmt, head + "char_length(mytable.name) AS length_1")
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:31,代码来源:test_firebird.py
示例17: test_update_returning
def test_update_returning(self):
    """UPDATE with returning() compiles to RETURNING on the PostgreSQL dialect.

    Covers explicit columns, the whole table and a function expression.
    """
    pg = postgresql.dialect()
    mytable = table(
        'mytable',
        column('myid', Integer),
        column('name', String(128)),
        column('description', String(128)),
    )

    # (returning arguments, expected text after RETURNING)
    cases = [
        ((mytable.c.myid, mytable.c.name),
         'mytable.myid, mytable.name'),
        ((mytable,),
         'mytable.myid, mytable.name, mytable.description'),
        ((func.length(mytable.c.name),),
         'length(mytable.name) AS length_1'),
    ]
    for returned, returning_clause in cases:
        stmt = update(mytable, values={'name': 'foo'}).returning(*returned)
        self.assert_compile(
            stmt,
            'UPDATE mytable SET name=%(name)s RETURNING ' + returning_clause,
            dialect=pg)
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:32,代码来源:test_compiler.py
示例18: test_update_returning
def test_update_returning(self):
    """UPDATE with returning() compiles to a trailing RETURNING clause.

    Covers explicit columns, the whole table and a function expression,
    where ``func.length`` is expected to render as ``char_length``.
    """
    mytable = table(
        "mytable",
        column("myid", Integer),
        column("name", String(128)),
        column("description", String(128)),
    )
    head = "UPDATE mytable SET name=:name RETURNING "

    # Explicit column list.
    stmt = update(mytable, values={"name": "foo"}).returning(
        mytable.c.myid, mytable.c.name)
    self.assert_compile(stmt, head + "mytable.myid, mytable.name")

    # Whole table expands to every column.
    stmt = update(mytable, values={"name": "foo"}).returning(mytable)
    self.assert_compile(
        stmt, head + "mytable.myid, mytable.name, mytable.description")

    # Function expression gets an anonymous label.
    stmt = update(mytable, values={"name": "foo"}).returning(
        func.length(mytable.c.name))
    self.assert_compile(
        stmt, head + "char_length(mytable.name) AS length_1")
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:30,代码来源:test_firebird.py
示例19: test_insert_returning
def test_insert_returning(self):
    """INSERT with returning() compiles to RETURNING on the PostgreSQL dialect.

    Covers explicit columns, the whole table and a function expression.
    """
    pg = postgresql.dialect()
    mytable = table(
        'mytable',
        column('myid', Integer),
        column('name', String(128)),
        column('description', String(128)),
    )

    def build(*returned):
        # The same INSERT every time; only the returning list varies.
        return insert(mytable, values={'name': 'foo'}).returning(*returned)

    head = 'INSERT INTO mytable (name) VALUES (%(name)s) RETURNING '

    self.assert_compile(
        build(mytable.c.myid, mytable.c.name),
        head + 'mytable.myid, mytable.name',
        dialect=pg)
    self.assert_compile(
        build(mytable),
        head + 'mytable.myid, mytable.name, mytable.description',
        dialect=pg)
    self.assert_compile(
        build(func.length(mytable.c.name)),
        head + 'length(mytable.name) AS length_1',
        dialect=pg)
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:30,代码来源:test_compiler.py
示例20: genotypes_for_records
def genotypes_for_records(vcf_id, query):
    """Return every genotype sharing a VCF row with a genotype matching `query`.

    A row is identified by CHROM/POS/REF/ALT. If at least one genotype on a
    row passes the selections in `query`, all genotypes of that row in the
    given VCF are returned, each as a dict.

    This is used to generate the list of genotypes to be transformed into
    vcf.model._Records and then written to a VCF file.
    """
    query = _annotate_query_with_types(query, spec(vcf_id))
    with tables(db.engine, 'genotypes') as (con, gt):
        # One string per row: contig:position::reference->alternates
        row_key = func.concat(
            gt.c.contig, ':', cast(gt.c.position, types.Unicode), '::',
            gt.c.reference, '->', gt.c.alternates)

        # Keys of the rows that have at least one matching genotype.
        matching = select([row_key]).where(gt.c.vcf_id == vcf_id)
        matching = _add_filters(matching, gt, query.get('filters'))
        matching = _add_range(matching, gt, query.get('range'))
        matching = matching.cte('filtered_gts')

        # Contig length comes first, so shorter contig names sort earlier.
        sort_keys = [asc(func.length(gt.c.contig))]
        sort_keys += [asc(col) for col in (
            gt.c.contig, gt.c.position, gt.c.reference,
            gt.c.alternates, gt.c.sample_name)]

        records_q = (select([gt])
                     .where(row_key.in_(select([matching])))
                     .where(gt.c.vcf_id == vcf_id)
                     .order_by(*sort_keys))

        rows = con.execute(records_q).fetchall()
        genotypes = [dict(row) for row in rows]
    return genotypes
开发者ID:hammerlab,项目名称:cycledash,代码行数:28,代码来源:genotypes.py
注：本文中的sqlalchemy.func.length函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论