本文整理汇总了Python中socket.gethostbyname函数的典型用法代码示例。如果您正苦于以下问题:Python gethostbyname函数的具体用法?Python gethostbyname怎么用?Python gethostbyname使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了gethostbyname函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self):
if len(sys.argv)>1:
self.PNUM = int(raw_input("what port number do you want?"))
self.internal_print(str(self.PNUM+1) + "\n")
else:
self.PNUM = 12345
self.S = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
self.S.bind(("",self.PNUM))
self.ADR = {}
self.name = raw_input("username: ")
temp = raw_input("What address do you want to send data to: ")
if temp != "":
temp = temp.split(":")
if len(temp) == 2:
if temp[1] == "scan":
self.scan_for_connection(socket.gethostbyname(temp[0]))
else:
self.ADR[(socket.gethostbyname(temp[0]),int(temp[1]))] = ""
else:
self.ADR[(socket.gethostbyname(temp[0]),self.PNUM)] = ""
self.S.settimeout(2)
self.msg = ""
self.internal_print("You may now converse\nType 'exit' and hit return to leave the chat\n")
开发者ID:dileepvr,项目名称:no_twerking,代码行数:29,代码来源:DATAGRAM_peers.py
示例2: IsIPv4
def IsIPv4(url):
import socket
try:
#print socket.getaddrinfo(sr[1], 80)
socket.gethostbyname(url)
return True
except: return False
开发者ID:serbra,项目名称:ru,代码行数:7,代码来源:default.py
示例3: _invalid_host
def _invalid_host(self, host):
try:
socket.gethostbyname(host)
except socket.gaierror:
self.logger.error("\t%s: Cannot resolve hostname" % host)
return True
return False
开发者ID:heyglen,项目名称:wmi-scanner,代码行数:7,代码来源:wmi_computer.py
示例4: hostname_resolves
def hostname_resolves(hostname):
"""Checks to see if hostname is DNS resolvable"""
try:
socket.gethostbyname(hostname)
return 1
except socket.error:
return 0
开发者ID:carriercomm,项目名称:go-5,代码行数:7,代码来源:go.py
示例5: set_ip
def set_ip(host_ip):
print ""
print ""
sys.stdout.write("SET IP STATIC ADRESS :"+host_ip[0]+" ")
logging.info( "SET IP ADRESS :"+host_ip[0])
get_ip=socket.gethostbyname(socket.gethostname())
if get_ip!=host_ip[0]:
os.system('netsh interface ipv4 set address name="Ethernet" source=static address='+host_ip[0]+' mask='+host_ip[1]+' gateway='+host_ip[2])
i=0
while get_ip != host_ip[0] :
ip_t=socket.gethostbyname(socket.gethostname())
if ip_t!='127.0.0.1':
get_ip=ip_t
if i==0:
sys.stdout.write("Processing ")
else:
sys.stdout.write(".")
time.sleep(1)
if i < 60:
i = i + 1
else:
sys.stdout.write( " WAIT "+str(i)+" SEC STOPPED, IP ADDRESS :"+get_ip)
break
sys.stdout.write("OK, SET STATIC IP ADRESS HOST :"+get_ip)
logging.info("OK, SET STATIC IP ADRESS HOST :"+get_ip)
else:
sys.stdout.write("IP :"+str(host_ip)+" established")
logging.error("IP :"+str(host_ip)+" established")
开发者ID:sav1978,项目名称:borey-test,代码行数:29,代码来源:set_ip_cam.py
示例6: check
def check(self):
# do check of host
try:
# requires name resolve facility
socket.gethostbyname(self.hostname)
except IOError:
raise BuilderError("{0} could not be resolved".format(self.hostname))
try:
# check_for_ans_error(res, self.hostname)
self.run_ansible_with_check("/bin/rpm -q mock rsync")
except AnsibleCallError:
raise BuilderError(msg="Build host `{0}` does not have mock or rsync installed"
.format(self.hostname))
# test for path existence for mockchain and chroot config for this chroot
try:
self.run_ansible_with_check("/usr/bin/test -f {0}".format(mockchain))
except AnsibleCallError:
raise BuilderError(msg="Build host `{}` missing mockchain binary `{}`"
.format(self.hostname, mockchain))
try:
self.run_ansible_with_check("/usr/bin/test -f /etc/mock/{}.cfg"
.format(self.job.chroot))
except AnsibleCallError:
raise BuilderError(msg="Build host `{}` missing mock config for chroot `{}`"
.format(self.hostname, self.job.chroot))
开发者ID:shenkeee,项目名称:copr,代码行数:28,代码来源:builder.py
示例7: handle_dns_req
def handle_dns_req(self, pkt):
if pkt.getlayer(DNS) and pkt[DNS].qd and not pkt[DNS].an:
# should this hostname be spoofed?
if 'hostnames' in self.parameters.keys():
if not pkt[DNS].qd.qname.rstrip('.') in self.parameters['hostnames'].split(','):
# return original ip address
# is this an ipv6 request?
if pkt[DNS].qd.qtype == IPV6_QUERY:
try:
valid_ip = getaddrinfo(pkt[DNS].qd.qname, None, AF_INET6, IPPROTO_IP, AI_CANONNAME)[0][4][0]
response = self.mkdnsresponse6(pkt[DNS], valid_ip)
p = IPv6(src=pkt[IPv6].dst, dst=pkt[IPv6].src)/UDP(sport=pkt[UDP].dport, dport=pkt[UDP].sport)/response
except:
p = None
# is this an ipv4 request?
if pkt[DNS].qd.qtype == IPV4_QUERY:
try:
valid_ip = gethostbyname(pkt[DNS].qd.qname)
response = self.mkdnsresponse4(pkt[DNS], valid_ip)
p = IPv6(src=pkt[IPv6].dst, dst=pkt[IPv6].src)/UDP(sport=pkt[UDP].dport, dport=pkt[UDP].sport)/response
except:
p = None
if p:
send(p, verbose=0)
return
# is it an ipv5 query?
if pkt[DNS].qd.qtype == IPV4_QUERY:
# should we spoof ipv4 querys as well?
if 'spoofip4' in self.parameters.keys():
# spoof!
spoof_ip = self.parameters['spoofip4']
self.log("%s is looking for %s. spoofing with %s.\n" % (pkt[IPv6].src, pkt[DNS].qd.qname, spoof_ip))
response = self.mkdnsresponse4(pkt[DNS], spoof_ip)
p = IPv6(src=pkt[IPv6].dst, dst=pkt[IPv6].src)/UDP(sport=pkt[UDP].dport, dport=pkt[UDP].sport)/response
else:
# get real ip address of IPv4 Query
try:
valid_ip = gethostbyname(pkt[DNS].qd.qname)
response = self.mkdnsresponse4(pkt[DNS], valid_ip)
p = IPv6(src=pkt[IPv6].dst, dst=pkt[IPv6].src)/UDP(sport=pkt[UDP].dport, dport=pkt[UDP].sport)/response
except:
p = None
else:
# it is an ipv6 query
if 'spoofip6' in self.parameters.keys():
spoof_ip = self.parameters['spoofip6']
else:
spoof_ip = pkt[IPv6].dst
self.log("%s is looking for %s. spoofing with %s.\n" % (pkt[IPv6].src, pkt[DNS].qd.qname, spoof_ip))
response = self.mkdnsresponse6(pkt[DNS], spoof_ip)
p = IPv6(src=pkt[IPv6].dst, dst=pkt[IPv6].src)/UDP(sport=pkt[UDP].dport, dport=pkt[UDP].sport)/response
if p:
send(p, verbose=0)
开发者ID:Crapworks,项目名称:deadbeef,代码行数:60,代码来源:fakedns.py
示例8: validate
def validate(config):
"""Validate beacon configuration."""
# Configuration for nginx_connections beacon should be a dict
if not isinstance(config, dict):
return False, ('Configuration for nginx_connections '
'beacon must be a dictionary.')
else:
# Verify configuration parameters
if 'protocol' in config:
if config['protocol'] not in VALID_PROTOCOLS:
return False, ('Protocol configuration for nginx_connections '
'should be in {}.'.format(VALID_PROTOCOLS))
if 'port' in config:
try:
port = int(config['port'])
except ValueError:
return False, ('Port configuration for nginx_connections '
'is not a number.')
else:
if port < 1 or port > 65535:
return False, ('Port configuration for nginx_connection '
'is not a valid port number.')
if 'host' in config:
try:
socket.gethostbyname(config['host'])
except socket.error:
return False, ('Host configuration for nginx_connection '
'cannot be resolved.')
return True, 'Valid beacon configuration.'
开发者ID:darioblanco,项目名称:saltstack-example,代码行数:29,代码来源:nginx_connections.py
示例9: main
def main():
stressHosts_by_host = ['sfeserv27','sfeserv03','gba-ubun810-amd64','gba-ubun810-i386','gba-mac-ppc','gba-fbsd63-i386','gba-fbsd72-i386']
stressHosts = []
for sh in range(1,16):
try:
stressHosts.append(socket.gethostbyname('stress%02d.splunk.com' % sh))
except socket.gaierror:
pass
# do the same for pre-defined hosts
for sh in stressHosts_by_host:
try:
stressHosts.append(socket.gethostbyname(sh))
except socket.gaierror:
pass
random.shuffle(stressHosts)
dns = "server sfeserv31.splunk.com\nzone es.splunk.com\nupdate delete stressclients.es.splunk.com. A\n"
IPs = ''
for IP in stressHosts:
IPs = "update add stressclients.es.splunk.com. 3600 A %s\n %s" % (IP,IPs)
dns = dns + IPs + "show\nsend\nEOF"
std_pipes = {}
cmd = ["nsupdate","-k","Kes.splunk.com.+157+51549.key"]
#try:
po = Popen(cmd, stdout=PIPE, stderr=PIPE, stdin=PIPE)
std_pipes = po.communicate(input=dns)
print std_pipes
开发者ID:ampledata,项目名称:scripts,代码行数:32,代码来源:stress.py
示例10: test_bulkadd
def test_bulkadd(self):
app_id = 'test_app'
bulk_add_request = taskqueue_service_pb.TaskQueueBulkAddRequest()
item = bulk_add_request.add_add_request()
item.set_app_id(app_id)
item.set_queue_name('default')
item.set_task_name('babaganoose')
item.set_eta_usec(0)
item.set_method(taskqueue_service_pb.TaskQueueAddRequest.GET)
item.set_mode(taskqueue_service_pb.TaskQueueMode.PUSH)
host = socket.gethostbyname(socket.gethostname())
item.set_url('http://' + host + ':64839/queues')
host = socket.gethostbyname(socket.gethostname())
req = urllib2.Request('http://' + host + ':64839')
api_request = remote_api_pb.Request()
api_request.set_method("BulkAdd")
api_request.set_service_name("taskqueue")
api_request.set_request(bulk_add_request.Encode())
remote_request = api_request.Encode()
req.add_header('Content-Length', str(len(remote_request)))
req.add_header('protocolbuffertype', 'Request')
req.add_header('appdata', app_id)
response = urllib2.urlopen(req, remote_request)
api_response = response.read()
api_response = remote_api_pb.Response(api_response)
bulk_add_response = taskqueue_service_pb.TaskQueueBulkAddResponse(api_response.response())
print bulk_add_response
self.assertEquals(response.getcode(), 200)
开发者ID:GavinHwa,项目名称:appscale,代码行数:28,代码来源:test_tq_add_task.py
示例11: test_bulkadd
def test_bulkadd(self):
app_id = "test_app"
bulk_add_request = taskqueue_service_pb.TaskQueueBulkAddRequest()
item = bulk_add_request.add_add_request()
item.set_app_id(app_id)
item.set_queue_name("default")
item.set_task_name("babaganoose")
item.set_method(taskqueue_service_pb.TaskQueueAddRequest.GET)
item.set_mode(taskqueue_service_pb.TaskQueueMode.PUSH)
item.set_eta_usec(5000000) # 5 seconds
host = socket.gethostbyname(socket.gethostname())
item.set_url("http://" + host + ":64839/queues")
host = socket.gethostbyname(socket.gethostname())
req = urllib2.Request("http://" + host + ":64839")
api_request = remote_api_pb.Request()
api_request.set_method("BulkAdd")
api_request.set_service_name("taskqueue")
api_request.set_request(bulk_add_request.Encode())
remote_request = api_request.Encode()
req.add_header("Content-Length", str(len(remote_request)))
req.add_header("protocolbuffertype", "Request")
req.add_header("appdata", app_id)
response = urllib2.urlopen(req, remote_request)
api_response = response.read()
api_response = remote_api_pb.Response(api_response)
bulk_add_response = taskqueue_service_pb.TaskQueueBulkAddResponse(api_response.response())
print bulk_add_response
self.assertEquals(response.getcode(), 200)
开发者ID:mackjoner,项目名称:appscale,代码行数:28,代码来源:test_tq_add_delayed_task.py
示例12: _authorize_client_ports
def _authorize_client_ports(self, client_cidrs=[]):
if not client_cidrs:
self.logger.debug("No client CIDRs specified, using local address.")
client_ip = url_get('http://checkip.amazonaws.com/').strip()
client_cidrs = ("%s/32" % client_ip,)
self.logger.debug("Client CIDRs: %s", client_cidrs)
namenode = self._get_namenode()
jobtracker = self._get_jobtracker()
for client_cidr in client_cidrs:
# Allow access to port 80 on namenode from client
self.cluster.authorize_role(self.NAMENODE, 80, 80, client_cidr)
# Allow access to jobtracker UI on master from client
# (so we can see when the cluster is ready)
self.cluster.authorize_role(self.JOBTRACKER, 50030, 50030, client_cidr)
# Allow access to namenode and jobtracker via public address from each other
namenode_ip = socket.gethostbyname(namenode.public_dns_name)
jobtracker_ip = socket.gethostbyname(jobtracker.public_dns_name)
self.cluster.authorize_role(self.NAMENODE, 8020, 8020, "%s/32" % namenode_ip)
self.cluster.authorize_role(self.NAMENODE, 8020, 8020, "%s/32" % jobtracker_ip)
self.cluster.authorize_role(self.JOBTRACKER, 8021, 8021, "%s/32" % namenode_ip)
self.cluster.authorize_role(self.JOBTRACKER, 8021, 8021,
"%s/32" % jobtracker_ip)
开发者ID:tzuryby,项目名称:PyStratus,代码行数:26,代码来源:service.py
示例13: getHostLocalAddress
def getHostLocalAddress(self,):
if platform == "darwin":
hostname = socket.gethostname() if '.local' in socket.gethostname() else socket.gethostname()+'.local' #OS X where hostname doesn't have the <.local>
return socket.gethostbyname(hostname)
else:
return socket.gethostbyname(socket.gethostname())
开发者ID:josuebrunel,项目名称:RaspIP,代码行数:7,代码来源:raspip.py
示例14: proc_excluded
def proc_excluded(self,excluded):
processed=set()
for item in excluded:
try:
test=item.split(".")
if (len(test)!=4):
try:
processed.add(socket.gethostbyname(item))
except:
pass
else:
try:
if (int(test[0])>0 and int(test[0])<256):
if (int(test[0])>0 and int(test[0])<256):
if (int(test[0])>0 and int(test[0])<256):
if (int(test[0])>0 and int(test[0])<256):
processed.add(item)
except:
processed.add(socket.gethostbyname(item))
except:
try:
processed.add(socket.gethostbyname(item))
except:
pass
return processed
开发者ID:ostree,项目名称:airpwn-ng,代码行数:26,代码来源:airpwn_ng.py
示例15: main
def main():
parser = OptionParser()
parser.add_option("-s", "--src", dest="src", type="string",
help="Source IP address", metavar="IP")
parser.add_option("-d", "--dst", dest="dst", type="string",
help="Destination IP address", metavar="IP")
options, args = parser.parse_args()
if options.dst == None:
parser.print_help()
sys.exit()
else:
dst_host = socket.gethostbyname(options.dst)
if options.src == None:
# Get the current Network Interface
src_host = socket.gethostbyname(socket.gethostname())
else:
src_host = options.src
print("[+] Local Machine: %s"%src_host)
print("[+] Remote Machine: %s"%dst_host)
data = "TEST!!"
print("[+] Data to inject: %s"%data)
# IP Header
ipobj = IP(src_host, dst_host)
# TCP Header
tcpobj = TCP(1234, 80)
response = send(ipobj, tcpobj, iface="eth0", retry=1, timeout=0.3)
if response:
ip = ipobj.unpack(response)
response = response[ip.ihl:]
tcp = tcpobj.unpack(response)
print "IP Header:", ip.list
print "TCP Header:", tcp.list
开发者ID:94883r,项目名称:RSPET,代码行数:33,代码来源:__init__.py
示例16: __init__
def __init__(self, args=[]):
# arguments vector
self.args = args
# start port and end port
self.start, self.stop = 1, 1024
# host name
self.host = ""
# check the arguments
if len(self.args) == 4:
self.host = self.args[1]
try:
self.start = int(self.args[2])
self.stop = int(self.args[3])
except ValueError:
usage()
return
if self.start > self.stop:
usage()
return
elif len(self.args) == 2:
self.host = self.args[1]
else:
usage()
return
try:
sk.gethostbyname(self.host)
except:
print "hostname '%s' unknown" % self.host
self.scan(self.host, self.start, self.stop)
开发者ID:firebitsbr,项目名称:d4rkc0de,代码行数:31,代码来源:pyscan.py
示例17: test_mbox
def test_mbox(analoguer1, analoguer2):
if hasattr(socket, 'setdefaulttimeout'):
socket.setdefaulttimeout(3)
total = 0
success = 0
ping = 0
exception_occur = True
while True:
total = total + 1
try:
socket.gethostbyname('www.baidu.com')
ping = ping + 1
if exception_occur:
analoguer1.close(); analoguer1.connect(); analoguer1.identify(); analoguer1.read(); analoguer1.subscribe(MBOX_NAME); analoguer1.read()
analoguer2.close(); analoguer2.connect(); analoguer2.identify(); analoguer2.read(); analoguer2.subscribe(MBOX_NAME); analoguer2.read()
data = analoguer1.get_randstr(30)
if total % 2 == 0:
analoguer1.publish(MBOX_NAME, data)
r = analoguer1.read()
assert(r['peers'] == 1)
r = analoguer2.read()
assert(r['body']['data'] == data)
else:
analoguer2.publish(MBOX_NAME, data)
r = analoguer2.read()
assert(r['peers'] == 1)
r = analoguer1.read()
assert(r['body']['data'] == data)
exception_occur = False
success = success + 1
except Exception, e:
exception_occur = True
print e
print 'total: %s, success: %s, ping: %s, percentage: %s%%' % (total, success, ping, int(success*10000/total)/100.0)
time.sleep(3)
开发者ID:mobedu,项目名称:script,代码行数:35,代码来源:mbox_tests.py
示例18: _analyze_ips
def _analyze_ips(self, ip_address_list, fuzzable_request):
'''
Search all IP addresses in Bing and determine if they have more than
one domain hosted on it. Store findings in KB.
'''
bing_wrapper = bing(self._uri_opener)
# This is the best way to search, one by one!
for ip_address in ip_address_list:
results = bing_wrapper.get_n_results('ip:' + ip_address,
self._result_limit)
results = [r.URL.base_url() for r in results]
results = list(set(results))
# not vuln by default
is_vulnerable = False
if len(results) > 1:
# We may have something...
is_vulnerable = True
if len(results) == 2:
# Maybe we have this case:
# [Mon 09 Jun 2008 01:08:26 PM ART] - http://216.244.147.14/
# [Mon 09 Jun 2008 01:08:26 PM ART] - http://www.business.com/
# Where www.business.com resolves to 216.244.147.14; so we don't really
# have more than one domain in the same server.
try:
res0 = socket.gethostbyname(results[0].get_domain())
res1 = socket.gethostbyname(results[1].get_domain())
except:
pass
else:
if res0 == res1:
is_vulnerable = False
if is_vulnerable:
desc = 'The web application under test seems to be in a shared' \
' hosting. This list of domains, and the domain of the ' \
' web application under test, all point to the same IP' \
' address (%s):\n' % ip_address
domain_list = kb.kb.raw_read(self, 'domains')
for url in results:
domain = url.get_domain()
desc += '- %s\n' % domain
domain_list.append(domain)
kb.kb.raw_write(self, 'domains', domain_list)
v = Vuln.from_fr('Shared hosting', desc, severity.MEDIUM, 1,
self.get_name(), fuzzable_request)
v['also_in_hosting'] = results
om.out.vulnerability(desc, severity=severity.MEDIUM)
kb.kb.append(self, 'shared_hosting', v)
开发者ID:Adastra-thw,项目名称:w3af,代码行数:60,代码来源:shared_hosting.py
示例19: no_test_userandaddressinit
def no_test_userandaddressinit(self):
server = 'http://demo.opencastproject.org'
user = 'matterhorn_system_account';
password = 'CHANGE_ME';
hostname = 'rubenruaHost'
address = '8.8.8.8'
workflow = 'mini-full'
workflow_parameters = 'uno:uno'
workflow_parameters_2 = 'uno:uno;dos:dos'
client = MHHTTPClient(server, user, password)
self.assertEqual(client.hostname, 'GC-' + socket.gethostname())
self.assertEqual(client.address, socket.gethostbyname(socket.gethostname()))
client = MHHTTPClient(server, user, password, hostname)
self.assertEqual(client.hostname, hostname)
self.assertEqual(client.address, socket.gethostbyname(socket.gethostname()))
client = MHHTTPClient(server, user, password, hostname, address)
self.assertEqual(client.hostname, hostname)
self.assertEqual(client.address, address)
client = MHHTTPClient(server, user, password, hostname, address)
self.assertEqual(client.workflow, 'full')
self.assertEqual(client.workflow_parameters, {'trimHold': 'true'})
client = MHHTTPClient(server, user, password, hostname, address, workflow, workflow_parameters)
self.assertEqual(client.workflow, 'mini-full')
self.assertEqual(client.workflow_parameters, {'uno': 'uno'})
client = MHHTTPClient(server, user, password, hostname, address, workflow, workflow_parameters_2)
self.assertEqual(client.workflow, 'mini-full')
self.assertEqual(client.workflow_parameters, {'uno': 'uno', 'dos': 'dos'})
开发者ID:hectorcanto,项目名称:Galicaster,代码行数:33,代码来源:mh_client.py
示例20: find_eap_in_state
def find_eap_in_state(self, state, check_if_resolvable_hostname = False):
rows = self.hawkular_api.get_hawkular_servers()
for row in rows:
self.web_session.logger.info("Product: {} Feed: {} State: {}".
format(row.get("Product Name"), row.get("Feed"), row.get("details").get("Server State")))
if not row.get("Product Name"):
self.web_session.logger.warning ("Product Name 'None'. Feed: {}.".format(row.get("Feed")))
elif (row.get("Product Name") == 'JBoss EAP' or 'wildfly' in row.get("Product Name").lower()) \
and row.get("Node Name") != 'master:server-*' \
and (state.lower() == "any" or row.get("details").get("Server State") == state.lower()) \
and "domain" not in row.get("Feed").lower():
if check_if_resolvable_hostname:
hostname = row.get("details").get("Hostname")
try:
socket.gethostbyname(hostname)
self.web_session.logger.debug("Found resolvable Hostname: {}".format(hostname))
return row
except:
self.web_session.logger.debug("Note a resolvable Hostname: {}".format(hostname))
else:
return row
return None
开发者ID:Hawkular-QE,项目名称:cf-ui,代码行数:26,代码来源:servers.py
注:本文中的socket.gethostbyname函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论