本文整理汇总了Python中sqlalchemy.sql.null函数的典型用法代码示例。如果您正苦于以下问题:Python null函数的具体用法?Python null怎么用?Python null使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了null函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _process_null_records
def _process_null_records(table, col_name, check_fkeys, delete=False):
"""Queries the database and optionally deletes the NULL records.
:param table: sqlalchemy.Table object.
:param col_name: The name of the column to check in the table.
:param check_fkeys: If True, check the table for foreign keys back to the
instances table and if not found, return.
:param delete: If true, run a delete operation on the table, else just
query for number of records that match the NULL column.
:returns: The number of records processed for the table and column.
"""
records = 0
if col_name in table.columns:
# NOTE(mriedem): filter out tables that don't have a foreign key back
# to the instances table since they could have stale data even if
# instances.uuid wasn't NULL.
if check_fkeys:
fkey_found = False
fkeys = table.c[col_name].foreign_keys or []
for fkey in fkeys:
if fkey.column.table.name == 'instances':
fkey_found = True
if not fkey_found:
return 0
if delete:
records = table.delete().where(
table.c[col_name] == null()
).execute().rowcount
else:
records = len(list(
table.select().where(table.c[col_name] == null()).execute()
))
return records
开发者ID:EdLeafe,项目名称:nova,代码行数:35,代码来源:migration.py
示例2: test_reduce_aliased_union_2
def test_reduce_aliased_union_2(self):
metadata = MetaData()
page_table = Table('page', metadata,
Column('id', Integer, primary_key=True),
)
magazine_page_table = Table('magazine_page', metadata,
Column('page_id', Integer, ForeignKey('page.id'), primary_key=True),
)
classified_page_table = Table('classified_page', metadata,
Column('magazine_page_id', Integer, ForeignKey('magazine_page.page_id'), primary_key=True),
)
# this is essentially the union formed by the ORM's polymorphic_union function.
# we define two versions with different ordering of selects.
# the first selectable has the "real" column classified_page.magazine_page_id
pjoin = union(
select([
page_table.c.id,
magazine_page_table.c.page_id,
classified_page_table.c.magazine_page_id
]).select_from(page_table.join(magazine_page_table).join(classified_page_table)),
select([
page_table.c.id,
magazine_page_table.c.page_id,
cast(null(), Integer).label('magazine_page_id')
]).select_from(page_table.join(magazine_page_table)),
).alias('pjoin')
eq_(
util.column_set(sql_util.reduce_columns([pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])),
util.column_set([pjoin.c.id])
)
# the first selectable has a CAST, which is a placeholder for
# classified_page.magazine_page_id in the second selectable. reduce_columns
# needs to take into account all foreign keys derived from pjoin.c.magazine_page_id.
# the UNION construct currently makes the external column look like that of the first
# selectable only.
pjoin = union(
select([
page_table.c.id,
magazine_page_table.c.page_id,
cast(null(), Integer).label('magazine_page_id')
]).select_from(page_table.join(magazine_page_table)),
select([
page_table.c.id,
magazine_page_table.c.page_id,
classified_page_table.c.magazine_page_id
]).select_from(page_table.join(magazine_page_table).join(classified_page_table))
).alias('pjoin')
eq_(
util.column_set(sql_util.reduce_columns([pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])),
util.column_set([pjoin.c.id])
)
开发者ID:gajop,项目名称:springgrid,代码行数:60,代码来源:test_selectable.py
示例3: test_union_against_join
def test_union_against_join(self):
# same as testunion, except its an alias of the union
u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
).alias('analias')
j1 = table1.join(table2)
assert u.corresponding_column(j1.c.table1_colx) is u.c.colx
assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx
开发者ID:clones,项目名称:sqlalchemy,代码行数:8,代码来源:test_selectable.py
示例4: col
def col(name, table):
try:
return colnamemaps[table][name]
except KeyError:
if cast_nulls:
return sql.cast(sql.null(), types[name]).label(name)
else:
return sql.type_coerce(sql.null(), types[name]).label(name)
开发者ID:Am3s,项目名称:CouchPotatoServer,代码行数:8,代码来源:util.py
示例5: test_select_union
def test_select_union(self):
# like testaliasunion, but off a Select off the union.
u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
).alias('analias')
s = select([u])
s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
assert s.corresponding_column(s1.c.table1_col2) is s.c.col2
assert s.corresponding_column(s2.c.table2_col2) is s.c.col2
开发者ID:clones,项目名称:sqlalchemy,代码行数:10,代码来源:test_selectable.py
示例6: ___images_by_windowed_meta
def ___images_by_windowed_meta(
self,
context,
period_start,
period_stop,
project_id,
metadata
):
"""Simulated bottom most layer
:param context:
:param period_start: Datetime
:param period_stop: Datetime
:param project_id: String
:param metadata:
"""
if metadata:
aliases = [aliased(models.ImageProperty) for i in metadata]
else:
aliases = []
session = get_session()
query = session.query(
models.Image,
*aliases
)
query = query.filter(or_(models.Image.deleted_at == null(),
models.Image.deleted_at > period_start))
if period_stop:
query = query.filter(models.Image.created_at < period_stop)
if project_id:
query = query.filter_by(project_id=project_id)
if metadata:
for keypair, alias in zip(metadata.items(), aliases):
query = query.filter(alias.name == keypair[0])
query = query.filter(alias.value == keypair[1])
query = query.filter(alias.image_id == models.Image.id)
query = query.filter(or_(
alias.deleted_at == null(),
alias.deleted_at == models.Image.deleted_at
))
images = []
for tup in query.all():
if aliases:
image = tup[0]
# props = tup[1:]
else:
image = tup
# props = None
images.append(dict(image))
return images
开发者ID:absalon-james,项目名称:os_usage,代码行数:55,代码来源:usage.py
示例7: test_alias_union
def test_alias_union(self):
# same as testunion, except its an alias of the union
u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
).alias('analias')
s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
assert u.corresponding_column(s2.c.table2_col2) is u.c.col2
assert u.corresponding_column(s2.c.table2_coly) is u.c.coly
assert s2.corresponding_column(u.c.coly) is s2.c.table2_coly
开发者ID:clones,项目名称:sqlalchemy,代码行数:11,代码来源:test_selectable.py
示例8: test_union
def test_union(self):
# tests that we can correspond a column in a Select statement with a certain Table, against
# a column in a Union where one of its underlying Selects matches to that same Table
u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
)
s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
c = u.corresponding_column(s1.c.table1_col2)
assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
assert u.corresponding_column(s2.c.table2_col2) is u.c.col2
开发者ID:clones,项目名称:sqlalchemy,代码行数:11,代码来源:test_selectable.py
示例9: ___get_active_by_window_metadata
def ___get_active_by_window_metadata(self, context, period_start,
period_stop=None,
project_id=None,
metadata=None,
use_slave=False):
"""Simulate bottom most layer
:param context: wsgi context
:param period_start: Datetime
:param period_stop: Datetime
:param project_id: String|None
:param metadata: Dict|None
:param use_slave: Boolean
"""
if metadata:
aliases = [aliased(models.VolumeMetadata) for i in metadata]
else:
aliases = []
session = get_session(use_slave=use_slave)
query = session.query(
models.Volume,
*aliases
)
query = query.filter(or_(models.Volume.terminated_at == null(),
models.Volume.terminated_at > period_start))
if period_stop:
query = query.filter(models.Volume.launched_at < period_stop)
if project_id:
query = query.filter_by(project_id=project_id)
if metadata:
for keypair, alias in zip(metadata.items(), aliases):
query = query.filter(alias.key == keypair[0])
query = query.filter(alias.value == keypair[1])
query = query.filter(alias.volume_id == models.Volume.id)
query = query.filter(or_(
alias.deleted_at == null(),
alias.deleted_at == models.Volume.deleted_at
))
volumes = []
for tup in query.all():
# If no metadata filters, then no aliases.
if aliases:
volume = tup[0]
else:
volume = tup
volumes.append(dict(volume))
return volumes
开发者ID:absalon-james,项目名称:os_usage,代码行数:52,代码来源:usage.py
示例10: _get_node_empty_ratio
def _get_node_empty_ratio(context, max_count):
"""Query the DB for non-deleted compute_nodes with 0.0/None alloc ratios
Results are limited by ``max_count``.
"""
return context.session.query(models.ComputeNode).filter(or_(
models.ComputeNode.ram_allocation_ratio == '0.0',
models.ComputeNode.cpu_allocation_ratio == '0.0',
models.ComputeNode.disk_allocation_ratio == '0.0',
models.ComputeNode.ram_allocation_ratio == null(),
models.ComputeNode.cpu_allocation_ratio == null(),
models.ComputeNode.disk_allocation_ratio == null()
)).filter(models.ComputeNode.deleted == 0).limit(max_count).all()
开发者ID:mikalstill,项目名称:nova,代码行数:13,代码来源:compute_node.py
示例11: scan_for_null_records
def scan_for_null_records(table, col_name, check_fkeys):
"""Queries the table looking for NULL instances of the given column.
:param col_name: The name of the column to look for in the table.
:param check_fkeys: If True, check the table for foreign keys back to the
instances table and if not found, return.
:raises: exception.ValidationError: If any records are found.
"""
if col_name in table.columns:
# NOTE(mriedem): filter out tables that don't have a foreign key back
# to the instances table since they could have stale data even if
# instances.uuid wasn't NULL.
if check_fkeys:
fkey_found = False
fkeys = table.c[col_name].foreign_keys or []
for fkey in fkeys:
if fkey.column.table.name == 'instances':
fkey_found = True
if not fkey_found:
return
records = len(list(
table.select().where(table.c[col_name] == null()).execute()
))
if records:
msg = _("There are %(records)d records in the "
"'%(table_name)s' table where the uuid or "
"instance_uuid column is NULL. These must be "
"manually cleaned up before the migration will pass. "
"Consider running the "
"'nova-manage db null_instance_uuid_scan' command.") % (
{'records': records, 'table_name': table.name})
raise exception.ValidationError(detail=msg)
开发者ID:375670450,项目名称:nova,代码行数:34,代码来源:267_instance_uuid_non_nullable.py
示例12: bm_node_get_associated
def bm_node_get_associated(context, service_host=None):
query = model_query(context, models.BareMetalNode, read_deleted="no").filter(
models.BareMetalNode.instance_uuid != null()
)
if service_host:
query = query.filter_by(service_host=service_host)
return query.all()
开发者ID:nagyist,项目名称:openstack-nova,代码行数:7,代码来源:api.py
示例13: check_versions
def check_versions(self):
"""Checks the whole database for incompatible objects.
This scans all the tables in search of objects that are not supported;
i.e., those that are not specified in
`ironic.common.release_mappings.RELEASE_MAPPING`. This includes objects
that have null 'version' values.
:returns: A Boolean. True if all the objects have supported versions;
False otherwise.
"""
object_versions = release_mappings.get_object_versions()
for model in models.Base.__subclasses__():
if model.__name__ in object_versions:
supported_versions = object_versions[model.__name__]
if not supported_versions:
continue
# NOTE(rloo): .notin_ does not handle null:
# http://docs.sqlalchemy.org/en/latest/core/sqlelement.html#sqlalchemy.sql.operators.ColumnOperators.notin_
query = model_query(model).filter(
sql.or_(model.version == sql.null(),
model.version.notin_(supported_versions)))
if query.count():
return False
return True
开发者ID:pshchelo,项目名称:ironic,代码行数:25,代码来源:api.py
示例14: update_beacon_receiver_distance
def update_beacon_receiver_distance(name):
"""
Calculate the distance between the receiver and its received aircrafts
and write this data into each aircraft_beacon.
"""
last_receiver_beacon = app.session.query(ReceiverBeacon) \
.filter(ReceiverBeacon.name == name) \
.order_by(desc(ReceiverBeacon.timestamp)) \
.first()
if (last_receiver_beacon is None):
return
aircraft_beacons_query = app.session.query(AircraftBeacon) \
.filter(and_(AircraftBeacon.timestamp > last_receiver_beacon.timestamp,
AircraftBeacon.receiver_name == name,
AircraftBeacon.radius == null()))
for aircraft_beacon in aircraft_beacons_query.all():
location0 = (last_receiver_beacon.latitude, last_receiver_beacon.longitude)
location1 = (aircraft_beacon.latitude, aircraft_beacon.longitude)
alt0 = last_receiver_beacon.altitude
alt1 = aircraft_beacon.altitude
(flat_distance, phi) = haversine_distance(location0, location1)
theta = atan2(alt1 - alt0, flat_distance) * 180 / pi
distance = sqrt(flat_distance**2 + (alt1 - alt0)**2)
aircraft_beacon.radius = distance
aircraft_beacon.theta = theta
aircraft_beacon.phi = phi
app.session.commit()
logger.warning("Updated receiver {}.".format(name))
开发者ID:kerel-fs,项目名称:ogn-python,代码行数:35,代码来源:heatmap.py
示例15: organizations_and_counters
def organizations_and_counters():
'''Query organizations with their counters'''
memberships = aliased(model.Member)
query = DB.query(model.Group,
func.count(distinct(model.Package.id)).label('nb_datasets'),
func.count(distinct(memberships.id)).label('nb_members')
)
query = query.outerjoin(CertifiedPublicService)
query = query.outerjoin(model.Package, and_(
model.Group.id == model.Package.owner_org,
~model.Package.private,
model.Package.state == 'active',
))
query = query.outerjoin(memberships, and_(
memberships.group_id == model.Group.id,
memberships.state == 'active',
memberships.table_name == 'user'
))
query = query.filter(model.Group.state == 'active')
query = query.filter(model.Group.approval_status == 'approved')
query = query.filter(model.Group.is_organization == True)
query = query.group_by(model.Group.id, CertifiedPublicService.organization_id)
query = query.order_by(
CertifiedPublicService.organization_id == null(),
desc('nb_datasets'),
desc('nb_members'),
model.Group.title
)
return query
开发者ID:etalab,项目名称:ckanext-youckan,代码行数:30,代码来源:queries.py
示例16: organizations_and_counters
def organizations_and_counters():
'''Query organizations with their counters'''
query = DB.query(Group,
func.count(distinct(Package.id)).label('nb_datasets'),
func.count(distinct(Member.id)).label('nb_members')
)
query = query.outerjoin(CertifiedPublicService)
query = query.outerjoin(Package, and_(
Group.id == Package.owner_org,
~Package.private,
Package.state == 'active',
))
query = query.outerjoin(Member, and_(
Member.group_id == Group.id,
Member.state == 'active',
Member.table_name == 'user'
))
query = query.filter(Group.state == 'active')
query = query.filter(Group.approval_status == 'approved')
query = query.filter(Group.is_organization == True)
query = query.group_by(Group.id, CertifiedPublicService.organization_id)
query = query.order_by(
CertifiedPublicService.organization_id == null(),
desc('nb_datasets'),
desc('nb_members'),
Group.title
)
query = query.options(orm.joinedload(Group.certified_public_service))
return query
开发者ID:etalab,项目名称:weckan,代码行数:29,代码来源:queries.py
示例17: upgrade
def upgrade():
network_iface = (CONF.default_network_interface or
('flat' if CONF.dhcp.dhcp_provider == 'neutron'
else 'noop'))
op.execute(
node.update().where(
node.c.network_interface == null()).values(
{'network_interface': network_iface}))
开发者ID:Tehsmash,项目名称:ironic,代码行数:8,代码来源:c14cef6dfedf_populate_node_network_interface.py
示例18: const
def const( me, value):
if _debug: print '?const', value
if value is None: return sql.null()
try:
m = object_mapper( value)
except Exception, e:
#print 'XXXXXXXXXXXXXX', e
pass
开发者ID:hstanev,项目名称:dbcook,代码行数:8,代码来源:expression.py
示例19: _get_build_requests_with_no_instance_uuid
def _get_build_requests_with_no_instance_uuid(context, limit):
"""Returns up to $limit build_requests where instance_uuid is null"""
# build_requests don't use the SoftDeleteMixin so we don't have to filter
# on the deleted column.
return context.session.query(api_models.BuildRequest).\
filter_by(instance_uuid=null()).\
limit(limit).\
all()
开发者ID:sapcc,项目名称:nova,代码行数:8,代码来源:build_request.py
示例20: reschedule_routers_from_down_agents
def reschedule_routers_from_down_agents(self):
"""Reschedule routers from down l3 agents if admin state is up."""
# give agents extra time to handle transient failures
agent_dead_limit = cfg.CONF.agent_down_time * 2
# check for an abrupt clock change since last check. if a change is
# detected, sleep for a while to let the agents check in.
tdelta = timeutils.utcnow() - getattr(self, "_clock_jump_canary", timeutils.utcnow())
if timeutils.total_seconds(tdelta) > cfg.CONF.agent_down_time:
LOG.warn(
_LW(
"Time since last L3 agent reschedule check has "
"exceeded the interval between checks. Waiting "
"before check to allow agents to send a heartbeat "
"in case there was a clock adjustment."
)
)
time.sleep(agent_dead_limit)
self._clock_jump_canary = timeutils.utcnow()
context = n_ctx.get_admin_context()
cutoff = timeutils.utcnow() - datetime.timedelta(seconds=agent_dead_limit)
down_bindings = (
context.session.query(RouterL3AgentBinding)
.join(agents_db.Agent)
.filter(agents_db.Agent.heartbeat_timestamp < cutoff, agents_db.Agent.admin_state_up)
.outerjoin(
l3_attrs_db.RouterExtraAttributes,
l3_attrs_db.RouterExtraAttributes.router_id == RouterL3AgentBinding.router_id,
)
.filter(
sa.or_(
l3_attrs_db.RouterExtraAttributes.ha == sql.false(),
l3_attrs_db.RouterExtraAttributes.ha == sql.null(),
)
)
)
try:
for binding in down_bindings:
LOG.warn(
_LW(
"Rescheduling router %(router)s from agent %(agent)s "
"because the agent did not report to the server in "
"the last %(dead_time)s seconds."
),
{"router": binding.router_id, "agent": binding.l3_agent_id, "dead_time": agent_dead_limit},
)
try:
self.reschedule_router(context, binding.router_id)
except (l3agentscheduler.RouterReschedulingFailed, n_rpc.RemoteError):
# Catch individual router rescheduling errors here
# so one broken one doesn't stop the iteration.
LOG.exception(_LE("Failed to reschedule router %s"), binding.router_id)
except db_exc.DBError:
# Catch DB errors here so a transient DB connectivity issue
# doesn't stop the loopingcall.
LOG.exception(_LE("Exception encountered during router " "rescheduling."))
开发者ID:nash-x,项目名称:hws,代码行数:58,代码来源:l3_agentschedulers_db.py
注:本文中的sqlalchemy.sql.null函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论