本文整理汇总了Python中xattr.getxattr函数的典型用法代码示例。如果您正苦于以下问题:Python getxattr函数的具体用法?Python getxattr怎么用?Python getxattr使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了getxattr函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: read
def read(self):
try:
self.ts = xattr.getxattr(fsencode(self.filename), 'user.shatag.ts').decode('ascii')
self.shatag = xattr.getxattr(fsencode(self.filename), 'user.shatag.sha256').decode('ascii')
except IOError as e:
if e.errno != errno.ENODATA: # no tag present
raise e
开发者ID:maugier,项目名称:shatag,代码行数:8,代码来源:xattr.py
示例2: _message_at_path
def _message_at_path(self, path, load_content=True):
try:
content = None
if load_content:
f = open(path, "rb")
content = f.read()
f.close()
mtime = os.path.getmtime(path)
directory, filename = os.path.split(path)
directory, subdir = os.path.split(directory)
msgid = None
info = None
parts = filename.split(":")
if len(parts) > 0:
msgid = parts[0]
if len(parts) > 1:
info = parts[1]
msg = Message(content=content, msgid=msgid, info=info, subdir=subdir, mtime=mtime)
if not msg.msg_md5 and self._use_xattrs and load_content:
try:
xattrs = xattr.listxattr(path)
# logging.debug(xattrs)
if XATTR_MD5SUM in xattrs:
msg.msg_md5 = xattr.getxattr(path, XATTR_MD5SUM)
# logging.debug("Read md5: %s", msg.msg_md5)
else:
c = msg.content_hash
if c:
# logging.debug("Setting shasum xattr: %r", c)
xattr.setxattr(path, XATTR_MD5SUM, c)
else:
logging.warning("Could not generate content hash of %s", msgid)
if XATTR_DATE in xattrs:
msg._date = xattr.getxattr(path, XATTR_DATE).decode("utf8")
msg._date = datetime.datetime.fromtimestamp(float(msg._date))
# logging.debug("Read date: %s", msg._date)
else:
d = str(msg.date.timestamp()).encode("utf8")
if d:
# logging.debug("Setting date xattr: %r", d)
xattr.setxattr(path, XATTR_DATE, d)
else:
logging.warning("Could not determine message date of %s", msgid)
except IOError:
# read-only FS, unsupported on FS, etc.
self._use_xattrs = False
log.debug("host filesystem for %s does not support xattrs; disabling" % self.name)
return msg
except OSError:
raise KeyError
开发者ID:ahknight,项目名称:maildir-lite,代码行数:58,代码来源:maildir.py
示例3: get_attrs
def get_attrs(self, uuid):
attrs = {}
tags = xattr.getxattr(os.path.join(self.basepath, uuid), 'user.tags')
flags = xattr.getxattr(os.path.join(self.basepath, uuid), 'user.flags')
stored = xattr.getxattr(os.path.join(self.basepath, uuid), 'user.stored')
attrs['tags'] = json.loads(tags)
attrs['flags'] = json.loads(flags)
attrs['stored'] = int(stored)
return attrs
开发者ID:via,项目名称:python-httpmail,代码行数:9,代码来源:localstorage.py
示例4: _mount_quobyte
def _mount_quobyte(self, quobyte_volume, mount_path, ensure=False):
"""Mount Quobyte volume to mount path."""
mounted = False
for l in QuobyteDriver.read_proc_mount():
if l.split()[1] == mount_path:
mounted = True
break
if mounted:
try:
os.stat(mount_path)
except OSError as exc:
if exc.errno == errno.ENOTCONN:
mounted = False
try:
LOG.info(_LI('Fixing previous mount %s which was not'
' unmounted correctly.') % mount_path)
self._execute('umount.quobyte', mount_path,
run_as_root=False)
except processutils.ProcessExecutionError as exc:
LOG.warn(_LW("Failed to unmount previous mount: %s"),
exc)
else:
# TODO(quobyte): Extend exc analysis in here?
LOG.warn(_LW("Unknown error occurred while checking mount"
" point: %s Trying to continue."), exc)
if not mounted:
if not os.path.isdir(mount_path):
self._execute('mkdir', '-p', mount_path)
command = ['mount.quobyte', quobyte_volume, mount_path]
if self.configuration.quobyte_client_cfg:
command.extend(['-c', self.configuration.quobyte_client_cfg])
try:
LOG.info(_LI('Mounting volume: %s ...') % quobyte_volume)
self._execute(*command, run_as_root=False)
LOG.info(_LI('Mounting volume: %s succeeded') % quobyte_volume)
mounted = True
except processutils.ProcessExecutionError as exc:
if ensure and 'already mounted' in exc.stderr:
LOG.warn(_LW("%s is already mounted"), quobyte_volume)
else:
raise
if mounted:
try:
xattr.getxattr(mount_path, 'quobyte.info')
except Exception as exc:
msg = _LE("The mount %(mount_path)s is not a valid"
" Quobyte USP volume. Error: %(exc)s") \
% {'mount_path': mount_path, 'exc': exc}
raise exception.VolumeDriverException(msg)
if not os.access(mount_path, os.W_OK | os.X_OK):
LOG.warn(_LW("Volume is not writable. Please broaden the file"
" permissions. Mount: %s"), mount_path)
开发者ID:NxtCloud,项目名称:cinder,代码行数:57,代码来源:quobyte.py
示例5: run_expunge_dirname
def run_expunge_dirname (dirname):
orig_cwd = os.getcwd()
execute_print ("orig dir: " + orig_cwd)
os.mkdir (dirname)
execute("kill -9 `cat /etc/glusterd/vols/vol/run/"+HOSTNAME+"-home-gluster1.pid`");
execute_print (binascii.b2a_hex (xattr.getxattr ("/home/gluster2/"+dirname, "trusted.gfid")))
os.rmdir(dirname)
execute_print (xattr.getxattr ("/home/gluster2/", "trusted.afr.vol-client-0").encode('base64'))
execute ("gluster volume start vol force")
execute ("sleep 20")
os.listdir ('.')
开发者ID:chriszl,项目名称:gluster-tests,代码行数:11,代码来源:lost_plus_found.py
示例6: read_xattr_metadata
def read_xattr_metadata(file_list):
"""For the given list of files read us the kMDItemDescription and kMDItemKeywords."""
meta_data = []
for file in file_list:
md = {}
attrs = xattr.listxattr(file)
if 'com.apple.metadata:kMDItemDescription' in attrs:
md['Caption-Abstract'] = biplist.readPlistFromString(xattr.getxattr(file, 'com.apple.metadata:kMDItemDescription'))
if 'com.apple.metadata:kMDItemKeywords' in attrs:
md['Keywords'] = biplist.readPlistFromString(xattr.getxattr(file, 'com.apple.metadata:kMDItemKeywords'))
meta_data.append(md)
return meta_data
开发者ID:kghose,项目名称:Chhobi2,代码行数:12,代码来源:libchhobi.py
示例7: main
def main():
masterurl = sys.argv[1]
slaveurl = sys.argv[2]
slave_node, slavevol = parse_url(slaveurl)
master_node, mastervol = parse_url(masterurl)
master_mnt = tempfile.mkdtemp()
slave_mnt = tempfile.mkdtemp()
try:
print "Mounting master volume on a temp mnt_pnt"
os.system("glusterfs -s %s --volfile-id %s %s" % (master_node,
mastervol,
master_mnt))
except:
print("Failed to mount the master volume")
cleanup(master_mnt, slave_mnt)
sys.exit(1)
try:
print "Mounting slave voluem on a temp mnt_pnt"
os.system("glusterfs -s %s --volfile-id %s %s" % (slave_node, slavevol,
slave_mnt))
except:
print("Failed to mount the master volume")
cleanup(master_mnt, slave_mnt)
sys.exit(1)
slave_file_list = [slave_mnt]
for top, dirs, files in os.walk(slave_mnt, topdown=False):
for subdir in dirs:
slave_file_list.append(os.path.join(top, subdir))
for file in files:
slave_file_list.append(os.path.join(top, file))
# chdir and then get the gfid, so that you don't need to replace
gfid_attr = 'glusterfs.gfid'
ret = 0
for sfile in slave_file_list:
mfile = sfile.replace(slave_mnt, master_mnt)
if xattr.getxattr(sfile, gfid_attr, True) != xattr.getxattr(
mfile, gfid_attr, True):
print ("gfid of file %s in slave is different from %s" +
" in master" % (sfile, mfile))
ret = 1
cleanup(master_mnt, slave_mnt)
sys.exit(ret)
开发者ID:Jingle-Wang,项目名称:glusterfs,代码行数:50,代码来源:compare-gfid.py
示例8: target_supports_extended_attributes
def target_supports_extended_attributes(self):
"""
Check if the target directory supports extended filesystem
attributes
:rtype: bool
"""
try:
xattr.getxattr(self.target_dir, 'user.mime_type')
except Exception as e:
if format(e).startswith('[Errno 95]'):
# libc interface [Errno 95] Operation not supported:
return False
return True
开发者ID:AdamMajer,项目名称:kiwi-ng,代码行数:14,代码来源:sync.py
示例9: get_file
def get_file(id):
source_file = file_from_id(id)
with open(source_file, 'rb') as f:
source = f.read().decode('utf-8')
try:
language = xattr.getxattr(source_file, 'user.upaste.language').decode('utf-8')
except:
language = config.DEFAULT_LANGUAGE
try:
original_id = xattr.getxattr(source_file, 'user.upaste.original_id').decode('utf-8')
except:
original_id = None
return source, language, original_id
开发者ID:Shizmob,项目名称:upaste,代码行数:15,代码来源:upaste.py
示例10: get_raw_tags
def get_raw_tags(path):
if 'com.apple.metadata:_kMDItemUserTags' in xattr.listxattr(path):
d = xattr.getxattr(path, 'com.apple.metadata:_kMDItemUserTags')
d = biplist.readPlistFromString(d)
return d
else:
return []
开发者ID:kdbdallas,项目名称:half-moon-tagging,代码行数:7,代码来源:tag.py
示例11: diff
def diff(filename, mf_path=''):
'''Use hashlib library to compare sha256 hash of
current file to the sha256 hash stored on MediaFire'''
full_expansion = get_path_expansion(filename)
if(mf_path == '' or not mf_path):
if ('/' in filename):
mf_path = os.path.basename(filename)
else:
mf_path = filename
if (os.path.isfile(full_expansion)):
in_file = open(filename, 'r')
mf_path = sanitize_path(mf_path)
file_contents = in_file.read().encode('utf-8')
new_hash = hashlib.sha256(file_contents).hexdigest()
media_hash = get_hash(mf_path)
try:
old_hash = xattr.getxattr(full_expansion, 'hash').decode('ascii')
except OSError:
old_hash = ''
if (media_hash == ''):
logger.die('No file path "' + mf_path + '/' + os.path.basename(filename) + '" in MediaFire')
else:
figure_time_scale(media_hash, old_hash, new_hash, os.path.basename(full_expansion))
else:
logger.die('No local file "' + os.path.basename(full_expansion) + '" found')
开发者ID:cdpetty,项目名称:one,代码行数:28,代码来源:diff.py
示例12: get_dir_layout
def get_dir_layout(root_dir_path):
dir_map = dict()
for dir_path, dir_names, filenames in os.walk(root_dir_path):
if len(dir_map) == 0:
dir_map[dir_path] = DirLayout() # Root folder
else:
par_dir_path = os.path.dirname(dir_path)
par_dir_layout = dir_map[par_dir_path]
# Create the subfolder
dir_name = os.path.basename(dir_path)
par_dir_layout.add_subfolder(dir_name)
dir_map[dir_path] = par_dir_layout.get_subfolder(dir_name)
dir_layout = dir_map[dir_path]
# TODO: Ignore this part first. Figure out what to do with this folders
if '.glusterfs' in dir_names:
dir_names.remove('.glusterfs')
if '.trashcan' in dir_names:
dir_names.remove('.trashcan')
ext_attr = xattr.getxattr(dir_path, 'trusted.glusterfs.dht')
(dummy, start, end) = struct.unpack_from(">qII", ext_attr)
dir_layout.set_layout(Layout(start, end))
return dir_map[root_dir_path]
开发者ID:opelhoward,项目名称:gdash,代码行数:25,代码来源:util.py
示例13: GetExtAttrs
def GetExtAttrs(filepath):
"""Fetches extended file attributes.
Args:
filepath: A path to the file.
Yields:
`ExtAttr` pairs.
"""
path = CanonicalPathToLocalPath(filepath)
try:
attr_names = xattr.listxattr(path)
except (IOError, OSError) as error:
msg = "Failed to retrieve extended attributes for '%s': %s"
logging.error(msg, path, error)
return
for attr_name in attr_names:
try:
attr_value = xattr.getxattr(path, attr_name)
except (IOError, OSError) as error:
msg = "Failed to retrieve attribute '%s' for '%s': %s"
logging.error(msg, attr_name, path, error)
continue
yield rdf_client.ExtAttr(name=attr_name, value=attr_value)
开发者ID:bhyvex,项目名称:grr,代码行数:27,代码来源:client_utils_osx_linux.py
示例14: _get_extended_attr
def _get_extended_attr(file_path, attr):
"""Get extended attributes from a file, returns an array of strings or None if no value is set.
Inspired by https://gist.github.com/dunhamsteve/2889617
Args:
file_path: str path of file to examine
attr: key of the attribute to retrieve
Returns:
a list of strings or None
"""
try:
xattr_val = getxattr(file_path, attr)
if xattr_val.startswith('bplist'):
try:
plist_array, _, plist_error = Foundation.NSPropertyListSerialization.propertyListWithData_options_format_error_(
buffer(xattr_val), 0, None, None)
if plist_error:
Logger.log_error(message='plist de-serialization error: {0}'.format(plist_error))
return None
return list(plist_array)
except Exception as deserialize_plist_e:
Logger.log_exception(deserialize_plist_e, message='_get_extended_attr failed on {0} for {1}'.format(file_path, attr))
else:
return [xattr_val]
except KeyError:
pass # ignore missing key in xattr
return None
开发者ID:djgrasss,项目名称:osxcollector,代码行数:28,代码来源:osxcollector.py
示例15: is_partition_supported
def is_partition_supported(self, folder):
if folder is None:
return False
result = False
to_delete = not os.path.exists(folder)
try:
if to_delete:
os.mkdir(folder)
if not os.access(folder, os.W_OK):
import stat
os.chmod(folder, stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP |
stat.S_IRUSR | stat.S_IWGRP | stat.S_IWUSR)
import xattr
attr = "drive-test"
xattr.setxattr(folder, attr, attr)
if xattr.getxattr(folder, attr) == attr:
result = True
xattr.removexattr(folder, attr)
finally:
try:
if to_delete:
os.rmdir(folder)
except:
pass
return result
开发者ID:Bindupriya,项目名称:nuxeo-drive,代码行数:25,代码来源:darwin.py
示例16: get_xattr
def get_xattr(self, filepath, key):
logger.info("get_xattr - %s, %s" % (filepath, key))
with self._get_lock():
ascii_path = filepath.encode('ascii', 'ignore')
localfs_path = self._make_localfs_path(ascii_path)
return xattr.getxattr(localfs_path, key)
开发者ID:syndicate-storage,项目名称:syndicate-fs-driver,代码行数:7,代码来源:local_plugin.py
示例17: get_local_files
def get_local_files (self, channels):
downloads_dir = self.config.config['downloads']
files = []
for channel in channels:
if '..' in channel:
continue
dir_fp = os.path.join (downloads_dir, channel)
if not os.path.exists (dir_fp):
continue
if not os.path.isdir (dir_fp):
continue
for f in os.listdir (dir_fp):
if f[0] in '.#~':
continue
fname = os.path.join (channel, f)
fname_fp = os.path.join (dir_fp, f)
fname_size = os.path.getsize(fname_fp)
utils.set_md5_attr (fname_fp)
fname_md5 = xattr.getxattr (fname_fp, 'md5')
files.append ({'path': fname, 'size': fname_size, 'md5': fname_md5})
return files
开发者ID:mangelajo,项目名称:common,代码行数:29,代码来源:Channels.py
示例18: _checkDeprecated
def _checkDeprecated(self, item, symlink=False):
"""check deprecated list, set, get operations against an item"""
self.assertEqual(self._ignore(xattr.listxattr(item, symlink)),
[])
self.assertRaises(EnvironmentError, xattr.setxattr, item,
self.USER_ATTR, self.USER_VAL,
XATTR_REPLACE)
try:
xattr.setxattr(item, self.USER_ATTR, self.USER_VAL, 0, symlink)
except IOError:
err = sys.exc_info()[1]
if err.errno == errno.EPERM and symlink:
# symlinks may fail, in which case we abort the rest
# of the test for this case
return
raise
self.assertRaises(EnvironmentError, xattr.setxattr, item,
self.USER_ATTR, self.USER_VAL, XATTR_CREATE)
self.assertEqual(self._ignore(xattr.listxattr(item, symlink)),
[self.USER_ATTR])
self.assertEqual(xattr.getxattr(item, self.USER_ATTR, symlink),
self.USER_VAL)
self.assertEqual(self._ignore_tuples(xattr.get_all(item,
nofollow=symlink)),
[(self.USER_ATTR, self.USER_VAL)])
xattr.removexattr(item, self.USER_ATTR)
self.assertEqual(self._ignore(xattr.listxattr(item, symlink)),
[])
self.assertEqual(self._ignore_tuples(xattr.get_all(item,
nofollow=symlink)),
[])
self.assertRaises(EnvironmentError, xattr.removexattr,
item, self.USER_ATTR)
开发者ID:quantheory,项目名称:pyxattr,代码行数:33,代码来源:test_xattr.py
示例19: read_metadata
def read_metadata(fd, md_key=None):
"""
Helper function to read the pickled metadata from an object file.
:param fd: file descriptor or filename to load the metadata from
:param md_key: metadata key to be read from object file
:returns: dictionary of metadata
"""
meta_key = SWIFT_METADATA_KEY
metadata = ''
key = 0
try:
while True:
metadata += xattr.getxattr(fd, '%s%s' % (meta_key,
(key or '')))
key += 1
except (IOError, OSError) as e:
if metadata == '':
return False
for err in 'ENOTSUP', 'EOPNOTSUPP':
if hasattr(errno, err) and e.errno == getattr(errno, err):
msg = "Filesystem at %s does not support xattr" % \
_get_filename(fd)
logging.exception(msg)
raise DiskFileXattrNotSupported(e)
if e.errno == errno.ENOENT:
raise DiskFileNotExist()
return pickle.loads(metadata)
开发者ID:iostackproject,项目名称:vertigo,代码行数:29,代码来源:utils.py
示例20: _test_xattr_on_file_with_contents
def _test_xattr_on_file_with_contents(filename, file_contents, xattrs=[], xar_create_flags=[], xar_extract_flags=[]):
try:
# Write file out
with open(filename, "w") as f:
f.write(file_contents)
for (key, value) in xattrs:
xattr.setxattr(f, key, value)
# Store it into a xarchive
archive_name = "{f}.xar".format(f=filename)
with util.archive_created(archive_name, filename, *xar_create_flags) as path:
# Extract xarchive
with util.directory_created("extracted") as directory:
# Validate resulting xattrs
subprocess.check_call(["xar", "-x", "-C", directory, "-f", path] + xar_extract_flags)
for (key, value) in xattrs:
try:
assert xattr.getxattr(os.path.join(directory, filename), key) == value, "extended attribute \"{n}\" has incorrect contents after extraction".format(n=key)
except KeyError:
raise MissingExtendedAttributeError("extended attribute \"{n}\" missing after extraction".format(n=key))
# Validate file contents
with open(os.path.join(directory, filename), "r") as f:
if f.read() != file_contents:
raise MissingExtendedAttributeError("archived file \"{f}\" has has incorrect contents after extraction".format(f=filename))
finally:
os.unlink(filename)
开发者ID:010001111,项目名称:darling,代码行数:27,代码来源:attr.py
注:本文中的xattr.getxattr函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论