• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python orm.object_session函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 sqlalchemy.orm.object_session 函数的典型用法代码示例。如果您正苦于以下问题：Python object_session 函数的具体用法？Python object_session 怎么用？Python object_session 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了object_session函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_i18n

 def test_i18n( self ):
     """Exercise Translation lookup, registration, caching and PO export.

     The translation table is emptied first, so the assertions below start
     from a state in which no translation for 'bucket' exists.
     """
     from camelot.model.i18n import Translation, ExportAsPO
     session = Session()
     session.execute( Translation.__table__.delete() )
     self.assertEqual( Translation.translate( 'bucket', 'nl_BE' ), None )
     # run twice to check all branches in the code
     Translation.translate_or_register( 'bucket', 'nl_BE' )
     Translation.translate_or_register( 'bucket', 'nl_BE' )
     self.assertEqual( Translation.translate( 'bucket', 'nl_BE' ), 'bucket' )
     self.assertEqual( Translation.translate( '', 'nl_BE' ), '' )
     self.assertEqual( Translation.translate_or_register( '', 'nl_BE' ), '' )
     # clear the cache
     Translation._cache.clear()
     # fill the cache again
     translation = Translation( language = 'nl_BE', source = 'bucket',
                                value = 'emmer', uid=1 )
     orm.object_session( translation ).flush()
     self.assertEqual( Translation.translate( 'bucket', 'nl_BE' ), 'emmer' )
     export_action = ExportAsPO()
     model_context = MockModelContext()
     model_context.obj = translation
     try:
         generator = export_action.model_run( model_context )
         # the next() builtin works on Python 2.6+ and Python 3, whereas
         # the generator.next() method only exists on Python 2
         next( generator )
         # answer the first yielded step with the file to export to
         generator.send( ['/tmp/test.po'] )
     except StopIteration:
         # the action generator finishing is the expected end of the run
         pass
开发者ID:jeroendierckx,项目名称:Camelot,代码行数:27,代码来源:test_model.py


示例2: longslit_calibrate_to_database

    def longslit_calibrate_to_database(self, gmos_long_slit_arc_calibration=None, destination_dir='.', force=False):
        """Run the long-slit calibration, store the arc table and record it in the database.

        :param gmos_long_slit_arc_calibration: forwarded unchanged to
            ``self.longslit_calibrate``.
        :param destination_dir: directory the HDF5 arc table is written to.
        :param force: when True, an existing ``self.wave_cal`` is deleted
            (and the deletion committed) before recalibrating.
        :raises GMOSDatabaseDuplicate: if ``self.wave_cal`` is already set
            and ``force`` is False.
        :return: the newly added ``GMOSLongSlitArcWavelengthSolution``.
        """
        db_session = object_session(self)

        if self.wave_cal is not None:
            if not force:
                raise GMOSDatabaseDuplicate('This arc already has a wavelength calibration {0}. '
                                            'Use force=True to override'.format(self.wave_cal))
            # Forced recalibration: drop the previous solution first.
            db_session.delete(self.wave_cal)
            db_session.commit()

        # Only the arc table is persisted; the other returned values are unused here.
        arc_table, _lines_all, _shift, _fake = self.longslit_calibrate(
            gmos_long_slit_arc_calibration=gmos_long_slit_arc_calibration)

        raw_basename = self.raw.fits.fname.replace('.fits','')
        store_fname = 'arccal-{}.h5'.format(raw_basename)
        store_path = os.path.join(destination_dir, store_fname)
        arc_table.write(store_path, path='arc', overwrite=True, format='hdf5')

        # Look the session up again before registering the new solution.
        db_session = object_session(self)

        wavecal = GMOSLongSlitArcWavelengthSolution(
            id = self.id,
            fname=store_fname,
            path=os.path.abspath(destination_dir))
        db_session.add(wavecal)
        db_session.commit()

        return wavecal
开发者ID:wkerzendorf,项目名称:geminiutil,代码行数:28,代码来源:gmos_alchemy.py


示例3: last_modified

    def last_modified(self):
        """Return the most recent change of this object and its elections.

        Considers ``self.last_change``, ``self.last_result_change`` and the
        newest ``last_change`` value of the related elections, candidates,
        list connections and lists.

        :return: the maximum of all truthy change values, or ``None`` when
            there are none.
        """
        session = object_session(self)
        election_ids = [election.id for election in self.elections]

        # Pairs of (column to inspect, criterion restricting the rows).
        # NOTE(review): the list criterion compares against ``self.id`` while
        # the others use the ids of ``self.elections`` -- confirm this is
        # intended.
        sources = (
            (Election.last_change,
             Election.id.in_(election_ids)),
            (Candidate.last_change,
             Candidate.election_id.in_(election_ids)),
            (ListConnection.last_change,
             ListConnection.election_id.in_(election_ids)),
            (List.last_change,
             List.election_id == self.id),
        )

        changes = [self.last_change, self.last_result_change]
        for column, criterion in sources:
            query = session.query(column)
            query = query.order_by(desc(column))
            query = query.filter(criterion)
            # Run the query once and reuse the row; calling ``first()`` both
            # in the condition and in the value would execute it twice.
            row = query.first()
            changes.append(row[0] if row else None)

        changes = [change for change in changes if change]
        return max(changes) if changes else None
开发者ID:OneGov,项目名称:onegov.ballot,代码行数:33,代码来源:election_compound.py


示例4: __getattr__

 def __getattr__( self, name ):
     """Look ``name`` up in the spec first, then in the parent's raw metadata.

     Names present in ``self.spec`` are returned through the spec element's
     ``wrap``, using the stored value when the parent has one and the
     element's ``default`` otherwise. Names only present in the parent's
     ``_metadata`` are returned as stored. Any other name yields ``None``.
     """
     spec = self.spec
     stored = self.parent._metadata
     if name not in spec:
         # No spec entry: hand back the raw stored value, if there is one.
         return stored[name] if name in stored else None
     element = spec[name]
     raw_value = stored[name] if name in stored else element.default
     return element.wrap( raw_value, object_session( self.parent ) )
开发者ID:knowingchaos,项目名称:galaxy-central,代码行数:7,代码来源:metadata.py


示例5: set_session_profile

def set_session_profile(survey, survey_session, profile):
    """Apply a profile to a survey session, rebuilding its tree if needed.

    :param survey: the survey to use
    :type survey: :py:class:`euphorie.content.survey.Survey`
    :param survey_session: survey session to update
    :type survey_session: :py:class:`euphorie.client.model.SurveySession`
    :param dict profile: desired profile
    :rtype: :py:class:`euphorie.client.model.SurveySession`
    :return: the updated session (this might be a new session)

    A session without a tree gets one built in place. A session whose
    profile and tree are unchanged is only touched. Otherwise a new session
    is created, the data of the old one is copied over and the old session
    is deleted.
    """
    if not survey_session.hasTree():
        # Nothing built yet: build the tree directly on the given session.
        BuildSurveyTree(survey, profile, survey_session)
        return survey_session

    same_profile = extractProfile(survey, survey_session) == profile
    if same_profile and not treeChanges(survey_session, survey):
        survey_session.touch()
        return survey_session

    # Profile or tree differs: replace the session with a rebuilt one.
    replacement = create_survey_session(
        survey_session.title, survey, survey_session.account)
    BuildSurveyTree(survey, profile, replacement, survey_session)
    replacement.copySessionData(survey_session)
    object_session(survey_session).delete(survey_session)
    return replacement
开发者ID:euphorie,项目名称:Euphorie,代码行数:28,代码来源:profile.py


示例6: test_invalid_token_during_connect

def test_invalid_token_during_connect(db, patch_access_token_getter,
                                      account_with_single_auth_creds):
    """connect_account() raises OAuthError once the refresh token is revoked.

    Afterwards every auth credential of the reloaded account must be
    flagged as invalid.
    """
    fixture_account = account_with_single_auth_creds
    account_id = fixture_account.id

    # Revoke the only refresh token and let the account notice it.
    revoked_token = fixture_account.auth_credentials[0].refresh_token
    patch_access_token_getter.revoke_refresh_token(revoked_token)
    fixture_account.verify_all_credentials()
    assert len(fixture_account.valid_auth_credentials) == 0
    g_token_manager.clear_cache(fixture_account)

    # connect_account() takes an /expunged/ account object
    # that has the necessary relationships eager-loaded
    object_session(fixture_account).expunge(fixture_account)
    assert not object_session(fixture_account)

    eager_query = db.session.query(GmailAccount).options(
        joinedload(GmailAccount.auth_credentials))
    account = eager_query.get(account_id)
    db.session.expunge(account)
    assert not object_session(account)

    handler = GmailAuthHandler('gmail')
    with pytest.raises(OAuthError):
        handler.connect_account(account)

    reloaded = db.session.query(GmailAccount).get(account_id)
    for credentials in reloaded.auth_credentials:
        assert not credentials.is_valid
开发者ID:nagyistge,项目名称:nylas-sync-engine,代码行数:30,代码来源:test_gmail_auth_credentials.py


示例7: on_after_update

def on_after_update(mapper, connection, target):
    """Reindex ``target`` and the documents related to it after an update.

    Three steps, in order: previously related documents found in the
    committed state are reindexed, then ``target`` itself is expired and
    reindexed, and finally each parent document gets its nested copy of
    ``target`` reindexed.
    """
    from .documents import BaseDocument

    request = getattr(target, '_request', None)

    # Reindex old one-to-one related object
    previous_values = attributes.instance_state(target).committed_state
    expire_names = set()
    for attr_name, old_value in previous_values.items():
        if isinstance(old_value, BaseDocument):
            related_session = object_session(old_value)
            # Make sure object is not updated yet
            if not related_session.is_modified(old_value):
                related_session.expire(old_value)
            index_object(old_value, with_refs=False,
                         request=request)
            continue

        # For "<relationship>_id" style attributes, remember the relationship
        # name when the attribute is one of its local columns.
        suffix_at = attr_name.rfind('_id')
        if suffix_at < 0:
            continue
        relationship_name = attr_name[:suffix_at]
        relationship = mapper.relationships.get(relationship_name, False)
        if relationship and any(
                col.name == attr_name for col in relationship.local_columns):
            expire_names.add(relationship_name)

    # Reload `target` to get access to processed fields values
    expire_names.update(
        col.name for col in class_mapper(target.__class__).columns)
    object_session(target).expire(target, attribute_names=expire_names)
    index_object(target, request=request, with_refs=False, nested_only=True)

    # Reindex the item's parents. This must be done after the child has been
    # processed
    for parent, children_field in target.get_parent_documents(nested_only=True):
        parent_columns = [
            col.name for col in class_mapper(parent.__class__).columns]
        object_session(parent).expire(parent, attribute_names=parent_columns)
        parent_index = ES(parent.__class__.__name__)
        parent_index.index_nested_document(parent, children_field, target)
开发者ID:geniusproject,项目名称:nefertari-sqla,代码行数:33,代码来源:signals.py


示例8: __init__

    def __init__(self, namespace_id=None, subject=None, thread_public_id=None,
                 started_before=None, started_after=None,
                 last_message_before=None, last_message_after=None,
                 any_email=None, to_addr=None, from_addr=None, cc_addr=None,
                 bcc_addr=None, filename=None, tag=None, detached=True):
        """Store the filter criteria and set up the transaction filters.

        Every criterion is copied onto the instance unchanged. When
        ``detached`` is true and the instance is attached to a session, it
        is expunged from that session.
        """
        # Namespace and thread level criteria.
        self.namespace_id = namespace_id
        self.subject = subject
        self.thread_public_id = thread_public_id

        # Time window criteria.
        self.started_before = started_before
        self.started_after = started_after
        self.last_message_before = last_message_before
        self.last_message_after = last_message_after

        # Address criteria.
        self.any_email = any_email
        self.to_addr = to_addr
        self.from_addr = from_addr
        self.cc_addr = cc_addr
        self.bcc_addr = bcc_addr

        # Attachment and tag criteria.
        self.filename = filename
        self.tag = tag

        if detached:
            owning_session = object_session(self)
            if owning_session is not None:
                # Note, you can later add this object to a session by doing
                # session.merge(detached_object)
                owning_session.expunge(self)

        # For transaction filters
        self.filters = []
        self.setup_filters()
开发者ID:dlitz,项目名称:inbox,代码行数:30,代码来源:lens.py


示例9: test_explicit_expunge_deleted

    def test_explicit_expunge_deleted(self):
        """A flushed-deleted object keeps its session until it is expunged.

        After the explicit expunge, and again after a rollback, the object
        still reports as deleted and has no session.
        """
        users = self.tables.users
        User = self.classes.User
        mapper(User, users)

        session = Session()
        session.add(User(name='x'))
        session.commit()

        user = session.query(User).first()
        session.delete(user)
        session.flush()

        # Deleted and flushed: no longer "in" the session, yet still bound to it.
        assert was_deleted(user)
        assert user not in session
        assert object_session(user) is session

        # Explicit expunge removes the session association.
        session.expunge(user)
        assert was_deleted(user)
        assert user not in session
        assert object_session(user) is None

        # A rollback does not re-attach the expunged object.
        session.rollback()
        assert was_deleted(user)
        assert user not in session
        assert object_session(user) is None
开发者ID:DeepakAlevoor,项目名称:sqlalchemy,代码行数:26,代码来源:test_session.py


示例10: answer_mode

def answer_mode(survey_node: AnswerableSurveyNode):
    """Return the mode of the answers given to ``survey_node``.

    :raises InvalidTypeForOperation: if the node's type constraint is not
        one of the types a mode is computed for.
    :return: the scalar result of the ``MODE()`` query; for
        ``multiple_choice`` nodes with a truthy result, the ``Choice``
        fetched by that result instead.
    """
    type_constraint = survey_node.the_type_constraint
    allowable_types = {
        'text', 'integer', 'decimal', 'date', 'time', 'timestamp', 'location',
        'facility', 'multiple_choice'
    }
    if type_constraint not in allowable_types:
        raise InvalidTypeForOperation((type_constraint, 'mode'))

    answer_cls = ANSWER_TYPES[type_constraint]
    # Only table names are interpolated; the node id is a bound parameter.
    statement = sa.text(
        'SELECT MODE() WITHIN GROUP (ORDER BY main_answer)'
        ' FROM {table} JOIN {answer_table} ON'
        ' {table}.id = {answer_table}.id'
        ' WHERE {answer_table}.survey_node_id = :sn_id'.format(
            table=answer_cls.__table__,
            answer_table=Answer.__table__,
        )
    )

    session = object_session(survey_node)
    mode_value = session.execute(statement, {'sn_id': survey_node.id}).scalar()
    if type_constraint == 'multiple_choice' and mode_value:
        # The mode of a multiple choice question is resolved to its Choice.
        return session.query(Choice).get(str(mode_value))
    return mode_value
开发者ID:4sp1r3,项目名称:dokomoforms,代码行数:29,代码来源:column_properties.py


示例11: _create_object_in_session

def _create_object_in_session( obj ):
    """Add ``obj`` to the session it belongs to and flush that session.

    :param obj: the instance to add and flush.
    :raises Exception: with ``NO_SESSION_ERROR_MESSAGE`` when no session
        can be determined for ``obj``.
    """
    # ``object_session`` itself is checked against None before it is called.
    session = object_session( obj ) if object_session is not None else None
    if session is None:
        raise Exception( NO_SESSION_ERROR_MESSAGE )
    # Reuse the session resolved above instead of looking it up once more
    # for each call.
    session.add( obj )
    session.flush()
开发者ID:ashvark,项目名称:galaxy,代码行数:7,代码来源:__init__.py


示例12: should_suppress_transaction_creation

 def should_suppress_transaction_creation(self):
     """Tell whether this object has no change to its tracked attributes.

     Returns False when the object is new or deleted in its session.
     Otherwise returns True only if none of ``name``, ``description`` and
     ``read_only`` has changes in its attribute history.
     """
     session = object_session(self)
     if self in session.new or self in session.deleted:
         return False
     attrs = inspect(self).attrs
     tracked_names = ('name', 'description', 'read_only')
     return not any(
         getattr(attrs, attr_name).history.has_changes()
         for attr_name in tracked_names)
开发者ID:GordonYip,项目名称:sync-engine,代码行数:8,代码来源:calendar.py


示例13: __exit__

 def __exit__( self, exc_type, exc_val, exc_tb ):
     """Record the outcome of the block, commit it and suppress any exception.

     When the block raised, the exception is added to the message and the
     status becomes 'errors'. When it did not raise and the status is still
     'running', the status becomes 'success'. The session of this object is
     committed in both cases.

     :return: always True, so an exception raised inside the ``with`` block
         is not propagated.
     """
     # identity comparison is the idiomatic (and safe) way to test for None
     if exc_type is not None:
         self.add_exception_to_message( exc_type, exc_val, exc_tb )
         self.status = 'errors'
     elif self.status == 'running':
         self.status = 'success'
     orm.object_session( self ).commit()
     return True
开发者ID:odmsolutions,项目名称:Camelot,代码行数:8,代码来源:batch_job.py


示例14: fix

    def fix(self, geolocation):
        """Fill in the coordinates from the address when either one is zero.

        :param geolocation: object whose ``get_position_by_name(address)``
            returns a sequence with latitude at index 0 and longitude at
            index 1.

        Nothing happens when both ``latitude`` and ``longitude`` are
        non-zero. Otherwise both are overwritten and the session of this
        object is committed.
        """
        if self.latitude != 0 and self.longitude != 0:
            return

        position = geolocation.get_position_by_name(self.address)
        self.latitude = position[0]
        self.longitude = position[1]

        # commit() already flushes pending changes, so a separate flush()
        # afterwards is redundant.
        object_session(self).commit()
开发者ID:Ipukema,项目名称:PokeStats,代码行数:9,代码来源:Schema.py


示例15: log_activity

  def log_activity(self, sender, actor, verb, object, target=None):
    """Create an ActivityEntry for the event and add it to the object's session.

    Asserts that the service is running. ``object_type`` is taken from
    ``object.entity_type``; ``target_type`` is only set when a target is
    given. ``sender`` is not used.
    """
    assert self.running

    activity = ActivityEntry(actor=actor, verb=verb, object=object, target=target)
    activity.object_type = object.entity_type
    if target is not None:
      activity.target_type = target.entity_type

    # The entry is added to the same session the activity's object lives in.
    session = object_session(object)
    session.add(activity)
开发者ID:mmariani,项目名称:abilian-core,代码行数:9,代码来源:service.py


示例16: _record

 def _record(self, mapper, model, operation):
     """Remember ``(model, operation)`` on the model's session, keyed by primary key.

     :return: ``EXT_CONTINUE``.
     """
     primary_key = tuple(mapper.primary_key_from_instance(model))
     session = orm.object_session(model)
     # FIXME: Some hack just to prevent from crashing when trying to look
     # for _model_changes attribute. Happens when loading fixtures with
     # the fixture library.
     if not hasattr(session, '_model_changes'):
         session._model_changes = {}
     session._model_changes[primary_key] = (model, operation)
     return EXT_CONTINUE
开发者ID:jpanganiban,项目名称:flask-sqlalchemy,代码行数:9,代码来源:flask_sqlalchemy.py


示例17: make_copy

 def make_copy( self, value, target_context, source_context ):
     """Copy the file behind ``value`` into a new MetadataFile of the target.

     ``value`` is wrapped first; a falsy wrapped value results in ``None``.
     Otherwise a new ``MetadataFile`` is created for the target's parent,
     added and flushed, the file contents are copied to it, and the
     unwrapped new value is returned. ``source_context`` is not used.
     """
     parent = target_context.parent
     wrapped = self.wrap( value, object_session( parent ) )
     if not wrapped:
         return None

     copied = galaxy.model.MetadataFile( dataset = parent, name = self.spec.name )
     session = object_session( parent )
     session.add( copied )
     # The copy happens after the flush, once the new row has been written.
     session.flush()
     shutil.copy( wrapped.file_name, copied.file_name )
     return self.unwrap( copied )
开发者ID:knowingchaos,项目名称:galaxy-central,代码行数:9,代码来源:metadata.py


示例18: test_current_authentication

    def test_current_authentication(self):
        """The current authentication is still usable after expire and expunge."""
        from camelot.model.authentication import get_current_authentication

        before = get_current_authentication()
        # current authentication cache should survive
        # a session expire + expunge
        session = orm.object_session(before)
        session.expire_all()
        session.expunge_all()

        after = get_current_authentication()
        self.assertTrue(after.username)
开发者ID:odmsolutions,项目名称:Camelot,代码行数:10,代码来源:test_model.py


示例19: get_object_state

 def get_object_state(self):
     """Return the name of the state this instance is in.

     The result depends on whether the instance has a session and whether
     it has an identity:

     * no session, no identity: ``'transient'``
     * session, no identity: ``'pending'``
     * no session, identity: ``'detached'``
     * session, identity: ``'persistent'``
     """
     # Each fact is looked up once; the two booleans cover all four cases,
     # so no fall-through error branch is needed.
     in_session = object_session(self) is not None
     if has_identity(self):
         return 'persistent' if in_session else 'detached'
     return 'pending' if in_session else 'transient'
开发者ID:prinzdezibel,项目名称:p2.datashackle.core,代码行数:10,代码来源:model.py


示例20: also_set_tag

 def also_set_tag(self, key, folderitem, is_remove):
     """Mirror a folder change onto the folder's associated tag.

     The tag associated with ``folderitem.folder`` is removed when
     ``is_remove`` is true and applied otherwise. The work is done with
     autoflush disabled on this object's session.

     :return: ``folderitem``, unchanged.
     """
     session = object_session(self)
     with session.no_autoflush:
         tag = folderitem.folder.get_associated_tag(session)
         update_tag = self.remove_tag if is_remove else self.apply_tag
         update_tag(tag)
     return folderitem
开发者ID:MediaPreneur,项目名称:inbox,代码行数:11,代码来源:thread.py



注：本文中的 sqlalchemy.orm.object_session 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python orm.relation函数代码示例发布时间:2022-05-27
下一篇:
Python orm.object_mapper函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap