本文整理汇总了Python中st2common.util.shell.quote_unix函数的典型用法代码示例。如果您正苦于以下问题:Python quote_unix函数的具体用法?Python quote_unix怎么用?Python quote_unix使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了quote_unix函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_pack_base_path
def get_pack_base_path(pack_name):
"""
Return full absolute base path to the content pack directory.
Note: This function looks for a pack in all the load paths and return path to the first pack
which matched the provided name.
If a pack is not found, we return a pack which points to the first packs directory (this is
here for backward compatibility reasons).
:param pack_name: Content pack name.
:type pack_name: ``str``
:rtype: ``str``
"""
if not pack_name:
return None
packs_base_paths = get_packs_base_paths()
for packs_base_path in packs_base_paths:
pack_base_path = os.path.join(packs_base_path, quote_unix(pack_name))
pack_base_path = os.path.abspath(pack_base_path)
if os.path.isdir(pack_base_path):
return pack_base_path
# Path with the provided name not found
pack_base_path = os.path.join(packs_base_paths[0], quote_unix(pack_name))
pack_base_path = os.path.abspath(pack_base_path)
return pack_base_path
开发者ID:pixelrebel,项目名称:st2,代码行数:30,代码来源:utils.py
示例2: get_full_command_string
def get_full_command_string(self):
# Note: We pass -E to sudo because we want to preserve user provided environment variables
env_str = self._get_env_vars_export_string()
cwd = self.get_cwd()
if self.sudo:
if env_str:
command = quote_unix('%s && cd %s && %s' % (env_str, cwd, self.command))
else:
command = quote_unix('cd %s && %s' % (cwd, self.command))
sudo_arguments = ' '.join(self._get_common_sudo_arguments())
command = 'sudo %s -- bash -c %s' % (sudo_arguments, command)
if self.sudo_password:
command = ('set +o history ; echo -e %s | %s' %
(quote_unix('%s\n' % (self.sudo_password)), command))
else:
if env_str:
command = '%s && cd %s && %s' % (env_str, cwd,
self.command)
else:
command = 'cd %s && %s' % (cwd, self.command)
LOG.debug('Command to run on remote host will be: %s', command)
return command
开发者ID:StackStorm,项目名称:st2,代码行数:26,代码来源:paramiko_command_action.py
示例3: _get_script_arguments
def _get_script_arguments(self, named_args=None, positional_args=None):
"""
Build a string of named and positional arguments which are passed to the
script.
:param named_args: Dictionary with named arguments.
:type named_args: ``dict``.
:param positional_args: List with positional arguments.
:type positional_args: ``dict``.
:rtype: ``str``
"""
command_parts = []
# add all named_args in the format <kwarg_op>name=value (e.g. --name=value)
if named_args is not None:
for (arg, value) in six.iteritems(named_args):
if value is None or (isinstance(value, (str, six.text_type)) and len(value) < 1):
LOG.debug('Ignoring arg %s as its value is %s.', arg, value)
continue
if isinstance(value, bool):
if value is True:
command_parts.append(arg)
else:
values = (quote_unix(arg), quote_unix(six.text_type(value)))
command_parts.append(six.text_type('%s=%s' % values))
# add the positional args
if positional_args:
quoted_pos_args = [quote_unix(pos_arg) for pos_arg in positional_args]
pos_args_string = ' '.join(quoted_pos_args)
command_parts.append(pos_args_string)
return ' '.join(command_parts)
开发者ID:lyandut,项目名称:st2,代码行数:35,代码来源:action.py
示例4: put
def put(self, local_path, remote_path, mode=None, mirror_local_mode=False):
"""
Upload a file to the remote node.
:type local_path: ``st``
:param local_path: File path on the local node.
:type remote_path: ``str``
:param remote_path: File path on the remote node.
:type mode: ``int``
:param mode: Permissions mode for the file. E.g. 0744.
:type mirror_local_mode: ``int``
:param mirror_local_mode: Should remote file mirror local mode.
:return: Attributes of the remote file.
:rtype: :class:`posix.stat_result` or ``None``
"""
if not local_path or not remote_path:
raise Exception("Need both local_path and remote_path. local: %s, remote: %s" % local_path, remote_path)
local_path = quote_unix(local_path)
remote_path = quote_unix(remote_path)
extra = {
"_local_path": local_path,
"_remote_path": remote_path,
"_mode": mode,
"_mirror_local_mode": mirror_local_mode,
}
self.logger.debug("Uploading file", extra=extra)
if not os.path.exists(local_path):
raise Exception("Path %s does not exist locally." % local_path)
rattrs = self.sftp.put(local_path, remote_path)
if mode or mirror_local_mode:
local_mode = mode
if not mode or mirror_local_mode:
local_mode = os.stat(local_path).st_mode
# Cast to octal integer in case of string
if isinstance(local_mode, basestring):
local_mode = int(local_mode, 8)
local_mode = local_mode & 07777
remote_mode = rattrs.st_mode
# Only bitshift if we actually got an remote_mode
if remote_mode is not None:
remote_mode = remote_mode & 07777
if local_mode != remote_mode:
self.sftp.chmod(remote_path, local_mode)
return rattrs
开发者ID:alfasin,项目名称:st2,代码行数:55,代码来源:paramiko_ssh.py
示例5: _get_env_vars_export_string
def _get_env_vars_export_string(self):
if self.env_vars:
# Envrionment variables could contain spaces and open us to shell
# injection attacks. Always quote the key and the value.
exports = ' '.join(
'%s=%s' % (quote_unix(k), quote_unix(v))
for k, v in self.env_vars.iteritems()
)
shell_env_str = '%s %s' % (ShellCommandAction.EXPORT_CMD, exports)
else:
shell_env_str = ''
return shell_env_str
开发者ID:Plexxi,项目名称:st2,代码行数:13,代码来源:action.py
示例6: get_pack_base_path
def get_pack_base_path(pack_name, include_trailing_slash=False, use_pack_cache=False):
"""
Return full absolute base path to the content pack directory.
Note: This function looks for a pack in all the load paths and return path to the first pack
which matched the provided name.
If a pack is not found, we return a pack which points to the first packs directory (this is
here for backward compatibility reasons).
:param pack_name: Content pack name.
:type pack_name: ``str``
:param include_trailing_slash: True to include trailing slash.
:type include_trailing_slash: ``bool``
:param use_pack_cache: True to cache base paths on per-pack basis. This help in situations
where this method is called multiple times with the same pack name.
:type use_pack_cache`` ``bool``
:rtype: ``str``
"""
if not pack_name:
return None
if use_pack_cache and pack_name in PACK_NAME_TO_BASE_PATH_CACHE:
return PACK_NAME_TO_BASE_PATH_CACHE[pack_name]
packs_base_paths = get_packs_base_paths()
for packs_base_path in packs_base_paths:
pack_base_path = os.path.join(packs_base_path, quote_unix(pack_name))
pack_base_path = os.path.abspath(pack_base_path)
if os.path.isdir(pack_base_path):
if include_trailing_slash and not pack_base_path.endswith(os.path.sep):
pack_base_path += os.path.sep
PACK_NAME_TO_BASE_PATH_CACHE[pack_name] = pack_base_path
return pack_base_path
# Path with the provided name not found
pack_base_path = os.path.join(packs_base_paths[0], quote_unix(pack_name))
pack_base_path = os.path.abspath(pack_base_path)
if include_trailing_slash and not pack_base_path.endswith(os.path.sep):
pack_base_path += os.path.sep
PACK_NAME_TO_BASE_PATH_CACHE[pack_name] = pack_base_path
return pack_base_path
开发者ID:nzlosh,项目名称:st2,代码行数:49,代码来源:utils.py
示例7: _get_script_arguments
def _get_script_arguments(self, named_args=None, positional_args=None):
"""
Build a string of named and positional arguments which are passed to the
script.
:param named_args: Dictionary with named arguments.
:type named_args: ``dict``.
:param positional_args: List with positional arguments.
:type positional_args: ``dict``.
:rtype: ``str``
"""
command_parts = []
# add all named_args in the format <kwarg_op>name=value (e.g. --name=value)
if named_args is not None:
for (arg, value) in six.iteritems(named_args):
if value is None or (isinstance(value, (str, unicode)) and len(value) < 1):
LOG.debug("Ignoring arg %s as its value is %s.", arg, value)
continue
if isinstance(value, bool):
if value is True:
command_parts.append(arg)
else:
command_parts.append("%s=%s" % (arg, quote_unix(str(value))))
# add the positional args
if positional_args:
command_parts.append(positional_args)
return " ".join(command_parts)
开发者ID:ipv1337,项目名称:st2,代码行数:32,代码来源:action.py
示例8: get_deb_package_list
def get_deb_package_list(name_startswith):
cmd = 'dpkg -l | grep %s' % (quote_unix(name_startswith))
exit_code, stdout, _ = run_command(cmd=cmd, shell=True)
lines = stdout.split('\n')
packages = []
for line in lines:
line = line.strip()
if not line:
continue
split = re.split(r'\s+', line)
name = split[1]
version = split[2]
if not name.startswith(name_startswith):
continue
item = {
'name': name,
'version': version
}
packages.append(item)
return packages
开发者ID:nzlosh,项目名称:st2,代码行数:27,代码来源:system_info.py
示例9: get_full_command_string
def get_full_command_string(self):
# Note: We pass -E to sudo because we want to preserve user provided
# environment variables
if self.sudo:
command = quote_unix(self.command)
command = 'sudo -E -- bash -c %s' % (command)
else:
if self.user and self.user != LOGGED_USER_USERNAME:
# Need to use sudo to run as a different user
user = quote_unix(self.user)
command = quote_unix(self.command)
command = 'sudo -E -u %s -- bash -c %s' % (user, command)
else:
command = self.command
return command
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:16,代码来源:action.py
示例10: get_rpm_package_list
def get_rpm_package_list(name_startswith):
cmd = 'rpm -qa | grep %s' % (quote_unix(name_startswith))
exit_code, stdout, _ = run_command(cmd=cmd, shell=True)
lines = stdout.split('\n')
packages = []
for line in lines:
line = line.strip()
if not line:
continue
split = line.rsplit('.', 1)
split = split[0].split('-', 1)
name = split[0]
version = split[1]
if not name.startswith(name_startswith):
continue
item = {
'name': name,
'version': version
}
packages.append(item)
return packages
开发者ID:nzlosh,项目名称:st2,代码行数:28,代码来源:system_info.py
示例11: delete_dir
def delete_dir(self, path, force=False, timeout=None):
"""
Delete a dir on remote box.
:param path: Path to remote dir to be deleted.
:type path: ``str``
:param force: Optional Forcefully remove dir.
:type force: ``bool``
:param timeout: Optional Time to wait for dir to be deleted. Only relevant for force.
:type timeout: ``int``
:return: True if the file has been successfully deleted, False
otherwise.
:rtype: ``bool``
"""
path = quote_unix(path)
extra = {"_path": path}
if force:
command = "rm -rf %s" % path
extra["_command"] = command
extra["_force"] = force
self.logger.debug("Deleting dir", extra=extra)
return self.run(command, timeout=timeout)
self.logger.debug("Deleting dir", extra=extra)
return self.sftp.rmdir(path)
开发者ID:alfasin,项目名称:st2,代码行数:29,代码来源:paramiko_ssh.py
示例12: _setup_pack_virtualenv
def _setup_pack_virtualenv(self, pack_name, update=False):
"""
Setup virtual environment for the provided pack.
:param pack_name: Pack name.
:type pack_name: ``str``
"""
# Prevent directory traversal by whitelisting allowed characters in the
# pack name
if not re.match(PACK_NAME_WHITELIST, pack_name):
raise ValueError('Invalid pack name "%s"' % (pack_name))
self.logger.debug('Setting up virtualenv for pack "%s"' % (pack_name))
virtualenv_path = os.path.join(self._base_virtualenvs_path, quote_unix(pack_name))
# Ensure pack directory exists in one of the search paths
pack_path = get_pack_directory(pack_name=pack_name)
if not pack_path:
packs_base_paths = get_packs_base_paths()
search_paths = ', '.join(packs_base_paths)
msg = 'Pack "%s" is not installed. Looked in: %s' % (pack_name, search_paths)
raise Exception(msg)
if not os.path.exists(self._base_virtualenvs_path):
os.makedirs(self._base_virtualenvs_path)
# If we don't want to update, or if the virtualenv doesn't exist, let's create it.
if not update or not os.path.exists(virtualenv_path):
# 0. Delete virtual environment if it exists
self._remove_virtualenv(virtualenv_path=virtualenv_path)
# 1. Create virtual environment
self.logger.debug('Creating virtualenv for pack "%s" in "%s"' %
(pack_name, virtualenv_path))
self._create_virtualenv(virtualenv_path=virtualenv_path)
# 2. Install base requirements which are common to all the packs
self.logger.debug('Installing base requirements')
for requirement in BASE_PACK_REQUIREMENTS:
self._install_requirement(virtualenv_path=virtualenv_path,
requirement=requirement)
# 3. Install pack-specific requirements
requirements_file_path = os.path.join(pack_path, 'requirements.txt')
has_requirements = os.path.isfile(requirements_file_path)
if has_requirements:
self.logger.debug('Installing pack specific requirements from "%s"' %
(requirements_file_path))
self._install_requirements(virtualenv_path, requirements_file_path)
else:
self.logger.debug('No pack specific requirements found')
self.logger.debug('Virtualenv for pack "%s" successfully %s in "%s"' %
(pack_name,
'updated' if update else 'created',
virtualenv_path))
开发者ID:automotola,项目名称:st2,代码行数:59,代码来源:setup_virtualenv.py
示例13: get_full_command_string
def get_full_command_string(self):
# Note: We pass -E to sudo because we want to preserve user provided environment variables
if self.sudo:
command = quote_unix(self.command)
sudo_arguments = ' '.join(self._get_common_sudo_arguments())
command = 'sudo %s -- bash -c %s' % (sudo_arguments, command)
else:
if self.user and self.user != LOGGED_USER_USERNAME:
# Need to use sudo to run as a different (requested) user
user = quote_unix(self.user)
sudo_arguments = ' '.join(self._get_user_sudo_arguments(user=user))
command = quote_unix(self.command)
command = 'sudo %s -- bash -c %s' % (sudo_arguments, command)
else:
command = self.command
return command
开发者ID:Plexxi,项目名称:st2,代码行数:17,代码来源:action.py
示例14: _format_command
def _format_command(self):
script_arguments = self._get_script_arguments(named_args=self.named_args,
positional_args=self.positional_args)
if self.sudo:
if script_arguments:
command = quote_unix('%s %s' % (self.remote_script, script_arguments))
else:
command = quote_unix(self.remote_script)
command = 'sudo -E -- bash -c %s' % (command)
else:
script_path = quote_unix(self.remote_script)
if script_arguments:
command = '%s %s' % (script_path, script_arguments)
else:
command = script_path
return command
开发者ID:jonico,项目名称:st2,代码行数:20,代码来源:paramiko_script_action.py
示例15: _format_command
def _format_command(self):
script_arguments = self._get_script_arguments(named_args=self.named_args,
positional_args=self.positional_args)
if self.sudo:
if script_arguments:
command = quote_unix('%s %s' % (self.script_local_path_abs, script_arguments))
else:
command = quote_unix(self.script_local_path_abs)
sudo_arguments = ' '.join(self._get_common_sudo_arguments())
command = 'sudo %s -- bash -c %s' % (sudo_arguments, command)
else:
if self.user and self.user != LOGGED_USER_USERNAME:
# Need to use sudo to run as a different user
user = quote_unix(self.user)
if script_arguments:
command = quote_unix('%s %s' % (self.script_local_path_abs, script_arguments))
else:
command = quote_unix(self.script_local_path_abs)
sudo_arguments = ' '.join(self._get_user_sudo_arguments(user=user))
command = 'sudo %s -- bash -c %s' % (sudo_arguments, command)
else:
script_path = quote_unix(self.script_local_path_abs)
if script_arguments:
command = '%s %s' % (script_path, script_arguments)
else:
command = script_path
return command
开发者ID:lyandut,项目名称:st2,代码行数:31,代码来源:action.py
示例16: get_full_command_string
def get_full_command_string(self):
# Note: We pass -E to sudo because we want to preserve user provided
# environment variables
env_str = self._get_env_vars_export_string()
cwd = self.get_cwd()
if self.sudo:
if env_str:
command = quote_unix('%s && cd %s && %s' % (env_str, cwd, self.command))
else:
command = quote_unix('cd %s && %s' % (cwd, self.command))
command = 'sudo -E -- bash -c %s' % (command)
else:
if env_str:
command = '%s && cd %s && %s' % (env_str, cwd,
self.command)
else:
command = 'cd %s && %s' % (cwd, self.command)
LOG.debug('Command to run on remote host will be: %s', command)
return command
开发者ID:AlexeyDeyneko,项目名称:st2,代码行数:22,代码来源:paramiko_command_action.py
示例17: _get_command_string
def _get_command_string(self, cmd, args):
"""
Escape the command arguments and form a command string.
:type cmd: ``str``
:type args: ``list``
:rtype: ``str``
"""
assert isinstance(args, (list, tuple))
args = [quote_unix(arg) for arg in args]
args = ' '.join(args)
result = '%s %s' % (cmd, args)
return result
开发者ID:lyandut,项目名称:st2,代码行数:15,代码来源:action.py
示例18: _get_env_vars_export_string
def _get_env_vars_export_string(self):
if self.env_vars:
env_vars = copy.copy(self.env_vars)
# If sudo_password is provided, explicitly disable bash history to make sure password
# is not logged, because password is provided via command line
if self.sudo and self.sudo_password:
env_vars['HISTFILE'] = '/dev/null'
env_vars['HISTSIZE'] = '0'
# Sort the dict to guarantee consistent order
env_vars = collections.OrderedDict(sorted(env_vars.items()))
# Environment variables could contain spaces and open us to shell
# injection attacks. Always quote the key and the value.
exports = ' '.join(
'%s=%s' % (quote_unix(k), quote_unix(v))
for k, v in six.iteritems(env_vars)
)
shell_env_str = '%s %s' % (ShellCommandAction.EXPORT_CMD, exports)
else:
shell_env_str = ''
return shell_env_str
开发者ID:lyandut,项目名称:st2,代码行数:24,代码来源:action.py
示例19: _prepare_command
def _prepare_command(self, has_inputs, inputs_file_path):
LOG.debug('CloudSlang home: %s', self._cloudslang_home)
cloudslang_binary = os.path.join(self._cloudslang_home, 'bin/cslang')
LOG.debug('Using CloudSlang binary: %s', cloudslang_binary)
command_args = ['--f', self._flow_path,
'--cp', self._cloudslang_home]
if has_inputs:
command_args += ['--if', inputs_file_path]
command = cloudslang_binary + " run " + " ".join([quote_unix(arg) for arg in command_args])
LOG.info('Executing action via CloudSlangRunner: %s', self.runner_id)
LOG.debug('Command is: %s', command)
return command
开发者ID:AlexeyDeyneko,项目名称:st2,代码行数:16,代码来源:cloudslang_runner.py
示例20: mkdir
def mkdir(self, dir_path):
"""
Create a directory on remote box.
:param dir_path: Path to remote directory to be created.
:type dir_path: ``str``
:return: Returns nothing if successful else raises IOError exception.
:rtype: ``None``
"""
dir_path = quote_unix(dir_path)
extra = {"_dir_path": dir_path}
self.logger.debug("mkdir", extra=extra)
return self.sftp.mkdir(dir_path)
开发者ID:alfasin,项目名称:st2,代码行数:16,代码来源:paramiko_ssh.py
注:本文中的st2common.util.shell.quote_unix函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论