• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python str_utils.force_str函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中zerver.lib.str_utils.force_str函数的典型用法代码示例。如果您正苦于以下问题：Python force_str函数的具体用法？Python force_str怎么用？Python force_str使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了force_str函数的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: webathena_kerberos_login

def webathena_kerberos_login(request: HttpRequest, user_profile: UserProfile,
                             cred: Text=REQ(default=None)) -> HttpResponse:
    """Validate a Webathena Kerberos credential and hand it to the mirroring host.

    The JSON credential in ``cred`` is parsed, its principal name (after
    applying ``kerberos_alter_egos``) is compared with the local part of
    ``user_profile.email``, and the resulting credential cache is passed,
    base64-encoded, to ``process_ccache`` on
    ``settings.PERSONAL_ZMIRROR_SERVER`` over ssh.

    Returns ``json_success()`` when the remote command succeeds, and a
    ``json_error`` response when the credential is missing, Webathena is
    disabled for the realm, the credential is invalid or belongs to another
    principal, or the ssh command fails.
    """
    if cred is None:
        return json_error(_("Could not find Kerberos credential"))
    if not user_profile.realm.webathena_enabled:
        return json_error(_("Webathena login not enabled"))

    try:
        parsed_cred = ujson.loads(cred)
        user = parsed_cred["cname"]["nameString"][0]
        if user in kerberos_alter_egos:
            user = kerberos_alter_egos[user]
        # Explicit comparison instead of `assert`: assertions are stripped
        # under `python -O`, which would silently disable this ownership check.
        if user != user_profile.email.split("@")[0]:
            return json_error(_("Invalid Kerberos cache"))
        ccache = make_ccache(parsed_cred)
    except Exception:
        # Any malformed credential is reported with the same generic error.
        return json_error(_("Invalid Kerberos cache"))

    # TODO: Send these data via (say) rabbitmq
    try:
        # Argument list (no shell), so the values are not shell-interpreted locally.
        subprocess.check_call(["ssh", settings.PERSONAL_ZMIRROR_SERVER, "--",
                               "/home/zulip/python-zulip-api/zulip/integrations/zephyr/process_ccache",
                               force_str(user),
                               force_str(user_profile.api_key),
                               force_str(base64.b64encode(ccache))])
    except Exception:
        logging.exception("Error updating the user's ccache")
        return json_error(_("We were unable to setup mirroring for you"))

    return json_success()
开发者ID:gnprice,项目名称:zulip,代码行数:30,代码来源:zephyr.py


示例2: webathena_kerberos_login

def webathena_kerberos_login(request, user_profile,
                             cred=REQ(default=None)):
    # type: (HttpRequest, UserProfile, Text) -> HttpResponse
    """Validate a Webathena Kerberos credential and hand it to the mirroring host.

    The JSON credential in ``cred`` is parsed, its principal name is compared
    with the local part of ``user_profile.email``, and the resulting
    credential cache is passed, base64-encoded, to ``process_ccache`` on
    ``settings.PERSONAL_ZMIRROR_SERVER`` over ssh.

    Returns ``json_success()`` on success and a ``json_error`` response when
    the credential is missing, Webathena is disabled for the realm, the
    credential is invalid or belongs to another principal, or the ssh command
    fails.
    """
    if cred is None:
        return json_error(_("Could not find Kerberos credential"))
    if not user_profile.realm.webathena_enabled:
        return json_error(_("Webathena login not enabled"))

    try:
        parsed_cred = ujson.loads(cred)
        user = parsed_cred["cname"]["nameString"][0]
        if user == "golem":
            # Hack for an mit.edu user whose Kerberos username doesn't
            # match what he zephyrs as
            user = "ctl"
        # Explicit comparison instead of `assert`: assertions are stripped
        # under `python -O`, which would silently disable this ownership check.
        if user != user_profile.email.split("@")[0]:
            return json_error(_("Invalid Kerberos cache"))
        ccache = make_ccache(parsed_cred)
    except Exception:
        # Any malformed credential is reported with the same generic error.
        return json_error(_("Invalid Kerberos cache"))

    # TODO: Send these data via (say) rabbitmq
    try:
        # Argument list (no shell), so the values are not shell-interpreted locally.
        subprocess.check_call(["ssh", settings.PERSONAL_ZMIRROR_SERVER, "--",
                               "/home/zulip/zulip/bots/process_ccache",
                               force_str(user),
                               force_str(user_profile.api_key),
                               force_str(base64.b64encode(ccache))])
    except Exception:
        logging.exception("Error updating the user's ccache")
        return json_error(_("We were unable to setup mirroring for you"))

    return json_success()
开发者ID:TomaszKolek,项目名称:zulip,代码行数:32,代码来源:zephyr.py


示例3: generate_secrets

def generate_secrets(development=False):
    # type: (bool) -> None
    """Write the secrets configuration file, reusing any existing values.

    The target is ``zproject/dev-secrets.conf`` when ``development`` is true
    and ``/etc/zulip/zulip-secrets.conf`` otherwise.  Every setting in
    ``AUTOGENERATED_SETTINGS``, plus ``secret_key`` and ``camo_key``, keeps
    the value returned by ``get_old_conf`` when one exists; otherwise a new
    value is generated.  The file is rewritten from scratch.  Outside
    development mode the Camo config file is also regenerated from
    ``camo_key``.
    """
    if development:
        OUTPUT_SETTINGS_FILENAME = "zproject/dev-secrets.conf"
    else:
        OUTPUT_SETTINGS_FILENAME = "/etc/zulip/zulip-secrets.conf"

    lines = [u'[secrets]\n']

    def config_line(var, value):
        # type: (text_type, text_type) -> text_type
        return "%s = %s\n" % (var, value)

    old_conf = get_old_conf(OUTPUT_SETTINGS_FILENAME)
    for name in AUTOGENERATED_SETTINGS:
        # The fallback token is generated eagerly even when an old value exists.
        lines.append(config_line(name, old_conf.get(name, generate_random_token(64))))

    secret_key = old_conf.get('secret_key', generate_django_secretkey())
    lines.append(config_line('secret_key', secret_key))

    camo_key = old_conf.get('camo_key', get_random_string(64))
    lines.append(config_line('camo_key', camo_key))

    if not development:
        # Write the Camo config file directly
        generate_camo_config_file(camo_key)

    # Context manager so the handle is closed even if the write fails.
    with open(OUTPUT_SETTINGS_FILENAME, 'w') as out:
        out.write(force_str("".join(lines)))

    print("Generated %s with auto-generated secrets!" % (OUTPUT_SETTINGS_FILENAME,))
开发者ID:HKingz,项目名称:zulip,代码行数:32,代码来源:generate_secrets.py


示例4: generate_secrets

def generate_secrets(development=False):
    # type: (bool) -> None
    """Append any missing secrets to the secrets configuration file.

    The target is ``zproject/dev-secrets.conf`` when ``development`` is true
    and ``/etc/zulip/zulip-secrets.conf`` otherwise.  Secrets already present
    in the configuration returned by ``get_old_conf`` are left untouched;
    only the missing ones are generated and appended.  When the existing
    configuration is empty a ``[secrets]`` section header is emitted first.
    Outside development mode the Camo config file is regenerated from the
    (existing or new) ``camo_key``.
    """
    if development:
        OUTPUT_SETTINGS_FILENAME = "zproject/dev-secrets.conf"
    else:
        OUTPUT_SETTINGS_FILENAME = "/etc/zulip/zulip-secrets.conf"
    current_conf = get_old_conf(OUTPUT_SETTINGS_FILENAME)

    lines = []  # type: List[Text]
    if len(current_conf) == 0:
        lines = [u'[secrets]\n']

    def need_secret(name):
        # type: (str) -> bool
        return name not in current_conf

    def add_secret(name, value):
        # type: (str, Text) -> None
        lines.append("%s = %s\n" % (name, value))
        # Record the value so later steps (e.g. the Camo config) can read it.
        current_conf[name] = value

    for name in AUTOGENERATED_SETTINGS:
        if need_secret(name):
            add_secret(name, generate_random_token(64))

    if need_secret('secret_key'):
        add_secret('secret_key', generate_django_secretkey())

    if need_secret('camo_key'):
        add_secret('camo_key', get_random_string(64))

    # zulip_org_key is generated using os.urandom().
    # zulip_org_id does not require a secure CSPRNG,
    # it only needs to be unique.
    if need_secret('zulip_org_key'):
        add_secret('zulip_org_key', get_random_string(64))
    if need_secret('zulip_org_id'):
        add_secret('zulip_org_id', str(uuid.uuid4()))

    if not development:
        # Write the Camo config file directly
        generate_camo_config_file(current_conf['camo_key'])

    if not lines:
        print("generate_secrets: No new secrets to generate.")
        return

    # Context manager so the handle is closed even if the write fails.
    with open(OUTPUT_SETTINGS_FILENAME, 'a') as out:
        # Write a newline at the start, in case there was no newline at
        # the end of the file due to human editing.
        out.write("\n" + force_str("".join(lines)))

    print("Generated new secrets in %s." % (OUTPUT_SETTINGS_FILENAME,))
开发者ID:joydeep1701,项目名称:zulip,代码行数:54,代码来源:generate_secrets.py


示例5: upload_image_to_s3

def upload_image_to_s3(
        bucket_name,
        file_name,
        content_type,
        user_profile,
        contents):
    # type: (NonBinaryStr, Text, Optional[Text], UserProfile, binary_type) -> None
    """Store ``contents`` in the S3 bucket ``bucket_name`` under ``file_name``.

    The key is tagged with the uploader's user profile id and realm id as
    metadata.  A ``Content-Type`` header is sent only when ``content_type``
    is not None.
    """
    connection = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
    s3_key = Key(get_bucket(connection, force_str(bucket_name)))
    s3_key.key = force_str(file_name)
    s3_key.set_metadata("user_profile_id", str(user_profile.id))
    s3_key.set_metadata("realm_id", str(user_profile.realm_id))

    # No headers at all (rather than an empty dict) when the type is unknown.
    upload_headers = None if content_type is None else {'Content-Type': force_str(content_type)}

    s3_key.set_contents_from_string(contents, headers=upload_headers)
开发者ID:sharmaeklavya2,项目名称:zulip,代码行数:21,代码来源:upload.py


示例6: _wrapped_view_func

    def _wrapped_view_func(request, *args, **kwargs):
        # type: (HttpRequest, *Any, **Any) -> HttpResponse
        """Normalise a Basic ``Authorization`` header, then call the wrapped view.

        For Basic auth, the credentials are base64-decoded, every ``%40`` in
        the email part is replaced by ``@``, and the header is re-encoded and
        written back to ``request.META``.  Any failure while doing so is
        ignored and the request is passed through unchanged.
        """
        try:
            scheme, token = request.META['HTTP_AUTHORIZATION'].split()  # type: str, str
            if scheme.lower() == "basic":
                decoded = base64.b64decode(force_bytes(token)).decode('utf-8')
                email, api_key = decoded.split(":")
                normalized = u"%s:%s" % (email.replace('%40', '@'), api_key)
                reencoded = base64.b64encode(normalized.encode('utf-8'))
                request.META['HTTP_AUTHORIZATION'] = "Basic " + force_str(reencoded)
        except Exception:
            # Best effort only: a missing or malformed header is left as-is.
            pass

        return view_func(request, *args, **kwargs)
开发者ID:christi3k,项目名称:zulip,代码行数:14,代码来源:view.py


示例7: post_process

    def post_process(self, paths, dry_run=False, **kwargs):
        # type: (Dict[str, Tuple[ZulipStorage, str]], bool, **Any) -> List[Tuple[str, str, bool]]
        """Prepend the static header to every ``min/*.css`` file in ``paths``.

        Returns a list of ``(old_path, new_path, processed)`` tuples.  Entries
        reported as processed by the superclass's ``post_process`` (when it
        has one) override the entries produced here.  Nothing is touched when
        ``dry_run`` is true.
        """
        if dry_run:
            return []

        with open(settings.STATIC_HEADER_FILE, 'rb') as header_fp:
            header_text = header_fp.read().decode(settings.FILE_CHARSET)

        # Maps each path to its (old_path, new_path, processed) tuple; the
        # values of this mapping become the return value.
        results = {}

        for name in paths:
            storage, path = paths[name]

            is_minified_css = path.startswith('min/') and path.endswith('.css')
            if not is_minified_css:
                results[path] = (path, path, False)
                continue

            # Read the current contents, then replace the file with header + contents.
            with storage.open(path, 'rb') as source_fp:
                body_text = source_fp.read().decode(settings.FILE_CHARSET)

            storage.delete(path)

            with storage.open(path, 'w') as target_fp:
                target_fp.write(force_str(header_text + body_text, encoding=settings.FILE_CHARSET))

            results[path] = (path, path, True)

        parent = super()
        if hasattr(parent, 'post_process'):
            parent_results = parent.post_process(paths, dry_run, **kwargs)  # type: ignore # https://github.com/python/mypy/issues/2956
        else:
            parent_results = []

        # Fold in whatever the superclass actually processed.
        for entry in parent_results:
            source_path, _dest_path, was_processed = entry
            if was_processed:
                results[source_path] = entry

        return list(results.values())
开发者ID:brockwhittaker,项目名称:zulip,代码行数:44,代码来源:storage.py


示例8: test_success

    def test_success(self):
        # type: () -> None
        """Run the email-mirror-postfix script on a templated message and expect exit 0.

        The message is rendered from ``simple.txt`` with the Denmark stream's
        email address and hamlet's email, and is fed to the script on stdin
        through a pipe.  ``check_call`` raises if the script exits non-zero.
        """
        script = os.path.join(os.path.dirname(__file__),
                              '../../scripts/lib/email-mirror-postfix')

        sender = self.example_email('hamlet')
        stream = get_stream("Denmark", get_realm("zulip"))
        stream_to_address = encode_email_address(stream)

        template_path = os.path.join(MAILS_DIR, "simple.txt")
        with open(template_path) as template_file:
            mail_template = template_file.read()
        mail = mail_template.format(stream_to_address=stream_to_address, sender=sender)
        read_pipe, write_pipe = os.pipe()
        try:
            try:
                os.write(write_pipe, mail.encode())
            finally:
                # Close the write end so the script sees EOF on stdin.
                os.close(write_pipe)
            subprocess.check_call(
                [script, '-r', force_str(stream_to_address), '-s', settings.SHARED_SECRET, '-t'],
                stdin=read_pipe)
        finally:
            # The read end is owned by this process and must not leak,
            # whether or not the script succeeded.
            os.close(read_pipe)
开发者ID:brockwhittaker,项目名称:zulip,代码行数:19,代码来源:test_email_mirror.py


示例9: compute_mit_user_fullname

def compute_mit_user_fullname(email: NonBinaryStr) -> NonBinaryStr:
    """Derive a display name for an ``@mit.edu`` address.

    For a plain ``username@mit.edu`` address, the name is taken from the
    fifth colon-separated field (up to the first comma) of the Hesiod
    ``passwd`` TXT record for that user.  For a ``user|realm@mit.edu``
    address the result is ``user@REALM``.  In every other case, including
    lookup failures and an empty Hesiod name, the lower-cased email is
    returned.
    """
    try:
        # Input is either e.g. username@mit.edu or user|realm@mit.edu
        match_user = re.match(r'^([a-zA-Z0-9_.-]+)(\|.+)?@mit\.edu$', email.lower())
        if match_user and match_user.group(2) is None:
            answer = DNS.dnslookup(
                "%s.passwd.ns.athena.mit.edu" % (match_user.group(1),),
                DNS.Type.TXT)
            hesiod_name = force_str(answer[0][0]).split(':')[4].split(',')[0].strip()
            if hesiod_name != "":
                return hesiod_name
        elif match_user:
            # group(2) still carries the leading "|", hence the [1:].
            return match_user.group(1).lower() + "@" + match_user.group(2).upper()[1:]
    except DNS.Base.ServerError:
        # A DNS server error is expected occasionally; fall back silently.
        pass
    except Exception:
        print("Error getting fullname for %s:" % (email,))
        traceback.print_exc()
    return email.lower()
开发者ID:brainwane,项目名称:zulip,代码行数:19,代码来源:zephyr.py


示例10: ensure_medium_avatar_image

    def ensure_medium_avatar_image(self, email):
        # type: (text_type) -> None
        """Create and upload the medium-sized avatar for the user with ``email``.

        The existing avatar is fetched from ``settings.S3_AVATAR_BUCKET``
        under the user's avatar hash, resized to ``MEDIUM_AVATAR_SIZE``, and
        uploaded back to the same bucket as ``<hash>-medium.png``.
        """
        profile = get_user_profile_by_email(email)
        avatar_hash = user_avatar_hash(email)

        avatar_bucket_name = settings.S3_AVATAR_BUCKET
        connection = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
        avatar_bucket = get_bucket(connection, force_str(avatar_bucket_name))
        original_bytes = avatar_bucket.get_key(avatar_hash).get_contents_as_string()

        medium_bytes = resize_avatar(original_bytes, MEDIUM_AVATAR_SIZE)
        upload_image_to_s3(
            avatar_bucket_name,
            avatar_hash + "-medium.png",
            "image/png",
            profile,
            medium_bytes
        )
开发者ID:shekhirin,项目名称:zulip,代码行数:20,代码来源:upload.py


示例11: upload_image_to_s3

def upload_image_to_s3(
        bucket_name,
        file_name,
        content_type,
        user_profile,
        contents):
    # type: (NonBinaryStr, Text, Optional[Text], UserProfile, binary_type) -> None
    """Store ``contents`` in the S3 bucket ``bucket_name`` under ``file_name``.

    The key is tagged with the uploader's user profile id and realm id as
    metadata.  A ``Content-Type`` header is sent only when ``content_type``
    is not None.
    """
    connection = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
    s3_key = Key(get_bucket(connection, bucket_name))
    s3_key.key = force_str(file_name)
    s3_key.set_metadata("user_profile_id", str(user_profile.id))
    s3_key.set_metadata("realm_id", str(user_profile.realm_id))

    # No headers at all (rather than an empty dict) when the type is unknown.
    upload_headers = None  # type: Optional[Dict[Text, Text]]
    if content_type is not None:
        upload_headers = {u'Content-Type': content_type}

    s3_key.set_contents_from_string(contents, headers=upload_headers)  # type: ignore # https://github.com/python/typeshed/issues/1552
开发者ID:brockwhittaker,项目名称:zulip,代码行数:21,代码来源:upload.py


示例12: get_signed_upload_url

def get_signed_upload_url(path):
    # type: (text_type) -> text_type
    """Return a signed GET URL for ``path`` in ``settings.S3_AUTH_UPLOADS_BUCKET``.

    The first argument to ``generate_url`` is 15; presumably the URL's
    lifetime in seconds -- TODO confirm against the S3 client library.
    """
    connection = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
    signed_url = connection.generate_url(
        15, 'GET', bucket=settings.S3_AUTH_UPLOADS_BUCKET, key=force_str(path))
    return force_text(signed_url)
开发者ID:shekhirin,项目名称:zulip,代码行数:4,代码来源:upload.py


示例13: hex_to_ascii

def hex_to_ascii(input_string):
    # type: (str) -> str
    """Decode a hex-encoded string back into the text it represents."""
    raw_bytes = binascii.unhexlify(input_string)
    return force_str(raw_bytes)
开发者ID:JamesLinus,项目名称:zulip,代码行数:4,代码来源:mobile_auth_otp.py


示例14: consume

 def consume(self, event: Mapping[str, Any]) -> None:
     """Parse the raw email in ``event["message"]`` and pass it to ``mirror_email``.

     The message is delivered for ``event["rcpt_to"]`` with ``pre_checked=True``.
     """
     raw_message = force_str(event["message"])
     parsed_message = email.message_from_string(raw_message)
     mirror_email(parsed_message, rcpt_to=event["rcpt_to"], pre_checked=True)
开发者ID:brainwane,项目名称:zulip,代码行数:4,代码来源:queue_processors.py


示例15: api_github_v2

def api_github_v2(user_profile, event, payload, branches, default_stream,
                  commit_stream, issue_stream, topic_focus = None):
    # type: (UserProfile, text_type, Mapping[text_type, Any], text_type, text_type, text_type, text_type, Optional[text_type]) -> Tuple[text_type, text_type, text_type]
    """
    Process a GitHub payload that uses the version 2 field specification.

    `payload` comes in unmodified from GitHub.
    `default_stream` is set to what `stream` is in v1.
    `commit_stream` and `issue_stream` fall back to `default_stream` if they are empty.
    This and allowing alternative endpoints is what distinguishes v1 from v2 of the github configuration.

    Returns a `(target_stream, subject, content)` tuple and raises
    `UnknownEventType` for any event that is not handled here.
    """
    target_stream = commit_stream or default_stream
    issue_stream = issue_stream or default_stream
    repository = payload['repository']
    topic_focus = topic_focus or repository['name']

    # Event Handlers
    if event == 'pull_request':
        subject = get_pull_request_or_issue_subject(repository, payload['pull_request'], 'PR')
        content = github_pull_request_content(payload)
    elif event == 'issues':
        # in v1, we assume that this stream exists since it is
        # deprecated and the few realms that use it already have the
        # stream
        target_stream = issue_stream
        subject = get_pull_request_or_issue_subject(repository, payload['issue'], 'Issue')
        content = github_issues_content(payload)
    elif event == 'issue_comment':
        # Comments on both issues and pull requests come in as issue_comment events
        issue = payload['issue']
        is_plain_issue = 'pull_request' not in issue or issue['pull_request']['diff_url'] is None
        if is_plain_issue:
            # Issue comments go to the issue stream; PR comments stay put.
            target_stream = issue_stream
            object_type = 'Issue'
        else:
            object_type = 'PR'

        subject = get_pull_request_or_issue_subject(repository, payload['issue'], object_type)
        content = github_object_commented_content(payload, object_type)

    elif event == 'push':
        subject, content = build_message_from_gitlog(
            user_profile, topic_focus,
            payload['ref'], payload['commits'],
            payload['before'], payload['after'],
            payload['compare'],
            payload['pusher']['name'],
            forced=payload['forced'],
            created=payload['created'])
    elif event == 'commit_comment':
        comment = payload['comment']
        subject = u'%s: commit %s' % (topic_focus, comment['commit_id'])

        pieces = [u'%s [commented](%s)' % (comment['user']['login'], comment['html_url'])]

        # The file/line suffix is only present for line-level comments.
        if comment['line'] is not None:
            pieces.append(u' on `%s`, line %d' % (comment['path'], comment['line']))

        pieces.append(u'\n\n~~~ quote\n%s\n~~~' % (comment['body'],))
        content = u''.join(pieces)

    else:
        raise UnknownEventType(force_str(u'Event %s is unknown and cannot be handled' % (event,)))

    return target_stream, subject, content
开发者ID:timabbott,项目名称:zulip,代码行数:66,代码来源:github.py


示例16: otp_encrypt_api_key

def otp_encrypt_api_key(user_profile, otp):
    # type: (UserProfile, str) -> str
    """XOR the hex-encoded API key of ``user_profile`` with the hex string ``otp``.

    Both the one-time pad and the hex-encoded key are asserted to be exactly
    ``UserProfile.API_KEY_LENGTH * 2`` characters long.
    """
    assert len(otp) == UserProfile.API_KEY_LENGTH * 2
    api_key_text = force_str(user_profile.api_key)
    hex_key = ascii_to_hex(api_key_text)
    assert len(hex_key) == UserProfile.API_KEY_LENGTH * 2
    return xor_hex_strings(hex_key, otp)
开发者ID:JamesLinus,项目名称:zulip,代码行数:6,代码来源:mobile_auth_otp.py


示例17: api_github_v2

def api_github_v2(
    user_profile, event, payload, branches, default_stream, commit_stream, issue_stream, topic_focus=None
):
    # type: (UserProfile, text_type, Mapping[text_type, Any], text_type, text_type, text_type, text_type, Optional[text_type]) -> Tuple[text_type, text_type, text_type]
    """
    Process a GitHub payload that uses the version 2 field specification.

    `payload` comes in unmodified from GitHub.
    `default_stream` is set to what `stream` is in v1.
    `commit_stream` and `issue_stream` fall back to `default_stream` if they are empty.
    This and allowing alternative endpoints is what distinguishes v1 from v2 of the github configuration.

    Returns a `(target_stream, subject, content)` tuple and raises
    `UnknownEventType` for any event that is not handled here.
    """
    target_stream = commit_stream or default_stream
    issue_stream = issue_stream or default_stream
    repository = payload["repository"]
    topic_focus = topic_focus or repository["name"]

    # Event Handlers
    if event == "pull_request":
        subject = get_pull_request_or_issue_subject(repository, payload["pull_request"], "PR")
        content = github_pull_request_content(payload)
    elif event == "issues":
        # in v1, we assume that this stream exists since it is
        # deprecated and the few realms that use it already have the
        # stream
        target_stream = issue_stream
        subject = get_pull_request_or_issue_subject(repository, payload["issue"], "Issue")
        content = github_issues_content(payload)
    elif event == "issue_comment":
        # Comments on both issues and pull requests come in as issue_comment events
        issue = payload["issue"]
        is_plain_issue = "pull_request" not in issue or issue["pull_request"]["diff_url"] is None
        if is_plain_issue:
            # Issue comments go to the issue stream; PR comments stay put.
            target_stream = issue_stream
            object_type = "Issue"
        else:
            object_type = "PR"

        subject = get_pull_request_or_issue_subject(repository, payload["issue"], object_type)
        content = github_object_commented_content(payload, object_type)

    elif event == "push":
        subject, content = build_message_from_gitlog(
            user_profile,
            topic_focus,
            payload["ref"],
            payload["commits"],
            payload["before"],
            payload["after"],
            payload["compare"],
            payload["pusher"]["name"],
            forced=payload["forced"],
            created=payload["created"],
        )
    elif event == "commit_comment":
        subject = topic_focus

        comment = payload.get("comment")
        comment_url = comment["html_url"]
        action = u"[commented]({})".format(comment_url)
        # The commit URL is the comment URL with its "#..." fragment removed.
        content = get_commits_comment_action_message(
            comment["user"]["login"],
            action,
            comment_url.split("#", 1)[0],
            comment["commit_id"],
            comment["body"],
        )

    else:
        raise UnknownEventType(force_str(u"Event %s is unknown and cannot be handled" % (event,)))

    return target_stream, subject, content
开发者ID:galexrt,项目名称:zulip,代码行数:72,代码来源:github.py


示例18: twitter_text

    def twitter_text(self, text, urls, user_mentions, media):
        # type: (text_type, List[Dict[text_type, text_type]], List[Dict[text_type, Any]], List[Dict[text_type, Any]]) -> Element
        """
        Use data from the twitter API to turn links, mentions and media into A
        tags.

        This works by using the urls, user_mentions and media data from the
        twitter API.

        The first step is finding the locations of the URLs, mentions and media
        in the text. For each match we build a dictionary with the start
        location, end location, the URL to link to, and the text to show in the
        link.

        Next we sort the matches by start location. And for each we add the
        text from the end of the last link to the start of the current link to
        the output. The text needs to be added to the text attribute of the
        first node (the P tag) or the tail of the last link created.

        Finally we add any remaining text to the last node.

        Returns the P element that holds the text and the generated links.
        """

        to_linkify = [] # type: List[Dict[text_type, Any]]
        # Build dicts for URLs
        for url_data in urls:
            short_url = url_data["url"]
            full_url = url_data["expanded_url"]
            # Every case-insensitive occurrence of the short URL links to the
            # short URL itself but displays the expanded URL.
            for match in re.finditer(re.escape(short_url), text, re.IGNORECASE):
                to_linkify.append({
                    'start': match.start(),
                    'end': match.end(),
                    'url': short_url,
                    'text': full_url,
                })
        # Build dicts for mentions
        for user_mention in user_mentions:
            screen_name = user_mention['screen_name']
            mention_string = u'@' + screen_name
            for match in re.finditer(re.escape(mention_string), text, re.IGNORECASE):
                to_linkify.append({
                    'start': match.start(),
                    'end': match.end(),
                    # The screen name is URL-quoted before being appended to
                    # the profile URL.
                    'url': u'https://twitter.com/' + force_text(urllib.parse.quote(force_str(screen_name))),
                    'text': mention_string,
                })
        # Build dicts for media
        for media_item in media:
            short_url = media_item['url']
            expanded_url = media_item['expanded_url']
            for match in re.finditer(re.escape(short_url), text, re.IGNORECASE):
                to_linkify.append({
                    'start': match.start(),
                    'end': match.end(),
                    'url': short_url,
                    'text': expanded_url,
                })

        # Process matches left to right.
        to_linkify.sort(key=lambda x: x['start'])
        p = current_node = markdown.util.etree.Element('p')

        def set_text(text):
            # type: (text_type) -> None
            """
            Helper to set the text or the tail of the current_node
            """
            # current_node is looked up in the enclosing scope at call time,
            # so this sees the most recent value assigned in the loop below.
            if current_node == p:
                current_node.text = text
            else:
                current_node.tail = text

        current_index = 0
        for link in to_linkify:
            # The text we want to link starts in already linked text skip it
            if link['start'] < current_index:
                continue
            # Add text from the end of last link to the start of the current
            # link
            set_text(text[current_index:link['start']])
            current_index = link['end']
            # url_to_a presumably builds the <a> element for this link --
            # verify against its definition.
            current_node = a = url_to_a(link['url'], link['text'])
            p.append(a)

        # Add any unused text
        set_text(text[current_index:])
        return p
开发者ID:150vb,项目名称:zulip,代码行数:85,代码来源:__init__.py



注:本文中的zerver.lib.str_utils.force_str函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python str_utils.force_text函数代码示例发布时间:2022-05-26
下一篇:
Python str_utils.force_bytes函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap