本文整理汇总了Python中socket.getdefaulttimeout函数的典型用法代码示例。如果您正苦于以下问题:Python getdefaulttimeout函数的具体用法?Python getdefaulttimeout怎么用?Python getdefaulttimeout使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了getdefaulttimeout函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: feedsStatus
def feedsStatus(self):
from urllib import urlopen
import socket
self["tl_red"].hide()
self["tl_yellow"].hide()
self["tl_green"].hide()
currentTimeoutDefault = socket.getdefaulttimeout()
socket.setdefaulttimeout(3)
try:
d = urlopen("http://openvix.co.uk/TrafficLightState.php")
self.trafficLight = d.read()
if self.trafficLight == "unstable":
self["tl_off"].hide()
self["tl_red"].show()
elif self.trafficLight == "updating":
self["tl_off"].hide()
self["tl_yellow"].show()
elif self.trafficLight == "stable":
self["tl_off"].hide()
self["tl_green"].show()
else:
self.trafficLight = "unknown"
self["tl_off"].show()
except:
self.trafficLight = "unknown"
self["tl_off"].show()
socket.setdefaulttimeout(currentTimeoutDefault)
开发者ID:undertaker01,项目名称:enigma2,代码行数:28,代码来源:SoftwareUpdate.py
示例2: testDefaultTimeout
def testDefaultTimeout(self):
# Testing default timeout
# The default timeout should initially be None
self.assertEqual(socket.getdefaulttimeout(), None)
s = socket.socket()
self.assertEqual(s.gettimeout(), None)
s.close()
# Set the default timeout to 10, and see if it propagates
socket.setdefaulttimeout(10)
self.assertEqual(socket.getdefaulttimeout(), 10)
s = socket.socket()
self.assertEqual(s.gettimeout(), 10)
s.close()
# Reset the default timeout to None, and see if it propagates
socket.setdefaulttimeout(None)
self.assertEqual(socket.getdefaulttimeout(), None)
s = socket.socket()
self.assertEqual(s.gettimeout(), None)
s.close()
# Check that setting it to an invalid value raises ValueError
self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
# Check that setting it to an invalid type raises TypeError
self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
开发者ID:249550148,项目名称:gevent,代码行数:27,代码来源:test_socket.py
示例3: __init__
def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
should_set_nonblocking = kwargs.pop('set_nonblocking', True)
if isinstance(family_or_realsock, (int, long)):
fd = _original_socket(family_or_realsock, *args, **kwargs)
else:
fd = family_or_realsock
# import timeout from other socket, if it was there
try:
self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
except AttributeError:
self._timeout = socket.getdefaulttimeout()
if should_set_nonblocking:
set_nonblocking(fd)
self.fd = fd
# when client calls setblocking(0) or settimeout(0) the socket must
# act non-blocking
self.act_non_blocking = False
# Copy some attributes from underlying real socket.
# This is the easiest way that i found to fix
# https://bitbucket.org/eventlet/eventlet/issue/136
# Only `getsockopt` is required to fix that issue, others
# are just premature optimization to save __getattr__ call.
self.bind = fd.bind
self.close = fd.close
self.fileno = fd.fileno
self.getsockname = fd.getsockname
self.getsockopt = fd.getsockopt
self.listen = fd.listen
self.setsockopt = fd.setsockopt
self.shutdown = fd.shutdown
开发者ID:lafengnan,项目名称:eventlet,代码行数:33,代码来源:greenio.py
示例4: get_remote_applications_icon
def get_remote_applications_icon (self):
logging.info ("get remote applications icon")
for n in self.notification:
if not str (n['app_id']) in self.app_ids:
self.app_ids.append (str (n['app_id']))
ids_str = ", ".join (self.app_ids)
qstr = "SELECT icon_url, app_id FROM application WHERE app_id IN (%s)" % ids_str
apps = self._query (qstr)
default_timeout = socket.getdefaulttimeout ()
socket.setdefaulttimeout (GET_ICON_TIMEOUT)
logging.debug ("socket timeout: %s" % socket.getdefaulttimeout ())
timeout_count = 0
for app in apps:
if timeout_count < 3:
try:
icon_name = self.get_remote_icon \
(app['icon_url'], self.app_icons_dir)
except TimeoutError:
logging.debug ("timeout")
timeout_count += 1
icon_name = ""
except NoUpdateError:
logging.debug ("No need update")
icon_name = os.path.basename \
(urlparse.urlsplit (app['icon_url']).path)
else:
icon_name = ""
self.applications[app['app_id']] = {'icon_name': icon_name}
socket.setdefaulttimeout (default_timeout)
self.refresh_status["apps_icon"] = True
开发者ID:evaryont,项目名称:wallbox,代码行数:32,代码来源:post_office.py
示例5: testTimeoutAttribute
def testTimeoutAttribute(self):
# This will prove that the timeout gets through HTTPConnection
# and into the socket.
# default -- use global socket timeout
self.assertIsNone(socket.getdefaulttimeout())
socket.setdefaulttimeout(30)
try:
httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT)
httpConn.connect()
finally:
socket.setdefaulttimeout(None)
self.assertEqual(httpConn.sock.gettimeout(), 30)
httpConn.close()
# no timeout -- do not use global socket default
self.assertIsNone(socket.getdefaulttimeout())
socket.setdefaulttimeout(30)
try:
httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT,
timeout=None)
httpConn.connect()
finally:
socket.setdefaulttimeout(None)
self.assertEqual(httpConn.sock.gettimeout(), None)
httpConn.close()
# a value
httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
httpConn.connect()
self.assertEqual(httpConn.sock.gettimeout(), 30)
httpConn.close()
开发者ID:chidea,项目名称:GoPythonDLLWrapper,代码行数:32,代码来源:test_httplib.py
示例6: test_update_feeds_resets_timeout_on_exception
def test_update_feeds_resets_timeout_on_exception(mock_update, hacks_feed):
"""update_feeds resets the socket timeout even on an exception."""
assert socket.getdefaulttimeout() is None
mock_update.side_effect = Exception('Failure')
with pytest.raises(Exception):
update_feeds()
assert socket.getdefaulttimeout() is None
开发者ID:Elchi3,项目名称:kuma,代码行数:7,代码来源:test_utils.py
示例7: test_update_feeds
def test_update_feeds(hacks_feed, mocked_parse):
"""update_feeds adds new entries, resets timeout."""
assert socket.getdefaulttimeout() is None
count = update_feeds()
assert count == 2
assert Entry.objects.count() == 2
assert socket.getdefaulttimeout() is None
开发者ID:Elchi3,项目名称:kuma,代码行数:7,代码来源:test_utils.py
示例8: get_node
def get_node(cls, external_manager, proxy_class, hostname=None, ssh_port=22, username='root', password=None, isRemote=False, use_keys=False):
key = cls.get_key(proxy_class, hostname, ssh_port, username, password, isRemote, use_keys)
if cls.nodes.get(key) is not None:
return cls.nodes[key]
try:
import time
import datetime
start = datetime.datetime.now()
print 'NODE_PROXY : START ',proxy_class,hostname
print socket.getdefaulttimeout()
print start
node = NodeWrapper(external_manager,proxy_class,hostname,ssh_port,username,password,isRemote,use_keys)
finally:
now = datetime.datetime.now()
print 'NODE_PROXY : END ',hostname,socket.getdefaulttimeout()
print (now - start).seconds
cls._node_pool_lock.acquire()
try:
if cls.nodes.get(key) is None:
cls.nodes[key] = node
print 'Adding to NodePool'
else:
node.cleanup()
return cls.nodes[key]
finally:
cls._node_pool_lock.release()
开发者ID:smarkm,项目名称:ovm,代码行数:26,代码来源:NodePool.py
示例9: fetch
def fetch(self, server):
"""
This function gets your IP from a specific server
"""
t = None
socket_default_timeout = socket.getdefaulttimeout()
opener = urllib.build_opener()
opener.addheaders = [('User-agent',
"Mozilla/5.0 (X11; Linux x86_64; rv:24.0)"
" Gecko/20100101 Firefox/24.0")]
try:
# Close url resource if fetching not finished within timeout.
t = Timer(self.timeout, self.handle_timeout, [self.url])
t.start()
# Open URL.
if version_info[0:2] == (2, 5):
# Support for Python 2.5.* using socket hack
# (Changes global socket timeout.)
socket.setdefaulttimeout(self.timeout)
self.url = opener.open(server)
else:
self.url = opener.open(server, timeout=self.timeout)
# Read response.
content = self.url.read()
# Didn't want to import chardet. Prefered to stick to stdlib
if PY3K:
try:
content = content.decode('UTF-8')
except UnicodeDecodeError:
content = content.decode('ISO-8859-1')
p = '(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.('
p += '25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|['
p += '01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)'
m = re.search(
p,
content)
myip = m.group(0)
if len(myip) > 0:
return myip
else:
return ''
except Exception as e:
print(e)
return ''
finally:
if self.url is not None:
self.url.close()
self.url = None
if t is not None:
t.cancel()
# Reset default socket timeout.
if socket.getdefaulttimeout() != socket_default_timeout:
socket.setdefaulttimeout(socket_default_timeout)
开发者ID:debunge,项目名称:pyp2p,代码行数:59,代码来源:ipgetter.py
示例10: _socket_create_connection
def _socket_create_connection(address, timeout=None):
if timeout is None:
timeout = socket.getdefaulttimeout()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
oldtimeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(timeout)
sock.connect(address)
socket.setdefaulttimeout(oldtimeout)
return sock
开发者ID:anutron,项目名称:windmill,代码行数:9,代码来源:https.py
示例11: test_timeout_reset_after_call
def test_timeout_reset_after_call(self):
old_timeout = socket.getdefaulttimeout()
self.stub_urlopen_with_timeout_check(30, None, "url")
try:
socket.setdefaulttimeout(1234)
base_utils.urlopen("url", timeout=30)
self.assertEquals(1234, socket.getdefaulttimeout())
finally:
socket.setdefaulttimeout(old_timeout)
开发者ID:HMTech,项目名称:autotest,代码行数:9,代码来源:base_utils_unittest.py
示例12: getTimeout
def getTimeout(self):
"""
Return the timeout set for this session. Should be set in any case,
but just to be sure if..else struct.
"""
if(socket.getdefaulttimeout() != None):
return socket.getdefaulttimeout()
else:
return 'No timeout set'
开发者ID:vikasgupta1812,项目名称:ping.py,代码行数:9,代码来源:TimeoutController.py
示例13: testSocketTimeout
def testSocketTimeout(self):
dict = {'logger': self.logger,
'endpoint': 'https://github.com/dmwm',
'cacheduration': None,
'timeout': 10,
}
service = Service(dict)
deftimeout = socket.getdefaulttimeout()
service.getData('%s/socketresettest' % self.testDir, '/WMCore/blob/master/setup.py#L11')
assert deftimeout == socket.getdefaulttimeout()
开发者ID:yuyiguo,项目名称:WMCore,代码行数:10,代码来源:Service_t.py
示例14: testPing
def testPing(self):
import socket
oldTimeOut = socket.getdefaulttimeout()
from angel_app.resource.remote.clone import Clone
cc = Clone("80.219.195.84", 6221)
assert False == cc.ping()
dd = Clone("localhost")
assert True == dd.ping(), "Make sure you have a local provider instance running."
assert oldTimeOut == socket.getdefaulttimeout()
开发者ID:infothrill,项目名称:angel-app-svnmirror,代码行数:12,代码来源:cloneTest.py
示例15: __init__
def __init__ (self, family = socket.AF_INET, type = socket.SOCK_STREAM, proto = 0, _sock = None,
_hub = None):
"""
Initialize the UV socket
:param family_or_realsock: a socket descriptor or a socket family
"""
self.uv_fd = None
self.uv_handle = None
self.uv_hub = None
self.uv_recv_string = '' # buffer for receiving data...
if isinstance(family, (int, long)):
self.uv_fd = _original_socket(family, type, proto, _sock)
elif isinstance(family, GreenSocket):
_sock = family
self.uv_fd = _sock.uv_fd
if hasattr(_sock, 'uv_hub') and _sock.uv_hub:
_hub = _sock.uv_hub
else:
_sock = family
self.uv_fd = _sock
if not self.uv_hub:
if _hub:
self.uv_hub = _hub
else:
self.uv_hub = weakref.proxy(get_hub())
## check if the socket type is supported by pyUV and we can create a pyUV socket...
if not self.uv_handle:
if self.type == socket.SOCK_STREAM:
self.uv_handle = pyuv.TCP(self.uv_hub.uv_loop)
self.uv_handle.open(self.fileno())
elif self.type == socket.SOCK_DGRAM:
self.uv_handle = pyuv.UDP(self.uv_hub.uv_loop)
self.uv_handle.open(self.fileno())
# import timeout from other socket, if it was there
try:
self._timeout = self.uv_fd.gettimeout() or socket.getdefaulttimeout()
except AttributeError:
self._timeout = socket.getdefaulttimeout()
assert self.uv_fd, 'the socket descriptor must be not null'
set_nonblocking(self.uv_fd)
# when client calls setblocking(0) or settimeout(0) the socket must act non-blocking
self.act_non_blocking = False
开发者ID:inercia,项目名称:evy,代码行数:50,代码来源:sockets.py
示例16: retry_http
def retry_http(tries, backoff=2, on_failure='error'):
"""
Retry a function or method reading from the internet until no socket or IOError
is raised
delay sets the initial delay, and backoff sets how much the delay should
lengthen after each failure. backoff must be greater than 1, or else it
isn't really a backoff. tries must be at least 0, and delay greater than 0.
"""
delay = socket.getdefaulttimeout()
o_delay = socket.getdefaulttimeout()
if backoff <= 1:
raise ValueError("backoff must be greater than 1")
tries = math.floor(tries)
if tries < 0:
raise ValueError("tries must be 0 or greater")
if delay <= 0:
delay = 15.
o_delay = 15.
socket.setdefaulttimeout(delay)
#raise ValueError("delay must be greater than 0")
def deco_retry(f):
def f_retry(*args, **kwargs):
mtries, mdelay = tries, delay # make mutable
while mtries > 0:
try:
rv = f(*args, **kwargs) # Try again
except IOError,msg:
rv = False
except socket.error:
rv = False
if rv != False: # Done on success
return rv
mtries -= 1 # consume an attempt
socket.setdefaulttimeout(mdelay) # wait...
mdelay *= backoff # make future wait longer
logger.error("URL timeout: %d attempts remaining (delay=%.1fs)"%(mtries,mdelay))
logger.critical("URL timeout: number of trials exceeded")
if on_failure=='error':
raise IOError,msg # Ran out of tries :-(
else:
logger.critical("URL Failed, but continuing...")
return None
开发者ID:IvS-KULeuven,项目名称:ComboCode,代码行数:48,代码来源:decorators.py
示例17: _call_api
def _call_api(self, api_url, err_env):
"""urlopen(), plus error handling and possible retries.
err_env is a dict of additional info passed to the error handler
"""
while True: # may retry on error
api_request = urllib2.Request(
api_url, headers={"Accept-Encoding": "gzip"})
log.debug("Amazon URL: %s" % api_url)
try:
if self.Timeout and sys.version[:3] in ["2.4", "2.5"]:
# urllib2.urlopen() doesn't accept timeout until 2.6
old_timeout = socket.getdefaulttimeout()
try:
socket.setdefaulttimeout(self.Timeout)
return urllib2.urlopen(api_request)
finally:
socket.setdefaulttimeout(old_timeout)
else:
# the simple way
return urllib2.urlopen(api_request, timeout=self.Timeout)
except:
if not self.ErrorHandler:
raise
exception = sys.exc_info()[1] # works in Python 2 and 3
err = {'exception': exception}
err.update(err_env)
if not self.ErrorHandler(err):
raise
开发者ID:patrickdessalle,项目名称:bottlenose,代码行数:32,代码来源:api.py
示例18: __init__
def __init__(self, family=_socket.AF_INET, type=_socket.SOCK_STREAM, proto=0, _sock=None):
if _sock is None:
_sock = _socket_socket(family, type, proto)
self.__socket = _sock
self.__socket.setblocking(0)
self.__timeout = _socket.getdefaulttimeout()
开发者ID:thomascobb,项目名称:cothread,代码行数:7,代码来源:cosocket.py
示例19: urlopen
def urlopen(url, retries=3, codes=(408, 500, 502, 503, 504), timeout=None):
"""Open url, optionally retrying if an error is encountered.
Socket and other IO errors will always be retried if retries > 0.
HTTP errors are retried if the error code is passed in ``codes``.
:param retries: Number of time to retry.
:param codes: HTTP error codes that should be retried.
"""
attempts = 0
while True:
try:
return urllib2.urlopen(url, timeout=timeout)
except IOError as e:
no_retry = isinstance(e, urllib2.HTTPError) and e.code not in codes
if attempts < retries and not no_retry:
attempts += 1
continue
else:
try:
url_string = url.get_full_url() # if url is Request obj
except Exception:
url_string = url
if timeout is None:
timeout = socket.getdefaulttimeout()
log.exception(
'Failed after %s retries on url with a timeout of %s: %s: %s',
attempts, timeout, url_string, e)
raise e
开发者ID:dastanforever,项目名称:allura,代码行数:30,代码来源:helpers.py
示例20: _socket_timeout
def _socket_timeout(*args, **kwargs):
old_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(timeout)
try:
return func(*args, **kwargs)
finally:
socket.setdefaulttimeout(old_timeout)
开发者ID:547358880,项目名称:flask-tutorial,代码行数:7,代码来源:package_index.py
注:本文中的socket.getdefaulttimeout函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论