本文整理汇总了Python中utils.read_file函数的典型用法代码示例。如果您正苦于以下问题:Python read_file函数的具体用法?Python read_file怎么用?Python read_file使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了read_file函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: process
def process(arg):
global not_deleted_list, update_time
curs = _connect.cursor()
res = curs.execute("SELECT BookId FROM libbook WHERE NOT (Deleted&1) and FileType = 'fb2' ")
not_deleted_list = curs.fetchall()
not_deleted_list = set([i[0] for i in not_deleted_list])
curs.execute('SELECT * FROM librusec')
update_time = curs.fetchone()[0]
for fn in walk(arg):
for ftype, z_filename, data in read_file(fn, zip_charset='utf-8'):
process_file(fn, ftype, z_filename, data)
if options.search_deleted:
deleted = set()
for fn in walk(options.search_deleted):
bookid = base_name(fn, '.fb2.zip')
try:
bookid = int(bookid)
except ValueError:
continue
if bookid in not_deleted_list:
deleted.append(fn)
for fn in deleted:
for ftype, z_filename, data in read_file(fn, zip_charset='utf-8'):
ret = process_file(fn, ftype, z_filename, data)
if ret:
print_log('restore deleted:', bookid)
print
print 'processed:', stats.total
print 'passed:', stats.passed
print 'fixed:', stats.fixed
print 'errors:', stats.errors
if options.not_found:
fd = open(options.not_found, 'w')
for bookid in not_deleted_list:
print >> fd, bookid
开发者ID:jn0,项目名称:fb2utils,代码行数:35,代码来源:update_librusec.py
示例2: __init__
def __init__(self):
'''
Constructor
'''
self.head_name = read_file(HEAD_PATH).strip('\n').rsplit('/', 1)[-1]
self.head_path = os.path.join(REF_HEADS_DIR, self.head_name)
self.head_commit = read_file(self.head_path).strip() if os.path.exists(self.head_path) else None
开发者ID:Oloshka,项目名称:git-in-python,代码行数:7,代码来源:branch.py
示例3: commit
def commit(self, msg, ref="HEAD"):
cur_tree = self.index.do_commit(self.workspace)
branch_name = read_file(os.path.join(self.workspace, ".git", "HEAD")).strip("\n").rsplit("/", 1)[-1]
ref_path = os.path.join(self.workspace, ".git", "refs", "heads", branch_name)
parent_sha1 = None
if os.path.exists(ref_path):
parent_sha1 = read_file(ref_path)
committer_name = self.config.config_dict["user"]["name"]
committer_email = "<%s>" % (self.config.config_dict["user"]["email"])
commit_time = int(time.time())
# TO FIX
commit_timezone = time.strftime("%z", time.gmtime())
commit = Commit(
self.workspace,
tree_sha1=cur_tree.sha1,
parent_sha1=parent_sha1,
name=committer_name,
email=committer_email,
timestamp=commit_time,
timezone=commit_timezone,
msg=msg,
)
write_object_to_file(commit.path, commit.content)
write_to_file(ref_path, commit.sha1)
开发者ID:ningg,项目名称:git-in-python,代码行数:26,代码来源:repository.py
示例4: like_recs
def like_recs():
counter = 0
try:
while counter < 20:
results = get_recs()
liked = utils.read_file("liked")
instagrams = utils.read_file("/Instagram/instagrams")
for i in results:
time.sleep(1)
link = 'https://api.gotinder.com/like/{0}'.format(i["_id"])
liking_header = {'X-Auth-Token': tinder_token,
'Authorization': 'Token token="{0}"'.format(tinder_token).encode('ascii', 'ignore'),
'firstPhotoID': ''+str(i['photos'][0]['id'])
}
likereq = requests.get(link, headers = liking_header)
print i['name'] + ' - ' + i['_id']
print 'status: ' + str(likereq.status_code) + ' text: ' + str(likereq.text)
liked += str(i['name']) + ' - ' + str(i['_id']) + ' - ' + str(i['photos'][0]['url']) + '\n'
try:
if 'instagram' in i:
instagrams+= str(i['instagram']['username'] + " ")
else:
print "no instagram mate soz"
except KeyError as ex:
print 'nah mate'
#print "photoid " + str(i['photos'][0]['id'])
utils.write_file("liked", liked)
utils.write_file("/Instagram/instagrams", instagrams)
counter += 1
except Exception as ex:
print "hit an exception i guess"
print ex
开发者ID:simion96,项目名称:tinder,代码行数:33,代码来源:main.py
示例5: test_bcache_status
def test_bcache_status(self):
succ, dev = BlockDev.kbd_bcache_create(self.loop_dev, self.loop_dev2, None)
self.assertTrue(succ)
self.assertTrue(dev)
self.bcache_dev = dev
_wait_for_bcache_setup(dev)
# should work with both "bcacheX" and "/dev/bcacheX"
status = BlockDev.kbd_bcache_status(self.bcache_dev)
self.assertTrue(status)
status = BlockDev.kbd_bcache_status("/dev/" + self.bcache_dev)
self.assertTrue(status)
# check some basic values
self.assertTrue(status.state)
sys_state = read_file("/sys/block/%s/bcache/state" % self.bcache_dev).strip()
self.assertEqual(status.state, sys_state)
sys_block = read_file("/sys/block/%s/bcache/cache/block_size" % self.bcache_dev).strip()
self.assertEqual(status.block_size, int(bytesize.Size(sys_block)))
sys_size = self._get_size(self.bcache_dev)
self.assertGreater(status.cache_size, sys_size)
succ = BlockDev.kbd_bcache_destroy(self.bcache_dev)
self.assertTrue(succ)
self.bcache_dev = None
time.sleep(1)
wipe_all(self.loop_dev, self.loop_dev2)
开发者ID:rhinstaller,项目名称:libblockdev,代码行数:29,代码来源:kbd_test.py
示例6: main
def main():
FILE_NAME = 'rates_to_watch.pkl'
EMAIL = ''
PASSWORD = ''
arg_count = len(sys.argv)
# Load trackers and record new rates to them.
if arg_count == 1:
# Check if tracking file exists.
if os.path.isfile(FILE_NAME):
rates_to_watch = utils.read_file(FILE_NAME)
for rate in rates_to_watch:
rate.add_rate(grab_rate(rate.get_currencies()))
utils.write_file(FILE_NAME, rates_to_watch)
report = generate_report(rates_to_watch)
utils.send_email('Exchange Rate Report', report,
EMAIL, EMAIL, PASSWORD)
# Tracking file doesn't exist, tell user to add trackers.
else:
print("Error: No currencies are being tracked.")
print("Please run the following command:")
print("python currency_report.py CURRENCY1 CURRENCY2")
print("eg. python currency_report.py GBP JPY")
# Create new currency tracker.
elif arg_count == 3:
__, currency_1, currency_2 = sys.argv
with open('currencies.txt') as file:
valid_currencies = file.read()
# Check if currencies are valid.
if currency_1 in valid_currencies and currency_1 in valid_currencies:
currencies = (currency_1, currency_2)
new_tracker = trackers.CurrencyTracker(currencies,
grab_rate(currencies))
# Edit existing tracker file.
if os.path.isfile(FILE_NAME):
rates_to_watch = utils.read_file(FILE_NAME)
rates_to_watch.append(new_tracker)
utils.write_file(FILE_NAME, rates_to_watch)
# Create new tracker file.
else:
rates_to_watch = [new_tracker]
utils.write_file(FILE_NAME, rates_to_watch)
else:
print("Error: Invalid currency codes.")
else:
print("Error: Invalid number of arguments. {count}"
"argument(s).".format(count=arg_count))
开发者ID:jamalmoir,项目名称:currency_report,代码行数:57,代码来源:reporting.py
示例7: verify_gpg_signature
def verify_gpg_signature(detached_file, signature_file, key_file):
signature = read_file(signature_file, binary=True)
#not generic but ok if the signature is generated in linux
#this is to avoid the signature to be misinterpreted when parsed in another OS
signature = signature.replace('\n', os.linesep)
key = read_file(key_file, binary=True)
message = read_file(detached_file, binary=True)
result = verify_str(signature, key, detached=message)
return result == message
开发者ID:Karho,项目名称:mint4win,代码行数:9,代码来源:signature.py
示例8: test_rewrite_exists
def test_rewrite_exists(self):
"""Test rewrite() with an existing file"""
with utils.TempDir() as tmpdir:
fname = os.path.join(tmpdir, 'fname')
utils.write_file(fname, 'foo')
python_tools.rewrite(fname, 'foo')
self.assertEqual(utils.read_file(fname), 'foo')
python_tools.rewrite(fname, 'bar')
self.assertEqual(utils.read_file(fname), 'bar')
python_tools.rewrite(fname, 'foo', verbose=True)
self.assertEqual(utils.read_file(fname), 'foo')
开发者ID:salilab,项目名称:developer_tools,代码行数:11,代码来源:test_python_tools.py
示例9: initialize_for_query
def initialize_for_query(self, genre_file, movie_file, actors_file):
print 'reading genre list...'
self.genre_dicts = utils.read_file(genre_file)
for genre in self.genre_dicts:
self.genres += genre.keys()[0]
self.genres += '|'
print 'recording movie data...'
movies = utils.read_file(movie_file)
record_movies(movies)
print 'reading actors file...'
actors = utils.read_file(actors_file)
record_data(actors)
开发者ID:yohoadam,项目名称:actorsHITS-1,代码行数:14,代码来源:hits.py
示例10: like_recs_AI
def like_recs_AI():
try:
folder = "ladiesAI/"
extension = ".jpg"
counter = 0
#try:
while True:
results = get_recs()
liked = utils.read_file("liked")
instagrams = utils.read_file("/Instagram/instagrams")
for i in results:
time.sleep(1)
like = 'https://api.gotinder.com/like/{0}'.format(i["_id"])
dislike = 'https://api.gotinder.com/pass/{0}'.format(i["_id"])
liking_header = {'X-Auth-Token': tinder_token,
'Authorization': 'Token token="{0}"'.format(tinder_token).encode('ascii', 'ignore'),
'firstPhotoID': '' + str(i['photos'][0]['id'])
}
#img - tinder img url, path = path on physical device, urllib saves the picture
img = str(i['photos'][0]['url'])
path = folder+img[27:51]+extension
#print "image is " + str(i['photos'][0]['url'])
urllib.urlretrieve(str(i['photos'][0]['url']), path)
result = Predictor.predict(path)
print 'AIresult is: ' + result
if result == "G":
req = requests.get(like, headers = liking_header)
print 'status: ' + str(req.status_code) + ' text: ' + str(req.text)
elif result == "B":
req = requests.get(dislike, headers = liking_header)
liked += str(i['name']) + ' - ' + str(i['_id']) + ' - ' + str(i['photos'][0]['url']) + '\n'
print 'status: ' + str(req.status_code) + ' text: ' + str(req.text)
print i['name'] + ' - ' + i['_id']
try:
if 'instagram' in i:
instagrams += str(i['instagram']['username'] + " ")
else:
print "no instagram mate soz"
except KeyError as ex:
print 'nah mate'
# print "photoid " + str(i['photos'][0]['id'])
utils.write_file("liked", liked)
utils.write_file("/Instagram/instagrams", instagrams)
counter += 1
except Exception as ex:
print "hit an exception i guess"
print ex
开发者ID:simion96,项目名称:tinder,代码行数:50,代码来源:main.py
示例11: _zram_get_stats_new
def _zram_get_stats_new(self):
with _track_module_load(self, "zram", "_loaded_zram_module"):
self.assertTrue(BlockDev.kbd_zram_create_devices(1, [10 * 1024**2], [2]))
time.sleep(1)
# XXX: this needs to get more complex/serious
stats = BlockDev.kbd_zram_get_stats("zram0")
self.assertTrue(stats)
# /dev/zram0 should work too
stats = BlockDev.kbd_zram_get_stats("/dev/zram0")
self.assertTrue(stats)
self.assertEqual(stats.disksize, 10 * 1024**2)
# XXX: 'max_comp_streams' is currently broken on rawhide
# https://bugzilla.redhat.com/show_bug.cgi?id=1352567
# self.assertEqual(stats.max_comp_streams, 2)
self.assertTrue(stats.comp_algorithm)
# read 'num_reads' and 'num_writes' from '/sys/block/zram0/stat'
sys_stats = read_file("/sys/block/zram0/stat").strip().split()
self.assertGreaterEqual(len(sys_stats), 11) # 15 stats since 4.19
num_reads = int(sys_stats[0])
num_writes = int(sys_stats[4])
self.assertEqual(stats.num_reads, num_reads)
self.assertEqual(stats.num_writes, num_writes)
# read 'orig_data_size', 'compr_data_size', 'mem_used_total' and
# 'zero_pages' from '/sys/block/zram0/mm_stat'
sys_stats = read_file("/sys/block/zram0/mm_stat").strip().split()
self.assertGreaterEqual(len(sys_stats), 7) # since 4.18 we have 8 stats
orig_data_size = int(sys_stats[0])
compr_data_size = int(sys_stats[1])
mem_used_total = int(sys_stats[2])
zero_pages = int(sys_stats[5])
self.assertEqual(stats.orig_data_size, orig_data_size)
self.assertEqual(stats.compr_data_size, compr_data_size)
self.assertEqual(stats.mem_used_total, mem_used_total)
self.assertEqual(stats.zero_pages, zero_pages)
# read 'invalid_io' and 'num_writes' from '/sys/block/zram0/io_stat'
sys_stats = read_file("/sys/block/zram0/io_stat").strip().split()
self.assertEqual(len(sys_stats), 4)
invalid_io = int(sys_stats[2])
self.assertEqual(stats.invalid_io, invalid_io)
with _track_module_load(self, "zram", "_loaded_zram_module"):
self.assertTrue(BlockDev.kbd_zram_destroy_devices())
开发者ID:rhinstaller,项目名称:libblockdev,代码行数:48,代码来源:kbd_test.py
示例12: check_rss
def check_rss(self):
rss_cache_dir = config.cache_dir + os.sep + "rss"
newest_item_written = False
if config.rss_feeds:
self.rootlog.debug("rss feeds found:" + str(config.rss_feeds))
for name, feed in config.rss_feeds.items():
last_cache_item = utils.read_file(rss_cache_dir + os.sep + name)
f = feedparser.parse(feed)
self.rootlog.debug(str(f["channel"]["title"] + " feed fetched"))
if last_cache_item != None:
self.rootlog.debug("last_cache_item not None: " + last_cache_item)
for i in f["items"]:
if str(last_cache_item.strip()) == str(i["date"].strip()):
self.rootlog.debug("item found, aborting")
break
else:
if newest_item_written == False:
utils.write_file(rss_cache_dir + os.sep + name, i["date"].strip())
newest_item_written = True
# write date of this feed into file (not elegant)
text2chat = "".join(["[", name, "] ", i["title"], " ", i["link"]])
self.rootlog.debug(text2chat)
self.send(self.room_name, text2chat)
else:
self.rootlog.debug("last_cache_item is None")
utils.write_file(rss_cache_dir + os.sep + name, f["items"][0]["date"])
开发者ID:Strubbl,项目名称:pynder,代码行数:26,代码来源:pynder.py
示例13: genereHtml
def genereHtml(d, i):
s = utils.read_file("corpus/" + i)
for j in d[i]:
pattern = re.compile(j + ' ' , re.I)
s = pattern.sub('<a href="keywords/' + j + '.html"/>' + j + "</a> ", s)
# print j, "\n", s.encode('utf-8'), "\n\n"
utils.write_file("wiki/" + i + ".html", s)
开发者ID:petosorus,项目名称:wikileaks,代码行数:7,代码来源:graphe.py
示例14: _image2XMLhelper
def _image2XMLhelper(self, image_xml, output_xmls, qemu=False):
image2guestdir = self.basedir + "image2guest/"
image = virtinst.ImageParser.parse_file(self.basedir + image_xml)
if type(output_xmls) is not list:
output_xmls = [output_xmls]
conn = qemu and self.qemuconn or self.conn
caps = qemu and self.qemucaps or self.caps
gtype = qemu and "qemu" or "xen"
for idx in range(len(output_xmls)):
fname = output_xmls[idx]
inst = virtinst.ImageInstaller(image, caps, boot_index=idx,
conn=conn)
utils.set_conn(conn)
if inst.is_hvm():
g = utils.get_basic_fullyvirt_guest(typ=gtype)
else:
g = utils.get_basic_paravirt_guest()
g.installer = inst
g._prepare_install(None)
actual_out = g.get_config_xml(install=False)
expect_file = os.path.join(image2guestdir + fname)
expect_out = utils.read_file(expect_file)
expect_out = expect_out.replace("REPLACEME", os.getcwd())
utils.diff_compare(actual_out,
expect_file,
expect_out=expect_out)
utils.reset_conn()
开发者ID:palli,项目名称:python-virtinst,代码行数:35,代码来源:image.py
示例15: test_luks2_integrity
def test_luks2_integrity(self):
"""Verify that we can get create a LUKS 2 device with integrity"""
if not BlockDev.utils_have_kernel_module("dm-integrity"):
self.skipTest('dm-integrity kernel module not available, skipping.')
extra = BlockDev.CryptoLUKSExtra()
extra.integrity = "hmac(sha256)"
succ = BlockDev.crypto_luks_format(self.loop_dev, "aes-cbc-essiv:sha256", 512, PASSWD, None, 0,
BlockDev.CryptoLUKSVersion.LUKS2, extra)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", PASSWD, None, False)
self.assertTrue(succ)
info = BlockDev.crypto_integrity_info("libblockdevTestLUKS")
self.assertIsNotNone(info)
self.assertEqual(info.algorithm, "hmac(sha256)")
# get integrity device dm name
_ret, int_name, _err = run_command('ls /sys/block/%s/holders/' % self.loop_dev.split("/")[-1])
self.assertTrue(int_name) # true == not empty
tag_size = read_file("/sys/block/%s/integrity/tag_size" % int_name)
self.assertEqual(info.tag_size, int(tag_size))
succ = BlockDev.crypto_luks_close("libblockdevTestLUKS")
self.assertTrue(succ)
开发者ID:rhinstaller,项目名称:libblockdev,代码行数:30,代码来源:crypto_test.py
示例16: get_info
def get_info(self, cd_or_iso_path):
if (cd_or_iso_path, self.info_file) in Distro.cache:
return Distro.cache[(cd_or_iso_path, self.info_file)]
else:
Distro.cache[(cd_or_iso_path, self.info_file)] = None
if os.path.isfile(cd_or_iso_path):
info_file = self.backend.extract_file_from_iso(
cd_or_iso_path,
self.info_file,
output_dir=self.backend.info.temp_dir,
overwrite=True)
elif os.path.isdir(cd_or_iso_path):
info_file = os.path.join(cd_or_iso_path, self.info_file)
else:
return
if not info_file or not os.path.isfile(info_file):
return
try:
info = read_file(info_file)
info = self.parse_isoinfo(info)
except Exception, err:
log.error(err)
return
Distro.cache[(cd_or_iso_path, self.info_file)] = info
return info
开发者ID:Ando02,项目名称:wubiuefi,代码行数:25,代码来源:distro.py
示例17: authenticate
def authenticate(domain=None,token_folder=None):
'''authenticate will authenticate the user with Singularity Hub. This means
either obtaining the token from the environment, and then trying to obtain
the token file and reading it, and then finally telling the user to get it.
:param domain: the domain to direct the user to for the token, default is api_base
:param token_folder: the location of the token file, default is $HOME (~/)
'''
# Attempt 1: Get token from environmental variable
token = os.environ.get("SINGULARITY_TOKEN",None)
if token == None:
# Is the user specifying a custom home folder?
if token_folder == None:
token_folder = os.environ["HOME"]
token_file = "%s/.shub" %(token_folder)
if os.path.exists(token_file):
token = read_file(token_file)[0].strip('\n')
else:
if domain == None:
domain = api_base
print('''Please obtain token from %s/token
and save to .shub in your $HOME folder''' %(domain))
sys.exit(1)
return token
开发者ID:gmkurtzer,项目名称:singularity,代码行数:25,代码来源:api.py
示例18: convert_inc
def convert_inc(fn, dst):
text = utils.read_file(fn)
lst = []
for line in text.split('\n'):
if line.startswith(';'):
continue
if line.find(' EQU ') > 0:
line = line.replace(' EQU ', ';')
t = line.split(';')
t[0] = t[0].strip()
t[1] = t[1].strip()
lst.append([t[0], t[1]])
f = open(dst, 'w+')
#print >>f, 'defines = {'
for t in lst:
name = t[0]
value = t[1]
if value.find('H') >= 0: #"H'0F8C'"
value = value.replace('H', '')
value = value.replace('\'', '')
value = '0x' + value
else:
value = hex(int(value))
#name = '\'' + name + '\''
print >>f, '%s:%s' % (name, value)
#print >>f, '}'
f.close()
开发者ID:athenajc,项目名称:XideSDCC,代码行数:28,代码来源:pic_sim.py
示例19: load_configuration
def load_configuration(configuration_file):
default_tokens ={'http_port':9090,
'http_host':'localhost',
'log_level': 'DEBUG',
'log_level_db': 'ERROR',
'log_level_vimconn': 'DEBUG',
}
try:
#Check config file exists
if not os.path.isfile(configuration_file):
raise LoadConfigurationException("Error: Configuration file '"+configuration_file+"' does not exist.")
#Read file
(return_status, code) = utils.read_file(configuration_file)
if not return_status:
raise LoadConfigurationException("Error loading configuration file '"+configuration_file+"': "+code)
#Parse configuration file
try:
config = yaml.load(code)
except yaml.YAMLError, exc:
error_pos = ""
if hasattr(exc, 'problem_mark'):
mark = exc.problem_mark
error_pos = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
raise LoadConfigurationException("Error loading configuration file '"+configuration_file+"'"+error_pos+": content format error: Failed to parse yaml format")
#Validate configuration file with the config_schema
try:
js_v(config, config_schema)
except js_e.ValidationError, exc:
error_pos = ""
if len(exc.path)>0: error_pos=" at '" + ":".join(map(str, exc.path))+"'"
raise LoadConfigurationException("Error loading configuration file '"+configuration_file+"'"+error_pos+": "+exc.message)
开发者ID:dagelf,项目名称:openmano,代码行数:33,代码来源:openmanod.py
示例20: test_write_read_files
def test_write_read_files(self):
'''test_write_read_files will test the functions write_file and read_file
'''
print("Testing utils.write_file...")
from utils import write_file
import json
tmpfile = tempfile.mkstemp()[1]
os.remove(tmpfile)
write_file(tmpfile,"hello!")
self.assertTrue(os.path.exists(tmpfile))
print("Testing utils.read_file...")
from utils import read_file
content = read_file(tmpfile)[0]
self.assertEqual("hello!",content)
from utils import write_json
print("Testing utils.write_json...")
print("Case 1: Providing bad json")
bad_json = {"Wakkawakkawakka'}":[{True},"2",3]}
tmpfile = tempfile.mkstemp()[1]
os.remove(tmpfile)
with self.assertRaises(TypeError) as cm:
write_json(bad_json,tmpfile)
print("Case 2: Providing good json")
good_json = {"Wakkawakkawakka":[True,"2",3]}
tmpfile = tempfile.mkstemp()[1]
os.remove(tmpfile)
write_json(good_json,tmpfile)
content = json.load(open(tmpfile,'r'))
self.assertTrue(isinstance(content,dict))
self.assertTrue("Wakkawakkawakka" in content)
开发者ID:gmkurtzer,项目名称:singularity,代码行数:33,代码来源:test_client.py
注:本文中的utils.read_file函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论