本文整理汇总了Python中sqlalchemy.testing.assertions.eq_函数的典型用法代码示例。如果您正苦于以下问题：Python eq_函数的具体用法？Python eq_怎么用？Python eq_使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了eq_函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_autocommit_isolation_level
def test_autocommit_isolation_level(self):
    """Check that the AUTOCOMMIT execution option reaches the DBAPI layer.

    After ``isolation_level='AUTOCOMMIT'`` is applied through
    ``execution_options()``, the ``isolation_level`` attribute found at
    ``connection.connection`` must equal psycopg2's
    ``ISOLATION_LEVEL_AUTOCOMMIT`` constant.
    """
    from psycopg2 import extensions

    base_conn = testing.db.connect()
    try:
        autocommit_conn = base_conn.execution_options(
            isolation_level='AUTOCOMMIT')
        # presumably .connection.connection is the raw psycopg2 connection,
        # since its isolation_level is compared to a psycopg2 constant
        eq_(autocommit_conn.connection.connection.isolation_level,
            extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    finally:
        # always release the checked-out connection, even if eq_ fails
        base_conn.close()
开发者ID:andrewwatts,项目名称:sqlalchemy,代码行数:7,代码来源:test_dialect.py
示例2: test_serial_integer
def test_serial_integer(self):
    """Check the column specification rendered for integer primary keys.

    Each case is ``(server_version, column_type, expected_keyword)``.  When
    a server version is given, a fresh PostgreSQL dialect is created and
    its version lookup is replaced with a ``Mock`` returning that version;
    otherwise the dialect of ``testing.db`` is used as-is.  The compiled
    column specification must be ``"c <expected_keyword> NOT NULL"``.
    """
    cases = [
        (None, Integer, 'SERIAL'),
        (None, BigInteger, 'BIGSERIAL'),
        ((9, 1), SmallInteger, 'SMALLINT'),
        ((9, 2), SmallInteger, 'SMALLSERIAL'),
        (None, postgresql.INTEGER, 'SERIAL'),
        (None, postgresql.BIGINT, 'BIGSERIAL'),
    ]
    for version, type_, expected in cases:
        m = MetaData()
        t = Table('t', m, Column('c', type_, primary_key=True))

        if version:
            dialect = postgresql.dialect()
            dialect._get_server_version_info = Mock(return_value=version)
            # initialize() only needs the connection while it runs, so
            # release it afterwards instead of leaking one per case
            conn = testing.db.connect()
            try:
                dialect.initialize(conn)
            finally:
                conn.close()
        else:
            dialect = testing.db.dialect

        ddl_compiler = dialect.ddl_compiler(dialect, schema.CreateTable(t))
        eq_(
            ddl_compiler.get_column_specification(t.c.c),
            "c %s NOT NULL" % expected
        )
开发者ID:EvaSDK,项目名称:sqlalchemy,代码行数:26,代码来源:test_dialect.py
示例3: test_refresh_scalar
def test_refresh_scalar(self):
    """Expire a nested-loaded customer and check that a re-query refreshes it.

    Order 102 is loaded with ``orm.nestedload(Order.customer)``, its
    customer is expired, and the same query is run again.  Comparing the
    first result with the fixture order must then emit no statements.
    """
    Order = self.classes.Order
    session = Session()

    def load_order_102():
        # same query both times: order 102 with its customer nested-loaded
        query = session.query(Order).options(
            orm.nestedload(Order.customer))
        return query.filter(Order.id == 102).all()

    loaded = load_order_102()
    customer = loaded[0].customer

    # expire...
    session.expire(customer)

    # + load again, should refresh Customer on Order
    load_order_102()

    expected_order = self._orm_fixture(orders=True).orders[1]
    # avoid comparison/lazy load of 'orders' on the customer
    del expected_order.customer.__dict__['orders']

    with self.assert_statement_count(0):
        eq_(expected_order, loaded[0])
开发者ID:computerstaat,项目名称:sql-layer-adapter-sqlalchemy,代码行数:25,代码来源:test_nested_loading.py
示例4: test_load_collection_mixed
def test_load_collection_mixed(self):
    """Load a nested collection of ``(Order.id, Order)`` rows per customer.

    Customer 1 is queried together with a nested query over its orders;
    the full result must be produced by exactly one statement.
    """
    Customer = self.classes.Customer
    Order = self.classes.Order

    session = Session()
    nested_orders = orm_nested(
        session.query(Order.id, Order).filter(Customer.orders))
    query = session.query(Customer, nested_orders).filter(Customer.id == 1)

    # one (id, Order) pair for each of the customer's three orders
    expected_orders = [
        (order_id, Order(customer_id=1, id=order_id,
                         order_info='apple related'))
        for order_id in (101, 102, 103)
    ]
    expected = [
        (Customer(id=1, name='David McFarlane'), expected_orders),
    ]

    with self.assert_statement_count(1):
        eq_(query.all(), expected)
开发者ID:zzzeek,项目名称:sqlalchemy_akiban,代码行数:26,代码来源:test_nested_query.py
示例5: _exotic_targets_fixture
def _exotic_targets_fixture(self, conn):
    """Insert two rows into ``users_xtra`` and verify the first one.

    :param conn: connection on which the INSERT and SELECT statements run.

    The rows get ids 1 and 2 and both have ``lets_index_this="not"``.
    """
    users = self.tables.users_xtra

    # Each row needs its own login_email.  The address literals had been
    # collapsed to the same "[email protected]" obfuscation placeholder, so
    # the two rows clashed on login_email (users_xtra appears to have a
    # unique constraint on that column -- TODO confirm).
    conn.execute(
        insert(users),
        dict(
            id=1,
            name="name1",
            login_email="name1@gmail.com",
            lets_index_this="not",
        ),
    )
    conn.execute(
        users.insert(),
        dict(
            id=2,
            name="name2",
            login_email="name2@gmail.com",
            lets_index_this="not",
        ),
    )

    eq_(
        conn.execute(users.select().where(users.c.id == 1)).fetchall(),
        [(1, "name1", "name1@gmail.com", "not")],
    )
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:26,代码来源:test_on_conflict.py
示例6: test_on_conflict_do_update_special_types_in_set
def test_on_conflict_do_update_special_types_in_set(self):
    """Check that a value given in ``set_`` is processed like inserted data.

    A row is inserted and read back as ``"initial data processed"``.  A
    conflicting insert with ``on_conflict_do_update`` then replaces the
    data, which must read back as ``"new updated data processed"``.
    """
    bind_targets = self.tables.bind_targets

    with testing.db.connect() as conn:
        plain_insert = insert(bind_targets)
        conn.execute(plain_insert, {"id": 1, "data": "initial data"})

        stored = conn.scalar(sql.select([bind_targets.c.data]))
        eq_(stored, "initial data processed")

        # a second insert with the same id takes the DO UPDATE path
        upsert = insert(bind_targets).on_conflict_do_update(
            index_elements=[bind_targets.c.id],
            set_=dict(data="new updated data")
        )
        conn.execute(upsert, {"id": 1, "data": "new inserted data"})

        stored = conn.scalar(sql.select([bind_targets.c.data]))
        eq_(stored, "new updated data processed")
开发者ID:rlugojr,项目名称:sqlalchemy,代码行数:25,代码来源:test_on_conflict.py
示例7: test_match_across_joins
def test_match_across_joins(self):
    """Combine ``match()`` criteria from two joined tables.

    Rows of ``matchtable`` are selected where the category description
    matches 'Ruby' or the title matches 'nutshells'; the ids, ordered by
    id, must be 1, 3 and 5.
    """
    join_criterion = cattable.c.id == matchtable.c.category_id
    text_criterion = or_(
        cattable.c.description.match('Ruby'),
        matchtable.c.title.match('nutshells'),
    )
    stmt = (
        matchtable.select()
        .where(and_(join_criterion, text_criterion))
        .order_by(matchtable.c.id)
    )
    rows = stmt.execute().fetchall()
    eq_([1, 3, 5], [row.id for row in rows])
开发者ID:Affirm,项目名称:sqlalchemy,代码行数:7,代码来源:test_query.py
示例8: test_on_conflict_do_nothing_connectionless
def test_on_conflict_do_nothing_connectionless(self):
    """Run ON CONFLICT DO NOTHING on a connection, then on the engine.

    The first insert runs on an explicit connection and succeeds.  The
    second reuses the same ``login_email`` and is executed through
    ``testing.db.execute``; it must report no primary key and no returned
    defaults, and the original row must be unchanged.
    """
    users = self.tables.users_xtra

    def do_nothing_insert():
        # a fresh statement per execution, as two separate inserts
        return insert(users).on_conflict_do_nothing(
            constraint="uq_login_email"
        )

    with testing.db.connect() as conn:
        first = conn.execute(
            do_nothing_insert(),
            dict(name="name1", login_email="email1"),
        )
        eq_(first.inserted_primary_key, [1])
        eq_(first.returned_defaults, (1,))

    # same login_email again, this time without an explicit connection
    second = testing.db.execute(
        do_nothing_insert(),
        dict(name="name2", login_email="email1"),
    )
    eq_(second.inserted_primary_key, None)
    eq_(second.returned_defaults, None)

    rows = testing.db.execute(
        users.select().where(users.c.id == 1)
    ).fetchall()
    eq_(rows, [(1, "name1", "email1", None)])
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:26,代码来源:test_on_conflict.py
示例9: test_update
def test_update(self):
    """Run an UPDATE with a list of parameter sets.

    Three rows are inserted, then the rows with ``x`` equal to "x1" and
    "x3" get new ``y`` values from one ``execute()`` call that receives
    two parameter dictionaries.  The row with "x2" must stay untouched.
    """
    data = self.tables.data

    with self.engine.connect() as conn:
        conn.execute(
            data.insert(),
            [
                {"x": "x1", "y": "y1"},
                {"x": "x2", "y": "y2"},
                {"x": "x3", "y": "y3"}
            ]
        )

        update_stmt = (
            data.update()
            .where(data.c.x == bindparam('xval'))
            .values(y=bindparam('yval'))
        )
        conn.execute(
            update_stmt,
            [
                {"xval": "x1", "yval": "y5"},
                {"xval": "x3", "yval": "y6"}
            ]
        )

        rows = conn.execute(select([data]).order_by(data.c.id)).fetchall()
        eq_(
            rows,
            [
                (1, "x1", "y5", 5),
                (2, "x2", "y2", 5),
                (3, "x3", "y6", 5)
            ]
        )
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:31,代码来源:test_dialect.py
示例10: test_version_parsing
def test_version_parsing(self):
    """Check that server version banners are parsed into version tuples.

    Each banner is handed to the dialect's ``_get_server_version_info``
    through a mocked connection, and the returned tuple is compared with
    the expected one.
    """
    def mock_conn(res):
        # stand-in connection: execute(...).scalar() returns ``res``
        return Mock(
            execute=Mock(return_value=Mock(scalar=Mock(return_value=res))))

    banners = [
        (
            'PostgreSQL 8.3.8 on i686-redhat-linux-gnu, compiled by '
            'GCC gcc (GCC) 4.1.2 20070925 (Red Hat 4.1.2-33)',
            (8, 3, 8)),
        (
            'PostgreSQL 8.5devel on x86_64-unknown-linux-gnu, '
            'compiled by GCC gcc (GCC) 4.4.2, 64-bit', (8, 5)),
        (
            'EnterpriseDB 9.1.2.2 on x86_64-unknown-linux-gnu, '
            'compiled by gcc (GCC) 4.1.2 20080704 (Red Hat 4.1.2-50), '
            '64-bit', (9, 1, 2)),
        (
            '[PostgreSQL 9.2.4 ] VMware vFabric Postgres 9.2.4.0 '
            'release build 1080137', (9, 2, 4)),
        (
            # the ", " separator was missing here, so the implicit string
            # concatenation produced "...linux-gnucompiled by gcc..."
            'PostgreSQL 10devel on x86_64-pc-linux-gnu, '
            'compiled by gcc (GCC) 6.3.1 20170306, 64-bit', (10,)),
    ]
    for string, version in banners:
        eq_(testing.db.dialect._get_server_version_info(mock_conn(string)),
            version)
开发者ID:biroc,项目名称:sqlalchemy,代码行数:26,代码来源:test_dialect.py
示例11: test_on_conflict_do_update_exotic_targets_four
def test_on_conflict_do_update_exotic_targets_four(self):
    """Upsert against a unique constraint that is referenced by name.

    After the fixture rows are in place, a row with a new id but an
    existing ``login_email`` is inserted.  The conflict is resolved with
    ``on_conflict_do_update`` targeting ``self.unique_constraint.name``,
    which overwrites ``id``, ``name`` and ``login_email`` of the existing
    row but leaves ``lets_index_this`` alone.
    """
    users = self.tables.users_xtra

    with testing.db.connect() as conn:
        self._exotic_targets_fixture(conn)

        # try unique constraint by name: cause an
        # upsert on target login_email, not id
        i = insert(users)
        i = i.on_conflict_do_update(
            constraint=self.unique_constraint.name,
            set_=dict(
                id=i.excluded.id, name=i.excluded.name,
                login_email=i.excluded.login_email)
        )

        # The address literals had been replaced by the "[email protected]"
        # obfuscation placeholder.  This assumes the fixture row with id=2
        # has login_email 'name2@gmail.com' -- TODO confirm against
        # _exotic_targets_fixture.
        # note: lets_index_this value totally ignored in SET clause.
        conn.execute(i, dict(
            id=43, name='nameunique2',
            login_email='name2@gmail.com', lets_index_this='unique')
        )

        eq_(
            conn.execute(
                users.select().
                where(users.c.login_email == 'name2@gmail.com')
            ).fetchall(),
            [(43, 'nameunique2', 'name2@gmail.com', 'not')]
        )
开发者ID:FluxIX,项目名称:sqlalchemy,代码行数:28,代码来源:test_on_conflict.py
示例12: test_index_reflection_with_access_method
def test_index_reflection_with_access_method(self):
    """Reflect an index that was created with ``USING gin``.

    The access method must show up both in the dialect-level
    ``get_indexes()`` result (as ``postgresql_using``) and in the
    ``dialect_options`` of the index on a reflected ``Table``.
    """
    metadata = self.metadata

    Table(
        "t",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("x", ARRAY(Integer)),
    )
    metadata.create_all()

    expected_indexes = [
        {
            "unique": False,
            "column_names": ["x"],
            "name": "idx1",
            "dialect_options": {"postgresql_using": "gin"},
        }
    ]

    with testing.db.connect().execution_options(autocommit=True) as conn:
        conn.execute("CREATE INDEX idx1 ON t USING gin (x)")

        reflected_indexes = testing.db.dialect.get_indexes(conn, "t", None)
        eq_(reflected_indexes, expected_indexes)

        # reflect the whole table through the same connection
        reflected_table = Table("t", MetaData(), autoload_with=conn)
        first_index = list(reflected_table.indexes)[0]
        eq_(first_index.dialect_options["postgresql"]["using"], "gin")
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:33,代码来源:test_reflection.py
示例13: test_cross_schema_reflection_metadata_uses_schema
def test_cross_schema_reflection_metadata_uses_schema(self):
    """Reflect through a ``MetaData`` that has a default schema.

    ``some_table`` lives in ``test_schema`` and has a foreign key to
    ``some_other_table``, which is created with ``schema=None``.
    Reflecting with ``MetaData(conn, schema="test_schema")`` must yield
    exactly the keys ``some_other_table`` and ``test_schema.some_table``.
    """
    # test [ticket:3716]
    metadata = self.metadata

    Table(
        "some_table",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("sid", Integer, ForeignKey("some_other_table.id")),
        schema="test_schema",
    )
    Table(
        "some_other_table",
        metadata,
        Column("id", Integer, primary_key=True),
        schema=None,
    )
    metadata.create_all()

    with testing.db.connect() as conn:
        reflected_meta = MetaData(conn, schema="test_schema")
        reflected_meta.reflect()

        reflected_names = set(reflected_meta.tables)
        eq_(
            reflected_names,
            {"some_other_table", "test_schema.some_table"},
        )
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:27,代码来源:test_reflection.py
示例14: test_reflect_check_constraint
def test_reflect_check_constraint(self):
    """Reflect named CHECK constraints and compare their SQL text.

    The table is created with two check constraints, reflected through a
    new ``MetaData``, and the reflected constraint texts (keyed by name)
    are compared with the expected parenthesized forms.
    """
    meta = self.metadata

    table = Table(
        "pgsql_cc",
        meta,
        Column("a", Integer()),
        CheckConstraint("a > 1 AND a < 5", name="cc1"),
        CheckConstraint("a = 1 OR (a > 2 AND a < 5)", name="cc2"),
    )
    table.create()

    reflected = Table("pgsql_cc", MetaData(testing.db), autoload=True)

    # keep only CHECK constraints, mapping name -> SQL text
    reflected_checks = {
        constraint.name: constraint.sqltext.text
        for constraint in reflected.constraints
        if isinstance(constraint, CheckConstraint)
    }

    eq_(
        reflected_checks,
        {
            u"cc1": u"(a > 1) AND (a < 5)",
            u"cc2": u"(a = 1) OR ((a > 2) AND (a < 5))",
        },
    )
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:28,代码来源:test_reflection.py
示例15: test_tuple_containment
def test_tuple_containment(self):
    """Evaluate ``('a', 'b') IN (<tuples>)`` for several candidate lists.

    Each case pairs a list of two-letter tuples with the boolean the
    database is expected to return for the containment test.
    """
    def literal_tuple(letters):
        # render every letter as a quoted SQL literal column
        return tuple_(*[
            literal_column("'%s'" % letter)
            for letter in letters
        ])

    cases = [
        ([('a', 'b')], True),
        ([('a', 'c')], False),
        ([('f', 'q'), ('a', 'b')], True),
        ([('f', 'q'), ('a', 'c')], False)
    ]
    for candidates, expected in cases:
        containment = literal_tuple(('a', 'b')).in_(
            [literal_tuple(candidate) for candidate in candidates]
        )
        result = testing.db.execute(select([containment])).scalar()
        eq_(result, expected)
开发者ID:Affirm,项目名称:sqlalchemy,代码行数:25,代码来源:test_query.py
示例16: test_reflection_with_exclude_constraint
def test_reflection_with_exclude_constraint(self):
    """Reflect a table that carries an EXCLUDE constraint.

    The inspector must report one index flagged with
    ``duplicates_constraint``, while a reflected ``Table`` must end up
    with no indexes at all.
    """
    metadata = self.metadata
    Table(
        't', metadata,
        Column('id', Integer, primary_key=True),
        Column('period', TSRANGE),
        ExcludeConstraint(('period', '&&'), name='quarters_period_excl')
    )
    metadata.create_all()

    # PostgreSQL will create an implicit index for an exclude constraint.
    # we don't reflect the EXCLUDE yet.
    implicit_index = {
        'unique': False, 'name': 'quarters_period_excl',
        'duplicates_constraint': 'quarters_period_excl',
        'dialect_options': {'postgresql_using': 'gist'},
        'column_names': ['period'],
    }
    inspector = inspect(testing.db)
    eq_(inspector.get_indexes('t'), [implicit_index])

    # reflection corrects for the dupe
    reflected = Table('t', MetaData(testing.db), autoload=True)
    eq_(set(reflected.indexes), set())
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:27,代码来源:test_reflection.py
示例17: _test
def _test(self, expr, field="all", overrides=None):
    """Extract several fields from ``expr`` and compare them one by one.

    :param expr: expression handed to ``extract()`` for every field.
    :param field: one of the preset names ``"all"``, ``"time"``,
        ``"date"`` or ``"all+tz"``; any other value is used as a mapping
        of field name to expected value.
    :param overrides: optional mapping merged over the expected values.

    For every field name, ``select([extract(name, expr)])`` is run against
    table ``t`` and its scalar result is compared with the expected value.
    """
    t = self.tables.t

    if field == "all":
        fields = {"year": 2012, "month": 5, "day": 10,
                  "epoch": 1336652125.0,
                  "hour": 12, "minute": 15}
    elif field == "time":
        fields = {"hour": 12, "minute": 15, "second": 25}
    elif field == 'date':
        fields = {"year": 2012, "month": 5, "day": 10}
    elif field == 'all+tz':
        fields = {"year": 2012, "month": 5, "day": 10,
                  "epoch": 1336637725.0,
                  "hour": 8,
                  "timezone": 0
                  }
    else:
        # copy, so that applying overrides below cannot modify the
        # mapping object owned by the caller
        fields = dict(field)

    if overrides:
        fields.update(overrides)

    # distinct loop name: the 'field' parameter is no longer shadowed
    for field_name, expected in fields.items():
        result = testing.db.scalar(
            select([extract(field_name, expr)]).select_from(t))
        eq_(result, expected)
开发者ID:Affirm,项目名称:sqlalchemy,代码行数:27,代码来源:test_query.py
示例18: test_on_conflict_do_nothing_target
def test_on_conflict_do_nothing_target(self):
    """Use the primary key columns as the ON CONFLICT DO NOTHING target.

    Two inserts with ``id=1`` are executed.  Both report
    ``inserted_primary_key == [1]`` and no returned defaults, and the
    stored row keeps the name from the first insert.
    """
    users = self.tables.users

    def insert_with_name(conn, name):
        # every call builds its own statement, then executes it
        stmt = insert(users).on_conflict_do_nothing(
            index_elements=users.primary_key.columns
        )
        return conn.execute(stmt, dict(id=1, name=name))

    with testing.db.connect() as conn:
        first = insert_with_name(conn, "name1")
        eq_(first.inserted_primary_key, [1])
        eq_(first.returned_defaults, None)

        # same primary key again: the conflict clause applies
        second = insert_with_name(conn, "name2")
        eq_(second.inserted_primary_key, [1])
        eq_(second.returned_defaults, None)

        stored = conn.execute(
            users.select().where(users.c.id == 1)
        ).fetchall()
        eq_(stored, [(1, "name1")])
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:26,代码来源:test_on_conflict.py
示例19: test_nested_type_trans
def test_nested_type_trans(self):
    """Check that result processing applies inside doubly nested results.

    ``item.price`` is coerced to a ``TypeDecorator`` whose result
    processor appends ``"_processed"``.  The select is nested two levels
    deep (items within orders within customer 1), and the innermost rows
    must carry the processed strings.
    """
    customer = self.tables.customer
    order = self.tables.order
    item = self.tables.item

    class SpecialType(TypeDecorator):
        impl = Integer

        def process_result_value(self, value, dialect):
            return str(value) + "_processed"

    # innermost level: prices of the items belonging to an order
    item_prices = select([type_coerce(item.c.price, SpecialType)]).where(
        item.c.order_id == order.c.id)
    items_nested = nested(item_prices).label('i')

    # middle level: the orders belonging to a customer
    customer_orders = select([items_nested]).where(
        order.c.customer_id == customer.c.id)
    orders_nested = nested(customer_orders).label('o')

    stmt = select([orders_nested]).where(customer.c.id == 1)

    outer_row = config.db.execute(stmt).fetchone()
    order_cursor = outer_row['o']
    item_cursor = order_cursor.fetchone()['i']
    eq_(
        list(item_cursor),
        [('9.99_processed',), ('19.99_processed',)]
    )
开发者ID:computerstaat,项目名称:sql-layer-adapter-sqlalchemy,代码行数:25,代码来源:test_nested_cursor.py
示例20: test_serial_integer
def test_serial_integer(self):
    """Check the column specification rendered for integer primary keys.

    Each case is ``(server_version, column_type, expected_keyword)``.  In
    addition to plain integer types, the cases cover ``with_variant()``
    types and a ``TypeDecorator`` whose ``load_dialect_impl`` returns
    ``BigInteger`` for the ``postgresql`` dialect name.  When a server
    version is given, a fresh PostgreSQL dialect is created and its
    version lookup is replaced with a ``Mock`` returning that version;
    otherwise the dialect of ``testing.db`` is used as-is.
    """
    class BITD(TypeDecorator):
        impl = Integer

        def load_dialect_impl(self, dialect):
            if dialect.name == "postgresql":
                return BigInteger()
            else:
                return Integer()

    cases = [
        (None, Integer, "SERIAL"),
        (None, BigInteger, "BIGSERIAL"),
        ((9, 1), SmallInteger, "SMALLINT"),
        ((9, 2), SmallInteger, "SMALLSERIAL"),
        (None, postgresql.INTEGER, "SERIAL"),
        (None, postgresql.BIGINT, "BIGSERIAL"),
        (
            None,
            Integer().with_variant(BigInteger(), "postgresql"),
            "BIGSERIAL",
        ),
        (
            None,
            Integer().with_variant(postgresql.BIGINT, "postgresql"),
            "BIGSERIAL",
        ),
        (
            (9, 2),
            Integer().with_variant(SmallInteger, "postgresql"),
            "SMALLSERIAL",
        ),
        (None, BITD(), "BIGSERIAL"),
    ]
    for version, type_, expected in cases:
        m = MetaData()
        t = Table("t", m, Column("c", type_, primary_key=True))

        if version:
            dialect = postgresql.dialect()
            dialect._get_server_version_info = Mock(return_value=version)
            # initialize() only needs the connection while it runs, so
            # release it afterwards instead of leaking one per case
            conn = testing.db.connect()
            try:
                dialect.initialize(conn)
            finally:
                conn.close()
        else:
            dialect = testing.db.dialect

        ddl_compiler = dialect.ddl_compiler(dialect, schema.CreateTable(t))
        eq_(
            ddl_compiler.get_column_specification(t.c.c),
            "c %s NOT NULL" % expected,
        )
开发者ID:fuzzball81,项目名称:sqlalchemy,代码行数:35,代码来源:test_dialect.py
注：本文中的sqlalchemy.testing.assertions.eq_函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论