本文整理汇总了 Python 中 sqlalchemy.orm.session.Session 类的典型用法代码示例。如果您正苦于以下问题:Python Session 类的具体用法?Python Session 怎么用?Python Session 使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了 Session 类的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: create_initial_application_acl
def create_initial_application_acl(mapper, connection, target):
    """Create the default ACL rows for an application.

    The ``(mapper, connection, target)`` signature has the shape of a
    SQLAlchemy mapper event listener (presumably ``after_insert`` -- confirm
    where this function is registered). Only ``target`` is used.

    :param mapper: unused.
    :param connection: unused.
    :param target: application instance; its ``application_type`` and ``id``
        attributes are read, and new ACL rows are added to its session.
    """
    if target.application_type == SVN:
        acl_rules = [
            ('internal_developer', 'edit'),
            ('internal_developer', 'view'),
            ('external_developer', 'edit'),
            ('external_developer', 'view'),
        ]
    else:
        acl_rules = [
            ('internal_developer', 'view'),
            ('external_developer', 'view'),
            ('secretary', 'view'),
            ('secretary', 'edit'),
        ]

    if target.application_type == 'trac':
        acl_rules.append(('customer', 'view'))

    # The session owning ``target`` is the same for every rule: look it up once.
    session = Session.object_session(target)
    for role_id, permission_name in acl_rules:
        primary_key = (target.id, role_id, permission_name)
        if session.query(ApplicationACL).get(primary_key):
            # XXX this should not happen.
            continue
        session.add(ApplicationACL(application_id=target.id,
                                   role_id=role_id,
                                   permission_name=permission_name))
开发者ID:getpenelope,项目名称:penelope.models,代码行数:29,代码来源:dashboard.py
示例2: __init__
def __init__(self, db, autocommit=False, autoflush=False, **options):
    """Set up the session from the settings carried by *db*.

    :param db: object exposing ``sender``, ``session_extensions`` and
        ``engine`` attributes.
    :param autocommit: forwarded to ``Session.__init__``.
    :param autoflush: forwarded to ``Session.__init__``.
    :param options: extra keyword arguments forwarded unchanged.
    """
    self.sender = db.sender
    # Starts out empty; nothing in this method populates it.
    self._model_changes = {}
    Session.__init__(
        self,
        autocommit=autocommit,
        autoflush=autoflush,
        expire_on_commit=False,
        extension=db.session_extensions,
        bind=db.engine,
        **options
    )
开发者ID:Magnil,项目名称:pypress-tornado,代码行数:7,代码来源:sqlalchemy.py
示例3: SbciFinanceDB
class SbciFinanceDB(object):
    """Automap-based access to the SQLite database at ``FINANCEDB_FILE``.

    After construction the instance exposes the reflected classes
    (``Categories``, ``Seasons``, ``Cheques``, ``Transactions``,
    ``Trybooking``), one open session (``dbsession``) and one ready-made
    query per class (``*_query``).
    """

    def __init__(self, verbose=0, *args, **kwds):  # @UnusedVariable
        """Reflect the database and build the session and base queries.

        :param verbose: accepted but not used.
        :raises RuntimeError: if the database file is not both readable and
            writable.
        """
        super(SbciFinanceDB, self).__init__(*args, **kwds)

        can_read_write = os.access(FINANCEDB_FILE, os.R_OK | os.W_OK)
        if not can_read_write:
            message = 'cannot access Finance DB file ({}) for R/W!'.format(
                FINANCEDB_FILE)
            raise RuntimeError(message)

        self.Base = automap_base()
        self.engine = create_engine('sqlite:///' + FINANCEDB_FILE)
        self.Base.prepare(self.engine, reflect=True)

        # Classes generated by automap, one per reflected table.
        reflected = self.Base.classes
        self.Categories = reflected.categories
        self.Seasons = reflected.seasons
        self.Cheques = reflected.cheques
        self.Transactions = reflected.transactions
        self.Trybooking = reflected.trybooking

        self.dbsession = Session(self.engine)

        # One base query per class, all on the same session.
        new_query = self.dbsession.query
        self.categories_query = new_query(self.Categories)
        self.seasons_query = new_query(self.Seasons)
        self.cheques_query = new_query(self.Cheques)
        self.transactions_query = new_query(self.Transactions)
        self.trybooking_query = new_query(self.Trybooking)
开发者ID:mjjensen,项目名称:shootersbasketball,代码行数:29,代码来源:financedb.py
示例4: __init__
def __init__(self, db, autocommit=False, autoflush=False, **options):
    """Set up the session for the application returned by ``db.get_app()``.

    :param db: object exposing ``get_app()``, ``session_extensions``,
        ``engine`` and ``get_binds(app)``.
    :param autocommit: forwarded to ``Session.__init__``.
    :param autoflush: forwarded to ``Session.__init__``.
    :param options: extra keyword arguments forwarded unchanged.
    """
    self.app = db.get_app()
    # Starts out empty; nothing in this method populates it.
    self._model_changes = {}
    Session.__init__(
        self,
        autocommit=autocommit,
        autoflush=autoflush,
        extension=db.session_extensions,
        bind=db.engine,
        binds=db.get_binds(self.app),
        **options
    )
开发者ID:mishok13,项目名称:flask-sqlalchemy,代码行数:7,代码来源:flask_sqlalchemy.py
示例5: test_ui_cancel_withdraw
def test_ui_cancel_withdraw(
    logged_in_wallet_user_browser: DriverAPI, dbsession: Session, user_phone_number, top_up_user, eth_asset_id
):
    """Cancel a withdraw on its confirmation page; the balance must not change."""

    def read_balance():
        # Balance of the first UserCryptoAddress for the asset under test.
        # Every call site wraps this in ``transaction.manager``.
        address = dbsession.query(UserCryptoAddress).first()
        asset = dbsession.query(Asset).get(eth_asset_id)
        return address.get_crypto_account(asset).account.get_balance()

    # Remember the balance before touching the UI.
    with transaction.manager:
        original_balance = read_balance()

    # Open the withdraw form for the asset and submit it.
    browser = logged_in_wallet_user_browser
    browser.find_by_css("#nav-wallet").click()
    withdraw_link = "#row-asset-{} a.withdraw-asset".format(eth_asset_id)
    browser.find_by_css(withdraw_link).click()
    browser.fill("address", TEST_ADDRESS)
    browser.fill("amount", "0.1")
    browser.find_by_css("button[name='process']").click()

    # The confirmation page is shown; cancel from there.
    assert browser.is_element_present_by_css("#heading-confirm-withdraw")
    browser.find_by_css("button[name='cancel']").click()

    # A message confirms the withdraw was cancelled.
    assert browser.is_element_present_by_css("#msg-withdraw-cancelled")

    # The balance is the same as before the withdraw was started.
    with transaction.manager:
        assert original_balance == read_balance()
开发者ID:websauna,项目名称:websauna.wallet,代码行数:33,代码来源:test_confirm_withdraw.py
示例6: init_settings
def init_settings(session: Session):
    """Pass every built-in setting to ``set_new_setting``, then commit.

    NOTE(review): ``set_new_setting`` presumably only creates settings that
    do not exist yet -- confirm against its definition.
    """
    defaults = (
        ("ffmpeg_path", "ffmpeg"),
        ("ffmpeg_verbose", False),
        ("input_directory", "/set/me"),
        ("output_directory", "esa_videos"),
        ("final_output_directory", "esa_final_videos"),
        ("common_keywords", "ESA"),
        ("input_video_extension", ".flv"),
        ("output_video_extension", ".mp4"),
        ("output_video_prefix", "esa_"),
        ("upload_logfile", "esa_upload.log"),
        ("youtube_public", False),
        ("youtube_schedule", False),
        ("youtube_schedule_start_ts", 0),
        ("schedule_offset_hours", 12),
        ("upload_to_twitch", True),
        ("upload_to_youtube", True),
        ("delete_after_upload", True),
        ("upload_ratelimit", 2.5),
        ("upload_retry_count", 3),
        ("twitch_ul_pid", 0),
        ("twitch_ul_create_time", 0.0),
        ("youtube_ul_pid", 0),
        ("youtube_ul_create_time", 0.0),
        ("proc_ul_pid", 0),
        ("proc_ul_create_time", 0.0),
        ("init_proc_status", ""),
    )
    for name, value in defaults:
        set_new_setting(session, name, value)
    session.commit()
开发者ID:BtbN,项目名称:esaupload,代码行数:28,代码来源:manager.py
示例7: post_process_video
def post_process_video(session: Session, debug):
    """Post-process the oldest eligible video by moving its output file.

    A video is eligible when processing is done, post-processing is not, and
    each of the Twitch/YouTube uploads is either finished or not requested.
    The file is moved from the output directory to the final output
    directory unless both paths are equal.

    :param session: database session used for the query, settings and commit.
    :param debug: accepted but not used.
    :returns: always ``0``.
    """
    twitch_settled = or_(and_(Video.do_twitch, Video.done_twitch), not_(Video.do_twitch))
    youtube_settled = or_(and_(Video.do_youtube, Video.done_youtube), not_(Video.do_youtube))

    video = (
        session.query(Video)
        .filter(Video.processing_done)
        .filter(not_(Video.post_processing_done))
        .filter(twitch_settled)
        .filter(youtube_settled)
        .order_by(Video.id.asc())
        .first()
    )
    if not video:
        print("No video in need of processing found")
        return 0

    source_dir = get_setting(session, "output_directory")
    target_dir = get_setting(session, "final_output_directory")
    prefix = get_setting(session, "output_video_prefix")
    extension = get_setting(session, "output_video_extension")

    file_name = "%s%s%s" % (prefix, video.id, extension)
    source_path = os.path.join(source_dir, file_name)
    target_path = os.path.join(target_dir, file_name)

    if source_path == target_path:
        video.post_processing_status = "Nothing to do"
    else:
        shutil.move(source_path, target_path)
        video.post_processing_status = "Moved %s to %s" % (source_path, target_path)

    video.post_processing_done = True
    session.commit()
    return 0
开发者ID:BtbN,项目名称:esaupload,代码行数:32,代码来源:manager.py
示例8: TrialModelTestCase
class TrialModelTestCase(unittest.TestCase):
    """Tests for the ``Trial`` model, run inside a transaction that is rolled back.

    The class-level fixture stores the engine, connection and outer
    transaction in module globals; each test gets its own nested transaction
    and a session bound to the shared connection.
    """

    @classmethod
    def setUpClass(cls):
        """Open the connection, create the schema and load the sample trials."""
        global transaction, connection, engine

        # The schema is created inside the outer transaction.
        engine = create_engine(TEST_DATABASE_URI)
        connection = engine.connect()
        transaction = connection.begin()
        Trial.metadata.create_all(connection)

        # Fixture trials are loaded from xml files.
        sample_ids = ['NCT02034110', 'NCT00001160', 'NCT00001163']
        cls.trials = load_sample_trials(sample_ids)

    @classmethod
    def tearDownClass(cls):
        """Roll back the outer transaction and release the connection and engine."""
        transaction.rollback()
        connection.close()
        engine.dispose()

    def setUp(self):
        """Begin a nested transaction and bind a fresh session to the connection."""
        self.__transaction = connection.begin_nested()
        self.session = Session(connection)

    def tearDown(self):
        """Close the session, then roll back the nested transaction."""
        self.session.close()
        self.__transaction.rollback()

    def test_add(self):
        """Add a ``Trial`` built from the first sample to the session."""
        first_sample = self.trials[0]
        self.session.add(Trial(ct_dict=first_sample))
开发者ID:mlatief,项目名称:haukka,代码行数:33,代码来源:test_db_model.py
示例9: __init__
def __init__(self, db, autocommit=False, autoflush=True, **options):
    """Set up the session for the application returned by ``db.get_app()``.

    :param db: object exposing ``get_app()``, ``engine`` and
        ``get_binds(app)``.
    :param autocommit: forwarded to ``SessionBase.__init__``.
    :param autoflush: forwarded to ``SessionBase.__init__``.
    :param options: extra keyword arguments; a truthy ``bind`` entry
        overrides ``db.engine``, the rest is forwarded unchanged.
    """
    #: The application that this session belongs to.
    self.app = db.get_app()

    # ``bind`` is removed from ``options`` so it is not passed twice below.
    requested_bind = options.pop('bind', None)
    bind = requested_bind if requested_bind else db.engine

    SessionBase.__init__(
        self,
        autocommit=autocommit,
        autoflush=autoflush,
        bind=bind,
        binds=db.get_binds(self.app),
        **options
    )
开发者ID:dppereyra,项目名称:zask,代码行数:7,代码来源:__init__.py
示例10: DBBackedController
class DBBackedController(object):
    """Base controller holding a database engine and a session.

    The constructor also runs ``Base.metadata.create_all`` on the engine.
    """

    def __init__(self, engine, session=None):
        """Resolve the engine and session and make sure the tables exist.

        :param engine: either a connection string, from which a new engine is
            created, or an existing engine object, which is used as is.
        :param session: an existing session to reuse; when ``None`` a new one
            is bound to the resolved engine.
        """
        # ``isinstance`` also accepts ``str`` subclasses and works on
        # interpreters where ``types.StringType`` does not exist.
        if isinstance(engine, str):
            logging.info("Using connection string '%s'", engine)
            new_engine = create_engine(engine, encoding='utf-8')
            if "sqlite:" in engine:
                logging.debug("Setting text factory for unicode compat.")
                new_engine.raw_connection().connection.text_factory = str
            self._engine = new_engine
        else:
            logging.info("Using existing engine...")
            self._engine = engine

        if session is None:
            logging.info("Binding session...")
            self._session = Session(bind=self._engine)
        else:
            self._session = session

        logging.info("Updating metadata...")
        Base.metadata.create_all(self._engine)

    def commit(self):
        """Commit the controller's session."""
        logging.info("Commiting...")
        self._session.commit()
开发者ID:Sentimentron,项目名称:sentropy,代码行数:28,代码来源:crawl.py
示例11: list_videos
def list_videos(session: Session, count):
    """Print a human-readable report of the stored videos, in ascending id order.

    :param session: database session to query.
    :param count: value convertible with ``int``; when positive only the
        ``count`` highest-id videos are listed, otherwise all of them.
    """

    def status_text(value):
        # Continuation lines of a status are indented three tabs deep.
        return str(value).replace("\n", "\n\t\t\t")

    limit = int(count)
    if limit > 0:
        newest_first = session.query(Video).order_by(Video.id.desc()).limit(limit).all()
        videos = reversed(newest_first)
    else:
        videos = session.query(Video).order_by(Video.id.asc())

    for vid in videos:
        print("Video %s:" % vid.id)
        print("\tTitle: %s" % vid.title)
        print("\tDate Added: %s" % vid.date_added)
        if not vid.description:
            print("\tDescription: None")
        else:
            print("\tDescription:\n\t\t%s" % vid.description.replace("\n", "\n\t\t"))
        print("\tKeywords: %s" % vid.keywords)
        print("\tGame: %s" % vid.game)
        print("\tStart Segment: %s" % vid.start_segment)
        print("\tEnd Segment: %s" % vid.end_segment)

        print("\tProcessing:")
        print("\t\tDone: %s" % vid.processing_done)
        print("\t\tStatus: %s" % status_text(vid.processing_status))

        print("\tTwitch:")
        print("\t\tDo: %s" % vid.do_twitch)
        print("\t\tDone: %s" % vid.done_twitch)
        print("\t\tStatus: %s" % status_text(vid.twitch_status))

        print("\tYouTube:")
        print("\t\tDo: %s" % vid.do_youtube)
        print("\t\tDone: %s" % vid.done_youtube)
        print("\t\tStatus: %s" % status_text(vid.youtube_status))
        print("\t\tPublish Date: %s" % vid.youtube_pubdate)
        print("\t\tPublic: %s" % vid.youtube_public)

        if vid.comment:
            print("\tComment:\n\t\t%s" % vid.comment.replace("\n", "\n\t\t"))
开发者ID:BtbN,项目名称:esaupload,代码行数:33,代码来源:manager.py
示例12: cache_keywords
def cache_keywords():
    """Copy every row of the ``keywords`` table into Redis as ``word -> id``.

    Progress is logged every 1000 keywords and once more at the end. The
    database session is closed when the function exits, whether or not an
    error occurred.

    :raises RuntimeError: if a Redis ``set`` call returns a falsy result.
    """
    core.configure_logging('debug')
    # Not referenced below. NOTE(review): presumably imported for its side
    # effects (model registration) -- confirm before removing.
    from backend.db import Keyword

    connection_string = core.get_database_engine_string()
    logging.info("Using connection string '%s'", connection_string)
    engine = create_engine(connection_string, encoding='utf-8',
                           isolation_level="READ UNCOMMITTED")
    session = Session(bind=engine, autocommit=False)

    def percent_done(done, expected):
        # An empty table has nothing left to cache; avoid dividing by zero.
        return 100.0 * done / expected if expected else 100.0

    try:
        # Estimate the number of keywords
        logging.debug("Estimating number of keywords...")
        total = session.execute("SELECT COUNT(*) FROM keywords").scalar() or 0

        logging.debug("Establishing connection to redis...")
        r = get_redis_instance(1)

        logging.info("Caching %d keywords...", total)
        cached = 0
        for _id, word in session.execute("SELECT id, word FROM keywords"):
            # Checked explicitly: an ``assert`` would be removed under
            # ``python -O`` together with the ``set`` call itself.
            if not r.set(word, _id):
                raise RuntimeError("Redis did not store keyword %r" % (word,))
            cached += 1
            if cached % 1000 == 0:
                logging.info("Cached %d keywords (%.2f%% done)",
                             cached, percent_done(cached, total))
        logging.info("Cached %d keywords (%.2f%% done)",
                     cached, percent_done(cached, total))
    finally:
        session.close()
开发者ID:Sentimentron,项目名称:sentropy,代码行数:25,代码来源:cli_caching.py
示例13: get_document_rows
def get_document_rows(self, keywords, domains=None, dmset=None):
    """Add the top domains for *keywords* to *domains*, then delegate.

    This function is a generator. It yields one ``QueryMessage`` for every
    keyword that ``self._kres.resolve`` maps to ``None``, then adds up to
    five domain keys (the query's ``LIMIT``) to *domains* and hands over to
    ``self._kd_proc.get_document_rows``.

    :param keywords: iterable of keywords to resolve.
    :param domains: set of domain keys, extended in place; a new empty set
        is used when omitted.
    :param dmset: passed through to ``self._kd_proc.get_document_rows``; a
        new empty set is used when omitted.
    :raises QueryException: if none of the keywords resolves to an id.
    """
    # Defaults are created per call because ``domains`` is mutated below.
    if domains is None:
        domains = set()
    if dmset is None:
        dmset = set()

    # Look up the article keywords
    _keywords = {k: self._kres.resolve(k) for k in keywords}
    resolved_ids = []
    for k, keyword_id in _keywords.items():
        if keyword_id is None:
            yield QueryMessage("No matching keyword: %s", k)
        else:
            resolved_ids.append(keyword_id)
    if not resolved_ids:
        raise QueryException("No matching keywords.")

    # One bind parameter per resolved id: a single parameter holding a
    # comma-joined string would be compared as one value, not as a list.
    names = ["key%d" % n for n in range(len(resolved_ids))]
    params = dict(zip(names, resolved_ids))
    placeholders = ", ".join(":" + name for name in names)

    # Find the sites which talk about a particular keyword
    sql = """ SELECT domains.`key`, COUNT(*) AS c from domains JOIN articles ON articles.domain_id = domains.id
    JOIN documents ON documents.article_id = articles.id
    JOIN keyword_adjacencies ON keyword_adjacencies.doc_id = documents.id
    WHERE keyword_adjacencies.key1_id IN ({keys})
    OR keyword_adjacencies.key2_id IN ({keys})
    GROUP BY domains.id
    ORDER BY c DESC
    LIMIT 0,5
    """.format(keys=placeholders)

    # Create a new session
    session = Session(bind=engine)
    try:
        for key, count in session.execute(sql, params):
            logging.info((key, count))
            domains.add(key)
    finally:
        session.close()

    # NOTE(review): because this function is a generator, the value returned
    # here only reaches callers as ``StopIteration.value`` (e.g. through
    # ``yield from``); plain iteration never sees it -- confirm this is
    # intended.
    return self._kd_proc.get_document_rows(keywords, domains, dmset)
开发者ID:Sentimentron,项目名称:sentropy,代码行数:31,代码来源:queue_query_processor.py
示例14: ServiceTest
class ServiceTest(unittest.TestCase):
    """Base test case giving each test a session inside a nested transaction.

    The module-level ``connection`` is shared; every test begins its own
    nested transaction on it and rolls it back afterwards.
    """

    def setUp(self):
        """Begin a nested transaction and bind a fresh session to the connection."""
        self.__transaction = connection.begin_nested()
        self.session = Session(connection)

    def tearDown(self):
        """Close the session, then roll back the nested transaction."""
        self.session.close()
        self.__transaction.rollback()

    # unittest only invokes the camel-case hook names; the lower-case
    # spellings stay available for code that calls them explicitly.
    setup = setUp
    teardown = tearDown
开发者ID:grdaneault,项目名称:service-scoring-engine,代码行数:8,代码来源:service_test.py
示例15: delete
def delete(self, synchronize_session='evaluate'):
    """Delete every object matched by this query through a temporary session.

    :param synchronize_session: accepted but not used.
    :returns: the number of matched objects.
    """
    from rome.core.session.session import Session

    scratch_session = Session()
    # ``filter_deleted=False`` is passed through to ``matching_objects``.
    matched = self.matching_objects(filter_deleted=False)
    for item in matched:
        scratch_session.delete(item)
    scratch_session.flush()

    deleted_count = len(matched)
    return deleted_count
开发者ID:badock,项目名称:rome,代码行数:8,代码来源:query.py
示例16: session
def session(engine):
    """Yield a session bound to *engine*, then close it and truncate the tables.

    NOTE(review): written as a generator, presumably for use as a pytest
    fixture -- the decorator is not visible here.
    """
    db_session = Session(engine)
    yield db_session

    # Runs once the generator is resumed after the yield.
    db_session.close()
    exec_sql("SELECT truncate_tables('osm_test')")
开发者ID:geometalab,项目名称:OSMNames,代码行数:8,代码来源:conftest.py
示例17: DatabaseTest
class DatabaseTest(object):
    """Base class giving each test a session inside a nested transaction.

    Uses the module-level ``connection``.
    """

    def setup(self):
        """Begin a nested transaction and bind a fresh session to the connection."""
        savepoint = connection.begin_nested()
        self.__transaction = savepoint
        self.session = Session(connection)

    def teardown(self):
        """Close the session, then roll back the nested transaction."""
        self.session.close()
        savepoint = self.__transaction
        savepoint.rollback()
开发者ID:mrbitsdcf,项目名称:banco_imobiliario,代码行数:8,代码来源:test.py
示例18: update_translation_table
def update_translation_table():
    """Store ``value`` as the translation of ``source`` for ``language``.

    ``source``, ``language`` and ``value`` are not parameters; they are read
    from the enclosing scope. An existing ``Translation`` is reused when
    ``Translation.get_by`` finds one, otherwise a new one is created. Only
    that object is flushed.
    """
    from camelot.model.i18n import Translation
    from sqlalchemy.orm.session import Session

    existing = Translation.get_by(source=source, language=language)
    translation = existing or Translation(source=source, language=language)
    translation.value = value

    owning_session = Session.object_session(translation)
    owning_session.flush([translation])
开发者ID:Governa,项目名称:Camelot,代码行数:8,代码来源:user_translatable_label.py
示例19: DatabaseTest
class DatabaseTest(unittest.TestCase):
    """Base test case running each test inside a transaction that is rolled back.

    Uses the module-level ``connection``.
    """

    def setUp(self):
        """Begin a transaction and bind a fresh session to the connection."""
        outer_transaction = connection.begin()
        self.trans = outer_transaction
        self.session = Session(connection)

    def tearDown(self):
        """Roll back the transaction, then close the session."""
        outer_transaction = self.trans
        outer_transaction.rollback()
        self.session.close()
开发者ID:llazzaro,项目名称:pystock,代码行数:9,代码来源:__init__.py
示例20: create_initial_kanban_acl
def create_initial_kanban_acl(mapper, connection, target):
    """Add the default ACL rows for a kanban board to the board's session.

    The ``(mapper, connection, target)`` signature has the shape of a
    SQLAlchemy mapper event listener (presumably ``after_insert`` -- confirm
    where this function is registered). Only ``target`` is used.

    :param mapper: unused.
    :param connection: unused.
    :param target: board instance; its ``id`` is read.
    """
    default_rules = (
        ('role:redturtle_developer', 'view'),
    )
    for principal, permission in default_rules:
        rule = KanbanACL(
            principal=principal,
            board_id=target.id,
            permission_name=permission,
        )
        owning_session = Session.object_session(target)
        owning_session.add(rule)
开发者ID:getpenelope,项目名称:penelope.models,代码行数:10,代码来源:dashboard.py
注:本文中的 sqlalchemy.orm.session.Session 类示例由纯净天空整理自 GitHub、MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论