本文整理汇总了Python中sqlalchemy.orm.session.make_transient函数的典型用法代码示例。如果您正苦于以下问题：Python make_transient函数的具体用法？Python make_transient怎么用？Python make_transient使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了make_transient函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: import_obj
def import_obj(cls, slc_to_import, slc_to_override, import_time=None):
    """Insert a slice into the database, or override an existing one.

    The incoming slice is detached with ``make_transient``, its dashboards
    are cleared, and its current id and ``import_time`` are recorded through
    ``alter_params`` as ``remote_id`` / ``import_time``. A copy of the slice
    is then bound to the datasource returned by
    ``ConnectorRegistry.get_datasource_by_name`` for the ``datasource_name``,
    ``schema`` and ``database_name`` entries of its ``params_dict``.

    :param Slice slc_to_import: Slice object to import
    :param Slice slc_to_override: Slice to replace; when falsy the imported
        copy is added to the session instead
    :param import_time: value recorded under ``import_time``
    :returns: id of the overridden slice, or of the newly added one
    :rtype: int
    """
    session = db.session
    make_transient(slc_to_import)
    slc_to_import.dashboards = []
    slc_to_import.alter_params(
        remote_id=slc_to_import.id, import_time=import_time)

    slc_to_import = slc_to_import.copy()
    params = slc_to_import.params_dict
    datasource = ConnectorRegistry.get_datasource_by_name(
        session, slc_to_import.datasource_type, params['datasource_name'],
        params['schema'], params['database_name'])
    slc_to_import.datasource_id = datasource.id

    if not slc_to_override:
        # Nothing to replace: persist the copy as a brand new slice.
        session.add(slc_to_import)
        logging.info('Final slice: {}'.format(slc_to_import.to_json()))
        session.flush()
        return slc_to_import.id

    slc_to_override.override(slc_to_import)
    session.flush()
    return slc_to_override.id
开发者ID:neuroradiology,项目名称:caravel,代码行数:31,代码来源:core.py
示例2: cash_instances_to_dict
def cash_instances_to_dict(self, list_of_classes, do_make_transient=False):
    """Load all instances of the given classes and detach them from the session.

    The current session is closed and a new one is activated; every class in
    ``list_of_classes`` is queried through ``with_polymorphic(cls, '*')`` with
    ``immediateload('*')``. The loaded instances are expunged from the
    session, flagged with ``disconnected_from_session = True`` and, when
    requested, passed to ``make_transient``. The session is closed again
    before returning.

    :param list_of_classes: iterable of mapped classes to load
    :param do_make_transient: when true, also call ``make_transient`` on each
        loaded instance
    :returns: dict mapping each class ``__name__`` to the list of its
        loaded instances
    :raises BaseException: if a loaded instance already has a
        ``disconnected_from_session`` attribute
    """
    # Close the session, open a new one and read all the classes into it,
    # then detach them from the session.
    # The instances are returned as a dict; hopefully this lets callers work
    # with them for as long as they like.
    # Keeping the option of not closing the session was considered and
    # dropped. In the worst case a separate session can be created, but two
    # simultaneous sessions are dangerous too.
    self.close_session()
    self.private_activate_session()
    dict_with_instances = dict()
    for cls_i in list_of_classes:
        # Open question from the original author: do relationships need to
        # be read in some special way?
        repr_cls_i = with_polymorphic(cls_i, '*')
        query = self.active_session.query(repr_cls_i).options(immediateload('*'))
        dict_with_instances[cls_i.__name__] = list(query.all())
    # expunge_all() is the reason the session is closed before starting.
    self.active_session.expunge_all()
    # .values() instead of the Python-2-only .itervalues(), so this runs on
    # both Python 2 and Python 3.
    for inst_list in dict_with_instances.values():
        for inst_i in inst_list:
            if hasattr(inst_i, "disconnected_from_session"):
                raise BaseException("[c_session_handler][cash_instances_to_dict] you cannot use 'disconnected_from_session' attribute in a class here")
            inst_i.disconnected_from_session = True
            if do_make_transient:
                # Without this the instance may still try to reach the database.
                make_transient(inst_i)
    self.close_session()
    return dict_with_instances
开发者ID:vyakhorev,项目名称:AlcheView,代码行数:25,代码来源:db_handlers.py
示例3: test_import_override_dashboard_2_slices
def test_import_override_dashboard_2_slices(self):
    """Importing a dashboard twice with the same id overrides it in place.

    A dashboard with two slices is imported, then a dashboard with the same
    id (10004) and three slices is imported again. Both imports must return
    the same id, and the stored dashboard must match the second import.
    """
    e_slc = self.create_slice('e_slc', id=10009, table_name='energy_usage')
    b_slc = self.create_slice('b_slc', id=10010, table_name='birth_names')
    dash_to_import = self.create_dashboard(
        'override_dashboard', slcs=[e_slc, b_slc], id=10004)
    imported_dash_id_1 = models.Dashboard.import_obj(
        dash_to_import, import_time=1992)

    # create new instances of the slices
    e_slc = self.create_slice(
        'e_slc', id=10009, table_name='energy_usage')
    b_slc = self.create_slice(
        'b_slc', id=10010, table_name='birth_names')
    c_slc = self.create_slice('c_slc', id=10011, table_name='birth_names')
    dash_to_import_override = self.create_dashboard(
        'override_dashboard_new', slcs=[e_slc, b_slc, c_slc], id=10004)
    imported_dash_id_2 = models.Dashboard.import_obj(
        dash_to_import_override, import_time=1992)

    # override doesn't change the id
    # (assertEqual: the assertEquals alias was removed in Python 3.12)
    self.assertEqual(imported_dash_id_1, imported_dash_id_2)
    expected_dash = self.create_dashboard(
        'override_dashboard_new', slcs=[e_slc, b_slc, c_slc], id=10004)
    make_transient(expected_dash)
    imported_dash = self.get_dash(imported_dash_id_2)
    self.assert_dash_equals(
        expected_dash, imported_dash, check_position=False)
    self.assertEqual({"remote_id": 10004, "import_time": 1992},
                     json.loads(imported_dash.json_metadata))
开发者ID:7rack,项目名称:caravel,代码行数:29,代码来源:import_export_tests.py
示例4: _copy
def _copy(self):
    """Replace the navigation rows of one position with copies from another.

    The request parameters are validated with ``NavigationCopySchema``. All
    ``Navigation`` rows matching ``position_id`` are deleted, then every row
    matching ``from_position_id`` is detached with ``make_transient``, given
    a blank id and the target ``position_id``, and added back to the session
    as a new row.

    :returns: dict with ``success_message`` on success, or with
        ``error_message`` and ``errors`` when validation raises
        ``colander.Invalid``
    """
    schema = NavigationCopySchema().bind(request=self.request)
    try:
        controls = schema.deserialize(self.request.params)
        # Clear whatever the target position currently has.
        (
            DBSession.query(Navigation)
            .filter(
                Navigation.condition_position_id(
                    controls.get('position_id')
                )
            )
            .delete()
        )
        navigations_from = (
            DBSession.query(Navigation)
            .filter(
                Navigation.condition_position_id(
                    controls.get('from_position_id')
                )
            )
        )
        for navigation in navigations_from:
            # Detach and blank the primary key so that add() inserts a new
            # row instead of updating the source one.
            make_transient(navigation)
            navigation.id = None
            navigation.position_id = controls.get('position_id')
            DBSession.add(navigation)
        return {'success_message': _(u'Copied')}
    # "except X as e" is valid on Python 2.6+ and Python 3; the original
    # "except X, e" form is a syntax error on Python 3.
    except colander.Invalid as e:
        return {
            'error_message': _(u'Please, check errors'),
            'errors': e.asdict()
        }
开发者ID:djoudi,项目名称:travelcrm,代码行数:32,代码来源:navigations.py
示例5: restore
def restore(request, coupon_id):
    """Restore a deleted coupon by re-inserting it as a new row.

    The deleted coupon is looked up by id (restricted to the user's company
    unless ``request.user.is_focus`` is truthy), stamped with the modifying
    user, committed, detached from the session, given a blank id and added
    back after calling its ``restore()`` method. ``main_coupon_id`` is kept
    when already set, otherwise it is set to the coupon's previous id.

    :param request: request object exposing ``user`` and ``db_session``
    :param coupon_id: id of the coupon to restore; a falsy value raises
        ``Http404``
    :returns: ``JsonResponse`` carrying a success or error payload
    """
    if not coupon_id:
        raise Http404

    coupon_id = int(coupon_id)
    lookup = {'id': coupon_id, 'is_deleted': True}
    if not request.user.is_focus:
        lookup['company_id'] = request.user.company_id
    coupon = request.db_session.query(Coupon).filter_by(**lookup).first()

    try:
        coupon.modified_by_id = request.user.id
        request.db_session.commit()
        request.db_session.expunge(coupon)
        make_transient(coupon)
        coupon.main_coupon_id = coupon.main_coupon_id or coupon.id
        coupon.id = None
        coupon.restore()
        request.db_session.add(coupon)
        request.db_session.commit()
        response = json_response_content('success', 'Coupon was successfully restored!')
    except Exception as e:
        request.db_session.rollback()
        response = json_response_content('error', 'There was an error during coupon restore: {0}'.format(str(e)))
    return JsonResponse(response)
开发者ID:adam1978828,项目名称:webapp1,代码行数:26,代码来源:views.py
示例6: test_transient_exception
def test_transient_exception(self):
    """A persisted object that is made transient, has its pk blanked and is
    re-added must not be treated as a primary key switch: the existing
    address keeps ``user_id == 5`` and a second user row is created.
    """
    users = self.tables.users
    Address = self.classes.Address
    addresses = self.tables.addresses
    User = self.classes.User

    mapper(User, users)
    mapper(Address, addresses, properties={'user': relationship(User)})

    session = create_session()
    user = User(id=5, name='u1')
    address = Address(email_address='e1', user=user)
    session.add_all([user, address])
    session.flush()

    # Detach the user and re-add it without a primary key.
    make_transient(user)
    user.id = None
    user.username = 'u2'
    session.add(user)
    session.flush()

    # The address must still point at the original row, before and after
    # expiring everything in the session.
    eq_(address.user_id, 5)
    session.expire_all()
    eq_(address.user_id, 5)

    ne_(user.id, 5)
    ne_(user.id, None)
    eq_(session.query(User).count(), 2)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:33,代码来源:test_naturalpks.py
示例7: import_obj
def import_obj(cls, slc_to_import, import_time=None):
    """Insert a slice into the database, or override a previous import of it.

    The incoming slice is detached with ``make_transient``, its dashboards
    are cleared, and its current id and ``import_time`` are recorded through
    ``alter_params`` as ``remote_id`` / ``import_time``. Every stored
    ``Slice`` is then scanned for a ``remote_id`` param equal to the incoming
    slice's id; when several match, the last one scanned is overridden.

    :param slc_to_import: slice object to import
    :param import_time: value recorded under ``import_time``
    :returns: id of the overridden slice, or of the newly added one
    """
    session = db.session
    make_transient(slc_to_import)
    slc_to_import.dashboards = []
    slc_to_import.alter_params(
        remote_id=slc_to_import.id, import_time=import_time)

    # find if the slice was already imported (no early exit: last match wins)
    slc_to_override = None
    for candidate in session.query(Slice).all():
        if ('remote_id' in candidate.params_dict and
                candidate.params_dict['remote_id'] == slc_to_import.id):
            slc_to_override = candidate

    slc_to_import = slc_to_import.copy()
    params = slc_to_import.params_dict
    datasource = ConnectorRegistry.get_datasource_by_name(
        session, slc_to_import.datasource_type, params['datasource_name'],
        params['schema'], params['database_name'])
    slc_to_import.datasource_id = datasource.id

    if not slc_to_override:
        session.add(slc_to_import)
        logging.info('Final slice: {}'.format(slc_to_import.to_json()))
        session.flush()
        return slc_to_import.id

    slc_to_override.override(slc_to_import)
    session.flush()
    return slc_to_override.id
开发者ID:avsolatorio,项目名称:caravel,代码行数:33,代码来源:core.py
示例8: copy_notification
def copy_notification(id):
    """Clone a notification and its settings, then redirect to the index.

    The notification is looked up by ``id`` (404 when missing). The request
    is aborted with 403 unless the current user owns it or is an admin. The
    notification and each of its ``NotificationSetting`` rows are re-inserted
    as new rows by blanking their ids and calling ``make_transient``.

    :param id: id of the notification to clone
    :returns: redirect response to ``notifications.index``
    """
    clone = Notification.query.filter_by(id=id).first_or_404()
    original_description = clone.description
    if clone.user != current_user and not current_user.is_admin():
        abort(403)

    # Turn the loaded row into a new, unsaved one.
    clone.id = None
    clone.description = original_description + ' Clone'
    make_transient(clone)
    db.session.add(clone)
    db.session.commit()

    # Settings are still looked up under the source notification's id.
    for setting in NotificationSetting.query.filter_by(notification_id=id).all():
        setting.id = None
        setting.notification_id = clone.id
        make_transient(setting)
        db.session.add(setting)
    db.session.commit()

    current_app.decoder.refresh_notifier(clone.id)
    flash('Notification cloned.', 'success')
    return redirect(url_for('notifications.index'))
开发者ID:nutechsoftware,项目名称:alarmdecoder-webapp,代码行数:28,代码来源:views.py
示例9: run
def run(self):
    """Record the job as running, execute it, then record it as successful.

    The job is added to (or merged into) a new session and committed with
    state ``State.RUNNING``. It is then detached with ``make_transient``
    (keeping its id) before ``_execute()`` runs. Afterwards ``end_date`` and
    ``State.SUCCESS`` are merged back and committed.

    The session is closed in a ``finally`` clause so it is released even
    when ``_execute()`` or a commit raises; the exception still propagates.
    """
    if statsd:
        statsd.incr(self.__class__.__name__.lower()+'_start', 1, 1)
    # Adding an entry in the DB
    session = settings.Session()
    try:
        self.state = State.RUNNING
        if not self.id:
            session.add(self)
        else:
            session.merge(self)
        session.commit()
        # make_transient() is followed by restoring the id so the later
        # merge() targets the same row.
        id_ = self.id
        make_transient(self)
        self.id = id_

        # Run
        self._execute()
        # better to submit tasks and poll for completion...

        # Marking the success in the DB
        self.end_date = datetime.now()
        self.state = State.SUCCESS
        session.merge(self)
        session.commit()
    finally:
        session.close()

    if statsd:
        statsd.incr(self.__class__.__name__.lower()+'_end', 1, 1)
开发者ID:russelldurrett,项目名称:airflow,代码行数:28,代码来源:jobs.py
示例10: copy_from_position
def copy_from_position(source_position_id, target_position_id):
    """Replace the target position's permisions with copies of the source's.

    All ``Permision`` rows matching the target position are deleted. Each row
    matching the source position is then detached with ``make_transient``,
    given a blank id and the target position id, and added to the session.

    :param source_position_id: position to copy from; must be an ``int``
    :param target_position_id: position to copy to; must be an ``int``
    :raises AssertionError: if either id is not an ``int``
    """
    for position_id in (source_position_id, target_position_id):
        assert isinstance(position_id, int), u"Integer expected"

    stale_permisions = DBSession.query(Permision).filter(
        Permision.condition_position_id(target_position_id)
    )
    stale_permisions.delete()

    source_permisions = DBSession.query(Permision).filter(
        Permision.condition_position_id(source_position_id)
    )
    for permision in source_permisions:
        # Detach and blank the pk so add() inserts a new row.
        make_transient(permision)
        permision.id = None
        permision.position_id = target_position_id
        DBSession.add(permision)
开发者ID:alishir,项目名称:tcr,代码行数:25,代码来源:permisions.py
示例11: update_scorer
def update_scorer(scorer):
    """Re-score every bet that references ``scorer`` when its flag changed.

    The passed instance is detached so the stored row can be loaded and
    compared. When the ``scorer`` flag is unchanged nothing happens.
    Otherwise the instance is merged back, and each ``BetScorer`` naming it
    as ``scorer1`` or ``scorer2`` gains ``SCORER_POINTS`` (flag truthy) or
    loses them (flag falsy, and only when the bet's score is not 0); the
    user's total is refreshed through ``update_total_score``.

    :param scorer: scorer instance carrying the new ``scorer`` flag
    """
    # detach from the session to load the database instance to compare
    scorer_id = scorer.id
    make_transient(scorer)
    scorer.id = scorer_id

    stored_scorer = Scorer.query.get(scorer_id)
    if stored_scorer.scorer == scorer.scorer:
        return

    # reattach the instance
    scorer = db.session.merge(scorer)
    affected_bets = BetScorer.query.filter(
        or_(BetScorer.scorer1==scorer, BetScorer.scorer2==scorer))
    for bet in affected_bets:
        if scorer.scorer:
            bet.score += SCORER_POINTS
        elif bet.score == 0:
            # nothing to take back from this bet
            continue
        else:
            bet.score -= SCORER_POINTS
        # pass as param since the user.bet_scorer backref is None here
        update_total_score(bet.user, bet_scorer=bet)
    db.session.commit()
开发者ID:tupy,项目名称:bolao,代码行数:26,代码来源:tasks.py
示例12: restore_provider_details
def restore_provider_details(notify_db, notify_db_session):
    """Snapshot provider details before a test and restore them afterwards.

    Before yielding, every ``ProviderDetails`` and ``ProviderDetailsHistory``
    row is loaded and detached with ``make_transient``. After the test,
    ``ProviderRates``, ``ProviderDetails`` and ``ProviderDetailsHistory`` are
    deleted and committed, then the snapshot rows are added back and
    committed.

    ``notify_db_session`` is not used in the body; per the original note it
    is kept as a parameter to encourage clearing down the rest of the DB
    whenever provider details are modified.
    """
    saved_details = ProviderDetails.query.all()
    saved_history = ProviderDetailsHistory.query.all()

    # make transient removes the objects from the session - since we'll want to delete them later
    for snapshot in (saved_details, saved_history):
        for row in snapshot:
            make_transient(row)

    yield

    # ProviderRates are deleted too, as they depend on provider_details
    ProviderRates.query.delete()
    ProviderDetails.query.delete()
    ProviderDetailsHistory.query.delete()
    notify_db.session.commit()

    notify_db.session.add_all(saved_details)
    notify_db.session.add_all(saved_history)
    notify_db.session.commit()
开发者ID:alphagov,项目名称:notifications-api,代码行数:27,代码来源:conftest.py
示例13: test_import_dashboard_1_slice
def test_import_dashboard_1_slice(self):
    """Importing a dashboard with one slice keeps its metadata and position.

    The imported dashboard must equal a freshly built expected dashboard
    (ignoring position), carry ``remote_id`` / ``import_time`` in its JSON
    metadata, and have its position entry point at the imported slice's id.
    """
    slc = self.create_slice('health_slc', id=10006)
    dash_with_1_slice = self.create_dashboard(
        'dash_with_1_slice', slcs=[slc], id=10002)
    dash_with_1_slice.position_json = """
[{{
"col": 5,
"row": 10,
"size_x": 4,
"size_y": 2,
"slice_id": "{}"
}}]
""".format(slc.id)
    imported_dash_id = models.Dashboard.import_obj(
        dash_with_1_slice, import_time=1990)
    imported_dash = self.get_dash(imported_dash_id)

    expected_dash = self.create_dashboard(
        'dash_with_1_slice', slcs=[slc], id=10002)
    make_transient(expected_dash)
    self.assert_dash_equals(
        expected_dash, imported_dash, check_position=False)
    # assertEqual: the assertEquals alias was removed in Python 3.12
    self.assertEqual({"remote_id": 10002, "import_time": 1990},
                     json.loads(imported_dash.json_metadata))

    # The slice id inside the position is expected to be rewritten to the
    # id of the imported slice.
    expected_position = dash_with_1_slice.position_array
    expected_position[0]['slice_id'] = '{}'.format(
        imported_dash.slices[0].id)
    self.assertEqual(expected_position, imported_dash.position_array)
开发者ID:7rack,项目名称:caravel,代码行数:29,代码来源:import_export_tests.py
示例14: test_import_dashboard_1_slice
def test_import_dashboard_1_slice(self):
    """Importing a dashboard with one slice keeps its metadata and position.

    The imported dashboard must equal a freshly built expected dashboard
    (ignoring position), carry ``remote_id`` / ``import_time`` in its JSON
    metadata, and expose the same ``position`` as the source dashboard.
    """
    slc = self.create_slice('health_slc', id=10006)
    dash_with_1_slice = self.create_dashboard(
        'dash_with_1_slice', slcs=[slc], id=10002)
    dash_with_1_slice.position_json = """
{{"DASHBOARD_VERSION_KEY": "v2",
"DASHBOARD_CHART_TYPE-{0}": {{
"type": "DASHBOARD_CHART_TYPE",
"id": {0},
"children": [],
"meta": {{
"width": 4,
"height": 50,
"chartId": {0}
}}
}}
}}
""".format(slc.id)
    imported_dash_id = models.Dashboard.import_obj(
        dash_with_1_slice, import_time=1990)
    imported_dash = self.get_dash(imported_dash_id)

    expected_dash = self.create_dashboard(
        'dash_with_1_slice', slcs=[slc], id=10002)
    make_transient(expected_dash)
    self.assert_dash_equals(
        expected_dash, imported_dash, check_position=False)
    # assertEqual: the assertEquals alias was removed in Python 3.12
    self.assertEqual({'remote_id': 10002, 'import_time': 1990},
                     json.loads(imported_dash.json_metadata))

    expected_position = dash_with_1_slice.position
    self.assertEqual(expected_position, imported_dash.position)
开发者ID:bkyryliuk,项目名称:caravel,代码行数:32,代码来源:import_export_tests.py
示例15: copy
def copy(self):
    """Re-insert this object as a new row and return it.

    The instance itself is reused: it is expunged from the session, made
    transient, given a blank id, added back and flushed. No second Python
    object is created.

    :returns: ``self``, after the flush
    """
    session = db.session
    session.expunge(self)
    make_transient(self)
    # A blank id makes the following add()/flush() insert a new row.
    self.id = None
    session.add(self)
    session.flush()
    return self
开发者ID:rhayes777,项目名称:scripts,代码行数:8,代码来源:database.py
示例16: main
def main():
    """Copy every ``NPriceList`` row with ``almacen_id`` 1 to ``almacen_id`` 3.

    Each source row is expunged, made transient, given a blank ``pid`` and
    the new ``almacen_id``, then added back to the session as a new row.
    """
    with sessionmanager as session:
        source_rows = session.query(NPriceList).filter_by(almacen_id=1)
        for price in source_rows:
            session.expunge(price)
            make_transient(price)
            price.pid = None
            price.almacen_id = 3
            session.add(price)
开发者ID:qihqi,项目名称:HenryFACTService,代码行数:8,代码来源:backfill_corp.py
示例17: test_sched_a_fulltext_trigger
def test_sched_a_fulltext_trigger(self):
    """Check ``ScheduleA.contributor_name_text`` after create, update,
    delete and repeated re-insert of the underlying row, calling
    ``manage.refresh_itemized()`` before each check.
    """
    def matching_rows():
        # Built on every call so nml_row.sub_id is read at call time.
        return models.ScheduleA.query.filter(
            models.ScheduleA.sub_id == nml_row.sub_id
        )

    # Test create
    nml_row = self.NmlSchedAFactory(
        rpt_yr=2014,
        contbr_nm='Sheldon Adelson',
        contb_receipt_dt=datetime.datetime(2014, 1, 1)
    )
    self.FItemReceiptOrExp(
        sub_id=nml_row.sub_id,
        rpt_yr=2014,
    )
    db.session.commit()
    manage.refresh_itemized()
    search = matching_rows().one()
    self.assertEqual(search.contributor_name_text, "'adelson':2 'sheldon':1")

    # Test update
    nml_row.contbr_nm = 'Shelly Adelson'
    db.session.add(nml_row)
    db.session.commit()
    manage.refresh_itemized()
    search = matching_rows().one()
    db.session.refresh(search)
    self.assertEqual(search.contributor_name_text, "'adelson':2 'shelli':1")

    # Test delete
    db.session.delete(nml_row)
    db.session.commit()
    manage.refresh_itemized()
    self.assertEqual(matching_rows().count(), 0)

    # Test sequential writes: re-insert, delete and re-insert the same
    # object before a single refresh.
    make_transient(nml_row)
    db.session.add(nml_row)
    db.session.commit()

    db.session.delete(nml_row)
    db.session.commit()

    make_transient(nml_row)
    db.session.add(nml_row)
    db.session.commit()

    manage.refresh_itemized()
    self.assertEqual(matching_rows().count(), 1)
开发者ID:18F,项目名称:openFEC,代码行数:58,代码来源:test_integration.py
示例18: save
def save(self):
    """Build a ``Forum`` from the form data and return ``forum.save()``.

    The ``submit`` entry is dropped from the data before it is passed to the
    ``Forum`` constructor.
    """
    fields = self.data
    # remove the button
    fields.pop('submit', None)
    forum = Forum(**fields)

    # flush SQLA info from created instance so that it can be merged
    make_transient(forum)
    make_transient_to_detached(forum)

    return forum.save()
开发者ID:nathanhilbert,项目名称:FPA_Core,代码行数:10,代码来源:forms.py
示例19: test_sched_a_fulltext_trigger
def test_sched_a_fulltext_trigger(self):
    """Check ``ScheduleA.contributor_name_text`` after create, update,
    delete and repeated re-insert of the underlying row, running
    ``select update_aggregates()`` before each check.
    """
    def refresh_aggregates():
        db.session.execute('select update_aggregates()')

    def matching_rows():
        # Built on every call so row.sched_a_sk is read at call time.
        return models.ScheduleA.query.filter(
            models.ScheduleA.sched_a_sk == row.sched_a_sk
        )

    # Test create
    row = self.SchedAFactory(
        rpt_yr=2014,
        contbr_nm='Sheldon Adelson',
        load_date=datetime.datetime.now(),
        sub_id=7,
    )
    db.session.commit()
    refresh_aggregates()
    search = matching_rows().one()
    self.assertEqual(search.contributor_name_text, "'adelson':2 'sheldon':1")

    # Test update
    row.contbr_nm = 'Shelly Adelson'
    db.session.add(row)
    db.session.commit()
    refresh_aggregates()
    search = matching_rows().one()
    db.session.refresh(search)
    self.assertEqual(search.contributor_name_text, "'adelson':2 'shelli':1")

    # Test delete
    db.session.delete(row)
    db.session.commit()
    refresh_aggregates()
    self.assertEqual(matching_rows().count(), 0)

    # Test sequential writes: re-insert, delete and re-insert the same
    # object before a single refresh.
    make_transient(row)
    db.session.add(row)
    db.session.commit()

    db.session.delete(row)
    db.session.commit()

    make_transient(row)
    db.session.add(row)
    db.session.commit()

    refresh_aggregates()
    self.assertEqual(matching_rows().count(), 1)
开发者ID:WhitTip,项目名称:openFEC,代码行数:55,代码来源:test_integration.py
示例20: save
def save(self):
    """Build a ``Forum`` from the form data and return ``forum.save()``.

    The ``submit`` and ``csrf_token`` entries are dropped from the data
    before it is passed to the ``Forum`` constructor.
    """
    fields = self.data
    # delete submit and csrf_token from data
    for form_only_key in ('submit', 'csrf_token'):
        fields.pop(form_only_key, None)
    forum = Forum(**fields)

    # flush SQLA info from created instance so that it can be merged
    make_transient(forum)
    make_transient_to_detached(forum)

    return forum.save()
开发者ID:eirnym,项目名称:flaskbb,代码行数:11,代码来源:forms.py
注：本文中的sqlalchemy.orm.session.make_transient函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论