• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python stringutils.to_string函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中spacewalk.common.stringutils.to_string函数的典型用法代码示例。如果您正苦于以下问题:Python to_string函数的具体用法?Python to_string怎么用?Python to_string使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了to_string函数的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: print_locals

def print_locals(fd=sys.stderr, tb=None):
    """ Dump a listing of all local variables and their value for better
        debugging chance.

        fd -- file-like object the listing is written to
        tb -- traceback to inspect; when None, the traceback of the exception
              currently being handled (sys.exc_info()[2]) is used

        When no traceback is available at all, only the section header is
        written instead of failing inside the error printer.
    """
    if tb is None:
        tb = sys.exc_info()[2]
    stack = []
    # tb is still None when no exception is being handled; there is no frame
    # to report then, and touching tb.tb_next would raise AttributeError
    if tb is not None:
        # walk the traceback to the end
        while tb.tb_next:
            tb = tb.tb_next
        # and now start extracting the stack frames
        f = tb.tb_frame
        while f:
            stack.append(f)
            f = f.f_back
    fd.write("\nLocal variables by frame\n")
    for frame in stack:
        fd.write("Frame %s in %s at line %s\n" % (frame.f_code.co_name, frame.f_code.co_filename, frame.f_lineno))
        for key, value in frame.f_locals.items():
            fd.write("\t%20s = " % to_string(key))
            # We have to be careful not to cause a new error in our error
            # printer! Calling str() on an unknown object could cause an
            # error we don't want.
            # pylint: disable=W0702
            try:
                s = str(to_string(value))
            except:
                s = "<ERROR WHILE PRINTING VALUE>"
            # keep a single huge value from flooding the report
            if len(s) > 100 * 1024:
                s = "<ERROR WHILE PRINTING VALUE: string representation too large>"
            fd.write("%s %s\n" % (type(value), s))
        fd.write("\n")
开发者ID:shastah,项目名称:spacewalk,代码行数:34,代码来源:rhnTB.py


示例2: print_env

def print_env(fd=sys.stderr):
    """ Write the process environment to fd, one "NAME = value" line per
        variable, sorted by variable name and preceded by a header line
        carrying the current PID.
    """
    environ = os.environ
    fd.write("\nEnvironment for PID=%d on exception:\n" % os.getpid())
    for name in sorted(environ.keys()):
        line = "%s = %s\n" % (to_string(name), to_string(environ[name]))
        fd.write(line)
开发者ID:dewayneHat,项目名称:spacewalk,代码行数:8,代码来源:rhnTB.py


示例3: send

def send(headers, body, sender=None):
    """ Send a mail through the SMTP server on localhost.

        headers -- mapping of header name to value; it is passed through
                   __check_headers, which returns the headers to use and the
                   recipient addresses
        body    -- text of the message
        sender  -- envelope sender; defaults to the "From" header

        Exceptions raised by smtplib propagate to the caller; the SMTP
        session is terminated in either case.
    """
    (headers, toaddrs) = __check_headers(headers)
    if sender is None:
        sender = headers["From"]
    joined_headers = u''.join([u"%s: %s\n" % (h, headers[h]) for h in headers.keys()])

    # build the message before connecting, so a conversion error cannot
    # leave an open connection behind
    msg = "%s\n%s\n" % (to_string(joined_headers), to_string(body))

    server = smtplib.SMTP('localhost')
    try:
        server.sendmail(sender, toaddrs, msg)
    finally:
        # always end the session, even when sendmail() raises, so the
        # connection is not leaked
        server.quit()
开发者ID:dewayneHat,项目名称:spacewalk,代码行数:12,代码来源:rhnMail.py


示例4: print_req

def print_req(req, fd=sys.stderr):
    """ Write debugging information about the request being served to fd:
        its URI, the remote host, the server name and port, and every
        incoming header sorted by name. Always returns 0.
    """
    emit = fd.write

    emit("Request object information:\n")
    emit("URI: %s\n" % req.unparsed_uri)
    remote_host = req.get_remote_host()
    emit("Remote Host: %s\nServer Name: %s:%d\n"
         % (remote_host, req.server.server_hostname, req.server.port))
    emit("Headers passed in:\n")
    for name in sorted(req.headers_in.keys()):
        emit("\t%s: %s\n" % (to_string(name), to_string(req.headers_in[name])))
    return 0
开发者ID:dewayneHat,项目名称:spacewalk,代码行数:15,代码来源:rhnTB.py


示例5: _execute_

    def _execute_(self, args, kwargs):
        """
        Oracle specific execution of the query.

        Every bind name reported by the cursor must be present in kwargs;
        the matching values are converted with to_string and handed to the
        cursor as keyword arguments. Returns the cursor's rowcount.

        Raises sql_base.SQLError when a bind variable is missing or when
        cx_Oracle reports an OperationalError.
        """
        # TODO: args appears unused, raise exception if we see any?

        # Look the values up through UserDictCase (presumably a
        # case-insensitive mapping -- confirm) and keep only the ones the
        # statement actually binds.
        supplied = UserDictCase(kwargs)
        bound = {}

        # NOTE: bindnames() is Oracle specific
        for name in self._real_cursor.bindnames():
            if supplied.has_key(name):
                bound[name] = to_string(supplied[name])
            else:
                # Raise the fault ourselves
                raise sql_base.SQLError(1008, 'Not all variables bound', name)

        # cx_Oracle expects the first arg to be the statement and no
        # positional args
        try:
            self._real_cursor.execute(None, **bound)
        except cx_Oracle.OperationalError:
            raise sql_base.SQLError("Cannot execute SQL statement: %s" % str(sys.exc_info()[1]))

        self.description = self._real_cursor.description
        return self._real_cursor.rowcount
开发者ID:jdobes,项目名称:spacewalk,代码行数:28,代码来源:driver_cx_Oracle.py


示例6: populate

    def populate(self, header, size, checksum_type, checksum, path=None, org_id=None,
        header_start=None, header_end=None, channels=[]):
        """ Fill this package mapping from an rpm header and return self.

            Every key already present in self is looked up in header (through
            self.tagMap when the key is mapped there; keys mapped to a false
            value are skipped as unsupported). Afterwards the size, checksum,
            path, org and header offset entries are set and a few defaults
            are filled in.

            channels is accepted but not used by this method.
        """

        # XXX it seems to me that this is the place that 'source_rpm' is
        # getting set
        for f in self.keys():
            field = f
            if self.tagMap.has_key(f):
                field = self.tagMap[f]
                if not field:
                    # Unsupported
                    continue

            # get the db field value from the header
            val = header[field]
            if f == 'build_time':
                if type(val) in (IntType, LongType):
                    # A UNIX timestamp
                    val = gmtime(val)
            # NOTE(review): this is an `if`, not an `elif`, exactly as Python 2
            # parsed the original tab-indented line. A converted build_time
            # therefore still reaches the "Convert to strings" branch below --
            # confirm whether `elif` was intended.
            if f == 'payload_size':
                # workaround for older rpms where signed
                # attributes go negative for size > 2G
                if val < 0:
                    val = long(val) + 2 ** 32
            elif val:
                # Convert to strings
                if isinstance(val, unicode):
                    val = to_string(val)
                else:
                    val = str(val)
            elif val == []:
                val = None
            self[f] = val

        self['package_size'] = size
        self['checksum_type'] = checksum_type
        self['checksum'] = checksum
        self['checksums'] = {checksum_type: checksum}
        self['path'] = path
        self['org_id'] = org_id
        self['header_start'] = header_start
        self['header_end'] = header_end
        self['last_modified'] = localtime(time.time())
        if 'sigmd5' in self:
            # expose a non-empty sigmd5 as the md5 signature checksum, then
            # drop the original entry
            if self['sigmd5']:
                self['sigchecksum_type'] = 'md5'
                self['sigchecksum'] = self['sigmd5']
            del(self['sigmd5'])

        # Fix some of the information up
        vendor = self['vendor']
        if vendor is None:
            self['vendor'] = 'Red Hat, Inc.'
        payloadFormat = self['payload_format']
        if payloadFormat is None:
            self['payload_format'] = 'cpio'
        if self['payload_size'] is None:
            self['payload_size'] = 0
        return self
开发者ID:T-D-Oe,项目名称:spacewalk,代码行数:59,代码来源:headerSource.py


示例7: feed

 def feed(self, arg):
     """ Add arg to the running checksum in self.sum.

         A tuple or list is fed element by element, unchanged. Anything
         else is fed as a single value after to_string conversion, with
         an int turned into its decimal string first.
     """
     kind = type(arg)
     if kind in (tuple, list):
         for chunk in arg:
             self.sum.update(chunk)
         return
     if kind == int:
         arg = str(arg)
     self.sum.update(to_string(arg))
开发者ID:Kilian-Petsch,项目名称:spacewalk,代码行数:9,代码来源:server_certificate.py


示例8: _executemany

    def _executemany(self, *args, **kwargs):
        # cx_Oracle expects the first arg to be the statement
        if not kwargs:
            return 0
        # Compute number of values
        max_array_size = 25
        i = kwargs.itervalues()
        firstval = i.next()
        array_size = len(firstval)
        if array_size == 0:
            return 0

        chunk_size = min(max_array_size, array_size)
        pdict = {}
        for k in kwargs.iterkeys():
            pdict[k] = None
        arr = []
        for i in xrange(chunk_size):
            arr.append(pdict.copy())

        # Now arr is an array of the desired size
        rowcount = 0
        start = 0
        while start < array_size:
            item_count = min(array_size - start, chunk_size)
            # Trim the array if it is too big
            if item_count != chunk_size:
                arr = arr[:item_count]

            for i in xrange(item_count):
                pdict = arr[i]
                for k, v in kwargs.iteritems():
                    pdict[k] = to_string(v[start + i])

            # We clear self->bindVariables so that list of all nulls
            # in the previous chunk which caused the type to be set to
            # string does not affect our chunk which may have number
            # there.
            self._real_cursor.setinputsizes(**{})

            # arr is now a list of dictionaries. Each dictionary contains the
            # data for one execution of the query where the key is the column
            # name and the value self explanatory.
            self._real_cursor.executemany(None, arr)
            self.description = self._real_cursor.description

            rowcount = rowcount + self._real_cursor.rowcount
            start = start + chunk_size

        return rowcount
开发者ID:TJM,项目名称:spacewalk,代码行数:50,代码来源:driver_cx_Oracle.py


示例9: _add_result

def _add_result(action_config_revision_id, diff):
    """ Store the diff result for the given action config revision.

        A non-empty diff is converted with to_string and bound as a blob;
        an empty one is stored as None without a blob map.
    """
    log_debug(4, action_config_revision_id, diff)

    blob_map = None
    result = None
    if diff:
        blob_map = {'result': 'result'}
        result = to_string(diff)

    statement = rhnSQL.prepare(_query_add_result_diff, blob_map=blob_map)
    statement.execute(action_config_revision_id=action_config_revision_id,
                      result=result)
开发者ID:T-D-Oe,项目名称:spacewalk,代码行数:14,代码来源:configfiles.py


示例10: reload

    def reload(self, text):
        """ Load data from a text certificate passed on by a client.

            Returns 0 once the certificate has been loaded into this object
            and -1 when the text is empty, cannot be decoded, or is missing
            required entries.
        """
        log_debug(4)
        text_id = string.strip(text)
        if not text_id:
            return -1
        # Now decode this certificate; any failure means it is unusable
        try:
            params, _method = xmlrpclib.loads(to_string(text_id))
        except:
            return -1
        s = params[0]
        del _method

        if not (s.has_key("system_id") and s.has_key("fields")):
            log_error("Got certificate with missing entries: %s" % s)
            return -1
        # check the certificate some more: every listed checksum field has
        # to be present
        for field in s["fields"]:
            if not s.has_key(field):
                log_error("Certificate lists unknown %s as a checksum field" % field,
                          "cert data: %s" % s)
                return -1

        # clear out the state
        self.__init__()

        # at this point we know the certificate is sane enough for the
        # following processing
        for key, value in s.items():
            if key == "fields":
                self.__fields = value
            elif key == "checksum":
                self.__checksum = value
            else:
                self.attrs[key] = value
        # okay, the certificate is now loaded
        return 0
开发者ID:Kilian-Petsch,项目名称:spacewalk,代码行数:39,代码来源:server_certificate.py


示例11: __init__

    def __init__(self, header, size, checksum_type, checksum, path=None, org_id=None,
            channels=[]):
        """ Build the package record from a package header.

            header        -- mapping the field values are read from
            size          -- stored as 'package_size'
            checksum_type -- stored as 'checksum_type'
            checksum      -- stored as 'checksum'
            path          -- stored as 'path'
            org_id        -- stored as 'org_id'
            channels      -- handed to _populateChannels()

            Missing vendor, payload format, payload size and package group
            default to 'Debian', 'ar', 0 and 'NoGroup'.
        """

        headerSource.rpmBinaryPackage.__init__(self)

        # work on a copy: entries are deleted from it below
        self.tagMap = headerSource.rpmBinaryPackage.tagMap.copy()

        # Remove already-mapped tags
        self._already_mapped = [
            'rpm_version', 'payload_size', 'payload_format',
            'package_group', 'build_time', 'build_host'
        ]

        # `in` instead of dict.has_key(), which no longer exists on Python 3
        for t in self._already_mapped:
            if t in self.tagMap:
                del self.tagMap[t]

        # XXX it seems to me that this is the place that 'source_rpm' is
        # getting set
        for f in self.keys():
            field = f
            if f in self.tagMap:
                field = self.tagMap[f]
                if not field:
                    # Unsupported
                    continue

            # get the db field value from the header
            val = header[field]
            if f == 'build_time':
                if val is not None and isinstance(val, IntType):
                    # A UNIX timestamp
                    val = gmtime(val)
            elif val:
                # Convert to strings
                if isinstance(val, unicode):
                    val = to_string(val)
                else:
                    val = str(val)
            elif val == []:
                val = None
            self[f] = val

        self['package_size'] = size
        self['checksum_type'] = checksum_type
        self['checksum'] = checksum
        self['path'] = path
        self['org_id'] = org_id
        self['header_start'] = None
        self['header_end'] = None
        self['last_modified'] = localtime(time.time())
        # expose a non-empty sigmd5 as the md5 signature checksum, then drop
        # the original entry
        if self['sigmd5']:
            self['sigchecksum_type'] = 'md5'
            self['sigchecksum'] = self['sigmd5']
        del(self['sigmd5'])

        # Fix some of the information up
        vendor = self['vendor']
        if vendor is None:
            self['vendor'] = 'Debian'
        payloadFormat = self['payload_format']
        if payloadFormat is None:
            self['payload_format'] = 'ar'
        if self['payload_size'] is None:
            self['payload_size'] = 0

        # Populate file information
        self._populateFiles(header)
        # Populate dependency information
        self._populateDependencyInformation(header)
        # Populate changelogs
        self._populateChangeLog(header)
        # Channels
        self._populateChannels(channels)

        self['source_rpm'] = None

        group = self.get('package_group', '')
        if group == '' or group is None:
            self['package_group'] = 'NoGroup'
开发者ID:T-D-Oe,项目名称:spacewalk,代码行数:80,代码来源:debPackage.py


示例12: populate_channel_family_permissions

def populate_channel_family_permissions(cert):
    """ Reconcile the channel family limits carried by cert with the
        channel family permissions fetched from the database.

        cert -- object whose channel_families entries expose name, quantity
                and flex

        NOTE: this listing is truncated (see the marker at the end), so the
        final processing and the return value are not visible here.
    """
    # Find channel families that we have imported
    current_cfs = _fetch_existing_channel_families()

    # Put the channel families coming from the cert into a hash
    # Add rh-public with unlimited subscriptions
    # Filter channel families that do not exist locally (this is possible with
    # channel dumps, where not all channel families have been dumped and
    # available for the satellite to import)

    # XXX hardcoding rh-public bad bad bad - but we committed to have
    # rh-public the only implied channel family. If we ever have to have a
    # different public channel family, it will have to be in the cert
    cert_chfam_hash = {}

    # Bugs 171160, 183365: We can't assume that the satellite already knows
    # about rh-public (it may not yet know about any channels).
    if current_cfs.has_key("rh-public"):
        cert_chfam_hash["rh-public"] = None

    for cf in cert.channel_families:
        if not current_cfs.has_key(cf.name):
            # Ignoring unavailable channel family at this point,
            # we'll create it at sync time.
            continue

        quant = cf.quantity
        if quant is not None:
            quant = int(quant)
        flex = cf.flex
        if flex == '':
            flex = 0
        if flex is not None:
            flex = int(flex)

        # we subtract flex from quantity since flex count is included
        #   in the full quantity for backwards compatibility
        # NOTE(review): quant or flex can still be None here, in which case
        # the subtraction raises TypeError -- confirm the cert never does that
        cert_chfam_hash[cf.name] = [quant - flex, flex]

    # Generate the channel family permissions data structure,
    # keyed by (channel family name, org id)
    cfps = {}
    for cfp in _fetch_channel_family_permissions():
        cf_name = cfp['channel_family']

        # org_id is the org_id which is given permission
        org_id = cfp['org_id']

        # Initially populate cf info with old limits from db
        cfps[(cf_name, org_id)] = [cfp['max_members'], cfp['max_flex']]

    # Now set max_members based on the cert's max_members
    for cf_name, max_tuple in cert_chfam_hash.items():
        # Make the channel families with null max_members public
        if max_tuple is None:
            max_tuple = [0,0]
            org_id = None
        else:
            # NOTE(review): max_members / max_flex unpacked here are not used
            # in the visible part of this loop
            max_members, max_flex = max_tuple
            # default the org to 1 for channel families from cert
            org_id = 1

        cf_name = to_string(cf_name)
        try:
            old_max_tuple = cfps[(cf_name, org_id)]
        except KeyError:
            # New channel family, populate the db from cert
            cfps[(cf_name, org_id)] = max_tuple
            old_max_tuple = None


    sum_max_values = compute_sum_max_members(cfps)
    # Adjust the org 1 entries so they agree with the cert
    for (cf_name, org_id), (max_members, max_flex) in cfps.items():
        if org_id == 1:
            if cert_chfam_hash.has_key(cf_name):
                cert_max_value = cert_chfam_hash[cf_name][0] or 0
                cert_max_flex = cert_chfam_hash[cf_name][1] or 0
            else:
                # remove entitlements on extra slots
                cfps[(cf_name, org_id)] = None
                continue
            if not max_members:
                max_members = 0
            if not max_flex:
                max_flex = 0

            (sum_max_mem, sum_max_flex) = sum_max_values[cf_name]
            if cert_max_value >= sum_max_mem:
                # the cert covers the computed sum: add the surplus to org 1
                cfps[(cf_name, 1)][0] = max_members + \
                                  (cert_max_value - sum_max_mem)
            else:
                # the cert covers less than the computed sum: take the
                # difference away from org 1
                purge_count = sum_max_mem - cert_max_value
                cfps[(cf_name, 1)][0] = max_members - purge_count

            if cert_max_flex >= sum_max_flex:
                cfps[(cf_name, 1)][1] = max_flex +\
                                  (cert_max_flex - sum_max_flex)
            else:
                # lowering entitlements
                flex_purge_count = sum_max_flex - cert_max_flex
                cfps[(cf_name, 1)][1] = max_flex - flex_purge_count
#......... part of the code is omitted here .........
开发者ID:T-D-Oe,项目名称:spacewalk,代码行数:101,代码来源:sync_handlers.py


示例13: Traceback

def Traceback(method=None, req=None, mail=1, ostream=sys.stderr,
              extra=None, severity="notification", with_locals=0):
    """ Report the exception currently being handled and optionally mail it.

        method      -- name of the function that was being handled, if any
        req         -- request object; described through print_req() when set
        mail        -- true to send the report by mail (subject to QUIET_MAIL)
        ostream     -- stream the report is written to; falsy to skip that
        extra       -- extra text information added to the report
        severity    -- value of the X-RHN-Traceback-Severity mail header
        with_locals -- also dump local variables when no mail is sent
    """
    # pylint: disable=C0103

    global QUIET_MAIL

    if mail:
        # safeguard: pick up the configured value on first use and never
        # let the counter go negative
        if QUIET_MAIL is None:
            QUIET_MAIL = CFG.QUIET_MAIL
        if QUIET_MAIL < 0:
            QUIET_MAIL = 0
        # make sure we don't mail once the counter has reached zero
        if QUIET_MAIL == 0:
            mail = 0

    exc_type = sys.exc_info()[0]
    timestamp = time.ctime(time.time())
    report = StringIO()

    unicode_hostname = idn_puny_to_unicode(hostname)
    report.write("Exception reported from %s\nTime: %s\n"
                 % (to_string(unicode_hostname), timestamp))
    report.write("Exception type %s\n" % to_string(exc_type))
    if method:
        report.write("Exception while handling function %s\n" % to_string(method))

    # print information about the request being served
    if req:
        print_req(req, report)
    if extra:
        report.write("Extra information about this error:\n%s\n" % to_string(extra))

    # Print the traceback
    report.write("\nException Handler Information\n")
    traceback.print_exc(None, report)

    # The mail case will call print_locals by itself
    if with_locals and not mail:
        print_locals(report)

    # we always log it somewhere
    if ostream:
        ostream.write(to_string(report.getvalue()))
        ostream.write("\n")

    if mail:
        # print the stack frames for the mail we send out
        print_locals(report)
        # dump the environment
        print_env(report)

        # build the headers: a list of addresses is stripped and joined,
        # with the first one doubling as the sender address
        recipients = CFG.TRACEBACK_MAIL
        if isinstance(recipients, list):
            stripped = [addr.strip() for addr in recipients]
            from_addr = stripped[0]
            to_header = ', '.join(stripped)
        else:
            from_addr = to_header = recipients

        headers = {
            "Subject": "%s TRACEBACK from %s" % (PRODUCT_NAME, unicode_hostname),
            "From": "%s <%s>" % (hostname, from_addr),
            "To": to_header,
            "X-RHN-Traceback-Severity": severity,
            "Content-Type": 'text/plain; charset="utf-8"',
        }
        QUIET_MAIL -= 1     # count it no matter what

        # 5/18/05 wregglej - 151158 Go through every string in the security list
        # and censor it out of the debug information.
        outstring = censor_string(to_string(report.getvalue()))

        # and send the mail
        rhnMail.send(headers, outstring)

    report.close()
    return
开发者ID:dewayneHat,项目名称:spacewalk,代码行数:80,代码来源:rhnTB.py


示例14: __init__

    def __init__(self, header, size, checksum_type, checksum, path=None, org_id=None, channels=[]):
        """ Build the package record from a package header.

            Field values are read from header, the remaining arguments are
            stored under their own keys and channels is handed to
            _populateChannels(). Missing vendor, payload format, payload
            size and package group default to "Debian", "ar", 0 and
            "NoGroup".
        """
        headerSource.rpmBinaryPackage.__init__(self)

        # work on a copy: entries are deleted from it below
        self.tagMap = headerSource.rpmBinaryPackage.tagMap.copy()

        # Remove already-mapped tags
        self._already_mapped = [
            "rpm_version", "payload_size", "payload_format",
            "package_group", "build_time", "build_host",
        ]
        for tag in self._already_mapped:
            if tag in self.tagMap:
                del self.tagMap[tag]

        # XXX is seems to me that this is the place that 'source_rpm' is getting
        # set
        for key in self.keys():
            if key in self.tagMap:
                source = self.tagMap[key]
                if not source:
                    # Unsupported
                    continue
            else:
                source = key

            # get the db field value from the header
            value = header[source]
            if key == "build_time":
                # A UNIX timestamp
                if isinstance(value, IntType):
                    value = gmtime(value)
            elif value:
                # Convert to strings
                if isinstance(value, UnicodeType):
                    value = to_string(value)
                else:
                    value = str(value)
            elif value == []:
                value = None
            self[key] = value

        self["package_size"] = size
        self["checksum_type"] = checksum_type
        self["checksum"] = checksum
        self["path"] = path
        self["org_id"] = org_id
        self["header_start"] = self["header_end"] = None
        self["last_modified"] = localtime(time.time())

        # expose a non-empty sigmd5 as the md5 signature checksum, then drop
        # the original entry
        sigmd5 = self["sigmd5"]
        if sigmd5:
            self["sigchecksum_type"] = "md5"
            self["sigchecksum"] = sigmd5
        del self["sigmd5"]

        # Fix some of the information up
        if self["vendor"] is None:
            self["vendor"] = "Debian"
        if self["payload_format"] is None:
            self["payload_format"] = "ar"
        if self["payload_size"] is None:
            self["payload_size"] = 0

        # files, dependencies, changelogs and channels, in that order
        self._populateFiles(header)
        self._populateDependencyInformation(header)
        self._populateChangeLog(header)
        self._populateChannels(channels)

        self["source_rpm"] = None

        group = self.get("package_group", "")
        if group is None or group == "":
            self["package_group"] = "NoGroup"
开发者ID:shastah,项目名称:spacewalk,代码行数:83,代码来源:debPackage.py



注:本文中的spacewalk.common.stringutils.to_string函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python usix.raise_with_tb函数代码示例发布时间:2022-05-27
下一篇:
Python rhnTranslate._函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap