• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python signal.setitimer函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中signal.setitimer函数的典型用法代码示例。如果您正苦于以下问题：Python setitimer函数的具体用法？Python setitimer怎么用？Python setitimer使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了setitimer函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: getch

def getch(timeout=0.1):
    """Return one character of raw terminal input, or '' on timeout.

    Switches stdin to raw mode, arms a one-shot ``ITIMER_REAL`` timer and
    blocks in ``sys.stdin.read(1)``.  If SIGALRM fires first the read is
    aborted and an empty string is returned.  The terminal settings, the
    timer and the previously installed SIGALRM handler are all restored
    before returning.

    :param timeout: seconds to wait for a key press (may be fractional).
        A value of 0 disarms the timer, so the read then blocks until
        input arrives.
    :returns: the character read, or ``''`` if nothing arrived in time.
    """
    def _handle_timeout(signum, frame):
        raise TimeoutError()

    def _getch():
        try:
            fd = sys.stdin.fileno()
            old_settings = termios.tcgetattr(fd)
            try:
                tty.setraw(fd)
                ch = sys.stdin.read(1)
            finally:
                # Always leave the terminal the way we found it.
                termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
            return ch
        except TimeoutError:
            return ''

    previous_handler = signal.signal(signal.SIGALRM, _handle_timeout)
    try:
        signal.setitimer(signal.ITIMER_REAL, timeout)
        result = _getch()
    finally:
        # Disarm with the same API that armed the timer; mixing alarm() and
        # setitimer() is not guaranteed to be portable.
        signal.setitimer(signal.ITIMER_REAL, 0)
        # signal.signal() returns None when the old handler was not installed
        # from Python; in that case there is nothing we can put back.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
    return result
开发者ID:Gr3yR0n1n,项目名称:silverlining,代码行数:25,代码来源:utils.py


示例2: test_eintr_zero_timeout

def test_eintr_zero_timeout(wfs, spair):
    """Check that a zero-timeout wait keeps working under a SIGALRM storm.

    ``wfs`` is the wait-for-socket callable under test and ``spair`` is a
    two-element pair of sockets; only the first one is polled.  The test
    passes when at least one SIGALRM was delivered while polling.
    """
    sock, _peer = spair
    # One-element list so the nested handler can update the count in place.
    alarms_seen = [0]

    def on_alarm(signum, frame):
        assert signum == signal.SIGALRM
        alarms_seen[0] = alarms_seen[0] + 1

    previous = signal.signal(signal.SIGALRM, on_alarm)
    try:
        assert not wfs(sock, read=True, timeout=0)
        try:
            # Deliver SIGALRM 1000 times per second to provoke races such as
            # https://github.com/urllib3/urllib3/issues/1396.
            signal.setitimer(signal.ITIMER_REAL, 0.001, 0.001)
            # Call the wait function over and over so that some calls are
            # interrupted by the signal.
            remaining = 100000
            while remaining:
                wfs(sock, read=True, timeout=0)
                remaining -= 1
        finally:
            # Disarm the interval timer.
            signal.setitimer(signal.ITIMER_REAL, 0)
    finally:
        signal.signal(signal.SIGALRM, previous)

    assert alarms_seen[0] > 0
开发者ID:christophermca,项目名称:dotfiles,代码行数:27,代码来源:test_wait.py


示例3: _reload

def _reload():
    """Restart the running program with the same interpreter and arguments.

    Marks the reload as attempted, runs every registered reload hook, disarms
    any pending ``ITIMER_REAL`` timer and then replaces (or, where that is not
    possible, re-spawns) the current process.
    """
    global _reload_attempted
    _reload_attempted = True
    for hook in _reload_hooks:
        hook()
    if hasattr(signal, "setitimer"):
        # Disarm the alarm set by ioloop.set_blocking_log_threshold so that it
        # cannot fire after the exec.
        signal.setitimer(signal.ITIMER_REAL, 0, 0)

    command_line = [sys.executable] + sys.argv
    if sys.platform == 'win32':
        # os.execv on Windows cannot cope with whitespace in the executable
        # name or in the arguments; subprocess handles both correctly.
        subprocess.Popen(command_line)
        sys.exit(0)

    try:
        os.execv(sys.executable, command_line)
    except OSError:
        # Mac OS X before 10.6 refuses execv in a multi-threaded process.
        # Start a fresh process and let this one exit instead.  The new
        # process is detached from the parent terminal (so ctrl-C will not
        # reach it easily), but that beats not reloading at all.  The errno
        # reported for this case is not consistent, so it cannot be singled
        # out from other OSErrors.
        os.spawnv(os.P_NOWAIT, sys.executable, command_line)
        sys.exit(0)
开发者ID:Akylas,项目名称:CouchPotatoServer,代码行数:34,代码来源:autoreload.py


示例4: handler

 def handler(signum=None, frame=None):
     """Record a timestamp and re-arm the timer until N samples exist."""
     if len(times) >= N:
         return
     times.append(time.perf_counter())
     # Ask for the smallest possible interval (1 microsecond); the point is
     # to measure how long the platform really takes to deliver it.
     signal.setitimer(signal.ITIMER_REAL, 1e-6)
开发者ID:Eyepea,项目名称:cpython,代码行数:7,代码来源:test_signal.py


示例5: test_stress_delivery_simultaneous

    def test_stress_delivery_simultaneous(self):
        """
        Stress signal delivery with two signals raised almost together.
        """
        total = self.decide_itimer_count()
        received = []

        def record(signum, frame):
            received.append(signum)

        self.setsig(signal.SIGUSR1, record)
        self.setsig(signal.SIGALRM, record)  # for ITIMER_REAL

        give_up_at = time.monotonic() + 15.0
        sent = 0

        while sent < total:
            # Arm a very short timer so that, with luck, SIGALRM arrives
            # while SIGUSR1 is still being processed.
            signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
            os.kill(os.getpid(), signal.SIGUSR1)
            sent += 2

            # Let both handlers run before sending more, otherwise pending
            # signals may coalesce.  Stop waiting once the deadline passes.
            while True:
                if len(received) >= sent or time.monotonic() >= give_up_at:
                    break
                time.sleep(1e-5)

        # Every signal sent must have reached the Python-level handler.
        self.assertEqual(len(received), total, "Some signals were lost")
开发者ID:Eyepea,项目名称:cpython,代码行数:30,代码来源:test_signal.py


示例6: main

def main():
    """Echo stdin and report signal wake-ups using epoll (Python 2 script).

    A pipe is registered as the signal wake-up fd, a 2-second repeating
    ``ITIMER_REAL`` timer is armed, and the loop then polls both the pipe and
    stdin forever, reading one byte per ready descriptor.
    """
    read_end, write_end = os.pipe()
    # The wake-up fd has to be non-blocking.
    current_flags = fcntl.fcntl(write_end, fcntl.F_GETFL, 0)
    fcntl.fcntl(write_end, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)

    watched_signals = (signal.SIGCHLD, signal.SIGALRM)
    for signum in watched_signals:
        signal.signal(signum, lambda x, y: None)
    for signum in watched_signals:
        # The original author noted that this makes no difference.
        signal.siginterrupt(signum, False)
    signal.set_wakeup_fd(write_end)
    signal.setitimer(signal.ITIMER_REAL, 2, 2)

    poller = select.epoll()
    for source in (read_end, sys.stdin):
        poller.register(source, select.EPOLLIN)

    print("Main screen turn on")
    while True:
        try:
            ready = poller.poll()
            try:
                for fd, _mask in ready:
                    ch = os.read(fd, 1)
                    if fd == read_end:
                        sys.stdout.write("We get Signal")
                    if fd == sys.stdin.fileno():
                        sys.stdout.write(ch)
                    sys.stdout.flush()
            except IOError as e:
                print("exception loop" + str(e))
        except IOError as e:
            print("exception poll" + str(e))
开发者ID:senseiafpa,项目名称:clonage_a_chaud,代码行数:34,代码来源:test-jo.py


示例7: _send

    def _send(self, command, retries=5, timeout=100):
        """Write a 33-byte command to the device and return its reply payload.

        Each attempt is guarded by a one-shot ``ITIMER_REAL`` timer.  The
        SIGALRM handler stays installed for *all* attempts and is put back
        only once the whole call is finished; the timer is disarmed after
        every attempt, whether it succeeded, timed out or failed.

        :param command: 33-byte sequence; ``command[1]`` is compared with the
            first byte of the reply.
        :param retries: maximum number of attempts.
        :param timeout: per-attempt timeout in milliseconds.
        :returns: the reply bytes after the first (message type) byte.
        :raises ValueError: if ``command`` is not 33 bytes long.
        :raises IOError: if the reply's first byte does not match
            ``command[1]``, or if every attempt timed out.
        """
        fd = self._fd
        if len(command) != 33:
            raise ValueError("command must be 33 bytes long")
        # presumably _TimeoutError.timeout raises _TimeoutError - confirm
        handler = signal.signal(signal.SIGALRM, _TimeoutError.timeout)
        try:
            for attempt in range(retries):
                signal.setitimer(signal.ITIMER_REAL, timeout/1000.0)
                try:
                    # NOTE(review): "{}" placeholders only work if LOG is a
                    # brace-style logger; a stdlib Logger expects "%s".
                    if LOG.isEnabledFor(logging.DEBUG):
                        LOG.debug("Write: {}", hexlify(command[1:]))
                    fd.write(command)
                    fd.flush()
                    reply = bytearray(fd.read(32))
                    if LOG.isEnabledFor(logging.DEBUG):
                        LOG.debug("Recv: {}", hexlify(reply))
                    signal.setitimer(signal.ITIMER_REAL, 0)
                    if reply[0] != command[1]:
                        msg = "Expected msg type {} but got {}"
                        raise IOError(msg.format(command[1], reply[0]))
                    return reply[1:]
                except _TimeoutError:
                    print("IO timed out, try #%d." % attempt)
                finally:
                    # Never leave the timer armed between attempts or after an
                    # error: a late SIGALRM would hit whatever handler is
                    # installed at that point.
                    signal.setitimer(signal.ITIMER_REAL, 0)
        finally:
            # Restore the caller's handler only after the last attempt, so
            # that retries are still protected by the timeout handler.
            if handler is not None:
                signal.signal(signal.SIGALRM, handler)
        msg = "Gving up on PlasmaTrim {}"
        raise IOError(msg.format(self))
开发者ID:grantjbutler,项目名称:Radiance,代码行数:27,代码来源:plasmatrim.py


示例8: open

def open(name, inittime):
	"""Start the program *name* and wait for its first line of output.

	The launcher is chosen from the last character of *name*: ``y`` runs it
	with ``python``, ``e`` runs ``./name`` directly and ``s`` runs it with
	``java``.  The child gets pipes for stdin, stdout and stderr.

	:param name: file name of the program to start.
	:param inittime: seconds the program may take to print its first line.
	:returns: the ``subprocess.Popen`` object, or ``None`` when the program
		could not be started or did not answer in time (it is killed then).
	"""
	# Build an argument list: with shell=False a single command string is
	# taken as the executable name on POSIX and can never be found.
	if name.endswith("y"):
		argv = ["python", name]
	elif name.endswith("e"):
		argv = ["./" + name]
	elif name.endswith("s"):
		argv = ["java", name]
	else:
		print("init can't find program")
		return None
	try:
		p = subprocess.Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
	except (OSError, ValueError):
		print("init can't find program")
		return None

	class _InitTimeout(Exception):
		"""Raised by the SIGALRM handler when the start-up time is over."""

	def _on_alarm(signum, frame):
		raise _InitTimeout()

	# Wait for the first line; the timer interrupts the blocking read.
	success = True
	previous = signal.signal(signal.SIGALRM, _on_alarm)
	try:
		signal.setitimer(signal.ITIMER_REAL, inittime)
		p.stdout.readline()
		print("init success")
	except Exception:
		# Timeouts and I/O errors alike mean the program is unusable.
		success = False
		print("init timeout")
	finally:
		signal.setitimer(signal.ITIMER_REAL, 0)
		# signal.signal() returns None for handlers not set from Python.
		if previous is not None:
			signal.signal(signal.SIGALRM, previous)
	if not success:
		p.kill()
		return None
	return p
开发者ID:MetisAI,项目名称:MetisAIPlatform,代码行数:32,代码来源:main.py


示例9: run

    def run(self):
        """Activate the consumers and then idle until the program stops.

        Marks the object active, sets up the consumers, installs the SIGCHLD
        and SIGALRM handlers, arms a one-shot poll timer and finally sleeps in
        ``signal.pause()`` for as long as ``is_running`` stays true.

        """
        self.set_state(self.STATE_ACTIVE)
        self.setup_consumers()

        # SIGCHLD reports child creation errors; SIGALRM drives the poll
        # interval.
        handlers = (
            (signal.SIGCHLD, self.on_sigchld),
            (signal.SIGALRM, self.on_timer),
        )
        for signum, callback in handlers:
            signal.signal(signum, callback)

        # Arm the first poll timer tick (one-shot, no repeat interval)
        signal.setitimer(signal.ITIMER_REAL, self.poll_interval, 0)

        # Sleep between signals for the lifetime of the app
        while self.is_running:
            already_sleeping = self.is_sleeping
            if not already_sleeping:
                self.set_state(self.STATE_SLEEPING)
            signal.pause()

        # Leaving run
        LOGGER.info('Exiting Master Control Program')
开发者ID:gmr,项目名称:rejected,代码行数:25,代码来源:mcp.py


示例10: stop

    def stop(self):
        """Stop the MCP and its child processes, waiting briefly if needed.

        Requests the stop state, asks the MCP to stop its processes, gives it
        one 3-second timer tick to finish and, if it is still running after
        that, asks it to stop its processes a second time and drops it.
        """
        LOGGER.info('Shutting down controller')
        self.set_state(self.STATE_STOP_REQUESTED)

        # Disarm the timer.
        # NOTE(review): this clears ITIMER_PROF while the wait below arms
        # ITIMER_REAL - confirm which timer is meant to be cleared here.
        signal.setitimer(signal.ITIMER_PROF, 0, 0)

        self._mcp.stop_processes()

        if self._mcp.is_running:
            LOGGER.info('Waiting up to 3 seconds for MCP to shut things down')
            # One-shot timer so that pause() returns after at most 3 seconds.
            signal.setitimer(signal.ITIMER_REAL, 3, 0)
            signal.pause()
            LOGGER.info('Post pause')

        if not self._mcp.is_running:
            LOGGER.info('MCP exited cleanly')
        else:
            # Still running after the grace period: ask it to stop again.
            LOGGER.warning('MCP is taking too long, requesting process kills')
            self._mcp.stop_processes()
            del self._mcp

        # Record the stopped state
        self._stopped()
        LOGGER.info('Shutdown complete')
开发者ID:gmr,项目名称:rejected,代码行数:27,代码来源:controller.py


示例11: read

def read(judge):
	nextstep = judge.nextstep;
	p = judge.popen[nextstep];
	#test if the program is inited succefully
	if (p == None):
		print "read: can't find program reading stop"
		return None;
	if (judge.info[nextstep] == None):
		info = "";
	else:
		info = judge.info[nextstep];
	success = True;
	print info
	#try communicate with the program
	try:
		signal.signal(signal.SIGALRM, read);
		signal.setitimer(signal.ITIMER_REAL, judge.runtime);
		p.stdin.write(info);
		p.stdin.flush();
		s = p.stdout.readline(),
		print "read: success"
	except:
		success = False;
		print "read: timeout try:",
	finally:
		signal.setitimer(signal.ITIMER_REAL, 0);
	#if the program can't return an answer
	#the program will be killed and restarted
	if (success == False):
		p.kill();
		judge.popen[nextstep] = open(judge.name[nextstep], judge.inittime);
		return None;	
	return s;
开发者ID:MetisAI,项目名称:MetisAIPlatform,代码行数:33,代码来源:main.py


示例12: setup_reporter

def setup_reporter(processor, arguments):
    """Install a SIGALRM-driven periodic report of ``processor.report()``.

    Does nothing when ``arguments['--no-follow']`` is set.  Otherwise the
    first report fires after 0.1 seconds and then repeats every
    ``arguments['--interval']`` seconds.  While ``LOGGING_SAMPLES`` is
    ``None`` the report is drawn on a curses screen; otherwise it is printed
    and the process exits once the sample counter reaches zero.
    """
    if arguments['--no-follow']:
        return

    global LOGGING_SAMPLES
    if LOGGING_SAMPLES is None:
        screen = curses.initscr()
        # Make sure the terminal is restored when the program exits.
        atexit.register(curses.endwin)

    def emit_report(signum, frame):
        global LOGGING_SAMPLES

        report_text = processor.report()
        if LOGGING_SAMPLES is not None:
            print(report_text)
            LOGGING_SAMPLES -= 1
            if LOGGING_SAMPLES == 0:
                sys.exit(0)
            return

        screen.erase()
        try:
            screen.addstr(report_text)
        except curses.error:
            # Drawing errors are ignored on purpose.
            pass
        screen.refresh()

    signal.signal(signal.SIGALRM, emit_report)
    repeat_every = float(arguments['--interval'])
    signal.setitimer(signal.ITIMER_REAL, 0.1, repeat_every)
开发者ID:wujs,项目名称:ngxtop_rtmp_hls,代码行数:31,代码来源:ngxtop.py


示例13: start

 def start(self):
     """Begin sampling: install the handler and arm the first timer tick.

     Does nothing when sampling has already been started.
     """
     if not self.started:
         self.started = True
         # Keep the old handler so that it can be put back later.
         self.rollback = signal.signal(self.signal, self._resample)
         # First delay is randomised between 0.01 and 0.02 seconds.
         signal.setitimer(self.which, 0.01 * (1 + random.random()))
开发者ID:doublereedkurt,项目名称:sampro,代码行数:7,代码来源:sampro.py


示例14: run

    def run(self):
        """Check a ZeroMQ PAIR socket for commands once per timer tick.

        Connects to ``IPC_ADDR``, arms a repeating ``ITIMER_REAL`` timer of
        ``self.interval`` seconds and, after every wake-up, tries a
        non-blocking receive.  ``say`` prints its payload, ``quit`` ends the
        loop.
        """
        context = zmq.Context()
        channel = context.socket(zmq.PAIR)
        channel.connect(IPC_ADDR)

        # The handler does no work; the timer only exists to end pause().
        signal.signal(signal.SIGALRM, self._dummyHandler)
        signal.setitimer(signal.ITIMER_REAL, self.interval, self.interval)

        while True:
            # Sleep until the next signal arrives
            signal.pause()
            try:
                # Receive without blocking
                command, payload = channel.recv_multipart(flags=zmq.NOBLOCK)
            except zmq.Again:
                # Nothing queued yet; wait for the next tick
                continue

            # Dispatch on the command name
            if command == b"quit":
                print("Goodbye!")
                break
            if command == b"say":
                print("You want me to say: {}".format(payload))
开发者ID:johnliu55tw,项目名称:PythonMisc,代码行数:25,代码来源:IntervalRunner.py


示例15: main

def main():
    """Run the ping loop: set up raw sockets, arm the timer, receive forever.

    Exits with status -1 unless running as root.  Parses the command line
    into the global ``options``, installs the SIGALRM handler, opens one raw
    socket for sending and one ICMP socket for receiving, resolves the
    destination and arms a repeating ``ITIMER_REAL`` timer of ``options.i``
    seconds before receiving packets in an endless loop.  Socket errors are
    reported on stderr; the timer and both sockets are released on the way
    out.
    """
    global options
    if os.geteuid() != 0:
        sys.stderr.write("ping uses RAW SOCKETs therefore must running with root privilegies\n")
        sys.exit(-1)

    options = parse_options(sys.argv[1:])
    signal.signal(signal.SIGALRM, signanl_handler)

    sockfd = None
    sockfd_r = None
    try:
        sockfd = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
        # The IP header is supplied by this program, not by the kernel.
        sockfd.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, True)

        sockfd_r = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)

        options.sockfd = sockfd
        options.packet_id = os.getpid() % 0xFFFFFF
        options.destination = socket.gethostbyname(options.destination)
        options.sequence_number = 0

        print("PING %s, interval %d, packed identifier %d" % (options.destination, options.i, options.packet_id))
        signal.setitimer(signal.ITIMER_REAL, options.i, options.i)
        while True:
            receive_ip_packet(sockfd_r, options.packet_id)

    except socket.error as e:
        # strerror is None for some socket errors (e.g. timeouts); fall back
        # to the full message instead of failing on str + None.
        sys.stderr.write("Exception: " + (e.strerror or str(e)) + '\n')
    finally:
        # Stop the periodic SIGALRM before the sockets go away, so that the
        # handler is not invoked again after they are closed.
        signal.setitimer(signal.ITIMER_REAL, 0)
        for sock in (sockfd, sockfd_r):
            if sock is not None:
                sock.close()
开发者ID:denis4net,项目名称:lanssw,代码行数:27,代码来源:hackable_ping.py


示例16: _sample

 def _sample(self, _, _2):
     """Count the formatted stack of each current frame, then re-arm.

     Both positional arguments are ignored.
     """
     for current_frame in sys._current_frames().values():
         stack_key = self._format_stack(current_frame)
         if not stack_key:
             continue
         self.stack_counts[stack_key] += 1
     if not self._started:
         return
     # One-shot timer: it is armed again on every sample while started.
     signal.setitimer(signal.ITIMER_PROF, self.interval, 0)
开发者ID:dls-controls,项目名称:pymalcolm,代码行数:7,代码来源:profilingsampler.py


示例17: setUpClass

    def setUpClass(cls):
        """Arm a repeating SIGALRM with a do-nothing handler for the tests.

        The previous handler is kept in ``cls.orig_handler``.
        """
        def _ignore_signal(*args):
            return None

        cls.orig_handler = signal.signal(signal.SIGALRM, _ignore_signal)
        first_delay, repeat = cls.signal_delay, cls.signal_period
        signal.setitimer(signal.ITIMER_REAL, first_delay, repeat)

        # Issue #25277: dump tracebacks and exit after ten minutes, to help
        # debug a hang seen on FreeBSD
        watchdog_seconds = 10 * 60
        faulthandler.dump_traceback_later(watchdog_seconds, exit=True)
开发者ID:janetizzy,项目名称:cpython,代码行数:7,代码来源:eintr_tester.py


示例18: profile_signal_handler

def profile_signal_handler(signum, frame):
    """Take one profiling sample and re-arm the one-shot profiling timer.

    Does nothing unless ``state.profile_level`` is positive.
    """
    if not state.profile_level > 0:
        return
    state.accumulate_time(clock())
    sample_stack_procs(frame)
    # Arm the next tick; the zero interval makes it one-shot.
    next_tick = state.sample_interval
    signal.setitimer(signal.ITIMER_PROF, next_tick, 0.0)
    state.last_start_time = clock()
开发者ID:jedbrown,项目名称:statprof.py,代码行数:7,代码来源:statprof.py


示例19: reload_server

def reload_server(io_loop):
    """Close the IO loop's descriptors and re-execute the current program.

    Waits half a second, closes every descriptor registered in
    ``io_loop._handlers`` (best effort), disarms any pending ``ITIMER_REAL``
    timer and then replaces the process with a fresh copy of itself.  If
    ``os.execv`` fails, a new process is spawned and this one exits.

    :param io_loop: object whose ``_handlers`` mapping is keyed by the
        descriptors to close.
    """
    # wait for other files to be written
    time.sleep(0.5)
    # Iterate over a snapshot so the loop cannot be disturbed by the mapping
    # changing underneath it.
    for fd in list(io_loop._handlers):
        try:
            os.close(fd)
        except Exception:
            # Best-effort cleanup: a descriptor that cannot be closed must
            # not block the reload.  Unlike a bare ``except`` this still
            # lets KeyboardInterrupt and SystemExit through.
            pass
    if hasattr(signal, "setitimer"):
        # Clear the alarm signal set by
        # ioloop.set_blocking_log_threshold so it doesn't fire
        # after the exec.
        signal.setitimer(signal.ITIMER_REAL, 0, 0)
    try:
        os.execv(sys.executable, [sys.executable] + sys.argv)
    except OSError:
        # Mac OS X versions prior to 10.6 do not support execv in
        # a process that contains multiple threads.  Instead of
        # re-executing in the current process, start a new one
        # and cause the current process to exit.  This isn't
        # ideal since the new process is detached from the parent
        # terminal and thus cannot easily be killed with ctrl-C,
        # but it's better than not being able to autoreload at
        # all.
        # Unfortunately the errno returned in this case does not
        # appear to be consistent, so we can't easily check for
        # this error specifically.
        os.spawnv(os.P_NOWAIT, sys.executable, [sys.executable] + sys.argv)
        sys.exit(0)
开发者ID:menghan,项目名称:menghan-pool,代码行数:29,代码来源:run_wsgi.py


示例20: start

    def start(self):
        """Run the metering loop until a graceful stop is requested.

        Works out the number of frames per segment, optionally arms an
        ``ITIMER_REAL`` timer of ``self.seconds`` seconds, and then records,
        measures and reports one audio segment per iteration.  The meter is
        stopped both when the loop ends normally and when ``StopException``
        is raised.
        """
        segment_seconds = self.segment or AUDIO_SEGMENT_LENGTH
        self.num_frames = int(RATE / FRAMES_PER_BUFFER * segment_seconds)
        if self.seconds:
            # NOTE(review): presumably the SIGALRM handler ends the run after
            # this many seconds - it is installed elsewhere, confirm.
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        if self.verbose:
            self._timer = time.time()
        if self.collect:
            print('Collecting RMS values...')
        if self.action:
            # Work out the trigger threshold
            self.get_threshold()

        def _shut_down():
            self.is_running = False
            self.stop()

        try:
            self.is_running = True
            while not self._graceful:
                self.record()  # capture one segment of audio
                audio = pydub.AudioSegment(self.output.getvalue())
                rms = audio.rms
                if self.collect:
                    self.collect_rms(rms)
                self.meter(rms)
                if self.action and self.is_triggered(rms):
                    self.execute()
                self.monitor(rms)
            _shut_down()

        except self.__class__.StopException:
            _shut_down()
开发者ID:kingjason,项目名称:soundmeter,代码行数:33,代码来源:meter.py



注:本文中的signal.setitimer函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python signal.siginterrupt函数代码示例发布时间:2022-05-27
下一篇:
Python signal.set_wakeup_fd函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap