本文整理汇总了Python中zerver.lib.validator.check_list函数的典型用法代码示例。如果您正苦于以下问题：Python check_list函数的具体用法？Python check_list怎么用？Python check_list使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了check_list函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: remove_subscriptions_backend
def remove_subscriptions_backend(request, user_profile,
                                 streams_raw = REQ("subscriptions", validator=check_list(check_string)),
                                 principals = REQ(validator=check_list(check_string), default=None)):
    """Unsubscribe the requester, or the listed principals, from streams.

    Returns a JSON error if the requester targets anyone other than
    themselves without being a realm admin, or targets other people on an
    invite-only stream the requester is not subscribed to.  Otherwise
    returns a JSON success payload with the stream names grouped under
    "removed" and "not_subscribed".
    """
    # Truthy only when principals names somebody besides the requester.
    removing_someone_else = principals and set(principals) != {user_profile.email}

    if removing_someone_else and not user_profile.is_realm_admin:
        # Only realm admins may unsubscribe other people from a stream.
        return json_error("This action requires administrative rights")

    streams, _unused = list_to_streams(streams_raw, user_profile)

    for stream in streams:
        if not (removing_someone_else and stream.invite_only):
            continue
        if not subscribed_to_stream(user_profile, stream):
            # Even an admin cannot remove other people from an invite-only
            # stream the admin is not on.
            return json_error("Cannot administer invite-only streams this way")

    if principals:
        people_to_unsub = {principal_to_user_profile(user_profile, principal)
                           for principal in principals}
    else:
        people_to_unsub = {user_profile}

    (removed, not_subscribed) = bulk_remove_subscriptions(people_to_unsub, streams)

    result = dict(
        removed=[stream.name for (_subscriber, stream) in removed],
        not_subscribed=[stream.name for (_subscriber, stream) in not_subscribed],
    )  # type: Dict[str, List[str]]
    return json_success(result)
开发者ID:anteq,项目名称:zulip,代码行数:35,代码来源:streams.py
示例2: test_muted_topics_events
def test_muted_topics_events(self):
    """Validate the first event produced by do_set_muted_topics against a schema."""
    topic_entry_checker = check_list(check_string, 2)
    schema_checker = check_dict([
        ('type', equals('muted_topics')),
        ('muted_topics', check_list(topic_entry_checker)),
    ])

    def mute_one_topic():
        return do_set_muted_topics(self.user_profile, [["Denmark", "topic"]])

    events = self.do_test(mute_one_topic)
    self.assert_on_error(schema_checker('events[0]', events[0]))
开发者ID:seanly,项目名称:zulip,代码行数:8,代码来源:test_events.py
示例3: remote_server_post_analytics
def remote_server_post_analytics(request: HttpRequest,
                                 entity: Union[UserProfile, RemoteZulipServer],
                                 realm_counts: List[Dict[str, Any]]=REQ(
                                     validator=check_list(check_dict_only([
                                         ('property', check_string),
                                         ('realm', check_int),
                                         ('id', check_int),
                                         ('end_time', check_float),
                                         ('subgroup', check_none_or(check_string)),
                                         ('value', check_int),
                                     ]))),
                                 installation_counts: List[Dict[str, Any]]=REQ(
                                     validator=check_list(check_dict_only([
                                         ('property', check_string),
                                         ('id', check_int),
                                         ('end_time', check_float),
                                         ('subgroup', check_none_or(check_string)),
                                         ('value', check_int),
                                     ])))) -> HttpResponse:
    """Persist realm-level and installation-level count rows for a server.

    Both payloads are passed through validate_count_stats and are then
    written with bulk_create in batches of at most BATCH_SIZE rows.
    Returns an empty JSON success response.
    """
    validate_entity(entity)
    server = cast(RemoteZulipServer, entity)

    validate_count_stats(server, RemoteRealmCount, realm_counts)
    validate_count_stats(server, RemoteInstallationCount, installation_counts)

    BATCH_SIZE = 1000

    # Walk each payload in fixed-size windows so that every bulk_create call
    # receives at most BATCH_SIZE objects.
    for offset in range(0, len(realm_counts), BATCH_SIZE):
        realm_rows = [
            RemoteRealmCount(
                property=row['property'],
                realm_id=row['realm'],
                remote_id=row['id'],
                server=server,
                end_time=datetime.datetime.fromtimestamp(row['end_time'], tz=timezone_utc),
                subgroup=row['subgroup'],
                value=row['value'])
            for row in realm_counts[offset:offset + BATCH_SIZE]
        ]
        RemoteRealmCount.objects.bulk_create(realm_rows)

    for offset in range(0, len(installation_counts), BATCH_SIZE):
        installation_rows = [
            RemoteInstallationCount(
                property=row['property'],
                remote_id=row['id'],
                server=server,
                end_time=datetime.datetime.fromtimestamp(row['end_time'], tz=timezone_utc),
                subgroup=row['subgroup'],
                value=row['value'])
            for row in installation_counts[offset:offset + BATCH_SIZE]
        ]
        RemoteInstallationCount.objects.bulk_create(installation_rows)

    return json_success()
开发者ID:BakerWang,项目名称:zulip,代码行数:57,代码来源:views.py
示例4: get_events_backend
def get_events_backend(
    request,
    user_profile,
    handler,
    user_client=REQ(converter=get_client, default=None),
    last_event_id=REQ(converter=int, default=None),
    queue_id=REQ(default=None),
    apply_markdown=REQ(default=False, validator=check_bool),
    all_public_streams=REQ(default=False, validator=check_bool),
    event_types=REQ(default=None, validator=check_list(check_string)),
    dont_block=REQ(default=False, validator=check_bool),
    narrow=REQ(default=[], validator=check_list(None)),
    lifespan_secs=REQ(default=0, converter=int),
):
    # type: (HttpRequest, UserProfile, BaseHandler, Optional[Client], Optional[int], Optional[text_type], bool, bool, Optional[List[text_type]], bool, Iterable[Sequence[text_type]], int) -> Union[HttpResponse, _RespondAsynchronously]
    """Build an events query from the request parameters and run fetch_events.

    When no queue_id is supplied, the query also carries a "new_queue_data"
    entry describing the queue to create.  Depending on the "type" of the
    fetch_events result this returns RespondAsynchronously ("async"), a JSON
    error built from result["message"] ("error"), or a JSON success
    wrapping result["response"].

    The type comment above lists queue_id as a single string and event_types
    as a list of strings, matching the check_list(check_string) validator on
    event_types; the two entries were previously swapped.
    """
    # Fall back to the client attached to the request when none was passed.
    if user_client is None:
        user_client = request.client

    events_query = dict(
        user_profile_id=user_profile.id,
        user_profile_email=user_profile.email,
        queue_id=queue_id,
        last_event_id=last_event_id,
        event_types=event_types,
        client_type_name=user_client.name,
        all_public_streams=all_public_streams,
        lifespan_secs=lifespan_secs,
        narrow=narrow,
        dont_block=dont_block,
        handler_id=handler.handler_id,
    )

    if queue_id is None:
        # No existing queue was named, so describe the one to allocate.
        events_query["new_queue_data"] = dict(
            user_profile_id=user_profile.id,
            realm_id=user_profile.realm.id,
            user_profile_email=user_profile.email,
            event_types=event_types,
            client_type_name=user_client.name,
            apply_markdown=apply_markdown,
            all_public_streams=all_public_streams,
            queue_timeout=lifespan_secs,
            last_connection_time=time.time(),
            narrow=narrow,
        )

    result = fetch_events(events_query)
    if "extra_log_data" in result:
        request._log_data["extra"] = result["extra_log_data"]

    if result["type"] == "async":
        # NOTE(review): the request is stashed on the handler, presumably so
        # the response can be completed later -- confirm against the handler.
        handler._request = request
        return RespondAsynchronously
    if result["type"] == "error":
        return json_error(result["message"])
    return json_success(result["response"])
开发者ID:testmana2,项目名称:zulip,代码行数:56,代码来源:tornadoviews.py
示例5: get_events_backend
def get_events_backend(request: HttpRequest, user_profile: UserProfile, handler: BaseHandler,
                       user_client: Optional[Client]=REQ(converter=get_client, default=None),
                       last_event_id: Optional[int]=REQ(converter=int, default=None),
                       queue_id: Optional[str]=REQ(default=None),
                       apply_markdown: bool=REQ(default=False, validator=check_bool),
                       client_gravatar: bool=REQ(default=False, validator=check_bool),
                       all_public_streams: bool=REQ(default=False, validator=check_bool),
                       event_types: Optional[List[str]]=REQ(default=None, validator=check_list(check_string)),
                       dont_block: bool=REQ(default=False, validator=check_bool),
                       narrow: Iterable[Sequence[str]]=REQ(default=[], validator=check_list(None)),
                       lifespan_secs: int=REQ(default=0, converter=int)
                       ) -> Union[HttpResponse, _RespondAsynchronously]:
    """Build an events query from the request parameters and run fetch_events.

    When no queue_id is supplied, the query also carries a "new_queue_data"
    entry describing the queue to create.  Depending on the "type" of the
    fetch_events result this returns RespondAsynchronously ("async"), raises
    result["exception"] ("error"), or returns a JSON success wrapping
    result["response"].

    queue_id is annotated as a single optional string and event_types as an
    optional list of strings, matching the check_list(check_string) validator
    on event_types; the two annotations were previously swapped.
    """
    # Fall back to the client attached to the request when none was passed.
    if user_client is None:
        valid_user_client = request.client
    else:
        valid_user_client = user_client

    events_query = dict(
        user_profile_id = user_profile.id,
        user_profile_email = user_profile.email,
        queue_id = queue_id,
        last_event_id = last_event_id,
        event_types = event_types,
        client_type_name = valid_user_client.name,
        all_public_streams = all_public_streams,
        lifespan_secs = lifespan_secs,
        narrow = narrow,
        dont_block = dont_block,
        handler_id = handler.handler_id)

    if queue_id is None:
        # No existing queue was named, so describe the one to allocate.
        events_query['new_queue_data'] = dict(
            user_profile_id = user_profile.id,
            realm_id = user_profile.realm_id,
            user_profile_email = user_profile.email,
            event_types = event_types,
            client_type_name = valid_user_client.name,
            apply_markdown = apply_markdown,
            client_gravatar = client_gravatar,
            all_public_streams = all_public_streams,
            queue_timeout = lifespan_secs,
            last_connection_time = time.time(),
            narrow = narrow)

    result = fetch_events(events_query)
    if "extra_log_data" in result:
        request._log_data['extra'] = result["extra_log_data"]

    if result["type"] == "async":
        # NOTE(review): the request is stashed on the handler, presumably so
        # the response can be completed later -- confirm against the handler.
        handler._request = request
        return RespondAsynchronously
    if result["type"] == "error":
        raise result["exception"]
    return json_success(result["response"])
开发者ID:284928489,项目名称:zulip,代码行数:54,代码来源:views.py
示例6: update_subscriptions_backend
def update_subscriptions_backend(request, user_profile,
                                 delete=REQ(validator=check_list(check_string), default=[]),
                                 add=REQ(validator=check_list(check_dict([('name', check_string)])), default=[])):
    # type: (HttpRequest, UserProfile, Iterable[Text], Iterable[Mapping[str, Any]]) -> HttpResponse
    """Add and remove subscriptions in a single request.

    Returns a JSON error when both lists are empty; otherwise hands the add
    view and the remove view, each with its streams_raw argument, to
    compose_views and returns its result.
    """
    nothing_requested = not (add or delete)
    if nothing_requested:
        return json_error(_('Nothing to do. Specify at least one of "add" or "delete".'))

    add_step = (add_subscriptions_backend, dict(streams_raw=add))
    remove_step = (remove_subscriptions_backend, dict(streams_raw=delete))
    method_kwarg_pairs = [add_step, remove_step]  # type: List[FuncKwargPair]
    return compose_views(request, user_profile, method_kwarg_pairs)
开发者ID:aakash-cr7,项目名称:zulip,代码行数:12,代码来源:streams.py
示例7: get_events_backend
def get_events_backend(request, user_profile, handler = None,
                       user_client = REQ(converter=get_client, default=None),
                       last_event_id = REQ(converter=int, default=None),
                       queue_id = REQ(default=None),
                       apply_markdown = REQ(default=False, validator=check_bool),
                       all_public_streams = REQ(default=False, validator=check_bool),
                       event_types = REQ(default=None, validator=check_list(check_string)),
                       dont_block = REQ(default=False, validator=check_bool),
                       narrow = REQ(default=[], validator=check_list(None)),
                       lifespan_secs = REQ(default=0, converter=int)):
    """Return queued events immediately, or attach the handler to wait for them.

    Without a queue_id, a new client descriptor is allocated when dont_block
    is set; otherwise a JSON error is returned.  With a queue_id, the
    descriptor is looked up, ownership is checked, and events up to
    last_event_id are pruned.  If the queue has events or dont_block is set,
    the events are returned as JSON; otherwise the handler is connected to
    the client descriptor and RespondAsynchronously is returned.

    NOTE(review): handler defaults to None but is dereferenced on the
    asynchronous path; callers presumably always supply it -- confirm.
    """
    if user_client is None:
        user_client = request.client

    was_connected = False
    queue_was_supplied = queue_id is not None

    if not queue_was_supplied:
        if not dont_block:
            return json_error("Missing 'queue_id' argument")
        client = allocate_client_descriptor(user_profile.id, user_profile.realm.id,
                                            event_types, user_client, apply_markdown,
                                            all_public_streams, lifespan_secs,
                                            narrow=narrow)
        queue_id = client.event_queue.id
    else:
        if last_event_id is None:
            return json_error("Missing 'last_event_id' argument")
        client = get_client_descriptor(queue_id)
        if client is None:
            return json_error("Bad event queue id: %s" % (queue_id,))
        if user_profile.id != client.user_profile_id:
            return json_error("You are not authorized to get events from this queue")
        client.event_queue.prune(last_event_id)
        was_connected = client.finish_current_handler()

    if dont_block or not client.event_queue.empty():
        ret = {'events': client.event_queue.contents()}
        if not queue_was_supplied:
            # Tell the caller which queue was just allocated for it.
            ret['queue_id'] = queue_id
        log_extra = "[%s/%s]" % (queue_id, len(ret["events"]))
        if was_connected:
            log_extra += " [was connected]"
        request._log_data['extra'] = log_extra
        return json_success(ret)

    handler._request = request
    if was_connected:
        logging.info("Disconnected handler for queue %s (%s/%s)" % (queue_id, user_profile.email,
                                                                    user_client.name))
    client.connect_handler(handler)

    # runtornado recognizes this special return value.
    return RespondAsynchronously
开发者ID:Gabriel0402,项目名称:zulip,代码行数:52,代码来源:tornadoviews.py
示例8: events_register_backend
def events_register_backend(request, user_profile, apply_markdown=True,
                            all_public_streams=None,
                            event_types=REQ(validator=check_list(check_string), default=None),
                            narrow=REQ(validator=check_list(check_list(check_string, length=2)), default=[]),
                            queue_lifespan_secs=REQ(converter=int, default=0)):
    # type: (HttpRequest, UserProfile, bool, Optional[bool], Optional[Iterable[str]], Iterable[Sequence[Text]], int) -> HttpResponse
    """Register an event queue and return the do_events_register result as JSON.

    all_public_streams and narrow are first passed through
    _default_all_public_streams and _default_narrow respectively.
    """
    effective_all_public_streams = _default_all_public_streams(user_profile, all_public_streams)
    effective_narrow = _default_narrow(user_profile, narrow)

    registration = do_events_register(
        user_profile, request.client, apply_markdown,
        event_types, queue_lifespan_secs, effective_all_public_streams,
        narrow=effective_narrow)
    return json_success(registration)
开发者ID:TomaszKolek,项目名称:zulip,代码行数:13,代码来源:events_register.py
示例9: update_subscriptions_backend
def update_subscriptions_backend(request, user_profile,
                                 delete=REQ(validator=check_list(check_string), default=[]),
                                 add=REQ(validator=check_list(check_dict([['name', check_string]])), default=[])):
    """Add and remove subscriptions in a single request.

    Calls the add view and then the remove view.  If either responds with a
    non-200 status, the transaction is rolled back and that response is
    returned as-is; otherwise the decoded JSON bodies of both responses are
    merged and returned as a JSON success.
    """
    if not (add or delete):
        return json_error('Nothing to do. Specify at least one of "add" or "delete".')

    merged = {}  # type: Dict[str, Any]
    steps = (
        (add_subscriptions_backend, add),
        (remove_subscriptions_backend, delete),
    )
    for view_func, stream_items in steps:
        response = view_func(request, user_profile, streams_raw=stream_items)
        if response.status_code == 200:
            merged.update(ujson.loads(response.content))
            continue
        # A step failed: roll back and surface that step's response.
        transaction.rollback()
        return response
    return json_success(merged)
开发者ID:anteq,项目名称:zulip,代码行数:14,代码来源:streams.py
示例10: update_user_group_backend
def update_user_group_backend(request: HttpRequest, user_profile: UserProfile,
                              user_group_id: int=REQ(validator=check_int),
                              delete: List[int]=REQ(validator=check_list(check_int), default=[]),
                              add: List[int]=REQ(validator=check_list(check_int), default=[])
                              ) -> HttpResponse:
    """Add and remove members of a user group in a single request.

    Returns a JSON error when both lists are empty; otherwise hands the add
    view and the remove view, each with its keyword arguments, to
    compose_views and returns its result.
    """
    nothing_requested = not (add or delete)
    if nothing_requested:
        return json_error(_('Nothing to do. Specify at least one of "add" or "delete".'))

    add_step = (add_members_to_group_backend,
                dict(user_group_id=user_group_id, members=add))
    remove_step = (remove_members_from_group_backend,
                   dict(user_group_id=user_group_id, members=delete))
    method_kwarg_pairs = [add_step, remove_step]  # type: List[FuncKwargPair]
    return compose_views(request, user_profile, method_kwarg_pairs)
开发者ID:BakerWang,项目名称:zulip,代码行数:15,代码来源:user_groups.py
示例11: add_user_group
def add_user_group(request: HttpRequest, user_profile: UserProfile,
                   name: str=REQ(),
                   members: List[int]=REQ(validator=check_list(check_int), default=[]),
                   description: str=REQ()) -> HttpResponse:
    """Create a user group in the requester's realm from a list of user ids.

    The ids are resolved with user_ids_to_users and the result is passed to
    check_add_user_group; an empty JSON success response is returned.
    """
    member_profiles = user_ids_to_users(members, user_profile.realm)
    check_add_user_group(user_profile.realm, name, member_profiles, description)
    return json_success()
开发者ID:BakerWang,项目名称:zulip,代码行数:7,代码来源:user_groups.py
示例12: update_subscriptions_backend
def update_subscriptions_backend(request, user_profile,
                                 delete=REQ(validator=check_list(check_string), default=[]),
                                 add=REQ(validator=check_list(check_dict([('name', check_string)])), default=[])):
    # type: (HttpRequest, UserProfile, Iterable[text_type], Iterable[Mapping[str, Any]]) -> HttpResponse
    """Add and remove subscriptions in a single request.

    Calls the add view and then the remove view.  If either responds with a
    non-200 status, the transaction is rolled back and that response is
    returned as-is; otherwise the decoded JSON bodies of both responses are
    merged and returned as a JSON success.
    """
    if not (add or delete):
        return json_error(_('Nothing to do. Specify at least one of "add" or "delete".'))

    merged = {}  # type: Dict[str, Any]
    method_items_pairs = (
        (add_subscriptions_backend, add),
        (remove_subscriptions_backend, delete),
    )  # type: Tuple[FuncItPair, FuncItPair]
    for view_func, stream_items in method_items_pairs:
        response = view_func(request, user_profile, streams_raw=stream_items)
        if response.status_code == 200:
            merged.update(ujson.loads(response.content))
            continue
        # A step failed: roll back and surface that step's response.
        transaction.rollback()
        return response
    return json_success(merged)
开发者ID:HKingz,项目名称:zulip,代码行数:16,代码来源:streams.py
示例13: update_user_custom_profile_data
def update_user_custom_profile_data(
        request: HttpRequest,
        user_profile: UserProfile,
        data: List[Dict[str, Union[int, str]]]=REQ(validator=check_list(
            check_dict([('id', check_int)])))) -> HttpResponse:
    """Validate and store custom profile field values for user_profile.

    Each item must carry an 'id' naming a CustomProfileField and a 'value'.
    The value is checked with the validator registered for the field's type
    in FIELD_VALIDATORS, or otherwise in EXTENDED_FIELD_VALIDATORS (which
    also receives field.field_data).  The first failure is returned as a
    JSON error and nothing is saved; if every item passes,
    do_update_user_custom_profile_data is called with the full list.

    The request validator only guarantees that 'id' is present, so a missing
    'value' is reported as a JSON error here instead of raising KeyError.
    """
    # These lookup tables do not depend on the item, so read them once.
    validators = CustomProfileField.FIELD_VALIDATORS
    extended_validators = CustomProfileField.EXTENDED_FIELD_VALIDATORS

    for item in data:
        field_id = item['id']
        try:
            field = CustomProfileField.objects.get(id=field_id)
        except CustomProfileField.DoesNotExist:
            return json_error(_('Field id {id} not found.').format(id=field_id))

        if 'value' not in item:
            return json_error(_("Missing 'value' for field id {id}.").format(id=field_id))
        value = item['value']

        field_type = field.field_type
        var_name = '{}'.format(field.name)
        if field_type in validators:
            validator = validators[field_type]
            result = validator(var_name, value)
        else:
            # Check extended validators.
            extended_validator = extended_validators[field_type]
            field_data = field.field_data
            result = extended_validator(var_name, field_data, value)

        # The validators used here return None on success and an error
        # message otherwise.
        if result is not None:
            return json_error(result)

    do_update_user_custom_profile_data(user_profile, data)
    # NOTE(review): the original carried the comment "We need to call this
    # explicitly otherwise constraints are not check" at this point; it is
    # unclear which call it refers to -- confirm or remove.
    return json_success()
开发者ID:umairwaheed,项目名称:zulip,代码行数:32,代码来源:custom_profile_fields.py
示例14: test_register_events
def test_register_events(self):
    """Check the first two events produced by registering a user against schemas."""
    person_checker = check_dict([
        ('email', check_string),
        ('full_name', check_string),
        ('is_admin', check_bool),
        ('is_bot', check_bool),
    ])
    realm_user_add_checker = check_dict([
        ('type', equals('realm_user')),
        ('op', equals('add')),
        ('person', person_checker),
    ])

    stream_entry_checker = check_dict([
        ('description', check_string),
        ('invite_only', check_bool),
        ('name', check_string),
        ('stream_id', check_int),
    ])
    stream_create_checker = check_dict([
        ('type', equals('stream')),
        ('op', equals('create')),
        ('streams', check_list(stream_entry_checker)),
    ])

    def register_user():
        return self.register("test1", "test1")

    events = self.do_test(register_user)

    # The first event is checked as the realm_user "add", the second as the
    # stream "create".
    self.assert_on_error(realm_user_add_checker('events[0]', events[0]))
    self.assert_on_error(stream_create_checker('events[1]', events[1]))
开发者ID:seanly,项目名称:zulip,代码行数:27,代码来源:test_events.py
示例15: messages_in_narrow_backend
def messages_in_narrow_backend(request, user_profile,
                               msg_ids = REQ(validator=check_list(check_int)),
                               narrow = REQ(converter=narrow_parameter)):
    # type: (HttpRequest, UserProfile, List[int], List[Dict[str, Any]]) -> HttpResponse
    """Return search fields for the requester's messages among msg_ids.

    Builds a query over zerver_usermessage joined to zerver_message,
    restricted to the requester and to msg_ids, applies each narrow term via
    NarrowBuilder.add_term, and returns a JSON success whose "messages"
    entry maps message id to the output of get_search_fields.
    """
    # Note that this function will only work on messages the user
    # actually received

    # TODO: We assume that the narrow is a search.  For now this works because
    # the browser only ever calls this function for searches, since it can't
    # apply that narrow operator itself.
    selected_columns = [column("message_id"), column("subject"), column("rendered_content")]
    where_clause = and_(column("user_profile_id") == literal(user_profile.id),
                        column("message_id").in_(msg_ids))
    from_clause = join(table("zerver_usermessage"), table("zerver_message"),
                       literal_column("zerver_usermessage.message_id") ==
                       literal_column("zerver_message.id"))
    query = select(selected_columns, where_clause, from_clause)

    builder = NarrowBuilder(user_profile, column("message_id"))
    for term in narrow:
        query = builder.add_term(query, term)

    sa_conn = get_sqlalchemy_connection()
    query_result = list(sa_conn.execute(query).fetchall())

    # Each row is unpacked into five fields although only three columns are
    # selected above; the two match columns are presumably added by
    # add_term for search terms -- confirm.
    search_fields = {
        message_id: get_search_fields(rendered_content, subject,
                                      content_matches, subject_matches)
        for (message_id, subject, rendered_content, content_matches, subject_matches)
        in query_result
    }
    return json_success({"messages": search_fields})
开发者ID:souravbadami,项目名称:zulip,代码行数:33,代码来源:messages.py
示例16: test_check_dict
def test_check_dict(self):
    """Exercise check_dict with one valid value and several invalid ones."""
    keys = [
        ('names', check_list(check_string)),
        ('city', check_string),
    ]

    # (value under test, expected validator result); None means "valid".
    cases = [
        ({'names': ['alice', 'bob'], 'city': 'Boston'},
         None),
        (999,
         'x is not a dict'),
        ({},
         'names key is missing from x'),
        ({'names': ['alice', 'bob', {}]},
         'x["names"][2] is not a string'),
        ({'names': ['alice', 'bob'], 'city': 5},
         'x["city"] is not a string'),
    ]
    for value, expected_error in cases:
        error = check_dict(keys)('x', value)
        self.assertEqual(error, expected_error)
开发者ID:anindya,项目名称:zulip,代码行数:33,代码来源:test_decorators.py
示例17: test_check_list
def test_check_list(self):
    """Exercise check_list error messages for non-lists, bad items and bad length."""
    # (validator, value under test, expected error message)
    cases = [
        (check_list(check_string),
         999,
         'x is not a list'),
        (check_list(check_string),
         ["hello", 5],
         'x[1] is not a string'),
        (check_list(check_list(check_string)),
         [["yo"], ["hello", "goodbye", 5]],
         'x[1][2] is not a string'),
        (check_list(check_string, length=2),
         ["hello", "goodbye", "hello again"],
         'x should have exactly 2 items'),
    ]
    for validator, value, expected_error in cases:
        error = validator('x', value)
        self.assertEqual(error, expected_error)
开发者ID:anindya,项目名称:zulip,代码行数:16,代码来源:test_decorators.py
示例18: remove_storage
def remove_storage(request, user_profile, keys=REQ(validator=check_list(check_string), default=None)):
    # type: (HttpRequest, UserProfile, Optional[List[str]]) -> HttpResponse
    """Remove stored keys for user_profile via remove_bot_storage.

    When keys is omitted or empty, the keys returned by
    get_keys_in_bot_storage are used instead.  A StateError is reported as a
    JSON error carrying its message.
    """
    if not keys:
        keys = get_keys_in_bot_storage(user_profile)

    try:
        remove_bot_storage(user_profile, keys)
    except StateError as err:
        return json_error(str(err))
    return json_success()
开发者ID:joydeep1701,项目名称:zulip,代码行数:8,代码来源:storage.py
示例19: create_default_stream_group
def create_default_stream_group(request: HttpRequest, user_profile: UserProfile,
                                group_name: Text=REQ(), description: Text=REQ(),
                                stream_names: List[Text]=REQ(validator=check_list(check_string))) -> HttpResponse:
    """Create a default stream group in the requester's realm.

    Each name in stream_names is resolved with access_stream_by_name, and
    the resulting streams are passed to do_create_default_stream_group.
    Returns an empty JSON success response, so the return annotation is
    HttpResponse rather than the previous, incorrect None.
    """
    streams = []
    for stream_name in stream_names:
        # Only the stream itself is needed from the returned 3-tuple.
        (stream, _recipient, _sub) = access_stream_by_name(user_profile, stream_name)
        streams.append(stream)
    do_create_default_stream_group(user_profile.realm, group_name, description, streams)
    return json_success()
开发者ID:gnprice,项目名称:zulip,代码行数:9,代码来源:streams.py
示例20: update_message_flags
def update_message_flags(request, user_profile,
                         messages=REQ('messages', validator=check_list(check_int)),
                         operation=REQ('op'), flag=REQ('flag'),
                         all=REQ('all', validator=check_bool, default=False)):
    """Apply a flag operation to messages and echo the message ids back.

    Records "[<operation> <flag>]" as the request's extra log data, calls
    do_update_message_flags, and returns a JSON success payload containing
    the submitted message ids.
    """
    log_extra = "[%s %s]" % (operation, flag)
    request._log_data["extra"] = log_extra

    do_update_message_flags(user_profile, operation, flag, messages, all)

    payload = {
        'result': 'success',
        'messages': messages,
        'msg': '',
    }
    return json_success(payload)
开发者ID:danshev,项目名称:zulip,代码行数:9,代码来源:messages.py
注：本文中的zerver.lib.validator.check_list函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论