本文整理汇总了Python中utils.which函数的典型用法代码示例。如果您正苦于以下问题:Python which函数的具体用法?Python which怎么用?Python which使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了which函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: run_mongo_restore
def run_mongo_restore(self, restore, destination_uri, dump_dir, source_database_name,
                      log_file_name, dump_log_file_name,
                      exclude_system_users=None,
                      exclude_admin_system_users=None,
                      exclude_system_roles=None,
                      options=None):
    """Restore a dump into ``destination_uri`` by running ``mongoctl restore``.

    :param restore: restore task; used to resolve the task workspace directory
    :param destination_uri: mongo URI to restore into (masked before logging)
    :param dump_dir: dump directory, relative to the task workspace
    :param source_database_name: when set, only this sub-directory of
        ``dump_dir`` is restored
    :param log_file_name: file (inside the workspace) receiving command output
    :param dump_log_file_name: name of the dump log stored inside ``dump_dir``;
        it is deleted before restoring
    :param exclude_system_users: when truthy, system users are stripped from
        the dump before restoring
    :param exclude_admin_system_users: same, for admin system users
    :param exclude_system_roles: same, for roles
    :param options: extra command line arguments appended to the command
    :raises MBSError: when the mongoctl executable is not found
    :raises RestoreError: when the restore command exits with a non-zero code
    """
    # Resolve the executable first so that a missing mongoctl is reported
    # clearly (same check as dump_backup) and before the dump is modified.
    mongoctl_exe = which("mongoctl")
    if not mongoctl_exe:
        raise MBSError("mongoctl exe not found in PATH")

    if source_database_name:
        source_dir = os.path.join(dump_dir, source_database_name)
    else:
        source_dir = dump_dir

    workspace = self.get_task_workspace_dir(restore)

    # IMPORTANT delete dump log file so the restore command would not break
    dump_log_path = os.path.join(workspace, dump_dir, dump_log_file_name)
    if os.path.exists(dump_log_path):
        os.remove(dump_log_path)

    if exclude_system_users:
        self._delete_system_users_from_dump(restore, source_dir)
    if exclude_admin_system_users:
        self._delete_admin_system_users_from_dump(restore, source_dir)
    if exclude_system_roles:
        self._delete_roles_from_dump(restore, source_dir)

    working_dir = workspace
    log_path = os.path.join(workspace, log_file_name)

    restore_cmd = [
        mongoctl_exe,
        "restore",
        destination_uri,
        source_dir
    ]
    if options:
        restore_cmd.extend(options)

    # Never log the real URI: it may carry credentials.
    restore_cmd_display = restore_cmd[:]
    restore_cmd_display[restore_cmd_display.index("restore") + 1] = mask_mongo_uri(destination_uri)
    logger.info("Running mongoctl restore command: %s",
                " ".join(restore_cmd_display))

    returncode = execute_command_wrapper(restore_cmd,
                                         output_path=log_path,
                                         cwd=working_dir)

    # read the last line of the restore log; it is attached to the error below
    last_line_tail_cmd = [which('tail'), '-1', log_path]
    last_log_line = execute_command(last_line_tail_cmd)

    if returncode:
        raise RestoreError(returncode, last_log_line)
开发者ID:mongolab,项目名称:mongodb-backup-system,代码行数:57,代码来源:backup_assistant.py
示例2: __init__
def __init__(self, queue, condition, dbpath):
    """Initialise the "FiledataThread" worker.

    Stores the queue, the condition object and the database path handed in
    by the caller, marks the thread as running and looks up the external
    ``soxi``, ``sox`` and ``soundstretch`` executables via ``which``.
    """
    threading.Thread.__init__(self, name="FiledataThread")
    self.queue, self.condition, self.dbpath = queue, condition, dbpath
    self.running = True
    # Look the helper programs up once, in the same order as before.
    for tool_name in ('soxi', 'sox', 'soundstretch'):
        setattr(self, tool_name, which(tool_name))
开发者ID:radiocicletta,项目名称:Scriptcicletti,代码行数:9,代码来源:remote.py
示例3: archlinux_env
def archlinux_env():
    """Detect the Arch Linux package tooling available on this machine.

    Returns a ``(enabled_by_default, pacman)`` tuple: ``pacman`` is the
    command used to install packages (``'yaourt'`` is preferred over
    ``'sudo pacman'``) and ``enabled_by_default`` is the result of looking up
    ``pkgfile``. When neither package manager is found the result is
    ``(False, None)`` and ``pkgfile`` is not looked up at all.
    """
    candidates = (('yaourt', 'yaourt'), ('pacman', 'sudo pacman'))
    for executable, install_command in candidates:
        if utils.which(executable):
            return utils.which('pkgfile'), install_command
    return False, None
开发者ID:jeinglis,项目名称:thefuckServer,代码行数:11,代码来源:archlinux.py
示例4: serveur
def serveur():
    """Ask which web server to install and install/configure it when missing.

    Shows a menu offering Apache or Lighttpd. If the chosen server's
    executable is not found it is installed with ``apt-get``; for Lighttpd
    the PHP CGI configuration and ``lighttpd.conf`` are then patched and the
    service is reloaded. Nothing is changed when the server is already there.

    :return: the value selected in the menu
    """
    serveur_menu = menu.Menu()
    serveur_menu_selection = serveur_menu.show(
        {
            utils.SERVEUR_APACHE: u'Apache',
            utils.SERVEUR_LIGHTTPD: u'Lighttpd'
        }, u'Menu installation serveur')

    if serveur_menu_selection == utils.SERVEUR_APACHE:
        if utils.which('apache2') is None:
            # The original wrote 'd''apache', i.e. two adjacent literals,
            # which printed "dapache" without the apostrophe.
            print(u"_____ Installation d'apache _____")
            os.system('apt-get -y install apache2')
        else:
            print(u'Apache est déjà installé')
    elif serveur_menu_selection == utils.SERVEUR_LIGHTTPD:
        if utils.which('lighttpd') is None:
            print(u'_____ Installation de lighttpd _____')
            os.system('apt-get -y install lighttpd')

            # Enable cgi.fix_pathinfo in the PHP CGI configuration.
            with open('/etc/php5/cgi/php.ini', 'r') as f:
                file_data = f.read()
            new_data = file_data.replace(';cgi.fix_pathinfo=1', 'cgi.fix_pathinfo=1')
            with open('/etc/php5/cgi/php.ini', 'w') as f:
                f.write(new_data)

            os.system('lighttpd-enable-mod fastcgi')
            os.system('lighttpd-enable-mod fastcgi-php')
            os.system('/etc/init.d/lighttpd force-reload')

            # Install and enable a module that turns directory listing off.
            os.system('/bin/echo \'## directory listing configuration## we disable the directory listing by default##$HTTP["url"] =~ "^/" { dir-listing.activate = "disable"}\' | /usr/bin/tee /etc/lighttpd/conf-available/20-disable-listing.conf')
            os.system('/usr/sbin/lighty-enable-mod disable-listing')
            os.system('/etc/init.d/lighttpd force-reload')

            # Uncomment mod_rewrite and register the 404 handler.
            with open('/etc/lighttpd/lighttpd.conf', 'r') as f:
                file_data = f.read()
            new_data = file_data.replace('# "mod_rewrite"', ' "mod_rewrite"')
            new_data = new_data + '\r\n server.error-handler-404 = "/index.php?error=404"'
            with open('/etc/lighttpd/lighttpd.conf', 'w') as f:
                f.write(new_data)

            os.system('/etc/init.d/lighttpd force-reload')
        else:
            print(u'Lighttpd est déjà installé')

    return serveur_menu_selection
开发者ID:capic,项目名称:installeur,代码行数:52,代码来源:common.py
示例5: mux_audio_video
def mux_audio_video(file_name, output_dir):
    """
    Rebuild the AVI container for source file with hardsubbed video track

    :param file_name: Name of the file that had to be reencoded
    :type file_name: str
    :param output_dir: Directory where to place raw hardsubbed video
    :type output_dir: str
    :raises ValueError: if no ``.xvid`` track is present in ``output_dir`` or
        ffprobe does not report a frame count for it
    """
    list_of_files = [f for f in os.listdir(output_dir) if re.match(r'.*\.(xvid|audio)', f)]
    input_param = []
    map_param = []
    video_path = None
    count = 0
    for f in reversed(list_of_files):
        track_path = output_dir + os.sep + f
        if f[-5:] == '.xvid':
            video_path = track_path
        else:
            count = count + 1
            input_param.append('-i "' + track_path + '"')
            map_param.append('-map ' + str(count) + ':0')

    if video_path is None:
        raise ValueError('No .xvid video track found in ' + output_dir)

    # Gather data for the progress bar.
    # If you have ffmpeg you have ffprobe, so it is not checked in REQUIRED_EXECUTABLES
    # The video track is probed explicitly; the previous code probed
    # whichever file the loop above happened to visit last.
    command = '{ffprobe} -show_streams "{video_input}"'.format(
        ffprobe=which('ffprobe')[0],
        video_input=video_path,
    )
    thread = pexpect.spawn(command)
    pl = thread.compile_pattern_list([
        pexpect.EOF,
        r"nb_frames=(\d+)"
    ])
    tot_frames = None
    while True:
        i = thread.expect_list(pl, timeout=None)
        if i == 0:  # EOF, Process exited
            break
        if i == 1:  # Status
            tot_frames = int(thread.match.group(1))
    thread.close()

    if tot_frames is None:
        raise ValueError('ffprobe did not report nb_frames for ' + video_path)

    command = '{ffmpeg} -y {video_input} {input_params} -c copy -map 0:0 {map_params} "{dest_file}"'.format(
        ffmpeg=which('ffmpeg')[0],
        video_input='-i "' + video_path + '"',
        input_params=' '.join(input_param),
        map_params=' '.join(map_param),
        dest_file=output_dir + os.sep + os.path.basename(file_name)
    )
    launch_process_with_progress_bar(command, REQUIRED_EXECUTABLES['ffmpeg'], tot_frames, 'Rebuilding file: ', verbose, debug)

    # Cleaning some mess
    for f in reversed(list_of_files):
        os.remove(output_dir + os.sep + f)
开发者ID:torre76,项目名称:hardsub,代码行数:50,代码来源:avi.py
示例6: __zcat
def __zcat():
    """
    Return the path of the zcat-style executable, looking it up only once.

    ``gzcat`` is tried first and ``zcat`` second; the result is kept in the
    module-level ``__zcat_path`` so later calls skip the lookup.

    Raises ``ValueError`` when neither executable is found in PATH.
    """
    global __zcat_path
    if __zcat_path is None:
        # ``or`` falls through to zcat only when gzcat was not found.
        __zcat_path = which("gzcat") or which("zcat")
        if not __zcat_path:
            raise ValueError("Unable to find a zcat|gzcat executable in PATH!")
    return __zcat_path
开发者ID:maikroeder,项目名称:gemtools,代码行数:15,代码来源:files.py
示例7: generate_new_keypair
def generate_new_keypair(self, settings):
    """
    Calls :func:`openssh_generate_new_keypair` or
    :func:`dropbear_generate_new_keypair` depending on what's available on the
    system.

    *settings* is a dict that may provide ``name`` (default ``'id_ecdsa'``),
    ``keytype``, ``bits`` (both default to ``None``), ``passphrase`` and
    ``comment`` (both default to an empty string).

    If neither ``ssh-keygen`` nor ``dropbearkey`` can be found an error is
    logged and no key is generated.
    """
    self.ssh_log.debug('generate_new_keypair()')
    users_ssh_dir = get_ssh_dir(self)
    name = settings.get('name', 'id_ecdsa')
    keytype = settings.get('keytype')
    bits = settings.get('bits')
    passphrase = settings.get('passphrase', '')
    comment = settings.get('comment', '')
    # The passphrase is deliberately left out of the logged metadata.
    log_metadata = {
        'name': name,
        'keytype': keytype,
        'bits': bits,
        'comment': comment
    }
    self.ssh_log.info("Generating new SSH keypair", metadata=log_metadata)
    if which('ssh-keygen'):  # Prefer OpenSSH
        generate = openssh_generate_new_keypair
    elif which('dropbearkey'):
        generate = dropbear_generate_new_keypair
    else:
        # Previously this case fell through without any trace.
        self.ssh_log.error(
            "Unable to generate SSH keypair: neither ssh-keygen nor "
            "dropbearkey was found")
        return
    generate(
        self,
        name,  # Name to use when generating the keypair
        users_ssh_dir,  # Path to save it
        keytype=keytype,
        passphrase=passphrase,
        bits=bits,
        comment=comment
    )
开发者ID:ArneBab,项目名称:GateOne,代码行数:48,代码来源:ssh.py
示例8: test_run_compress
def test_run_compress():
    """Compress ``simple_xy.nc`` through both back ends and compare results.

    The file is compressed once with ``nccopy=True`` and once with
    ``nccopy=False``; each output must shrink at least five-fold, report
    deflate level 3 with shuffle on and be equal to the source. Finally the
    two outputs are compared with each other.
    """
    if which('nccopy') is None:
        print("Could not find nccopy in path")
        assert(False)

    source = 'simple_xy.nc'

    def compress_and_verify(target, use_nccopy):
        # One compression run followed by the checks shared by both back ends.
        retdict = nccompress.run_compress(source, target, level=3, verbose=True, shuffle=True, nccopy=use_nccopy, timing=False)
        print(retdict)
        assert (retdict['orig_size']/retdict['comp_size'] >= 5.)
        assert (retdict['dlevel'] == 3)
        assert retdict['shuffle']
        assert nccompress.are_equal(source, target, verbose=True)

    compress_and_verify('simple_xy.run_nccopy.nc', True)

    # This requires nc2nc to be in the path. If nccompress/nc2nc.py has changed this will not be reflect
    # any change until installation. This is a test for nccompres to correctly call nc2nc
    compress_and_verify('simple_xy.run_nc2nc.nc', False)

    assert nccompress.are_equal('simple_xy.run_nc2nc.nc', 'simple_xy.run_nccopy.nc', verbose=True)
开发者ID:coecms,项目名称:nccompress,代码行数:25,代码来源:test_nccompress.py
示例9: find_java
def find_java():
    """Locate a suitable Java executable and build the command to launch it.

    Looks ``java`` up with ``utils.which``, runs ``java -version`` and
    matches the output against ``java_build_regex`` (groups 1 and 2 are read
    as the major and minor version). Version 1.7 or newer is required. The
    ``-server`` flag is dropped when the JVM refuses to start with it.

    Exits the process with status 1 when no Java is found or it is too old.

    :return: the argument list used to start Java
    """
    required_major = 1
    required_minor = 7
    exec_command = None
    has_server_jvm = True
    java_path = utils.which('java')

    if not java_path:
        print('NOTE: No Java executable found in $PATH.')
        sys.exit(1)

    is_ok = False
    java_version_out, _ = popen([java_path, '-version']).communicate()
    # pylint: disable=E1103
    match = re.search(java_build_regex, java_version_out)
    if match:
        major = int(match.group(1))
        minor = int(match.group(2))
        # Compare as a pair: checking major and minor independently would
        # reject e.g. 2.0 because its minor is below the required minor.
        is_ok = (major, minor) >= (required_major, required_minor)
    if is_ok:
        exec_command = [java_path, '-Xms1024m', '-server', '-XX:+TieredCompilation']
        check_server_proc = popen(exec_command + ['-version'])
        check_server_proc.communicate()
        if check_server_proc.returncode != 0:
            # Not all Java installs have server JVMs.
            # list.remove() works in place and returns None, so its result
            # must not be assigned back to exec_command.
            exec_command.remove('-server')
            has_server_jvm = False

    if not is_ok:
        print('NOTE: Java executable version %d.%d or above not found in $PATH.' % (required_major, required_minor))
        sys.exit(1)
    print('Java executable: %s%s' % (java_path, '' if has_server_jvm else ' (no server JVM)'))
    return exec_command
开发者ID:easingthemes,项目名称:devtools-frontend,代码行数:33,代码来源:compile_frontend.py
示例10: mux_audio_video
def mux_audio_video(file_name, output_dir):
    """
    Rebuild the MP4 container for source file with hardsubbed video track

    :param file_name: Name of the file that had to be reencoded
    :type file_name: str
    :param output_dir: Directory where to place raw hardsubbed video
    :type output_dir: str
    """
    base_file_name = get_base_file_name(file_name)
    # The base name is literal text, not a pattern: escape it so characters
    # such as "[", "(" or "+" in a file name cannot break the match.
    track_pattern = re.escape(base_file_name) + r'.*\.(264|aac|audio)'
    list_of_files = [f for f in os.listdir(output_dir) if re.match(track_pattern, f)]

    file_param = []
    for f in reversed(list_of_files):
        file_param.append('-add "' + output_dir + os.sep + f + '"')

    # If the destination file already exists, MP4Box adds the new tracks to it
    # so pick the first "<name>_<n>" that is still free.
    output_file = output_dir + os.sep + os.path.basename(file_name)
    index_out = 0
    while os.path.isfile(output_file):
        print (colorama.Style.BRIGHT + 'File ' + output_file + ' already exists' + colorama.Style.NORMAL)
        index_out = index_out + 1
        output_file = output_dir + os.sep + os.path.basename(file_name) + '_' + str(index_out)

    command = '{MP4Box} {add_audio_opts} "{dest_file}"'.format(
        MP4Box=which('MP4Box')[0],
        add_audio_opts=' '.join(file_param),
        dest_file=output_file
    )
    launch_process_with_progress_bar(command, REQUIRED_EXECUTABLES['MP4Box'], 100, 'Rebuilding file: ', verbose, debug)

    # Cleaning some mess
    for f in reversed(list_of_files):
        os.remove(output_dir + os.sep + f)
开发者ID:torre76,项目名称:hardsub,代码行数:31,代码来源:mp4.py
示例11: resolve_node_paths
def resolve_node_paths():
    """Return the ``(node, npm)`` executable paths to use.

    A valid global node installation wins (``npm.cmd`` is looked up instead
    of ``npm`` on Windows). Otherwise an already installed local node is
    used. Failing that, any stale local runtime directory is removed and, on
    Linux and macOS, node is installed locally.

    :raises RuntimeError: on other platforms, after printing install hints
    """
    if has_valid_global_node():
        if sys.platform == "win32":
            return (utils.which('node'), utils.which('npm.cmd'))
        return (utils.which('node'), utils.which('npm'))

    has_installed_local_node = path.isfile(local_node_binary_path)
    if has_installed_local_node:
        return (local_node_binary_path, local_npm_binary_path)

    if path.isdir(local_node_runtimes_path):
        shutil.rmtree(local_node_runtimes_path)

    # sys.platform is 'linux2' on Python 2 but 'linux' on Python 3.
    if sys.platform.startswith('linux') or sys.platform == 'darwin':
        install_node()
        return (local_node_binary_path, local_npm_binary_path)

    print('ERROR: Please install the latest node.js LTS version using the Windows installer:')
    print('https://nodejs.org/en/download/')
    # A bare ``raise`` has no active exception to re-raise here, so raise an
    # explicit error instead.
    raise RuntimeError('node.js cannot be installed automatically on %s' % sys.platform)
开发者ID:easingthemes,项目名称:devtools-frontend,代码行数:16,代码来源:install_node_deps.py
示例12: open_sub_channel
def open_sub_channel(term, tws):
    """
    Opens a sub-channel of communication by executing a new shell on the SSH
    server using OpenSSH's `Master mode <http://en.wikibooks.org/wiki/OpenSSH/Cookbook/Multiplexing>`_
    capability (it spawns a new slave) and returns the resulting
    :class:`termio.Multiplex` instance. If a slave has already been opened for
    this purpose it will re-use the existing channel.

    Raises :class:`SSHMultiplexingException` when the session directory or
    the SSH socket path for *term* cannot be found.
    """
    logging.debug("open_sub_channel() term: %s" % term)
    global OPEN_SUBCHANNELS
    if term in OPEN_SUBCHANNELS and OPEN_SUBCHANNELS[term].isalive():
        # Use existing sub-channel (much faster this way)
        return OPEN_SUBCHANNELS[term]
    # NOTE: When connecting a slave via ssh you can't tell it to execute a
    # command like you normally can (e.g. 'ssh user@host <some command>'). This
    # is why we're using the termio.Multiplex.expect() functionality below...
    session = tws.session
    session_dir = tws.settings['session_dir']
    session_path = os.path.join(session_dir, session)
    # The joined path is never empty, so test that the directory really
    # exists rather than testing the string for truthiness.
    if not os.path.isdir(session_path):
        raise SSHMultiplexingException(_(
            "SSH Plugin: Unable to open slave sub-channel."))
    socket_path = None
    # Find the SSH socket path...
    for f in os.listdir(session_path):
        if f.startswith('ssh:%s:' % term):
            # Grab the SSH socket path from the file
            with open(os.path.join(session_path, f)) as socket_file:
                for line in socket_file:
                    if line.startswith('SSH_SOCKET'):
                        # NOTE: This will includes quotes (which is fine).
                        # Split once only so an "=" inside the value survives.
                        socket_path = line.split('=', 1)[1].strip()
    # Interesting: When using an existing socket you don't need to give it all
    # the same options as you used to open it but you still need to give it
    # *something* in place of the hostname or it will report a syntax error and
    # print out the help. So that's why I've put 'go_ssh_remote_cmd' below.
    # ...but I could have just used 'foo' :)
    if not socket_path:
        raise SSHMultiplexingException(_(
            "SSH Plugin: Unable to open slave sub-channel."))
    users_ssh_dir = get_ssh_dir(tws)
    ssh_config_path = os.path.join(users_ssh_dir, 'config')
    if not os.path.exists(ssh_config_path):
        # Create it (an empty one so ssh doesn't error out)
        with open(ssh_config_path, 'w') as f:
            f.write('\n')
    # Hopefully 'go_ssh_remote_cmd' will be a clear enough indication of
    # what is going on by anyone that has to review the logs...
    ssh = which('ssh')
    ssh_command = '%s -x -S%s -F%s go_ssh_remote_cmd' % (
        ssh, socket_path, ssh_config_path)
    logging.debug("ssh_command: %s" % ssh_command)
    OPEN_SUBCHANNELS[term] = m = tws.new_multiplex(
        ssh_command, "%s (sub)" % term)
    # Using huge numbers here so we don't miss much (if anything) if the user
    # executes something like "ps -ef".
    m.spawn(rows=100, cols=200)  # Hopefully 100/200 lines/cols is enough
    # ...if it isn't, well, that's not really what this is for :)
    # Set the term title so it gets a proper name in the logs
    m.writeline(u'echo -e "\\033]0;Term %s sub-channel\\007"' % term)
    return m
开发者ID:PaulReiber,项目名称:GateOne,代码行数:60,代码来源:ssh.py
示例13: extract_audio
def extract_audio(file_name, output_dir):
    """
    Extract all audio tracks from a AVI container

    Each track is dumped to ``<output_dir>/<track id>.audio``.

    :param file_name: Name of the file that contains audio track
    :type file_name: str
    :param output_dir: Directory where to place raw audio track
    :type output_dir: str
    """
    # detect how many audio track
    # pexpect.spawn() does not run the command through a shell, so the former
    # "2>/dev/null | grep ID_AUDIO_ID" suffix was handed to mplayer as extra
    # arguments. The pattern below already picks out the ID_AUDIO_ID lines.
    command = '{mplayer} -noconfig all -cache-min 0 -vo null -ao null -frames 0 -identify "{input_file}"'.format(mplayer=which('mplayer')[0], input_file=file_name)
    if verbose:
        print(command)
    thread = pexpect.spawn(command)
    pl = thread.compile_pattern_list([
        pexpect.EOF,
        r"ID_AUDIO_ID=(\d+).*"
    ])
    audio_tracks = []
    while True:
        i = thread.expect_list(pl, timeout=None)
        if i == 0:  # EOF, Process exited
            break
        if i == 1:  # Status
            audio_tracks.append(int(thread.match.group(1)))
    thread.close()

    # Now extract each audio track
    for track in audio_tracks:
        # The destination is quoted like the input so that an output
        # directory containing spaces stays a single argument.
        t_command = '{mplayer} -aid {track} -dumpaudio -dumpfile "{dest_file}" "{input_file}"'.format(
            mplayer=which("mplayer")[0],
            input_file=file_name,
            track=track,
            dest_file=output_dir + os.sep + "{}".format(track) + ".audio"
        )
        launch_process_with_progress_bar(t_command, REQUIRED_EXECUTABLES['mplayer'], 100, 'Extract audio track {}: '.format(track), verbose, debug)
开发者ID:torre76,项目名称:hardsub,代码行数:34,代码来源:avi.py
示例14: dump_backup
def dump_backup(self, backup, uri, destination, log_file_name, options=None):
    """Dump ``uri`` into ``destination`` by running ``mongoctl dump``.

    The command runs in the task workspace of ``backup`` and its output goes
    to ``<workspace>/<destination>/<log_file_name>``. The URI is masked
    before the command line is logged.

    :raises MBSError: when the mongoctl executable is not found
    A non-zero exit code is reported through ``errors.raise_dump_error``
    together with the last mongo error line seen in the output.
    """
    mongoctl_exe = which("mongoctl")
    if not mongoctl_exe:
        raise MBSError("mongoctl exe not found in PATH")

    dump_cmd = [mongoctl_exe, "--noninteractive", "dump", uri, "-o", destination]
    dump_cmd.extend(options or [])

    # Copy of the command for logging, with the uri (4th item) masked.
    uri_position = 3
    dump_cmd_display = list(dump_cmd)
    dump_cmd_display[uri_position] = mask_mongo_uri(uri)
    logger.info("Running dump command: %s" % " ".join(dump_cmd_display))

    workspace = self.get_task_workspace_dir(backup)
    log_path = os.path.join(workspace, destination, log_file_name)

    # One-slot list so the callback can overwrite the value in place.
    latest_error = [""]

    def capture_last_error(line):
        if is_mongo_error_log_line(line):
            latest_error[0] = line

    # execute dump command
    return_code = execute_command_wrapper(dump_cmd, cwd=workspace, output_path=log_path,
                                          on_output=capture_last_error)

    # a non-zero return code means the dump failed
    if return_code:
        errors.raise_dump_error(return_code, latest_error[0])
开发者ID:mongolab,项目名称:mongodb-backup-system,代码行数:30,代码来源:backup_assistant.py
示例15: create_config_file
def create_config_file(config_file, random_music_home):
    """
    Interactively build a new configuration file and write it to disk.

    :param config_file: path to config file we are creating
    :type config_file: str
    :param random_music_home: home of random_music application (i.e. where
                              index files are stored)
    :type random_music_home: str
    """
    sys.stdout.write("You do not appear to have a config file, lets create one!\n")
    sys.stdout.write("Creating config file at %s\n" % config_file)

    section = 'config'
    config = RawConfigParser()
    config.add_section(section)
    fixed_options = (
        ('loop_songs', 'true'),
        ('randomise', 'true'),
        ('index_dir', os.path.join(random_music_home, "indicies")),
    )
    for option, value in fixed_options:
        config.set(section, option, value)

    # Keep asking until the chosen player can be found on the path.
    music_client = DEFAULT_MUSIC_CLIENT
    while True:
        if which(music_client):
            break
        music_client = raw_input("The music player '%s' could not be found "
                                 "on your path. Please input a different "
                                 "music player:" % music_client)
    config.set(section, 'music_client', music_client)

    # Keep asking until every comma separated entry is an existing directory.
    # The initial empty string never passes, so at least one prompt is shown.
    user_music_dirs = ""
    while not all(os.path.isdir(d) for d in user_music_dirs.split(",")):
        user_music_dirs = raw_input("Input a csv list of full paths to "
                                    "your music dirs:")
    config.set(section, 'music_dirs', user_music_dirs)

    with open(config_file, 'wb') as fh:
        config.write(fh)
开发者ID:caoilteguiry,项目名称:random_music,代码行数:34,代码来源:random_music.py
示例16: on_openvpn_connected
def on_openvpn_connected(self):
    """React to the OpenVPN connection coming up.

    On Windows this first polls ``net view \\\\SMB_HOSTNAME`` every half
    second until it exits with code 0 or 2, giving up silently when
    ``self.quiting`` is set. The state is then switched to
    ``STATE_CONNECTED``; on other platforms the share is mounted with
    ``gvfs-mount`` when that tool is available. If requested, the files are
    shown once and the request flag is cleared.
    """
    if onWindows:
        # For unknown reason, it can take quite some time before
        # "explorer \\10.18.0.1" works properly after connecting.
        # We poll using "net view \\SMB_HOSTNAME" until we're connected.
        # TODO is there a way to speed this up?
        while True:
            l.info("calling 'net view'")
            # WTF subprocess.Popen behaves differently than
            # subprocess.call. This difference only occurs with
            # the custom startupinfo.
            net_view = subprocess.Popen(['net', 'view', '\\\\' + SMB_HOSTNAME],
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE,
                                        startupinfo=subprocess_sui)
            _, err = net_view.communicate()
            if net_view.returncode not in (0, 2):
                l.debug("returncode: %s", net_view.returncode)
                l.debug("Message: %s", repr(err))
                time.sleep(0.5)
                if self.quiting:
                    return
                continue
            break

    self.set_state(STATE_CONNECTED)

    # onWindows is tested first so gvfs-mount is never looked up on Windows.
    if not (onWindows or which('gvfs-mount') is None):
        subprocess.call(['gvfs-mount', 'smb://' + SMB_HOSTNAME])

    if self.open_files_on_connection:
        self._show_files()
        self.open_files_on_connection = False
开发者ID:karpenoktem,项目名称:uiltje,代码行数:29,代码来源:main.py
示例17: get_systemheader
def get_systemheader():
    """Return the contents of the PostScript system header ``header.ps``.

    The file is searched for in the user data directories from the settings
    followed by ``SYSTEM_DATA_DIR``. When it cannot be found a PostScript
    comment saying so is returned instead.
    """
    header_name = "header.ps"
    options = settings.get_settings()
    fn = utils.which(
        header_name, list(options.user_data_dirs) + [SYSTEM_DATA_DIR])
    if fn:
        with open(fn) as header_file:
            return header_file.read()
    # Report the name that was searched for: ``fn`` is empty at this point,
    # so formatting it produced "System header None not found!".
    return "%%\\%% System header %s not found!\n%%" % header_name
开发者ID:IamMomotaros,项目名称:grailbrowser,代码行数:7,代码来源:PSStream.py
示例18: getHash
def getHash(fileName, pluginName):
    """Given a valid fileName it returns a string containing a md5sum
    of the file content. If we are running on a system which prelink
    binaries (aka RedHat based) the command prelink must be on the PATH

    For ``pluginName == 'ELF'`` and an available ``prelink`` the checksum is
    taken from ``prelink -y --md5``; if that command fails the plain file
    content is hashed instead.

    Returns ``None`` when the file cannot be read.
    """
    global _isPrelink
    if _isPrelink is None:
        # first execution let's check for prelink
        _isPrelink = utils.which("prelink")
        if _isPrelink is None:
            # Empty string = "already looked, not available".
            _isPrelink = ""
        else:
            # Same output as the former ``print "Using: ", _isPrelink``.
            print("Using: " + " " + str(_isPrelink))
    if pluginName == 'ELF' and len(_isPrelink) > 0:
        # let's use prelink for the md5sum
        # TODO what if isPrelink fails
        (temp, returncode) = utils.getOutputAsList([_isPrelink, '-y', '--md5', fileName])
        if returncode == 0:
            return temp[0].split()[0]
        # undoing prelinking failed for some reasons: fall back to plain md5
    try:
        # ok let's do standard md5sum
        md = md5()
        # Binary mode: a checksum must cover the raw bytes. ``with`` closes
        # the file even when reading fails half way.
        with open(fileName, 'rb') as fd:
            for chunk in iter(lambda: fd.read(65536), b''):
                md.update(chunk)
        return md.hexdigest()
    except IOError:
        # file not found
        return None
开发者ID:neurodebian,项目名称:FingerPrint,代码行数:31,代码来源:sergeant.py
示例19: run
def run(self, action):
    """Set up automatic start for the given ``action``.

    The configuration is validated first (a domain is required for
    ``'pinging'``). For ``'pinging'`` the user is shown a ``@reboot`` crontab
    line to paste and ``crontab -e`` is opened. For ``'autohosts'`` root
    access is ensured and, after confirmation, a cron file is written to
    ``/etc/cron.d/pd_etchosts``. Other actions only run the validation.
    """
    icv = InteractiveConfigValidation()
    if action == 'pinging':
        icv.run(require_domain=True)
    else:
        icv.run()
    config = icv.get()

    if action == "pinging":
        instructions = (
            "In a second you will be presented with your default text editor. Copy text below and paste it at the bottom of that file, save and exit:",
            "",
            "@reboot %s pinging start" % (which('pd'),),
            "",
            "Copy the line and press enter to continue.",
        )
        for text in instructions:
            print(text)
        raw_input()
        subprocess.call("crontab -e", shell=True)
        print("Good job! If all went well pinging will now start automatically.")
        return

    if action != 'autohosts':
        return

    ensure_sudo()
    cron_file = "/etc/cron.d/pd_etchosts"
    cron_contents = "HOME=/home/%s\n*/1 * * * * root %s etchosts" % (getlogin(), which('pd'),)

    def writeback():
        # Handed to ask_action together with the confirmation question.
        with open(cron_file, "w+") as f:
            f.write(cron_contents)
            f.write('\n')

    print("The following lines are going to be added to %s:\n\n%s\n" % (cron_file, cron_contents))
    _, agreed = icv.ask_action("Do you wish to continue?", writeback)
    print('Done.' if agreed else 'Aborting.')
开发者ID:nivwusquorum,项目名称:private-domains,代码行数:33,代码来源:methods.py
示例20: _multi_part_put
def _multi_part_put(self, file_path, destination_path, file_size):
    """
    Uploads file in chunks using Swift Tool (st) command
    http://bazaar.launchpad.net/~hudson-openstack/swift/1.2/view/head:/bin/st

    :param file_path: local file; the command is run from its directory
    :param destination_path: object name passed to ``st upload``
    :param file_size: size of the file, used to derive the segment size
    """
    logger.info("RackspaceCloudFilesTarget: Starting multi-part put "
                "for %s " % file_path)

    # calculate chunk size
    # split into 10 chunks if possible
    chunk_size = min(int(file_size / 10), MAX_SPLIT_SIZE)

    st_exe = which("st")
    auth_args = [
        "-A", "https://auth.api.rackspacecloud.com/v1.0",
        "-U", self.username,
    ]
    upload_args = [
        "upload",
        "--segment-size", str(chunk_size),
        self.container_name, destination_path
    ]
    st_command = [st_exe] + auth_args + ["-K", self.api_key] + upload_args

    # Log the command with the API key masked so that the credential never
    # ends up in the log files.
    st_command_display = [st_exe] + auth_args + ["-K", "********"] + upload_args
    logger.info("RackspaceCloudFilesTarget: Executing command: %s" %
                " ".join(st_command_display))

    working_dir = os.path.dirname(file_path)
    execute_command(st_command, cwd=working_dir)
    logger.info("RackspaceCloudFilesTarget: Multi-part put for %s "
                "completed successfully!" % file_path)
开发者ID:gregbanks,项目名称:mongodb-backup-system,代码行数:32,代码来源:target.py
注:本文中的utils.which函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论