• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python utils.utf8函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中utils.utf8函数的典型用法代码示例。如果您正苦于以下问题：Python utf8函数的具体用法？Python utf8怎么用？Python utf8使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了utf8函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _extract_html_

    def _extract_html_(rule, doc, url_list, result):
        tree = doc.getroottree()
        if rule.has_key('xpath'):
            node_list = doc.xpath(rule['xpath'])
            if rule.has_key('children'):
                for node in node_list: 
                    item = {}

                    for child in rule['children']:
                        _extract_html_(child, node, url_list, item)

                    if item:
                        if rule.has_key('key'):
                            if not result.has_key(rule['key']):
                                result[rule['key']] = []
                            result[rule['key']].append(item)
                        else:
                            result.update(item)
            else:
                for node in node_list:
                    node = node.strip()
                    if rule.has_key('type') and  rule['type'] == 'url':
                        url_list.append(
                                utils.utf8(
                                    consume_absolute_url(
                                        url, 
                                        consume_urlencode(
                                            node, tree.docinfo.encoding
                                        )
                                    )
                                )
                            )
                    if rule.has_key('key'):
                        result[rule['key']] = utils.utf8(node)
开发者ID:chungongyu,项目名称:xfcrawl,代码行数:34,代码来源:extractor.py


示例2: decrypt_data

        def decrypt_data():
            """Decrypt the AES-encrypted URI, headers and body of ``request`` in place.

            The cipher is keyed with ``client.secret_key``.  Each part is
            only processed when present: the ``X-Api-Encrypted-Uri`` header,
            the ``X-Api-Encrypted-Headers`` header and a non-empty body.
            """
            cipher = AESCipher(client.secret_key)

            uri_token = self.handler.request.headers.get('X-Api-Encrypted-Uri')
            if uri_token:
                request.uri = cipher.decrypt(utf8(uri_token))
                logger.debug('decrypted uri %s' % request.uri)
                # The uri changed, so the path, the query string and the
                # parsed query arguments have to be regenerated from it.
                request.path, _, request.query = request.uri.partition('?')
                request.arguments = parse_qs_bytes(request.query, keep_blank_values=True)
                request.query_arguments = copy.deepcopy(request.arguments)

            headers_token = self.handler.request.headers.get('X-Api-Encrypted-Headers')
            if headers_token:
                plain_headers = cipher.decrypt(utf8(headers_token))
                decoded = dict(json_decode(plain_headers))
                for name, value in iteritems(decoded):
                    # Use text_type for every key and value; a mix of str
                    # and unicode headers leads to 422 errors.
                    request.headers[text_type(name)] = text_type(value)

            if request.body and len(request.body) > 0:
                logger.debug('解密 body')
                logger.debug(request.body)
                request.body = cipher.decrypt(utf8(request.body))
                # The body changed, so it has to be parsed again.
                request._parse_body()
开发者ID:nicky1108,项目名称:api-gateway,代码行数:30,代码来源:encrypt.py


示例3: _response_string_to_sign

 def _response_string_to_sign(self, response_headers, request, response_body):
     """
     Build the canonical byte string that gets signed for a response.

     Four parts are passed through ``utf8`` and joined with newlines, in
     this order: the upper-cased request method, ``self.client.raw_uri``,
     the canonical form of the signable response headers, and the
     response body.
     """
     signable = self._response_headers_to_sign(response_headers)
     canonical = self._canonical_headers(signable)
     parts = [
         utf8(request.method.upper()),
         utf8(self.client.raw_uri),
         utf8(canonical),
         utf8(response_body),
     ]
     return b'\n'.join(parts)
开发者ID:eaglewei,项目名称:api-gateway,代码行数:13,代码来源:auth.py


示例4: response_string_to_sign

 def response_string_to_sign(self, response):
     """
     Build the canonical byte string used to verify a response signature.

     Four parts are passed through ``utf8`` and joined with newlines, in
     this order: the upper-cased method and the uri of
     ``self.request_data``, the canonical form of the signable response
     headers, and ``response.content``.
     """
     signable = self.response_headers_to_sign(response.headers)
     canonical = self.canonical_headers(signable)
     parts = [
         utf8(self.request_data.method.upper()),
         utf8(self.request_data.uri),
         utf8(canonical),
         utf8(response.content),
     ]
     return b'\n'.join(parts)
开发者ID:eaglewei,项目名称:api-gateway,代码行数:13,代码来源:api_client.py


示例5: h_line

 def h_line(self, i):
     """Render the parts of line node ``i`` and return them concatenated.

     Each entry of ``i[THING]`` is either a plain string, which is emitted
     through ``utf8``, or an ``'itpl'`` node whose ``NAME`` is evaluated
     with ``self.h``.  The evaluated value goes through ``self.filter``
     when the node's ``FILTER`` entry is truthy; otherwise it is passed
     through ``utf8``, with ``None`` rendered as the empty string.

     Raises:
         WTF: for a non-string entry that is not an ``'itpl'`` node.
     """
     out = []
     for x in i[THING]:
         if isinstance(x, (unicode, str)):
             out.append(utf8(x))
         elif x[WHAT] == 'itpl':
             o = self.h(x[NAME])
             if x[FILTER]:
                 o = self.filter(o)
             else:
                 # NOTE(review): assumes utf8() always returns a string, so
                 # this matches the former ``and/or`` expression -- confirm.
                 o = utf8(o) if o is not None else ""
             out.append(o)
         else:
             # Call form instead of the Python 2-only ``raise WTF, x``.
             # NOTE(review): the old form unpacked a tuple ``x`` into
             # several constructor arguments; here ``x`` is always one.
             raise WTF(x)
     return ''.join(out)
开发者ID:1ngmar,项目名称:ianki,代码行数:15,代码来源:template.py


示例6: signature_request

 def signature_request(self):
     """Return the signature of the current request.

     The string to sign is passed through ``utf8`` and hashed with SHA-1;
     the hex digest is then handed to ``self.sign_string``.
     """
     payload = self.string_to_sign()
     # When logging the payload, decode it to unicode first; writing it
     # out undecoded raises an exception.
     digest = sha1(utf8(payload)).hexdigest()
     return self.sign_string(digest)
开发者ID:nicky1108,项目名称:api-gateway,代码行数:7,代码来源:api_client.py


示例7: auth_request

    def auth_request(self, request):
        """Validate the timestamp and signature headers of ``request``.

        The ``X-Api-Timestamp`` header must be an integer no further than
        ``settings.SIGNATURE_EXPIRE_SECONDS`` from the current time.  The
        ``X-Api-Signature`` header must equal the signature computed over
        the request.  That header is deleted from ``request.headers``
        before the string to sign is built.

        Raises:
            AuthRequestException: if the timestamp header is missing or not
                an integer, the timestamp has expired, the signature header
                is missing or empty, or the signature does not match.
        """
        try:
            timestamp = int(request.headers.get('X-Api-Timestamp'))
        except (TypeError, ValueError):
            # TypeError: the header is absent, so get() returned None.
            # ValueError: the header is present but not an integer.
            raise AuthRequestException('Invalid X-Api-Timestamp Header')

        now_ts = int(time.time())
        if abs(timestamp - now_ts) > settings.SIGNATURE_EXPIRE_SECONDS:
            logger.debug('Expired signature, timestamp: %s' % timestamp)
            raise AuthRequestException('Expired Signature')

        signature = request.headers.get('X-Api-Signature')
        if signature:
            del request.headers['X-Api-Signature']
        else:
            logger.debug('No Signature Provided')
            raise AuthRequestException('No Signature Provided')

        string_to_sign = self._request_string_to_sign(request)
        # When logging string_to_sign, decode it to unicode first; writing
        # it out undecoded raises an exception.
        hash_value = sha256(utf8(string_to_sign)).hexdigest()
        real_signature = self.sign_string(hash_value)
        if signature != real_signature:
            logger.debug('Signature not match: %s, %s' % (signature, real_signature))
            raise AuthRequestException('Invalid Signature')
开发者ID:eaglewei,项目名称:api-gateway,代码行数:26,代码来源:auth.py


示例8: check_response

    def check_response(self, response):
        """Return True when the timestamp and signature of ``response`` are valid.

        The ``X-Api-Timestamp`` header must be an integer no further than
        ``self.signature_expire_seconds`` from the current time.  The
        ``X-Api-Signature`` header must equal the signature computed over
        the response.  That header is deleted from ``response.headers``
        before the string to sign is built.  Every failure is logged at
        debug level and reported as ``False``.
        """
        logger.debug(response.headers)
        try:
            timestamp = int(response.headers.get('X-Api-Timestamp'))
        except (TypeError, ValueError):
            # TypeError: the header is absent, so get() returned None.
            # ValueError: the header is present but not an integer.
            logger.debug('Invalid X-Api-Timestamp Header')
            return False

        now_ts = int(time.time())
        if abs(timestamp - now_ts) > self.signature_expire_seconds:
            logger.debug('Expired signature, timestamp: %s' % timestamp)
            logger.debug('Expired Signature')
            return False

        signature = response.headers.get('X-Api-Signature')
        if signature:
            del response.headers['X-Api-Signature']
        else:
            logger.debug('No signature provide')
            return False

        string_to_sign = self.response_string_to_sign(response)
        logger.debug(string_to_sign)
        # When formatting string_to_sign into a message, decode it to
        # unicode first; writing it out undecoded raises an exception.
        hash_value = sha256(utf8(string_to_sign)).hexdigest()
        real_signature = self.sign_string(hash_value)
        if signature != real_signature:
            logger.debug('Signature not match: %s, %s' % (signature, real_signature))
            return False
        else:
            return True
开发者ID:eaglewei,项目名称:api-gateway,代码行数:32,代码来源:api_client.py


示例9: post

    def post(self, uri, data=None, json=None, params=None, headers=None, **kwargs):
        """Send a signed (and optionally AES-encrypted) POST request.

        The request is prepared, encrypted when ``self.encrypt_type`` is
        ``'aes'``, given the auth headers and an ``X-Api-Signature``
        header, and sent with ``requests.post``.  Extra keyword arguments
        are forwarded to ``requests.post``.

        Unless the status code equals ``GATEWAY_ERROR_STATUS_CODE`` the
        response signature is checked; a failed check is only logged.  A
        response marked ``x-api-encrypt-type: aes`` has its content
        decrypted in place.  The response object is returned.
        """
        url = self.prepare_request('POST', uri, params=params,
                                   data=data, json=json, headers=headers)
        if self.encrypt_type == 'aes':
            url = self.encrypt_data()

        self.request_data.headers.update(self.get_auth_headers())
        logger.debug(self.request_data.headers)
        self.request_data.headers['X-Api-Signature'] = self.signature_request()

        response = requests.post(url, headers=self.request_data.headers,
                                 data=utf8(self.request_data.body), **kwargs)

        logger.debug(url)
        logger.debug(self.request_data.headers)

        if response.status_code != GATEWAY_ERROR_STATUS_CODE:
            if not self.check_response(response):
                logger.debug('返回结果签名不正确')

        if response.headers.get('x-api-encrypt-type', 'raw') == 'aes':
            response._content = self.decrypt_data(response.content)

        return response
开发者ID:eaglewei,项目名称:api-gateway,代码行数:26,代码来源:api_client.py


示例10: render

def render(template, params):
    """Render ``template`` with ``params`` and return the result through ``utf8``.

    Templates are loaded from the ``_mobi`` directory under ``ROOT``.
    Autoescaping is disabled and the ``xmldatetime`` filter is registered.
    """
    search_path = [os.path.join(ROOT, '_mobi')]
    env = Environment(loader=FileSystemLoader(search_path), autoescape=False)
    env.filters.update({'xmldatetime': xmldatetime})
    rendered = env.get_template(template).render(params)
    return utf8(rendered)
开发者ID:daqing15,项目名称:ebook,代码行数:8,代码来源:mobi.py


示例11: header

def header(hdr, value, unique=False):
    """
    Adds the header `hdr: value` with the response.

    If `unique` is True and a header with that name already exists
    (compared case-insensitively), it doesn't add a new one.

    Both `hdr` and `value` are passed through `utf8` first.  Raises
    ValueError if either of them contains a carriage return or a newline.
    """
    hdr, value = utf8(hdr), utf8(value)
    # protection against HTTP response splitting attack
    if '\n' in hdr or '\r' in hdr or '\n' in value or '\r' in value:
        raise ValueError('invalid characters in header')

    if unique is True:
        wanted = hdr.lower()
        for h, v in ctx.headers:
            if h.lower() == wanted:
                return

    ctx.headers.append((hdr, value))
开发者ID:stucchio,项目名称:DJ-Pirate,代码行数:17,代码来源:webapi.py


示例12: signature_response

 def signature_response(self, response_header, request, response_body):
     """Return the signature of a response.

     The canonical string to sign is passed through ``utf8`` and hashed
     with SHA-256; the hex digest is then handed to ``self.sign_string``.
     """
     payload = self._response_string_to_sign(
         response_header, request, response_body)
     # When logging the payload, decode it to unicode first; writing it
     # out undecoded raises an exception.
     digest = sha256(utf8(payload)).hexdigest()
     return self.sign_string(digest)
开发者ID:eaglewei,项目名称:api-gateway,代码行数:9,代码来源:auth.py


示例13: instance_url

    def instance_url(self, uid=None):
        """Return the URL of this instance: ``<class url><quoted uid>/``.

        The uid stored on the instance (``self.get('uid')``) is preferred.
        The ``uid`` argument is used when the stored value is missing or
        falsy.

        Raises:
            ValueError: if neither the instance nor the argument supplies
                a usable uid.
        """
        # ``self.get('uid', uid)`` would return a stored-but-falsy uid
        # (e.g. None) even when a valid argument was given, so fall back
        # explicitly.
        resolved = self.get('uid') or uid
        if not resolved:
            raise ValueError(
                'Could not determine which URL to request: %s instance '
                'has invalid ID: %r' % (type(self).__name__, uid))

        extn = urllib.quote_plus(utils.utf8(resolved))
        return "%s%s/" % (self._class_url(), extn)
开发者ID:benny0924,项目名称:sense-python-client,代码行数:9,代码来源:resources.py


示例14: encrypt_data

 def encrypt_data(body):
     """AES-encrypt ``body`` and make it the handler's response payload.

     The cipher is keyed with ``client.secret_key``.  The handler's write
     buffer is cleared, the ciphertext is written in its place, and the
     ``X-Api-Encrypt-Type`` header is set to ``aes``.
     """
     # The request was AES-encrypted, so the returned data is encrypted too.
     logger.debug('使用 AES 加密 body')
     cipher = AESCipher(client.secret_key)
     ciphertext = cipher.encrypt(utf8(body))
     # Replace whatever was buffered with the encrypted data.
     self.handler.clear_write_buffer()
     self.handler.write(ciphertext)
     self.handler.set_header('X-Api-Encrypt-Type', 'aes')
开发者ID:nicky1108,项目名称:api-gateway,代码行数:9,代码来源:encrypt.py


示例15: urlencode

def urlencode(query):
    """
    Variant of urllib.urlencode that also accepts unicode values: every
    value of `query` is passed through `utils.utf8` before encoding.

        >>> urlencode({'text':'foo bar'})
        'text=foo+bar'
    """
    encoded = {}
    for key, value in query.items():
        encoded[key] = utils.utf8(value)
    return urllib.urlencode(encoded)
开发者ID:oopos,项目名称:opspy,代码行数:9,代码来源:http.py


示例16: consume_filter

def consume_filter(options, **parms):
    """Return the links of a consumed message that should be dispatched.

    When ``consume_isvalid(**parms)`` is falsy an empty list is returned.
    Otherwise every entry of ``parms['data']['links']`` is passed through
    ``utils.utf8`` and stripped, then run through the filter chain:
    ``javascript:`` links are dropped, followed by links that already
    have a state stored in redis.

    ``options`` must provide the redis client under ``'redis'`` and may
    set ``'urlstate.expire'`` (default: 24 hours in seconds).
    """
    def _filter_js(options, links):
        """Drop ``javascript:`` pseudo-links."""
        return [url for url in links if not url.startswith('javascript:')]

    def _filter_site(options, links):
        """Keep links whose TLD is a member of the redis set ``sites_allowed``.

        NOTE(review): this filter is defined but is not part of the chain
        applied below -- confirm whether that is intentional.
        """
        def _get_tld(url):
            try:
                return utils.utf8(tld.get_tld(url))
            except Exception:
                # Best effort: a URL whose TLD cannot be determined has no
                # site and is therefore dropped by the membership test.
                return None

        redisdb = options['redis']

        # Real lists rather than map()/filter() results: the pairs are
        # iterated twice, which a one-shot iterator would not survive.
        tagged = [(url, _get_tld(url)) for url in links]
        sites = set(site for _, site in tagged if site is not None)
        site_states = redisdb.smembers('sites_allowed') & sites

        return [url for url, site in tagged if site in site_states]

    def _filter_state(options, links):
        """Keep links without a stored state and record a dispatch time for them."""
        urls = []

        redisdb = options['redis']
        now = time.time()

        url_states = redisdb.mget(links) if len(links) > 0 else []
        # Identical for every link, so computed once outside the loop.
        state = json.dumps({'dispatch_time': now})
        expire = options.get('urlstate.expire', 24*3600)
        with redisdb.pipeline() as pipe:
            for url, url_state in zip(links, url_states):
                if url_state is None:
                    urls.append(url)
                    # NOTE(review): argument order is (name, value, time);
                    # confirm it matches the installed redis client.
                    pipe.setex(url, state, expire)
            pipe.execute()
        return urls

    if not consume_isvalid(**parms):
        return []

    links = [utils.utf8(x).strip() for x in parms['data']['links']]

    for _filter in (_filter_js, _filter_state):
        links = list(_filter(options, links))

    return links
开发者ID:chungongyu,项目名称:xfcrawl,代码行数:56,代码来源:dispatcher.py


示例17: encrypt_data

    def encrypt_data(self):
        """AES-encrypt the pending request and return the URL to send it to.

        The current headers (serialised with ``json_util.dumps``) and the
        current uri are encrypted and become the only request headers,
        together with an ``application/octet-stream`` content type.  The
        uri is replaced by ``/?_t=<timestamp>&_nonce=<random>``.  A body
        that is not None and not empty is encrypted in place.
        """
        cipher = AESCipher(self.secret_key)
        serialized_headers = json_util.dumps(self.request_data.headers)

        # Encrypt the headers and the uri.
        encrypted_headers = cipher.encrypt(utf8(serialized_headers))
        encrypted_uri = cipher.encrypt(utf8(self.request_data.uri))
        self.request_data.headers = {
            'Content-Type': 'application/octet-stream',
            'X-Api-Encrypted-Headers': encrypted_headers,
            'X-Api-Encrypted-Uri': encrypted_uri,
        }

        timestamp = int(time.time())
        nonce = text_type(random.random())
        self.request_data.uri = '/?_t=%d&_nonce=%s' % (timestamp, nonce)

        # Build the new url from the replacement uri.
        url = self.api_server.strip() + self.request_data.uri

        body = self.request_data.body
        if body is not None and len(body) > 0:
            self.request_data.body = cipher.encrypt(utf8(body))
            logger.debug(self.request_data.body)
        return url
开发者ID:eaglewei,项目名称:api-gateway,代码行数:19,代码来源:api_client.py


示例18: wsgifunc

    def wsgifunc(self, *middleware, **kw):
        """Returns a WSGI-compatible function for this application."""
        # NOTE(review): in this excerpt ``middleware`` is never used and the
        # nested ``wsgi`` callable is never returned; the listing appears to
        # stop inside ``wsgi`` -- check against the full source.

        def peep(iterator):
            """Peeps into an iterator by doing an iteration
            and returns an equivalent iterator.
            """
            # wsgi requires the headers first
            # so we need to do an iteration
            # and save the result for later
            try:
                firstchunk = iterator.next()
            except StopIteration:
                firstchunk = ""

            return itertools.chain([firstchunk], iterator)

        def is_generator(x):
            """Return a truthy value when ``x`` is truthy and has a ``next`` attribute."""
            return x and hasattr(x, "next")

        def wsgi(env, start_resp):
            """Handle one request described by ``env`` and return the response body iterable."""
            self.load(env)

            try:
                # allow uppercase methods only
                if web.ctx.method.upper() != web.ctx.method:
                    raise web.nomethod()

                result = self.handle_with_processors()
            except web.HTTPError, e:
                # An HTTPError carries its own response payload in ``data``.
                result = e.data

            if is_generator(result):
                # Pull the first chunk now, before the status and headers
                # are read from web.ctx below.
                result = peep(result)
            else:
                result = [utils.utf8(result)]

            status, headers = web.ctx.status, web.ctx.headers
            start_resp(status, headers)

            # @@@
            # Since the CherryPy Webserver uses thread pool, the thread-local state is never cleared.
            # This interferes with the other requests.
            # clearing the thread-local storage to avoid that.
            # see utils.ThreadedDict for details
            import threading

            t = threading.currentThread()
            # Cleanup can be switched off with cleanup_threadlocal=False.
            if kw.get("cleanup_threadlocal", True) and hasattr(t, "_d"):
                del t._d

            return result
开发者ID:mzkmzk,项目名称:jit,代码行数:52,代码来源:application.py


示例19: serialize

    def serialize(self, request, content_type, default_serializers=None):
        """Build a ``webob.Response`` for the wrapped object.

        ``self.serializer`` is used when set; otherwise a serializer class
        is looked up with ``self.get_serializer`` and instantiated.  The
        response gets ``self.code`` as status, every stored header (value
        converted with ``str`` and ``utils.utf8``) and the given content
        type.  The body is only set when ``self.obj`` is not None.
        """
        if self.serializer:
            serializer = self.serializer
        else:
            lookup = self.get_serializer(content_type, default_serializers)
            _mtype, serializer_cls = lookup
            serializer = serializer_cls()

        response = webob.Response()
        response.status_int = self.code
        for name, raw_value in self._headers.items():
            response.headers[name] = utils.utf8(str(raw_value))
        response.headers['Content-Type'] = utils.utf8(content_type)

        if self.obj is None:
            return response

        response.body = serializer.serialize(self.obj)
        return response
开发者ID:back1860,项目名称:cloudemulator,代码行数:23,代码来源:wsgi.py


示例20: set_cookie

 def set_cookie(self, name, value, domain=None, expires=None, path="/",
                expires_days=None):
     """Add a ``Set-Cookie`` response header for ``name=value``.

     ``name`` and ``value`` are passed through ``utils.utf8`` and must not
     contain any character in the range ``\\x00``-``\\x20``; otherwise
     ValueError is raised.  ``domain``, ``expires`` and ``path`` are
     appended as attributes when truthy.  ``expires_days`` is only used,
     counted from the current UTC time, when ``expires`` is not given.
     """
     name = utils.utf8(name)
     value = utils.utf8(value)
     # Refuse control characters and spaces so nothing can be injected
     # into the header by accident.
     if re.search(r"[\x00-\x20]", name + value):
         raise ValueError("Invalid cookie %r: %r" % (name, value))

     parts = ['%s=%s' % (name, value)]
     if domain:
         parts.append('domain=%s' % domain)

     if expires_days is not None and not expires:
         lifetime = datetime.timedelta(days=expires_days)
         expires = datetime.datetime.utcnow() + lifetime
     if expires:
         stamp = calendar.timegm(expires.utctimetuple())
         expires = email.utils.formatdate(
             stamp, localtime=False, usegmt=True)
         parts.append('expires=%s' % expires)

     if path:
         parts.append('path=%s' % path)

     self.response.headers.add_header('Set-Cookie', '; '.join(parts))
开发者ID:mccutchen,项目名称:tweetlocker,代码行数:23,代码来源:handlers.py



注:本文中的utils.utf8函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.utils函数代码示例发布时间:2022-05-26
下一篇:
Python utils.user_grant_permission函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap