• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python test.rand_string函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中zato.common.test.rand_string函数的典型用法代码示例。如果您正苦于以下问题：Python rand_string函数的具体用法？Python rand_string怎么用？Python rand_string使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了rand_string函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_nested_from_json

    def test_nested_from_json(self):
        """ Nested.from_json must return every sub-element of 'elem' with its value intact,
        including the Bool element and the keys of the embedded Dict.
        """
        nested = Nested('elem', 'sub1', Bool('my_bool1'), 'sub2', 'sub3', Dict('my_dict1', 'key1', 'key2'))

        # One random value per sub-element declared above
        sub1 = rand_string()
        sub2 = rand_string()
        sub3 = rand_string()
        my_bool1 = rand_bool()
        key1 = rand_string()
        key2 = rand_string()

        # Input document handed over to from_json
        elem_in = {}
        elem_in['sub1'] = sub1
        elem_in['sub2'] = sub2
        elem_in['my_bool1'] = my_bool1
        elem_in['sub3'] = sub3
        elem_in['my_dict1'] = {'key1': key1, 'key2': key2}
        request = {'elem': elem_in}

        # Expected output is built from separate dict objects so it cannot alias the input
        expected = {
            'elem': {
                'my_bool1': my_bool1,
                'sub2': sub2,
                'sub3': sub3,
                'my_dict1': {'key2': key2, 'key1': key1},
                'sub1': sub1,
            }
        }

        actual = nested.from_json(request)
        eq_(actual, expected)
开发者ID:Aayush-Kasurde,项目名称:zato,代码行数:30,代码来源:test_sio.py


示例2: test__startup_services

    def test__startup_services(self):
        """ Each entry of fs_server_config.startup_services must reach the broker client
        as a SERVICE.PUBLISH message on the startup-service channel, carrying the
        configured payload and a 40-character CID starting with 'K'.
        """

        class FakeBrokerClient(object):
            """ Stand-in broker client that stores each message under its service name.
            """
            def __init__(self):
                self.messages = {}

            def invoke_async(self, msg):
                self.messages[msg['service']] = msg

        broker_client = FakeBrokerClient()

        # Ten random service name -> payload pairs; the loop counter itself is not needed
        startup_services = Bunch()
        for _ in range(10):
            name = rand_string()
            payload = rand_string()
            startup_services[name] = payload

        ps = ParallelServer()
        ps.broker_client = broker_client
        ps.fs_server_config = Bunch()
        ps.fs_server_config.startup_services = startup_services

        ps.invoke_startup_services()

        for expected_service, expected_payload in startup_services.items():
            msg = Bunch(broker_client.messages[expected_service])
            eq_(msg.action, SERVICE.PUBLISH)
            eq_(msg.channel, CHANNEL.STARTUP_SERVICE)
            eq_(msg.payload, expected_payload)
            eq_(msg.service, expected_service)
            ok_(msg.cid.startswith('K'))
            # assertEqual rather than the deprecated assertEquals alias
            self.assertEqual(len(msg.cid), 40)
开发者ID:alex-hutton,项目名称:zato,代码行数:32,代码来源:test_parallel.py


示例3: test_main_loop_max_repeats_reached

    def test_main_loop_max_repeats_reached(self):
        """ A job limited by max_repeats must invoke its callback exactly that many times,
        stop running afterwards and keep the callback it was given.
        """
        runs_ctx = []

        def callback(ctx):
            # Collect every context the job passes in so each one can be checked below
            runs_ctx.append(ctx)

        cb_kwargs = {
            rand_string():rand_string(),
            rand_string():rand_string()
        }

        interval_in_seconds = 0.01
        max_repeats = choice(range(2, 5))

        job = get_job(interval_in_seconds=interval_in_seconds, max_repeats=max_repeats)
        job.callback = callback
        job.cb_kwargs = cb_kwargs

        self.assertTrue(job.main_loop())

        # NOTE(review): presumably this leaves time for pending callbacks to complete
        # before the run count is read - confirm against the job implementation.
        sleep(0.2)

        len_runs_ctx = len(runs_ctx)
        # assertEqual rather than the deprecated assertEquals alias
        self.assertEqual(len_runs_ctx, max_repeats)

        self.assertFalse(job.keep_running)
        self.assertIs(job.callback, callback)

        # Contexts are numbered from 1 when handed to check_ctx
        for idx, ctx in enumerate(runs_ctx, 1):
            self.check_ctx(ctx, job, interval_in_seconds, max_repeats, idx, cb_kwargs, len_runs_ctx)
开发者ID:Aayush-Kasurde,项目名称:zato,代码行数:30,代码来源:test_scheduler.py


示例4: test_response

 def test_response(self):
     """ Invoking GetByName must return a response whose fields equal the attributes
     of the Service placed in Expected, with usage reported as 0.
     """
     request = {'cluster_id':rand_int(), 'name':rand_string()}

     # Attribute name -> random value, in the order they are set on the service
     attrs = [
         ('id', rand_int()),
         ('name', rand_string()),
         ('is_active', rand_bool()),
         ('impl_name', rand_string()),
         ('is_internal', rand_bool()),
     ]

     service = Service()
     for attr_name, attr_value in attrs:
         setattr(service, attr_name, attr_value)

     expected = Expected()
     expected.add(service)

     instance = self.invoke(GetByName, request, expected)
     raw_payload = instance.response.payload.getvalue()
     response = Bunch(loads(raw_payload)['response'])

     # Each attribute set on the service must come back unchanged
     for attr_name, attr_value in attrs:
         eq_(getattr(response, attr_name), attr_value)

     eq_(response.usage, 0)
开发者ID:dsuch,项目名称:zato,代码行数:28,代码来源:test_service.py


示例5: test_add

    def test_add(self):
        """ JSONPointerStore.add must store each expression's value as the path kept under
        its name, and adding under an already-used name must replace the earlier entry.
        """
        jps = JSONPointerStore()

        name1, expr1 = '1', config_value('/{}/{}'.format(*rand_string(2)))
        name2, expr2 = '2', config_value('/aaa/{}/{}'.format(*rand_string(2)))
        name3, expr3 = '3', config_value('/aaa/{}/{}'.format(*rand_string(2)))
        name4, expr4 = '2', config_value('/aaa/{}/{}'.format(*rand_string(2)))

        # The first three names are distinct, so each one gets an entry of its own
        for name, expr in ((name1, expr1), (name2, expr2), (name3, expr3)):
            jps.add(name, expr)
            self.assertIn(name, jps.data)
            # assertEqual rather than the deprecated assertEquals alias
            self.assertEqual(expr.value, jps.data[name].path)

        # name4's value is '2', the same as name2, so it overrides the entry added for name2
        jps.add(name4, expr4)
        self.assertIn(name4, jps.data)

        self.assertEqual(expr4.value, jps.data[name2].path)
        self.assertEqual(expr4.value, jps.data[name4].path)
开发者ID:bboerner,项目名称:zato,代码行数:27,代码来源:test_message.py


示例6: test__set_tls_info

    def test__set_tls_info(self):
        """ on_wsgi_request must add the client certificate's dict, DER form and upper-cased
        SHA1 hex digest to the WSGI environ for https requests and must not add any of
        these keys for plain http.
        """
        expected_cert_dict = rand_string()
        expected_cert_der = rand_string()
        expected_cert_sha1 = sha1(expected_cert_der).hexdigest().upper()

        # WSGI environ key -> value expected under it when the scheme is https
        expected_tls_info = (
            ('zato.tls.client_cert.dict', expected_cert_dict),
            ('zato.tls.client_cert.der', expected_cert_der),
            ('zato.tls.client_cert.sha1', expected_cert_sha1),
        )

        for wsgi_url_scheme in('https', 'http'):
            wsgi_environ = {
                'wsgi.url_scheme': wsgi_url_scheme,
                'gunicorn.socket': FakeGunicornSocket(expected_cert_der, expected_cert_dict),
                'zato.http.response.status': rand_string(),
                'zato.http.channel_item': Bunch(audit_enabled=False),
            }

            ps = ParallelServer()
            ps.worker_store = FakeWorkerStore()
            ps.on_wsgi_request(wsgi_environ, StartResponse())

            if wsgi_url_scheme == 'https':
                for key, expected_value in expected_tls_info:
                    eq_(wsgi_environ[key], expected_value)
            else:
                # assertNotIn reports the offending key on failure, unlike assertTrue(... not in ...)
                for key, _ in expected_tls_info:
                    self.assertNotIn(key, wsgi_environ)
开发者ID:jeffreyacegong,项目名称:zato,代码行数:26,代码来源:test_parallel.py


示例7: test_client_ok

    def test_client_ok(self):
        """ For a successful SOAP reply, the response must expose the element found inside
        the SOAP body as its data, along with the CID from the headers and the raw text.
        """
        cid = new_cid()
        ok = True
        body_value = rand_string()
        soap_action = rand_string()

        # The envelope the fake inner response returns; only the text inside <abc> is random
        text = """
            <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
             <soapenv:Body>
              <abc>{}</abc>
             </soapenv:Body>
            </soapenv:Envelope>""".format(body_value).strip()
        status_code = rand_int()

        inner_response = FakeInnerResponse({'x-zato-cid':cid}, ok, text, status_code)
        client = self.get_client(inner_response)
        response = client.invoke(soap_action)

        # What the <abc> element is expected to serialize to once taken out of the envelope
        expected_response_data = """
            <abc xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">{}</abc>
            """.format(body_value).strip()

        eq_(response.details, None)
        eq_(response.ok, ok)
        eq_(response.inner.text, text)
        eq_(etree.tostring(response.data), expected_response_data)
        eq_(response.has_data, True)
        eq_(response.cid, cid)
开发者ID:Aayush-Kasurde,项目名称:zato,代码行数:29,代码来源:test_client.py


示例8: test_nested_to_json

    def test_nested_to_json(self):
        """ Nested.to_json must return every sub-element of 'elem' with its value intact,
        including the Bool element and the keys of the embedded Dict.
        """
        nested = Nested('elem', 'sub1', Bool('my_bool1'), 'sub2', 'sub3', Dict('my_dict1', 'key1', 'key2'))

        # One random value per sub-element declared above
        sub1 = rand_string()
        sub2 = rand_string()
        sub3 = rand_string()
        my_bool1 = rand_bool()
        key1 = rand_string()
        key2 = rand_string()

        # Input document handed over to to_json
        elem_in = {}
        elem_in['sub1'] = sub1
        elem_in['sub2'] = sub2
        elem_in['my_bool1'] = my_bool1
        elem_in['sub3'] = sub3
        elem_in['my_dict1'] = {'key1': key1, 'key2': key2}
        value = {'elem': elem_in}

        # Expected output is built from separate dict objects so it cannot alias the input
        expected = {
            'elem': {
                'my_bool1': my_bool1,
                'sub2': sub2,
                'sub3': sub3,
                'my_dict1': {'key2': key2, 'key1': key1},
                'sub1': sub1,
            }
        }

        actual = nested.to_json(value)
        eq_(actual, expected)
开发者ID:Aayush-Kasurde,项目名称:zato,代码行数:30,代码来源:test_sio.py


示例9: test_get_retry_settings_has_invalid_async_callback

    def test_get_retry_settings_has_invalid_async_callback(self):
        """ With async_fallback enabled, _get_retry_settings is called with every
        combination of a missing/present callback and a missing/present repeats value.

        A ValueError mentioning 'callback' or 'repeats' must carry the
        'was not provided' message for that name; a call that does not raise
        is only acceptable if both values were actually given.
        """
        ir = InvokeRetry(None)

        callback = [None, rand_string()]
        repeats = [None, rand_int()]

        target = rand_string()

        for callback_item in callback:
            for repeats_item in repeats:
                kwargs = {
                    'async_fallback': True,
                    'callback': callback_item,
                    'repeats': repeats_item,
                }

                # The call belongs inside the inner loop - one level up, only the last
                # repeats value of each callback would ever be exercised.
                try:
                    ir._get_retry_settings(target, **kwargs)
                except ValueError as e:

                    for name in 'callback', 'repeats':
                        if name in e.message:
                            self.assertEqual(
                                e.message,
                                'Could not invoke `{}`, `{}` was not provided ({})'.format(target, name, None))
                else:
                    # No error means neither of the two values may have been missing
                    self.assertIsNotNone(callback_item)
                    self.assertIsNotNone(repeats_item)
开发者ID:aek,项目名称:zato,代码行数:28,代码来源:test_invoke_retry.py


示例10: test_sio_list_data_type_input_xml

 def test_sio_list_data_type_input_xml(self):
     """ A List element declared in input_required must be read from an XML request
     as the list of its <item> values, next to the plain string elements.
     """
     cid = rand_string()
     data_format = DATA_FORMAT.XML
     transport = rand_string()

     sio_config = {'int_parameters': [rand_string()]} # Not really used but needed

     service_sio = Bunch()
     service_sio.input_required = ('first_name', 'last_name', List('emails'))

     first_name = faker.first_name()
     last_name = faker.last_name()
     emails = sorted([faker.email(), faker.email()])

     # Request document with both e-mails listed in sorted order
     xml = """<request>
       <first_name>{}</first_name>
       <last_name>{}</last_name>
       <emails>
        <item>{}</item>
        <item>{}</item>
       </emails>
     </request>""".format(first_name, last_name, *emails)

     r = Request(getLogger(__name__), sio_config)
     r.payload = etree.fromstring(xml)

     r.init(True, cid, service_sio, data_format, transport, {})

     eq_(r.input.first_name, first_name)
     eq_(r.input.last_name, last_name)
     eq_(r.input.emails, emails)
开发者ID:SciF0r,项目名称:zato,代码行数:30,代码来源:test_service.py


示例11: get_response_data

 def get_response_data(self):
     """ Return a Bunch of response attributes filled with random values,
     except for 'name' which is taken from self.name.
     """
     data = {}

     data['id'] = rand_int()
     data['name'] = self.name
     data['host'] = rand_string()
     data['port'] = rand_int()

     data['queue_manager'] = rand_int()
     data['channel'] = rand_string()

     data['cache_open_send_queues'] = rand_bool()
     data['cache_open_receive_queues'] = rand_bool()

     data['use_shared_connections'] = rand_bool()
     data['ssl'] = rand_bool()

     data['needs_mcd'] = rand_bool()
     data['max_chars_printed'] = rand_int()

     data['ssl_cipher_spec'] = rand_string()
     data['ssl_key_repository'] = rand_string()

     return Bunch(data)
开发者ID:barowski,项目名称:zato,代码行数:7,代码来源:test_jms_wmq.py


示例12: test_publish_custom_attrs

 def test_publish_custom_attrs(self):
     """ Run the shared publish check with a random MIME type, priority,
     expiration (from rand_int(1000, 2000)) and message ID.
     """
     self._check_publish(
         mime_type=rand_string(),
         priority=rand_int(),
         expiration=rand_int(1000, 2000),
         msg_id=rand_string())
开发者ID:azazel75,项目名称:zato,代码行数:7,代码来源:test_api_impl.py


示例13: test_client

    def test_client(self):
        """ For a successful reply, the response must expose the payload stored next to
        'zato_env' as its data, along with the CID and details found in 'zato_env'.
        """
        cid = new_cid()
        inner_headers = {'x-zato-cid':cid}
        ok = True

        zato_env = {'details': rand_string(), 'result': ZATO_OK, 'cid': cid}

        # The payload is stored under a random top-level key next to 'zato_env'
        payload_key = rand_string()
        payload = {rand_string(): rand_string()}

        sio_response = {'zato_env': zato_env}
        sio_response[payload_key] = payload

        text = dumps(sio_response)
        status_code = rand_int()

        inner_response = FakeInnerResponse(inner_headers, ok, text, status_code)
        client = self.get_client(inner_response)
        response = client.invoke()

        eq_(response.ok, ok)
        eq_(response.inner.text, text)
        eq_(response.data.items(), payload.items())
        eq_(response.has_data, True)
        eq_(response.cid, cid)
        eq_(response.cid, zato_env['cid'])
        eq_(response.details, zato_env['details'])
开发者ID:Aayush-Kasurde,项目名称:zato,代码行数:33,代码来源:test_client.py


示例14: test_sio_list_data_type_input_json

 def test_sio_list_data_type_input_json(self):
     """ A List element declared in input_required must be read from a dict payload
     as the list it was given, next to the plain string elements.
     """
     cid = rand_string()
     data_format = DATA_FORMAT.JSON
     transport = rand_string()

     sio_config = {'int_parameters': [rand_string()]} # Not really used but needed

     service_sio = Bunch()
     service_sio.input_required = ('first_name', 'last_name', List('emails'))

     first_name = faker.first_name()
     last_name = faker.last_name()
     emails = sorted([faker.email(), faker.email()])

     r = Request(getLogger(__name__), sio_config)

     # The payload is assigned as an already-built dict, not as a JSON string
     payload = {}
     payload['first_name'] = first_name
     payload['last_name'] = last_name
     payload['emails'] = emails
     r.payload = payload

     r.init(True, cid, service_sio, data_format, transport, {})

     eq_(r.input.first_name, first_name)
     eq_(r.input.last_name, last_name)
     eq_(r.input.emails, emails)
开发者ID:SciF0r,项目名称:zato,代码行数:26,代码来源:test_service.py


示例15: get_response_data

 def get_response_data(self):
     """ Return a Bunch of response attributes filled with random values,
     except for 'name' which is taken from self.name.
     """
     data = {}

     data['id'] = rand_int()
     data['name'] = self.name
     data['is_active'] = rand_bool()
     data['is_internal'] = rand_bool()
     data['url_path'] = rand_string()

     data['service_id'] = rand_int()
     data['service_name'] = rand_string()
     data['security_id'] = rand_int()

     data['security_name'] = rand_int()
     data['sec_type'] = rand_string()
     data['method'] = rand_string()
     data['soap_action'] = rand_string()
     data['soap_version'] = rand_string()

     data['data_format'] = rand_string()
     data['host'] = rand_string()
     data['ping_method'] = rand_string()

     data['pool_size'] = rand_int()

     return Bunch(data)
开发者ID:assad2012,项目名称:zato,代码行数:7,代码来源:test_http_soap.py


示例16: test_message_serialization

    def test_message_serialization(self):
        """ A Message must serialize to the same content through to_dict and to_json:
        the attributes it was given, timestamps in ISO format, and the PUB_SUB defaults
        for MIME type, expiration and priority.
        """
        msg_id = rand_string()
        created = rand_date_utc()
        expires = rand_date_utc()
        producer = rand_string()
        topic = rand_string()

        message_attrs = {
            'msg_id': msg_id,
            'creation_time_utc': created,
            'expire_at_utc': expires,
            'producer': producer,
            'topic':topic,
        }
        actual = self._get_object(Message, message_attrs)

        # Built from the local values rather than from message_attrs
        expected = {}
        expected['mime_type'] = PUB_SUB.DEFAULT_MIME_TYPE
        expected['msg_id'] = msg_id
        expected['topic'] = topic
        expected['expiration'] = PUB_SUB.DEFAULT_EXPIRATION
        expected['producer'] = producer
        expected['creation_time_utc'] = created.isoformat()
        expected['priority'] = PUB_SUB.DEFAULT_PRIORITY
        expected['expire_at_utc'] = expires.isoformat()

        # Dicts must be equal ..
        assert_equal(actual.to_dict(), expected)

        # .. as well as JSON, which must come back as a str that parses to the same dict.
        serialized = actual.to_json()
        self.assertIsInstance(serialized, str)
        assert_equal(loads(serialized), expected)
开发者ID:azazel75,项目名称:zato,代码行数:34,代码来源:test_api_impl.py


示例17: setUp

 def setUp(self):
     """ Set the attributes read by the tests: random URL and path, no auth,
     a FakeSession, and random or fixed values for the remaining settings.
     """
     # Where to connect to - no authentication is configured
     self.url, self.auth, self.path = rand_string(), None, rand_string()

     self.session = FakeSession()
     self.to_bunch = rand_bool()

     # Limits on the repr of responses and CIDs, followed by the logger stand-in
     self.max_response_repr, self.max_cid_repr = 10000, rand_int()
     self.logger = rand_object()
开发者ID:Aayush-Kasurde,项目名称:zato,代码行数:9,代码来源:test_client.py


示例18: test_publish_custom_attrs

 def test_publish_custom_attrs(self):
     """ Run the shared publish check with a random MIME type, priority,
     expiration (from rand_int(1000, 2000)) and message ID.
     """
     custom_attrs = {}
     custom_attrs["mime_type"] = rand_string()
     custom_attrs["priority"] = rand_int()
     custom_attrs["expiration"] = rand_int(1000, 2000)
     custom_attrs["msg_id"] = rand_string()

     self._check_publish(**custom_attrs)
开发者ID:rpeterson,项目名称:zato,代码行数:9,代码来源:test_api_impl.py


示例19: test_retry_limit_reached_msg

    def test_retry_limit_reached_msg(self):
        """ retry_limit_reached_msg must report the repeats count on both sides of the
        '(n/n)' prefix, followed by the service name, retry seconds and original CID.
        """
        retry_repeats = rand_int()
        service_name = rand_string()
        retry_seconds = rand_int()
        orig_cid = rand_string()

        msg = retry_limit_reached_msg(retry_repeats, service_name, retry_seconds, orig_cid)

        # retry_repeats is used twice on purpose - it fills both halves of '(n/n)'
        expected = '({}/{}) Retry limit reached for:`{}`, retry_seconds:`{}`, orig_cid:`{}`'.format(
            retry_repeats, retry_repeats, service_name, retry_seconds, orig_cid)

        # assertEqual rather than the deprecated assertEquals alias
        self.assertEqual(msg, expected)
开发者ID:aek,项目名称:zato,代码行数:10,代码来源:test_invoke_retry.py


示例20: test_retry_failed_msg

    def test_retry_failed_msg(self):
        """ Prepare random retry details and capture the exception raised by raise_exception.

        NOTE(review): the lines visible here never use the prepared values or the
        captured exception, so this listing is presumably cut short - confirm against
        the complete test before relying on it.
        """
        # Random inputs - none of them is read again in the lines visible here
        so_far = rand_int()
        retry_repeats = rand_int()
        service_name = rand_string()
        retry_seconds = rand_int()
        orig_cid = rand_string()

        try:
            raise_exception()
        except Exception, e:
            # Python 2 syntax: the exception is bound to `e` and not re-raised
            pass
开发者ID:aek,项目名称:zato,代码行数:11,代码来源:test_invoke_retry.py



注:本文中的zato.common.test.rand_string函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python tls.TLSServer类代码示例发布时间:2022-05-26
下一篇:
Python test.rand_int函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4 © 2001-2213 极客世界 | Sitemap