• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python models.get_stream_recipient函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中zerver.models.get_stream_recipient函数的典型用法代码示例。如果您正苦于以下问题:Python get_stream_recipient函数的具体用法?Python get_stream_recipient怎么用?Python get_stream_recipient使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_stream_recipient函数的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_remove_muted_topic

    def test_remove_muted_topic(self) -> None:
        user = self.example_user('hamlet')
        email = user.email
        realm = user.realm
        self.login(email)

        stream = get_stream(u'Verona', realm)
        recipient = get_stream_recipient(stream.id)

        url = '/api/v1/users/me/subscriptions/muted_topics'
        payloads = [
            {'stream': stream.name, 'topic': 'vERONA3', 'op': 'remove'},
            {'stream_id': stream.id, 'topic': 'vEroNA3', 'op': 'remove'},
        ]

        for data in payloads:
            add_topic_mute(
                user_profile=user,
                stream_id=stream.id,
                recipient_id=recipient.id,
                topic_name='Verona3',
            )
            self.assertIn([stream.name, 'Verona3'], get_topic_mutes(user))

            result = self.api_patch(email, url, data)

            self.assert_json_success(result)
            self.assertNotIn([stream.name, 'Verona3'], get_topic_mutes(user))
            self.assertFalse(topic_is_muted(user, stream.id, 'verona3'))
开发者ID:BakerWang,项目名称:zulip,代码行数:29,代码来源:test_muting.py


示例2: access_stream_common

def access_stream_common(user_profile, stream, error):
    # type: (UserProfile, Stream, Text) -> Tuple[Recipient, Subscription]
    """Common function for backend code where the target use attempts to
    access the target stream, returning all the data fetched along the
    way.  If that user does not have permission to access that stream,
    we throw an exception.  A design goal is that the error message is
    the same for streams you can't access and streams that don't exist."""

    # First, we don't allow any access to streams in other realms.
    if stream.realm_id != user_profile.realm_id:
        raise JsonableError(error)

    recipient = get_stream_recipient(stream.id)

    try:
        sub = Subscription.objects.get(user_profile=user_profile,
                                       recipient=recipient,
                                       active=True)
    except Subscription.DoesNotExist:
        sub = None

    # If the stream is in your realm and public, you can access it.
    if stream.is_public():
        return (recipient, sub)

    # Or if you are subscribed to the stream, you can access it.
    if sub is not None:
        return (recipient, sub)

    # Otherwise it is a private stream and you're not on it, so throw
    # an error.
    raise JsonableError(error)
开发者ID:brockwhittaker,项目名称:zulip,代码行数:32,代码来源:streams.py


示例3: test_muted_topic_add_invalid

    def test_muted_topic_add_invalid(self) -> None:
        user = self.example_user('hamlet')
        email = user.email
        realm = user.realm
        self.login(email)

        stream = get_stream('Verona', realm)
        recipient = get_stream_recipient(stream.id)
        add_topic_mute(
            user_profile=user,
            stream_id=stream.id,
            recipient_id=recipient.id,
            topic_name=u'Verona3',
        )

        url = '/api/v1/users/me/subscriptions/muted_topics'

        data = {'stream': stream.name, 'topic': 'Verona3', 'op': 'add'}  # type: Dict[str, Any]
        result = self.api_patch(email, url, data)
        self.assert_json_error(result, "Topic already muted")

        data = {'stream_id': 999999999, 'topic': 'Verona3', 'op': 'add'}
        result = self.api_patch(email, url, data)
        self.assert_json_error(result, "Invalid stream id")

        data = {'topic': 'Verona3', 'op': 'add'}
        result = self.api_patch(email, url, data)
        self.assert_json_error(result, "Please supply 'stream'.")

        data = {'stream': stream.name, 'stream_id': stream.id, 'topic': 'Verona3', 'op': 'add'}
        result = self.api_patch(email, url, data)
        self.assert_json_error(result, "Please choose one: 'stream' or 'stream_id'.")
开发者ID:BakerWang,项目名称:zulip,代码行数:32,代码来源:test_muting.py


示例4: test_user_ids_muting_topic

    def test_user_ids_muting_topic(self) -> None:
        hamlet = self.example_user('hamlet')
        cordelia  = self.example_user('cordelia')
        realm = hamlet.realm
        stream = get_stream(u'Verona', realm)
        recipient = get_stream_recipient(stream.id)
        topic_name = 'teST topic'

        stream_topic_target = StreamTopicTarget(
            stream_id=stream.id,
            topic_name=topic_name,
        )

        user_ids = stream_topic_target.user_ids_muting_topic()
        self.assertEqual(user_ids, set())

        def mute_user(user: UserProfile) -> None:
            add_topic_mute(
                user_profile=user,
                stream_id=stream.id,
                recipient_id=recipient.id,
                topic_name='test TOPIC',
            )

        mute_user(hamlet)
        user_ids = stream_topic_target.user_ids_muting_topic()
        self.assertEqual(user_ids, {hamlet.id})

        mute_user(cordelia)
        user_ids = stream_topic_target.user_ids_muting_topic()
        self.assertEqual(user_ids, {hamlet.id, cordelia.id})
开发者ID:brainwane,项目名称:zulip,代码行数:31,代码来源:test_muting.py


示例5: mute_stream

 def mute_stream(stream_name: str) -> None:
     stream = get_stream(stream_name, realm)
     recipient = get_stream_recipient(stream.id)
     subscription = Subscription.objects.get(
         user_profile=user,
         recipient=recipient
     )
     subscription.in_home_view = False
     subscription.save()
开发者ID:BakerWang,项目名称:zulip,代码行数:9,代码来源:test_unread.py


示例6: mute_topic

        def mute_topic(stream_name: str, topic_name: str) -> None:
            stream = get_stream(stream_name, realm)
            recipient = get_stream_recipient(stream.id)

            add_topic_mute(
                user_profile=user,
                stream_id=stream.id,
                recipient_id=recipient.id,
                topic_name=topic_name,
            )
开发者ID:BakerWang,项目名称:zulip,代码行数:10,代码来源:test_unread.py


示例7: handle

    def handle(self, *args: Any, **options: str) -> None:
        realm = self.get_realm(options)
        assert realm is not None  # Should be ensured by parser
        stream_to_keep = get_stream(options["stream_to_keep"], realm)
        stream_to_destroy = get_stream(options["stream_to_destroy"], realm)

        recipient_to_destroy = get_stream_recipient(stream_to_destroy.id)
        recipient_to_keep = get_stream_recipient(stream_to_keep.id)

        # The high-level approach here is to move all the messages to
        # the surviving stream, deactivate all the subscriptions on
        # the stream to be removed and deactivate the stream, and add
        # new subscriptions to the stream to keep for any users who
        # were only on the now-deactivated stream.

        # Move the messages, and delete the old copies from caches.
        message_ids_to_clear = list(Message.objects.filter(
            recipient=recipient_to_destroy).values_list("id", flat=True))
        count = Message.objects.filter(recipient=recipient_to_destroy).update(recipient=recipient_to_keep)
        print("Moved %s messages" % (count,))
        bulk_delete_cache_keys(message_ids_to_clear)

        # Move the Subscription objects.  This algorithm doesn't
        # preserve any stream settings/colors/etc. from the stream
        # being destroyed, but it's convenient.
        existing_subs = Subscription.objects.filter(recipient=recipient_to_keep)
        users_already_subscribed = dict((sub.user_profile_id, sub.active) for sub in existing_subs)

        subs_to_deactivate = Subscription.objects.filter(recipient=recipient_to_destroy, active=True)
        users_to_activate = [
            sub.user_profile for sub in subs_to_deactivate
            if not users_already_subscribed.get(sub.user_profile_id, False)
        ]

        if len(subs_to_deactivate) > 0:
            print("Deactivating %s subscriptions" % (len(subs_to_deactivate),))
            bulk_remove_subscriptions([sub.user_profile for sub in subs_to_deactivate],
                                      [stream_to_destroy],
                                      self.get_client())
        do_deactivate_stream(stream_to_destroy)
        if len(users_to_activate) > 0:
            print("Adding %s subscriptions" % (len(users_to_activate),))
            bulk_add_subscriptions([stream_to_keep], users_to_activate)
开发者ID:BakerWang,项目名称:zulip,代码行数:43,代码来源:merge_streams.py


示例8: mute_topic

        def mute_topic(stream_name, topic_name):
            # type: (Text, Text) -> None
            stream = get_stream(stream_name, realm)
            recipient = get_stream_recipient(stream.id)

            add_topic_mute(
                user_profile=user,
                stream_id=stream.id,
                recipient_id=recipient.id,
                topic_name=topic_name,
            )
开发者ID:brockwhittaker,项目名称:zulip,代码行数:11,代码来源:test_unread.py


示例9: get_web_public_topics_backend

def get_web_public_topics_backend(request: HttpRequest, stream_id: int) -> HttpResponse:
    try:
        stream = get_stream_by_id(stream_id)
    except JsonableError:
        return json_success(dict(topics=[]))

    if not stream.is_web_public:
        return json_success(dict(topics=[]))

    recipient = get_stream_recipient(stream.id)

    result = get_topic_history_for_web_public_stream(recipient=recipient)

    return json_success(dict(topics=result))
开发者ID:284928489,项目名称:zulip,代码行数:14,代码来源:archive.py


示例10: _turn_on_stream_push_for_cordelia

 def _turn_on_stream_push_for_cordelia(self) -> None:
     '''
     conventions:
         Cordelia is the message receiver we care about.
         Scotland is our stream.
     '''
     cordelia = self.example_user('cordelia')
     stream = self.subscribe(cordelia, 'Scotland')
     recipient = get_stream_recipient(stream.id)
     cordelia_subscription = Subscription.objects.get(
         user_profile_id=cordelia.id,
         recipient=recipient,
     )
     cordelia_subscription.push_notifications = True
     cordelia_subscription.save()
开发者ID:gnprice,项目名称:zulip,代码行数:15,代码来源:test_message_edit_notifications.py


示例11: test_muted_topic_add_invalid

    def test_muted_topic_add_invalid(self) -> None:
        self.user_profile = self.example_user('hamlet')
        email = self.user_profile.email
        self.login(email)

        realm = self.user_profile.realm
        stream = get_stream(u'Verona', realm)
        recipient = get_stream_recipient(stream.id)
        add_topic_mute(
            user_profile=self.user_profile,
            stream_id=stream.id,
            recipient_id=recipient.id,
            topic_name=u'Verona3',
        )

        url = '/api/v1/users/me/subscriptions/muted_topics'
        data = {'stream': 'Verona', 'topic': 'Verona3', 'op': 'add'}
        result = self.api_patch(email, url, data)
        self.assert_json_error(result, "Topic already muted")
开发者ID:brainwane,项目名称:zulip,代码行数:19,代码来源:test_muting.py


示例12: set_topic_mutes

def set_topic_mutes(user_profile: UserProfile, muted_topics: List[List[str]]) -> None:

    '''
    This is only used in tests.
    '''

    MutedTopic.objects.filter(
        user_profile=user_profile,
    ).delete()

    for stream_name, topic_name in muted_topics:
        stream = get_stream(stream_name, user_profile.realm)
        recipient = get_stream_recipient(stream.id)

        add_topic_mute(
            user_profile=user_profile,
            stream_id=stream.id,
            recipient_id=recipient.id,
            topic_name=topic_name,
        )
开发者ID:BakerWang,项目名称:zulip,代码行数:20,代码来源:topic_mutes.py


示例13: access_stream_common

def access_stream_common(user_profile: UserProfile, stream: Stream,
                         error: str,
                         require_active: bool=True,
                         allow_realm_admin: bool=False) -> Tuple[Recipient, Optional[Subscription]]:
    """Common function for backend code where the target use attempts to
    access the target stream, returning all the data fetched along the
    way.  If that user does not have permission to access that stream,
    we throw an exception.  A design goal is that the error message is
    the same for streams you can't access and streams that don't exist."""

    # First, we don't allow any access to streams in other realms.
    if stream.realm_id != user_profile.realm_id:
        raise JsonableError(error)

    recipient = get_stream_recipient(stream.id)

    try:
        sub = Subscription.objects.get(user_profile=user_profile,
                                       recipient=recipient,
                                       active=require_active)
    except Subscription.DoesNotExist:
        sub = None

    # If the stream is in your realm and public, you can access it.
    if stream.is_public() and not user_profile.is_guest:
        return (recipient, sub)

    # Or if you are subscribed to the stream, you can access it.
    if sub is not None:
        return (recipient, sub)

    # For some specific callers (e.g. getting list of subscribers,
    # removing other users from a stream, and updating stream name and
    # description), we allow realm admins to access stream even if
    # they are not subscribed to a private stream.
    if user_profile.is_realm_admin and allow_realm_admin:
        return (recipient, sub)

    # Otherwise it is a private stream and you're not on it, so throw
    # an error.
    raise JsonableError(error)
开发者ID:BakerWang,项目名称:zulip,代码行数:41,代码来源:streams.py


示例14: test_remove_muted_topic

    def test_remove_muted_topic(self) -> None:
        self.user_profile = self.example_user('hamlet')
        email = self.user_profile.email
        self.login(email)

        realm = self.user_profile.realm
        stream = get_stream(u'Verona', realm)
        recipient = get_stream_recipient(stream.id)
        add_topic_mute(
            user_profile=self.user_profile,
            stream_id=stream.id,
            recipient_id=recipient.id,
            topic_name=u'Verona3',
        )

        url = '/api/v1/users/me/subscriptions/muted_topics'
        data = {'stream': 'Verona', 'topic': 'vERONA3', 'op': 'remove'}
        result = self.api_patch(email, url, data)

        self.assert_json_success(result)
        user = self.example_user('hamlet')
        self.assertNotIn([[u'Verona', u'Verona3']], get_topic_mutes(user))
开发者ID:brainwane,项目名称:zulip,代码行数:22,代码来源:test_muting.py


示例15: test_stream_recipient_info

    def test_stream_recipient_info(self) -> None:
        hamlet = self.example_user('hamlet')
        cordelia = self.example_user('cordelia')
        othello = self.example_user('othello')

        realm = hamlet.realm

        stream_name = 'Test Stream'
        topic_name = 'test topic'

        for user in [hamlet, cordelia, othello]:
            self.subscribe(user, stream_name)

        stream = get_stream(stream_name, realm)
        recipient = get_stream_recipient(stream.id)

        stream_topic = StreamTopicTarget(
            stream_id=stream.id,
            topic_name=topic_name,
        )

        sub = get_subscription(stream_name, hamlet)
        sub.push_notifications = True
        sub.save()

        info = get_recipient_info(
            recipient=recipient,
            sender_id=hamlet.id,
            stream_topic=stream_topic,
        )

        all_user_ids = {hamlet.id, cordelia.id, othello.id}

        expected_info = dict(
            active_user_ids=all_user_ids,
            push_notify_user_ids=set(),
            stream_push_user_ids={hamlet.id},
            stream_email_user_ids=set(),
            um_eligible_user_ids=all_user_ids,
            long_term_idle_user_ids=set(),
            default_bot_user_ids=set(),
            service_bot_tuples=[],
        )

        self.assertEqual(info, expected_info)

        # Now mute Hamlet to omit him from stream_push_user_ids.
        add_topic_mute(
            user_profile=hamlet,
            stream_id=stream.id,
            recipient_id=recipient.id,
            topic_name=topic_name,
        )

        info = get_recipient_info(
            recipient=recipient,
            sender_id=hamlet.id,
            stream_topic=stream_topic,
        )

        self.assertEqual(info['stream_push_user_ids'], set())

        # Add a service bot.
        service_bot = do_create_user(
            email='[email protected]',
            password='',
            realm=realm,
            full_name='',
            short_name='',
            bot_type=UserProfile.EMBEDDED_BOT,
        )

        info = get_recipient_info(
            recipient=recipient,
            sender_id=hamlet.id,
            stream_topic=stream_topic,
            possibly_mentioned_user_ids={service_bot.id}
        )
        self.assertEqual(info['service_bot_tuples'], [
            (service_bot.id, UserProfile.EMBEDDED_BOT),
        ])

        # Add a normal bot.
        normal_bot = do_create_user(
            email='[email protected]',
            password='',
            realm=realm,
            full_name='',
            short_name='',
            bot_type=UserProfile.DEFAULT_BOT,
        )

        info = get_recipient_info(
            recipient=recipient,
            sender_id=hamlet.id,
            stream_topic=stream_topic,
            possibly_mentioned_user_ids={service_bot.id, normal_bot.id}
        )
        self.assertEqual(info['default_bot_user_ids'], {normal_bot.id})
开发者ID:rishig,项目名称:zulip,代码行数:99,代码来源:test_users.py


示例16: home_real


#.........这里部分代码省略.........
        server_inline_image_preview = settings.INLINE_IMAGE_PREVIEW,
        server_inline_url_embed_preview = settings.INLINE_URL_EMBED_PREVIEW,
        password_min_length = settings.PASSWORD_MIN_LENGTH,
        password_min_guesses  = settings.PASSWORD_MIN_GUESSES,
        jitsi_server_url      = settings.JITSI_SERVER_URL,
        search_pills_enabled  = settings.SEARCH_PILLS_ENABLED,

        # Misc. extra data.
        have_initial_messages = user_has_messages,
        initial_servertime    = time.time(),  # Used for calculating relative presence age
        default_language_name = get_language_name(register_ret['default_language']),
        language_list_dbl_col = get_language_list_for_templates(register_ret['default_language']),
        language_list         = get_language_list(),
        needs_tutorial        = needs_tutorial,
        first_in_realm        = first_in_realm,
        prompt_for_invites    = prompt_for_invites,
        furthest_read_time    = sent_time_in_epoch_seconds(latest_read),
        has_mobile_devices    = num_push_devices_for_user(user_profile) > 0,
        bot_types             = get_bot_types(user_profile),
        two_fa_enabled        = two_fa_enabled,
        # Adding two_fa_enabled as condition saves us 3 queries when
        # 2FA is not enabled.
        two_fa_enabled_user   = two_fa_enabled and bool(default_device(user_profile)),
    )

    undesired_register_ret_fields = [
        'streams',
    ]
    for field_name in set(register_ret.keys()) - set(undesired_register_ret_fields):
        page_params[field_name] = register_ret[field_name]

    if narrow_stream is not None:
        # In narrow_stream context, initial pointer is just latest message
        recipient = get_stream_recipient(narrow_stream.id)
        try:
            initial_pointer = Message.objects.filter(recipient=recipient).order_by('id').reverse()[0].id
        except IndexError:
            initial_pointer = -1
        page_params["narrow_stream"] = narrow_stream.name
        if narrow_topic is not None:
            page_params["narrow_topic"] = narrow_topic
        page_params["narrow"] = [dict(operator=term[0], operand=term[1]) for term in narrow]
        page_params["max_message_id"] = initial_pointer
        page_params["pointer"] = initial_pointer
        page_params["have_initial_messages"] = (initial_pointer != -1)
        page_params["enable_desktop_notifications"] = False

    statsd.incr('views.home')
    show_invites = True

    # Some realms only allow admins to invite users
    if user_profile.realm.invite_by_admins_only and not user_profile.is_realm_admin:
        show_invites = False
    if user_profile.is_guest:
        show_invites = False

    show_billing = False
    show_plans = False
    if settings.CORPORATE_ENABLED:
        from corporate.models import Customer
        if user_profile.is_billing_admin or user_profile.is_realm_admin:
            customer = Customer.objects.filter(realm=user_profile.realm).first()
            if customer is not None and customer.has_billing_relationship:
                show_billing = True
        if user_profile.realm.plan_type == Realm.LIMITED:
            show_plans = True
开发者ID:akashnimare,项目名称:zulip,代码行数:67,代码来源:home.py


示例17: home_real


#.........这里部分代码省略.........
        register_ret['pointer'] = register_ret['max_message_id']
        user_profile.last_pointer_updater = request.session.session_key

    if user_profile.pointer == -1:
        latest_read = None
    else:
        try:
            latest_read = UserMessage.objects.get(user_profile=user_profile,
                                                  message__id=user_profile.pointer)
        except UserMessage.DoesNotExist:
            # Don't completely fail if your saved pointer ID is invalid
            logging.warning("%s has invalid pointer %s" % (user_profile.email, user_profile.pointer))
            latest_read = None

    # Set default language and make it persist
    default_language = register_ret['default_language']
    url_lang = '/{}'.format(request.LANGUAGE_CODE)
    if not request.path.startswith(url_lang):
        translation.activate(default_language)
        request.session[translation.LANGUAGE_SESSION_KEY] = translation.get_language()

    # Pass parameters to the client-side JavaScript code.
    # These end up in a global JavaScript Object named 'page_params'.
    page_params = dict(
        # Server settings.
        development_environment = settings.DEVELOPMENT,
        debug_mode            = settings.DEBUG,
        test_suite            = settings.TEST_SUITE,
        poll_timeout          = settings.POLL_TIMEOUT,
        login_page            = settings.HOME_NOT_LOGGED_IN,
        root_domain_uri       = settings.ROOT_DOMAIN_URI,
        maxfilesize           = settings.MAX_FILE_UPLOAD_SIZE,
        max_avatar_file_size  = settings.MAX_AVATAR_FILE_SIZE,
        server_generation     = settings.SERVER_GENERATION,
        use_websockets        = settings.USE_WEBSOCKETS,
        save_stacktraces      = settings.SAVE_FRONTEND_STACKTRACES,
        server_inline_image_preview = settings.INLINE_IMAGE_PREVIEW,
        server_inline_url_embed_preview = settings.INLINE_URL_EMBED_PREVIEW,
        password_min_length = settings.PASSWORD_MIN_LENGTH,
        password_min_guesses  = settings.PASSWORD_MIN_GUESSES,

        # Misc. extra data.
        have_initial_messages = user_has_messages,
        initial_servertime    = time.time(),  # Used for calculating relative presence age
        default_language_name = get_language_name(register_ret['default_language']),
        language_list_dbl_col = get_language_list_for_templates(register_ret['default_language']),
        language_list         = get_language_list(),
        needs_tutorial        = needs_tutorial,
        first_in_realm        = first_in_realm,
        prompt_for_invites    = prompt_for_invites,
        furthest_read_time    = sent_time_in_epoch_seconds(latest_read),
        has_mobile_devices    = num_push_devices_for_user(user_profile) > 0,
    )

    undesired_register_ret_fields = [
        'streams',
    ]
    for field_name in set(register_ret.keys()) - set(undesired_register_ret_fields):
        page_params[field_name] = register_ret[field_name]

    if narrow_stream is not None:
        # In narrow_stream context, initial pointer is just latest message
        recipient = get_stream_recipient(narrow_stream.id)
        try:
            initial_pointer = Message.objects.filter(recipient=recipient).order_by('id').reverse()[0].id
        except IndexError:
            initial_pointer = -1
        page_params["narrow_stream"] = narrow_stream.name
        if narrow_topic is not None:
            page_params["narrow_topic"] = narrow_topic
        page_params["narrow"] = [dict(operator=term[0], operand=term[1]) for term in narrow]
        page_params["max_message_id"] = initial_pointer
        page_params["pointer"] = initial_pointer
        page_params["have_initial_messages"] = (initial_pointer != -1)
        page_params["enable_desktop_notifications"] = False

    statsd.incr('views.home')
    show_invites = True

    # Some realms only allow admins to invite users
    if user_profile.realm.invite_by_admins_only and not user_profile.is_realm_admin:
        show_invites = False

    request._log_data['extra'] = "[%s]" % (register_ret["queue_id"],)
    response = render(request, 'zerver/index.html',
                      context={'user_profile': user_profile,
                               'page_params': simplejson.encoder.JSONEncoderForHTML().encode(page_params),
                               'nofontface': is_buggy_ua(request.META.get("HTTP_USER_AGENT", "Unspecified")),
                               'avatar_url': avatar_url(user_profile),
                               'show_debug':
                               settings.DEBUG and ('show_debug' in request.GET),
                               'pipeline': settings.PIPELINE_ENABLED,
                               'show_invites': show_invites,
                               'is_admin': user_profile.is_realm_admin,
                               'show_webathena': user_profile.realm.webathena_enabled,
                               'enable_feedback': settings.ENABLE_FEEDBACK,
                               'embedded': narrow_stream is not None,
                               },)
    patch_cache_control(response, no_cache=True, no_store=True, must_revalidate=True)
    return response
开发者ID:brockwhittaker,项目名称:zulip,代码行数:101,代码来源:home.py


示例18: test_import_realm

    def test_import_realm(self) -> None:

        original_realm = Realm.objects.get(string_id='zulip')
        RealmEmoji.objects.get(realm=original_realm).delete()
        # data to test import of huddles
        huddle = [
            self.example_email('hamlet'),
            self.example_email('othello')
        ]
        self.send_huddle_message(
            self.example_email('cordelia'), huddle, 'test huddle message'
        )

        # data to test import of hotspots
        sample_user = self.example_user('hamlet')

        UserHotspot.objects.create(
            user=sample_user, hotspot='intro_streams'
        )

        # data to test import of muted topic
        stream = get_stream(u'Verona', original_realm)
        add_topic_mute(
            user_profile=sample_user,
            stream_id=stream.id,
            recipient_id=get_stream_recipient(stream.id).id,
            topic_name=u'Verona2')

        # data to test import of botstoragedata and botconfigdata
        bot_profile = do_create_user(
            email="[email protected]",
            password="test",
            realm=original_realm,
            full_name="bot",
            short_name="bot",
            bot_type=UserProfile.EMBEDDED_BOT,
            bot_owner=sample_user)
        storage = StateHandler(bot_profile)
        storage.put('some key', 'some value')

        set_bot_config(bot_profile, 'entry 1', 'value 1')

        self._export_realm(original_realm)

        with patch('logging.info'):
            do_import_realm('var/test-export', 'test-zulip')

        # sanity checks

        # test realm
        self.assertTrue(Realm.objects.filter(string_id='test-zulip').exists())
        imported_realm = Realm.objects.get(string_id='test-zulip')
        self.assertNotEqual(imported_realm.id, original_realm.id)

        def assert_realm_values(f: Callable[[Realm], Any], equal: bool=True) -> None:
            orig_realm_result = f(original_realm)
            imported_realm_result = f(imported_realm)
            # orig_realm_result should be truthy and have some values, otherwise
            # the test is kind of meaningless
            assert(orig_realm_result)
            if equal:
                self.assertEqual(orig_realm_result, imported_realm_result)
            else:
                self.assertNotEqual(orig_realm_result, imported_realm_result)

        # test users
        assert_realm_values(
            lambda r: {user.email for user in r.get_admin_users()}
        )

        assert_realm_values(
            lambda r: {user.email for user in r.get_active_users()}
        )

        # test stream
        assert_realm_values(
            lambda r: {stream.name for stream in get_active_streams(r)}
        )

        # test recipients
        def get_recipient_stream(r: Realm) -> Stream:
            return get_stream_recipient(
                Stream.objects.get(name='Verona', realm=r).id
            )

        def get_recipient_user(r: Realm) -> UserProfile:
            return get_personal_recipient(
                UserProfile.objects.get(full_name='Iago', realm=r).id
            )

        assert_realm_values(lambda r: get_recipient_stream(r).type)
        assert_realm_values(lambda r: get_recipient_user(r).type)

        # test subscription
        def get_subscribers(recipient: Recipient) -> Set[str]:
            subscriptions = Subscription.objects.filter(recipient=recipient)
            users = {sub.user_profile.email for sub in subscriptions}
            return users

        assert_realm_values(
#.........这里部分代码省略.........
开发者ID:brainwane,项目名称:zulip,代码行数:101,代码来源:test_import_export.py


示例19: get_recipient_stream

 def get_recipient_stream(r: Realm) -> Stream:
     return get_stream_recipient(
         Stream.objects.get(name='Verona', realm=r).id
     )
开发者ID:brainwane,项目名称:zulip,代码行数:4,代码来源:test_import_export.py



注:本文中的zerver.models.get_stream_recipient函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python models.get_system_bot函数代码示例发布时间:2022-05-26
下一篇:
Python models.get_stream函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap