• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python validator.check_list函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中zerver.lib.validator.check_list函数的典型用法代码示例。如果您正苦于以下问题:Python check_list函数的具体用法?Python check_list怎么用?Python check_list使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了check_list函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: remove_subscriptions_backend

def remove_subscriptions_backend(request, user_profile,
                                 streams_raw = REQ("subscriptions", validator=check_list(check_string)),
                                 principals = REQ(validator=check_list(check_string), default=None)):

    removing_someone_else = principals and \
        set(principals) != set((user_profile.email,))
    if removing_someone_else and not user_profile.is_realm_admin:
        # You can only unsubscribe other people from a stream if you are a realm
        # admin.
        return json_error("This action requires administrative rights")

    streams, _ = list_to_streams(streams_raw, user_profile)

    for stream in streams:
        if removing_someone_else and stream.invite_only and \
                not subscribed_to_stream(user_profile, stream):
            # Even as an admin, you can't remove other people from an
            # invite-only stream you're not on.
            return json_error("Cannot administer invite-only streams this way")

    if principals:
        people_to_unsub = set(principal_to_user_profile(
                user_profile, principal) for principal in principals)
    else:
        people_to_unsub = set([user_profile])

    result = dict(removed=[], not_subscribed=[]) # type: Dict[str, List[str]]
    (removed, not_subscribed) = bulk_remove_subscriptions(people_to_unsub, streams)

    for (subscriber, stream) in removed:
        result["removed"].append(stream.name)
    for (subscriber, stream) in not_subscribed:
        result["not_subscribed"].append(stream.name)

    return json_success(result)
开发者ID:anteq,项目名称:zulip,代码行数:35,代码来源:streams.py


示例2: test_muted_topics_events

 def test_muted_topics_events(self):
     muted_topics_checker = check_dict([
         ('type', equals('muted_topics')),
         ('muted_topics', check_list(check_list(check_string, 2))),
     ])
     events = self.do_test(lambda: do_set_muted_topics(self.user_profile, [["Denmark", "topic"]]))
     error = muted_topics_checker('events[0]', events[0])
     self.assert_on_error(error)
开发者ID:seanly,项目名称:zulip,代码行数:8,代码来源:test_events.py


示例3: remote_server_post_analytics

def remote_server_post_analytics(request: HttpRequest,
                                 entity: Union[UserProfile, RemoteZulipServer],
                                 realm_counts: List[Dict[str, Any]]=REQ(
                                     validator=check_list(check_dict_only([
                                         ('property', check_string),
                                         ('realm', check_int),
                                         ('id', check_int),
                                         ('end_time', check_float),
                                         ('subgroup', check_none_or(check_string)),
                                         ('value', check_int),
                                     ]))),
                                 installation_counts: List[Dict[str, Any]]=REQ(
                                     validator=check_list(check_dict_only([
                                         ('property', check_string),
                                         ('id', check_int),
                                         ('end_time', check_float),
                                         ('subgroup', check_none_or(check_string)),
                                         ('value', check_int),
                                     ])))) -> HttpResponse:
    validate_entity(entity)
    server = cast(RemoteZulipServer, entity)

    validate_count_stats(server, RemoteRealmCount, realm_counts)
    validate_count_stats(server, RemoteInstallationCount, installation_counts)

    BATCH_SIZE = 1000
    while len(realm_counts) > 0:
        batch = realm_counts[0:BATCH_SIZE]
        realm_counts = realm_counts[BATCH_SIZE:]

        objects_to_create = []
        for item in batch:
            objects_to_create.append(RemoteRealmCount(
                property=item['property'],
                realm_id=item['realm'],
                remote_id=item['id'],
                server=server,
                end_time=datetime.datetime.fromtimestamp(item['end_time'], tz=timezone_utc),
                subgroup=item['subgroup'],
                value=item['value']))
        RemoteRealmCount.objects.bulk_create(objects_to_create)

    while len(installation_counts) > 0:
        batch = installation_counts[0:BATCH_SIZE]
        installation_counts = installation_counts[BATCH_SIZE:]

        objects_to_create = []
        for item in batch:
            objects_to_create.append(RemoteInstallationCount(
                property=item['property'],
                remote_id=item['id'],
                server=server,
                end_time=datetime.datetime.fromtimestamp(item['end_time'], tz=timezone_utc),
                subgroup=item['subgroup'],
                value=item['value']))
        RemoteInstallationCount.objects.bulk_create(objects_to_create)
    return json_success()
开发者ID:BakerWang,项目名称:zulip,代码行数:57,代码来源:views.py


示例4: get_events_backend

def get_events_backend(
    request,
    user_profile,
    handler,
    user_client=REQ(converter=get_client, default=None),
    last_event_id=REQ(converter=int, default=None),
    queue_id=REQ(default=None),
    apply_markdown=REQ(default=False, validator=check_bool),
    all_public_streams=REQ(default=False, validator=check_bool),
    event_types=REQ(default=None, validator=check_list(check_string)),
    dont_block=REQ(default=False, validator=check_bool),
    narrow=REQ(default=[], validator=check_list(None)),
    lifespan_secs=REQ(default=0, converter=int),
):
    # type: (HttpRequest, UserProfile, BaseHandler, Optional[Client], Optional[int], Optional[List[text_type]], bool, bool, Optional[text_type], bool, Iterable[Sequence[text_type]], int) -> Union[HttpResponse, _RespondAsynchronously]
    if user_client is None:
        user_client = request.client

    events_query = dict(
        user_profile_id=user_profile.id,
        user_profile_email=user_profile.email,
        queue_id=queue_id,
        last_event_id=last_event_id,
        event_types=event_types,
        client_type_name=user_client.name,
        all_public_streams=all_public_streams,
        lifespan_secs=lifespan_secs,
        narrow=narrow,
        dont_block=dont_block,
        handler_id=handler.handler_id,
    )

    if queue_id is None:
        events_query["new_queue_data"] = dict(
            user_profile_id=user_profile.id,
            realm_id=user_profile.realm.id,
            user_profile_email=user_profile.email,
            event_types=event_types,
            client_type_name=user_client.name,
            apply_markdown=apply_markdown,
            all_public_streams=all_public_streams,
            queue_timeout=lifespan_secs,
            last_connection_time=time.time(),
            narrow=narrow,
        )

    result = fetch_events(events_query)
    if "extra_log_data" in result:
        request._log_data["extra"] = result["extra_log_data"]

    if result["type"] == "async":
        handler._request = request
        return RespondAsynchronously
    if result["type"] == "error":
        return json_error(result["message"])
    return json_success(result["response"])
开发者ID:testmana2,项目名称:zulip,代码行数:56,代码来源:tornadoviews.py


示例5: get_events_backend

def get_events_backend(request: HttpRequest, user_profile: UserProfile, handler: BaseHandler,
                       user_client: Optional[Client]=REQ(converter=get_client, default=None),
                       last_event_id: Optional[int]=REQ(converter=int, default=None),
                       queue_id: Optional[List[str]]=REQ(default=None),
                       apply_markdown: bool=REQ(default=False, validator=check_bool),
                       client_gravatar: bool=REQ(default=False, validator=check_bool),
                       all_public_streams: bool=REQ(default=False, validator=check_bool),
                       event_types: Optional[str]=REQ(default=None, validator=check_list(check_string)),
                       dont_block: bool=REQ(default=False, validator=check_bool),
                       narrow: Iterable[Sequence[str]]=REQ(default=[], validator=check_list(None)),
                       lifespan_secs: int=REQ(default=0, converter=int)
                       ) -> Union[HttpResponse, _RespondAsynchronously]:
    if user_client is None:
        valid_user_client = request.client
    else:
        valid_user_client = user_client

    events_query = dict(
        user_profile_id = user_profile.id,
        user_profile_email = user_profile.email,
        queue_id = queue_id,
        last_event_id = last_event_id,
        event_types = event_types,
        client_type_name = valid_user_client.name,
        all_public_streams = all_public_streams,
        lifespan_secs = lifespan_secs,
        narrow = narrow,
        dont_block = dont_block,
        handler_id = handler.handler_id)

    if queue_id is None:
        events_query['new_queue_data'] = dict(
            user_profile_id = user_profile.id,
            realm_id = user_profile.realm_id,
            user_profile_email = user_profile.email,
            event_types = event_types,
            client_type_name = valid_user_client.name,
            apply_markdown = apply_markdown,
            client_gravatar = client_gravatar,
            all_public_streams = all_public_streams,
            queue_timeout = lifespan_secs,
            last_connection_time = time.time(),
            narrow = narrow)

    result = fetch_events(events_query)
    if "extra_log_data" in result:
        request._log_data['extra'] = result["extra_log_data"]

    if result["type"] == "async":
        handler._request = request
        return RespondAsynchronously
    if result["type"] == "error":
        raise result["exception"]
    return json_success(result["response"])
开发者ID:284928489,项目名称:zulip,代码行数:54,代码来源:views.py


示例6: update_subscriptions_backend

def update_subscriptions_backend(request, user_profile,
                                 delete=REQ(validator=check_list(check_string), default=[]),
                                 add=REQ(validator=check_list(check_dict([('name', check_string)])), default=[])):
    # type: (HttpRequest, UserProfile, Iterable[Text], Iterable[Mapping[str, Any]]) -> HttpResponse
    if not add and not delete:
        return json_error(_('Nothing to do. Specify at least one of "add" or "delete".'))

    method_kwarg_pairs = [
        (add_subscriptions_backend, dict(streams_raw=add)),
        (remove_subscriptions_backend, dict(streams_raw=delete))
    ] # type: List[FuncKwargPair]
    return compose_views(request, user_profile, method_kwarg_pairs)
开发者ID:aakash-cr7,项目名称:zulip,代码行数:12,代码来源:streams.py


示例7: get_events_backend

def get_events_backend(request, user_profile, handler = None,
                       user_client = REQ(converter=get_client, default=None),
                       last_event_id = REQ(converter=int, default=None),
                       queue_id = REQ(default=None),
                       apply_markdown = REQ(default=False, validator=check_bool),
                       all_public_streams = REQ(default=False, validator=check_bool),
                       event_types = REQ(default=None, validator=check_list(check_string)),
                       dont_block = REQ(default=False, validator=check_bool),
                       narrow = REQ(default=[], validator=check_list(None)),
                       lifespan_secs = REQ(default=0, converter=int)):
    if user_client is None:
        user_client = request.client

    was_connected = False
    orig_queue_id = queue_id
    if queue_id is None:
        if dont_block:
            client = allocate_client_descriptor(user_profile.id, user_profile.realm.id,
                                                event_types, user_client, apply_markdown,
                                                all_public_streams, lifespan_secs,
                                                narrow=narrow)
            queue_id = client.event_queue.id
        else:
            return json_error("Missing 'queue_id' argument")
    else:
        if last_event_id is None:
            return json_error("Missing 'last_event_id' argument")
        client = get_client_descriptor(queue_id)
        if client is None:
            return json_error("Bad event queue id: %s" % (queue_id,))
        if user_profile.id != client.user_profile_id:
            return json_error("You are not authorized to get events from this queue")
        client.event_queue.prune(last_event_id)
        was_connected = client.finish_current_handler()

    if not client.event_queue.empty() or dont_block:
        ret = {'events': client.event_queue.contents()}
        if orig_queue_id is None:
            ret['queue_id'] = queue_id
        request._log_data['extra'] = "[%s/%s]" % (queue_id, len(ret["events"]))
        if was_connected:
            request._log_data['extra'] += " [was connected]"
        return json_success(ret)

    handler._request = request
    if was_connected:
        logging.info("Disconnected handler for queue %s (%s/%s)" % (queue_id, user_profile.email,
                                                                    user_client.name))
    client.connect_handler(handler)

    # runtornado recognizes this special return value.
    return RespondAsynchronously
开发者ID:Gabriel0402,项目名称:zulip,代码行数:52,代码来源:tornadoviews.py


示例8: events_register_backend

def events_register_backend(request, user_profile, apply_markdown=True,
                            all_public_streams=None,
                            event_types=REQ(validator=check_list(check_string), default=None),
                            narrow=REQ(validator=check_list(check_list(check_string, length=2)), default=[]),
                            queue_lifespan_secs=REQ(converter=int, default=0)):
    # type: (HttpRequest, UserProfile, bool, Optional[bool], Optional[Iterable[str]], Iterable[Sequence[Text]], int) -> HttpResponse
    all_public_streams = _default_all_public_streams(user_profile, all_public_streams)
    narrow = _default_narrow(user_profile, narrow)

    ret = do_events_register(user_profile, request.client, apply_markdown,
                             event_types, queue_lifespan_secs, all_public_streams,
                             narrow=narrow)
    return json_success(ret)
开发者ID:TomaszKolek,项目名称:zulip,代码行数:13,代码来源:events_register.py


示例9: update_subscriptions_backend

def update_subscriptions_backend(request, user_profile,
                                 delete=REQ(validator=check_list(check_string), default=[]),
                                 add=REQ(validator=check_list(check_dict([['name', check_string]])), default=[])):
    if not add and not delete:
        return json_error('Nothing to do. Specify at least one of "add" or "delete".')

    json_dict = {} # type: Dict[str, Any]
    for method, items in ((add_subscriptions_backend, add), (remove_subscriptions_backend, delete)):
        response = method(request, user_profile, streams_raw=items)
        if response.status_code != 200:
            transaction.rollback()
            return response
        json_dict.update(ujson.loads(response.content))
    return json_success(json_dict)
开发者ID:anteq,项目名称:zulip,代码行数:14,代码来源:streams.py


示例10: update_user_group_backend

def update_user_group_backend(request: HttpRequest, user_profile: UserProfile,
                              user_group_id: int=REQ(validator=check_int),
                              delete: List[int]=REQ(validator=check_list(check_int), default=[]),
                              add: List[int]=REQ(validator=check_list(check_int), default=[])
                              ) -> HttpResponse:
    if not add and not delete:
        return json_error(_('Nothing to do. Specify at least one of "add" or "delete".'))

    method_kwarg_pairs = [
        (add_members_to_group_backend,
         dict(user_group_id=user_group_id, members=add)),
        (remove_members_from_group_backend,
         dict(user_group_id=user_group_id, members=delete))
    ]  # type: List[FuncKwargPair]
    return compose_views(request, user_profile, method_kwarg_pairs)
开发者ID:BakerWang,项目名称:zulip,代码行数:15,代码来源:user_groups.py


示例11: add_user_group

def add_user_group(request: HttpRequest, user_profile: UserProfile,
                   name: str=REQ(),
                   members: List[int]=REQ(validator=check_list(check_int), default=[]),
                   description: str=REQ()) -> HttpResponse:
    user_profiles = user_ids_to_users(members, user_profile.realm)
    check_add_user_group(user_profile.realm, name, user_profiles, description)
    return json_success()
开发者ID:BakerWang,项目名称:zulip,代码行数:7,代码来源:user_groups.py


示例12: update_subscriptions_backend

def update_subscriptions_backend(request, user_profile,
                                 delete=REQ(validator=check_list(check_string), default=[]),
                                 add=REQ(validator=check_list(check_dict([('name', check_string)])), default=[])):
    # type: (HttpRequest, UserProfile, Iterable[text_type], Iterable[Mapping[str, Any]]) -> HttpResponse
    if not add and not delete:
        return json_error(_('Nothing to do. Specify at least one of "add" or "delete".'))

    json_dict = {} # type: Dict[str, Any]
    method_items_pairs = ((add_subscriptions_backend, add), (remove_subscriptions_backend, delete)) # type: Tuple[FuncItPair, FuncItPair]
    for method, items in method_items_pairs:
        response = method(request, user_profile, streams_raw=items)
        if response.status_code != 200:
            transaction.rollback()
            return response
        json_dict.update(ujson.loads(response.content))
    return json_success(json_dict)
开发者ID:HKingz,项目名称:zulip,代码行数:16,代码来源:streams.py


示例13: update_user_custom_profile_data

def update_user_custom_profile_data(
        request: HttpRequest,
        user_profile: UserProfile,
        data: List[Dict[str, Union[int, str]]]=REQ(validator=check_list(
            check_dict([('id', check_int)])))) -> HttpResponse:
    for item in data:
        field_id = item['id']
        try:
            field = CustomProfileField.objects.get(id=field_id)
        except CustomProfileField.DoesNotExist:
            return json_error(_('Field id {id} not found.').format(id=field_id))

        validators = CustomProfileField.FIELD_VALIDATORS
        extended_validators = CustomProfileField.EXTENDED_FIELD_VALIDATORS
        field_type = field.field_type
        value = item['value']
        var_name = '{}'.format(field.name)
        if field_type in validators:
            validator = validators[field_type]
            result = validator(var_name, value)
        else:
            # Check extended validators.
            extended_validator = extended_validators[field_type]
            field_data = field.field_data
            result = extended_validator(var_name, field_data, value)

        if result is not None:
            return json_error(result)

    do_update_user_custom_profile_data(user_profile, data)
    # We need to call this explicitly otherwise constraints are not check
    return json_success()
开发者ID:umairwaheed,项目名称:zulip,代码行数:32,代码来源:custom_profile_fields.py


示例14: test_register_events

    def test_register_events(self):
        realm_user_add_checker = check_dict([
            ('type', equals('realm_user')),
            ('op', equals('add')),
            ('person', check_dict([
                ('email', check_string),
                ('full_name', check_string),
                ('is_admin', check_bool),
                ('is_bot', check_bool),
            ])),
        ])
        stream_create_checker = check_dict([
            ('type', equals('stream')),
            ('op', equals('create')),
            ('streams', check_list(check_dict([
                ('description', check_string),
                ('invite_only', check_bool),
                ('name', check_string),
                ('stream_id', check_int),
            ])))
        ])

        events = self.do_test(lambda: self.register("test1", "test1"))
        error = realm_user_add_checker('events[0]', events[0])
        self.assert_on_error(error)
        error = stream_create_checker('events[1]', events[1])
        self.assert_on_error(error)
开发者ID:seanly,项目名称:zulip,代码行数:27,代码来源:test_events.py


示例15: messages_in_narrow_backend

def messages_in_narrow_backend(request, user_profile,
                               msg_ids = REQ(validator=check_list(check_int)),
                               narrow = REQ(converter=narrow_parameter)):
    # type: (HttpRequest, UserProfile, List[int], List[Dict[str, Any]]) -> HttpResponse

    # Note that this function will only work on messages the user
    # actually received

    # TODO: We assume that the narrow is a search.  For now this works because
    # the browser only ever calls this function for searches, since it can't
    # apply that narrow operator itself.

    query = select([column("message_id"), column("subject"), column("rendered_content")],
                   and_(column("user_profile_id") == literal(user_profile.id),
                        column("message_id").in_(msg_ids)),
                   join(table("zerver_usermessage"), table("zerver_message"),
                        literal_column("zerver_usermessage.message_id") ==
                        literal_column("zerver_message.id")))

    builder = NarrowBuilder(user_profile, column("message_id"))
    for term in narrow:
        query = builder.add_term(query, term)

    sa_conn = get_sqlalchemy_connection()
    query_result = list(sa_conn.execute(query).fetchall())

    search_fields = dict()
    for row in query_result:
        (message_id, subject, rendered_content, content_matches, subject_matches) = row
        search_fields[message_id] = get_search_fields(rendered_content, subject,
                                                      content_matches, subject_matches)

    return json_success({"messages": search_fields})
开发者ID:souravbadami,项目名称:zulip,代码行数:33,代码来源:messages.py


示例16: test_check_dict

    def test_check_dict(self):
        keys = [
            ('names', check_list(check_string)),
            ('city', check_string),
        ]

        x = {
            'names': ['alice', 'bob'],
            'city': 'Boston',
        }
        error = check_dict(keys)('x', x)
        self.assertEqual(error, None)

        x = 999
        error = check_dict(keys)('x', x)
        self.assertEqual(error, 'x is not a dict')

        x = {}
        error = check_dict(keys)('x', x)
        self.assertEqual(error, 'names key is missing from x')

        x = {
            'names': ['alice', 'bob', {}]
        }
        error = check_dict(keys)('x', x)
        self.assertEqual(error, 'x["names"][2] is not a string')

        x = {
            'names': ['alice', 'bob'],
            'city': 5
        }
        error = check_dict(keys)('x', x)
        self.assertEqual(error, 'x["city"] is not a string')
开发者ID:anindya,项目名称:zulip,代码行数:33,代码来源:test_decorators.py


示例17: test_check_list

    def test_check_list(self):
        x = 999
        error = check_list(check_string)('x', x)
        self.assertEqual(error, 'x is not a list')

        x = ["hello", 5]
        error = check_list(check_string)('x', x)
        self.assertEqual(error, 'x[1] is not a string')

        x = [["yo"], ["hello", "goodbye", 5]]
        error = check_list(check_list(check_string))('x', x)
        self.assertEqual(error, 'x[1][2] is not a string')

        x = ["hello", "goodbye", "hello again"]
        error = check_list(check_string, length=2)('x', x)
        self.assertEqual(error, 'x should have exactly 2 items')
开发者ID:anindya,项目名称:zulip,代码行数:16,代码来源:test_decorators.py


示例18: remove_storage

def remove_storage(request, user_profile, keys=REQ(validator=check_list(check_string), default=None)):
    # type: (HttpRequest, UserProfile, Optional[List[str]]) -> HttpResponse
    keys = keys or get_keys_in_bot_storage(user_profile)
    try:
        remove_bot_storage(user_profile, keys)
    except StateError as e:
        return json_error(str(e))
    return json_success()
开发者ID:joydeep1701,项目名称:zulip,代码行数:8,代码来源:storage.py


示例19: create_default_stream_group

def create_default_stream_group(request: HttpRequest, user_profile: UserProfile,
                                group_name: Text=REQ(), description: Text=REQ(),
                                stream_names: List[Text]=REQ(validator=check_list(check_string))) -> None:
    streams = []
    for stream_name in stream_names:
        (stream, recipient, sub) = access_stream_by_name(user_profile, stream_name)
        streams.append(stream)
    do_create_default_stream_group(user_profile.realm, group_name, description, streams)
    return json_success()
开发者ID:gnprice,项目名称:zulip,代码行数:9,代码来源:streams.py


示例20: update_message_flags

def update_message_flags(request, user_profile,
                      messages=REQ('messages', validator=check_list(check_int)),
                      operation=REQ('op'), flag=REQ('flag'),
                      all=REQ('all', validator=check_bool, default=False)):
    request._log_data["extra"] = "[%s %s]" % (operation, flag)
    do_update_message_flags(user_profile, operation, flag, messages, all)
    return json_success({'result': 'success',
                         'messages': messages,
                         'msg': ''})
开发者ID:danshev,项目名称:zulip,代码行数:9,代码来源:messages.py



注:本文中的zerver.lib.validator.check_list函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python validator.equals函数代码示例发布时间:2022-05-26
下一篇:
Python validator.check_dict函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap