本文整理汇总了Python中sqlalchemy.inspection.inspect函数的典型用法代码示例。如果您正苦于以下问题:Python inspect函数的具体用法?Python inspect怎么用?Python inspect使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了inspect函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: serialize
def serialize(self):
data = {}
if isinstance(self, Booking):
hotel = Hotel.query.filter_by(id=self.hotel_id).first()
data['hotel'] = {c: str(getattr(hotel, c)) for c in inspect(hotel).attrs.keys()}
data.update({c: str(getattr(self, c)) for c in inspect(self).attrs.keys()})
return data
开发者ID:sreekanthkaralmanna,项目名称:bookmyhotel,代码行数:7,代码来源:models.py
示例2: __new__
def __new__(mcs, class_name, bases, attrs):
"""Create new factory type."""
meta = attrs.get("Meta")
if getattr(meta, "abstract", None) is True or meta.model is None:
return super(ModelFactoryMetaClass, mcs).__new__(mcs, class_name, bases, attrs)
# Process relationships and composites
composite_columns = set()
for prop in inspect(meta.model).iterate_properties:
if prop.key in attrs:
continue
elif isinstance(prop, RelationshipProperty):
if prop.direction == MANYTOONE:
attrs[prop.key] = ModelSubFactory(prop.mapper.class_)
elif isinstance(prop, CompositeProperty):
composite_columns.update(prop.columns)
attrs[prop.key] = mcs.types.get[prop.composite_class]
# Process columns
for prop in inspect(meta.model).iterate_properties:
if prop.key in attrs:
continue
elif isinstance(prop, ColumnProperty):
for col in prop.columns:
if col not in composite_columns:
attrs[prop.key] = mcs.types[col.type.__class__](col.type)
factory_class = super(ModelFactoryMetaClass, mcs).__new__(mcs, class_name, bases, attrs)
mcs.factories[factory_class._meta.model] = factory_class
return factory_class
开发者ID:jmuhlich,项目名称:alchemyboy,代码行数:30,代码来源:base.py
示例3: to_dict
def to_dict(self, follow_relationships=True):
to_return = {}
for column in inspect(self.__class__).mapper.columns:
to_return[column.name] = getattr(self, column.name)
relationship_names = []
if follow_relationships and isinstance(self, Base):
relationship_names.extend([relationship_name for relationship_name in inspect(self.__class__).mapper.relationships.keys()])
to_return['type'] = self.__class__.__name__
if hasattr(self, 'get_primary_descriptor'):
to_return['primary_descriptor'] = self.get_primary_descriptor()
for attr in relationship_names: # Will only run if follow_relationships is True; see above
value = getattr(self, attr)
if hasattr(value, 'to_dict'):
to_return[attr] = value.to_dict(follow_relationships = False)
elif hasattr(value, '__iter__'):
new_list = []
for element in value:
if hasattr(element, 'to_dict'):
new_list.append(element.to_dict(follow_relationships = False))
else:
new_list.append(element)
to_return[attr] = new_list
return to_return
开发者ID:ClaudioBryla,项目名称:pydidit-backend,代码行数:26,代码来源:Model.py
示例4: args_for_model
def args_for_model(Model):
# the attribute arguments (no filters)
args = { column.name.lower() : convert_sqlalchemy_type(column.type, column) \
for column in inspect(Model).columns }
# add the primary key filter
# the primary keys for the Model
primary_keys = inspect(Model).primary_key
# make sure there is only one
assert len(primary_keys) == 1, "Can only support one primary key - how would I know what to reference for joins?"
primary_key = primary_keys[0]
# figure out the type of the primary key
primary_key_type = convert_sqlalchemy_type(primary_key.type, primary_key)
# add the primary key filter to the arg dictionary
args['pk'] = primary_key_type
# create a copy of the argument dict we can mutate
fullArgs = args.copy()
# todo: add type-specific filters
# go over the arguments
for arg, type in args.items():
# add the list member filter
fullArgs[arg + '_in'] = List(type)
# return the complete dictionary of arguments
return fullArgs
开发者ID:NickDubelman,项目名称:nautilus,代码行数:29,代码来源:helpers.py
示例5: __call__
def __call__(self, context):
# we take the fti type settings to get the selected connections and table
if ISQLTypeSchemaContext.providedBy(context):
context = ISQLTypeSettings(context.fti)
elif IField.providedBy(context):
context = ISQLTypeSettings(aq_parent(context.context).fti)
urls = ISQLAlchemyConnectionStrings(component.getUtility(ISiteRoot)).values()
if not getattr(context, 'sql_table', None):
return SimpleVocabulary([])
items = []
connection = queryUtility(ISQLConnectionsUtility, name=context.id, default=None)
if not connection:
connection = registerConnectionUtilityForFTI(context.context)
columns = []
for a in inspect(connection.tableClass).columns:
if a.name:
items.append(SimpleTerm(a.name, a.name, a.name+' ('+str(a.type)+')'))
columns.append(a.name)
for a in getattr(inspect(connection.tableClass), 'relationships', []):
if a.key in columns:
continue
items.append(SimpleTerm(a.key, a.key, a.key+' (Relation)'))
for b in inspect(a.table).columns:
if b.name:
items.append(SimpleTerm(a.key+'.'+b.name, a.key+'.'+b.name, a.key+'.'+b.name+' ('+str(b.type)+')'))
columns.append(a.key+'.'+b.name)
# for b in getattr(inspect(connection.tableClass), 'relationships', []):
# if a.key+'.'+b.key in columns:
# continue
# items.append(SimpleTerm(a.key+'.'+b.key, a.key+'.'+b.key, a.key+'.'+b.key+' (Relation)'))
items.sort( lambda x, y: cmp(x.value, y.value ) )
return SimpleVocabulary(items)
开发者ID:Martronic-SA,项目名称:collective.behavior.sql,代码行数:32,代码来源:vocabularies.py
示例6: get_model_schema
def get_model_schema(self, model, with_relationships=True):
schema = {}
for column in inspect(model).columns:
schema[six.text_type(column.name)] = None
if with_relationships:
for rel in inspect(model).relationships:
schema[six.text_type(rel.table.name)] = None
return schema
开发者ID:mmalchuk,项目名称:openstack-fuel-web,代码行数:8,代码来源:test_installation_info.py
示例7: recursive_content
def recursive_content(self, publish):
vec = []
for entry in dir(self):
if entry in inspect(type(self)).relationships:
i = inspect(self.__class__).relationships[entry]
if i.direction.name == "ONETOMANY":
x = getattr(self, str(entry))
for xx in x:
additional_metadata = None
if hasattr(xx, "additional_metadata"):
if xx.additional_metadata:
additional_metadata = json.loads(xx.additional_metadata)
locale_id = None
if hasattr(xx, "locale_id"):
locale_id = xx.locale_id
info = {'level': xx.__tablename__,
'content': xx.content,
'object_id': xx.object_id,
'client_id': xx.client_id,
'parent_object_id': xx.parent_object_id,
'parent_client_id': xx.parent_client_id,
'entity_type': xx.entity_type,
'marked_for_deletion': xx.marked_for_deletion,
'locale_id': locale_id,
'additional_metadata': additional_metadata,
'contains': recursive_content(xx, publish) or None}
published = False
if info['contains']:
log.debug(info['contains'])
ents = []
for ent in info['contains']:
ents += [ent]
# log.debug('CONTAINS', ent)
for ent in ents:
try:
if 'publish' in ent['level']:
if not ent['marked_for_deletion']:
published = True
if not publish:
break
if publish:
info['contains'].remove(ent)
except TypeError:
log.debug('IDK: %s' % str(ent))
if publish:
if not published:
if 'publish' in info['level']:
res = dict()
res['level'] = info['level']
res['marked_for_deletion'] = info['marked_for_deletion']
info = res
else:
continue
info['published'] = published
vec += [info]
# vec += recursive_content(xx)
return vec
开发者ID:LukeTapekhin,项目名称:lingvodoc,代码行数:57,代码来源:models.py
示例8: survey
def survey(self, survey_id):
survey = Surveys.query.get(survey_id)
return self.render('admin/survey.html',
delete_form=self.get_delete_form(),
survey=survey,
surveys_table=inspect(Surveys),
results_table=inspect(Results),
month_table=inspect(MonthFeedback),
year_table=inspect(YearFeedback),
edit_permission=has_edit_permission())
开发者ID:jameshanlon,项目名称:cheese,代码行数:10,代码来源:views.py
示例9: __get__
def __get__(self, obj, cls):
if obj:
cls = obj.__class__
t = self._type.get(id(cls),None)
if t is None:
from .objtyp import ObjType
self._type[id(cls)] = t = ObjType.get(cls)
k = inspect(t).key
if k is None:
raise NoDataExc("ObjType for {}".format(cls))
return inspect(t).key[1][0]
开发者ID:smurfix,项目名称:pybble,代码行数:11,代码来源:object.py
示例10: __str__
def __str__(self):
"""
Текстовое представление исключения.
:return:
"""
return (
'Field "{0}" not found in model "{1}". '
'Available fields: "{2}"'.format(
self.field_name,
inspect(self.model).mapped_table.name,
', '.join(col.key for col in inspect(self.model).attrs
if isinstance(col, ColumnProperty))
))
开发者ID:barsgroup,项目名称:barsup-core,代码行数:14,代码来源:exceptions.py
示例11: relationship_name
def relationship_name(parent_type, child_type, *args, **kwargs):
if 'prereq' in args or ('prereq' in kwargs and kwargs['prereq']): # special cases
if parent_type == 'Project':
if child_type == 'Project':
return 'prereq_projects'
elif child_type == 'Todo':
return 'prereq_todos'
elif parent_type == 'Todo':
if child_type == 'Project':
return 'prereq_projects'
elif child_type == 'Todo':
return 'prereq_todos'
return attribute
if 'dependent' in args or ('dependent' in kwargs and kwargs['dependent']): # special cases
if parent_type == 'Project':
if child_type == 'Project':
return 'dependent_projects'
elif child_type == 'Todo':
return 'dependent_todos'
elif parent_type == 'Todo':
if child_type == 'Project':
return 'dependent_projects'
elif child_type == 'Todo':
return 'dependent_todos'
return attribute
elif 'contain' in args or ('contain' in kwargs and kwargs['contain']):
if parent_type == 'Project':
if child_type == 'Project':
return 'contains_projects'
elif child_type == 'Todo':
return 'contains_todos'
elif 'contained_by' in args or ('contained_by' in kwargs and kwargs['contained_by']):
if parent_type == 'Project':
if child_type == 'Project':
return 'contained_by_projects'
if parent_type == 'Todo':
if child_type == 'Project':
return 'contained_by_projects'
# Non-special cases
potential_relationships = []
for relationship_name in inspect(eval(parent_type)).mapper.relationships.keys():
if child_type == inspect(eval(parent_type)).mapper.relationships[relationship_name].mapper.class_.__name__:
# Might this trigger on relationships we don't want?
potential_relationships.append({relationship_name: (parent_type, child_type)})
if len(potential_relationships) != 1:
raise Exception('Something is wrong.')
else:
return potential_relationships[0].keys()[0]
开发者ID:adamlincoln,项目名称:pydidit-backend,代码行数:50,代码来源:__init__.py
示例12: get_or_create
def get_or_create(self, session, create_method='', create_method_kwargs=None, **kwargs):
""" Try to find an existing object filtering by kwargs. If not found, create. """
inspector = reflection.Inspector.from_engine(db.engine)
keys = list(chain.from_iterable([i['column_names'] for i in
inspector.get_indexes(inspect(self).mapped_table)]))
keys += [k.name for k in inspect(self).primary_key]
filter_args = {arg: kwargs[arg] for arg in kwargs if arg in keys}
try:
return session.query(self).filter_by(**filter_args).one()
except NoResultFound:
kwargs.update(create_method_kwargs or {})
new = getattr(self, create_method, self)(**kwargs)
session.add(new)
return new
开发者ID:antsar,项目名称:pybusmap,代码行数:14,代码来源:models.py
示例13: check_for_client
def check_for_client(obj, clients):
if obj.client_id in clients:
return True
for entry in dir(obj):
if entry in inspect(type(obj)).relationships:
i = inspect(obj.__class__).relationships[entry]
if i.direction.name == "ONETOMANY":
x = getattr(obj, str(entry))
answer = False
for xx in x:
answer = answer or check_for_client(xx, clients)
if answer:
break
return answer
return False
开发者ID:ispras,项目名称:lingvodoc,代码行数:15,代码来源:utils.py
示例14: add_model
def add_model(self, model, name=None, toDictKargs=None):
if not name:
name = inspect(model).mapped_table.name
if name in self.models:
raise ValueError('model with name {0} already added'.format(name))
self.models[name] = model
self.modelDictKargs[name] = toDictKargs or {}
开发者ID:chiesax,项目名称:alchemyjson,代码行数:7,代码来源:manager.py
示例15: _filter_query
def _filter_query(model, query, filters):
"""Apply filter to query
:param model:
:param query:
:param filters: list of filter dict with key 'key', 'comparator', 'value'
like {'key': 'pod_id', 'comparator': 'eq', 'value': 'test_pod_uuid'}
:return:
"""
filter_dict = {}
for query_filter in filters:
# only eq filter supported at first
if query_filter['comparator'] != 'eq':
continue
key = query_filter['key']
if key not in model.attributes:
continue
if isinstance(inspect(model).columns[key].type, sql.Boolean):
filter_dict[key] = strutils.bool_from_string(query_filter['value'])
else:
filter_dict[key] = query_filter['value']
if filter_dict:
return query.filter_by(**filter_dict)
else:
return query
开发者ID:Ronghui,项目名称:tricircle,代码行数:26,代码来源:core.py
示例16: parse_item
def parse_item(self):
"""Parse 'repeat every' value
"""
value = self.raw_value
if self.value_explicitly_empty(value):
self.set_empty = True
value = None
elif value:
try:
value = int(self.raw_value)
if not (0 < value < 31):
raise ValueError
except ValueError:
self.add_error(errors.WRONG_VALUE_ERROR,
column_name=self.display_name)
return
else:
# if it is an empty string
value = None
# check if value is unmodified for existing workflow
if not self.row_converter.is_new and self.raw_value:
insp = inspection.inspect(self.row_converter.obj)
repeat_prev = getattr(insp.attrs, self.key).history.unchanged
if value not in repeat_prev:
self.add_warning(errors.UNMODIFIABLE_COLUMN,
column_name=self.display_name)
self.set_empty = False
return None
return value
开发者ID:egorhm,项目名称:ggrc-core,代码行数:32,代码来源:handlers.py
示例17: add_row
def add_row(self, table_name, row):
col_names = self.list_column_names(table_name)
table_object = self._base.classes[table_name]
prim_key = inspect(table_object).primary_key[0].name
row_dict = {key: row[key] for key in col_names}
stmt = table_object.__table__.insert().values(row_dict)
self._connection.execute(stmt)
开发者ID:JohnLZeller,项目名称:climyadmin,代码行数:7,代码来源:db.py
示例18: fill_defaults_on_init
def fill_defaults_on_init(target, args, kwargs):
for key, column in inspect(target.__class__).columns.items():
if column.default is not None:
if callable(column.default.arg):
setattr(target, key, column.default.arg(target))
else:
setattr(target, key, column.default.arg)
开发者ID:samushighwind,项目名称:fullstack-nanodegree-vm,代码行数:7,代码来源:db.py
示例19: getContainerTree
def getContainerTree(self, user, base, object_type=None):
table = inspect(ObjectInfoIndex)
o2 = aliased(ObjectInfoIndex)
base = ObjectProxy.get_adjusted_dn(base, self.env.base)
query = and_(getattr(ObjectInfoIndex, "_adjusted_parent_dn") == base, getattr(ObjectInfoIndex, "_type").in_(self.containers))
count = func.count(getattr(o2, "_parent_dn"))
parent_join_condition = getattr(o2, "_parent_dn") == getattr(ObjectInfoIndex, "dn")
with make_session() as session:
query_result = session.query(ObjectInfoIndex, count) \
.outerjoin(o2, and_(getattr(o2, "_invisible").is_(False), parent_join_condition)) \
.filter(query) \
.group_by(*table.c)
res = {}
factory = ObjectFactory.getInstance()
for item, children in query_result:
self.update_res(res, item, user, 1)
if item.dn in res:
res[item.dn]['hasChildren'] = children > 0
res[item.dn]['adjusted_dn'] = ObjectProxy.get_adjusted_dn(item.dn, self.env.base)
if object_type is not None:
# check if object_type is allowed in this container
allowed = factory.getAllowedSubElementsForObject(res[item.dn]['tag'], includeInvisible=False)
if "*" in object_type:
# all allowed
res[item.dn]['allowed_move_target'] = True
elif isinstance(object_type, list):
res[item.dn]['allowed_move_target'] = len(set(object_type).intersection(allowed)) > 0
else:
res[item.dn]['allowed_move_target'] = object_type in allowed
return res
开发者ID:gonicus,项目名称:gosa,代码行数:33,代码来源:methods.py
示例20: handle
def handle(self, context, e): # todo: refactoring
if hasattr(e, "on_callback"):
return e.on_callback(context.emit) # hmm.
try:
m = inspect(e)
if hasattr(m, "key") and hasattr(m, "class_"): # User.id
return ":{}".format(str(m))
elif hasattr(m, "key") and hasattr(m, "value"): # User.id == 1 <-
return self.handle(context, m.value)
elif hasattr(m, "name") and hasattr(m, "_annotations"): # -> User.id == 1
return ":{}.{}".format(m._annotations["parententity"].class_.__name__, m.name)
elif hasattr(m, "operator") and hasattr(m, "clauses"): # x & y, x | y
op = self.reverse_table[m.operator]
args = [self.scan(context, str(i), x) for i, x in enumerate(m.clauses)]
args.insert(0, op)
return args
elif hasattr(m, "mapper"): # User
return ":{}".format(m.mapper.class_.__name__)
elif hasattr(m, "left") and hasattr(m, "right"): # User.id == 1
return [
self.reverse_table[m.operator],
self.scan(context, "1", m.left),
self.scan(context, "2", m.right),
]
elif hasattr(m, "modifier") and hasattr(m, "element"): # sa.desc(User.id)
return [self.reverse_table[m.modifier], self.scan(context, "1", m.element)]
else:
raise HandleActionNotFound(e)
except NoInspectionAvailable:
return e # 1, 2, 3?
开发者ID:podhmo,项目名称:block.sqla.lipy,代码行数:30,代码来源:reverse.py
注:本文中的sqlalchemy.inspection.inspect函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论