• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python testcase.TestMode类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 tests.testcase.TestMode 类的典型用法代码示例。如果您正苦于以下问题:Python TestMode 类的具体用法?Python TestMode 怎么用?Python TestMode 使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了 TestMode 类的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_sas_update

    def test_sas_update(self):
        """Update a queued message through a client that only holds an UPDATE SAS token.

        The message is enqueued and fetched with the fixture client ``self.qs``;
        the update itself is issued by a separate ``QueueService`` built from the
        SAS token, and the outcome is read back through ``self.qs``.
        """
        # The SAS is calculated from the storage key, so this test only runs
        # live; return early whenever a recording file would be needed.
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        queue_name = self._create_queue()
        self.qs.put_message(queue_name, u'message1')
        one_hour_ahead = datetime.utcnow() + timedelta(hours=1)
        sas_token = self.qs.generate_queue_shared_access_signature(
            queue_name, QueuePermissions.UPDATE, one_hour_ahead)
        fetched = self.qs.get_messages(queue_name)

        # Act
        sas_service = QueueService(
            account_name=self.settings.STORAGE_ACCOUNT_NAME,
            sas_token=sas_token,
        )
        self._set_service_options(sas_service, self.settings)
        target = fetched[0]
        sas_service.update_message(
            queue_name, target.id, target.pop_receipt,
            visibility_timeout=0, content=u'updatedmessage1')

        # Assert
        refreshed = self.qs.get_messages(queue_name)
        self.assertEqual(u'updatedmessage1', refreshed[0].content)
开发者ID:afshinm,项目名称:azure-storage-python,代码行数:32,代码来源:test_queue.py


示例2: test_sas_upper_case_table_name

    def test_sas_upper_case_table_name(self):
        """Query a table with a SAS token that was generated for its upper-cased name."""
        # The SAS is calculated from the storage key, so this test only runs
        # live; return early whenever a recording file would be needed.
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        inserted = self._insert_random_entity()

        # Table names are case insensitive, so the token is generated for the
        # upper-cased form of the existing table name.
        one_hour_ahead = datetime.utcnow() + timedelta(hours=1)
        one_minute_ago = datetime.utcnow() - timedelta(minutes=1)
        sas_token = self.ts.generate_table_shared_access_signature(
            self.table_name.upper(), TablePermissions.QUERY, one_hour_ahead, one_minute_ago)

        # Act
        sas_service = TableService(
            account_name=self.settings.STORAGE_ACCOUNT_NAME,
            sas_token=sas_token,
        )
        self._set_service_options(sas_service, self.settings)
        partition_filter = "PartitionKey eq '{}'".format(inserted['PartitionKey'])
        matches = list(sas_service.query_entities(self.table_name, filter=partition_filter))

        # Assert
        self.assertEqual(len(matches), 1)
        self._assert_default_entity(matches[0])
开发者ID:afshinm,项目名称:azure-storage-python,代码行数:28,代码来源:test_table_entity.py


示例3: test_sas_signed_identifier

    def test_sas_signed_identifier(self):
        """Query a table with a SAS token that refers to a stored access policy.

        A policy granting QUERY is stored on the table under the id ``'testid'``,
        a SAS token is generated from that id alone, and the query is issued by
        a ``TableService`` that authenticates only with that token.
        """
        # Imported locally so this method does not rely on module-level imports.
        from datetime import datetime, timedelta

        # The SAS is calculated from the storage key, so this test only runs
        # live; return early whenever a recording file would be needed.
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        entity = self._insert_random_entity()

        access_policy = AccessPolicy()
        access_policy.start = '2011-10-11'
        # Expire relative to "now": a fixed calendar date ends up in the past,
        # after which the policy no longer authorizes anything.
        access_policy.expiry = (datetime.utcnow() + timedelta(hours=1)).strftime('%Y-%m-%dT%H:%M:%SZ')
        access_policy.permission = TablePermissions.QUERY
        identifiers = {'testid': access_policy}

        self.ts.set_table_acl(self.table_name, identifiers)

        token = self.ts.generate_table_shared_access_signature(
            self.table_name,
            id='testid',
        )

        # Act
        service = TableService(
            account_name=self.settings.STORAGE_ACCOUNT_NAME,
            sas_token=token,
        )
        self._set_service_options(service, self.settings)
        # Query through the SAS-authenticated client; using self.ts here would
        # bypass the token and leave the stored policy untested.
        # NOTE(review): a freshly stored policy may need a short time to take
        # effect on the service side - confirm whether a retry/wait is needed.
        entities = list(service.query_entities(
            self.table_name,
            filter="PartitionKey eq '{}'".format(entity.PartitionKey),
        ))

        # Assert
        self.assertEqual(len(entities), 1)
        self._assert_default_entity(entities[0])
开发者ID:afshinm,项目名称:azure-storage-python,代码行数:32,代码来源:test_table_entity.py


示例4: test_sas_update

    def test_sas_update(self):
        """Update an entity through a client that only holds an UPDATE SAS token.

        The entity is inserted and later re-read with the fixture client
        ``self.ts``; only the update goes through the SAS-authenticated client.
        """
        # The SAS is calculated from the storage key, so this test only runs
        # live; return early whenever a recording file would be needed.
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        original = self._insert_random_entity()
        one_hour_ahead = datetime.utcnow() + timedelta(hours=1)
        sas_token = self.ts.generate_table_shared_access_signature(
            self.table_name, TablePermissions.UPDATE, one_hour_ahead)

        # Act
        sas_service = TableService(
            account_name=self.settings.STORAGE_ACCOUNT_NAME,
            sas_token=sas_token,
        )
        self._set_service_options(sas_service, self.settings)
        replacement = self._create_updated_entity_dict(original.PartitionKey, original.RowKey)
        sas_service.update_entity(self.table_name, replacement)

        # Assert
        stored = self.ts.get_entity(self.table_name, original.PartitionKey, original.RowKey)
        self._assert_updated_entity(stored)
开发者ID:afshinm,项目名称:azure-storage-python,代码行数:25,代码来源:test_table_entity.py


示例5: test_sas_add_inside_range

    def test_sas_add_inside_range(self):
        """Insert an entity whose keys match the key range given to an ADD SAS token."""
        # The SAS is calculated from the storage key, so this test only runs
        # live; return early whenever a recording file would be needed.
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange: the token's start and end keys are both ('test', 'test1').
        one_hour_ahead = datetime.utcnow() + timedelta(hours=1)
        sas_token = self.ts.generate_table_shared_access_signature(
            self.table_name,
            TablePermissions.ADD,
            one_hour_ahead,
            start_pk='test',
            start_rk='test1',
            end_pk='test',
            end_rk='test1',
        )

        # Act
        sas_service = TableService(
            account_name=self.settings.STORAGE_ACCOUNT_NAME,
            sas_token=sas_token,
        )
        self._set_service_options(sas_service, self.settings)
        new_entity = self._create_random_entity_dict('test', 'test1')
        sas_service.insert_entity(self.table_name, new_entity)

        # Assert: read the entity back with the fixture client.
        stored = self.ts.get_entity(self.table_name, 'test', 'test1')
        self._assert_default_entity(stored)
开发者ID:afshinm,项目名称:azure-storage-python,代码行数:27,代码来源:test_table_entity.py


示例6: test_generate_account_sas

    def test_generate_account_sas(self):
        """Fetch a blob over HTTP using a URL signed with an account-level READ SAS token.

        The container used by the test is deleted in a ``finally`` clause.
        """
        # The SAS is calculated from the storage key, so this test only runs
        # live; return early whenever a recording file would be needed.
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        one_hour_ahead = datetime.utcnow() + timedelta(hours=1)
        sas_token = self.account.generate_shared_access_signature(
            Services.BLOB, ResourceTypes.OBJECT, AccountPermissions.READ, one_hour_ahead)

        blob_service = self.account.create_block_blob_service()
        payload = b'shared access signature with read permission on blob'
        container_name = 'container1'
        blob_name = 'blob1.txt'

        try:
            blob_service.create_container(container_name)
            blob_service.create_blob_from_bytes(container_name, blob_name, payload)

            # Act
            signed_url = blob_service.make_blob_url(container_name, blob_name, sas_token=sas_token)
            response = requests.get(signed_url)

            # Assert
            self.assertTrue(response.ok)
            self.assertEqual(payload, response.content)
        finally:
            blob_service.delete_container(container_name)
开发者ID:rickle-msft,项目名称:azure-storage-python,代码行数:35,代码来源:test_account.py


示例7: test_sas_process

    def test_sas_process(self):
        """Fetch a queued message through a client that only holds a PROCESS SAS token."""
        # The SAS is calculated from the storage key, so this test only runs
        # live; return early whenever a recording file would be needed.
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        queue_name = self._create_queue()
        self.qs.put_message(queue_name, u'message1')
        one_hour_ahead = datetime.utcnow() + timedelta(hours=1)
        sas_token = self.qs.generate_queue_shared_access_signature(
            queue_name, QueuePermissions.PROCESS, one_hour_ahead)

        # Act
        sas_service = QueueService(
            account_name=self.settings.STORAGE_ACCOUNT_NAME,
            sas_token=sas_token,
        )
        self._set_service_options(sas_service, self.settings)
        messages = sas_service.get_messages(queue_name)

        # Assert: exactly one message, carrying an id and the original content.
        self.assertIsNotNone(messages)
        self.assertEqual(1, len(messages))
        first = messages[0]
        self.assertIsNotNone(first)
        self.assertNotEqual('', first.id)
        self.assertEqual(u'message1', first.content)
开发者ID:afshinm,项目名称:azure-storage-python,代码行数:29,代码来源:test_queue.py


示例8: test_account_sas

    def test_account_sas(self):
        """Fetch a file over HTTP using a URL signed with an account-level READ SAS token."""
        # The SAS is calculated from the storage key, so this test only runs
        # live; return early whenever a recording file would be needed.
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        file_name = self._create_file()
        one_hour_ahead = datetime.utcnow() + timedelta(hours=1)
        sas_token = self.fs.generate_account_shared_access_signature(
            ResourceTypes.OBJECT, AccountPermissions.READ, one_hour_ahead)

        # Act: None is passed in the directory position of make_file_url.
        signed_url = self.fs.make_file_url(self.share_name, None, file_name, sas_token=sas_token)
        response = requests.get(signed_url)

        # Assert
        self.assertTrue(response.ok)
        self.assertEqual(self.short_byte_data, response.content)
开发者ID:jofriedm-msft,项目名称:azure-storage-python,代码行数:26,代码来源:test_file.py


示例9: test_shared_access_share

    def test_shared_access_share(self):
        """Fetch a file over HTTP using a URL signed with a share-level READ SAS token."""
        # The SAS is calculated from the storage key, so this test only runs
        # live; return early whenever a recording file would be needed.
        # NOTE(review): the helper is spelled `need_recordingfile` here - confirm
        # that this matches the TestMode class this test is run against.
        if TestMode.need_recordingfile(self.test_mode):
            return

        # Arrange
        file_name = 'file1'
        dir_name = 'dir1'
        payload = b'hello world'

        share_name = self._create_share()
        self.fs.create_directory(share_name, dir_name)
        self.fs.create_file_from_bytes(share_name, dir_name, file_name, payload)

        sas_token = self.fs.generate_share_shared_access_signature(
            share_name,
            expiry=datetime.utcnow() + timedelta(hours=1),
            permission=SharePermissions.READ,
        )
        signed_url = self.fs.make_file_url(share_name, dir_name, file_name, sas_token=sas_token)

        # Act
        response = requests.get(signed_url)

        # Assert
        self.assertTrue(response.ok)
        self.assertEqual(payload, response.content)
开发者ID:Siriuszyq,项目名称:azure-storage-python,代码行数:32,代码来源:test_share.py


示例10: test_ranged_get_file_to_path_with_progress

    def test_ranged_get_file_to_path_with_progress(self):
        """Download a byte range of a file to FILE_PATH and check content and progress reports.

        The requested range starts at offset 3 and ends 1024 bytes past
        ``self.fs.MAX_SINGLE_GET_SIZE``; the end offset is treated as inclusive
        when the expected bytes are sliced.
        """
        # parallel tests introduce random order of requests, can only run live
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        reports = []

        def record_progress(current, total):
            reports.append((current, total))

        # Act
        first_byte = 3
        last_byte = self.fs.MAX_SINGLE_GET_SIZE + 1024
        downloaded = self.fs.get_file_to_path(
            self.share_name,
            self.directory_name,
            self.byte_file,
            FILE_PATH,
            start_range=first_byte,
            end_range=last_byte,
            progress_callback=record_progress,
        )

        # Assert
        self.assertIsInstance(downloaded, File)
        with open(FILE_PATH, 'rb') as stream:
            on_disk = stream.read()
            expected = self.byte_data[first_byte:last_byte + 1]
            self.assertEqual(expected, on_disk)
        self.assert_download_progress(
            last_byte - first_byte + 1,
            self.fs.MAX_CHUNK_GET_SIZE,
            self.fs.MAX_SINGLE_GET_SIZE,
            reports,
        )
开发者ID:rickle-msft,项目名称:azure-storage-python,代码行数:25,代码来源:test_get_file.py


示例11: test_sas_signed_identifier

    def test_sas_signed_identifier(self):
        """Read a blob with a SAS token that refers to a stored access policy.

        A policy granting READ is stored on the container under the id
        ``'testid'``, a SAS token is generated from that id alone, and the blob
        is downloaded by a ``BlockBlobService`` that authenticates only with
        that token.
        """
        # Imported locally so this method does not rely on module-level imports.
        from datetime import datetime, timedelta

        # The SAS is calculated from the storage key, so this test only runs
        # live; return early whenever a recording file would be needed.
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        blob_name = self._create_block_blob()

        access_policy = AccessPolicy()
        access_policy.start = '2011-10-11'
        # Expire relative to "now": a fixed calendar date ends up in the past,
        # after which the SAS-authenticated download below is rejected.
        access_policy.expiry = (datetime.utcnow() + timedelta(hours=1)).strftime('%Y-%m-%dT%H:%M:%SZ')
        access_policy.permission = BlobPermissions.READ
        identifiers = {'testid': access_policy}

        self.bs.set_container_acl(self.container_name, identifiers)

        token = self.bs.generate_blob_shared_access_signature(
            self.container_name,
            blob_name,
            id='testid',
        )

        # Act
        service = BlockBlobService(
            self.settings.STORAGE_ACCOUNT_NAME,
            sas_token=token,
            request_session=requests.Session(),
        )
        self._set_test_proxy(service, self.settings)
        result = service.get_blob_to_bytes(self.container_name, blob_name)

        # Assert
        self.assertEqual(self.byte_data, result.content)
开发者ID:rickle-msft,项目名称:azure-storage-python,代码行数:33,代码来源:test_common_blob.py


示例12: test_create_file_from_stream_with_progress_truncated

    def test_create_file_from_stream_with_progress_truncated(self):
        """Upload from a stream while declaring a size 5 bytes shorter than the stream.

        The stored file is then compared against the source data cut to the
        declared size, and the collected progress reports are checked.
        """
        # parallel tests introduce random order of requests, can only run live
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        file_name = self._get_file_reference()
        source = self.get_random_bytes(LARGE_FILE_SIZE)
        with open(INPUT_FILE_PATH, 'wb') as out_stream:
            out_stream.write(source)

        # Act
        reports = []

        def record_progress(current, total):
            reports.append((current, total))

        declared_size = len(source) - 5
        with open(INPUT_FILE_PATH, 'rb') as in_stream:
            self.fs.create_file_from_stream(
                self.share_name, None, file_name, in_stream,
                declared_size, progress_callback=record_progress)

        # Assert
        expected = source[:declared_size]
        self.assertFileEqual(self.share_name, None, file_name, expected)
        self.assert_upload_progress(declared_size, self.fs.MAX_RANGE_SIZE, reports, unknown_size=False)
开发者ID:jofriedm-msft,项目名称:azure-storage-python,代码行数:25,代码来源:test_file.py


示例13: test_get_blob_to_stream_with_progress_parallel

    def test_get_blob_to_stream_with_progress_parallel(self):
        """Download a blob into a file stream with max_connections=5 and check progress reports."""
        # parallel tests introduce random order of requests, can only run live
        # NOTE(review): the helper is spelled `need_recordingfile` here - confirm
        # that this matches the TestMode class this test is run against.
        if TestMode.need_recordingfile(self.test_mode):
            return

        # Arrange
        reports = []

        def record_progress(current, total):
            reports.append((current, total))

        # Act
        with open(FILE_PATH, 'wb') as out_stream:
            downloaded = self.bs.get_blob_to_stream(
                self.container_name,
                self.byte_blob,
                out_stream,
                progress_callback=record_progress,
                max_connections=5,
            )

        # Assert
        self.assertIsInstance(downloaded, Blob)
        with open(FILE_PATH, 'rb') as in_stream:
            on_disk = in_stream.read()
            self.assertEqual(self.byte_data, on_disk)
        self.assert_download_progress(
            len(self.byte_data), self.bs.MAX_CHUNK_GET_SIZE, reports, single_download=False)
开发者ID:Siriuszyq,项目名称:azure-storage-python,代码行数:25,代码来源:test_get_blob.py


示例14: test_get_blob_range_with_range_md5

    def test_get_blob_range_with_range_md5(self):
        """Ranged download with validate_content=True, before and after clearing the blob's content_md5.

        After ``content_md5`` is set to None on the blob's content settings, two
        further ranged downloads must each return content settings that expose
        ``content_type`` but no ``content_md5`` attribute.
        """
        # parallel tests introduce random order of requests, can only run live
        if TestMode.need_recording_file(self.test_mode):
            return

        # Same range and validation flag for every download in this test.
        range_kwargs = dict(start_range=0, end_range=1024, validate_content=True)

        # Initial download while the blob still has its original properties;
        # the returned object itself is not inspected.
        self.bs.get_blob_to_bytes(self.container_name, self.byte_blob, **range_kwargs)

        # Arrange
        props = self.bs.get_blob_properties(self.container_name, self.byte_blob)
        props.properties.content_settings.content_md5 = None
        self.bs.set_blob_properties(self.container_name, self.byte_blob, props.properties.content_settings)

        # Act
        first = self.bs.get_blob_to_bytes(self.container_name, self.byte_blob, **range_kwargs)

        # Assert
        self.assertTrue(hasattr(first.properties.content_settings, "content_type"))
        self.assertFalse(hasattr(first.properties.content_settings, "content_md5"))

        # Act
        second = self.bs.get_blob_to_bytes(self.container_name, self.byte_blob, **range_kwargs)

        # Assert
        self.assertTrue(hasattr(second.properties.content_settings, "content_type"))
        self.assertFalse(hasattr(second.properties.content_settings, "content_md5"))
开发者ID:asorrin-msft,项目名称:azure-storage-python,代码行数:28,代码来源:test_get_blob.py


示例15: test_shared_delete_access_blob

    def test_shared_delete_access_blob(self):
        """Delete a blob with an HTTP DELETE against a URL signed with a DELETE SAS token."""
        # The SAS is calculated from the storage key, so this test only runs
        # live; return early whenever a recording file would be needed.
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        blob_name = self._create_block_blob()
        sas_token = self.bs.generate_blob_shared_access_signature(
            self.container_name,
            blob_name,
            permission=BlobPermissions.DELETE,
            expiry=datetime.utcnow() + timedelta(hours=1),
        )
        signed_url = self.bs.make_blob_url(self.container_name, blob_name, sas_token=sas_token)

        # Act
        response = requests.delete(signed_url)

        # Assert: the request succeeded and the blob can no longer be fetched.
        self.assertTrue(response.ok)
        with self.assertRaises(AzureMissingResourceHttpError):
            self.bs.get_blob_to_bytes(self.container_name, blob_name)
开发者ID:rickle-msft,项目名称:azure-storage-python,代码行数:27,代码来源:test_common_blob.py


示例16: test_sas_access_blob

    def test_sas_access_blob(self):
        """Download a blob through a client that only holds a READ SAS token for that blob."""
        # The SAS is calculated from the storage key, so this test only runs
        # live; return early whenever a recording file would be needed.
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        blob_name = self._create_block_blob()
        sas_token = self.bs.generate_blob_shared_access_signature(
            self.container_name,
            blob_name,
            permission=BlobPermissions.READ,
            expiry=datetime.utcnow() + timedelta(hours=1),
        )

        # Act
        sas_service = BlockBlobService(
            self.settings.STORAGE_ACCOUNT_NAME,
            sas_token=sas_token,
            request_session=requests.Session(),
        )
        self._set_test_proxy(sas_service, self.settings)
        downloaded = sas_service.get_blob_to_bytes(self.container_name, blob_name)

        # Assert
        self.assertEqual(self.byte_data, downloaded.content)
开发者ID:rickle-msft,项目名称:azure-storage-python,代码行数:26,代码来源:test_common_blob.py


示例17: test_shared_read_access_blob_with_content_query_params

    def test_shared_read_access_blob_with_content_query_params(self):
        """Fetch a blob via a READ SAS URL that also carries response-header overrides.

        The token is generated with cache-control, content-disposition,
        content-encoding, content-language and content-type values, and the
        HTTP response is expected to echo each of them in its headers.
        """
        # The SAS is calculated from the storage key, so this test only runs
        # live; return early whenever a recording file would be needed.
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        blob_name = self._create_block_blob()
        sas_token = self.bs.generate_blob_shared_access_signature(
            self.container_name,
            blob_name,
            permission=BlobPermissions.READ,
            expiry=datetime.utcnow() + timedelta(hours=1),
            cache_control='no-cache',
            content_disposition='inline',
            content_encoding='utf-8',
            content_language='fr',
            content_type='text',
        )
        signed_url = self.bs.make_blob_url(self.container_name, blob_name, sas_token=sas_token)

        # Act
        response = requests.get(signed_url)

        # Assert: body first, then each overridden header in a fixed order.
        self.assertEqual(self.byte_data, response.content)
        expected_headers = (
            ('cache-control', 'no-cache'),
            ('content-disposition', 'inline'),
            ('content-encoding', 'utf-8'),
            ('content-language', 'fr'),
            ('content-type', 'text'),
        )
        for header_name, expected_value in expected_headers:
            self.assertEqual(response.headers[header_name], expected_value)
开发者ID:rickle-msft,项目名称:azure-storage-python,代码行数:35,代码来源:test_common_blob.py


示例18: test_shared_write_access_blob

    def test_shared_write_access_blob(self):
        """Overwrite a blob with an HTTP PUT against a URL signed with a WRITE SAS token."""
        # The SAS is calculated from the storage key, so this test only runs
        # live; return early whenever a recording file would be needed.
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        new_content = b'updated blob data'
        blob_name = self._create_block_blob()
        sas_token = self.bs.generate_blob_shared_access_signature(
            self.container_name,
            blob_name,
            permission=BlobPermissions.WRITE,
            expiry=datetime.utcnow() + timedelta(hours=1),
        )
        signed_url = self.bs.make_blob_url(self.container_name, blob_name, sas_token=sas_token)

        # Act: the blob type header is taken from the fixture client.
        put_headers = {'x-ms-blob-type': self.bs.blob_type}
        response = requests.put(signed_url, headers=put_headers, data=new_content)

        # Assert: read the blob back with the fixture client.
        self.assertTrue(response.ok)
        stored = self.bs.get_blob_to_bytes(self.container_name, blob_name)
        self.assertEqual(new_content, stored.content)
开发者ID:rickle-msft,项目名称:azure-storage-python,代码行数:29,代码来源:test_common_blob.py


示例19: test_shared_write_access_file

    def test_shared_write_access_file(self):
        """Overwrite the start of a file with an HTTP PUT against a WRITE SAS URL.

        Only the first ``len(updated_data)`` bytes of the stored file are
        compared afterwards.
        """
        # The SAS is calculated from the storage key, so this test only runs
        # live; return early whenever a recording file would be needed.
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        new_content = b'updated file data'
        file_name = self._create_file()
        sas_token = self.fs.generate_file_shared_access_signature(
            self.share_name,
            None,
            file_name,
            permission=FilePermissions.WRITE,
            expiry=datetime.utcnow() + timedelta(hours=1),
        )
        signed_url = self.fs.make_file_url(self.share_name, None, file_name, sas_token=sas_token)

        # Act: bytes 0-16 is a 17-byte range, the length of new_content.
        put_headers = {'x-ms-range': 'bytes=0-16', 'x-ms-write': 'update'}
        range_url = signed_url + '&comp=range'
        response = requests.put(range_url, headers=put_headers, data=new_content)

        # Assert
        self.assertTrue(response.ok)
        stored = self.fs.get_file_to_bytes(self.share_name, None, file_name)
        self.assertEqual(new_content, stored.content[:len(new_content)])
开发者ID:jofriedm-msft,项目名称:azure-storage-python,代码行数:31,代码来源:test_file.py


示例20: test_shared_access_container

    def test_shared_access_container(self):
        """Fetch a blob over HTTP using a URL signed with a container-level READ SAS token."""
        # The SAS is calculated from the storage key, so this test only runs
        # live; return early whenever a recording file would be needed.
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        container_name = self._create_container()
        blob_name = 'blob1'
        payload = b'hello world'
        self.bs.create_blob_from_bytes(container_name, blob_name, payload)

        sas_token = self.bs.generate_container_shared_access_signature(
            container_name,
            expiry=datetime.utcnow() + timedelta(hours=1),
            permission=ContainerPermissions.READ,
        )
        signed_url = self.bs.make_blob_url(container_name, blob_name, sas_token=sas_token)

        # Act
        response = requests.get(signed_url)

        # Assert
        self.assertTrue(response.ok)
        self.assertEqual(payload, response.content)
开发者ID:rickle-msft,项目名称:azure-storage-python,代码行数:29,代码来源:test_container.py



注:本文中的tests.testcase.TestMode类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python testing.make_environ函数代码示例发布时间:2022-05-27
下一篇:
Python testapp.testapp函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap