本文整理汇总了Python中shlex.quote函数的典型用法代码示例。如果您正苦于以下问题:Python quote函数的具体用法?Python quote怎么用?Python quote使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了quote函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _novoalign_command
def _novoalign_command(path, args, ncpu, reference, sample_name, read1, read2=None):
"""
Args:
path (str): path to aligner executable
args (str): raw arguments to be passed to the aligner
ncpu: number of alignment threads to launch
reference: (str): reference filename
sample_name (str):
read1 (str): absolute path to read1 fastq[.gz|.bz2]
read2 (str): absolute path to read2 fastq[.gz|.bz2]
Returns:
string: command to execute bowtie2 aligner
"""
import os
aligner_command = '{novoalign} -d {dbname} -f {read1} {read2} {paired_string} -c {ncpu} -o SAM {bam_string} {novoalign_args}'.format(**{
'novoalign': path,
'dbname': shlex.quote(reference + '.idx'),
'read1': shlex.quote(read1),
'read2': shlex.quote(read2) if read2 else '',
'paired_string': '-i PE 500,100' if read2 else '',
'ncpu': shlex.quote(str(ncpu)),
'bam_string': shlex.quote('@RG\\tID:{sample_name}\\tSM:{sample_name}'.format(sample_name=sample_name)),
'novoalign_args': ' '.join(map(shlex.quote, shlex.split(args)))
})
return aligner_command
开发者ID:TGenNorth,项目名称:NASP,代码行数:28,代码来源:dispatcher.py
示例2: bash_complete
def bash_complete(self, prefix, line, begidx, endidx):
"""Attempts BASH completion."""
splt = line.split()
cmd = splt[0]
func = self.bash_complete_funcs.get(cmd, None)
fnme = self.bash_complete_files.get(cmd, None)
if func is None or fnme is None:
return set()
idx = n = 0
for n, tok in enumerate(splt):
if tok == prefix:
idx = line.find(prefix, idx)
if idx >= begidx:
break
prev = tok
if len(prefix) == 0:
prefix = '""'
n += 1
else:
prefix = shlex.quote(prefix)
script = BASH_COMPLETE_SCRIPT.format(
filename=fnme, line=' '.join(shlex.quote(p) for p in splt),
comp_line=shlex.quote(line), n=n, func=func, cmd=cmd,
end=endidx + 1, prefix=prefix, prev=shlex.quote(prev))
try:
out = subprocess.check_output(
['bash'], input=script, universal_newlines=True,
stderr=subprocess.PIPE, env=builtins.__xonsh_env__.detype())
except subprocess.CalledProcessError:
out = ''
rtn = set(out.splitlines())
return rtn
开发者ID:mwiebe,项目名称:xonsh,代码行数:34,代码来源:completer.py
示例3: png_add_mask_and_drop_shadow
def png_add_mask_and_drop_shadow(source_filepath, mask_filepath, destination_filepath, shadow_offset=10):
"""
Resize source png at 205x280 (75dpi), then cut out with 75 dpi mask, then add drop shadow
:param str source_filepath: Path to source png file
:param str mask_filepath: Path to mask png file
:param str destination_filepath: Path to save result file
:param int shadow_offset: Offset of shadow in pixels
:return:
"""
quoted_source_filepath = shlex.quote(source_filepath)
quoted_mask_filepath = shlex.quote(mask_filepath)
quoted_destination_filepath = shlex.quote(destination_filepath)
command = "convert \( {source_filepath} -resize 205x280 {mask_filepath} -alpha Off -compose copyopacity -composite \) \
-background black \( +clone -shadow 60x{offset}+{offset}+{offset} \) +swap \
-compose Over -composite +repage \
{destination_filepath}".format(
source_filepath=quoted_source_filepath,
mask_filepath=quoted_mask_filepath,
destination_filepath=quoted_destination_filepath,
offset=shadow_offset)
# execute command
status = subprocess.call(shlex.split(command))
if status != 0:
exit(status)
开发者ID:GammaNu,项目名称:Geconomicus,代码行数:27,代码来源:build_cards.py
示例4: file
def file(self, root, filename):
self.file_count += 1
fullpath = os.path.join(root, filename)
byte_size = os.lstat(fullpath).st_size
self.size_total += byte_size
self.ctx.current_file = fullpath
local_vars = {
'_': os.path.basename(filename),
'p': fullpath,
'ap': os.path.abspath(fullpath),
'apq': shlex.quote(os.path.abspath(fullpath)),
'pq': shlex.quote(fullpath),
'q': shlex.quote(filename),
}
fmt = string.Formatter()
for (literal_text, field_name, format_spec, _) in fmt.parse(self.fmt_str):
if literal_text is not None:
sys.stdout.write(literal_text)
if field_name is not None:
value = eval(field_name, self.global_vars, local_vars) # pylint: disable=W0123
sys.stdout.write(format(value, format_spec))
开发者ID:Grumbel,项目名称:dirtool,代码行数:28,代码来源:action.py
示例5: update_platform
def update_platform(workspace_id: int, platform_id: int, platform_data) -> dict:
"""
Update the platform entry
:param workspace_id:
:param platform_id:
:return: The updated platform definition
"""
platform_name = shlex.quote(platform_data['name'])
platform_url = shlex.quote(platform_data['url'])
session = db_session()
workspace = session.query(Workspace).filter(Workspace.id == workspace_id).first()
if workspace is None:
raise NotFound("workspace with id {} could not be found".format(workspace_id))
platform = session.query(Platform). \
filter(Platform.workspace == workspace). \
filter(Platform.id == platform_id). \
first()
if platform is None:
raise NotFound("Platform with id {} could not be found".format(platform_id))
if platform_name != platform.name:
existing_platforms = session.query(Platform). \
filter(Platform.workspace == workspace). \
filter(Platform.name == platform_data['name']). \
all()
if len(existing_platforms) > 0:
raise NameConflict("Platform with name {} already exists".format(platform_data['name']))
platform.name = platform_name
platform.url = platform_url
update_workspace_descriptor(platform.workspace)
session.commit()
return platform.as_dict()
开发者ID:chrz89,项目名称:upb-son-editor-backend,代码行数:35,代码来源:platformsimpl.py
示例6: run_configure_script
def run_configure_script(self):
"runs configure-landscape, returns output (LDS hostname)"
ldscreds = self.config.getopt('landscapecreds')
args = {"bin": self.lscape_configure_bin,
"admin_email": shlex.quote(ldscreds['admin_email']),
"admin_name": shlex.quote(ldscreds['admin_name']),
"sys_email": shlex.quote(ldscreds['system_email']),
"maas_host": shlex.quote(
self.config.getopt('maascreds')['api_host'])}
cmd = ("{bin} --admin-email {admin_email} "
"--admin-name {admin_name} "
"--system-email {sys_email} "
"--maas-host {maas_host}".format(**args))
log.debug("Running landscape configure: {}".format(cmd))
out = utils.get_command_output(cmd, timeout=None)
if out['status']:
log.error("Problem with configuring Landscape: {}.".format(out))
raise Exception("Error configuring Landscape.")
return out['output'].strip()
开发者ID:kavinsivak,项目名称:openstack-installer,代码行数:25,代码来源:multi.py
示例7: get_command
def get_command(self, file, **options):
# on darwin open returns immediately resulting in the temp
# file removal while app is opening
command = "open -a /Applications/Preview.app"
command = "(%s %s; sleep 20; rm -f %s)&" % (command, quote(file),
quote(file))
return command
开发者ID:gopal-neosoft,项目名称:Assignment,代码行数:7,代码来源:ImageShow.py
示例8: xcheck_host_prog
def xcheck_host_prog(conf, name, tool, wafname=None):
wafname = wafname or name
chost, chost_envar = get_chost_stuff(conf)
specific = None
if chost:
specific = os.environ.get('%s_%s' % (chost_envar, name))
if specific:
value = Utils.to_list(specific)
conf.env[wafname] += value
conf.msg('Will use cross-compilation %s from %s_%s' % (name, chost_envar, name),
" ".join(quote(x) for x in value))
return
else:
envar = os.environ.get('HOST_%s' % name)
if envar is not None:
value = Utils.to_list(envar)
conf.env[wafname] = value
conf.msg('Will use cross-compilation %s from HOST_%s' % (name, name),
" ".join(quote(x) for x in value))
return
if conf.env[wafname]:
return
value = None
if chost:
value = '%s-%s' % (chost, tool)
if value:
conf.env[wafname] = value
conf.msg('Will use cross-compilation %s from CHOST' % wafname, value)
开发者ID:blablack,项目名称:ams-lv2,代码行数:34,代码来源:cross_gnu.py
示例9: __str__
def __str__(self):
ret = ''
for attr in 'cmd', 'ret_code', 'out', 'err':
value = getattr(self, attr, None)
if value is not None and str(value).strip():
mesg = ''
if attr == 'cmd' and self.cmd_kwargs.get('stdin_files'):
mesg += 'cat'
for file_path in self.cmd_kwargs.get('stdin_files'):
mesg += ' ' + quote(file_path)
mesg += ' | '
if attr == 'cmd' and isinstance(value, list):
mesg += ' '.join(quote(item) for item in value)
else:
mesg = str(value).strip()
if attr == 'cmd' and self.cmd_kwargs.get('stdin_str'):
mesg += ' <<<%s' % quote(self.cmd_kwargs.get('stdin_str'))
if len(mesg.splitlines()) > 1:
fmt = self.JOB_LOG_FMT_M
else:
fmt = self.JOB_LOG_FMT_1
if not mesg.endswith('\n'):
mesg += '\n'
ret += fmt % {
'cmd_key': self.cmd_key,
'attr': attr,
'mesg': mesg}
return ret.rstrip()
开发者ID:dpmatthews,项目名称:cylc,代码行数:28,代码来源:subprocctx.py
示例10: run
def run(self):
from subprocess import call
from pathlib import Path
import os.path
import inspect
import shlex
tm_path = shlex.quote(self.input()['tm'].fn)
lm_path = shlex.quote(self.input()['lm']['blm'].fn)
home = os.path.expanduser('~')
dir_name = Path(inspect.stack()[-1][1]).absolute().parent
current_dir_from_home = dir_name.relative_to(home)
print(current_dir_from_home)
toktagger_cmd = 'cd {cdir}; source {venv}; ./toktagger.py -t {tm} -l {lm} -f /'.format(
cdir='~/' + shlex.quote(str(current_dir_from_home)),
venv=shlex.quote('venv/bin/activate'),
tm=tm_path,
lm=lm_path)
print(toktagger_cmd)
parallel_cmd = 'parallel {params} -k --block-size {blocksize} --pipe {cmd}'.format(
params=self.parallel_params,
blocksize=self.parallel_blocksize,
cmd=shlex.quote(toktagger_cmd))
cmd = shlex.split(parallel_cmd)
print('running... ', parallel_cmd)
with self.input()['input'].open(
'r') as in_file, self.output().open('w') as out_file:
retcode = call(cmd, stdin=in_file, stdout=out_file)
assert retcode == 0
开发者ID:d2207197,项目名称:smttoktag,代码行数:35,代码来源:gentask.py
示例11: disp_kv
def disp_kv(key, val):
""" display a shell-escaped version of value ``val`` of ``key``,
using terminal 'dim' attribute for read-only variables.
"""
return (self.dim(shlex.quote(val))
if key in self.server.readonly_env
else shlex.quote(val))
开发者ID:signalpillar,项目名称:telnetlib3,代码行数:7,代码来源:telsh.py
示例12: keygen
def keygen(self, filename='', passphrase=''):
"""
Generate a public/private key pair and store them in ``filename``, encrypted by ``passphrase``
:param str filename: File name to store the private key in. The file name for the public key
will be derived from this name by suffixing it with ``.pub``
:param str passphrase: The passphrase used for encrypting the private key. Please note this passphrase
will only be accepted if it's longer than 4 characters. In case the passphrase
being empty or too short, ssh-keygen will ask for a passphrase using the system's
ssh-askpass mechanism.
"""
self._arguments.extend([
'-q',
'-t', self._algorithm,
'-b', str(self._keylength),
'-O', 'clear',
'-O', 'permit-pty',
'-C', shlex.quote('{user}@{host}'.format(user=os.getlogin(), host=os.uname().nodename)),
'-f', shlex.quote(filename)
])
if passphrase and len(passphrase) > 4:
self._arguments.extend([
'-N', shlex.quote(passphrase)
])
self._execute()
开发者ID:daemotron,项目名称:controlbeast,代码行数:25,代码来源:keygen.py
示例13: daemux_start
def daemux_start(transitions, session="pmjq", shell='sh'):
"""Instantiate the transitions, each in its own tmux window"""
for t in transitions:
t = normalize(t)
commands = []
# Template "directories" deals with watch-able templates
# that use a list as input
for dirs_key in [x for x in ["inputs", "outputs", "errors"]
if x in t]:
commands.append("watch -n1 "+shlex.quote(
COMMAND_TEMPLATES['directories'].format(
dirs=' '.join(
map(lambda d:
os.path.dirname(smart_unquote(d)),
t[dirs_key])))))
# Template "stderr" deals with the log files
if "stderr" in t:
commands.append("watch -n1 "+shlex.quote(
COMMAND_TEMPLATES['stderr'].format(
stderr=os.path.dirname(
smart_unquote(t['stderr'])))))
# The command
if shell == "sh":
commands.append(pmjq_command(t))
elif shell == 'fish':
commands.append(pmjq_command(t, redirect='^'))
# The other templates can be used as is
for k in [k for k in COMMAND_TEMPLATES
if k not in ['directories', 'stderr', 'cmd']]:
if k in t:
commands.append(COMMAND_TEMPLATES[k].format(**t))
for i, cmd in enumerate(commands):
daemux.start(cmd, session=session, window=t['id'], pane=i,
layout='tiled')
开发者ID:edouardklein,项目名称:pmjq,代码行数:34,代码来源:tools.py
示例14: _snap_command
def _snap_command(path, args, ncpu, reference, output_folder, sample_name, read1, read2=None):
"""
Args:
path (str): path to aligner executable
args (str): raw arguments to be passed to the aligner
ncpu: number of alignment threads to launch
reference: (str): reference filename
output_folder (str): directory for aligner output
sample_name (str):
read1 (str): absolute path to read1 fastq[.gz|.bz2]
read2 (str): absolute path to read2 fastq[.gz|.bz2]
Returns:
string: command to execute bowtie2 aligner
"""
import os
aligner_command = '{snap} {single_or_paired} {ref_dir} {read1} {read2} -t {ncpu} -b {snap_args} -o sam -'.format(**{
'snap': path,
'single_or_paired': 'paired' if read2 else 'single',
'ref_dir': shlex.quote(os.path.join(output_folder, 'reference', 'snap')),
'read1': shlex.quote(read1),
'read2': shlex.quote(read2) if read2 else '',
'ncpu': shlex.quote(str(ncpu)),
'snap_args': ' '.join(map(shlex.quote, shlex.split(args)))
})
return aligner_command
开发者ID:TGenNorth,项目名称:NASP,代码行数:27,代码来源:dispatcher.py
示例15: start
def start(self, collection, docker, ping, database_name):
options = self.options
"""Launches a cAdvisor container on the instance."""
volumes = {
'/': {'bind': '/rootfs', 'ro': True},
'/var/run': {'bind': '/var/run', 'ro': False},
'/sys': {'bind': '/sys', 'ro': True},
'/var/lib/docker': {'bind': '/var/lib/docker', 'ro': True}
}
logger.debug("cAdvisor: Writing stats to %s" % database_name)
command_args = " ".join([
"-storage_driver=influxdb",
"-log_dir=/",
"-storage_driver_db=%s" % quote(database_name),
"-storage_driver_host=%s:%d" % (quote(options.host),
options.port),
"-storage_driver_user=%s" % quote(options.user),
"-storage_driver_password=%s" % quote(options.password),
"-storage_driver_secure=%d" % options.secure,
# TODO: Calculate based on the run time.
"-storage_driver_buffer_duration=5s"
])
yield docker.run_containers(collection, self.info.name,
None, command_args, volumes,
ports={8080: 8080})
yield self.wait(collection, ping)
开发者ID:kitcambridge,项目名称:loads-broker,代码行数:28,代码来源:extensions.py
示例16: install
def install(
self,
ssh_client: paramiko.client.SSHClient,
cluster: FlintrockCluster):
print("[{h}] Installing Spark...".format(
h=ssh_client.get_transport().getpeername()[0]))
try:
if self.version:
with ssh_client.open_sftp() as sftp:
sftp.put(
localpath=os.path.join(SCRIPTS_DIR, 'install-spark.sh'),
remotepath='/tmp/install-spark.sh')
sftp.chmod(path='/tmp/install-spark.sh', mode=0o755)
url = self.download_source.format(v=self.version)
ssh_check_output(
client=ssh_client,
command="""
set -e
/tmp/install-spark.sh {url}
rm -f /tmp/install-spark.sh
""".format(url=shlex.quote(url)))
else:
ssh_check_output(
client=ssh_client,
command="""
set -e
sudo yum install -y git
sudo yum install -y java-devel
""")
ssh_check_output(
client=ssh_client,
command="""
set -e
git clone {repo} spark
cd spark
git reset --hard {commit}
if [ -e "make-distribution.sh" ]; then
./make-distribution.sh -Phadoop-2.6
else
./dev/make-distribution.sh -Phadoop-2.6
fi
""".format(
repo=shlex.quote(self.git_repository),
commit=shlex.quote(self.git_commit)))
ssh_check_output(
client=ssh_client,
command="""
set -e
for f in $(find spark/bin -type f -executable -not -name '*.cmd'); do
sudo ln -s "$(pwd)/$f" "/usr/local/bin/$(basename $f)"
done
echo "export SPARK_HOME='$(pwd)/spark'" >> .bashrc
""")
except Exception as e:
# TODO: This should be a more specific exception.
print("Error: Failed to install Spark.", file=sys.stderr)
print(e, file=sys.stderr)
raise
开发者ID:ereed-tesla,项目名称:flintrock,代码行数:60,代码来源:services.py
示例17: async_exec
async def async_exec(*args, display=False):
"""Execute, return code & log."""
argsp = []
for arg in args:
if os.path.isfile(arg):
argsp.append("\\\n {}".format(shlex.quote(arg)))
else:
argsp.append(shlex.quote(arg))
printc('cyan', *argsp)
try:
kwargs = {'loop': LOOP, 'stdout': asyncio.subprocess.PIPE,
'stderr': asyncio.subprocess.STDOUT}
if display:
kwargs['stderr'] = asyncio.subprocess.PIPE
proc = await asyncio.create_subprocess_exec(*args, **kwargs)
except FileNotFoundError as err:
printc(FAIL, "Could not execute {}. Did you install test requirements?"
.format(args[0]))
raise err
if not display:
# Readin stdout into log
stdout, _ = await proc.communicate()
else:
# read child's stdout/stderr concurrently (capture and display)
stdout, _ = await asyncio.gather(
read_stream(proc.stdout, sys.stdout.write),
read_stream(proc.stderr, sys.stderr.write))
exit_code = await proc.wait()
stdout = stdout.decode('utf-8')
return exit_code, stdout
开发者ID:EarthlingRich,项目名称:home-assistant,代码行数:31,代码来源:lazytox.py
示例18: configure
def configure(
self,
ssh_client: paramiko.client.SSHClient,
cluster: FlintrockCluster):
template_paths = [
'spark/conf/spark-env.sh',
'spark/conf/slaves',
]
ssh_check_output(
client=ssh_client,
command="mkdir -p spark/conf",
)
for template_path in template_paths:
ssh_check_output(
client=ssh_client,
command="""
echo {f} > {p}
""".format(
f=shlex.quote(
get_formatted_template(
path=os.path.join(THIS_DIR, "templates", template_path),
mapping=generate_template_mapping(
cluster=cluster,
spark_executor_instances=self.spark_executor_instances,
hadoop_version=self.hadoop_version,
spark_version=self.version or self.git_commit,
))),
p=shlex.quote(template_path)))
开发者ID:adotmob,项目名称:flintrock,代码行数:31,代码来源:services.py
示例19: send_notify
def send_notify(self, uid, title, body):
icon,name,surname = self._get_icon(uid)
summary = title + ", " + name + ' ' + surname
summary = shlex.quote(summary)
body = shlex.quote(self._sanitize(body))
os.system("notify-send -i {icon} {summary} {body}".format(
icon = icon, summary = summary, body = body))
开发者ID:Uran198,项目名称:vk_notify,代码行数:7,代码来源:vk.py
示例20: runExtensionCommand
def runExtensionCommand(self, command, filefilter, defaultext):
import shlex
of = ('%of' in command)
html = ('%html' in command)
if of:
if defaultext and not filefilter:
filefilter = '*'+defaultext
fileName = QFileDialog.getSaveFileName(self,
self.tr('Export document'), '', filefilter)[0]
if not fileName:
return
if defaultext and not QFileInfo(fileName).suffix():
fileName += defaultext
else:
fileName = 'out' + defaultext
basename = '.%s.retext-temp' % self.currentTab.getBaseName()
if html:
tmpname = basename+'.html'
self.saveHtml(tmpname)
else:
tmpname = basename + self.currentTab.getActiveMarkupClass().default_extension
self.currentTab.writeTextToFile(tmpname)
command = command.replace('%of', shlex.quote(fileName))
command = command.replace('%html' if html else '%if', shlex.quote(tmpname))
try:
Popen(str(command), shell=True).wait()
except Exception as error:
errorstr = str(error)
QMessageBox.warning(self, '', self.tr('Failed to execute the command:')
+ '\n' + errorstr)
QFile(tmpname).remove()
开发者ID:retext-project,项目名称:retext,代码行数:31,代码来源:window.py
注:本文中的shlex.quote函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论