本文整理汇总了Python中requests.structures.CaseInsensitiveDict类的典型用法代码示例。如果您正苦于以下问题:Python CaseInsensitiveDict类的具体用法?Python CaseInsensitiveDict怎么用?Python CaseInsensitiveDict使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了CaseInsensitiveDict类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: assert_request_equal
def assert_request_equal(self, expected, real_request):
method, path = expected[:2]
if urlparse(path).scheme:
match_path = real_request['full_path']
else:
match_path = real_request['path']
self.assertEqual((method, path), (real_request['method'],
match_path))
if len(expected) > 2:
body = expected[2]
real_request['expected'] = body
err_msg = 'Body mismatch for %(method)s %(path)s, ' \
'expected %(expected)r, and got %(body)r' % real_request
self.orig_assertEqual(body, real_request['body'], err_msg)
if len(expected) > 3:
headers = CaseInsensitiveDict(expected[3])
for key, value in headers.items():
real_request['key'] = key
real_request['expected_value'] = value
real_request['value'] = real_request['headers'].get(key)
err_msg = (
'Header mismatch on %(key)r, '
'expected %(expected_value)r and got %(value)r '
'for %(method)s %(path)s %(headers)r' % real_request)
self.orig_assertEqual(value, real_request['value'],
err_msg)
real_request['extra_headers'] = dict(
(key, value) for key, value in real_request['headers'].items()
if key not in headers)
if real_request['extra_headers']:
self.fail('Received unexpected headers for %(method)s '
'%(path)s, got %(extra_headers)r' % real_request)
开发者ID:wesleykendall,项目名称:python-swiftclient,代码行数:33,代码来源:utils.py
示例2: _check_creds
def _check_creds(self, creds):
d = CaseInsensitiveDict()
if isinstance(creds, dict):
d.update(creds)
elif isinstance(creds, basestring):
if os.path.exists(creds):
creds = file(creds, "r").read()
for line in creds.splitlines():
if ":" in line:
k, v = line.split(":", 1)
d[k.strip()] = v.strip()
else:
raise TypeError("unsupported type for credentials data")
if "companyId" not in d and "CID" in d:
d["companyId"] = d["CID"]
if "companyId" in d and not "psk" in d:
raise ValueError("psk is required when companyId is provided")
elif "psk" in d and not "companyId" in d:
raise ValueError("companyId is required when psk is provided")
elif "companyId" in d and "psk" in d:
return {"companyId": int(d["companyId"]), "psk": str(d["psk"])}
elif "loginSessionId" in d and not "profileId" in d:
raise ValueError("profileId is required when loginSessionId is " "provided")
elif "profileId" in d and not "loginSessionId" in d:
raise ValueError("loginSessionId is required when profileId is " "provided")
elif "loginSessionId" in d and "profileId" in d:
return {"loginSessionId": str(d["loginSessionId"]), "profileId": int(d["profileId"])}
else:
raise ValueError("either companyId+psk or " "loginSessionId+profileId must be provided")
开发者ID:ninemoreminutes,项目名称:lmiapi,代码行数:29,代码来源:v1.py
示例3: send_document
def send_document(url, data, timeout=10, *args, **kwargs):
"""Helper method to send a document via POST.
Additional ``*args`` and ``**kwargs`` will be passed on to ``requests.post``.
:arg url: Full url to send to, including protocol
:arg data: POST data to send (dict)
:arg timeout: Seconds to wait for response (defaults to 10)
:returns: Tuple of status code (int or None) and error (exception class instance or None)
"""
logger.debug("send_document: url=%s, data=%s, timeout=%s", url, data, timeout)
headers = CaseInsensitiveDict({
'User-Agent': USER_AGENT,
})
if "headers" in kwargs:
# Update from kwargs
headers.update(kwargs.get("headers"))
kwargs.update({
"data": data, "timeout": timeout, "headers": headers
})
try:
response = requests.post(url, *args, **kwargs)
logger.debug("send_document: response status code %s", response.status_code)
return response.status_code, None
except RequestException as ex:
logger.debug("send_document: exception %s", ex)
return None, ex
开发者ID:jaywink,项目名称:social-federation,代码行数:27,代码来源:network.py
示例4: assert_request_equal
def assert_request_equal(self, expected, real_request):
method, path = expected[:2]
if urlparse(path).scheme:
match_path = real_request["full_path"]
else:
match_path = real_request["path"]
self.assertEqual((method, path), (real_request["method"], match_path))
if len(expected) > 2:
body = expected[2]
real_request["expected"] = body
err_msg = "Body mismatch for %(method)s %(path)s, " "expected %(expected)r, and got %(body)r" % real_request
self.orig_assertEqual(body, real_request["body"], err_msg)
if len(expected) > 3:
headers = CaseInsensitiveDict(expected[3])
for key, value in headers.items():
real_request["key"] = key
real_request["expected_value"] = value
real_request["value"] = real_request["headers"].get(key)
err_msg = (
"Header mismatch on %(key)r, "
"expected %(expected_value)r and got %(value)r "
"for %(method)s %(path)s %(headers)r" % real_request
)
self.orig_assertEqual(value, real_request["value"], err_msg)
real_request["extra_headers"] = dict(
(key, value) for key, value in real_request["headers"].items() if key not in headers
)
if real_request["extra_headers"]:
self.fail(
"Received unexpected headers for %(method)s " "%(path)s, got %(extra_headers)r" % real_request
)
开发者ID:tipabu,项目名称:python-swiftclient,代码行数:32,代码来源:utils.py
示例5: DDWRT
class DDWRT(Router):
def __init__(self, conf, hostnames):
self.hostnames = CaseInsensitiveDict()
self.hostnames.update(hostnames)
self.conf = conf
self.auth = self.conf.auth()
def clients(self):
""" Receives all currently logged in users in a wifi network.
:rtype : list
:return: Returns a list of dicts, containing the following keys: mac, ipv4, seen, hostname
"""
clients = self._get_clients_raw()
clients_json = []
for client in clients:
client_hostname_from_router = client[0]
client_ipv4 = client[1].strip()
client_mac = client[2].strip().upper()
client_hostname = self.hostnames.get(client_mac, client_hostname_from_router).strip()
client_connections = int(client[3].strip())
# Clients with less than 20 connections are considered offline
if client_connections < 20:
continue
clients_json.append({
'mac': client_mac,
'ipv4': client_ipv4,
'seen': int(time.time()),
'hostname': client_hostname,
})
logger.debug('The router got us {} clients.'.format(len(clients_json)))
logger.debug(str(clients_json))
return clients_json
def _get_clients_raw(self):
info_page = self.conf.internal()
response = requests.get(info_page, auth=self.auth)
logger.info('Got response from router with code {}.'.format(response.status_code))
return DDWRT._convert_to_clients(response.text) or []
@staticmethod
def _convert_to_clients(router_info_all):
# Split router info in lines and filter empty info
router_info_lines = filter(None, router_info_all.split("\n"))
# Get key / value of router info
router_info_items = dict()
for item in router_info_lines:
key, value = item[1:-1].split("::") # Remove curly braces and split
router_info_items[key.strip()] = value.strip()
# Get client info as a list
arp_table = utils.groupn(router_info_items['arp_table'].replace("'", "").split(","), 4)
dhcp_leases = utils.groupn(router_info_items['dhcp_leases'].replace("'", "").split(","), 5)
return arp_table if (len(arp_table) > 0) else []
开发者ID:JonasGroeger,项目名称:labspion,代码行数:60,代码来源:ddwrt.py
示例6: prepare_response
def prepare_response(self, cached):
"""Verify our vary headers match and construct a real urllib3
HTTPResponse object.
"""
# Special case the '*' Vary value as it means we cannot actually
# determine if the cached response is suitable for this request.
if "*" in cached.get("vary", {}):
return
body_raw = cached["response"].pop("body")
headers = CaseInsensitiveDict(data=cached['response']['headers'])
if headers.get('transfer-encoding', '') == 'chunked':
headers.pop('transfer-encoding')
cached['response']['headers'] = headers
try:
body = io.BytesIO(body_raw)
except TypeError:
# This can happen if cachecontrol serialized to v1 format (pickle)
# using Python 2. A Python 2 str(byte string) will be unpickled as
# a Python 3 str (unicode string), which will cause the above to
# fail with:
#
# TypeError: 'str' does not support the buffer interface
body = io.BytesIO(body_raw.encode('utf8'))
return HTTPResponse(
body=body,
preload_content=False,
**cached["response"]
)
开发者ID:RyPeck,项目名称:cbapi-python,代码行数:33,代码来源:serialize.py
示例7: Attachment
class Attachment(object):
def __init__(self, part):
encoding = part.encoding or 'utf-8'
self.headers = CaseInsensitiveDict({
k.decode(encoding): v.decode(encoding)
for k, v in part.headers.items()
})
self.content_type = self.headers.get('Content-Type', None)
self.content_id = self.headers.get('Content-ID', None)
self.content_location = self.headers.get('Content-Location', None)
self._part = part
def __repr__(self):
return '<Attachment(%r, %r)>' % (self.content_id, self.content_type)
@cached_property
def content(self):
"""Return the content of the attachment
:rtype: bytes or str
"""
encoding = self.headers.get('Content-Transfer-Encoding', None)
content = self._part.content
if encoding == 'base64':
return base64.b64decode(content)
elif encoding == 'binary':
return content.strip(b'\r\n')
else:
return content
开发者ID:ovnicraft,项目名称:python-zeep,代码行数:31,代码来源:attachments.py
示例8: MockResponse
class MockResponse(object):
"""
Mock response object with a status code and some content
"""
def __init__(self, status_code, content=None, headers=None):
self.status_code = status_code
self.content = content or ''
self.headers = CaseInsensitiveDict()
if headers:
self.headers.update(headers)
def raise_for_status(self):
http_error_msg = ''
if 400 <= self.status_code < 500:
http_error_msg = '%s Client Error: ...' % self.status_code
elif 500 <= self.status_code < 600:
http_error_msg = '%s Server Error: ...' % self.status_code
if http_error_msg:
raise requests.HTTPError(http_error_msg, response=self)
def json(self, **kwargs):
return json.loads(self.content)
开发者ID:palanisamyprps,项目名称:rapidpro-python,代码行数:26,代码来源:tests.py
示例9: test_get
def test_get(self):
cid = CaseInsensitiveDict()
cid["spam"] = "oneval"
cid["SPAM"] = "blueval"
self.assertEqual(cid.get("spam"), "blueval")
self.assertEqual(cid.get("SPAM"), "blueval")
self.assertEqual(cid.get("sPam"), "blueval")
self.assertEqual(cid.get("notspam", "default"), "default")
开发者ID:kprantis,项目名称:requests,代码行数:8,代码来源:test_requests.py
示例10: test_preserve_last_key_case
def test_preserve_last_key_case(self):
cid = CaseInsensitiveDict({"Accept": "application/json", "user-Agent": "requests"})
cid.update({"ACCEPT": "application/json"})
cid["USER-AGENT"] = "requests"
keyset = frozenset(["ACCEPT", "USER-AGENT"])
assert frozenset(i[0] for i in cid.items()) == keyset
assert frozenset(cid.keys()) == keyset
assert frozenset(cid) == keyset
开发者ID:RRedwards,项目名称:requests,代码行数:8,代码来源:test_requests.py
示例11: test_lower_items
def test_lower_items(self):
cid = CaseInsensitiveDict({
'Accept': 'application/json',
'user-Agent': 'requests',
})
keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items())
lowerkeyset = frozenset(['accept', 'user-agent'])
assert keyset == lowerkeyset
开发者ID:5x5x5x5,项目名称:try_git,代码行数:8,代码来源:test_requests.py
示例12: test_get
def test_get(self):
cid = CaseInsensitiveDict()
cid["spam"] = "oneval"
cid["SPAM"] = "blueval"
assert cid.get("spam") == "blueval"
assert cid.get("SPAM") == "blueval"
assert cid.get("sPam") == "blueval"
assert cid.get("notspam", "default") == "default"
开发者ID:RRedwards,项目名称:requests,代码行数:8,代码来源:test_requests.py
示例13: set_extra_headers
def set_extra_headers(self, headers):
header_dict = CaseInsensitiveDict(headers)
if 'Reply-To' in header_dict:
self.data["ReplyTo"] = header_dict.pop('Reply-To')
self.data["Headers"] = [
{"Name": key, "Value": value}
for key, value in header_dict.items()
]
开发者ID:c-mrf,项目名称:django-anymail,代码行数:8,代码来源:postmark.py
示例14: test_preserve_key_case
def test_preserve_key_case(self):
cid = CaseInsensitiveDict({
'Accept': 'application/json',
'user-Agent': 'requests',
})
keyset = frozenset(['Accept', 'user-Agent'])
assert frozenset(i[0] for i in cid.items()) == keyset
assert frozenset(cid.keys()) == keyset
assert frozenset(cid) == keyset
开发者ID:5x5x5x5,项目名称:try_git,代码行数:9,代码来源:test_requests.py
示例15: test_copy
def test_copy(self):
cid = CaseInsensitiveDict({
'Accept': 'application/json',
'user-Agent': 'requests',
})
cid_copy = cid.copy()
assert cid == cid_copy
cid['changed'] = True
assert cid != cid_copy
开发者ID:503945930,项目名称:requests,代码行数:9,代码来源:test_requests.py
示例16: test_fixes_649
def test_fixes_649(self):
"""__setitem__ should behave case-insensitively."""
cid = CaseInsensitiveDict()
cid['spam'] = 'oneval'
cid['Spam'] = 'twoval'
cid['sPAM'] = 'redval'
cid['SPAM'] = 'blueval'
assert cid['spam'] == 'blueval'
assert cid['SPAM'] == 'blueval'
assert list(cid.keys()) == ['SPAM']
开发者ID:5x5x5x5,项目名称:try_git,代码行数:10,代码来源:test_requests.py
示例17: test_setdefault
def test_setdefault(self):
cid = CaseInsensitiveDict({'Spam': 'blueval'})
self.assertEqual(
cid.setdefault('spam', 'notblueval'),
'blueval'
)
self.assertEqual(
cid.setdefault('notspam', 'notblueval'),
'notblueval'
)
开发者ID:k-mito,项目名称:requests-docs-jp,代码行数:10,代码来源:test_requests.py
示例18: request
def request(self, method, url, accept_json=False, headers=None, params=None, json=None, data=None, files=None,
**kwargs):
full_url = self.url + url
input_headers = _remove_null_values(headers) if headers else {}
headers = CaseInsensitiveDict({'user-agent': 'watson-developer-cloud-python-' + __version__})
if accept_json:
headers['accept'] = 'application/json'
headers.update(input_headers)
# Remove keys with None values
params = _remove_null_values(params)
json = _remove_null_values(json)
data = _remove_null_values(data)
files = _remove_null_values(files)
# Support versions of requests older than 2.4.2 without the json input
if not data and json is not None:
data = json_import.dumps(json)
headers.update({'content-type': 'application/json'})
auth = None
if self.username and self.password:
auth = (self.username, self.password)
if self.api_key is not None:
if params is None:
params = {}
if url.startswith('https://gateway-a.watsonplatform.net/calls'):
params['apikey'] = self.api_key
else:
params['api_key'] = self.api_key
response = requests.request(method=method, url=full_url, cookies=self.jar, auth=auth, headers=headers,
params=params, data=data, files=files, **kwargs)
if 200 <= response.status_code <= 299:
if accept_json:
response_json = response.json()
if 'status' in response_json and response_json['status'] == 'ERROR':
response.status_code = 400
error_message = 'Unknown error'
if 'statusInfo' in response_json:
error_message = response_json['statusInfo']
if error_message == 'invalid-api-key':
response.status_code = 401
raise WatsonException('Error: ' + error_message)
return response_json
return response
else:
if response.status_code == 401:
error_message = 'Unauthorized: Access is denied due to invalid credentials'
else:
error_message = self._get_error_message(response)
raise WatsonException(error_message)
开发者ID:nthuy190991,项目名称:rocsi_dev,代码行数:55,代码来源:watson_developer_cloud_service.py
示例19: test_preserve_last_key_case
def test_preserve_last_key_case(self):
cid = CaseInsensitiveDict({
'Accept': 'application/json',
'user-Agent': 'requests',
})
cid.update({'ACCEPT': 'application/json'})
cid['USER-AGENT'] = 'requests'
keyset = frozenset(['ACCEPT', 'USER-AGENT'])
assert frozenset(i[0] for i in cid.items()) == keyset
assert frozenset(cid.keys()) == keyset
assert frozenset(cid) == keyset
开发者ID:5x5x5x5,项目名称:try_git,代码行数:11,代码来源:test_requests.py
示例20: verify_signature
def verify_signature(self, query_parameters):
"""Verify the signature provided with the query parameters.
http://docs.shopify.com/api/tutorials/oauth
example usage::
from shopify_trois import Credentials
from shopify_trois.engines import Json as Shopify
from urllib.parse import parse_qsl
credentials = Credentials(
api_key='your_api_key',
scope=['read_orders'],
secret='your_app_secret'
)
shopify = Shopify(shop_name="your_store_name", credentials=\
credentials)
query_parameters = parse_qsl("code=238420989938cb70a609f6ece2e2586\
b&shop=yourstore.myshopify.com×tamp=1373382939&\
signature=6fb122e33c21851c465345b8cb97245e")
if not shopify.verify_signature(query_parameters):
raise Exception("invalid signature")
credentials.code = dict(query_parameters).get('code')
shopify.setup_access_token()
:returns: Returns True if the signature is valid.
"""
params = CaseInsensitiveDict(query_parameters)
signature = params.pop("signature", None)
calculated = ["%s=%s" % (k, v) for k, v in params.items()]
calculated.sort()
calculated = "".join(calculated)
calculated = "{secret}{calculated}".format(
secret=self.credentials.secret,
calculated=calculated
)
md5 = hashlib.md5()
md5.update(calculated.encode('utf-8'))
produced = md5.hexdigest()
return produced == signature
开发者ID:RafaAguilar,项目名称:shopify-trois,代码行数:52,代码来源:oauth_engine.py
注:本文中的requests.structures.CaseInsensitiveDict类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论