• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python time.monotonic函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中time.monotonic函数的典型用法代码示例。如果您正苦于以下问题:Python monotonic函数的具体用法?Python monotonic怎么用?Python monotonic使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了monotonic函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: incoming_telemetry

 def incoming_telemetry(self, label, telemetry: TelemetryInfo):
     self.telemetries[label] = telemetry
     self.timestamps[label] = time.monotonic()
     self.emit('telemetry', label, telemetry)
     if (time.monotonic() - self._last_overall_telemetry_emit) > self.state['overall_telemetry_interval']:
         self.emit('telemetry', None, self.get_telemetry(None))
         self._last_overall_telemetry_emit = time.monotonic()
开发者ID:awacha,项目名称:cct,代码行数:7,代码来源:telemetry.py


示例2: test_eintr

def test_eintr(wfs, spair):
    a, b = spair
    interrupt_count = [0]

    def handler(sig, frame):
        assert sig == signal.SIGALRM
        interrupt_count[0] += 1

    old_handler = signal.signal(signal.SIGALRM, handler)
    try:
        assert not wfs(a, read=True, timeout=0)
        start = monotonic()
        try:
            # Start delivering SIGALRM 10 times per second
            signal.setitimer(signal.ITIMER_REAL, 0.1, 0.1)
            # Sleep for 1 second (we hope!)
            wfs(a, read=True, timeout=1)
        finally:
            # Stop delivering SIGALRM
            signal.setitimer(signal.ITIMER_REAL, 0)
        end = monotonic()
        dur = end - start
        assert 0.9 < dur < 3
    finally:
        signal.signal(signal.SIGALRM, old_handler)

    assert interrupt_count[0] > 0
开发者ID:christophermca,项目名称:dotfiles,代码行数:27,代码来源:test_wait.py


示例3: go

    def go():
        router_closed = asyncio.Future()
        dealer_closed = asyncio.Future()
        router, _ = yield from loop.create_zmq_connection(
            lambda: ZmqRouterProtocol(router_closed),
            zmq.ROUTER,
            bind='tcp://127.0.0.1:*')

        addr = next(iter(router.bindings()))
        dealer, _ = yield from loop.create_zmq_connection(
            lambda: ZmqDealerProtocol(count, dealer_closed),
            zmq.DEALER,
            connect=addr)

        msg = b'func', b'\0'*200

        gc.collect()
        t1 = time.monotonic()
        dealer.write(msg)
        yield from dealer_closed
        t2 = time.monotonic()
        gc.collect()
        router.close()
        yield from router_closed
        return t2 - t1
开发者ID:waytai,项目名称:aiozmq,代码行数:25,代码来源:simple.py


示例4: test_call_later_1

    def test_call_later_1(self):
        calls = []

        def cb(inc=10, stop=False):
            calls.append(inc)
            self.assertTrue(self.loop.is_running())
            if stop:
                self.loop.call_soon(self.loop.stop)

        self.loop.call_later(0.05, cb)

        # canceled right away
        h = self.loop.call_later(0.05, cb, 100, True)
        self.assertIn('.cb', repr(h))
        h.cancel()
        self.assertIn('cancelled', repr(h))

        self.loop.call_later(0.05, cb, 1, True)
        self.loop.call_later(1000, cb, 1000)  # shouldn't be called

        started = time.monotonic()
        self.loop.run_forever()
        finished = time.monotonic()

        self.assertLess(finished - started, 0.1)
        self.assertGreater(finished - started, 0.04)

        self.assertEqual(calls, [10, 1])

        self.assertFalse(self.loop.is_running())
开发者ID:complyue,项目名称:uvloop,代码行数:30,代码来源:test_base.py


示例5: debug_sql

 def debug_sql(self, sql=None, params=None, use_last_executed_query=False, many=False):
     start = time.monotonic()
     try:
         yield
     finally:
         stop = time.monotonic()
         duration = stop - start
         if use_last_executed_query:
             sql = self.db.ops.last_executed_query(self.cursor, sql, params)
         try:
             times = len(params) if many else ''
         except TypeError:
             # params could be an iterator.
             times = '?'
         self.db.queries_log.append({
             'sql': '%s times: %s' % (times, sql) if many else sql,
             'time': '%.3f' % duration,
         })
         logger.debug(
             '(%.3f) %s; args=%s',
             duration,
             sql,
             params,
             extra={'duration': duration, 'sql': sql, 'params': params},
         )
开发者ID:MattBlack85,项目名称:django,代码行数:25,代码来源:utils.py


示例6: check_parallel_module_init

    def check_parallel_module_init(self, mock_os):
        if imp.lock_held():
            # This triggers on, e.g., from test import autotest.
            raise unittest.SkipTest("can't run when import lock is held")

        done = threading.Event()
        for N in (20, 50) * 3:
            if verbose:
                print("Trying", N, "threads ...", end=' ')
            # Make sure that random and modulefinder get reimported freshly
            for modname in ['random', 'modulefinder']:
                try:
                    del sys.modules[modname]
                except KeyError:
                    pass
            errors = []
            done_tasks = []
            done.clear()
            t0 = time.monotonic()
            with start_threads(threading.Thread(target=task,
                                                args=(N, done, done_tasks, errors,))
                               for i in range(N)):
                pass
            completed = done.wait(10 * 60)
            dt = time.monotonic() - t0
            if verbose:
                print("%.1f ms" % (dt*1e3), flush=True, end=" ")
            dbg_info = 'done: %s/%s' % (len(done_tasks), N)
            self.assertFalse(errors, dbg_info)
            self.assertTrue(completed, dbg_info)
            if verbose:
                print("OK.")
开发者ID:M31MOTH,项目名称:cpython,代码行数:32,代码来源:test_threaded_import.py


示例7: _get_spad_info

 def _get_spad_info(self):
     # Get reference SPAD count and type, returned as a 2-tuple of
     # count and boolean is_aperture.  Based on code from:
     #   https://github.com/pololu/vl53l0x-arduino/blob/master/VL53L0X.cpp
     for pair in ((0x80, 0x01), (0xFF, 0x01), (0x00, 0x00), (0xFF, 0x06)):
         self._write_u8(pair[0], pair[1])
     self._write_u8(0x83, self._read_u8(0x83) | 0x04)
     for pair in ((0xFF, 0x07), (0x81, 0x01), (0x80, 0x01),
                  (0x94, 0x6b), (0x83, 0x00)):
         self._write_u8(pair[0], pair[1])
     start = time.monotonic()
     while self._read_u8(0x83) == 0x00:
         if self.io_timeout_s > 0 and \
            (time.monotonic() - start) >= self.io_timeout_s:
             raise RuntimeError('Timeout waiting for VL53L0X!')
     self._write_u8(0x83, 0x01)
     tmp = self._read_u8(0x92)
     count = tmp & 0x7F
     is_aperture = ((tmp >> 7) & 0x01) == 1
     for pair in ((0x81, 0x00), (0xFF, 0x06)):
         self._write_u8(pair[0], pair[1])
     self._write_u8(0x83, self._read_u8(0x83) & ~0x04)
     for pair in ((0xFF, 0x01), (0x00, 0x01), (0xFF, 0x00), (0x80, 0x00)):
         self._write_u8(pair[0], pair[1])
     return (count, is_aperture)
开发者ID:eiselekd,项目名称:hw,代码行数:25,代码来源:adafruit_vl53l0x.py


示例8: run

    def run(self):
        while True:
            item = None
            try:
                item = self.work_queue.get(timeout=2)
            except(Empty):
                pass

            if item:
                self.transmit_buffer.append(item)
                self.last_time = time.monotonic()

            delta = time.monotonic() - self.last_time

            if self.work_queue.empty() and len(self.transmit_buffer) > 0:
                if (delta > 30) or (len(self.transmit_buffer) > 1):
                    self._transmit.set()

            if self._transmit.is_set() and self.work_queue.empty():
                self.transmit()

            if self.stopped():
                while len(self.transmit_buffer) > 0:
                    self.transmit()
                return
开发者ID:bradfier,项目名称:rockclock,代码行数:25,代码来源:transmitter.py


示例9: timer

def timer():
    tic = monotonic()
    toc = None
    try:
        yield lambda : toc - tic
    finally:
        toc = monotonic()
开发者ID:Lothiraldan,项目名称:pyzmq,代码行数:7,代码来源:collect.py


示例10: send_messages

    def send_messages(self, message_batch):
        if not self._init_es():
            return
        start_time = time.monotonic()
        try:
            actions = []
            for msg in message_batch:
                message = json.loads(msg.decode("utf8"))
                timestamp = message.get("timestamp")
                if "__REALTIME_TIMESTAMP" in message:
                    timestamp = datetime.datetime.utcfromtimestamp(message["__REALTIME_TIMESTAMP"])
                else:
                    timestamp = datetime.datetime.utcnow()

                message["timestamp"] = timestamp
                index_name = "{}-{}".format(self.index_name, datetime.datetime.date(timestamp))
                if index_name not in self.indices:
                    self.create_index_and_mappings(index_name)

                actions.append({
                    "_index": index_name,
                    "_type": "journal_msg",
                    "_source": message,
                })
            if actions:
                helpers.bulk(self.es, actions)
                self.log.debug("Sent %d log events to ES, took: %.2fs",
                               len(message_batch), time.monotonic() - start_time)
        except Exception as ex:  # pylint: disable=broad-except
            self.log.warning("Problem sending logs to ES: %r", ex)
            return False
        return True
开发者ID:aiven,项目名称:journalpump,代码行数:32,代码来源:journalpump.py


示例11: spin

    def spin(self, timeout=None):
        """
        Runs background processes until timeout expires.
        Note that all processing is implemented in one thread.
        :param timeout: The method will return once this amount of time expires.
                        If None, the method will never return.
                        If zero, the method will handle only those events that are ready, then return immediately.
        """
        if timeout != 0:
            deadline = (time.monotonic() + timeout) if timeout is not None else sys.float_info.max

            def execute_once():
                next_event_at = self._poll_scheduler_and_get_next_deadline()
                if next_event_at is None:
                    next_event_at = sys.float_info.max

                read_timeout = min(next_event_at, deadline) - time.monotonic()
                read_timeout = max(read_timeout, 0)
                read_timeout = min(read_timeout, 1)

                frame = self._can_driver.receive(read_timeout)
                if frame:
                    self._recv_frame(frame)

            execute_once()
            while time.monotonic() < deadline:
                execute_once()
        else:
            while True:
                frame = self._can_driver.receive(0)
                if frame:
                    self._recv_frame(frame)
                else:
                    break
            self._poll_scheduler_and_get_next_deadline()
开发者ID:antoinealb,项目名称:pyuavcan,代码行数:35,代码来源:node.py


示例12: run_baton_query

    def run_baton_query(self, baton_binary: BatonBinary, program_arguments: List[str]=None, input_data: Any=None) \
            -> List[Dict]:
        """
        Runs a baton query.
        :param baton_binary: the baton binary to use
        :param program_arguments: arguments to give to the baton binary
        :param input_data: input data to the baton binary
        :return: parsed serialization returned by baton
        """
        if program_arguments is None:
            program_arguments = []

        baton_binary_location = os.path.join(self._baton_binaries_directory, baton_binary.value)
        program_arguments = [baton_binary_location] + program_arguments

        _logger.info("Running baton command: '%s' with data '%s'" % (program_arguments, input_data))
        start_at = time.monotonic()
        baton_out = self._run_command(program_arguments, input_data=input_data)
        time_taken_to_run_query = time.monotonic() - start_at
        _logger.debug("baton output (took %s seconds, wall time): %s" % (time_taken_to_run_query, baton_out))

        if len(baton_out) == 0:
            return []
        if len(baton_out) > 0 and baton_out[0] != '[':
            # If information about multiple files is returned, baton does not return valid JSON - it returns a line
            # separated list of JSON, where each line corresponds to a different file
            baton_out = "[%s]" % baton_out.replace('\n', ',')

        baton_out_as_json = json.loads(baton_out)
        BatonRunner._raise_any_errors_given_in_baton_out(baton_out_as_json)

        return baton_out_as_json
开发者ID:wtsi-hgi,项目名称:python-baton-wrapper,代码行数:32,代码来源:_baton_runner.py


示例13: _select_next_server

    def _select_next_server(self):
        """
        Looks up in the server pool for an available server
        and attempts to connect.
        """
        srv = None
        now = time.monotonic()
        for s in self._server_pool:
            if s.reconnects > self.options["max_reconnect_attempts"]:
                continue
            if s.did_connect and now > s.last_attempt + self.options["reconnect_time_wait"]:
                yield from asyncio.sleep(self.options["reconnect_time_wait"], loop=self._loop)
            try:
                s.last_attempt = time.monotonic()
                r, w = yield from asyncio.open_connection(
                    s.uri.hostname,
                    s.uri.port,
                    loop=self._loop,
                    limit=DEFAULT_BUFFER_SIZE)
                srv = s
                self._io_reader = r
                self._io_writer = w
                s.did_connect = True
                break
            except Exception as e:
                self._err = e

        if srv is None:
            raise ErrNoServers
        self._current_server = srv
开发者ID:habibutsu,项目名称:asyncio-nats,代码行数:30,代码来源:client.py


示例14: dispatch

 def dispatch(self, wdelay=0):
     """ xxx """
     self.round += 1
     start = time.monotonic()
     with self._lock:
         xqueue = self._queue[:]
         while True:
             if not xqueue:
                 break
             event = xqueue[0]
             now = time.monotonic()
             if event.atime > (now + wdelay):
                 time.sleep(wdelay)
                 break
             elif event.atime <= (now + wdelay) and event.atime > now:
                 # D("sleep {}".format(event.atime - now))
                 time.sleep(event.atime - now)
             else:  # event.atime <= now:
                 xqueue.pop(0)
                 more = event._call()
                 if more is not None and more > 0:
                     event.atime = time.monotonic() + event.delay
                 else:
                     self._queue.pop(0)
         heapq.heapify(self._queue)
     return time.monotonic() - start
开发者ID:hevi9,项目名称:hevi-lib,代码行数:26,代码来源:dispatchers.py


示例15: __next__

	def __next__(self):
		with self.lock:
			t = time.monotonic()
			if t < self.next_yield:
				time.sleep(self.next_yield - t)
				t = time.monotonic()
			self.next_yield = t + self.interval
开发者ID:jpn--,项目名称:larch,代码行数:7,代码来源:rate_limiter.py


示例16: _handle_status

    def _handle_status(self, msg):
        """
        Reimplemented to refresh the namespacebrowser after kernel
        restarts
        """
        state = msg['content'].get('execution_state', '')
        msg_type = msg['parent_header'].get('msg_type', '')
        if state == 'starting':
            # This is needed to show the time a kernel
            # has been alive in each console.
            self.ipyclient.t0 = time.monotonic()
            self.ipyclient.timer.timeout.connect(self.ipyclient.show_time)
            self.ipyclient.timer.start(1000)

            # This handles restarts when the kernel dies
            # unexpectedly
            if not self._kernel_is_starting:
                self._kernel_is_starting = True
        elif state == 'idle' and msg_type == 'shutdown_request':
            # This handles restarts asked by the user
            if self.namespacebrowser is not None:
                self.set_namespace_view_settings()
                self.refresh_namespacebrowser()
            self.ipyclient.t0 = time.monotonic()
        else:
            super(NamepaceBrowserWidget, self)._handle_status(msg)
开发者ID:burrbull,项目名称:spyder,代码行数:26,代码来源:namespacebrowser.py


示例17: should_terminate

 def should_terminate(self):
     if self.stopping_start is None:
         self.stopping_start = monotonic()
         return False
     else:
         dt = monotonic() - self.stopping_start
         return dt if dt >= ACTOR_ACTION_TIMEOUT else False
开发者ID:quantmind,项目名称:pulsar,代码行数:7,代码来源:proxy.py


示例18: execute_cli_command

    def execute_cli_command(self, command, timeout=None):
        # While the command is being executed, incoming frames will be lost.
        # SLCAN driver from PyUAVCAN goes at great lengths to properly separate CLI response lines
        # from SLCAN messages in real time with minimal additional latency, so use it if you care about this.
        timeout = self._resolve_timeout(timeout)
        self.port.writeTimeout = timeout
        command += '\r\n'
        self._write(command)

        deadline = time.monotonic() + (timeout if timeout is not None else 999999999)
        self.port.timeout = 1
        response = bytes()

        while True:
            if time.monotonic() > deadline:
                raise TimeoutException('SLCAN CLI response timeout; command: %r' % command)

            b = self.port.read()
            if b == self.CLI_END_OF_TEXT:
                break
            if b:
                response += b

        # Removing SLCAN lines from response
        return re.sub(r'.*\r[^\n]', '', response.decode()).strip().replace(command, '')
开发者ID:Zubax,项目名称:drwatson,代码行数:25,代码来源:can.py


示例19: _poll_for_io

    def _poll_for_io(self):
        if self._sleeping:
            timeout = self._sleeping[0][0] - time.monotonic()
        else:
            timeout = None

        events = self._selector.select(timeout)
        for key, mask in events:
            task = key.data
            self._selector.unregister(key.fileobj)
            self._reschedule_task(task)

        # Process sleeping tasks (if any)
        if self._sleeping:
            current = time.monotonic()
            while self._sleeping and self._sleeping[0][0] <= current:
                tm, _, task, sleep_type = heapq.heappop(self._sleeping)
                # When a task wakes, verify that the timeout value matches that stored
                # on the task. If it differs, it means that the task completed its
                # operation, was cancelled, or is no longer concerned with this
                # sleep operation.  In that case, we do nothing
                if tm == task.timeout:
                    if sleep_type == 'sleep':
                        self._reschedule_task(task)
                    elif sleep_type == 'timeout':
                        self._cancel_task(task, exc=TimeoutError)
开发者ID:Hellowlol,项目名称:curio,代码行数:26,代码来源:kernel.py


示例20: zipTest

    def zipTest(self, f, compression):
        # Create the ZIP archive.
        zipfp = zipfile.ZipFile(f, "w", compression)

        # It will contain enough copies of self.data to reach about 6 GiB of
        # raw data to store.
        filecount = 6*1024**3 // len(self.data)

        next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
        for num in range(filecount):
            zipfp.writestr("testfn%d" % num, self.data)
            # Print still working message since this test can be really slow
            if next_time <= time.monotonic():
                next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
                print((
                   '  zipTest still writing %d of %d, be patient...' %
                   (num, filecount)), file=sys.__stdout__)
                sys.__stdout__.flush()
        zipfp.close()

        # Read the ZIP archive
        zipfp = zipfile.ZipFile(f, "r", compression)
        for num in range(filecount):
            self.assertEqual(zipfp.read("testfn%d" % num), self.data)
            # Print still working message since this test can be really slow
            if next_time <= time.monotonic():
                next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
                print((
                   '  zipTest still reading %d of %d, be patient...' %
                   (num, filecount)), file=sys.__stdout__)
                sys.__stdout__.flush()
        zipfp.close()
开发者ID:Victor-Savu,项目名称:cpython,代码行数:32,代码来源:test_zipfile64.py



注:本文中的time.monotonic函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python time.now函数代码示例发布时间:2022-05-27
下一篇:
Python time.mktime函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap