• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python actions.bulk_remove_subscriptions函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中zerver.lib.actions.bulk_remove_subscriptions函数的典型用法代码示例。如果您正苦于以下问题:Python bulk_remove_subscriptions函数的具体用法?Python bulk_remove_subscriptions怎么用?Python bulk_remove_subscriptions使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了bulk_remove_subscriptions函数的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: remove_subscriptions_backend

def remove_subscriptions_backend(request, user_profile,
                                 streams_raw = REQ("subscriptions", validator=check_list(check_string)),
                                 principals = REQ(validator=check_list(check_string), default=None)):

    removing_someone_else = principals and \
        set(principals) != set((user_profile.email,))
    if removing_someone_else and not user_profile.is_realm_admin:
        # You can only unsubscribe other people from a stream if you are a realm
        # admin.
        return json_error("This action requires administrative rights")

    streams, _ = list_to_streams(streams_raw, user_profile)

    for stream in streams:
        if removing_someone_else and stream.invite_only and \
                not subscribed_to_stream(user_profile, stream):
            # Even as an admin, you can't remove other people from an
            # invite-only stream you're not on.
            return json_error("Cannot administer invite-only streams this way")

    if principals:
        people_to_unsub = set(principal_to_user_profile(
                user_profile, principal) for principal in principals)
    else:
        people_to_unsub = set([user_profile])

    result = dict(removed=[], not_subscribed=[]) # type: Dict[str, List[str]]
    (removed, not_subscribed) = bulk_remove_subscriptions(people_to_unsub, streams)

    for (subscriber, stream) in removed:
        result["removed"].append(stream.name)
    for (subscriber, stream) in not_subscribed:
        result["not_subscribed"].append(stream.name)

    return json_success(result)
开发者ID:anteq,项目名称:zulip,代码行数:35,代码来源:streams.py


示例2: handle

    def handle(self, **options):
        # type: (*Any, **Any) -> None
        if options["domain"] is None or options["stream"] is None or \
                (options["users"] is None and options["all_users"] is None):
            self.print_help("./manage.py", "remove_users_from_stream")
            exit(1)

        realm = get_realm(options["domain"])
        stream_name = options["stream"].strip()
        stream = get_stream(stream_name, realm)

        if options["all_users"]:
            user_profiles = UserProfile.objects.filter(realm=realm)
        else:
            emails = set([email.strip() for email in options["users"].split(",")])
            user_profiles = []
            for email in emails:
                user_profiles.append(get_user_profile_by_email(email))

        result = bulk_remove_subscriptions(user_profiles, [stream])
        not_subscribed = result[1]
        not_subscribed_users = {tup[0] for tup in not_subscribed}

        for user_profile in user_profiles:
            if user_profile in not_subscribed_users:
                print("%s was not subscribed" % (user_profile.email,))
            else:
                print("Removed %s from %s" % (user_profile.email, stream_name))
开发者ID:Jianchun1,项目名称:zulip,代码行数:28,代码来源:remove_users_from_stream.py


示例3: test_subscriptions

    def test_subscriptions(self) -> None:
        now = timezone_now()
        user = [self.example_user('hamlet')]
        stream = [self.make_stream('test_stream')]

        bulk_add_subscriptions(stream, user)
        subscription_creation_logs = RealmAuditLog.objects.filter(event_type=RealmAuditLog.SUBSCRIPTION_CREATED,
                                                                  event_time__gte=now)
        self.assertEqual(subscription_creation_logs.count(), 1)
        self.assertEqual(subscription_creation_logs[0].modified_stream.id, stream[0].id)
        self.assertEqual(subscription_creation_logs[0].modified_user, user[0])

        bulk_remove_subscriptions(user, stream, get_client("website"))
        subscription_deactivation_logs = RealmAuditLog.objects.filter(event_type=RealmAuditLog.SUBSCRIPTION_DEACTIVATED,
                                                                      event_time__gte=now)
        self.assertEqual(subscription_deactivation_logs.count(), 1)
        self.assertEqual(subscription_deactivation_logs[0].modified_stream.id, stream[0].id)
        self.assertEqual(subscription_deactivation_logs[0].modified_user, user[0])
开发者ID:BakerWang,项目名称:zulip,代码行数:18,代码来源:test_audit_log.py


示例4: test_subscriptions

    def test_subscriptions(self):
        # type: () -> None
        now = timezone_now()
        user = [self.example_user('hamlet')]
        stream = [self.make_stream('test_stream')]

        bulk_add_subscriptions(stream, user)
        subscription_creation_logs = RealmAuditLog.objects.filter(event_type='subscription_created',
                                                                  event_time__gte=now)
        self.assertEqual(subscription_creation_logs.count(), 1)
        self.assertEqual(subscription_creation_logs[0].modified_stream.id, stream[0].id)
        self.assertEqual(subscription_creation_logs[0].modified_user, user[0])

        bulk_remove_subscriptions(user, stream)
        subscription_deactivation_logs = RealmAuditLog.objects.filter(event_type='subscription_deactivated',
                                                                      event_time__gte=now)
        self.assertEqual(subscription_deactivation_logs.count(), 1)
        self.assertEqual(subscription_deactivation_logs[0].modified_stream.id, stream[0].id)
        self.assertEqual(subscription_deactivation_logs[0].modified_user, user[0])
开发者ID:brockwhittaker,项目名称:zulip,代码行数:19,代码来源:test_audit_log.py


示例5: handle

    def handle(self, *args: Any, **options: str) -> None:
        realm = self.get_realm(options)
        assert realm is not None  # Should be ensured by parser
        stream_to_keep = get_stream(options["stream_to_keep"], realm)
        stream_to_destroy = get_stream(options["stream_to_destroy"], realm)

        recipient_to_destroy = get_stream_recipient(stream_to_destroy.id)
        recipient_to_keep = get_stream_recipient(stream_to_keep.id)

        # The high-level approach here is to move all the messages to
        # the surviving stream, deactivate all the subscriptions on
        # the stream to be removed and deactivate the stream, and add
        # new subscriptions to the stream to keep for any users who
        # were only on the now-deactivated stream.

        # Move the messages, and delete the old copies from caches.
        message_ids_to_clear = list(Message.objects.filter(
            recipient=recipient_to_destroy).values_list("id", flat=True))
        count = Message.objects.filter(recipient=recipient_to_destroy).update(recipient=recipient_to_keep)
        print("Moved %s messages" % (count,))
        bulk_delete_cache_keys(message_ids_to_clear)

        # Move the Subscription objects.  This algorithm doesn't
        # preserve any stream settings/colors/etc. from the stream
        # being destroyed, but it's convenient.
        existing_subs = Subscription.objects.filter(recipient=recipient_to_keep)
        users_already_subscribed = dict((sub.user_profile_id, sub.active) for sub in existing_subs)

        subs_to_deactivate = Subscription.objects.filter(recipient=recipient_to_destroy, active=True)
        users_to_activate = [
            sub.user_profile for sub in subs_to_deactivate
            if not users_already_subscribed.get(sub.user_profile_id, False)
        ]

        if len(subs_to_deactivate) > 0:
            print("Deactivating %s subscriptions" % (len(subs_to_deactivate),))
            bulk_remove_subscriptions([sub.user_profile for sub in subs_to_deactivate],
                                      [stream_to_destroy],
                                      self.get_client())
        do_deactivate_stream(stream_to_destroy)
        if len(users_to_activate) > 0:
            print("Adding %s subscriptions" % (len(users_to_activate),))
            bulk_add_subscriptions([stream_to_keep], users_to_activate)
开发者ID:BakerWang,项目名称:zulip,代码行数:43,代码来源:merge_streams.py


示例6: handle

    def handle(self, **options: Any) -> None:
        realm = self.get_realm(options)
        assert realm is not None  # Should be ensured by parser
        user_profiles = self.get_users(options, realm)
        stream_name = options["stream"].strip()
        stream = get_stream(stream_name, realm)

        result = bulk_remove_subscriptions(user_profiles, [stream])
        not_subscribed = result[1]
        not_subscribed_users = {tup[0] for tup in not_subscribed}

        for user_profile in user_profiles:
            if user_profile in not_subscribed_users:
                print("%s was not subscribed" % (user_profile.email,))
            else:
                print("Removed %s from %s" % (user_profile.email, stream_name))
开发者ID:brockwhittaker,项目名称:zulip,代码行数:16,代码来源:remove_users_from_stream.py


示例7: remove_subscriptions_backend

def remove_subscriptions_backend(
        request: HttpRequest, user_profile: UserProfile,
        streams_raw: Iterable[Text]=REQ("subscriptions", validator=check_list(check_string)),
        principals: Optional[Iterable[Text]]=REQ(validator=check_list(check_string), default=None),
) -> HttpResponse:

    removing_someone_else = principals and \
        set(principals) != set((user_profile.email,))
    if removing_someone_else and not user_profile.is_realm_admin:
        # You can only unsubscribe other people from a stream if you are a realm
        # admin.
        return json_error(_("This action requires administrative rights"))

    streams_as_dict = []
    for stream_name in streams_raw:
        streams_as_dict.append({"name": stream_name.strip()})

    streams, __ = list_to_streams(streams_as_dict, user_profile)

    for stream in streams:
        if removing_someone_else and stream.invite_only and \
                not subscribed_to_stream(user_profile, stream.id):
            # Even as an admin, you can't remove other people from an
            # invite-only stream you're not on.
            return json_error(_("Cannot administer invite-only streams this way"))

    if principals:
        people_to_unsub = set(principal_to_user_profile(
            user_profile, principal) for principal in principals)
    else:
        people_to_unsub = set([user_profile])

    result = dict(removed=[], not_subscribed=[])  # type: Dict[str, List[Text]]
    (removed, not_subscribed) = bulk_remove_subscriptions(people_to_unsub, streams,
                                                          acting_user=user_profile)

    for (subscriber, removed_stream) in removed:
        result["removed"].append(removed_stream.name)
    for (subscriber, not_subscribed_stream) in not_subscribed:
        result["not_subscribed"].append(not_subscribed_stream.name)

    return json_success(result)
开发者ID:joydeep1701,项目名称:zulip,代码行数:42,代码来源:streams.py


示例8: unsubscribe_from_stream

 def unsubscribe_from_stream(self, email, stream_name):
     # type: (Text, Text) -> None
     user_profile = get_user_profile_by_email(email)
     stream = get_stream(stream_name, user_profile.realm)
     bulk_remove_subscriptions([user_profile], [stream])
开发者ID:christi3k,项目名称:zulip,代码行数:5,代码来源:test_classes.py


示例9: unsubscribe

 def unsubscribe(self, user_profile: UserProfile, stream_name: str) -> None:
     client = get_client("website")
     stream = get_stream(stream_name, user_profile.realm)
     bulk_remove_subscriptions([user_profile], [stream], client)
开发者ID:BakerWang,项目名称:zulip,代码行数:4,代码来源:test_classes.py


示例10: unsubscribe

 def unsubscribe(self, user_profile: UserProfile, stream_name: Text) -> None:
     stream = get_stream(stream_name, user_profile.realm)
     bulk_remove_subscriptions([user_profile], [stream])
开发者ID:joydeep1701,项目名称:zulip,代码行数:3,代码来源:test_classes.py



注:本文中的zerver.lib.actions.bulk_remove_subscriptions函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python actions.check_send_message函数代码示例发布时间:2022-05-26
下一篇:
Python actions.bulk_add_subscriptions函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap