本文整理汇总了 Python 中 zerver.lib.validator.check_dict 函数的典型用法代码示例。如果您正苦于以下问题:Python check_dict 函数的具体用法?Python check_dict 怎么用?Python check_dict 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 check_dict 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: api_newrelic_webhook
def api_newrelic_webhook(request, user_profile, alert=REQ(validator=check_dict([]), default=None),
                         deployment=REQ(validator=check_dict([]), default=None)):
    """Relay a New Relic alert or deployment notification to a stream.

    The destination stream is read from the ``stream`` query parameter.
    An ``alert`` payload takes precedence over a ``deployment`` payload;
    when neither is supplied a JSON error is returned.
    """
    try:
        stream = request.GET['stream']
    except (AttributeError, KeyError):
        return json_error("Missing stream parameter.")

    if not alert and not deployment:
        return json_error("Unknown webhook request")

    if alert:
        # The alert message is the same for the "opened", "acknowledged"
        # and "closed" notifications, so using it as the subject keeps
        # those messages grouped together.
        subject = alert['message']
        content = "%(long_description)s\n[View alert](%(alert_url)s)" % (alert)
    else:
        subject = "%s deploy" % (deployment['application_name'])
        deploy_template = ("`%(revision)s` deployed by **%(deployed_by)s**\n"
                           "%(description)s\n"
                           "%(changelog)s")
        content = deploy_template % (deployment)

    sender_client = get_client("ZulipNewRelicWebhook")
    check_send_message(user_profile, sender_client, "stream",
                       [stream], subject, content)
    return json_success()
开发者ID:anindya,项目名称:zulip,代码行数:25,代码来源:webhooks.py
示例2: test_register_events
def test_register_events(self):
    """Check the shape of the first two events produced by ``self.register``.

    The first event must be a ``realm_user``/``add`` event and the second a
    ``stream``/``create`` event.
    """
    person_checker = check_dict([
        ('email', check_string),
        ('full_name', check_string),
        ('is_admin', check_bool),
        ('is_bot', check_bool),
    ])
    realm_user_add_checker = check_dict([
        ('type', equals('realm_user')),
        ('op', equals('add')),
        ('person', person_checker),
    ])

    stream_checker = check_dict([
        ('description', check_string),
        ('invite_only', check_bool),
        ('name', check_string),
        ('stream_id', check_int),
    ])
    stream_create_checker = check_dict([
        ('type', equals('stream')),
        ('op', equals('create')),
        ('streams', check_list(stream_checker)),
    ])

    observed = self.do_test(lambda: self.register("test1", "test1"))

    # Event order matters: index 0 is the user event, index 1 the stream event.
    ordered_checkers = [realm_user_add_checker, stream_create_checker]
    for position, checker in enumerate(ordered_checkers):
        error = checker('events[%d]' % (position,), observed[position])
        self.assert_on_error(error)
开发者ID:seanly,项目名称:zulip,代码行数:27,代码来源:test_events.py
示例3: test_check_dict
def test_check_dict(self):
    """Exercise ``check_dict`` against valid, non-dict and malformed inputs."""
    keys = [
        ('names', check_list(check_string)),
        ('city', check_string),
    ]

    # Each case pairs a candidate value with the error message expected
    # from the validator (None means the value is accepted).
    cases = [
        ({'names': ['alice', 'bob'], 'city': 'Boston'}, None),
        (999, 'x is not a dict'),
        ({}, 'names key is missing from x'),
        ({'names': ['alice', 'bob', {}]}, 'x["names"][2] is not a string'),
        ({'names': ['alice', 'bob'], 'city': 5}, 'x["city"] is not a string'),
    ]

    for candidate, expected_error in cases:
        outcome = check_dict(keys)('x', candidate)
        self.assertEqual(outcome, expected_error)
开发者ID:anindya,项目名称:zulip,代码行数:33,代码来源:test_decorators.py
示例4: test_rename_stream
def test_rename_stream(self):
    """Check the two ``stream``/``update`` events produced by renaming a stream.

    The first event is expected to update ``email_address`` and the second
    to update ``name``; both carry the old stream name.
    """
    realm = get_realm('zulip.com')
    stream, _ = create_stream_if_needed(realm, 'old_name')
    new_name = u'stream with a brand new name'
    self.subscribe_to_stream(self.user_profile.email, stream.name)

    def rename():
        return do_rename_stream(realm, stream.name, new_name)

    events = self.do_test(rename)

    def update_schema(property_name, value_checker):
        # Both events share the same envelope; only the property and the
        # validator for its value differ.
        return check_dict([
            ('type', equals('stream')),
            ('op', equals('update')),
            ('property', equals(property_name)),
            ('value', value_checker),
            ('name', equals('old_name')),
        ])

    expectations = [
        ('email_address', check_string),
        ('name', equals(new_name)),
    ]
    for index, (property_name, value_checker) in enumerate(expectations):
        schema_checker = update_schema(property_name, value_checker)
        error = schema_checker('events[%d]' % (index,), events[index])
        self.assert_on_error(error)
开发者ID:seanly,项目名称:zulip,代码行数:28,代码来源:test_events.py
示例5: do_rest_call
def do_rest_call(rest_operation: Dict[str, Any],
                 request_data: Optional[Dict[str, Any]],
                 event: Dict[str, Any],
                 service_handler: Any,
                 timeout: Any=None) -> None:
    """Perform the HTTP request described by ``rest_operation`` and report the outcome.

    Args:
        rest_operation: Must contain string values for ``method``,
            ``relative_url_path`` and ``base_url`` and a dict under
            ``request_kwargs``. The dict is not modified by this function.
        request_data: Passed to ``requests.request`` as ``data``.
        event: The triggering event; ``event["command"]`` and
            ``event["service_name"]`` are used in log and failure messages.
        service_handler: Object whose ``process_success(response, event)``
            is called for responses whose status code starts with ``2``.
        timeout: Forwarded to ``requests.request`` as the ``timeout`` keyword.

    Raises:
        JsonableError: If ``rest_operation`` fails schema validation.
    """
    rest_operation_validator = check_dict([
        ('method', check_string),
        ('relative_url_path', check_string),
        ('request_kwargs', check_dict([])),
        ('base_url', check_string),
    ])

    error = rest_operation_validator('rest_operation', rest_operation)
    if error:
        raise JsonableError(error)

    http_method = rest_operation['method']
    final_url = urllib.parse.urljoin(rest_operation['base_url'], rest_operation['relative_url_path'])
    # Work on a shallow copy so that adding the timeout does not leak into
    # the caller's ``rest_operation`` dict.
    request_kwargs = dict(rest_operation['request_kwargs'])
    request_kwargs['timeout'] = timeout

    try:
        response = requests.request(http_method, final_url, data=request_data, **request_kwargs)
        if str(response.status_code).startswith('2'):
            response_message = service_handler.process_success(response, event)
            if response_message is not None:
                succeed_with_message(event, response_message)
        else:
            # A single mapping argument lets the logging module perform the
            # %(name)s substitution lazily.
            logging.warning("Message %(message_url)s triggered an outgoing webhook, returning status "
                            "code %(status_code)s.\n Content of response (in quotes): \""
                            "%(response)s\"",
                            {'message_url': get_message_url(event, request_data),
                             'status_code': response.status_code,
                             'response': response.content})
            failure_message = "Third party responded with %d" % (response.status_code)
            fail_with_message(event, failure_message)
            notify_bot_owner(event, request_data, response.status_code, response.content)

    except requests.exceptions.Timeout as e:
        logging.info("Trigger event %s on %s timed out. Retrying",
                     event["command"], event['service_name'])
        request_retry(event, request_data, 'Unable to connect with the third party.', exception=e)

    except requests.exceptions.ConnectionError as e:
        response_message = ("The message `%s` resulted in a connection error when "
                            "sending a request to an outgoing "
                            "webhook! See the Zulip server logs for more information." % (event["command"],))
        logging.info("Trigger event %s on %s resulted in a connection error. Retrying",
                     event["command"], event['service_name'])
        request_retry(event, request_data, response_message, exception=e)

    except requests.exceptions.RequestException as e:
        # Any other requests failure is not retried: the event is failed
        # and the bot owner is notified.
        response_message = ("An exception of type *%s* occurred for message `%s`! "
                            "See the Zulip server logs for more information." % (
                                type(e).__name__, event["command"],))
        logging.exception("Outhook trigger failed:\n %s", e)
        fail_with_message(event, response_message)
        notify_bot_owner(event, request_data, exception=e)
开发者ID:gnprice,项目名称:zulip,代码行数:58,代码来源:outgoing_webhook.py
示例6: realm_bot_schema
def realm_bot_schema(self, field_name, check):
    """Build the validator for a ``realm_bot``/``update`` event.

    ``field_name`` is the key expected inside the event's ``bot`` dict
    (alongside ``email``) and ``check`` is the validator applied to it.
    """
    bot_checker = check_dict([
        ('email', check_string),
        (field_name, check),
    ])
    event_fields = [
        ('type', equals('realm_bot')),
        ('op', equals('update')),
        ('bot', bot_checker),
    ]
    return check_dict(event_fields)
开发者ID:seanly,项目名称:zulip,代码行数:9,代码来源:test_events.py
示例7: test_send_message_events
def test_send_message_events(self):
    """Validate the events emitted when a stream message is sent and then edited.

    The first half sends a message and checks the resulting ``message``
    event; the second half updates the message with the highest id and
    checks the resulting ``update_message`` event.
    """
    schema_checker = check_dict([
        ('type', equals('message')),
        ('flags', check_list(None)),
        ('message', check_dict([
            ('avatar_url', check_string),
            ('client', check_string),
            ('content', check_string),
            ('content_type', equals('text/html')),
            ('display_recipient', check_string),
            ('gravatar_hash', check_string),
            ('id', check_int),
            ('recipient_id', check_int),
            ('sender_domain', check_string),
            ('sender_email', check_string),
            ('sender_full_name', check_string),
            ('sender_id', check_int),
            ('sender_short_name', check_string),
            ('subject', check_string),
            ('subject_links', check_list(None)),
            ('timestamp', check_int),
            ('type', check_string),
        ])),
    ])
    # NOTE(review): "[email protected]" looks like an e-mail address that was
    # redacted when this snippet was extracted -- confirm the intended sender.
    events = self.do_test(lambda: self.send_message("[email protected]", "Verona", Recipient.STREAM, "hello"))
    error = schema_checker('events[0]', events[0])
    self.assert_on_error(error)

    # Every expected key is listed exactly once.
    schema_checker = check_dict([
        ('type', equals('update_message')),
        ('flags', check_list(None)),
        ('content', check_string),
        ('edit_timestamp', check_int),
        ('message_id', check_int),
        ('message_ids', check_list(check_int)),
        ('orig_content', check_string),
        ('orig_rendered_content', check_string),
        ('orig_subject', check_string),
        ('propagate_mode', check_string),
        ('rendered_content', check_string),
        ('sender', check_string),
        ('stream_id', check_int),
        ('subject', check_string),
        ('subject_links', check_list(None)),
        # There is also a timestamp field in the event, but we ignore it, as
        # it's kind of an unwanted but harmless side effect of calling log_event.
    ])

    message_id = Message.objects.order_by('-id')[0].id
    topic = 'new_topic'
    propagate_mode = 'change_all'
    content = 'new content'
    events = self.do_test(lambda: do_update_message(self.user_profile, message_id, topic, propagate_mode, content))
    error = schema_checker('events[0]', events[0])
    self.assert_on_error(error)
开发者ID:seanly,项目名称:zulip,代码行数:56,代码来源:test_events.py
示例8: test_change_full_name
def test_change_full_name(self):
    """Changing a user's full name should emit a ``realm_user``/``update`` event."""
    person_checker = check_dict([
        ('email', check_string),
        ('full_name', check_string),
    ])
    schema_checker = check_dict([
        ('type', equals('realm_user')),
        ('op', equals('update')),
        ('person', person_checker),
    ])

    def rename_user():
        return do_change_full_name(self.user_profile, 'Sir Hamlet')

    events = self.do_test(rename_user)
    first_event = events[0]
    self.assert_on_error(schema_checker('events[0]', first_event))
开发者ID:seanly,项目名称:zulip,代码行数:12,代码来源:test_events.py
示例9: test_do_deactivate_user
def test_do_deactivate_user(self):
    """Deactivating a bot should produce a ``realm_bot``/``remove`` event at index 1."""
    bot_fields_checker = check_dict([
        ('email', check_string),
        ('full_name', check_string),
    ])
    bot_deactivate_checker = check_dict([
        ('type', equals('realm_bot')),
        ('op', equals('remove')),
        ('bot', bot_fields_checker),
    ])

    # NOTE(review): '[email protected]' looks like an e-mail address that was
    # redacted when this snippet was extracted -- confirm the intended value.
    bot = self.create_bot('[email protected]')

    def deactivate():
        return do_deactivate_user(bot)

    events = self.do_test(deactivate)
    # Only the second event is validated here.
    second_event = events[1]
    self.assert_on_error(bot_deactivate_checker('events[1]', second_event))
开发者ID:seanly,项目名称:zulip,代码行数:14,代码来源:test_events.py
示例10: test_realm_emoji_events
def test_realm_emoji_events(self):
    """Adding and removing a realm emoji should each emit a ``realm_emoji``/``update`` event."""
    schema_checker = check_dict([
        ('type', equals('realm_emoji')),
        ('op', equals('update')),
        ('realm_emoji', check_dict([])),
    ])

    def add_emoji():
        return do_add_realm_emoji(get_realm("zulip.com"), "my_emoji",
                                  "https://realm.com/my_emoji")

    def remove_emoji():
        return do_remove_realm_emoji(get_realm("zulip.com"), "my_emoji")

    # The same schema is applied to the first event of both actions.
    for action in (add_emoji, remove_emoji):
        events = self.do_test(action)
        error = schema_checker('events[0]', events[0])
        self.assert_on_error(error)
开发者ID:seanly,项目名称:zulip,代码行数:14,代码来源:test_events.py
示例11: test_check_dict
def test_check_dict(self):
    # type: () -> None
    """Exercise ``check_dict`` and ``check_dict_only`` with the same key schema."""
    keys = [
        ('names', check_list(check_string)),
        ('city', check_string),
    ]  # type: List[Tuple[str, Validator]]

    # (candidate, expected error) pairs for check_dict; None means accepted.
    check_dict_cases = [
        ({'names': ['alice', 'bob'], 'city': 'Boston'}, None),
        (999, 'x is not a dict'),
        ({}, 'names key is missing from x'),
        ({'names': ['alice', 'bob', {}]}, 'x["names"][2] is not a string'),
        ({'names': ['alice', 'bob'], 'city': 5}, 'x["city"] is not a string'),
    ]
    for candidate, expected_error in check_dict_cases:
        x = candidate  # type: Any
        error = check_dict(keys)('x', x)
        self.assertEqual(error, expected_error)

    # test dict_only: a key outside the schema is reported as unexpected.
    check_dict_only_cases = [
        ({'names': ['alice', 'bob'], 'city': 'Boston'}, None),
        ({'names': ['alice', 'bob'], 'city': 'Boston', 'state': 'Massachusetts'},
         'Unexpected arguments: state'),
    ]
    for candidate, expected_error in check_dict_only_cases:
        x = candidate
        error = check_dict_only(keys)('x', x)
        self.assertEqual(error, expected_error)
开发者ID:brockwhittaker,项目名称:zulip,代码行数:50,代码来源:test_decorators.py
示例12: test_change_is_admin
def test_change_is_admin(self):
    """Toggling admin status should emit a ``realm_user``/``update`` event each time."""
    person_checker = check_dict([
        ('email', check_string),
        ('is_admin', check_bool),
    ])
    schema_checker = check_dict([
        ('type', equals('realm_user')),
        ('op', equals('update')),
        ('person', person_checker),
    ])

    # The first False is probably a noop, then we get transitions in both directions.
    admin_states = [False, True, False]
    for is_admin in admin_states:
        def change_admin():
            return do_change_is_admin(self.user_profile, is_admin)

        events = self.do_test(change_admin)
        error = schema_checker('events[0]', events[0])
        self.assert_on_error(error)
开发者ID:seanly,项目名称:zulip,代码行数:14,代码来源:test_events.py
示例13: api_travis_webhook
def api_travis_webhook(request: HttpRequest, user_profile: UserProfile,
                       ignore_pull_requests: bool = REQ(validator=check_bool, default=True),
                       message: Dict[str, str]=REQ('payload', validator=check_dict([
                           ('author_name', check_string),
                           ('status_message', check_string),
                           ('compare_url', check_string),
                       ]))) -> HttpResponse:
    """Post a Travis build notification under the ``builds`` topic.

    When ``ignore_pull_requests`` is true, payloads whose ``type`` is
    ``pull_request`` are acknowledged without sending a message.
    """
    message_status = message['status_message']

    skip_pull_request = ignore_pull_requests and message['type'] == 'pull_request'
    if skip_pull_request:
        return json_success()

    # Pick the emoji for the first status group that contains the status;
    # fall back to an explanatory placeholder when none matches.
    for statuses, symbol in ((GOOD_STATUSES, ':thumbs_up:'), (BAD_STATUSES, ':thumbs_down:')):
        if message_status in statuses:
            emoji = symbol
            break
    else:
        emoji = "(No emoji specified for status '{}'.)".format(message_status)

    template_args = (
        message['author_name'],
        message_status,
        emoji,
        message['compare_url'],
        message['build_url'],
    )
    body = MESSAGE_TEMPLATE.format(*template_args)

    check_send_webhook_message(request, user_profile, 'builds', body)
    return json_success()
开发者ID:BakerWang,项目名称:zulip,代码行数:30,代码来源:view.py
示例14: api_bitbucket_webhook
def api_bitbucket_webhook(request: HttpRequest, user_profile: UserProfile,
                          payload: Mapping[str, Any]=REQ(validator=check_dict([])),
                          branches: Optional[str]=REQ(default=None)) -> HttpResponse:
    """Post a Bitbucket push notification built from ``payload``.

    A payload with no commits is reported as a force push. Otherwise the
    branch of the last commit is used, and the notification is skipped when
    ``branches`` is given and does not contain that branch name as a
    substring.
    """
    repository = payload['repository']

    commits = []
    for commit in payload['commits']:
        commits.append({
            'name': payload.get('user'),
            'sha': commit.get('raw_node'),
            'message': commit.get('message'),
            'url': u'{}{}commits/{}'.format(
                payload.get('canon_url'),
                repository.get('absolute_url'),
                commit.get('raw_node')),
        })

    if not commits:
        # Bitbucket doesn't give us enough information to really give
        # a useful message :/
        subject = repository['name']
        repository_url = payload['canon_url'] + repository['absolute_url']
        content = u"%s [force pushed](%s)" % (payload['user'], repository_url)
    else:
        branch = payload['commits'][-1]['branch']
        if branches is not None and branch not in branches:
            return json_success()
        content = get_push_commits_event_message(payload['user'], None, branch, commits)
        subject = SUBJECT_WITH_BRANCH_TEMPLATE.format(repo=repository['name'], branch=branch)

    check_send_webhook_message(request, user_profile, subject, content)
    return json_success()
开发者ID:284928489,项目名称:zulip,代码行数:34,代码来源:view.py
示例15: update_user_custom_profile_data
def update_user_custom_profile_data(
        request: HttpRequest,
        user_profile: UserProfile,
        data: List[Dict[str, Union[int, str]]]=REQ(validator=check_list(
            check_dict([('id', check_int)])))) -> HttpResponse:
    """Validate each submitted custom profile field value, then store them all.

    Every item must reference an existing ``CustomProfileField`` by ``id``
    and carry a ``value`` accepted by the validator registered for that
    field's type. The first failure is returned as a JSON error and nothing
    is stored.
    """
    for item in data:
        field_id = item['id']
        try:
            field = CustomProfileField.objects.get(id=field_id)
        except CustomProfileField.DoesNotExist:
            return json_error(_('Field id {id} not found.').format(id=field_id))

        validators = CustomProfileField.FIELD_VALIDATORS
        extended_validators = CustomProfileField.EXTENDED_FIELD_VALIDATORS
        field_type = field.field_type
        value = item['value']
        var_name = '{}'.format(field.name)

        if field_type not in validators:
            # Extended validators additionally receive the field's own data.
            extended_validator = extended_validators[field_type]
            result = extended_validator(var_name, field.field_data, value)
        else:
            result = validators[field_type](var_name, value)

        if result is not None:
            return json_error(result)

    # NOTE(review): the original comment here said this call must be made
    # explicitly, otherwise constraints are not checked -- confirm intent.
    do_update_user_custom_profile_data(user_profile, data)
    return json_success()
开发者ID:umairwaheed,项目名称:zulip,代码行数:32,代码来源:custom_profile_fields.py
示例16: api_travis_webhook
def api_travis_webhook(request, user_profile, client,
                       stream=REQ(default='travis'),
                       topic=REQ(default=None),
                       ignore_pull_requests=REQ(validator=check_bool, default=True),
                       message=REQ('payload', validator=check_dict([
                           ('author_name', check_string),
                           ('status_message', check_string),
                           ('compare_url', check_string),
                       ]))):
    # type: (HttpRequest, UserProfile, Client, str, str, bool, Dict[str, str]) -> HttpResponse
    """Post a Travis build notification to ``stream`` under ``topic``.

    When ``ignore_pull_requests`` is true, payloads whose ``type`` is
    ``pull_request`` are acknowledged without sending a message.
    """
    message_status = message['status_message']

    skip_pull_request = ignore_pull_requests and message['type'] == 'pull_request'
    if skip_pull_request:
        return json_success()

    # Pick the emoji for the first status group that contains the status;
    # fall back to an explanatory placeholder when none matches.
    for statuses, symbol in ((GOOD_STATUSES, ':thumbsup:'), (BAD_STATUSES, ':thumbsdown:')):
        if message_status in statuses:
            emoji = symbol
            break
    else:
        emoji = "(No emoji specified for status '{}'.)".format(message_status)

    template_args = (
        message['author_name'],
        message_status,
        emoji,
        message['compare_url'],
        message['build_url'],
    )
    body = MESSAGE_TEMPLATE.format(*template_args)

    check_send_message(user_profile, client, 'stream', [stream], topic, body)
    return json_success()
开发者ID:aakash-cr7,项目名称:zulip,代码行数:32,代码来源:view.py
示例17: api_bitbucket_webhook
def api_bitbucket_webhook(request, user_profile, payload=REQ(validator=check_dict([])),
                          stream=REQ(default='commits')):
    # type: (HttpRequest, UserProfile, Dict[str, Any], str) -> None
    """Post a Bitbucket push notification to ``stream``.

    The subject is the repository name, with ``/<branch>`` appended when
    the payload contains commits. A payload with no commits is reported
    as a force push.
    """
    # NOTE(review): the type comment declares a None return, but the
    # function returns the result of json_success() -- confirm the
    # intended return type.
    repository = payload['repository']

    commits = []
    for commit in payload['commits']:
        commit_url = '%s%scommits/%s' % (payload['canon_url'],
                                         repository['absolute_url'],
                                         commit['raw_node'])
        commits.append({'id': commit['raw_node'],
                        'message': commit['message'],
                        'url': commit_url})

    subject = repository['name']
    if commits:
        # The branch of the last commit names the push.
        branch = payload['commits'][-1]['branch']
        content = build_commit_list_content(commits, branch, None, payload['user'])
        subject += '/%s' % (branch,)
    else:
        # Bitbucket doesn't give us enough information to really give
        # a useful message :/
        repository_url = payload['canon_url'] + repository['absolute_url']
        content = "%s [force pushed](%s)" % (payload['user'], repository_url)

    sender_client = get_client("ZulipBitBucketWebhook")
    check_send_message(user_profile, sender_client, "stream",
                       [stream], subject, content)
    return json_success()
开发者ID:MarkCupitt,项目名称:zulip,代码行数:25,代码来源:bitbucket.py
示例18: report_error
def report_error(request: HttpRequest, user_profile: UserProfile, message: str=REQ(),
                 stacktrace: str=REQ(), ui_message: bool=REQ(validator=check_bool),
                 user_agent: str=REQ(), href: str=REQ(), log: str=REQ(),
                 more_info: Optional[Dict[str, Any]]=REQ(validator=check_dict([]), default=None)
                 ) -> HttpResponse:
    """Publish a browser error report to the ``error_reports`` queue.

    Nothing is published when ``settings.BROWSER_ERROR_REPORTING`` is
    falsy. Otherwise the stacktrace is annotated with the JS source map
    when one is available, any ``draft_content`` in ``more_info`` is passed
    through ``privacy_clean_markdown``, and the report is queued together
    with request and user details.
    """
    if not settings.BROWSER_ERROR_REPORTING:
        return json_success()

    more_info = {} if more_info is None else more_info

    source_map = get_js_source_map()
    if source_map:
        stacktrace = source_map.annotate_stacktrace(stacktrace)

    try:
        version = subprocess.check_output(["git", "log", "HEAD^..HEAD", "--oneline"],
                                          universal_newlines=True)  # type: Optional[str]
    except Exception:
        # Any failure to query git just leaves the version unknown.
        version = None

    # Get the IP address of the request, preferring the X-Real-IP header.
    request_meta = request.META
    remote_ip = request_meta.get('HTTP_X_REAL_IP')
    if remote_ip is None:
        remote_ip = request_meta['REMOTE_ADDR']

    # For the privacy of our users, we remove any actual text content
    # in draft_content (from drafts rendering exceptions). See the
    # comment on privacy_clean_markdown for more details.
    draft_content = more_info.get('draft_content')
    if draft_content:
        more_info['draft_content'] = privacy_clean_markdown(draft_content)

    if user_profile.is_authenticated:
        email, full_name = user_profile.delivery_email, user_profile.full_name
    else:
        # NOTE(review): "[email protected]" looks like an e-mail address that
        # was redacted when this snippet was extracted -- confirm the value.
        email, full_name = "[email protected]", "Anonymous User"

    report = {
        'host': request.get_host().partition(":")[0],
        'ip_address': remote_ip,
        'user_email': email,
        'user_full_name': full_name,
        'user_visible': ui_message,
        'server_path': settings.DEPLOY_ROOT,
        'version': version,
        'user_agent': user_agent,
        'href': href,
        'message': message,
        'stacktrace': stacktrace,
        'log': log,
        'more_info': more_info,
    }
    queue_json_publish('error_reports', {'type': "browser", 'report': report})

    return json_success()
开发者ID:akashnimare,项目名称:zulip,代码行数:60,代码来源:report.py
示例19: update_storage
def update_storage(request: HttpRequest, user_profile: UserProfile,
                   storage: Dict[str, str]=REQ(validator=check_dict([]))) -> HttpResponse:
    """Store the submitted key/value pairs via ``set_bot_storage``.

    A ``StateError`` raised while storing is returned as a JSON error
    carrying the exception's message.
    """
    entries = list(storage.items())
    try:
        set_bot_storage(user_profile, entries)
    except StateError as e:
        return json_error(str(e))
    else:
        return json_success()
开发者ID:284928489,项目名称:zulip,代码行数:7,代码来源:storage.py
示例20: api_beanstalk_webhook
def api_beanstalk_webhook(request, user_profile,
                          payload=REQ(validator=check_dict([]))):
    """Post a Beanstalk commit notification to the ``commits`` stream.

    Beanstalk supports both SVN and git repositories. The two are told
    apart by the ``uri`` key, which is only present for git repos. Git
    payloads are rendered by ``build_message_from_gitlog``; SVN payloads
    are summarized from their author, revision, changeset URL and the
    first line of the commit message.
    """
    git_repo = 'uri' in payload
    if git_repo:
        subject, content = build_message_from_gitlog(user_profile, payload['repository']['name'],
                                                     payload['ref'], payload['commits'],
                                                     payload['before'], payload['after'],
                                                     payload['repository']['url'],
                                                     payload['pusher_name'])
    else:
        author = payload.get('author_full_name')
        url = payload.get('changeset_url')
        revision = payload.get('revision')
        # An absent or null message would make .partition() raise
        # AttributeError, so fall back to an empty string.
        commit_message = payload.get('message') or ""
        (short_commit_msg, _, _) = commit_message.partition("\n")

        subject = "svn r%s" % (revision,)
        content = "%s pushed [revision %s](%s):\n\n> %s" % (author, revision, url, short_commit_msg)

    check_send_message(user_profile, get_client("ZulipBeanstalkWebhook"), "stream",
                       ["commits"], subject, content)
    return json_success()
开发者ID:anindya,项目名称:zulip,代码行数:25,代码来源:webhooks.py
注:本文中的 zerver.lib.validator.check_dict 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论