• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python models.get_user_profile_by_id函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中zerver.models.get_user_profile_by_id函数的典型用法代码示例。如果您正苦于以下问题:Python get_user_profile_by_id函数的具体用法?Python get_user_profile_by_id怎么用?Python get_user_profile_by_id使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_user_profile_by_id函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: consume

    def consume(self, event: Mapping[str, Any]) -> None:
        user_profile_id = event['user_profile_id']
        user_profile = get_user_profile_by_id(user_profile_id)

        message = cast(Dict[str, Any], event['message'])

        # TODO: Do we actually want to allow multiple Services per bot user?
        services = get_bot_services(user_profile_id)
        for service in services:
            bot_handler = get_bot_handler(str(service.name))
            if bot_handler is None:
                logging.error("Error: User %s has bot with invalid embedded bot service %s" % (
                    user_profile_id, service.name))
                continue
            try:
                if hasattr(bot_handler, 'initialize'):
                    bot_handler.initialize(self.get_bot_api_client(user_profile))
                if event['trigger'] == 'mention':
                    message['content'] = extract_query_without_mention(
                        message=message,
                        client=self.get_bot_api_client(user_profile),
                    )
                    assert message['content'] is not None
                bot_handler.handle_message(
                    message=message,
                    bot_handler=self.get_bot_api_client(user_profile)
                )
            except EmbeddedBotQuitException as e:
                logging.warning(str(e))
开发者ID:BakerWang,项目名称:zulip,代码行数:29,代码来源:queue_processors.py


示例2: consume

 def consume(self, event):
     logging.info("Received event: %s" % (event),)
     user_profile = get_user_profile_by_id(event["user_profile_id"])
     client = get_client(event["client"])
     log_time = timestamp_to_datetime(event["time"])
     status = event["status"]
     do_update_user_presence(user_profile, client, log_time, status)
开发者ID:150vb,项目名称:zulip,代码行数:7,代码来源:queue_processors.py


示例3: handle_remove_push_notification

def handle_remove_push_notification(user_profile_id: int, message_ids: List[int]) -> None:
    """This should be called when a message that had previously had a
    mobile push executed is read.  This triggers a mobile push notifica
    mobile app when the message is read on the server, to remove the
    message from the notification.

    """
    user_profile = get_user_profile_by_id(user_profile_id)
    message_ids = bulk_access_messages_expect_usermessage(user_profile_id, message_ids)
    gcm_payload, gcm_options = get_remove_payload_gcm(user_profile, message_ids)

    if uses_notification_bouncer():
        try:
            send_notifications_to_bouncer(user_profile_id,
                                          {},
                                          gcm_payload,
                                          gcm_options)
        except requests.ConnectionError:  # nocoverage
            def failure_processor(event: Dict[str, Any]) -> None:
                logger.warning(
                    "Maximum retries exceeded for trigger:%s event:push_notification" % (
                        event['user_profile_id']))
    else:
        android_devices = list(PushDeviceToken.objects.filter(
            user=user_profile, kind=PushDeviceToken.GCM))
        if android_devices:
            send_android_push_notification(android_devices, gcm_payload, gcm_options)

    UserMessage.objects.filter(
        user_profile_id=user_profile_id,
        message_id__in=message_ids,
    ).update(
        flags=F('flags').bitand(
            ~UserMessage.flags.active_mobile_push_notification))
开发者ID:BakerWang,项目名称:zulip,代码行数:34,代码来源:push_notifications.py


示例4: notify_bot_owner

def notify_bot_owner(event: Dict[str, Any],
                     request_data: Dict[str, Any],
                     status_code: Optional[int]=None,
                     response_content: Optional[AnyStr]=None,
                     failure_message: Optional[str]=None,
                     exception: Optional[Exception]=None) -> None:
    message_url = get_message_url(event)
    bot_id = event['user_profile_id']
    bot_owner = get_user_profile_by_id(bot_id).bot_owner

    notification_message = "[A message](%s) triggered an outgoing webhook." % (message_url,)
    if failure_message:
        notification_message += "\n" + failure_message
    if status_code:
        notification_message += "\nThe webhook got a response with status code *%s*." % (status_code,)
    if response_content:
        notification_message += "\nThe response contains the following payload:\n" \
                                "```\n%s\n```" % (response_content,)
    if exception:
        notification_message += "\nWhen trying to send a request to the webhook service, an exception " \
                                "of type %s occurred:\n```\n%s\n```" % (
                                    type(exception).__name__, str(exception))

    message_info = dict(
        type='private',
        display_recipient=[dict(email=bot_owner.email)],
    )
    response_data = dict(content=notification_message)
    send_response_message(bot_id=bot_id, message_info=message_info, response_data=response_data)
开发者ID:BakerWang,项目名称:zulip,代码行数:29,代码来源:outgoing_webhook.py


示例5: get_realm_for_filename

def get_realm_for_filename(path: str) -> Optional[int]:
    conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
    key = get_bucket(conn, settings.S3_AUTH_UPLOADS_BUCKET).get_key(path)
    if key is None:
        # This happens if the key does not exist.
        return None
    return get_user_profile_by_id(key.metadata["user_profile_id"]).realm_id
开发者ID:akashnimare,项目名称:zulip,代码行数:7,代码来源:upload.py


示例6: import_uploads_s3

def import_uploads_s3(bucket_name, import_dir, avatar_bucket=False):
    # type: (str, Path, bool) -> None
    conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
    bucket = conn.get_bucket(bucket_name, validate=True)

    records_filename = os.path.join(import_dir, "records.json")
    with open(records_filename) as records_file:
        records = ujson.loads(records_file.read())

    for record in records:
        key = Key(bucket)

        if avatar_bucket:
            # For avatars, we need to rehash the user's email with the
            # new server's avatar salt
            avatar_hash = user_avatar_hash(record['user_profile_email'])
            key.key = avatar_hash
            if record['s3_path'].endswith('.original'):
                key.key += '.original'
        else:
            key.key = record['s3_path']

        user_profile_id = int(record['user_profile_id'])
        # Support email gateway bot and other cross-realm messages
        if user_profile_id in id_maps["user_profile"]:
            logging.info("Uploaded by ID mapped user: %s!" % (user_profile_id,))
            user_profile_id = id_maps["user_profile"][user_profile_id]
        user_profile = get_user_profile_by_id(user_profile_id)
        key.set_metadata("user_profile_id", str(user_profile.id))
        key.set_metadata("realm_id", str(user_profile.realm.id))
        key.set_metadata("orig_last_modified", record['last_modified'])

        headers = {'Content-Type': key['content_type']}

        key.set_contents_from_filename(os.path.join(import_dir, record['path']), headers=headers)
开发者ID:HKingz,项目名称:zulip,代码行数:35,代码来源:export.py


示例7: get_user

 def get_user(self, user_profile_id: int) -> Optional[UserProfile]:
     """Override the Django method for getting a UserProfile object from
     the user_profile_id,."""
     try:
         return get_user_profile_by_id(user_profile_id)
     except UserProfile.DoesNotExist:
         return None
开发者ID:deltay,项目名称:zulip,代码行数:7,代码来源:backends.py


示例8: send_to_missed_message_address

def send_to_missed_message_address(address, message):
    # type: (text_type, message.Message) -> None
    token = get_missed_message_token_from_address(address)
    key = missed_message_redis_key(token)
    result = redis_client.hmget(key, 'user_profile_id', 'recipient_id', 'subject')
    if not all(val is not None for val in result):
        raise ZulipEmailForwardError('Missing missed message address data')
    user_profile_id, recipient_id, subject = result

    user_profile = get_user_profile_by_id(user_profile_id)
    recipient = Recipient.objects.get(id=recipient_id)
    display_recipient = get_display_recipient(recipient)

    # Testing with basestring so we don't depend on the list return type from
    # get_display_recipient
    if not isinstance(display_recipient, six.string_types):
        recipient_str = ','.join([user['email'] for user in display_recipient])
    else:
        recipient_str = display_recipient

    body = filter_footer(extract_body(message))
    body += extract_and_upload_attachments(message, user_profile.realm)
    if not body:
        body = '(No email body)'

    if recipient.type == Recipient.STREAM:
        recipient_type_name = 'stream'
    else:
        recipient_type_name = 'private'

    internal_send_message(user_profile.email, recipient_type_name,
                          recipient_str, subject, body)
开发者ID:150vb,项目名称:zulip,代码行数:32,代码来源:email_mirror.py


示例9: fake_message_sender

def fake_message_sender(event):
    # type: (Dict[str, Any]) -> None
    """This function is used only for Casper and backend tests, where
    rabbitmq is disabled"""
    log_data = dict() # type: Dict[str, Any]
    record_request_start_data(log_data)

    req = event['request']
    try:
        sender = get_user_profile_by_id(event['server_meta']['user_id'])
        client = get_client("website")

        msg_id = check_send_message(sender, client, req['type'],
                                    extract_recipients(req['to']),
                                    req['subject'], req['content'],
                                    local_id=req.get('local_id', None),
                                    sender_queue_id=req.get('queue_id', None))
        resp = {"result": "success", "msg": "", "id": msg_id}
    except JsonableError as e:
        resp = {"result": "error", "msg": str(e)}

    server_meta = event['server_meta']
    server_meta.update({'worker_log_data': log_data,
                        'time_request_finished': time.time()})
    result = {'response': resp, 'req_id': event['req_id'],
              'server_meta': server_meta}
    respond_send_message(result)
开发者ID:aakash-cr7,项目名称:zulip,代码行数:27,代码来源:socket.py


示例10: response_listener

def response_listener(error_response):
    # type: (Dict[str, SupportsInt]) -> None
    identifier = error_response['identifier']
    key = get_apns_key(identifier)
    if not redis_client.exists(key):
        logging.warn("APNs key, {}, doesn't not exist.".format(key))
        return

    code = error_response['status']
    assert isinstance(code, int)

    errmsg = ERROR_CODES[code]
    data = redis_client.hgetall(key)
    token = data['token']
    user = get_user_profile_by_id(int(data['user_id']))
    b64_token = hex_to_b64(token)

    logging.warn("APNS: Failed to deliver APNS notification to %s, reason: %s" % (b64_token, errmsg))
    if code == 8:
        # Invalid Token, remove from our database
        logging.warn("APNS: Removing token from database due to above failure")
        try:
            PushDeviceToken.objects.get(user=user, token=b64_token).delete()
        except PushDeviceToken.DoesNotExist:
            pass
开发者ID:zulip,项目名称:zulip,代码行数:25,代码来源:push_notifications.py


示例11: get_user

 def get_user(self, user_profile_id):
     # type: (int) -> Optional[UserProfile]
     """ Get a UserProfile object from the user_profile_id. """
     try:
         return get_user_profile_by_id(user_profile_id)
     except UserProfile.DoesNotExist:
         return None
开发者ID:umkay,项目名称:zulip,代码行数:7,代码来源:backends.py


示例12: send_to_missed_message_address

def send_to_missed_message_address(address, message):
    # type: (text_type, message.Message) -> None
    token = get_missed_message_token_from_address(address)
    key = missed_message_redis_key(token)
    result = redis_client.hmget(key, "user_profile_id", "recipient_id", "subject")
    if not all(val is not None for val in result):
        raise ZulipEmailForwardError("Missing missed message address data")
    user_profile_id, recipient_id, subject = result

    user_profile = get_user_profile_by_id(user_profile_id)
    recipient = Recipient.objects.get(id=recipient_id)
    display_recipient = get_display_recipient(recipient)

    # Testing with basestring so we don't depend on the list return type from
    # get_display_recipient
    if not isinstance(display_recipient, six.string_types):
        recipient_str = ",".join([user["email"] for user in display_recipient])
    else:
        recipient_str = display_recipient

    body = filter_footer(extract_body(message))
    body += extract_and_upload_attachments(message, user_profile.realm)
    if not body:
        body = "(No email body)"

    if recipient.type == Recipient.STREAM:
        recipient_type_name = "stream"
    else:
        recipient_type_name = "private"

    internal_send_message(user_profile.email, recipient_type_name, recipient_str, subject, body)
    logging.info("Successfully processed email from %s to %s" % (user_profile.email, recipient_str))
开发者ID:galexrt,项目名称:zulip,代码行数:32,代码来源:email_mirror.py


示例13: consume

 def consume(self, event):
     # type: (Mapping[str, Any]) -> None
     user_profile = get_user_profile_by_id(event["user_profile_id"])
     client = get_client(event["client"])
     log_time = timestamp_to_datetime(event["time"])
     query = event["query"]
     do_update_user_activity(user_profile, client, query, log_time)
开发者ID:aakash-cr7,项目名称:zulip,代码行数:7,代码来源:queue_processors.py


示例14: test_change_delivery_email_end_to_end_with_admins_visibility

    def test_change_delivery_email_end_to_end_with_admins_visibility(self) -> None:
        user_profile = self.example_user('hamlet')
        do_set_realm_property(user_profile.realm, 'email_address_visibility',
                              Realm.EMAIL_ADDRESS_VISIBILITY_ADMINS)

        old_email = user_profile.email
        new_email = '[email protected]'
        self.login(self.example_email('hamlet'))
        obj = EmailChangeStatus.objects.create(new_email=new_email,
                                               old_email=old_email,
                                               user_profile=user_profile,
                                               realm=user_profile.realm)
        key = generate_key()
        Confirmation.objects.create(content_object=obj,
                                    date_sent=now(),
                                    confirmation_key=key,
                                    type=Confirmation.EMAIL_CHANGE)
        url = confirmation_url(key, user_profile.realm.host, Confirmation.EMAIL_CHANGE)
        response = self.client_get(url)

        self.assertEqual(response.status_code, 200)
        self.assert_in_success_response(["This confirms that the email address for your Zulip"],
                                        response)
        user_profile = get_user_profile_by_id(user_profile.id)
        self.assertEqual(user_profile.delivery_email, new_email)
        self.assertEqual(user_profile.email, "[email protected]")
        obj.refresh_from_db()
        self.assertEqual(obj.status, 1)
        with self.assertRaises(UserProfile.DoesNotExist):
            get_user(old_email, user_profile.realm)
        with self.assertRaises(UserProfile.DoesNotExist):
            get_user_by_delivery_email(old_email, user_profile.realm)
        self.assertEqual(get_user_by_delivery_email(new_email, user_profile.realm), user_profile)
开发者ID:BakerWang,项目名称:zulip,代码行数:33,代码来源:test_email_change.py


示例15: from_dict

    def from_dict(cls, d: MutableMapping[str, Any]) -> 'ClientDescriptor':
        if 'user_profile_email' not in d:
            # Temporary migration for the addition of the new user_profile_email field
            from zerver.models import get_user_profile_by_id
            d['user_profile_email'] = get_user_profile_by_id(d['user_profile_id']).email
        if 'client_type' in d:
            # Temporary migration for the rename of client_type to client_type_name
            d['client_type_name'] = d['client_type']
        if 'client_gravatar' not in d:
            # Temporary migration for the addition of the client_gravatar field
            d['client_gravatar'] = False

        ret = cls(
            d['user_profile_id'],
            d['user_profile_email'],
            d['realm_id'],
            EventQueue.from_dict(d['event_queue']),
            d['event_types'],
            d['client_type_name'],
            d['apply_markdown'],
            d['client_gravatar'],
            d['all_public_streams'],
            d['queue_timeout'],
            d.get('narrow', [])
        )
        ret.last_connection_time = d['last_connection_time']
        return ret
开发者ID:gnprice,项目名称:zulip,代码行数:27,代码来源:event_queue.py


示例16: avatar

def avatar(request: HttpRequest, email_or_id: str, medium: bool=False) -> HttpResponse:
    """Accepts an email address or user ID and returns the avatar"""
    is_email = False
    try:
        int(email_or_id)
    except ValueError:
        is_email = True

    try:
        if is_email:
            realm = request.user.realm
            user_profile = get_user_including_cross_realm(email_or_id, realm)
        else:
            user_profile = get_user_profile_by_id(email_or_id)
        # If there is a valid user account passed in, use its avatar
        url = avatar_url(user_profile, medium=medium)
    except UserProfile.DoesNotExist:
        # If there is no such user, treat it as a new gravatar
        email = email_or_id
        avatar_version = 1
        url = get_gravatar_url(email, avatar_version, medium)

    # We can rely on the url already having query parameters. Because
    # our templates depend on being able to use the ampersand to
    # add query parameters to our url, get_avatar_url does '?x=x'
    # hacks to prevent us from having to jump through decode/encode hoops.
    assert '?' in url
    url += '&' + request.META['QUERY_STRING']
    return redirect(url)
开发者ID:joydeep1701,项目名称:zulip,代码行数:29,代码来源:users.py


示例17: send_to_missed_message_address

def send_to_missed_message_address(address, message):
    # type: (Text, message.Message) -> None
    token = get_missed_message_token_from_address(address)
    key = missed_message_redis_key(token)
    result = redis_client.hmget(key, 'user_profile_id', 'recipient_id', 'subject')
    if not all(val is not None for val in result):
        raise ZulipEmailForwardError('Missing missed message address data')
    user_profile_id, recipient_id, subject_b = result  # type: (bytes, bytes, bytes)

    user_profile = get_user_profile_by_id(user_profile_id)
    recipient = Recipient.objects.get(id=recipient_id)
    display_recipient = get_display_recipient(recipient)

    # Testing with basestring so we don't depend on the list return type from
    # get_display_recipient
    if not isinstance(display_recipient, str):
        recipient_str = u','.join([user['email'] for user in display_recipient])
    else:
        recipient_str = display_recipient

    body = construct_zulip_body(message, user_profile.realm)

    if recipient.type == Recipient.STREAM:
        recipient_type_name = 'stream'
    else:
        recipient_type_name = 'private'

    internal_send_message(user_profile.realm, user_profile.email,
                          recipient_type_name, recipient_str,
                          subject_b.decode('utf-8'), body)
    logger.info("Successfully processed email from %s to %s" % (
        user_profile.email, recipient_str))
开发者ID:brockwhittaker,项目名称:zulip,代码行数:32,代码来源:email_mirror.py


示例18: handle_missedmessage_emails

def handle_missedmessage_emails(user_profile_id: int,
                                missed_email_events: Iterable[Dict[str, Any]]) -> None:
    message_ids = {event.get('message_id'): event.get('trigger') for event in missed_email_events}

    user_profile = get_user_profile_by_id(user_profile_id)
    if not receives_offline_email_notifications(user_profile):
        return

    messages = Message.objects.filter(usermessage__user_profile_id=user_profile,
                                      id__in=message_ids,
                                      usermessage__flags=~UserMessage.flags.read)

    # Cancel missed-message emails for deleted messages
    messages = [um for um in messages if um.content != "(deleted)"]

    if not messages:
        return

    messages_by_recipient_subject = defaultdict(list)  # type: Dict[Tuple[int, str], List[Message]]
    for msg in messages:
        if msg.recipient.type == Recipient.PERSONAL:
            # For PM's group using (recipient, sender).
            messages_by_recipient_subject[(msg.recipient_id, msg.sender_id)].append(msg)
        else:
            messages_by_recipient_subject[(msg.recipient_id, msg.topic_name())].append(msg)

    message_count_by_recipient_subject = {
        recipient_subject: len(msgs)
        for recipient_subject, msgs in messages_by_recipient_subject.items()
    }

    for msg_list in messages_by_recipient_subject.values():
        msg = min(msg_list, key=lambda msg: msg.pub_date)
        if msg.is_stream_message():
            context_messages = get_context_for_message(msg)
            filtered_context_messages = bulk_access_messages(user_profile, context_messages)
            msg_list.extend(filtered_context_messages)

    # Sort emails by least recently-active discussion.
    recipient_subjects = []  # type: List[Tuple[Tuple[int, str], int]]
    for recipient_subject, msg_list in messages_by_recipient_subject.items():
        max_message_id = max(msg_list, key=lambda msg: msg.id).id
        recipient_subjects.append((recipient_subject, max_message_id))

    recipient_subjects = sorted(recipient_subjects, key=lambda x: x[1])

    # Send an email per recipient subject pair
    for recipient_subject, ignored_max_id in recipient_subjects:
        unique_messages = {}
        for m in messages_by_recipient_subject[recipient_subject]:
            unique_messages[m.id] = dict(
                message=m,
                trigger=message_ids.get(m.id)
            )
        do_send_missedmessage_events_reply_in_zulip(
            user_profile,
            list(unique_messages.values()),
            message_count_by_recipient_subject[recipient_subject],
        )
开发者ID:gregmccoy,项目名称:zulip,代码行数:59,代码来源:notifications.py


示例19: delete_all_deactivated_user_sessions

def delete_all_deactivated_user_sessions() -> None:
    for session in Session.objects.all():
        user_profile_id = get_session_user(session)
        if user_profile_id is None:
            continue
        user_profile = get_user_profile_by_id(user_profile_id)
        if not user_profile.is_active or user_profile.realm.deactivated:
            logging.info("Deactivating session for deactivated user %s" % (user_profile.email,))
            delete_session(session)
开发者ID:284928489,项目名称:zulip,代码行数:9,代码来源:sessions.py


示例20: get_message_url

def get_message_url(event: Dict[str, Any]) -> str:
    bot_user = get_user_profile_by_id(event['user_profile_id'])
    message = event['message']
    realm = bot_user.realm

    return near_message_url(
        realm=realm,
        message=message,
    )
开发者ID:BakerWang,项目名称:zulip,代码行数:9,代码来源:outgoing_webhook.py



注:本文中的zerver.models.get_user_profile_by_id函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python models.remote_user_to_email函数代码示例发布时间:2022-05-26
下一篇:
Python models.get_user_profile_by_email函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap