本文整理汇总了Python中utils.run函数的典型用法代码示例。如果您正苦于以下问题：Python run函数的具体用法？Python run怎么用？Python run使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了run函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: hook_enable
def hook_enable(self):
    """Render each configured PXE enable hook and execute it.

    Every entry of ``cfg['hooks_pxe']['enable']`` is passed through
    ``utils.apply_template`` together with ``self.pxe_variables``; the
    rendered command is logged and then handed to ``utils.run`` with
    ``error=True``.
    """
    enable_hooks = cfg['hooks_pxe']['enable']
    for template in enable_hooks:
        command = utils.apply_template(template, self.pxe_variables)
        logging.info('found enable hook "%s"', command)
        utils.run(command, error=True)
开发者ID:COOLSPOIL,项目名称:seedBank,代码行数:7,代码来源:pxe.py
示例2: test_table_and_columns_query
def test_table_and_columns_query(executor):
    """Create two tables and check the executor reports them and their columns."""
    for statement in ("create table a(x text, y text)",
                      "create table b(z text)"):
        run(executor, statement)

    expected_tables = {('a',), ('b',)}
    expected_columns = {('a', 'x'), ('a', 'y'), ('b', 'z')}

    # Compare as sets: only membership is checked, not ordering.
    assert set(executor.tables()) == expected_tables
    assert set(executor.table_columns()) == expected_columns
开发者ID:jzoe,项目名称:mycli,代码行数:7,代码来源:test_sqlexecute.py
示例3: test_regular_operation
def test_regular_operation(self):
    """Run a HorizontalReshardingTask through the automation client.

    Starts a dedicated vtworker and an automation server, invokes the
    ``automation_client`` binary against them and then calls
    ``self.verify()``.  Both helper processes are stopped in ``finally``
    clauses so they are not left running when the client invocation or the
    verification raises.
    """
    # Use a dedicated worker to run all vtworker commands.
    worker_proc, _, worker_rpc_port = utils.run_vtworker_bg(
        ["--cell", "test_nj"], auto_log=True)
    try:
        vtworker_endpoint = "localhost:" + str(worker_rpc_port)
        automation_server_proc, automation_server_port = (
            utils.run_automation_server())
        try:
            source_shard_list = "0"
            dest_shard_list = "-80,80-"
            _, vtctld_endpoint = utils.vtctld.rpc_endpoint()
            utils.run(
                environment.binary_argstr("automation_client")
                + " --server localhost:"
                + str(automation_server_port)
                + " --task HorizontalReshardingTask"
                + " --param keyspace="
                + self.KEYSPACE
                + " --param source_shard_list="
                + source_shard_list
                + " --param dest_shard_list="
                + dest_shard_list
                + " --param vtctld_endpoint="
                + vtctld_endpoint
                + " --param vtworker_endpoint="
                + vtworker_endpoint
                + " --param min_healthy_rdonly_tablets=1"
            )
            self.verify()
        finally:
            # Same shutdown order as before: automation server first.
            utils.kill_sub_process(automation_server_proc, soft=True)
    finally:
        utils.kill_sub_process(worker_proc, soft=True)
开发者ID:Analyticalloopholes,项目名称:vitess,代码行数:32,代码来源:automation_horizontal_resharding.py
示例4: test_regular_operation
def test_regular_operation(self):
    """Run a HorizontalReshardingTask and compare source/destination data.

    Starts a dedicated vtworker and an automation server, invokes the
    ``automation_client`` binary against them and then checks, via
    ``self.assert_shard_data_equal``, destination shards 0 and 1 against
    the source master.  Both helper processes are stopped in ``finally``
    clauses so they are not left running when a step in between raises.
    """
    # Use a dedicated worker to run all vtworker commands.
    worker_proc, _, worker_rpc_port = utils.run_vtworker_bg(
        ['--cell', 'test_nj'],
        auto_log=True)
    try:
        vtworker_endpoint = "localhost:" + str(worker_rpc_port)
        automation_server_proc, automation_server_port = (
            utils.run_automation_server())
        try:
            keyspace = 'test_keyspace'
            source_shard_list = '0'
            dest_shard_list = '-80,80-'
            _, vtctld_endpoint = utils.vtctld.rpc_endpoint()
            utils.run(environment.binary_argstr('automation_client') +
                      ' --server localhost:' + str(automation_server_port) +
                      ' --task HorizontalReshardingTask' +
                      ' --param keyspace=' + keyspace +
                      ' --param source_shard_list=' + source_shard_list +
                      ' --param dest_shard_list=' + dest_shard_list +
                      ' --param source_shard_rdonly_list=' +
                      worker.shard_rdonly1.tablet_alias +
                      ' --param dest_shard_rdonly_list=' +
                      worker.shard_0_rdonly1.tablet_alias + ',' +
                      worker.shard_1_rdonly1.tablet_alias +
                      ' --param vtctld_endpoint=' + vtctld_endpoint +
                      ' --param vtworker_endpoint=' + vtworker_endpoint)
            self.assert_shard_data_equal(0, worker.shard_master,
                                         worker.shard_0_tablets.replica)
            self.assert_shard_data_equal(1, worker.shard_master,
                                         worker.shard_1_tablets.replica)
        finally:
            # Same shutdown order as before: automation server first.
            utils.kill_sub_process(automation_server_proc, soft=True)
    finally:
        utils.kill_sub_process(worker_proc, soft=True)
开发者ID:haoqoo,项目名称:vitess,代码行数:34,代码来源:automation_horizontal_resharding.py
示例5: test_get_srv_keyspace_names
def test_get_srv_keyspace_names(self):
    """Check getSrvKeyspaceNames through the topo client and zkclient2.

    Two keyspaces with one master tablet each are created, the keyspace
    graph is rebuilt, and the resulting names are verified through
    ``self.topo`` and through the ``zkclient2`` binary against three
    different server ports.
    """
    for keyspace in ('test_keyspace1', 'test_keyspace2'):
        utils.run_vtctl('CreateKeyspace ' + keyspace)

    # Keep references to the tablets for the duration of the test.
    masters = []
    for uid, keyspace in ((1, "test_keyspace1"), (2, "test_keyspace2")):
        master = tablet.Tablet(tablet_uid=uid, cell="nj")
        master.init_tablet("master", keyspace, "0")
        master.update_addrs()
        masters.append(master)

    utils.run_vtctl('RebuildKeyspaceGraph /zk/global/vt/keyspaces/*',
                    auto_log=True)
    self.assertItemsEqual(self.topo.get_srv_keyspace_names('local'),
                          ["test_keyspace1", "test_keyspace2"])

    expected_listing = ("KeyspaceNames[0] = test_keyspace1\n"
                        "KeyspaceNames[1] = test_keyspace2\n")

    def check_listing(port):
        # The listing is compared against the second element returned by
        # utils.run (named ``err`` here), as in the original test.
        command = utils.vtroot + (
            '/bin/zkclient2 -server localhost:%u '
            '-mode getSrvKeyspaceNames test_nj' % port)
        out, err = utils.run(command, trap_output=True)
        self.assertEqual(err, expected_listing)

    # zkocc API test
    utils.prog_compile(['zkclient2'])
    check_listing(utils.zkocc_port_base)

    # vtgate zk API test
    check_listing(self.vtgate_zk_port)

    # vtgate zkocc API test
    check_listing(self.vtgate_zkocc_port)
开发者ID:iamima,项目名称:vitess,代码行数:27,代码来源:zkocc_test.py
示例6: update_master
def update_master():
    """
    Step 4 of the release flow: the master branch is checked out and its
    files are updated to reference the latest release, then committed.
    The remaining manual actions are printed for the operator.
    """
    para("Step 4 of 5: Commit versions and push changes to master.")
    utils.check_or_exit("Is your git repository now on master")

    # Update the master files.
    versions = release_data["versions"]
    utils.update_files(MASTER_VERSION_REPLACE, versions, is_release=False)
    released_version = versions["version"]

    para("The master codebase has now been updated to reference the latest "
         "release.")
    para("Commit changes to master")
    for git_command in (
            "git add --all",
            'git commit -m "Update docs to version %s"' % released_version):
        run(git_command)

    # Manual follow-up steps; a level of None means a top-level bullet.
    actions()
    follow_ups = (
        ("Self review the latest changes to master", None),
        ("Run: git diff origin/master", 1),
        ("Push changes to master", None),
        ("Run: git push origin master", 1),
        ("Verify builds are working", None),
    )
    for text, level in follow_ups:
        if level is None:
            bullet(text)
        else:
            bullet(text, level=level)
    next("Once complete, re-run the script")
开发者ID:xdongp,项目名称:calico-docker,代码行数:25,代码来源:do_release.py
示例7: test_unicode_support_in_output
def test_unicode_support_in_output(executor, expanded):
    """Insert a non-ASCII value and check that it appears in the query output."""
    run(executor, "create table unicodechars(t text)")
    run(executor, "insert into unicodechars (t) values ('é')")

    # See issue #24, this raises an exception without proper handling
    rendered = run(executor, "select * from unicodechars",
                   join=True, expanded=expanded)
    assert u'é' in rendered
开发者ID:conorccamp,项目名称:pgcli,代码行数:7,代码来源:test_pgexecute.py
示例8: test_large_numbers_render_directly
def test_large_numbers_render_directly(executor, value):
    """Insert ``value`` into a numeric column and expect it verbatim in the output."""
    run(executor, "create table vcli_test.numbertest(a numeric)")

    insert_sql = "insert into vcli_test.numbertest (a) values ({0})".format(
        value)
    run(executor, insert_sql)

    rendered = run(executor, "select * from vcli_test.numbertest", join=True)
    assert value in rendered
开发者ID:adnanyaqoobvirk,项目名称:vcli,代码行数:7,代码来源:test_vexecute.py
示例9: test_copy_from_local_csv
def test_copy_from_local_csv(executor):
    """Load a local CSV file with ``copy ... from local`` and check the rows.

    A temporary CSV file is written, copied into ``vcli_test.people`` and
    removed again; the table content is then compared with the expected
    rendered output.
    """
    run(executor, """
create table vcli_test.people (
name varchar(50),
age integer)
""")

    # NamedTemporaryFile opens in binary mode ('w+b') by default, so the
    # payload must be bytes; a str literal raises TypeError on Python 3.
    with tempfile.NamedTemporaryFile(delete=False) as f:
        f.write(b'Alice,20\nBob,30\nCindy,40\n')

    # The file is closed (and therefore flushed) before it is loaded.
    try:
        run(executor, """
copy vcli_test.people from local '%s' delimiter ','
""" % f.name)
    finally:
        # delete=False above means the file has to be removed by hand.
        os.remove(f.name)

    output = run(executor, "select * from vcli_test.people", join=True)
    # NOTE(review): the cell padding in this expected table looks collapsed
    # (cells are narrower than the border lines) - confirm against the
    # real rendered output.
    assert output == dedent("""\
+--------+-------+
| name | age |
|--------+-------|
| Alice | 20 |
| Bob | 30 |
| Cindy | 40 |
+--------+-------+""")
开发者ID:adnanyaqoobvirk,项目名称:vcli,代码行数:26,代码来源:test_vexecute.py
示例10: _xbuild
def _xbuild(self, **kwargs):
    """
    Build the solution by running ``self.xbuild_path`` on
    ``self.solution_file``.

    Keyword arguments are accepted but not used.
    """
    build_command = '%s %s' % (self.xbuild_path, self.solution_file)

    # Tell the user up front; the command is run right after this message.
    self.logger.info('This can take some time...')
    run(build_command)
开发者ID:jness,项目名称:monotool,代码行数:7,代码来源:monotool.py
示例11: restore_customrules
def restore_customrules(self):
    """Recreate the custom rules file and, on zookeeper, copy it into zk.

    The file is always written to ``<tmproot>/customrules.json``; the copy
    to ``/zk/test_ca/config/customrules/testrules`` only happens when the
    topo server flavor is ``'zookeeper'``.
    """
    rules_path = os.path.join(environment.tmproot, 'customrules.json')
    self.create_customrules(rules_path)

    if environment.topo_server().flavor() != 'zookeeper':
        return

    copy_command = (environment.binary_argstr('zk') + ' cp ' + rules_path +
                    ' /zk/test_ca/config/customrules/testrules')
    utils.run(copy_command)
开发者ID:zhaoyta,项目名称:vitess,代码行数:7,代码来源:test_env.py
示例12: test_utils_cleanup
def test_utils_cleanup(self):
    """Check that ``utils.cleanup`` runs its command when the block exits."""
    marker = '/tmp/' + utils.test_name()

    # The marker file must not exist before the test starts.
    self.assertFalse(os.path.exists(marker))

    with utils.cleanup(['rm', marker]):
        utils.run(['touch', marker])
        # Inside the block the file has been created...
        self.assertTrue(os.path.exists(marker))

    # ...and after leaving it, the 'rm' cleanup command has removed it.
    self.assertFalse(os.path.exists(marker))
开发者ID:vincent-legoll,项目名称:glancing,代码行数:7,代码来源:test_utils.py
示例13: iso
def iso(self, args):
    """Validate the input and, if no errors are found, build an (unattended)
    installation ISO from a regular install ISO.

    Missing ``seed``, ``config`` and ``puppet`` attributes on ``args`` are
    given empty defaults before ``self._shared(args, 'iso')`` is called.
    The source ISO is ``<config['paths']['isos']>/<release>.iso``; the
    destination is ``args.isofile`` when given, otherwise ``<fqdn>.iso`` in
    the current working directory (an existing file is overwritten after a
    warning).  After the ISO has been created, every hook listed in
    ``self.cfg['hooks_iso']['enable']`` is run.

    Raises whatever ``self.exception(...)`` constructs when the release is
    unknown or its installer ISO file is not present.
    """
    # Provide defaults for optional attributes that may be absent.
    if not 'seed' in args:
        args.seed = ''
    if not 'config' in args:
        args.config = ''
    if not 'puppet' in args:
        args.puppet = []
    args, config = self._shared(args, 'iso')
    # Resolve the source installer ISO for the requested release.
    if args.release in config['isos']:
        iso_file = os.path.join(config['paths']['isos'], args.release)
        iso_file += '.iso'
    else:
        raise self.exception('"%s" is not a valid release' % args.release)
    if not os.path.isfile(iso_file):
        raise self.exception('"%s" is a valid release, but the installer '
            'ISO is not available (run "seedbank manage -i %s" to download '
            'the ISO)' % (args.release, args.release))
    # Resolve the destination path of the ISO that will be built.
    if 'isofile' in args and args.isofile:
        iso_dst = os.path.abspath(args.isofile)
    else:
        iso_dst = os.path.join(os.getcwd(), '%s.iso' % args.fqdn)
    if os.path.isfile(iso_dst):
        logging.warning('"%s" already exists, will overwrite', iso_dst)
    # NOTE: inside this method the bare name ``iso`` is the module-level
    # ``iso`` object, not this method.
    build = iso.Build(config, iso_file, args.fqdn, iso_dst)
    build.prepare()
    if args.puppet:
        build.add_puppet_manifests(args.fqdn)
    template_cfg = settings.template(args.fqdn, args.overlay, args.config,
        args.variables)
    if args.overlay:
        overlay = pimp.Overlay(self.cfg, args.overlay, args.fqdn)
        overlay.prepare(template_cfg['seed'])
        permissions = pimp.OverlayPermissions(self.cfg)
        permissions.script(overlay.dst, args.overlay, '/target')
    seed = pimp.SeedPimp(template_cfg, 'iso')
    # NOTE(review): this reads ``args.seeds`` while the default above is set
    # on ``args.seed``; presumably ``_shared`` provides ``seeds`` - confirm.
    preseed = seed.pimp(args.seeds, args.overlay, args.puppet)
    build.add_preseed(preseed)
    # The distribution is the part of the release name before the first '-'.
    distribution = args.release.split('-')[0]
    build.add_templates(distribution)
    if args.overlay:
        # ``overlay`` was bound in the earlier ``if args.overlay`` branch.
        build.add_overlay(overlay.dst)
    build.non_free_firmware(args.release)
    build.rebuild_initrd()
    build.create()
    logging.info('ISO "%s" has been created', iso_dst)
    for hook in self.cfg['hooks_iso']['enable']:
        # Template substitution is disabled here; hooks are run verbatim.
        #hook = utils.apply_template(hook, self.pxe_variables)
        logging.info('found enable hook "%s"', hook)
        utils.run(hook, error=True)
开发者ID:bobcanthelpyou,项目名称:seedBank,代码行数:60,代码来源:parse.py
示例14: action_test
def action_test(arguments):
    """Run the binary under test on a generated graph and check its output.

    A graph is generated and written as ``graph.dot`` into a temporary
    directory.  ``arguments.binary`` is run three times on it (``-s``,
    ``-r`` and ``-m``) and the outputs are checked with
    ``utils.expect_stats`` / ``utils.expect_scores``.  A
    ``utils.TestFailure`` is caught and printed.
    """
    graph = Graph.generate()
    statistic = graph.statistic()
    ranks = graph.markov_page_rank(arguments.steps, arguments.probability)
    scores_reference = {node.name: round(score, 10)
                        for node, score in ranks.items()}

    def invoke(cli_args):
        # Every invocation uses the same binary and the current directory.
        return utils.run(arguments.binary, cli_args, os.getcwd())

    with tempfile.TemporaryDirectory() as temp_dir:
        filename = os.path.join(temp_dir, 'graph.dot')
        with open(filename, 'wb') as graph_file:
            dot_source = '\n'.join(graph.graphviz)
            graph_file.write(dot_source.encode('utf-8'))

        process_stats, output_stats = invoke(['-s', filename])

        random_args = ['-r', str(arguments.steps),
                       '-p', str(arguments.probability), filename]
        process_random, output_random = invoke(random_args)

        markov_args = ['-m', str(arguments.steps),
                       '-p', str(arguments.probability), filename]
        process_markov, output_markov = invoke(markov_args)

        try:
            utils.expect_stats(process_stats, output_stats, graph.name,
                               statistic.num_nodes, statistic.num_edges,
                               statistic.min_in, statistic.max_in,
                               statistic.min_out, statistic.max_out)
            # Only the random run is given arguments.delta.
            utils.expect_scores(process_random, output_random,
                                scores_reference, arguments.delta)
            utils.expect_scores(process_markov, output_markov,
                                scores_reference)
        except utils.TestFailure as failure:
            print(failure)
开发者ID:koehlma,项目名称:page_rank_tester,代码行数:32,代码来源:pagerank.py
示例15: start_cluster
def start_cluster(argv):
    """Start the cluster services on this node.

    With a non-empty ``argv`` the work is delegated to
    ``start_cluster_nodes``.  Otherwise cman (when ``utils.is_rhel6()``) or
    corosync is started, followed by pacemaker.  When a service command
    returns a non-zero status its output is printed and ``utils.err`` is
    called.
    """
    if argv:
        start_cluster_nodes(argv)
        return

    def start_service(name):
        # Start one service; report its output and call utils.err on failure.
        output, retval = utils.run(["service", name, "start"])
        if retval != 0:
            print(output)
            utils.err("unable to start " + name)

    print("Starting Cluster...")
    if utils.is_rhel6():
        # Verify that CMAN_QUORUM_TIMEOUT is set, if not, then we set it to 0
        status, _ = commands.getstatusoutput(
            'source /etc/sysconfig/cman ; [ -z "$CMAN_QUORUM_TIMEOUT" ]')
        if status == 0:
            with open("/etc/sysconfig/cman", "a") as cman_conf_file:
                cman_conf_file.write("\nCMAN_QUORUM_TIMEOUT=0\n")
        start_service("cman")
    else:
        start_service("corosync")

    start_service("pacemaker")
开发者ID:deriamis,项目名称:pcs,代码行数:26,代码来源:cluster.py
示例16: test_get_srv_keyspace_names
def test_get_srv_keyspace_names(self):
    """Check getSrvKeyspaceNames through vtgate, the topo client and zkocc.

    Two keyspaces with one master tablet each are created and the keyspace
    graph is rebuilt.  The names are then queried with the ``zkclient2``
    binary against ``self.vtgate_zk_port`` and, for a zookeeper topo
    server, also through ``self.topo``, ``environment.zkocc_port_base`` and
    ``self.vtgate_zkocc_port``.
    """
    utils.run_vtctl('CreateKeyspace test_keyspace1')
    utils.run_vtctl('CreateKeyspace test_keyspace2')
    t1 = tablet.Tablet(tablet_uid=1, cell="nj")
    t1.init_tablet("master", "test_keyspace1", "0")
    t1.update_addrs()
    t2 = tablet.Tablet(tablet_uid=2, cell="nj")
    t2.init_tablet("master", "test_keyspace2", "0")
    t2.update_addrs()
    utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace*'], auto_log=True)
    # vtgate API test
    # The listing is compared against the second element returned by
    # utils.run (named ``err`` here).
    out, err = utils.run(environment.binary_path('zkclient2')+' -server localhost:%u -mode getSrvKeyspaceNames test_nj' % self.vtgate_zk_port, trap_output=True)
    self.assertEqual(err, "KeyspaceNames[0] = test_keyspace1\n" +
                     "KeyspaceNames[1] = test_keyspace2\n")
    # NOTE(review): the extent of this ``if`` body was reconstructed; the
    # zkocc checks are assumed to be zookeeper-only - confirm.
    if environment.topo_server_implementation == 'zookeeper':
        self.assertItemsEqual(self.topo.get_srv_keyspace_names('local'), ["test_keyspace1", "test_keyspace2"])
        # zkocc API test
        out, err = utils.run(environment.binary_path('zkclient2')+' -server localhost:%u -mode getSrvKeyspaceNames test_nj' % environment.zkocc_port_base, trap_output=True)
        self.assertEqual(err, "KeyspaceNames[0] = test_keyspace1\n" +
                         "KeyspaceNames[1] = test_keyspace2\n")
        # vtgate zkocc API test
        out, err = utils.run(environment.binary_path('zkclient2')+' -server localhost:%u -mode getSrvKeyspaceNames test_nj' % self.vtgate_zkocc_port, trap_output=True)
        self.assertEqual(err, "KeyspaceNames[0] = test_keyspace1\n" +
                         "KeyspaceNames[1] = test_keyspace2\n")
开发者ID:Abioy,项目名称:vitess,代码行数:28,代码来源:zkocc_test.py
示例17: test_annotate_coverage_filename
def test_annotate_coverage_filename(self):
    """Does the '-c' flag work for figleaf-annotate?

    Listing must fail while the custom coverage file does not exist,
    succeed after a coverage run has been renamed to that file, and then
    report ``tst-cover.py`` as a covered file.
    """
    TEST_FILENAME = '.figleaf_blah'

    # fail to list anything (no .figleaf_blah file)
    status, out, errout = utils.run('figleaf-annotate', 'list',
                                    '-c', TEST_FILENAME)
    assert status != 0

    # now run coverage...
    status, out, errout = utils.run('figleaf', 'tst-cover.py')
    # Call form of print with one formatted argument behaves the same on
    # Python 2 and Python 3.
    print("%s %s" % (out, errout))
    assert status == 0

    # rename coverage output file
    os.rename('.figleaf', TEST_FILENAME)
    try:
        # now list files that are covered in that recording...
        status, out, errout = utils.run('figleaf-annotate', 'list',
                                        '-c', TEST_FILENAME)
        assert status == 0
        # str.find() returns -1 (truthy) on a miss, so asserting on its
        # result never failed.  Check for a matching output line instead.
        assert "tst-cover.py" in out.splitlines()
    finally:
        # be sure to remove the file, even when an assertion above fails.
        os.unlink(TEST_FILENAME)
开发者ID:Chaos99,项目名称:cachetools,代码行数:26,代码来源:__init__.py
示例18: get_default_properties
def get_default_properties():
    """Return a dict mapping property names to their default values.

    The metadata of the pengine, crmd and cib binaries configured in
    ``settings`` is collected, in that order.  For every ``parameter``
    element the ``default`` attribute of its ``content`` child is stored
    under the parameter's ``name``; a parameter without a ``content`` child
    gets ``""``.  A name defined by a later binary overrides an earlier one.
    """
    pe_root = _load_metadata_root(settings.pengine_binary, "pengine")
    crmd_root = _load_metadata_root(settings.crmd_binary, "crmd")
    cib_root = _load_metadata_root(settings.cib_binary, "cib")

    parameters = {}
    for root in [pe_root, crmd_root, cib_root]:
        for param in _iter_parameters(root):
            name = param.attrib["name"]
            content = param.find("content")
            if content is not None:
                default = content.attrib["default"]
            else:
                default = ""
            parameters[name] = default
    return parameters


def _load_metadata_root(binary, label):
    """Run ``binary metadata`` and return the parsed XML root element."""
    (output, retVal) = utils.run([binary, "metadata"])
    if retVal != 0:
        utils.err("unable to get %s metadata\n" % label + output)
    return ET.fromstring(output)


def _iter_parameters(root):
    """Iterate over the ``parameter`` elements found under ``root``."""
    # Element.getiterator() was removed in Python 3.9.  Prefer
    # Element.iter() and fall back for interpreters that lack it.
    iterate = getattr(root, "iter", None) or root.getiterator
    return iterate('parameter')
开发者ID:tradej,项目名称:pcs,代码行数:28,代码来源:prop.py
示例19: run_app
def run_app(app):
    """Register a job for ``app`` with the broker, launch it and report its pid.

    The broker is contacted twice through ``utils.run``: once to register
    the job (``-r -a <app>``, plus ``-p <hash>`` when the app's
    ``REQUIRE_PASSWORD`` setting is the string ``"True"``) and once to
    update it (``-u``) with the pid of the launched process.  The process
    is then waited for and its output printed.
    """
    app_module = utils.get_app_module(app)
    needs_password = config.get(app, 'REQUIRE_PASSWORD') == "True"
    pass_param = (" -p " + get_password_hash(app_module)
                  if needs_password else "")

    # register job with broker
    register_cmd = BROKER_PATH + " -r -a " + app + pass_param
    reply = utils.run(cmd=register_cmd, local_host=SERVER_IP,
                      remote_host=BROKER_IP)
    # The first element of the reply is parsed as the JSON job description.
    job = json.loads(reply[0])

    # run the new job
    proc = Popen(prep_run_command(app_module, job))

    # give broker the job's pid
    payload = json.dumps({"job_id": job["job_id"], "pid": proc.pid})
    update_cmd = BROKER_PATH + " -u '{}'".format(payload)
    utils.run(cmd=update_cmd, local_host=SERVER_IP, remote_host=BROKER_IP)

    output, error = proc.communicate()
    if error:
        print("error:", error)
    print("output:", output)
开发者ID:LLNL-Collaboration,项目名称:uiuc2015,代码行数:29,代码来源:server.py
示例20: full_status
def full_status():
    """Print the cluster status reported by ``crm_mon``.

    ``crm_mon`` is run with extra flags when ``--full`` is among
    ``utils.pcs_options``; a non-zero return value is reported through
    ``utils.err``.  The cluster name, stonith and node-name warnings, the
    ``crm_mon`` output and (when not working from a file) PCSD and service
    status are printed.

    Every ``print`` uses the call form with a single argument, which
    behaves the same on Python 2 and Python 3; the function previously
    mixed print statements and print calls.
    """
    if "--full" in utils.pcs_options:
        (output, retval) = utils.run(["crm_mon", "-1", "-r", "-R", "-A", "-f"])
    else:
        (output, retval) = utils.run(["crm_mon", "-1", "-r"])

    if (retval != 0):
        utils.err("cluster is not currently running on this node")

    if not utils.usefile or "--corosync_conf" in utils.pcs_options:
        cluster_name = utils.getClusterName()
        print("Cluster name: %s" % cluster_name)

    if utils.stonithCheck():
        print("WARNING: no stonith devices and stonith-enabled is not false")

    if utils.corosyncPacemakerNodeCheck():
        print("WARNING: corosync and pacemaker node names do not match (IPs used in setup?)")

    print(output)

    if not utils.usefile:
        if not utils.is_rhel6():
            print("PCSD Status:")
            cluster.cluster_gui_status([],True)
            print("")
        utils.serviceStatus(" ")
开发者ID:disco-stu,项目名称:pcs,代码行数:27,代码来源:status.py
注：本文中的utils.run函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论