• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python socket.gaierror函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中socket.gaierror异常类的典型用法代码示例。如果您正苦于以下问题:Python gaierror的具体用法?Python gaierror怎么用?Python gaierror使用的例子?那么恭喜您,这里精选的代码示例或许可以为您提供帮助。



在下文中一共展示了gaierror函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: getnameinfo

def getnameinfo(address, flags):
    """Reverse-resolve a ``(host, port)`` pair through the project's resolver.

    :param address: two-item tuple ``(host, port)``.
    :param flags: bitmask of ``socket.NI_*`` flags.
    :returns: ``(host, str(port))`` when the PTR lookup reports NXDOMAIN.
    :raises TypeError: if *address* is not a tuple.
    :raises socket.gaierror: ``EAI_NONAME`` for a tuple of the wrong length
        or for conflicting flags; ``EAI_AGAIN`` when the lookup times out
        and ``NI_NAMEREQD`` is set.
    :raises socket.error: if the PTR lookup yields more than one record.
    """
    dns.build_resolver()
    try:
        host, port = address
    except (ValueError, TypeError):
        if not isinstance(address, tuple):
            del address
            raise TypeError('getnameinfo() argument must be a tuple')
        raise socket.gaierror(
            socket.EAI_NONAME, "Name or service not known")

    # Requiring a name and requiring a numeric host cannot both be honoured.
    if (flags & socket.NI_NAMEREQD) and (flags & socket.NI_NUMERICHOST):
        raise socket.gaierror(
            socket.EAI_NONAME, "Name or service not known")

    if dns.is_ipv4(host):
        try:
            name = dns.reversename.from_address(host)

            results = dns.resolver_obj.query(name, dns.rdatatype.PTR)
            if len(results) > 1:
                raise socket.error(
                    "sockaddr resolved to multiple addresses")

            host = results[0].target.to_text(omit_final_dot=True)
        except dns.exception.Timeout:
            # A timeout is only fatal when the caller insisted on a name.
            if flags & socket.NI_NAMEREQD:
                raise socket.gaierror(socket.EAI_AGAIN, 'Lookup timed out')
        except dns.resolver.NXDOMAIN:
            return (host, str(port))
    # NOTE(review): the visible snippet ends here without returning the
    # resolved host; it looks truncated -- confirm against the full file.
开发者ID:fun-alex-alex2006hw,项目名称:greenhouse,代码行数:31,代码来源:dns.py


示例2: fake_gethostbyname

def fake_gethostbyname(hostname):
    """Return an IP for *hostname* taken from a fake hosts file.

    The broker bind address (``config`` section ``broker``) is resolved with
    ``gethostbyname_orig``.  Every other name is looked up in a file named
    after the host under ``unittest/fake_hosts_location``; if that file does
    not exist, the name with its domain part stripped is tried instead.

    :param hostname: host name to resolve.
    :returns: the IP address string found for the host.
    :raises gaierror: (-2) when no address is found or an ``IOError`` occurs
        (for example the hosts file cannot be opened).
    """
    try:
        host_ip = None

        # for templates/index.py
        if hostname == config.get("broker", "bind_address"):
            host_ip = gethostbyname_orig(hostname)
        else:
            # faking hostip
            fake_hosts = config.get("unittest", "fake_hosts_location")
            hostfilename = fake_hosts + hostname

            # strip domain part
            if not os.path.exists(hostfilename) and "." in hostname:
                hostfilename = fake_hosts + hostname.split(".", 1)[0]

            # Close the file deterministically instead of leaking the handle.
            with open(hostfilename) as handle:
                hostfile = handle.readlines()

            # The third column of the first line names the primary interface.
            primary_name = hostfile[0].split()[2]
            ip_re = re.compile(r"^\s*([a-z0-9]+)\s+[a-z0-9]+\s+([0-9\.]+)")
            for line in hostfile:
                m = ip_re.search(line)
                if m and primary_name == m.group(1):
                    host_ip = m.group(2)
                    break

        if host_ip is None:
            raise gaierror(-2, "Name or service not known")

        return host_ip

    except IOError as e:
        # To have the cause in aqd.log
        raise gaierror(-2, "Name or service not known %s" % e)
开发者ID:stdweird,项目名称:aquilon,代码行数:33,代码来源:unittest_patches.py


示例3: do_resolve

 def do_resolve(query, dnsserver, timeout, queobj):
     """Send one DNS query over TCP and put the outcome on *queobj*.

     On success the parsed ``dnslib.DNSRecord`` is queued; on any failure the
     exception instance is queued instead, so the consumer always receives
     exactly one item per call.

     :param query: host name string or ``dnslib.DNSRecord``.
     :param dnsserver: server address; a ``:`` selects IPv6 and AAAA.
     :param timeout: socket timeout in seconds; falsy means blocking.
     :param queobj: queue receiving the record or the exception.

     Reads ``blacklist`` from the enclosing scope (not visible here).
     """
     if isinstance(query, basestring):
         qtype = dnslib.QTYPE.AAAA if ':' in dnsserver else dnslib.QTYPE.A
         query = dnslib.DNSRecord(q=dnslib.DNSQuestion(query, qtype=qtype))
     query_data = query.pack()
     sock_family = socket.AF_INET6 if ':' in dnsserver else socket.AF_INET
     sock = socket.socket(sock_family)
     rfile = None
     try:
         sock.settimeout(timeout or None)
         sock.connect(parse_hostport(dnsserver, 53))
         # DNS over TCP prefixes the message with an unsigned 16-bit length;
         # sendall guarantees the whole frame is written.
         sock.sendall(struct.pack('>H', len(query_data)) + query_data)
         rfile = sock.makefile('r', 1024)
         reply_data_length = rfile.read(2)
         if len(reply_data_length) < 2:
             raise socket.gaierror(11004, 'getaddrinfo %r from %r failed' % (query, dnsserver))
         reply_data = rfile.read(struct.unpack('>H', reply_data_length)[0])
         record = dnslib.DNSRecord.parse(reply_data)
         # rtype 1 = A, 28 = AAAA, 255 = ANY
         iplist = [str(x.rdata) for x in record.rr if x.rtype in (1, 28, 255)]
         if any(x in blacklist for x in iplist):
             logging.debug('query=%r dnsserver=%r record bad iplist=%r', query, dnsserver, iplist)
             raise socket.gaierror(11004, 'getaddrinfo %r from %r failed' % (query, dnsserver))
         else:
             logging.debug('query=%r dnsserver=%r record iplist=%s', query, dnsserver, iplist)
             queobj.put(record)
     except Exception as e:
         # Report every failure (not only socket errors) so the consumer of
         # queobj is never left waiting on a worker that died silently.
         logging.debug('query=%r dnsserver=%r failed %r', query, dnsserver, e)
         queobj.put(e)
     finally:
         if rfile:
             rfile.close()
         sock.close()
开发者ID:anarkia7115,项目名称:smartladder,代码行数:32,代码来源:dnsproxy.py


示例4: getnameinfo

def getnameinfo(sockaddr, flags):
    """Replacement for Python's socket.getnameinfo.

    Currently only supports IPv4.

    :param sockaddr: two-item ``(host, port)`` sequence.
    :param flags: bitmask of ``socket.NI_*`` flags.
    :raises socket.gaierror: ``EAI_NONAME`` for conflicting flags or a failed
        lookup with ``NI_NAMEREQD`` set; ``EAI_AGAIN`` on timeout with
        ``NI_NAMEREQD`` set.
    :raises socket.error: if the PTR lookup yields more than one record.
    """
    host, port = sockaddr

    if (flags & socket.NI_NAMEREQD) and (flags & socket.NI_NUMERICHOST):
        # Conflicting flags.  Punt.
        # Pass errno and message as two arguments so that ``errno`` is set.
        raise socket.gaierror(
            socket.EAI_NONAME, 'Name or service not known')

    if is_ipv4_addr(host):
        try:
            rrset = resolver.query(
                dns.reversename.from_address(host), dns.rdatatype.PTR)
            if len(rrset) > 1:
                raise socket.error('sockaddr resolved to multiple addresses')
            host = rrset[0].target.to_text(omit_final_dot=True)
        except dns.exception.Timeout:
            # Lookup failures are only fatal when a name was required.
            if flags & socket.NI_NAMEREQD:
                raise socket.gaierror(socket.EAI_AGAIN, 'Lookup timed out')
        except dns.exception.DNSException:
            if flags & socket.NI_NAMEREQD:
                raise socket.gaierror(
                    socket.EAI_NONAME, 'Name or service not known')
    # NOTE(review): the visible snippet ends here without a return value;
    # it looks truncated -- confirm against the full file.
开发者ID:rtyler,项目名称:gogreen,代码行数:26,代码来源:purepydns.py


示例5: test_no_internet

def test_no_internet(monkeypatch):
    # No crash occurs if the computer doesn't have access to the internet:
    # each provider must surface the failure as RateProviderUnavailable.
    from socket import gaierror
    monkeypatch.setattr(boc_currency_provider, 'urlopen', exception_raiser(gaierror()))
    monkeypatch.setattr(yahoo_currency_provider, 'urlopen', exception_raiser(gaierror()))
    # Each call gets its own ``raises`` block; inside a shared block the
    # first exception would end the block and the second call would never run.
    with raises(RateProviderUnavailable):
        boc_currency_provider.BOCProviderPlugin().wrapped_get_currency_rates(
            'USD', date(2008, 5, 20), date(2008, 5, 20)
        )
    with raises(RateProviderUnavailable):
        yahoo_currency_provider.YahooProviderPlugin().wrapped_get_currency_rates(
            'LVL', date(2008, 5, 20), date(2008, 5, 20)
        )
开发者ID:fokusov,项目名称:moneyguru,代码行数:12,代码来源:currency_test.py


示例6: dnslib_resolve_over_tcp

def dnslib_resolve_over_tcp(query, dnsservers, timeout, **kwargs):
    """Resolve *query* over TCP against several DNS servers concurrently.

    One worker thread is started per entry in *dnsservers*; the first
    result on the shared queue that is not an exception is returned.

    :param query: host name string or ``dnslib.DNSRecord``.
    :param dnsservers: sequence of server addresses; a ``:`` in an address
        selects IPv6 and an AAAA question.
    :param timeout: seconds for socket operations and for waiting on each
        worker result; a falsy value means wait without a limit.
    :param kwargs: ``blacklist`` -- addresses whose presence in a reply makes
        that reply count as a failure.
    :returns: the parsed ``dnslib.DNSRecord``.
    :raises TypeError: if *query* is neither a string nor a ``DNSRecord``.
    :raises socket.gaierror: (11004) when no server produced a usable reply.
    """
    if not isinstance(query, (basestring, dnslib.DNSRecord)):
        raise TypeError("query argument requires string/DNSRecord")
    blacklist = kwargs.get("blacklist", ())

    def do_resolve(query, dnsserver, timeout, queobj):
        # Worker: queue either the parsed record or the exception raised.
        if isinstance(query, basestring):
            qtype = dnslib.QTYPE.AAAA if ":" in dnsserver else dnslib.QTYPE.A
            query = dnslib.DNSRecord(q=dnslib.DNSQuestion(query, qtype=qtype))
        query_data = query.pack()
        sock_family = socket.AF_INET6 if ":" in dnsserver else socket.AF_INET
        sock = socket.socket(sock_family)
        rfile = None
        try:
            sock.settimeout(timeout or None)
            sock.connect(parse_hostport(dnsserver, 53))
            # DNS over TCP prefixes the message with an unsigned 16-bit
            # length; sendall guarantees the whole frame is written.
            sock.sendall(struct.pack(">H", len(query_data)) + query_data)
            rfile = sock.makefile("r", 1024)
            reply_data_length = rfile.read(2)
            if len(reply_data_length) < 2:
                raise socket.gaierror(11004, "getaddrinfo %r from %r failed" % (query, dnsserver))
            reply_data = rfile.read(struct.unpack(">H", reply_data_length)[0])
            record = dnslib.DNSRecord.parse(reply_data)
            # rtype 1 = A, 28 = AAAA, 255 = ANY
            iplist = [str(x.rdata) for x in record.rr if x.rtype in (1, 28, 255)]
            if any(x in blacklist for x in iplist):
                logging.debug("query=%r dnsserver=%r record bad iplist=%r", query, dnsserver, iplist)
                raise socket.gaierror(11004, "getaddrinfo %r from %r failed" % (query, dnsserver))
            else:
                logging.debug("query=%r dnsserver=%r record iplist=%s", query, dnsserver, iplist)
                queobj.put(record)
        except Exception as e:
            # Report every failure (not only socket errors) so the collector
            # below is never left waiting on a worker that died silently.
            logging.debug("query=%r dnsserver=%r failed %r", query, dnsserver, e)
            queobj.put(e)
        finally:
            if rfile:
                rfile.close()
            sock.close()

    queobj = Queue.Queue()
    for dnsserver in dnsservers:
        thread.start_new_thread(do_resolve, (query, dnsserver, timeout, queobj))
    last_index = len(dnsservers) - 1
    for i in range(len(dnsservers)):
        try:
            # ``timeout`` must be passed by keyword: the first positional
            # parameter of Queue.get is ``block``, not the timeout.
            result = queobj.get(timeout=timeout or None)
        except Queue.Empty:
            raise socket.gaierror(11004, "getaddrinfo %r from %r failed" % (query, dnsservers))
        if result and not isinstance(result, Exception):
            return result
        elif i == last_index:
            logging.warning("dnslib_resolve_over_tcp %r with %s return %r", query, dnsservers, result)
    raise socket.gaierror(11004, "getaddrinfo %r from %r failed" % (query, dnsservers))
开发者ID:CaptainHuangsh,项目名称:goagent-crack,代码行数:52,代码来源:dnsproxy.py


示例7: test_gaierror

 def test_gaierror(self):
     """
     Passing a C{socket.gaierror} to L{error.getConnectError} yields an
     L{error.UnknownHostError}, as checked by C{assertCorrectException}.
     """
     errno_value, message = 12, "hello"
     lookup_failure = socket.gaierror(errno_value, message)
     converted = error.getConnectError(lookup_failure)
     self.assertCorrectException(
         errno_value, message, converted, error.UnknownHostError)
开发者ID:AlexanderHerlan,项目名称:syncpy,代码行数:7,代码来源:test_error.py


示例8: test_should_reject_false

    def test_should_reject_false(self, lookup_call):
        """A lookup that raises ``gaierror`` must not cause a rejection."""
        import socket

        lookup_call.side_effect = socket.gaierror("foobar")
        checker = self._make_one()
        payload = {"ip_address": "10.0.100.1"}
        verdict = checker.should_reject(payload)
        self.assertFalse(verdict)
        # The queried name is the reversed IP under the blocklist zone.
        lookup_call.assert_called_with("1.100.0.10.xbl.spamhaus.org.")
开发者ID:pxfs,项目名称:fanboi2,代码行数:7,代码来源:test_filters.py


示例9: getaddrinfo

    def getaddrinfo(self, host, port, family=0, socktype=0, proto=0, flags=0):
        """
        Stand-in for L{socket.getaddrinfo} that records every call and
        answers from the canned C{self.results} mapping.

        @param host: see L{socket.getaddrinfo}

        @param port: see L{socket.getaddrinfo}

        @param family: see L{socket.getaddrinfo}

        @param socktype: see L{socket.getaddrinfo}

        @param proto: see L{socket.getaddrinfo}

        @param flags: see L{socket.getaddrinfo}

        @return: the canned results stored for C{host}

        @raise gaierror: C{EAI_NONAME} when the canned results for C{host}
            are empty.
        """
        invocation = (host, port, family, socktype, proto, flags)
        self.calls.append(invocation)
        canned = self.results[host]
        if not canned:
            raise gaierror(EAI_NONAME,
                           'nodename nor servname provided, or not known')
        return canned
开发者ID:JohnDoes95,项目名称:project_parser,代码行数:25,代码来源:test_resolver.py


示例10: httpLocThatRequiresAProxy

def httpLocThatRequiresAProxy(loc):
    """Open *loc* only when a proxy is configured; otherwise fail.

    With ``networking.config._useProxy`` set, the result of
    ``fauxrootOpen(loc)`` is returned.  Otherwise ``sleep()`` is called and
    a ``urllib2.URLError`` wrapping a ``socket.gaierror`` is raised.
    """
    import networking
    if not networking.config._useProxy:
        sleep()
        timed_out = socket.gaierror(110, 'Connection timed out')
        raise urllib2.URLError(timed_out)
    return fauxrootOpen(loc)
开发者ID:vmware,项目名称:weasel,代码行数:7,代码来源:fauxlocations.py


示例11: mocked_socket_gethostbyname

 def mocked_socket_gethostbyname(domain):
     """Fake ``gethostbyname``: only "google.com" resolves.

     Any other domain raises ``socket.gaierror``.
     """
     if domain in ["google.com"]:
         return '216.58.221.46'
     from socket import gaierror
     raise gaierror("[Errno -2] Name or service not known")
开发者ID:cortesi,项目名称:mitmproxy,代码行数:7,代码来源:test_xss_scanner.py


示例12: test_connection_error

 def test_connection_error(self):
     """A URLError wrapping a gaierror is reported as a missing manifest."""
     dns_failure = socket.gaierror(8, 'nodename nor servname provided')
     self.urlopen_mock.side_effect = urllib2.URLError(dns_failure)
     tasks.fetch_manifest('url', self.upload.pk)
     expected_message = (
         'No manifest was found at that URL. Check the address and try '
         'again.')
     self.check_validation(expected_message)
开发者ID:rtnpro,项目名称:zamboni,代码行数:7,代码来源:test_tasks.py


示例13: _fast_getaddrinfo

 def _fast_getaddrinfo(host, *args, **kwargs):
     """Resolve *host* with dnspython, then delegate to ``socket._getaddrinfo``.

     IP literals and ``localhost`` skip the dnspython lookup.  For other
     names the first dnspython answer is passed on instead of the name.

     :param host: host name or IP literal.
     :param args: forwarded unchanged to ``socket._getaddrinfo``.
     :param kwargs: forwarded unchanged to ``socket._getaddrinfo``.
     :raises socket.gaierror: (11001) when dnspython reports a DNS failure.

     NOTE(review): ``socket._getaddrinfo`` is presumably the saved original
     ``socket.getaddrinfo`` -- confirm where it is assigned.
     """
     def needs_dns_resolving(host2):
         try:
             ipaddress.ip_address(host2)
             return False  # already valid IP
         except ValueError:
             pass  # not an IP
         # Use the helper's own argument rather than the enclosing ``host``.
         if str(host2) in ('localhost', 'localhost.',):
             return False
         return True
     try:
         if needs_dns_resolving(host):
             answers = dns.resolver.query(host)
             addr = str(answers[0])
         else:
             addr = host
     except dns.exception.DNSException as e:
         # dns failed for some reason, e.g. dns.resolver.NXDOMAIN
         # this is normal. Simply report back failure:
         raise socket.gaierror(11001, 'getaddrinfo failed') from e
     except Exception as e:
         # Possibly internal error in dnspython :( see #4483
         # Fall back to original socket.getaddrinfo to resolve dns.
         # ``Exception`` (not ``BaseException``) so KeyboardInterrupt and
         # SystemExit still propagate.
         print_error('dnspython failed to resolve dns with error:', e)
         addr = host
     return socket._getaddrinfo(addr, *args, **kwargs)
开发者ID:chrisrico,项目名称:electrum,代码行数:26,代码来源:network.py


示例14: test_retry_dns_error

 def test_retry_dns_error(self):
     """A crawl whose DNS lookup raises gaierror must be retried."""
     dns_failure = socket.gaierror(-5, 'No address associated with hostname')
     with mock.patch('socket.gethostbyname', side_effect=dns_failure):
         crawler = get_crawler(SimpleSpider)
         with LogCapture() as captured_log:
             yield crawler.crawl("http://example.com/")
         self._assert_retried(captured_log)
开发者ID:cdingding,项目名称:scrapy,代码行数:7,代码来源:test_crawl.py


示例15: test_run_command_exception_gaierror

 def test_run_command_exception_gaierror(self):
     """A gaierror raised while writing leaves the connection disconnected."""
     conn = self.connection
     conn._telnet = mock.Mock()
     conn._telnet.write = mock.Mock(side_effect=socket.gaierror('except'))
     conn.run_command('test')
     self.assertFalse(conn._connected)
开发者ID:chuchock,项目名称:home-assistant,代码行数:7,代码来源:test_asuswrt.py


示例16: dummy_gethostbyname

 def dummy_gethostbyname(self, host):
     """Fake resolver: loopback names map to 127.0.0.1, dotted quads pass through.

     Anything else raises ``socket.gaierror``.
     """
     loopback = "127.0.0.1"
     if host == "localhost" or host == loopback:
         return loopback
     if re.match(r"\d+\.\d+\.\d+\.\d+", host) is None:
         raise socket.gaierror("Dummy test error")
     return host
开发者ID:ndonegan,项目名称:calico,代码行数:7,代码来源:test_config.py


示例17: testDefaultExceptionHandler

    def testDefaultExceptionHandler(self):
        """Feed each listed exception to the default retry handler.

        The test passes when the handler returns without raising for every
        one of them.
        """
        mock_http_content = 'content'.encode('utf8')
        handled_errors = (
            http_client.BadStatusLine('line'),
            http_client.IncompleteRead('partial'),
            http_client.ResponseNotReady(),
            socket.error(),
            socket.gaierror(),
            httplib2.ServerNotFoundError(),
            ValueError(),
            oauth2client.client.HttpAccessTokenRefreshError(status=503),
            exceptions.RequestError(),
            exceptions.BadStatusCodeError(
                {'status': 503}, mock_http_content, 'url'),
            exceptions.RetryAfterError(
                {'status': 429}, mock_http_content, 'url', 0),
        )

        for handled in handled_errors:
            retry_args = http_wrapper.ExceptionRetryArgs(
                http={'connections': {}},
                http_request=_MockHttpRequest(),
                exc=handled,
                num_retries=0,
                max_retry_wait=0,
                total_wait_sec=0)

            # time.sleep is patched out because the handler calls it with a
            # minimum value of 1 second.
            with patch('time.sleep', return_value=None):
                http_wrapper.HandleExceptionsAndRebuildHttpConnections(
                    retry_args)
开发者ID:danacton,项目名称:appengine-endpoints-modules-lnkr,代码行数:28,代码来源:http_wrapper_test.py


示例18: getnameinfo

def getnameinfo(addr, flags):
    """
    Replacement for Python's socket.getnameinfo.

    The flags can be:

    * NI_NAMEREQD If set, then an error is returned if the hostname cannot be determined.
    * NI_DGRAM: If set, then the service is datagram (UDP) based rather than stream (TCP) based. This is required for the few ports (512-514) that have different services for UDP and TCP.
    * NI_NOFQDN: If set, return only the hostname part of the fully qualified domain name for local hosts.
    * NI_NUMERICHOST: If set, then the numeric form of the hostname is returned. (When not set, this will still happen in case the node's name cannot be determined.)
    * NI_NUMERICSERV: If set, then the numeric form of the service address is returned. (When not set, this will still happen in case the service's name cannot be determined.)

    :param addr: the socket address to look up
    :param flags: the modifier flags
    :raises socket.gaierror: ``EAI_NONAME`` when both NI_NAMEREQD and
        NI_NUMERICHOST are set
    """
    if (flags & socket.NI_NAMEREQD) and (flags & socket.NI_NUMERICHOST):
        # Conflicting flags.  Punt.
        # Pass errno and message as two arguments so that ``errno`` is set.
        raise socket.gaierror(socket.EAI_NONAME, 'Name or service not known')

    resolved = Event()

    def _resolve_callback(result, errorno):
        # Forward the outcome to whoever waits on ``resolved``.
        try:
            if errorno:
                code = pycares.errno.errorcode[errorno]
                msg = pycares.errno.strerror(errorno)
                resolved.send_exception(socket.gaierror(code, msg))
            else:
                resolved.send(result)
        except Exception as e:
            resolved.send_exception(e)
    # NOTE(review): the visible snippet ends here, before the callback is
    # used; it looks truncated -- confirm against the full file.
开发者ID:inercia,项目名称:evy,代码行数:30,代码来源:dns.py


示例19: _urlopen

    def _urlopen(self, req, timeout=None):
        url = req.get_full_url()
        if url == 'impossible url':
            raise ValueError()
        if url == 'http://dwqkndwqpihqdw.com':
            msg = 'Name or service not known'
            raise urllib2.URLError(socket.gaierror(-2, msg))

        if url in ('http://google.com', 'http://goodauth'):
            return FakeResult()
        if url == 'http://badauth':
            raise urllib2.HTTPError(url, 401, '', {}, None)
        if url == 'http://timeout':
            raise urllib2.URLError(socket.timeout())
        if url == 'http://error':
            raise urllib2.HTTPError(url, 500, 'Error', {}, None)
        if url == 'http://newplace':
            res = FakeResult()
            res.body = url + ' ' + req.headers['Authorization']
            return res
        if url == 'http://xheaders':
            res = FakeResult()
            headers = req.headers.items()
            headers.sort()
            res.body = str(headers)
            return res

        raise ValueError(url)
开发者ID:irslambouf,项目名称:SyncServer,代码行数:28,代码来源:test_util.py


示例20: test_is_eventlet_bug105

 def test_is_eventlet_bug105(self):
     """``is_eventlet_bug105`` is true when greendns raises gaierror(EBADF)."""
     fake_dns = mock.Mock()
     fake_dns.getaddrinfo.side_effect = socket.gaierror(errno.EBADF)
     with mock.patch.dict('sys.modules', {
             'eventlet.support.greendns': fake_dns}):
         self.assertTrue(utils.is_eventlet_bug105())
         # Check call_count explicitly: on mock versions without
         # assert_called_once, that name is an auto-created attribute and
         # calling it asserts nothing.
         self.assertEqual(1, fake_dns.getaddrinfo.call_count)
开发者ID:vladiskuz,项目名称:manila,代码行数:7,代码来源:test_utils.py



注:本文中的socket.gaierror函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python socket.getaddrinfo函数代码示例发布时间:2022-05-27
下一篇:
Python socket.fromfd函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap