• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python stopwatch.Stopwatch类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中stopwatch.Stopwatch的典型用法代码示例。如果您正苦于以下问题:Python Stopwatch类的具体用法?Python Stopwatch怎么用?Python Stopwatch使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Stopwatch类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_sanity

    def test_sanity(self):
        """Exercise ``synchronize_using`` with several threads on two locks.

        For each of two RLocks, ``n`` threads are started; every thread
        enters ``synchronize_using(lock)``, sleeps ``sleep_time`` seconds and
        increments a ``sync_test_counter`` attribute stored on that lock.
        After joining all threads the test checks that both counters equal
        ``n`` and that the elapsed time is within 10% of ``n * sleep_time``.
        """
        lock1 = threading.RLock()
        lock2 = threading.RLock()

        def sleep_and_inc(lock, sleep_time):
            with synchronize_using(lock):
                time.sleep(sleep_time)
                lock.sync_test_counter = getattr(lock, 'sync_test_counter', 0) + 1

        sleep_time = 0.5
        n = 4

        s = Stopwatch()
        lst_threads = []
        for lock in [lock1, lock2]:
            # range() instead of the Python-2-only xrange(); n is tiny.
            for _ in range(n):
                t = threading.Thread(target=sleep_and_inc, args=[lock, sleep_time])
                lst_threads.append(t)
                t.start()

        # wait for all threads, then check results
        for t in lst_threads:
            t.join()

        duration = s.duration()
        self.assertEqual(lock1.sync_test_counter, n)
        self.assertEqual(lock2.sync_test_counter, n)
        ideal_time = n * sleep_time
        # assertTrue replaces the deprecated assert_ alias.
        self.assertTrue(ideal_time * 0.9 < duration < ideal_time * 1.1,
                        "duration=%s, ideal=%s" % (duration, ideal_time))
开发者ID:idobarkan,项目名称:my-code,代码行数:29,代码来源:test_synchronization.py


示例2: timeTrial

def timeTrial(n):
    """Return the elapsed time of ``threesum.countTriples`` on n random ints.

    The input array holds ``n`` values drawn with
    ``stdrandom.uniformInt(-1000000, 1000000)``; only the counting call is
    timed, not the array construction.
    """
    values = stdarray.create1D(n, 0)
    for idx in range(n):
        values[idx] = stdrandom.uniformInt(-1000000, 1000000)

    timer = Stopwatch()
    threesum.countTriples(values)  # result is irrelevant, only the timing matters
    return timer.elapsedTime()
开发者ID:davidhuizhou,项目名称:python,代码行数:7,代码来源:doublingtest.py


示例3: RaisArmTime

class RaisArmTime(Command):
    """Command that moves the robot arm at a fixed speed for a fixed time."""

    def __init__(self, robot, raise_speed, stop_time, name=None, timeout=None):
        """Store the settings, require ``robot.arm`` and create the stopwatch.

        ``name`` and ``timeout`` are forwarded to the ``Command`` constructor;
        ``raise_speed`` is the value later passed to ``arm.move_arm`` and
        ``stop_time`` is the elapsed-seconds threshold used by ``isFinished``.
        """
        super().__init__(name, timeout)
        self._robot, self._raise_speed, self._stop_time = robot, raise_speed, stop_time
        self.requires(robot.arm)
        self._stopwatch = Stopwatch()

    def initialize(self):
        """Start the stopwatch; called before the Command runs for the first time."""
        self._stopwatch.start()

    def execute(self):
        """Drive the arm at the configured speed; called repeatedly while scheduled."""
        arm = self._robot.arm
        arm.move_arm(self._raise_speed)

    def isFinished(self):
        """Return True once the stopwatch has reached ``stop_time`` seconds."""
        elapsed = self._stopwatch.elapsed_time_in_secs()
        return elapsed >= self._stop_time

    def end(self):
        """Command the arm with speed 0; called once after isFinished returns true."""
        arm = self._robot.arm
        arm.move_arm(0)

    def interrupted(self):
        """Delegate to ``end()`` when another command takes over a required subsystem."""
        self.end()
开发者ID:TechnoJays,项目名称:robot2016,代码行数:32,代码来源:raise_arm_time.py


示例4: GetDataForCatsFromServer

def GetDataForCatsFromServer(cat, desc, orgId, orgName, action):
  """POST a category search to ``/api/Search/<action>`` and time the request.

  Args:
    cat: category placed (as a one-element list) in ``BuiltInCategories``.
    desc: description, only used in the optional verbose trace.
    orgId: organisation id sent as ``OrgId`` and echoed in failure logs.
    orgName: organisation name, only echoed in failure logs.
    action: last URL segment, e.g. 'GetData' or 'GetDataTest'.

  Returns:
    Tuple ``(r, m)``: the response with status 200 and the value returned by
    ``Stopwatch.measure(silent=True)``. The stopwatch is restarted after every
    re-login, so ``m`` does not include earlier failed attempts.

  Raises:
    RuntimeError: the server answered with a status other than 200 or 401.
  """
  reqUrl = base_url + '/api/Search/' + action

  SearchDataRequest = {
    'ContextTypes': 'Organization',
    'BuiltInCategories': [cat],
    'Version': '2019',
    'OrgId': orgId
  }

  verbose = False  # flip to trace every request before it is sent
  sw = Stopwatch()
  while True:
    if verbose:
      print("before GetData", SearchDataRequest['BuiltInCategories'], desc, reqUrl )

    r = requests.post( reqUrl, json = SearchDataRequest, headers=headers)
    if r.status_code == 200:
      break

    print("after GetData", r.status_code, orgId, orgName)
    if r.status_code != 401:
      print(r.text)
      # The original used a bare `raise` here, which outside an `except`
      # block fails with an uninformative "No active exception" RuntimeError.
      # Keep the exception type but say what actually went wrong.
      raise RuntimeError(
        'GetData request failed with HTTP status %s' % r.status_code)

    # 401: credentials expired - log in again and retry with a fresh timer.
    ensureLogin()
    sw = Stopwatch()

  m = sw.measure(silent=True)
  return (r,m)
开发者ID:pylgrym,项目名称:first_app,代码行数:34,代码来源:bimtok.py


示例5: test_n_threads

    def test_n_threads(self):
        """Check that an AO with ``n_threads`` workers spreads calls across them.

        ``n_calls`` sleeping calls are issued through the active object and
        their futures collected. The test asserts that the total duration is
        between 0.9x and 1.2x of ``sleep_time * n_calls / n_threads``, that
        every call ran, that exactly ``n_threads`` distinct threads executed
        calls, and that the calling thread is not one of them.
        """
        sleep_time = 0.1

        class Sleeper(object):
            def __init__(self):
                self._lock = threading.RLock()  # used by @synchronized decorator
                self.i = 0
                self.thread_ids = set()

            @synchronized
            def _critical_section(self):
                # current_thread() replaces the deprecated currentThread() alias.
                self.thread_ids.add(id(threading.current_thread()))
                self.i += 1

            def sleep(self):
                time.sleep(sleep_time)
                self._critical_section()

        n_threads = 5
        n_calls = 20
        ao = AO.AO(Sleeper(), n_threads)
        ao.start()
        self.aos_to_stop.append(ao)

        s = Stopwatch()
        # range() instead of the Python-2-only xrange(); n_calls is tiny.
        futures = [ao.sleep() for _ in range(n_calls)]
        for f in futures:
            f.get()
        duration = s.duration()
        expected = sleep_time * n_calls / float(n_threads)
        # assertTrue/assertFalse replace the deprecated assert_/failIf aliases.
        self.assertTrue(0.9 * expected < duration < 1.2 * expected,
                        "duration=%s, expected=%s" % (duration, expected))
        self.assertEqual(ao.obj.i, n_calls)
        self.assertEqual(len(ao.obj.thread_ids), n_threads)
        self.assertFalse(id(threading.current_thread()) in ao.obj.thread_ids)
开发者ID:giltayar,项目名称:Python-Exercises,代码行数:34,代码来源:test_AO.py


示例6: do_GET

    def do_GET(self):
        """Serve one data slice selected by the request's query string.

        The text after ``?`` in ``self.path`` is parsed as an integer key into
        ``ourData.slices``. On success the body is a ``[<key>]`` line followed
        by the slice, or ``missing`` when no slice is stored under that key,
        and the stopwatch measurement is taken.

        Error replies:
          * 400 when the path has no query string,
          * 500 (with the error text) when the key is not an integer.
        """
        watch = Stopwatch()

        args = self.path.split('?')
        if len(args) < 2:
            print("no size arg", args)
            # Always answer the client; returning silently leaves it with an
            # empty reply instead of a proper HTTP error.
            self.send_response(400)
            self.end_headers()
            self.wfile.write(b'no size arg')
            return

        try:
            sizearg = int(args[1])
        except ValueError as e:
            self.send_response(500)
            self.end_headers()
            self.wfile.write(str(e).encode())
            return

        front = '[%s]\n' % sizearg

        self.send_response(200)
        self.end_headers()
        self.wfile.write(front.encode())

        # wfile is a binary stream, so the fallback must be bytes as well;
        # a str default raises TypeError on Python 3.
        # NOTE(review): assumes the stored slices are bytes - confirm.
        self.wfile.write(ourData.slices.get(sizearg, b'missing'))
        watch.measure()
开发者ID:pylgrym,项目名称:first_app,代码行数:33,代码来源:netloader.py


示例7: test_no_sync

 def test_no_sync(self):
     """Call ``self.s.add`` from a background thread and from this thread.

     The background call (argument 10) gets a 50 ms head start. The test
     asserts that the foreground call (argument 5) returns 0 and that it has
     completed in under 1.2 * ``self.s.sleep_time`` seconds.
     """
     stopwatch = Stopwatch()
     threading.Thread(target=self.s.add, args=[10]).start()
     time.sleep(0.05)  # make sure other thread gets head start
     val = self.s.add(5)
     self.assertEqual(val, 0)

     duration = stopwatch.duration()
     # assertTrue replaces the deprecated assert_ alias.
     self.assertTrue(duration < 1.2 * self.s.sleep_time,
                     'calls took too long. duration=%s' % duration)
开发者ID:idobarkan,项目名称:my-code,代码行数:9,代码来源:test_synchronization.py


示例8: test_normal

 def test_normal(self):
     """Call ``self.s.sync_add`` from a background thread and from this thread.

     The background call (argument 10) gets a 50 ms head start. The test
     asserts that the foreground call (argument 5) returns 10 and that it
     took more than 1.9 * ``self.s.sleep_time`` seconds to complete.
     """
     stopwatch = Stopwatch()
     threading.Thread(target=self.s.sync_add, args=[10]).start()
     time.sleep(0.05)  # make sure other thread gets head start (and the mutex)
     val = self.s.sync_add(5)
     self.assertEqual(val, 10)

     duration = stopwatch.duration()
     # assertTrue replaces the deprecated assert_ alias.
     self.assertTrue(duration > 1.9 * self.s.sleep_time,
                     'calls completed too quickly. duration=%s' % duration)
开发者ID:idobarkan,项目名称:my-code,代码行数:9,代码来源:test_synchronization.py


示例9: test_timeout

    def test_timeout(self):
        """Check ``Future.get`` with a timeout on preset and unset futures.

        A preset future must return its value; an unset future must raise
        ``FutureTimeoutException``, and the wait must last between 0.9x and
        1.3x of the requested timeout.
        """
        # preset future - should return within timeout
        f = future.Future.preset(3)
        self.assertEqual(f.get(100), 3)

        # unset future - should raise exception
        timeout = 1  # timeout in seconds
        f = future.Future()
        stopwatch = Stopwatch()
        stopwatch.start()
        # get() is called with timeout * 1000, i.e. the value above scaled by 1000.
        self.assertRaises(future.FutureTimeoutException, f.get, timeout * 1000)
        duration = stopwatch.duration()
        # assertTrue replaces the deprecated assert_ alias.
        self.assertTrue(0.9 * timeout < duration < 1.3 * timeout,
                        'duration=%s' % duration)
开发者ID:idobarkan,项目名称:my-code,代码行数:13,代码来源:test_future.py


示例10: doDB_Work

def doDB_Work(limit=9999):
  """Time one ``selectFromDB`` call and print the length of its result.

  ``limit`` is accepted but not used by this function.
  """
  # NOTE(review): the server name and credentials are hard-coded below;
  # consider loading them from configuration instead of source code.
  theDBCfg = dict(
    server=r'jg-pc\jg1',
    db='Ajour_System_A/S_10aee6e8b7dd4f4a8fd0cbf9cedf91cb',
    user='sa',
    pwd='morOg234',
  )

  with getConn(theDBCfg) as conn:
    cursor = conn.cursor()

    timer = Stopwatch()
    rows = selectFromDB(cursor)
    timer.measure()  # covers only the query call above
    print(len(rows))
开发者ID:pylgrym,项目名称:first_app,代码行数:13,代码来源:sqls_tiny2.py


示例11: getBCbyVsId

def getBCbyVsId(vsId='B14462', releaseId='624d9489-0f8c-48db-99f9-67701a040742'):
  """GET one building component by VSID and release, and print the body length.

  The request is timed with a 'req' stopwatch and the response is passed to
  ``logIfBad``. The body is parsed as JSON, but nothing is returned.
  """
  global headers, url
  path = '/api/v1/BuildingComponent/VSID/{vsId}/Release/{releaseId}'.format(vsId=vsId, releaseId=releaseId)
  bcUrl = url + path

  timer = Stopwatch('req')
  response = requests.get(bcUrl, headers=headers)
  timer.measure()
  logIfBad(response)

  body = response.text
  parsed = json.loads(body)
  print(len(body))

  dump_parsed = False  # set to True to pretty-print the parsed payload
  if dump_parsed:
    pprint.pprint(parsed)
开发者ID:pylgrym,项目名称:first_app,代码行数:14,代码来源:mol1.py


示例12: getBC_chunk

def getBC_chunk(chunkSet):
  """POST a VSID chunk query and save the JSON reply to ``BCs.json``.

  ``chunkSet`` is appended verbatim as the query string. The request is
  timed with a 'req' stopwatch and the response is passed to ``logIfBad``.
  """
  global headers, url
  endpoint = '/api/v1/BuildingComponent/VSIDs/Release/624d9489-0f8c-48db-99f9-67701a040742?'
  bcUrl = url + endpoint + chunkSet
  print(bcUrl)

  timer = Stopwatch('req')
  response = requests.post(bcUrl, headers=headers)
  timer.measure()
  logIfBad(response)

  body = response.text
  parsed = json.loads(body)
  print('len:', len(body))

  with open('BCs.json', "w") as sink:
    json.dump(parsed, sink, indent=2)

  dump_parsed = False  # set to True to pretty-print the parsed payload
  if dump_parsed:
    pprint.pprint(parsed)
开发者ID:pylgrym,项目名称:first_app,代码行数:15,代码来源:mol1.py


示例13: getBCs

def getBCs(releaseId='624d9489-0f8c-48db-99f9-67701a040742'):
  """POST a fixed set of nine VSIDs for a release and save the reply to ``BCs.json``.

  The request is timed with a 'req' stopwatch and the response is passed to
  ``logIfBad``; the length of the response text is printed.
  """
  global headers, url
  release_path = '/api/v1/BuildingComponent/VSIDs/Release/{releaseId}?'.format(releaseId=releaseId)
  # The query part contains no format fields, so it is appended unformatted.
  vsid_query = 'VSID1=B33321&VSID2=B96932&VSID3=B75326&VSID4=B36547&VSID5=B20553&VSID6=B59061&VSID7=B58491&VSID8=B80296&VSID9=B81223'
  bcUrl = url + release_path + vsid_query
  print(bcUrl)

  timer = Stopwatch('req')
  response = requests.post(bcUrl, headers=headers)
  timer.measure()
  logIfBad(response)

  body = response.text
  parsed = json.loads(body)
  print(len(body))

  with open('BCs.json', "w") as sink:
    json.dump(parsed, sink, indent=2)

  dump_parsed = False  # set to True to pretty-print the parsed payload
  if dump_parsed:
    pprint.pprint(parsed)
开发者ID:pylgrym,项目名称:first_app,代码行数:16,代码来源:mol1.py


示例14: getPrices

def getPrices():
  """POST a fixed three-item price calculation and save the reply to ``prices.json``.

  The request is timed with a 'req' stopwatch and the response is passed to
  ``logIfBad`` before its text is parsed as JSON.
  """
  global headers, url
  releaseID = '624d9489-0f8c-48db-99f9-67701a040742'
  url_parts = [
    '/api/v1/BuildingComponent/Calculate/Release/%s?' % releaseID,
    'id1=d03b8b89-fb70-47da-9a18-011132b22921&id2=32e920bb-e927-4a10-ad85-14c32bd197ff&id3=3dbf3ce5-f7e5-4745-bf18-1bd766c1b54d',
    '&amount1=10&amount2=10&amount3=10',
    "&yourCalculationId=1246",  # {myRandomCalculationNumber}
  ]
  priceUrl = url + ''.join(url_parts)
  print(priceUrl)

  timer = Stopwatch('req')
  response = requests.post(priceUrl, headers=headers)
  timer.measure()
  logIfBad(response)

  prices = json.loads(response.text)
  dump_parsed = False  # set to True to pretty-print the parsed payload
  if dump_parsed:
    pprint.pprint(prices)

  with open('prices.json', "w") as sink:
    json.dump(prices, sink, indent=2)
开发者ID:pylgrym,项目名称:first_app,代码行数:17,代码来源:mol1.py


示例15: getPrice_chunk

def getPrice_chunk(chunkSet, chunkIx):
  """POST one price-calculation chunk and save the reply to ``prices.json``.

  ``chunkSet`` is appended verbatim as the query string; ``chunkIx`` is only
  printed alongside the URL. The request is timed with a 'req' stopwatch and
  the response is passed to ``logIfBad``.
  """
  global headers, url
  releaseID = '624d9489-0f8c-48db-99f9-67701a040742'
  endpoint = '/api/v1/BuildingComponent/Calculate/Release/%s?' % releaseID
  priceUrl = url + endpoint + chunkSet

  print(chunkIx, priceUrl)

  timer = Stopwatch('req')
  response = requests.post(priceUrl, headers=headers)
  timer.measure()
  logIfBad(response)

  prices = json.loads(response.text)
  dump_parsed = False  # set to True to pretty-print the parsed payload
  if dump_parsed:
    pprint.pprint(prices)

  with open('prices.json', "w") as sink:
    json.dump(prices, sink, indent=2)
开发者ID:pylgrym,项目名称:first_app,代码行数:17,代码来源:mol1.py


示例16: getBCbyGroup

def getBCbyGroup(groupId='B14462', releaseId='624d9489-0f8c-48db-99f9-67701a040742'):
  """Return the ``buildingComponents`` entry for a building-component group.

  Sends a GET for the given group and release, timed with a 'req' stopwatch
  (measured with ``show=False``), passes the response to ``logIfBad`` and
  parses its text as JSON.
  """
  global headers, url
  path = '/api/v1/BuildingComponent/BuildingComponentGroup/{groupId}/Release/{releaseId}'.format(groupId=groupId, releaseId=releaseId)
  bcUrl = url + path

  timer = Stopwatch('req')
  response = requests.get(bcUrl, headers=headers)
  timer.measure(show=False)
  logIfBad(response)

  payload = json.loads(response.text)

  show_length = False  # set to True to print the raw body length
  if show_length:
    print(len(response.text))
  dump_parsed = False  # set to True to pretty-print the parsed payload
  if dump_parsed:
    pprint.pprint(payload)

  return payload['buildingComponents']
开发者ID:pylgrym,项目名称:first_app,代码行数:17,代码来源:mol1.py


示例17: __init__

 def __init__(self, name=None):
     """Set up empty FPS state, a 10-entry history deque and a stopwatch.

     ``name`` is optional; when given it is stored wrapped in parentheses,
     otherwise an empty string is stored.
     """
     self._name_argument_string = "" if name is None else "(%s)" % name
     self._fps_history = collections.deque(maxlen=10)
     # No timing samples exist yet.
     self._previous_time = self._previous_calculated_fps_time = None
     self._stopwatch = Stopwatch()
     self._fps = None
开发者ID:gaborpapp,项目名称:AIam,代码行数:10,代码来源:fps_meter.py


示例18: init

 def init(self):
     """Run the init helpers in order, create a Stopwatch and select the main menu."""
     self.initStyles()
     self.initMaps()
     # Terrain's images and sounds are initialised via calls on the Terrain class itself.
     Terrain.initImages()
     Terrain.initSounds()
     self.initMenus()
     self.stopwatch = Stopwatch()
     self.initHelp()
     # Start in "menu" mode with the main menu as the current menu.
     self.mode = "menu"
     self.menu = self.mainMenu
开发者ID:daniel-wen,项目名称:bouncier,代码行数:10,代码来源:main.py


示例19: __init__

 def __init__(self, robot, raise_speed, stop_time, name=None, timeout=None):
     """Store the settings, require ``robot.arm`` and create the stopwatch.

     ``name`` and ``timeout`` are forwarded to the parent constructor.
     """
     super().__init__(name, timeout)
     self._robot, self._raise_speed, self._stop_time = robot, raise_speed, stop_time
     self.requires(robot.arm)
     self._stopwatch = Stopwatch()
开发者ID:TechnoJays,项目名称:robot2016,代码行数:10,代码来源:raise_arm_time.py


示例20: GetToken_Password

def GetToken_Password():  # oauth, exchange a password for a token.
  """Request an OAuth token with the password grant and return the response.

  Credentials and client settings are read from the module-level ``env``
  mapping and posted as form data to ``<url>/token``. The request is timed
  with a 'req' stopwatch and the response is passed to ``logIfBad``.

  Returns:
    The ``requests`` response object of the token request.
  """
  global env, url
  # Echo only the username: printing the password would leak the secret
  # into the console and any captured logs.
  print(env['username'])

  tokenReq = {
    'grant_type':    'password',
    'username':      env['username'],
    'password':      env['password'],
    'client_id':     env['client_id'],
    'client_secret': env['client_secret']
  }

  token_url = url + '/token'
  print('before', token_url)
  watch = Stopwatch('req')
  r = requests.post(token_url, tokenReq)
  watch.measure()
  logIfBad(r)
  return r
开发者ID:pylgrym,项目名称:first_app,代码行数:19,代码来源:mol1.py



注:本文中的stopwatch.Stopwatch类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python serialbase.SerialBase类代码示例发布时间:2022-05-27
下一篇:
Python stop_words.get_stop_words函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap