本文整理汇总了Python中sqlalchemy.dialects.postgresql.array函数的典型用法代码示例。如果您正苦于以下问题:Python array函数的具体用法?Python array怎么用?Python array使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了array函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: query_recursive_tree
def query_recursive_tree():
    """Return a query over a recursive CTE that walks the structure tree.

    The anchor member selects the structures accepted by
    ``Structure.condition_root_level()`` with ``depth`` 1 and one-element
    ``name_path`` / ``path`` arrays.  The recursive member joins each
    structure to its parent row in the CTE (``parent_id == id``), increments
    ``depth`` and appends the structure's name and id to the two arrays.

    :return: ``DBSession.query`` over the combined (``UNION ALL``) CTE.
    """
    # Anchor member: every root-level row starts its own paths.
    roots = DBSession.query(
        Structure.id,
        Structure.name,
        Structure.parent_id,
        cast(1, Integer()).label('depth'),
        array([cast(Structure.name, Text)]).label('name_path'),
        array([Structure.id]).label('path'),
    ).filter(Structure.condition_root_level())
    tree = roots.cte(name='structure_tree', recursive=True)

    parent_row = aliased(tree, name='st')
    child = aliased(Structure, name='s')

    # Recursive member: extend the parent's paths with the child's values.
    name_path = func.array_append(parent_row.c.name_path, cast(child.name, Text))
    id_path = func.array_append(parent_row.c.path, child.id)
    descendants = DBSession.query(
        child.id,
        child.name,
        child.parent_id,
        (parent_row.c.depth + 1).label('depth'),
        name_path.label('name_path'),
        id_path.label('path'),
    ).filter(child.parent_id == parent_row.c.id)

    tree = tree.union_all(descendants)
    return DBSession.query(tree)
开发者ID:alishir,项目名称:tcr,代码行数:27,代码来源:structures.py
示例2: test_cols_hstore_pair_array
def test_cols_hstore_pair_array(self):
    """Feed an hstore built from a key array and a value array to ``_test_cols``.

    The expression indexes ``hstore(keys, values)`` by ``'1'``; the ``None``
    value is expected to appear as a literal ``NULL`` in the SQL string.
    """
    keys = array(['1', '2'])
    values = array(['3', None])
    expected_sql = (
        "hstore(ARRAY[%(param_1)s, %(param_2)s], "
        "ARRAY[%(param_3)s, NULL]) -> %(hstore_1)s AS anon_1"
    )
    self._test_cols(hstore(keys, values)['1'], expected_sql, False)
开发者ID:d0ugal-archive,项目名称:peps,代码行数:7,代码来源:test_hstore.py
示例3: test_array_literal
def test_array_literal(self):
    """Check the SQL compiled for ``array_dims`` over two concatenated arrays.

    Adding two ``postgresql.array`` literals is expected to render as the
    ``||`` operator, with every element passed as a bound parameter.
    """
    left = postgresql.array([1, 2])
    right = postgresql.array([3, 4, 5])
    expected_params = {
        'param_1': 1,
        'param_2': 2,
        'param_3': 3,
        'param_4': 4,
        'param_5': 5,
    }
    self.assert_compile(
        func.array_dims(left + right),
        "array_dims(ARRAY[%(param_1)s, %(param_2)s] || "
        "ARRAY[%(param_3)s, %(param_4)s, %(param_5)s])",
        checkparams=expected_params,
    )
开发者ID:t3573393,项目名称:sqlalchemy,代码行数:9,代码来源:test_compiler.py
示例4: filter_request
def filter_request(self, query, args):
    """Narrow ``query`` according to the filter values found in ``args``.

    A filter is applied only when its key is present in ``args`` with a
    non-``None`` value.  Recognised keys: ``type``, ``q``, ``value_gt``,
    ``value_lt``, ``date_gt``, ``date_lt``, ``buyer``, ``activity``,
    ``procuring_entity``, ``supplier`` and ``supplier_size``.  Multi-valued
    filters are given as ``;``-separated strings.

    Side effects: sets ``self.buyer_joined`` / ``self.supplier_joined`` to
    ``True`` once the corresponding join has been added, so the join is not
    added twice.

    :param query: the query to filter.
    :param args: mapping of request arguments.
    :return: the filtered query.
    :raises ValueError: if an item of ``supplier_size`` is not an integer.
    """
    # Local import keeps this block self-contained; sqlalchemy is already a
    # dependency of this module (``array`` comes from its postgresql dialect).
    from sqlalchemy import text

    def given(key):
        # A filter is active only when present with a non-None value.
        return key in args and args[key] is not None

    def any_of(raw, column):
        # Compare ``column`` against the ';'-separated values via ARRAY/ANY.
        return array(raw.split(';')).any(column)

    # By default we filter contracts.
    con_type = args['type'] if given('type') else self.default_type
    query = query.filter(Release.type == con_type)

    if given('q'):
        # The search terms come straight from the request, so they are passed
        # as a bound parameter instead of being concatenated into the SQL
        # string (the previous code was open to SQL injection).  The language
        # comes from application configuration and is still inlined, which
        # keeps the rendered expression identical to the original one.
        lang = app.config["FTS_LANG"]
        fts = text(
            "to_tsvector('" + lang + "', releases.concat) "
            "@@ plainto_tsquery('" + lang + "', :fts_query)"
        ).bindparams(fts_query=args['q'])
        query = query.filter(fts)

    if given('value_gt'):
        query = query.filter(Release.value >= args['value_gt'])
    if given('value_lt'):
        query = query.filter(Release.value <= args['value_lt'])
    if given('date_gt'):
        query = query.filter(Release.date >= args['date_gt'])
    if given('date_lt'):
        query = query.filter(Release.date <= args['date_lt'])

    if given('buyer'):
        if not self.buyer_joined:
            query = query.join(Buyer)
            self.buyer_joined = True
        query = query.filter(any_of(args['buyer'], Buyer.slug))

    if given('activity'):
        query = query.filter(
            Release.activities.overlap(args['activity'].split(';'))
        )

    if given('procuring_entity'):
        query = query.filter(
            any_of(args['procuring_entity'], Release.procuring_entity_slug)
        )

    if given('supplier') or given('supplier_size'):
        # Both supplier filters need the Supplier join; add it only once.
        if not self.supplier_joined:
            query = query.join(Supplier)
            self.supplier_joined = True
        if given('supplier'):
            query = query.filter(any_of(args['supplier'], Supplier.slug))
        if given('supplier_size'):
            sizes = [int(item) for item in args['supplier_size'].split(';')]
            query = query.filter(array(sizes).any(Supplier.size))

    return query
开发者ID:fprieur,项目名称:ovc-vdm,代码行数:51,代码来源:app.py
示例5: get
def get(self):
    """
    Gathers all events from the database with their data
    return a json object representing the events
    """
    session = db.loadSession()
    try:
        # Make the sql query
        result = (
            session.query(
                # What to select
                # distinct because of multiple medals per event
                distinct(db.Event.id),
                db.Event.name,
                db.Sport.name,
                func.array_agg_cust(
                    distinct(
                        array([
                            cast(db.Olympics.id, String),
                            cast(db.Olympics.year, String),
                            db.Olympics.season,
                        ])
                    )
                ),
            )
            .select_from(db.Event)
            .join(db.Sport)
            .join(db.Medal)
            .join(db.Olympics)
            .group_by(db.Event.id, db.Event.name, db.Sport.name)
            .all()  # Actually executes the query and returns a list of tuples
        )
    finally:
        # Close the session even when the query raises, so it is not leaked.
        session.close()

    keys = ('id', 'name', 'sport', ('olympics', ('id', 'year', 'season')))
    all_events_dict = list_of_dict_to_dict_of_dict(
        add_keys(keys, row) for row in result
    )
    return jsonify(all_events_dict)
开发者ID:alsukurr,项目名称:cs373-idb,代码行数:33,代码来源:api.py
示例6: test_cols_delete_array_of_keys
def test_cols_delete_array_of_keys(self):
    """Feed ``hashcol.delete`` with an array of keys to ``_test_cols``."""
    expression = self.hashcol.delete(array(['foo', 'bar']))
    expected_sql = (
        "delete(test_table.hash, ARRAY[%(param_1)s, %(param_2)s]) "
        "AS delete_1"
    )
    self._test_cols(expression, expected_sql, True)
开发者ID:d0ugal-archive,项目名称:peps,代码行数:7,代码来源:test_hstore.py
示例7: test_array_literal_insert
def test_array_literal_insert(self):
    """Check the SQL compiled for an INSERT whose value is an array literal."""
    metadata = MetaData()
    table = Table("t", metadata, Column("data", postgresql.ARRAY(Integer)))
    statement = table.insert().values(data=array([1, 2, 3]))
    self.assert_compile(
        statement,
        "INSERT INTO t (data) VALUES "
        "(ARRAY[%(param_1)s, %(param_2)s, %(param_3)s])",
    )
开发者ID:EvaSDK,项目名称:sqlalchemy,代码行数:7,代码来源:test_compiler.py
示例8: test_cols_hstore_single_array
def test_cols_hstore_single_array(self):
    """Feed an hstore built from a single array to ``_test_cols``.

    The expression indexes the hstore by ``'3'``; the ``None`` element is
    expected to appear as a literal ``NULL`` in the SQL string.
    """
    source = array(['1', '2', '3', None])
    expected_sql = (
        "hstore(ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, NULL]) "
        "-> %(hstore_1)s AS anon_1"
    )
    self._test_cols(hstore(source)['3'], expected_sql, False)
开发者ID:d0ugal-archive,项目名称:peps,代码行数:7,代码来源:test_hstore.py
示例9: get_tree_cte
def get_tree_cte(cls, col='id'):
    """Create a CTE for the category tree.

    The CTE contains the following columns:

    - ``id`` -- the category id
    - ``path`` -- an array containing the path from the root to
                  the category itself
    - ``is_deleted`` -- whether the category is deleted

    :param col: The name of the column to use in the path or a
                callable receiving the category alias that must
                return the expression used for the 'path'
                retrieved by the CTE.
    """
    category = db.aliased(cls)
    path_column = col(category) if callable(col) else getattr(category, col)

    # Anchor member: categories without a parent start a one-element path.
    anchor_columns = [
        category.id,
        array([path_column]).label('path'),
        category.is_deleted,
    ]
    anchor = (
        select(anchor_columns)
        .where(category.parent_id.is_(None))
        .cte(recursive=True)
    )

    # Recursive member: extend the parent's path and OR the deleted flags.
    child_columns = [
        category.id,
        anchor.c.path.op('||')(path_column),
        anchor.c.is_deleted | category.is_deleted,
    ]
    children = select(child_columns).where(category.parent_id == anchor.c.id)
    return anchor.union_all(children)
开发者ID:fph,项目名称:indico,代码行数:27,代码来源:categories.py
示例10: test_cols_slice
def test_cols_slice(self):
    """Feed ``hashcol.slice`` with an array of keys to ``_test_cols``."""
    expression = self.hashcol.slice(array(['1', '2']))
    expected_sql = (
        "slice(test_table.hash, ARRAY[%(param_1)s, %(param_2)s]) "
        "AS slice_1"
    )
    self._test_cols(expression, expected_sql, True)
开发者ID:d0ugal-archive,项目名称:peps,代码行数:7,代码来源:test_hstore.py
示例11: compile_array_agg
def compile_array_agg(element, compiler, **kw):
    """Render ``element`` as ``name(clauses)``, optionally with a default.

    When ``element.default`` is not ``None`` the rendered call is wrapped in
    ``coalesce(...)`` whose fallback is the default converted to a
    PostgreSQL array and cast to ``element.type``.

    :param element: the construct being compiled; ``name``, ``clauses``,
        ``default`` and ``type`` are read from it.
    :param compiler: the SQL compiler used to process the clauses.
    :param kw: accepted but not used here.
    :return: the SQL string for the aggregate.
    """
    rendered = "%s(%s)" % (element.name, compiler.process(element.clauses))
    if element.default is not None:
        aggregate = sa.text(rendered)
        fallback = sa.cast(postgresql.array(element.default), element.type)
        wrapped = sa.func.coalesce(aggregate, fallback)
        return str(wrapped.compile(compiler))
    return rendered
开发者ID:dset0x,项目名称:sqlalchemy-utils,代码行数:8,代码来源:expressions.py
示例12: qualstat_get_figures
def qualstat_get_figures(conn, database, tsfrom, tsto, queries=None, quals=None):
    """Fetch one row joining four ``qual_constants`` sub-selects.

    The base condition restricts on ``datname`` and on ``coalesce_range``
    overlapping ``tstzrange(:from, :to)``.  It is further restricted to the
    given query ids and/or qual ids when those are provided.

    :param conn: connection used to execute the statement.
    :param database: value bound to ``:database``.
    :param tsfrom: value bound to ``:from``.
    :param tsto: value bound to ``:to``.
    :param queries: optional iterable of query ids (converted with ``int``).
    :param quals: optional iterable of qual ids (converted with ``int``).
    :return: the first result row, or ``None`` when ``rowcount`` is 0.
    """
    condition = text(
        "datname = :database AND coalesce_range && tstzrange(:from, :to)"
    )
    if queries is not None:
        query_ids = array([int(q) for q in queries])
        condition = and_(condition, query_ids.any(literal_column("s.queryid")))
    if quals is not None:
        qual_ids = array([int(q) for q in quals])
        condition = and_(condition, qual_ids.any(literal_column("qnc.qualid")))

    columns = [
        text('most_filtering.quals'),
        text('most_filtering.query'),
        text('to_json(most_filtering) as "most filtering"'),
        text('to_json(least_filtering) as "least filtering"'),
        text('to_json(most_executed) as "most executed"'),
        text('to_json(most_used) as "most used"'),
    ]

    # Each sub-select is aliased under its own name; the three joins below
    # line them up on "rownumber".
    most_filtering = qual_constants(
        "most_filtering", condition).alias("most_filtering")
    least_filtering = qual_constants(
        "least_filtering", condition).alias("least_filtering")
    joined = most_filtering.join(
        least_filtering,
        text("most_filtering.rownumber = least_filtering.rownumber"),
    )
    most_executed = qual_constants(
        "most_executed", condition).alias("most_executed")
    joined = joined.join(
        most_executed,
        text("most_executed.rownumber = least_filtering.rownumber"),
    )
    most_used = qual_constants("most_used", condition).alias("most_used")
    joined = joined.join(
        most_used,
        text("most_used.rownumber = least_filtering.rownumber"),
    )

    sql = select(columns).select_from(joined)
    bound = {"database": database, "from": tsfrom, "to": tsto}
    result = conn.execute(sql, params=bound)
    if result.rowcount == 0:
        return None
    return result.first()
开发者ID:girgen,项目名称:powa-web,代码行数:45,代码来源:__init__.py
示例13: test_array_literal_compare
def test_array_literal_compare(self):
    """Check the SQL compiled for comparing an array literal to a list.

    The plain Python list on the right-hand side is expected to render as
    an ``ARRAY[...]`` of bound parameters as well.
    """
    comparison = postgresql.array([1, 2]) == [3, 4, 5]
    expected_params = {
        'param_1': 1,
        'param_2': 2,
        'param_3': 3,
        'param_4': 4,
        'param_5': 5,
    }
    self.assert_compile(
        comparison,
        "ARRAY[%(param_1)s, %(param_2)s] = "
        "ARRAY[%(param_3)s, %(param_4)s, %(param_5)s]",
        checkparams=expected_params,
    )
开发者ID:t3573393,项目名称:sqlalchemy,代码行数:9,代码来源:test_compiler.py
示例14: search_query
def search_query(cls, tokens, weight_func=None, include_misses=False, ordered=True):
    """Build the search query for this model.

    The query selects the model name, ``id``, ``name``, an array of detail
    columns and a weight expression.

    :param tokens: search tokens; ``process_id_option`` splits them into
        ids and remaining tokens.
    :param weight_func: callable ``(columns, tokens)`` producing the weight
        expression; ``weight_expression`` is used when it is falsy.
    :param include_misses: when false, rows with ``weight <= 0`` are
        filtered out.
    :param ordered: when true, results are ordered by descending weight.
    :return: the query object.
    """
    # Turn the searchable column names into column objects.
    searchable = [getattr(cls, name) for name in cls.__searchable_columns__]

    # The model name lets callers match a search result to its model.
    model_name = literal_column("'{}'".format(cls.__name__))

    # Split off the id: tokens; they are applied as a filter further down.
    ids, tokens = process_id_option(tokens)

    if not tokens:
        # Only "special" tokens such as id: were given.
        weight = literal_column("1")
    else:
        scorer = weight_func or weight_expression
        weight = scorer(searchable, tokens)

    # Array of stringified detail columns, or a single NULL when the model
    # declares none.
    detail_names = getattr(cls, "__search_detail_columns__", None)
    if detail_names:
        details = [cast(getattr(cls, name), Unicode) for name in detail_names]
    else:
        details = [literal_column("NULL")]

    query = db.session.query(
        model_name.label("model"),
        cls.id.label("id"),
        cls.name.label("name"),
        array(details).label("details"),
        weight.label("weight"),
    )

    if ids:
        query = query.filter(cls.id.in_(ids))
    if not include_misses:
        query = query.filter(weight > 0)
    if ordered:
        query = query.order_by(desc(weight))
    return query
开发者ID:skylines-project,项目名称:skylines,代码行数:56,代码来源:search.py
示例15: _find_in_set
def _find_in_set(t, expr):
    """Translate a find-in-set expression into an ``array_search`` call.

    The needle and every haystack item are translated with ``t.translate``;
    the translated haystack is wrapped in a PostgreSQL array literal.
    """
    # postgresql 9.5 has array_position, but the code below works on any
    # version of postgres with generate_subscripts
    # TODO: could make it even more generic by using generate_series
    # TODO: this works with *any* type, not just strings. should the operation
    # itself also have this property?
    needle, haystack = expr.op().args
    translated_needle = t.translate(needle)
    translated_haystack = [t.translate(item) for item in haystack]
    return array_search(translated_needle, pg.array(translated_haystack))
开发者ID:cloudera,项目名称:ibis,代码行数:10,代码来源:compiler.py
示例16: search
def search():
    """Run a Sphinx search from the request arguments and return JSON.

    Request arguments read: ``keywords``, ``sort`` (``newest``, ``oldest``,
    ``highest_cost``, ``lowest_cost``), ``category``, ``per_page`` and
    ``page``.  Invalid or non-positive paging values fall back to 20 items
    per page and page 1.

    :return: ``(response, status)``: a JSON body with ``data`` and
        ``num_pages`` and status 200, or an error message with status 400
        when the Sphinx query returns nothing.
    """
    keywords = request.args.get('keywords')
    sort = request.args.get('sort')
    client = SphinxClient()
    client.SetServer(SEARCH_HOST, SEARCH_PORT)

    # Sorting mode
    sort_modes = {
        'newest': (SPH_SORT_ATTR_DESC, 'date_added'),
        'oldest': (SPH_SORT_ATTR_ASC, 'date_added'),
        'highest_cost': (SPH_SORT_ATTR_DESC, 'cost'),
        'lowest_cost': (SPH_SORT_ATTR_ASC, 'cost'),
    }
    if sort in sort_modes:
        client.SetSortMode(*sort_modes[sort])

    # Filter by category
    category = request.args.get('category')
    try:
        category = int(category)
    except (ValueError, TypeError):
        category = None
    if category:
        client.SetFilter('category', [category])

    # Paging.  The previous code called int(per_page) before per_page was
    # ever assigned, which raised UnboundLocalError on every request.
    try:
        per_page = int(request.args.get('per_page', default=20))
    except (ValueError, TypeError):
        per_page = 20
    if per_page < 1:
        per_page = 20  # a zero/negative page size would break the limits below
    try:
        page = int(request.args.get('page', default=1))
    except (ValueError, TypeError):
        page = 1
    if page < 1:
        page = 1  # avoid a negative offset

    # Use our SphinxSearch query to construct our page
    client.SetLimits(per_page * (page - 1), per_page)

    # Handle the query
    q = client.Query(keywords)
    if not q:
        return 'Could not complete search', 400
    ids = [res['id'] for res in q['matches']]
    if not ids:
        return jsonify(data=[], num_pages=0), 200

    # First construct the subquery
    s_ids = db.session.query(func.unnest(array(ids)).label('id')).subquery('s_ids')
    query = Postings.query.join(s_ids, Postings.id == s_ids.c.id)

    # Sphinx has already applied offset/limit, so the joined rows are the
    # current page.  The previous code read ``page.items`` / ``page.pages``
    # from the integer page number, which raised AttributeError.
    # NOTE(review): the page count is derived from the Sphinx result's
    # 'total' entry (number of retrievable matches) -- confirm this is the
    # intended source for num_pages.
    total = q.get('total', len(ids))
    num_pages = (total + per_page - 1) // per_page

    # Return the JSON
    return jsonify(data=[to_dict(r) for r in query.all()], num_pages=num_pages), 200
开发者ID:yao7zhao,项目名称:image,代码行数:55,代码来源:main.py
示例17: get_bounding_box
def get_bounding_box(self):
    """Return the envelope of a line through this object's positions.

    At most ``max_positions + 1`` position geometries are collected.

    :return: the scalar result of ``ST_Envelope(ST_MakeLine(ARRAY[...]))``,
        or ``None`` when there are no positions.
    """
    geoms = []
    for position in self.positions:
        geoms.append(position.geom)
        # We return the max number of positions plus one, so it can detect
        # there are more and not just the barrier number
        if len(geoms) == (max_positions + 1):
            break
    if not geoms:
        return None
    line = func.ST_MakeLine(array(geoms))
    return self.session.scalar(func.ST_Envelope(line))
开发者ID:onde-estan,项目名称:ondestan,代码行数:11,代码来源:animal.py
示例18: get_plots_bounding_box_as_json
def get_plots_bounding_box_as_json(self):
    """Return the GeoJSON envelope of the union of the relevant plots.

    When ``self.role.name`` equals ``Role._ADMIN_ROLE`` every plot returned
    by ``Plot().queryObject().all()`` is used; otherwise ``self.plots``.

    :return: the scalar result of
        ``ST_AsGeoJson(ST_Envelope(ST_Union(ARRAY[...])))``, or ``None``
        when there are no plots.
    """
    is_admin = self.role.name == Role._ADMIN_ROLE
    plots = Plot().queryObject().all() if is_admin else self.plots
    geoms = [plot.geom for plot in plots]
    if not geoms:
        return None
    envelope = func.ST_Envelope(func.ST_Union(array(geoms)))
    return self.session.scalar(func.ST_AsGeoJson(envelope))
开发者ID:onde-estan,项目名称:ondestan,代码行数:11,代码来源:user.py
示例19: preprocess_value_and_column
def preprocess_value_and_column(cls, column, value):
    """Coerce ``value`` and ``column`` before they are compared.

    - Array column with an array value: the value becomes a PostgreSQL array
      literal cast to an ``ARRAY`` of the column's item type.
    - JSON column: ``column.sql_col`` is replaced (in place) by a cast of
      itself to the type SQLAlchemy picks for an ``=`` comparison with the
      value.

    :return: the ``(column, value)`` pair after coercion.
    """
    operand_is_array = is_array(value)

    # Coerce operand
    if column.is_array and operand_is_array:
        literal = pg.array(value)
        value = cast(literal, pg.ARRAY(column.sql_col.type.item_type))

    if column.is_json:
        # HACKY: use sqlalchemy type coercion
        target_type = column.sql_col.type.coerce_compared_value('=', value)
        column.sql_col = cast(column.sql_col, target_type)

    return column, value
开发者ID:kolypto,项目名称:py-mongosql,代码行数:11,代码来源:statements.py
示例20: test_literal_binds_pgarray
def test_literal_binds_pgarray(self):
    """Check the DDL compiled for an array literal used as a server default.

    The expected DDL contains the array elements inline rather than as
    bound parameters.
    """
    from sqlalchemy.dialects.postgresql import ARRAY, array

    metadata = MetaData()
    column = Column('x', ARRAY(Integer), server_default=array([1, 2, 3]))
    table = Table('t', metadata, column)
    self.assert_compile(
        CreateTable(table),
        "CREATE TABLE t (x INTEGER[] DEFAULT ARRAY[1, 2, 3])",
        dialect='postgresql',
    )
开发者ID:NaiRobley,项目名称:sqlalchemy,代码行数:12,代码来源:test_defaults.py
注:本文中的sqlalchemy.dialects.postgresql.array函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论