本文整理汇总了 Python 中 zerver.lib.actions.bulk_remove_subscriptions 函数的典型用法代码示例。如果您正苦于以下问题：Python bulk_remove_subscriptions 函数的具体用法？Python bulk_remove_subscriptions 怎么用？Python bulk_remove_subscriptions 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了bulk_remove_subscriptions函数的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: remove_subscriptions_backend
def remove_subscriptions_backend(request, user_profile,
                                 streams_raw = REQ("subscriptions", validator=check_list(check_string)),
                                 principals = REQ(validator=check_list(check_string), default=None)):
    """Unsubscribe the caller, or the listed principals, from the given streams.

    When ``principals`` names anyone other than the caller's own email, the
    caller must be a realm admin, and must personally be subscribed to every
    invite-only stream in the request.

    Returns a JSON success response with two lists of stream names,
    ``removed`` and ``not_subscribed``, or a JSON error response when one of
    the permission checks fails.
    """
    acting_on_others = principals and \
        {user_profile.email} != set(principals)

    if acting_on_others and not user_profile.is_realm_admin:
        # You can only unsubscribe other people from a stream if you are a realm
        # admin.
        return json_error("This action requires administrative rights")

    streams, _ = list_to_streams(streams_raw, user_profile)

    if acting_on_others:
        for stream in streams:
            if stream.invite_only and not subscribed_to_stream(user_profile, stream):
                # Even as an admin, you can't remove other people from an
                # invite-only stream you're not on.
                return json_error("Cannot administer invite-only streams this way")

    if principals:
        people_to_unsub = {
            principal_to_user_profile(user_profile, principal)
            for principal in principals
        }
    else:
        people_to_unsub = {user_profile}

    (removed, not_subscribed) = bulk_remove_subscriptions(people_to_unsub, streams)

    # Only the stream names are reported back; the subscriber half of each
    # pair is dropped.
    removed_names = [pair[1].name for pair in removed]
    not_subscribed_names = [pair[1].name for pair in not_subscribed]
    result = dict(removed=removed_names,
                  not_subscribed=not_subscribed_names)  # type: Dict[str, List[str]]
    return json_success(result)
开发者ID:anteq,项目名称:zulip,代码行数:35,代码来源:streams.py
示例2: handle
def handle(self, **options):
    # type: (*Any, **Any) -> None
    """Remove the selected users from one stream and report each outcome.

    Reads ``domain``, ``stream``, ``users`` (a comma-separated list of
    emails) and ``all_users`` from ``options``.  If the domain or stream is
    missing, or neither a user list nor a truthy ``all_users`` was supplied,
    the command help is printed and the process exits with status 1.

    For every targeted user, prints whether the user was removed from the
    stream or was not subscribed to begin with.
    """
    # Function-scope import: ``exit()`` is injected by the ``site`` module
    # for interactive use, so ``sys.exit`` is the dependable way to stop.
    import sys

    # ``all_users`` is tested for truthiness rather than ``is None``: with a
    # falsy non-None value (e.g. False) and no ``users`` the old check let
    # execution fall through to ``options["users"].split`` on None.
    no_targets = options["users"] is None and not options["all_users"]
    if options["domain"] is None or options["stream"] is None or no_targets:
        self.print_help("./manage.py", "remove_users_from_stream")
        sys.exit(1)

    realm = get_realm(options["domain"])
    stream_name = options["stream"].strip()
    stream = get_stream(stream_name, realm)

    if options["all_users"]:
        user_profiles = UserProfile.objects.filter(realm=realm)
    else:
        # De-duplicate the emails so each user is looked up only once.
        emails = {email.strip() for email in options["users"].split(",")}
        user_profiles = [get_user_profile_by_email(email) for email in emails]

    result = bulk_remove_subscriptions(user_profiles, [stream])
    # The second element of the result lists the pairs that were not
    # subscribed; the user comes first in each pair.
    not_subscribed = result[1]
    not_subscribed_users = {tup[0] for tup in not_subscribed}

    for user_profile in user_profiles:
        if user_profile in not_subscribed_users:
            print("%s was not subscribed" % (user_profile.email,))
        else:
            print("Removed %s from %s" % (user_profile.email, stream_name))
开发者ID:Jianchun1,项目名称:zulip,代码行数:28,代码来源:remove_users_from_stream.py
示例3: test_subscriptions
def test_subscriptions(self) -> None:
    """Subscribing and then unsubscribing each leave one audit-log entry.

    Both entries must reference the test stream and the test user.
    """
    now = timezone_now()
    users = [self.example_user('hamlet')]
    streams = [self.make_stream('test_stream')]

    # Subscribe, then look only at log rows written since the test began.
    bulk_add_subscriptions(streams, users)
    created_logs = RealmAuditLog.objects.filter(
        event_type=RealmAuditLog.SUBSCRIPTION_CREATED,
        event_time__gte=now,
    )
    self.assertEqual(created_logs.count(), 1)
    self.assertEqual(created_logs[0].modified_stream.id, streams[0].id)
    self.assertEqual(created_logs[0].modified_user, users[0])

    # Unsubscribe and run the same checks on the deactivation entry.
    bulk_remove_subscriptions(users, streams, get_client("website"))
    deactivated_logs = RealmAuditLog.objects.filter(
        event_type=RealmAuditLog.SUBSCRIPTION_DEACTIVATED,
        event_time__gte=now,
    )
    self.assertEqual(deactivated_logs.count(), 1)
    self.assertEqual(deactivated_logs[0].modified_stream.id, streams[0].id)
    self.assertEqual(deactivated_logs[0].modified_user, users[0])
开发者ID:BakerWang,项目名称:zulip,代码行数:18,代码来源:test_audit_log.py
示例4: test_subscriptions
def test_subscriptions(self):
    # type: () -> None
    """Subscribing and then unsubscribing each leave one audit-log entry.

    Both entries must reference the test stream and the test user.
    """
    now = timezone_now()
    users = [self.example_user('hamlet')]
    streams = [self.make_stream('test_stream')]

    # Subscribe, then look only at log rows written since the test began.
    bulk_add_subscriptions(streams, users)
    created_logs = RealmAuditLog.objects.filter(
        event_type='subscription_created',
        event_time__gte=now,
    )
    self.assertEqual(created_logs.count(), 1)
    self.assertEqual(created_logs[0].modified_stream.id, streams[0].id)
    self.assertEqual(created_logs[0].modified_user, users[0])

    # Unsubscribe and run the same checks on the deactivation entry.
    bulk_remove_subscriptions(users, streams)
    deactivated_logs = RealmAuditLog.objects.filter(
        event_type='subscription_deactivated',
        event_time__gte=now,
    )
    self.assertEqual(deactivated_logs.count(), 1)
    self.assertEqual(deactivated_logs[0].modified_stream.id, streams[0].id)
    self.assertEqual(deactivated_logs[0].modified_user, users[0])
开发者ID:brockwhittaker,项目名称:zulip,代码行数:19,代码来源:test_audit_log.py
示例5: handle
def handle(self, *args: Any, **options: str) -> None:
    """Merge ``stream_to_destroy`` into ``stream_to_keep`` within one realm.

    Messages are re-pointed at the surviving stream's recipient, active
    subscriptions on the doomed stream are removed, that stream is
    deactivated, and users without an active subscription to the surviving
    stream are subscribed to it.  Progress counts are printed along the way.
    """
    realm = self.get_realm(options)
    assert realm is not None  # Should be ensured by parser
    stream_to_keep = get_stream(options["stream_to_keep"], realm)
    stream_to_destroy = get_stream(options["stream_to_destroy"], realm)
    recipient_to_destroy = get_stream_recipient(stream_to_destroy.id)
    recipient_to_keep = get_stream_recipient(stream_to_keep.id)

    # The high-level approach here is to move all the messages to
    # the surviving stream, deactivate all the subscriptions on
    # the stream to be removed and deactivate the stream, and add
    # new subscriptions to the stream to keep for any users who
    # were only on the now-deactivated stream.

    # Move the messages, and delete the old copies from caches.
    # The ids are collected before the update, while the rows still match
    # the old recipient.
    doomed_messages = Message.objects.filter(recipient=recipient_to_destroy)
    message_ids_to_clear = list(doomed_messages.values_list("id", flat=True))
    count = doomed_messages.update(recipient=recipient_to_keep)
    print("Moved %s messages" % (count,))
    bulk_delete_cache_keys(message_ids_to_clear)

    # Move the Subscription objects.  This algorithm doesn't
    # preserve any stream settings/colors/etc. from the stream
    # being destroyed, but it's convenient.
    existing_subs = Subscription.objects.filter(recipient=recipient_to_keep)
    users_already_subscribed = {
        sub.user_profile_id: sub.active for sub in existing_subs
    }

    subs_to_deactivate = Subscription.objects.filter(recipient=recipient_to_destroy, active=True)
    # Work out who needs a new subscription before anything is deactivated.
    users_to_activate = [
        sub.user_profile for sub in subs_to_deactivate
        if not users_already_subscribed.get(sub.user_profile_id, False)
    ]

    deactivate_count = len(subs_to_deactivate)
    if deactivate_count > 0:
        print("Deactivating %s subscriptions" % (deactivate_count,))
        users_to_remove = [sub.user_profile for sub in subs_to_deactivate]
        bulk_remove_subscriptions(users_to_remove,
                                  [stream_to_destroy],
                                  self.get_client())
    do_deactivate_stream(stream_to_destroy)

    activate_count = len(users_to_activate)
    if activate_count > 0:
        print("Adding %s subscriptions" % (activate_count,))
        bulk_add_subscriptions([stream_to_keep], users_to_activate)
开发者ID:BakerWang,项目名称:zulip,代码行数:43,代码来源:merge_streams.py
示例6: handle
def handle(self, **options: Any) -> None:
    """Remove the selected users from one stream and report each outcome.

    The realm and the users come from ``self.get_realm`` and
    ``self.get_users``; the stream name comes from ``options["stream"]``.
    One line is printed per user: either the user was removed, or the user
    was not subscribed.
    """
    realm = self.get_realm(options)
    assert realm is not None  # Should be ensured by parser
    user_profiles = self.get_users(options, realm)
    stream_name = options["stream"].strip()
    stream = get_stream(stream_name, realm)

    # Index 1 of the result lists the pairs that were not subscribed; the
    # user comes first in each pair.
    not_subscribed = bulk_remove_subscriptions(user_profiles, [stream])[1]
    not_subscribed_users = set(entry[0] for entry in not_subscribed)

    for user_profile in user_profiles:
        if user_profile not in not_subscribed_users:
            print("Removed %s from %s" % (user_profile.email, stream_name))
            continue
        print("%s was not subscribed" % (user_profile.email,))
开发者ID:brockwhittaker,项目名称:zulip,代码行数:16,代码来源:remove_users_from_stream.py
示例7: remove_subscriptions_backend
def remove_subscriptions_backend(
        request: HttpRequest, user_profile: UserProfile,
        streams_raw: Iterable[Text]=REQ("subscriptions", validator=check_list(check_string)),
        principals: Optional[Iterable[Text]]=REQ(validator=check_list(check_string), default=None),
) -> HttpResponse:
    """Unsubscribe the caller, or the listed principals, from the given streams.

    When ``principals`` names anyone other than the caller's own email, the
    caller must be a realm admin, and must personally be subscribed to every
    invite-only stream in the request.  The caller is passed on as the
    acting user for the removal.

    Returns a JSON success response with two lists of stream names,
    ``removed`` and ``not_subscribed``, or a JSON error response when one of
    the permission checks fails.
    """
    acting_on_others = principals and \
        {user_profile.email} != set(principals)

    if acting_on_others and not user_profile.is_realm_admin:
        # You can only unsubscribe other people from a stream if you are a realm
        # admin.
        return json_error(_("This action requires administrative rights"))

    # list_to_streams is given one {"name": ...} dict per stream, with
    # surrounding whitespace stripped from each name.
    streams_as_dict = [{"name": stream_name.strip()} for stream_name in streams_raw]
    streams, __ = list_to_streams(streams_as_dict, user_profile)

    if acting_on_others:
        for stream in streams:
            if stream.invite_only and not subscribed_to_stream(user_profile, stream.id):
                # Even as an admin, you can't remove other people from an
                # invite-only stream you're not on.
                return json_error(_("Cannot administer invite-only streams this way"))

    if principals:
        people_to_unsub = {
            principal_to_user_profile(user_profile, principal)
            for principal in principals
        }
    else:
        people_to_unsub = {user_profile}

    (removed, not_subscribed) = bulk_remove_subscriptions(people_to_unsub, streams,
                                                          acting_user=user_profile)

    # Only the stream names are reported back; the subscriber half of each
    # pair is dropped.
    removed_names = [pair[1].name for pair in removed]
    not_subscribed_names = [pair[1].name for pair in not_subscribed]
    result = dict(removed=removed_names,
                  not_subscribed=not_subscribed_names)  # type: Dict[str, List[Text]]
    return json_success(result)
开发者ID:joydeep1701,项目名称:zulip,代码行数:42,代码来源:streams.py
示例8: unsubscribe_from_stream
def unsubscribe_from_stream(self, email, stream_name):
    # type: (Text, Text) -> None
    """Unsubscribe the user with this email from the named stream.

    The stream is looked up in the realm of the user found for ``email``.
    """
    subscriber = get_user_profile_by_email(email)
    target_stream = get_stream(stream_name, subscriber.realm)
    bulk_remove_subscriptions([subscriber], [target_stream])
开发者ID:christi3k,项目名称:zulip,代码行数:5,代码来源:test_classes.py
示例9: unsubscribe
def unsubscribe(self, user_profile: UserProfile, stream_name: str) -> None:
    """Unsubscribe ``user_profile`` from the named stream in its own realm.

    The removal is performed with the "website" client.
    """
    # The client is fetched first, before the stream lookup.
    website_client = get_client("website")
    target_stream = get_stream(stream_name, user_profile.realm)
    bulk_remove_subscriptions([user_profile], [target_stream], website_client)
开发者ID:BakerWang,项目名称:zulip,代码行数:4,代码来源:test_classes.py
示例10: unsubscribe
def unsubscribe(self, user_profile: UserProfile, stream_name: Text) -> None:
    """Unsubscribe ``user_profile`` from the named stream in its own realm."""
    target_stream = get_stream(stream_name, user_profile.realm)
    bulk_remove_subscriptions([user_profile], [target_stream])
开发者ID:joydeep1701,项目名称:zulip,代码行数:3,代码来源:test_classes.py
注：本文中的 zerver.lib.actions.bulk_remove_subscriptions 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论