• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python structures.CaseInsensitiveDict类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中requests.structures.CaseInsensitiveDict的典型用法代码示例。如果您正苦于以下问题:Python CaseInsensitiveDict类的具体用法?Python CaseInsensitiveDict怎么用?Python CaseInsensitiveDict使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了CaseInsensitiveDict类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: assert_request_equal

    def assert_request_equal(self, expected, real_request):
        method, path = expected[:2]
        if urlparse(path).scheme:
            match_path = real_request['full_path']
        else:
            match_path = real_request['path']
        self.assertEqual((method, path), (real_request['method'],
                                          match_path))
        if len(expected) > 2:
            body = expected[2]
            real_request['expected'] = body
            err_msg = 'Body mismatch for %(method)s %(path)s, ' \
                'expected %(expected)r, and got %(body)r' % real_request
            self.orig_assertEqual(body, real_request['body'], err_msg)

        if len(expected) > 3:
            headers = CaseInsensitiveDict(expected[3])
            for key, value in headers.items():
                real_request['key'] = key
                real_request['expected_value'] = value
                real_request['value'] = real_request['headers'].get(key)
                err_msg = (
                    'Header mismatch on %(key)r, '
                    'expected %(expected_value)r and got %(value)r '
                    'for %(method)s %(path)s %(headers)r' % real_request)
                self.orig_assertEqual(value, real_request['value'],
                                      err_msg)
            real_request['extra_headers'] = dict(
                (key, value) for key, value in real_request['headers'].items()
                if key not in headers)
            if real_request['extra_headers']:
                self.fail('Received unexpected headers for %(method)s '
                          '%(path)s, got %(extra_headers)r' % real_request)
开发者ID:wesleykendall,项目名称:python-swiftclient,代码行数:33,代码来源:utils.py


示例2: _check_creds

 def _check_creds(self, creds):
     d = CaseInsensitiveDict()
     if isinstance(creds, dict):
         d.update(creds)
     elif isinstance(creds, basestring):
         if os.path.exists(creds):
             creds = file(creds, "r").read()
         for line in creds.splitlines():
             if ":" in line:
                 k, v = line.split(":", 1)
                 d[k.strip()] = v.strip()
     else:
         raise TypeError("unsupported type for credentials data")
     if "companyId" not in d and "CID" in d:
         d["companyId"] = d["CID"]
     if "companyId" in d and not "psk" in d:
         raise ValueError("psk is required when companyId is provided")
     elif "psk" in d and not "companyId" in d:
         raise ValueError("companyId is required when psk is provided")
     elif "companyId" in d and "psk" in d:
         return {"companyId": int(d["companyId"]), "psk": str(d["psk"])}
     elif "loginSessionId" in d and not "profileId" in d:
         raise ValueError("profileId is required when loginSessionId is " "provided")
     elif "profileId" in d and not "loginSessionId" in d:
         raise ValueError("loginSessionId is required when profileId is " "provided")
     elif "loginSessionId" in d and "profileId" in d:
         return {"loginSessionId": str(d["loginSessionId"]), "profileId": int(d["profileId"])}
     else:
         raise ValueError("either companyId+psk or " "loginSessionId+profileId must be provided")
开发者ID:ninemoreminutes,项目名称:lmiapi,代码行数:29,代码来源:v1.py


示例3: send_document

def send_document(url, data, timeout=10, *args, **kwargs):
    """Helper method to send a document via POST.

    Additional ``*args`` and ``**kwargs`` will be passed on to ``requests.post``.

    :arg url: Full url to send to, including protocol
    :arg data: POST data to send (dict)
    :arg timeout: Seconds to wait for response (defaults to 10)
    :returns: Tuple of status code (int or None) and error (exception class instance or None)
    """
    logger.debug("send_document: url=%s, data=%s, timeout=%s", url, data, timeout)
    headers = CaseInsensitiveDict({
        'User-Agent': USER_AGENT,
    })
    if "headers" in kwargs:
        # Update from kwargs
        headers.update(kwargs.get("headers"))
    kwargs.update({
        "data": data, "timeout": timeout, "headers": headers
    })
    try:
        response = requests.post(url, *args, **kwargs)
        logger.debug("send_document: response status code %s", response.status_code)
        return response.status_code, None
    except RequestException as ex:
        logger.debug("send_document: exception %s", ex)
        return None, ex
开发者ID:jaywink,项目名称:social-federation,代码行数:27,代码来源:network.py


示例4: assert_request_equal

    def assert_request_equal(self, expected, real_request):
        method, path = expected[:2]
        if urlparse(path).scheme:
            match_path = real_request["full_path"]
        else:
            match_path = real_request["path"]
        self.assertEqual((method, path), (real_request["method"], match_path))
        if len(expected) > 2:
            body = expected[2]
            real_request["expected"] = body
            err_msg = "Body mismatch for %(method)s %(path)s, " "expected %(expected)r, and got %(body)r" % real_request
            self.orig_assertEqual(body, real_request["body"], err_msg)

        if len(expected) > 3:
            headers = CaseInsensitiveDict(expected[3])
            for key, value in headers.items():
                real_request["key"] = key
                real_request["expected_value"] = value
                real_request["value"] = real_request["headers"].get(key)
                err_msg = (
                    "Header mismatch on %(key)r, "
                    "expected %(expected_value)r and got %(value)r "
                    "for %(method)s %(path)s %(headers)r" % real_request
                )
                self.orig_assertEqual(value, real_request["value"], err_msg)
            real_request["extra_headers"] = dict(
                (key, value) for key, value in real_request["headers"].items() if key not in headers
            )
            if real_request["extra_headers"]:
                self.fail(
                    "Received unexpected headers for %(method)s " "%(path)s, got %(extra_headers)r" % real_request
                )
开发者ID:tipabu,项目名称:python-swiftclient,代码行数:32,代码来源:utils.py


示例5: DDWRT

class DDWRT(Router):
    def __init__(self, conf, hostnames):
        self.hostnames = CaseInsensitiveDict()
        self.hostnames.update(hostnames)
        self.conf = conf
        self.auth = self.conf.auth()

    def clients(self):
        """ Receives all currently logged in users in a wifi network.

        :rtype : list
        :return: Returns a list of dicts, containing the following keys: mac, ipv4, seen, hostname
        """
        clients = self._get_clients_raw()

        clients_json = []
        for client in clients:
            client_hostname_from_router = client[0]
            client_ipv4 = client[1].strip()
            client_mac = client[2].strip().upper()
            client_hostname = self.hostnames.get(client_mac, client_hostname_from_router).strip()
            client_connections = int(client[3].strip())

            # Clients with less than 20 connections are considered offline
            if client_connections < 20:
                continue

            clients_json.append({
                'mac': client_mac,
                'ipv4': client_ipv4,
                'seen': int(time.time()),
                'hostname': client_hostname,
            })

        logger.debug('The router got us {} clients.'.format(len(clients_json)))
        logger.debug(str(clients_json))
        return clients_json

    def _get_clients_raw(self):
        info_page = self.conf.internal()
        response = requests.get(info_page, auth=self.auth)
        logger.info('Got response from router with code {}.'.format(response.status_code))
        return DDWRT._convert_to_clients(response.text) or []

    @staticmethod
    def _convert_to_clients(router_info_all):
        # Split router info in lines and filter empty info
        router_info_lines = filter(None, router_info_all.split("\n"))

        # Get key / value of router info
        router_info_items = dict()
        for item in router_info_lines:
            key, value = item[1:-1].split("::")  # Remove curly braces and split
            router_info_items[key.strip()] = value.strip()

        # Get client info as a list
        arp_table = utils.groupn(router_info_items['arp_table'].replace("'", "").split(","), 4)
        dhcp_leases = utils.groupn(router_info_items['dhcp_leases'].replace("'", "").split(","), 5)

        return arp_table if (len(arp_table) > 0) else []
开发者ID:JonasGroeger,项目名称:labspion,代码行数:60,代码来源:ddwrt.py


示例6: prepare_response

    def prepare_response(self, cached):
        """Verify our vary headers match and construct a real urllib3
        HTTPResponse object.
        """
        # Special case the '*' Vary value as it means we cannot actually
        # determine if the cached response is suitable for this request.
        if "*" in cached.get("vary", {}):
            return

        body_raw = cached["response"].pop("body")

        headers = CaseInsensitiveDict(data=cached['response']['headers'])
        if headers.get('transfer-encoding', '') == 'chunked':
            headers.pop('transfer-encoding')

        cached['response']['headers'] = headers

        try:
            body = io.BytesIO(body_raw)
        except TypeError:
            # This can happen if cachecontrol serialized to v1 format (pickle)
            # using Python 2. A Python 2 str(byte string) will be unpickled as
            # a Python 3 str (unicode string), which will cause the above to
            # fail with:
            #
            #     TypeError: 'str' does not support the buffer interface
            body = io.BytesIO(body_raw.encode('utf8'))

        return HTTPResponse(
            body=body,
            preload_content=False,
            **cached["response"]
        )
开发者ID:RyPeck,项目名称:cbapi-python,代码行数:33,代码来源:serialize.py


示例7: Attachment

class Attachment(object):
    def __init__(self, part):
        encoding = part.encoding or 'utf-8'
        self.headers = CaseInsensitiveDict({
            k.decode(encoding): v.decode(encoding)
            for k, v in part.headers.items()
        })
        self.content_type = self.headers.get('Content-Type', None)
        self.content_id = self.headers.get('Content-ID', None)
        self.content_location = self.headers.get('Content-Location', None)
        self._part = part

    def __repr__(self):
        return '<Attachment(%r, %r)>' % (self.content_id, self.content_type)

    @cached_property
    def content(self):
        """Return the content of the attachment

        :rtype: bytes or str

        """
        encoding = self.headers.get('Content-Transfer-Encoding', None)
        content = self._part.content

        if encoding == 'base64':
            return base64.b64decode(content)
        elif encoding == 'binary':
            return content.strip(b'\r\n')
        else:
            return content
开发者ID:ovnicraft,项目名称:python-zeep,代码行数:31,代码来源:attachments.py


示例8: MockResponse

class MockResponse(object):
    """
    Mock response object with a status code and some content
    """
    def __init__(self, status_code, content=None, headers=None):
        self.status_code = status_code
        self.content = content or ''
        self.headers = CaseInsensitiveDict()

        if headers:
            self.headers.update(headers)

    def raise_for_status(self):
        http_error_msg = ''

        if 400 <= self.status_code < 500:
            http_error_msg = '%s Client Error: ...' % self.status_code

        elif 500 <= self.status_code < 600:
            http_error_msg = '%s Server Error: ...' % self.status_code

        if http_error_msg:
            raise requests.HTTPError(http_error_msg, response=self)

    def json(self, **kwargs):
        return json.loads(self.content)
开发者ID:palanisamyprps,项目名称:rapidpro-python,代码行数:26,代码来源:tests.py


示例9: test_get

 def test_get(self):
     cid = CaseInsensitiveDict()
     cid["spam"] = "oneval"
     cid["SPAM"] = "blueval"
     self.assertEqual(cid.get("spam"), "blueval")
     self.assertEqual(cid.get("SPAM"), "blueval")
     self.assertEqual(cid.get("sPam"), "blueval")
     self.assertEqual(cid.get("notspam", "default"), "default")
开发者ID:kprantis,项目名称:requests,代码行数:8,代码来源:test_requests.py


示例10: test_preserve_last_key_case

 def test_preserve_last_key_case(self):
     cid = CaseInsensitiveDict({"Accept": "application/json", "user-Agent": "requests"})
     cid.update({"ACCEPT": "application/json"})
     cid["USER-AGENT"] = "requests"
     keyset = frozenset(["ACCEPT", "USER-AGENT"])
     assert frozenset(i[0] for i in cid.items()) == keyset
     assert frozenset(cid.keys()) == keyset
     assert frozenset(cid) == keyset
开发者ID:RRedwards,项目名称:requests,代码行数:8,代码来源:test_requests.py


示例11: test_lower_items

 def test_lower_items(self):
     cid = CaseInsensitiveDict({
         'Accept': 'application/json',
         'user-Agent': 'requests',
     })
     keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items())
     lowerkeyset = frozenset(['accept', 'user-agent'])
     assert keyset == lowerkeyset
开发者ID:5x5x5x5,项目名称:try_git,代码行数:8,代码来源:test_requests.py


示例12: test_get

 def test_get(self):
     cid = CaseInsensitiveDict()
     cid["spam"] = "oneval"
     cid["SPAM"] = "blueval"
     assert cid.get("spam") == "blueval"
     assert cid.get("SPAM") == "blueval"
     assert cid.get("sPam") == "blueval"
     assert cid.get("notspam", "default") == "default"
开发者ID:RRedwards,项目名称:requests,代码行数:8,代码来源:test_requests.py


示例13: set_extra_headers

 def set_extra_headers(self, headers):
     header_dict = CaseInsensitiveDict(headers)
     if 'Reply-To' in header_dict:
         self.data["ReplyTo"] = header_dict.pop('Reply-To')
     self.data["Headers"] = [
         {"Name": key, "Value": value}
         for key, value in header_dict.items()
     ]
开发者ID:c-mrf,项目名称:django-anymail,代码行数:8,代码来源:postmark.py


示例14: test_preserve_key_case

 def test_preserve_key_case(self):
     cid = CaseInsensitiveDict({
         'Accept': 'application/json',
         'user-Agent': 'requests',
     })
     keyset = frozenset(['Accept', 'user-Agent'])
     assert frozenset(i[0] for i in cid.items()) == keyset
     assert frozenset(cid.keys()) == keyset
     assert frozenset(cid) == keyset
开发者ID:5x5x5x5,项目名称:try_git,代码行数:9,代码来源:test_requests.py


示例15: test_copy

 def test_copy(self):
     cid = CaseInsensitiveDict({
         'Accept': 'application/json',
         'user-Agent': 'requests',
     })
     cid_copy = cid.copy()
     assert cid == cid_copy
     cid['changed'] = True
     assert cid != cid_copy
开发者ID:503945930,项目名称:requests,代码行数:9,代码来源:test_requests.py


示例16: test_fixes_649

 def test_fixes_649(self):
     """__setitem__ should behave case-insensitively."""
     cid = CaseInsensitiveDict()
     cid['spam'] = 'oneval'
     cid['Spam'] = 'twoval'
     cid['sPAM'] = 'redval'
     cid['SPAM'] = 'blueval'
     assert cid['spam'] == 'blueval'
     assert cid['SPAM'] == 'blueval'
     assert list(cid.keys()) == ['SPAM']
开发者ID:5x5x5x5,项目名称:try_git,代码行数:10,代码来源:test_requests.py


示例17: test_setdefault

 def test_setdefault(self):
     cid = CaseInsensitiveDict({'Spam': 'blueval'})
     self.assertEqual(
         cid.setdefault('spam', 'notblueval'),
         'blueval'
     )
     self.assertEqual(
         cid.setdefault('notspam', 'notblueval'),
         'notblueval'
     )
开发者ID:k-mito,项目名称:requests-docs-jp,代码行数:10,代码来源:test_requests.py


示例18: request

    def request(self, method, url, accept_json=False, headers=None, params=None, json=None, data=None, files=None,
                **kwargs):
        full_url = self.url + url

        input_headers = _remove_null_values(headers) if headers else {}

        headers = CaseInsensitiveDict({'user-agent': 'watson-developer-cloud-python-' + __version__})
        if accept_json:
            headers['accept'] = 'application/json'
        headers.update(input_headers)

        # Remove keys with None values
        params = _remove_null_values(params)
        json = _remove_null_values(json)
        data = _remove_null_values(data)
        files = _remove_null_values(files)

        # Support versions of requests older than 2.4.2 without the json input
        if not data and json is not None:
            data = json_import.dumps(json)
            headers.update({'content-type': 'application/json'})

        auth = None
        if self.username and self.password:
            auth = (self.username, self.password)
        if self.api_key is not None:
            if params is None:
                params = {}
            if url.startswith('https://gateway-a.watsonplatform.net/calls'):
                params['apikey'] = self.api_key
            else:
                params['api_key'] = self.api_key

        response = requests.request(method=method, url=full_url, cookies=self.jar, auth=auth, headers=headers,
                                    params=params, data=data, files=files, **kwargs)

        if 200 <= response.status_code <= 299:
            if accept_json:
                response_json = response.json()
                if 'status' in response_json and response_json['status'] == 'ERROR':
                    response.status_code = 400
                    error_message = 'Unknown error'
                    if 'statusInfo' in response_json:
                        error_message = response_json['statusInfo']
                    if error_message == 'invalid-api-key':
                        response.status_code = 401
                    raise WatsonException('Error: ' + error_message)
                return response_json
            return response
        else:
            if response.status_code == 401:
                error_message = 'Unauthorized: Access is denied due to invalid credentials'
            else:
                error_message = self._get_error_message(response)
            raise WatsonException(error_message)
开发者ID:nthuy190991,项目名称:rocsi_dev,代码行数:55,代码来源:watson_developer_cloud_service.py


示例19: test_preserve_last_key_case

 def test_preserve_last_key_case(self):
     cid = CaseInsensitiveDict({
         'Accept': 'application/json',
         'user-Agent': 'requests',
     })
     cid.update({'ACCEPT': 'application/json'})
     cid['USER-AGENT'] = 'requests'
     keyset = frozenset(['ACCEPT', 'USER-AGENT'])
     assert frozenset(i[0] for i in cid.items()) == keyset
     assert frozenset(cid.keys()) == keyset
     assert frozenset(cid) == keyset
开发者ID:5x5x5x5,项目名称:try_git,代码行数:11,代码来源:test_requests.py


示例20: verify_signature

    def verify_signature(self, query_parameters):
        """Verify the signature provided with the query parameters.

        http://docs.shopify.com/api/tutorials/oauth

        example usage::

            from shopify_trois import Credentials
            from shopify_trois.engines import Json as Shopify
            from urllib.parse import parse_qsl

            credentials = Credentials(
                api_key='your_api_key',
                scope=['read_orders'],
                secret='your_app_secret'
            )

            shopify = Shopify(shop_name="your_store_name", credentials=\
                    credentials)

            query_parameters = parse_qsl("code=238420989938cb70a609f6ece2e2586\
b&shop=yourstore.myshopify.com&timestamp=1373382939&\
signature=6fb122e33c21851c465345b8cb97245e")

            if not shopify.verify_signature(query_parameters):
                raise Exception("invalid signature")

            credentials.code = dict(query_parameters).get('code')

            shopify.setup_access_token()

        :returns: Returns True if the signature is valid.

        """
        params = CaseInsensitiveDict(query_parameters)
        signature = params.pop("signature", None)

        calculated = ["%s=%s" % (k, v) for k, v in params.items()]
        calculated.sort()
        calculated = "".join(calculated)

        calculated = "{secret}{calculated}".format(
            secret=self.credentials.secret,
            calculated=calculated
        )

        md5 = hashlib.md5()
        md5.update(calculated.encode('utf-8'))

        produced = md5.hexdigest()

        return produced == signature
开发者ID:RafaAguilar,项目名称:shopify-trois,代码行数:52,代码来源:oauth_engine.py



注:本文中的requests.structures.CaseInsensitiveDict类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.add_dict_to_cookiejar函数代码示例发布时间:2022-05-26
下一篇:
Python sessions.Session类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap