This article collects typical usage examples of the shutil.copyfile function in Python. shutil.copyfile(src, dst) copies the contents of the file src to the file dst (metadata such as permission bits and timestamps is not copied), and dst must name the destination file itself, not a directory. If you are wondering how copyfile is used in real code, the curated examples below should help.
The 20 code examples below are drawn from open-source projects and are sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Python code examples.
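Before the project examples, a minimal sketch of the call itself may help; the file names below are hypothetical and only illustrate the basic contract (both arguments are paths, and the destination names a file, not a directory):

import shutil

src = 'settings.ini'             # hypothetical source file
dst = 'backup/settings.ini'      # hypothetical destination; its parent directory must already exist

shutil.copyfile(src, dst)        # copies file contents only; raises shutil.SameFileError if src and dst are the same file

Unlike shutil.copy and shutil.copy2, copyfile does not accept a directory as the destination and does not copy permission bits or timestamps.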
Example 1: execute
def execute(self):
    settings = QSettings()
    lastDir = settings.value('Processing/lastModelsDir', '')
    filename = QFileDialog.getOpenFileName(self.toolbox,
        self.tr('Open model', 'AddModelFromFileAction'), lastDir,
        self.tr('Processing model files (*.model *.MODEL)', 'AddModelFromFileAction'))
    if filename:
        try:
            settings.setValue('Processing/lastModelsDir',
                              QFileInfo(filename).absoluteDir().absolutePath())
            ModelerAlgorithm.fromFile(filename)
        except WrongModelException:
            QMessageBox.warning(
                self.toolbox,
                self.tr('Error reading model', 'AddModelFromFileAction'),
                self.tr('The selected file does not contain a valid model', 'AddModelFromFileAction'))
            return
        except:
            QMessageBox.warning(self.toolbox,
                                self.tr('Error reading model', 'AddModelFromFileAction'),
                                self.tr('Cannot read file', 'AddModelFromFileAction'))
            return
        destFilename = os.path.join(ModelerUtils.modelsFolder(), os.path.basename(filename))
        shutil.copyfile(filename, destFilename)
        self.toolbox.updateProvider('model')
Author: Antoviscomi, Project: QGIS, Lines: 26, Source: AddModelFromFileAction.py
Example 2: test_absent
def test_absent(self):
    '''
    ssh_known_hosts.absent
    '''
    known_hosts = os.path.join(integration.FILES, 'ssh', 'known_hosts')
    shutil.copyfile(known_hosts, KNOWN_HOSTS)
    if not os.path.isfile(KNOWN_HOSTS):
        self.skipTest(
            'Unable to copy {0} to {1}'.format(
                known_hosts, KNOWN_HOSTS
            )
        )
    kwargs = {'name': 'github.com', 'user': 'root', 'config': KNOWN_HOSTS}
    # test first
    ret = self.run_state('ssh_known_hosts.absent', test=True, **kwargs)
    self.assertSaltNoneReturn(ret)
    # remove once, the key is gone
    ret = self.run_state('ssh_known_hosts.absent', **kwargs)
    self.assertSaltStateChangesEqual(
        ret, GITHUB_FINGERPRINT, keys=('old', 'fingerprint')
    )
    # remove twice, nothing has changed
    ret = self.run_state('ssh_known_hosts.absent', **kwargs)
    self.assertSaltStateChangesEqual(ret, {})
    # test again
    ret = self.run_state('ssh_known_hosts.absent', test=True, **kwargs)
    self.assertSaltNoneReturn(ret)
Author: jaypei, Project: salt, Lines: 31, Source: ssh.py
Example 3: UpdateVersionFile
def UpdateVersionFile(self, message, dry_run, push_to=None):
    """Update the version file with our current version."""
    if not self.version_file:
        raise VersionUpdateException('Cannot call UpdateVersionFile without '
                                     'an associated version_file')
    components = (('CHROMEOS_BUILD', self.build_number),
                  ('CHROMEOS_BRANCH', self.branch_build_number),
                  ('CHROMEOS_PATCH', self.patch_number),
                  ('CHROME_BRANCH', self.chrome_branch))
    with tempfile.NamedTemporaryFile(prefix='mvp') as temp_fh:
        with open(self.version_file, 'r') as source_version_fh:
            for line in source_version_fh:
                for key, value in components:
                    line = re.sub(self.KEY_VALUE_PATTERN % (key,),
                                  '%s=%s\n' % (key, value), line)
                temp_fh.write(line)
        temp_fh.flush()
        repo_dir = os.path.dirname(self.version_file)
        try:
            git.CreateBranch(repo_dir, PUSH_BRANCH)
            shutil.copyfile(temp_fh.name, self.version_file)
            _PushGitChanges(repo_dir, message, dry_run=dry_run, push_to=push_to)
        finally:
            # Update to the remote version that contains our changes. This is needed
            # to ensure that we don't build a release using a local commit.
            git.CleanAndCheckoutUpstream(repo_dir)
Author: bpsinc-native, Project: src_third_party_chromite, Lines: 32, Source: manifest_version.py
Example 4: _copy_contents
def _copy_contents(dst_dir, contents):
    items = {"dirs": set(), "files": set()}
    for path in contents:
        if isdir(path):
            items['dirs'].add(path)
        elif isfile(path):
            items['files'].add(path)
    dst_dir_name = basename(dst_dir)
    if dst_dir_name == "src" and len(items['dirs']) == 1:
        copytree(list(items['dirs']).pop(), dst_dir, symlinks=True)
    else:
        makedirs(dst_dir)
        for d in items['dirs']:
            copytree(d, join(dst_dir, basename(d)), symlinks=True)
    if not items['files']:
        return
    if dst_dir_name == "lib":
        dst_dir = join(dst_dir, mkdtemp(dir=dst_dir))
    for f in items['files']:
        copyfile(f, join(dst_dir, basename(f)))
Author: artynet, Project: platformio, Lines: 26, Source: ci.py
Example 5: run_merge
def run_merge(filenames):
    """Merges all Skype databases to a new database."""
    dbs = [skypedata.SkypeDatabase(f) for f in filenames]
    db_base = dbs.pop()
    counts = collections.defaultdict(lambda: collections.defaultdict(int))
    postbacks = Queue.Queue()
    postfunc = lambda r: postbacks.put(r)
    worker = workers.MergeThread(postfunc)
    name, ext = os.path.splitext(os.path.split(db_base.filename)[-1])
    now = datetime.datetime.now().strftime("%Y%m%d")
    filename_final = util.unique_path("%s.merged.%s%s" % (name, now, ext))
    print("Creating %s, using %s as base." % (filename_final, db_base))
    shutil.copyfile(db_base.filename, filename_final)
    db2 = skypedata.SkypeDatabase(filename_final)
    chats2 = db2.get_conversations()
    db2.get_conversations_stats(chats2)
    for db1 in dbs:
        chats = db1.get_conversations()
        db1.get_conversations_stats(chats)
        bar_total = sum(c["message_count"] for c in chats)
        bar_text = " Processing %.*s.." % (30, db1)
        bar = ProgressBar(max=bar_total, afterword=bar_text)
        bar.start()
        args = {"db1": db1, "db2": db2, "chats": chats,
                "type": "diff_merge_left"}
        worker.work(args)
        while True:
            result = postbacks.get()
            if "error" in result:
                print("Error merging %s:\n\n%s" % (db1, result["error"]))
                worker = None  # Signal for global break
                break  # break while True
            if "done" in result:
                break  # break while True
            if "diff" in result:
                counts[db1]["chats"] += 1
                counts[db1]["msgs"] += len(result["diff"]["messages"])
            msgcounts = sum(c["message_count"] for c in result["chats"])
            bar.update(bar.value + msgcounts)
            if result["output"]:
                log(result["output"])
        if not worker:
            break  # break for db1 in dbs
        bar.stop()
        bar.afterword = " Processed %s." % db1
        bar.update(bar_total)
        print
    if not counts:
        print("Nothing new to merge.")
        db2.close()
        os.unlink(filename_final)
    else:
        for db1 in dbs:
            print("Merged %s in %s from %s." %
                  (util.plural("message", counts[db1]["msgs"]),
                   util.plural("chat", counts[db1]["chats"]), db1))
        print("Merge into %s complete." % db2)
Author: barneycarroll, Project: Skyperious, Lines: 60, Source: main.py
Example 6: test_upgrade_pstate_files
def test_upgrade_pstate_files(self):
    """
    Test whether the existing pstate files are correctly updated to 7.1.
    """
    os.makedirs(os.path.join(self.state_dir, STATEDIR_DLPSTATE_DIR))
    # Copy an old pstate file
    src_path = os.path.join(self.CONFIG_PATH, "download_pstate_70.state")
    shutil.copyfile(src_path, os.path.join(self.state_dir, STATEDIR_DLPSTATE_DIR, "download.state"))
    # Copy a corrupt pstate file
    src_path = os.path.join(self.CONFIG_PATH, "download_pstate_70_corrupt.state")
    corrupt_dest_path = os.path.join(self.state_dir, STATEDIR_DLPSTATE_DIR, "downloadcorrupt.state")
    shutil.copyfile(src_path, corrupt_dest_path)
    old_config = RawConfigParser()
    old_config.read(os.path.join(self.CONFIG_PATH, "tribler70.conf"))
    convert_config_to_tribler71(old_config, state_dir=self.state_dir)
    # Verify whether the section is correctly renamed
    download_config = RawConfigParser()
    download_config.read(os.path.join(self.state_dir, STATEDIR_DLPSTATE_DIR, "download.state"))
    self.assertTrue(download_config.has_section("download_defaults"))
    self.assertFalse(download_config.has_section("downloadconfig"))
    self.assertFalse(os.path.exists(corrupt_dest_path))
    # Do the upgrade again, it should not fail
    convert_config_to_tribler71(old_config, state_dir=self.state_dir)
Author: Tribler, Project: tribler, Lines: 28, Source: test_config_upgrade_70_71.py
Example 7: cpMCNPproject
def cpMCNPproject(directory):
    wkdir = getcwd()
    if checkifMCNPproject(directory, 1) == 1:
        return 1
    elif checkifMCNPproject(wkdir, 2) == 2:
        return 2
    else:
        cards = [path.join(directory, "cards/parameters.part"),
                 path.join(directory, "cards/materials.part"),
                 path.join(directory, "cards/source.part"),
                 path.join(directory, "cards/tallies.part"),
                 path.join(directory, "cards/traslations.part")]
        geom = [path.join(directory, "geom/cells.part"),
                path.join(directory, "geom/surfaces.part")]
        for card in cards:
            try:
                copyfile(card, path.join(wkdir, "cards/", path.basename(card)))
            except Exception as e:
                print "\n\033[1;34mMCNPmanager cp error:\033[1;32m %s \033[0m\n" % (e)
        for g in geom:
            try:
                copyfile(g, path.join(wkdir, "geom/", path.basename(g)))
            except Exception as e:
                print "\n\033[1;34mMCNPmanager cp error:\033[1;32m %s \033[0m\n" % (e)
        return 0
Author: ipostuma, Project: PyMCNPmanager, Lines: 26, Source: MCNPmanager.py
Example 8: write_hash_manifests
def write_hash_manifests(self):
    if not self.manifests_updated:
        return False
    today = datetime.datetime.strftime(
        datetime.datetime.now(), "%Y%m%d%H%M%S")
    for alg in set(self.algorithms):
        manifest_path = os.path.join(self.path, 'manifest-{}.txt'.format(alg))
        copy_manifest_path = os.path.join(self.path, 'manifest-{}-{}.old'.format(alg, today))
        try:
            shutil.copyfile(manifest_path, copy_manifest_path)
        except:
            LOGGER.error("Do not have permission to write new manifests")
        else:
            self.add_premisevent(process = "Copy Bag Manifest",
                msg = "{} copied to {} before writing new manifest".format(
                    os.path.basename(manifest_path),
                    os.path.basename(copy_manifest_path)),
                outcome = "Pass", sw_agent = sys._getframe().f_code.co_name)
        try:
            with open(manifest_path, 'w') as manifest:
                for payload_file, hashes in self.entries.items():
                    if payload_file.startswith("data" + os.sep):
                        manifest.write("{} {}\n".format(hashes[alg], bagit._encode_filename(payload_file)))
        except:
            LOGGER.error("Do not have permission to overwrite hash manifests")
        else:
            LOGGER.info("{} written".format(manifest_path))
            self.add_premisevent(process = "Write Bag Manifest",
                msg = "{} written as a result of new or updated payload files".format(
                    os.path.basename(manifest_path)),
                outcome = "Pass", sw_agent = sys._getframe().f_code.co_name)
    return True
Author: NYPL, Project: ami-tools, Lines: 35, Source: update_bag.py
Example 9: storeFile
def storeFile(tmpFile, copyLocation, symLocation):
    shutil.copyfile(tmpFile, copyLocation)
    try:
        os.remove(symLocation)
    except:
        pass
    os.symlink(copyLocation, symLocation)
Author: Michael-AU, Project: autotesting, Lines: 7, Source: autotesting.py
Example 10: select_gcov_files_from_stdout
def select_gcov_files_from_stdout(out, gcov_filter, gcov_exclude, logger, chdir, tempdir):
    active_files = []
    all_files = []
    for line in out.splitlines():
        found = output_re.search(line.strip())
        if found is None:
            continue
        fname = found.group(1)
        full = os.path.join(chdir, fname)
        all_files.append(full)
        filtered, excluded = apply_filter_include_exclude(
            fname, gcov_filter, gcov_exclude)
        if filtered:
            logger.verbose_msg("Filtering gcov file {}", fname)
            continue
        if excluded:
            logger.verbose_msg("Excluding gcov file {}", fname)
            continue
        if tempdir and tempdir != chdir:
            import shutil
            active_files.append(os.path.join(tempdir, fname))
            shutil.copyfile(full, active_files[-1])
        else:
            active_files.append(full)
    return active_files, all_files
Author: gcovr, Project: gcovr, Lines: 32, Source: gcov.py
Example 11: __init__
def __init__(self, db_session, db_migrate, sql_connection,
             sqlite_db, sqlite_clean_db):
    self.sql_connection = sql_connection
    self.sqlite_db = sqlite_db
    self.sqlite_clean_db = sqlite_clean_db
    self.engine = db_session.get_engine()
    self.engine.dispose()
    conn = self.engine.connect()
    if sql_connection == "sqlite://":
        if db_migrate.db_version() > db_migrate.db_initial_version():
            return
    else:
        testdb = os.path.join(CONF.state_path, sqlite_db)
        if os.path.exists(testdb):
            return
    db_migrate.db_sync()
    # self.post_migrations()
    if sql_connection == "sqlite://":
        conn = self.engine.connect()
        self._DB = "".join(line for line in conn.connection.iterdump())
        self.engine.dispose()
    else:
        cleandb = os.path.join(CONF.state_path, sqlite_clean_db)
        shutil.copyfile(testdb, cleandb)
Author: chadlung, Project: cinder, Lines: 25, Source: test.py
Example 12: makeTemp
def makeTemp(self, *filenames):
    tmp = self.mktemp()
    os.mkdir(tmp)
    for filename in filenames:
        tmpFile = os.path.join(tmp, filename)
        shutil.copyfile(sp(filename), tmpFile)
    return tmp
Author: 0004c, Project: VTK, Lines: 7, Source: test_lore.py
Example 13: main
def main():
    feature_file_path1 = "feature/test_app/"
    feature_file_path2 = "feature/test_app_new/"
    trg_file_path = "feature/cleaned_test_app/"
    with con:
        cur = con.cursor()
        sql = "select permalink, next_round from bayarea_post2012_fewer4;"
        cur.execute(sql)
        results = cur.fetchall()
        for result in results:
            permalink = result[0]
            next_round = result[1]
            file_name = permalink + "_next_" + next_round + ".csv"
            file1 = feature_file_path1 + file_name
            file2 = feature_file_path2 + file_name
            target = trg_file_path + file_name
            if os.path.exists(file2):
                shutil.copyfile(file2, target)
            elif os.path.exists(file1):
                shutil.copyfile(file1, target)
Author: XinCindyChen, Project: Fuel-Your-Startup, Lines: 27, Source: pick_test_app.py
Example 14: estimatePloidy
def estimatePloidy(tmpdir, workdir, snpSegfile):
    """
    Runs extract_cnv.R, bedtools intersect, and base_cnv.R.
    extract_cnv.R expects cnv.result<ploidy> and outputs cnv<ploidy>
    bedtools then intersects the cnv<ploidy> file with the snpSegfile created in segmentRatio
    base_cnv uses the intersect files to determine the correct ploidy, which it writes to a file named ploidy
    The corresponding file is then moved to the working directory
    """
    rScriptName = os.path.join(scriptPath, "extract_cnv.R")
    subprocess.check_call(['Rscript', rScriptName, tmpdir])
    for i in ["2", "3", "4"]:
        cnvfile = os.path.join(tmpdir, 'cnv' + i)
        outfile = os.path.join(tmpdir, 'cnv' + i + "_baf.txt")
        with open(outfile, 'w') as o:
            subprocess.check_call([
                'bedtools', 'intersect',
                '-a', snpSegfile,
                '-b', cnvfile,
                '-wb'
            ], stdout=o)
    rScriptName = os.path.join(scriptPath, "base_cnv.R")
    subprocess.check_call(['Rscript', rScriptName, tmpdir, workdir])
    # now move the cnv results with the selected ploidy to the output file
    ploidy = open(os.path.join(workdir, "ploidy")).readline().strip()
    shutil.copyfile(os.path.join(tmpdir, "cnv.result" + ploidy), os.path.join(workdir, "cnv.result"))
Author: Jeltje, Project: adtex, Lines: 27, Source: ADTEx.py
Example 15: initialize
def initialize(self, test, log):
    '''Does the init part of the test
    1. Finds initial count of entry in log
    2. Creates a file 'cron' under cron.d
    3. Backs up /etc/crontab
    4. Modifies /etc/crontab '''
    self.log = log
    self.initial_count = self.count_log('Cron automation')
    f = open('/etc/cron.d/cron', 'w')
    f.write('''#!/bin/bash
touch %s
echo 'Cron automation' >> %s
''' % (self.log, self.log))
    f.close()
    utils.system('chmod +x /etc/cron.d/cron')
    shutil.copyfile('/etc/crontab', '/tmp/backup')
    f = open('/etc/crontab', 'w')
    f.write('* * * * * root run-parts /etc/cron.d/\n')
    f.close()
    if test == 'deny_cron':
        if os.path.exists('/etc/cron.d/jobs.deny'):
            shutil.move('/etc/cron.d/jobs.deny', '/tmp/jobs.deny')
        f = open('/etc/cron.d/jobs.deny', 'w')
        f.write('cron')
        f.close()
    elif test == 'allow_cron':
        os.remove('/etc/cron.d/jobs.deny')
        if os.path.exists('/etc/cron.d/jobs.allow'):
            shutil.move('/etc/cron.d/jobs.allow', '/tmp/jobs.allow')
        f = open('/etc/cron.d/jobs.allow', 'w')
        f.write('cron')
        f.close()
Author: dev-priya, Project: autotest-client-tests, Lines: 33, Source: crontab.py
Example 16: write_oplog_progress
def write_oplog_progress(self):
    """ Writes oplog progress to file provided by user
    """
    if self.oplog_checkpoint is None:
        return None
    with self.oplog_progress as oplog_prog:
        oplog_dict = oplog_prog.get_dict()
        items = [[name, util.bson_ts_to_long(oplog_dict[name])]
                 for name in oplog_dict]
    if not items:
        return
    # write to temp file
    backup_file = self.oplog_checkpoint + '.backup'
    os.rename(self.oplog_checkpoint, backup_file)
    # for each of the threads write to file
    with open(self.oplog_checkpoint, 'w') as dest:
        if len(items) == 1:
            # Write 1-dimensional array, as in previous versions.
            json_str = json.dumps(items[0])
        else:
            # Write a 2d array to support sharded clusters.
            json_str = json.dumps(items)
        try:
            dest.write(json_str)
        except IOError:
            # Basically wipe the file, copy from backup.
            # Note: shutil.copyfile expects path names rather than open file
            # objects, so the checkpoint is restored from the backup path here.
            dest.truncate()
            shutil.copyfile(backup_file, self.oplog_checkpoint)
    os.remove(backup_file)
Author: boxrice007, Project: mongo-connector, Lines: 35, Source: connector.py
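A brief note on the restore step in Example 16: shutil.copyfile works on path names, not on open file handles, which is why the restore goes through the backup path rather than the file objects. A minimal, self-contained sketch of the same restore-from-backup pattern, using hypothetical file names:

import os
import shutil

checkpoint = 'oplog.timestamp'           # hypothetical progress file
backup = checkpoint + '.backup'

os.rename(checkpoint, backup)            # keep the previous contents safe
try:
    with open(checkpoint, 'w') as dest:
        dest.write('{"ts": 12345}')      # write the new progress data
except IOError:
    shutil.copyfile(backup, checkpoint)  # on failure, restore the old checkpoint from the backup path
os.remove(backup)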
Example 17: create_labelled_dataset
def create_labelled_dataset(source_directory, destination_directory):
    # a random number appended at the end of the file names at the destination
    sample_no = 1
    file_paths = []
    # check if the destination directory exists or not, create a new one
    if not os.path.exists(destination_directory):
        os.makedirs(destination_directory)
    # read inside directories recursively
    # root stores the absolute path, dirs - directories and files - name of the files
    for root, dirs, files in os.walk(source_directory):
        print "Traversing ____________", root
        for _file in files:
            file_path = root + "/" + _file
            class_name = str(_file)[:3]
            dest_file_path = destination_directory + "/" + class_name
            # check if the class labeled directory exists or not, else create one
            if not os.path.exists(dest_file_path):
                os.makedirs(dest_file_path)
            sample_no += 1
            # copy the file from the source to the destination class labelled directory
            shutil.copyfile(file_path, dest_file_path + "/" + class_name + \
                            str(sample_no) + ".tiff")
Author: SiddharthaAnand, Project: handwriting_recognition, Lines: 29, Source: image_processing.py
Example 18: saveDocsInfo
def saveDocsInfo(part_id, helper, corpus, doc_ids, token_doc_file_name, doc_id_to_text):
    """ Save the tokens and the token-to-document mapping. """
    is_first_doc = True
    for doc_id in doc_ids:
        # Save the document's original source text
        source_doc_path = os.path.join(doc_id_to_text[doc_id])
        target_doc_path = os.path.join(helper.get_output_dir_path(part_id), str(doc_id) + ".txt")
        copyfile(source_doc_path, target_doc_path)
        try:
            doc_tokens = list(corpus.get_document(doc_id).tokens.values())
        except:
            print("Bad document id: " + str(doc_id))
            continue
        doc_tokens.sort(key=lambda x: x.pos)
        with open(os.path.join(helper.get_output_dir_path(part_id), str(doc_id) + ".tokens"), "w",
                  encoding="utf-8") as token_file:
            for token in doc_tokens:
                token_file.write(str(token.id) + " ")
                token_file.write(str(token.pos) + " ")
                token_file.write(str(token.length) + " ")
                token_file.write(str(token.text) + "\n")
        file_mode = "w"
        if is_first_doc:
            is_first_doc = False
        else:
            file_mode = "a"
        with open(token_doc_file_name, file_mode, encoding="utf-8") as token_doc_file:
            for token in doc_tokens:
                token_doc_file.write(str(token.id) + " ")
                token_doc_file.write(str(doc_id) + "\n")
Author: VovaMind, Project: opencorpora, Lines: 30, Source: get_corpora_features.py
Example 19: download
def download(self, cameras, path):
    left_dir = os.path.join(path, 'left')
    right_dir = os.path.join(path, 'right')
    target_dir = os.path.join(path, 'raw')
    if not os.path.exists(target_dir):
        os.mkdir(target_dir)
    left_pages = [os.path.join(left_dir, x)
                  for x in sorted(os.listdir(left_dir))]
    right_pages = [os.path.join(right_dir, x)
                   for x in sorted(os.listdir(right_dir))]
    # Write the orientation as a JPEG comment to the end of the file
    if len(left_pages) != len(right_pages):
        logger.warn("The left and right camera produced an inequal"
                    " amount of images, please fix the problem!")
        logger.warn("Will not combine images")
        return
    if (self.config['first_page']
            and not self.config['first_page'].get(str) == 'left'):
        combined_pages = reduce(operator.add, zip(right_pages, left_pages))
    else:
        combined_pages = reduce(operator.add, zip(left_pages, right_pages))
    logger.info("Combining images.")
    for idx, fname in enumerate(combined_pages):
        fext = os.path.splitext(os.path.split(fname)[1])[1]
        target_file = os.path.join(target_dir, "{0:04d}{1}"
                                   .format(idx, fext))
        shutil.copyfile(fname, target_file)
    shutil.rmtree(right_dir)
    shutil.rmtree(left_dir)
Author: Josuex09, Project: spreads, Lines: 29, Source: combine.py
Example 20: save_db
def save_db(self, userpath):
    # create the folder to save it by profile
    relative_path = constant.folder_name + os.sep + 'firefox'
    if not os.path.exists(relative_path):
        os.makedirs(relative_path)
    relative_path += os.sep + os.path.basename(userpath)
    if not os.path.exists(relative_path):
        os.makedirs(relative_path)
    # Get the database name
    if os.path.exists(userpath + os.sep + 'logins.json'):
        dbname = 'logins.json'
    elif os.path.exists(userpath + os.sep + 'signons.sqlite'):
        dbname = 'signons.sqlite'
    # copy the files (database + key3.db)
    try:
        ori_db = userpath + os.sep + dbname
        dst_db = relative_path + os.sep + dbname
        shutil.copyfile(ori_db, dst_db)
        print_debug('INFO', '%s has been copied here: %s' % (dbname, dst_db))
    except Exception, e:
        print_debug('DEBUG', '{0}'.format(e))
        print_debug('ERROR', '%s has not been copied' % dbname)
Author: 0ps, Project: LaZagne, Lines: 26, Source: mozilla.py
Note: The shutil.copyfile examples in this article were compiled by 纯净天空 from source code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by various developers, and copyright in the code remains with the original authors; please consult each project's license before redistributing or reusing the code. Do not reproduce this article without permission.