本文整理汇总了Python中zerver.models.get_user_profile_by_id函数的典型用法代码示例。如果您正苦于以下问题:Python get_user_profile_by_id函数的具体用法?Python get_user_profile_by_id怎么用?Python get_user_profile_by_id使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_user_profile_by_id函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: consume
def consume(self, event: Mapping[str, Any]) -> None:
    """Hand a queued message event to every embedded bot service of a bot user.

    Reads ``user_profile_id``, ``message`` and ``trigger`` from ``event``.
    A service whose handler cannot be resolved is logged as an error and
    skipped.  An ``EmbeddedBotQuitException`` raised while a handler runs is
    logged as a warning and does not stop the remaining services.
    """
    bot_user_id = event['user_profile_id']
    bot_user = get_user_profile_by_id(bot_user_id)
    message = cast(Dict[str, Any], event['message'])

    # TODO: Do we actually want to allow multiple Services per bot user?
    for service in get_bot_services(bot_user_id):
        handler = get_bot_handler(str(service.name))
        if handler is None:
            logging.error("Error: User %s has bot with invalid embedded bot service %s" % (
                bot_user_id, service.name))
            continue

        try:
            # Not every handler defines an ``initialize`` hook.
            if hasattr(handler, 'initialize'):
                handler.initialize(self.get_bot_api_client(bot_user))

            if event['trigger'] == 'mention':
                query = extract_query_without_mention(
                    message=message,
                    client=self.get_bot_api_client(bot_user),
                )
                message['content'] = query
                assert message['content'] is not None

            handler.handle_message(
                message=message,
                bot_handler=self.get_bot_api_client(bot_user),
            )
        except EmbeddedBotQuitException as quit_exc:
            logging.warning(str(quit_exc))
开发者ID:BakerWang,项目名称:zulip,代码行数:29,代码来源:queue_processors.py
示例2: consume
def consume(self, event):
    """Record a user presence update described by a queue event.

    ``event`` is read for the keys ``user_profile_id``, ``client``,
    ``time`` and ``status``; the resolved values are handed to
    ``do_update_user_presence``.
    """
    # Pass the event as a logging argument rather than pre-formatting it.
    # The previous ``"..." % (event),`` had its comma outside the
    # parentheses, so ``(event)`` was not a 1-tuple and a tuple-valued event
    # would have raised TypeError during formatting.
    logging.info("Received event: %s", event)
    user_profile = get_user_profile_by_id(event["user_profile_id"])
    client = get_client(event["client"])
    log_time = timestamp_to_datetime(event["time"])
    status = event["status"]
    do_update_user_presence(user_profile, client, log_time, status)
开发者ID:150vb,项目名称:zulip,代码行数:7,代码来源:queue_processors.py
示例3: handle_remove_push_notification
def handle_remove_push_notification(user_profile_id: int, message_ids: List[int]) -> None:
    """Revoke the mobile push notification for messages that were read.

    This should be called when a message that had previously had a
    mobile push executed is read.  It triggers a mobile push notification
    to the mobile app when the message is read on the server, to remove
    the message from the notification.

    The payload is sent through the notification bouncer when
    ``uses_notification_bouncer()`` is true, otherwise directly to the
    user's GCM devices.  Afterwards the ``active_mobile_push_notification``
    flag is cleared on the matching ``UserMessage`` rows, whether or not
    the bouncer could be reached.
    """
    user_profile = get_user_profile_by_id(user_profile_id)
    message_ids = bulk_access_messages_expect_usermessage(user_profile_id, message_ids)
    gcm_payload, gcm_options = get_remove_payload_gcm(user_profile, message_ids)

    if uses_notification_bouncer():
        try:
            send_notifications_to_bouncer(user_profile_id,
                                          {},
                                          gcm_payload,
                                          gcm_options)
        except requests.ConnectionError:  # nocoverage
            # The handler used to define a nested ``failure_processor``
            # function that nothing ever called, so a connection failure
            # disappeared without a trace.  Stay best-effort (no retry, no
            # re-raise) but make the failure visible in the logs.
            logger.warning(
                "Could not reach the push notification bouncer for "
                "trigger:%s event:push_notification" % (user_profile_id,))
    else:
        android_devices = list(PushDeviceToken.objects.filter(
            user=user_profile, kind=PushDeviceToken.GCM))
        if android_devices:
            send_android_push_notification(android_devices, gcm_payload, gcm_options)

    # Clear the "push notification is active" bit for these messages.
    UserMessage.objects.filter(
        user_profile_id=user_profile_id,
        message_id__in=message_ids,
    ).update(
        flags=F('flags').bitand(
            ~UserMessage.flags.active_mobile_push_notification))
开发者ID:BakerWang,项目名称:zulip,代码行数:34,代码来源:push_notifications.py
示例4: notify_bot_owner
def notify_bot_owner(event: Dict[str, Any],
                     request_data: Dict[str, Any],
                     status_code: Optional[int]=None,
                     response_content: Optional[AnyStr]=None,
                     failure_message: Optional[str]=None,
                     exception: Optional[Exception]=None) -> None:
    """Send the bot's owner a private message about an outgoing webhook call.

    The notification always links to the triggering message and then gets
    one extra paragraph for each of ``failure_message``, ``status_code``,
    ``response_content`` and ``exception`` that is truthy.  ``request_data``
    is accepted but not read by this function.
    """
    message_url = get_message_url(event)
    bot_id = event['user_profile_id']
    bot_owner = get_user_profile_by_id(bot_id).bot_owner

    # Collect the paragraphs first; they are newline-joined at the end.
    paragraphs = ["[A message](%s) triggered an outgoing webhook." % (message_url,)]
    if failure_message:
        paragraphs.append(failure_message)
    if status_code:
        paragraphs.append(
            "The webhook got a response with status code *%s*." % (status_code,))
    if response_content:
        paragraphs.append(
            "The response contains the following payload:\n"
            "```\n%s\n```" % (response_content,))
    if exception:
        paragraphs.append(
            "When trying to send a request to the webhook service, an exception "
            "of type %s occurred:\n```\n%s\n```" % (
                type(exception).__name__, str(exception)))
    notification_message = "\n".join(paragraphs)

    recipients = [dict(email=bot_owner.email)]
    send_response_message(
        bot_id=bot_id,
        message_info=dict(type='private', display_recipient=recipients),
        response_data=dict(content=notification_message),
    )
开发者ID:BakerWang,项目名称:zulip,代码行数:29,代码来源:outgoing_webhook.py
示例5: get_realm_for_filename
def get_realm_for_filename(path: str) -> Optional[int]:
    """Return the realm id associated with the uploaded file at ``path``.

    The S3 key for ``path`` is looked up in the authenticated uploads
    bucket; the realm id is that of the user profile named by the key's
    ``user_profile_id`` metadata.  Returns ``None`` when no such key exists.
    """
    connection = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
    uploads_bucket = get_bucket(connection, settings.S3_AUTH_UPLOADS_BUCKET)
    s3_key = uploads_bucket.get_key(path)
    if s3_key is not None:
        uploader_id = s3_key.metadata["user_profile_id"]
        return get_user_profile_by_id(uploader_id).realm_id
    # This happens if the key does not exist.
    return None
开发者ID:akashnimare,项目名称:zulip,代码行数:7,代码来源:upload.py
示例6: import_uploads_s3
def import_uploads_s3(bucket_name, import_dir, avatar_bucket=False):
    # type: (str, Path, bool) -> None
    """Upload the files listed in ``import_dir/records.json`` to an S3 bucket.

    For each record a key is created in ``bucket_name``, tagged with
    ``user_profile_id``, ``realm_id`` and ``orig_last_modified`` metadata,
    and filled from the file at ``import_dir/record['path']``.

    When ``avatar_bucket`` is true the key name is recomputed from the
    user's email instead of reusing the exported ``s3_path``.
    """
    conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
    bucket = conn.get_bucket(bucket_name, validate=True)

    records_filename = os.path.join(import_dir, "records.json")
    with open(records_filename) as records_file:
        records = ujson.loads(records_file.read())

    for record in records:
        key = Key(bucket)

        if avatar_bucket:
            # For avatars, we need to rehash the user's email with the
            # new server's avatar salt
            avatar_hash = user_avatar_hash(record['user_profile_email'])
            key.key = avatar_hash
            if record['s3_path'].endswith('.original'):
                key.key += '.original'
        else:
            key.key = record['s3_path']

        user_profile_id = int(record['user_profile_id'])
        # Support email gateway bot and other cross-realm messages
        if user_profile_id in id_maps["user_profile"]:
            logging.info("Uploaded by ID mapped user: %s!" % (user_profile_id,))
            user_profile_id = id_maps["user_profile"][user_profile_id]
        user_profile = get_user_profile_by_id(user_profile_id)
        key.set_metadata("user_profile_id", str(user_profile.id))
        key.set_metadata("realm_id", str(user_profile.realm.id))
        key.set_metadata("orig_last_modified", record['last_modified'])

        # The content type is a property of the exported record.  The
        # previous code subscripted the freshly created ``Key`` object
        # (``key['content_type']``), which is not a mapping.
        # NOTE(review): assumes every record carries a 'content_type'
        # entry -- confirm against the export format.
        headers = {'Content-Type': record['content_type']}

        key.set_contents_from_filename(os.path.join(import_dir, record['path']), headers=headers)
开发者ID:HKingz,项目名称:zulip,代码行数:35,代码来源:export.py
示例7: get_user
def get_user(self, user_profile_id: int) -> Optional[UserProfile]:
    """Return the UserProfile with the given id, or None if there is none.

    Overrides the Django method for getting a user object from its id.
    """
    try:
        user_profile = get_user_profile_by_id(user_profile_id)
    except UserProfile.DoesNotExist:
        return None
    return user_profile
开发者ID:deltay,项目名称:zulip,代码行数:7,代码来源:backends.py
示例8: send_to_missed_message_address
def send_to_missed_message_address(address, message):
    # type: (text_type, message.Message) -> None
    """Forward an email sent to a missed-message address into Zulip.

    The token embedded in ``address`` selects a redis hash holding the
    ``user_profile_id``, ``recipient_id`` and ``subject`` to reply with.
    Raises ``ZulipEmailForwardError`` if any of those fields is missing.
    """
    token = get_missed_message_token_from_address(address)
    redis_key = missed_message_redis_key(token)
    stored = redis_client.hmget(redis_key, 'user_profile_id', 'recipient_id', 'subject')
    if any(field is None for field in stored):
        raise ZulipEmailForwardError('Missing missed message address data')
    user_profile_id, recipient_id, subject = stored

    user_profile = get_user_profile_by_id(user_profile_id)
    recipient = Recipient.objects.get(id=recipient_id)
    display_recipient = get_display_recipient(recipient)

    # Testing with basestring so we don't depend on the list return type from
    # get_display_recipient
    if isinstance(display_recipient, six.string_types):
        recipient_str = display_recipient
    else:
        recipient_str = ','.join(user['email'] for user in display_recipient)

    body = filter_footer(extract_body(message))
    body += extract_and_upload_attachments(message, user_profile.realm)
    if not body:
        body = '(No email body)'

    recipient_type_name = 'stream' if recipient.type == Recipient.STREAM else 'private'
    internal_send_message(user_profile.email, recipient_type_name,
                          recipient_str, subject, body)
开发者ID:150vb,项目名称:zulip,代码行数:32,代码来源:email_mirror.py
示例9: fake_message_sender
def fake_message_sender(event):
    # type: (Dict[str, Any]) -> None
    """Send a message synchronously and answer the originating request.

    This function is used only for Casper and backend tests, where
    rabbitmq is disabled.  A ``JsonableError`` raised while sending is
    turned into an error response rather than propagated.
    """
    log_data = {}  # type: Dict[str, Any]
    record_request_start_data(log_data)

    request = event['request']
    try:
        sender = get_user_profile_by_id(event['server_meta']['user_id'])
        client = get_client("website")
        msg_id = check_send_message(sender, client, request['type'],
                                    extract_recipients(request['to']),
                                    request['subject'], request['content'],
                                    local_id=request.get('local_id'),
                                    sender_queue_id=request.get('queue_id'))
    except JsonableError as e:
        resp = {"result": "error", "msg": str(e)}
    else:
        resp = {"result": "success", "msg": "", "id": msg_id}

    # The event's own server_meta dict is annotated in place and echoed back.
    server_meta = event['server_meta']
    finished_at = time.time()
    server_meta['worker_log_data'] = log_data
    server_meta['time_request_finished'] = finished_at

    respond_send_message({
        'response': resp,
        'req_id': event['req_id'],
        'server_meta': server_meta,
    })
开发者ID:aakash-cr7,项目名称:zulip,代码行数:27,代码来源:socket.py
示例10: response_listener
def response_listener(error_response):
    # type: (Dict[str, SupportsInt]) -> None
    """Handle an APNs error response for a previously sent notification.

    ``error_response['identifier']`` selects the redis hash that recorded
    the notification's ``token`` and ``user_id``.  The failure is logged,
    and when the status code is 8 (invalid token) the matching
    ``PushDeviceToken`` row is deleted if it still exists.
    """
    identifier = error_response['identifier']
    key = get_apns_key(identifier)
    if not redis_client.exists(key):
        # ``logging.warn`` is a deprecated alias; use ``logging.warning``.
        logging.warning("APNs key, {}, does not exist.".format(key))
        return

    code = error_response['status']
    assert isinstance(code, int)

    errmsg = ERROR_CODES[code]
    data = redis_client.hgetall(key)
    token = data['token']
    user = get_user_profile_by_id(int(data['user_id']))
    b64_token = hex_to_b64(token)

    logging.warning("APNS: Failed to deliver APNS notification to %s, reason: %s" % (b64_token, errmsg))
    if code == 8:
        # Invalid Token, remove from our database
        logging.warning("APNS: Removing token from database due to above failure")
        try:
            PushDeviceToken.objects.get(user=user, token=b64_token).delete()
        except PushDeviceToken.DoesNotExist:
            # Already gone; nothing left to remove.
            pass
开发者ID:zulip,项目名称:zulip,代码行数:25,代码来源:push_notifications.py
示例11: get_user
def get_user(self, user_profile_id):
    # type: (int) -> Optional[UserProfile]
    """Fetch the UserProfile for ``user_profile_id``; None if it does not exist."""
    try:
        found = get_user_profile_by_id(user_profile_id)
    except UserProfile.DoesNotExist:
        found = None
    return found
开发者ID:umkay,项目名称:zulip,代码行数:7,代码来源:backends.py
示例12: send_to_missed_message_address
def send_to_missed_message_address(address, message):
    # type: (text_type, message.Message) -> None
    """Deliver an email reply to a missed-message address as a Zulip message.

    The redis hash selected by the token in ``address`` supplies the
    ``user_profile_id``, ``recipient_id`` and ``subject``; a
    ``ZulipEmailForwardError`` is raised if any of them is missing.
    A log line is written after the message has been sent.
    """
    token = get_missed_message_token_from_address(address)
    redis_key = missed_message_redis_key(token)
    fields = redis_client.hmget(redis_key, "user_profile_id", "recipient_id", "subject")
    if any(value is None for value in fields):
        raise ZulipEmailForwardError("Missing missed message address data")
    user_profile_id, recipient_id, subject = fields

    user_profile = get_user_profile_by_id(user_profile_id)
    recipient = Recipient.objects.get(id=recipient_id)
    display_recipient = get_display_recipient(recipient)

    # Testing with basestring so we don't depend on the list return type from
    # get_display_recipient
    if isinstance(display_recipient, six.string_types):
        recipient_str = display_recipient
    else:
        recipient_str = ",".join(user["email"] for user in display_recipient)

    body = filter_footer(extract_body(message))
    body += extract_and_upload_attachments(message, user_profile.realm)
    if not body:
        body = "(No email body)"

    is_stream = recipient.type == Recipient.STREAM
    recipient_type_name = "stream" if is_stream else "private"

    internal_send_message(user_profile.email, recipient_type_name, recipient_str, subject, body)
    logging.info("Successfully processed email from %s to %s" % (user_profile.email, recipient_str))
开发者ID:galexrt,项目名称:zulip,代码行数:32,代码来源:email_mirror.py
示例13: consume
def consume(self, event):
    # type: (Mapping[str, Any]) -> None
    """Record a user activity entry described by a queue event.

    Reads ``user_profile_id``, ``client``, ``time`` and ``query`` from
    ``event`` and forwards the resolved values to
    ``do_update_user_activity``.
    """
    acting_user = get_user_profile_by_id(event["user_profile_id"])
    acting_client = get_client(event["client"])
    occurred_at = timestamp_to_datetime(event["time"])
    do_update_user_activity(acting_user, acting_client, event["query"], occurred_at)
开发者ID:aakash-cr7,项目名称:zulip,代码行数:7,代码来源:queue_processors.py
示例14: test_change_delivery_email_end_to_end_with_admins_visibility
def test_change_delivery_email_end_to_end_with_admins_visibility(self) -> None:
    """Confirm an email change end to end with admins-only email visibility.

    Sets the realm's ``email_address_visibility`` to
    ``EMAIL_ADDRESS_VISIBILITY_ADMINS``, creates an email-change
    confirmation by hand, visits its URL, and then checks that
    ``delivery_email`` holds the new address while the old address no
    longer resolves through ``get_user`` or ``get_user_by_delivery_email``.
    """
    user_profile = self.example_user('hamlet')
    do_set_realm_property(user_profile.realm, 'email_address_visibility',
                          Realm.EMAIL_ADDRESS_VISIBILITY_ADMINS)

    old_email = user_profile.email
    # NOTE(review): '[email protected]' looks like an address masked by the
    # page this snippet was copied from -- confirm the intended literal.
    new_email = '[email protected]'
    self.login(self.example_email('hamlet'))
    # Build the pending change and its confirmation directly instead of
    # going through the request-a-change endpoint.
    obj = EmailChangeStatus.objects.create(new_email=new_email,
                                           old_email=old_email,
                                           user_profile=user_profile,
                                           realm=user_profile.realm)
    key = generate_key()
    Confirmation.objects.create(content_object=obj,
                                date_sent=now(),
                                confirmation_key=key,
                                type=Confirmation.EMAIL_CHANGE)
    url = confirmation_url(key, user_profile.realm.host, Confirmation.EMAIL_CHANGE)
    response = self.client_get(url)

    self.assertEqual(response.status_code, 200)
    self.assert_in_success_response(["This confirms that the email address for your Zulip"],
                                    response)
    # Re-fetch the user so the assertions see the state after confirmation.
    user_profile = get_user_profile_by_id(user_profile.id)
    self.assertEqual(user_profile.delivery_email, new_email)
    # NOTE(review): same masked-looking literal as above -- confirm the
    # expected value of ``email`` for this visibility setting.
    self.assertEqual(user_profile.email, "[email protected]")
    obj.refresh_from_db()
    self.assertEqual(obj.status, 1)
    # The old address must no longer identify any user in the realm.
    with self.assertRaises(UserProfile.DoesNotExist):
        get_user(old_email, user_profile.realm)
    with self.assertRaises(UserProfile.DoesNotExist):
        get_user_by_delivery_email(old_email, user_profile.realm)
    self.assertEqual(get_user_by_delivery_email(new_email, user_profile.realm), user_profile)
开发者ID:BakerWang,项目名称:zulip,代码行数:33,代码来源:test_email_change.py
示例15: from_dict
def from_dict(cls, d: MutableMapping[str, Any]) -> 'ClientDescriptor':
    """Rebuild a ClientDescriptor from its dictionary form.

    Dictionaries written before some fields existed are upgraded first.
    Note that these migrations modify ``d`` in place.
    """
    # Temporary migration for the addition of the new user_profile_email field
    if 'user_profile_email' not in d:
        from zerver.models import get_user_profile_by_id
        owner = get_user_profile_by_id(d['user_profile_id'])
        d['user_profile_email'] = owner.email

    # Temporary migration for the rename of client_type to client_type_name
    if 'client_type' in d:
        d['client_type_name'] = d['client_type']

    # Temporary migration for the addition of the client_gravatar field
    d.setdefault('client_gravatar', False)

    descriptor = cls(
        d['user_profile_id'],
        d['user_profile_email'],
        d['realm_id'],
        EventQueue.from_dict(d['event_queue']),
        d['event_types'],
        d['client_type_name'],
        d['apply_markdown'],
        d['client_gravatar'],
        d['all_public_streams'],
        d['queue_timeout'],
        d.get('narrow', []),
    )
    descriptor.last_connection_time = d['last_connection_time']
    return descriptor
开发者ID:gnprice,项目名称:zulip,代码行数:27,代码来源:event_queue.py
示例16: avatar
def avatar(request: HttpRequest, email_or_id: str, medium: bool=False) -> HttpResponse:
    """Redirect to the avatar URL for an email address or a user ID.

    ``email_or_id`` is treated as a user ID when ``int()`` accepts it and
    as an email address otherwise.  If no matching user exists, a gravatar
    URL is built from ``email_or_id`` instead.  The request's query string
    is appended to the URL before redirecting.
    """
    try:
        int(email_or_id)
    except ValueError:
        is_email = True
    else:
        is_email = False

    try:
        if is_email:
            user_profile = get_user_including_cross_realm(email_or_id, request.user.realm)
        else:
            user_profile = get_user_profile_by_id(email_or_id)
        # If there is a valid user account passed in, use its avatar
        url = avatar_url(user_profile, medium=medium)
    except UserProfile.DoesNotExist:
        # If there is no such user, treat it as a new gravatar
        url = get_gravatar_url(email_or_id, 1, medium)

    # We can rely on the url already having query parameters. Because
    # our templates depend on being able to use the ampersand to
    # add query parameters to our url, get_avatar_url does '?x=x'
    # hacks to prevent us from having to jump through decode/encode hoops.
    assert '?' in url
    return redirect(url + '&' + request.META['QUERY_STRING'])
开发者ID:joydeep1701,项目名称:zulip,代码行数:29,代码来源:users.py
示例17: send_to_missed_message_address
def send_to_missed_message_address(address, message):
    # type: (Text, message.Message) -> None
    """Turn an email sent to a missed-message address into a Zulip message.

    The redis hash selected by the token in ``address`` provides the
    ``user_profile_id``, ``recipient_id`` and ``subject``.  Raises
    ``ZulipEmailForwardError`` when any of those is missing.  The subject
    is decoded as UTF-8 before sending, and success is logged afterwards.
    """
    token = get_missed_message_token_from_address(address)
    redis_key = missed_message_redis_key(token)
    stored = redis_client.hmget(redis_key, 'user_profile_id', 'recipient_id', 'subject')
    if any(item is None for item in stored):
        raise ZulipEmailForwardError('Missing missed message address data')
    user_profile_id, recipient_id, subject_b = stored  # type: (bytes, bytes, bytes)

    user_profile = get_user_profile_by_id(user_profile_id)
    recipient = Recipient.objects.get(id=recipient_id)
    display_recipient = get_display_recipient(recipient)

    # Testing with basestring so we don't depend on the list return type from
    # get_display_recipient
    if isinstance(display_recipient, str):
        recipient_str = display_recipient
    else:
        recipient_str = u','.join(user['email'] for user in display_recipient)

    body = construct_zulip_body(message, user_profile.realm)

    recipient_type_name = 'stream' if recipient.type == Recipient.STREAM else 'private'
    internal_send_message(user_profile.realm, user_profile.email,
                          recipient_type_name, recipient_str,
                          subject_b.decode('utf-8'), body)
    logger.info("Successfully processed email from %s to %s" % (
        user_profile.email, recipient_str))
开发者ID:brockwhittaker,项目名称:zulip,代码行数:32,代码来源:email_mirror.py
示例18: handle_missedmessage_emails
def handle_missedmessage_emails(user_profile_id: int,
                                missed_email_events: Iterable[Dict[str, Any]]) -> None:
    """Send missed-message emails for a batch of events for one user.

    Messages are grouped per conversation and one email is sent per group,
    ordered by each group's highest message id, ascending.  Returns without
    sending anything when ``receives_offline_email_notifications`` is false
    for the user or when no matching, non-deleted message remains.

    Args:
        user_profile_id: id of the user to notify.
        missed_email_events: events read for their ``message_id`` and
            ``trigger`` keys.
    """
    # Maps message id -> trigger; also serves as the id set for the query.
    message_ids = {event.get('message_id'): event.get('trigger') for event in missed_email_events}

    user_profile = get_user_profile_by_id(user_profile_id)
    if not receives_offline_email_notifications(user_profile):
        return

    # NOTE(review): the flags filter is the negated ``read`` flag --
    # presumably this keeps only unread messages; confirm.
    messages = Message.objects.filter(usermessage__user_profile_id=user_profile,
                                      id__in=message_ids,
                                      usermessage__flags=~UserMessage.flags.read)

    # Cancel missed-message emails for deleted messages
    messages = [um for um in messages if um.content != "(deleted)"]

    if not messages:
        return

    messages_by_recipient_subject = defaultdict(list)  # type: Dict[Tuple[int, str], List[Message]]
    for msg in messages:
        if msg.recipient.type == Recipient.PERSONAL:
            # For PM's group using (recipient, sender).
            messages_by_recipient_subject[(msg.recipient_id, msg.sender_id)].append(msg)
        else:
            messages_by_recipient_subject[(msg.recipient_id, msg.topic_name())].append(msg)

    # Counts are taken before context messages are appended below, so they
    # cover only the messages selected above.
    message_count_by_recipient_subject = {
        recipient_subject: len(msgs)
        for recipient_subject, msgs in messages_by_recipient_subject.items()
    }

    # For stream conversations, add the accessible context messages of the
    # earliest message (by pub_date) in each group.
    for msg_list in messages_by_recipient_subject.values():
        msg = min(msg_list, key=lambda msg: msg.pub_date)
        if msg.is_stream_message():
            context_messages = get_context_for_message(msg)
            filtered_context_messages = bulk_access_messages(user_profile, context_messages)
            msg_list.extend(filtered_context_messages)

    # Sort emails by least recently-active discussion.
    recipient_subjects = []  # type: List[Tuple[Tuple[int, str], int]]
    for recipient_subject, msg_list in messages_by_recipient_subject.items():
        max_message_id = max(msg_list, key=lambda msg: msg.id).id
        recipient_subjects.append((recipient_subject, max_message_id))

    recipient_subjects = sorted(recipient_subjects, key=lambda x: x[1])

    # Send an email per recipient subject pair
    for recipient_subject, ignored_max_id in recipient_subjects:
        # Keyed by message id so a message appears only once per email.
        unique_messages = {}
        for m in messages_by_recipient_subject[recipient_subject]:
            # ``trigger`` is None for messages that were not in the events.
            unique_messages[m.id] = dict(
                message=m,
                trigger=message_ids.get(m.id)
            )
        do_send_missedmessage_events_reply_in_zulip(
            user_profile,
            list(unique_messages.values()),
            message_count_by_recipient_subject[recipient_subject],
        )
开发者ID:gregmccoy,项目名称:zulip,代码行数:59,代码来源:notifications.py
示例19: delete_all_deactivated_user_sessions
def delete_all_deactivated_user_sessions() -> None:
    """Delete every session that belongs to a deactivated user or realm.

    Sessions with no associated user are left untouched.
    """
    for session in Session.objects.all():
        user_profile_id = get_session_user(session)
        if user_profile_id is None:
            continue

        user_profile = get_user_profile_by_id(user_profile_id)
        # Keep sessions of active users in realms that are not deactivated.
        if user_profile.is_active and not user_profile.realm.deactivated:
            continue

        logging.info("Deactivating session for deactivated user %s" % (user_profile.email,))
        delete_session(session)
开发者ID:284928489,项目名称:zulip,代码行数:9,代码来源:sessions.py
示例20: get_message_url
def get_message_url(event: Dict[str, Any]) -> str:
    """Return the URL of the event's message within the bot user's realm.

    The realm is taken from the user profile named by
    ``event['user_profile_id']``.
    """
    sender = get_user_profile_by_id(event['user_profile_id'])
    triggering_message = event['message']
    return near_message_url(realm=sender.realm, message=triggering_message)
开发者ID:BakerWang,项目名称:zulip,代码行数:9,代码来源:outgoing_webhook.py
注:本文中的zerver.models.get_user_profile_by_id函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论