本文整理汇总了Python中senlin.tests.unit.common.utils.dummy_context函数的典型用法代码示例。如果您正苦于以下问题：Python dummy_context函数的具体用法？Python dummy_context怎么用？Python dummy_context使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了dummy_context函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setUp
def setUp(self):
    """Prepare the fixtures shared by the profile-type tests.

    Creates a dummy context for the ``profile_type_test_project``
    project, instantiates ``service.EngineService`` with the arguments
    ``'host-a'`` and ``'topic-a'`` and calls its ``init_tgm()``, then
    registers ``fakes.TestProfile`` under the name ``TestProfile`` in
    the global environment.
    """
    super(ProfileTypeTest, self).setUp()
    self.ctx = utils.dummy_context(project='profile_type_test_project')
    self.eng = service.EngineService('host-a', 'topic-a')
    self.eng.init_tgm()
    environment.global_env().register_profile('TestProfile',
                                              fakes.TestProfile)
开发者ID:KongJustin,项目名称:senlin,代码行数:7,代码来源:test_profile_types.py
示例2: setUp
def setUp(self):
    """Prepare the context, policy spec and mocked drivers for LB tests.

    ``self.spec`` is a ``senlin.policy.loadbalance`` spec (version 1.0)
    with ``pool`` and ``vip`` sections.  ``driver_base.SenlinDriver`` is
    patched to return a mock whose ``loadbalancing()`` call yields
    ``self.lb_driver``; ``oslo_context.get_current`` is patched as well.
    """
    super(TestLoadBalancingPolicyOperations, self).setUp()
    self.context = utils.dummy_context()
    self.spec = {
        'type': 'senlin.policy.loadbalance',
        'version': '1.0',
        'properties': {
            'pool': {
                'protocol': 'HTTP',
                'protocol_port': 80,
                'subnet': 'test-subnet',
                'lb_method': 'ROUND_ROBIN',
                'admin_state_up': True,
                'session_persistence': {
                    'type': 'SOURCE_IP',
                    'cookie_name': 'whatever'
                }
            },
            'vip': {
                'address': '192.168.1.100',
                'subnet': 'test-subnet',
                'connection_limit': 500,
                'protocol': 'HTTP',
                'protocol_port': 80,
                'admin_state_up': True,
            }
        }
    }

    # Route every SenlinDriver() construction to one mock so the tests
    # can inspect the load-balancing driver it hands out.
    sd = mock.Mock()
    self.patchobject(driver_base, 'SenlinDriver', return_value=sd)
    self.lb_driver = mock.Mock()
    sd.loadbalancing.return_value = self.lb_driver
    self.patchobject(oslo_context, 'get_current')
开发者ID:zzxwill,项目名称:senlin,代码行数:34,代码来源:test_lb_policy.py
示例3: test_receiver_get_admin_context
def test_receiver_get_admin_context(self):
    """An admin context from another project can fetch the receiver.

    The receiver is created with ``self.ctx`` and then looked up with
    an ``is_admin=True`` context for ``a-different-project`` while
    ``project_safe=True``; the lookup must not return ``None``.
    """
    admin_ctx = utils.dummy_context(project='a-different-project',
                                    is_admin=True)
    r = self._create_receiver(self.ctx)
    res = db_api.receiver_get(admin_ctx, r.id, project_safe=True)
    self.assertIsNotNone(res)
开发者ID:jonnary,项目名称:senlin,代码行数:7,代码来源:test_receiver_api.py
示例4: test_cluster_get_by_short_id
def test_cluster_get_by_short_id(self):
    """Check short-ID lookup: ambiguous, unique, missing, other project.

    Two clusters are created whose IDs share the prefix ``same-part-``.
    Every prefix shorter than that shared part must raise
    ``exception.MultipleChoices``; an 11-character prefix must resolve
    to exactly one cluster; an unknown ID and a lookup from a context
    of a different project must both return ``None``.
    """
    cid1 = 'same-part-unique-part'
    cid2 = 'same-part-part-unique'
    cluster1 = shared.create_cluster(self.ctx, self.profile,
                                     id=cid1,
                                     name='cluster-1')
    cluster2 = shared.create_cluster(self.ctx, self.profile,
                                     id=cid2,
                                     name='cluster-2')

    # Prefixes of length 0..9 are common to both IDs, hence ambiguous.
    for x in range(len('same-part-')):
        self.assertRaises(exception.MultipleChoices,
                          db_api.cluster_get_by_short_id,
                          self.ctx, cid1[:x])

    # The 11th character is the first one that differs between the IDs.
    res = db_api.cluster_get_by_short_id(self.ctx, cid1[:11])
    self.assertEqual(cluster1.id, res.id)
    res = db_api.cluster_get_by_short_id(self.ctx, cid2[:11])
    self.assertEqual(cluster2.id, res.id)

    res = db_api.cluster_get_by_short_id(self.ctx, 'non-existent')
    self.assertIsNone(res)

    ctx_new = utils.dummy_context(project='different_project_id')
    res = db_api.cluster_get_by_short_id(ctx_new, cid1[:11])
    self.assertIsNone(res)
开发者ID:onesafe,项目名称:senlin,代码行数:25,代码来源:test_cluster_api.py
示例5: setUp
def setUp(self):
    """Build the service record, payload and notification under test.

    A ``Service`` object is created from ``self.fake_service`` with a
    freshly generated UUID.  ``self.payload`` is a ``TestPayload`` whose
    schema is populated from ``self.my_obj``, and ``self.notification``
    is a ``TestNotification`` wrapping that payload with an UPDATE/START
    event type for ``test_object`` and INFO priority.
    """
    super(TestNotificationBase, self).setUp()
    self.ctx = utils.dummy_context()
    service_id = uuidutils.generate_uuid()
    self.service_obj = objects.Service.create(
        self.ctx, service_id,
        self.fake_service['host'],
        self.fake_service['binary'],
        self.fake_service['topic'])
    self.my_obj = TestObject(field_1='test1', field_2=42,
                             not_important_field=13)
    self.payload = TestPayload(extra_field='test string')
    self.payload.populate_schema(source_field=self.my_obj)
    self.notification = TestNotification(
        event_type=notification.EventType(
            object='test_object',
            action=fields.NotificationAction.UPDATE,
            phase=fields.NotificationPhase.START),
        publisher=notification.NotificationPublisher.from_service_obj(
            self.service_obj),
        priority=fields.NotificationPriority.INFO,
        payload=self.payload)
开发者ID:jonnary,项目名称:senlin,代码行数:25,代码来源:test_notification.py
示例6: setUp
def setUp(self):
    """Create the LBaaS driver under test and sample vip/pool/hm data.

    The driver is built from the dummy context's ``to_dict()`` output.
    ``neutron_v2.NeutronClient`` is patched before ``self.lb_driver.nc()``
    is called, and the result of that call is kept as ``self.nc``.
    ``self.vip``, ``self.pool`` and ``self.hm`` are plain dictionaries
    used as inputs by the tests.
    """
    super(TestNeutronLBaaSDriver, self).setUp()
    self.context = utils.dummy_context()
    self.conn_params = self.context.to_dict()
    self.lb_driver = lbaas.LoadBalancerDriver(self.conn_params)
    self.patchobject(neutron_v2, 'NeutronClient')
    self.nc = self.lb_driver.nc()

    self.vip = {
        'subnet': 'subnet-01',
        'address': '192.168.1.100',
        'admin_state_up': True,
        'protocol': 'HTTP',
        'protocol_port': 80,
        'connection_limit': 50
    }
    self.pool = {
        'lb_method': 'ROUND_ROBIN',
        'protocol': 'HTTP',
        'admin_state_up': True
    }
    self.hm = {
        "type": "HTTP",
        "delay": "1",
        "timeout": 1,
        "max_retries": 5,
        "pool_id": "POOL_ID",
        "admin_state_up": True,
        "http_method": "GET",
        "url_path": "/index.html",
        "expected_codes": "200,201,202"
    }
开发者ID:gongwayne,项目名称:Openstack,代码行数:32,代码来源:test_lbaas_driver.py
示例7: setUp
def setUp(self):
    """Prepare the fixtures shared by the trigger tests.

    Creates a dummy context for the ``trigger_test_project`` project,
    instantiates ``service.EngineService`` with ``'host-a'`` and
    ``'topic-a'`` and calls its ``init_tgm()``, then registers
    ``fakes.TestTrigger`` under the name ``TestTrigger`` in the global
    environment.
    """
    super(TriggerTest, self).setUp()
    self.ctx = utils.dummy_context(project='trigger_test_project')
    self.eng = service.EngineService('host-a', 'topic-a')
    self.eng.init_tgm()
    environment.global_env().register_trigger('TestTrigger',
                                              fakes.TestTrigger)
开发者ID:KongJustin,项目名称:senlin,代码行数:7,代码来源:test_triggers.py
示例8: setUp
def setUp(self):
    """Create a dummy context and stub out ``ao.Action.get``.

    The stub returns a mock with ``owner='ENGINE'`` and
    ``id='ACTION_ABC'``; the patcher result is kept as ``self.stub_get``
    so tests can inspect its calls.
    """
    super(SenlinLockTest, self).setUp()
    self.ctx = utils.dummy_context()
    ret = mock.Mock(owner='ENGINE', id='ACTION_ABC')
    self.stub_get = self.patchobject(ao.Action, 'get', return_value=ret)
开发者ID:jonnary,项目名称:senlin,代码行数:7,代码来源:test_senlin_lock.py
示例9: _test_engine_api
def _test_engine_api(self, method, rpc_method, **kwargs):
    """Invoke an RPC client method and verify what it hands to the RPC layer.

    :param method: name of the ``self.rpcapi`` method to invoke.
    :param rpc_method: name of the ``self.rpcapi`` attribute (for
        example ``'call'``) that is patched and whose call arguments
        are inspected.
    :param kwargs: keyword arguments forwarded to ``method``.  A
        ``version`` entry is discarded; an ``expected_message`` entry,
        if present, is removed and used as the expected message instead
        of one built with ``self.rpcapi.make_msg``.
    """
    ctxt = utils.dummy_context()
    expected_retval = 'foo' if method == 'call' else None

    kwargs.pop('version', None)

    if 'expected_message' in kwargs:
        expected_message = kwargs.pop('expected_message')
    else:
        expected_message = self.rpcapi.make_msg(method, **kwargs)

    cast_and_call = [
        'profile_delete',
        'policy_delete',
        'cluster_delete',
        'node_delete',
        'receiver_delete',
        'webhook_delete',
    ]

    # The expected message above is built before ``cast`` is added, so
    # ``cast`` is passed to the method but not part of the expected
    # message.
    if rpc_method == 'call' and method in cast_and_call:
        kwargs['cast'] = False

    mock_rpc_method = self.patchobject(self.rpcapi, rpc_method,
                                       return_value=expected_retval)

    retval = getattr(self.rpcapi, method)(ctxt, **kwargs)

    self.assertEqual(expected_retval, retval)
    expected_args = [ctxt, expected_message, mock.ANY]
    actual_args, _ = mock_rpc_method.call_args
    # zip() stops at the shorter sequence, so only positions present in
    # both lists are compared.
    for expected_arg, actual_arg in zip(expected_args, actual_args):
        self.assertEqual(expected_arg, actual_arg)
开发者ID:zzxwill,项目名称:senlin,代码行数:34,代码来源:test_rpc_client.py
示例10: test_cluster_list_project_safe
def test_cluster_list_project_safe(self, notify):
    """``cluster_list`` is project-scoped unless ``project_safe=False``.

    One profile and one cluster are created in each of two projects.
    Listing with either context returns only that project's cluster;
    listing with ``project_safe=False`` returns both.

    :param notify: not used in the body; presumably a mock injected by
        a patch decorator that is not visible here -- confirm against
        the full test module.
    """
    new_ctx = utils.dummy_context(project='a_diff_project')
    spec = {
        'type': 'TestProfile',
        'version': '1.0',
        'properties': {'INT': 10, 'STR': 'string'},
    }
    p1 = self.eng.profile_create(self.ctx, 'p-test-1', spec,
                                 permission='1111')
    p2 = self.eng.profile_create(new_ctx, 'p-test-2', spec,
                                 permission='1111')
    c1 = self.eng.cluster_create(self.ctx, 'c1', 0, p1['id'])
    c2 = self.eng.cluster_create(new_ctx, 'c2', 0, p2['id'])

    # default is project_safe
    result = self.eng.cluster_list(self.ctx)
    self.assertIsInstance(result, list)
    self.assertEqual(1, len(result))
    self.assertEqual(c1['id'], result[0]['id'])

    result = self.eng.cluster_list(new_ctx)
    self.assertIsInstance(result, list)
    self.assertEqual(1, len(result))
    self.assertEqual(c2['id'], result[0]['id'])

    # try project_safe set to False
    result = self.eng.cluster_list(self.ctx, project_safe=False)
    self.assertIsInstance(result, list)
    self.assertEqual(2, len(result))
    # These two assertions depend on c1 being listed before c2.
    self.assertEqual(c1['id'], result[0]['id'])
    self.assertEqual(c2['id'], result[1]['id'])
开发者ID:MountainWei,项目名称:senlin,代码行数:31,代码来源:test_clusters.py
示例11: test_get_delete_candidates_no_deletion_data_resize
def test_get_delete_candidates_no_deletion_data_resize(self,
                                                       m_nodes_random,
                                                       m_parse_param,
                                                       m_cluster_get,
                                                       m_node_get):
    """Candidates for a CLUSTER_RESIZE action that has no deletion data.

    The action starts with empty ``data``; the parse-param mock fills in
    ``{'deletion': {'count': 2}}`` as a side effect.  The test verifies
    the calls made to the four mocks and that the result is whatever the
    random-node mock returned.

    The ``m_*`` parameters are presumably injected by patch decorators
    that are not visible here -- confirm their targets against the full
    test module.
    """
    def _parse_param(action, cluster):
        # Stand-in side effect: supply the deletion count on the action.
        action.data = {'deletion': {'count': 2}}

    action = mock.Mock()
    self.context = utils.dummy_context()
    action.data = {}
    action.action = consts.CLUSTER_RESIZE
    m_parse_param.side_effect = _parse_param
    m_node_get.return_value = ['node1', 'node2', 'node3']
    m_cluster_get.return_value = 'cluster1'
    m_nodes_random.return_value = ['node1', 'node3']

    policy = lb_policy.LoadBalancingPolicy('test-policy', self.spec)
    res = policy._get_delete_candidates('CLUSTERID', action)

    m_cluster_get.assert_called_once_with(action.context,
                                          'CLUSTERID')
    m_parse_param.assert_called_once_with(action, 'cluster1')
    m_node_get.assert_called_once_with(action.context,
                                       cluster_id='CLUSTERID')
    m_nodes_random.assert_called_once_with(['node1', 'node2', 'node3'], 2)
    self.assertEqual(['node1', 'node3'], res)
开发者ID:gongwayne,项目名称:Openstack,代码行数:27,代码来源:test_lb_policy.py
示例12: setUp
def setUp(self):
    """Prepare an engine, a profile and a policy for the cluster tests.

    Creates a dummy context for ``cluster_test_project``, an
    ``EngineService`` whose ``dispatcher`` is replaced by a mock, and
    registers the fake ``TestProfile`` / ``TestPolicy`` types.  A
    profile (``p-test``) and a policy (``policy_1``) are then created
    through the engine and stored on ``self``.
    """
    super(ClusterTest, self).setUp()
    self.ctx = utils.dummy_context(project='cluster_test_project')
    self.eng = service.EngineService('host-a', 'topic-a')
    self.eng.init_tgm()

    self.eng.dispatcher = mock.Mock()

    env = environment.global_env()
    env.register_profile('TestProfile', fakes.TestProfile)
    env.register_policy('TestPolicy', fakes.TestPolicy)

    spec = {
        'type': 'TestProfile',
        'version': '1.0',
        'properties': {'INT': 10, 'STR': 'string'},
    }
    self.profile = self.eng.profile_create(self.ctx, 'p-test', spec,
                                           permission='1111')

    spec = {
        'type': 'TestPolicy',
        'version': '1.0',
        'properties': {'KEY2': 3}
    }
    self.policy = self.eng.policy_create(self.ctx, 'policy_1', spec,
                                         cooldown=60, level=50)
开发者ID:KongJustin,项目名称:senlin,代码行数:26,代码来源:test_clusters.py
示例13: setUp
def setUp(self):
    """Build the health policy under test and a mocked one-node cluster.

    ``self.spec`` is a ``senlin.policy.health`` spec (version 1.0) with
    ``detection`` and ``recovery`` sections.  ``self.cluster`` is a mock
    with id ``CLUSTER_ID`` and a single node whose status is ``ACTIVE``.
    ``senlin.rpc.client.EngineClient`` is patched before the
    ``HealthPolicy`` is instantiated.
    """
    super(TestHealthPolicy, self).setUp()
    self.context = utils.dummy_context()

    self.spec = {
        'type': 'senlin.policy.health',
        'version': '1.0',
        'properties': {
            'detection': {
                'type': 'NODE_STATUS_POLLING',
                'options': {
                    'interval': 60
                }
            },
            'recovery': {
                'fencing': ['COMPUTE'],
                'actions': ['REBUILD']
            }
        }
    }

    cluster = mock.Mock()
    cluster.id = 'CLUSTER_ID'
    node = mock.Mock()
    node.status = 'ACTIVE'
    cluster.nodes = [node]
    self.cluster = cluster

    self.patch('senlin.rpc.client.EngineClient')
    self.hp = health_policy.HealthPolicy('test-policy', self.spec)
开发者ID:gongwayne,项目名称:Openstack,代码行数:28,代码来源:test_health_policy.py
示例14: test_node_get_all_with_admin_context
def test_node_get_all_with_admin_context(self):
    """An admin context from another project sees both created nodes.

    Two nodes are created with ``self.ctx``; ``node_get_all`` is then
    called with an ``is_admin=True`` context for ``a_different_project``
    and ``project_safe=True``, and must return two results.
    """
    shared.create_node(self.ctx, None, self.profile, name='node1')
    shared.create_node(self.ctx, None, self.profile, name='node2')

    admin_ctx = utils.dummy_context(project='a_different_project',
                                    is_admin=True)
    results = db_api.node_get_all(admin_ctx, project_safe=True)
    self.assertEqual(2, len(results))
开发者ID:onesafe,项目名称:senlin,代码行数:8,代码来源:test_node_api.py
示例15: test_action_get_project_safe
def test_action_get_project_safe(self):
    """``action_get`` hides other projects' actions when project-safe.

    An action created with ``self.ctx`` is looked up from a context of
    ``another-project``: with ``project_safe=True`` the result must be
    ``None``; with ``project_safe=False`` it must not be ``None``.
    """
    parser.simple_parse(shared.sample_action)
    action = _create_action(self.ctx)
    new_ctx = utils.dummy_context(project='another-project')

    retobj = db_api.action_get(new_ctx, action.id, project_safe=True)
    self.assertIsNone(retobj)

    retobj = db_api.action_get(new_ctx, action.id, project_safe=False)
    self.assertIsNotNone(retobj)
开发者ID:jonnary,项目名称:senlin,代码行数:8,代码来源:test_action_api.py
示例16: test_receiver_get_all_with_admin_context
def test_receiver_get_all_with_admin_context(self):
    """An admin context from another project sees both receivers.

    Two receivers are created with ``self.ctx``; ``receiver_get_all`` is
    then called with an ``is_admin=True`` context for
    ``a-different-project`` and ``project_safe=True``, and must return
    two results.
    """
    self._create_receiver(self.ctx, name='receiver1')
    self._create_receiver(self.ctx, name='receiver2')

    admin_ctx = utils.dummy_context(project='a-different-project',
                                    is_admin=True)
    results = db_api.receiver_get_all(admin_ctx, project_safe=True)
    self.assertEqual(2, len(results))
开发者ID:jonnary,项目名称:senlin,代码行数:8,代码来源:test_receiver_api.py
示例17: test_policy_get_admin_context
def test_policy_get_admin_context(self):
    """An admin context from another project can fetch the policy.

    The policy is created with ``self.ctx`` and then looked up with an
    ``is_admin=True`` context for ``a-different-project`` while
    ``project_safe=True``; the lookup must not return ``None``.
    """
    data = self.new_policy_data()
    policy = db_api.policy_create(self.ctx, data)

    admin_ctx = utils.dummy_context(project='a-different-project',
                                    is_admin=True)
    res = db_api.policy_get(admin_ctx, policy.id, project_safe=True)
    self.assertIsNotNone(res)
开发者ID:jonnary,项目名称:senlin,代码行数:8,代码来源:test_policy_api.py
示例18: test_trigger_get_diff_project
def test_trigger_get_diff_project(self):
    """``trigger_get`` from another project needs ``project_safe=False``.

    A trigger created with ``self.ctx`` is looked up from a context of
    ``a-different-project``: without extra arguments the result must be
    ``None``; with ``project_safe=False`` the same trigger is returned.
    """
    trigger = self._create_trigger(self.ctx)

    new_ctx = utils.dummy_context(project='a-different-project')
    retobj = db_api.trigger_get(new_ctx, trigger.id)
    self.assertIsNone(retobj)

    retobj = db_api.trigger_get(new_ctx, trigger.id, project_safe=False)
    self.assertIsNotNone(retobj)
    self.assertEqual(trigger.id, retobj.id)
开发者ID:MountainWei,项目名称:senlin,代码行数:8,代码来源:test_trigger_api.py
示例19: setUp
def setUp(self):
    """Create the context, thread group manager and engine service.

    ``self.svc`` is an ``EngineService`` built with ``'HOST'`` and
    ``'TOPIC'`` whose ``engine_id`` is set to ``'1234'``.
    """
    super(TestDispatcher, self).setUp()
    self.context = utils.dummy_context()
    self.thm = scheduler.ThreadGroupManager()
    self.svc = service.EngineService('HOST', 'TOPIC')
    self.svc.engine_id = '1234'
开发者ID:jonnary,项目名称:senlin,代码行数:8,代码来源:test_dispatcher.py
示例20: setUp
def setUp(self):
    """Create a ``NeutronClient`` backed by a mocked SDK connection.

    ``sdk.create_connection`` is patched only for the duration of the
    ``with`` block and made to return ``self.conn``; the client is
    constructed inside that block and kept as ``self.nc``.
    """
    super(TestNeutronV2Driver, self).setUp()
    self.context = utils.dummy_context()
    self.conn_params = self.context.to_dict()
    self.conn = mock.Mock()
    with mock.patch.object(sdk, 'create_connection') as mock_create_conn:
        mock_create_conn.return_value = self.conn
        self.nc = neutron_v2.NeutronClient(self.context)
开发者ID:gongwayne,项目名称:Openstack,代码行数:8,代码来源:test_neutron_v2_driver.py
注:本文中的senlin.tests.unit.common.utils.dummy_context函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论