本文整理汇总了Python中shakedown.clients.marathon.create_client函数的典型用法代码示例。如果您正苦于以下问题:Python create_client函数的具体用法?Python create_client怎么用?Python create_client使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了create_client函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_create_pod_with_private_image
def test_create_pod_with_private_image():
"""Deploys a pod with a private Docker image, using Mesos containerizer.
This method relies on the global `install_enterprise_cli` fixture to install the
enterprise-cli-package.
"""
username = os.environ['DOCKER_HUB_USERNAME']
password = os.environ['DOCKER_HUB_PASSWORD']
secret_name = "pullconfig"
secret_value_json = common.create_docker_pull_config_json(username, password)
secret_value = json.dumps(secret_value_json)
pod_def = pods.private_docker_pod()
pod_id = pod_def['id']
common.create_secret(secret_name, secret_value)
client = marathon.create_client()
try:
client.add_pod(pod_def)
deployment_wait(service_id=pod_id, max_attempts=300)
pod = client.show_pod(pod_id)
assert pod is not None, "The pod has not been created"
finally:
common.delete_secret(secret_name)
开发者ID:mesosphere,项目名称:marathon,代码行数:25,代码来源:marathon_pods_tests.py
示例2: test_pod_with_persistent_volume
def test_pod_with_persistent_volume():
pod_def = pods.persistent_volume_pod()
pod_id = pod_def['id']
client = marathon.create_client()
client.add_pod(pod_def)
deployment_wait(service_id=pod_id)
tasks = common.get_pod_tasks(pod_id)
host = common.running_status_network_info(tasks[0]['statuses'])['ip_addresses'][0]['ip_address']
# Container with the name 'container1' appends its taskId to the file. So we search for the
# taskId of that container which is not always the tasks[0]
expected_data = next((t['id'] for t in tasks if t['name'] == 'container1'), None)
assert expected_data, f"Hasn't found a container with the name 'container1' in the pod {tasks}"
port1 = tasks[0]['discovery']['ports']['ports'][0]["number"]
port2 = tasks[1]['discovery']['ports']['ports'][0]["number"]
path1 = tasks[0]['container']['volumes'][0]['container_path']
path2 = tasks[1]['container']['volumes'][0]['container_path']
logger.info('Deployd two containers on {}:{}/{} and {}:{}/{}'.format(host, port1, path1, host, port2, path2))
@retrying.retry(wait_fixed=1000, stop_max_attempt_number=60, retry_on_exception=common.ignore_exception)
def check_http_endpoint(port, path, expected):
cmd = "curl {}:{}/{}/foo".format(host, port, path)
run, data = run_command_on_master(cmd)
assert run, "{} did not succeed".format(cmd)
assert expected in data, "'{}' was not found in '{}'".format(data, expected)
check_http_endpoint(port1, path1, expected_data)
check_http_endpoint(port2, path2, expected_data)
开发者ID:mesosphere,项目名称:marathon,代码行数:32,代码来源:marathon_pods_tests.py
示例3: test_pod_health_failed_check
def test_pod_health_failed_check():
"""Deploys a pod with correct health checks, then partitions the network and verifies that
the tasks get restarted with new task IDs.
"""
pod_def = pods.ports_pod()
pod_id = pod_def['id']
host = common.ip_other_than_mom()
common.pin_pod_to_host(pod_def, host)
client = marathon.create_client()
client.add_pod(pod_def)
deployment_wait(service_id=pod_id)
tasks = common.get_pod_tasks(pod_id)
initial_id1 = tasks[0]['id']
initial_id2 = tasks[1]['id']
pod = client.list_pod()[0]
container1 = pod['instances'][0]['containers'][0]
port = container1['endpoints'][0]['allocatedHostPort']
common.block_iptable_rules_for_seconds(host, port, 7, block_input=True, block_output=False)
deployment_wait(service_id=pod_id)
tasks = common.get_pod_tasks(pod_id)
for new_task in tasks:
new_task_id = new_task['id']
assert new_task_id != initial_id1, f"Task {new_task_id} has not been restarted" # NOQA E999
assert new_task_id != initial_id2, f"Task {new_task_id} has not been restarted"
开发者ID:mesosphere,项目名称:marathon,代码行数:31,代码来源:marathon_pods_tests.py
示例4: test_pod_with_container_bridge_network
def test_pod_with_container_bridge_network():
"""Tests creation of a pod with a "container/bridge" network, and its HTTP endpoint accessibility."""
pod_def = pods.container_bridge_pod()
pod_id = pod_def['id']
# In strict mode all tasks are started as user `nobody` by default and `nobody`
# doesn't have permissions to write to /var/log within the container.
if shakedown.dcos.cluster.ee_version() == 'strict':
pod_def['user'] = 'root'
common.add_dcos_marathon_user_acls()
client = marathon.create_client()
client.add_pod(pod_def)
deployment_wait(service_id=pod_id)
task = common.task_by_name(common.get_pod_tasks(pod_id), "nginx")
network_info = common.running_status_network_info(task['statuses'])
assert network_info['name'] == "mesos-bridge", \
"The network is {}, but mesos-bridge was expected".format(network_info['name'])
# get the port on the host
port = task['discovery']['ports']['ports'][0]['number']
# the agent IP:port will be routed to the bridge IP:port
# test against the agent_ip, however it is hard to get.. translating from
# slave_id
agent_ip = common.agent_hostname_by_id(task['slave_id'])
assert agent_ip is not None, "Failed to get the agent IP address"
container_ip = network_info['ip_addresses'][0]['ip_address']
assert agent_ip != container_ip, "The container IP address is the same as the agent one"
url = "http://{}:{}/".format(agent_ip, port)
common.assert_http_code(url)
开发者ID:mesosphere,项目名称:marathon,代码行数:34,代码来源:marathon_pods_tests.py
示例5: assert_mom_ee
def assert_mom_ee(version, security_mode='permissive'):
ensure_service_account()
ensure_permissions()
ensure_sa_secret(strict=True if security_mode == 'strict' else False)
ensure_docker_config_secret()
# In strict mode all tasks are started as user `nobody` by default. However we start
# MoM-EE as 'root' and for that we need to give root marathon ACLs to start
# tasks as 'root'.
if security_mode == 'strict':
common.add_dcos_marathon_user_acls()
# Deploy MoM-EE in permissive mode
app_def_file = '{}/mom-ee-{}-{}.json'.format(fixtures.fixtures_dir(), security_mode, version)
assert os.path.isfile(app_def_file), "Couldn't find appropriate MoM-EE definition: {}".format(app_def_file)
image = mom_ee_image(version)
logger.info('Deploying {} definition with {} image'.format(app_def_file, image))
app_def = get_resource(app_def_file)
app_def['container']['docker']['image'] = 'mesosphere/marathon-dcos-ee:{}'.format(image)
app_id = app_def["id"]
client = marathon.create_client()
client.add_app(app_def)
deployment_wait(service_id=app_id)
shakedown.dcos.service.wait_for_service_endpoint(mom_ee_endpoint(version, security_mode), path="ping")
开发者ID:mesosphere,项目名称:marathon,代码行数:27,代码来源:test_marathon_on_marathon_ee.py
示例6: test_vip_docker_bridge_mode
def test_vip_docker_bridge_mode(marathon_service_name):
"""Tests the creation of a VIP from a python command in a docker image using bridge mode.
the test validates the creation of an app with the VIP label and the accessability
of the service via the VIP.
"""
app_def = apps.docker_http_server(app_id='vip-docker-bridge-mode-app')
app_id = app_def["id"]
vip_name = app_id.lstrip("/")
fqn = '{}.{}.l4lb.thisdcos.directory'.format(vip_name, marathon_service_name)
app_def['id'] = vip_name
app_def['container']['docker']['portMappings'] = [{
"containerPort": 8080,
"hostPort": 0,
"labels": {
"VIP_0": "/{}:10000".format(vip_name)
},
"protocol": "tcp",
"name": "{}".format(vip_name)
}]
client = marathon.create_client()
client.add_app(app_def)
deployment_wait(service_id=app_id)
@retrying.retry(wait_fixed=1000, stop_max_attempt_number=30, retry_on_exception=common.ignore_exception)
def http_output_check():
time.sleep(1)
common.assert_http_code('{}:{}'.format(fqn, 10000))
http_output_check()
开发者ID:mesosphere,项目名称:marathon,代码行数:34,代码来源:marathon_common_tests.py
示例7: test_marathon_when_disconnected_from_zk
def test_marathon_when_disconnected_from_zk():
"""Launches an app from Marathon, then knocks out access to ZK from Marathon.
Verifies the task is preserved.
"""
app_def = apps.sleep_app()
app_id = app_def["id"]
host = common.ip_other_than_mom()
common.pin_to_host(app_def, host)
client = marathon.create_client()
client.add_app(app_def)
deployment_wait(service_id=app_id)
tasks = client.get_tasks(app_id)
original_task_id = tasks[0]['id']
common.block_iptable_rules_for_seconds(host, 2181, sleep_seconds=10, block_input=True, block_output=False)
@retrying.retry(wait_fixed=1000, stop_max_attempt_number=30, retry_on_exception=common.ignore_exception)
def check_task_is_back():
tasks = client.get_tasks(app_id)
assert tasks[0]['id'] == original_task_id, \
"The task {} got replaced with {}".format(original_task_id, tasks[0]['id'])
check_task_is_back()
开发者ID:mesosphere,项目名称:marathon,代码行数:27,代码来源:marathon_common_tests.py
示例8: test_private_repository_mesos_app
def test_private_repository_mesos_app():
"""Deploys an app with a private Docker image, using Mesos containerizer.
It relies on the global `install_enterprise_cli` fixture to install the
enterprise-cli-package.
"""
username = os.environ['DOCKER_HUB_USERNAME']
password = os.environ['DOCKER_HUB_PASSWORD']
secret_name = "pullconfig"
secret_value_json = common.create_docker_pull_config_json(username, password)
secret_value = json.dumps(secret_value_json)
app_def = apps.private_ucr_docker_app()
app_id = app_def["id"]
# In strict mode all tasks are started as user `nobody` by default and `nobody`
# doesn't have permissions to write to /var/log within the container.
if is_strict():
app_def['user'] = 'root'
common.add_dcos_marathon_user_acls()
common.create_secret(secret_name, secret_value)
client = marathon.create_client()
try:
client.add_app(app_def)
deployment_wait(service_id=app_id)
common.assert_app_tasks_running(client, app_def)
finally:
common.delete_secret(secret_name)
开发者ID:mesosphere,项目名称:marathon,代码行数:32,代码来源:test_marathon_root.py
示例9: test_unhealthy_app_can_be_rolled_back
def test_unhealthy_app_can_be_rolled_back():
"""Verifies that an updated app gets rolled back due to being unhealthy."""
app_def = apps.readiness_and_health_app()
app_id = app_def["id"]
@retrying.retry(
wait_fixed=1000,
stop_max_attempt_number=30,
retry_on_exception=common.ignore_provided_exception(DCOSException)
)
def wait_for_deployment():
deployment_wait(service_id=app_id)
client = marathon.create_client()
client.add_app(app_def)
wait_for_deployment()
tasks = client.get_tasks(app_id)
assert len(tasks) == 1, "The number of tasks is {} after deployment, but 1 was expected".format(len(tasks))
app_def['healthChecks'][0]['path'] = '/non-existent'
app_def['instances'] = 2
deployment_id = client.update_app(app_id, app_def)
try:
wait_for_deployment()
except Exception:
client.rollback_deployment(deployment_id)
wait_for_deployment()
tasks = client.get_tasks(app_id)
assert len(tasks) == 1, "The number of tasks is {} after rollback, but 1 was expected".format(len(tasks))
开发者ID:mesosphere,项目名称:marathon,代码行数:33,代码来源:marathon_common_tests.py
示例10: test_event_channel_for_pods
async def test_event_channel_for_pods(sse_events):
"""Tests the Marathon event channel specific to pod events."""
await common.assert_event('event_stream_attached', sse_events)
pod_def = pods.simple_pod()
pod_id = pod_def['id']
# In strict mode all tasks are started as user `nobody` by default and `nobody`
# doesn't have permissions to write files.
if shakedown.dcos.cluster.ee_version() == 'strict':
pod_def['user'] = 'root'
common.add_dcos_marathon_user_acls()
client = marathon.create_client()
client.add_pod(pod_def)
deployment_wait(service_id=pod_id)
await common.assert_event('pod_created_event', sse_events)
await common.assert_event('deployment_step_success', sse_events)
pod_def["scaling"]["instances"] = 3
client.update_pod(pod_id, pod_def)
deployment_wait(service_id=pod_id)
await common.assert_event('pod_updated_event', sse_events)
开发者ID:mesosphere,项目名称:marathon,代码行数:26,代码来源:marathon_pods_tests.py
示例11: test_pinned_task_does_not_scale_to_unpinned_host
def test_pinned_task_does_not_scale_to_unpinned_host():
"""Tests when a task lands on a pinned node (and barely fits) and it is asked to scale past
the resources of that node, no tasks will be launched on any other node.
"""
app_def = apps.sleep_app()
app_id = app_def['id']
host = common.ip_other_than_mom()
logger.info('Constraint set to host: {}'.format(host))
# the size of cpus is designed to be greater than 1/2 of a node
# such that only 1 task can land on the node.
cores = common.cpus_on_agent(host)
app_def['cpus'] = max(0.6, cores - 0.5)
common.pin_to_host(app_def, host)
client = marathon.create_client()
client.add_app(app_def)
deployment_wait(service_id=app_id)
client.scale_app(app_id, 2)
time.sleep(5)
deployments = client.get_deployments(app_id=app_id)
tasks = client.get_tasks(app_id)
# still deploying
assert len(deployments) == 1, "The number of deployments is {}, but 1 was expected".format(len(deployments))
assert len(tasks) == 1, "The number of tasks is {}, but 1 was expected".format(len(tasks))
开发者ID:mesosphere,项目名称:marathon,代码行数:29,代码来源:marathon_common_tests.py
示例12: test_pinned_task_scales_on_host_only
def test_pinned_task_scales_on_host_only():
"""Tests that a pinned app scales only on the pinned node."""
app_def = apps.sleep_app()
app_id = app_def["id"]
host = common.ip_other_than_mom()
common.pin_to_host(app_def, host)
client = marathon.create_client()
client.add_app(app_def)
deployment_wait(service_id=app_id)
tasks = client.get_tasks(app_id)
assert len(tasks) == 1, "The number of tasks is {} after deployment, but 1 was expected".format(len(tasks))
assert tasks[0]['host'] == host, \
"The task is on {}, but it is supposed to be on {}".format(tasks[0]['host'], host)
client.scale_app(app_id, 10)
deployment_wait(service_id=app_id)
tasks = client.get_tasks(app_id)
assert len(tasks) == 10, "The number of tasks is {} after scale, but 10 was expected".format(len(tasks))
for task in tasks:
assert task['host'] == host, "The task is on {}, but it is supposed to be on {}".format(task['host'], host)
开发者ID:mesosphere,项目名称:marathon,代码行数:25,代码来源:marathon_common_tests.py
示例13: test_pinned_task_recovers_on_host
def test_pinned_task_recovers_on_host():
"""Tests that when a pinned task gets killed, it recovers on the node it was pinned to."""
app_def = apps.sleep_app()
app_id = app_def["id"]
host = common.ip_other_than_mom()
common.pin_to_host(app_def, host)
client = marathon.create_client()
client.add_app(app_def)
deployment_wait(service_id=app_id)
tasks = client.get_tasks(app_id)
common.kill_process_on_host(host, '[s]leep')
deployment_wait(service_id=app_id)
@retrying.retry(wait_fixed=1000, stop_max_attempt_number=30, retry_on_exception=common.ignore_exception)
def check_for_new_task():
new_tasks = client.get_tasks(app_id)
assert tasks[0]['id'] != new_tasks[0]['id'], "The task did not get killed: {}".format(tasks[0]['id'])
assert new_tasks[0]['host'] == host, \
"The task got restarted on {}, but it was supposed to stay on {}".format(new_tasks[0]['host'], host)
check_for_new_task()
开发者ID:mesosphere,项目名称:marathon,代码行数:25,代码来源:marathon_common_tests.py
示例14: test_marathon_with_master_process_failure
def test_marathon_with_master_process_failure(marathon_service_name):
"""Launches an app and restarts the master. It is expected that the service endpoint eventually comes back and
the task ID stays the same.
"""
app_def = apps.sleep_app()
app_id = app_def["id"]
host = common.ip_other_than_mom()
common.pin_to_host(app_def, host)
client = marathon.create_client()
client.add_app(app_def)
deployment_wait(service_id=app_id)
tasks = client.get_tasks(app_id)
original_task_id = tasks[0]['id']
common.systemctl_master('restart')
shakedown.dcos.service.wait_for_service_endpoint(marathon_service_name, path="ping")
@retrying.retry(wait_fixed=1000, stop_max_attempt_number=30, retry_on_exception=common.ignore_exception)
def check_task_recovery():
tasks = client.get_tasks(app_id)
assert len(tasks) == 1, "The number of tasks is {} after master restart, but 1 was expected".format(len(tasks))
assert tasks[0]['id'] == original_task_id, \
"Task {} has not recovered, it got replaced with another one: {}".format(original_task_id, tasks[0]['id'])
check_task_recovery()
开发者ID:mesosphere,项目名称:marathon,代码行数:29,代码来源:marathon_common_tests.py
示例15: test_scale_app_in_group
def test_scale_app_in_group():
"""Scales an individual app in a group."""
group_def = groups.sleep_group()
groups_id = group_def["groups"][0]["id"]
app1_id = group_def["groups"][0]["apps"][0]["id"]
app2_id = group_def["groups"][0]["apps"][1]["id"]
client = marathon.create_client()
client.create_group(group_def)
deployment_wait(service_id=app1_id)
group_apps = client.get_group(groups_id)
apps = group_apps['apps']
assert len(apps) == 2, "The number of apps is {}, but 2 was expected".format(len(apps))
tasks1 = client.get_tasks(app1_id)
tasks2 = client.get_tasks(app2_id)
assert len(tasks1) == 1, "The number of tasks #1 is {} after deployment, but 1 was expected".format(len(tasks1))
assert len(tasks2) == 1, "The number of tasks #2 is {} after deployment, but 1 was expected".format(len(tasks2))
# scaling just one app in the group
client.scale_app(app1_id, 2)
deployment_wait(service_id=app1_id)
tasks1 = client.get_tasks(app1_id)
tasks2 = client.get_tasks(app2_id)
assert len(tasks1) == 2, "The number of tasks #1 is {} after scale, but 2 was expected".format(len(tasks1))
assert len(tasks2) == 1, "The number of tasks #2 is {} after scale, but 1 was expected".format(len(tasks2))
开发者ID:mesosphere,项目名称:marathon,代码行数:30,代码来源:marathon_common_tests.py
示例16: test_pod_with_container_network
def test_pod_with_container_network():
"""Tests creation of a pod with a "container" network, and its HTTP endpoint accessibility."""
pod_def = pods.container_net_pod()
pod_id = pod_def['id']
# In strict mode all tasks are started as user `nobody` by default and `nobody`
# doesn't have permissions to write to /var/log within the container.
if shakedown.dcos.cluster.ee_version() == 'strict':
pod_def['user'] = 'root'
common.add_dcos_marathon_user_acls()
client = marathon.create_client()
client.add_pod(pod_def)
deployment_wait(service_id=pod_id)
task = common.task_by_name(common.get_pod_tasks(pod_id), "nginx")
network_info = common.running_status_network_info(task['statuses'])
assert network_info['name'] == "dcos", \
"The network name is {}, but 'dcos' was expected".format(network_info['name'])
container_ip = network_info['ip_addresses'][0]['ip_address']
assert container_ip is not None, "No IP address has been assigned to the pod's container"
url = "http://{}:80/".format(container_ip)
common.assert_http_code(url)
开发者ID:mesosphere,项目名称:marathon,代码行数:27,代码来源:marathon_pods_tests.py
示例17: test_launch_docker_grace_period
def test_launch_docker_grace_period(marathon_service_name):
"""Tests 'taskKillGracePeriodSeconds' option using a Docker container in a Marathon environment.
Read more details about this test in `test_root_marathon.py::test_launch_mesos_root_marathon_grace_period`
"""
app_id = '/launch-docker-grace-period-app'
app_def = apps.docker_http_server(app_id)
app_def['container']['docker']['image'] = 'kensipe/python-test'
default_grace_period = 3
grace_period = 20
app_def['taskKillGracePeriodSeconds'] = grace_period
app_def['cmd'] = 'python test.py'
task_name = app_id.lstrip('/')
client = marathon.create_client()
client.add_app(app_def)
deployment_wait(service_id=app_id)
tasks = get_service_task(marathon_service_name, task_name)
assert tasks is not None
client.scale_app(app_id, 0)
tasks = get_service_task(marathon_service_name, task_name)
assert tasks is not None
# tasks should still be here after the default_graceperiod
time.sleep(default_grace_period + 1)
tasks = get_service_task(marathon_service_name, task_name)
assert tasks is not None
# but not after the set grace_period
time.sleep(grace_period)
assert_that(lambda: get_service_task(marathon_service_name, task_name),
eventually(equal_to(None), max_attempts=30))
开发者ID:mesosphere,项目名称:marathon,代码行数:35,代码来源:marathon_common_tests.py
示例18: test_vip_mesos_cmd
def test_vip_mesos_cmd(marathon_service_name):
"""Validates the creation of an app with a VIP label and the accessibility of the service via the VIP."""
app_def = apps.http_server()
app_id = app_def["id"]
vip_name = app_id.lstrip("/")
fqn = '{}.{}.l4lb.thisdcos.directory'.format(vip_name, marathon_service_name)
app_def['portDefinitions'] = [{
"port": 0,
"protocol": "tcp",
"name": "{}".format(vip_name),
"labels": {
"VIP_0": "/{}:10000".format(vip_name)
}
}]
client = marathon.create_client()
client.add_app(app_def)
deployment_wait(service_id=app_id)
@retrying.retry(wait_fixed=1000, stop_max_attempt_number=30, retry_on_exception=common.ignore_exception)
def http_output_check():
time.sleep(1)
common.assert_http_code('{}:{}'.format(fqn, 10000))
http_output_check()
开发者ID:mesosphere,项目名称:marathon,代码行数:29,代码来源:marathon_common_tests.py
示例19: _test_declined_offer
def _test_declined_offer(app_def, reason):
"""Used to confirm that offers were declined. The `processedOffersSummary` and these tests
in general require 1.4+ marathon with the queue end point.
The retry is the best possible way to "time" the success of the test.
"""
app_id = app_def["id"]
client = marathon.create_client()
client.add_app(app_def)
@retrying.retry(wait_fixed=1000, stop_max_attempt_number=30, retry_on_exception=common.ignore_exception)
def verify_declined_offer():
deployments = client.get_deployments(app_id)
assert len(deployments) == 1
offer_summary = client.get_queued_app(app_id)['processedOffersSummary']
role_summary = declined_offer_by_reason(offer_summary['rejectSummaryLastOffers'], reason)
last_attempt = declined_offer_by_reason(offer_summary['rejectSummaryLaunchAttempt'], reason)
assert role_summary['declined'] > 0, "There are no declined offers because of {}".format(reason)
assert role_summary['processed'] > 0, "There are no processed offers for {}".format(reason)
assert last_attempt['declined'] > 0, "There are no declined offers because of {}".format(reason)
assert last_attempt['processed'] > 0, "There are no processed offers for {}".format(reason)
verify_declined_offer()
开发者ID:mesosphere,项目名称:marathon,代码行数:25,代码来源:marathon_common_tests.py
示例20: test_app_update_rollback
def test_app_update_rollback():
"""Tests that an updated app can be rolled back to its initial version."""
app_def = apps.readiness_and_health_app("app-update-rollback")
app_id = app_def["id"]
# First deployment
client = marathon.create_client()
client.add_app(app_def)
deployment_wait(service_id=app_id)
tasks = client.get_tasks(app_id)
assert_that(tasks, has_len(equal_to(1)))
# Second deployment
app_def['instances'] = 2
client.update_app(app_id, app_def)
deployment_wait(service_id=app_id)
tasks = client.get_tasks(app_id)
assert_that(tasks, has_len(equal_to(2)))
# Third deployment with rollback
# provides a testing delay to rollback in the meantime
app_def['readinessChecks'][0]['intervalSeconds'] = 30
app_def['instances'] = 1
deployment_id = client.update_app(app_id, app_def)
client.rollback_deployment(deployment_id)
deployment_wait(service_id=app_id)
# update to 1 instance is rollback to 2
tasks = client.get_tasks(app_id)
assert_that(tasks, has_len(equal_to(2)))
开发者ID:mesosphere,项目名称:marathon,代码行数:33,代码来源:marathon_common_tests.py
注:本文中的shakedown.clients.marathon.create_client函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论