本文整理汇总了Python中sqlalchemy.testing.mock.call函数的典型用法代码示例。如果您正苦于以下问题：Python call函数的具体用法？Python call怎么用？Python call使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了call函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_parent_instance_child_class_apply_after
def test_parent_instance_child_class_apply_after(self):
    """Listen on a factory instance after its element already exists.

    ``l2`` is attached to the ``TargetElement`` class before the element
    is created; ``l1`` is attached to the factory instance only after the
    first event has run.  The assertions expect ``l1`` to receive the two
    later events and ``l2`` to receive all three.
    """
    l1 = Mock()
    l2 = Mock()

    event.listen(self.TargetElement, "event_one", l2)

    factory = self.TargetFactory()
    element = factory.create()

    element.run_event(1)

    # Registered late: the element has already been created and fired once.
    event.listen(factory, "event_one", l1)

    element.run_event(2)
    element.run_event(3)

    # If _JoinedListener fixed .listeners at construction time, we would
    # not get the new listeners, and the expectation for l1 would be an
    # empty call list.
    #
    # Alternatively, if _JoinedListener shares the list using a @property,
    # then we get them, at the arguable expense of the extra method call
    # to access the .listeners collection.  That is what is asserted here.
    eq_(
        l1.mock_calls, [call(element, 2), call(element, 3)]
    )
    eq_(
        l2.mock_calls,
        [call(element, 1), call(element, 2), call(element, 3)]
    )
开发者ID:Callek,项目名称:sqlalchemy,代码行数:33,代码来源:test_events.py
示例2: test_instance
def test_instance(self):
    """Remove one bound-method listener and check the other still fires.

    Two ``Foo`` instances each register their ``evt`` bound method on the
    target class.  After ``f1.evt`` is removed, only ``f2`` is expected to
    record the second event.
    """
    Target = self._fixture()

    class Foo(object):
        def __init__(self):
            self.mock = Mock()

        def evt(self, arg):
            # Forward to a per-instance mock so calls can be inspected.
            self.mock(arg)

    f1 = Foo()
    f2 = Foo()

    event.listen(Target, "event_one", f1.evt)
    event.listen(Target, "event_one", f2.evt)

    t1 = Target()
    t1.dispatch.event_one("x")

    event.remove(Target, "event_one", f1.evt)

    t1.dispatch.event_one("y")

    eq_(f1.mock.mock_calls, [call("x")])
    eq_(f2.mock.mock_calls, [call("x"), call("y")])
开发者ID:Callek,项目名称:sqlalchemy,代码行数:25,代码来源:test_events.py
示例3: test_scalar
def test_scalar(self):
    """Exercise a ``@validates`` hook on a scalar attribute.

    The validator records every call on ``canary``, rejects the value
    ``'fred'`` through ``ne_`` and otherwise returns the name with
    ``' modified'`` appended.  The test checks both the stored value and
    the recorded validator calls, then round-trips the object through a
    session.
    """
    users = self.tables.users
    canary = Mock()

    class User(fixtures.ComparableEntity):
        @validates('name')
        def validate_name(self, key, name):
            canary(key, name)
            ne_(name, 'fred')
            return name + ' modified'

    mapper(User, users)
    sess = Session()
    u1 = User(name='ed')
    eq_(u1.name, 'ed modified')

    # The rejected assignment must leave the previous value untouched,
    # but the validator is still recorded as having been called.
    assert_raises(AssertionError, setattr, u1, "name", "fred")
    eq_(u1.name, 'ed modified')
    eq_(canary.mock_calls, [call('name', 'ed'), call('name', 'fred')])

    sess.add(u1)
    sess.commit()

    eq_(
        sess.query(User).filter_by(name='ed modified').one(),
        User(name='ed')
    )
开发者ID:robin900,项目名称:sqlalchemy,代码行数:26,代码来源:test_validators.py
示例4: test_conn_reusable
def test_conn_reusable(self):
    """Check that a connection can be reused after the DBAPI goes away.

    After ``self.dbapi.shutdown()`` the next execute is expected to raise
    ``DBAPIError`` and leave the connection invalidated but not closed.
    A further execute is expected to succeed and clear the invalidated
    flag, with a second DBAPI connection present that has not been closed.
    """
    conn = self.db.connect()

    conn.execute(select([1]))

    eq_(
        self.dbapi.connect.mock_calls,
        [self.mock_connect]
    )

    self.dbapi.shutdown()

    assert_raises(
        tsa.exc.DBAPIError,
        conn.execute, select([1])
    )

    assert not conn.closed
    assert conn.invalidated

    # Exactly one DBAPI connection exists so far, and it was closed once.
    eq_(
        [c.close.mock_calls for c in self.dbapi.connections],
        [[call()]]
    )

    # test reconnects
    conn.execute(select([1]))
    assert not conn.invalidated

    eq_(
        [c.close.mock_calls for c in self.dbapi.connections],
        [[call()], []]
    )
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:33,代码来源:test_reconnect.py
示例5: test_collection
def test_collection(self):
    """Exercise a ``@validates`` hook on a collection attribute.

    The validator records each call on ``canary`` and asserts that the
    address contains an ``'@'``.  An address without one must be rejected
    on append; a valid one is appended, persisted and compared after a
    round trip through the session.
    """
    users, addresses, Address = (self.tables.users,
                                 self.tables.addresses,
                                 self.classes.Address)

    canary = Mock()

    class User(fixtures.ComparableEntity):
        @validates('addresses')
        def validate_address(self, key, ad):
            canary(key, ad)
            assert '@' in ad.email_address
            return ad

    mapper(User, users, properties={
        'addresses': relationship(Address)}
    )
    mapper(Address, addresses)

    sess = Session()
    u1 = User(name='edward')

    a0 = Address(email_address='noemail')
    assert_raises(AssertionError, u1.addresses.append, a0)

    # NOTE(review): the scraped page had this literal replaced by an
    # e-mail-obfuscation placeholder; 'foo@bar.com' is believed to be the
    # upstream value - confirm against the original test_validators.py.
    a1 = Address(id=15, email_address='foo@bar.com')
    u1.addresses.append(a1)

    # Both the rejected and the accepted append reach the validator.
    eq_(canary.mock_calls, [call('addresses', a0), call('addresses', a1)])

    sess.add(u1)
    sess.commit()

    eq_(
        sess.query(User).filter_by(name='edward').one(),
        User(name='edward', addresses=[Address(email_address='foo@bar.com')])
    )
开发者ID:robin900,项目名称:sqlalchemy,代码行数:31,代码来源:test_validators.py
示例6: test_deferred_map_event_subclass_post_mapping_propagate_two
def test_deferred_map_event_subclass_post_mapping_propagate_two(self):
    """Propagating mapper event on an unmapped base reaches subclass mappers.

    1. map only subclass of class
    2. mapper event listen on class, w propagate
    3. event fire should receive event

    A second-level subclass is mapped *after* the listener is registered,
    and its mapper is expected to deliver the event as well.
    """
    users, User = (self.tables.users,
                   self.classes.User)

    class SubUser(User):
        pass

    class SubSubUser(SubUser):
        pass

    m = mapper(SubUser, users)

    canary = Mock()
    event.listen(User, "before_insert", canary, propagate=True, raw=True)

    # Mapped only after the listener has been set up.
    m2 = mapper(SubSubUser, users)

    m.dispatch.before_insert(5, 6, 7)
    eq_(canary.mock_calls, [call(5, 6, 7)])

    m2.dispatch.before_insert(8, 9, 10)
    eq_(canary.mock_calls, [call(5, 6, 7), call(8, 9, 10)])
开发者ID:malor,项目名称:sqlalchemy,代码行数:28,代码来源:test_events.py
示例7: test_parent_instance_child_class_apply_after
def test_parent_instance_child_class_apply_after(self):
    """Listen on a factory instance after its element already exists.

    In this variant the late listener ``l1`` (registered on the factory
    instance after the element was created) is expected to receive no
    events, while the class-level listener ``l2`` receives all three.
    """
    l1 = Mock()
    l2 = Mock()

    event.listen(self.TargetElement, "event_one", l2)

    factory = self.TargetFactory()
    element = factory.create()

    element.run_event(1)

    event.listen(factory, "event_one", l1)

    element.run_event(2)
    element.run_event(3)

    # l1 gets no events due to _JoinedListener
    # fixing the "parent" at construction time.
    # this can be changed to be "live" at the cost
    # of performance.
    eq_(
        l1.mock_calls, []
    )
    eq_(
        l2.mock_calls,
        [call(element, 1), call(element, 2), call(element, 3)]
    )
开发者ID:domenkozar,项目名称:sqlalchemy,代码行数:27,代码来源:test_events.py
示例8: test_propagate
def test_propagate(self):
    """Check removal of listeners that were copied via ``_update``.

    ``m1`` listens on ``t1`` for ``event_one`` with ``propagate=True`` and
    for ``event_two`` with ``propagate=False``; ``t2``'s dispatcher is then
    updated from ``t1``'s.  Before removal, ``m1`` is expected to see both
    of ``t1``'s events but only ``event_one`` from ``t2``.  After removing
    the listeners from ``t1``, no further calls are expected from either
    target.
    """
    Target = self._fixture()

    m1 = Mock()

    t1 = Target()
    t2 = Target()

    event.listen(t1, "event_one", m1, propagate=True)
    event.listen(t1, "event_two", m1, propagate=False)

    t2.dispatch._update(t1.dispatch)

    t1.dispatch.event_one("t1e1x")
    t1.dispatch.event_two("t1e2x")
    t2.dispatch.event_one("t2e1x")
    t2.dispatch.event_two("t2e2x")

    event.remove(t1, "event_one", m1)
    event.remove(t1, "event_two", m1)

    # None of the "...y" events should be recorded after removal.
    t1.dispatch.event_one("t1e1y")
    t1.dispatch.event_two("t1e2y")
    t2.dispatch.event_one("t2e1y")
    t2.dispatch.event_two("t2e2y")

    eq_(m1.mock_calls,
        [call('t1e1x'), call('t1e2x'),
         call('t2e1x')])
开发者ID:Callek,项目名称:sqlalchemy,代码行数:29,代码来源:test_events.py
示例9: test_invalidate_trans
def test_invalidate_trans(self):
    """Check that an invalidated connection in a transaction needs rollback.

    After the DBAPI is shut down mid-transaction, the connection is
    expected to be invalidated while the transaction stays active.  Both
    executing and committing must fail with the "Can't reconnect ..."
    message until ``trans.rollback()`` is called, after which the
    connection is expected to work again.
    """
    conn = self.db.connect()
    trans = conn.begin()
    self.dbapi.shutdown()

    assert_raises(tsa.exc.DBAPIError, conn.execute, select([1]))

    eq_([c.close.mock_calls for c in self.dbapi.connections], [[call()]])
    assert not conn.closed
    assert conn.invalidated
    assert trans.is_active

    assert_raises_message(
        tsa.exc.StatementError,
        "Can't reconnect until invalid transaction is rolled back",
        conn.execute,
        select([1]),
    )
    assert trans.is_active

    assert_raises_message(
        tsa.exc.InvalidRequestError,
        "Can't reconnect until invalid transaction is rolled back",
        trans.commit,
    )
    assert trans.is_active

    trans.rollback()
    assert not trans.is_active

    # After the rollback the connection is expected to be usable again.
    conn.execute(select([1]))
    assert not conn.invalidated
    eq_(
        [c.close.mock_calls for c in self.dbapi.connections],
        [[call()], []],
    )
开发者ID:t3573393,项目名称:sqlalchemy,代码行数:29,代码来源:test_reconnect.py
示例10: test_reconnect
def test_reconnect(self):
    """test that an 'is_disconnect' condition will invalidate the
    connection, and additionally dispose the previous connection
    pool and recreate."""

    # NOTE(review): db_pool is never read again in this test; kept as in
    # the original.
    db_pool = self.db.pool

    # make a connection
    conn = self.db.connect()

    # connection works
    conn.execute(select([1]))

    # create a second connection within the pool, which we'll ensure
    # also goes away
    conn2 = self.db.connect()
    conn2.close()

    # two connections opened total now
    assert len(self.dbapi.connections) == 2

    # set it to fail
    self.dbapi.shutdown()
    assert_raises(tsa.exc.DBAPIError, conn.execute, select([1]))

    # assert was invalidated
    assert not conn.closed
    assert conn.invalidated

    # close shouldn't break
    conn.close()

    # ensure one connection closed...
    eq_(
        [c.close.mock_calls for c in self.dbapi.connections],
        [[call()], []],
    )

    conn = self.db.connect()

    # A third DBAPI connection now exists and the second has been closed.
    eq_(
        [c.close.mock_calls for c in self.dbapi.connections],
        [[call()], [call()], []],
    )

    conn.execute(select([1]))
    conn.close()

    # Closing the pooled connection does not close the DBAPI connection.
    eq_(
        [c.close.mock_calls for c in self.dbapi.connections],
        [[call()], [call()], []],
    )
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:59,代码来源:test_reconnect.py
示例11: test_hanging_connect_within_overflow
def test_hanging_connect_within_overflow(self):
    """test that a single connect() call which is hanging
    does not block other connections from proceeding."""

    dbapi = Mock()
    mutex = threading.Lock()

    def hanging_dbapi():
        # Sleep outside the mutex so other threads can connect meanwhile.
        time.sleep(2)
        with mutex:
            return dbapi.connect()

    def fast_dbapi():
        with mutex:
            return dbapi.connect()

    # Thread-local holder so each worker thread picks its own connector.
    creator = threading.local()

    def create():
        return creator.mock_connector()

    def run_test(name, pool, should_hang):
        if should_hang:
            creator.mock_connector = hanging_dbapi
        else:
            creator.mock_connector = fast_dbapi

        conn = pool.connect()
        conn.operation(name)
        time.sleep(1)
        conn.close()

    p = pool.QueuePool(creator=create, pool_size=2, max_overflow=3)

    threads = [
        threading.Thread(
            target=run_test, args=("success_one", p, False)),
        threading.Thread(
            target=run_test, args=("success_two", p, False)),
        threading.Thread(
            target=run_test, args=("overflow_one", p, True)),
        threading.Thread(
            target=run_test, args=("overflow_two", p, False)),
        threading.Thread(
            target=run_test, args=("overflow_three", p, False))
    ]
    # Stagger the starts; the expected call order below depends on it.
    for t in threads:
        t.start()
        time.sleep(.2)

    for t in threads:
        t.join(timeout=join_timeout)

    # The hanging "overflow_one" connect is expected to finish last.
    eq_(
        dbapi.connect().operation.mock_calls,
        [call("success_one"), call("success_two"),
         call("overflow_two"), call("overflow_three"),
         call("overflow_one")]
    )
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:58,代码来源:test_pool.py
示例12: test_remove_instancelevel
def test_remove_instancelevel(self):
    """Remove an instance-level listener that was registered with ``add=True``.

    The listener is expected to be called with ``12`` for
    ``event_one(5, 7)`` - presumably the fixture's ``add`` option sums the
    two arguments; confirm against the ``Target`` fixture.  After removal,
    a further dispatch must not add any call.
    """
    listen_one = Mock()
    t1 = self.Target()
    event.listen(t1, "event_one", listen_one, add=True)
    t1.dispatch.event_one(5, 7)
    eq_(listen_one.mock_calls, [call(12)])

    event.remove(t1, "event_one", listen_one)
    t1.dispatch.event_one(10, 5)
    eq_(listen_one.mock_calls, [call(12)])
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:9,代码来源:test_events.py
示例13: test_parent_class_only
def test_parent_class_only(self):
    """Listen on the factory class only and fire events on its element.

    The listener registered on ``TargetFactory`` is expected to receive
    every event run on an element created by a factory instance.
    """
    l1 = Mock()

    event.listen(self.TargetFactory, "event_one", l1)

    element = self.TargetFactory().create()
    element.run_event(1)
    element.run_event(2)
    element.run_event(3)

    eq_(
        l1.mock_calls,
        [call(element, 1), call(element, 2), call(element, 3)],
    )
开发者ID:regularex,项目名称:sqlalchemy,代码行数:10,代码来源:test_events.py
示例14: test_bulk_save_mappings_preserve_order
def test_bulk_save_mappings_preserve_order(self):
    """Check how ``bulk_save_objects`` batches objects with and without order.

    ``Session._bulk_save_mappings`` is patched with a recorder.  By
    default each object is expected in its own call in the given order;
    with ``preserve_order=False`` the expected calls are one for the
    insert followed by one combined call for both updates.
    """
    User, = self.classes("User")

    s = Session()

    # commit some object into db
    user1 = User(name="i1")
    user2 = User(name="i2")
    s.add(user1)
    s.add(user2)
    s.commit()

    # make some changes
    user1.name = "u1"
    user3 = User(name="i3")
    s.add(user3)
    user2.name = "u2"

    objects = [user1, user3, user2]

    from sqlalchemy import inspect

    def _bulk_save_mappings(
        mapper,
        mappings,
        isupdate,
        isstates,
        return_defaults,
        update_changed_only,
        render_nulls,
    ):
        # mock_method is looked up at call time, so rebinding it below
        # gives each scenario a fresh recorder.
        mock_method(list(mappings), isupdate)

    mock_method = mock.Mock()
    with mock.patch.object(s, "_bulk_save_mappings", _bulk_save_mappings):
        s.bulk_save_objects(objects)
        eq_(
            mock_method.mock_calls,
            [
                mock.call([inspect(user1)], True),
                mock.call([inspect(user3)], False),
                mock.call([inspect(user2)], True),
            ],
        )

    mock_method = mock.Mock()
    with mock.patch.object(s, "_bulk_save_mappings", _bulk_save_mappings):
        s.bulk_save_objects(objects, preserve_order=False)
        eq_(
            mock_method.mock_calls,
            [
                mock.call([inspect(user3)], False),
                mock.call([inspect(user1), inspect(user2)], True),
            ],
        )
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:55,代码来源:test_bulk.py
示例15: test_kw_ok
def test_kw_ok(self):
    """Register a ``**kw``-only listener with ``named=True``.

    The listener forwards its keyword dict to a mock; each event is
    expected to arrive as ``{"target": element, "arg": <value>}``.
    """
    l1 = Mock()

    def listen(**kw):
        l1(kw)

    event.listen(self.TargetFactory, "event_one", listen, named=True)

    element = self.TargetFactory().create()
    element.run_event(1)
    element.run_event(2)

    eq_(
        l1.mock_calls,
        [
            call({"target": element, "arg": 1}),
            call({"target": element, "arg": 2}),
        ],
    )
开发者ID:regularex,项目名称:sqlalchemy,代码行数:11,代码来源:test_events.py
示例16: test_validator_w_removes
def test_validator_w_removes(self):
    """Exercise ``@validates(..., include_removes=True)`` on both kinds of attribute.

    Both validators record ``(key, item, remove)`` on a shared ``canary``
    and return the item unchanged.  The test performs scalar sets and a
    delete, then collection append/remove/replace operations, and checks
    the exact sequence of validator calls.
    """
    users, addresses, Address = (
        self.tables.users,
        self.tables.addresses,
        self.classes.Address,
    )

    canary = Mock()

    class User(fixtures.ComparableEntity):
        @validates("name", include_removes=True)
        def validate_name(self, key, item, remove):
            canary(key, item, remove)
            return item

        @validates("addresses", include_removes=True)
        def validate_address(self, key, item, remove):
            canary(key, item, remove)
            return item

    mapper(User, users, properties={"addresses": relationship(Address)})
    mapper(Address, addresses)

    u1 = User()
    u1.name = "ed"
    u1.name = "mary"
    del u1.name

    a1, a2, a3 = Address(), Address(), Address()
    u1.addresses.append(a1)
    u1.addresses.remove(a1)
    u1.addresses = [a1, a2]
    u1.addresses = [a2, a3]

    eq_(
        canary.mock_calls,
        [
            call("name", "ed", False),
            call("name", "mary", False),
            call("name", "mary", True),
            # append a1
            call("addresses", a1, False),
            # remove a1
            call("addresses", a1, True),
            # set to [a1, a2] - this is two appends
            call("addresses", a1, False),
            call("addresses", a2, False),
            # set to [a2, a3] - this is a remove of a1,
            # append of a3.  the appends are first.
            # in 1.2 due to #3896, we also get 'a2' in the
            # validates as it is part of the set
            call("addresses", a2, False),
            call("addresses", a3, False),
            call("addresses", a1, True),
        ],
    )
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:54,代码来源:test_validators.py
示例17: test_plugin_multiple_url_registration
def test_plugin_multiple_url_registration(self):
    """Load two engine plugins named by repeated ``plugin=`` URL arguments.

    Each plugin is a ``Mock`` whose side effect pops its own argument
    from the URL query; the first one also rewrites ``logging_name``.
    The test checks the resulting engine, the cleaned-up URL and the
    sequence of calls each plugin received.
    """
    from sqlalchemy.dialects import sqlite

    # The plugins are registered by module and attribute name, so they
    # must be module-level globals.
    global MyEnginePlugin1
    global MyEnginePlugin2

    def side_effect_1(url, kw):
        eq_(kw, {"logging_name": "foob"})
        kw["logging_name"] = "bar"
        url.query.pop("myplugin1_arg", None)
        return MyEnginePlugin1

    def side_effect_2(url, kw):
        url.query.pop("myplugin2_arg", None)
        return MyEnginePlugin2

    MyEnginePlugin1 = Mock(side_effect=side_effect_1)
    MyEnginePlugin2 = Mock(side_effect=side_effect_2)

    plugins.register("engineplugin1", __name__, "MyEnginePlugin1")
    plugins.register("engineplugin2", __name__, "MyEnginePlugin2")

    e = create_engine(
        "sqlite:///?plugin=engineplugin1&foo=bar&myplugin1_arg=bat"
        "&plugin=engineplugin2&myplugin2_arg=hoho",
        logging_name="foob",
    )
    eq_(e.dialect.name, "sqlite")
    eq_(e.logging_name, "bar")

    # plugin args are removed from URL.
    eq_(e.url.query, {"foo": "bar"})
    assert isinstance(e.dialect, sqlite.dialect)

    eq_(
        MyEnginePlugin1.mock_calls,
        [
            call(url.make_url("sqlite:///?foo=bar"), {}),
            call.handle_dialect_kwargs(sqlite.dialect, mock.ANY),
            call.handle_pool_kwargs(mock.ANY, {"dialect": e.dialect}),
            call.engine_created(e),
        ],
    )

    eq_(
        MyEnginePlugin2.mock_calls,
        [
            call(url.make_url("sqlite:///?foo=bar"), {}),
            call.handle_dialect_kwargs(sqlite.dialect, mock.ANY),
            call.handle_pool_kwargs(mock.ANY, {"dialect": e.dialect}),
            call.engine_created(e),
        ],
    )
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:53,代码来源:test_parseconnect.py
示例18: test_detach
def test_detach(self):
    """Detach a pooled connection and check the pool opens a fresh one.

    With ``pool_size=1`` and ``max_overflow=0``, a second ``connect()`` is
    only possible once the first connection has been detached.  The
    detached DBAPI connection is expected to be closed exactly once, when
    ``c1.close()`` is called.
    """
    dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=0)

    c1 = p.connect()
    c1.detach()
    c2 = p.connect()  # noqa: F841 - held only to keep the checkout open
    eq_(dbapi.connect.mock_calls, [call("foo.db"), call("foo.db")])

    c1_con = c1.connection
    assert c1_con is not None
    eq_(c1_con.close.call_count, 0)
    c1.close()
    eq_(c1_con.close.call_count, 1)
开发者ID:kihon10,项目名称:sqlalchemy,代码行数:12,代码来源:test_pool.py
示例19: test_parent_events_child_no_events
def test_parent_events_child_no_events(self):
    """Listen on the element class after a factory instance already exists.

    The factory is instantiated before the listener is registered on
    ``TargetElement``; the element it then creates is still expected to
    deliver all three events to the listener.
    """
    l1 = Mock()
    factory = self.TargetFactory()

    event.listen(self.TargetElement, "event_one", l1)
    element = factory.create()

    element.run_event(1)
    element.run_event(2)
    element.run_event(3)

    eq_(
        l1.mock_calls,
        [call(element, 1), call(element, 2), call(element, 3)],
    )
开发者ID:regularex,项目名称:sqlalchemy,代码行数:12,代码来源:test_events.py
示例20: test_remove_wrapped_named
def test_remove_wrapped_named(self):
    """Remove a ``named=True`` listener from a target with wrapped events.

    The listener is expected to be called as ``call(x="adapted t1")`` -
    presumably the wrapped fixture adapts the dispatched argument; confirm
    against ``_wrapped_fixture``.  After removal, a further dispatch must
    not add any call.
    """
    Target = self._wrapped_fixture()

    listen_one = Mock()
    t1 = Target()

    event.listen(t1, "event_one", listen_one, named=True)
    t1.dispatch.event_one("t1")

    eq_(listen_one.mock_calls, [call(x="adapted t1")])

    event.remove(t1, "event_one", listen_one)
    t1.dispatch.event_one("t2")

    eq_(listen_one.mock_calls, [call(x="adapted t1")])
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:13,代码来源:test_events.py
注:本文中的sqlalchemy.testing.mock.call函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论