• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python socket.socket类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中socket.socket的典型用法代码示例。如果您正苦于以下问题:Python socket类的具体用法?Python socket怎么用?Python socket使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了socket类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: run_client

def run_client(server_address, server_port):
	# Ping a UDP pinger server running at the given address
	
	try:
		client_socket = Socket(socket.AF_INET, socket.SOCK_DGRAM)

		# Time out after 1 second
		client_socket.settimeout(1.0)

		print("Pinging", str(server_address), "on port", server_port)

		for i in range(10):
			client_socket.sendto("".encode(), (server_address, server_port))

			try:
				time_start = time.time()
				_, _ = client_socket.recvfrom(1024)
				time_stop = time.time()
			except socket.timeout:
				print("Packet lost")
			else:
				print("RTT(Round trip time): {:.3f}ms".format((time_stop - time_start) * 1000))
	finally:
		client_socket.close()

	return 0
开发者ID:shenaishiren,项目名称:computer-network-a-top-approach,代码行数:26,代码来源:ping.py


示例2: send_mail

def send_mail(sender_address, mail_server, receiver_address, message):
	try:
		client_socket = Socket(socket.AF_INET, socket.SOCK_STREAM)

		# set a 1 second timeout
		client_socket.settimeout(1.0)

		# connect to mail server
		client_socket.connect((mail_server, 25))

		def send(string):
			"""Helper function: fix newlines, encode and send a string"""
			final = string.replace("\n", "\r\n").encode("ascii")
			print "Sending " + final + "..."
			client_socket.send(final)

			return 0

		def recv_and_check(expected=250):
			"""Helper function: recive reply and check it's ok"""
			reply = client_socket.recv(2048)
			print "Got: ", reply

			code = int(reply.rstrip().split()[0])
			if code != expected:
				raise Exception(reply)

			return 0

		# get initial message from server
		recv_and_check(220)

		# send greeting
		send("HELO {}\n".format(sender_address.split("@")[1]))
		recv_and_check()

		# set sender address
		send("MAIL FROM: {}\n".format(sender_address))
		recv_and_check()

		# set receiver address
		send("RCPT TO: {}\n".format(receiver_address))
		recv_and_check()

		# prepare to send message
		send("DATA\n")
		recv_and_check(354)

		# send the message itself followed by terminator
		send("{}\n.\n".format(message))
		recv_and_check()

		send("QUIT\n")
		recv_and_check(221)

	finally:
		client_socket.close()


	return 0
开发者ID:shenaishiren,项目名称:computer-network-a-top-approach,代码行数:60,代码来源:mail-client.py


示例3: __demodulate

    def __demodulate(self, connection: socket.socket):
        connection.settimeout(0.1)
        time.sleep(self.TIMEOUT)

        total_data = []
        while True:
            try:
                data = connection.recv(65536)
                if data:
                    total_data.append(data)
                else:
                    break
            except socket.timeout:
                break

        if len(total_data) == 0:
            logger.error("Did not receive any data from socket.")

        arr = np.array(np.frombuffer(b"".join(total_data), dtype=np.complex64))
        signal = Signal("", "")
        signal._fulldata = arr
        pa = ProtocolAnalyzer(signal)
        pa.get_protocol_from_signal()
        return pa.plain_bits_str
开发者ID:jopohl,项目名称:urh,代码行数:24,代码来源:test_simulator.py


示例4: main

def main():
    register_builtin_interface()
    server = Socket()
    if len(sys.argv) > 1:
        server_port = int(sys.argv[1])
    else:
        server_port = DEFAULT_PORT
    print "Listening on port " + str(server_port)
    server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    server.bind(("", server_port))
    server.listen(50)
    
    Thread(target=process_event_queue).start()
    
    print "\nAutobus has successfully started up."
    
    try:
        while True:
            socket, address = server.accept()
            connection = Connection(socket, address)
            event_queue.put((connection.id, discard_args(connection.register)), block=True)
            connection.start()
    except KeyboardInterrupt:
        print "KeyboardInterrupt received, shutting down"
        event_queue.put((None, None), block=True)
        print "Event queue has been notified to shut down"
    except:
        print "Unexpected exception occurred in the main loop, shutting down. Stack trace:"
        print_exc()
        event_queue.put((None, None), block=True)
        print "Event queue has been notified to shut down"
    server.close()
开发者ID:giflw,项目名称:afn-tools,代码行数:32,代码来源:__init__.py


示例5: new

 def new(cls, host, port, logger=None):
     logger = logger or logging.getLogger(__name__)
     sock = Socket()
     try:
         sock.connect((host, port))
     except socket.error:
         return None
     return Connection(sock, logger)
开发者ID:mootron,项目名称:otp22logbot,代码行数:8,代码来源:connection.py


示例6: udp_writer

async def udp_writer(s: socket, oqueue: Queue) -> None:
    """Forward packets to the UDP socket."""

    while True:
        peer, data = await oqueue.get()
        try:
            s.sendto(data, peer)
        finally:
            oqueue.task_done()
开发者ID:AndreLouisCaron,项目名称:aiotk,代码行数:9,代码来源:_udp.py


示例7: prepare_socket_for_tls_handshake

    def prepare_socket_for_tls_handshake(self, sock: socket.socket) -> None:
        # Grab the banner
        if self.SHOULD_WAIT_FOR_SERVER_BANNER:
            sock.recv(2048)

        # Send Start TLS
        sock.send(self.START_TLS_CMD)
        if self.START_TLS_OK not in sock.recv(2048):
            raise StartTlsError(self.ERR_NO_STARTTLS)
开发者ID:nabla-c0d3,项目名称:sslyze,代码行数:9,代码来源:tls_wrapped_protocol_helpers.py


示例8: _recv

def _recv(sock: socket.socket, size: int) -> bytes:
    """Secondary function to receive a specified amount of data."""
    message = b''
    while len(message) < size:
        packet = sock.recv(size - len(message))
        if not packet:
            sock.close()
            raise OSError("Nothing else to read from socket")
        message += packet
    return message
开发者ID:skinnersBoxy,项目名称:crossword,代码行数:10,代码来源:wrapper.py


示例9: handlerequest

def handlerequest(clientsocket: socket.socket):
    conn = connecttodb()
    cursor = conn.cursor()
    tagid = bytes.decode(clientsocket.recv(3))
    query = 'select * from tagdata where id={0}'.format(tagid)
    cursor.execute(query)
    s=[]
    for (id, location, officehours, otherinfo, description) in cursor:
        s = [str(id)+','+location + ',', officehours + ',', otherinfo + ',', description]
        s.insert(0, str(len(''.join(s)))+',')
    clientsocket.send(bytes(''.join(s),encoding='utf8'))
    clientsocket.close()
开发者ID:savanakbari,项目名称:PhysicalWeb,代码行数:12,代码来源:requesthandler.py


示例10: _handle_connection

    def _handle_connection(self, connection: socket.socket, address: Any) -> None:
        if self.ssl_options is not None:
            assert ssl, "Python 2.6+ and OpenSSL required for SSL"
            try:
                connection = ssl_wrap_socket(
                    connection,
                    self.ssl_options,
                    server_side=True,
                    do_handshake_on_connect=False,
                )
            except ssl.SSLError as err:
                if err.args[0] == ssl.SSL_ERROR_EOF:
                    return connection.close()
                else:
                    raise
            except socket.error as err:
                # If the connection is closed immediately after it is created
                # (as in a port scan), we can get one of several errors.
                # wrap_socket makes an internal call to getpeername,
                # which may return either EINVAL (Mac OS X) or ENOTCONN
                # (Linux).  If it returns ENOTCONN, this error is
                # silently swallowed by the ssl module, so we need to
                # catch another error later on (AttributeError in
                # SSLIOStream._do_ssl_handshake).
                # To test this behavior, try nmap with the -sT flag.
                # https://github.com/tornadoweb/tornado/pull/750
                if errno_from_exception(err) in (errno.ECONNABORTED, errno.EINVAL):
                    return connection.close()
                else:
                    raise
        try:
            if self.ssl_options is not None:
                stream = SSLIOStream(
                    connection,
                    max_buffer_size=self.max_buffer_size,
                    read_chunk_size=self.read_chunk_size,
                )  # type: IOStream
            else:
                stream = IOStream(
                    connection,
                    max_buffer_size=self.max_buffer_size,
                    read_chunk_size=self.read_chunk_size,
                )

            future = self.handle_stream(stream, address)
            if future is not None:
                IOLoop.current().add_future(
                    gen.convert_yielded(future), lambda f: f.result()
                )
        except Exception:
            app_log.error("Error in connection callback", exc_info=True)
开发者ID:bdarnell,项目名称:tornado,代码行数:51,代码来源:tcpserver.py


示例11: __init__

 def __init__(self, runner: 'BaseRunner', sock: socket.socket, *,
              shutdown_timeout: float=60.0,
              ssl_context: Optional[SSLContext]=None,
              backlog: int=128) -> None:
     super().__init__(runner, shutdown_timeout=shutdown_timeout,
                      ssl_context=ssl_context, backlog=backlog)
     self._sock = sock
     scheme = 'https' if self._ssl_context else 'http'
     if hasattr(socket, 'AF_UNIX') and sock.family == socket.AF_UNIX:
         name = '{}://unix:{}:'.format(scheme, sock.getsockname())
     else:
         host, port = sock.getsockname()[:2]
         name = str(URL.build(scheme=scheme, host=host, port=port))
     self._name = name
开发者ID:KeepSafe,项目名称:aiohttp,代码行数:14,代码来源:web_runner.py


示例12: main

def main():
	parser = argparse.ArgumentParser()
	parser.add_argument('--port', '-p', default=8080, type=int,
	                    help='Port to use')
	args = parser.parse_args()

	try:
	    server_socket = Socket(socket.AF_INET, socket.SOCK_STREAM)
	    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
	    # server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, BUFFER_SIZE)

	    server_socket.bind(('', args.port))
	    server_socket.listen(1)

	    cache_dict = {}

	    print "Proxy server ready..."

	    while True:
	    	try:
	    		connection_socket = server_socket.accept()[0]

	    		t = Thread(target=handle_http, args=[cache_dict, connection_socket])
	    		t.setDaemon(1)
	    		t.start()
	    		t.join()
	    	except socket.error, e:
	    		print e
	    	finally:
	    		connection_socket.close()
开发者ID:shenaishiren,项目名称:computer-network-a-top-approach,代码行数:30,代码来源:web-proxy.py


示例13: workWithUser

def workWithUser(s : socket.socket):
    global parser
    global bUserConnect
    bUserConnect = True
    try:
        while 1:
            dataRecv = s.recv(1024).decode()
            if (dataRecv == ''):
                return
    except socket.error:
        return
    finally:
        bUserConnect = False
        s.close()
开发者ID:quanglys,项目名称:BPA,代码行数:14,代码来源:CoorNode.py


示例14: run_server

def run_server(server_port):
    """Run the UDP pinger server
    """

    # Create the server socket (to handle UDP requests using ipv4), make sure
    # it is always closed by using with statement.
    server_socket = Socket(socket.AF_INET, socket.SOCK_DGRAM)

    # The socket stays connected even after this script ends. So in order
    # to allow the immediate reuse of the socket (so that we can kill and
    # re-run the server while debugging) we set the following option. This
    # is potentially dangerous in real code: in rare cases you may get junk
    # data arriving at the socket.
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # Set the server port
    server_socket.bind(("", server_port))

    # Start accepting ping requests
    print ("Ping server ready on port", server_port)
    while True:
        # Receive message and send one back
        _, client_address = server_socket.recvfrom(1024)
        server_socket.sendto("Pong".encode(), client_address)

    return 0
开发者ID:rodrigoncalves,项目名称:myhttpd,代码行数:26,代码来源:ping.py


示例15: listen_for_data

def listen_for_data(sock: socket.socket) -> None:
    """Make the socket listen for data forever."""
    host = 'localhost'
    port = 41401
    sock.bind((host, port))
    sock.listen(1)
    while True:
        print('Waiting...')
        conn, addr = sock.accept()
        print(f'Connection from {addr}')
        if addr[0] != '127.0.0.1':
            continue
        with conn:
            data = receive_all(conn)
            parse_data(data)
开发者ID:hyshka,项目名称:dotfiles,代码行数:15,代码来源:pyclip.py


示例16: addNewNode

def addNewNode(s : socket.socket, orderNode):
    global lockCount
    global lockLst, eps, numNode, k, evnValidate, nodeRecv

    try:
        #receive name of the node
        dataRecv = s.recv(1024).decode()
        addNetworkIn(len(dataRecv))
        try:
            if (dataRecv != ''):
                arg = parser.parse_args(dataRecv.lstrip().split(' '))
                nameNode = arg.name[0]
                if (numNode == 0):
                    numNode = arg.num_node[0]
                    for i in range(numNode):
                        nodeRecv.append(False)
        except socket.error as e:
            print('Error: ' + str(e))
            return

        lockLst.acquire()
        lstSock.append(s)
        lstName.append(nameNode)
        lockLst.release()

        if (orderNode == NUMBER_NODE):
            evnInitComplete.set()

    except socket.error:
        pass
开发者ID:quanglys,项目名称:BPA,代码行数:30,代码来源:CoorNode.py


示例17: connect

 def connect(self, attempts=1):
     """
     Connects to the Autobus server. The specified number of attempts will
     be made to re-establish a connection, each time waiting an amount of
     time that increments itself up to 20 seconds. If attempts is None, an
     infinite number of attempts will be made. Once a connection has been
     established, this method returns. If it fails to establish a
     connection, an exception will be raised.
     """
     if attempts == 0:
         attempts = None
     progress = 0
     delay = 0.1
     delay_increment = 1.5
     while (attempts is None or progress < attempts) and not self.is_shut_down:
         progress += 1
         delay *= delay_increment
         if delay > 20:
             delay = 20
         self.socket = Socket()
         try:
             self.socket.connect((self.host, self.port))
         except:
             sleep(delay)
             continue
         with self.on_connect_lock:
             self.connection_established()
         return
     raise Exception("Couldn't connect")
开发者ID:giflw,项目名称:afn-tools,代码行数:29,代码来源:__init__.py


示例18: main

def main():
    if len(sys.argv) <= 1:
        print "You need to specify the url of the application to display."
        return
    scheme, location = urlparse(sys.argv[1])[0:2]
    if scheme != "rtk":
        print ("Only rtk:// urls are supported for now. I might add an HTTP " "transport at some point in the future.")
    if ":" not in location:
        location += ":" + str(DEFAULT_PORT)
    host, port = location.split(":")
    port = int(port)
    socket = Socket()
    try:
        socket.connect((host, port))
    except SocketError, e:
        print "Error while connecting: " + str(e)
        return
开发者ID:hzmmzl,项目名称:afn,代码行数:17,代码来源:rtkinter.py


示例19: run_server

def run_server(server_port):
	# Run UDP pinger server

	try:
		server_socket = Socket(socket.AF_INET, socket.SOCK_DGRAM)
		server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

		server_socket.bind(('', server_port))

		print("Ping server ready on port", server_port)
		while True:
			_, client_address = server_socket.recvfrom(1024)
			server_socket.sendto("".encode(), client_address)
	finally:
		server_socket.close()

	return 0
开发者ID:shenaishiren,项目名称:computer-network-a-top-approach,代码行数:17,代码来源:ping.py


示例20: client_socket_thread

def client_socket_thread(address, client_socket: socket.socket,
                         message_queue: queue.Queue,
                         new_observer_queue_queue: queue.Queue,
                         delete_observer_queue_queue: queue.Queue):
    next_state = ''
    tries = 0
    if kicked == 1:
        kick = 0		#reset kicking status
        return
    else:        
        nickname = address[0] + ':' + str(address[1])		#give them basic nickname of IP+port
        userlist.append(nickname)
        socketlist.append(client_socket)		#add name to both socket list and user list
        observer_queue = queue.Queue()
        new_observer_queue_queue.put((client_socket, observer_queue))
        client_sender(nickname, client_socket, message_queue, delete_observer_queue_queue)
    client_socket.close()
开发者ID:tylercatlin,项目名称:ChatServer,代码行数:17,代码来源:NetFinalCheckpoint3.py



注:本文中的socket.socket类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python socket.__init__函数代码示例发布时间:2022-05-27
下一篇:
Python socket.write函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap