• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sys.setcheckinterval函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sys.setcheckinterval函数的典型用法代码示例。如果您正苦于以下问题:Python setcheckinterval函数的具体用法?Python setcheckinterval怎么用?Python setcheckinterval使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了setcheckinterval函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: evaluate

 def evaluate(self,pop,force = 0):
     #import tree
     #print '1',tree.ref()
     #only send the individuals out that need evaluation
     if force:
         _eval_list = pop.data
     else:
         _eval_list = filter(lambda x: not x.evaluated,pop)
     #print '2',tree.ref()
     eval_list = pop.clone()
     #print '3',tree.ref()
     eval_list.data = _eval_list
     if len(eval_list):
         Nserv = len(pop.server_list)
         groups = divide_list(eval_list,Nserv)
         #print '4',tree.ref()
         sys.setcheckinterval(10)
         finished = sync.event()
         bar = sync.barrier(Nserv)
         #print "EVAL LENGTH!!!", plen(pop.evaluator)
         gr = groups[0]
         print "GROUP LENGTH!!!", plen(groups[0]), len(gr),
         #print "IND!!!", plen(gr[0]),plen(gr[0].root)
         #print '4.5',tree.ref()
         for i in range(len(groups)):
             inputs = {'sub_pop':groups[i], 'evaluator':pop.evaluator, 'force':force}
             returns = ('sub_pop',)
             code = 'evaluator.evaluate(sub_pop,force)'
             data_pack = (inputs,returns,code)
             server = pop.server_list[i]
             thread.start_new_thread(remote_thread_eval,(bar,finished,server,data_pack))
         #print '7',tree.ref()
         finished.wait()
         sys.setcheckinterval(10)
开发者ID:mbentz80,项目名称:jzigbeercp,代码行数:34,代码来源:parallel_pop.py


示例2: test_string_slicing

    def test_string_slicing(self):
        def f(ct, passCt,chars):
            x = "asdfasdf" * (ct / 8)
            res = 0
            for _ in xrange(passCt):
                for ix in xrange(len(x)):
                    res = res + len(x[ix:ix+chars])
            return res

        self.evaluateWithExecutor(f, 1000000, 1, 2)
        self.evaluateWithExecutor(f, 10000, 1, 2)
        
        def runTest(func, name):
            PerformanceTestReporter.PerfTest(name)(func)()

        runTest(lambda: self.evaluateWithExecutor(f, 1000000, 10, 2), "pyfora.string_slicing_10mm.2_char_large_string.pyfora")
        runTest(lambda: self.evaluateWithExecutor(f, 1000000, 10, 200), "pyfora.string_slicing_10mm.200_char_large_string.pyfora")
        runTest(lambda: self.evaluateWithExecutor(f, 10000, 1000, 2), "pyfora.string_slicing_10mm.2_char_small_string.pyfora")
        runTest(lambda: self.evaluateWithExecutor(f, 10000, 1000, 200), "pyfora.string_slicing_10mm.200_char_small_string.pyfora")
        
        sys.setcheckinterval(100000)

        runTest(lambda: f(1000000, 10, 2), "pyfora.string_slicing_10mm.2_char_large_string.native")
        runTest(lambda: f(1000000, 10, 200), "pyfora.string_slicing_10mm.200_char_large_string.native")
        runTest(lambda: f(10000, 1000, 2), "pyfora.string_slicing_10mm.2_char_small_string.native")
        runTest(lambda: f(10000, 1000, 200), "pyfora.string_slicing_10mm.200_char_small_string.native")
        
        sys.setcheckinterval(100)
开发者ID:WantonSoup,项目名称:ufora,代码行数:28,代码来源:StringTestCases.py


示例3: tidy_up

 def tidy_up():
     """
     Put the bytecode back how it was. Good as new.
     """
     sys.setcheckinterval(old_check_interval)
     for i in xrange(3):
         pb[where+i][0] = orig_bytes[i]
开发者ID:akubera,项目名称:rootpy,代码行数:7,代码来源:magic.py


示例4: __init__

 def __init__(self):
     threading.Thread.__init__(self)
     self.myOverlay = None
     sys.setcheckinterval(25)
     self.chanlist = ChannelList()
     self.paused = False
     self.fullUpdating = True
开发者ID:PseudoTV,项目名称:PseudoTV_Live,代码行数:7,代码来源:ChannelListThread.py


示例5: main

def main():
    """Main entry point for the application"""
    # Prepare for mantid import
    prepare_mantid_env()

    # todo: parse command arguments

    # general initialization
    app = initialize()
    # the default sys check interval leads to long lags
    # when request scripts to be aborted
    sys.setcheckinterval(SYSCHECK_INTERVAL)
    main_window = None
    try:
        main_window = start_workbench(app)
    except BaseException:
        # We count this as a crash
        import traceback
        # This is type of thing we want to capture and have reports
        # about. Prints to stderr as we can't really count on anything
        # else
        traceback.print_exc(file=ORIGINAL_STDERR)

    if main_window is None:
        # An exception occurred don't exit here
        return

    ORIGINAL_SYS_EXIT()
开发者ID:DanNixon,项目名称:mantid,代码行数:28,代码来源:mainwindow.py


示例6: setup

def setup(options):
    sys.setcheckinterval(options.check_interval)

    zope.app.appsetup.product.setProductConfigurations(
        options.product_config)
    options.eventlog()
    options.accesslog()
    for logger in options.loggers:
        logger()

    features = ('zserver',)
    # Provide the devmode, if activated
    if options.devmode:
        features += ('devmode',)
        logging.warning("Developer mode is enabled: this is a security risk "
            "and should NOT be enabled on production servers. Developer mode "
            "can be turned off in etc/zope.conf")

    zope.app.appsetup.config(options.site_definition, features=features)

    db = zope.app.appsetup.appsetup.multi_database(options.databases)[0][0]

    notify(zope.processlifetime.DatabaseOpened(db))

    task_dispatcher = ThreadedTaskDispatcher()
    task_dispatcher.setThreadCount(options.threads)

    for server in options.servers:
        server.create(task_dispatcher, db)

    notify(zope.processlifetime.ProcessStarting())

    return db
开发者ID:zopefoundation,项目名称:zope.app.server,代码行数:33,代码来源:main.py


示例7: xdump

def xdump(path, show_scheme=True, show_data=True):
    # print "query_res " + str(xquery_res)
    xobj, scheme, ret_type = list_path(path)
    if xobj is None:
        return None
    if ret_type == "DIR":
        ret_fields = [['dir']]
        for (son_dir_name, son_dir) in xobj.items():
            ret_fields.append([add_cross_if_dir(son_dir_name, son_dir)])
        return ret_fields
    ret_fields = list()
    if show_scheme:
        ret_fields.append(list(scheme.keys()))
    if ret_type == "LOGS":
        ret_fields.extend(xobj)
        return ret_fields
    def_interval = sys.getcheckinterval()
    # TODO: maybe copy before and no need to lock?
    sys.setcheckinterval(1000000000)
    try:
        ret_fields.extend(decompose_fields(xobj,
                                           show_scheme=False,
                                           show_data=show_data))
    except Exception as e:
        raise e
    finally:
        sys.setcheckinterval(def_interval)
    return ret_fields
开发者ID:kfir-drivenets,项目名称:pyxray,代码行数:28,代码来源:xnode.py


示例8: test_string_slicing_into_vector

    def test_string_slicing_into_vector(self):
        def testFunction(ct, passCt,chars):
            x = "asdfasdf" * (ct / 8)
            res = 0
            for _ in xrange(passCt):
                v = [x[ix*chars:ix*chars+chars] for ix in xrange(len(x) / chars)]
                for e in v:
                    res = res + len(e)
            return res
        f = testFunction

        self.evaluateWithExecutor(f, 1000000, 1, 2)
        self.evaluateWithExecutor(f, 10000, 1, 2)
        
        def runTest(func, name):
            PerformanceTestReporter.PerfTest(name)(func)()

        runTest(lambda: self.evaluateWithExecutor(f, 1000000, 10, 2), "pyfora.string_slicing_into_vector_10mm.2_char_large_string.pyfora")
        runTest(lambda: self.evaluateWithExecutor(f, 1000000, 1000, 200), "pyfora.string_slicing_into_vector_10mm.200_char_large_string.pyfora")
        runTest(lambda: self.evaluateWithExecutor(f, 10000, 1000, 2), "pyfora.string_slicing_into_vector_10mm.2_char_small_string.pyfora")
        runTest(lambda: self.evaluateWithExecutor(f, 10000, 100000, 200), "pyfora.string_slicing_into_vector_10mm.200_char_small_string.pyfora")
        
        sys.setcheckinterval(100000)

        runTest(lambda: f(1000000, 10, 2), "pyfora.string_slicing_into_vector_10mm.2_char_large_string.native")
        runTest(lambda: f(1000000, 1000, 200), "pyfora.string_slicing_into_vector_10mm.200_char_large_string.native")
        runTest(lambda: f(10000, 1000, 2), "pyfora.string_slicing_into_vector_10mm.2_char_small_string.native")
        runTest(lambda: f(10000, 100000, 200), "pyfora.string_slicing_into_vector_10mm.200_char_small_string.native")
        
        sys.setcheckinterval(100)
开发者ID:WantonSoup,项目名称:ufora,代码行数:30,代码来源:StringTestCases.py


示例9: __init__

 def __init__(self):
     threading.Thread.__init__(self)
     self.myOverlay = None
     self.shouldExit = False
     sys.setcheckinterval(25)
     self.chanlist = ChannelList()
     self.chanlist.sleepTime = 0.1
开发者ID:PseudoTV,项目名称:PseudoTV_Archive,代码行数:7,代码来源:ChannelListThread.py


示例10: inject_jump

    def inject_jump(self, where, dest):
        """
        Monkeypatch bytecode at ``where`` to force it to jump to ``dest``.

        Returns function which puts things back how they were.
        """
        # We're about to do dangerous things to a functions code content.
        # We can't make a lock to prevent the interpreter from using those
        # bytes, so the best we can do is to set the check interval to be high
        # and just pray that this keeps other threads at bay.
        old_check_interval = sys.getcheckinterval()
        sys.setcheckinterval(2**20)

        pb = ctypes.pointer(self.ob_sval)
        orig_bytes = [pb[where+i][0] for i in xrange(where)]

        v = struct.pack("<BH", opcode.opmap["JUMP_ABSOLUTE"], dest)

        # Overwrite code to cause it to jump to the target
        for i in xrange(3):
            pb[where+i][0] = ord(v[i])

        def tidy_up():
            """
            Put the bytecode back how it was. Good as new.
            """
            sys.setcheckinterval(old_check_interval)
            for i in xrange(3):
                pb[where+i][0] = orig_bytes[i]

        return tidy_up
开发者ID:akubera,项目名称:rootpy,代码行数:31,代码来源:magic.py


示例11: test_setcheckinterval

 def test_setcheckinterval(self):
     import sys
     raises(TypeError, sys.setcheckinterval)
     orig = sys.getcheckinterval()
     for n in 0, 100, 120, orig: # orig last to restore starting state
         sys.setcheckinterval(n)
         assert sys.getcheckinterval() == n
开发者ID:yuyichao,项目名称:pypy,代码行数:7,代码来源:test_sysmodule.py


示例12: preprocessPercentileRatios

 def preprocessPercentileRatios(self):
     print "preprocessPercentileRatios start"
     distributionsFile = self.getDatasetSlidingSizesFile()
     if os.path.isfile(distributionsFile):
         #Teh distributions file is processed
         print "The distribution file exists"
         return
     self.initializePropertiesComputeStructures(False)
     
     print "computing the ratios"
     try:
         zpa = zipfile.ZipFile(distributionsFile,"w",zipfile.ZIP_DEFLATED)
         zpa.writestr("dummy.txt","dummy file")
         zpa.close()
         self.ziplock = threading.Lock()
         # Create a pool with three worker threads
         pool = ThreadPool.ThreadPool(5)  
         sys.setcheckinterval(1000)          
         for windowSize in self.slidingWindowSizes:
             if self.useWindowThreading:
                 pool.queueTask(self.preprocessWindowSize, windowSize, None)
             else:
                 self.preprocessWindowSize(windowSize)
         # When all tasks are finished, allow the threads to terminate
         if self.useWindowThreading:
             print "Joining the threads"
             pool.joinAll()             
     except:            
         os.unlink(distributionsFile)
         raise
     print "preprocessPercentileRatios end"
开发者ID:kosio,项目名称:EpiExplorer,代码行数:31,代码来源:NIHEpigenomeWig.py


示例13: test_setcheckinterval

 def test_setcheckinterval(self):
     with warnings.catch_warnings():
         warnings.simplefilter("ignore")
         self.assertRaises(TypeError, sys.setcheckinterval)
         orig = sys.getcheckinterval()
         for n in 0, 100, 120, orig: # orig last to restore starting state
             sys.setcheckinterval(n)
             self.assertEqual(sys.getcheckinterval(), n)
开发者ID:MaximVanyushkin,项目名称:Sharp.RemoteQueryable,代码行数:8,代码来源:test_sys.py


示例14: _collect

 def _collect(self):
     gc.collect()
     check_interval = sys.getcheckinterval()
     sys.setcheckinterval(sys.maxint)
     try:
         return {id(object) for object in gc.get_objects() if not isinstance(object, EXCLUDE_TYPES)}
     finally:
         sys.setcheckinterval(check_interval)
开发者ID:VDOMBoxGroup,项目名称:runtime2.0,代码行数:8,代码来源:snapshots.py


示例15: test_setcheckinterval

 def test_setcheckinterval(self):
     if test.test_support.due_to_ironpython_bug("http://tkbgitvstfat01:8080/WorkItemTracking/WorkItem.aspx?artifactMoniker=148342"):
         return
     self.assertRaises(TypeError, sys.setcheckinterval)
     orig = sys.getcheckinterval()
     for n in 0, 100, 120, orig: # orig last to restore starting state
         sys.setcheckinterval(n)
         self.assertEquals(sys.getcheckinterval(), n)
开发者ID:BillyboyD,项目名称:main,代码行数:8,代码来源:test_sys.py


示例16: doStatProf

def doStatProf(*args):
    import bench.statprof

    sys.setcheckinterval(0)
    bench.statprof.start()
    runLimited(*args)
    bench.statprof.stop()
    bench.statprof.display()
开发者ID:siebenmann,项目名称:dwiki,代码行数:8,代码来源:testbench.py


示例17: short_checkinterval

def short_checkinterval(request):
    """
    Sets a small interval using sys.setcheckinterval to cause many context
    switches.
    """
    old_interval = sys.getcheckinterval()
    sys.setcheckinterval(0)
    request.addfinalizer(lambda: sys.setcheckinterval(old_interval))
开发者ID:Bluehorn,项目名称:calltrace,代码行数:8,代码来源:test_current_frames.py


示例18: run

    def run(self):
        pthread_setname_np(self.ident, "Manhole ----")

        client = self.client
        client.settimeout(None)
        pid, uid, gid = get_peercred(client)
        euid = os.geteuid()
        client_name = "PID:%s UID:%s GID:%s" % (pid, uid, gid)
        if uid not in (0, euid):
            raise SuspiciousClient(
                "Can't accept client with %s. "
                "It doesn't match the current EUID:%s or ROOT." % (
                    client_name, euid
            ))

        cry("Accepted connection %s from %s" % (client, client_name))
        pthread_setname_np(self.ident, "Manhole %s" % pid)
        client.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 0)
        client.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 0)
        backup = []
        try:
            client_fd = client.fileno()
            for mode, names in (
                ('w', (
                    'stderr',
                    'stdout',
                    '__stderr__',
                    '__stdout__'
                )),
                ('r', (
                    'stdin',
                    '__stdin__'
                ))
            ):
                for name in names:
                    backup.append((name, getattr(sys, name)))
                    setattr(sys, name, os.fdopen(client_fd, mode, 0))

            run_repl()
            cry("DONE.")
        finally:
            cry("Cleaning up.")
            old_interval = sys.getcheckinterval()
            sys.setcheckinterval(2147483647)
            junk = [] # keep the old file objects alive for a bit
            for name, fh in backup:
                junk.append(getattr(sys, name))
                setattr(sys, name, fh)
            del backup
            for fh in junk:
                try:
                    fh.close()
                except IOError:
                    pass
                del fh
            del junk
            self.client = None
            sys.setcheckinterval(old_interval)
开发者ID:knzm,项目名称:python-manhole,代码行数:58,代码来源:manhole.py


示例19: thread_func

 def thread_func(lock, acquire_array, lock_array, index):
     gevent.sleep(random.uniform(0, 0.1))
     # We use the fact that the GIL prevents CPython from context switching between the acquire_array.append()
     # and the lock.acquire() when we set the check interval to a great value
     sys.setcheckinterval(10000000)
     acquire_array.append(index)
     with lock:
         sys.setcheckinterval(1)
         lock_array.append(index)
开发者ID:yalon,项目名称:gevent,代码行数:9,代码来源:test__native_friendly_rlock.py


示例20: test_trashcan_threads

    def test_trashcan_threads(self):
        # Issue #13992: trashcan mechanism should be thread-safe
        NESTING = 60
        N_THREADS = 2

        def sleeper_gen():
            """A generator that releases the GIL when closed or dealloc'ed."""
            try:
                yield
            finally:
                time.sleep(0.000001)

        class C(list):
            # Appending to a list is atomic, which avoids the use of a lock.
            inits = []
            dels = []
            def __init__(self, alist):
                self[:] = alist
                C.inits.append(None)
            def __del__(self):
                # This __del__ is called by subtype_dealloc().
                C.dels.append(None)
                # `g` will release the GIL when garbage-collected.  This
                # helps assert subtype_dealloc's behaviour when threads
                # switch in the middle of it.
                g = sleeper_gen()
                next(g)
                # Now that __del__ is finished, subtype_dealloc will proceed
                # to call list_dealloc, which also uses the trashcan mechanism.

        def make_nested():
            """Create a sufficiently nested container object so that the
            trashcan mechanism is invoked when deallocating it."""
            x = C([])
            for i in range(NESTING):
                x = [C([x])]
            del x

        def run_thread():
            """Exercise make_nested() in a loop."""
            while not exit:
                make_nested()

        old_checkinterval = sys.getcheckinterval()
        sys.setcheckinterval(3)
        try:
            exit = []
            threads = []
            for i in range(N_THREADS):
                t = threading.Thread(target=run_thread)
                threads.append(t)
            with start_threads(threads, lambda: exit.append(1)):
                time.sleep(1.0)
        finally:
            sys.setcheckinterval(old_checkinterval)
        gc.collect()
        self.assertEqual(len(C.inits), len(C.dels))
开发者ID:AtomicConductor,项目名称:conductor_client,代码行数:57,代码来源:test_gc.py



注:本文中的sys.setcheckinterval函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sys.setdefaultencoding函数代码示例发布时间:2022-05-27
下一篇:
Python sys.intern函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap