本文整理汇总了Python中stun.get_ip_info函数的典型用法代码示例。如果您正苦于以下问题:Python get_ip_info函数的具体用法?Python get_ip_info怎么用?Python get_ip_info使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_ip_info函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_nat_type
def get_nat_type():
parser = optparse.OptionParser(version=stun.__version__)
parser.add_option("-d", "--debug", dest="DEBUG", action="store_true",
default=False, help="Enable debug logging")
parser.add_option("-H", "--host", dest="stun_host", default=None,
help="STUN host to use")
parser.add_option("-P", "--host-port", dest="stun_port", type="int",
default=3478, help="STUN host port to use (default: "
"3478)")
parser.add_option("-i", "--interface", dest="source_ip", default="0.0.0.0",
help="network interface for client (default: 0.0.0.0)")
parser.add_option("-p", "--port", dest="source_port", type="int",
default=54320, help="port to listen on for client "
"(default: 54320)")
(options, args) = parser.parse_args()
if options.DEBUG:
stun.enable_logging()
kwargs = dict(source_ip=options.source_ip,
source_port=int(options.source_port),
stun_host=options.stun_host,
stun_port=options.stun_port)
nat_type, external_ip, external_port = stun.get_ip_info(**kwargs)
print "NAT Type:", nat_type
print "External IP:", external_ip
print "External Port:", external_port
return nat_type, external_ip, external_port
开发者ID:tiendung19872005,项目名称:webservice_djangorestfull,代码行数:26,代码来源:client.py
示例2: get_NAT_status
def get_NAT_status(stun_host=None):
    """Send a STUN request and return the answer as a dict.

    Args:
        stun_host: hostname of the STUN server, passed through unchanged to
            ``stun.get_ip_info`` (``None`` leaves the choice to that call).

    Returns:
        A dict with the keys ``nat_type``, ``external_ip`` and
        ``external_port``, filled from positions 0, 1 and 2 of the response.
    """
    info = stun.get_ip_info(stun_host=stun_host, source_port=0)
    field_names = ('nat_type', 'external_ip', 'external_port')
    return dict((name, info[position])
                for position, name in enumerate(field_names))
开发者ID:Cryptix23,项目名称:OpenBazaar,代码行数:9,代码来源:network_util.py
示例3: start_node
def start_node(self, port):
self.dynamic_ip = stun.get_ip_info()[1]
try:
self.loadState(OBNodeStrings.knode_pickle)
except IOError:
print "knode.p does not exist, network has not been saved."
print "%s:\t%s" % (self.dynamic_ip, self.node.port,)
开发者ID:cgsheeh,项目名称:SFWR3XA3_Redevelopment,代码行数:10,代码来源:Node.py
示例4: test_stun
def test_stun():
    """Get external IP address from stun, and test the connection capabilities.

    A first STUN probe is sent from a random source port.  If the mapped
    external port differs from the source port, a second probe from the next
    source port is used to tell sequential from random port allocation.

    Returns:
        A tuple ``((first_probe, second_probe_or_None), port_mapping,
        src_port)`` where the probes are the raw ``get_ip_info`` results.
    """
    log.info("STUN - Testing connection...")
    # The follow-up probe uses src_port + 1, so the random port must stop at
    # 65534 to keep that second port inside the valid range (max 65535).
    src_port = randint(1025, 65534)
    first_probe = get_ip_info(source_port=src_port)
    log.debug(first_probe)

    port_mapping = PRESERVES_PORT if first_probe[2] == src_port else None
    seq_stun = None
    if port_mapping != PRESERVES_PORT:
        # Test for sequential port mapping.
        # NOTE(review): if a probe fails and reports no external port, the
        # arithmetic below will raise - confirm how failures should be
        # classified before adding handling here.
        seq_stun = get_ip_info(source_port=src_port + 1)
        log.debug(seq_stun)
        if first_probe[2] + 1 == seq_stun[2]:
            port_mapping = SQUENTIAL_PORT
        else:
            port_mapping = RANDOM_PORT

    log.debug("STUN - Port allocation: " + port_strings[port_mapping])
    seq_stun = seq_stun or None
    return (first_probe, seq_stun), port_mapping, src_port
开发者ID:aircraft008,项目名称:punchVPN,代码行数:19,代码来源:punchVPN.py
示例5: run
def run(self):
    """Discover the public address, create the DHT and bootstrap from peers.

    The external IP/port reported by ``stun.get_ip_info()`` is logged and
    stored in ``self.public_contact``.  The peer list is read from the
    ``"peers"`` config entry, falling back to ``self._default_peers``.
    """
    self._log("Getting external IP info")
    # The NAT type (first element) is not needed here.
    _nat_type, external_ip, external_port = stun.get_ip_info()
    self._log("Public addr: %s:%s" % (external_ip, external_port))

    self._log("Connecting to DHT")
    self.dht = DHT("0.0.0.0", 52525)
    self.public_contact = (external_ip, external_port)

    peers = self.core.config.get("peers", self._default_peers)
    for peer in peers:
        self.dht.bootstrap(peer["host"], peer["port"])
开发者ID:shish,项目名称:chunker,代码行数:11,代码来源:dht.py
示例6: lback_default_focused_agent
def lback_default_focused_agent():
    """Return the id of the agent that should be focused by default.

    An agent whose host is the loopback address is preferred.  Only when no
    such agent exists is the public IP looked up via STUN and matched
    against the agents' hosts, which also avoids a network round trip in
    the common local case.

    Returns:
        The ``id`` of the first matching agent.

    Raises:
        Exception: if no agent matches the loopback or the public address.
    """
    local_ip = "127.0.0.1"
    # Materialised because the agents are scanned twice; lback_agents() may
    # return a one-shot iterator.
    agents = list(lback_agents())

    # Prefer local: a loopback agent wins regardless of its position.
    for agent in agents:
        if local_ip == agent.host:
            return agent.id

    public_ip = stun.get_ip_info()[1]
    for agent in agents:
        if public_ip == agent.host:
            return agent.id

    raise Exception("No default focused agent available")
开发者ID:nadirhamid,项目名称:lback,代码行数:12,代码来源:utils.py
示例7: update_loop
def update_loop(prefs, client):
    """Keep the configured DNS records pointed at the current external IP.

    Runs forever: each round looks up the external IP via STUN, pushes it to
    every entry of ``prefs['domains']`` through ``client.update_dns_record``
    and then sleeps for ``prefs['interval']`` seconds.

    Args:
        prefs: mapping providing ``'domains'`` (iterable of dicts with a
            ``'dns'`` key) and ``'interval'`` (sleep time in seconds).
        client: object exposing ``update_dns_record(name, ip)``; a truthy
            result is logged as an update, a falsy one as "already set".
    """
    while True:
        nat_type, ip, port = get_ip_info()
        if not ip:
            # A failed STUN lookup yields no address; skip this round rather
            # than writing an empty value into the DNS records.
            logger.warning("Could not determine external IP, skipping update")
        else:
            for domain in prefs.get('domains'):
                result = client.update_dns_record(domain['dns'], ip)
                if result:
                    logger.info("Updated DNS Record")
                else:
                    logger.info("DNS is already set.")
        logger.info("Sleeping for %s seconds" % prefs.get('interval'))
        sleep(prefs.get('interval'))
开发者ID:nwswanson,项目名称:dnsminder,代码行数:12,代码来源:application.py
示例8: connect_to_server
def connect_to_server(self):
    """Ask the user for connection mode and port, then bind the listen socket.

    When the user answers ``p`` a STUN lookup is made from the chosen local
    port and its result is stored in ``self.nat_type``, ``self.my_ip`` and
    ``self.port``.  In every case ``self.linsocket`` is bound to
    ``0.0.0.0`` on the chosen port.
    """
    prompt = ("Choose how to setup a connection\n"
              " 1.m for private server\n"
              " 2.p for public server\n")
    ch = raw_input(prompt)
    self.local_port = int(raw_input("enter port to listen: "))

    if ch == 'p':
        stun_result = stun.get_ip_info('0.0.0.0', self.local_port)
        self.nat_type, self.my_ip, self.port = stun_result
        if self.my_ip:
            log.info('External address: %s:%s' % (self.my_ip, self.port))
        else:
            log.warn('Stun request failed! switching to manual server')

    # NOTE(review): indentation was lost in the source; binding is assumed to
    # happen for both connection modes - confirm.
    self.listen_address = ('0.0.0.0', self.local_port)
    log.info('Listening on address: %s:%s' % self.listen_address)
    self.linsocket.bind(self.listen_address)
开发者ID:yaronraz1,项目名称:linblast,代码行数:13,代码来源:ExternalServer.py
示例9: connectionSupport
def connectionSupport(self):
    """
    Keep the external port mapping on the router alive, or re-announce the
    external address to the signal server.

    Every two seconds a keep-alive datagram is written to the signal server.
    If ``self.s_connect`` was set in the meantime it is cleared; otherwise
    the current external IP/port (from STUN) is sent via ``sendDatagram``.

    This method never returns.  It is a loop rather than the original
    self-recursion so that the call stack does not grow on every round and
    eventually exceed the interpreter's recursion limit.
    """
    # NOTE(review): self.transport suggests an event-loop driven protocol;
    # the blocking sleep below would stall such a loop - confirm.
    while True:
        self.transport.write('[True, "something"]', ('10.0.16.24', 4444))
        time.sleep(2)
        if self.s_connect:
            self.s_connect = False
        else:
            self.sendDatagram('[False, "Client 911", %s]' % str(stun.get_ip_info()[1:]))
开发者ID:noob-saibot,项目名称:p2p-behind-NATs,代码行数:13,代码来源:Mess_server.py
示例10: uiStunserver
def uiStunserver():
global PeerAddr
global PeerIp
global port
nat_type, Myip, port = stun.get_ip_info()
if Myip=="" or Myip=="none":
print "Stun request falied! switching to manual server"
uiMyserver()
else:
listenport=port
listenAddr=('0.0.0.0',listenport)
listensoc.bind(listenAddr)
print "your ip is: '"+Myip+ "' and your port is: '"+str(port)+"'"
开发者ID:yaronraz1,项目名称:linblast,代码行数:13,代码来源:linblast.py
示例11: ReceiverTask
def ReceiverTask(addr, port, bootstrap=None):
    """Generator task: set up the receiver, then answer incoming packets.

    Setup: a ``Receiver`` is created on ``(addr, port)``; its actual address
    is cached; a UPnP mapping for ``port`` and ``port + 1`` is attempted and,
    if that yields no external IP, a STUN probe is logged instead.  A TFTP
    server process is started on ``port + 1``.  If ``bootstrap`` is given a
    ``Bootstrap`` task is yielded first.

    Main loop: each received packet is offered to ``socket_observable``; when
    no observer takes it, a response is sent back to the packet's sender.

    Args:
        addr: address to receive on.
        port: port to receive on.
        bootstrap: optional argument for ``Bootstrap``; falsy to skip it.
    """
    global t_recv_buffer

    receiver = Receiver((addr, port))
    addr, port = receiver.get_addr()
    cache["addr"], cache["port"] = addr, port

    tftp_port = port + 1
    port_map = [(port, "UDP"), (tftp_port, "UDP")]
    external_ip, lan_addr = do_UPNP_mapping(port_map)

    if not external_ip:
        # No UPnP answer: fall back to a STUN probe (result is only logged).
        logging.info("[-] external_ip unknown: sending STUN probe...")
        import stun
        nat_type, external_ip, external_port = stun.get_ip_info()
        logging.info("[+] STUN probe: {0}:{1}, {2}".format(
            external_ip, external_port, nat_type))
        # TODO: if nat_type == symmetric -> import turn
    else:
        logging.info("[+] uPnP maps:{0}:{1} -> {2}:{3}".format(
            lan_addr, port, external_ip, port))
        cache["addr"], cache["port"] = lan_addr, port

    logging.info("cache['addr'] = {0}, cache['port'] = {1}".format(
        cache["addr"], cache["port"]))

    tftp_process = Process(target=tftp_server, args=(tftp_port, "./tasks/"))
    tftp_process.start()

    if bootstrap:
        yield NewTask(Bootstrap(bootstrap))

    while True:
        logging.info("Waiting for packets in ReceiverTask")
        resp, addr, rpc_no, uuid = yield receiver.recv()
        packet = dict(resp=resp, addr=addr, rpc_no=rpc_no, uuid=uuid)
        if socket_observable.notify_one(packet):
            continue
        # Nobody was waiting for this packet: reply to the sender directly.
        logging.debug("Not observer for notify, sending response...")
        local_endpoint = (cache["addr"], cache["port"])
        sender = Sender(packet["addr"], local_endpoint, cache["uuid"])
        yield sender.send_resp(packet["resp"], packet["rpc_no"])
开发者ID:jsam,项目名称:jollyroger,代码行数:50,代码来源:networkio.py
示例12: main
def main():
    """Entry point: start a UDP client or server depending on ``args.mode``.

    In client mode the external host/port from STUN are handed to
    ``Client`` together with the message argument.  Any other mode than
    client or server prints the help text.  Ctrl-C exits quietly.
    """
    try:
        parser = make_argument_parser()
        args = parser.parse_args()
        mode = args.mode

        if mode == CLIENT:
            _nat_type, host, port = stun.get_ip_info()
            protocol = Client(host, port, args.message)
        elif mode == SERVER:
            protocol = Server()
        else:
            parser.print_help()
            return

        reactor.listenUDP(0, protocol)
        reactor.run()
    except KeyboardInterrupt:
        sys.exit()
开发者ID:eugena,项目名称:p2p,代码行数:15,代码来源:p2p.py
示例13: setup_external_ip
def setup_external_ip(self, hostname=None):
    """
    Compare the current external IP with the one registered in DNS and log
    whether a DNS change is needed (the change itself is not performed here).

    :param hostname: the hostname to resolve; it is passed directly to
        ``dns.resolver.query``
    """
    nat_type, current_external_ip, external_port = stun.get_ip_info()
    log.info("Nat type %s, external IP %s, external port %s", nat_type, current_external_ip, external_port)

    # Only the first record of the DNS answer is considered.
    dns_registered_ip = dns.resolver.query(hostname)[0]

    if str(current_external_ip) == str(dns_registered_ip):
        log.info("DNS and current external IP ('%s') are the same, no DNS change needed", current_external_ip)
        return

    log.info(
        "DNS need change. Currently on DNS '%s', current external IP '%s'",
        dns_registered_ip,
        current_external_ip,
    )
开发者ID:clouetb,项目名称:RobotCommandApp,代码行数:16,代码来源:port_forwarder.py
示例14: main
def main():
    """Command-line entry point: run one STUN query and print the result.

    Source address and STUN server are taken from the parsed options; the
    debug option raises the stun logger to DEBUG.  Ctrl-C exits quietly.
    """
    try:
        options = make_argument_parser().parse_args()

        if options.debug:
            logging.basicConfig()
            stun.log.setLevel(logging.DEBUG)

        query = dict(
            source_ip=options.source_ip,
            source_port=options.source_port,
            stun_host=options.stun_host,
            stun_port=options.stun_port,
        )
        nat_type, external_ip, external_port = stun.get_ip_info(**query)

        report = (
            ('NAT Type:', nat_type),
            ('External IP:', external_ip),
            ('External Port:', external_port),
        )
        for label, value in report:
            print(label, value)
    except KeyboardInterrupt:
        sys.exit()
开发者ID:2mny,项目名称:mylar,代码行数:19,代码来源:cli.py
示例15: main
def main():
    """Find the peer through the rendezvous step, then exchange messages.

    The external address from STUN is announced through ``request_conn``,
    which returns two candidate addresses for the other client.  Both are
    probed with "discover" datagrams until one answers; afterwards a greeting
    is sent once per second and each reply is printed.

    NOTE(review): the source lost its indentation; the nesting below (the
    discovery loop inside the outer loop, the messaging steps after it) is
    a reconstruction - confirm against the original project.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(10)

    nat_type, external_ip, external_port = stun.get_ip_info()
    own_address = addr2str((external_ip, external_port))
    other_client, other_client_nat = request_conn(sock, own_address)

    connected = None
    sock.settimeout(2)
    while True:
        # discover procedure
        while not connected:
            for candidate in [other_client, other_client_nat]:
                print("Try to conn to addr: {}".format(candidate))
                sock.sendto("discover", candidate)
                try:
                    data, candidate = sock.recvfrom(1024)
                    sock.sendto("discover", candidate)
                except socket.timeout:
                    continue
                connected = candidate
                print("Connected to: {}".format(candidate))

        sock.sendto("hello i'm here", connected)
        data, sender = sock.recvfrom(1024)
        print("Received from {}: {}".format(sender, data))
        time.sleep(1)
开发者ID:pryg-skok,项目名称:py-msg-stun,代码行数:24,代码来源:client.py
示例16: Process
db.add("port_mappings", [(port, "UDP"), (port + 1, "UDP")])
try:
external_addr = u.external_addr()
except Exception, e:
logging.warning(
"Cannot fetch external address via uPnP: {0}".format(e))
except Exception, e:
logging.error("Cannot import UPNP class or port map is busy -> audit.")
## Do STUN probing - !
logging.info("Sending STUN probe...")
try:
import stun
nat_type, external_ip, external_port = stun.get_ip_info()
logging.info(
"""STUN probe results:\n
[+] NAT: {0}
[+] External IP: {1}:{2}
""".format(nat_type, external_ip, external_port))
db.add("nat_type", nat_type)
db.add("external_ip", external_ip)
db.add("external_port", port) ## audit for stun/turn port map
except Exception, e:
logging.error("STUN probing failed: {0}".format(e))
## start tftp receiver
p = Process(target=tftp_server, args=(port + 1, "./tasks"))
开发者ID:jsam,项目名称:jollyroger,代码行数:30,代码来源:managers.py
示例17: start_server
def start_server(keychain, first_startup=False):
# logging
logFile = logfile.LogFile.fromFullPath(DATA_FOLDER + "debug.log", rotateLength=15000000, maxRotatedFiles=1)
log.addObserver(FileLogObserver(logFile, level="debug").emit)
log.addObserver(FileLogObserver(level="debug").emit)
logger = Logger(system="Httpseed")
if os.path.isfile(DATA_FOLDER + 'keys.pickle'):
keys = pickle.load(open(DATA_FOLDER + "keys.pickle", "r"))
signing_key_hex = keys["signing_privkey"]
signing_key = nacl.signing.SigningKey(signing_key_hex, encoder=nacl.encoding.HexEncoder)
else:
signing_key = nacl.signing.SigningKey.generate()
keys = {
'signing_privkey': signing_key.encode(encoder=nacl.encoding.HexEncoder),
'signing_pubkey': signing_key.verify_key.encode(encoder=nacl.encoding.HexEncoder)
}
pickle.dump(keys, open(DATA_FOLDER + "keys.pickle", "wb"))
# Stun
port = 18467 if not TESTNET else 28467
logger.info("Finding NAT Type...")
response = stun.get_ip_info(stun_host="stun.l.google.com", source_port=port, stun_port=19302)
logger.info("%s on %s:%s" % (response[0], response[1], response[2]))
ip_address = response[1]
port = response[2]
# Start the kademlia server
this_node = Node(keychain.guid, ip_address, port,
keychain.verify_key.encode(), None, objects.FULL_CONE, False)
protocol = OpenBazaarProtocol(db, (ip_address, port), objects.FULL_CONE, testnet=TESTNET, relaying=True)
try:
kserver = Server.loadState('cache.pickle', ip_address, port, protocol, db, objects.FULL_CONE, None)
except Exception:
kserver = Server(this_node, db, keychain.signing_key)
kserver.protocol.connect_multiplexer(protocol)
protocol.register_processor(kserver.protocol)
kserver.saveStateRegularly('cache.pickle', 10)
reactor.listenUDP(port, protocol)
class WebResource(resource.Resource):
def __init__(self, kserver_r):
resource.Resource.__init__(self)
self.kserver = kserver_r
self.nodes = {}
for bucket in self.kserver.protocol.router.buckets:
for node in bucket.getNodes():
self.nodes[(node.ip, node.port)] = node
self.nodes[(this_node.ip, this_node.port)] = this_node
loopingCall = task.LoopingCall(self.crawl)
loopingCall.start(900, True)
def crawl(self):
def gather_results(result):
for proto in result:
n = objects.Node()
try:
n.ParseFromString(proto)
node = Node(n.guid, n.nodeAddress.ip, n.nodeAddress.port, n.signedPublicKey,
None if not n.HasField("relayAddress") else
(n.relayAddress.ip, n.relayAddress.port),
n.natType,
n.vendor)
self.nodes[(node.ip, node.port)] = node
except Exception:
pass
def start_crawl(results):
for node, result in results.items():
if not result[0]:
del self.nodes[(node.ip, node.port)]
node = Node(digest(random.getrandbits(255)))
nearest = self.kserver.protocol.router.findNeighbors(node)
spider = NodeSpiderCrawl(self.kserver.protocol, node, nearest, 100, 4)
spider.find().addCallback(gather_results)
ds = {}
for bucket in self.kserver.protocol.router.buckets:
for node in bucket.getNodes():
self.nodes[(node.ip, node.port)] = node
for node in self.nodes.values():
if node.id != this_node.id:
ds[node] = self.kserver.protocol.callPing(node)
deferredDict(ds).addCallback(start_crawl)
def getChild(self, child, request):
return self
def render_GET(self, request):
nodes = self.nodes.values()
shuffle(nodes)
logger.info("Received a request for nodes, responding...")
if "format" in request.args:
if request.args["format"][0] == "json":
json_list = []
if "type" in request.args and request.args["type"][0] == "vendors":
for node in nodes:
#.........这里部分代码省略.........
开发者ID:Dekryptor,项目名称:OpenBazaar-Server,代码行数:101,代码来源:httpseed.py
示例18: on_bootstrap_complete
from constants import DATA_FOLDER, DATABASE
from txjsonrpc.netstring import jsonrpc
from networkcli import RPCCalls
from market import network
from market.listeners import MessageListenerImpl, NotificationListenerImpl
from ws import WSFactory, WSProtocol
from autobahn.twisted.websocket import listenWS
# logging
logFile = logfile.LogFile.fromFullPath(DATA_FOLDER + "debug.log")
log.addObserver(log.FileLogObserver(logFile).emit)
log.startLogging(sys.stdout)
# stun
print "Finding NAT Type.."
response = stun.get_ip_info(stun_host='seed.openbazaar.org', stun_port=3478, source_port=0)
print "%s on %s:%s" % (response[0], response[1], response[2])
ip_address = response[1]
port = response[2]
# database
if not os.path.isfile(DATABASE):
create_database()
# key generation
keys = KeyChain()
def on_bootstrap_complete(resp):
mlistener = MessageListenerImpl(ws_factory)
mserver.get_messages(mlistener)
mserver.protocol.add_listener(mlistener)
开发者ID:Bitorious,项目名称:OpenBazaar-Server,代码行数:31,代码来源:openbazaard.py
示例19: run
def run(*args):
TESTNET = args[0]
SSL = args[5]
# database
db = Database(TESTNET)
# key generation
keys = KeyChain(db)
# logging
logFile = logfile.LogFile.fromFullPath(DATA_FOLDER + "debug.log", rotateLength=15000000, maxRotatedFiles=1)
log.addObserver(FileLogObserver(logFile, level=args[1]).emit)
log.addObserver(FileLogObserver(level=args[1]).emit)
logger = Logger(system="OpenBazaard")
# NAT traversal
port = args[2]
PortMapper().add_port_mapping(port, port, 'UDP')
logger.info("Finding NAT Type...")
while True:
try:
response = stun.get_ip_info(source_port=port)
break
except Exception:
pass
logger.info("%s on %s:%s" % (response[0], response[1], response[2]))
ip_address = response[1]
port = response[2]
# TODO: use TURN if symmetric NAT
def on_bootstrap_complete(resp):
logger.info("bootstrap complete, downloading outstanding messages...")
mserver.get_messages(mlistener)
# TODO: ping seed node to establish connection if not full cone NAT
# TODO: after bootstrap run through any pending contracts and see if the bitcoin address
# has been funded, if not listen on the address and start the 10 minute delete timer.
protocol = OpenBazaarProtocol((ip_address, port), response[0], testnet=TESTNET)
# kademlia
node = Node(keys.guid, ip_address, port, signed_pubkey=keys.guid_signed_pubkey, vendor=Profile(db).get().vendor)
storage = ForgetfulStorage() if TESTNET else PersistentStorage(db.DATABASE)
try:
kserver = Server.loadState(DATA_FOLDER + 'cache.pickle', ip_address, port, protocol, db,
on_bootstrap_complete, storage=storage)
except Exception:
kserver = Server(node, db, KSIZE, ALPHA, storage=storage)
kserver.protocol.connect_multiplexer(protocol)
kserver.bootstrap(kserver.querySeed(SEED)).addCallback(on_bootstrap_complete)
kserver.saveStateRegularly(DATA_FOLDER + 'cache.pickle', 10)
protocol.register_processor(kserver.protocol)
# market
mserver = network.Server(kserver, keys.signing_key, db)
mserver.protocol.connect_multiplexer(protocol)
protocol.register_processor(mserver.protocol)
reactor.listenUDP(port, protocol)
class OnlyIP(Site):
def __init__(self, resource, ip, timeout=60 * 60 * 1):
self.ip = ip
Site.__init__(self, resource, timeout=timeout)
def buildProtocol(self, addr):
if addr.host == self.ip:
return Site.buildProtocol(self, addr)
return None
# websockets api
if SSL:
ws_factory = WSFactory("wss://127.0.0.1:18466", mserver, kserver)
contextFactory = ChainedOpenSSLContextFactory(SSL_KEY, SSL_CERT)
ws_factory.protocol = WSProtocol
ws_factory.setProtocolOptions(allowHixie76=True)
listenWS(ws_factory, contextFactory)
else:
ws_factory = WSFactory("ws://127.0.0.1:18466", mserver, kserver)
ws_factory.protocol = WSProtocol
ws_factory.setProtocolOptions(allowHixie76=True)
listenWS(ws_factory)
webdir = File(".")
if args[4] != "127.0.0.1" and args[4] != "0.0.0.0":
ws_interface = "0.0.0.0"
web = OnlyIP(webdir, args[4])
else:
ws_interface = args[4]
web = Site(webdir)
reactor.listenTCP(18465, web, interface=ws_interface)
# rest api
api = OpenBazaarAPI(mserver, kserver, protocol)
#.........这里部分代码省略.........
开发者ID:HiroIshikawa,项目名称:OpenBazaar-Server,代码行数:101,代码来源:openbazaard.py
示例20: GUID
from guidutils.guid import GUID
from dht.network import Server
from dht.node import Node
from wireprotocol import OpenBazaarProtocol
from constants import DATA_FOLDER
from market import network
from txjsonrpc.netstring import jsonrpc
from networkcli import RPCCalls
# logging
logFile = logfile.LogFile.fromFullPath(DATA_FOLDER + "debug.log")
log.addObserver(log.FileLogObserver(logFile).emit)
log.startLogging(sys.stdout)
# stun
response = stun.get_ip_info(stun_host="stun.l.google.com", source_port=0, stun_port=19302)
ip_address = response[1]
port = response[2]
# key generation
if os.path.isfile(DATA_FOLDER + 'keys.pickle'):
keys = pickle.load(open(DATA_FOLDER + "keys.pickle", "r"))
g = keys["guid"]
else:
print "Generating GUID, stand by..."
g = GUID()
keys = {'guid': g}
pickle.dump(keys, open(DATA_FOLDER + "keys.pickle", "wb"))
protocol = OpenBazaarProtocol((ip_address, port))
开发者ID:Renelvon,项目名称:Network,代码行数:30,代码来源:openbazaard.py
注:本文中的stun.get_ip_info函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论