本文整理汇总了Python中sqlalchemy.testing.engines.testing_engine函数的典型用法代码示例。如果您正苦于以下问题:Python testing_engine函数的具体用法?Python testing_engine怎么用?Python testing_engine使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了testing_engine函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_foreignkey_missing_insert
def test_foreignkey_missing_insert(self):
Table("t1", self.metadata, Column("id", Integer, primary_key=True))
t2 = Table(
"t2",
self.metadata,
Column("id", Integer, ForeignKey("t1.id"), primary_key=True),
)
self.metadata.create_all()
# want to ensure that "null value in column "id" violates not-
# null constraint" is raised (IntegrityError on psycoopg2, but
# ProgrammingError on pg8000), and not "ProgrammingError:
# (ProgrammingError) relationship "t2_id_seq" does not exist".
# the latter corresponds to autoincrement behavior, which is not
# the case here due to the foreign key.
for eng in [
engines.testing_engine(options={"implicit_returning": False}),
engines.testing_engine(options={"implicit_returning": True}),
]:
with expect_warnings(
".*has no Python-side or server-side default.*"
):
assert_raises(
(exc.IntegrityError, exc.ProgrammingError),
eng.execute,
t2.insert(),
)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:28,代码来源:test_query.py
示例2: _init_dbs
def _init_dbs(self):
db1 = testing_engine("sqlite:///shard1.db", options=dict(pool_threadlocal=True))
db2 = testing_engine("sqlite:///shard2.db")
db3 = testing_engine("sqlite:///shard3.db")
db4 = testing_engine("sqlite:///shard4.db")
return db1, db2, db3, db4
开发者ID:GitHublong,项目名称:sqlalchemy,代码行数:7,代码来源:test_horizontal_shard.py
示例3: test_rowcount_flag
def test_rowcount_flag(self):
metadata = self.metadata
engine = engines.testing_engine(
options={'enable_rowcount': True})
assert engine.dialect.supports_sane_rowcount
metadata.bind = engine
t = Table('t1', metadata, Column('data', String(10)))
metadata.create_all()
r = t.insert().execute({'data': 'd1'}, {'data': 'd2'}, {'data'
: 'd3'})
r = t.update().where(t.c.data == 'd2').values(data='d3'
).execute()
eq_(r.rowcount, 1)
r = t.delete().where(t.c.data == 'd3').execute()
eq_(r.rowcount, 2)
r = \
t.delete().execution_options(enable_rowcount=False).execute()
eq_(r.rowcount, -1)
engine.dispose()
engine = engines.testing_engine(options={'enable_rowcount'
: False})
assert not engine.dialect.supports_sane_rowcount
metadata.bind = engine
r = t.insert().execute({'data': 'd1'}, {'data': 'd2'}, {'data'
: 'd3'})
r = t.update().where(t.c.data == 'd2').values(data='d3'
).execute()
eq_(r.rowcount, -1)
r = t.delete().where(t.c.data == 'd3').execute()
eq_(r.rowcount, -1)
r = t.delete().execution_options(enable_rowcount=True).execute()
eq_(r.rowcount, 1)
r.close()
engine.dispose()
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:34,代码来源:test_firebird.py
示例4: test_foreignkey_missing_insert
def test_foreignkey_missing_insert(self):
t1 = Table('t1', metadata, Column('id', Integer,
primary_key=True))
t2 = Table(
't2',
metadata,
Column(
'id',
Integer,
ForeignKey('t1.id'),
primary_key=True))
metadata.create_all()
# want to ensure that "null value in column "id" violates not-
# null constraint" is raised (IntegrityError on psycoopg2, but
# ProgrammingError on pg8000), and not "ProgrammingError:
# (ProgrammingError) relationship "t2_id_seq" does not exist".
# the latter corresponds to autoincrement behavior, which is not
# the case here due to the foreign key.
for eng in [
engines.testing_engine(options={'implicit_returning': False}),
engines.testing_engine(options={'implicit_returning': True})
]:
assert_raises_message(exc.DBAPIError,
'violates not-null constraint',
eng.execute, t2.insert())
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:27,代码来源:test_query.py
示例5: test_rowcount_flag
def test_rowcount_flag(self):
metadata = self.metadata
engine = engines.testing_engine(options={"enable_rowcount": True})
assert engine.dialect.supports_sane_rowcount
metadata.bind = engine
t = Table("t1", metadata, Column("data", String(10)))
metadata.create_all()
r = t.insert().execute({"data": "d1"}, {"data": "d2"}, {"data": "d3"})
r = t.update().where(t.c.data == "d2").values(data="d3").execute()
eq_(r.rowcount, 1)
r = t.delete().where(t.c.data == "d3").execute()
eq_(r.rowcount, 2)
r = t.delete().execution_options(enable_rowcount=False).execute()
eq_(r.rowcount, -1)
engine.dispose()
engine = engines.testing_engine(options={"enable_rowcount": False})
assert not engine.dialect.supports_sane_rowcount
metadata.bind = engine
r = t.insert().execute({"data": "d1"}, {"data": "d2"}, {"data": "d3"})
r = t.update().where(t.c.data == "d2").values(data="d3").execute()
eq_(r.rowcount, -1)
r = t.delete().where(t.c.data == "d3").execute()
eq_(r.rowcount, -1)
r = t.delete().execution_options(enable_rowcount=True).execute()
eq_(r.rowcount, 1)
r.close()
engine.dispose()
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:27,代码来源:test_firebird.py
示例6: test_bind_arguments
def test_bind_arguments(self):
users, Address, addresses, User = (self.tables.users,
self.classes.Address,
self.tables.addresses,
self.classes.User)
mapper(User, users)
mapper(Address, addresses)
e1 = engines.testing_engine()
e2 = engines.testing_engine()
e3 = engines.testing_engine()
sess = Session(e3)
sess.bind_mapper(User, e1)
sess.bind_mapper(Address, e2)
assert sess.connection().engine is e3
assert sess.connection(bind=e1).engine is e1
assert sess.connection(mapper=Address, bind=e1).engine is e1
assert sess.connection(mapper=Address).engine is e2
assert sess.connection(clause=addresses.select()).engine is e2
assert sess.connection(mapper=User,
clause=addresses.select()).engine is e1
assert sess.connection(mapper=User,
clause=addresses.select(),
bind=e2).engine is e2
sess.close()
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:29,代码来源:test_bind.py
示例7: test_engine_param_stays
def test_engine_param_stays(self):
eng = testing_engine()
isolation_level = eng.dialect.get_isolation_level(
eng.connect().connection)
level = self._non_default_isolation_level()
ne_(isolation_level, level)
eng = testing_engine(options=dict(isolation_level=level))
eq_(
eng.dialect.get_isolation_level(
eng.connect().connection),
level
)
# check that it stays
conn = eng.connect()
eq_(
eng.dialect.get_isolation_level(conn.connection),
level
)
conn.close()
conn = eng.connect()
eq_(
eng.dialect.get_isolation_level(conn.connection),
level
)
conn.close()
开发者ID:SvenDowideit,项目名称:clearlinux,代码行数:30,代码来源:test_transaction.py
示例8: _test_lastrow_accessor
def _test_lastrow_accessor(self, table_, values, assertvalues):
"""Tests the inserted_primary_key and lastrow_has_id() functions."""
def insert_values(engine, table_, values):
"""
Inserts a row into a table, returns the full list of values
INSERTed including defaults that fired off on the DB side and
detects rows that had defaults and post-fetches.
"""
# verify implicit_returning is working
if engine.dialect.implicit_returning:
ins = table_.insert()
comp = ins.compile(engine, column_keys=list(values))
if not set(values).issuperset(
c.key for c in table_.primary_key
):
is_(bool(comp.returning), True)
result = engine.execute(table_.insert(), **values)
ret = values.copy()
for col, id_ in zip(
table_.primary_key, result.inserted_primary_key
):
ret[col.key] = id_
if result.lastrow_has_defaults():
criterion = and_(
*[
col == id_
for col, id_ in zip(
table_.primary_key, result.inserted_primary_key
)
]
)
row = engine.execute(table_.select(criterion)).first()
for c in table_.c:
ret[c.key] = row[c]
return ret
if testing.against("firebird", "postgresql", "oracle", "mssql"):
assert testing.db.dialect.implicit_returning
if testing.db.dialect.implicit_returning:
test_engines = [
engines.testing_engine(options={"implicit_returning": False}),
engines.testing_engine(options={"implicit_returning": True}),
]
else:
test_engines = [testing.db]
for engine in test_engines:
try:
table_.create(bind=engine, checkfirst=True)
i = insert_values(engine, table_, values)
eq_(i, assertvalues)
finally:
table_.drop(bind=engine)
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:59,代码来源:test_insert_exec.py
示例9: _init_dbs
def _init_dbs(self):
db1 = testing_engine(
'sqlite:///shard1_%s.db' % provision.FOLLOWER_IDENT,
options=dict(pool_threadlocal=True))
db2 = testing_engine(
'sqlite:///shard2_%s.db' % provision.FOLLOWER_IDENT)
db3 = testing_engine(
'sqlite:///shard3_%s.db' % provision.FOLLOWER_IDENT)
db4 = testing_engine(
'sqlite:///shard4_%s.db' % provision.FOLLOWER_IDENT)
self.dbs = [db1, db2, db3, db4]
return self.dbs
开发者ID:m32,项目名称:sqlalchemy,代码行数:13,代码来源:test_horizontal_shard.py
示例10: _init_dbs
def _init_dbs(self):
self.db1 = db1 = testing_engine(
"sqlite:///shard1_%s.db" % provision.FOLLOWER_IDENT
)
self.db2 = db2 = testing_engine(
"sqlite:///shard2_%s.db" % provision.FOLLOWER_IDENT
)
for db in (db1, db2):
self.metadata.create_all(db)
self.dbs = [db1, db2]
return self.dbs
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:14,代码来源:test_horizontal_shard.py
示例11: test_special_encodings
def test_special_encodings(self):
for enc in ['utf8mb4', 'utf8']:
eng = engines.testing_engine(
options={"connect_args": {'charset': enc, 'use_unicode': 0}})
conn = eng.connect()
eq_(conn.dialect._connection_charset, enc)
开发者ID:anti-social,项目名称:sqlalchemy,代码行数:7,代码来源:test_dialect.py
示例12: go
def go():
engine = engines.testing_engine(
options={'logging_name': 'FOO',
'pool_logging_name': 'BAR',
'use_reaper': False}
)
sess = create_session(bind=engine)
a1 = A(col2="a1")
a2 = A(col2="a2")
a3 = A(col2="a3")
a1.bs.append(B(col2="b1"))
a1.bs.append(B(col2="b2"))
a3.bs.append(B(col2="b3"))
for x in [a1, a2, a3]:
sess.add(x)
sess.flush()
sess.expunge_all()
alist = sess.query(A).all()
eq_(
[
A(col2="a1", bs=[B(col2="b1"), B(col2="b2")]),
A(col2="a2", bs=[]),
A(col2="a3", bs=[B(col2="b3")])
],
alist)
for a in alist:
sess.delete(a)
sess.flush()
sess.close()
engine.dispose()
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:33,代码来源:test_memusage.py
示例13: test_per_connection
def test_per_connection(self):
from sqlalchemy.pool import QueuePool
eng = testing_engine(options=dict(
poolclass=QueuePool,
pool_size=2, max_overflow=0))
c1 = eng.connect()
c1 = c1.execution_options(
isolation_level=self._non_default_isolation_level()
)
c2 = eng.connect()
eq_(
eng.dialect.get_isolation_level(c1.connection),
self._non_default_isolation_level()
)
eq_(
eng.dialect.get_isolation_level(c2.connection),
self._default_isolation_level()
)
c1.close()
c2.close()
c3 = eng.connect()
eq_(
eng.dialect.get_isolation_level(c3.connection),
self._default_isolation_level()
)
c4 = eng.connect()
eq_(
eng.dialect.get_isolation_level(c4.connection),
self._default_isolation_level()
)
c3.close()
c4.close()
开发者ID:SvenDowideit,项目名称:clearlinux,代码行数:34,代码来源:test_transaction.py
示例14: test_reset_rollback_two_phase_no_rollback
def test_reset_rollback_two_phase_no_rollback(self):
# test [ticket:2907], essentially that the
# TwoPhaseTransaction is given the job of "reset on return"
# so that picky backends like MySQL correctly clear out
# their state when a connection is closed without handling
# the transaction explicitly.
eng = testing_engine()
# MySQL raises if you call straight rollback() on
# a connection with an XID present
@event.listens_for(eng, "invalidate")
def conn_invalidated(dbapi_con, con_record, exception):
dbapi_con.close()
raise exception
with eng.connect() as conn:
rec = conn.connection._connection_record
raw_dbapi_con = rec.connection
xa = conn.begin_twophase()
conn.execute(users.insert(), user_id=1, user_name='user1')
assert rec.connection is raw_dbapi_con
with eng.connect() as conn:
result = \
conn.execute(select([users.c.user_name]).
order_by(users.c.user_id))
eq_(result.fetchall(), [])
开发者ID:SvenDowideit,项目名称:clearlinux,代码行数:29,代码来源:test_transaction.py
示例15: test_lobs_without_convert_many_rows
def test_lobs_without_convert_many_rows(self):
engine = testing_engine(
options=dict(auto_convert_lobs=False, arraysize=1))
result = engine.execute(
"select id, data, bindata from z_test order by id")
results = result.fetchall()
def go():
eq_(
[
dict(
id=row["id"],
data=row["data"].read(),
bindata=row["bindata"].read()
) for row in results
],
self.data)
# this comes from cx_Oracle because these are raw
# cx_Oracle.Variable objects
if testing.requires.oracle5x.enabled:
assert_raises_message(
testing.db.dialect.dbapi.ProgrammingError,
"LOB variable no longer valid after subsequent fetch",
go
)
else:
go()
开发者ID:eoghanmurray,项目名称:sqlalchemy,代码行数:27,代码来源:test_types.py
示例16: test_native_datetime
def test_native_datetime(self):
dbapi = testing.db.dialect.dbapi
connect_args = {
'detect_types': dbapi.PARSE_DECLTYPES | dbapi.PARSE_COLNAMES}
engine = engines.testing_engine(
options={'connect_args': connect_args, 'native_datetime': True})
t = Table(
'datetest', MetaData(),
Column('id', Integer, primary_key=True),
Column('d1', Date), Column('d2', sqltypes.TIMESTAMP))
t.create(engine)
try:
engine.execute(t.insert(), {
'd1': datetime.date(2010, 5, 10),
'd2': datetime.datetime(2010, 5, 10, 12, 15, 25)
})
row = engine.execute(t.select()).first()
eq_(
row,
(1, datetime.date(2010, 5, 10),
datetime.datetime(2010, 5, 10, 12, 15, 25)))
r = engine.execute(func.current_date()).scalar()
assert isinstance(r, util.string_types)
finally:
t.drop(engine)
engine.dispose()
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:26,代码来源:test_sqlite.py
示例17: _assert_data_with_sequence_returning
def _assert_data_with_sequence_returning(self, table, seqname):
engine = engines.testing_engine(options={"implicit_returning": True})
with self.sql_execution_asserter(engine) as asserter:
with engine.connect() as conn:
conn.execute(table.insert(), {"id": 30, "data": "d1"})
conn.execute(table.insert(), {"data": "d2"})
conn.execute(table.insert(), {"id": 31, "data": "d3"}, {"id": 32, "data": "d4"})
conn.execute(table.insert(), {"data": "d5"}, {"data": "d6"})
conn.execute(table.insert(inline=True), {"id": 33, "data": "d7"})
conn.execute(table.insert(inline=True), {"data": "d8"})
asserter.assert_(
DialectSQL("INSERT INTO testtable (id, data) VALUES (:id, :data)", {"id": 30, "data": "d1"}),
DialectSQL(
"INSERT INTO testtable (id, data) VALUES " "(nextval('my_seq'), :data) RETURNING testtable.id",
{"data": "d2"},
),
DialectSQL(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
[{"id": 31, "data": "d3"}, {"id": 32, "data": "d4"}],
),
DialectSQL(
"INSERT INTO testtable (id, data) VALUES (nextval('%s'), " ":data)" % seqname,
[{"data": "d5"}, {"data": "d6"}],
),
DialectSQL("INSERT INTO testtable (id, data) VALUES (:id, :data)", [{"id": 33, "data": "d7"}]),
DialectSQL("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " ":data)" % seqname, [{"data": "d8"}]),
)
with engine.connect() as conn:
eq_(
conn.execute(table.select()).fetchall(),
[(30, "d1"), (1, "d2"), (31, "d3"), (32, "d4"), (2, "d5"), (3, "d6"), (33, "d7"), (4, "d8")],
)
开发者ID:FluxIX,项目名称:sqlalchemy,代码行数:35,代码来源:test_query.py
示例18: test_coerce_to_unicode
def test_coerce_to_unicode(self):
engine = testing_engine(options=dict(coerce_to_unicode=True))
value = engine.scalar("SELECT 'hello' FROM DUAL")
assert isinstance(value, util.text_type)
value = testing.db.scalar("SELECT 'hello' FROM DUAL")
assert isinstance(value, util.binary_type)
开发者ID:eoghanmurray,项目名称:sqlalchemy,代码行数:7,代码来源:test_types.py
示例19: test_flag_on
def test_flag_on(self):
t = Table(
"t",
self.metadata,
Column("id", Integer, primary_key=True),
Column("data", String(50)),
)
t.create()
eng = engines.testing_engine(options={"fast_executemany": True})
@event.listens_for(eng, "after_cursor_execute")
def after_cursor_execute(
conn, cursor, statement, parameters, context, executemany
):
if executemany:
assert cursor.fast_executemany
with eng.connect() as conn:
conn.execute(
t.insert(),
[{"id": i, "data": "data_%d" % i} for i in range(100)],
)
conn.execute(t.insert(), {"id": 200, "data": "data_200"})
开发者ID:monetate,项目名称:sqlalchemy,代码行数:25,代码来源:test_engine.py
示例20: test_row_case_sensitive_unoptimized
def test_row_case_sensitive_unoptimized(self):
ins_db = engines.testing_engine(options={"case_sensitive": True})
row = ins_db.execute(
select([
literal_column("1").label("case_insensitive"),
literal_column("2").label("CaseSensitive"),
text("3 AS screw_up_the_cols")
])
).first()
eq_(
list(row.keys()),
["case_insensitive", "CaseSensitive", "screw_up_the_cols"])
in_("case_insensitive", row._keymap)
in_("CaseSensitive", row._keymap)
not_in_("casesensitive", row._keymap)
eq_(row["case_insensitive"], 1)
eq_(row["CaseSensitive"], 2)
eq_(row["screw_up_the_cols"], 3)
assert_raises(KeyError, lambda: row["Case_insensitive"])
assert_raises(KeyError, lambda: row["casesensitive"])
assert_raises(KeyError, lambda: row["screw_UP_the_cols"])
开发者ID:mattastica,项目名称:sqlalchemy,代码行数:25,代码来源:test_resultset.py
注:本文中的sqlalchemy.testing.engines.testing_engine函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论