本文整理汇总了Python中rhn.i18n.bstr函数的典型用法代码示例。如果您正苦于以下问题:Python bstr函数的具体用法?Python bstr怎么用?Python bstr使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了bstr函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _add_proxy_headers
def _add_proxy_headers(self):
if not self.__username:
return
# Authenticated proxy
userpass = "%s:%s" % (self.__username, self.__password)
enc_userpass = base64.encodestring(i18n.bstr(userpass)).replace(i18n.bstr("\n"), i18n.bstr(""))
self.putheader("Proxy-Authorization", "Basic %s" % i18n.sstr(enc_userpass))
开发者ID:mcalmer,项目名称:spacewalk,代码行数:7,代码来源:connections.py
示例2: _getOSVersionAndRelease
def _getOSVersionAndRelease():
osVersionRelease = None
ts = transaction.initReadOnlyTransaction()
for h in ts.dbMatch('Providename', "oraclelinux-release"):
SYSRELVER = 'system-release(releasever)'
version = sstr(h['version'])
release = sstr(h['release'])
if SYSRELVER in h['providename']:
provides = dict(zip(h['providename'], h['provideversion']))
release = '%s-%s' % (version, release)
version = provides[SYSRELVER]
osVersionRelease = (sstr(h['name']), version, release)
return osVersionRelease
else:
for h in ts.dbMatch('Providename', "redhat-release"):
SYSRELVER = 'system-release(releasever)'
version = sstr(h['version'])
release = sstr(h['release'])
if SYSRELVER in h['providename']:
provides = dict(zip(h['providename'], h['provideversion']))
release = '%s-%s' % (version, release)
version = provides[SYSRELVER]
osVersionRelease = (sstr(h['name']), version, release)
return osVersionRelease
else:
# new SUSE always has a baseproduct link which point to the
# product file of the first installed product (the OS)
# all rpms containing a product must provide "product()"
# search now for the package providing the base product
baseproduct = '/etc/products.d/baseproduct'
if os.path.exists(baseproduct):
bp = os.path.abspath(os.path.join(os.path.dirname(baseproduct), os.readlink(baseproduct)))
for h in ts.dbMatch('Providename', "product()"):
if bstr(bp) in h['filenames']:
osVersionRelease = (sstr(h['name']), sstr(h['version']), sstr(h['release']))
# zypper requires a exclusive lock on the rpmdb. So we need
# to close it here.
ts.ts.closeDB()
return osVersionRelease
else:
# for older SUSE versions we need to search for distribution-release
# package which also has /etc/SuSE-release file
for h in ts.dbMatch('Providename', "distribution-release"):
osVersionRelease = (sstr(h['name']), sstr(h['version']), sstr(h['release']))
if bstr('/etc/SuSE-release') in h['filenames']:
# zypper requires a exclusive lock on the rpmdb. So we need
# to close it here.
ts.ts.closeDB()
return osVersionRelease
raise up2dateErrors.RpmError(
"Could not determine what version of Linux you "\
"are running.\nIf you get this error, try running \n\n"\
"\t\trpm --rebuilddb\n\n")
开发者ID:m47ik,项目名称:uyuni,代码行数:54,代码来源:up2dateUtils.py
示例3: _read_username
def _read_username(self):
tty = open("/dev/tty", "rb+", buffering=0)
tty.write(bstr("Username: "))
try:
username = tty.readline()
except KeyboardInterrupt:
tty.write(bstr("\n"))
sys.exit(0)
if username is None:
# EOF
tty.write(bstr("\n"))
sys.exit(0)
return username.strip()
开发者ID:BlackSmith,项目名称:spacewalk,代码行数:13,代码来源:handler_base.py
示例4: idn_puny_to_unicode
def idn_puny_to_unicode(hostname):
""" Convert Internationalized domain name from Punycode (RFC3492) to Unicode """
if hostname is None:
return None
else:
hostname = i18n.bstr(hostname)
return hostname.decode('idna')
开发者ID:mcalmer,项目名称:spacewalk,代码行数:7,代码来源:connections.py
示例5: __init__
def __init__(self, socket, trusted_certs=None):
# SSL.Context object
self._ctx = None
# SSL.Connection object
self._connection = None
self._sock = socket
self._trusted_certs = []
# convert None to empty list
trusted_certs = trusted_certs or []
for f in trusted_certs:
self.add_trusted_cert(f)
# SSL method to use
self._ssl_method = SSL.SSLv23_METHOD
# Flags to pass to the SSL layer
self._ssl_verify_flags = SSL.VERIFY_PEER
# Buffer size for reads
self._buffer_size = 8192
# Position, for tell()
self._pos = 0
# Buffer
self._buffer = bstr("")
# Flag to show if makefile() was called
self._makefile_called = 0
self._closed = None
开发者ID:spacewalkproject,项目名称:spacewalk,代码行数:28,代码来源:SSL.py
示例6: _read_bytes
def _read_bytes(stream, amt):
ret = bstr('')
while amt:
buf = stream.read(min(amt, BUFFER_SIZE))
if not buf:
return ret
ret = ret + buf
amt = amt - len(buf)
return ret
开发者ID:jdobes,项目名称:spacewalk,代码行数:9,代码来源:rhn_pkg.py
示例7: __parse_action_data
def __parse_action_data(self, action):
""" Parse action data and returns (method, params) """
data = action['action']
parser, decoder = xmlrpclib.getparser()
parser.feed(bstr(data))
parser.close()
params = decoder.close()
method = decoder.getmethodname()
return (method, params)
开发者ID:m47ik,项目名称:uyuni,代码行数:9,代码来源:rhn_check.py
示例8: process
def process(self, data):
# Assume straight text/xml
self.data = data
# Content-Encoding header
if self.encoding == self.ENCODE_GZIP:
import gzip
encoding_name = self.encodings[self.ENCODE_GZIP][0]
self.set_header("Content-Encoding", encoding_name)
f = SmartIO(force_mem=1)
gz = gzip.GzipFile(mode="wb", compresslevel=COMPRESS_LEVEL, fileobj=f)
if sys.version_info[0] == 3:
gz.write(bstr(data))
else:
gz.write(sstr(data))
gz.close()
self.data = f.getvalue()
f.close()
elif self.encoding == self.ENCODE_ZLIB:
import zlib
encoding_name = self.encodings[self.ENCODE_ZLIB][0]
self.set_header("Content-Encoding", encoding_name)
obj = zlib.compressobj(COMPRESS_LEVEL)
self.data = obj.compress(data) + obj.flush()
elif self.encoding == self.ENCODE_GPG:
# XXX: fix me.
raise NotImplementedError(self.transfer, self.encoding)
encoding_name = self.encodings[self.ENCODE_GPG][0]
self.set_header("Content-Encoding", encoding_name)
# Content-Transfer-Encoding header
if self.transfer == self.TRANSFER_BINARY:
transfer_name = self.transfers[self.TRANSFER_BINARY]
self.set_header("Content-Transfer-Encoding", transfer_name)
self.set_header("Content-Type", "application/binary")
elif self.transfer == self.TRANSFER_BASE64:
import base64
transfer_name = self.transfers[self.TRANSFER_BASE64]
self.set_header("Content-Transfer-Encoding", transfer_name)
self.set_header("Content-Type", "text/base64")
self.data = base64.encodestring(self.data)
self.set_header("Content-Length", len(self.data))
rpc_version = __version__
if len(__version__.split()) > 1:
rpc_version = __version__.split()[1]
# other headers
self.set_header(
"X-Transport-Info", "Extended Capabilities Transport (C) Red Hat, Inc (version %s)" % rpc_version
)
self.__processed = 1
开发者ID:shastah,项目名称:spacewalk,代码行数:56,代码来源:transports.py
示例9: _copy_file_to_fd
def _copy_file_to_fd(filename, fd):
f = open(filename)
buffer_size = 16384
count = 0
while 1:
buf = f.read(buffer_size)
if not buf:
break
os.write(fd, bstr(buf))
count = count + len(buf)
return count
开发者ID:mcalmer,项目名称:spacewalk,代码行数:11,代码来源:rhn_ssl_tool.py
示例10: sha256_file
def sha256_file(filename):
engine = hashlib.new('sha256')
fh = open(filename, "rb")
while 1:
buf = fh.read(4096)
if not buf:
break
engine.update(bstr(buf))
return engine.hexdigest()
开发者ID:BlackSmith,项目名称:spacewalk,代码行数:12,代码来源:utils.py
示例11: xccdf_eval
def xccdf_eval(args, cache_only=None):
if cache_only:
return (0, 'no-ops for caching', {})
results_dir = None
if ('id' in args) and ('file_size' in args) and args['file_size'] > 0:
results_dir = tempfile.mkdtemp()
pwd = os.getcwd()
os.chdir(results_dir)
results_file = tempfile.NamedTemporaryFile(dir=results_dir)
params, oscap_err = _process_params(args['params'], results_file.name, results_dir)
oscap_err += _run_oscap(['xccdf', 'eval'] + params + [args['path']])
if results_dir:
os.chdir(pwd)
if not _assert_xml(results_file.file):
del(results_file)
_cleanup_temp(results_dir)
return (1, 'oscap tool did not produce valid xml.\n' + oscap_err, {})
ret, resume, xslt_err = _xccdf_resume(results_file.name, temp_dir=results_dir)
if ret != 0 or resume == '':
del(results_file)
_cleanup_temp(results_dir)
return (1, 'Problems with extracting resume:\n' + xslt_err, {})
try:
up_err = _upload_results(results_file, results_dir, args)
except:
# An error during the upload must not prevent scan completion
log.log_exception(*sys.exc_info())
up_err = "Upload of detailed results failed. Fatal error in Python code occurred"
del(results_file)
_cleanup_temp(results_dir)
return (0, 'openscap scan completed', {
'resume': encodestring(resume),
'errors': encodestring(bstr(oscap_err) + bstr(xslt_err) + bstr(up_err))
})
开发者ID:BlackSmith,项目名称:spacewalk,代码行数:40,代码来源:scap.py
示例12: diff_file
def diff_file(self, channel, path, local_file, revision):
r = self.repository
try:
info = r.get_raw_file_info(channel, path, revision)
if 'encoding' in info and info['file_contents']:
if info['encoding'] == 'base64':
info['file_contents'] = base64.decodestring(bstr(info['file_contents']))
else:
die(9, 'Error: unknown encoding %s' % info['encoding'])
except cfg_exceptions.RepositoryFileMissingError:
die(2, "Error: no such file %s (revision %s) in config channel %s"
% (path, revision, channel))
if os.path.islink(local_file) and info['filetype'] != 'symlink' :
die(8, "Cannot diff %s; the file on the system is a symbolic link while the file in the channel is not. " % local_file)
if info['filetype'] == 'symlink' and not os.path.islink(local_file) :
die(8, "Cannot diff %s; the file on the system is not a symbolic link while the file in the channel is. " % local_file)
if info['filetype'] == 'symlink':
src_link = info['symlink']
dest_link = os.readlink(local_file)
if src_link != os.readlink(local_file):
return "Symbolic links differ. Channel: '%s' -> '%s' System: '%s' -> '%s' \n " % (path,src_link, path, dest_link)
return ""
fromlines = sstr(info['file_contents']).splitlines(1)
tolines = open(local_file, 'r').readlines()
diff_output = difflib.unified_diff(fromlines, tolines, info['path'], local_file)
first_row = second_row = ''
try:
first_row = next(diff_output)
second_row = next(diff_output)
except StopIteration:
pass
file_stat = os.lstat(local_file)
local_info = r.make_stat_info(local_file, file_stat)
# rhel4 do not support selinux
if not 'selinux_ctx' in local_info:
local_info['selinux_ctx'] = ''
if 'selinux_ctx' not in info:
info['selinux_ctx'] = ''
if not first_row and not self.__attributes_differ(info, local_info):
return ""
else:
template = "--- %s\t%s\tattributes: %s %s %s %s\tconfig channel: %s\trevision: %s"
if 'modified' not in info:
info['modified'] = ''
first_row = template % (path, str(info['modified']), ostr_to_sym(info['filemode'], info['filetype']),
info['username'], info['groupname'], info['selinux_ctx'], channel,
info['revision'],
)
second_row = template % (local_file, f_date(datetime.fromtimestamp(local_info['mtime'])), ostr_to_sym(local_info['mode'], 'file'),
local_info['user'], local_info['group'], local_info['selinux_ctx'], 'local file', None
)
return first_row + '\n' + second_row + '\n' + ''.join(list(diff_output))
开发者ID:dewayneHat,项目名称:spacewalk,代码行数:52,代码来源:rhncfg_diff.py
示例13: read_to_file
def read_to_file(self, file):
"""Copies the contents of this File object into another file
object"""
fd = self._get_file()
while 1:
buf = fd.read(self.bufferSize)
if not buf:
break
if sys.version_info[0] == 3:
file.write(bstr(buf))
else:
file.write(sstr(buf))
return file
开发者ID:BlackSmith,项目名称:spacewalk,代码行数:13,代码来源:transports.py
示例14: read
def read(self, amt=None):
"""
Reads up to amt bytes from the SSL connection.
"""
self._check_closed()
# Initially, the buffer size is the default buffer size.
# Unfortunately, pending() does not return meaningful data until
# recv() is called, so we only adjust the buffer size after the
# first read
buffer_size = self._buffer_size
buffer_length = len(self._buffer)
# Read only the specified amount of data
while buffer_length < amt or amt is None:
# if amt is None (read till the end), fills in self._buffer
if amt is not None:
buffer_size = min(amt - buffer_length, buffer_size)
try:
data = self._connection.recv(buffer_size)
self._buffer = self._buffer + data
buffer_length = len(self._buffer)
# More bytes to read?
pending = self._connection.pending()
if pending == 0:
# we're done here
break
except SSL.ZeroReturnError:
# Nothing more to be read
break
except SSL.SysCallError:
e = sys.exc_info()[1]
print("SSL exception", e.args)
break
except SSL.WantWriteError:
self._poll(select.POLLOUT, 'read')
except SSL.WantReadError:
self._poll(select.POLLIN, 'read')
if amt:
ret = self._buffer[:amt]
self._buffer = self._buffer[amt:]
else:
ret = self._buffer
self._buffer = bstr("")
self._pos = self._pos + len(ret)
return ret
开发者ID:spacewalkproject,项目名称:spacewalk,代码行数:50,代码来源:SSL.py
示例15: acquire
def acquire(self):
"""acquire the lock; else raise LockfileLockedException."""
try:
fcntl.flock(self.f, fcntl.LOCK_EX|fcntl.LOCK_NB)
except IOError:
if sys.exc_info()[1].errno == EWOULDBLOCK:
raise LockfileLockedException(
"cannot acquire lock on %s." % self.lockfile, None, sys.exc_info()[2])
else:
raise
# unlock upon exit
fcntl.fcntl(self.f, fcntl.F_SETFD, 1)
# truncate and write the pid
os.ftruncate(self.f, 0)
os.write(self.f, bstr(str(self.pid) + '\n'))
开发者ID:Bearlock,项目名称:spacewalk,代码行数:16,代码来源:rhnLockfile.py
示例16: readline
def readline(self, length=None):
"""
Reads a single line (up to `length' characters long) from the SSL
connection.
"""
self._check_closed()
while True:
# charcount contains the number of chars to be outputted (or None
# if none to be outputted at this time)
charcount = None
i = self._buffer.find(bstr('\n'))
if i >= 0:
# Go one char past newline
charcount = i + 1
elif length and len(self._buffer) >= length:
charcount = length
if charcount is not None:
ret = self._buffer[:charcount]
self._buffer = self._buffer[charcount:]
self._pos = self._pos + len(ret)
return ret
# Determine the number of chars to be read next
bufsize = self._buffer_size
if length:
# we know length > len(self._buffer)
bufsize = min(self._buffer_size, length - len(self._buffer))
try:
data = self._connection.recv(bufsize)
self._buffer = self._buffer + data
except SSL.ZeroReturnError:
# Nothing more to be read
break
except SSL.WantWriteError:
self._poll(select.POLLOUT, 'readline')
except SSL.WantReadError:
self._poll(select.POLLIN, 'readline')
# We got here if we're done reading, so return everything
ret = self._buffer
self._buffer = ""
self._pos = self._pos + len(ret)
return ret
开发者ID:spacewalkproject,项目名称:spacewalk,代码行数:45,代码来源:SSL.py
示例17: log_debug
def log_debug(self, debug_level, *args):
if debug_level <= self.debug_level:
info_out = (
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),
self.get_caller(),
" ".join([str(x) for x in args])
)
outstring = "%s %s: %s\n" % info_out
sys.stdout.write( outstring )
if not Logger.logfile is None:
try:
fd = os.open(Logger.logfile, os.O_APPEND | os.O_RDWR | os.O_CREAT, int("0600", 8))
os.write(fd, bstr(outstring))
os.close(fd)
except IOError:
raise
开发者ID:dewayneHat,项目名称:spacewalk,代码行数:18,代码来源:rhn_log.py
示例18: run
def run(self):
log_debug(2)
r = self.repository
files = r.list_files()
if not files:
die(1, "No managed files.")
label = "Config Channel"
maxlen = max([len(s[0]) for s in files])
maxlen = max(maxlen, len(label)) + 2
print("%-10s %8s %-8s %10s %+3s %*s %s" % ('Mode', 'Owner', 'Group', 'Size', 'Rev', maxlen, label, "File"))
arg_files = []
if len(sys.argv) > 2:
arg_files = sys.argv[2:len(sys.argv)]
for file in files:
if len(arg_files) and not file[1] in arg_files:
continue
# Get the file info
finfo = r.get_file_info(file[1])[1]
# Get the file length
if finfo['encoding'] == 'base64':
fsize = len(base64.decodestring(bstr(finfo['file_contents'])))
else:
# * indicates raw 'unencoded' size
fsize = '*' + str(len(finfo['file_contents']))
if finfo['filetype'] == 'symlink':
permstr = ostr_to_sym('777', finfo['filetype'])
dest = "%s -> %s" % (file[1], finfo['symlink'])
fsize = str(len(finfo['symlink']))
finfo['username'] = 'root'
finfo['groupname'] = 'root'
else:
permstr = ostr_to_sym(finfo['filemode'], finfo['filetype']) or ''
dest = file[1]
print("%10s %8s %-8s %10s %+3s %*s %s" % (permstr, finfo['username'], finfo['groupname'], fsize, finfo['revision'], maxlen, file[0], dest))
开发者ID:BlackSmith,项目名称:spacewalk,代码行数:41,代码来源:rhncfgcli_elist.py
示例19: send_http
def send_http(self, host, handler="/RPC2"):
if not self.__processed:
raise NotProcessed
self._host = host
if self._connection is None:
raise Exception("No connection object found")
self._connection.connect()
# wrap self data into binary object, otherwise HTTPConnection.request
# will encode it as ISO-8859-1 https://docs.python.org/3/library/http.client.html#httpconnection-objects
self._connection.request(self.method, handler, body=bstr(self.data), headers=self.headers)
response = self._connection.getresponse()
if not self.response_acceptable(response):
raise xmlrpclib.ProtocolError(
"%s %s" % (self._host, handler), response.status, response.reason, response.msg
)
# A response object has read() and close() methods, so we can safely
# pass the whole object back
return response.msg, response
开发者ID:shastah,项目名称:spacewalk,代码行数:23,代码来源:transports.py
示例20: genPublicCaCert
def genPublicCaCert(password, d, verbosity=0, forceYN=0):
""" public CA certificate (client-side) generation """
ca_key = os.path.join(d['--dir'], os.path.basename(d['--ca-key']))
ca_cert_name = os.path.basename(d['--ca-cert'])
ca_cert = os.path.join(d['--dir'], ca_cert_name)
ca_openssl_cnf = os.path.join(d['--dir'], CA_OPENSSL_CNF_NAME)
genPublicCaCert_dependencies(password, d, forceYN)
configFile = ConfigFile(ca_openssl_cnf)
if '--set-hostname' in d:
del d['--set-hostname']
configFile.save(d, caYN=1, verbosity=verbosity)
args = ("/usr/bin/openssl req -passin pass:%s -text -config %s "
"-new -x509 -days %s -%s -key %s -out %s"
% ('%s', repr(cleanupAbsPath(configFile.filename)),
repr(d['--cert-expiration']),
MD, repr(cleanupAbsPath(ca_key)),
repr(cleanupAbsPath(ca_cert))))
if verbosity >= 0:
print("\nGenerating public CA certificate: %s" % ca_cert)
print("Using distinguishing variables:")
for k in ('--set-country', '--set-state', '--set-city', '--set-org',
'--set-org-unit', '--set-common-name', '--set-email'):
print(' %s%s = "%s"' % (k, ' '*(18-len(k)), d[k]))
if verbosity > 1:
print("Commandline:", args % "PASSWORD")
try:
rotated = rotateFile(filepath=ca_cert, verbosity=verbosity)
if verbosity>=0 and rotated:
print("Rotated: %s --> %s" \
% (d['--ca-cert'], os.path.basename(rotated)))
except ValueError:
pass
cwd = chdir(_getWorkDir())
try:
ret, out_stream, err_stream = rhn_popen(args % repr(password))
finally:
chdir(cwd)
out = out_stream.read(); out_stream.close()
err = err_stream.read(); err_stream.close()
if ret:
raise GenPublicCaCertException("Certificate Authority public "
"SSL certificate generation failed:\n%s\n"
"%s" % (out, err))
if verbosity > 2:
if out:
print("STDOUT:", out)
if err:
print("STDERR:", err)
latest_txt = os.path.join(d['--dir'], 'latest.txt')
fo = open(latest_txt, 'wb')
fo.write(bstr('%s\n' % ca_cert_name))
fo.close()
# permissions:
os.chmod(ca_cert, int('0644',8))
os.chmod(latest_txt, int('0644',8))
开发者ID:mcalmer,项目名称:spacewalk,代码行数:65,代码来源:rhn_ssl_tool.py
注:本文中的rhn.i18n.bstr函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论