本文整理汇总了 Python 中 zerver.lib.str_utils.force_str 函数的典型用法代码示例。如果您正苦于以下问题:Python force_str 函数的具体用法?Python force_str 怎么用?Python force_str 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了force_str函数的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: webathena_kerberos_login
def webathena_kerberos_login(request: HttpRequest, user_profile: UserProfile,
                             cred: Text=REQ(default=None)) -> HttpResponse:
    """Hand a Webathena-supplied Kerberos credential to the Zephyr mirror host.

    The credential is parsed as JSON, its principal name is checked against
    the local part of ``user_profile.email`` (after applying the
    ``kerberos_alter_egos`` mapping), converted with ``make_ccache`` and then
    passed over ``ssh`` to the ``process_ccache`` script on
    ``settings.PERSONAL_ZMIRROR_SERVER``.

    Returns ``json_success()`` on success, or a ``json_error(...)`` response
    when the credential is missing, Webathena is disabled for the realm, the
    credential is invalid or belongs to another user, or the ssh call fails.
    """
    if cred is None:
        return json_error(_("Could not find Kerberos credential"))
    if not user_profile.realm.webathena_enabled:
        return json_error(_("Webathena login not enabled"))

    try:
        parsed_cred = ujson.loads(cred)
        user = parsed_cred["cname"]["nameString"][0]
        if user in kerberos_alter_egos:
            user = kerberos_alter_egos[user]
        # This check guards against a user submitting somebody else's
        # credential.  It must be an explicit test rather than an `assert`,
        # because assertions are stripped when Python runs with -O.
        if user != user_profile.email.split("@")[0]:
            return json_error(_("Invalid Kerberos cache"))
        ccache = make_ccache(parsed_cred)
    except Exception:
        return json_error(_("Invalid Kerberos cache"))

    # TODO: Send these data via (say) rabbitmq
    try:
        subprocess.check_call(["ssh", settings.PERSONAL_ZMIRROR_SERVER, "--",
                               "/home/zulip/python-zulip-api/zulip/integrations/zephyr/process_ccache",
                               force_str(user),
                               force_str(user_profile.api_key),
                               force_str(base64.b64encode(ccache))])
    except Exception:
        logging.exception("Error updating the user's ccache")
        return json_error(_("We were unable to setup mirroring for you"))

    return json_success()
开发者ID:gnprice,项目名称:zulip,代码行数:30,代码来源:zephyr.py
示例2: webathena_kerberos_login
def webathena_kerberos_login(request, user_profile,
                             cred=REQ(default=None)):
    # type: (HttpRequest, UserProfile, Text) -> HttpResponse
    """Hand a Webathena-supplied Kerberos credential to the Zephyr mirror host.

    The credential is parsed as JSON, its principal name is checked against
    the local part of ``user_profile.email``, converted with ``make_ccache``
    and then passed over ``ssh`` to the ``process_ccache`` script on
    ``settings.PERSONAL_ZMIRROR_SERVER``.

    Returns ``json_success()`` on success, or a ``json_error(...)`` response
    when the credential is missing, Webathena is disabled for the realm, the
    credential is invalid or belongs to another user, or the ssh call fails.
    """
    if cred is None:
        return json_error(_("Could not find Kerberos credential"))
    if not user_profile.realm.webathena_enabled:
        return json_error(_("Webathena login not enabled"))

    try:
        parsed_cred = ujson.loads(cred)
        user = parsed_cred["cname"]["nameString"][0]
        if user == "golem":
            # Hack for an mit.edu user whose Kerberos username doesn't
            # match what he zephyrs as
            user = "ctl"
        # This check guards against a user submitting somebody else's
        # credential.  It must be an explicit test rather than an `assert`,
        # because assertions are stripped when Python runs with -O.
        if user != user_profile.email.split("@")[0]:
            return json_error(_("Invalid Kerberos cache"))
        ccache = make_ccache(parsed_cred)
    except Exception:
        return json_error(_("Invalid Kerberos cache"))

    # TODO: Send these data via (say) rabbitmq
    try:
        subprocess.check_call(["ssh", settings.PERSONAL_ZMIRROR_SERVER, "--",
                               "/home/zulip/zulip/bots/process_ccache",
                               force_str(user),
                               force_str(user_profile.api_key),
                               force_str(base64.b64encode(ccache))])
    except Exception:
        logging.exception("Error updating the user's ccache")
        return json_error(_("We were unable to setup mirroring for you"))

    return json_success()
开发者ID:TomaszKolek,项目名称:zulip,代码行数:32,代码来源:zephyr.py
示例3: generate_secrets
def generate_secrets(development=False):
    # type: (bool) -> None
    """Write the secrets configuration file, reusing any existing values.

    With ``development`` true the file is ``zproject/dev-secrets.conf``;
    otherwise it is ``/etc/zulip/zulip-secrets.conf`` and the Camo config
    file is regenerated as well.  Values already returned by ``get_old_conf``
    for that file are kept; missing ones are freshly generated.  The target
    file is rewritten from scratch.
    """
    if development:
        OUTPUT_SETTINGS_FILENAME = "zproject/dev-secrets.conf"
    else:
        OUTPUT_SETTINGS_FILENAME = "/etc/zulip/zulip-secrets.conf"

    lines = [u'[secrets]\n']

    def config_line(var, value):
        # type: (text_type, text_type) -> text_type
        return "%s = %s\n" % (var, value)

    old_conf = get_old_conf(OUTPUT_SETTINGS_FILENAME)
    for name in AUTOGENERATED_SETTINGS:
        lines.append(config_line(name, old_conf.get(name, generate_random_token(64))))

    secret_key = old_conf.get('secret_key', generate_django_secretkey())
    lines.append(config_line('secret_key', secret_key))

    camo_key = old_conf.get('camo_key', get_random_string(64))
    lines.append(config_line('camo_key', camo_key))

    if not development:
        # Write the Camo config file directly
        generate_camo_config_file(camo_key)

    # Use a context manager so the handle is closed even if write() raises.
    with open(OUTPUT_SETTINGS_FILENAME, 'w') as out:
        out.write(force_str("".join(lines)))

    print("Generated %s with auto-generated secrets!" % (OUTPUT_SETTINGS_FILENAME,))
开发者ID:HKingz,项目名称:zulip,代码行数:32,代码来源:generate_secrets.py
示例4: generate_secrets
def generate_secrets(development=False):
    # type: (bool) -> None
    """Append any secrets that are missing from the secrets configuration file.

    With ``development`` true the file is ``zproject/dev-secrets.conf``;
    otherwise it is ``/etc/zulip/zulip-secrets.conf`` and the Camo config
    file is regenerated as well.  Secrets already present in the mapping
    returned by ``get_old_conf`` are left alone; only new ones are generated
    and appended.  If nothing is missing the file is not touched.
    """
    if development:
        OUTPUT_SETTINGS_FILENAME = "zproject/dev-secrets.conf"
    else:
        OUTPUT_SETTINGS_FILENAME = "/etc/zulip/zulip-secrets.conf"
    current_conf = get_old_conf(OUTPUT_SETTINGS_FILENAME)

    lines = []  # type: List[Text]
    if len(current_conf) == 0:
        # No existing configuration, so the section header is needed.
        lines = [u'[secrets]\n']

    def need_secret(name):
        # type: (str) -> bool
        return name not in current_conf

    def add_secret(name, value):
        # type: (str, Text) -> None
        lines.append("%s = %s\n" % (name, value))
        current_conf[name] = value

    for name in AUTOGENERATED_SETTINGS:
        if need_secret(name):
            add_secret(name, generate_random_token(64))

    if need_secret('secret_key'):
        add_secret('secret_key', generate_django_secretkey())

    if need_secret('camo_key'):
        add_secret('camo_key', get_random_string(64))

    # zulip_org_key is generated using os.urandom().
    # zulip_org_id does not require a secure CPRNG,
    # it only needs to be unique.
    if need_secret('zulip_org_key'):
        add_secret('zulip_org_key', get_random_string(64))
    if need_secret('zulip_org_id'):
        add_secret('zulip_org_id', str(uuid.uuid4()))

    if not development:
        # Write the Camo config file directly
        generate_camo_config_file(current_conf['camo_key'])

    if len(lines) == 0:
        print("generate_secrets: No new secrets to generate.")
        return

    # Use a context manager so the handle is closed even if write() raises.
    with open(OUTPUT_SETTINGS_FILENAME, 'a') as out:
        # Write a newline at the start, in case there was no newline at
        # the end of the file due to human editing.
        out.write("\n" + force_str("".join(lines)))

    print("Generated new secrets in %s." % (OUTPUT_SETTINGS_FILENAME,))
开发者ID:joydeep1701,项目名称:zulip,代码行数:54,代码来源:generate_secrets.py
示例5: upload_image_to_s3
def upload_image_to_s3(
        bucket_name,
        file_name,
        content_type,
        user_profile,
        contents):
    # type: (NonBinaryStr, Text, Optional[Text], UserProfile, binary_type) -> None
    """Store ``contents`` in the given S3 bucket under ``file_name``.

    The key is tagged with the uploading user's id and realm id as metadata.
    A ``Content-Type`` header is sent only when ``content_type`` is not None.
    """
    connection = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
    s3_key = Key(get_bucket(connection, force_str(bucket_name)))
    s3_key.key = force_str(file_name)

    s3_key.set_metadata("user_profile_id", str(user_profile.id))
    s3_key.set_metadata("realm_id", str(user_profile.realm_id))

    headers = None
    if content_type is not None:
        headers = {'Content-Type': force_str(content_type)}

    s3_key.set_contents_from_string(contents, headers=headers)
开发者ID:sharmaeklavya2,项目名称:zulip,代码行数:21,代码来源:upload.py
示例6: _wrapped_view_func
def _wrapped_view_func(request, *args, **kwargs):
    # type: (HttpRequest, *Any, **Any) -> HttpResponse
    """Normalize a Basic ``Authorization`` header, then call ``view_func``.

    For Basic auth the credentials are decoded, any ``%40`` in the email part
    is replaced with ``@``, and the header is re-encoded in place.  Any
    failure while doing so is ignored and the request is passed on untouched.
    """
    try:
        scheme, encoded_value = request.META['HTTP_AUTHORIZATION'].split()  # type: str, str
        if scheme.lower() == "basic":
            decoded = base64.b64decode(force_bytes(encoded_value)).decode('utf-8')
            email, api_key = decoded.split(":")
            credentials = u"%s:%s" % (email.replace('%40', '@'), api_key)
            reencoded = base64.b64encode(credentials.encode('utf-8'))
            request.META['HTTP_AUTHORIZATION'] = "Basic " + force_str(reencoded)
    except Exception:
        # Best effort only: leave the header as it was.
        pass
    return view_func(request, *args, **kwargs)
开发者ID:christi3k,项目名称:zulip,代码行数:14,代码来源:view.py
示例7: post_process
def post_process(self, paths, dry_run=False, **kwargs):
    # type: (Dict[str, Tuple[ZulipStorage, str]], bool, **Any) -> List[Tuple[str, str, bool]]
    """Prepend the static header to every ``min/*.css`` file in ``paths``.

    Returns a list of ``(old_path, new_path, processed)`` tuples, one per
    path.  Entries reported as processed by the superclass's
    ``post_process`` (when it has one) override ours.  Nothing is done, and
    an empty list is returned, when ``dry_run`` is true.
    """
    if dry_run:
        return []

    with open(settings.STATIC_HEADER_FILE, 'rb') as header_file:
        header = header_file.read().decode(settings.FILE_CHARSET)

    # Maps each path to its (old_path, new_path, processed) tuple; the
    # values of this mapping are what gets returned.
    results = {}

    for storage, path in paths.values():
        is_minified_css = path.startswith('min/') and path.endswith('.css')
        if is_minified_css:
            # Read the current contents, then rewrite the file with the
            # header in front.
            with storage.open(path, 'rb') as orig_file:
                orig_contents = orig_file.read().decode(settings.FILE_CHARSET)

            storage.delete(path)

            with storage.open(path, 'w') as new_file:
                new_file.write(force_str(header + orig_contents, encoding=settings.FILE_CHARSET))

        results[path] = (path, path, is_minified_css)

    parent = super()
    if hasattr(parent, 'post_process'):
        parent_results = parent.post_process(paths, dry_run, **kwargs)  # type: ignore # https://github.com/python/mypy/issues/2956
    else:
        parent_results = []

    # Let entries the superclass actually processed take precedence.
    for entry in parent_results:
        old_path, _new_path, processed = entry
        if processed:
            results[old_path] = entry

    return list(results.values())
开发者ID:brockwhittaker,项目名称:zulip,代码行数:44,代码来源:storage.py
示例8: test_success
def test_success(self):
    # type: () -> None
    """Run the email-mirror-postfix script on a sample mail fed via stdin.

    The mail is built from the ``simple.txt`` template, written into a pipe,
    and the script is expected to exit successfully (``check_call`` raises
    otherwise).
    """
    script = os.path.join(os.path.dirname(__file__),
                          '../../scripts/lib/email-mirror-postfix')

    sender = self.example_email('hamlet')
    stream = get_stream("Denmark", get_realm("zulip"))
    stream_to_address = encode_email_address(stream)

    template_path = os.path.join(MAILS_DIR, "simple.txt")
    with open(template_path) as template_file:
        mail_template = template_file.read()
    mail = mail_template.format(stream_to_address=stream_to_address, sender=sender)

    read_pipe, write_pipe = os.pipe()
    try:
        os.write(write_pipe, mail.encode())
        # Close the write end so the script sees EOF on its stdin.
        os.close(write_pipe)
        subprocess.check_call(
            [script, '-r', force_str(stream_to_address), '-s', settings.SHARED_SECRET, '-t'],
            stdin=read_pipe)
    finally:
        # The read end was previously left open, leaking a descriptor per run.
        os.close(read_pipe)
开发者ID:brockwhittaker,项目名称:zulip,代码行数:19,代码来源:test_email_mirror.py
示例9: compute_mit_user_fullname
def compute_mit_user_fullname(email: NonBinaryStr) -> NonBinaryStr:
    """Derive a display name for an ``@mit.edu`` address.

    * ``user@mit.edu``: look up the TXT record for
      ``<user>.passwd.ns.athena.mit.edu`` and return the first
      comma-separated piece of its fifth colon-separated field, if that is
      non-empty.
    * ``user|REALM@mit.edu``: return ``user@REALM`` with the realm
      upper-cased.
    * Anything else, or any lookup failure: return the lower-cased email.

    DNS server errors are ignored silently; other exceptions are printed
    with a traceback and then ignored.
    """
    try:
        # Input is either e.g. username@mit.edu or user|CROSSREALM.INVALID@mit.edu
        # The optional second group captures the "|REALM" part.
        match_user = re.match(r'^([a-zA-Z0-9_.-]+)(\|.+)?@mit\.edu$', email.lower())
        if match_user and match_user.group(2) is None:
            answer = DNS.dnslookup(
                "%s.passwd.ns.athena.mit.edu" % (match_user.group(1),),
                DNS.Type.TXT)
            hesiod_name = force_str(answer[0][0]).split(':')[4].split(',')[0].strip()
            if hesiod_name != "":
                return hesiod_name
        elif match_user:
            # [1:] drops the leading "|" from the captured realm.
            return match_user.group(1).lower() + "@" + match_user.group(2).upper()[1:]
    except DNS.Base.ServerError:
        pass
    except Exception:
        print("Error getting fullname for %s:" % (email,))
        traceback.print_exc()

    return email.lower()
开发者ID:brainwane,项目名称:zulip,代码行数:19,代码来源:zephyr.py
示例10: ensure_medium_avatar_image
def ensure_medium_avatar_image(self, email):
    # type: (text_type) -> None
    """Create and upload the medium-sized avatar for the user with ``email``.

    The existing avatar is fetched from the S3 avatar bucket, resized to
    ``MEDIUM_AVATAR_SIZE`` and uploaded next to the original under the
    ``<hash>-medium.png`` key.
    """
    user_profile = get_user_profile_by_email(email)
    email_hash = user_avatar_hash(email)

    bucket_name = settings.S3_AVATAR_BUCKET
    connection = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
    avatar_bucket = get_bucket(connection, force_str(bucket_name))

    original_image = avatar_bucket.get_key(email_hash).get_contents_as_string()
    medium_image = resize_avatar(original_image, MEDIUM_AVATAR_SIZE)

    upload_image_to_s3(
        bucket_name,
        email_hash + "-medium.png",
        "image/png",
        user_profile,
        medium_image
    )
开发者ID:shekhirin,项目名称:zulip,代码行数:20,代码来源:upload.py
示例11: upload_image_to_s3
def upload_image_to_s3(
        bucket_name,
        file_name,
        content_type,
        user_profile,
        contents):
    # type: (NonBinaryStr, Text, Optional[Text], UserProfile, binary_type) -> None
    """Store ``contents`` in the given S3 bucket under ``file_name``.

    The key is tagged with the uploading user's id and realm id as metadata.
    A ``Content-Type`` header is sent only when ``content_type`` is not None.
    """
    connection = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
    s3_key = Key(get_bucket(connection, bucket_name))
    s3_key.key = force_str(file_name)

    s3_key.set_metadata("user_profile_id", str(user_profile.id))
    s3_key.set_metadata("realm_id", str(user_profile.realm_id))

    headers = None  # type: Optional[Dict[Text, Text]]
    if content_type is not None:
        headers = {u'Content-Type': content_type}

    s3_key.set_contents_from_string(contents, headers=headers)  # type: ignore # https://github.com/python/typeshed/issues/1552
开发者ID:brockwhittaker,项目名称:zulip,代码行数:21,代码来源:upload.py
示例12: get_signed_upload_url
def get_signed_upload_url(path):
    # type: (text_type) -> text_type
    """Return a signed GET URL, valid for 15 seconds, for ``path`` in the uploads bucket."""
    connection = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
    signed_url = connection.generate_url(
        15, 'GET', bucket=settings.S3_AUTH_UPLOADS_BUCKET, key=force_str(path))
    return force_text(signed_url)
开发者ID:shekhirin,项目名称:zulip,代码行数:4,代码来源:upload.py
示例13: hex_to_ascii
def hex_to_ascii(input_string):
    # type: (str) -> str
    """Decode a hex-encoded string back into the string it represents."""
    raw_bytes = binascii.unhexlify(input_string)
    return force_str(raw_bytes)
开发者ID:JamesLinus,项目名称:zulip,代码行数:4,代码来源:mobile_auth_otp.py
示例14: consume
def consume(self, event: Mapping[str, Any]) -> None:
    """Parse the raw message carried by ``event`` and pass it to ``mirror_email``."""
    parsed_message = email.message_from_string(force_str(event["message"]))
    mirror_email(parsed_message, rcpt_to=event["rcpt_to"], pre_checked=True)
开发者ID:brainwane,项目名称:zulip,代码行数:4,代码来源:queue_processors.py
示例15: api_github_v2
def api_github_v2(user_profile, event, payload, branches, default_stream,
                  commit_stream, issue_stream, topic_focus = None):
    # type: (UserProfile, text_type, Mapping[text_type, Any], text_type, text_type, text_type, text_type, Optional[text_type]) -> Tuple[text_type, text_type, text_type]
    """
    Turn a version-2 GitHub webhook payload into a message.

    `payload` arrives unmodified from GitHub.  `commit_stream` and
    `issue_stream` fall back to `default_stream` when empty, and
    `topic_focus` falls back to the repository name.

    Returns a `(target_stream, subject, content)` tuple.  Raises
    `UnknownEventType` for an event this function has no handler for.
    """
    target_stream = commit_stream or default_stream
    issue_stream = issue_stream or default_stream
    repository = payload['repository']
    topic_focus = topic_focus or repository['name']

    # Event Handlers
    if event == 'pull_request':
        subject = get_pull_request_or_issue_subject(repository, payload['pull_request'], 'PR')
        content = github_pull_request_content(payload)
    elif event == 'issues':
        # in v1, we assume that this stream exists since it is
        # deprecated and the few realms that use it already have the
        # stream
        target_stream = issue_stream
        subject = get_pull_request_or_issue_subject(repository, payload['issue'], 'Issue')
        content = github_issues_content(payload)
    elif event == 'issue_comment':
        # Comments on both issues and pull requests come in as issue_comment events
        issue = payload['issue']
        on_pull_request = ('pull_request' in issue
                           and issue['pull_request']['diff_url'] is not None)
        if on_pull_request:
            object_kind = 'PR'
        else:
            # Plain issue comments go to the issue stream.
            object_kind = 'Issue'
            target_stream = issue_stream
        subject = get_pull_request_or_issue_subject(repository, issue, object_kind)
        content = github_object_commented_content(payload, object_kind)
    elif event == 'push':
        subject, content = build_message_from_gitlog(user_profile, topic_focus,
                                                     payload['ref'], payload['commits'],
                                                     payload['before'], payload['after'],
                                                     payload['compare'],
                                                     payload['pusher']['name'],
                                                     forced=payload['forced'],
                                                     created=payload['created'])
    elif event == 'commit_comment':
        comment = payload['comment']
        subject = u'%s: commit %s' % (topic_focus, comment['commit_id'])

        pieces = [u'%s [commented](%s)' % (comment['user']['login'], comment['html_url'])]
        if comment['line'] is not None:
            pieces.append(u' on `%s`, line %d' % (comment['path'], comment['line']))
        pieces.append(u'\n\n~~~ quote\n%s\n~~~' % (comment['body'],))
        content = u''.join(pieces)
    else:
        raise UnknownEventType(force_str(u'Event %s is unknown and cannot be handled' % (event,)))

    return target_stream, subject, content
开发者ID:timabbott,项目名称:zulip,代码行数:66,代码来源:github.py
示例16: otp_encrypt_api_key
def otp_encrypt_api_key(user_profile, otp):
    # type: (UserProfile, str) -> str
    """XOR the hex-encoded API key of ``user_profile`` with the hex string ``otp``.

    Both hex strings must be exactly twice ``UserProfile.API_KEY_LENGTH``
    characters long; this is asserted.
    """
    expected_hex_length = UserProfile.API_KEY_LENGTH * 2
    assert len(otp) == expected_hex_length
    hex_api_key = ascii_to_hex(force_str(user_profile.api_key))
    assert len(hex_api_key) == expected_hex_length
    return xor_hex_strings(hex_api_key, otp)
开发者ID:JamesLinus,项目名称:zulip,代码行数:6,代码来源:mobile_auth_otp.py
示例17: api_github_v2
def api_github_v2(
    user_profile, event, payload, branches, default_stream, commit_stream, issue_stream, topic_focus=None
):
    # type: (UserProfile, text_type, Mapping[text_type, Any], text_type, text_type, text_type, text_type, Optional[text_type]) -> Tuple[text_type, text_type, text_type]
    """
    Turn a version-2 GitHub webhook payload into a message.

    `payload` arrives unmodified from GitHub.  `commit_stream` and
    `issue_stream` fall back to `default_stream` when empty, and
    `topic_focus` falls back to the repository name.

    Returns a `(target_stream, subject, content)` tuple.  Raises
    `UnknownEventType` for an event this function has no handler for.
    """
    target_stream = commit_stream or default_stream
    issue_stream = issue_stream or default_stream
    repository = payload["repository"]
    topic_focus = topic_focus or repository["name"]

    # Event Handlers
    if event == "pull_request":
        subject = get_pull_request_or_issue_subject(repository, payload["pull_request"], "PR")
        content = github_pull_request_content(payload)
    elif event == "issues":
        # in v1, we assume that this stream exists since it is
        # deprecated and the few realms that use it already have the
        # stream
        target_stream = issue_stream
        subject = get_pull_request_or_issue_subject(repository, payload["issue"], "Issue")
        content = github_issues_content(payload)
    elif event == "issue_comment":
        # Comments on both issues and pull requests come in as issue_comment events
        issue = payload["issue"]
        on_pull_request = "pull_request" in issue and issue["pull_request"]["diff_url"] is not None
        if on_pull_request:
            object_kind = "PR"
        else:
            # Plain issue comments go to the issue stream.
            object_kind = "Issue"
            target_stream = issue_stream
        subject = get_pull_request_or_issue_subject(repository, issue, object_kind)
        content = github_object_commented_content(payload, object_kind)
    elif event == "push":
        subject, content = build_message_from_gitlog(
            user_profile,
            topic_focus,
            payload["ref"],
            payload["commits"],
            payload["before"],
            payload["after"],
            payload["compare"],
            payload["pusher"]["name"],
            forced=payload["forced"],
            created=payload["created"],
        )
    elif event == "commit_comment":
        subject = topic_focus
        comment = payload.get("comment")
        comment_url = comment["html_url"]
        action = u"[commented]({})".format(comment_url)
        # The commit URL is the comment URL without its "#..." fragment.
        commit_url = comment_url.split("#", 1)[0]
        content = get_commits_comment_action_message(
            comment["user"]["login"],
            action,
            commit_url,
            comment["commit_id"],
            comment["body"],
        )
    else:
        raise UnknownEventType(force_str(u"Event %s is unknown and cannot be handled" % (event,)))

    return target_stream, subject, content
开发者ID:galexrt,项目名称:zulip,代码行数:72,代码来源:github.py
示例18: twitter_text
def twitter_text(self, text, urls, user_mentions, media):
    # type: (text_type, List[Dict[text_type, text_type]], List[Dict[text_type, Any]], List[Dict[text_type, Any]]) -> Element
    """
    Build a P element from tweet text, turning links, mentions and media
    into A tags using the `urls`, `user_mentions` and `media` data from the
    twitter API.

    Every occurrence of a URL, mention or media URL in the text is recorded
    with its start and end offsets, the URL to link to and the text to show.
    The occurrences are then sorted by start offset and emitted in order,
    skipping any that begin inside text that is already linked.  Plain text
    between links is stored on the `text` attribute of the P element (before
    the first link) or on the `tail` of the most recent link.
    """
    to_linkify = []  # type: List[Dict[text_type, Any]]

    # Build dicts for URLs
    for url_data in urls:
        short_url = url_data["url"]
        full_url = url_data["expanded_url"]
        to_linkify.extend(
            {
                'start': found.start(),
                'end': found.end(),
                'url': short_url,
                'text': full_url,
            }
            for found in re.finditer(re.escape(short_url), text, re.IGNORECASE)
        )

    # Build dicts for mentions
    for user_mention in user_mentions:
        screen_name = user_mention['screen_name']
        mention_string = u'@' + screen_name
        to_linkify.extend(
            {
                'start': found.start(),
                'end': found.end(),
                'url': u'https://twitter.com/' + force_text(urllib.parse.quote(force_str(screen_name))),
                'text': mention_string,
            }
            for found in re.finditer(re.escape(mention_string), text, re.IGNORECASE)
        )

    # Build dicts for media
    for media_item in media:
        short_url = media_item['url']
        expanded_url = media_item['expanded_url']
        to_linkify.extend(
            {
                'start': found.start(),
                'end': found.end(),
                'url': short_url,
                'text': expanded_url,
            }
            for found in re.finditer(re.escape(short_url), text, re.IGNORECASE)
        )

    to_linkify.sort(key=lambda item: item['start'])

    p = current_node = markdown.util.etree.Element('p')

    def attach_text(fragment):
        # type: (text_type) -> None
        """
        Store the fragment as the text of P, or as the tail of the latest link
        """
        if current_node == p:
            current_node.text = fragment
        else:
            current_node.tail = fragment

    cursor = 0
    for link in to_linkify:
        # Only handle links that do not start inside already linked text
        if link['start'] >= cursor:
            # Emit the plain text between the previous link and this one
            attach_text(text[cursor:link['start']])
            cursor = link['end']

            current_node = url_to_a(link['url'], link['text'])
            p.append(current_node)

    # Add any unused text
    attach_text(text[cursor:])
    return p
开发者ID:150vb,项目名称:zulip,代码行数:85,代码来源:__init__.py
注:本文中的 zerver.lib.str_utils.force_str 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论