• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python testenv.deploy函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中testenv.deploy函数的典型用法代码示例。如果您正苦于以下问题:Python deploy函数的具体用法?Python deploy怎么用?Python deploy使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了deploy函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_invalid_dsl

    def test_invalid_dsl(self):
        # note: this actually tests the validation part of the "deploy" command
        dsl_path = resource("dsl/invalid-dsl.yaml")

        with self.assertRaises(Exception) as cm:
            deploy(dsl_path)
            self.assertTrue('invalid blueprint' in
                            cm.exception.message.lower(), cm.exception.message)
开发者ID:Fewbytes,项目名称:cloudify-manager,代码行数:8,代码来源:test_validate_dsl.py


示例2: test_execute_operation_failure

 def test_execute_operation_failure(self):
     from plugins.cloudmock.tasks import set_raise_exception_on_start
     send_task(set_raise_exception_on_start).get(timeout=10)
     dsl_path = resource("dsl/basic.yaml")
     try:
         deploy(dsl_path)
         self.fail('expected exception')
     except Exception:
         pass
开发者ID:boreys,项目名称:cloudify-manager,代码行数:9,代码来源:test_workflow.py


示例3: test_fail_remote_task_eventual_success

    def test_fail_remote_task_eventual_success(self):
        deploy(resource('dsl/workflow_api.yaml'), self._testMethodName,
               parameters={'do_get': self.do_get})

        # testing workflow remote task
        invocations = send_task(get_fail_invocations).get()
        self.assertEqual(3, len(invocations))
        for i in range(len(invocations) - 1):
            self.assertLessEqual(1, invocations[i+1] - invocations[i])
开发者ID:boreys,项目名称:cloudify-manager,代码行数:9,代码来源:test_wf_api.py


示例4: test_start_monitor_node_operation

 def test_start_monitor_node_operation(self):
     dsl_path = resource("dsl/hardcoded_operation_properties.yaml")
     deploy(dsl_path)
     from plugins.testmockoperations.tasks import \
         get_monitoring_operations_invocation
     invocations = send_task(get_monitoring_operations_invocation)\
         .get(timeout=10)
     self.assertEqual(1, len(invocations))
     invocation = invocations[0]
     self.assertEqual('start_monitor', invocation['operation'])
开发者ID:boreys,项目名称:cloudify-manager,代码行数:10,代码来源:test_workflow.py


示例5: _local_task_fail_impl

 def _local_task_fail_impl(self, wf_name):
     if self.do_get:
         deploy(resource('dsl/workflow_api.yaml'), wf_name,
                parameters={'do_get': self.do_get})
     else:
         self.assertRaises(RuntimeError,
                           deploy,
                           resource('dsl/workflow_api.yaml'),
                           wf_name,
                           parameters={'do_get': self.do_get})
开发者ID:boreys,项目名称:cloudify-manager,代码行数:10,代码来源:test_wf_api.py


示例6: test_inject_properties_to_operation

 def test_inject_properties_to_operation(self):
     dsl_path = resource("dsl/hardcoded_operation_properties.yaml")
     deploy(dsl_path)
     from plugins.testmockoperations.tasks import get_state as \
         testmock_get_state
     states = send_task(testmock_get_state).get(timeout=10)
     from plugins.testmockoperations.tasks import \
         get_mock_operation_invocations as testmock_get__invocations
     invocations = send_task(testmock_get__invocations).get(timeout=10)
     self.assertEqual(1, len(invocations))
     invocation = invocations[0]
     self.assertEqual('mockpropvalue', invocation['mockprop'])
     self.assertEqual(states[0]['id'], invocation['id'])
开发者ID:boreys,项目名称:cloudify-manager,代码行数:13,代码来源:test_workflow.py


示例7: test_deploy_multi_instance_many_different_hosts

    def test_deploy_multi_instance_many_different_hosts(self):
        dsl_path = resource('dsl/multi_instance_many_different_hosts.yaml')
        deploy(dsl_path)

        from plugins.cloudmock.tasks import get_machines
        result = send_task(get_machines)
        machines = set(result.get(timeout=10))
        self.assertEquals(15, len(machines))

        self.assertEquals(5, len(filter(lambda ma: ma.startswith('host1'),
                                        machines)))
        self.assertEquals(5, len(filter(lambda ma: ma.startswith('host2'),
                                        machines)))
        self.assertEquals(5, len(filter(lambda ma: ma.startswith('host3'),
                                        machines)))
开发者ID:boreys,项目名称:cloudify-manager,代码行数:15,代码来源:test_n_instances.py


示例8: test_cancel_on_wait_for_task_termination

 def test_cancel_on_wait_for_task_termination(self):
     _, eid = deploy(
         resource('dsl/workflow_api.yaml'), self._testMethodName,
         parameters={'do_get': self.do_get}, wait_for_execution=False)
     self.wait_for_execution_status(eid, status=Execution.STARTED)
     self.client.executions.cancel(eid)
     self.wait_for_execution_status(eid, status=Execution.CANCELLED)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:7,代码来源:test_wf_api.py


示例9: test_deploy_multi_instance_application

    def test_deploy_multi_instance_application(self):
        dsl_path = resource("dsl/multi_instance.yaml")
        deploy(dsl_path)

        from plugins.cloudmock.tasks import get_machines
        result = send_task(get_machines)
        machines = set(result.get(timeout=10))
        self.assertEquals(2, len(machines))

        from plugins.testmockoperations.tasks import get_state as get_state
        apps_state = send_task(get_state).get(timeout=10)
        machines_with_apps = set([])
        for app_state in apps_state:
            host_id = app_state['capabilities'].keys()[0]
            machines_with_apps.add(host_id)
        self.assertEquals(machines, machines_with_apps)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:16,代码来源:test_n_instances.py


示例10: test_policies

    def test_policies(self):
        dsl_path = resource("dsl/with_policies.yaml")
        deployment, _ = deploy(dsl_path)

        def assertion():
            instances = self.client.node_instances.list(deployment.id)
            self.assertEqual(1, len(instances))
        self.do_assertions(assertion)

        instance = self.client.node_instances.list(deployment.id)[0]
        self.publish_riemann_event(deployment.id,
                                   node_name='node',
                                   node_id=instance.id,
                                   metric=123)

        def assertion():
            executions = self.client.executions.list(deployment.id)
            self.assertEqual(3, len(executions))
            invocations = send_task(testmock_get_invocations).get(timeout=10)
            self.assertEqual(2, len(invocations))
            instances = self.client.node_instances.list(deployment.id)
            self.assertEqual(1, len(instances))
            instance = instances[0]
            self.assertEqual(instance.id, invocations[0]['node_id'])
            self.assertEqual(123, invocations[1]['metric'])
        self.do_assertions(assertion)
开发者ID:mahak,项目名称:cloudify-manager,代码行数:26,代码来源:test_policies.py


示例11: test_update_node_bad_version

    def test_update_node_bad_version(self):
        deploy(resource("dsl/basic.yaml"))
        client = create_rest_client()
        instance = client.node_instances.list()[0]
        instance = client.node_instances.get(instance.id)  # need the version

        props = {"key": "value"}
        result = client.node_instances.update(
            instance.id, state="started", runtime_properties=props, version=instance.version
        )
        self.assertEquals(instance.version + 1, result.version)
        self.assertEquals(instance.id, result.id)
        self.assertDictContainsSubset(props, result.runtime_properties)
        self.assertEquals("started", result.state)

        # making another call with a bad version
        self.assertRaises(CloudifyClientError, client.node_instances.update, instance.id, version=1)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:17,代码来源:test_storage.py


示例12: test_cancel_on_task_retry_interval

 def test_cancel_on_task_retry_interval(self):
     self.configure(retries=2, interval=1000000)
     _, eid = deploy(
         resource('dsl/workflow_api.yaml'), self._testMethodName,
         parameters={'do_get': self.do_get}, wait_for_execution=False)
     self.wait_for_execution_status(eid, status=Execution.STARTED)
     self.client.executions.cancel(eid)
     self.wait_for_execution_status(eid, status=Execution.CANCELLED)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:8,代码来源:test_wf_api.py


示例13: test_cloudify_runtime_properties_injection

    def test_cloudify_runtime_properties_injection(self):
        dsl_path = resource("dsl/dependencies_order_with_two_nodes.yaml")
        deploy(dsl_path)

        from plugins.testmockoperations.tasks import get_state as \
            testmock_get_state
        states = send_task(testmock_get_state).get(timeout=10)
        node_runtime_props = None
        for k, v in states[1]['capabilities'].iteritems():
            if 'host_node' in k:
                node_runtime_props = v
                break
        self.assertEquals('value1', node_runtime_props['property1'])
        self.assertEquals(1,
                          len(node_runtime_props),
                          msg='Expected 2 but contains: {0}'.format(
                              node_runtime_props))
开发者ID:boreys,项目名称:cloudify-manager,代码行数:17,代码来源:test_workflow.py


示例14: test_operation_mapping_override

 def test_operation_mapping_override(self):
     dsl_path = resource("dsl/operation_mapping.yaml")
     deployment, _ = deploy(dsl_path, 'workflow2')
     invocations = send_task(get_mock_operation_invocations).get()
     self.assertEqual(3, len(invocations))
     for invocation in invocations:
         self.assertEqual(1, len(invocation))
         self.assertEqual(invocation['test_key'], 'overridden_test_value')
开发者ID:boreys,项目名称:cloudify-manager,代码行数:8,代码来源:test_operation_mapping.py


示例15: test_get_blueprint

    def test_get_blueprint(self):
        dsl_path = resource("dsl/basic.yaml")
        blueprint_id = str(uuid.uuid4())
        deployment, _ = deploy(dsl_path, blueprint_id=blueprint_id)

        self.assertEqual(blueprint_id, deployment.blueprint_id)
        blueprint = self.client.blueprints.get(blueprint_id)
        self.assertEqual(blueprint_id, blueprint.id)
        self.assertTrue(len(blueprint['plan']) > 0)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:9,代码来源:test_workflow.py


示例16: _test_retries_and_retry_interval_impl

 def _test_retries_and_retry_interval_impl(self,
                                           blueprint,
                                           retries,
                                           retry_interval,
                                           expected_interval,
                                           expected_retries,
                                           invocations_task,
                                           expect_failure=False):
     self.configure(retries=retries, retry_interval=retry_interval)
     if expect_failure:
         self.assertRaises(RuntimeError, deploy, resource(blueprint))
     else:
         deploy(resource(blueprint))
     invocations = send_task(invocations_task).get()
     self.assertEqual(expected_retries + 1, len(invocations))
     for i in range(len(invocations) - 1):
         self.assertLessEqual(expected_interval,
                              invocations[i+1] - invocations[i])
开发者ID:boreys,项目名称:cloudify-manager,代码行数:18,代码来源:test_task_retries.py


示例17: test_deploy_multi_instance_application

    def test_deploy_multi_instance_application(self):
        dsl_path = resource("dsl/multi_instance.yaml")
        deploy(dsl_path)

        from cosmo.cloudmock.tasks import get_machines
        result = get_machines.apply_async()
        machines = set(result.get(timeout=10))
        self.assertEquals(2, len(machines))

        from cosmo.testmockoperations.tasks import get_state as get_state
        apps_state = get_state.apply_async().get(timeout=10)
        machines_with_apps = set([])
        for app_state in apps_state:
            host_id = app_state['relationships'].keys()[0]
            machines_with_apps.add(host_id)
        self.assertEquals(machines, machines_with_apps)

        pass
开发者ID:erezgiga,项目名称:cosmo-manager,代码行数:18,代码来源:test_n_instances.py


示例18: test_deployment_workflows

 def test_deployment_workflows(self):
     dsl_path = resource("dsl/custom_workflow_mapping.yaml")
     deployment, _ = deploy(dsl_path)
     deployment_id = deployment.id
     workflows = self.client.deployments.get(deployment_id).workflows
     self.assertEqual(3, len(workflows))
     wf_ids = [x.name for x in workflows]
     self.assertTrue('uninstall' in wf_ids)
     self.assertTrue('install' in wf_ids)
     self.assertTrue('custom' in wf_ids)
开发者ID:mahak,项目名称:cloudify-manager,代码行数:10,代码来源:test_deployment_workflows.py


示例19: test_plugin_get_resource

    def test_plugin_get_resource(self):
        dsl_path = resource("dsl/get_resource_in_plugin.yaml")
        deploy(dsl_path)
        from plugins.testmockoperations.tasks import \
            get_resource_operation_invocations as testmock_get_invocations
        invocations = send_task(testmock_get_invocations).get(
            timeout=10)
        self.assertEquals(1, len(invocations))
        invocation = invocations[0]
        with open(resource("dsl/basic.yaml")) as f:
            basic_data = f.read()

        # checking the resources are the correct data
        self.assertEquals(basic_data, invocation['res1_data'])
        self.assertEquals(basic_data, invocation['res2_data'])

        # checking the custom filepath provided is indeed where the second
        # resource was saved
        self.assertEquals(invocation['custom_filepath'],
                          invocation['res2_path'])
开发者ID:boreys,项目名称:cloudify-manager,代码行数:20,代码来源:test_workflow.py


示例20: test_execute_operation

    def test_execute_operation(self):
        dsl_path = resource("dsl/basic.yaml")
        blueprint_id = self.id()
        deployment, _ = deploy(dsl_path, blueprint_id=blueprint_id)

        self.assertEqual(blueprint_id, deployment.blueprint_id)

        from plugins.cloudmock.tasks import get_machines
        result = send_task(get_machines)
        machines = result.get(timeout=10)

        self.assertEquals(1, len(machines))
开发者ID:mahak,项目名称:cloudify-manager,代码行数:12,代码来源:test_workflow.py



注:本文中的testenv.deploy函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python testenv.main函数代码示例发布时间:2022-05-27
下一篇:
Python testenv.configure_for_tests函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap