本文整理汇总了Python中zerver.models.get_user_profile_by_email函数的典型用法代码示例。如果您正苦于以下问题:Python get_user_profile_by_email函数的具体用法?Python get_user_profile_by_email怎么用?Python get_user_profile_by_email使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_user_profile_by_email函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: handle
def handle(self, *args, **options):
# type: (*Any, **str) -> None
old_email = options['old_email']
if options['new_email']:
new_email = options['new_email']
else:
new_email = old_email
gravatar_url = "https://secure.gravatar.com/avatar/%s?d=identicon" % (gravatar_hash(old_email),)
gravatar_data = requests.get(gravatar_url).content
gravatar_file = SimpleUploadedFile('gravatar.jpg', gravatar_data, 'image/jpeg')
try:
user_profile = get_user_profile_by_email(old_email)
except UserProfile.DoesNotExist:
try:
user_profile = get_user_profile_by_email(new_email)
except UserProfile.DoesNotExist:
raise CommandError("Could not find specified user")
upload_avatar_image(gravatar_file, user_profile, old_email)
if old_email != new_email:
gravatar_file.seek(0)
upload_avatar_image(gravatar_file, user_profile, new_email)
user_profile.avatar_source = UserProfile.AVATAR_FROM_USER
user_profile.save(update_fields=['avatar_source'])
开发者ID:Jianchun1,项目名称:zulip,代码行数:28,代码来源:gravatar_to_user_avatar.py
示例2: add_bot_backend
def add_bot_backend(request, user_profile, full_name=REQ(), short_name=REQ(),
default_sending_stream_name=REQ('default_sending_stream', default=None),
default_events_register_stream_name=REQ('default_events_register_stream', default=None),
default_all_public_streams=REQ(validator=check_bool, default=None)):
# type: (HttpRequest, UserProfile, text_type, text_type, Optional[text_type], Optional[text_type], Optional[bool]) -> HttpResponse
short_name += "-bot"
email = short_name + "@" + user_profile.realm.domain
form = CreateUserForm({'full_name': full_name, 'email': email})
if not form.is_valid():
# We validate client-side as well
return json_error(_('Bad name or username'))
try:
get_user_profile_by_email(email)
return json_error(_("Username already in use"))
except UserProfile.DoesNotExist:
pass
if len(request.FILES) == 0:
avatar_source = UserProfile.AVATAR_FROM_GRAVATAR
elif len(request.FILES) != 1:
return json_error(_("You may only upload one file at a time"))
else:
user_file = list(request.FILES.values())[0]
upload_avatar_image(user_file, user_profile, email)
avatar_source = UserProfile.AVATAR_FROM_USER
default_sending_stream = None
if default_sending_stream_name is not None:
default_sending_stream = stream_or_none(default_sending_stream_name, user_profile.realm)
if default_sending_stream and not default_sending_stream.is_public() and not \
subscribed_to_stream(user_profile, default_sending_stream):
return json_error(_('Insufficient permission'))
default_events_register_stream = None
if default_events_register_stream_name is not None:
default_events_register_stream = stream_or_none(default_events_register_stream_name,
user_profile.realm)
if default_events_register_stream and not default_events_register_stream.is_public() and not \
subscribed_to_stream(user_profile, default_events_register_stream):
return json_error(_('Insufficient permission'))
bot_profile = do_create_user(email=email, password='',
realm=user_profile.realm, full_name=full_name,
short_name=short_name, active=True,
bot_type=UserProfile.DEFAULT_BOT,
bot_owner=user_profile,
avatar_source=avatar_source,
default_sending_stream=default_sending_stream,
default_events_register_stream=default_events_register_stream,
default_all_public_streams=default_all_public_streams)
json_result = dict(
api_key=bot_profile.api_key,
avatar_url=avatar_url(bot_profile),
default_sending_stream=get_stream_name(bot_profile.default_sending_stream),
default_events_register_stream=get_stream_name(bot_profile.default_events_register_stream),
default_all_public_streams=bot_profile.default_all_public_streams,
)
return json_success(json_result)
开发者ID:TijeeCorp,项目名称:zulip,代码行数:60,代码来源:users.py
示例3: test_invite_user_signup_initial_history
def test_invite_user_signup_initial_history(self):
# type: () -> None
"""
Test that a new user invited to a stream receives some initial
history but only from public streams.
"""
self.login("[email protected]")
user_profile = get_user_profile_by_email("[email protected]")
private_stream_name = "Secret"
(stream, _) = create_stream_if_needed(user_profile.realm, private_stream_name, invite_only=True)
do_add_subscription(user_profile, stream)
public_msg_id = self.send_message("[email protected]", "Denmark", Recipient.STREAM,
"Public topic", "Public message")
secret_msg_id = self.send_message("[email protected]", private_stream_name, Recipient.STREAM,
"Secret topic", "Secret message")
invitee = "[email protected]"
self.assert_json_success(self.invite(invitee, [private_stream_name, "Denmark"]))
self.assertTrue(find_key_by_email(invitee))
self.submit_reg_form_for_user("alice-test", "password")
invitee_profile = get_user_profile_by_email(invitee)
invitee_msg_ids = [um.message_id for um in
UserMessage.objects.filter(user_profile=invitee_profile)]
self.assertTrue(public_msg_id in invitee_msg_ids)
self.assertFalse(secret_msg_id in invitee_msg_ids)
开发者ID:tobby2002,项目名称:zulip,代码行数:25,代码来源:test_signup.py
示例4: test_mention_shortname
def test_mention_shortname(self):
sender_user_profile = get_user_profile_by_email("[email protected]")
user_profile = get_user_profile_by_email("[email protected]")
msg = Message(sender=sender_user_profile, sending_client=get_client("test"))
content = "@**hamlet**"
self.assertEqual(msg.render_markdown(content),
'<p><span class="user-mention" data-user-email="[email protected]">@King Hamlet</span></p>')
self.assertEqual(msg.mentions_user_ids, set([user_profile.id]))
开发者ID:tobby2002,项目名称:zulip,代码行数:9,代码来源:test_bugdown.py
示例5: add_bot_backend
def add_bot_backend(request, user_profile, full_name_raw=REQ("full_name"), short_name=REQ(),
default_sending_stream_name=REQ('default_sending_stream', default=None),
default_events_register_stream_name=REQ('default_events_register_stream', default=None),
default_all_public_streams=REQ(validator=check_bool, default=None)):
# type: (HttpRequest, UserProfile, Text, Text, Optional[Text], Optional[Text], Optional[bool]) -> HttpResponse
short_name += "-bot"
full_name = check_full_name(full_name_raw)
email = '%[email protected]%s' % (short_name, user_profile.realm.get_bot_domain())
form = CreateUserForm({'full_name': full_name, 'email': email})
if not form.is_valid():
# We validate client-side as well
return json_error(_('Bad name or username'))
try:
get_user_profile_by_email(email)
return json_error(_("Username already in use"))
except UserProfile.DoesNotExist:
pass
if len(request.FILES) == 0:
avatar_source = UserProfile.AVATAR_FROM_GRAVATAR
elif len(request.FILES) != 1:
return json_error(_("You may only upload one file at a time"))
else:
avatar_source = UserProfile.AVATAR_FROM_USER
default_sending_stream = None
if default_sending_stream_name is not None:
(default_sending_stream, ignored_rec, ignored_sub) = access_stream_by_name(
user_profile, default_sending_stream_name)
default_events_register_stream = None
if default_events_register_stream_name is not None:
(default_events_register_stream, ignored_rec, ignored_sub) = access_stream_by_name(
user_profile, default_events_register_stream_name)
bot_profile = do_create_user(email=email, password='',
realm=user_profile.realm, full_name=full_name,
short_name=short_name, active=True,
bot_type=UserProfile.DEFAULT_BOT,
bot_owner=user_profile,
avatar_source=avatar_source,
default_sending_stream=default_sending_stream,
default_events_register_stream=default_events_register_stream,
default_all_public_streams=default_all_public_streams)
if len(request.FILES) == 1:
user_file = list(request.FILES.values())[0]
upload_avatar_image(user_file, user_profile, bot_profile)
json_result = dict(
api_key=bot_profile.api_key,
avatar_url=avatar_url(bot_profile),
default_sending_stream=get_stream_name(bot_profile.default_sending_stream),
default_events_register_stream=get_stream_name(bot_profile.default_events_register_stream),
default_all_public_streams=bot_profile.default_all_public_streams,
)
return json_success(json_result)
开发者ID:dawran6,项目名称:zulip,代码行数:56,代码来源:users.py
示例6: test_pointer_out_of_range
def test_pointer_out_of_range(self):
"""
Posting json to /json/update_pointer with an out of range (< 0) pointer returns a 400
and error message.
"""
self.login("[email protected]")
self.assertEqual(get_user_profile_by_email("[email protected]").pointer, -1)
result = self.client.post("/json/update_pointer", {"pointer": -2})
self.assert_json_error(result, "Bad value for 'pointer': -2")
self.assertEqual(get_user_profile_by_email("[email protected]").pointer, -1)
开发者ID:Croolis,项目名称:zulip,代码行数:10,代码来源:test_unread.py
示例7: test_missing_pointer
def test_missing_pointer(self):
"""
Posting json to /json/update_pointer which does not contain a pointer key/value pair
returns a 400 and error message.
"""
self.login("[email protected]")
self.assertEqual(get_user_profile_by_email("[email protected]").pointer, -1)
result = self.client.post("/json/update_pointer", {"foo": 1})
self.assert_json_error(result, "Missing 'pointer' argument")
self.assertEqual(get_user_profile_by_email("[email protected]").pointer, -1)
开发者ID:Croolis,项目名称:zulip,代码行数:10,代码来源:test_unread.py
示例8: test_invalid_pointer
def test_invalid_pointer(self):
"""
Posting json to /json/update_pointer with an invalid pointer returns a 400 and error
message.
"""
self.login("[email protected]")
self.assertEqual(get_user_profile_by_email("[email protected]").pointer, -1)
result = self.client.post("/json/update_pointer", {"pointer": "foo"})
self.assert_json_error(result, "Bad value for 'pointer': foo")
self.assertEqual(get_user_profile_by_email("[email protected]").pointer, -1)
开发者ID:Croolis,项目名称:zulip,代码行数:10,代码来源:test_unread.py
示例9: export_avatars_local_helper
def export_avatars_local_helper(realm, output_dir, local_dir):
# type: (Realm, Path, Path) -> None
if not os.path.exists(output_dir):
os.makedirs(output_dir)
count = 0
records = []
users = list(UserProfile.objects.filter(realm=realm))
users += [
get_user_profile_by_email(settings.NOTIFICATION_BOT),
get_user_profile_by_email(settings.EMAIL_GATEWAY_BOT),
get_user_profile_by_email(settings.WELCOME_BOT),
]
for user in users:
if user.avatar_source == UserProfile.AVATAR_FROM_GRAVATAR:
continue
# NOTE: There is an avatar source called AVATAR_FROM_SYSTEM,
# but I'm not sure we support it any more. If we
# have system-generated avatars, then arguably we
# don't need to export them, but it's probably
# expedient to just copy them over. The more
# common case is AVATAR_FROM_USER, which is handled
# here as well. AVATAR_FROM_GRAVATAR refers to
# avatars hosted by gravatar.com, and for them,
# we have no files to worry about exporting
avatar_hash = user_avatar_hash(user.email)
wildcard = os.path.join(local_dir, avatar_hash + '.*')
for local_path in glob.glob(wildcard):
logging.info('Copying avatar file for user %s from %s' % (
user.email, local_path))
fn = os.path.basename(local_path)
output_path = os.path.join(output_dir, fn)
mkdir_p(str(os.path.dirname(output_path)))
subprocess.check_call(["cp", "-a", str(local_path), str(output_path)])
stat = os.stat(local_path)
record = dict(realm_id=realm.id,
user_profile_id=user.id,
user_profile_email=user.email,
s3_path=fn,
path=fn,
size=stat.st_size,
last_modified=stat.st_mtime,
content_type=None)
records.append(record)
count += 1
if (count % 100 == 0):
logging.info("Finished %s" % (count,))
with open(os.path.join(output_dir, "records.json"), "w") as records_file:
ujson.dump(records, records_file, indent=4)
开发者ID:HKingz,项目名称:zulip,代码行数:55,代码来源:export.py
示例10: test_api_update_pointer
def test_api_update_pointer(self):
"""
Same as above, but for the API view
"""
email = "[email protected]"
self.assertEqual(get_user_profile_by_email(email).pointer, -1)
msg_id = self.send_message("[email protected]", "Verona", Recipient.STREAM)
result = self.client_put("/api/v1/users/me/pointer", {"pointer": msg_id},
**self.api_auth(email))
self.assert_json_success(result)
self.assertEqual(get_user_profile_by_email(email).pointer, msg_id)
开发者ID:Croolis,项目名称:zulip,代码行数:11,代码来源:test_unread.py
示例11: test_update_pointer
def test_update_pointer(self):
"""
Posting a pointer to /update (in the form {"pointer": pointer}) changes
the pointer we store for your UserProfile.
"""
self.login("[email protected]")
self.assertEqual(get_user_profile_by_email("[email protected]").pointer, -1)
msg_id = self.send_message("[email protected]", "Verona", Recipient.STREAM)
result = self.client.post("/json/update_pointer", {"pointer": msg_id})
self.assert_json_success(result)
self.assertEqual(get_user_profile_by_email("[email protected]").pointer, msg_id)
开发者ID:Croolis,项目名称:zulip,代码行数:11,代码来源:test_unread.py
示例12: test_do_change_realm_subdomain_clears_user_realm_cache
def test_do_change_realm_subdomain_clears_user_realm_cache(self) -> None:
"""The main complicated thing about changing realm subdomains is
updating the cache, and we start by populating the cache for
Hamlet, and we end by checking the cache to ensure that his
realm appears to be deactivated. You can make this test fail
by disabling cache.flush_realm()."""
user = get_user_profile_by_email('[email protected]')
realm = get_realm('zulip')
do_change_realm_subdomain(realm, "newzulip")
user = get_user_profile_by_email('[email protected]')
self.assertEqual(user.realm.string_id, "newzulip")
# This doesn't use a cache right now, but may later.
self.assertIsNone(get_realm("zulip"))
开发者ID:showell,项目名称:zulip,代码行数:13,代码来源:test_realm.py
示例13: test_register_deactivated
def test_register_deactivated(self):
"""
If you try to register for a deactivated realm, you get a clear error
page.
"""
realm = get_realm("zulip.com")
realm.deactivated = True
realm.save(update_fields=["deactivated"])
result = self.register("test", "test")
self.assertIn("has been deactivated", result.content.replace("\n", " "))
with self.assertRaises(UserProfile.DoesNotExist):
get_user_profile_by_email('[email protected]')
开发者ID:anindya,项目名称:zulip,代码行数:14,代码来源:test_signup.py
示例14: test_mention_multiple
def test_mention_multiple(self):
sender_user_profile = get_user_profile_by_email("othe[email protected]")
hamlet = get_user_profile_by_email("[email protected]")
cordelia = get_user_profile_by_email("[email protected]")
msg = Message(sender=sender_user_profile, sending_client=get_client("test"))
content = "@**King Hamlet** and @**cordelia**, check this out"
self.assertEqual(msg.render_markdown(content),
'<p>'
'<span class="user-mention" '
'data-user-email="[email protected]">@King Hamlet</span> and '
'<span class="user-mention" '
'data-user-email="[email protected]">@Cordelia Lear</span>, '
'check this out</p>')
self.assertEqual(msg.mentions_user_ids, set([hamlet.id, cordelia.id]))
开发者ID:tobby2002,项目名称:zulip,代码行数:15,代码来源:test_bugdown.py
示例15: enqueue_welcome_emails
def enqueue_welcome_emails(email, name):
sender = {'email': '[email protected]', 'name': 'Waseem Daher'}
if settings.VOYAGER:
sender = {'email': settings.ZULIP_ADMINISTRATOR, 'name': 'Zulip'}
user_profile = get_user_profile_by_email(email)
unsubscribe_link = one_click_unsubscribe_link(user_profile, "welcome")
template_payload = {'name': name,
'not_voyager': not settings.VOYAGER,
'external_host': settings.EXTERNAL_HOST,
'unsubscribe_link': unsubscribe_link}
#Send day 1 email
send_local_email_template_with_delay([{'email': email, 'name': name}],
"zerver/emails/followup/day1",
template_payload,
datetime.timedelta(hours=1),
tags=["followup-emails"],
sender=sender)
#Send day 2 email
tomorrow = datetime.datetime.utcnow() + datetime.timedelta(hours=24)
# 11 AM EDT
tomorrow_morning = datetime.datetime(tomorrow.year, tomorrow.month, tomorrow.day, 15, 0)
assert(datetime.datetime.utcnow() < tomorrow_morning)
send_local_email_template_with_delay([{'email': email, 'name': name}],
"zerver/emails/followup/day2",
template_payload,
tomorrow_morning - datetime.datetime.utcnow(),
tags=["followup-emails"],
sender=sender)
开发者ID:Gabriel0402,项目名称:zulip,代码行数:31,代码来源:notifications.py
示例16: test_receive_stream_email_messages_empty_body
def test_receive_stream_email_messages_empty_body(self):
# build dummy messages for stream
# test message with empty body is not sent
self.login("[email protected]")
user_profile = get_user_profile_by_email("[email protected]")
self.subscribe_to_stream(user_profile.email, "Denmark")
stream = get_stream("Denmark", user_profile.realm)
stream_to_address = encode_email_address(stream)
headers = {}
headers['Reply-To'] = '[email protected]'
# empty body
incoming_valid_message = MIMEText('')
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = "[email protected]"
incoming_valid_message['To'] = stream_to_address
incoming_valid_message['Reply-to'] = "[email protected]"
exception_message = ""
debug_info = {}
# process_message eats the exception & logs an error which can't be parsed here
# so calling process_stream_message directly
try:
process_stream_message(incoming_valid_message['To'],
incoming_valid_message['Subject'],
incoming_valid_message,
debug_info)
except ZulipEmailForwardError as e:
# empty body throws exception
exception_message = e.message
self.assertEqual(exception_message, "Unable to find plaintext or HTML message body")
开发者ID:seanly,项目名称:zulip,代码行数:35,代码来源:test_email_mirror.py
示例17: test_receive_stream_email_messages_success
def test_receive_stream_email_messages_success(self):
# build dummy messages for stream
# test valid incoming stream message is processed properly
self.login("[email protected]")
user_profile = get_user_profile_by_email("[email protected]")
self.subscribe_to_stream(user_profile.email, "Denmark")
stream = get_stream("Denmark", user_profile.realm)
stream_to_address = encode_email_address(stream)
incoming_valid_message = MIMEText('TestStreamEmailMessages Body')
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = "[email protected]"
incoming_valid_message['To'] = stream_to_address
incoming_valid_message['Reply-to'] = "[email protected]"
process_message(incoming_valid_message)
# Hamlet is subscribed to this stream so should see the email message from Othello.
message = most_recent_message(user_profile)
self.assertEqual(message.content, "TestStreamEmailMessages Body")
self.assertEqual(get_display_recipient(message.recipient), stream.name)
self.assertEqual(message.subject, incoming_valid_message['Subject'])
开发者ID:seanly,项目名称:zulip,代码行数:26,代码来源:test_email_mirror.py
示例18: create_mirrored_message_users
def create_mirrored_message_users(request, user_profile, recipients):
# type: (HttpResponse, UserProfile, Iterable[Text]) -> Tuple[bool, Optional[UserProfile]]
if "sender" not in request.POST:
return (False, None)
sender_email = request.POST["sender"].strip().lower()
referenced_users = set([sender_email])
if request.POST['type'] == 'private':
for email in recipients:
referenced_users.add(email.lower())
if request.client.name == "zephyr_mirror":
user_check = same_realm_zephyr_user
fullname_function = compute_mit_user_fullname
elif request.client.name == "irc_mirror":
user_check = same_realm_irc_user
fullname_function = compute_irc_user_fullname
elif request.client.name in ("jabber_mirror", "JabberMirror"):
user_check = same_realm_jabber_user
fullname_function = compute_jabber_user_fullname
else:
# Unrecognized mirroring client
return (False, None)
for email in referenced_users:
# Check that all referenced users are in our realm:
if not user_check(user_profile, email):
return (False, None)
# Create users for the referenced users, if needed.
for email in referenced_users:
create_mirror_user_if_needed(user_profile.realm, email, fullname_function)
sender = get_user_profile_by_email(sender_email)
return (True, sender)
开发者ID:souravbadami,项目名称:zulip,代码行数:35,代码来源:messages.py
示例19: by_pm_with
def by_pm_with(self, query, operand, maybe_negate):
# type: (Query, str, ConditionTransform) -> Query
if ',' in operand:
# Huddle
try:
emails = [e.strip() for e in operand.split(',')]
recipient = recipient_for_emails(emails, False,
self.user_profile, self.user_profile)
except ValidationError:
raise BadNarrowOperator('unknown recipient ' + operand)
cond = column("recipient_id") == recipient.id
return query.where(maybe_negate(cond))
else:
# Personal message
self_recipient = get_recipient(Recipient.PERSONAL, type_id=self.user_profile.id)
if operand == self.user_profile.email:
# Personals with self
cond = and_(column("sender_id") == self.user_profile.id,
column("recipient_id") == self_recipient.id)
return query.where(maybe_negate(cond))
# Personals with other user; include both directions.
try:
narrow_profile = get_user_profile_by_email(operand)
except UserProfile.DoesNotExist:
raise BadNarrowOperator('unknown user ' + operand)
narrow_recipient = get_recipient(Recipient.PERSONAL, narrow_profile.id)
cond = or_(and_(column("sender_id") == narrow_profile.id,
column("recipient_id") == self_recipient.id),
and_(column("sender_id") == self.user_profile.id,
column("recipient_id") == narrow_recipient.id))
return query.where(maybe_negate(cond))
开发者ID:souravbadami,项目名称:zulip,代码行数:33,代码来源:messages.py
示例20: enqueue_welcome_emails
def enqueue_welcome_emails(email, name):
# type: (text_type, text_type) -> None
if settings.WELCOME_EMAIL_SENDER is not None:
sender = settings.WELCOME_EMAIL_SENDER # type: Dict[str, text_type]
else:
sender = {'email': settings.ZULIP_ADMINISTRATOR, 'name': 'Zulip'}
user_profile = get_user_profile_by_email(email)
unsubscribe_link = one_click_unsubscribe_link(user_profile, "welcome")
template_payload = {'name': name,
'verbose_support_offers': settings.VERBOSE_SUPPORT_OFFERS,
'external_host': settings.EXTERNAL_HOST,
'external_uri_scheme': settings.EXTERNAL_URI_SCHEME,
'server_uri': settings.SERVER_URI,
'realm_uri': user_profile.realm.uri,
'unsubscribe_link': unsubscribe_link}
# Send day 1 email
send_local_email_template_with_delay([{'email': email, 'name': name}],
"zerver/emails/followup/day1",
template_payload,
datetime.timedelta(hours=1),
tags=["followup-emails"],
sender=sender)
# Send day 2 email
send_local_email_template_with_delay([{'email': email, 'name': name}],
"zerver/emails/followup/day2",
template_payload,
datetime.timedelta(days=1),
tags=["followup-emails"],
sender=sender)
开发者ID:neurodynamic,项目名称:zulip,代码行数:32,代码来源:notifications.py
注:本文中的zerver.models.get_user_profile_by_email函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论