本文整理汇总了Python中socket.getnameinfo函数的典型用法代码示例。如果您正苦于以下问题:Python getnameinfo函数的具体用法?Python getnameinfo怎么用?Python getnameinfo使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了getnameinfo函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: testInterpreterCrash
def testInterpreterCrash(self):
# Making sure getnameinfo doesn't crash the interpreter
try:
# On some versions, this crashes the interpreter.
socket.getnameinfo(('x', 0, 0, 0), 0)
except socket.error:
pass
开发者ID:249550148,项目名称:gevent,代码行数:7,代码来源:test_socket.py
示例2: testInterpreterCrash
def testInterpreterCrash(self):
if sys.platform[:4] == 'java': return
# Making sure getnameinfo doesn't crash the interpreter
try:
# On some versions, this crashes the interpreter.
socket.getnameinfo(('x', 0, 0, 0), 0)
except socket.error:
pass
开发者ID:babble,项目名称:babble,代码行数:8,代码来源:test_socket.py
示例3: testRefCountGetNameInfo
def testRefCountGetNameInfo(self):
# Testing reference count for getnameinfo
if hasattr(sys, "getrefcount"):
try:
# On some versions, this loses a reference
orig = sys.getrefcount(__name__)
socket.getnameinfo(__name__, 0)
except TypeError:
self.assertEqual(sys.getrefcount(__name__), orig, "socket.getnameinfo loses a reference")
开发者ID:slide,项目名称:main,代码行数:9,代码来源:test_socket.py
示例4: testRefCountGetNameInfo
def testRefCountGetNameInfo(self):
# Testing reference count for getnameinfo
if hasattr(sys, "getrefcount"):
try:
# On some versions, this loses a reference
orig = sys.getrefcount(__name__)
socket.getnameinfo(__name__,0)
except SystemError:
if sys.getrefcount(__name__) <> orig:
self.fail("socket.getnameinfo loses a reference")
开发者ID:1310701102,项目名称:sl4a,代码行数:10,代码来源:test_socket.py
示例5: validate
def validate(self, key, client_addr, cert_principals=None, ca=False):
    """Return the options of the first entry accepting this key, else ``None``.

    Entries are read from ``self._ca_entries`` when *ca* is true and from
    ``self._user_entries`` otherwise.  An entry is accepted when:

    * its ``key`` equals *key*;
    * if it has a ``from`` option, every pattern in it matches the client
      (resolved host name, raw address and parsed IP address);
    * if both *cert_principals* and a ``principals`` option are present,
      every principal pattern matches at least one certificate principal.
    """
    candidates = self._ca_entries if ca else self._user_entries

    for candidate in candidates:
        if candidate.key != key:
            continue

        host_patterns = candidate.options.get('from')
        if host_patterns is not None:
            # Only the host part of the lookup result is used.
            client_host = socket.getnameinfo((client_addr, 0),
                                             socket.NI_NUMERICSERV)[0]
            client_ip = ip_address(client_addr)

            if any(not pattern.matches(client_host, client_addr, client_ip)
                   for pattern in host_patterns):
                continue

        principal_patterns = candidate.options.get('principals')
        if cert_principals is not None and principal_patterns is not None:
            has_unmatched_pattern = any(
                not any(pattern.matches(principal)
                        for principal in cert_principals)
                for pattern in principal_patterns)
            if has_unmatched_pattern:
                continue

        return candidate.options

    return None
开发者ID:dcafferty,项目名称:asyncssh,代码行数:25,代码来源:auth_keys.py
示例6: getpeernameinfo
def getpeernameinfo(self):
    """Describe the peer of ``self.socket`` as ``(remote, port, family)``.

    ``remote`` and ``port`` are the strings produced by
    ``socket.getnameinfo`` with both numeric flags set; ``family`` is
    ``self.socket.family``.
    """
    numeric = socket.NI_NUMERICHOST | socket.NI_NUMERICSERV
    peer = self.socket.getpeername()
    remote, service = socket.getnameinfo(peer, numeric)
    return (remote, service, self.socket.family)
开发者ID:bjencks,项目名称:captiveportal,代码行数:7,代码来源:asyncserver.py
示例7: canonicalize_dns
def canonicalize_dns(inhost, family=socket.AF_UNSPEC):
    """Canonicalize a hostname or numeric IP address.

    Results are memoized in ``canonicalization_cache`` keyed by *inhost*.
    The host part (as split off by ``portsplit``) is resolved with
    ``getaddrinfo``; the first returned address is then reverse-resolved
    with ``getnameinfo(..., NI_NAMEREQD)``.  If the reverse lookup fails,
    the canonical name from ``getaddrinfo`` -- or, when that is empty, the
    original host part -- is used instead.  The result is lower-cased and
    the port suffix re-appended.

    Raises ``SystemExit(1)`` (after printing to stderr) when
    ``getaddrinfo`` fails.
    """
    if inhost in canonicalization_cache:
        return canonicalization_cache[inhost]
    # Catch garbaged hostnames in corrupted Mode 6 responses
    # Raw string: the pattern is unchanged, but "\]" and "\w" are no longer
    # invalid escape sequences in a normal string literal.
    # NOTE(review): the pattern is unanchored and quantified with "*", so
    # re.match always succeeds (possibly on the empty prefix) and the
    # TypeError below cannot currently trigger -- confirm intended check.
    m = re.match(r"([:.[\]]|\w)*", inhost)
    if not m:
        raise TypeError
    (hostname, portsuffix) = portsplit(inhost)
    try:
        ai = socket.getaddrinfo(hostname, None, family, 0, 0,
                                socket.AI_CANONNAME)
    except socket.gaierror as e:
        print('getaddrinfo failed: %s' % e, file=sys.stderr)
        raise SystemExit(1)
    # Only the canonical name and socket address of the first result are used.
    canonname, sockaddr = ai[0][3], ai[0][4]
    try:
        name = socket.getnameinfo(sockaddr, socket.NI_NAMEREQD)
        result = name[0].lower() + portsuffix
    except socket.gaierror:
        # On OS X, canonname is empty for hosts without rDNS.
        # Fall back to the hostname.
        canonicalized = canonname or hostname
        result = canonicalized.lower() + portsuffix
    canonicalization_cache[inhost] = result
    return result
开发者ID:ntpsec,项目名称:ntpsec,代码行数:25,代码来源:util.py
示例8: _validate_ip_address
def _validate_ip_address(family, address):
    """Return `address` in normalized form if it is a valid IP address.

    The address is parsed with ``getaddrinfo`` (numeric host only) and the
    first resulting socket address is rendered back to text with
    ``getnameinfo`` (numeric host).

    :Parameters:
        - `family`: ``socket.AF_INET`` or ``socket.AF_INET6``
        - `address`: the IP address to validate

    :raise ValueError: when either lookup fails or yields no result.
    """
    lookup_args = (address, 0, family, socket.SOCK_STREAM, 0,
                   socket.AI_NUMERICHOST)
    try:
        candidates = socket.getaddrinfo(*lookup_args)
    except socket.gaierror as err:
        logger.debug("gaierror: {0} for {1!r}".format(err, address))
        raise ValueError("Bad IP address")

    if not candidates:
        logger.debug("getaddrinfo result empty")
        raise ValueError("Bad IP address")

    # Element 4 of an addrinfo entry is the socket address tuple.
    sockaddr = candidates[0][4]
    logger.debug(" got address: {0!r}".format(sockaddr))

    try:
        normalized, _service = socket.getnameinfo(sockaddr,
                                                  socket.NI_NUMERICHOST)
    except socket.gaierror as err:
        logger.debug("gaierror: {0} for {1!r}".format(err, sockaddr))
        raise ValueError("Bad IP address")
    return normalized
开发者ID:Ansen,项目名称:openshift-xmpptalk-ircbindxmpp-bundle,代码行数:26,代码来源:jid.py
示例9: match_options
def match_options(self, client_addr, cert_principals, cert_subject=None):
    """Tell whether this entry's "from", "principals" and "subject" options match.

    * ``from``: when the option is non-empty, every pattern must match the
      client (resolved host name, raw address and parsed IP address).
    * ``principals``: when both *cert_principals* and the option are
      present, every pattern must match at least one principal.
    * ``subject``: when both *cert_subject* and the option are present,
      every pattern must match the subject.

    Returns ``True`` only if none of the applicable checks fails.
    """
    host_patterns = self.options.get('from')
    if host_patterns:
        # Only the host part of the lookup result is used.
        client_host = socket.getnameinfo((client_addr, 0),
                                         socket.NI_NUMERICSERV)[0]
        client_ip = ip_address(client_addr)

        if any(not pattern.matches(client_host, client_addr, client_ip)
               for pattern in host_patterns):
            return False

    principal_patterns = self.options.get('principals')
    if cert_principals is not None and principal_patterns is not None:
        has_unmatched_pattern = any(
            not any(pattern.matches(principal)
                    for principal in cert_principals)
            for pattern in principal_patterns)
        if has_unmatched_pattern:
            return False

    subject_patterns = self.options.get('subject')
    if cert_subject is not None and subject_patterns is not None:
        if any(not pattern.matches(cert_subject)
               for pattern in subject_patterns):
            return False

    return True
开发者ID:ronf,项目名称:asyncssh,代码行数:30,代码来源:auth_keys.py
示例10: __init__
def __init__(self, parent, server):
    """
    Set up the hub-side application for one connection.

    :param parent: The parent of the ``HubApplication``.  This
                   will be an instance of ``tendril.Tendril``.
    :param server: The underlying HeyU server instance.  The
                   server keeps track of subscriptions and
                   forwards notifications to the subscribers.
    """

    super(HubApplication, self).__init__(parent)

    # Remember the server; connections start out non-persistent.
    self.server = server
    self.persist = False

    # Frame traffic on this connection with COBS.
    parent.framers = tendril.COBSFramer(True)

    # Work out a host name for the client.  Loopback peers get this
    # machine's FQDN, everything else is looked up from the peer address.
    try:
        is_loopback = parent.remote_addr[0] in ('127.0.0.1', '::1')
        if is_loopback:
            resolved = socket.getfqdn()
        else:
            resolved, _service = socket.getnameinfo(parent.remote_addr, 0)
        self.hostname = resolved
    except Exception:
        # Any failure: fall back to the bare peer address.
        self.hostname = parent.remote_addr[0]
开发者ID:klmitch,项目名称:heyu,代码行数:33,代码来源:hub.py
示例11: update
def update(self, s):
    """Redraw the session table with the sessions reported by *s*.

    The sessions of ``s.server_id`` are replaced by ``s.sessions``, then
    the sessions of all servers are merged and listed in descending order
    of their ``queued`` value, one per row starting at row 2 (row 1 holds
    the header).  When ``options.names`` is set, each session's ``host``
    is replaced by the name returned by ``socket.getnameinfo``.

    Drawing stops at the first row that raises.
    """
    self.win.erase()
    self.addstr(1, 0, "CLIENT PORT S I QUEUED RECVD SENT", curses.A_REVERSE)
    self.sessions[s.server_id] = s.sessions
    items = []
    for server_sessions in self.sessions:
        items.extend(server_sessions)
    items.sort(key=lambda x: int(x.queued), reverse=True)
    for i, session in enumerate(items):
        try:
            # ugh, need to handle if slow - thread for async resolver?
            if options.names:
                session.host = socket.getnameinfo((session.host, int(session.port)), 0)[0]
            self.addstr(
                i + 2,
                0,
                "%-15s %5s %1s %1s %8s %8s %8s"
                % (
                    session.host[:15],
                    session.port,
                    session.server_id,
                    session.interest_ops,
                    session.queued,
                    session.recved,
                    session.sent,
                ),
            )
        except Exception:
            # Was a bare ``except:``, which also swallowed KeyboardInterrupt
            # and SystemExit; ordinary errors still just end the redraw.
            # NOTE(review): presumably this guards against drawing past the
            # bottom of the window -- confirm.
            break
开发者ID:mrtvfuencxozd,项目名称:zktop,代码行数:29,代码来源:zktop.py
示例12: listen
def listen(self):
    """Accept connections in a loop and queue them for processing.

    Each accepted socket (wrapped via ``self.wrap_socket`` when
    ``self.secure`` is set) is put on ``self.active_queue`` together with
    the numeric peer name, ``self.interface[1]`` and the secure flag.

    On ``socket.timeout`` the loop checks ``self.ready``; once it is false
    the listener socket is shut down and closed and the method returns.
    Any other exception is logged with its traceback and the loop goes on.
    """
    if __debug__:
        self.err_log.debug('Entering main loop.')

    numeric_flags = socket.NI_NUMERICHOST | socket.NI_NUMERICSERV

    while True:
        try:
            sock, addr = self.listener.accept()

            if self.secure:
                sock = self.wrap_socket(sock)

            peer = socket.getnameinfo(addr, numeric_flags)
            self.active_queue.put(((sock, peer),
                                   self.interface[1],
                                   self.secure))

        except socket.timeout:
            # socket.timeout will be raised every THREAD_STOP_CHECK_INTERVAL
            # seconds.  When that happens, we check if it's time to die.
            if self.ready:
                continue

            if __debug__:
                self.err_log.debug('Listener exiting.')
            try:
                self.listener.shutdown(socket.SHUT_RDWR)
            except socket.error:
                self.err_log.warning('Socket shutdown() failed')
            self.listener.close()
            return

        except:
            # Catch-all: log the traceback and keep accepting.
            self.err_log.error(str(traceback.format_exc()))
开发者ID:errbotio,项目名称:rocket,代码行数:33,代码来源:listener.py
示例13: setup
def setup(self):
    """Prepare a control connection before it is served.

    Normalizes an IPv4-mapped IPv6 client address to plain IPv4, names the
    current thread after the client's host name, initialises the
    per-connection counters and queue, starts a dedicated proxy server on
    the next port from ``proxy_port_range``, registers this handler in
    ``control_connections`` and finally runs the base
    ``StreamRequestHandler.setup`` and enables TCP keep-alive.
    """
    # some hack for mapped IPv4-to-IPv6 addresses
    # NOTE(review): _getIPv4Map is a private IPy method -- confirm it is
    # stable across the IPy versions in use.
    ip = IPy.IP(self.client_address[0])
    if ip.iptype() == "IPV4MAP":
        self.client_address = (ip._getIPv4Map().strNormal(), self.client_address[1])
    client_name = socket.getnameinfo(self.client_address, socket.NI_NUMERICSERV)
    self.logger = logging.getLogger("ffcontroller.control-connection")
    # Assign the ``name`` attribute directly; Thread.setName() is
    # deprecated since Python 3.10 and does the same thing.
    threading.current_thread().name = client_name[0]
    self.start_time = datetime.datetime.now()
    self.last_command_time = self.start_time
    self.commands_sent = 0
    self.queue = MixedQueue(global_queue)
    # start dedicated server for us
    self.proxy_server = start_server(
        ThreadedHTTPServer,
        HTTPProxyRequestHandler,
        options.proxy_address,
        proxy_port_range.popleft(),
        ServerConnectionPool(self.queue),
    )
    # register self for statistics
    control_connections.append(self)
    self.logger.debug("started")
    socketserver.StreamRequestHandler.setup(self)
    self.connection.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
开发者ID:nikicat,项目名称:ffcontroller,代码行数:25,代码来源:ffcontroller.py
示例14: connect
def connect(self):
    """Establish ``self.sock`` by waiting for a back connection.

    A temporary listening socket is bound to ``options.backconn_address``
    on an OS-chosen port.  Up to ``options.retry_count`` times, the pair
    ``(own netloc, self.netloc)`` is put on ``self.queue`` and the method
    waits for an incoming connection.  The accepted socket becomes
    ``self.sock`` with ``options.transport_timeout`` applied.

    Raises ``socket.timeout`` when no back connection arrives within the
    allowed attempts.  The temporary listening socket is closed on every
    exit path, including the timeout and any other error.
    """
    # create listening socket for back connection
    backserv = socket.socket()
    try:
        backserv.settimeout(options.backconn_timeout)
        # bind to some unused port
        backserv.bind((options.backconn_address, 0))
        selfaddr = backserv.getsockname()
        selfnetloc = "{0}:{1}".format(*selfaddr)
        self.logger.debug("awaiting back connection on {0}".format(selfnetloc))
        # start listening
        backserv.listen(1)
        for _ in range(options.retry_count):
            # put both locations to queue
            self.queue.put((selfnetloc, self.netloc))
            # wait for connection
            try:
                self.sock, peeraddr = backserv.accept()
                self.sock.settimeout(options.transport_timeout)
                break
            except socket.timeout:
                self.logger.info("timeout while accepting back connection. resending request.")
        else:
            # Loop ended without ``break``: every attempt timed out.
            raise socket.timeout("timeout while trying to establish back connection")
        self.logger.debug(
            "accepted back connection from {0}:{1}".format(*socket.getnameinfo(peeraddr, socket.NI_NUMERICSERV))
        )
    finally:
        # close server socket (previously leaked whenever an exception,
        # including the final timeout, escaped before this point)
        backserv.close()
开发者ID:nikicat,项目名称:ffcontroller,代码行数:28,代码来源:ffcontroller.py
示例15: getHostnameFqdn
def getHostnameFqdn():
    """
    Wrapper around getfqdn.

    Returns the fully qualified hostname, None if not found.

    When ``socket.getfqdn()`` yields a name without a dot (and not starting
    with 'localhost'), the name is resolved with ``getaddrinfo`` and each
    resulting address is reverse-resolved with ``getnameinfo``; the first
    name that contains a dot, is not purely digits and dots, and does not
    start with 'localhost' is returned.  If none qualifies, the original
    ``getfqdn()`` result is returned.
    """
    # ``except Exception`` (rather than a bare ``except:``) keeps the
    # best-effort behaviour for lookup errors while letting
    # KeyboardInterrupt / SystemExit through during slow lookups.
    try:
        sHostname = socket.getfqdn()
    except Exception:
        return None
    if '.' in sHostname or sHostname.startswith('localhost'):
        return sHostname

    #
    # Somewhat misconfigured system, needs expensive approach to guessing FQDN.
    # Get address information on the hostname and do a reverse lookup from that.
    #
    try:
        aAddressInfo = socket.getaddrinfo(sHostname, None)
    except Exception:
        return sHostname

    for aAI in aAddressInfo:
        try:
            sName, _ = socket.getnameinfo(aAI[4], 0)
        except Exception:
            continue
        # Skip results that are just a numeric IPv4 address or a localhost alias.
        if '.' in sName and not set(sName).issubset(set('0123456789.')) and not sName.startswith('localhost'):
            return sName

    return sHostname
开发者ID:mdaniel,项目名称:virtualbox-org-svn-vbox-trunk,代码行数:31,代码来源:netutils.py
示例16: test_getnameinfo
def test_getnameinfo():
    """Compare the interpreted ``_socket.getnameinfo`` with the host ``socket`` module.

    Both are called with ``("127.0.0.1", 25)`` and flags ``0``; the
    unwrapped app-level result must equal the host-level one.
    """
    host, port = "127.0.0.1", 25
    expected = socket.getnameinfo((host, port), 0)
    w_result = space.appexec(
        [w_socket, space.wrap(host), space.wrap(port)],
        "(_socket, host, port): return _socket.getnameinfo((host, port), 0)")
    assert space.unwrap(w_result) == expected
开发者ID:Debug-Orz,项目名称:Sypy,代码行数:7,代码来源:test_sock_app.py
示例17: mac_to_ipv6
def mac_to_ipv6(prefix, mac):
    """Build an IPv6 address from *prefix* and a colon-separated MAC address.

    The first MAC octet is XOR-ed with 2, ``ff:fe`` is inserted between the
    third and fourth octets, and the result is appended to *prefix*.  The
    text is then passed through ``socket.getnameinfo`` with both numeric
    flags and the host part of that result is returned.
    """
    octets = [int(part, 16) for part in mac.split(":")[:6]]
    octets[0] ^= 2
    template = "{0}{1:02x}{2:02x}:{3:02x}ff:fe{4:02x}:{5:02x}{6:02x}"
    addr = template.format(prefix, *octets)
    numeric = socket.NI_NUMERICSERV | socket.NI_NUMERICHOST
    host, _service = socket.getnameinfo((addr, 22), numeric)
    return host
开发者ID:saaros,项目名称:poni,代码行数:8,代码来源:cloud_libvirt.py
示例18: mac_to_ipv6
def mac_to_ipv6(prefix, mac):
    """Build an IPv6 address from *prefix* and a colon-separated MAC address.

    The first MAC octet is XOR-ed with 2, ``ff:fe`` is inserted between the
    third and fourth octets, and the result is appended to *prefix*.  The
    text is then passed through ``socket.getnameinfo`` with both numeric
    flags and the host part of that result is returned.
    """
    parts = mac.split(":")
    flipped_first = int(parts[0], 16) ^ 2
    remaining = [int(parts[index], 16) for index in range(1, 6)]
    fields = (prefix, flipped_first) + tuple(remaining)
    addr = "%s%02x%02x:%02xff:fe%02x:%02x%02x" % fields
    numeric = socket.NI_NUMERICSERV | socket.NI_NUMERICHOST
    host, _service = socket.getnameinfo((addr, 22), numeric)
    return host
开发者ID:stisti,项目名称:poni,代码行数:8,代码来源:cloud_libvirt.py
示例19: find_jobs
def find_jobs(self, constraint='True', attributes=None):
    """
    Return ClassAds for jobs matching the constraints.

    On the first call (while ``self._schedds`` is empty) the collector is
    queried for schedd ads, retrying up to 10 times on ``IOError``; each ad
    becomes an ``htcondor.Schedd`` annotated with ``ipaddr`` and ``host``
    and is stored in ``self._schedds``.  Every known schedd is then queried
    with *constraint* and *attributes*, again retrying up to 10 times; a
    schedd that never answers contributes no ads.

    @param constraint  Constraint expression passed to each schedd query.
    @param attributes  List of attribute names to fetch; defaults to an
                       empty list.

    @return  List of classads, or None when the collector could not be
             reached.
    """

    if attributes is None:
        # Avoid a shared mutable default argument.
        attributes = []

    if not self._schedds:
        LOG.info('Finding schedds reporting to collector %s', self._collector_name)

        attempt = 0
        while True:
            try:
                schedd_ads = self._collector.query(htcondor.AdTypes.Schedd, self._schedd_constraint, ['MyAddress', 'ScheddIpAddr'])
                break
            except IOError:
                attempt += 1
                LOG.warning('Collector query failed: %s', str(sys.exc_info()[0]))
                if attempt == 10:
                    LOG.error('Communication with the collector failed. We have no information of the condor pool.')
                    return

        LOG.debug('%d schedd ads', len(schedd_ads))

        for ad in schedd_ads:
            LOG.debug(ad)
            schedd = htcondor.Schedd(ad)
            # NOTE(review): only "<a.b.c.d:port" addresses are recognised; a
            # MyAddress of any other form leaves ``matches`` as None and the
            # next line raises AttributeError -- confirm whether such ads
            # can occur.
            matches = re.match(r'<([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):([0-9]+)', ad['MyAddress'])
            # schedd does not have an ipaddr attribute natively, but we can assign it
            schedd.ipaddr = matches.group(1)
            # The second argument of getnameinfo is a flags bitmask, not an
            # address family.  The original passed socket.AF_INET there,
            # which on Linux happens to share the value 2 with
            # NI_NUMERICSERV but can select a different flag elsewhere.
            # getnameinfo returns a (host, port) 2-tuple.
            schedd.host = socket.getnameinfo((matches.group(1), int(matches.group(2))), socket.NI_NUMERICSERV)[0]
            self._schedds.append(schedd)

        LOG.debug('Found schedds: %s', ', '.join(['%s (%s)' % (schedd.host, schedd.ipaddr) for schedd in self._schedds]))

    LOG.debug('Querying HTCondor with constraint "%s" for attributes %s', constraint, str(attributes))

    classads = []

    for schedd in self._schedds:
        attempt = 0
        while True:
            try:
                ads = schedd.query(constraint, attributes)
                break
            except IOError:
                attempt += 1
                LOG.warning('IOError in communicating with schedd %s. Trying again.', schedd.ipaddr)
                if attempt == 10:
                    LOG.error('Schedd %s did not respond.', schedd.ipaddr)
                    ads = []
                    break

        classads.extend(ads)

    LOG.info('HTCondor returned %d classads', len(classads))

    return classads
开发者ID:SmartDataProjects,项目名称:dynamo,代码行数:57,代码来源:htc.py
示例20: test_getnameinfo
def test_getnameinfo():
    '''
    Tests socket.getnameinfo(): two smoke calls, a check of the returned
    service string, and the error types raised for malformed arguments and
    for an unresolvable host.
    '''
    loopback_http = ("127.0.0.1", 80)

    #sanity
    socket.getnameinfo(loopback_http, 8)
    socket.getnameinfo(loopback_http, 9)

    _host, service = socket.getnameinfo(loopback_http, 8)
    AreEqual(service, '80')

    if is_cli:
        AssertError(NotImplementedError, socket.getnameinfo, loopback_http, 0)

    #IP gives a TypeError
    #AssertError(SystemError, socket.getnameinfo, ("127.0.0.1"), 8)
    #AssertError(SystemError, socket.getnameinfo, (321), 8)
    # A bare string (the parentheses in the original did not make a tuple)
    # combined with a string flags value.
    AssertError(TypeError, socket.getnameinfo, "127.0.0.1", '0')
    # Five-element address tuple.
    AssertError(TypeError, socket.getnameinfo, loopback_http + (0, 0, 0), 8)
    AssertError(socket.gaierror, socket.getnameinfo, ('no such host will ever exist', 80), 8)
开发者ID:mdavid,项目名称:dlr,代码行数:19,代码来源:socket_test.py
注:本文中的socket.getnameinfo函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论