• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python time.clock_gettime函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中time.clock_gettime函数的典型用法代码示例。如果您正苦于以下问题:Python clock_gettime函数的具体用法?Python clock_gettime怎么用?Python clock_gettime使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了clock_gettime函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_pthread_getcpuclockid

 def test_pthread_getcpuclockid(self):
     clk_id = time.pthread_getcpuclockid(threading.get_ident())
     self.assertTrue(type(clk_id) is int)
     self.assertNotEqual(clk_id, time.CLOCK_THREAD_CPUTIME_ID)
     t1 = time.clock_gettime(clk_id)
     t2 = time.clock_gettime(clk_id)
     self.assertLessEqual(t1, t2)
开发者ID:1st1,项目名称:cpython,代码行数:7,代码来源:test_time.py


示例2: run

 def run(self):
     """
     main loop
     · run events
     · run .update() functions
     · update viewport
     · and sleep if there's time left
     """
     smallju_frame = False
     while self.running:
         t = time.clock_gettime(time.CLOCK_MONOTONIC)
         for ev in pygame.event.get():
             ev.state = self.view_proxy
             handler.dispatch(ev, self, ev)
         if self.view_proxy == "mandel" and self.smallju_show and \
           self.smallju.updating and smallju_frame:
             self.smallju.update()
             self.update = True
         elif self.view().updating:
             self.view().update()
             self.update = True
         smallju_frame ^= self.smallju_show
         if self.update:
             self.draw()
             self.update = False
         # naively sleep until we expect the new frame
         t = time.clock_gettime(time.CLOCK_MONOTONIC) - t
         if t < config["interface"]["max_fps"]:
             time.sleep(config["interface"]["max_fps"] - t)
开发者ID:mar77i,项目名称:mandelizer,代码行数:29,代码来源:mandelizer.py


示例3: infinite_loop

    def infinite_loop(self):
        self.debug.write("Successfully launched")

        self.gps.wait()

        tick = 1
        while True:

            if self.kill_now:
                self.debug.write("Quitting")
                self.log.close()
                self.uart.close()
                self.debug.close()
                break

            # monotonic clock will not change as system time changes
            begin = time.clock_gettime(time.CLOCK_MONOTONIC)
            # system time can be converted to date/time stamp
            stamp = time.time()
            t1 = threading.Thread(target=self.log_data, args=(stamp,))
            t1.start()

            if tick >= self.cam_period:
                tick = 1
                t2 = threading.Thread(target=self.take_picture, args=(begin,))
                t2.start()
            else:
                tick = tick + 1

            delta = time.clock_gettime(time.CLOCK_MONOTONIC) - begin

            if self.serial_period > delta:
                time.sleep(self.serial_period - delta) 
开发者ID:bjester,项目名称:project-leda,代码行数:33,代码来源:leda.py


示例4: sendTexture

    def sendTexture(scene):
        for name, target in Splash._targets.items():
            if target._texture is None or target._texWriterPath is None:
                return
    
            buffer = bytearray()
            pixels = [pix for pix in target._texture.pixels]
            pixels = numpy.array(pixels)
            pixels = (pixels * 255.0).astype(numpy.ubyte)
            buffer += pixels.tostring()
            currentTime = time.clock_gettime(time.CLOCK_REALTIME) - target._startTime

            if target._texWriter is None or target._texture.size[0] != target._texSize[0] or target._texture.size[1] != target._texSize[1]:
                try:
                    from pyshmdata import Writer
                    os.remove(target._texWriterPath)
                except:
                    pass
    
                if target._texWriter is not None:
                    del target._texWriter
    
                target._texSize[0] = target._texture.size[0]
                target._texSize[1] = target._texture.size[1]
                target._texWriter = Writer(path=target._texWriterPath, datatype="video/x-raw, format=(string)RGBA, width=(int){0}, height=(int){1}, framerate=(fraction)1/1".format(target._texSize[0], target._texSize[1]), framesize=len(buffer))
            target._texWriter.push(buffer, floor(currentTime * 1e9))

            # We send twice to pass through the Splash input buffer
            time.sleep(0.1)
            currentTime = time.clock_gettime(time.CLOCK_REALTIME) - target._startTime
            target._texWriter.push(buffer, floor(currentTime * 1e9)) 
开发者ID:ameisso,项目名称:splash,代码行数:31,代码来源:operators.py


示例5: cpu_measurement_wrapper

 def cpu_measurement_wrapper(*args, **kwargs):
     start = time.clock_gettime(time.CLOCK_THREAD_CPUTIME_ID)
     try:
         return f(*args, **kwargs)
     finally:
         end = time.clock_gettime(time.CLOCK_THREAD_CPUTIME_ID)
         tracking[1] += 1
         tracking[2] += (end - start)
开发者ID:mutability,项目名称:mlat-server,代码行数:8,代码来源:profile.py


示例6: _generic_impl

    def _generic_impl(self):
        try:
            yield from self.connect()
        except AuthenticationError as e:
            self._statistics.req_401_rsp += 1
            self._logger.error("Authentication failed. Bad username/password.")
            raise RestError(401, "bad username/password")
        except ValueError as e:
            self._statistics.req_401_rsp += 1
            self._logger.error("Authentication failed. Missing BasicAuth.")
            raise RestError(401, "must supply BasicAuth")

        # ATTN: special checks for POST and PUT
        if self.request.method == "DELETE" and not is_config(self.request.uri):
            self._logger.error("Operational data DELETE not supported.")
            raise RestError(405, "can't delete operational data")

        # ATTN: make json it's own request handler
        if self.request.method == "GET" and is_schema_api(self.request.uri):
            return 200, "application/vnd.yang.data+json", get_json_schema(self._schema_root, self.request.uri)

        # convert url and body into xml payload 
        body = self.request.body.decode("utf-8")
        body_header = self.request.headers.get("Content-Type")
        convert_payload = (body, body_header)

        try:
            if self.request.method in ("GET"):
                converted_url = convert_get_request_to_xml(self._schema_root, self.request.uri)
            else:
                converted_url = self._confd_url_converter.convert(self.request.method, self.request.uri, convert_payload)
        except Exception as e:
            self._logger.error("invalid url requested %s, %s", self.request.uri, e)
            raise RestError(404, "Resource target or resource node not found")

        self._logger.debug("converted url: %s" % converted_url)

        if self._configuration.log_timing:
            sb_start_time = time.clock_gettime(time.CLOCK_REALTIME)

        result, netconf_response = yield from self._netconf.execute(self._netconf_operation, self.request.uri, converted_url)

        if self._configuration.log_timing:
            sb_end_time = time.clock_gettime(time.CLOCK_REALTIME)
            self._logger.debug("southbound transaction time: %s " % sb_end_time - sb_start_time)

        if result != Result.OK:
            code = map_error_to_http_code(result)
            raise RestError(code, netconf_response)

        # convert response to match the accept type
        code, response = self._convert_response_and_get_status(self._netconf_operation, self._accept_type, netconf_response)
        return code, self._accept_type.value, response
开发者ID:RIFTIO,项目名称:RIFT.ware,代码行数:53,代码来源:http_handler.py


示例7: main_loop

def main_loop(bar, inputs = None):
    # TODO: remove eventinputs again?
    #inputs += bar.widget.eventinputs()
    if inputs == None:
        inputs = global_inputs

    global_update = True
    def signal_quit(signal, frame):
        quit_main_loop()
    signal.signal(signal.SIGINT, signal_quit)
    signal.signal(signal.SIGTERM, signal_quit)

    # main loop
    while not core.shutdown_requested() and bar.is_running():
        now = time.clock_gettime(time.CLOCK_MONOTONIC)
        if bar.widget.maybe_timeout(now):
            global_update = True
        data_ready = []
        if global_update:
            painter = bar.painter()
            painter.widget(bar.widget)
            data_ready = select.select(inputs,[],[], 0.00)[0]
            if not data_ready:
                #print("REDRAW: " + str(time.clock_gettime(time.CLOCK_MONOTONIC)))
                painter.flush()
                global_update = False
            else:
                pass
                #print("more data already ready")
        if not data_ready:
            # wait for new data
            next_timeout = now + 360 # wait for at most one hour until the next bar update
            to = bar.widget.next_timeout()
            if to != None:
                next_timeout = min(next_timeout, to)
            now = time.clock_gettime(time.CLOCK_MONOTONIC)
            next_timeout -= now
            next_timeout = max(next_timeout,0.1)
            #print("next timeout = " + str(next_timeout))
            data_ready = select.select(inputs,[],[], next_timeout)[0]
            if core.shutdown_requested():
                break
        if not data_ready:
            pass #print('timeout!')
        else:
            for x in data_ready:
                x.process()
                global_update = True
    bar.proc.kill()
    for i in inputs:
        i.kill()
    bar.proc.wait()
开发者ID:t-wissmann,项目名称:barpyrus,代码行数:52,代码来源:mainloop.py


示例8: test_pthread_getcpuclockid

 def test_pthread_getcpuclockid(self):
     clk_id = time.pthread_getcpuclockid(threading.get_ident())
     self.assertTrue(type(clk_id) is int)
     # when in 32-bit mode AIX only returns the predefined constant
     if not platform.system() == "AIX":
         self.assertNotEqual(clk_id, time.CLOCK_THREAD_CPUTIME_ID)
     elif (sys.maxsize.bit_length() > 32):
         self.assertNotEqual(clk_id, time.CLOCK_THREAD_CPUTIME_ID)
     else:
         self.assertEqual(clk_id, time.CLOCK_THREAD_CPUTIME_ID)
     t1 = time.clock_gettime(clk_id)
     t2 = time.clock_gettime(clk_id)
     self.assertLessEqual(t1, t2)
开发者ID:Eyepea,项目名称:cpython,代码行数:13,代码来源:test_time.py


示例9: ping

def ping(target, timeout=5.0):
    request = IcmpEcho(payload=os.urandom(32))
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_ICMP) as s:
        s.connect(target.address)
        s.settimeout(timeout)

        start = time.clock_gettime(time.CLOCK_MONOTONIC)
        s.send(request.to_bytes())
        response = s.recv(65536)
        end = time.clock_gettime(time.CLOCK_MONOTONIC)

    response = IcmpEcho.from_bytes(response)
    rtt_ms = (end - start) * 1000
    print('Got response in {delay:.3f} ms'.format(delay=rtt_ms))
开发者ID:olavmrk,项目名称:python-ping,代码行数:14,代码来源:ping.py


示例10: RunCompilation

def RunCompilation(num):
    global total_tasks
    global total_time
    global end

    while True:
        start = time.clock_gettime(time.CLOCK_MONOTONIC)
        if os.system("./compile.sh /usr/bin/dist-clang/clang++ {} > /dev/null".format(num)) != 0:
            break
        total_tasks += 1
        total_time += time.clock_gettime(time.CLOCK_MONOTONIC) - start

        if len(sys.argv) > 2 and total_tasks >= int(sys.argv[2]):
            end = time.clock_gettime(time.CLOCK_MONOTONIC)
            break
开发者ID:RainM,项目名称:dist-clang,代码行数:15,代码来源:load.py


示例11: test_FDTimer_absolute

 def test_FDTimer_absolute(self):
     t = timers.FDTimer(timers.CLOCK_REALTIME)
     start = time.time()
     t.settime(time.clock_gettime(time.CLOCK_REALTIME)+5.0, 0.0, absolute=True)
     print(t.read())
     t.close()
     self.assertAlmostEqual(time.time()-start, 5.0, places=2)
开发者ID:kdart,项目名称:pycopia3,代码行数:7,代码来源:test.py


示例12: poll_timed

 def poll_timed(self, timeout : float, clock_id : int = None) -> bytes:
     '''
     Wait for a message to be broadcasted on the bus.
     The caller should make a copy of the received message,
     without freeing the original copy, and parse it in a
     separate thread. When the new thread has started be
     started, the caller of this function should then
     either call `Bus.poll_timed` again or `Bus.poll_stop`.
     
     @param   timeout:float  The time the function shall fail with `os.errno.EAGAIN`,
                             if it has not already completed
     @param   clock_id:int?  The clock `timeout` is measured in, it must be a
                             predictable clock, if `None`, `timeout` is measured in
                             relative time instead of absolute time
     @return  :bytes         The received message
                             NB! The received message will not be decoded from UTF-8
     '''
     from native_bus import bus_poll_timed_wrapped
     if clock_id is None:
         import time
         clock_id = time.CLOCK_MONOTONIC_RAW
         timeout += time.clock_gettime(clock_id)
     (message, e) = bus_poll_timed_wrapped(self.bus, timeout, clock_id)
     if message is None:
         raise self.__oserror(e)
     return message
开发者ID:maandree,项目名称:python-bus,代码行数:26,代码来源:bus.py


示例13: dump_cpu_profiles

    def dump_cpu_profiles(tofile=sys.stderr):
        elapsed_cpu = time.clock_gettime(time.CLOCK_THREAD_CPUTIME_ID) - baseline_cpu
        elapsed_wall = time.monotonic() - baseline_wall

        print('Elapsed: {wall:.1f}   CPU: {cpu:.1f} ({percent:.0f}%)'.format(
            wall=elapsed_wall,
            cpu=elapsed_cpu,
            percent=100.0 * elapsed_cpu / elapsed_wall), file=tofile)
        print('{rank:4s} {name:60s} {count:6s} {persec:6s} {total:8s} {each:8s} {fraction:6s}'.format(
            rank='#',
            name='Function',
            count='Calls',
            persec='(/sec)',
            total='Total(s)',
            each='Each(us)',
            fraction="Frac"), file=tofile)

        rank = 1
        for name, count, total in sorted(_cpu_tracking, key=operator.itemgetter(2), reverse=True):
            if count == 0:
                break

            print('{rank:4d} {name:60s} {count:6d} {persec:6.1f} {total:8.3f} {each:8.0f} {fraction:6.1f}'.format(
                rank=rank,
                name=name,
                count=count,
                persec=1.0 * count / elapsed_wall,
                total=total,
                each=total * 1e6 / count,
                fraction=100.0 * total / elapsed_cpu), file=tofile)
            rank += 1

        tofile.flush()
开发者ID:mutability,项目名称:mlat-server,代码行数:33,代码来源:profile.py


示例14: HandleEvent

	def HandleEvent(self, event):
		now = time.clock_gettime(time.CLOCK_MONOTONIC)
		if not self._first_event:
			self._first_event = event
			self._first_event_timestamp = now
		self._last_event = event
		self._last_event_timestamp = now
		self._count_by_type[event['type']] += 1
开发者ID:flamingcowtv,项目名称:adsb-tools,代码行数:8,代码来源:sourcestats.py


示例15: main

def main(argv):
    """
    Entry point for ntpclient.py.

    Arguments:
        argv: command line arguments
    """
    res = None
    quiet = False
    args = set(argv)
    if {'-q', '--quiet'}.intersection(args):
        quiet = True
    if '-s' in argv:
        server = argv[argv.index('-s')+1]
    elif '--server' in argv:
        server = argv[argv.index('--server')+1]
    elif 'NTPSERVER' in os.environ:
        server = os.environ['NTPSERVER']
    else:
        server = 'pool.ntp.org'
    if {'-h', '--help'}.intersection(args):
        print('Usage: ntpclient [-h|--help] [-q|--quiet] [-s|--server timeserver]')
        print(f'Using time server {server}.')
        sys.exit(0)
    t1 = time.clock_gettime(time.CLOCK_REALTIME)
    ntptime = get_ntp_time(server)
    t4 = time.clock_gettime(time.CLOCK_REALTIME)
    # It is not guaranteed that the NTP time is *exactly* in the middle of both
    # local times. But it is a reasonable simplification.
    roundtrip = round(t4 - t1, 4)
    localtime = (t1 + t4) / 2
    diff = localtime - ntptime
    if os.geteuid() == 0:
        time.clock_settime(time.CLOCK_REALTIME, ntptime)
        res = 'Time set to NTP time.'
    localtime = datetime.fromtimestamp(localtime)
    ntptime = datetime.fromtimestamp(ntptime)
    if not quiet:
        print('Using server {}.'.format(server))
        print('NTP call took approximately', roundtrip, 's.')
        print('Local time value:', localtime.strftime('%a %b %d %H:%M:%S.%f %Y.'))
        print('NTP time value:', ntptime.strftime('%a %b %d %H:%M:%S.%f %Y.'),
              '±', roundtrip/2, 's.')
        print('Local time - ntp time: {:.6f} s.'.format(diff))
        if res:
            print(res)
开发者ID:rsmith-nl,项目名称:scripts,代码行数:46,代码来源:ntpclient.py


示例16: test_record_event_sequence_sync_has_reasonable_relative_timestamp

    def test_record_event_sequence_sync_has_reasonable_relative_timestamp(self):
        monotonic_time_first = time.clock_gettime(time.CLOCK_MONOTONIC)

        calls = self.call_start_stop_event_sync()
        relative_time_first = calls[0][2][2][0][0]
        self.interface_mock.ClearCalls()
        calls = self.call_start_stop_event_sync()
        relative_time_second = calls[0][2][2][1][0]

        monotonic_time_second = time.clock_gettime(time.CLOCK_MONOTONIC)

        relative_time_difference = relative_time_second - relative_time_first
        self.assertLessEqual(0, relative_time_difference)
        monotonic_time_difference = 1e9 * \
            (monotonic_time_second - monotonic_time_first)
        self.assertLessEqual(relative_time_difference,
                             monotonic_time_difference)
开发者ID:endlessm,项目名称:eos-metrics,代码行数:17,代码来源:test-daemon-integration.py


示例17: test_clock_settime

    def test_clock_settime(self):
        t = time.clock_gettime(time.CLOCK_REALTIME)
        try:
            time.clock_settime(time.CLOCK_REALTIME, t)
        except PermissionError:
            pass

        if hasattr(time, "CLOCK_MONOTONIC"):
            self.assertRaises(OSError, time.clock_settime, time.CLOCK_MONOTONIC, 0)
开发者ID:juleskt,项目名称:ek128_work,代码行数:9,代码来源:test_time.py


示例18: createRecordData

def createRecordData(timeSec:float, dataSz:int, node, sAddr, nAddr, frames):
    return {
        'node'  : node,
        'sz'    : dataSz,
        'saddr' : sAddr,
        'naddr' : nAddr,
        'frames': frames,
        'time'  : timeSec,
        'start' : time.clock_gettime(time.CLOCK_MONOTONIC),
    }
开发者ID:finaldie,项目名称:skull,代码行数:10,代码来源:skull-addr-remap.py


示例19: handler

def handler(conn):
    clean_list = []
    while True:
        if conn.poll(10):
            data = conn.recv()
            sh_thread = Process(target=gs_shakalizing, args=data)
            sh_thread.start()
            sh_thread.join()
            clean_list.append((data[1], time.clock_gettime(0)))
        curr_time = time.clock_gettime(0)
        new_list = []
        for (filename, l_time) in clean_list:
            delta = curr_time - l_time
            if delta > TIME_DELTA:
                gen_file = path.join(app.config['UPLOAD_FOLDER'], filename)
                remove(gen_file)
            else:
                new_list.append((filename, l_time))
        clean_list = new_list
开发者ID:FreeCX,项目名称:shakalator,代码行数:19,代码来源:main.py


示例20: log

 def log(self,sstr,level='msg'):
     now=time.strftime("%y/%m/%d %H:%M:%S")
     nows=(str(time.clock_gettime(time.CLOCK_REALTIME)).split('.')[1]+'000')[0:3]
     head=tail=''
     if level=='error':
         head='\033[31m'
         tail='\033[0m'
     output=now+nows+"\033[32m ["+str(level)+"]\033[0m "+":"+head+str(sstr)+tail
     print(output,file=sys.stderr)
     self.logf.write(output+"\n")
开发者ID:yyd01245,项目名称:overlay_video,代码行数:10,代码来源:test-vdm.py



注:本文中的time.clock_gettime函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python time.clock_settime函数代码示例发布时间:2022-05-27
下一篇:
Python time.clock函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap