本文整理汇总了Python中spacewalk.common.stringutils.to_string函数的典型用法代码示例。如果您正苦于以下问题:Python to_string函数的具体用法?Python to_string怎么用?Python to_string使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了to_string函数的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: print_locals
def print_locals(fd=sys.stderr, tb=None):
    """ Dump a listing of all local variables and their values, frame by
        frame, for better debugging chance.

        fd -- writable file-like object that receives the report
        tb -- traceback to inspect; when None, the traceback of the exception
              currently being handled (sys.exc_info()[2]) is used

        Nothing is written when no traceback is available, so calling this
        outside of an exception handler is harmless.
    """
    if tb is None:
        tb = sys.exc_info()[2]
    if tb is None:
        # No exception is being handled: there are no frames to report, and
        # walking tb.tb_next below would raise AttributeError.
        return
    # walk the traceback to the end
    while tb.tb_next:
        tb = tb.tb_next
    # and now start extracting the stack frames, innermost frame first
    stack = []
    f = tb.tb_frame
    while f:
        stack.append(f)
        f = f.f_back
    fd.write("\nLocal variables by frame\n")
    for frame in stack:
        fd.write("Frame %s in %s at line %s\n" % (frame.f_code.co_name,
                                                  frame.f_code.co_filename,
                                                  frame.f_lineno))
        for key, value in frame.f_locals.items():
            fd.write("\t%20s = " % to_string(key))
            # We have to be careful not to cause a new error in our error
            # printer! Calling str() on an unknown object could cause an
            # error we don't want, hence the deliberately broad except.
            # pylint: disable=W0702
            try:
                s = str(to_string(value))
            except:
                s = "<ERROR WHILE PRINTING VALUE>"
            # Keep a single huge value from flooding the report
            if len(s) > 100 * 1024:
                s = "<ERROR WHILE PRINTING VALUE: string representation too large>"
            fd.write("%s %s\n" % (type(value), s))
        fd.write("\n")
开发者ID:shastah,项目名称:spacewalk,代码行数:34,代码来源:rhnTB.py
示例2: print_env
def print_env(fd=sys.stderr):
    """ Write the process environment to fd, one "name = value" line per
        variable, sorted by variable name.
    """
    environ = os.environ
    fd.write("\nEnvironment for PID=%d on exception:\n" % os.getpid())
    for name in sorted(environ.keys()):
        fd.write("%s = %s\n" % (to_string(name), to_string(environ[name])))
开发者ID:dewayneHat,项目名称:spacewalk,代码行数:8,代码来源:rhnTB.py
示例3: send
def send(headers, body, sender=None):
    """ Send a message through the SMTP server on localhost.

        headers -- mapping of header names to values; it is passed through
                   __check_headers, which returns the headers to use and
                   the list of recipient addresses
        body    -- message body
        sender  -- envelope sender; defaults to the "From" header

        Errors raised while connecting or sending propagate to the caller.
    """
    (headers, toaddrs) = __check_headers(headers)
    if sender is None:
        sender = headers["From"]
    joined_headers = u''.join([u"%s: %s\n" % (h, headers[h])
                               for h in headers.keys()])
    # Build the complete message before connecting, so that a conversion
    # error cannot leave a connection open.
    msg = "%s\n%s\n" % (to_string(joined_headers), to_string(body))
    server = smtplib.SMTP('localhost')
    delivered = False
    try:
        server.sendmail(sender, toaddrs, msg)
        delivered = True
    finally:
        if delivered:
            server.quit()
        else:
            # sendmail failed: just drop the socket. quit() could raise on a
            # broken connection and hide the original error.
            server.close()
开发者ID:dewayneHat,项目名称:spacewalk,代码行数:12,代码来源:rhnMail.py
示例4: print_req
def print_req(req, fd=sys.stderr):
    """ Write debugging information about the request being served to fd:
        its URI, the remote host, the server name and port, and the incoming
        headers sorted by name. Always returns 0.
    """
    fd.write("Request object information:\n")
    fd.write("URI: %s\n" % req.unparsed_uri)
    fd.write("Remote Host: %s\nServer Name: %s:%d\n" % (
        req.get_remote_host(), req.server.server_hostname, req.server.port))
    fd.write("Headers passed in:\n")
    for name in sorted(req.headers_in.keys()):
        fd.write("\t%s: %s\n" % (to_string(name), to_string(req.headers_in[name])))
    return 0
开发者ID:dewayneHat,项目名称:spacewalk,代码行数:15,代码来源:rhnTB.py
示例5: _execute_
def _execute_(self, args, kwargs):
    """
    Oracle specific execution of the query.

    Every bind name reported by the cursor must be present in kwargs (the
    lookup goes through UserDictCase); each value is passed through
    to_string before being bound. Raises sql_base.SQLError when a bind
    variable is missing or when cx_Oracle reports an OperationalError.
    Returns the cursor's rowcount.
    """
    # TODO: args appears unused, raise exception if we see any?
    supplied = UserDictCase(kwargs)
    # Only the arguments the statement actually binds are copied over.
    # NOTE: bindnames() is Oracle specific.
    params = {}
    for name in self._real_cursor.bindnames():
        if not supplied.has_key(name):
            # Raise the fault ourselves
            raise sql_base.SQLError(1008, 'Not all variables bound', name)
        params[name] = to_string(supplied[name])
    # cx_Oracle expects the first arg to be the statement and no
    # positional args; None is passed as the statement here.
    try:
        self._real_cursor.execute(None, **params)
    except cx_Oracle.OperationalError:
        error = sys.exc_info()[1]
        raise sql_base.SQLError("Cannot execute SQL statement: %s" % str(error))
    self.description = self._real_cursor.description
    return self._real_cursor.rowcount
开发者ID:jdobes,项目名称:spacewalk,代码行数:28,代码来源:driver_cx_Oracle.py
示例6: populate
def populate(self, header, size, checksum_type, checksum, path=None, org_id=None,
             header_start=None, header_end=None, channels=[]):
    """ Fill in the package fields from a header plus file-level metadata.

        Every key already present in self is read from the header (through
        self.tagMap when the key is mapped there) and normalized. The size,
        checksum, path, org and header offset arguments are then stored,
        'sigmd5' is folded into the sigchecksum fields, and defaults are
        applied for vendor, payload_format and payload_size.

        The channels argument is accepted but not used here.
        Returns self.
    """
    # XXX is seems to me that this is the place that 'source_rpm' is getting
    # set
    for name in self.keys():
        tag = name
        if self.tagMap.has_key(name):
            tag = self.tagMap[name]
            if not tag:
                # Unsupported
                continue
        # get the db field value from the header
        value = header[tag]
        if name == 'build_time':
            if type(value) in (IntType, LongType):
                # A UNIX timestamp
                value = gmtime(value)
        # NOTE(review): the payload_size test below starts a new if-chain, so
        # a build_time value also reaches the string conversion - confirm
        # that this is intended.
        if name == 'payload_size':
            # workaround for older rpms where signed
            # attributes go negative for size > 2G
            if value < 0:
                value = long(value) + 2 ** 32
        elif value:
            # Convert to strings
            if isinstance(value, unicode):
                value = to_string(value)
            else:
                value = str(value)
        elif value == []:
            value = None
        self[name] = value

    # Values that come from the caller rather than from the header
    self['package_size'] = size
    self['checksum_type'] = checksum_type
    self['checksum'] = checksum
    self['checksums'] = {checksum_type: checksum}
    self['path'] = path
    self['org_id'] = org_id
    self['header_start'] = header_start
    self['header_end'] = header_end
    self['last_modified'] = localtime(time.time())

    # Replace the raw 'sigmd5' entry with the sigchecksum pair
    if 'sigmd5' in self:
        sigmd5 = self['sigmd5']
        if sigmd5:
            self['sigchecksum_type'] = 'md5'
            self['sigchecksum'] = sigmd5
        del self['sigmd5']

    # Fix some of the information up
    if self['vendor'] is None:
        self['vendor'] = 'Red Hat, Inc.'
    if self['payload_format'] is None:
        self['payload_format'] = 'cpio'
    if self['payload_size'] is None:
        self['payload_size'] = 0
    return self
开发者ID:T-D-Oe,项目名称:spacewalk,代码行数:59,代码来源:headerSource.py
示例7: feed
def feed(self, arg):
    """ Add arg to the running checksum held in self.sum.

        A tuple or list is fed element by element, each element unchanged.
        Any other value is converted with to_string first; a plain int is
        turned into its decimal string before that.
    """
    if type(arg) in (tuple, list):
        for item in arg:
            self.sum.update(item)
        return
    if type(arg) == int:
        arg = str(arg)
    self.sum.update(to_string(arg))
开发者ID:Kilian-Petsch,项目名称:spacewalk,代码行数:9,代码来源:server_certificate.py
示例8: _executemany
def _executemany(self, *args, **kwargs):
    """ Execute the statement once per row, sending the rows in batches.

        Each keyword argument maps a bind name to a sequence of values, one
        per row; the length of the first sequence found is taken as the
        number of rows. Values are passed through to_string before binding.

        Returns the sum of the cursor's rowcount over all batches, or 0 when
        there are no keyword arguments or no rows.
    """
    # cx_Oracle expects the first arg to be the statement
    if not kwargs:
        return 0
    # Compute number of values
    batch_limit = 25
    total_rows = len(kwargs.itervalues().next())
    if total_rows == 0:
        return 0
    batch_size = min(batch_limit, total_rows)
    # One reusable dictionary per row of a batch, every bind name preset to
    # None
    rows = [dict.fromkeys(kwargs) for _ in xrange(batch_size)]

    affected = 0
    for start in xrange(0, total_rows, batch_size):
        count = min(total_rows - start, batch_size)
        # Trim the array if it is too big (last, shorter batch)
        if count != batch_size:
            rows = rows[:count]
        for offset in xrange(count):
            row = rows[offset]
            for name, values in kwargs.iteritems():
                row[name] = to_string(values[start + offset])
        # We clear self->bindVariables so that list of all nulls
        # in the previous chunk which caused the type to be set to
        # string does not affect our chunk which may have number
        # there.
        self._real_cursor.setinputsizes(**{})
        # rows is now a list of dictionaries. Each dictionary contains the
        # data for one execution of the query where the key is the column
        # name and the value self explanatory.
        self._real_cursor.executemany(None, rows)
        self.description = self._real_cursor.description
        affected += self._real_cursor.rowcount
    return affected
开发者ID:TJM,项目名称:spacewalk,代码行数:50,代码来源:driver_cx_Oracle.py
示例9: _add_result
def _add_result(action_config_revision_id, diff):
    """ Run the _query_add_result_diff statement for the given revision id.

        A non-empty diff is converted with to_string and bound as a blob
        under 'result'; an empty diff is stored as None with no blob map.
    """
    log_debug(4, action_config_revision_id, diff)
    blob_map = None
    result = None
    if diff:
        blob_map = {'result': 'result'}
        result = to_string(diff)
    h = rhnSQL.prepare(_query_add_result_diff, blob_map=blob_map)
    h.execute(action_config_revision_id=action_config_revision_id,
              result=result)
开发者ID:T-D-Oe,项目名称:spacewalk,代码行数:14,代码来源:configfiles.py
示例10: reload
def reload(self, text):
    """ load data from a text certificate passed on by a client

        Returns 0 when the certificate was loaded. Returns -1 when the text
        is empty, cannot be decoded, does not carry a struct, lacks the
        "system_id" or "fields" entries, or lists a checksum field that is
        not present in the certificate.
    """
    log_debug(4)
    text_id = string.strip(text)
    if not text_id:
        return -1
    # Now decode this certificate. The text comes from a client, so a
    # payload that decodes but has no first parameter must be rejected too,
    # not only one that fails to parse.
    try:
        sysid = xmlrpclib.loads(to_string(text_id))[0]
        s = sysid[0]
    except Exception:
        return -1
    if not isinstance(s, dict):
        log_error("Got certificate with unexpected payload type: %s" % (type(s),))
        return -1
    if "system_id" not in s or "fields" not in s:
        log_error("Got certificate with missing entries: %s" % s)
        return -1
    # check the certificate some more
    fields = s["fields"]
    if not isinstance(fields, (list, tuple)):
        log_error("Got certificate with invalid checksum field list: %s" % s)
        return -1
    for k in fields:
        if k not in s:
            log_error("Certificate lists unknown %s as a checksum field" % k,
                      "cert data: %s" % s)
            return -1
    # clear out the state
    self.__init__()
    # at this point we know the certificate is sane enough for the
    # following processing
    for k, v in s.items():
        if k == "fields":
            self.__fields = v
        elif k == "checksum":
            self.__checksum = v
        else:
            self.attrs[k] = v
    # okay, the certificate is now loaded
    return 0
开发者ID:Kilian-Petsch,项目名称:spacewalk,代码行数:39,代码来源:server_certificate.py
示例11: __init__
def __init__(self, header, size, checksum_type, checksum, path=None, org_id=None,
             channels=None):
    """ Build the package object from a Debian package header.

        header        -- mapping the field values are read from
        size          -- stored as 'package_size'
        checksum_type -- stored as 'checksum_type'
        checksum      -- stored as 'checksum'
        path          -- stored as 'path'
        org_id        -- stored as 'org_id'
        channels      -- handed to _populateChannels; defaults to a new
                         empty list for every call
    """
    headerSource.rpmBinaryPackage.__init__(self)
    if channels is None:
        # A fresh list per call: a shared default list would leak between
        # instances if _populateChannels ever modified it.
        channels = []
    self.tagMap = headerSource.rpmBinaryPackage.tagMap.copy()

    # Remove already-mapped tags
    self._already_mapped = [
        'rpm_version', 'payload_size', 'payload_format',
        'package_group', 'build_time', 'build_host'
    ]
    for t in self._already_mapped:
        if t in self.tagMap:
            del self.tagMap[t]

    # XXX is seems to me that this is the place that 'source_rpm' is getting
    # set
    for f in self.keys():
        field = f
        if f in self.tagMap:
            field = self.tagMap[f]
            if not field:
                # Unsupported
                continue
        # get the db field value from the header
        val = header[field]
        if f == 'build_time':
            if isinstance(val, IntType):
                # A UNIX timestamp
                val = gmtime(val)
        elif val:
            # Convert to strings
            if isinstance(val, unicode):
                val = to_string(val)
            else:
                val = str(val)
        elif val == []:
            val = None
        self[f] = val

    self['package_size'] = size
    self['checksum_type'] = checksum_type
    self['checksum'] = checksum
    self['path'] = path
    self['org_id'] = org_id
    self['header_start'] = None
    self['header_end'] = None
    self['last_modified'] = localtime(time.time())
    # Replace the raw 'sigmd5' entry with the sigchecksum pair
    if self['sigmd5']:
        self['sigchecksum_type'] = 'md5'
        self['sigchecksum'] = self['sigmd5']
    del self['sigmd5']

    # Fix some of the information up
    if self['vendor'] is None:
        self['vendor'] = 'Debian'
    if self['payload_format'] is None:
        self['payload_format'] = 'ar'
    if self['payload_size'] is None:
        self['payload_size'] = 0

    # Populate file information
    self._populateFiles(header)
    # Populate dependency information
    self._populateDependencyInformation(header)
    # Populate changelogs
    self._populateChangeLog(header)
    # Channels
    self._populateChannels(channels)

    self['source_rpm'] = None

    group = self.get('package_group', '')
    if group == '' or group is None:
        self['package_group'] = 'NoGroup'
开发者ID:T-D-Oe,项目名称:spacewalk,代码行数:80,代码来源:debPackage.py
示例12: populate_channel_family_permissions
def populate_channel_family_permissions(cert):
# Find channel families that we have imported
current_cfs = _fetch_existing_channel_families()
# Put the channel families coming from the cert into a hash
# Add rh-public with unlimited subscriptions
# Filter channel families that do not exist locally (this is possible with
# channel dumps, where not all channel families have been dumped and
# available for the satellite to import)
# XXX hardcoding rh-public bad bad bad - but we committed to have
# rh-public the only implied channel family. If we ever have to have a
# different public channel family, it will have to be in the cert
cert_chfam_hash = {}
# Bugs 171160, 183365: We can't assume that the satellite already knows
# about rh-public (it may not yet know about any channels).
if current_cfs.has_key("rh-public"):
cert_chfam_hash["rh-public"] = None
for cf in cert.channel_families:
if not current_cfs.has_key(cf.name):
# Ignoring unavailable channel family at this point,
# we'll create it at sync time.
continue
quant = cf.quantity
if quant is not None:
quant = int(quant)
flex = cf.flex
if flex == '':
flex = 0
if flex is not None:
flex = int(flex)
#we subtract flex from quantity since flex count is included
# in the full quantity for backwards compatibility
cert_chfam_hash[cf.name] = [quant - flex, flex]
# Generate the channel family permissions data structure
cfps = {}
for cfp in _fetch_channel_family_permissions():
cf_name = cfp['channel_family']
# org_id is the org_id which is given permission
org_id = cfp['org_id']
# Initially populate cf info with old limits from db
cfps[(cf_name, org_id)] = [cfp['max_members'], cfp['max_flex']]
# Now set max_members based on the cert's max_members
for cf_name, max_tuple in cert_chfam_hash.items():
# Make the channel families with null max_members public
if max_tuple is None:
max_tuple = [0,0]
org_id = None
else:
max_members, max_flex = max_tuple
# default the org to 1 for channel families from cert
org_id = 1
cf_name = to_string(cf_name)
try:
old_max_tuple = cfps[(cf_name, org_id)]
except KeyError:
# New channel family, populate the db from cert
cfps[(cf_name, org_id)] = max_tuple
old_max_tuple = None
sum_max_values = compute_sum_max_members(cfps)
for (cf_name, org_id), (max_members, max_flex) in cfps.items():
if org_id == 1:
if cert_chfam_hash.has_key(cf_name):
cert_max_value = cert_chfam_hash[cf_name][0] or 0
cert_max_flex = cert_chfam_hash[cf_name][1] or 0
else:
# remove entitlements on extra slots
cfps[(cf_name, org_id)] = None
continue
if not max_members:
max_members = 0
if not max_flex:
max_flex = 0
(sum_max_mem, sum_max_flex) = sum_max_values[cf_name]
if cert_max_value >= sum_max_mem:
cfps[(cf_name, 1)][0] = max_members + \
(cert_max_value - sum_max_mem)
else:
purge_count = sum_max_mem - cert_max_value
cfps[(cf_name, 1)][0] = max_members - purge_count
if cert_max_flex >= sum_max_flex:
cfps[(cf_name, 1)][1] = max_flex +\
(cert_max_flex - sum_max_flex)
else:
# lowering entitlements
flex_purge_count = sum_max_flex - cert_max_flex
cfps[(cf_name, 1)][1] = max_flex - flex_purge_count
#.........这里部分代码省略.........
开发者ID:T-D-Oe,项目名称:spacewalk,代码行数:101,代码来源:sync_handlers.py
示例13: Traceback
def Traceback(method=None, req=None, mail=1, ostream=sys.stderr,
              extra=None, severity="notification", with_locals=0):
    """ Report the exception currently being handled and optionally send
        mail about it.

        method      -- name of the function that was being handled, if any
        req         -- request object; when given, print_req output is added
        mail        -- when true, mail the report unless the QUIET_MAIL
                       budget is used up
        ostream     -- stream that receives the report; may be None
        extra       -- extra text information added to the report
        severity    -- value of the X-RHN-Traceback-Severity mail header
        with_locals -- when true and no mail is sent, local variables are
                       included in what is written to ostream
    """
    # pylint: disable=C0103
    global QUIET_MAIL

    if mail:
        # safeguard
        if QUIET_MAIL is None:
            QUIET_MAIL = CFG.QUIET_MAIL
        if QUIET_MAIL < 0:
            QUIET_MAIL = 0
        if QUIET_MAIL == 0:
            # make sure we don't mail
            mail = 0

    e_type = sys.exc_info()[0]
    timestamp = time.ctime(time.time())
    report = StringIO()

    unicode_hostname = idn_puny_to_unicode(hostname)
    report.write("Exception reported from %s\nTime: %s\n" % (to_string(unicode_hostname), timestamp))
    report.write("Exception type %s\n" % to_string(e_type))
    if method:
        report.write("Exception while handling function %s\n" % to_string(method))

    # print information about the request being served
    if req:
        print_req(req, report)
    if extra:
        report.write("Extra information about this error:\n%s\n" % to_string(extra))

    # Print the traceback
    report.write("\nException Handler Information\n")
    traceback.print_exc(None, report)

    if with_locals and not mail:
        # The mail case will call print_locals by itself
        print_locals(report)

    # we always log it somewhere
    if ostream:
        ostream.write(to_string(report.getvalue()))
        ostream.write("\n")

    if mail:
        # print the stack frames for the mail we send out
        print_locals(report)
        # dump the environment
        print_env(report)
        # and send the mail
        # build the headers
        recipients = CFG.TRACEBACK_MAIL
        from_addr = recipients
        if isinstance(recipients, list):
            from_addr = recipients[0].strip()
            recipients = ', '.join([addr.strip() for addr in recipients])
        headers = {
            "Subject": "%s TRACEBACK from %s" % (PRODUCT_NAME, unicode_hostname),
            "From": "%s <%s>" % (hostname, from_addr),
            "To": recipients,
            "X-RHN-Traceback-Severity": severity,
            "Content-Type": 'text/plain; charset="utf-8"',
        }
        # count it no matter what
        QUIET_MAIL -= 1

        # 5/18/05 wregglej - 151158 Go through every string in the security list
        # and censor it out of the debug information.
        outstring = censor_string(to_string(report.getvalue()))
        rhnMail.send(headers, outstring)

    report.close()
开发者ID:dewayneHat,项目名称:spacewalk,代码行数:80,代码来源:rhnTB.py
示例14: __init__
def __init__(self, header, size, checksum_type, checksum, path=None, org_id=None, channels=None):
    """ Build the package object from a Debian package header.

        header        -- mapping the field values are read from
        size          -- stored as "package_size"
        checksum_type -- stored as "checksum_type"
        checksum      -- stored as "checksum"
        path          -- stored as "path"
        org_id        -- stored as "org_id"
        channels      -- handed to _populateChannels; defaults to a new
                         empty list for every call
    """
    headerSource.rpmBinaryPackage.__init__(self)
    if channels is None:
        # A fresh list per call: a shared default list would leak between
        # instances if _populateChannels ever modified it.
        channels = []
    self.tagMap = headerSource.rpmBinaryPackage.tagMap.copy()

    # Remove already-mapped tags
    self._already_mapped = [
        "rpm_version",
        "payload_size",
        "payload_format",
        "package_group",
        "build_time",
        "build_host",
    ]
    for t in self._already_mapped:
        if t in self.tagMap:
            del self.tagMap[t]

    # XXX is seems to me that this is the place that 'source_rpm' is getting
    # set
    for f in self.keys():
        field = f
        if f in self.tagMap:
            field = self.tagMap[f]
            if not field:
                # Unsupported
                continue
        # get the db field value from the header
        val = header[field]
        if f == "build_time":
            if isinstance(val, IntType):
                # A UNIX timestamp
                val = gmtime(val)
        elif val:
            # Convert to strings
            if isinstance(val, UnicodeType):
                val = to_string(val)
            else:
                val = str(val)
        elif val == []:
            val = None
        self[f] = val

    self["package_size"] = size
    self["checksum_type"] = checksum_type
    self["checksum"] = checksum
    self["path"] = path
    self["org_id"] = org_id
    self["header_start"] = None
    self["header_end"] = None
    self["last_modified"] = localtime(time.time())
    # Replace the raw "sigmd5" entry with the sigchecksum pair
    if self["sigmd5"]:
        self["sigchecksum_type"] = "md5"
        self["sigchecksum"] = self["sigmd5"]
    del self["sigmd5"]

    # Fix some of the information up
    if self["vendor"] is None:
        self["vendor"] = "Debian"
    if self["payload_format"] is None:
        self["payload_format"] = "ar"
    if self["payload_size"] is None:
        self["payload_size"] = 0

    # Populate file information
    self._populateFiles(header)
    # Populate dependency information
    self._populateDependencyInformation(header)
    # Populate changelogs
    self._populateChangeLog(header)
    # Channels
    self._populateChannels(channels)

    self["source_rpm"] = None

    group = self.get("package_group", "")
    if group == "" or group is None:
        self["package_group"] = "NoGroup"
开发者ID:shastah,项目名称:spacewalk,代码行数:83,代码来源:debPackage.py
注:本文中的spacewalk.common.stringutils.to_string函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论