• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python inspection.inspect函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.inspection.inspect函数的典型用法代码示例。如果您正苦于以下问题:Python inspect函数的具体用法?Python inspect怎么用?Python inspect使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了inspect函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: serialize

 def serialize(self):
     data = {}
     if isinstance(self, Booking):
         hotel = Hotel.query.filter_by(id=self.hotel_id).first()
         data['hotel'] = {c: str(getattr(hotel, c)) for c in inspect(hotel).attrs.keys()}
     data.update({c: str(getattr(self, c)) for c in inspect(self).attrs.keys()})
     return data
开发者ID:sreekanthkaralmanna,项目名称:bookmyhotel,代码行数:7,代码来源:models.py


示例2: __new__

    def __new__(mcs, class_name, bases, attrs):
        """Create new factory type."""
        meta = attrs.get("Meta")
        if getattr(meta, "abstract", None) is True or meta.model is None:
            return super(ModelFactoryMetaClass, mcs).__new__(mcs, class_name, bases, attrs)

        # Process relationships and composites
        composite_columns = set()
        for prop in inspect(meta.model).iterate_properties:
            if prop.key in attrs:
                continue
            elif isinstance(prop, RelationshipProperty):
                if prop.direction == MANYTOONE:
                    attrs[prop.key] = ModelSubFactory(prop.mapper.class_)
            elif isinstance(prop, CompositeProperty):
                composite_columns.update(prop.columns)
                attrs[prop.key] = mcs.types.get[prop.composite_class]

        # Process columns
        for prop in inspect(meta.model).iterate_properties:
            if prop.key in attrs:
                continue
            elif isinstance(prop, ColumnProperty):
                for col in prop.columns:
                    if col not in composite_columns:
                        attrs[prop.key] = mcs.types[col.type.__class__](col.type)

        factory_class = super(ModelFactoryMetaClass, mcs).__new__(mcs, class_name, bases, attrs)
        mcs.factories[factory_class._meta.model] = factory_class
        return factory_class
开发者ID:jmuhlich,项目名称:alchemyboy,代码行数:30,代码来源:base.py


示例3: to_dict

    def to_dict(self, follow_relationships=True):
        to_return = {}
        for column in inspect(self.__class__).mapper.columns:
            to_return[column.name] = getattr(self, column.name)
        relationship_names = []
        if follow_relationships and isinstance(self, Base):
            relationship_names.extend([relationship_name for relationship_name in inspect(self.__class__).mapper.relationships.keys()])

        to_return['type'] = self.__class__.__name__
        if hasattr(self, 'get_primary_descriptor'):
            to_return['primary_descriptor'] = self.get_primary_descriptor()

        for attr in relationship_names: # Will only run if follow_relationships is True; see above
            value = getattr(self, attr)
            if hasattr(value, 'to_dict'):
                to_return[attr] = value.to_dict(follow_relationships = False)
            elif hasattr(value, '__iter__'):
                new_list = []
                for element in value:
                    if hasattr(element, 'to_dict'):
                        new_list.append(element.to_dict(follow_relationships = False))
                    else:
                        new_list.append(element)
                to_return[attr] = new_list

        return to_return
开发者ID:ClaudioBryla,项目名称:pydidit-backend,代码行数:26,代码来源:Model.py


示例4: args_for_model

def args_for_model(Model):
    # the attribute arguments (no filters)
    args = { column.name.lower() : convert_sqlalchemy_type(column.type, column) \
                                        for column in inspect(Model).columns }

    # add the primary key filter

    # the primary keys for the Model
    primary_keys = inspect(Model).primary_key
    # make sure there is only one
    assert len(primary_keys) == 1, "Can only support one primary key - how would I know what to reference for joins?"

    primary_key = primary_keys[0]
    # figure out the type of the primary key
    primary_key_type = convert_sqlalchemy_type(primary_key.type, primary_key)
    # add the primary key filter to the arg dictionary
    args['pk'] = primary_key_type

    # create a copy of the argument dict we can mutate
    fullArgs = args.copy()

    # todo: add type-specific filters
    # go over the arguments
    for arg, type in args.items():
        # add the list member filter
        fullArgs[arg + '_in'] = List(type)

    # return the complete dictionary of arguments
    return fullArgs
开发者ID:NickDubelman,项目名称:nautilus,代码行数:29,代码来源:helpers.py


示例5: __call__

    def __call__(self, context):
        # we take the fti type settings to get the selected connections and table
        if ISQLTypeSchemaContext.providedBy(context):
            context = ISQLTypeSettings(context.fti)
        elif IField.providedBy(context):
            context = ISQLTypeSettings(aq_parent(context.context).fti)
        urls = ISQLAlchemyConnectionStrings(component.getUtility(ISiteRoot)).values()
        if not getattr(context, 'sql_table', None):
            return SimpleVocabulary([])
        items = []
        connection = queryUtility(ISQLConnectionsUtility, name=context.id, default=None)
        if not connection:
            connection = registerConnectionUtilityForFTI(context.context)
        columns = []
        for a in inspect(connection.tableClass).columns:
            if a.name:
                items.append(SimpleTerm(a.name, a.name, a.name+' ('+str(a.type)+')'))
                columns.append(a.name)
        for a in getattr(inspect(connection.tableClass), 'relationships', []):
            if a.key in columns:
                continue
            items.append(SimpleTerm(a.key, a.key, a.key+' (Relation)'))
            for b in inspect(a.table).columns:
                if b.name:
                    items.append(SimpleTerm(a.key+'.'+b.name, a.key+'.'+b.name, a.key+'.'+b.name+' ('+str(b.type)+')'))
                    columns.append(a.key+'.'+b.name)
#            for b in getattr(inspect(connection.tableClass), 'relationships', []):
#                if a.key+'.'+b.key in columns:
#                    continue
#                items.append(SimpleTerm(a.key+'.'+b.key, a.key+'.'+b.key, a.key+'.'+b.key+' (Relation)'))
        items.sort( lambda x, y: cmp(x.value, y.value ) )
        return SimpleVocabulary(items)
开发者ID:Martronic-SA,项目名称:collective.behavior.sql,代码行数:32,代码来源:vocabularies.py


示例6: get_model_schema

 def get_model_schema(self, model, with_relationships=True):
     schema = {}
     for column in inspect(model).columns:
         schema[six.text_type(column.name)] = None
     if with_relationships:
         for rel in inspect(model).relationships:
             schema[six.text_type(rel.table.name)] = None
     return schema
开发者ID:mmalchuk,项目名称:openstack-fuel-web,代码行数:8,代码来源:test_installation_info.py


示例7: recursive_content

def recursive_content(self, publish):
    vec = []
    for entry in dir(self):
        if entry in inspect(type(self)).relationships:
            i = inspect(self.__class__).relationships[entry]
            if i.direction.name == "ONETOMANY":
                x = getattr(self, str(entry))
                for xx in x:
                    additional_metadata = None
                    if hasattr(xx, "additional_metadata"):
                        if xx.additional_metadata:
                            additional_metadata = json.loads(xx.additional_metadata)
                    locale_id = None
                    if hasattr(xx, "locale_id"):
                        locale_id = xx.locale_id
                    info = {'level': xx.__tablename__,
                            'content': xx.content,
                            'object_id': xx.object_id,
                            'client_id': xx.client_id,
                            'parent_object_id': xx.parent_object_id,
                            'parent_client_id': xx.parent_client_id,
                            'entity_type': xx.entity_type,
                            'marked_for_deletion': xx.marked_for_deletion,
                            'locale_id': locale_id,
                            'additional_metadata': additional_metadata,
                            'contains': recursive_content(xx, publish) or None}
                    published = False
                    if info['contains']:
                        log.debug(info['contains'])
                        ents = []
                        for ent in info['contains']:
                            ents += [ent]
                            # log.debug('CONTAINS', ent)
                        for ent in ents:
                            try:
                                if 'publish' in ent['level']:
                                    if not ent['marked_for_deletion']:
                                        published = True
                                        if not publish:
                                            break
                                    if publish:
                                        info['contains'].remove(ent)
                            except TypeError:
                                log.debug('IDK: %s' % str(ent))
                    if publish:
                        if not published:
                            if 'publish' in info['level']:
                                res = dict()
                                res['level'] = info['level']
                                res['marked_for_deletion'] = info['marked_for_deletion']
                                info = res
                            else:
                                continue
                    info['published'] = published
                    vec += [info]
                    # vec += recursive_content(xx)
    return vec
开发者ID:LukeTapekhin,项目名称:lingvodoc,代码行数:57,代码来源:models.py


示例8: survey

 def survey(self, survey_id):
     survey = Surveys.query.get(survey_id)
     return self.render('admin/survey.html',
             delete_form=self.get_delete_form(),
             survey=survey,
             surveys_table=inspect(Surveys),
             results_table=inspect(Results),
             month_table=inspect(MonthFeedback),
             year_table=inspect(YearFeedback),
             edit_permission=has_edit_permission())
开发者ID:jameshanlon,项目名称:cheese,代码行数:10,代码来源:views.py


示例9: __get__

	def __get__(self, obj, cls):
		if obj:
			cls = obj.__class__
		t = self._type.get(id(cls),None)
		if t is None:
			from .objtyp import ObjType
			self._type[id(cls)] = t = ObjType.get(cls)
		k = inspect(t).key
		if k is None:
			raise NoDataExc("ObjType for {}".format(cls))
		return inspect(t).key[1][0]
开发者ID:smurfix,项目名称:pybble,代码行数:11,代码来源:object.py


示例10: __str__

    def __str__(self):
        """
        Текстовое представление исключения.

        :return:
        """
        return (
            'Field "{0}" not found in model "{1}". '
            'Available fields: "{2}"'.format(
                self.field_name,
                inspect(self.model).mapped_table.name,
                ', '.join(col.key for col in inspect(self.model).attrs
                          if isinstance(col, ColumnProperty))
            ))
开发者ID:barsgroup,项目名称:barsup-core,代码行数:14,代码来源:exceptions.py


示例11: relationship_name

def relationship_name(parent_type, child_type, *args, **kwargs):
    if 'prereq' in args or ('prereq' in kwargs and kwargs['prereq']): # special cases
        if parent_type == 'Project':
            if child_type == 'Project':
                return 'prereq_projects'
            elif child_type == 'Todo':
                return 'prereq_todos'
        elif parent_type == 'Todo':
            if child_type == 'Project':
                return 'prereq_projects'
            elif child_type == 'Todo':
                return 'prereq_todos'
        return attribute
    if 'dependent' in args or ('dependent' in kwargs and kwargs['dependent']): # special cases
        if parent_type == 'Project':
            if child_type == 'Project':
                return 'dependent_projects'
            elif child_type == 'Todo':
                return 'dependent_todos'
        elif parent_type == 'Todo':
            if child_type == 'Project':
                return 'dependent_projects'
            elif child_type == 'Todo':
                return 'dependent_todos'
        return attribute
    elif 'contain' in args or ('contain' in kwargs and kwargs['contain']):
        if parent_type == 'Project':
            if child_type == 'Project':
                return 'contains_projects'
            elif child_type == 'Todo':
                return 'contains_todos'
    elif 'contained_by' in args or ('contained_by' in kwargs and kwargs['contained_by']):
        if parent_type == 'Project':
            if child_type == 'Project':
                return 'contained_by_projects'
        if parent_type == 'Todo':
            if child_type == 'Project':
                return 'contained_by_projects'

    # Non-special cases
    potential_relationships = []
    for relationship_name in inspect(eval(parent_type)).mapper.relationships.keys():
        if child_type == inspect(eval(parent_type)).mapper.relationships[relationship_name].mapper.class_.__name__:
            # Might this trigger on relationships we don't want?
            potential_relationships.append({relationship_name: (parent_type, child_type)})

    if len(potential_relationships) != 1:
        raise Exception('Something is wrong.')
    else:
        return potential_relationships[0].keys()[0]
开发者ID:adamlincoln,项目名称:pydidit-backend,代码行数:50,代码来源:__init__.py


示例12: get_or_create

 def get_or_create(self, session, create_method='', create_method_kwargs=None, **kwargs):
     """ Try to find an existing object filtering by kwargs. If not found, create. """
     inspector = reflection.Inspector.from_engine(db.engine)
     keys = list(chain.from_iterable([i['column_names'] for i in
                 inspector.get_indexes(inspect(self).mapped_table)]))
     keys += [k.name for k in inspect(self).primary_key]
     filter_args = {arg: kwargs[arg] for arg in kwargs if arg in keys}
     try:
         return session.query(self).filter_by(**filter_args).one()
     except NoResultFound:
         kwargs.update(create_method_kwargs or {})
         new = getattr(self, create_method, self)(**kwargs)
         session.add(new)
         return new
开发者ID:antsar,项目名称:pybusmap,代码行数:14,代码来源:models.py


示例13: check_for_client

def check_for_client(obj, clients):
    if obj.client_id in clients:
        return True
    for entry in dir(obj):
        if entry in inspect(type(obj)).relationships:
            i = inspect(obj.__class__).relationships[entry]
            if i.direction.name == "ONETOMANY":
                x = getattr(obj, str(entry))
                answer = False
                for xx in x:
                    answer = answer or check_for_client(xx, clients)
                    if answer:
                        break
                return answer
    return False
开发者ID:ispras,项目名称:lingvodoc,代码行数:15,代码来源:utils.py


示例14: add_model

 def add_model(self, model, name=None, toDictKargs=None):
     if not name:
         name = inspect(model).mapped_table.name
     if name in self.models:
         raise ValueError('model with name {0} already added'.format(name))
     self.models[name] = model
     self.modelDictKargs[name] = toDictKargs or {}
开发者ID:chiesax,项目名称:alchemyjson,代码行数:7,代码来源:manager.py


示例15: _filter_query

def _filter_query(model, query, filters):
    """Apply filter to query

    :param model:
    :param query:
    :param filters: list of filter dict with key 'key', 'comparator', 'value'
    like {'key': 'pod_id', 'comparator': 'eq', 'value': 'test_pod_uuid'}
    :return:
    """
    filter_dict = {}
    for query_filter in filters:
        # only eq filter supported at first
        if query_filter['comparator'] != 'eq':
            continue

        key = query_filter['key']
        if key not in model.attributes:
            continue
        if isinstance(inspect(model).columns[key].type, sql.Boolean):
            filter_dict[key] = strutils.bool_from_string(query_filter['value'])
        else:
            filter_dict[key] = query_filter['value']
    if filter_dict:
        return query.filter_by(**filter_dict)
    else:
        return query
开发者ID:Ronghui,项目名称:tricircle,代码行数:26,代码来源:core.py


示例16: parse_item

  def parse_item(self):
    """Parse 'repeat every' value
    """
    value = self.raw_value

    if self.value_explicitly_empty(value):
      self.set_empty = True
      value = None
    elif value:
      try:
        value = int(self.raw_value)
        if not (0 < value < 31):
          raise ValueError
      except ValueError:
        self.add_error(errors.WRONG_VALUE_ERROR,
                       column_name=self.display_name)
        return
    else:
      # if it is an empty string
      value = None

    # check if value is unmodified for existing workflow
    if not self.row_converter.is_new and self.raw_value:
      insp = inspection.inspect(self.row_converter.obj)
      repeat_prev = getattr(insp.attrs, self.key).history.unchanged
      if value not in repeat_prev:
        self.add_warning(errors.UNMODIFIABLE_COLUMN,
                         column_name=self.display_name)
        self.set_empty = False
      return None

    return value
开发者ID:egorhm,项目名称:ggrc-core,代码行数:32,代码来源:handlers.py


示例17: add_row

 def add_row(self, table_name, row):
     col_names = self.list_column_names(table_name)
     table_object = self._base.classes[table_name]
     prim_key = inspect(table_object).primary_key[0].name
     row_dict = {key: row[key] for key in col_names}
     stmt = table_object.__table__.insert().values(row_dict)
     self._connection.execute(stmt)
开发者ID:JohnLZeller,项目名称:climyadmin,代码行数:7,代码来源:db.py


示例18: fill_defaults_on_init

def fill_defaults_on_init(target, args, kwargs):
    for key, column in inspect(target.__class__).columns.items():
        if column.default is not None:
            if callable(column.default.arg):
                setattr(target, key, column.default.arg(target))
            else:
                setattr(target, key, column.default.arg)
开发者ID:samushighwind,项目名称:fullstack-nanodegree-vm,代码行数:7,代码来源:db.py


示例19: getContainerTree

    def getContainerTree(self, user, base, object_type=None):
        table = inspect(ObjectInfoIndex)
        o2 = aliased(ObjectInfoIndex)
        base = ObjectProxy.get_adjusted_dn(base, self.env.base)
        query = and_(getattr(ObjectInfoIndex, "_adjusted_parent_dn") == base, getattr(ObjectInfoIndex, "_type").in_(self.containers))
        count = func.count(getattr(o2, "_parent_dn"))
        parent_join_condition = getattr(o2, "_parent_dn") == getattr(ObjectInfoIndex, "dn")

        with make_session() as session:
            query_result = session.query(ObjectInfoIndex, count) \
                .outerjoin(o2, and_(getattr(o2, "_invisible").is_(False), parent_join_condition)) \
                .filter(query) \
                .group_by(*table.c)

        res = {}
        factory = ObjectFactory.getInstance()
        for item, children in query_result:
            self.update_res(res, item, user, 1)

            if item.dn in res:
                res[item.dn]['hasChildren'] = children > 0
                res[item.dn]['adjusted_dn'] = ObjectProxy.get_adjusted_dn(item.dn, self.env.base)
                if object_type is not None:
                    # check if object_type is allowed in this container
                    allowed = factory.getAllowedSubElementsForObject(res[item.dn]['tag'], includeInvisible=False)
                    if "*" in object_type:
                        # all allowed
                        res[item.dn]['allowed_move_target'] = True
                    elif isinstance(object_type, list):
                        res[item.dn]['allowed_move_target'] = len(set(object_type).intersection(allowed)) > 0
                    else:
                        res[item.dn]['allowed_move_target'] = object_type in allowed
        return res
开发者ID:gonicus,项目名称:gosa,代码行数:33,代码来源:methods.py


示例20: handle

 def handle(self, context, e):  # todo: refactoring
     if hasattr(e, "on_callback"):
         return e.on_callback(context.emit)  # hmm.
     try:
         m = inspect(e)
         if hasattr(m, "key") and hasattr(m, "class_"):  # User.id
             return ":{}".format(str(m))
         elif hasattr(m, "key") and hasattr(m, "value"):  # User.id == 1 <-
             return self.handle(context, m.value)
         elif hasattr(m, "name") and hasattr(m, "_annotations"):  # -> User.id == 1
             return ":{}.{}".format(m._annotations["parententity"].class_.__name__, m.name)
         elif hasattr(m, "operator") and hasattr(m, "clauses"):  # x & y,  x | y
             op = self.reverse_table[m.operator]
             args = [self.scan(context, str(i), x) for i, x in enumerate(m.clauses)]
             args.insert(0, op)
             return args
         elif hasattr(m, "mapper"):  # User
             return ":{}".format(m.mapper.class_.__name__)
         elif hasattr(m, "left") and hasattr(m, "right"):  # User.id == 1
             return [
                 self.reverse_table[m.operator],
                 self.scan(context, "1", m.left),
                 self.scan(context, "2", m.right),
             ]
         elif hasattr(m, "modifier") and hasattr(m, "element"):  # sa.desc(User.id)
             return [self.reverse_table[m.modifier], self.scan(context, "1", m.element)]
         else:
             raise HandleActionNotFound(e)
     except NoInspectionAvailable:
         return e  # 1, 2, 3?
开发者ID:podhmo,项目名称:block.sqla.lipy,代码行数:30,代码来源:reverse.py



注:本文中的sqlalchemy.inspection.inspect函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python inspection.sqlalchemy_inspect函数代码示例发布时间:2022-05-27
下一篇:
Python func.utc_timestamp函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap