本文整理汇总了Python中zerver.models.get_stream函数的典型用法代码示例。如果您正苦于以下问题:Python get_stream函数的具体用法?Python get_stream怎么用?Python get_stream使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_stream函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: check_stream_name_available
def check_stream_name_available(realm: Realm, name: str) -> None:
check_stream_name(name)
try:
get_stream(name, realm)
raise JsonableError(_("Stream name '%s' is already taken.") % (name,))
except Stream.DoesNotExist:
pass
开发者ID:BakerWang,项目名称:zulip,代码行数:7,代码来源:streams.py
示例2: check_stream_name_available
def check_stream_name_available(realm, name):
# type: (Realm, Text) -> None
check_stream_name(name)
try:
get_stream(name, realm)
raise JsonableError(_("Stream name '%s' is already taken") % (name,))
except Stream.DoesNotExist:
pass
开发者ID:llGurudevll,项目名称:zulip,代码行数:8,代码来源:streams.py
示例3: test_signup_notifications_stream
def test_signup_notifications_stream(self) -> None:
email = self.example_email("hamlet")
realm = get_realm('zulip')
realm.signup_notifications_stream = get_stream('Denmark', realm)
realm.save()
self.login(email)
result = self._get_home_page()
page_params = self._get_page_params(result)
self.assertEqual(page_params['realm_signup_notifications_stream_id'], get_stream('Denmark', realm).id)
开发者ID:umairwaheed,项目名称:zulip,代码行数:9,代码来源:test_home.py
示例4: test_user_ids_muting_topic
def test_user_ids_muting_topic(self) -> None:
hamlet = self.example_user('hamlet')
cordelia = self.example_user('cordelia')
realm = hamlet.realm
stream = get_stream(u'Verona', realm)
recipient = get_stream_recipient(stream.id)
topic_name = 'teST topic'
stream_topic_target = StreamTopicTarget(
stream_id=stream.id,
topic_name=topic_name,
)
user_ids = stream_topic_target.user_ids_muting_topic()
self.assertEqual(user_ids, set())
def mute_user(user: UserProfile) -> None:
add_topic_mute(
user_profile=user,
stream_id=stream.id,
recipient_id=recipient.id,
topic_name='test TOPIC',
)
mute_user(hamlet)
user_ids = stream_topic_target.user_ids_muting_topic()
self.assertEqual(user_ids, {hamlet.id})
mute_user(cordelia)
user_ids = stream_topic_target.user_ids_muting_topic()
self.assertEqual(user_ids, {hamlet.id, cordelia.id})
开发者ID:brainwane,项目名称:zulip,代码行数:31,代码来源:test_muting.py
示例5: json_invite_users
def json_invite_users(request, user_profile, invitee_emails_raw=REQ("invitee_emails")):
# type: (HttpRequest, UserProfile, str) -> HttpResponse
if not invitee_emails_raw:
return json_error(_("You must specify at least one email address."))
invitee_emails = get_invitee_emails_set(invitee_emails_raw)
stream_names = request.POST.getlist('stream')
if not stream_names:
return json_error(_("You must specify at least one stream for invitees to join."))
# We unconditionally sub you to the notifications stream if it
# exists and is public.
notifications_stream = user_profile.realm.notifications_stream
if notifications_stream and not notifications_stream.invite_only:
stream_names.append(notifications_stream.name)
streams = [] # type: List[Stream]
for stream_name in stream_names:
stream = get_stream(stream_name, user_profile.realm)
if stream is None:
return json_error(_("Stream does not exist: %s. No invites were sent.") % (stream_name,))
streams.append(stream)
ret_error, error_data = do_invite_users(user_profile, invitee_emails, streams)
if ret_error is not None:
return json_error(data=error_data, msg=ret_error)
else:
return json_success()
开发者ID:Jianchun1,项目名称:zulip,代码行数:30,代码来源:invite.py
示例6: setUpBeforeMigration
def setUpBeforeMigration(self, apps: StateApps) -> None:
Reaction = apps.get_model('zerver', 'Reaction')
RealmEmoji = apps.get_model('zerver', 'RealmEmoji')
Message = apps.get_model('zerver', 'Message')
Recipient = apps.get_model('zerver', 'Recipient')
sender = self.example_user('iago')
realm = sender.realm
sending_client = make_client(name="test suite")
stream_name = 'Denmark'
stream = get_stream(stream_name, realm)
subject = 'foo'
def send_fake_message(message_content: str, stream: ModelBase) -> ModelBase:
recipient = Recipient.objects.get(type_id=stream.id, type=2)
return Message.objects.create(sender = sender,
recipient = recipient,
subject = subject,
content = message_content,
pub_date = timezone_now(),
sending_client = sending_client)
message = send_fake_message('Test 1', stream)
# Create reactions for all the realm emoji's on the message we faked.
for realm_emoji in RealmEmoji.objects.all():
reaction = Reaction(user_profile=sender, message=message,
emoji_name=realm_emoji.name, emoji_code=realm_emoji.name,
reaction_type='realm_emoji')
reaction.save()
realm_emoji_reactions_count = Reaction.objects.filter(reaction_type='realm_emoji').count()
self.assertEqual(realm_emoji_reactions_count, 1)
开发者ID:284928489,项目名称:zulip,代码行数:31,代码来源:test_migrations.py
示例7: test_remove_muted_topic
def test_remove_muted_topic(self) -> None:
user = self.example_user('hamlet')
email = user.email
realm = user.realm
self.login(email)
stream = get_stream(u'Verona', realm)
recipient = get_stream_recipient(stream.id)
url = '/api/v1/users/me/subscriptions/muted_topics'
payloads = [
{'stream': stream.name, 'topic': 'vERONA3', 'op': 'remove'},
{'stream_id': stream.id, 'topic': 'vEroNA3', 'op': 'remove'},
]
for data in payloads:
add_topic_mute(
user_profile=user,
stream_id=stream.id,
recipient_id=recipient.id,
topic_name='Verona3',
)
self.assertIn([stream.name, 'Verona3'], get_topic_mutes(user))
result = self.api_patch(email, url, data)
self.assert_json_success(result)
self.assertNotIn([stream.name, 'Verona3'], get_topic_mutes(user))
self.assertFalse(topic_is_muted(user, stream.id, 'verona3'))
开发者ID:BakerWang,项目名称:zulip,代码行数:29,代码来源:test_muting.py
示例8: test_muted_topic_remove_invalid
def test_muted_topic_remove_invalid(self) -> None:
user = self.example_user('hamlet')
email = user.email
realm = user.realm
self.login(email)
stream = get_stream('Verona', realm)
url = '/api/v1/users/me/subscriptions/muted_topics'
data = {'stream': 'BOGUS', 'topic': 'Verona3', 'op': 'remove'} # type: Dict[str, Any]
result = self.api_patch(email, url, data)
self.assert_json_error(result, "Topic is not muted")
data = {'stream': stream.name, 'topic': 'BOGUS', 'op': 'remove'}
result = self.api_patch(email, url, data)
self.assert_json_error(result, "Topic is not muted")
data = {'stream_id': 999999999, 'topic': 'BOGUS', 'op': 'remove'}
result = self.api_patch(email, url, data)
self.assert_json_error(result, "Topic is not muted")
data = {'topic': 'Verona3', 'op': 'remove'}
result = self.api_patch(email, url, data)
self.assert_json_error(result, "Please supply 'stream'.")
data = {'stream': stream.name, 'stream_id': stream.id, 'topic': 'Verona3', 'op': 'remove'}
result = self.api_patch(email, url, data)
self.assert_json_error(result, "Please choose one: 'stream' or 'stream_id'.")
开发者ID:BakerWang,项目名称:zulip,代码行数:27,代码来源:test_muting.py
示例9: test_add_muted_topic
def test_add_muted_topic(self) -> None:
user = self.example_user('hamlet')
self.login(user.email)
stream = get_stream('Verona', user.realm)
url = '/api/v1/users/me/subscriptions/muted_topics'
payloads = [
{'stream': stream.name, 'topic': 'Verona3', 'op': 'add'},
{'stream_id': stream.id, 'topic': 'Verona3', 'op': 'add'},
]
for data in payloads:
result = self.api_patch(user.email, url, data)
self.assert_json_success(result)
self.assertIn([stream.name, 'Verona3'], get_topic_mutes(user))
self.assertTrue(topic_is_muted(user, stream.id, 'Verona3'))
self.assertTrue(topic_is_muted(user, stream.id, 'verona3'))
remove_topic_mute(
user_profile=user,
stream_id=stream.id,
topic_name='Verona3',
)
开发者ID:BakerWang,项目名称:zulip,代码行数:27,代码来源:test_muting.py
示例10: test_muted_topic_add_invalid
def test_muted_topic_add_invalid(self) -> None:
user = self.example_user('hamlet')
email = user.email
realm = user.realm
self.login(email)
stream = get_stream('Verona', realm)
recipient = get_stream_recipient(stream.id)
add_topic_mute(
user_profile=user,
stream_id=stream.id,
recipient_id=recipient.id,
topic_name=u'Verona3',
)
url = '/api/v1/users/me/subscriptions/muted_topics'
data = {'stream': stream.name, 'topic': 'Verona3', 'op': 'add'} # type: Dict[str, Any]
result = self.api_patch(email, url, data)
self.assert_json_error(result, "Topic already muted")
data = {'stream_id': 999999999, 'topic': 'Verona3', 'op': 'add'}
result = self.api_patch(email, url, data)
self.assert_json_error(result, "Invalid stream id")
data = {'topic': 'Verona3', 'op': 'add'}
result = self.api_patch(email, url, data)
self.assert_json_error(result, "Please supply 'stream'.")
data = {'stream': stream.name, 'stream_id': stream.id, 'topic': 'Verona3', 'op': 'add'}
result = self.api_patch(email, url, data)
self.assert_json_error(result, "Please choose one: 'stream' or 'stream_id'.")
开发者ID:BakerWang,项目名称:zulip,代码行数:32,代码来源:test_muting.py
示例11: test_receive_stream_email_messages_no_textual_body
def test_receive_stream_email_messages_no_textual_body(self) -> None:
user_profile = self.example_user('hamlet')
self.login(user_profile.email)
self.subscribe(user_profile, "Denmark")
stream = get_stream("Denmark", user_profile.realm)
stream_to_address = encode_email_address(stream)
headers = {}
headers['Reply-To'] = self.example_email('othello')
# No textual body
incoming_valid_message = MIMEMultipart()
with open(os.path.join(settings.DEPLOY_ROOT, "static/images/default-avatar.png"), 'rb') as f:
incoming_valid_message.attach(MIMEImage(f.read()))
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
incoming_valid_message['To'] = stream_to_address
incoming_valid_message['Reply-to'] = self.example_email('othello')
exception_message = ""
debug_info = {} # type: Dict[str, Any]
# process_message eats the exception & logs an error which can't be parsed here
# so calling process_stream_message directly
try:
with mock.patch('logging.warning'):
process_stream_message(str(incoming_valid_message['To']), # need to insert str() or mypy throws type error
incoming_valid_message,
debug_info)
except ZulipEmailForwardError as e:
# empty body throws exception
exception_message = str(e)
self.assertEqual(exception_message, "Unable to find plaintext or HTML message body")
开发者ID:deltay,项目名称:zulip,代码行数:34,代码来源:test_email_mirror.py
示例12: test_receive_stream_email_messages_empty_body
def test_receive_stream_email_messages_empty_body(self) -> None:
# build dummy messages for stream
# test message with empty body is not sent
user_profile = self.example_user('hamlet')
self.login(user_profile.email)
self.subscribe(user_profile, "Denmark")
stream = get_stream("Denmark", user_profile.realm)
stream_to_address = encode_email_address(stream)
headers = {}
headers['Reply-To'] = self.example_email('othello')
# empty body
incoming_valid_message = MIMEText('')
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
incoming_valid_message['To'] = stream_to_address
incoming_valid_message['Reply-to'] = self.example_email('othello')
exception_message = ""
debug_info = {} # type: Dict[str, Any]
# process_message eats the exception & logs an error which can't be parsed here
# so calling process_stream_message directly
try:
process_stream_message(str(incoming_valid_message['To']), # need to insert str() or mypy throws type error
incoming_valid_message,
debug_info)
except ZulipEmailForwardError as e:
# empty body throws exception
exception_message = str(e)
self.assertEqual(exception_message, "Email has no nonempty body sections; ignoring.")
开发者ID:deltay,项目名称:zulip,代码行数:34,代码来源:test_email_mirror.py
示例13: handle
def handle(self, **options):
# type: (*Any, **Any) -> None
if options["domain"] is None or options["stream"] is None or \
(options["users"] is None and options["all_users"] is None):
self.print_help("./manage.py", "remove_users_from_stream")
exit(1)
realm = get_realm(options["domain"])
stream_name = options["stream"].strip()
stream = get_stream(stream_name, realm)
if options["all_users"]:
user_profiles = UserProfile.objects.filter(realm=realm)
else:
emails = set([email.strip() for email in options["users"].split(",")])
user_profiles = []
for email in emails:
user_profiles.append(get_user_profile_by_email(email))
result = bulk_remove_subscriptions(user_profiles, [stream])
not_subscribed = result[1]
not_subscribed_users = {tup[0] for tup in not_subscribed}
for user_profile in user_profiles:
if user_profile in not_subscribed_users:
print("%s was not subscribed" % (user_profile.email,))
else:
print("Removed %s from %s" % (user_profile.email, stream_name))
开发者ID:Jianchun1,项目名称:zulip,代码行数:28,代码来源:remove_users_from_stream.py
示例14: can_access_stream_history_by_name
def can_access_stream_history_by_name(user_profile: UserProfile, stream_name: str) -> bool:
"""Determine whether the provided user is allowed to access the
history of the target stream. The stream is specified by name.
This is used by the caller to determine whether this user can get
historical messages before they joined for a narrowing search.
Because of the way our search is currently structured,
we may be passed an invalid stream here. We return
False in that situation, and subsequent code will do
validation and raise the appropriate JsonableError.
Note that this function should only be used in contexts where
access_stream is being called elsewhere to confirm that the user
can actually see this stream.
"""
try:
stream = get_stream(stream_name, user_profile.realm)
except Stream.DoesNotExist:
return False
if stream.is_history_realm_public() and not user_profile.is_guest:
return True
if stream.is_history_public_to_subscribers():
# In this case, we check if the user is subscribed.
error = _("Invalid stream name '%s'" % (stream_name,))
try:
(recipient, sub) = access_stream_common(user_profile, stream, error)
except JsonableError:
return False
return True
return False
开发者ID:BakerWang,项目名称:zulip,代码行数:33,代码来源:streams.py
示例15: test_receive_stream_email_messages_empty_body
def test_receive_stream_email_messages_empty_body(self):
# build dummy messages for stream
# test message with empty body is not sent
self.login("[email protected]")
user_profile = get_user_profile_by_email("[email protected]")
self.subscribe_to_stream(user_profile.email, "Denmark")
stream = get_stream("Denmark", user_profile.realm)
stream_to_address = encode_email_address(stream)
headers = {}
headers['Reply-To'] = '[email protected]'
# empty body
incoming_valid_message = MIMEText('')
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = "[email protected]"
incoming_valid_message['To'] = stream_to_address
incoming_valid_message['Reply-to'] = "[email protected]"
exception_message = ""
debug_info = {}
# process_message eats the exception & logs an error which can't be parsed here
# so calling process_stream_message directly
try:
process_stream_message(incoming_valid_message['To'],
incoming_valid_message['Subject'],
incoming_valid_message,
debug_info)
except ZulipEmailForwardError as e:
# empty body throws exception
exception_message = e.message
self.assertEqual(exception_message, "Unable to find plaintext or HTML message body")
开发者ID:seanly,项目名称:zulip,代码行数:35,代码来源:test_email_mirror.py
示例16: test_receive_stream_email_messages_success
def test_receive_stream_email_messages_success(self):
# build dummy messages for stream
# test valid incoming stream message is processed properly
self.login("[email protected]")
user_profile = get_user_profile_by_email("[email protected]")
self.subscribe_to_stream(user_profile.email, "Denmark")
stream = get_stream("Denmark", user_profile.realm)
stream_to_address = encode_email_address(stream)
incoming_valid_message = MIMEText('TestStreamEmailMessages Body')
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = "[email protected]"
incoming_valid_message['To'] = stream_to_address
incoming_valid_message['Reply-to'] = "[email protected]"
process_message(incoming_valid_message)
# Hamlet is subscribed to this stream so should see the email message from Othello.
message = most_recent_message(user_profile)
self.assertEqual(message.content, "TestStreamEmailMessages Body")
self.assertEqual(get_display_recipient(message.recipient), stream.name)
self.assertEqual(message.subject, incoming_valid_message['Subject'])
开发者ID:seanly,项目名称:zulip,代码行数:26,代码来源:test_email_mirror.py
示例17: subscribe_to_stream
def subscribe_to_stream(self, email, stream_name, realm=None):
realm = get_realm(resolve_email_to_domain(email))
stream = get_stream(stream_name, realm)
if stream is None:
stream, _ = create_stream_if_needed(realm, stream_name)
user_profile = get_user_profile_by_email(email)
do_add_subscription(user_profile, stream, no_log=True)
开发者ID:RomiPierre,项目名称:zulip,代码行数:7,代码来源:test_helpers.py
示例18: test_error_no_recipient
def test_error_no_recipient(self) -> None:
script = os.path.join(os.path.dirname(__file__),
'../../scripts/lib/email-mirror-postfix')
sender = self.example_email('hamlet')
stream = get_stream("Denmark", get_realm("zulip"))
stream_to_address = encode_email_address(stream)
template_path = os.path.join(MAILS_DIR, "simple.txt")
with open(template_path) as template_file:
mail_template = template_file.read()
mail = mail_template.format(stream_to_address=stream_to_address, sender=sender)
read_pipe, write_pipe = os.pipe()
os.write(write_pipe, mail.encode())
os.close(write_pipe)
success_call = True
try:
subprocess.check_output([script, '-s', settings.SHARED_SECRET, '-t'],
stdin=read_pipe)
except subprocess.CalledProcessError as e:
self.assertEqual(
e.output,
b'5.1.1 Bad destination mailbox address: No missed message email address.\n'
)
self.assertEqual(e.returncode, 67)
success_call = False
self.assertFalse(success_call)
开发者ID:gnprice,项目名称:zulip,代码行数:26,代码来源:test_email_mirror.py
示例19: by_stream
def by_stream(self, query, operand, maybe_negate):
# type: (Query, str, ConditionTransform) -> Query
stream = get_stream(operand, self.user_profile.realm)
if stream is None:
raise BadNarrowOperator('unknown stream ' + operand)
if self.user_profile.realm.is_zephyr_mirror_realm:
# MIT users expect narrowing to "social" to also show messages to /^(un)*social(.d)*$/
# (unsocial, ununsocial, social.d, etc)
m = re.search(r'^(?:un)*(.+?)(?:\.d)*$', stream.name, re.IGNORECASE)
# Since the regex has a `.+` in it and "" is invalid as a
# stream name, this will always match
assert(m is not None)
base_stream_name = m.group(1)
matching_streams = get_active_streams(self.user_profile.realm).filter(
name__iregex=r'^(un)*%s(\.d)*$' % (self._pg_re_escape(base_stream_name),))
matching_stream_ids = [matching_stream.id for matching_stream in matching_streams]
recipients_map = bulk_get_recipients(Recipient.STREAM, matching_stream_ids)
cond = column("recipient_id").in_([recipient.id for recipient in recipients_map.values()])
return query.where(maybe_negate(cond))
recipient = get_recipient(Recipient.STREAM, type_id=stream.id)
cond = column("recipient_id") == recipient.id
return query.where(maybe_negate(cond))
开发者ID:souravbadami,项目名称:zulip,代码行数:25,代码来源:messages.py
示例20: test_receive_stream_email_messages_success
def test_receive_stream_email_messages_success(self) -> None:
# build dummy messages for stream
# test valid incoming stream message is processed properly
user_profile = self.example_user('hamlet')
self.login(user_profile.email)
self.subscribe(user_profile, "Denmark")
stream = get_stream("Denmark", user_profile.realm)
stream_to_address = encode_email_address(stream)
incoming_valid_message = MIMEText('TestStreamEmailMessages Body') # type: Any # https://github.com/python/typeshed/issues/275
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
incoming_valid_message['To'] = stream_to_address
incoming_valid_message['Reply-to'] = self.example_email('othello')
process_message(incoming_valid_message)
# Hamlet is subscribed to this stream so should see the email message from Othello.
message = most_recent_message(user_profile)
self.assertEqual(message.content, "TestStreamEmailMessages Body")
self.assertEqual(get_display_recipient(message.recipient), stream.name)
self.assertEqual(message.topic_name(), incoming_valid_message['Subject'])
开发者ID:gnprice,项目名称:zulip,代码行数:26,代码来源:test_email_mirror.py
注:本文中的zerver.models.get_stream函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论