• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python event.remove函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.event.remove函数的典型用法代码示例。如果您正苦于以下问题:Python remove函数的具体用法?Python remove怎么用?Python remove使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了remove函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_external_tables_not_changed

    def test_external_tables_not_changed(self):

        def block_external_tables(conn, clauseelement, multiparams, params):
            if isinstance(clauseelement, sqlalchemy.sql.selectable.Select):
                return

            if (isinstance(clauseelement, six.string_types) and
                    any(name in clauseelement for name in external.TABLES)):
                self.fail("External table referenced by neutron core "
                          "migration.")

            if hasattr(clauseelement, 'element'):
                if (clauseelement.element.name in external.TABLES or
                        (hasattr(clauseelement, 'table') and
                         clauseelement.element.table.name in external.TABLES)):
                    self.fail("External table referenced by neutron core "
                              "migration.")

        engine = self.get_engine()
        cfg.CONF.set_override('connection', engine.url, group='database')
        migration.do_alembic_command(self.alembic_config, 'upgrade', 'kilo')

        event.listen(engine, 'before_execute', block_external_tables)
        migration.do_alembic_command(self.alembic_config, 'upgrade', 'heads')

        event.remove(engine, 'before_execute', block_external_tables)
开发者ID:smokony,项目名称:neutron,代码行数:26,代码来源:test_migrations.py


示例2: _listener

 def _listener(self, engine, listener_func):
     try:
         event.listen(engine, 'before_execute', listener_func)
         yield
     finally:
         event.remove(engine, 'before_execute',
                      listener_func)
开发者ID:jaguar13,项目名称:neutron,代码行数:7,代码来源:test_migrations.py


示例3: attach_ddl_listeners

    def attach_ddl_listeners(self):
        # Remove all previously added listeners, so that same listener don't
        # get added twice in situations where class configuration happens in
        # multiple phases (issue #31).
        for listener in self.listeners:
            event.remove(*listener)
        self.listeners = []

        for column in self.processed_columns:
            # This sets up the trigger that keeps the tsvector column up to
            # date.
            if column.type.columns:
                table = column.table
                if self.option(column, 'remove_symbols'):
                    self.add_listener((
                        table,
                        'after_create',
                        self.search_function_ddl(column)
                    ))
                    self.add_listener((
                        table,
                        'after_drop',
                        DDL(str(DropSearchFunctionSQL(column)))
                    ))

                self.add_listener((
                    table,
                    'after_create',
                    self.search_trigger_ddl(column)
                ))
开发者ID:pablogamboa,项目名称:sqlalchemy-searchable,代码行数:30,代码来源:__init__.py


示例4: remove_sqlalchemy_known_event

 def remove_sqlalchemy_known_event(self):
     for e, namespace, method in self._sqlalchemy_known_events:
         try:
             event.remove(e.mapper(self, namespace), e.event,
                          method.get_attribute(self))
         except InvalidRequestError:
             pass
开发者ID:jssuzanne,项目名称:AnyBlok,代码行数:7,代码来源:registry.py


示例5: execute_counter

def execute_counter(connection, zsa_savepoints, check_constraints):
    """ Count calls to execute
    """
    from contextlib import contextmanager
    from sqlalchemy import event

    class Counter(object):
        def __init__(self):
            self.reset()
            self.conn = connection

        def reset(self):
            self.count = 0

        @contextmanager
        def expect(self, count):
            start = self.count
            yield
            difference = self.count - start
            assert difference == count

    counter = Counter()

    @event.listens_for(connection, 'after_cursor_execute')
    def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        # Ignore the testing savepoints
        if zsa_savepoints.state != 'begun' or check_constraints.state == 'checking':
            return
        counter.count += 1

    yield counter

    event.remove(connection, 'after_cursor_execute', after_cursor_execute)
开发者ID:Kapeel,项目名称:encoded,代码行数:33,代码来源:serverfixtures.py


示例6: register

    def register(cls, encryptor: Encryptor):
        """
        Register this encryptable with an encryptor.

        Instances of this encryptor will be encrypted on initialization and decrypted on load.

        """
        # save the current encryptor statically
        cls.__encryptor__ = encryptor

        # NB: we cannot use the before_insert listener in conjunction with a foreign key relationship
        # for encrypted data; SQLAlchemy will warn about using 'related attribute set' operation so
        # late in its insert/flush process.
        listeners = dict(
            init=on_init,
            load=on_load,
        )

        for name, func in listeners.items():
            # If we initialize the graph multiple times (as in many unit testing scenarios),
            # we will accumulate listener functions -- with unpredictable results. As protection,
            # we need to remove existing listeners before adding new ones; this solution only
            # works if the id (e.g. memory address) of the listener does not change, which means
            # they cannot be closures around the `encryptor` reference.
            #
            # Hence the `__encryptor__` hack above...
            if contains(cls, name, func):
                remove(cls, name, func)
            listen(cls, name, func)
开发者ID:globality-corp,项目名称:microcosm-postgres,代码行数:29,代码来源:models.py


示例7: ro_session_tracker

def ro_session_tracker(ro_session):
    """
    This install an event handler into the active session, which
    tracks all SQL statements that are send via the session.

    The yielded checker can be called with an integer argument,
    representing the number of expected SQL statements, for example::

        ro_session_tracker(0)

    would only succeed if no SQL statements where made.
    """

    db_events = []

    def scoped_conn_event_handler(calls):
        def conn_event_handler(**kw):
            calls.append((kw['statement'], kw['parameters']))
        return conn_event_handler

    handler = scoped_conn_event_handler(db_events)
    event.listen(ro_session.bind,
                 'before_cursor_execute',
                 handler, named=True)

    def checker(num=None):
        if num is not None:
            assert len(db_events) == num

    yield checker

    event.remove(ro_session.bind, 'before_cursor_execute', handler)
开发者ID:amjadm61,项目名称:ichnaea,代码行数:32,代码来源:conftest.py


示例8: test_instance

    def test_instance(self):
        Target = self._fixture()

        class Foo(object):
            def __init__(self):
                self.mock = Mock()

            def evt(self, arg):
                self.mock(arg)

        f1 = Foo()
        f2 = Foo()

        event.listen(Target, "event_one", f1.evt)
        event.listen(Target, "event_one", f2.evt)

        t1 = Target()
        t1.dispatch.event_one("x")

        event.remove(Target, "event_one", f1.evt)

        t1.dispatch.event_one("y")

        eq_(f1.mock.mock_calls, [call("x")])
        eq_(f2.mock.mock_calls, [call("x"), call("y")])
开发者ID:Callek,项目名称:sqlalchemy,代码行数:25,代码来源:test_events.py


示例9: count_queries

def count_queries():
    """Provides a query counter.

    Usage::

        with count_queries() as count:
            do_stuff()
        assert count() == number_of_queries
    """
    def _after_cursor_execute(*args, **kwargs):
        if active_counter[0]:
            active_counter[0][0] += 1

    @contextmanager
    def _counter():
        if active_counter[0]:
            raise RuntimeError('Cannot nest count_queries calls')
        active_counter[0] = counter = [0]
        try:
            yield lambda: counter[0]
        finally:
            active_counter[0] = None

    active_counter = [None]
    event.listen(Engine, 'after_cursor_execute', _after_cursor_execute)
    try:
        yield _counter
    finally:
        event.remove(Engine, 'after_cursor_execute', _after_cursor_execute)
开发者ID:MichelCordeiro,项目名称:indico,代码行数:29,代码来源:database.py


示例10: test_propagate

    def test_propagate(self):
        Target = self._fixture()

        m1 = Mock()

        t1 = Target()
        t2 = Target()

        event.listen(t1, "event_one", m1, propagate=True)
        event.listen(t1, "event_two", m1, propagate=False)

        t2.dispatch._update(t1.dispatch)

        t1.dispatch.event_one("t1e1x")
        t1.dispatch.event_two("t1e2x")
        t2.dispatch.event_one("t2e1x")
        t2.dispatch.event_two("t2e2x")

        event.remove(t1, "event_one", m1)
        event.remove(t1, "event_two", m1)

        t1.dispatch.event_one("t1e1y")
        t1.dispatch.event_two("t1e2y")
        t2.dispatch.event_one("t2e1y")
        t2.dispatch.event_two("t2e2y")

        eq_(m1.mock_calls,
                [call('t1e1x'), call('t1e2x'),
                call('t2e1x')])
开发者ID:Callek,项目名称:sqlalchemy,代码行数:29,代码来源:test_events.py


示例11: unregister_fsa_session_signals

def unregister_fsa_session_signals():
    """Unregisters Flask-SQLAlchemy session commit and rollback signal
    handlers.

    When a Flask-SQLAlchemy object is created, it registers signal handlers for
    ``before_commit``, ``after_commit``, and ``after_rollback`` signals. In
    case of using both a plain SQLAlchemy session and a Flask-SQLAlchemy
    session (as is happening in the tests in this package), we need to
    unregister handlers or there will be some exceptions during test
    executions like::

        AttributeError: 'Session' object has no attribute '_model_changes'

    """
    # We don't need to do this if Flask-SQLAlchemy is not installed.
    if not has_flask_sqlalchemy:
        return
    # We don't need to do this if Flask-SQLAlchemy version 2.0 or
    # greater is installed.
    version = parse_version(flask_sqlalchemy.__version__)
    if version >= (2, 0):
        return
    events = flask_sqlalchemy._SessionSignalEvents
    signal_names = ('before_commit', 'after_commit', 'after_rollback')
    for signal_name in signal_names:
        # For Flask-SQLAlchemy version less than 3.0.
        signal = getattr(events, 'session_signal_{0}'.format(signal_name))
        event.remove(SessionBase, signal_name, signal)
开发者ID:PlotWatt,项目名称:flask-restless,代码行数:28,代码来源:helpers.py


示例12: tearDown

 def tearDown(self):
     """Roll back the session """
     super(BlokTestCase, self).tearDown()
     self.case_savepoint.rollback()
     self.registry.System.Cache.invalidate_all()
     event.remove(self.registry.session, "after_transaction_end",
                  self.savepoint_restarter)
     self._transaction_case_teared_down = True
开发者ID:jssuzanne,项目名称:AnyBlok,代码行数:8,代码来源:testcase.py


示例13: sqla_remove_all

def sqla_remove_all():
    for args in _REGISTERED_SQLA_EVENTS:
        try:
            event.remove(*args)
        except sql_exc.InvalidRequestError:
            # already removed
            pass
    del _REGISTERED_SQLA_EVENTS[:]
开发者ID:mmalchuk,项目名称:openstack-neutron,代码行数:8,代码来源:api.py


示例14: unregister

    def unregister(cls, session):
        if hasattr(session, "_model_changes"):
            del session._model_changes

        event.remove(session, "before_flush", cls.record_ops)
        event.remove(session, "before_commit", cls.record_ops)
        event.remove(session, "before_commit", cls.before_commit)
        event.remove(session, "after_commit", cls.after_commit)
        event.remove(session, "after_rollback", cls.after_rollback)
开发者ID:pbecotte,项目名称:flask-sqlalchemy,代码行数:9,代码来源:__init__.py


示例15: test_remove_instancelevel

 def test_remove_instancelevel(self):
     listen_one = Mock()
     t1 = self.Target()
     event.listen(t1, "event_one", listen_one, add=True)
     t1.dispatch.event_one(5, 7)
     eq_(listen_one.mock_calls, [call(12)])
     event.remove(t1, "event_one", listen_one)
     t1.dispatch.event_one(10, 5)
     eq_(listen_one.mock_calls, [call(12)])
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:9,代码来源:test_events.py


示例16: unregister_events

 def unregister_events(self):
     try:
         event.remove(self._model_class, 'after_insert',
                      self._db_event_handler)
         event.remove(self._model_class, 'after_delete',
                      self._db_event_handler)
     except sql_exc.InvalidRequestError:
         LOG.warning(_LW("No sqlalchemy event for resource %s found"),
                     self.name)
开发者ID:MODITDC,项目名称:neutron,代码行数:9,代码来源:resource.py


示例17: count_queries

    def count_queries(self):
        self.statements = []

        def catch_queries(conn, cursor, statement, *args):
            self.statements.append(statement)

        event.listen(self.connection, 'before_cursor_execute', catch_queries)
        yield
        event.remove(self.connection, 'before_cursor_execute', catch_queries)
开发者ID:omh,项目名称:yoshimi,代码行数:9,代码来源:__init__.py


示例18: setUpPackage

def setUpPackage():
    """Setup the plugin."""
    from default import flask_app
    with flask_app.app_context():
        plugin_dir = os.path.dirname(plugin.__file__)
        plugin.LibCrowdsStatistics(plugin_dir).setup()

        # Remove event listeners
        func = event_listeners.record_new_task_run_ip_event
        event.remove(TaskRun, 'before_insert', func)
开发者ID:LibCrowds,项目名称:libcrowds-statistics,代码行数:10,代码来源:__init__.py


示例19: __declare_last__

    def __declare_last__(cls):
        # Unconfigure the event set in _SQLAMutationTracker, we have _save_data
        mapper = cls._sa_class_manager.mapper
        args = (mapper.attrs['data'], 'set', _SQLAMutationTracker._field_set)
        if event.contains(*args):
            event.remove(*args)

        # Declaring the event on the class attribute instead of mapper property
        # enables proper registration on its subclasses
        event.listen(cls.data, 'set', cls._save_data, retval=True)
开发者ID:yuanbosdu,项目名称:Kotti,代码行数:10,代码来源:resources.py


示例20: on_dispose

    def on_dispose(self):
        assert self.linked
        self.linked = False

        event.remove(self.child_rel_prop, 'set', self.on_key_update)

        self.parent_obj = None
        self.child_cls = None
        self.child_rel_name = None
        self.child_rel_prop = None
开发者ID:hydai,项目名称:cms,代码行数:10,代码来源:smartmappedcollection.py



注:本文中的sqlalchemy.event.remove函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python associationproxy.association_proxy函数代码示例发布时间:2022-05-27
下一篇:
Python event.listens_for函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap