• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python utils.deploy函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中testenv.utils.deploy函数的典型用法代码示例。如果您正苦于以下问题:Python deploy函数的具体用法?Python deploy怎么用?Python deploy使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了deploy函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _test_retries_and_retry_interval_impl

 def _test_retries_and_retry_interval_impl(self,
                                           blueprint,
                                           retries,
                                           retry_interval,
                                           expected_interval,
                                           expected_retries,
                                           invocations_type,
                                           expect_failure=False,
                                           inputs=None):
     self.configure(retries=retries, retry_interval=retry_interval)
     deployment_id = str(uuid.uuid4())
     if expect_failure:
         with self.assertRaises(RuntimeError) as cm:
             deploy(
                 dsl_path=resource(blueprint),
                 deployment_id=deployment_id,
                 inputs=inputs)
         self.assertIn('Failing task on user defined exception',
                       str(cm.exception))
     else:
         deploy(resource(blueprint),
                deployment_id=deployment_id,
                inputs=inputs)
     invocations = self.get_plugin_data(
         plugin_name='testmockoperations',
         deployment_id=deployment_id
     )[invocations_type]
     self.assertEqual(expected_retries + 1, len(invocations))
     for i in range(len(invocations) - 1):
         self.assertLessEqual(expected_interval,
                              invocations[i+1] - invocations[i])
开发者ID:arcivanov,项目名称:cloudify-manager,代码行数:31,代码来源:test_task_retries.py


示例2: _test_retries_and_retry_interval_impl

 def _test_retries_and_retry_interval_impl(self,
                                           blueprint,
                                           retries,
                                           retry_interval,
                                           expected_interval,
                                           expected_retries,
                                           invocations_type,
                                           expect_failure=False):
     self.configure(retries=retries, retry_interval=retry_interval)
     deployment_id = str(uuid.uuid4())
     if expect_failure:
         self.assertRaises(RuntimeError, deploy,
                           dsl_path=resource(blueprint),
                           deployment_id=deployment_id)
     else:
         deploy(resource(blueprint),
                deployment_id=deployment_id)
     invocations = self.get_plugin_data(
         plugin_name='testmockoperations',
         deployment_id=deployment_id
     )[invocations_type]
     self.assertEqual(expected_retries + 1, len(invocations))
     for i in range(len(invocations) - 1):
         self.assertLessEqual(expected_interval,
                              invocations[i+1] - invocations[i])
开发者ID:filipposc5,项目名称:cloudify-manager,代码行数:25,代码来源:test_task_retries.py


示例3: test_deployment_logs

    def test_deployment_logs(self):
        message = 'TEST MESSAGE'
        inputs = {'message': message}

        dsl_path = resource("dsl/deployment_logs.yaml")
        deployment, _ = deploy(dsl_path, inputs=inputs)

        work_dir = testenv.testenv_instance.test_working_dir
        deployment_log_path = os.path.join(
            work_dir, 'cloudify.management', 'work', 'logs',
            '{0}.log'.format(deployment.id))

        def verify_logs_exist_with_content():
            print deployment_log_path
            self.assertTrue(os.path.isfile(deployment_log_path))
            with open(deployment_log_path) as f:
                self.assertIn(message, f.read())

        verify_logs_exist_with_content()

        undeploy(deployment.id, is_delete_deployment=True)

        # Verify log file id truncated on deployment delete
        with open(deployment_log_path) as f:
            self.assertTrue('' == f.read())

        deployment, _ = deploy(dsl_path, inputs=inputs,
                               deployment_id=deployment.id)

        # Verify new deployment with the same deployment id
        # can write to the previous location.
        verify_logs_exist_with_content()
开发者ID:01000101,项目名称:cloudify-manager,代码行数:32,代码来源:test_deployment_logs.py


示例4: test_execute_operation_failure

 def test_execute_operation_failure(self):
     deployment_id = str(uuid.uuid4())
     dsl_path = resource("dsl/basic.yaml")
     try:
         deploy(dsl_path, deployment_id=deployment_id)
         self.fail('expected exception')
     except Exception as e:
         if e.message:
             self.logger.info(e.message)
         pass
开发者ID:nagyist,项目名称:cloudify-manager,代码行数:10,代码来源:test_workflow.py


示例5: _local_task_fail_impl

 def _local_task_fail_impl(self, wf_name):
     if self.do_get:
         deploy(resource('dsl/workflow_api.yaml'), wf_name,
                parameters={'do_get': self.do_get})
     else:
         self.assertRaises(RuntimeError,
                           deploy,
                           resource('dsl/workflow_api.yaml'),
                           wf_name,
                           parameters={'do_get': self.do_get})
开发者ID:filipposc5,项目名称:cloudify-manager,代码行数:10,代码来源:test_wf_api.py


示例6: _test1

 def _test1(self):
     num_deps = 5
     for i in range(num_deps):
         deploy(resource('dsl/basic.yaml'), deployment_id='d{0}'.format(i))
     while True:
         for workflow in ['uninstall', 'install']:
             executions = []
             for i in range(num_deps):
                 execution = execute_workflow(
                     workflow,
                     deployment_id='d{0}'.format(i),
                     wait_for_execution=False)
                 executions.append(execution)
             for execution in executions:
                 wait_for_execution_to_end(execution)
开发者ID:isaac-s,项目名称:cloudify-manager,代码行数:15,代码来源:test_workflow.py


示例7: test_executions_sort

 def test_executions_sort(self):
     deployment = deploy(resource('dsl/sort.yaml'))
     for i in range(5):
         execute_workflow('install', deployment.id)
         execute_workflow('uninstall', deployment.id)
     self._test_sort('executions',
                     ['deployment_id', '-status'])
开发者ID:01000101,项目名称:cloudify-manager,代码行数:7,代码来源:test_rest_service_sort.py


示例8: test_plugin_workdir

    def test_plugin_workdir(self):
        filename = 'test_plugin_workdir.txt'
        host_content = 'HOST_CONTENT'
        central_content = 'CENTRAL_CONTENT'

        dsl_path = resource("dsl/plugin_workdir.yaml")
        deployment, _ = deploy(dsl_path,
                               inputs={
                                   'filename': filename,
                                   'host_content': host_content,
                                   'central_content': central_content
                               })
        host_id = self.client.node_instances.list(node_id='host').items[0].id

        from testenv import testenv_instance
        test_workdir = testenv_instance.test_working_dir
        central_agent = CeleryWorkerProcess(['cloudify.management'],
                                            test_workdir)
        host_agent = CeleryWorkerProcess([host_id], test_workdir)

        central_file = os.path.join(
            central_agent.workdir, 'deployments', deployment.id, 'plugins',
            'testmockoperations', filename)
        host_file = os.path.join(
            host_agent.workdir, 'plugins', 'testmockoperations', filename)

        with open(central_file) as f:
            self.assertEqual(central_content, f.read())
        with open(host_file) as f:
            self.assertEqual(host_content, f.read())
开发者ID:01000101,项目名称:cloudify-manager,代码行数:30,代码来源:test_plugin_workdir.py


示例9: test_cancel_on_wait_for_task_termination

 def test_cancel_on_wait_for_task_termination(self):
     _, eid = deploy(
         resource('dsl/workflow_api.yaml'), self._testMethodName,
         parameters={'do_get': self.do_get}, wait_for_execution=False)
     self.wait_for_execution_status(eid, status=Execution.STARTED)
     self.client.executions.cancel(eid)
     self.wait_for_execution_status(eid, status=Execution.CANCELLED)
开发者ID:filipposc5,项目名称:cloudify-manager,代码行数:7,代码来源:test_wf_api.py


示例10: test_pre_source_started_location_source

 def test_pre_source_started_location_source(self):
     dsl_path = resource(
         "dsl/relationship_interface_pre_source_location_source.yaml")
     deployment, _ = deploy(dsl_path)
     self.verify_assertions(deployment.id,
                            hook='pre-init',
                            runs_on_source=True)
开发者ID:01000101,项目名称:cloudify-manager,代码行数:7,代码来源:test_relationships.py


示例11: test_executions_pagination

 def test_executions_pagination(self):
     deployment = deploy(resource('dsl/pagination.yaml'))
     for i in range(5):
         execute_workflow('install', deployment.id)
         execute_workflow('uninstall', deployment.id)
     self._test_pagination(partial(self.client.executions.list,
                                   deployment_id=deployment.id))
开发者ID:Acidburn0zzz,项目名称:cloudify-manager,代码行数:7,代码来源:test_rest_service_pagination.py


示例12: test_post_source_started_location_target

 def test_post_source_started_location_target(self):
     dsl_path = resource(
         "dsl/relationship_interface_post_source_location_target.yaml")
     deployment, _ = deploy(dsl_path)
     self.verify_assertions(deployment.id,
                            hook='post-init',
                            runs_on_source=False)
开发者ID:01000101,项目名称:cloudify-manager,代码行数:7,代码来源:test_relationships.py


示例13: test_deploy_with_operation_executor_override

    def test_deploy_with_operation_executor_override(self):
        dsl_path = resource('dsl/operation_executor_override.yaml')
        deployment, _ = deploy(dsl_path)
        deployment_nodes = self.client.node_instances.list(
            deployment_id=deployment.id
        )

        webserver_nodes = filter(lambda node: 'host' not in node.node_id,
                                 deployment_nodes)
        self.assertEquals(1, len(webserver_nodes))
        webserver_node = webserver_nodes[0]
        start_invocation = self.get_plugin_data(
            plugin_name='target_aware_mock_plugin',
            deployment_id=deployment.id
        )[webserver_node.id]['start']

        expected_start_invocation = {'target': deployment.id}
        self.assertEqual(expected_start_invocation, start_invocation)

        plugin_installer_data = self.get_plugin_data(
            plugin_name='plugin_installer',
            deployment_id=deployment.id
        )

        deployment_operations_worker_name = deployment.id
        # target_aware_mock_plugin should have been installed
        # on the deployment worker as well because 'start'
        # overrides the executor
        self.assertEqual(
            plugin_installer_data[
                deployment_operations_worker_name
            ]['target_aware_mock_plugin'],
            ['installed'])
        undeploy(deployment_id=deployment.id)
开发者ID:nagyist,项目名称:cloudify-manager,代码行数:34,代码来源:test_workflow.py


示例14: test_workflow_deployment_scaling_groups

 def test_workflow_deployment_scaling_groups(self):
     deployment, _ = deploy(resource('dsl/store-scaling-groups.yaml'),
                            workflow_name='workflow')
     instance = self.client.node_instances.list(deployment.id)[0]
     self.assertEqual(
         ['node'],
         instance.runtime_properties['scaling_groups']['group1']['members'])
开发者ID:gvsurenderreddy,项目名称:cloudify-manager,代码行数:7,代码来源:test_wf_api.py


示例15: test_simple

    def test_simple(self):
        parameters = {
            'do_get': self.do_get,
            'key': 'key1',
            'value': 'value1'
        }
        result_dict = {
            'key1': 'value1'
        }
        deployment, _ = deploy(resource('dsl/workflow_api.yaml'),
                               self._testMethodName,
                               parameters=parameters)

        # testing workflow remote task
        invocation = self.get_plugin_data(
            plugin_name='testmockoperations',
            deployment_id=deployment.id
        )['mock_operation_invocation'][0]
        self.assertDictEqual(result_dict, invocation)

        # testing workflow local task
        instance = self.client.node_instances.list(
            deployment_id=deployment.id)[0]
        # I am in love with eventual consistency
        instance = self.client.node_instances.get(instance.id)
        self.assertEqual('test_state', instance.state)
开发者ID:filipposc5,项目名称:cloudify-manager,代码行数:26,代码来源:test_wf_api.py


示例16: test_deploy_with_agent_worker_windows_3_2

    def test_deploy_with_agent_worker_windows_3_2(self):
        dsl_path = resource('dsl/with_agent_worker_windows_3_2.yaml')
        deployment, _ = deploy(dsl_path, timeout_seconds=500)
        deployment_nodes = self.client.node_instances.list(
            deployment_id=deployment.id
        )

        webserver_nodes = filter(lambda node: 'host' not in node.node_id,
                                 deployment_nodes)
        self.assertEquals(1, len(webserver_nodes))
        webserver_node = webserver_nodes[0]
        invocations = self.get_plugin_data(
            plugin_name='mock_agent_plugin',
            deployment_id=deployment.id
        )[webserver_node.id]

        agent_installer_data = self.get_plugin_data(
            plugin_name='windows_agent_installer',
            deployment_id=deployment.id
        )

        # agent on host should have been started and restarted
        self.assertEqual(
            agent_installer_data[webserver_node.host_id]['states'],
            ['created', 'configured', 'started', 'restarted'])

        plugin_installer_data = self.get_plugin_data(
            plugin_name='windows_plugin_installer',
            deployment_id=deployment.id
        )

        self.assertEqual(
            plugin_installer_data[
                webserver_node.host_id
            ]['mock_agent_plugin'],
            ['installed'])

        expected_invocations = ['create', 'start']
        self.assertListEqual(invocations, expected_invocations)

        undeploy(deployment_id=deployment.id)
        invocations = self.get_plugin_data(
            plugin_name='mock_agent_plugin',
            deployment_id=deployment.id
        )[webserver_node.id]

        expected_invocations = ['create', 'start', 'stop', 'delete']
        self.assertListEqual(invocations, expected_invocations)

        # agent on host should have also
        # been stopped and uninstalled
        agent_installer_data = self.get_plugin_data(
            plugin_name='windows_agent_installer',
            deployment_id=deployment.id
        )
        self.assertEqual(
            agent_installer_data[webserver_node.host_id]['states'],
            ['created', 'configured', 'started',
             'restarted', 'stopped', 'deleted'])
开发者ID:KarnYong,项目名称:cloudify-manager,代码行数:59,代码来源:test_backwards.py


示例17: test_deployment_modifications_pagination

 def test_deployment_modifications_pagination(self):
     deployment = deploy(resource("dsl/pagination.yaml"))
     for i in range(2, 12):
         modification = self.client.deployment_modifications.start(
             deployment_id=deployment.id, nodes={"node": {"instances": i}}
         )
         self.client.deployment_modifications.finish(modification.id)
     self._test_pagination(partial(self.client.deployment_modifications.list, deployment_id=deployment.id))
开发者ID:KarnYong,项目名称:cloudify-manager,代码行数:8,代码来源:test_rest_service_pagination.py


示例18: test_deployment_modifications_sort

 def test_deployment_modifications_sort(self):
     deployment = deploy(resource('dsl/sort.yaml'))
     for i in range(2, 12):
         modification = self.client.deployment_modifications.start(
             deployment_id=deployment.id,
             nodes={'node': {'instances': i}})
         self.client.deployment_modifications.finish(modification.id)
     self._test_sort('deployment_modifications', 'deployment_id')
开发者ID:01000101,项目名称:cloudify-manager,代码行数:8,代码来源:test_rest_service_sort.py


示例19: test_cancel_on_task_retry_interval

 def test_cancel_on_task_retry_interval(self):
     self.configure(retries=2, interval=1000000)
     _, eid = deploy(
         resource('dsl/workflow_api.yaml'), self._testMethodName,
         parameters={'do_get': self.do_get}, wait_for_execution=False)
     self.wait_for_execution_status(eid, status=Execution.STARTED)
     self.client.executions.cancel(eid)
     self.wait_for_execution_status(eid, status=Execution.CANCELLED)
开发者ID:filipposc5,项目名称:cloudify-manager,代码行数:8,代码来源:test_wf_api.py


示例20: test_script_mapping

    def test_script_mapping(self):
        dsl_path = resource('dsl/test_script_mapping.yaml')
        deployment, _ = deploy(dsl_path)
        execute_workflow('workflow', deployment.id)

        data = self.get_plugin_data(plugin_name='script',
                                    deployment_id=deployment.id)
        self.assertEqual(data['op1_called_with_property'], 'op2_called')
        self.assertEqual(data['op2_prop'], 'op2_value')
开发者ID:01000101,项目名称:cloudify-manager,代码行数:9,代码来源:test_script_mapping.py



注:本文中的testenv.utils.deploy函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.resource函数代码示例发布时间:2022-05-27
下一篇:
Python testenv.resource函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap