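本文整理汇总了 Python 中 socket_error 的典型用法代码示例。需要说明的是:socket_error 并不是 socket 模块自带的函数,而是示例代码中为异常类 socket.error 取的别名(from socket import error as socket_error)。如果您正苦于以下问题:Python socket_error 的具体用法?Python socket_error 怎么用?Python socket_error 使用的例子?那么恭喜您,这里精选的代码示例或许可以为您提供帮助。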
在下文中一共展示了socket_error函数的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_egain_on_buffer_size
def test_egain_on_buffer_size(self, *args):
    """Verify that handle_read stops at EAGAIN and resumes on the next call.

    The mocked ``recv`` serves two 4096-byte chunks, raises EAGAIN, then
    serves the last 100 bytes followed by a second EAGAIN.
    """
    # start from a connection that has already finished starting up
    conn = self.test_successful_connection()

    header = six.b('\x00\x00\x00\x00') + int32_pack(20000)
    scripted = [
        header + (six.b('a') * (4096 - len(header))),
        six.b('a') * 4096,
        socket_error(errno.EAGAIN),
        six.b('a') * 100,
        socket_error(errno.EAGAIN),
    ]

    def fake_recv(*_ignored):
        # serve the scripted items in order; exception items are raised
        item = scripted.pop(0)
        if not isinstance(item, socket_error):
            return item
        raise item

    def buffered_bytes():
        # move to the end of the connection's buffer and report the offset
        conn._iobuf.seek(0, os.SEEK_END)
        return conn._iobuf.tell()

    conn._socket.recv.side_effect = fake_recv

    conn.handle_read(None, 0)
    self.assertEqual(conn._total_reqd_bytes, 20000 + len(header))
    # the first EAGAIN stops the read before the trailing 100 bytes
    self.assertEqual(buffered_bytes(), 4096 + 4096)

    # a second read picks up the remaining 100 bytes
    conn.handle_read(None, 0)
    self.assertEqual(buffered_bytes(), 4096 + 4096 + 100)
开发者ID:mike-tr-adamson,项目名称:python-driver,代码行数:32,代码来源:test_libevreactor.py
示例2: get_listening_udp_socket
def get_listening_udp_socket(ip, port, retry=30, style=None):
    """Returns a bound socket.socket for accepting UDP datagrams.

    The socket will be bound to the given ip and udp port with other
    optional parameters.

    :param ip: The ip address to listen on. ``''`` and ``'*'`` are
        translated to ``'0.0.0.0'`` which will listen on all configured
        addresses.
    :param port: The udp port to listen on.
    :param retry: The number seconds to keep trying to bind the socket,
        in case there's another process bound but exiting soon. This
        allows near zero-downtime process handoffs as you start the new
        one and kill the old.
    :param style: The libraries you'd like to use in creating the
        socket. The default will use the standard Python libraries.
        ``'Eventlet'`` is recognized and will use the Eventlet
        libraries. Other styles may be added in the future.
    :returns: The bound UDP socket.
    :raises socket.error: If the style is not understood, the address
        family cannot be determined, binding fails for a reason other
        than the address being in use, or the retry window elapses
        without a successful bind.
    """
    # NOTE: ``time`` and ``EADDRINUSE`` are used as free names here; they
    # are expected to come from module-level imports outside this function.
    if not style:
        from socket import AF_INET, AF_INET6, AF_UNSPEC, \
            error as socket_error, getaddrinfo, socket, SOCK_DGRAM, \
            SOL_SOCKET, SO_REUSEADDR
        from time import sleep
    elif style.lower() == 'eventlet':
        from eventlet.green.socket import AF_INET, AF_INET6, AF_UNSPEC, \
            error as socket_error, getaddrinfo, socket, SOCK_DGRAM, \
            SOL_SOCKET, SO_REUSEADDR
        from eventlet import sleep
    else:
        from socket import error as socket_error
        raise socket_error('Socket style %r not understood.' % style)
    if not ip or ip == '*':
        ip = '0.0.0.0'
    # Use the first IPv4/IPv6 family the resolver reports for this address.
    family = None
    for info in getaddrinfo(ip, port, AF_UNSPEC, SOCK_DGRAM):
        if info[0] in (AF_INET, AF_INET6):
            family = info[0]
            break
    if not family:
        raise socket_error(
            'Could not determine address family of %s:%s for binding.' %
            (ip, port))
    good_sock = None
    retry_until = time() + retry
    while not good_sock and time() < retry_until:
        sock = None
        try:
            sock = socket(family, SOCK_DGRAM)
            sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
            sock.bind((ip, port))
            good_sock = sock
        except socket_error as err:
            # Only "address in use" is worth retrying; anything else is
            # a real failure and is propagated to the caller.
            if err.errno != EADDRINUSE:
                raise
            sleep(0.1)
        finally:
            # Close the socket of a failed attempt so retries do not leak
            # one file descriptor per iteration.
            if good_sock is None and sock is not None:
                sock.close()
    if not good_sock:
        raise socket_error(
            'Could not bind to %s:%s after trying for %s seconds.' %
            (ip, port, retry))
    return good_sock
开发者ID:gholt,项目名称:python-brim,代码行数:60,代码来源:service.py
示例3: test_eagain_on_buffer_size
def test_eagain_on_buffer_size(self):
    """Verify that an EAGAIN from recv pauses the read until the next call.

    The mocked ``recv`` yields two 4096-byte chunks, an EAGAIN, the final
    100 bytes, and then another EAGAIN.
    """
    connection = self.test_successful_connection()

    header = six.b('\x00\x00\x00\x00') + int32_pack(20000)
    pending = [
        header + (six.b('a') * (4096 - len(header))),
        six.b('a') * 4096,
        socket_error(errno.EAGAIN),
        six.b('a') * 100,
        socket_error(errno.EAGAIN),
    ]

    def next_recv_result(*_unused):
        # pop the next scripted outcome; errors are raised, data returned
        outcome = pending.pop(0)
        if isinstance(outcome, socket_error):
            raise outcome
        return outcome

    def end_of_buffer():
        # offset of the end of the connection's io buffer
        connection._iobuf.seek(0, os.SEEK_END)
        return connection._iobuf.tell()

    self.get_socket(connection).recv.side_effect = next_recv_result

    connection.handle_read(*self.null_handle_function_args)
    self.assertEqual(connection._current_frame.end_pos, 20000 + len(header))
    # the EAGAIN keeps the last 100 bytes from being read yet
    self.assertEqual(end_of_buffer(), 4096 + 4096)

    # ask it to read again: the last 100 bytes should now arrive
    connection.handle_read(*self.null_handle_function_args)
    self.assertEqual(end_of_buffer(), 4096 + 4096 + 100)
开发者ID:joaquincasares,项目名称:python-driver,代码行数:31,代码来源:utils.py
示例4: test_socket_error_connection_refused
def test_socket_error_connection_refused(self):
    """A socket error carrying code 111 is reported as a refused connection."""
    # Test
    refused = socket_error(111)
    result_code = self.exception_handler.handle_socket_error(refused)

    # Verify
    self.assertEqual(result_code, handler.CODE_SOCKET_ERROR)
    first_recorded_line = self.recorder.lines[0]
    self.assertTrue('refused' in first_recorded_line)
    written_tags = self.prompt.get_write_tags()
    self.assertEqual([TAG_FAILURE], written_tags)
开发者ID:CUXIDUMDUM,项目名称:pulp,代码行数:9,代码来源:test_exceptions.py
示例5: test_socket_error_on_write
def test_socket_error_on_write(self, *args):
    """A send failure during handle_write must leave the connection defunct."""
    conn = self.make_connection()

    # force the OptionsMessage write to fail
    write_failure = socket_error(errno.EIO, "bad stuff!")
    conn._socket.send.side_effect = write_failure
    conn.handle_write(None, 0)

    # the failure should have been recorded on the connection
    self.assertTrue(conn.is_defunct)
    self.assertIsInstance(conn.last_error, socket_error)
    self.assertTrue(conn.connected_event.is_set())
开发者ID:mike-tr-adamson,项目名称:python-driver,代码行数:11,代码来源:test_libevreactor.py
示例6: test_socket_error_on_read
def test_socket_error_on_read(self, *args):
    """A recv failure during handle_read must leave the connection defunct."""
    conn = self.make_connection()

    # first allow the OptionsMessage to be written
    conn.handle_write(None, 0)

    # then make reading the SupportedMessage response fail
    read_failure = socket_error(errno.EIO, "busy socket")
    conn._socket.recv.side_effect = read_failure
    conn.handle_read(None, 0)

    # the failure should have been recorded on the connection
    self.assertTrue(conn.is_defunct)
    self.assertIsInstance(conn.last_error, socket_error)
    self.assertTrue(conn.connected_event.is_set())
开发者ID:mike-tr-adamson,项目名称:python-driver,代码行数:14,代码来源:test_libevreactor.py
示例7: test_blocking_on_write
def test_blocking_on_write(self, *args):
    """An EAGAIN on send must not make the connection defunct."""
    conn = self.make_connection()
    send_mock = conn._socket.send

    # first attempt: the OptionsMessage write would block
    send_mock.side_effect = socket_error(errno.EAGAIN, "socket busy")
    conn.handle_write(None, 0)
    self.assertFalse(conn.is_defunct)

    def accept_everything(data):
        # behave like a send that writes the whole payload
        return len(data)

    # second attempt: send behaves normally
    send_mock.side_effect = accept_everything
    conn.handle_write(None, 0)
    self.assertFalse(conn.is_defunct)
    self.assertTrue(conn._socket.send.call_args is not None)
开发者ID:mike-tr-adamson,项目名称:python-driver,代码行数:14,代码来源:test_libevreactor.py
示例8: receiveData
def _recv_exactly(sock, size):
    """Read exactly ``size`` bytes from ``sock``, looping over short reads.

    Raises socket_error if the peer closes the connection before ``size``
    bytes have arrived.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise socket_error(
                "connection closed before the full message was received")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def receiveData(socket):
    """Receive one length-prefixed, pickled object from ``socket``.

    The peer first sends a string holding the length of the serialized
    body, then the body itself.

    :param socket: A connected socket object (note: the parameter name
        shadows the ``socket`` module inside this function).
    :returns: The unpickled data object.
    :raises socket_error: If the connection was closed, either before the
        length header or part-way through the body.
    :raises ValueError: If the length header is not a valid integer.
    """
    # first we get a string that says how long the serialized string is
    # NOTE(review): the header is still read with a single recv();
    # presumably it is a fixed-width field of BODY_SIZE_STRING_SIZE bytes,
    # confirm against the sender before looping here as well.
    length = socket.recv(BODY_SIZE_STRING_SIZE)
    # An empty read means the peer closed the connection. ``not length``
    # covers both '' (Python 2) and b'' (Python 3).
    if not length:
        raise socket_error("")
    length = int(length)

    # Once the header has arrived we want the whole body, so block until
    # it has all come in, then put the previous timeout back even if the
    # read fails.
    timeout = socket.gettimeout()
    socket.settimeout(None)
    try:
        # recv() may return fewer bytes than requested, so keep reading
        # until the full body has been collected.
        data_string = _recv_exactly(socket, length)
    finally:
        # Return the socket to its previous blocking state
        socket.settimeout(timeout)

    # SECURITY: pickle.loads can execute arbitrary code supplied by the
    # peer; only use this with trusted endpoints.
    return pickle.loads(data_string)
开发者ID:MarsRobotics,项目名称:Experiments,代码行数:20,代码来源:DataTransferProtocol.py
示例9: test_handle_exception
def test_handle_exception(self):
    """
    Tests the high level call that branches based on exception type for all types.
    """

    def verify(raised, expected_code, expected_tag_count):
        # Run one exception through the handler, check the returned code
        # and that a failure message was output, then reset the tags so
        # the next case starts clean.
        code = self.exception_handler.handle_exception(raised)
        self.assertEqual(code, expected_code)
        self.assertEqual(expected_tag_count, len(self.prompt.tags))
        self.assertEqual(TAG_FAILURE, self.prompt.get_write_tags()[0])
        self.prompt.tags = []

    verify(exceptions.BadRequestException({}),
           handler.CODE_BAD_REQUEST, 3)
    verify(exceptions.ConflictException({}),
           handler.CODE_CONFLICT, 1)
    verify(exceptions.ConnectionException({}),
           handler.CODE_CONNECTION_EXCEPTION, 1)
    verify(exceptions.NotFoundException({'resources' : {'repo_id' : 'foo'}}),
           handler.CODE_NOT_FOUND, 2)
    verify(exceptions.PermissionsException({}),
           handler.CODE_PERMISSIONS_EXCEPTION, 1)
    verify(exceptions.PulpServerException({}),
           handler.CODE_PULP_SERVER_EXCEPTION, 1)
    verify(InvalidConfig('Test Message'),
           handler.CODE_INVALID_CONFIG, 1)
    verify(WrongHost('expected', 'actual'),
           handler.CODE_WRONG_HOST, 1)
    verify(exceptions.ApacheServerException('Test Message'),
           handler.CODE_APACHE_SERVER_EXCEPTION, 1)
    verify(gaierror(),
           handler.CODE_UNKNOWN_HOST, 1)
    verify(socket_error(),
           handler.CODE_SOCKET_ERROR, 1)

    # The expired-certificate case checks the full tag list rather than
    # the tag count, so it is handled separately.
    expired = exceptions.ClientCertificateExpiredException(CERT_FILENAME)
    code = self.exception_handler.handle_exception(expired)
    self.assertEqual(code, handler.CODE_PERMISSIONS_EXCEPTION)
    self.assertEqual([TAG_FAILURE, TAG_PARAGRAPH], self.prompt.get_write_tags())
    self.prompt.tags = []

    verify(Exception({}),
           handler.CODE_UNEXPECTED, 1)
开发者ID:CUXIDUMDUM,项目名称:pulp,代码行数:86,代码来源:test_exceptions.py
示例10: get_listening_tcp_socket
def _ssl_wrap_listener(ssl_module, sock, certfile, keyfile):
    """SSL-wrap ``sock`` using whichever API ``ssl_module`` provides."""
    legacy_wrap = getattr(ssl_module, 'wrap_socket', None)
    if legacy_wrap is not None:
        return legacy_wrap(sock, certfile=certfile, keyfile=keyfile)
    # The module-level ssl.wrap_socket was removed in Python 3.12; fall
    # back to an explicit server-side SSLContext there.
    context = ssl_module.SSLContext(ssl_module.PROTOCOL_TLS_SERVER)
    context.load_cert_chain(certfile, keyfile)
    return context.wrap_socket(sock, server_side=True)


def get_listening_tcp_socket(ip, port, backlog=4096, retry=30, certfile=None,
                             keyfile=None, style=None):
    """
    Returns a socket.socket bound to the given ip and tcp port with
    other optional parameters.

    :param ip: The ip address to listen on. ``''`` and ``'*'`` are
               translated to ``'0.0.0.0'`` which will listen on all
               configured addresses.
    :param port: The tcp port to listen on.
    :param backlog: The amount of system queued connections allowed.
    :param retry: The number seconds to keep trying to bind the
                  socket, in case there's another process bound but
                  exiting soon. This allows near zero-downtime
                  process handoffs as you start the new one and kill
                  the old.
    :param certfile: The certificate file if you wish the socket to
                     be ssl wrapped (see ssl.wrap_socket).
    :param keyfile: The key file if you wish the socket to be ssl
                    wrapped (see ssl.wrap_socket).
    :param style: The libraries you'd like to use in creating the
                  socket. The default will use the standard Python
                  libraries. ``'Eventlet'`` is recognized and will
                  use the Eventlet libraries. Other styles may be added
                  in the future.
    :returns: The bound, listening socket (ssl wrapped when both
              certfile and keyfile are given).
    :raises socket.error: If the style is not understood, the address
                          family cannot be determined, binding fails
                          for a reason other than the address being in
                          use, or the retry window elapses without a
                          successful bind.
    """
    # NOTE: ``time`` and ``EADDRINUSE`` are used as free names here; they
    # are expected to come from module-level imports outside this function.
    if not style:
        from socket import AF_INET, AF_INET6, AF_UNSPEC, \
            error as socket_error, getaddrinfo, IPPROTO_TCP, socket, \
            SOCK_STREAM, SO_KEEPALIVE, SOL_SOCKET, SO_REUSEADDR, TCP_KEEPIDLE
        import ssl as ssl_module
        from time import sleep
    elif style.lower() == 'eventlet':
        from eventlet.green.socket import AF_INET, AF_INET6, AF_UNSPEC, \
            error as socket_error, getaddrinfo, IPPROTO_TCP, socket, \
            SOCK_STREAM, SO_KEEPALIVE, SOL_SOCKET, SO_REUSEADDR, TCP_KEEPIDLE
        from eventlet.green import ssl as ssl_module
        from eventlet import sleep
    else:
        from socket import error as socket_error
        raise socket_error('Socket style %r not understood.' % style)
    if not ip or ip == '*':
        ip = '0.0.0.0'
    # Use the first IPv4/IPv6 family the resolver reports for this address.
    family = None
    for info in getaddrinfo(ip, port, AF_UNSPEC, SOCK_STREAM):
        if info[0] in (AF_INET, AF_INET6):
            family = info[0]
            break
    if not family:
        raise socket_error('Could not determine address family of %s:%s for '
                           'binding.' % (ip, port))
    good_sock = None
    retry_until = time() + retry
    while not good_sock and time() < retry_until:
        sock = None
        try:
            sock = socket(family, SOCK_STREAM)
            sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
            sock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1)
            sock.setsockopt(IPPROTO_TCP, TCP_KEEPIDLE, 600)
            sock.bind((ip, port))
            sock.listen(backlog)
            if certfile and keyfile:
                sock = _ssl_wrap_listener(ssl_module, sock, certfile, keyfile)
            good_sock = sock
        except socket_error as err:
            # Only "address in use" is worth retrying; anything else is
            # a real failure and is propagated to the caller.
            if err.errno != EADDRINUSE:
                raise
            sleep(0.1)
        finally:
            # Close the socket of a failed attempt so retries do not leak
            # one file descriptor per iteration.
            if good_sock is None and sock is not None:
                sock.close()
    if not good_sock:
        raise socket_error('Could not bind to %s:%s after trying for %s '
                           'seconds.' % (ip, port, retry))
    return good_sock
开发者ID:ahale,项目名称:brim,代码行数:68,代码来源:service.py
示例11: socket
try:
sock = socket(family, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1)
sock.setsockopt(IPPROTO_TCP, TCP_KEEPIDLE, 600)
sock.bind((ip, port))
sock.listen(backlog)
if certfile and keyfile:
sock = wrap_socket(sock, certfile=certfile, keyfile=keyfile)
good_sock = sock
except socket_error, err:
if err.errno != EADDRINUSE:
raise
sleep(0.1)
if not good_sock:
raise socket_error('Could not bind to %s:%s after trying for %s '
'seconds.' % (ip, port, retry))
return good_sock
def get_listening_udp_socket(ip, port, retry=30, style=None):
"""
Returns a socket.socket bound to the given ip and tcp port with
other optional parameters.
:param ip: The ip address to listen on. ``''`` and ``'*'`` are
translated to ``'0.0.0.0'`` which will listen on all
configured addresses.
:param port: The udp port to listen on.
:param retry: The number seconds to keep trying to bind the
socket, in case there's another process bound but
exiting soon. This allows near zero-downtime
开发者ID:ahale,项目名称:brim,代码行数:32,代码来源:service.py
注:本文中的 socket_error(即 socket.error 的别名)示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论