本文整理汇总了Python中sqlalchemy.inspect函数的典型用法代码示例。如果您正苦于以下问题:Python inspect函数的具体用法?Python inspect怎么用?Python inspect使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了inspect函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_relations_cascade
def test_relations_cascade(self):
from sqlalchemy import inspect
from ..models import Group, User
group = self._make(Group(name="foo"))
user1 = self._make(
User(
username="foo",
encrypted_password="dummy",
ident="fooident",
name="Nameless Foo",
groups=[group],
)
)
user2 = self._make(
User(
username="foo2",
encrypted_password="dummy",
ident="foo2ident",
name="Nameless Foo2",
groups=[group],
)
)
self.dbsession.commit()
self.dbsession.delete(group)
self.dbsession.commit()
self.assertTrue(inspect(group).was_deleted)
self.assertFalse(inspect(user1).was_deleted)
self.assertFalse(inspect(user2).was_deleted)
开发者ID:pxfs,项目名称:fanboi2,代码行数:29,代码来源:test_models_group.py
示例2: test_indexed_entity
def test_indexed_entity(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
path = PathRegistry.coerce((umapper, umapper.attrs.addresses,
amapper, amapper.attrs.email_address))
is_(path[0], umapper)
is_(path[2], amapper)
开发者ID:CyberCollins,项目名称:sqlalchemy,代码行数:7,代码来源:test_utils.py
示例3: test_eq
def test_eq(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
u_alias = inspect(aliased(self.classes.User))
p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
p3 = PathRegistry.coerce((umapper, umapper.attrs.name))
p4 = PathRegistry.coerce((u_alias, umapper.attrs.addresses))
p5 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
p6 = PathRegistry.coerce((amapper, amapper.attrs.user, umapper,
umapper.attrs.addresses))
p7 = PathRegistry.coerce((amapper, amapper.attrs.user, umapper,
umapper.attrs.addresses,
amapper, amapper.attrs.email_address))
is_(p1 == p2, True)
is_(p1 == p3, False)
is_(p1 == p4, False)
is_(p1 == p5, False)
is_(p6 == p7, False)
is_(p6 == p7.parent.parent, True)
is_(p1 != p2, False)
is_(p1 != p3, True)
is_(p1 != p4, True)
is_(p1 != p5, True)
开发者ID:CyberCollins,项目名称:sqlalchemy,代码行数:26,代码来源:test_utils.py
示例4: get_parent
def get_parent(self, attr_name):
# first, try grabbing it directly
parent = getattr(self, attr_name)
if parent:
return parent
# if nothing was found, grab the fk and lookup manually
mapper = inspect(type(self))
attr = getattr(type(self), attr_name)
prop = attr.property
local_col, remote_col = prop.local_remote_pairs[0]
local_prop = mapper.get_property_by_column(local_col)
value = getattr(self, local_prop.key)
if not value:
# no relation and no fk = no parent
return None
parent_cls = type(self).get_related_class(attr_name)
mapper = inspect(parent_cls)
remote_prop = mapper.get_property_by_column(remote_col)
filters = {remote_prop.key: value}
orm = ORM.get()
session = orm.sessionmaker()
parent = session.query(parent_cls).filter_by(**filters).first()
return parent
开发者ID:devhub,项目名称:baph,代码行数:27,代码来源:mixins.py
示例5: test_attrs_props_prop_added_after_configure
def test_attrs_props_prop_added_after_configure(self):
class AnonClass(object):
pass
from sqlalchemy.orm import mapper, column_property
from sqlalchemy.ext.hybrid import hybrid_property
m = mapper(AnonClass, self.tables.users)
eq_(
set(inspect(AnonClass).attrs.keys()),
set(['id', 'name']))
eq_(
set(inspect(AnonClass).all_orm_descriptors.keys()),
set(['id', 'name']))
m.add_property('q', column_property(self.tables.users.c.name))
def desc(self):
return self.name
AnonClass.foob = hybrid_property(desc)
eq_(
set(inspect(AnonClass).attrs.keys()),
set(['id', 'name', 'q']))
eq_(
set(inspect(AnonClass).all_orm_descriptors.keys()),
set(['id', 'name', 'q', 'foob']))
开发者ID:m32,项目名称:sqlalchemy,代码行数:27,代码来源:test_inspect.py
示例6: get_tables
def get_tables(mixed):
"""
Return a list of tables associated with given SQLAlchemy object.
Let's say we have three classes which use joined table inheritance
TextItem, Article and BlogPost. Article and BlogPost inherit TextItem.
::
get_tables(Article) # [Table('article', ...), Table('text_item')]
get_tables(Article())
get_tables(Article.__mapper__)
.. versionadded: 0.26.0
:param mixed:
SQLAlchemy Mapper / Declarative class or a SA Alias object wrapping
any of these objects.
"""
if isinstance(mixed, sa.orm.util.AliasedClass):
mapper = sa.inspect(mixed).mapper
else:
if not isclass(mixed):
mixed = mixed.__class__
mapper = sa.inspect(mixed)
return mapper.tables
开发者ID:Myconomy,项目名称:sqlalchemy-utils,代码行数:29,代码来源:orm.py
示例7: create_highlight
def create_highlight(self, **options):
"""
Returns an error message (string) if something went wrong, otherwise returns True
"""
if self.online is False or self.current_stream_chunk is None:
return 'The stream is not online'
if self.current_stream_chunk.video_url is None:
return 'No video URL fetched for this chunk yet, try in 5 minutes'
try:
highlight = StreamChunkHighlight(self.current_stream_chunk, **options)
session = DBManager.create_session(expire_on_commit=False)
session.add(highlight)
session.add(self.current_stream_chunk)
session.commit()
session.close()
x = inspect(self.current_stream_chunk)
log.info('{0.transient} - {0.pending} - {0.persistent} - {0.detached}'.format(x))
x = inspect(highlight)
log.info('{0.transient} - {0.pending} - {0.persistent} - {0.detached}'.format(x))
x = inspect(self.current_stream)
log.info('{0.transient} - {0.pending} - {0.persistent} - {0.detached}'.format(x))
log.info(self.current_stream.id)
log.info(highlight.id)
log.info(self.current_stream_chunk.id)
except:
log.exception('uncaught exception in create_highlight')
return 'Unknown reason, ask pajlada'
return True
开发者ID:savageboy74,项目名称:tyggbot,代码行数:34,代码来源:stream.py
示例8: test_postload_immutability
def test_postload_immutability(self):
i1 = IdOnly(is_regular=1, is_immutable=2, is_cached=3)
i2 = IdUuid(is_regular='a', is_immutable='b', is_cached='c')
i3 = UuidOnly(is_regular='x', is_immutable='y', is_cached='z')
self.session.add_all([i1, i2, i3])
self.session.commit()
id1 = i1.id
id2 = i2.id
id3 = i3.id
# Delete objects so SQLAlchemy's session cache can't populate fields from them
del i1, i2, i3
# Using `query.get` appears to ignore the `load_only` option,
# so we use `query.filter_by`
pi1 = IdOnly.query.options(db.load_only('id')).filter_by(id=id1).one()
pi2 = IdUuid.query.options(db.load_only('id')).filter_by(id=id2).one()
pi3 = UuidOnly.query.options(db.load_only('id')).filter_by(id=id3).one()
# Confirm there is no value for is_immutable
self.assertIs(inspect(pi1).attrs.is_immutable.loaded_value, NO_VALUE)
self.assertIs(inspect(pi2).attrs.is_immutable.loaded_value, NO_VALUE)
self.assertIs(inspect(pi3).attrs.is_immutable.loaded_value, NO_VALUE)
# Immutable columns are immutable even if not loaded
with self.assertRaises(ImmutableColumnError):
pi1.is_immutable = 20
with self.assertRaises(ImmutableColumnError):
pi2.is_immutable = 'bb'
with self.assertRaises(ImmutableColumnError):
pi3.is_immutable = 'yy'
开发者ID:hasgeek,项目名称:coaster,代码行数:32,代码来源:test_annotations.py
示例9: handle_cycle_task_group_object_task_put
def handle_cycle_task_group_object_task_put(obj):
if inspect(obj).attrs.contact.history.has_changes():
add_cycle_task_reassigned_notification(obj)
history = inspect(obj).attrs.end_date.history
if not history.has_changes():
return
# NOTE: A history might "detect" a change even if end_date was not changed
# due to different data types, i.e. date vs. datetime with the time part set
# to zero. Example:
#
# >>> datetime(2017, 5, 15, 0, 0) == date(2017, 5, 15)
# False
#
# We thus need to manually check both date values without the time part
# in order to avoid unnecessary work and DB updates.
old_date = history.deleted[0] if history.deleted else None
new_date = history.added[0] if history.added else None
if old_date is not None and new_date is not None:
if isinstance(old_date, datetime):
old_date = old_date.date()
if isinstance(new_date, datetime):
new_date = new_date.date()
if old_date == new_date:
return # we have a false positive, no change actually occurred
# the end date has actually changed, respond accordingly
modify_cycle_task_end_date(obj)
开发者ID:zidarsk8,项目名称:ggrc-core,代码行数:31,代码来源:notification_handler.py
示例10: add_superclass_path
def add_superclass_path(self, column, cls, alias_maker):
path = []
for i, sup in enumerate(sqla_inheritance_with_conditions(cls)):
# if getattr(inspect(sup), 'local_table', None) is None:
# continue
condition = inspect(cls).inherit_condition
if condition is not None:
alias_maker.add_condition(condition)
if i:
path.append(SuperClassRelationship(sup, cls))
cls = sup
alias_maker.alias_from_relns(*path)
if _columnish(column):
local_keys = {c.key for c in inspect(cls).local_table.columns}
if column.key in local_keys:
column = getattr(cls, column.key)
return column
elif _propertish(column):
if isinstance(column, InstrumentedAttribute):
column = column.impl.parent_token
if column.parent == inspect(cls):
return column
else:
assert False, "what is this column?"
else:
assert False, "The column is found in the "\
"class and not in superclasses?"
开发者ID:PMR2,项目名称:virtuoso-python,代码行数:27,代码来源:quadextractor.py
示例11: handle_cycle_task_group_object_task_put
def handle_cycle_task_group_object_task_put(
sender, obj=None, src=None, service=None): # noqa pylint: disable=unused-argument
if inspect(obj).attrs.contact.history.has_changes():
ensure_assignee_is_workflow_member(obj.cycle.workflow, obj.contact)
if any([inspect(obj).attrs.start_date.history.has_changes(),
inspect(obj).attrs.end_date.history.has_changes()]):
update_cycle_dates(obj.cycle)
if inspect(obj).attrs.status.history.has_changes():
# TODO: check why update_cycle_object_parent_state destroys object history
# when accepting the only task in a cycle. The listener below is a
# workaround because of that.
Signals.status_change.send(
obj.__class__,
obj=obj,
new_status=obj.status,
old_status=inspect(obj).attrs.status.history.deleted.pop(),
)
update_cycle_task_object_task_parent_state(obj)
# Doing this regardless of status.history.has_changes() is important in order
# to update objects that have been declined. It updates the os_last_updated
# date and last_updated_by.
if getattr(obj.task_group_task, 'object_approval', None):
for tgobj in obj.task_group_task.task_group.objects:
if obj.status == 'Verified':
tgobj.modified_by = get_current_user()
tgobj.set_reviewed_state()
db.session.add(tgobj)
db.session.flush()
开发者ID:zidarsk8,项目名称:ggrc-core,代码行数:32,代码来源:__init__.py
示例12: extract_qmps
def extract_qmps(self, sqla_cls, subject_pattern, alias_maker, for_graph):
mapper = inspect(sqla_cls)
supercls = next(islice(
sqla_inheritance_with_conditions(sqla_cls), 1, 2), None)
if supercls:
supermapper = inspect(supercls)
super_props = set(chain(
supermapper.columns, supermapper.relationships))
else:
super_props = set()
for c in chain(mapper.columns, mapper.relationships):
# Local columns only to avoid duplication
if c in super_props:
# But there are exceptions
if not self.delayed_column(sqla_cls, c, for_graph):
continue
# in this case, make sure superclass is in aliases.
c = self.add_superclass_path(c, sqla_cls, alias_maker)
if 'rdf' in getattr(c, 'info', ()):
from virtuoso.vmapping import QuadMapPattern
qmp = c.info['rdf']
if isinstance(qmp, QuadMapPattern):
qmp = self.qmp_with_defaults(
qmp, subject_pattern, sqla_cls, alias_maker,
for_graph, c)
if qmp is not None and qmp.graph_name == for_graph.name:
qmp.resolve(sqla_cls)
yield qmp
开发者ID:PMR2,项目名称:virtuoso-python,代码行数:28,代码来源:quadextractor.py
示例13: _get_criteria
def _get_criteria(keys, class_, obj):
criteria = []
visited_constraints = []
for key in keys:
if key.constraint in visited_constraints:
continue
visited_constraints.append(key.constraint)
subcriteria = []
for index, column in enumerate(key.constraint.columns):
prop = sa.inspect(class_).get_property_by_column(
column
)
foreign_column = (
key.constraint.elements[index].column
)
subcriteria.append(
getattr(class_, prop.key) ==
getattr(
obj,
sa.inspect(type(obj))
.get_property_by_column(
foreign_column
).key
)
)
criteria.append(sa.and_(*subcriteria))
return criteria
开发者ID:lnielsen,项目名称:sqlalchemy-utils,代码行数:28,代码来源:foreign_keys.py
示例14: test_length
def test_length(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
pneg1 = PathRegistry.coerce(())
p0 = PathRegistry.coerce((umapper,))
p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
p3 = PathRegistry.coerce(
(
umapper,
umapper.attrs.addresses,
amapper,
amapper.attrs.email_address,
)
)
eq_(len(pneg1), 0)
eq_(len(p0), 1)
eq_(len(p1), 2)
eq_(len(p2), 3)
eq_(len(p3), 4)
eq_(pneg1.length, 0)
eq_(p0.length, 1)
eq_(p1.length, 2)
eq_(p2.length, 3)
eq_(p3.length, 4)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:26,代码来源:test_utils.py
示例15: _get_person_link
def _get_person_link(self, data, extra_data=None):
extra_data = extra_data or {}
person = get_event_person(self.event, data, create_untrusted_persons=self.create_untrusted_persons,
allow_external=True)
person_data = {'title': next((x.value for x in UserTitle if data.get('title') == orig_string(x.title)),
UserTitle.none),
'first_name': data.get('firstName', ''), 'last_name': data['familyName'],
'affiliation': data.get('affiliation', ''), 'address': data.get('address', ''),
'phone': data.get('phone', ''), 'display_order': data['displayOrder']}
person_data.update(extra_data)
person_link = None
if self.object and inspect(person).persistent:
person_link = self.person_link_cls.find_first(person=person, object=self.object)
if not person_link:
person_link = self.person_link_cls(person=person)
person_link.populate_from_dict(person_data)
email = data.get('email', '').lower()
if email != person_link.email:
if not self.event or not self.event.persons.filter_by(email=email).first():
person_link.person.email = email
person_link.person.user = get_user_by_email(email)
if inspect(person).persistent:
signals.event.person_updated.send(person_link.person)
else:
raise UserValueError(_('There is already a person with the email {}').format(email))
return person_link
开发者ID:indico,项目名称:indico,代码行数:26,代码来源:fields.py
示例16: test_plain_aliased_compound
def test_plain_aliased_compound(self):
Company = _poly_fixtures.Company
Person = _poly_fixtures.Person
Engineer = _poly_fixtures.Engineer
cmapper = inspect(Company)
emapper = inspect(Engineer)
c_alias = aliased(Company)
p_alias = aliased(Person)
c_alias = inspect(c_alias)
p_alias = inspect(p_alias)
p1 = PathRegistry.coerce(
(c_alias, cmapper.attrs.employees, p_alias, emapper.attrs.machines)
)
# plain AliasedClass - the path keeps that AliasedClass directly
# as is in the path
eq_(
p1.path,
(
c_alias,
cmapper.attrs.employees,
p_alias,
emapper.attrs.machines,
),
)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:27,代码来源:test_utils.py
示例17: done
def done(self, new_password, field_name='password'):
self.user.set_password(new_password, field_name)
if self.user.no_errors():
self.dt_use = datetime.datetime.utcnow()
inspect(self).session.commit()
else:
self.errors.extend(self.user.errors)
开发者ID:adam1978828,项目名称:webapp1,代码行数:7,代码来源:user_restore_password.py
示例18: log_activity
def log_activity(self, sender, actor, verb, object, target=None):
assert self.running
if not isinstance(object, Entity):
# generic forms may send signals inconditionnaly. For now we have activity
# only for Entities
return
session = object_session(object)
kwargs = dict(actor=actor, verb=verb, object_type=object.entity_type)
if sa.inspect(object).deleted:
# object is in deleted state: flush has occurred, don't reference it or
# we'll have an error when adding entry to session
kwargs['object_id'] = object.id
else:
kwargs['object'] = object
if target is not None:
kwargs['target_type'] = target.entity_type
if sa.inspect(target).deleted:
kwargs['target_id'] = target.id
else:
kwargs['target'] = target
entry = ActivityEntry(**kwargs)
entry.object_type = object.entity_type
session.add(entry)
开发者ID:debon,项目名称:abilian-core,代码行数:27,代码来源:service.py
示例19: test_insert_and_object_states
def test_insert_and_object_states(caplog):
engine = create_engine('sqlite:///:memory:', echo=True)
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
user = User(name='Jeremy', fullname='Jeremy Kao')
assert user not in session
assert inspect(user).transient
session.add(user)
assert inspect(user).pending
assert user.id is None
caplog.clear()
session.commit()
sqls = [r.message for r in caplog.records]
assert inspect(user).persistent
assert user.id == 1
assert sqls == [
'BEGIN (implicit)',
'INSERT INTO user (name, fullname) VALUES (?, ?)',
"('Jeremy', 'Jeremy Kao')",
'COMMIT'
]
开发者ID:imsardine,项目名称:learning,代码行数:25,代码来源:test_orm.py
示例20: inheritance_args
def inheritance_args(self, cls, version_table, table):
"""
Return mapper inheritance args for currently built history model.
"""
args = {}
if not sa.inspect(self.model).single:
parent = find_closest_versioned_parent(self.manager, self.model)
if parent:
# The version classes do not contain foreign keys, hence we
# need to map inheritance condition manually for classes that
# use joined table inheritance
if parent.__table__.name != table.name:
mapper = sa.inspect(self.model)
reflector = VersionExpressionParser()
inherit_condition = reflector(mapper.inherit_condition)
tx_column_name = self.manager.options["transaction_column_name"]
args["inherit_condition"] = sa.and_(
inherit_condition,
getattr(parent.__table__.c, tx_column_name) == getattr(cls.__table__.c, tx_column_name),
)
args["inherit_foreign_keys"] = [
version_table.c[column.key] for column in sa.inspect(self.model).columns if column.primary_key
]
args.update(copy_mapper_args(self.model))
return args
开发者ID:M4d40,项目名称:sqlalchemy-continuum,代码行数:29,代码来源:model_builder.py
注:本文中的sqlalchemy.inspect函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论