本文整理汇总了Python中scandir.scandir函数的典型用法代码示例。如果您正苦于以下问题:Python scandir函数的具体用法?Python scandir怎么用?Python scandir使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了scandir函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: iter_files
def iter_files(root, exts=None, recursive=False):
"""
Iterate over file paths within root filtered by specified extensions.
:param str root: Root folder to start collecting files
:param iterable exts: Restrict results to given file extensions
:param bool recursive: Wether to walk the complete directory tree
:rtype collections.Iterable[str]: absolute file paths with given extensions
"""
if exts is not None:
exts = set((x.lower() for x in exts))
def matches(e):
return (exts is None) or (e in exts)
if recursive is False:
for entry in scandir(root):
if has_scandir:
ext = splitext(entry.name)[-1].lstrip('.').lower()
if entry.is_file() and matches(ext):
yield entry.path
else:
ext = splitext(entry)[-1].lstrip('.').lower()
if not isdir(entry) and matches(ext):
yield join(root, entry)
else:
for root, folders, files in walk(root):
for f in files:
ext = splitext(f)[-1].lstrip('.').lower()
if matches(ext):
yield join(root, f)
开发者ID:titusz,项目名称:onixcheck,代码行数:32,代码来源:utils.py
示例2: __init__
def __init__(self, path=None, archive=''):
self.metadata = {
"title":'',
"artist":'',
"type":'',
"tags":{},
"language":'',
"pub_date":'',
"link":'',
"info":'',
}
self.files = []
if path is None:
return
if archive:
zip = ArchiveFile(archive)
c = zip.dir_contents(path)
for x in c:
if x.endswith(app_constants.GALLERY_METAFILE_KEYWORDS):
self.files.append(open(zip.extract(x), encoding='utf-8'))
else:
for p in scandir.scandir(path):
if p.name in app_constants.GALLERY_METAFILE_KEYWORDS:
self.files.append(open(p.path, encoding='utf-8'))
if self.files:
self.detect()
else:
log_d('No metafile found...')
开发者ID:Pewpews,项目名称:happypanda,代码行数:29,代码来源:utils.py
示例3: yielding_checked_fnwalk
def yielding_checked_fnwalk(path, fn, sleep_interval=0.01):
try:
parent, name = os.path.split(path)
entry = scandir.GenericDirEntry(parent, name)
if fn(entry):
yield entry
queue = gevent.queue.LifoQueue()
if entry.is_dir():
queue.put(path)
while True:
try:
path = queue.get(timeout=0)
except gevent.queue.Empty:
break
else:
for entry in scandir.scandir(path):
if fn(entry):
if entry.is_dir():
queue.put(entry.path)
yield entry
gevent.sleep(sleep_interval)
except Exception as e:
logging.exception(
'Exception while directory walking: {}'.format(str(e)))
开发者ID:Outernet-Project,项目名称:fsal,代码行数:25,代码来源:fsdbmanager.py
示例4: fnwalk
def fnwalk(path, fn, shallow=False):
"""
Walk directory tree top-down until directories of desired length are found
This generator function takes a ``path`` from which to begin the traversal,
and a ``fn`` object that selects the paths to be returned. It calls
``os.listdir()`` recursively until either a full path is flagged by ``fn``
function as valid (by returning a truthy value) or ``os.listdir()`` fails
with ``OSError``.
This function has been added specifically to deal with large and deep
directory trees, and it's therefore not advisable to convert the return
values to lists and similar memory-intensive objects.
The ``shallow`` flag is used to terminate further recursion on match. If
``shallow`` is ``False``, recursion continues even after a path is matched.
For example, given a path ``/foo/bar/bar``, and a matcher that matches
``bar``, with ``shallow`` flag set to ``True``, only ``/foo/bar`` is
matched. Otherwise, both ``/foo/bar`` and ``/foo/bar/bar`` are matched.
"""
if fn(path):
yield path
if shallow:
return
try:
entries = scandir.scandir(path)
except OSError:
return
for entry in entries:
if entry.is_dir():
for child in fnwalk(entry.path, fn, shallow):
yield child
开发者ID:AbelMon,项目名称:librarian,代码行数:35,代码来源:content.py
示例5: iter_folders
def iter_folders(self):
"""Return a generator of folder names."""
for entry in scandir.scandir(self._path):
entry = entry.name
if len(entry) > 1 and entry[0] == '.' and \
os.path.isdir(os.path.join(self._path, entry)):
yield entry[1:]
开发者ID:SpamExperts,项目名称:se-mailbox,代码行数:7,代码来源:smaildir.py
示例6: process_dir
def process_dir(self, path, st):
""" i_dir should be absolute path
st is the stat object associated with the directory
"""
last_report = MPI.Wtime()
count = 0
try:
with timeout(seconds=30):
entries = scandir(path)
except OSError as e:
log.warn(e, extra=self.d)
self.skipped += 1
except TimeoutError as e:
log.error("%s when scandir() on %s" % (e, path), extra=self.d)
self.skipped += 1
else:
for entry in entries:
if entry.is_symlink():
self.sym_links += 1
elif entry.is_file():
self.circle.enq(entry.path)
else:
self.circle.preq(entry.path)
count += 1
if (MPI.Wtime() - last_report) > self.interval:
print("Rank %s : Scanning [%s] at %s" % (self.circle.rank, path, count))
last_report = MPI.Wtime()
log.info("Finish scan of [%s], count=%s" % (path, count), extra=self.d)
if count > self.maxfiles:
self.maxfiles = count
self.maxfiles_dir = path
开发者ID:verolero86,项目名称:pcircle,代码行数:32,代码来源:fprof.py
示例7: scanwalk
def scanwalk(self, path, followlinks=False):
''' lists of DirEntries instead of lists of strings '''
dirs, nondirs = [], []
try:
for entry in scandir(path):
# check if the file contains our pattern
for s in self.search_str:
if entry.name.lower().find(s) != -1:
yield '%s' % entry.path
# if directory, be recursive
if entry.is_dir(follow_symlinks=followlinks):
for res in self.scanwalk(entry.path):
yield res
# check inside the file to found our pattern
else:
if self.max_size > entry.stat(follow_symlinks=False).st_size:
if entry.name.endswith(self.files_extensions):
if self.check_content:
for res in self.search_string(entry.path):
try:
res = res.encode('utf-8')
yield '%s > %s' % (entry.path, res)
except:
pass
# try / except used for permission denied
except:
pass
开发者ID:dc3l1ne,项目名称:pupy,代码行数:32,代码来源:search.py
示例8: _refresh
def _refresh(self):
"""Update table of contents mapping."""
# If it has been less than two seconds since the last _refresh() call,
# we have to unconditionally re-read the mailbox just in case it has
# been modified, because os.path.mtime() has a 2 sec resolution in the
# most common worst case (FAT) and a 1 sec resolution typically. This
# results in a few unnecessary re-reads when _refresh() is called
# multiple times in that interval, but once the clock ticks over, we
# will only re-read as needed. Because the filesystem might be being
# served by an independent system with its own clock, we record and
# compare with the mtimes from the filesystem. Because the other
# system's clock might be skewing relative to our clock, we add an
# extra delta to our wait. The default is one tenth second, but is an
# instance variable and so can be adjusted if dealing with a
# particularly skewed or irregular system.
if time.time() - self._last_read > 2 + self._skewfactor:
refresh = False
for subdir in self._toc_mtimes:
mtime = os.path.getmtime(self._paths[subdir])
if mtime > self._toc_mtimes[subdir]:
refresh = True
self._toc_mtimes[subdir] = mtime
if not refresh:
return
# Refresh toc
self._toc = {}
for subdir in self._toc_mtimes:
path = self._paths[subdir]
for entry in scandir.scandir(path):
if entry.is_dir():
continue
entry = entry.name
uniq = entry.split(self.colon)[0]
self._toc[uniq] = os.path.join(subdir, entry)
self._last_read = time.time()
开发者ID:SpamExperts,项目名称:se-mailbox,代码行数:35,代码来源:smaildir.py
示例9: dir_tree_info_pars
def dir_tree_info_pars(self, path, dirtrtable, monitor_types):
"""
Recursively traverses the filesystem, loads the dirtrtable tree object
Return a dir_info dict with statistics from it's children
adds a dirtrtable dir node if none exises and sets the node content
to dir_info. Traverses filesystem using breath first method.
Main algorithmic worker for StorageStats.
"""
if not dirtrtable.is_node_by_name(path): # if this dir has no dir node in dirtrtable make one
if not dirtrtable.is_root_set(): # dirtrtable has only a uninitialized root node, root needs initialization
dirtrtable.set_root_name(path, {}) # init the root node set to this the first root dir path
else:
parNodeId = dirtrtable.getnode_idByName(os.path.dirname(path))
dirtrtable.add_child(path, parNodeId, {})
dir_info = self.dir_totals_by_type(path, monitor_types)
try:
for entry in scandir(path):
if entry.is_dir(follow_symlinks=False):
temp_dir_info = self.dir_tree_info_pars(os.path.join(path, entry.name), dirtrtable, monitor_types)
for each in dir_info:
dir_info[each] += temp_dir_info[each]
except Exception as e:
logging.warn( e )
dirtrtable.up_date_node_by_name(path, dir_info)
return dir_info
开发者ID:jayventi,项目名称:storagestats,代码行数:26,代码来源:storagestats.py
示例10: _scan_disk
def _scan_disk(self, on_disk, path):
for entry in scandir.scandir(path):
if not entry.name.startswith(".") and entry.is_dir():
self._scan_disk(on_disk, entry.path)
elif entry.is_file():
on_disk[entry.path] = entry.stat().st_mtime
return on_disk
开发者ID:parente,项目名称:contentmanagement,代码行数:7,代码来源:index.py
示例11: find_f_img_archive
def find_f_img_archive(extract=True):
zip = ArchiveFile(temp_p)
if extract:
gui_constants.NOTIF_BAR.add_text('Extracting...')
t_p = os.path.join('temp', str(uuid.uuid4()))
os.mkdir(t_p)
if is_archive or chapterpath.endswith(ARCHIVE_FILES):
if os.path.isdir(chapterpath):
t_p = chapterpath
elif chapterpath.endswith(ARCHIVE_FILES):
zip2 = ArchiveFile(chapterpath)
f_d = sorted(zip2.dir_list(True))
if f_d:
f_d = f_d[0]
t_p = zip2.extract(f_d, t_p)
else:
t_p = zip2.extract('', t_p)
else:
t_p = zip.extract(chapterpath, t_p)
else:
zip.extract_all(t_p) # Compatibility reasons.. TODO: REMOVE IN BETA
filepath = os.path.join(t_p, [x for x in sorted([y.name for y in scandir.scandir(t_p)])\
if x.lower().endswith(IMG_FILES)][0]) # Find first page
filepath = os.path.abspath(filepath)
else:
if is_archive:
con = zip.dir_contents('')
f_img = [x for x in sorted(con) if x.lower().endswith(IMG_FILES)]
if not f_img:
log_w('Extracting archive.. There are no images in the top-folder. ({})'.format(archive))
return find_f_img_archive()
filepath = os.path.normpath(archive)
else:
raise ValueError("Unsupported gallery version")
return filepath
开发者ID:peaceandpizza,项目名称:happypanda,代码行数:35,代码来源:utils.py
示例12: fts_scandir
def fts_scandir(path, *, logical=False, nochdir=False,
nostat=False, seedot=False, xdev=False,
_level=0):
for entry in scandir.scandir(path):
direntry = ScandirDirEntry(entry)
direntry.error = None
direntry.level = _level
direntry.postorder = False
direntry.skip = False
if not nostat:
try:
direntry.lstat()
except OSError as e:
direntry.error = e
if direntry.is_dir():
yield direntry
if not direntry.skip:
yield from fts_scandir(os.path.join(path, direntry.name),
logical=logical, nochdir=nochdir,
nostat=nostat, seedot=seedot, xdev=xdev,
_level=_level+1)
direntry = copy.copy(direntry)
direntry.postorder = True
yield direntry
else:
yield direntry
开发者ID:abarnert,项目名称:scandir,代码行数:26,代码来源:ftss.py
示例13: dir_totals_by_type
def dir_totals_by_type(self, path, monitor_types):
"""
Returns a dictionary with a keys set to each monitor types
The value is the long total size in bytes.
Fof each monitor file type an additional key is produced
"types +'_Cn'" for file count value set to integer file
count for that type. all files not falling under
monitor_types are summarized in the default category
'other'
"""
dir_info = {}
other, other_Cn = 0, 0
for k in monitor_types:
dir_info[k], dir_info[k+'_Cn'] = 0, 0
try:
dir_entry_list = scandir(path)
for entry in dir_entry_list:
if not entry.is_dir(follow_symlinks=False):
this_type = entry.name.split('.')[-1]
if this_type in monitor_types:
dir_info[this_type] += entry.stat(follow_symlinks=False).st_size
dir_info[this_type + '_Cn'] += 1
else:
other += entry.stat(follow_symlinks=False).st_size
other_Cn += 1
except Exception as e:
logging.warn( e )
dir_info['other'], dir_info['other_Cn'] = other, other_Cn
return dir_info
开发者ID:jayventi,项目名称:storagestats,代码行数:29,代码来源:storagestats.py
示例14: populate_userdir
def populate_userdir(fargs):
predefined_locations = ['www', 'secure-www']
userdir, checkmodes = fargs
locations = []
try:
userdir = os.path.abspath(userdir)
if not validate_directory(userdir, checkmodes):
return locations
public_html_location = userdir + '/public_html'
if validate_directory(public_html_location, checkmodes):
logging.debug('Appending to locations: %s', public_html_location)
locations.append(public_html_location)
sites_location = userdir + '/sites'
if validate_directory(sites_location, checkmodes):
for site in scandir.scandir(sites_location):
site = site.name
sitedir = sites_location + '/' + site
if checkmodes:
if not check_dir_execution_bit(sitedir):
continue
for predefined_directory in predefined_locations:
sites_location_last = sitedir + '/' + predefined_directory
if validate_directory(sites_location_last, checkmodes):
logging.debug('Appending to locations: %s', sites_location_last)
locations.append(sites_location_last)
except Exception:
logging.error(traceback.format_exc())
return locations
开发者ID:b1ueb0y,项目名称:pyfiscan,代码行数:33,代码来源:pyfiscan.py
示例15: gf_listdir
def gf_listdir(path):
if scandir_present:
for entry in scandir.scandir(path):
yield entry
else:
for name in os.listdir(path):
yield SmallDirEntry(path, name, DT_UNKNOWN)
开发者ID:gluster,项目名称:gluster-swift,代码行数:7,代码来源:utils.py
示例16: gen_gallery_hashes
def gen_gallery_hashes(gallery):
"Generates hashes for gallery's first chapter and inserts them to DB"
if gallery.id:
chap_id = ChapterDB.get_chapter_id(gallery.id, 0)
try:
if gallery.is_archive:
raise NotADirectoryError
chap_path = gallery.chapters[0]
imgs = scandir.scandir(chap_path)
# filter
except NotADirectoryError:
# HACK: Do not need to extract all.. can read bytes from acrhive!!!
t_p = os.path.join(gui_constants.temp_dir, str(uuid.uuid4()))
try:
if gallery.is_archive:
zip = ArchiveFile(gallery.path)
chap_path = zip.extract(gallery.chapters[0], t_p)
else:
chap_path = t_p
zip = ArchiveFile(gallery.chapters[0])
zip.extract_all(chap_path)
except CreateArchiveFail:
log_e('Could not generate hashes: CreateZipFail')
return []
imgs = scandir.scandir(chap_path)
except FileNotFoundError:
return False
# filter
imgs = [x.path for x in imgs if x.name.lower().endswith(tuple(IMG_FILES))]
hashes = []
for n, i in enumerate(sorted(imgs)):
with open(i, 'rb') as img:
hashes.append(generate_img_hash(img))
if gallery.id and chap_id:
executing = []
for hash in hashes:
executing.append(["""INSERT INTO hashes(hash, series_id, chapter_id, page)
VALUES(?, ?, ?, ?)""", (hash, gallery.id, chap_id, n)])
CommandQueue.put(executing)
c = ResultQueue.get()
del c
return hashes
开发者ID:peaceandpizza,项目名称:happypanda,代码行数:47,代码来源:gallerydb.py
示例17: from_directory
def from_directory(
directory,
calculate_bounding_box=False,
index_subdirs=True):
'''
Loads a section from a directory without loading any images.
If the directory does not seem to be a section or is not ready,
return None.
'''
if index_subdirs:
fovs = []
for f in Util.listdir(directory):
fov_path = os.path.join(directory, f)
# if not os.path.isdir(fov_path):
# # fovs always reside in directories
# continue
fov = FoV.from_directory(fov_path, calculate_bounding_box)
if fov:
fovs.append(fov)
else:
fovs = None
# Read the LUTS file in the directory, if one exists
# Should either be None or a mapping of a tile filename to its base64 luts string
luts64_map = None
if settings.LUTS_FILE_SUFFIX is not None:
#section_dir_name = os.path.split(directory)[-1]
#luts_fname = os.path.join(directory, '{}{}'.format(section_dir_name, settings.LUTS_FILE_SUFFIX))
luts_fname = ''
# Assuming there is only a single file with that prefix, use it
all_dir_files = scandir.scandir(directory)
for entry in all_dir_files:
if entry.name.endswith(settings.LUTS_FILE_SUFFIX):
luts_fname = os.path.join(directory, entry.name)
break
if os.path.exists(luts_fname):
# print "Using LUTS file: {}".format(luts_fname)
data = None
with open(luts_fname, 'r') as f:
data = f.readlines()
# Map between a file name and its luts base64 string
luts64_map = {}
for line in data:
tile_full_name, b64_str = line.split('\t')
tile_fname = tile_full_name.split('\\')[-1].lower() # Assuming Zeiss microscope system will always stay in windows
b64_str = b64_str[:-2] # Remove \r\n from the end of the string
luts64_map[tile_fname] = b64_str
section = Section(directory, fovs, calculate_bounding_box, luts64_map)
return section
开发者ID:Rhoana,项目名称:mb,代码行数:59,代码来源:section.py
示例18: clean
def clean(self):
"""Delete old files in "tmp"."""
now = time.time()
for entry in scandir.scandir(os.path.join(self._path, 'tmp')):
entry = entry.name
path = os.path.join(self._path, 'tmp', entry)
if now - os.path.getatime(path) > 129600: # 60 * 60 * 36
os.remove(path)
开发者ID:SpamExperts,项目名称:se-mailbox,代码行数:8,代码来源:smaildir.py
示例19: test_symlink
def test_symlink(self):
if not hasattr(os, 'symlink'):
return
entries = sorted(scandir.scandir(test_path), key=lambda e: e.name)
self.assertEqual([(e.name, e.is_symlink()) for e in entries],
[('file1.txt', False), ('file2.txt', False),
('link_to_dir', True), ('link_to_file', True),
('subdir', False)])
开发者ID:lowks,项目名称:scandir,代码行数:8,代码来源:test_scandir.py
示例20: filewalk
def filewalk(root):
"""Discover and yield all files found in the specified `root` folder."""
for entry in scandir.scandir(root):
if entry.is_dir():
for child in filewalk(entry.path):
yield child
else:
yield entry.path
开发者ID:AbelMon,项目名称:librarian,代码行数:8,代码来源:content.py
注:本文中的scandir.scandir函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论