• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python socket.error函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中socket.error异常类的典型用法代码示例。如果您正苦于以下问题:Python socket.error的具体用法?Python socket.error怎么用?Python socket.error使用的例子?那么恭喜您,这里精选的代码示例或许可以为您提供帮助。



在下文中一共展示了socket.error的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: connect

 def connect(self, host: bytes, port: int, connect_timeout: float = None) -> None:
     """
     Start a connection attempt to host:port.

     Emits 'connect' once connected, or 'connect_error' in the case of an
     error. Any outcome of connect_ex() other than EINPROGRESS is reported
     through handle_conn_error(). When connect_timeout is truthy, an
     ETIMEDOUT error is scheduled on the loop with that delay.
     """
     self.host, self.port = host, port
     self.on("fd_writable", self.handle_connect)
     # TODO: use socket.getaddrinfo(); needs to be non-blocking.
     try:
         status = self.sock.connect_ex((host, port))
     except socket.error as failure:
         # gaierror derives from socket.error; report it under its own type.
         if isinstance(failure, socket.gaierror):
             kind = socket.gaierror
         else:
             kind = socket.error
         self.handle_conn_error(kind, failure)
         return
     if status != errno.EINPROGRESS:
         self.handle_conn_error(socket.error, socket.error(status, os.strerror(status)))
         return
     if not connect_timeout:
         return
     timeout_error = socket.error(errno.ETIMEDOUT, os.strerror(errno.ETIMEDOUT))
     self._timeout_ev = self._loop.schedule(
         connect_timeout, self.handle_conn_error, socket.error, timeout_error, True
     )
开发者ID:mnot,项目名称:thor,代码行数:29,代码来源:tcp.py


示例2: GET

  def GET(self, path):
    """Perform a GET on ``self.path + path`` and return the response body.

    The process-wide default socket timeout is set to ``self.timeout`` for
    the duration of the request and restored afterwards. Connection-level
    failures are re-raised with an explanatory message appended; non-OK
    HTTP statuses are turned into exceptions before the body is returned.
    """
    try:
      default_timeout = socket.getdefaulttimeout()
      socket.setdefaulttimeout(self.timeout)
      try:
        self.connect()
        self.connection.request('GET', self.path + path)
        response = self.connection.getresponse()
      # If ssl error : may come from bad configuration
      except ssl.SSLError as exc:
        if exc.message != 'The read operation timed out':
          raise ssl.SSLError(str(exc) + self.ssl_error_message_connect_fail)
        raise socket.error(str(exc) + self.error_message_timeout)
      except socket.error as exc:
        if exc.message != 'timed out':
          raise socket.error(self.error_message_connect_fail + str(exc))
        raise socket.error(str(exc) + self.error_message_timeout)

      # check the response status and raise the matching exception early
      status = response.status
      if status != httplib.OK:
        path_errors = {
          httplib.REQUEST_TIMEOUT: ResourceNotReady,  # resource is not ready
          httplib.NOT_FOUND: NotFoundError,
          httplib.FORBIDDEN: Unauthorized,
        }
        if status in path_errors:
          raise path_errors[status](path)
        raise ServerError(parsed_error_message(status, response.read(), path))
    finally:
      socket.setdefaulttimeout(default_timeout)

    return response.read()
开发者ID:JuRogn,项目名称:slapos.core,代码行数:35,代码来源:slap.py


示例3: inet_ntop

        def inet_ntop(address_family, packed_ip):
            """
            Convert a packed IP address (a string of some number of characters)
            to its standard, family-specific string representation (for
            example, ``'7.10.0.5'`` or ``'5aef:2b::8'``). inet_ntop() is useful
            when a library or network protocol returns an object of type
            ``struct in_addr`` or ``struct in6_addr``.

            ===============  ============
            Argument         Description
            ===============  ============
            address_family   Supported values are ``socket.AF_INET`` and ``socket.AF_INET6``.
            packed_ip        The IP address to unpack.
            ===============  ============

            Raises ``socket.error`` with ``EAFNOSUPPORT`` for any other address
            family, and a generic ``socket.error`` when the underlying
            ``_inet_ntop`` call reports failure.
            """
            # Local import: use the platform's EAFNOSUPPORT rather than the
            # Linux-specific literal 97.
            import errno

            if address_family not in (socket.AF_INET, socket.AF_INET6):
                raise socket.error(errno.EAFNOSUPPORT,
                                   os.strerror(errno.EAFNOSUPPORT))

            # Output buffer size per family; presumably INET_ADDRSTRLEN (16) and
            # INET6_ADDRSTRLEN (46) plus one spare byte -- TODO confirm.
            buf_len = 17 if address_family == socket.AF_INET else 47
            buf = create_string_buffer(buf_len)

            result = _inet_ntop(address_family, packed_ip, buf, buf_len)
            if not result:
                raise socket.error("unknown error calling inet_ntop")

            return buf.value
开发者ID:ixokai,项目名称:pants,代码行数:30,代码来源:dns.py


示例4: start_serving

    def start_serving(self, connection_handler, host=None, port=None, *,
                      family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
                      sock=None, backlog=100, ssl=None, reuse_address=None):
        """Create listening socket(s) and start serving connections on them.

        This is a generator-based coroutine (it uses ``yield from``).

        Either ``host``/``port`` or an existing ``sock`` must be given, not
        both. With ``host``/``port``, the address is resolved through
        ``self.getaddrinfo()`` and one socket is created and bound per
        result; if anything fails, every socket created so far is closed.
        Each resulting socket is put into listening, non-blocking mode and
        handed to ``self._start_serving()``.

        Returns the list of listening sockets.

        Raises ``ValueError`` for an invalid host/port/sock combination and
        ``socket.error`` when resolution yields nothing or a bind fails.
        """
        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            # 0 stands in for AF_INET6 when the socket module lacks it.
            AF_INET6 = getattr(socket, 'AF_INET6', 0)
            if reuse_address is None:
                # Default: enable address reuse on posix, except under cygwin.
                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
            sockets = []
            if host == '':
                host = None

            infos = yield from self.getaddrinfo(
                host, port, family=family,
                type=socket.SOCK_STREAM, proto=0, flags=flags)
            if not infos:
                raise socket.error('getaddrinfo() returned empty list')

            # Stays False if any step below raises, so the finally clause
            # closes every socket created so far.
            completed = False
            try:
                for res in infos:
                    af, socktype, proto, canonname, sa = res
                    sock = socket.socket(af, socktype, proto)
                    sockets.append(sock)
                    if reuse_address:
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
                                        True)
                    # Disable IPv4/IPv6 dual stack support (enabled by
                    # default on Linux) which makes a single socket
                    # listen on both address families.
                    if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
                        sock.setsockopt(socket.IPPROTO_IPV6,
                                        socket.IPV6_V6ONLY,
                                        True)
                    try:
                        sock.bind(sa)
                    except socket.error as err:
                        # Re-raise with the failing address in the message,
                        # keeping the original errno.
                        raise socket.error(err.errno, 'error while attempting '
                                           'to bind on address %r: %s'
                                           % (sa, err.strerror.lower()))
                completed = True
            finally:
                if not completed:
                    for sock in sockets:
                        sock.close()
        else:
            if sock is None:
                raise ValueError(
                    'host and port was not specified and no sock specified')
            sockets = [sock]

        # Put every socket into listening, non-blocking mode and register it.
        for sock in sockets:
            sock.listen(backlog)
            sock.setblocking(False)
            self._start_serving(connection_handler, sock, ssl)
        return sockets
开发者ID:sah,项目名称:tulip,代码行数:60,代码来源:base_events.py


示例5: inet_ntop

def inet_ntop(address_family, packed_ip, encoding="UTF-8"):
    """Convert a packed IPv4/IPv6 address to text via ``WSAAddressToStringA``.

    :param address_family: ``socket.AF_INET`` or ``socket.AF_INET6``.
    :param packed_ip: the packed address; its length must match the size of
        the corresponding field of ``sockaddr``.
    :param encoding: codec used to decode the resulting byte string.
    :return: the decoded textual address.
    :raises socket.error: on a wrong ``packed_ip`` length, an unknown address
        family, or when ``WSAAddressToStringA`` returns non-zero.
    """
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = c_int(sizeof(addr))
    ip_string = create_string_buffer(128)
    # In/out parameter: on input it must hold the capacity of the OUTPUT
    # buffer (ip_string), not the size of the sockaddr structure.
    ip_string_size = c_int(sizeof(ip_string))

    if address_family == socket.AF_INET:
        if len(packed_ip) != sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntop')
        memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntop')
        memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')

    if WSAAddressToStringA(byref(addr),
                           addr_size,
                           None,
                           ip_string,
                           byref(ip_string_size)) != 0:
        raise socket.error(FormatError())

    # The reported length includes the trailing NUL; drop it before decoding.
    return (ip_string[:ip_string_size.value - 1]).decode(encoding)
开发者ID:PR3SID3NT3,项目名称:pydivert,代码行数:26,代码来源:winutils.py


示例6: bind

 def bind(self, address):
     """Bind this socket to ``address``, a (host, port) pair.

     The port must be 0; an available port for the connection's protocol
     is then picked with _getavailableport(). The host must be the empty
     string or the local device address. Raises _socket.error when the
     socket is already bound or connected, when the port is non-zero, when
     the host is not available or when the chosen port is already in use,
     and NotImplementedError for L2CAP.
     """
     _checkaddrpair(address, False)
     if self.__isbound():
         raise _socket.error('Socket is already bound')
     if self.__isconnected():
         raise _socket.error("Socket is already connected, cannot be bound")

     proto = self.__conn.proto
     if proto == _lightbluecommon.L2CAP:
         raise NotImplementedError("L2CAP server sockets not currently supported")

     host = address[0]
     if address[1] != 0:
         raise _socket.error("must bind to port 0, other ports not supported on Mac OS X")
     port = _getavailableport(proto)

     # host must be either the empty string or the local device address
     if host != "":
         try:
             import lightblue
             localaddr = lightblue.gethostaddr()
         except:
             localaddr = None
         if localaddr is None or host != localaddr:
             raise _socket.error(
                 errno.EADDRNOTAVAIL, os.strerror(errno.EADDRNOTAVAIL))

     # refuse a port that is already recorded as bound for this protocol
     taken = self._boundports[proto]
     if port in taken:
         raise _socket.error(errno.EADDRINUSE, os.strerror(errno.EADDRINUSE))

     taken.add(port)
     self.__port = port
开发者ID:LuoZijun,项目名称:pybluez,代码行数:31,代码来源:_bluetoothsockets.py


示例7: run

 def run(self):
     """Serve one client connection until it ends or a socket error occurs.

     Each received message is split into a one-item command prefix and the
     remaining data, then passed to the matching handler; the handler's
     result is sent back. An empty read, an unknown command or a handler
     returning None ends the connection. The socket is always closed and
     the thread removed from the server's management on exit.
     """
     # Single-argument print(...) behaves the same on Python 2 and 3.
     print("#{} connection started".format(self.identifier))
     try:
         # NOTE(review): the loop runs while stopped() is truthy, which looks
         # inverted -- confirm against StoppableThread before changing.
         while StoppableThread.stopped(self):
             inp = self.socket.recv(self.BufferSize)
             # Falsy result == peer closed; works for both str and bytes.
             if not inp:
                 raise socket.error("connection terminated")
             command, inp = inp[0], inp[1:]
             out = None
             if command == self.ListGamesCommand:
                 out = self.handleListGames(inp)
             elif command == self.ListMovesCommand:
                 out = self.handleListMoves(inp)
             elif command == self.SetGameCommand:
                 out = self.handleSetGame(inp)
             if out is None:
                 raise socket.error("got bad command {}".format(command+inp))
             self.socket.send(out)
     except socket.error as e:
         print("#{} Connection Error: {}".format(self.identifier, e))
     finally:
         self.socket.close()
         self.server.unManage(self,True)
     print("#{} connection ended".format(self.identifier))
开发者ID:bentheiii,项目名称:CheckMateServer,代码行数:25,代码来源:serverThreads.py


示例8: test_drain_nowait

    def test_drain_nowait(self):
        """drain_nowait() swallows timeouts/EAGAIN and re-raises other errors."""
        conn = Connection(transport=Mock)
        conn.drain_events = Mock()

        # A socket timeout means nothing was read.
        conn.drain_events.side_effect = socket.timeout()
        conn.more_to_read = True
        self.assertFalse(conn.drain_nowait())
        self.assertFalse(conn.more_to_read)

        # EAGAIN is handled the same way as a timeout.
        would_block = socket.error()
        would_block.errno = errno.EAGAIN
        conn.drain_events.side_effect = would_block
        conn.more_to_read = True
        self.assertFalse(conn.drain_nowait())
        self.assertFalse(conn.more_to_read)

        # Any other errno propagates to the caller.
        not_permitted = socket.error()
        not_permitted.errno = errno.EPERM
        conn.drain_events.side_effect = not_permitted
        with self.assertRaises(socket.error):
            conn.drain_nowait()

        # A successful drain reports True and flags more data to read.
        conn.more_to_read = False
        conn.drain_events = Mock()
        self.assertTrue(conn.drain_nowait())
        conn.drain_events.assert_called_with(timeout=0)
        self.assertTrue(conn.more_to_read)
开发者ID:adityar7,项目名称:kombu,代码行数:25,代码来源:test_connection.py


示例9: get_socket

    def get_socket(self, request, addrinfo, nonblocking):
        """Create a socket for ``addrinfo`` and start connecting it.

        :param request: object whose ``host_url.scheme`` selects SSL wrapping
            when it equals ``'https'`` (case-insensitive).
        :param addrinfo: sequence of ``(family, socktype, *sockaddr)``.
        :param nonblocking: when truthy the socket is set non-blocking before
            connecting.
        :return: the (possibly SSL-wrapped) socket.
        :raises socket.error: when the connect result is an error other than
            the "in progress / already connected" family, or when SO_ERROR
            reports a pending error. The socket is closed before raising.
        """
        # yikes
        family, socktype, sockaddr = addrinfo[0], addrinfo[1], addrinfo[2:]

        ret = socket.socket(family, socktype)
        try:
            is_ssl = request.host_url.scheme.lower() == 'https'
            if nonblocking:
                ret.setblocking(0)
            if is_ssl:
                ret = ssl.wrap_socket(ret)

            try:
                conn_res = ret.connect_ex(sockaddr)
            except socket.error as se:
                conn_res = se.args[0]

            # These results only mean the connect is still progressing or
            # already done; anything else is a real failure and must be raised.
            pending = (errno.EISCONN, errno.EWOULDBLOCK,
                       errno.EINPROGRESS, errno.EALREADY)
            if conn_res and conn_res not in pending:
                raise socket.error('Unknown', conn_res)

            # djb points out that some socket error conditions are only
            # visible with this 'one weird old trick'
            err = ret.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err:
                raise socket.error('Unknown', err)
        except Exception:
            # The socket is never returned on failure, so release it here.
            ret.close()
            raise

        return ret
开发者ID:mahmoud,项目名称:hematite,代码行数:29,代码来源:client.py


示例10: InetPtoN

def InetPtoN(protocol, addr_string):
  """Pack a textual IP address into its byte representation.

  IPv4 input is delegated to socket.inet_aton(); IPv6 input is validated,
  normalised to 32 hex digits and then hex-decoded.

  Args:
    protocol: socket.AF_INET or socket.AF_INET6
    addr_string: address string for the given protocol
  Returns:
    bytestring representing address
  Raises:
    socket.error: on unsupported protocol or bad IPv6 address format
  """
  if protocol == socket.AF_INET:
    return socket.inet_aton(addr_string)

  if protocol != socket.AF_INET6:
    raise socket.error("Unsupported protocol")

  if not addr_string:
    raise socket.error("Empty address string")

  if BAD_SINGLE_COLON.match(addr_string):
    raise socket.error("Start or ends with single colon")

  if addr_string == "::":
    all_zero_hex = "0" * 32
    return all_zero_hex.decode("hex_codec")

  # Normalise step by step: drop any v4 ending, strip an outer "::", pad.
  hex_digits = addr_string
  for normalise in (_RemoveV4Ending,
                    _StripLeadingOrTrailingDoubleColons,
                    _ZeroPad):
    hex_digits = normalise(hex_digits)

  try:
    packed = hex_digits.decode("hex_codec")
  except TypeError:
    raise socket.error("Error decoding: %s" % hex_digits)
  return packed
开发者ID:bhyvex,项目名称:grr,代码行数:34,代码来源:ipv6_utils.py


示例11: _ZeroPad

def _ZeroPad(addr_string):
  """Pad out zeros in each address chunk as necessary."""
  chunks = addr_string.split(":")
  total_length = len(chunks)
  if total_length > 8:
    raise socket.error(
        "Too many address chunks in %s, expected 8" % addr_string)

  double_colon = False
  addr_array = []
  for chunk in chunks:
    if chunk:
      chunk_len = len(chunk)
      if chunk_len > 4:
        raise socket.error("Chunk must be length 4: %s" % addr_string)
      if chunk_len != 4:
        # Pad out with 0's until we have 4 digits
        chunk = "0" * (4 - chunk_len) + chunk
      addr_array.append(chunk)
    else:
      if double_colon:
        raise socket.error("More than one double colon in %s" % addr_string)
      else:
        double_colon = True
        # Add zeros for the compressed chunks
        addr_array.extend(["0000"] * (8 - total_length + 1))

  if len(addr_array) != 8:
    raise socket.error("Bad address length, expected 8 chunks: %s" % addr_array)

  return "".join(addr_array)
开发者ID:bhyvex,项目名称:grr,代码行数:31,代码来源:ipv6_utils.py


示例12: callback

	def callback(self, inputs, outputs, errors):
		"""Service readable and writable sockets.

		For the listening socket in ``inputs`` a new connection is accepted
		and registered as a client; for any other readable socket the data is
		passed to its handler from ``self.socketmap``. Sockets in ``outputs``
		get their buffers flushed. Sockets that fail or close are removed.
		Any unexpected exception is reported through ``self._root.error``.
		``errors`` is not used here.
		"""
		import errno  # local import: names the EMFILE constant instead of a bare 24
		try:
			for s in inputs:
				if s == self.server:
					try:
						conn, addr = self.server.accept()
					except socket.error as e:
						if e.errno != errno.EMFILE:
							# keep the original errno and traceback
							raise
						# ulimit maxfiles, need to raise ulimit
						self._root.console_write('Maximum files reached, refused new connection.')
						# nothing was accepted: skip client creation for this socket
						continue
					client = Client.Client(self._root, conn, addr, self._root.session_id)
					self.addClient(client)
				else:
					try:
						data = s.recv(4096)
						if data:
							if s in self.socketmap: # for threading, just need to pass this to a worker thread... remember to fix the problem for any calls to handler, and fix msg ids (handler.thread)
								self.socketmap[s].Handle(data)
							else:
								self._root.console_write('Problem, sockets are not being cleaned up properly.')
						else:
							raise socket.error('Connection closed.')
					except socket.error:
						self.removeSocket(s)

			for s in outputs:
				try:
					self.socketmap[s].FlushBuffer()
				except KeyError:
					self.removeSocket(s)
				except socket.error:
					self.removeSocket(s)
		except: self._root.error(traceback.format_exc())
开发者ID:Anarchid,项目名称:uberserver,代码行数:34,代码来源:Dispatcher.py


示例13: write_to_socket

    def write_to_socket(self, frame_data):
        """Send all of ``frame_data`` over the socket while holding the lock.

        Timeouts and EWOULDBLOCK/EAGAIN are retried. Any other socket error
        is recorded in ``self._exceptions`` as an AMQPConnectionError and
        the write is abandoned.

        :param str frame_data:
        :return:
        """
        self._lock.acquire()
        try:
            size = len(frame_data)
            offset = 0
            while offset < size:
                try:
                    if not self.socket:
                        raise socket.error('connection/socket error')
                    sent = self.socket.send(frame_data[offset:])
                    if sent == 0:
                        raise socket.error('connection/socket error')
                except socket.timeout:
                    # nothing was sent this round; try again
                    continue
                except socket.error as why:
                    if why.args[0] not in (EWOULDBLOCK, EAGAIN):
                        self._exceptions.append(AMQPConnectionError(why))
                        return
                    continue
                offset += sent
        finally:
            self._lock.release()
开发者ID:fake-name,项目名称:wlnupdates,代码行数:28,代码来源:io.py


示例14: _read_response

    def _read_response(self):
        '''
        Read response from the transport (socket)

        The 12-byte header and the body are each read in a loop, because a
        single ``recv_into`` call may deliver fewer bytes than requested even
        though the connection is healthy.

        :return: tuple of the form (header, body)
        :rtype: tuple of two byte arrays
        :raise socket.error: ECONNABORTED when the peer closes the connection
            before the expected number of bytes has arrived
        '''
        def recv_exactly(buff, length):
            # Fill the first `length` bytes of the ctypes buffer `buff`.
            received = 0
            while received < length:
                # Writable view over the still-unfilled tail; shares memory
                # with `buff`, so recv_into writes straight into it.
                window = (ctypes.c_char * (length - received)).from_buffer(
                    buff, received)
                nbytes = self._socket.recv_into(window)
                # Zero bytes means the peer closed the connection.
                if not nbytes:
                    raise socket.error(socket.errno.ECONNABORTED, "Software caused connection abort")
                received += nbytes

        # Read response header
        buff_header = ctypes.create_string_buffer(12)
        recv_exactly(buff_header, 12)

        # Extract body length from header
        body_length = struct_L.unpack(buff_header[4:8])[0]

        # Read body if it is not empty (i.e. not PING)
        if body_length != 0:
            buff_body = ctypes.create_string_buffer(body_length)
            recv_exactly(buff_body, body_length)
        else:
            buff_body = b""

        return buff_header, buff_body
开发者ID:Volshebnyi,项目名称:tarantool-python,代码行数:29,代码来源:connection.py


示例15: read_message

def _recv_exactly(f, size):
    """Read exactly `size` bytes from `f`, raising socket.error on early EOF."""
    chunks = []
    remaining = size
    while remaining > 0:
        # recv() may return fewer bytes than asked for; keep reading.
        data = f.recv(min(remaining, BUFFER_SIZE))
        if not data:
            raise socket.error('socket recv error')
        chunks.append(data)
        remaining -= len(data)
    return b''.join(chunks)


def read_message(f):
    """Read a packet

    Wire layout as read here: an 8-byte header holding two network-order
    ints (version, body length), a pickled message body of that length,
    and -- when the message carries 'payload_length' -- that many bytes of
    raw payload.

    return msg received, if error happens, raise socket.error
    """
    # Read the 8-byte header: message version and body length
    ver, length = unpack('!ii', _recv_exactly(f, 8))
    if length <= 0:
        raise socket.error('socket recv error')

    # Read message body
    data = _recv_exactly(f, length)
    # SECURITY NOTE(review): unpickling data received from a socket executes
    # arbitrary code if the peer is untrusted -- confirm the trust model.
    msg = OODict(cPickle.loads(data))
    if 'payload_length' not in msg: # Simple message
        return msg

    # Read exactly payload_length bytes so no bytes of a following message
    # are consumed.
    payload = _recv_exactly(f, msg.payload_length)

    debug('protocol.read_message: payload-length %d get %d', msg.payload_length, len(payload))
    msg.payload = payload
    del msg['payload_length']
    return msg
开发者ID:nkchenz,项目名称:ideerfs,代码行数:35,代码来源:protocol.py


示例16: find_and_bind

    def find_and_bind(self, first_try, minport, maxport, bind = '', reuse = False, ipv6_socket_style = 1, randomizer = False):
        """Bind to ``first_try`` or to the first usable port in a range.

        ``first_try`` (when non-zero) is attempted first. Otherwise, or on
        failure, candidate ports between ``minport`` and ``maxport`` are
        tried in turn; with ``randomizer`` set, at most 20 randomly chosen
        candidates are tried. ``minport == maxport == 0`` yields the single
        candidate 0.

        Returns the port that was bound. Raises ``socket.error`` carrying the
        text of the last failure when no candidate could be bound, or a
        "no ports to check" message when the range is empty.
        """
        # Kept in an ordinary local: the name bound by "except ... as" is
        # removed at the end of the handler on Python 3.
        last_error = 'maxport less than minport - no ports to check'
        if minport == 0 and maxport == 0:
            portrange = [0]
        elif maxport - minport < 50 or not randomizer:
            # A real list so that shuffle() and slicing work on Python 3 too.
            portrange = list(range(minport, maxport + 1))
            if randomizer:
                shuffle(portrange)
                portrange = portrange[:20]
        else:
            portrange = []
            while len(portrange) < 20:
                listen_port = randrange(minport, maxport + 1)
                if listen_port not in portrange:
                    portrange.append(listen_port)

        if first_try != 0:
            try:
                self.bind(first_try, bind, reuse=reuse, ipv6_socket_style=ipv6_socket_style)
                return first_try
            except socket.error as e:
                last_error = e

        for listen_port in portrange:
            try:
                interfaces = self.bind(listen_port, bind, reuse=reuse, ipv6_socket_style=ipv6_socket_style)
                if len(interfaces) == 0:
                    raise socket.error('failed to bind on port')
                host, listen_port = interfaces[0]
                return listen_port
            except socket.error as e:
                # Remember the failure and move on to the next candidate
                # instead of aborting the whole scan.
                last_error = e

        raise socket.error(str(last_error))
开发者ID:2jangelman,项目名称:P2P-Streams-XBMC--Modules-,代码行数:34,代码来源:SocketHandler.py


示例17: inet_ntop

    def inet_ntop(address_family, packed_ip):
        """Return the textual form of a packed IP address.

        AF_INET is delegated to socket.inet_ntoa(); AF_INET6 goes through
        WSAAddressToStringA. Raises socket.error for any other family, for
        a packed address of the wrong length, or when the API call fails.
        """
        if address_family == socket.AF_INET:
            return socket.inet_ntoa(packed_ip)

        sock_addr = sockaddr()
        sock_addr.sa_family = address_family
        sock_addr_len = ctypes.c_int(ctypes.sizeof(sock_addr))
        text_buf = ctypes.create_string_buffer(128)
        text_len = ctypes.c_int(ctypes.sizeof(text_buf))

        if address_family != socket.AF_INET6:
            raise socket.error('unknown address family')
        if len(packed_ip) != ctypes.sizeof(sock_addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(sock_addr.ipv6_addr, packed_ip, 16)

        status = WSAAddressToStringA(
            ctypes.byref(sock_addr), sock_addr_len, None,
            text_buf, ctypes.byref(text_len))
        if status != 0:
            raise socket.error(ctypes.FormatError())

        # Slice off the final character counted in the reported length.
        end = text_len.value - 1
        return text_buf[:end]
开发者ID:datastax,项目名称:python-driver,代码行数:27,代码来源:util.py


示例18: send

    def send(self, buf):
        """Send ``buf``, translating SSL want-read/want-write conditions.

        Without SSL the call is delegated to the parent class. With SSL:
        on SSL_ERROR_WANT_WRITE the buffer is stored in ``self.lastbuffer``
        and reported as fully consumed; on SSL_ERROR_WANT_READ nothing is
        consumed (0 is returned); ``self.ssl_want`` records which condition
        occurred. Any other SSL error, or a negative send result, raises
        ``socket.error``.
        """
        self.ssl_want = None

        if not self.ssl:
            return super(AsyncHttpSocket, self).send(buf)

        r = None
        # NOTE(review): when self.lastbuffer is already set nothing is sent
        # and None is returned implicitly -- confirm this is intended.
        if not self.lastbuffer:
            try:
                r = self.socket.send(buf)
            # "as" form: valid on Python 2.6+ and required on Python 3.
            except ssl.SSLError as e:
                if e.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                    log.warning("write_want_write")
                    self.ssl_want = "write"
                    self.lastbuffer = buf  # -1: store the bytes for later
                    return len(buf)  # consume from asyncore
                elif e.args[0] == ssl.SSL_ERROR_WANT_READ:
                    log.warning("write_want_read")
                    self.ssl_want = "read"
                    return 0
                else:
                    raise socket.error(e, r)
            else:
                if r < 0:
                    raise socket.error("unknown -1 for ssl send", r)
                return r
开发者ID:Esteban-Rocha,项目名称:digsby,代码行数:26,代码来源:connection.py


示例19: read_response

    def read_response(self):
        """Return the next parsed response from the reader.

        A response cached in ``self._next_response`` is returned first.
        Otherwise data is read from the socket and fed to ``self._reader``
        until ``gets()`` yields something other than ``False``.

        Raises ``ConnectionError`` when there is no reader, when reading the
        socket fails or the server closes the connection, or when the parsed
        response itself is (or starts with) a ``ConnectionError``; raises
        ``TimeoutError`` on a socket timeout.
        """
        if not self._reader:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

        # _next_response might be cached from a can_read() call
        if self._next_response is not False:
            response = self._next_response
            self._next_response = False
            return response

        response = self._reader.gets()
        socket_read_size = self.socket_read_size
        # gets() returns False until a complete response is available.
        while response is False:
            try:
                if HIREDIS_USE_BYTE_BUFFER:
                    bufflen = self._sock.recv_into(self._buffer)
                    # zero bytes read is treated as the server closing the socket
                    if bufflen == 0:
                        raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
                else:
                    buffer = self._sock.recv(socket_read_size)
                    # an empty string indicates the server shutdown the socket
                    if not isinstance(buffer, bytes) or len(buffer) == 0:
                        raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
            except socket.timeout:
                raise TimeoutError("Timeout reading from socket")
            except socket.error:
                # fetch the active exception via sys.exc_info() rather than
                # an "except ... as" binding
                e = sys.exc_info()[1]
                raise ConnectionError("Error while reading from socket: %s" %
                                      (e.args,))
            if HIREDIS_USE_BYTE_BUFFER:
                self._reader.feed(self._buffer, 0, bufflen)
            else:
                self._reader.feed(buffer)
            # proactively, but not conclusively, check if more data is in the
            # buffer. if the data received doesn't end with \r\n, there's more.
            if HIREDIS_USE_BYTE_BUFFER:
                if bufflen > 2 and \
                        self._buffer[bufflen - 2:bufflen] != SYM_CRLF:
                    continue
            else:
                if not buffer.endswith(SYM_CRLF):
                    continue
            response = self._reader.gets()
        # if an older version of hiredis is installed, we need to attempt
        # to convert ResponseErrors to their appropriate types.
        if not HIREDIS_SUPPORTS_CALLABLE_ERRORS:
            if isinstance(response, ResponseError):
                response = self.parse_error(response.args[0])
            elif isinstance(response, list) and response and \
                    isinstance(response[0], ResponseError):
                response[0] = self.parse_error(response[0].args[0])
        # if the response is a ConnectionError or the response is a list and
        # the first item is a ConnectionError, raise it as something bad
        # happened
        if isinstance(response, ConnectionError):
            raise response
        elif isinstance(response, list) and response and \
                isinstance(response[0], ConnectionError):
            raise response[0]
        return response
开发者ID:ContextLogic,项目名称:redis-py,代码行数:60,代码来源:connection.py


示例20: _tunnel

    def _tunnel(self):
        """Send a CONNECT request for the tunnel host and consume the reply.

        Writes the CONNECT line and the tunnel headers, reads the status
        line and then discards response header lines up to the blank line
        (or EOF). Raises socket.error on an HTTP/0.9 reply or a non-200
        status (closing the connection first), and LineTooLong when a header
        line exceeds _MAXLINE.
        """
        connect_line = "CONNECT %s:%d HTTP/1.0\r\n" % (self._tunnel_host,
                                                       self._tunnel_port)
        self.send(connect_line)
        for header, value in self._tunnel_headers.iteritems():
            header_line = "%s: %s\r\n" % (header, value)
            self.send(header_line)
        self.send("\r\n")
        response = self.response_class(self.sock, strict = self.strict,
                                       method = self._method)
        (version, code, message) = response._read_status()

        if version == "HTTP/0.9":
            # HTTP/0.9 doesn't support the CONNECT verb, so if httplib has
            # concluded HTTP/0.9 is being used something has gone wrong.
            self.close()
            raise socket.error("Invalid response from tunnel request")
        if code != 200:
            self.close()
            reason = message.strip()
            raise socket.error("Tunnel connection failed: %d %s" % (code,
                                                                    reason))
        # Skip the remaining response header lines.
        while True:
            line = response.fp.readline(_MAXLINE + 1)
            if len(line) > _MAXLINE:
                raise LineTooLong("header line")
            # stop at EOF (sites which EOF without sending trailer) or at
            # the blank line ending the headers
            if not line or line == '\r\n':
                break
开发者ID:03013405yujiangfeng,项目名称:XX-Net,代码行数:28,代码来源:httplib.py



注:本文中的socket.error函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python socket.f函数代码示例发布时间:2022-05-27
下一篇:
Python socket.create_connection函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap