本文整理汇总了Python中zerver.models.bulk_get_streams函数的典型用法代码示例。如果您正苦于以下问题：Python bulk_get_streams函数的具体用法？Python bulk_get_streams怎么用？Python bulk_get_streams使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了bulk_get_streams函数的5个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: list_to_streams
def list_to_streams(streams_raw, user_profile, autocreate=False, invite_only=False):
    # type: (Iterable[text_type], UserProfile, Optional[bool], Optional[bool]) -> Tuple[List[Stream], List[Stream]]
    """Turn plaintext stream names into Stream objects, validating each name.

    Every name is stripped of surrounding whitespace and de-duplicated, then
    rejected with a JsonableError if it is longer than
    Stream.MAX_NAME_LENGTH characters or fails valid_stream_name.

    In autocreate mode this function is meant to be atomic: either a
    precheck raises, or every requested stream ends up created if applicable.

    @param streams_raw The stream names to process
    @param user_profile The user for whom we are retrieving the streams
    @param autocreate Whether streams that do not exist yet should be created
    @param invite_only Whether newly created streams get the invite_only bit
    @return A pair (existing_streams, created_streams)
    """
    requested_names = {raw_name.strip() for raw_name in streams_raw}

    # Precheck every name before any lookup or creation happens.
    for name in requested_names:
        if len(name) > Stream.MAX_NAME_LENGTH:
            raise JsonableError(_("Stream name (%s) too long.") % (name,))
        if not valid_stream_name(name):
            raise JsonableError(_("Invalid stream name (%s).") % (name,))

    # Streams are looked up in the bulk result by their lowercased name.
    streams_by_name = bulk_get_streams(user_profile.realm, requested_names)
    lookups = [(name, streams_by_name.get(name.lower())) for name in requested_names]
    existing_streams = [found for name, found in lookups if found is not None]  # type: List[Stream]
    missing_stream_names = [name for name, found in lookups if found is None]  # type: List[text_type]

    if not missing_stream_names:
        # Happy path for callers who expected all of these streams to
        # exist already: nothing needs to be created.
        return existing_streams, []

    # From here on, at least one requested stream does not exist yet.
    if not user_profile.can_create_streams():
        raise JsonableError(_('User cannot create streams.'))
    if not autocreate:
        raise JsonableError(_("Stream(s) (%s) do not exist") % ", ".join(missing_stream_names))

    # Existing streams were filtered out above, so dup_streams is normally
    # empty. It is non-empty only when somebody else raced us to create the
    # same stream, which does happen when two people agree to create a
    # stream and both eagerly do it.
    created_streams, dup_streams = create_streams_if_needed(
        realm=user_profile.realm,
        stream_names=missing_stream_names,
        invite_only=invite_only)
    existing_streams.extend(dup_streams)
    return existing_streams, created_streams
开发者ID:krtkmj,项目名称:zulip,代码行数:60,代码来源:streams.py
示例2: list_to_streams
def list_to_streams(streams_raw: Iterable[Mapping[str, Any]],
                    user_profile: UserProfile,
                    autocreate: bool=False) -> Tuple[List[Stream], List[Stream]]:
    """Converts a list of dicts to a list of Streams, validating input in the process.

    For each stream name, we validate it to ensure it meets our
    requirements for a proper stream name using check_stream_name.

    This function in autocreate mode should be atomic: either an exception
    will be raised during a precheck, or all the streams specified will have
    been created if applicable.

    @param streams_raw The stream dictionaries to process (each must have a
                       "name" key); names should already be stripped of
                       whitespace by the caller.
    @param user_profile The user for whom we are retrieving the streams
    @param autocreate Whether we should create streams if they don't already exist
    @return A pair (existing_streams, created_streams)
    @raise JsonableError If a stream is missing and the user may not create
                         streams, or autocreate is False.
    """
    # streams_raw is only promised to be an Iterable, yet it is walked twice
    # below (once to collect names, once to partition).  Snapshot it so a
    # one-shot iterator is not exhausted by the first pass, which would
    # silently make the function return two empty lists.
    stream_dicts = list(streams_raw)  # type: List[Mapping[str, Any]]

    # Validate all streams, getting extant ones, then get-or-creating the rest.
    stream_set = set(stream_dict["name"] for stream_dict in stream_dicts)

    for stream_name in stream_set:
        # Stream names should already have been stripped by the
        # caller, but it makes sense to verify anyway.
        assert stream_name == stream_name.strip()
        check_stream_name(stream_name)

    existing_streams = []  # type: List[Stream]
    missing_stream_dicts = []  # type: List[Mapping[str, Any]]
    existing_stream_map = bulk_get_streams(user_profile.realm, stream_set)

    # Partition the requested dicts (duplicates included, in caller order)
    # by whether a stream with that lowercased name was found.
    for stream_dict in stream_dicts:
        stream = existing_stream_map.get(stream_dict["name"].lower())
        if stream is None:
            missing_stream_dicts.append(stream_dict)
        else:
            existing_streams.append(stream)

    if not missing_stream_dicts:
        # This is the happy path for callers who expected all of these
        # streams to exist already.
        created_streams = []  # type: List[Stream]
    else:
        # autocreate=True path starts here
        if not user_profile.can_create_streams():
            raise JsonableError(_('User cannot create streams.'))
        elif not autocreate:
            raise JsonableError(_("Stream(s) (%s) do not exist") % ", ".join(
                stream_dict["name"] for stream_dict in missing_stream_dicts))

        # We already filtered out existing streams, so dup_streams
        # will normally be an empty list below, but we protect against somebody
        # else racing to create the same stream.  (This is not an entirely
        # paranoid approach, since often on Zulip two people will discuss
        # creating a new stream, and both people eagerly do it.)
        created_streams, dup_streams = create_streams_if_needed(realm=user_profile.realm,
                                                                stream_dicts=missing_stream_dicts)
        existing_streams += dup_streams

    return existing_streams, created_streams
开发者ID:joydeep1701,项目名称:zulip,代码行数:60,代码来源:streams.py
示例3: list_to_streams
def list_to_streams(streams_raw, user_profile, autocreate=False, invite_only=False):
    # type: (Iterable[text_type], UserProfile, Optional[bool], Optional[bool]) -> Tuple[List[Stream], List[Stream]]
    """Turn plaintext stream names into Stream objects, validating each name.

    Every name is stripped of surrounding whitespace and de-duplicated, then
    rejected with a JsonableError if it is longer than
    Stream.MAX_NAME_LENGTH characters or fails valid_stream_name.

    In autocreate mode this function is meant to be atomic: either a
    precheck raises, or every requested stream ends up created if applicable.

    @param streams_raw The stream names to process
    @param user_profile The user for whom we are retrieving the streams
    @param autocreate Whether streams that do not exist yet should be created
    @param invite_only Whether newly created streams get the invite_only bit
    @return A pair (existing_streams, created_streams)
    """
    requested_names = {raw_name.strip() for raw_name in streams_raw}

    # Precheck every name before any lookup or creation happens.
    for name in requested_names:
        if len(name) > Stream.MAX_NAME_LENGTH:
            raise JsonableError(_("Stream name (%s) too long.") % (name,))
        if not valid_stream_name(name):
            raise JsonableError(_("Invalid stream name (%s).") % (name,))

    # Streams are looked up in the bulk result by their lowercased name.
    streams_by_name = bulk_get_streams(user_profile.realm, requested_names)
    lookups = [(name, streams_by_name.get(name.lower())) for name in requested_names]
    existing_streams = [found for name, found in lookups if found is not None]
    names_to_create = [name for name, found in lookups if found is None]
    created_streams = []

    if names_to_create:
        if not user_profile.can_create_streams():
            raise JsonableError(_('User cannot create streams.'))
        if not autocreate:
            raise JsonableError(_("Stream(s) (%s) do not exist") % ", ".join(names_to_create))

        for name in names_to_create:
            stream, created = create_stream_if_needed(user_profile.realm,
                                                      name,
                                                      invite_only=invite_only)
            # Existing streams were already filtered out above, so a
            # not-created result means the stream appeared while this
            # function was running (a creation race); file it as existing.
            bucket = created_streams if created else existing_streams
            bucket.append(stream)

    return existing_streams, created_streams
开发者ID:Kingedgar,项目名称:zulip,代码行数:55,代码来源:streams.py
示例4: exclude_muting_conditions
def exclude_muting_conditions(user_profile, narrow):
    # type: (UserProfile, Optional[Iterable[Dict[str, Any]]]) -> List[Selectable]
    """Build the query conditions that filter out muted streams and topics.

    When the narrow does not name a stream, one condition excludes every
    recipient_id belonging to the user's active stream subscriptions that
    have in_home_view=False.  Independently, the user's muted topics
    (JSON-decoded from user_profile.muted_topics; each entry is indexed as
    [stream name, topic]) are turned into a single condition excluding
    messages whose recipient and upper-cased subject match any muted entry.

    Returns the list of conditions, which may be empty.
    """
    stream_name = get_stream_name_from_narrow(narrow)
    conditions = []

    if stream_name is None:
        muted_subscriptions = Subscription.objects.filter(
            user_profile=user_profile,
            active=True,
            in_home_view=False,
            recipient__type=Recipient.STREAM
        ).values('recipient_id')
        muted_recipient_ids = [sub['recipient_id'] for sub in muted_subscriptions]
        if muted_recipient_ids:
            # Only add the condition if we have muted streams to simplify/avoid warnings.
            conditions.append(not_(column("recipient_id").in_(muted_recipient_ids)))

    muted_topics = ujson.loads(user_profile.muted_topics)
    if not muted_topics:
        return conditions

    if stream_name is not None:
        # Narrowed to one stream: only that stream's muted topics matter.
        muted_topics = [entry for entry in muted_topics if entry[0].lower() == stream_name]
        if not muted_topics:
            return conditions

    # Resolve muted stream names to recipient ids, keyed by lowercased name.
    streams_by_name = bulk_get_streams(user_profile.realm,
                                       [entry[0] for entry in muted_topics])
    stream_ids = [stream.id for stream in six.itervalues(streams_by_name)]
    recipients_by_stream_id = bulk_get_recipients(Recipient.STREAM, stream_ids)
    recipient_map = {
        stream.name.lower(): recipients_by_stream_id[stream.id].id
        for stream in six.itervalues(streams_by_name)
    }

    # Drop muted entries whose stream could not be resolved.
    muted_topics = [entry for entry in muted_topics if entry[0].lower() in recipient_map]
    if not muted_topics:
        return conditions

    # One (recipient matches AND subject matches, case-insensitively)
    # clause per muted topic; a message is excluded if any clause matches.
    topic_clauses = [
        and_(column("recipient_id") == recipient_map[entry[0].lower()],
             func.upper(column("subject")) == func.upper(entry[1]))
        for entry in muted_topics
    ]
    return conditions + [not_(or_(*topic_clauses))]
开发者ID:souravbadami,项目名称:zulip,代码行数:45,代码来源:messages.py
示例5: exclude_muting_conditions
def exclude_muting_conditions(user_profile, narrow):
    """Build the query conditions that filter out muted streams and topics.

    When the narrow does not name a stream, one condition excludes every
    recipient_id belonging to the user's active stream subscriptions that
    have in_home_view=False.  Independently, the user's muted topics
    (JSON-decoded from user_profile.muted_topics; each entry is indexed as
    [stream name, topic]) are turned into a single condition excluding
    messages whose recipient and upper-cased subject match any muted entry.

    Returns the list of conditions, which may be empty.
    """
    conditions = []
    stream_name = get_stream_name_from_narrow(narrow)

    if stream_name is None:
        rows = Subscription.objects.filter(
            user_profile=user_profile,
            active=True,
            in_home_view=False,
            recipient__type=Recipient.STREAM
        ).values('recipient_id')
        muted_recipient_ids = [row['recipient_id'] for row in rows]
        if muted_recipient_ids:
            # Only add the condition when there is something to exclude:
            # an IN predicate over an empty list filters nothing and only
            # produces a degenerate clause (and a warning in some
            # SQLAlchemy versions -- NOTE(review): confirm for the version
            # in use).
            condition = not_(column("recipient_id").in_(muted_recipient_ids))
            conditions.append(condition)

    muted_topics = ujson.loads(user_profile.muted_topics)
    if muted_topics:
        if stream_name is not None:
            # Narrowed to one stream: only that stream's muted topics matter.
            muted_topics = [m for m in muted_topics if m[0].lower() == stream_name]
            if not muted_topics:
                return conditions

        # Resolve muted stream names to recipient ids, keyed by lowercased name.
        muted_streams = bulk_get_streams(user_profile.realm,
                                         [muted[0] for muted in muted_topics])
        muted_recipients = bulk_get_recipients(Recipient.STREAM,
                                               [stream.id for stream in six.itervalues(muted_streams)])
        recipient_map = dict((s.name.lower(), muted_recipients[s.id].id)
                             for s in six.itervalues(muted_streams))

        # Drop muted entries whose stream could not be resolved.
        muted_topics = [m for m in muted_topics if m[0].lower() in recipient_map]

        if muted_topics:
            def mute_cond(muted):
                # Matches messages sent to the muted stream's recipient whose
                # subject equals the muted topic, compared case-insensitively.
                stream_cond = column("recipient_id") == recipient_map[muted[0].lower()]
                topic_cond = func.upper(column("subject")) == func.upper(muted[1])
                return and_(stream_cond, topic_cond)

            condition = not_(or_(*list(map(mute_cond, muted_topics))))
            return conditions + [condition]

    return conditions
开发者ID:danshev,项目名称:zulip,代码行数:41,代码来源:messages.py
注：本文中的zerver.models.bulk_get_streams函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论