本文整理汇总了 Python 中 sqlalchemy.sql.expression.literal_column 函数的典型用法代码示例。如果您正苦于以下问题：Python literal_column 函数的具体用法？Python literal_column 怎么用？Python literal_column 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 literal_column 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: _run_query
def _run_query(start_date, end_date):
    """Return instance creation/deletion events as one time-ordered result.

    Three queries are unioned:

    * instances created before ``end_date`` with ``deleted == 0``,
      reported at their creation time with a ``deleted`` flag of 0;
    * instances created before ``end_date``, deleted at or after
      ``start_date`` and with ``deleted == 1``, reported at their
      *creation* time with a ``deleted`` flag of 0;
    * the same deleted instances, reported at ``deleted_at`` with a
      ``deleted`` flag of 1.

    :param start_date: lower bound applied to ``DBInstance.deleted_at``
    :param end_date: upper bound applied to ``DBInstance.created``
    :returns: the rows of the union query, ordered by ``timestamp``
    """
    def _events(columns, filters):
        # Every branch of the union joins the datastore version and adds
        # the same three labelled columns (timestamp, deleted, dsvid).
        joined = DBInstance.query().join(DBDatastoreVersion)
        return joined.add_columns(*columns).filter(*filters)

    still_present = [DBInstance.created < end_date,
                     DBInstance.deleted == 0]
    at_creation = [DBInstance.created.label('timestamp'),
                   literal_column("0").label('deleted'),
                   DBDatastoreVersion.id.label('dsvid')]
    removed_in_window = [DBInstance.created < end_date,
                         DBInstance.deleted_at >= start_date,
                         DBInstance.deleted == 1]
    at_deletion = [DBInstance.deleted_at.label('timestamp'),
                   literal_column("1").label('deleted'),
                   DBDatastoreVersion.id.label('dsvid')]

    created_alive = _events(at_creation, still_present)
    created_then_deleted = _events(at_creation, removed_in_window)
    deletions = _events(at_deletion, removed_in_window)

    # NOTE(review): 'anon_1' is presumably the alias SQLAlchemy assigns to
    # the union subquery -- confirm if the query shape ever changes.
    combined = created_alive.union(created_then_deleted, deletions)
    return combined.order_by(text('anon_1.timestamp')).all()
开发者ID:Tesora,项目名称:tesora-trove,代码行数:32,代码来源:usage_report.py
示例2: visit_hierarchy
def visit_hierarchy(element, compiler, **kw):
    """Compile a hierarchy element into an Oracle ``CONNECT BY`` query string.

    Appends ``level``, ``is_leaf`` and ``connect_path`` columns to the
    element's select, compiles it, then appends the optional
    ``start with`` clause and the ``connect by prior`` clause.

    :param element: hierarchy construct; ``select``, ``child``, ``parent``,
        ``fk_type`` and (optionally) ``starting_node`` are read from it
    :param compiler: SQL compiler whose dialect supplies the server version
    :param kw: compiler keyword arguments; a truthy ``asfrom`` wraps the
        result in parentheses
    :returns: the SQL text of the hierarchical query
    :raises HierarchyLesserError: if the server version is lower than
        ``supported_db['oracle']``
    """
    if compiler.dialect.server_version_info < supported_db['oracle']:
        raise HierarchyLesserError(compiler.dialect.name,
                                   supported_db['oracle'])

    sel = element.select
    sel.append_column(literal_column('level', type_=Integer))
    sel.append_column(literal_column('CONNECT_BY_ISLEAF',
                                     type_=Boolean).label('is_leaf'))
    sel.append_column(literal_column(
        "LTRIM(SYS_CONNECT_BY_PATH (%s,','),',')" % (element.child),
        type_=String).label('connect_path'))
    qry = "%s" % (compiler.process(sel))

    # A missing attribute and an explicit False both mean "no START WITH
    # clause".  The original also carried an inner `elif ... is False`
    # branch, which could never run under this guard; it has been removed.
    starting_node = getattr(element, 'starting_node', False)
    if starting_node is not False:
        # "a" (string keys) and "0" (integer keys) are the values this
        # code treats as "start from rows whose parent is NULL".
        if (starting_node == "a" and element.fk_type == String) or \
                (starting_node == "0" and element.fk_type == Integer):
            qry += " start with %s is null" % (element.parent)
        else:
            # NOTE(review): starting_node is interpolated into the SQL
            # text unescaped -- confirm it never carries untrusted input.
            qry += " start with %s=%s" % (element.parent, starting_node)

    qry += " connect by prior %s=%s" % (element.child, element.parent)
    if kw.get('asfrom', False):
        qry = '(%s)' % qry
    return qry
开发者ID:FashtimeDotCom,项目名称:sqla_hierarchy,代码行数:28,代码来源:hierarchy.py
示例3: construct_search
def construct_search(self, field_name, op=None):
    """Return a callable that builds a search clause on ``field_name``.

    :param field_name: column name, used verbatim as a literal column
    :param op: ``'^'`` selects ``startswith``, ``'='`` selects the ``=``
        operator, anything else selects ``contains``
    :returns: a callable taking the search term
    """
    column = literal_column(field_name)
    if op == '^':
        return column.startswith
    if op == '=':
        return column.op('=')
    return column.contains
开发者ID:ramin32,项目名称:Flask-SuperAdmin,代码行数:7,代码来源:view.py
示例4: quota_destroy_all_by_project
def quota_destroy_all_by_project(context, project_id):
    """Mark every quota-related row of a project as deleted.

    Within one transaction, the ``Quota``, ``ProjectUserQuota``,
    ``QuotaUsage`` and ``Reservation`` rows matching ``project_id`` (and
    queried with ``read_deleted="no"``) get ``deleted=True`` and a fresh
    ``deleted_at`` timestamp.

    :param context: request context handed to ``model_query``
    :param project_id: project whose quota rows are soft-deleted
    """
    session = get_session()
    with session.begin():
        for model in (models.Quota,
                      models.ProjectUserQuota,
                      models.QuotaUsage,
                      models.Reservation):
            rows = model_query(context, model, session=session,
                               read_deleted="no")
            rows = rows.filter_by(project_id=project_id)
            # updated_at is assigned to itself so the UPDATE leaves the
            # stored value unchanged (presumably to keep an onupdate hook
            # from bumping it -- confirm against the model definition).
            rows.update({'deleted': True,
                         'deleted_at': timeutils.utcnow(),
                         'updated_at': literal_column('updated_at')},
                        synchronize_session=False)
开发者ID:pombredanne,项目名称:manila,代码行数:34,代码来源:api.py
示例5: list_fleets
def list_fleets(self, message, user, params):
    """Reply with "Defend" fleet scans to and from the requested planet.

    Groups 1, 3 and 5 of ``params`` are the planet coordinates; group 6 is
    an optional maximum number of rows to show (5 when absent).

    :param message: object used to send replies and alerts
    :param user: not used by this method
    :param params: regex match holding the coordinates and the limit
    """
    coords = params.group(1,3,5)
    # Check the planet exists
    planet = Planet.load(*coords)
    if planet is None:
        message.alert("No planet with coords %s:%s:%s" % coords)
        return

    def defend_scans(direction, this_side, other_side):
        # `this_side` is the FleetScan column that must equal our planet;
        # `other_side` is the one joined to Intel/Planet for the other party.
        query = session.query(coalesce(FleetScan.launch_tick, FleetScan.landing_tick), literal_column(direction).label("dir"), Planet.x, Planet.y, Planet.z, Alliance.name).select_from(FleetScan)
        query = query.filter(this_side == planet.id, FleetScan.in_galaxy==False, FleetScan.mission=="Defend")
        query = query.join(Intel, other_side == Intel.planet_id).filter(Intel.alliance_id != None)
        query = query.join(Alliance, Intel.alliance_id == Alliance.id).join(Planet, other_side == Planet.id)
        return query

    # Fleets with a known alliance that have defended this planet
    incoming = defend_scans("'From'", FleetScan.target_id, FleetScan.owner_id)
    # Fleets with a known alliance that this planet has defended
    outgoing = defend_scans("'To '", FleetScan.owner_id, FleetScan.target_id)

    # Combine the results into one list, sorted in descending order
    results = sorted(incoming.all()+outgoing.all(), reverse=True)
    if not results:
        message.reply("No suggestions found")
        return

    # Reply to the user
    message.reply("Tick Dir Planet Alliance")
    limit = int(params.group(6) or 5)
    for tick, direction, x, y, z, alliance in results[:limit]:
        coords_text = "%s:%s:%s" % (x, y, z)
        message.reply("%4s %s %-9s %s" % (tick, direction, coords_text, alliance))
    total = len(results)
    if total > limit:
        message.reply("%s results not shown (%s total)" % (total-limit, total))
开发者ID:JDD,项目名称:merlin,代码行数:34,代码来源:guess.py
示例6: construct_search
def construct_search(self, field_name, op=None):
    """Pick the comparison used to search ``field_name``.

    :param field_name: column name, wrapped verbatim in a literal column
    :param op: ``"^"`` for a prefix match, ``"="`` for equality; any other
        value falls back to ``ilike``
    :returns: a callable that accepts the search term
    """
    target = literal_column(field_name)
    if op == "=":
        return target.op("=")
    return target.startswith if op == "^" else target.ilike
开发者ID:le-zeste,项目名称:Flask-SuperAdmin,代码行数:7,代码来源:view.py
示例7: soft_delete
def soft_delete(self, synchronize_session="evaluate"):
    """Mark the rows matched by this query as deleted.

    ``deleted`` is set to the row's own ``id`` column, ``updated_at`` is
    assigned to itself, and ``deleted_at`` gets the current UTC time.

    :param synchronize_session: forwarded unchanged to ``update``
    :returns: whatever ``self.update`` returns
    """
    changes = {
        "deleted": literal_column("id"),
        "updated_at": literal_column("updated_at"),
        "deleted_at": timeutils.utcnow(),
    }
    return self.update(changes, synchronize_session=synchronize_session)
开发者ID:Open-SFC,项目名称:nscs,代码行数:9,代码来源:session.py
示例8: initialize
def initialize(cls, url, initialize_data=True, initialize_schema=True):
    """Initialize the database.

    This includes the schema, the materialized views, the custom
    functions, and the initial content.

    :param url: database URL; also the key of the ``engine_for_url`` cache
    :param initialize_data: when true, install the recursive-equivalents
        function if it is missing and run ``cls.initialize_data``
    :param initialize_schema: when true, run ``cls.initialize_schema``
    :returns: an ``(engine, connection)`` tuple with a fresh connection
    :raises IOError: if the SQL file defining the recursive-equivalents
        function is needed but does not exist
    """
    if url in cls.engine_for_url:
        engine = cls.engine_for_url[url]
        return engine, engine.connect()

    engine = cls.engine(url)
    if initialize_schema:
        cls.initialize_schema(engine)
    connection = engine.connect()

    # try/finally releases the setup connection even if a step below raises.
    try:
        # Check if the recursive equivalents function exists already.
        query = select(
            [literal_column('proname')]
        ).select_from(
            table('pg_proc')
        ).where(
            literal_column('proname')=='fn_recursive_equivalents'
        )
        result = list(connection.execute(query))

        # If it doesn't, create it.
        if not result and initialize_data:
            resource_file = os.path.join(
                cls.resource_directory(), cls.RECURSIVE_EQUIVALENTS_FUNCTION
            )
            if not os.path.exists(resource_file):
                raise IOError("Could not load recursive equivalents function from %s: file does not exist." % resource_file)
            # Context manager closes the file handle promptly.
            with open(resource_file) as sql_file:
                sql = sql_file.read()
            connection.execute(sql)

        if initialize_data:
            session = Session(connection)
            cls.initialize_data(session)
    finally:
        if connection:
            connection.close()

    if initialize_schema and initialize_data:
        # Only cache the engine if all initialization has been performed.
        #
        # Some pieces of code (e.g. the script that runs
        # migrations) have a legitimate need to bypass some of the
        # initialization, but normal operation of the site
        # requires that everything be initialized.
        #
        # Until someone tells this method to initialize
        # everything, we can't short-circuit this method with a
        # cache.
        cls.engine_for_url[url] = engine
    return engine, engine.connect()
开发者ID:NYPL-Simplified,项目名称:server_core,代码行数:56,代码来源:__init__.py
示例9: actions_query
def actions_query(post_id):
    """Build a query listing the comments and relations of a post.

    Each row carries an id, a ``time`` column and a literal ``t`` column
    holding ``'Comment'`` or ``'Relation'`` to tell the two sources apart.

    :param post_id: matched against ``Comment.post_id`` and
        ``Relation.parent_id``
    :returns: the (unexecuted) union of both queries
    """
    session = db.session

    comment_rows = session.query(
        Comment.id,
        Comment.time_posted.label("time"),
        literal_column("'Comment'", Unicode).label("t"),
    ).filter(Comment.post_id==post_id)

    relation_rows = session.query(
        Relation.id,
        Relation.time_linked.label("time"),
        literal_column("'Relation'", Unicode).label("t"),
    ).filter(Relation.parent_id==post_id)

    return comment_rows.union(relation_rows)
开发者ID:yedi,项目名称:openthink,代码行数:10,代码来源:db_queries.py
示例10: drop_old_duplicate_entries_from_table
def drop_old_duplicate_entries_from_table(migrate_engine, table_name, use_soft_delete, *uc_column_names):
    """Drop all old rows having the same values for columns in uc_columns.

    For each group of rows sharing the same values in ``uc_column_names``,
    the row with the largest id is kept; the other rows of the group whose
    ``deleted_at`` is NULL are removed (or marked as deleted when
    ``use_soft_delete`` is true).

    :param migrate_engine: SQLAlchemy engine
    :param table_name: Table with duplicates
    :param use_soft_delete: If True - values will be marked as `deleted`,
                            if False - values will be removed from table
    :param uc_column_names: Unique constraint columns
    """
    meta = MetaData()
    meta.bind = migrate_engine
    table = Table(table_name, meta, autoload=True)

    group_columns = [table.c[name] for name in uc_column_names]
    selected_columns = [func.max(table.c.id)]
    selected_columns.extend(group_columns)
    duplicate_groups = select(
        selected_columns, group_by=group_columns, having=func.count(table.c.id) > 1
    )

    for keeper in migrate_engine.execute(duplicate_groups):
        # NOTE(boris-42): Do not remove row that has the biggest ID.
        stale_rows = table.c.id != keeper[0]
        is_none = None  # workaround for pyflakes
        stale_rows &= table.c.deleted_at == is_none
        for name in uc_column_names:
            stale_rows &= table.c[name] == keeper[name]

        # Log every row about to go before issuing the single statement.
        stale_ids = select([table.c.id]).where(stale_rows)
        for stale in migrate_engine.execute(stale_ids).fetchall():
            LOG.info(
                _("Deleting duplicated row with id: %(id)s from table: " "%(table)s")
                % dict(id=stale[0], table=table_name)
            )

        if not use_soft_delete:
            statement = table.delete().where(stale_rows)
        else:
            statement = table.update().where(stale_rows).values(
                {
                    "deleted": literal_column("id"),
                    "updated_at": literal_column("updated_at"),
                    "deleted_at": timeutils.utcnow(),
                }
            )
        migrate_engine.execute(statement)
开发者ID:koder-ua,项目名称:fuel-web,代码行数:55,代码来源:utils.py
示例11: create_stm
def create_stm(self, columns=None, filters=None, ordering=None, limit=None, start=None, url_filters=None,
               associations=None):
    """Build the SELECT statement for the current table.

    :param columns: columns to select; ``None`` (the default) is passed on
        as an empty list
    :param filters: conditions handed to ``set_filters``; those with
        ``op == "coordinates"`` go through ``get_condition_square``,
        the rest through ``do_filter``
    :param ordering: column name to order by; a leading ``-`` requests
        descending order. ``None`` or an empty string skips ordering.
    :param limit: maximum number of rows; ignored when falsy
    :param start: row offset; ignored when falsy
    :param url_filters: handed to ``set_url_filters``
    :param associations: accepted but not used by this method
    :returns: the assembled statement
    """
    # A mutable default (`columns=list()`) would be shared between calls;
    # build a fresh list per call instead.
    if columns is None:
        columns = []

    self.set_filters(filters)
    self.set_query_columns(columns)
    self.set_url_filters(url_filters)
    self.limit = limit
    self.start = start
    self.ordering = ordering

    stm = select(self.query_columns).select_from(self.table)

    plain_conditions = []
    coordinates_filter = ""
    for condition in self.filters:
        if condition.get("op") == "coordinates":
            coordinates_filter = self.get_condition_square(
                condition.get("lowerleft"),
                condition.get("upperright"),
                condition.get("property_ra"),
                condition.get("property_dec"))
        else:
            plain_conditions.append(condition)

    base_filters = and_(*self.do_filter(self.table, plain_conditions))
    stm = stm.where(and_(base_filters, coordinates_filter))

    # Ordering. The truthiness test also skips an empty string, which
    # previously raised IndexError on `self.ordering[0]`.
    if self.ordering:
        if self.ordering[0] == '-':
            stm = stm.order_by(desc(self.ordering[1:].lower()))
        else:
            stm = stm.order_by(self.ordering.lower())

    # Pagination
    if self.limit:
        stm = stm.limit(literal_column(str(self.limit)))
    if self.start:
        stm = stm.offset(literal_column(str(self.start)))

    # NOTE(review): prints the statement on every call; looks like leftover
    # debug output -- kept so observable behaviour does not change.
    print(str(stm))
    return stm
开发者ID:linea-it,项目名称:dri,代码行数:55,代码来源:CatalogDB.py
示例12: plan_destroy
def plan_destroy(context, plan_id):
    """Soft-delete a plan together with its resources.

    The ``Plan`` row with ``id == plan_id`` and the ``Resource`` rows with
    ``plan_id == plan_id`` are flagged ``deleted`` within one transaction,
    sharing a single ``deleted_at`` timestamp.

    :param context: request context handed to ``model_query``
    :param plan_id: id of the plan to destroy
    """
    session = get_session()
    now = timeutils.utcnow()
    targets = (
        (models.Plan, {'id': plan_id}),
        (models.Resource, {'plan_id': plan_id}),
    )
    with session.begin():
        for model, criteria in targets:
            rows = model_query(context, model, session=session)
            # updated_at is assigned to itself so its value is preserved.
            rows.filter_by(**criteria).update(
                {'deleted': True,
                 'deleted_at': now,
                 'updated_at': literal_column('updated_at')})
开发者ID:WeAreFormalGroup,项目名称:smaug,代码行数:14,代码来源:api.py
示例13: migration_destroy
def migration_destroy(context, migration_id):
    """Soft-delete a migration and its properties.

    Flags the ``Migration`` row with ``id == migration_id`` and the
    ``MigrationProperties`` rows with ``migration_id == migration_id`` as
    deleted, in one transaction and with one shared timestamp.

    :param context: request context handed to ``model_query``
    :param migration_id: id of the migration to destroy
    """
    session = get_session()
    now = timeutils.utcnow()

    def _mark_deleted(model, **criteria):
        # Self-assignment keeps updated_at at its stored value.
        model_query(context, model, session=session).\
            filter_by(**criteria).\
            update({'deleted': True,
                    'deleted_at': now,
                    'updated_at': literal_column('updated_at')})

    with session.begin():
        _mark_deleted(models.Migration, id=migration_id)
        _mark_deleted(models.MigrationProperties, migration_id=migration_id)
开发者ID:aptira,项目名称:guts,代码行数:14,代码来源:api.py
示例14: report_json
def report_json(request):
    """Return GeoJSON-like features for a division and its children.

    Reads ``divisioncode`` (and optionally ``hazardtype``) from
    ``request.matchdict`` and ``resolution`` from ``request.params``.
    Selects the divisions whose ``code`` or ``parent_code`` equals the
    division code, each with a geometry transformed to SRID 3857 and
    simplified with a tolerance of ``resolution / 2``.

    :param request: the incoming request
    :returns: a list of feature dicts (``type``, ``geometry``,
        ``properties``)
    :raises HTTPBadRequest: if ``resolution`` is missing or not a number,
        or if the route match dictionary cannot be read
    """
    try:
        division_code = request.matchdict.get('divisioncode')
    except AttributeError:
        # Only a missing/None matchdict can fail here; a bare `except`
        # would also have swallowed KeyboardInterrupt and SystemExit.
        raise HTTPBadRequest(detail='incorrect value for parameter '
                                    '"divisioncode"')

    try:
        resolution = float(request.params.get('resolution'))
    except (TypeError, ValueError):
        # TypeError: parameter absent (None); ValueError: not a number.
        raise HTTPBadRequest(detail='invalid value for parameter "resolution"')

    hazard_type = request.matchdict.get('hazardtype', None)

    _filter = or_(AdministrativeDivision.code == division_code,
                  AdministrativeDivision.parent_code == division_code)

    simplify = func.ST_Simplify(
        func.ST_Transform(AdministrativeDivision.geom, 3857), resolution / 2)

    if hazard_type is not None:
        divisions = DBSession.query(AdministrativeDivision) \
            .add_columns(simplify, HazardLevel.mnemonic, HazardLevel.title) \
            .outerjoin(AdministrativeDivision.hazardcategories) \
            .join(HazardCategory) \
            .join(HazardType)\
            .join(HazardLevel) \
            .filter(and_(_filter, HazardType.mnemonic == hazard_type))
    else:
        # Without a hazard type the two hazard-level columns are filled
        # with literal placeholder strings so rows keep the same shape.
        divisions = DBSession.query(AdministrativeDivision) \
            .add_columns(simplify, literal_column("'None'"),
                         literal_column("'blah'")) \
            .filter(_filter)

    return [{
        'type': 'Feature',
        'geometry': to_shape(geom_simplified),
        'properties': {
            'name': division.name,
            'code': division.code,
            'url': request.route_url(
                'report' if hazard_type else 'report_overview',
                division=division,
                hazardtype=hazard_type),
            'hazardLevelMnemonic': hazardlevel_mnemonic,
            'hazardLevelTitle': hazardlevel_title
        }
    } for division, geom_simplified, hazardlevel_mnemonic,
        hazardlevel_title in divisions]
开发者ID:pgiraud,项目名称:thinkhazard,代码行数:50,代码来源:report.py
示例15: _wall_events_query
def _wall_events_query(self):
    """WallMixin implementation.

    Restricts the generic events query to events authored by the current
    teacher (when ``c.teacher`` exists) or otherwise the current user, and
    to either a public event type or a ``file_uploaded`` event whose
    context content type is ``subject``.
    """
    from ututi.lib.wall import generic_events_query

    public_event_types = [
        'group_created',
        'subject_created',
        'subject_modified',
        'page_created',
        'page_modified',
    ]
    events = meta.metadata.tables['events']
    base_query = generic_events_query()

    author = c.teacher if hasattr(c, 'teacher') else c.user_info
    authored = base_query.where(events.c.author_id == author.id)

    is_public = events.c.event_type.in_(public_event_types)
    # XXX literal_column is used because the original author did not know
    # how to refer to this column of the generic query directly.
    is_subject_upload = and_(
        events.c.event_type == 'file_uploaded',
        literal_column('context_ci.content_type') == 'subject')
    return authored.where(or_(is_public, is_subject_upload))
开发者ID:nous-consulting,项目名称:ututi,代码行数:28,代码来源:user.py
示例16: repl
def repl(element):
    """Return a replacement for ``element``, or ``None`` to keep it.

    Bind parameters become literal columns built from their quoted value;
    column clauses attached to a table become bare columns of the same
    name. Every other element yields ``None``.
    """
    if isinstance(element, expression._BindParamClause):
        quoted = _quote_ddl_expr(element.value)
        return expression.literal_column(quoted)

    is_bound_column = (isinstance(element, expression.ColumnClause)
                       and element.table is not None)
    if is_bound_column:
        return expression.column(element.name)

    return None
开发者ID:pszafer,项目名称:dlna_upnp_invention,代码行数:7,代码来源:util.py
示例17: load_balancer_update_state
def load_balancer_update_state(context, load_balancer_uuid, state):
    """Set the ``state`` of the load balancer with the given uuid.

    :param context: object whose ``session`` runs the update
    :param load_balancer_uuid: uuid of the load balancer to update
    :param state: new value for the ``state`` column
    """
    with context.session.begin():
        matching = context.session.query(models.LoadBalancer).filter_by(
            uuid=load_balancer_uuid)
        # updated_at is assigned to itself so the stored value is kept.
        new_values = {'state': state,
                      'updated_at': literal_column('updated_at')}
        matching.update(new_values)
开发者ID:ljjjustin,项目名称:nozzle,代码行数:7,代码来源:api.py
示例18: user_destroy
def user_destroy(user_id):
    """Soft-delete the user with the given id and commit.

    :param user_id: id of the ``User`` row to flag as deleted
    """
    target = db_session.query(models.User).filter_by(id=user_id)
    # updated_at is assigned to itself so the stored value is kept.
    target.update({'deleted': True,
                   'deleted_at': timeutils.utcnow(),
                   'updated_at': literal_column('updated_at')})
    db_session.commit()
开发者ID:tpiperatgod,项目名称:turtle-web,代码行数:7,代码来源:dbapi.py
示例19: get_elevation
def get_elevation(fixes):
    """Return a copy of ``fixes`` with slot 11 filled from elevation data.

    At most roughly 1000 evenly spaced fixes are looked up in the
    ``Elevation`` raster; the value written for the fixes in between is
    linearly interpolated between two consecutive lookup results.

    :param fixes: sequence of fix records; ``fix[2]`` must be a mapping
        with ``'longitude'`` and ``'latitude'``
        (presumably index 11 is the elevation slot -- TODO confirm)
    :returns: a list of lists, one per fix
    """
    # Sample every `shortener`-th fix. Floor division keeps this an int on
    # both Python 2 and 3, as the slice step below requires.
    shortener = max(1, len(fixes) // 1000)

    coordinates = [(fix[2]['longitude'], fix[2]['latitude']) for fix in fixes]
    points = MultiPoint(coordinates[::shortener])

    locations = from_shape(points, srid=4326).ST_DumpPoints()
    locations_id = extract_array_item(locations.path, 1)

    subq = db.session.query(locations_id.label('location_id'),
                            locations.geom.label('location')).subquery()

    elevation = Elevation.rast.ST_Value(subq.c.location)

    # Prepare main query. `!= None` is deliberate: it is an SQL expression.
    q = db.session.query(literal_column('location_id'), elevation.label('elevation')) \
        .filter(and_(subq.c.location.ST_Intersects(Elevation.rast),
                     elevation != None)).all()

    fixes_copy = [list(fix) for fix in fixes]

    # NOTE(review): assumes rows come back ordered by location_id; the
    # query has no ORDER BY -- confirm.
    for i in range(1, len(q)):
        prev = q[i - 1].location_id - 1
        current = q[i].location_id - 1

        first = prev * shortener
        last = current * shortener
        span = float(last - first)
        delta = q[i].elevation - q[i - 1].elevation

        for j in range(first, last):
            # Linear interpolation: the offset is divided by the span so
            # the value reaches q[i].elevation at `last`. The original
            # multiplied by the raw offset and overshot when shortener > 1.
            fixes_copy[j][11] = q[i - 1].elevation + delta * ((j - first) / span)

    return fixes_copy
开发者ID:bbonamin,项目名称:Skylines,代码行数:29,代码来源:flightpath.py
示例20: volume_type_destroy
def volume_type_destroy(context, name):
    """Soft-delete a volume type, looked up by name, and its extra specs.

    :param context: request context handed to ``volume_type_get_by_name``
    :param name: name of the volume type to destroy
    """
    session = get_session()
    with session.begin():
        volume_type_id = volume_type_get_by_name(
            context, name, session=session)['id']

        targets = (
            (models.VolumeTypes, {'id': volume_type_id}),
            (models.VolumeTypeExtraSpecs, {'volume_type_id': volume_type_id}),
        )
        for model, criteria in targets:
            # updated_at is assigned to itself so the stored value is kept.
            session.query(model).filter_by(**criteria).update(
                {'deleted': True,
                 'deleted_at': timeutils.utcnow(),
                 'updated_at': literal_column('updated_at')})
开发者ID:CiscoSystems,项目名称:cinder-old,代码行数:16,代码来源:api.py
注：本文中的 sqlalchemy.sql.expression.literal_column 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论