本文整理汇总了Python中sqlalchemy.orm.object_session函数的典型用法代码示例。如果您正苦于以下问题:Python object_session函数的具体用法?Python object_session怎么用?Python object_session使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了object_session函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_i18n
def test_i18n( self ):
from camelot.model.i18n import Translation, ExportAsPO
session = Session()
session.execute( Translation.__table__.delete() )
self.assertEqual( Translation.translate( 'bucket', 'nl_BE' ), None )
# run twice to check all branches in the code
Translation.translate_or_register( 'bucket', 'nl_BE' )
Translation.translate_or_register( 'bucket', 'nl_BE' )
self.assertEqual( Translation.translate( 'bucket', 'nl_BE' ), 'bucket' )
self.assertEqual( Translation.translate( '', 'nl_BE' ), '' )
self.assertEqual( Translation.translate_or_register( '', 'nl_BE' ), '' )
# clear the cache
Translation._cache.clear()
# fill the cache again
translation = Translation( language = 'nl_BE', source = 'bucket',
value = 'emmer', uid=1 )
orm.object_session( translation ).flush()
self.assertEqual( Translation.translate( 'bucket', 'nl_BE' ), 'emmer' )
export_action = ExportAsPO()
model_context = MockModelContext()
model_context.obj = translation
try:
generator = export_action.model_run( model_context )
file_step = generator.next()
generator.send( ['/tmp/test.po'] )
except StopIteration:
pass
开发者ID:jeroendierckx,项目名称:Camelot,代码行数:27,代码来源:test_model.py
示例2: longslit_calibrate_to_database
def longslit_calibrate_to_database(self, gmos_long_slit_arc_calibration=None, destination_dir='.', force=False):
session = object_session(self)
if self.wave_cal is not None:
if not force:
raise GMOSDatabaseDuplicate('This arc already has a wavelength calibration {0}. '
'Use force=True to override'.format(self.wave_cal))
else:
session.delete(self.wave_cal)
session.commit()
arctab, linesall, shift, fake = self.longslit_calibrate(gmos_long_slit_arc_calibration=gmos_long_slit_arc_calibration)
wavecal_store_fname = 'arccal-{}.h5'.format(self.raw.fits.fname.replace('.fits',''))
wavecal_store_full_path = os.path.join(destination_dir, wavecal_store_fname)
arctab.write(wavecal_store_full_path, path='arc', overwrite=True, format='hdf5')
#linesall.write(wavecal_store_full_path, path='lines', append=True, format='hdf5')
session = object_session(self)
gmos_long_slit_wavecal = GMOSLongSlitArcWavelengthSolution(id = self.id, fname=wavecal_store_fname,
path=os.path.abspath(destination_dir))
session.add(gmos_long_slit_wavecal)
session.commit()
return gmos_long_slit_wavecal
开发者ID:wkerzendorf,项目名称:geminiutil,代码行数:28,代码来源:gmos_alchemy.py
示例3: last_modified
def last_modified(self):
""" Returns last change of the elections. """
changes = [self.last_change, self.last_result_change]
session = object_session(self)
election_ids = [election.id for election in self.elections]
# Get the last election change
result = object_session(self).query(Election.last_change)
result = result.order_by(desc(Election.last_change))
result = result.filter(Election.id.in_(election_ids))
changes.append(result.first()[0] if result.first() else None)
# Get the last candidate change
result = object_session(self).query(Candidate.last_change)
result = result.order_by(desc(Candidate.last_change))
result = result.filter(Candidate.election_id.in_(election_ids))
changes.append(result.first()[0] if result.first() else None)
# Get the last list connection change
result = session.query(ListConnection.last_change)
result = result.order_by(desc(ListConnection.last_change))
result = result.filter(ListConnection.election_id.in_(election_ids))
changes.append(result.first()[0] if result.first() else None)
# Get the last list change
result = session.query(List.last_change)
result = result.order_by(desc(List.last_change))
result = result.filter(List.election_id == self.id)
changes.append(result.first()[0] if result.first() else None)
changes = [change for change in changes if change]
return max(changes) if changes else None
开发者ID:OneGov,项目名称:onegov.ballot,代码行数:33,代码来源:election_compound.py
示例4: __getattr__
def __getattr__( self, name ):
if name in self.spec:
if name in self.parent._metadata:
return self.spec[name].wrap( self.parent._metadata[name], object_session( self.parent ) )
return self.spec[name].wrap( self.spec[name].default, object_session( self.parent ) )
if name in self.parent._metadata:
return self.parent._metadata[name]
开发者ID:knowingchaos,项目名称:galaxy-central,代码行数:7,代码来源:metadata.py
示例5: set_session_profile
def set_session_profile(survey, survey_session, profile):
"""Set up the survey session using a given profile.
:param survey: the survey to use
:type survey: :py:class:`euphorie.content.survey.Survey`
:param survey_session: survey session to update
:type survey_session: :py:class:`euphorie.client.model.SurveySession`
:param dict profile: desired profile
:rtype: :py:class:`euphorie.client.model.SurveySession`
:return: the update session (this might be a new session)
This will rebuild the survey session tree if the profile has changed.
"""
if not survey_session.hasTree():
BuildSurveyTree(survey, profile, survey_session)
return survey_session
current_profile = extractProfile(survey, survey_session)
if current_profile == profile and not treeChanges(survey_session, survey):
survey_session.touch()
return survey_session
new_session = create_survey_session(
survey_session.title, survey, survey_session.account)
BuildSurveyTree(survey, profile, new_session, survey_session)
new_session.copySessionData(survey_session)
object_session(survey_session).delete(survey_session)
return new_session
开发者ID:euphorie,项目名称:Euphorie,代码行数:28,代码来源:profile.py
示例6: test_invalid_token_during_connect
def test_invalid_token_during_connect(db, patch_access_token_getter,
account_with_single_auth_creds):
account_id = account_with_single_auth_creds.id
patch_access_token_getter.revoke_refresh_token(
account_with_single_auth_creds.auth_credentials[0].refresh_token)
account_with_single_auth_creds.verify_all_credentials()
assert len(account_with_single_auth_creds.valid_auth_credentials) == 0
g_token_manager.clear_cache(account_with_single_auth_creds)
# connect_account() takes an /expunged/ account object
# that has the necessary relationships eager-loaded
object_session(account_with_single_auth_creds).expunge(
account_with_single_auth_creds)
assert not object_session(account_with_single_auth_creds)
account = db.session.query(GmailAccount).options(
joinedload(GmailAccount.auth_credentials)).get(
account_id)
db.session.expunge(account)
assert not object_session(account)
g = GmailAuthHandler('gmail')
with pytest.raises(OAuthError):
g.connect_account(account)
invalid_account = db.session.query(GmailAccount).get(account_id)
for auth_creds in invalid_account.auth_credentials:
assert not auth_creds.is_valid
开发者ID:nagyistge,项目名称:nylas-sync-engine,代码行数:30,代码来源:test_gmail_auth_credentials.py
示例7: on_after_update
def on_after_update(mapper, connection, target):
request = getattr(target, '_request', None)
from .documents import BaseDocument
# Reindex old one-to-one related object
committed_state = attributes.instance_state(target).committed_state
columns = set()
for field, value in committed_state.items():
if isinstance(value, BaseDocument):
obj_session = object_session(value)
# Make sure object is not updated yet
if not obj_session.is_modified(value):
obj_session.expire(value)
index_object(value, with_refs=False,
request=request)
else:
id_pos = field.rfind('_id')
if id_pos >= 0:
rel_name = field[:id_pos]
rel = mapper.relationships.get(rel_name, False)
if rel and any(c.name == field for c in rel.local_columns):
columns.add(rel_name)
# Reload `target` to get access to processed fields values
columns = columns.union([c.name for c in class_mapper(target.__class__).columns])
object_session(target).expire(target, attribute_names=columns)
index_object(target, request=request, with_refs=False, nested_only=True)
# Reindex the item's parents. This must be done after the child has been processes
for parent, children_field in target.get_parent_documents(nested_only=True):
columns = [c.name for c in class_mapper(parent.__class__).columns]
object_session(parent).expire(parent, attribute_names=columns)
ES(parent.__class__.__name__).index_nested_document(parent, children_field, target)
开发者ID:geniusproject,项目名称:nefertari-sqla,代码行数:33,代码来源:signals.py
示例8: __init__
def __init__(self, namespace_id=None, subject=None, thread_public_id=None,
started_before=None, started_after=None,
last_message_before=None, last_message_after=None,
any_email=None, to_addr=None, from_addr=None, cc_addr=None,
bcc_addr=None, filename=None, tag=None, detached=True):
self.namespace_id = namespace_id
self.subject = subject
self.thread_public_id = thread_public_id
self.started_before = started_before
self.started_after = started_after
self.last_message_before = last_message_before
self.last_message_after = last_message_after
self.any_email = any_email
self.to_addr = to_addr
self.from_addr = from_addr
self.cc_addr = cc_addr
self.bcc_addr = bcc_addr
self.filename = filename
self.tag = tag
if detached and object_session(self) is not None:
s = object_session(self)
s.expunge(self)
# Note, you can later add this object to a session by doing
# session.merge(detached_objecdt)
# For transaction filters
self.filters = []
self.setup_filters()
开发者ID:dlitz,项目名称:inbox,代码行数:30,代码来源:lens.py
示例9: test_explicit_expunge_deleted
def test_explicit_expunge_deleted(self):
users, User = self.tables.users, self.classes.User
mapper(User, users)
sess = Session()
sess.add(User(name='x'))
sess.commit()
u1 = sess.query(User).first()
sess.delete(u1)
sess.flush()
assert was_deleted(u1)
assert u1 not in sess
assert object_session(u1) is sess
sess.expunge(u1)
assert was_deleted(u1)
assert u1 not in sess
assert object_session(u1) is None
sess.rollback()
assert was_deleted(u1)
assert u1 not in sess
assert object_session(u1) is None
开发者ID:DeepakAlevoor,项目名称:sqlalchemy,代码行数:26,代码来源:test_session.py
示例10: answer_mode
def answer_mode(survey_node: AnswerableSurveyNode):
"""Get the mode of the answers."""
type_constraint = survey_node.the_type_constraint
allowable_types = {
'text', 'integer', 'decimal', 'date', 'time', 'timestamp', 'location',
'facility', 'multiple_choice'
}
if type_constraint not in allowable_types:
raise InvalidTypeForOperation((type_constraint, 'mode'))
answer_cls = ANSWER_TYPES[survey_node.the_type_constraint]
result = (
object_session(survey_node)
.execute(
sa.text(
'SELECT MODE() WITHIN GROUP (ORDER BY main_answer)'
' FROM {table} JOIN {answer_table} ON'
' {table}.id = {answer_table}.id'
' WHERE {answer_table}.survey_node_id = :sn_id'.format(
table=answer_cls.__table__,
answer_table=Answer.__table__,
)
),
{'sn_id': survey_node.id}
)
.scalar()
)
if type_constraint == 'multiple_choice' and result:
result = object_session(survey_node).query(Choice).get(str(result))
return result
开发者ID:4sp1r3,项目名称:dokomoforms,代码行数:29,代码来源:column_properties.py
示例11: _create_object_in_session
def _create_object_in_session( obj ):
session = object_session( obj ) if object_session is not None else None
if session is not None:
object_session( obj ).add( obj )
object_session( obj ).flush()
else:
raise Exception( NO_SESSION_ERROR_MESSAGE )
开发者ID:ashvark,项目名称:galaxy,代码行数:7,代码来源:__init__.py
示例12: should_suppress_transaction_creation
def should_suppress_transaction_creation(self):
if (self in object_session(self).new or
self in object_session(self).deleted):
return False
obj_state = inspect(self)
return not (obj_state.attrs.name.history.has_changes() or
obj_state.attrs.description.history.has_changes() or
obj_state.attrs.read_only.history.has_changes())
开发者ID:GordonYip,项目名称:sync-engine,代码行数:8,代码来源:calendar.py
示例13: __exit__
def __exit__( self, exc_type, exc_val, exc_tb ):
if exc_type != None:
self.add_exception_to_message( exc_type, exc_val, exc_tb )
self.status = 'errors'
elif self.status == 'running':
self.status = 'success'
orm.object_session( self ).commit()
return True
开发者ID:odmsolutions,项目名称:Camelot,代码行数:8,代码来源:batch_job.py
示例14: fix
def fix(self, geolocation):
if self.latitude == 0 or self.longitude == 0:
position = geolocation.get_position_by_name(self.address)
self.latitude = position[0]
self.longitude = position[1]
object_session(self).commit()
object_session(self).flush()
开发者ID:Ipukema,项目名称:PokeStats,代码行数:9,代码来源:Schema.py
示例15: log_activity
def log_activity(self, sender, actor, verb, object, target=None):
assert self.running
entry = ActivityEntry(actor=actor, verb=verb, object=object, target=target)
entry.object_type = object.entity_type
if target is not None:
entry.target_type = target.entity_type
object_session(object).add(entry)
开发者ID:mmariani,项目名称:abilian-core,代码行数:9,代码来源:service.py
示例16: _record
def _record(self, mapper, model, operation):
pk = tuple(mapper.primary_key_from_instance(model))
# FIXME: Some hack just to prevent from crashing when trying to look
# for _model_changes attribute. Happens when loading fixutres with
# the fixture library.
if not hasattr(orm.object_session(model), '_model_changes'):
orm.object_session(model)._model_changes = dict()
orm.object_session(model)._model_changes[pk] = (model, operation)
return EXT_CONTINUE
开发者ID:jpanganiban,项目名称:flask-sqlalchemy,代码行数:9,代码来源:flask_sqlalchemy.py
示例17: make_copy
def make_copy( self, value, target_context, source_context ):
value = self.wrap( value, object_session( target_context.parent ) )
if value:
new_value = galaxy.model.MetadataFile( dataset = target_context.parent, name = self.spec.name )
object_session( target_context.parent ).add( new_value )
object_session( target_context.parent ).flush()
shutil.copy( value.file_name, new_value.file_name )
return self.unwrap( new_value )
return None
开发者ID:knowingchaos,项目名称:galaxy-central,代码行数:9,代码来源:metadata.py
示例18: test_current_authentication
def test_current_authentication(self):
from camelot.model.authentication import get_current_authentication
authentication = get_current_authentication()
# current authentication cache should survive
# a session expire + expunge
orm.object_session(authentication).expire_all()
orm.object_session(authentication).expunge_all()
authentication = get_current_authentication()
self.assertTrue(authentication.username)
开发者ID:odmsolutions,项目名称:Camelot,代码行数:10,代码来源:test_model.py
示例19: get_object_state
def get_object_state(self):
if object_session(self) is None and not has_identity(self):
return 'transient'
elif object_session(self) is not None and not has_identity(self):
return 'pending'
elif object_session(self) is None and has_identity(self):
return 'detached'
elif object_session(self) is not None and has_identity(self):
return 'persistent'
raise Exception
开发者ID:prinzdezibel,项目名称:p2.datashackle.core,代码行数:10,代码来源:model.py
示例20: also_set_tag
def also_set_tag(self, key, folderitem, is_remove):
# Also add or remove the associated tag whenever a folder is added or
# removed.
with object_session(self).no_autoflush:
folder = folderitem.folder
tag = folder.get_associated_tag(object_session(self))
if is_remove:
self.remove_tag(tag)
else:
self.apply_tag(tag)
return folderitem
开发者ID:MediaPreneur,项目名称:inbox,代码行数:11,代码来源:thread.py
注:本文中的sqlalchemy.orm.object_session函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论