• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python escape.utf8函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tornado.escape.utf8函数的典型用法代码示例。如果您正苦于以下问题:Python utf8函数的具体用法?Python utf8怎么用?Python utf8使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了utf8函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _load_static_table

def _load_static_table():
    """Parse the HPACK static table stored next to this module.

    The table was copied from
    http://http2.github.io/http2-spec/compression.html#static.table
    corresponding to
    http://tools.ietf.org/html/draft-ietf-httpbis-header-compression-12#appendix-A

    Every non-blank line of ``hpack_static_table.txt`` holds tab-separated
    fields: the entry index, the header name and, optionally, a value.

    Returns:
        A ``(table, static_keys, static_pairs)`` tuple.  ``table`` is a list
        of ``(name, value)`` pairs (``value`` is ``None`` when the line has
        no third field) with a ``None`` placeholder at index 0.
        ``static_keys`` maps each name to the first index that uses it and
        ``static_pairs`` maps each ``(name, value)`` pair to its index.

    Raises:
        ValueError: if a line's index does not match its position in the
            table.
    """
    # start the table with a dummy entry 0
    table = [None]
    table_path = os.path.join(os.path.dirname(__file__),
                              'hpack_static_table.txt')
    with open(table_path) as f:
        for line in f:
            # Lines read from a file keep their trailing newline, so a blank
            # line is "\n" (truthy).  Strip before testing for emptiness.
            if not line.strip():
                continue
            fields = line.split('\t')
            if int(fields[0]) != len(table):
                raise ValueError("inconsistent numbering in static table")
            name = utf8(fields[1].strip())
            value = utf8(fields[2].strip()) if len(fields) > 2 else None
            table.append((name, value))
    static_keys = {}
    static_pairs = {}
    for i, pair in enumerate(table):
        if pair is None:
            continue
        # For repeated keys, prefer the earlier one.
        static_keys.setdefault(pair[0], i)
        static_pairs[pair] = i
    return table, static_keys, static_pairs
开发者ID:bdarnell,项目名称:tornado_http2,代码行数:29,代码来源:hpack.py


示例2: publish

 def publish(self, channel, message, method=None):
     """Send ``message`` to ``channel`` over the publishing stream.

     The frame is sent as three parts -- channel, method and message --
     each passed through ``utf8``.  A falsy ``method`` is replaced by
     ``DEFAULT_PUBLISH_METHOD``.
     """
     if not method:
         method = DEFAULT_PUBLISH_METHOD
     frames = [utf8(part) for part in (channel, method, message)]
     self.pub_stream.send_multipart(frames)
开发者ID:ratancs,项目名称:centrifuge,代码行数:7,代码来源:pubsub.py


示例3: get_authenticated_user

    def get_authenticated_user(self, callback, http_client=None):
        """Request the OAuth access token for the user returning from authorization.

        Call this from the handler registered as the OAuth callback URL.
        The ``oauth_token`` request argument is compared with the key stored
        in the ``_oauth_request_token`` cookie.  When the cookie is missing
        or the keys differ, ``callback`` is invoked with ``None``.  Otherwise
        the access-token URL is fetched and the response is handed to
        ``self._on_access_token`` together with ``callback``.

        ``http_client`` defaults to a new ``httpclient.AsyncHTTPClient``.
        """
        request_key = escape.utf8(self.get_argument("oauth_token"))
        oauth_verifier = self.get_argument("oauth_verifier", None)
        request_cookie = self.get_cookie("_oauth_request_token")
        if not request_cookie:
            logging.warning("Missing OAuth request token cookie")
            callback(None)
            return
        self.clear_cookie("_oauth_request_token")

        # The cookie holds two "|"-separated, base64-encoded values.
        decoded = [base64.b64decode(escape.utf8(piece))
                   for piece in request_cookie.split("|")]
        cookie_key, cookie_secret = decoded
        if cookie_key != request_key:
            logging.info((cookie_key, request_key, request_cookie))
            logging.warning("Request token does not match cookie")
            callback(None)
            return

        token = {"key": cookie_key, "secret": cookie_secret}
        if oauth_verifier:
            token["verifier"] = oauth_verifier
        client = httpclient.AsyncHTTPClient() if http_client is None else http_client
        url = self._oauth_access_token_url(token)
        on_response = self.async_callback(self._on_access_token, callback)
        client.fetch(url, on_response)
开发者ID:mathphreak,项目名称:curtains,代码行数:32,代码来源:auth.py


示例4: post

    def post(self):
        """Log a user in from the posted e-mail and password.

        The account document is looked up by e-mail.  If no account is found
        (or the lookup raises, which is logged), the response is a 404 with
        reason ``'login error'``.  The submitted password is hashed with
        bcrypt using the stored hash as salt; only when the result equals
        the stored hash is the ``user`` secure cookie set, the current user
        recorded and the client redirected to ``/``.  A wrong password gets
        the same 404 response as an unknown account.

        This is a generator (it yields while waiting for the cursor), so it
        is presumably wrapped by a coroutine decorator outside this view.
        """
        # Local import: only needed for the constant-time hash comparison.
        import hmac

        login_email = self.get_argument(app_db_model.DOC_KEY_ACCOUNT_EMAIL)
        login_password = escape.utf8(self.get_argument(app_db_model.DOC_KEY_ACCOUNT_PASSWORD))

        account = None
        try:
            account_cursor = self.db.account.find({
                app_db_model.DOC_KEY_ACCOUNT_EMAIL: login_email
            })
            # Only the first matching document is needed.
            if (yield account_cursor.fetch_next):
                account = account_cursor.next_object()
        except Exception as e:
            logger.error(e)

        if not account:
            self.set_status(404, 'login error')
            self.finish()
            return

        stored_hash = escape.utf8(account[app_db_model.DOC_KEY_ACCOUNT_PASSWORD])
        hashed_password = bcrypt.hashpw(login_password, stored_hash)
        # hashpw always returns a non-empty hash, so testing its truthiness
        # would accept every password.  The password is correct only when
        # re-hashing with the stored salt reproduces the stored hash.
        if hmac.compare_digest(hashed_password, stored_hash):
            self.set_secure_cookie('user', str(account[app_db_model.DOC_KEY_ID]))
            self.set_current_user(
                str(account[app_db_model.DOC_KEY_ID]),
                account[app_db_model.DOC_KEY_ACCOUNT_NAME],
                account[app_db_model.DOC_KEY_ACCOUNT_EMAIL])
            self.redirect('/')
        else:
            self.set_status(404, 'login error')
            self.finish()
开发者ID:LordRoad,项目名称:little,代码行数:31,代码来源:app_framework.py


示例5: _render_parts

 def _render_parts(self, value, parts=[]):
     self._LOGGER.info('rendering parts')
     self._LOGGER.debug(value)
     if isinstance(value, str) or isinstance(value, unicode):
         #parts.append(escape.xhtml_escape(value))
         parts.append(value)
     elif isinstance(value, int):
         parts.append(str(value))
     elif isinstance(value, datetime.datetime):
         parts.append(value.strftime("%Y-%m-%dT%H:%M:%S.000Z"))
     elif isinstance(value, dict):
         for name, subvalue in value.iteritems():
             if not isinstance(subvalue, list):
                 subvalue = [subvalue]
             for subsubvalue in subvalue:
                 
                 parts.append('<' + escape.utf8(name) + '>')
                 self._render_parts(subsubvalue, parts)
                 parts.append('</' + escape.utf8(name) + '>')
     elif value == None:
         parts.append("")
     else:
         self._LOGGER.debug(parts)
         self._LOGGER.error("Unknown S3 value type %r", value)
         raise Exception("Unknown S3 value type %r", value)
开发者ID:fege,项目名称:Thesis-Project,代码行数:25,代码来源:S3Handler.py


示例6: any_to_bytes

def any_to_bytes(s):
    """Return ``s`` as a byte string.

    ``str`` input is passed through ``utf8``, other ``bytes`` input is
    returned unchanged, and anything else is converted with ``str`` first
    and then passed through ``utf8``.
    """
    # The ``str`` check wins when ``bytes`` is ``str`` (Python 2).
    if isinstance(s, bytes) and not isinstance(s, str):
        return s
    text = s if isinstance(s, str) else str(s)
    return utf8(text)
开发者ID:hhru,项目名称:frontik,代码行数:7,代码来源:util.py


示例7: get_authenticated_user

	def get_authenticated_user(self, callback, http_client=None):
		"""Complete the OAuth flow and request the user's access token.

		Call this from the handler registered as the OAuth callback URL.
		The ``oauth_token`` request argument is compared with the key stored
		in the ``_oauth_request_token`` cookie.  When the cookie is missing
		or the keys differ, ``set_exception`` is called on ``callback`` with
		an ``AuthError`` and nothing is fetched.  Otherwise the access-token
		URL is fetched and the response is handed to
		``self._on_access_token`` together with ``callback``.

		``http_client`` defaults to ``self.get_auth_http_client()``.
		"""
		# NOTE(review): ``callback`` is used both as a future (set_exception)
		# and as a callback argument below -- confirm what callers pass.
		future = callback
		request_key = escape.utf8(self.get_argument("oauth_token"))
		oauth_verifier = self.get_argument("oauth_verifier", None)
		request_cookie = self.get_cookie("_oauth_request_token")
		if not request_cookie:
			future.set_exception(AuthError("Missing OAuth request token cookie"))
			return
		self.clear_cookie("_oauth_request_token")

		# The cookie holds two "|"-separated, base64-encoded values.
		decoded = [base64.b64decode(escape.utf8(piece)) for piece in request_cookie.split("|")]
		cookie_key, cookie_secret = decoded
		if cookie_key != request_key:
			future.set_exception(AuthError("Request token does not match cookie"))
			return

		token = {"key": cookie_key, "secret": cookie_secret}
		if oauth_verifier:
			token["verifier"] = oauth_verifier
		client = self.get_auth_http_client() if http_client is None else http_client
		url = self._oauth_access_token_url(token)
		on_response = self.async_callback(self._on_access_token, callback)
		client.fetch(url, on_response)
开发者ID:auscompgeek,项目名称:perfectgift,代码行数:32,代码来源:auth.py


示例8: save_blog

def save_blog(blog_url):
    """Download one blog post and save it as a Markdown file.

    The page at ``blog_url`` is fetched with the module-level ``cookie``,
    and its date, title and content are located by CSS class.  The output
    file is named after the title (with "/" replaced by a backslash) inside
    ``download_dir`` and starts with the title and date lines, followed by
    each child of the content element.  Children that fail to write are
    printed and skipped.

    Returns whatever ``get_next(soup)`` returns.
    """
    print(blog_url)
    page = requests.get(blog_url, cookies=cookie).content
    soup = BeautifulSoup(page, "html.parser")

    # date
    date_tag = soup.find('span', class_="blogDetail-ownerOther-date")
    blog_date = utf8(date_tag.contents[0])
    # title
    title_tag = soup.find('h2', class_="blogDetail-title")
    title = utf8(title_tag.contents[0]).replace("/", "\\")
    print(title)

    blog_content = soup.find_all("div", class_="blogDetail-content")[0]

    target = "{}{}.md".format(download_dir, title)
    with open(target, "wb") as fw:
        fw.write("# {}\n".format(title))
        fw.write("> {}\n\n".format(blog_date))
        for node in blog_content:
            try:
                fw.write(str(node))
            except Exception as e:
                # Best effort: report the failure and keep going.
                print(e)

    return get_next(soup)
开发者ID:MioYvo,项目名称:renren_blog_spider,代码行数:30,代码来源:spider.py


示例9: __call__

    def __call__(self, request):
        """Run the wrapped WSGI application for ``request`` and write the response.

        The application is called with the environ built by
        ``WSGIContainer.environ`` and a ``start_response`` that records the
        status and headers.  The response chunks are joined into one body,
        ``Content-Length``, ``Content-Type`` and ``Server`` headers are added
        when the application did not set them, and the complete HTTP/1.1
        response is written to ``request``, which is then finished and
        logged.

        Raises:
            Exception: if the application never called ``start_response``.
        """
        data = {}
        response = []

        def start_response(status, response_headers, exc_info=None):
            data["status"] = status
            data["headers"] = response_headers
            return response.append

        app_response = self.wsgi_application(
            WSGIContainer.environ(request), start_response)
        try:
            response.extend(app_response)
            body = b("").join(response)
        finally:
            # The WSGI spec requires close() to be called even when
            # iterating the response fails, so the app can free resources.
            if hasattr(app_response, "close"):
                app_response.close()
        if not data:
            raise Exception("WSGI app did not call start_response")

        status_code = int(data["status"].split()[0])
        headers = data["headers"]
        header_set = set(k.lower() for (k, v) in headers)
        body = escape.utf8(body)
        if "content-length" not in header_set:
            headers.append(("Content-Length", str(len(body))))
        if "content-type" not in header_set:
            headers.append(("Content-Type", "text/html; charset=UTF-8"))
        if "server" not in header_set:
            headers.append(("Server", "TornadoServer/%s" % tornado.version))

        parts = [escape.utf8("HTTP/1.1 " + data["status"] + "\r\n")]
        for key, value in headers:
            parts.append(escape.utf8(key) + b(": ") + escape.utf8(value) + b("\r\n"))
        parts.append(b("\r\n"))
        parts.append(body)
        request.write(b("").join(parts))
        request.finish()
        self._log(status_code, request)
开发者ID:BantouTelecom,项目名称:alertbirds-community-edition,代码行数:34,代码来源:wsgi.py


示例10: prepare_request

    def prepare_request(cls, request, default_host):
        """Validate ``request`` and fill in its headers in place.

        The method checks the HTTP method and rejects unsupported options,
        disables redirect following, drops any ``Connection`` header and
        sets ``Host`` (falling back to ``default_host`` when the URL has no
        netloc), basic ``Authorization``, ``User-Agent``, ``Content-Length``,
        ``Content-Type`` and ``Accept-Encoding`` where applicable.  Finally
        ``request.url`` is replaced by the path plus query string.

        Returns:
            The same ``request`` object.

        Raises:
            KeyError: for a method outside ``cls._SUPPORTED_METHODS`` unless
                ``allow_nonstandard_methods`` is set.
            NotImplementedError: if an unsupported option is set.
            ValueError: for an ``auth_mode`` other than ``None``/``"basic"``,
                or when the presence of a body does not match the method.
        """
        parsed = urlparse.urlsplit(_unicode(request.url))
        if request.method not in cls._SUPPORTED_METHODS and not request.allow_nonstandard_methods:
            raise KeyError("unknown method %s" % request.method)
        request.follow_redirects = False
        for key in (
            "network_interface",
            "proxy_host",
            "proxy_port",
            "proxy_username",
            "proxy_password",
            "expect_100_continue",
            "body_producer",
        ):
            if getattr(request, key, None):
                raise NotImplementedError("%s not supported" % key)

        request.headers.pop("Connection", None)
        if "Host" not in request.headers:
            if not parsed.netloc:
                request.headers["Host"] = default_host
            elif "@" in parsed.netloc:
                # Strip the "user:password@" prefix from the netloc.
                request.headers["Host"] = parsed.netloc.rpartition("@")[-1]
            else:
                request.headers["Host"] = parsed.netloc
        username, password = None, None
        if parsed.username is not None:
            username, password = parsed.username, parsed.password
        elif request.auth_username is not None:
            username = request.auth_username
            password = request.auth_password or ""
        if username is not None:
            if request.auth_mode not in (None, "basic"):
                # Interpolate the mode; ValueError does not format its args.
                raise ValueError("unsupported auth_mode %s" % request.auth_mode)
            # A URL such as "http://user@host/" has a username but no
            # password; treat the missing password as empty so the bytes
            # concatenation below cannot fail on None.
            auth = utf8(username) + b":" + utf8(password or "")
            request.headers["Authorization"] = b"Basic " + base64.b64encode(auth)
        if request.user_agent:
            request.headers["User-Agent"] = request.user_agent
        if not request.allow_nonstandard_methods:
            # Some HTTP methods nearly always have bodies while others
            # almost never do. Fail in this case unless the user has
            # opted out of sanity checks with allow_nonstandard_methods.
            body_expected = request.method in ("POST", "PATCH", "PUT")
            body_present = request.body is not None or request.body_producer is not None
            if (body_expected and not body_present) or (body_present and not body_expected):
                raise ValueError(
                    "Body must %sbe None for method %s (unless "
                    "allow_nonstandard_methods is true)" % ("not " if body_expected else "", request.method)
                )
        if request.body is not None:
            # When body_producer is used the caller is responsible for
            # setting Content-Length (or else chunked encoding will be used).
            request.headers["Content-Length"] = str(len(request.body))
        if request.method == "POST" and "Content-Type" not in request.headers:
            request.headers["Content-Type"] = "application/x-www-form-urlencoded"
        if request.decompress_response:
            request.headers["Accept-Encoding"] = "gzip"

        request.url = (parsed.path or "/") + (("?" + parsed.query) if parsed.query else "")
        return request
开发者ID:mSOHU,项目名称:http2,代码行数:60,代码来源:tornado4.py


示例11: get

    def get(self):
        """Test handler that checks an HTTP Digest ``Authorization`` header.

        Without an ``Authorization`` header the response is a 401 carrying a
        ``WWW-Authenticate`` digest challenge.  With one, the header's
        parameters are asserted to match the fixed realm, opaque and nonce
        as well as ``self.username`` and the request path; the expected MD5
        digest is then computed and ``'ok'`` or ``'fail'`` is written
        depending on whether it equals the ``response`` parameter.
        """
        realm = 'test'
        opaque = 'asdf'
        # Real implementations would use a random nonce.
        nonce = "1234"

        auth_header = self.request.headers.get('Authorization', None)
        if auth_header is None:
            self.set_status(401)
            self.set_header('WWW-Authenticate',
                            'Digest realm="%s", nonce="%s", opaque="%s"' %
                            (realm, nonce, opaque))
            return

        auth_mode, params = auth_header.split(' ', 1)
        assert auth_mode == 'Digest'

        # Parse the comma-separated key=value list, dropping surrounding
        # double quotes from values.
        param_dict = {}
        for item in params.split(','):
            key, val = item.strip().split('=', 1)
            if val[0] == '"' and val[-1] == '"':
                val = val[1:-1]
            param_dict[key] = val

        assert param_dict['realm'] == realm
        assert param_dict['opaque'] == opaque
        assert param_dict['nonce'] == nonce
        assert param_dict['username'] == self.username
        assert param_dict['uri'] == self.request.path

        credentials = '%s:%s:%s' % (self.username, realm, self.password)
        h1 = md5(utf8(credentials)).hexdigest()
        method_and_path = '%s:%s' % (self.request.method, self.request.path)
        h2 = md5(utf8(method_and_path)).hexdigest()
        digest = md5(utf8('%s:%s:%s' % (h1, nonce, h2))).hexdigest()

        verdict = 'ok' if digest == param_dict['response'] else 'fail'
        self.write(verdict)
开发者ID:leeclemens,项目名称:tornado,代码行数:34,代码来源:curl_httpclient_test.py


示例12: write_headers

 def write_headers(self, start_line, headers, chunk=None, callback=None):
     """Implements `.HTTPConnection.write_headers`.

     Records ``start_line``, decides whether the body will use chunked
     transfer encoding, adjusts ``headers`` accordingly (``Connection`` and
     ``Transfer-Encoding``), works out how many body bytes are still
     expected, and writes the start line and headers (plus ``chunk``, if
     given) to the stream.

     Returns ``self._write_future``.  When the stream is already closed
     that future holds a ``StreamClosedError``; when ``callback`` is given
     no new future is created in this method.

     Raises:
         ValueError: if the start line or any header line contains a
             newline.
     """
     if self.is_client:
         self._request_start_line = start_line
         # Client requests with a non-empty body must have either a
         # Content-Length or a Transfer-Encoding.
         self._chunking_output = (
             start_line.method in ('POST', 'PUT', 'PATCH') and
             'Content-Length' not in headers and
             'Transfer-Encoding' not in headers)
     else:
         self._response_start_line = start_line
         self._chunking_output = (
             # TODO: should this use
             # self._request_start_line.version or
             # start_line.version?
             self._request_start_line.version == 'HTTP/1.1' and
             # 304 responses have no body (not even a zero-length body), and so
             # should not have either Content-Length or Transfer-Encoding.
             # headers.
             start_line.code != 304 and
             # No need to chunk the output if a Content-Length is specified.
             'Content-Length' not in headers and
             # Applications are discouraged from touching Transfer-Encoding,
             # but if they do, leave it alone.
             'Transfer-Encoding' not in headers)
         # If a 1.0 client asked for keep-alive, add the header.
         if (self._request_start_line.version == 'HTTP/1.0' and
             (self._request_headers.get('Connection', '').lower()
              == 'keep-alive')):
             headers['Connection'] = 'Keep-Alive'
     if self._chunking_output:
         headers['Transfer-Encoding'] = 'chunked'
     # Track how many body bytes may still be written: none for responses
     # to HEAD requests and 304 responses, the declared Content-Length when
     # present, otherwise unknown (None).
     if (not self.is_client and
         (self._request_start_line.method == 'HEAD' or
          start_line.code == 304)):
         self._expected_content_remaining = 0
     elif 'Content-Length' in headers:
         self._expected_content_remaining = int(headers['Content-Length'])
     else:
         self._expected_content_remaining = None
     lines = [utf8("%s %s %s" % start_line)]
     lines.extend([utf8(n) + b": " + utf8(v) for n, v in headers.get_all()])
     # Reject embedded newlines so a value cannot inject extra header lines.
     for line in lines:
         if b'\n' in line:
             raise ValueError('Newline in header: ' + repr(line))
     if self.stream.closed():
         # Nothing can be written; report the failure through the future.
         self._write_future = Future()
         self._write_future.set_exception(iostream.StreamClosedError())
     else:
         if callback is not None:
             self._write_callback = stack_context.wrap(callback)
         else:
             self._write_future = Future()
         data = b"\r\n".join(lines) + b"\r\n\r\n"
         if chunk:
             data += self._format_chunk(chunk)
         self._pending_write = self.stream.write(data)
         self._pending_write.add_done_callback(self._on_write_complete)
     return self._write_future
开发者ID:yantetree,项目名称:tornado,代码行数:60,代码来源:http1connection.py


示例13: get

    def get(self, path="."):
        """Render a listing of ``path`` or, with a ``dl`` argument, package it.

        When the ``dl`` request argument is set, the working directory is
        temporarily changed to ``path`` while ``self._download_dir_package``
        runs and is restored afterwards.  Otherwise every entry of ``path``
        that can be ``stat``-ed is listed: directories (name with a trailing
        "/" and their link count) first, then files (name and
        human-readable size), each group sorted, and ``dir.html`` is
        rendered with the result.
        """
        download = self.get_argument("dl", None)
        if download:
            previous_dir = os.getcwd()
            try:
                os.chdir(utf8(path))
                self._download_dir_package(path, download)
            finally:
                os.chdir(previous_dir)
            return

        path = utf8(path)
        dirs = []
        files = []
        for name in os.listdir(path):
            full_path = os.path.join(path, name)
            try:
                info = os.stat(full_path)
            except OSError:
                # Entry vanished or is unreadable; leave it out.
                continue
            # Pad names to a 30-character column (no padding when longer).
            pad = " " * (30 - len(name))
            if os.path.isdir(full_path):
                dirs.append((name + "/", pad + "{:>15}".format(info.st_nlink)))
                continue
            size = humanize.naturalsize(info.st_size, gnu=True)
            files.append((name, pad + "{:>16}".format(size)))
        dirs.sort()
        files.sort()
        self.render("dir.html", path=path, lst=dirs + files)
开发者ID:lwzm,项目名称:bae_cache,代码行数:30,代码来源:dir.py


示例14: write_error

    def write_error(self, status_code, **kwargs):
        """Write a JSON (or JSONP) error document for ``status_code`` and finish.

        The ``explanation`` field comes from the ``explanation`` keyword, or
        else from a built-in text for the status code.  When ``exc_info`` is
        supplied, it is replaced by the exception's formatted
        ``log_message``, else by its ``reason``, else by the standard
        reason phrase for the status.  If ``self.callback`` is set the
        document is wrapped as ``callback(data)`` and served as JavaScript.
        """
        http_explanations = {
            400: 'Request not properly formatted or contains languages that Apertium APy does not support',
            404: 'Resource requested does not exist. URL may have been mistyped',
            408: 'Server did not receive a complete request within the time it was prepared to wait. Try again',
            500: 'Unexpected condition on server. Request could not be fulfilled.',
        }
        explanation = kwargs.get('explanation', http_explanations.get(status_code, ''))
        # Standard reason phrase, used for the message and as a fallback.
        status_text = tornado.httputil.responses.get(status_code, 'Unknown')

        if 'exc_info' in kwargs and len(kwargs['exc_info']) > 1:
            exception = kwargs['exc_info'][1]
            if hasattr(exception, 'log_message') and exception.log_message:
                explanation = exception.log_message % exception.args
            elif hasattr(exception, 'reason'):
                explanation = exception.reason or status_text
            else:
                explanation = status_text

        result = {
            'status': 'error',
            'code': status_code,
            'message': status_text,
            'explanation': explanation,
        }
        data = escape.json_encode(result)
        self.set_header('Content-Type', 'application/json; charset=UTF-8')

        payload = data
        if self.callback:
            self.set_header('Content-Type', 'application/javascript; charset=UTF-8')
            payload = '%s(%s)' % (self.callback, data)
        self._write_buffer.append(utf8(payload))
        self.finish()
开发者ID:wikimedia,项目名称:operations-debs-contenttranslation-apertium-apy,代码行数:33,代码来源:base.py


示例15: post

    def post(self):
        """Log a user in from the posted name and password.

        An unknown name produces the not-found response.  Otherwise the
        password is hashed with bcrypt using the stored hash as salt; on a
        match the ``shanbay_user`` secure cookie is set and the formatted
        user is written, and on a mismatch the failure is logged and
        ``login.html`` is rendered with an error.
        """
        data = self.post_schema()
        name = data['name']
        password = data['password']
        try:
            user = User.objects(name=name).get()
        except DoesNotExist:
            # failure: no such user
            self.write_not_found_entity_response()
            return

        hashed_password = bcrypt.hashpw(utf8(password),
                                        utf8(user.hashed_password))
        # NOTE(review): the comparison uses the raw stored value, not its
        # utf8() form -- confirm both sides have the same type.
        if hashed_password != user.hashed_password:
            logging.error("incorrect password")
            self.render("login.html", error="incorrect password")
            return

        # success
        self.set_secure_cookie("shanbay_user", str(user.id))
        self.write_response(user.format_response())
开发者ID:MioYvo,项目名称:shanbay,代码行数:25,代码来源:user_handler.py


示例16: save_blog

def save_blog(blog_url):
    """Download one blog post and save it as an HTML file.

    The page at ``blog_url`` is opened with ``urllib2`` and its date, title
    and content are located by CSS class.  The output file is named after
    the date (with ":" replaced by "-") inside ``download_dir`` and starts
    with the title and date lines, followed by each child of the content
    element.  Children that fail to write are printed and skipped.

    Returns whatever ``get_next(soup)`` returns.
    """
    print(blog_url)
    response = urllib2.urlopen(urllib2.Request(blog_url))
    soup = BeautifulSoup(response, "html.parser")

    # date
    date_tag = soup.find('span', class_="blogDetail-ownerOther-date")
    blog_date = utf8(date_tag.contents[0])
    print(blog_date)
    # title
    title_tag = soup.find('h2', class_="blogDetail-title")
    title = utf8(title_tag.contents[0]).replace("/", "\\")
    print(title)

    blog_content = soup.find_all("div", class_="blogDetail-content")[0]
    filename = blog_date.replace(':', '-')
    target = "{}{}.html".format(download_dir, filename)
    with open(target, "wb") as fw:
        fw.write("# {}\n".format(title))
        fw.write("> {}\n\n".format(blog_date))
        for node in blog_content:
            try:
                fw.write(str(node))
            except Exception as e:
                # Best effort: report the failure and keep going.
                print(e)

    return get_next(soup)
开发者ID:wellsleep,项目名称:Renren_blogspider,代码行数:35,代码来源:renren_blogspider.py


示例17: _on_connect

 def _on_connect(self, parsed):
     """Send the HTTP request once the connection has been established.

     ``parsed`` is the split request URL; its ``hostname``, ``netloc``,
     ``username``, ``password``, ``path`` and ``query`` attributes are read.
     The method re-arms the request timeout, optionally checks the peer
     certificate, validates the request, fills in default headers, writes
     the request line, headers and body to the stream, and then waits for
     the response headers (handled by ``self._on_headers``).

     Raises:
         KeyError: for an unsupported method unless
             ``allow_nonstandard_methods`` is set.
         NotImplementedError: if an unsupported option is set.
         ValueError: if a header line contains a newline.
     """
     # Replace any pending timeout with one covering the whole request,
     # measured from the recorded start time.
     if self._timeout is not None:
         self.io_loop.remove_timeout(self._timeout)
         self._timeout = None
     if self.request.request_timeout:
         self._timeout = self.io_loop.add_timeout(
             self.start_time + self.request.request_timeout,
             self._on_timeout)
     # Check the certificate against the requested host on SSL streams.
     if (self.request.validate_cert and
         isinstance(self.stream, SSLIOStream)):
         match_hostname(self.stream.socket.getpeercert(),
                        parsed.hostname)
     if (self.request.method not in self._SUPPORTED_METHODS and
         not self.request.allow_nonstandard_methods):
         raise KeyError("unknown method %s" % self.request.method)
     for key in ('network_interface',
                 'proxy_host', 'proxy_port',
                 'proxy_username', 'proxy_password'):
         if getattr(self.request, key, None):
             raise NotImplementedError('%s not supported' % key)
     if "Host" not in self.request.headers:
         self.request.headers["Host"] = parsed.netloc
     # Credentials in the URL take precedence over the request's own.
     username, password = None, None
     if parsed.username is not None:
         username, password = parsed.username, parsed.password
     elif self.request.auth_username is not None:
         username = self.request.auth_username
         password = self.request.auth_password
     if username is not None:
         # NOTE(review): password may be None here (URL with a username
         # only, or auth_password unset) -- confirm utf8() copes with that.
         auth = utf8(username) + b(":") + utf8(password)
         self.request.headers["Authorization"] = (b("Basic ") +
                                                  base64.b64encode(auth))
     if self.request.user_agent:
         self.request.headers["User-Agent"] = self.request.user_agent
     # Sanity check: POST and PUT must carry a body, other methods must not.
     if not self.request.allow_nonstandard_methods:
         if self.request.method in ("POST", "PUT"):
             assert self.request.body is not None
         else:
             assert self.request.body is None
     if self.request.body is not None:
         self.request.headers["Content-Length"] = str(len(
                 self.request.body))
     if (self.request.method == "POST" and
         "Content-Type" not in self.request.headers):
         self.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
     if self.request.use_gzip:
         self.request.headers["Accept-Encoding"] = "gzip"
     # Only the path and query string go on the request line.
     req_path = ((parsed.path or '/') +
             (('?' + parsed.query) if parsed.query else ''))
     request_lines = [utf8("%s %s HTTP/1.1" % (self.request.method,
                                               req_path))]
     for k, v in self.request.headers.get_all():
         line = utf8(k) + b(": ") + utf8(v)
         # Reject embedded newlines so a value cannot inject header lines.
         if b('\n') in line:
             raise ValueError('Newline in header: ' + repr(line))
         request_lines.append(line)
     self.stream.write(b("\r\n").join(request_lines) + b("\r\n\r\n"))
     if self.request.body is not None:
         self.stream.write(self.request.body)
     # Read up to the blank line that ends the response headers.
     self.stream.read_until_regex(b("\r?\n\r?\n"), self._on_headers)
开发者ID:GALI472,项目名称:zhidaobot,代码行数:60,代码来源:simple_httpclient.py


示例18: _oauth10a_signature

def _oauth10a_signature(
    consumer_token: Dict[str, Any],
    method: str,
    url: str,
    parameters: Dict[str, Any] = {},
    token: Dict[str, Any] = None,
) -> bytes:
    """Compute the HMAC-SHA1 OAuth 1.0a signature for a request.

    The signature base string joins the upper-cased method, the URL
    reduced to lower-cased scheme and host plus path, and the sorted,
    escaped parameters.  The signing key is the quoted consumer secret and
    the quoted token secret (empty when ``token`` is falsy) joined by "&".

    Returns the base64-encoded digest without the trailing newline.

    See http://oauth.net/core/1.0a/#signing_process
    """
    scheme, netloc, path = urllib.parse.urlparse(url)[:3]
    normalized_url = scheme.lower() + "://" + netloc.lower() + path

    base_elems = [
        method.upper(),
        normalized_url,
        "&".join(
            "%s=%s" % (name, _oauth_escape(str(val)))
            for name, val in sorted(parameters.items())
        ),
    ]
    base_string = "&".join(_oauth_escape(elem) for elem in base_elems)

    consumer_part = escape.utf8(
        urllib.parse.quote(consumer_token["secret"], safe="~")
    )
    token_part = escape.utf8(
        urllib.parse.quote(token["secret"], safe="~") if token else ""
    )
    key = b"&".join((consumer_part, token_part))

    mac = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
    # b2a_base64 appends a newline; drop it.
    return binascii.b2a_base64(mac.digest())[:-1]
开发者ID:bdarnell,项目名称:tornado,代码行数:33,代码来源:auth.py


示例19: test_unrecognised_command

 def test_unrecognised_command(self):
     """Empty and whitespace-only commands get a 500 bad-syntax reply."""
     expected = utf8('500 Error: bad syntax\r\n')
     self.connect()
     for command in ('', '  '):
         self.stream.write(utf8('%s\r\n' % command))
         self.assertEqual(self.read_response(), expected)
     self.close()
开发者ID:puentesarrin,项目名称:bonzo,代码行数:7,代码来源:server_test.py


示例20: send_email

def send_email(from_, to_, subject, body, html=None, attachments=[]):
    """Build a MIME e-mail message from the given parts.

    ``from_`` may be an ``EmailAddress`` (converted with ``str``) or a
    string; ``to_`` is an iterable of recipient addresses.  When ``html``
    is given the message is a ``multipart/alternative`` with a plain and an
    HTML part, otherwise a single text part.  ``attachments`` is an
    iterable of ``(filename, data)`` pairs; when non-empty, the message is
    wrapped in a ``multipart/mixed`` container and each attachment is added
    as a base64-encoded ``application/octet-stream`` part.

    NOTE(review): the visible code neither sends nor returns the message --
    this listing appears to be truncated; check the full source.
    """
    if isinstance(from_, EmailAddress):
        from_ = str(from_)
    else:
        from_ = utf8(from_)
    
    to = [utf8(t) for t in to_]
    
    if html:
        # Offer both renderings; the mail client picks one.
        message = MIMEMultipart("alternative")
        message.attach(MIMEText(body, "plain"))
        message.attach(MIMEText(html, "html"))
    else:
        message = MIMEText(body)
        
    if attachments:
        # Nest the text message inside a "mixed" container so that the
        # attachments sit alongside it.
        part = message
        message = MIMEMultipart("mixed")
        message.attach(part)
        for filename, data in attachments:
            part = MIMEBase("application", "octet-stream")
            part.set_payload(data)
            encoders.encode_base64(part)
            part.add_header("Content-Disposition", "attachment", filename=filename)
            message.attach(part)
    
    message["Date"] = formatdate(time.time())
    message["From"] = from_
    message["To"] = COMMASPACE.join(to)
    message["Subject"] = utf8(subject)
开发者ID:wangyixiang,项目名称:lottery,代码行数:30,代码来源:mail.py



注:本文中的tornado.escape.utf8函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python escape.xhtml_escape函数代码示例发布时间:2022-05-27
下一篇:
Python escape.url_unescape函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap