• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python event.listens_for函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.event.listens_for函数的典型用法代码示例。如果您正苦于以下问题:Python listens_for函数的具体用法?Python listens_for怎么用?Python listens_for使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了listens_for函数的13个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: register_event_listener

def register_event_listener():
    """Attach session state-transition listeners for every known SQLAlchemy event.

    The 'transient' state is special-cased: it has no Session-level hook, so it
    is tracked through the mapper-level 'init' event on Base instead.
    """
    for name in sqlalchemy_events:
        if name == 'transient':
            # transient instances are detected at construction time
            event.listens_for(Base, 'init', propagate=True)(transient_state)
        else:
            event.listens_for(Session, name)(state_transition(name))
开发者ID:JustScrapy,项目名称:blog,代码行数:7,代码来源:verify.py


示例2: initialize_logging

def initialize_logging(settings):
    """Install the request logger and, when safe, count CPU time and DB queries.

    NOTE(review): ``settings`` is accepted but not read here — presumably kept
    for interface symmetry with other initializers; confirm before removing.
    """
    # UGLY: accurately counting CPU time and DB queries in a multithreaded
    # environment is hard, so the Werkzeug-reloader env var is used as a rough
    # proxy for "single-threaded dev run" where counting is reliable.
    count_usage = 'WERKZEUG_RUN_MAIN' in environ
    if 'JJ_DEBUG_ASSETS' in environ:
        exemptions = ()
    else:
        exemptions = ('/static/', '/system/admin/static/', '/system/rq/')
    increment = install_request_logger(
        app, count_usage, getLogger('jj.request'), exemptions)
    if count_usage:
        # bump the per-request query counter after every cursor execution
        event.listens_for(Engine, "after_cursor_execute")(increment)
开发者ID:Rydgel,项目名称:flask-todo,代码行数:11,代码来源:app.py


示例3: setup_ownership_load_event

    def setup_ownership_load_event(cls, owner_class, relns):
        """Keep ``owner_object`` back-references on *relns* in sync.

        Registers listeners so that whenever an ``owner_class`` instance is
        loaded or refreshed, each related object named in *relns* gets its
        ``owner_object`` attribute pointed back at the owner, and so that
        assigning/replacing a relation updates the back-reference both ways.
        """
        def load_owner_object(target, context):
            # after load/refresh, point each related object back at its owner
            for reln in relns:
                ls = getattr(target, reln, None)
                if ls is not None:
                    ls.owner_object = target
        event.listen(owner_class, "load", load_owner_object, propagate=True)
        # BUG FIX: the original called event.listens_for(...) here, which only
        # returns a decorator — it was never applied, so the "refresh" listener
        # was silently never registered. event.listen() registers it directly.
        event.listen(owner_class, "refresh", load_owner_object, propagate=True)

        def set_owner_object(target, value, old_value, initiator):
            # detach the back-reference from the replaced value, attach to the new one
            if old_value is not None:
                old_value.owner_object = None
            if value is not None:
                value.owner_object = target
        for reln in relns:
            cls._owning_relns.append((owner_class, reln))
            event.listen(getattr(owner_class, reln), "set", set_owner_object, propagate=True)
开发者ID:assembl,项目名称:assembl,代码行数:17,代码来源:langstrings.py


示例4: _configure_creation

    def _configure_creation(self, connection):
        """Register a connect-time hook that applies SQLite PRAGMA tuning.

        The hook temporarily disables pysqlite's implicit transaction handling
        (so the PRAGMAs take effect outside a transaction) and restores the
        original isolation level afterwards.
        """
        def do_connect(dbapi_connection, connection_record):
            # disable pysqlite's emitting of the BEGIN statement entirely.
            # also stops it from emitting COMMIT before any DDL.
            iso_level = dbapi_connection.isolation_level
            dbapi_connection.isolation_level = None
            try:
                dbapi_connection.execute("PRAGMA page_size = 5120;")
                dbapi_connection.execute("PRAGMA cache_size = 12000;")
                dbapi_connection.execute("PRAGMA foreign_keys = ON;")
                dbapi_connection.execute("PRAGMA journal_mode = WAL;")
            except Exception:
                # PRAGMA tuning is best-effort; a failure must not break the
                # connection. (Narrowed from a bare except, which also caught
                # KeyboardInterrupt/SystemExit.)
                pass
            finally:
                # always restore the driver's original isolation level, even
                # if a PRAGMA raised
                dbapi_connection.isolation_level = iso_level

        event.listens_for(connection, "connect")(do_connect)
开发者ID:BostonUniversityCBMS,项目名称:glycresoft_sqlalchemy,代码行数:17,代码来源:connection.py


示例5: test_session_events

    def test_session_events(self):
        """An 'after_commit' listener registered via listens_for fires on commit."""
        app = flask.Flask(__name__)
        app.config['SQLALCHEMY_ENGINE'] = 'sqlite://'
        app.config['TESTING'] = True
        db = sqlalchemy.SQLAlchemy(app)

        from sqlalchemy.event import listens_for

        recorded = []
        # listens_for returns a decorator; applying it registers the callback
        listens_for(db.session, 'after_commit')(recorded.append)

        db.session.commit()
        self.assertEqual(recorded, [db.session()])
开发者ID:gdrius,项目名称:flask-sqlalchemy,代码行数:14,代码来源:test_sqlalchemy.py


示例6: create_schedule

def create_schedule(abc_schedule, base, mixins=None, persister=None):
    """Concrete schedule model factory.

    :param abc_schedule: Abstract base schedule to use as base.
    :type abc_schedule: Any ABC schedule from :func:`~create_abc_schedule`
        factory function.
    :param base: SQLAlchemy model base to use.
    :type base: Any SQLAlchemy model base from
        :func:`sqlalchemy.ext.declarative.declarative_base` factory function
    :param mixins: Mixins to be mixed into concrete schedule model.
    :type mixins: Iterable mixin classes.
    :param persister: Persister to use for the schedule persistence.
    :type persister: :class:`~news.persistence.ScheduleNotifier`
    :returns: Concrete schedule model based on given abc schedule.
    :rtype: :class:`~news.models.AbstractSchedule` SQLAlchemy
        implementation based on given abc schedule, model base and mixins.

    """
    bases = (mixins or tuple()) + (abc_schedule, base)
    Schedule = type('Schedule', bases, {})

    # connect persister if given: relay mapper events so saves and deletes
    # are broadcast to interested parties
    if persister:
        def _notify_inserted(mapper, connection, target):
            return persister.notify_saved(target, created=True)

        def _notify_updated(mapper, connection, target):
            return persister.notify_saved(target, created=False)

        def _notify_deleted(mapper, connection, target):
            return persister.notify_deleted(target)

        event.listens_for(Schedule, 'after_insert')(_notify_inserted)
        event.listens_for(Schedule, 'after_update')(_notify_updated)
        event.listens_for(Schedule, 'after_delete')(_notify_deleted)

    return Schedule
开发者ID:kuc2477,项目名称:news,代码行数:37,代码来源:sqlalchemy.py


示例7: setup

    def setup(self):
        # heavy load mode is read only mode with a different infobar
        if self.heavy_load_mode:
            self.read_only_mode = True

        if hasattr(signal, 'SIGUSR1'):
            # not all platforms have user signals
            signal.signal(signal.SIGUSR1, thread_dump)

        # initialize caches. Any cache-chains built here must be added
        # to cache_chains (closed around by reset_caches) so that they
        # can properly reset their local components

        localcache_cls = (SelfEmptyingCache if self.running_as_script
                          else LocalCache)
        num_mc_clients = self.num_mc_clients

        self.cache_chains = {}

        # for now, zookeeper will be an optional part of the stack.
        # if it's not configured, we will grab the expected config from the
        # [live_config] section of the ini file
        zk_hosts = self.config.get("zookeeper_connection_string")
        if zk_hosts:
            from r2.lib.zookeeper import (connect_to_zookeeper,
                                          LiveConfig, LiveList)
            zk_username = self.config["zookeeper_username"]
            zk_password = self.config["zookeeper_password"]
            self.zookeeper = connect_to_zookeeper(zk_hosts, (zk_username,
                                                             zk_password))
            self.live_config = LiveConfig(self.zookeeper, LIVE_CONFIG_NODE)
            self.throttles = LiveList(self.zookeeper, "/throttles",
                                      map_fn=ipaddress.ip_network,
                                      reduce_fn=ipaddress.collapse_addresses)
        else:
            self.zookeeper = None
            parser = ConfigParser.RawConfigParser()
            parser.read([self.config["__file__"]])
            self.live_config = extract_live_config(parser, self.plugins)
            self.throttles = tuple()  # immutable since it's not real

        self.memcache = CMemcache(self.memcaches, num_clients = num_mc_clients)
        self.lock_cache = CMemcache(self.lockcaches, num_clients=num_mc_clients)

        self.stats = Stats(self.config.get('statsd_addr'),
                           self.config.get('statsd_sample_rate'))

        event.listens_for(engine.Engine, 'before_cursor_execute')(
            self.stats.pg_before_cursor_execute)
        event.listens_for(engine.Engine, 'after_cursor_execute')(
            self.stats.pg_after_cursor_execute)

        self.make_lock = make_lock_factory(self.lock_cache, self.stats)

        if not self.cassandra_seeds:
            raise ValueError("cassandra_seeds not set in the .ini")


        keyspace = "reddit"
        self.cassandra_pools = {
            "main":
                StatsCollectingConnectionPool(
                    keyspace,
                    stats=self.stats,
                    logging_name="main",
                    server_list=self.cassandra_seeds,
                    pool_size=self.cassandra_pool_size,
                    timeout=2,
                    max_retries=3,
                    prefill=False
                ),
        }

        perma_memcache = (CMemcache(self.permacache_memcaches, num_clients = num_mc_clients)
                          if self.permacache_memcaches
                          else None)
        self.permacache = CassandraCacheChain(localcache_cls(),
                                              CassandraCache('permacache',
                                                             self.cassandra_pools[self.cassandra_default_pool],
                                                             read_consistency_level = self.cassandra_rcl,
                                                             write_consistency_level = self.cassandra_wcl),
                                              memcache = perma_memcache,
                                              lock_factory = self.make_lock)

        self.cache_chains.update(permacache=self.permacache)

        # hardcache is done after the db info is loaded, and then the
        # chains are reset to use the appropriate initial entries

        if self.stalecaches:
            self.cache = StaleCacheChain(localcache_cls(),
                                         CMemcache(self.stalecaches, num_clients=num_mc_clients),
                                         self.memcache)
        else:
            self.cache = MemcacheChain((localcache_cls(), self.memcache))
        self.cache_chains.update(cache=self.cache)

        self.rendercache = MemcacheChain((localcache_cls(),
                                          CMemcache(self.rendercaches,
                                                    noreply=True, no_block=True,
#.........这里部分代码省略.........
开发者ID:Anenome,项目名称:reddit,代码行数:101,代码来源:app_globals.py


示例8: __call__

 def __call__(self):
     """Build an engine for the configured URL and attach the connect hook.

     Returns the newly created engine with ``self.on_connect`` registered
     as its 'connect' event listener.
     """
     engine = create_engine(
         self.connection_url,
         connect_args=self.connect_args,
         **self.engine_args)
     # run the per-connection setup callback on every new DBAPI connection
     event.listens_for(engine, 'connect')(self.on_connect)
     return engine
开发者ID:BostonUniversityCBMS,项目名称:ms_deisotope,代码行数:6,代码来源:db.py


示例9: setup


#.........这里部分代码省略.........
            noreply=True,
            no_block=True,
            num_clients=num_mc_clients,
        )

        self.startup_timer.intermediate("memcache")

        ################# CASSANDRA
        keyspace = "reddit"
        self.cassandra_pools = {
            "main":
                StatsCollectingConnectionPool(
                    keyspace,
                    stats=self.stats,
                    logging_name="main",
                    server_list=self.cassandra_seeds,
                    pool_size=self.cassandra_pool_size,
                    timeout=4,
                    max_retries=3,
                    prefill=False
                ),
        }

        permacache_cf = CassandraCache(
            'permacache',
            self.cassandra_pools[self.cassandra_default_pool],
            read_consistency_level=self.cassandra_rcl,
            write_consistency_level=self.cassandra_wcl
        )

        self.startup_timer.intermediate("cassandra")

        ################# POSTGRES
        event.listens_for(engine.Engine, 'before_cursor_execute')(
            self.stats.pg_before_cursor_execute)
        event.listens_for(engine.Engine, 'after_cursor_execute')(
            self.stats.pg_after_cursor_execute)

        self.dbm = self.load_db_params()
        self.startup_timer.intermediate("postgres")

        ################# CHAINS
        # initialize caches. Any cache-chains built here must be added
        # to cache_chains (closed around by reset_caches) so that they
        # can properly reset their local components
        self.cache_chains = {}
        localcache_cls = (SelfEmptyingCache if self.running_as_script
                          else LocalCache)

        if stalecaches:
            self.cache = StaleCacheChain(
                localcache_cls(),
                stalecaches,
                self.memcache,
            )
        else:
            self.cache = MemcacheChain((localcache_cls(), self.memcache))
        self.cache_chains.update(cache=self.cache)

        self.rendercache = MemcacheChain((
            localcache_cls(),
            rendercaches,
        ))
        self.cache_chains.update(rendercache=self.rendercache)

        self.pagecache = MemcacheChain((
开发者ID:debanshuk,项目名称:reddit,代码行数:67,代码来源:app_globals.py


示例10: make_connector

 def make_connector(self, app=None, bind=None):
     """Create a connector whose engine pings connections before use."""
     connector = super(SQLAlchemy, self).make_connector(app, bind)
     # verify liveness of pooled connections on checkout
     event.listens_for(connector.get_engine(), 'engine_connect')(ping_connection)
     return connector
开发者ID:renalreg,项目名称:radar,代码行数:5,代码来源:__init__.py


示例11: setup

    def setup(self, global_conf):
        # heavy load mode is read only mode with a different infobar
        if self.heavy_load_mode:
            self.read_only_mode = True

        if hasattr(signal, 'SIGUSR1'):
            # not all platforms have user signals
            signal.signal(signal.SIGUSR1, thread_dump)

        # initialize caches. Any cache-chains built here must be added
        # to cache_chains (closed around by reset_caches) so that they
        # can properly reset their local components

        localcache_cls = (SelfEmptyingCache if self.running_as_script
                          else LocalCache)
        num_mc_clients = self.num_mc_clients

        self.cache_chains = {}

        self.memcache = CMemcache(self.memcaches, num_clients = num_mc_clients)
        self.make_lock = make_lock_factory(self.memcache)

        self.stats = Stats(global_conf.get('statsd_addr'),
                           global_conf.get('statsd_sample_rate'))

        event.listens_for(engine.Engine, 'before_cursor_execute')(
            self.stats.pg_before_cursor_execute)
        event.listens_for(engine.Engine, 'after_cursor_execute')(
            self.stats.pg_after_cursor_execute)

        if not self.cassandra_seeds:
            raise ValueError("cassandra_seeds not set in the .ini")


        keyspace = "reddit"
        self.cassandra_pools = {
            "main":
                StatsCollectingConnectionPool(
                    keyspace,
                    stats=self.stats,
                    logging_name="main",
                    server_list=self.cassandra_seeds,
                    pool_size=self.cassandra_pool_size,
                    timeout=2,
                    max_retries=3,
                    prefill=False
                ),
            "noretries":
                StatsCollectingConnectionPool(
                    keyspace,
                    stats=self.stats,
                    logging_name="noretries",
                    server_list=self.cassandra_seeds,
                    pool_size=len(self.cassandra_seeds),
                    timeout=2,
                    max_retries=0,
                    prefill=False
                ),
        }

        perma_memcache = (CMemcache(self.permacache_memcaches, num_clients = num_mc_clients)
                          if self.permacache_memcaches
                          else None)
        self.permacache = CassandraCacheChain(localcache_cls(),
                                              CassandraCache('permacache',
                                                             self.cassandra_pools[self.cassandra_default_pool],
                                                             read_consistency_level = self.cassandra_rcl,
                                                             write_consistency_level = self.cassandra_wcl),
                                              memcache = perma_memcache,
                                              lock_factory = self.make_lock)

        self.cache_chains.update(permacache=self.permacache)

        # hardcache is done after the db info is loaded, and then the
        # chains are reset to use the appropriate initial entries

        if self.stalecaches:
            self.cache = StaleCacheChain(localcache_cls(),
                                         CMemcache(self.stalecaches, num_clients=num_mc_clients),
                                         self.memcache)
        else:
            self.cache = MemcacheChain((localcache_cls(), self.memcache))
        self.cache_chains.update(cache=self.cache)

        self.rendercache = MemcacheChain((localcache_cls(),
                                          CMemcache(self.rendercaches,
                                                    noreply=True, no_block=True,
                                                    num_clients = num_mc_clients)))
        self.cache_chains.update(rendercache=self.rendercache)

        self.thing_cache = CacheChain((localcache_cls(),))
        self.cache_chains.update(thing_cache=self.thing_cache)

        #load the database info
        self.dbm = self.load_db_params(global_conf)

        # can't do this until load_db_params() has been called
        self.hardcache = HardcacheChain((localcache_cls(),
                                         self.memcache,
                                         HardCache(self)),
#.........这里部分代码省略.........
开发者ID:DamonAnderson,项目名称:reddit,代码行数:101,代码来源:app_globals.py


示例12: setup


#.........这里部分代码省略.........

        # rendercache holds rendered partial templates.
        rendercaches = CMemcache(self.rendercaches, noreply=True, no_block=True, num_clients=num_mc_clients)

        # pagecaches hold fully rendered pages
        pagecaches = CMemcache(self.pagecaches, noreply=True, no_block=True, num_clients=num_mc_clients)

        self.startup_timer.intermediate("memcache")

        ################# CASSANDRA
        keyspace = "reddit"
        self.cassandra_pools = {
            "main": StatsCollectingConnectionPool(
                keyspace,
                stats=self.stats,
                logging_name="main",
                server_list=self.cassandra_seeds,
                pool_size=self.cassandra_pool_size,
                timeout=4,
                max_retries=3,
                prefill=False,
            )
        }

        permacache_cf = CassandraCache(
            "permacache",
            self.cassandra_pools[self.cassandra_default_pool],
            read_consistency_level=self.cassandra_rcl,
            write_consistency_level=self.cassandra_wcl,
        )

        self.startup_timer.intermediate("cassandra")

        ################# POSTGRES
        event.listens_for(engine.Engine, "before_cursor_execute")(self.stats.pg_before_cursor_execute)
        event.listens_for(engine.Engine, "after_cursor_execute")(self.stats.pg_after_cursor_execute)

        self.dbm = self.load_db_params()
        self.startup_timer.intermediate("postgres")

        ################# CHAINS
        # initialize caches. Any cache-chains built here must be added
        # to cache_chains (closed around by reset_caches) so that they
        # can properly reset their local components
        self.cache_chains = {}
        localcache_cls = SelfEmptyingCache if self.running_as_script else LocalCache

        if stalecaches:
            self.cache = StaleCacheChain(localcache_cls(), stalecaches, self.memcache)
        else:
            self.cache = MemcacheChain((localcache_cls(), self.memcache))
        self.cache_chains.update(cache=self.cache)

        self.rendercache = MemcacheChain((localcache_cls(), rendercaches))
        self.cache_chains.update(rendercache=self.rendercache)

        self.pagecache = MemcacheChain((localcache_cls(), pagecaches))
        self.cache_chains.update(pagecache=self.pagecache)

        # the thing_cache is used in tdb_cassandra.
        self.thing_cache = CacheChain((localcache_cls(),))
        self.cache_chains.update(thing_cache=self.thing_cache)

        self.permacache = CassandraCacheChain(
            localcache_cls(), permacache_cf, memcache=permacache_memcaches, lock_factory=self.make_lock
        )
        self.cache_chains.update(permacache=self.permacache)

        # hardcache is used for various things that tend to expire
        # TODO: replace hardcache w/ cassandra stuff
        self.hardcache = HardcacheChain((localcache_cls(), self.memcache, HardCache(self)), cache_negative_results=True)
        self.cache_chains.update(hardcache=self.hardcache)

        # I know this sucks, but we need non-request-threads to be
        # able to reset the caches, so we need them be able to close
        # around 'cache_chains' without being able to call getattr on
        # 'g'
        cache_chains = self.cache_chains.copy()

        def reset_caches():
            for name, chain in cache_chains.iteritems():
                chain.reset()
                chain.stats = CacheStats(self.stats, name)

        self.reset_caches = reset_caches
        self.reset_caches()

        self.startup_timer.intermediate("cache_chains")

        # try to set the source control revision numbers
        self.versions = {}
        r2_root = os.path.dirname(os.path.dirname(self.paths["root"]))
        r2_gitdir = os.path.join(r2_root, ".git")
        self.short_version = self.record_repo_version("r2", r2_gitdir)

        if I18N_PATH:
            i18n_git_path = os.path.join(os.path.dirname(I18N_PATH), ".git")
            self.record_repo_version("i18n", i18n_git_path)

        self.startup_timer.intermediate("revisions")
开发者ID:99plus2,项目名称:reddit,代码行数:101,代码来源:app_globals.py


示例13: __init__

 def __init__(self, engine=None, **kvargs):
     """Hook SQLite regex support into the given engine.

     Registers ``self.hookup_regex`` on the engine's "begin" event so the
     REGEXP callback is installed when a transaction starts.

     NOTE(review): ``kvargs`` is accepted but ignored — presumably for
     caller/subclass compatibility; confirm before removing.
     """
     # setup regex support for sqlalchemy: listens_for returns a decorator,
     # applying it to the callback performs the registration
     register = listens_for(engine, "begin")
     register(self.hookup_regex)
开发者ID:VolatileDream,项目名称:whisper-composer,代码行数:4,代码来源:BaseDb.py



注:本文中的sqlalchemy.event.listens_for函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python event.remove函数代码示例发布时间:2022-05-27
下一篇:
Python event.listen函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap