• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python system_test_utils.unique_resource_id函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中system_test_utils.unique_resource_id函数的典型用法代码示例。如果您正苦于以下问题:Python unique_resource_id函数的具体用法?Python unique_resource_id怎么用?Python unique_resource_id使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了unique_resource_id函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_subscription_iam_policy

    def test_subscription_iam_policy(self):
        """Set an IAM policy on a new subscription and compare viewers.

        A topic and a subscription on it are created; each is polled
        until ``exists()`` returns a truthy value.  When the permission
        check for reading the IAM policy succeeds, a viewer is added to
        the subscription's policy and the policy returned by
        ``set_iam_policy`` must carry the same viewers.
        """
        from google.cloud.pubsub.iam import PUBSUB_SUBSCRIPTIONS_GET_IAM_POLICY
        self._maybe_emulator_skip()

        def _truthy(result):
            return result

        new_topic_name = 'test-sub-iam-policy-topic' + unique_resource_id('-')
        new_topic = Config.CLIENT.topic(new_topic_name)
        new_topic.create()

        # Four tries; budgeted as up to 7 seconds of backoff (1 + 2 + 4).
        wait_for_topic = RetryResult(_truthy, max_tries=4)
        wait_for_topic(new_topic.exists)()
        self.assertTrue(new_topic.exists())
        self.to_delete.append(new_topic)

        new_sub_name = 'test-sub-iam-policy-sub' + unique_resource_id('-')
        new_sub = new_topic.subscription(new_sub_name)
        new_sub.create()

        # Same retry budget as for the topic above.
        wait_for_sub = RetryResult(_truthy, max_tries=4)
        wait_for_sub(new_sub.exists)()
        self.assertTrue(new_sub.exists())
        # Inserted at the front, i.e. ahead of the topic -- presumably so
        # cleanup handles the subscription first; confirm against tearDown.
        self.to_delete.insert(0, new_sub)

        may_read_policy = new_sub.check_iam_permissions(
            [PUBSUB_SUBSCRIPTIONS_GET_IAM_POLICY])
        if not may_read_policy:
            return

        current_policy = new_sub.get_iam_policy()
        # NOTE(review): the address literal looks redacted by whatever
        # produced this copy of the file -- confirm the intended value.
        viewer = current_policy.user('[email protected]')
        current_policy.viewers.add(viewer)
        stored_policy = new_sub.set_iam_policy(current_policy)
        self.assertEqual(stored_policy.viewers, current_policy.viewers)
开发者ID:tmatsuo,项目名称:gcloud-python,代码行数:30,代码来源:pubsub.py


示例2: test_fetch_delete_subscription_w_deleted_topic

    def test_fetch_delete_subscription_w_deleted_topic(self):
        """List, reload and delete a subscription whose topic was deleted.

        Creates a topic with one subscription, deletes the topic, then
        pages through every subscription visible to the client and
        expects exactly one with the orphan's name.  That subscription
        is reloaded until its ``topic`` is ``None`` and then deleted.
        """
        doomed_topic_name = 'delete-me' + unique_resource_id('-')
        orphan_name = 'orphaned' + unique_resource_id('-')

        doomed_topic = Config.CLIENT.topic(doomed_topic_name)
        doomed_topic.create()
        doomed_topic.subscription(orphan_name).create()
        doomed_topic.delete()

        # Walk every page, keeping only entries that carry the orphan's name.
        matches = []
        page_token = None
        while True:
            page, page_token = Config.CLIENT.list_subscriptions(
                page_token=page_token)
            matches.extend(
                listed for listed in page if listed.name == orphan_name)
            if page_token is None:
                break

        self.assertEqual(len(matches), 1)
        orphaned = matches[0]

        # Keep reloading until the subscription reports no topic.
        wait_for_detach = RetryInstanceState(
            lambda instance: instance.topic is None, max_tries=6)
        wait_for_detach(orphaned.reload)()

        self.assertTrue(orphaned.topic is None)
        orphaned.delete()
开发者ID:muscovitegrinder,项目名称:gcloud-python,代码行数:30,代码来源:pubsub.py


示例3: test_list_subscriptions

    def test_list_subscriptions(self):
        """Create three subscriptions on a new topic and list them back.

        The topic must list no subscriptions right after creation.  After
        the three subscriptions are created, the listing is retried until
        it contains as many entries as were created, and every created
        name must appear in it.
        """
        topic_name = 'list-sub' + unique_resource_id('-')
        parent = Config.CLIENT.topic(topic_name)
        parent.create()
        self.to_delete.append(parent)

        initial, _ = parent.list_subscriptions()
        self.assertEqual(len(initial), 0)

        wanted_names = [
            prefix + unique_resource_id()
            for prefix in ('new', 'newer', 'newest')
        ]
        for wanted in wanted_names:
            new_sub = parent.subscription(wanted)
            new_sub.create()
            self.to_delete.append(new_sub)

        # Retry the listing until its size matches what was created.
        def _all_created(result):
            listed = result[0]
            return len(listed) == len(wanted_names)

        listing_with_retry = RetryResult(_all_created)(
            parent.list_subscriptions)
        listed_subs, _ = listing_with_retry()

        found = []
        for candidate in listed_subs:
            if candidate.name in wanted_names:
                found.append(candidate)
        self.assertEqual(len(found), len(wanted_names))
开发者ID:tmatsuo,项目名称:gcloud-python,代码行数:27,代码来源:pubsub.py


示例4: test_fetch_delete_subscription_w_deleted_topic

    def test_fetch_delete_subscription_w_deleted_topic(self):
        """List, reload and delete a subscription whose topic was deleted.

        After deleting the topic, the full subscription listing is
        retried until it contains the orphan's name; exactly one entry
        must match.  That entry is reloaded until its ``topic`` is
        ``None`` and is then deleted.
        """
        from google.cloud.iterator import MethodIterator
        doomed_topic_name = 'delete-me' + unique_resource_id('-')
        orphan_name = 'orphaned' + unique_resource_id('-')

        doomed_topic = Config.CLIENT.topic(doomed_topic_name)
        doomed_topic.create()
        doomed_topic.subscription(orphan_name).create()
        doomed_topic.delete()

        def _list_everything():
            return list(MethodIterator(Config.CLIENT.list_subscriptions))

        def _orphan_listed(result):
            return orphan_name in [listed.name for listed in result]

        # Re-list until the orphan shows up in the results.
        listing_with_retry = RetryResult(_orphan_listed)(_list_everything)
        everything = listing_with_retry()

        matches = []
        for listed in everything:
            if listed.name == orphan_name:
                matches.append(listed)
        self.assertEqual(len(matches), 1)
        orphaned = matches[0]

        # Keep reloading until the subscription reports no topic.
        wait_for_detach = RetryInstanceState(
            lambda instance: instance.topic is None)
        wait_for_detach(orphaned.reload)()

        self.assertTrue(orphaned.topic is None)
        orphaned.delete()
开发者ID:tmatsuo,项目名称:gcloud-python,代码行数:33,代码来源:pubsub.py


示例5: test_subscription_iam_policy

 def test_subscription_iam_policy(self):
     """Set an IAM policy on a new subscription and compare viewers.

     A topic and a subscription on it are created; each is polled (at
     most five probes, one second apart) until ``exists()`` is truthy.
     When the permission check for reading the IAM policy succeeds, a
     viewer is added and the policy returned by ``set_iam_policy`` must
     carry the same viewers.
     """
     from gcloud.pubsub.iam import PUBSUB_SUBSCRIPTIONS_GET_IAM_POLICY
     self._maybe_emulator_skip()

     def _await_existence(resource):
         # Up to five probes; sleep one second after every miss.
         for _ in range(5):
             if resource.exists():
                 return
             time.sleep(1)

     new_topic_name = 'test-sub-iam-policy-topic' + unique_resource_id('-')
     new_topic = Config.CLIENT.topic(new_topic_name)
     new_topic.create()
     _await_existence(new_topic)
     self.assertTrue(new_topic.exists())
     self.to_delete.append(new_topic)

     new_sub_name = 'test-sub-iam-policy-sub' + unique_resource_id('-')
     new_sub = new_topic.subscription(new_sub_name)
     new_sub.create()
     _await_existence(new_sub)
     self.assertTrue(new_sub.exists())
     # Inserted at the front, i.e. ahead of the topic -- presumably so
     # cleanup handles the subscription first; confirm against tearDown.
     self.to_delete.insert(0, new_sub)

     may_read_policy = new_sub.check_iam_permissions(
         [PUBSUB_SUBSCRIPTIONS_GET_IAM_POLICY])
     if not may_read_policy:
         return

     current_policy = new_sub.get_iam_policy()
     # NOTE(review): the address literal looks redacted by whatever
     # produced this copy of the file -- confirm the intended value.
     viewer = current_policy.user('[email protected]')
     current_policy.viewers.add(viewer)
     stored_policy = new_sub.set_iam_policy(current_policy)
     self.assertEqual(stored_policy.viewers, current_policy.viewers)
开发者ID:E-LLP,项目名称:gcloud-python,代码行数:27,代码来源:pubsub.py


示例6: test_list_tables

    def test_list_tables(self):
        """Create three tables in a new dataset and list them back.

        The dataset must not exist beforehand and must list no tables
        (and no page token) right after creation.  After the tables are
        created, a single listing must return no page token and contain
        every created name under ``DATASET_NAME``.
        """
        target = Config.CLIENT.dataset(DATASET_NAME)
        self.assertFalse(target.exists())
        target.create()
        self.to_delete.append(target)

        # Listing before any table has been created.
        listed, next_token = target.list_tables()
        self.assertEqual(listed, [])
        self.assertEqual(next_token, None)

        # Tables to create and then look for.
        wanted_names = [
            prefix + unique_resource_id()
            for prefix in ('new', 'newer', 'newest')
        ]
        name_field = bigquery.SchemaField(
            'full_name', 'STRING', mode='REQUIRED')
        age_field = bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED')
        for wanted in wanted_names:
            new_table = target.table(wanted, schema=[name_field, age_field])
            new_table.create()
            # Front of the list, i.e. ahead of the dataset itself.
            self.to_delete.insert(0, new_table)

        # Listing after creation.
        listed, next_token = target.list_tables()
        self.assertTrue(next_token is None)
        found = []
        for candidate in listed:
            if (candidate.name in wanted_names and
                    candidate.dataset_name == DATASET_NAME):
                found.append(candidate)
        self.assertEqual(len(found), len(wanted_names))
开发者ID:muscovitegrinder,项目名称:gcloud-python,代码行数:32,代码来源:bigquery.py


示例7: test_create_subscription_defaults

 def test_create_subscription_defaults(self):
     """Create a subscription with default settings and check it.

     Neither the topic nor the subscription may exist beforehand.  After
     creation the subscription must exist, keep the requested name and
     reference the very topic object it was created from.
     """
     new_topic_name = 'create-sub-def' + unique_resource_id('-')
     parent = Config.CLIENT.topic(new_topic_name)
     self.assertFalse(parent.exists())
     parent.create()
     self.to_delete.append(parent)

     new_sub_name = 'subscribing-now' + unique_resource_id('-')
     new_sub = parent.subscription(new_sub_name)
     self.assertFalse(new_sub.exists())
     new_sub.create()
     self.to_delete.append(new_sub)

     self.assertTrue(new_sub.exists())
     self.assertEqual(new_sub.name, new_sub_name)
     # Identity, not equality: the same topic object must be attached.
     self.assertTrue(new_sub.topic is parent)
开发者ID:tmatsuo,项目名称:gcloud-python,代码行数:14,代码来源:pubsub.py


示例8: test_message_pull_mode_e2e

    def test_message_pull_mode_e2e(self):
        """Publish two messages and pull them back through a subscription.

        A timestamping topic and a subscription on it are created, two
        messages with distinct payloads and ``extra`` attributes are
        published, and a single pull of up to two messages is
        acknowledged.  The pulled messages, ordered by ``timestamp``,
        must match the published payloads and attributes in publish
        order.
        """
        topic = Config.CLIENT.topic(DEFAULT_TOPIC_NAME,
                                    timestamp_messages=True)
        self.assertFalse(topic.exists())
        topic.create()
        self.to_delete.append(topic)
        SUBSCRIPTION_NAME = 'subscribing-now' + unique_resource_id('-')
        subscription = topic.subscription(SUBSCRIPTION_NAME)
        self.assertFalse(subscription.exists())
        subscription.create()
        self.to_delete.append(subscription)

        MESSAGE_1 = b'MESSAGE ONE'
        # Distinct from MESSAGE_1: with two identical payloads the data
        # assertions below could not detect swapped messages.
        MESSAGE_2 = b'MESSAGE TWO'
        EXTRA_1 = 'EXTRA 1'
        EXTRA_2 = 'EXTRA 2'
        topic.publish(MESSAGE_1, extra=EXTRA_1)
        topic.publish(MESSAGE_2, extra=EXTRA_2)

        # Each received entry is indexed as (ack_id, message).
        received = subscription.pull(max_messages=2)
        ack_ids = [recv[0] for recv in received]
        subscription.acknowledge(ack_ids)
        messages = [recv[1] for recv in received]

        # Fail with a readable assertion instead of an unpacking error
        # when the pull did not return exactly the two messages.
        self.assertEqual(len(messages), 2)

        def _by_timestamp(message):
            return message.timestamp

        message1, message2 = sorted(messages, key=_by_timestamp)
        self.assertEqual(message1.data, MESSAGE_1)
        self.assertEqual(message1.attributes['extra'], EXTRA_1)
        self.assertEqual(message2.data, MESSAGE_2)
        self.assertEqual(message2.attributes['extra'], EXTRA_2)
开发者ID:muscovitegrinder,项目名称:gcloud-python,代码行数:32,代码来源:pubsub.py


示例9: test_list_buckets

    def test_list_buckets(self):
        """Create three buckets and check they all appear in the listing.

        Each created bucket name is recorded in
        ``self.case_buckets_to_delete``.  The listing is then filtered
        down to the created names and must contain all of them.
        """
        buckets_to_create = [
            'new' + unique_resource_id(),
            'newer' + unique_resource_id(),
            'newest' + unique_resource_id(),
        ]
        for bucket_name in buckets_to_create:
            # The returned bucket object is not needed here; only the
            # name is recorded.
            Config.CLIENT.create_bucket(bucket_name)
            self.case_buckets_to_delete.append(bucket_name)

        # Retrieve the buckets.
        all_buckets = Config.CLIENT.list_buckets()
        created_buckets = [bucket for bucket in all_buckets
                           if bucket.name in buckets_to_create]
        self.assertEqual(len(created_buckets), len(buckets_to_create))
开发者ID:su27k-2003,项目名称:gcloud-python,代码行数:16,代码来源:storage.py


示例10: setUpModule

def setUpModule():
    """Create the storage client and the shared test bucket.

    Stores the client on ``Config.CLIENT`` and the bucket on
    ``Config.TEST_BUCKET``; the bucket's ``create`` call is wrapped in
    ``retry_429``.
    """
    client = storage.Client()
    Config.CLIENT = client
    fresh_name = 'new' + unique_resource_id()
    # In the **very** rare case the bucket name is reserved, this
    # fails with a ConnectionError.
    shared_bucket = client.bucket(fresh_name)
    Config.TEST_BUCKET = shared_bucket
    create_with_retry = retry_429(shared_bucket.create)
    create_with_retry()
开发者ID:Fkawala,项目名称:gcloud-python,代码行数:7,代码来源:storage.py


示例11: test_create_bucket

 def test_create_bucket(self):
     """Create a bucket that does not exist yet and check its name.

     Fetching the bucket before creation must raise
     ``exceptions.NotFound``.  The created name is recorded in
     ``self.case_buckets_to_delete``.
     """
     fresh_name = 'a-new-bucket' + unique_resource_id('-')
     with self.assertRaises(exceptions.NotFound):
         Config.CLIENT.get_bucket(fresh_name)

     new_bucket = Config.CLIENT.create_bucket(fresh_name)
     self.case_buckets_to_delete.append(fresh_name)
     self.assertEqual(new_bucket.name, fresh_name)
开发者ID:su27k-2003,项目名称:gcloud-python,代码行数:7,代码来源:storage.py


示例12: test_message_pull_mode_e2e

    def test_message_pull_mode_e2e(self):
        """Publish two messages and collect them through ``auto_ack``.

        A timestamping topic and a subscription on it are created and
        two messages with distinct payloads and ``extra`` attributes are
        published.  Pulls are retried until two messages have been
        collected.  Ordered by ``timestamp``, the messages must match
        the published payloads and attributes in publish order, and
        each must have a ``service_timestamp``.
        """
        import operator
        TOPIC_NAME = 'message-e2e' + unique_resource_id('-')
        topic = Config.CLIENT.topic(TOPIC_NAME,
                                    timestamp_messages=True)
        self.assertFalse(topic.exists())
        topic.create()
        self.to_delete.append(topic)
        SUBSCRIPTION_NAME = 'subscribing-now' + unique_resource_id('-')
        subscription = topic.subscription(SUBSCRIPTION_NAME)
        self.assertFalse(subscription.exists())
        subscription.create()
        self.to_delete.append(subscription)

        MESSAGE_1 = b'MESSAGE ONE'
        # Distinct from MESSAGE_1: with two identical payloads the data
        # assertions below could not detect swapped messages.
        MESSAGE_2 = b'MESSAGE TWO'
        EXTRA_1 = 'EXTRA 1'
        EXTRA_2 = 'EXTRA 2'
        topic.publish(MESSAGE_1, extra=EXTRA_1)
        topic.publish(MESSAGE_2, extra=EXTRA_2)

        class Hoover(object):
            """Accumulates pulled messages across repeated pulls."""

            def __init__(self):
                self.received = []

            def done(self, *dummy):
                # Arguments passed by the retry wrapper are ignored; only
                # the number of collected messages matters.
                return len(self.received) == 2

            def suction(self):
                # One pull of up to two messages, collected from the
                # auto-ack context's values.
                with subscription.auto_ack(max_messages=2) as ack:
                    self.received.extend(ack.values())

        hoover = Hoover()
        retry = RetryInstanceState(hoover.done)
        retry(hoover.suction)()

        message1, message2 = sorted(hoover.received,
                                    key=operator.attrgetter('timestamp'))

        self.assertEqual(message1.data, MESSAGE_1)
        self.assertEqual(message1.attributes['extra'], EXTRA_1)
        self.assertIsNotNone(message1.service_timestamp)

        self.assertEqual(message2.data, MESSAGE_2)
        self.assertEqual(message2.attributes['extra'], EXTRA_2)
        self.assertIsNotNone(message2.service_timestamp)
开发者ID:Fkawala,项目名称:gcloud-python,代码行数:47,代码来源:pubsub.py


示例13: test_list_topics

    def test_list_topics(self):
        """Create three topics and check they all appear in the listing.

        Listed topics are matched by name and by belonging to the
        client's project.
        """
        wanted_names = [
            prefix + unique_resource_id()
            for prefix in ('new', 'newer', 'newest')
        ]
        for wanted in wanted_names:
            new_topic = Config.CLIENT.topic(wanted)
            new_topic.create()
            self.to_delete.append(new_topic)

        # One listing call; the page token is ignored.
        listed, _ = Config.CLIENT.list_topics()
        found = []
        for candidate in listed:
            if (candidate.name in wanted_names and
                    candidate.project == Config.CLIENT.project):
                found.append(candidate)
        self.assertEqual(len(found), len(wanted_names))
开发者ID:cksharma,项目名称:gcloud-python,代码行数:17,代码来源:pubsub.py


示例14: setUpModule

def setUpModule():
    """Set up the language client and a bucket for GCS-stored content.

    Points ``_helpers.PROJECT`` at ``TESTS_PROJECT``, stores a language
    client on ``Config.CLIENT`` and a newly named bucket on
    ``Config.TEST_BUCKET``; the bucket's ``create`` call is wrapped in
    ``retry_429``.
    """
    _helpers.PROJECT = TESTS_PROJECT
    Config.CLIENT = language.Client()

    # The bucket comes from a separate storage client.
    gcs_client = storage.Client()
    fresh_name = 'new' + unique_resource_id()
    shared_bucket = gcs_client.bucket(fresh_name)
    Config.TEST_BUCKET = shared_bucket
    create_with_retry = retry_429(shared_bucket.create)
    create_with_retry()
开发者ID:tmatsuo,项目名称:gcloud-python,代码行数:8,代码来源:language.py


示例15: test_create_topic

 def test_create_topic(self):
     """Create a topic that does not exist yet and check it.

     The topic must not exist before ``create()`` and must exist with
     the requested name afterwards.
     """
     fresh_name = 'a-new-topic' + unique_resource_id('-')
     new_topic = Config.CLIENT.topic(fresh_name)
     self.assertFalse(new_topic.exists())

     new_topic.create()
     self.to_delete.append(new_topic)

     self.assertTrue(new_topic.exists())
     self.assertEqual(new_topic.name, fresh_name)
开发者ID:tmatsuo,项目名称:gcloud-python,代码行数:8,代码来源:pubsub.py


示例16: test_list_datasets

    def test_list_datasets(self):
        """Create three datasets and check they all appear in the listing.

        The listing must come back without a page token.  Listed
        datasets are matched by name and by belonging to the client's
        project.
        """
        wanted_names = [
            prefix + unique_resource_id()
            for prefix in ('new', 'newer', 'newest')
        ]
        for wanted in wanted_names:
            new_dataset = Config.CLIENT.dataset(wanted)
            new_dataset.create()
            self.to_delete.append(new_dataset)

        # One listing call, expected to fit in a single page.
        listed, next_token = Config.CLIENT.list_datasets()
        self.assertTrue(next_token is None)
        found = []
        for candidate in listed:
            if (candidate.name in wanted_names and
                    candidate.project == Config.CLIENT.project):
                found.append(candidate)
        self.assertEqual(len(found), len(wanted_names))
开发者ID:muscovitegrinder,项目名称:gcloud-python,代码行数:18,代码来源:bigquery.py


示例17: test_list_datasets

    def test_list_datasets(self):
        """Create three datasets and check they all appear in the listing.

        Each ``create`` call is wrapped in ``retry_403``.  The listing
        iterator is consumed completely and must end with no next page
        token.  Listed datasets are matched by name and by belonging to
        the client's project.
        """
        wanted_names = [
            prefix + unique_resource_id()
            for prefix in ('new', 'newer', 'newest')
        ]
        for wanted in wanted_names:
            new_dataset = Config.CLIENT.dataset(wanted)
            create_with_retry = retry_403(new_dataset.create)
            create_with_retry()
            self.to_delete.append(new_dataset)

        # Drain the iterator before inspecting its page token.
        dataset_iter = Config.CLIENT.list_datasets()
        listed = list(dataset_iter)
        self.assertIsNone(dataset_iter.next_page_token)

        found = []
        for candidate in listed:
            if (candidate.name in wanted_names and
                    candidate.project == Config.CLIENT.project):
                found.append(candidate)
        self.assertEqual(len(found), len(wanted_names))
开发者ID:jonparrott,项目名称:gcloud-python,代码行数:19,代码来源:bigquery.py


示例18: setUpModule

def setUpModule():
    """Create the datastore client used by this module's tests.

    The client always gets a newly generated namespace.  When the
    environment variable named by ``GCD_DATASET`` is set, its value is
    used as the project and the client is built with ``EmulatorCreds``
    and a plain ``httplib2.Http`` object; otherwise only the namespace
    is passed.
    """
    emulator_dataset = os.getenv(GCD_DATASET)
    # Isolated namespace so concurrent test runs don't collide.
    test_namespace = "ns" + unique_resource_id()

    if emulator_dataset is not None:
        emulator_creds = EmulatorCreds()
        plain_http = httplib2.Http()  # Un-authorized.
        Config.CLIENT = datastore.Client(
            project=emulator_dataset,
            namespace=test_namespace,
            credentials=emulator_creds,
            http=plain_http,
        )
    else:
        Config.CLIENT = datastore.Client(namespace=test_namespace)
开发者ID:jonparrott,项目名称:gcloud-python,代码行数:12,代码来源:datastore.py


示例19: test_list_subscriptions

    def test_list_subscriptions(self):
        """Create three subscriptions on the default topic and list them.

        The topic must not exist beforehand and must list no
        subscriptions right after creation.  After the subscriptions are
        created, a single listing must contain every created name.
        """
        parent = Config.CLIENT.topic(DEFAULT_TOPIC_NAME)
        self.assertFalse(parent.exists())
        parent.create()
        self.to_delete.append(parent)

        initial, _ = parent.list_subscriptions()
        self.assertEqual(len(initial), 0)

        wanted_names = [
            prefix + unique_resource_id()
            for prefix in ('new', 'newer', 'newest')
        ]
        for wanted in wanted_names:
            new_sub = parent.subscription(wanted)
            new_sub.create()
            self.to_delete.append(new_sub)

        # One listing call; the page token is ignored.
        listed, _ = parent.list_subscriptions()
        found = []
        for candidate in listed:
            if candidate.name in wanted_names:
                found.append(candidate)
        self.assertEqual(len(found), len(wanted_names))
开发者ID:cksharma,项目名称:gcloud-python,代码行数:22,代码来源:pubsub.py


示例20: test_list_topics

    def test_list_topics(self):
        """Create three topics and check they all appear in the listing.

        The topic listing is captured before creation.  Afterwards the
        listing is retried until its length equals the earlier length
        plus the number of topics created.  Listed topics are matched by
        name and by belonging to the client's project.
        """
        already_there, _ = Config.CLIENT.list_topics()

        wanted_names = [
            prefix + unique_resource_id()
            for prefix in ('new', 'newer', 'newest')
        ]
        for wanted in wanted_names:
            new_topic = Config.CLIENT.topic(wanted)
            new_topic.create()
            self.to_delete.append(new_topic)

        expected_total = len(already_there) + len(wanted_names)

        # Retry the listing until it has grown by the number created.
        def _all_created(result):
            listed_page = result[0]
            return len(listed_page) == expected_total

        listing_with_retry = RetryResult(_all_created)(
            Config.CLIENT.list_topics)
        listed, _ = listing_with_retry()

        found = []
        for candidate in listed:
            if (candidate.name in wanted_names and
                    candidate.project == Config.CLIENT.project):
                found.append(candidate)
        self.assertEqual(len(found), len(wanted_names))
开发者ID:tmatsuo,项目名称:gcloud-python,代码行数:23,代码来源:pubsub.py



注:本文中的system_test_utils.unique_resource_id函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python system_tests_common.SystemTestsCommon类代码示例发布时间:2022-05-27
下一篇:
Python logger.info函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap