本文整理汇总了Python中six.moves.BaseHTTPServer.HTTPServer类的典型用法代码示例。如果您正苦于以下问题:Python HTTPServer类的具体用法?Python HTTPServer怎么用?Python HTTPServer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了HTTPServer类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, boat, behaviour_manager,
server_address, RequestHandlerClass, bind_and_activate=True):
HTTPServer.__init__(self, server_address, RequestHandlerClass,
bind_and_activate)
log.info('boatd api listening on %s:%s', *server_address)
self.boat = boat
self.behaviour_manager = behaviour_manager
self.running = True
# set API endpoints for GETs
self.handles = {
'/': self.boatd_info,
'/boat': self.boat_attr,
'/wind': self.wind,
'/active': self.boat_active,
'/behaviours': self.behaviours,
}
# set API endpoints for POSTs
self.post_handles = {
'/': self.boatd_post,
'/behaviours': self.behaviours_post,
}
开发者ID:zuzak,项目名称:boatd,代码行数:25,代码来源:api.py
示例2: __init__
def __init__(self, server_address, RequestHandlerClass,
ssl_context=None, request_queue_size=None):
# This overrides the implementation of __init__ in python's
# SocketServer.TCPServer (which BaseHTTPServer.HTTPServer
# does not override, thankfully).
HTTPServer.__init__(self, server_address, RequestHandlerClass)
self.socket = socket.socket(self.address_family,
self.socket_type)
self.ssl_context = ssl_context
if ssl_context:
class TSafeConnection(tsafe.Connection):
def settimeout(self, *args):
self._lock.acquire()
try:
return self._ssl_conn.settimeout(*args)
finally:
self._lock.release()
def gettimeout(self):
self._lock.acquire()
try:
return self._ssl_conn.gettimeout()
finally:
self._lock.release()
self.socket = TSafeConnection(ssl_context, self.socket)
self.server_bind()
if request_queue_size:
self.socket.listen(request_queue_size)
self.server_activate()
开发者ID:crystallovemama,项目名称:hue,代码行数:28,代码来源:httpserver.py
示例3: HTTPLogServer
class HTTPLogServer(threading.Thread):
class RequestHandler(BaseHTTPRequestHandler):
INDENT = 4
def do_GET(self):
if self.path == "/summary":
self.send_response(200)
self.end_headers()
with self.server.state_lock:
status = {
"complete": self.server.state["complete"],
"running" : self.server.state["running"],
"total" : self.server.state["total"],
"results" : self.server.state["summary"],
}
self.wfile.write(json.dumps(status, indent=self.INDENT))
else:
self.send_response(404)
self.end_headers()
def __init__(self, state, state_lock):
super(HTTPLogServer, self).__init__()
port = int(PIGLIT_CONFIG.safe_get("http", "port", fallback=8080))
self._httpd = HTTPServer(("", port), HTTPLogServer.RequestHandler)
self._httpd.state = state
self._httpd.state_lock = state_lock
def run(self):
while True:
with self._httpd.state_lock:
# stop handling requests after the request for the final results
if self._httpd.state["complete"] == self._httpd.state["total"]:
break;
self._httpd.handle_request()
开发者ID:chadversary,项目名称:piglit,代码行数:34,代码来源:log.py
示例4: __init__
def __init__(self, addr, handler, root, userpwd):
HTTPServer.__init__(self, addr, handler)
self.root = root
self.userpwd = userpwd # WebDAV Auth user:passwd
if len(userpwd)>0:
self.auth_enable = True
else:
self.auth_enable = False
开发者ID:prince-0203,项目名称:TinyWebDav,代码行数:8,代码来源:WebDAV.py
示例5: __init__
def __init__(self, patroni, config):
self.connection_string = 'http://{}/patroni'.format(config.get('connect_address', None) or config['listen'])
host, port = config['listen'].split(':')
HTTPServer.__init__(self, (host, int(port)), RestApiHandler)
Thread.__init__(self, target=self.serve_forever)
self._set_fd_cloexec(self.socket)
self.patroni = patroni
self.daemon = True
开发者ID:radek-senfeld,项目名称:patroni,代码行数:8,代码来源:api.py
示例6: start_webserver
def start_webserver():
"""Start the webserver.
"""
path = os.path.dirname(__file__)
os.chdir(path)
server_address = ("127.0.0.1", 0)
httpd = HTTPServer(server_address, CGIHTTPRequestHandler)
print("http://127.0.0.1:%d" % httpd.server_port)
httpd.serve_forever()
开发者ID:scriptharness,项目名称:python-scriptharness,代码行数:9,代码来源:cgi_server.py
示例7: __init__
def __init__(self, server_address, handler, impl, meta):
'''
:param server_address: address of the server
:param handler: handler for requests
:param impl: reference to the implementation object
'''
HTTPServer.__init__(self, server_address, handler)
self.impl = impl
self.meta = meta
开发者ID:LucaBongiorni,项目名称:kitty,代码行数:9,代码来源:rpc.py
示例8: HelpServer
def HelpServer():
global server
while 1:
try:
server = HTTPServer(('localhost', PORT_NUMBER), HelpHandler)
server.serve_forever( poll_interval = 2 )
except Exception as e:
server = None
time.sleep( 5 )
开发者ID:esitarski,项目名称:CrossMgr,代码行数:9,代码来源:HelpSearch.py
示例9: __init__
def __init__(self, store, *args, **kwargs):
HTTPServer.__init__(self, *args, **kwargs)
self.sessions = {}
self.store = store
if self.server_port != 80:
self.base_url = ('http://%s:%s/' %
(self.server_name, self.server_port))
else:
self.base_url = 'http://%s/' % (self.server_name,)
开发者ID:ziima,项目名称:python-openid,代码行数:10,代码来源:consumer.py
示例10: ProxyThread
class ProxyThread(threading.Thread):
def __init__(self, token):
self.proxy = HTTPServer(('localhost', 0), ProxyHandler)
self.proxy._dcos_auth_token = token
super(ProxyThread, self).__init__()
def run(self):
self.proxy.serve_forever()
def port(self):
return self.proxy.socket.getsockname()[1]
开发者ID:balasubram,项目名称:dcos-spark,代码行数:11,代码来源:spark_submit.py
示例11: __init__
def __init__(self, *args, **kwargs):
HTTPServer.__init__(self, *args, **kwargs)
if self.server_port != 80:
self.base_url = ('http://%s:%s/' %
(self.server_name, self.server_port))
else:
self.base_url = 'http://%s/' % (self.server_name,)
self.openid = None
self.approved = {}
self.lastCheckIDRequest = {}
开发者ID:ziima,项目名称:python-openid,代码行数:12,代码来源:server.py
示例12: run
def run(self):
"""
Runs the server using Python's simple HTTPServer.
TODO: make this multithreaded.
"""
httpd = HTTPServer((self.host, self.port), self._Handler)
sa = httpd.socket.getsockname()
serve_message = "Serving HTTP on {host} port {port} (http://{host}:{port}/) ..."
print(serve_message.format(host=sa[0], port=sa[1]))
try:
httpd.serve_forever()
except KeyboardInterrupt:
print("\nKeyboard interrupt received, exiting.")
httpd.shutdown()
开发者ID:gaoxuesong,项目名称:python-stanford-corenlp,代码行数:14,代码来源:annotator.py
示例13: run_echo_server
def run_echo_server():
class Handler(BaseHTTPRequestHandler):
def _do(self):
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
parsed = urlparse(self.path)
query_parameters = parse_qs(parsed.query)
response = {
'path': parsed.path,
'headers': dict(self.headers)
}
if query_parameters:
response['query_params'] = query_parameters
try:
try: # py2
raw_content_len = self.headers.getheader('content-length')
except AttributeError: # py3
raw_content_len = self.headers.get('content-length')
content_len = int(raw_content_len)
except TypeError:
content_len = 0
if content_len:
body = self.rfile.read(content_len)
try: # py3
body = body.decode('UTF-8')
except AttributeError: # py2
pass
if body:
response['body'] = body
print(response)
encoded_json = json.dumps(response)
self.wfile.write(b(encoded_json))
return
do_GET = _do
do_POST = _do
do_PUT = _do
do_HEAD = _do
do_PATCH = _do
do_OPTIONS = _do
do_DELETE = _do
do_TRACE = _do
do_CONNECT = _do
server_address = ('', PORT)
httpd = HTTPServer(server_address, Handler)
httpd.serve_forever()
开发者ID:bartoszhernas,项目名称:silk,代码行数:48,代码来源:util.py
示例14: get_request
def get_request(self):
socket, client_address = HTTPServer.get_request(self)
socket = ssl.wrap_socket(socket,
keyfile=HttpsTestServerLayer.CERT_FILE,
certfile=HttpsTestServerLayer.CERT_FILE,
server_side=True)
return socket, client_address
开发者ID:cloudtrainings,项目名称:crate-workshop,代码行数:7,代码来源:tests.py
示例15: __initialize
def __initialize(self, config):
self.__ssl_options = self.__get_ssl_options(config)
self.__listen = config['listen']
host, port = config['listen'].rsplit(':', 1)
HTTPServer.__init__(self, (host, int(port)), RestApiHandler)
Thread.__init__(self, target=self.serve_forever)
self._set_fd_cloexec(self.socket)
self.__protocol = 'http'
# wrap socket with ssl if 'certfile' is defined in a config.yaml
# Sometime it's also needed to pass reference to a 'keyfile'.
if self.__ssl_options.get('certfile'):
import ssl
self.socket = ssl.wrap_socket(self.socket, server_side=True, **self.__ssl_options)
self.__protocol = 'https'
self.__set_connection_string(config.get('connect_address'))
开发者ID:jberkus,项目名称:patroni,代码行数:17,代码来源:api.py
示例16: test
def test():
host = 'localhost'
# When I use port 0 here, it works for the first fetch and the
# next one gets connection refused. Bummer. So instead, pick a
# port that's *probably* not in use.
import os
port = (os.getpid() % 31000) + 1024
server = HTTPServer((host, port), FetcherTestHandler)
import threading
server_thread = threading.Thread(target=server.serve_forever)
server_thread.setDaemon(True)
server_thread.start()
run_fetcher_tests(server)
server.shutdown()
开发者ID:bjinwright,项目名称:python3-openid,代码行数:18,代码来源:test_fetchers.py
示例17: SimpleWebServer
class SimpleWebServer(object):
"""A very basic web server."""
def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT):
self.stop_serving = False
host = host
port = port
while True:
try:
self.server = HTTPServer(
(host, port), HtmlOnlyHandler)
self.host = host
self.port = port
break
except socket.error:
log.debug("port %d is in use, trying to next one" % port)
port += 1
self.thread = threading.Thread(target=self._run_web_server)
def _run_web_server(self):
"""Runs the server loop."""
log.debug("web server started")
while not self.stop_serving:
self.server.handle_request()
self.server.server_close()
def start(self):
"""Starts the server."""
self.thread.start()
def stop(self):
"""Stops the server."""
self.stop_serving = True
try:
# This is to force stop the server loop
URLopener().open("http://%s:%d" % (self.host, self.port))
except IOError:
pass
log.info("Shutting down the webserver")
self.thread.join()
def where_is(self, path):
return "http://%s:%d/%s" % (self.host, self.port, path)
开发者ID:HuntJSparra,项目名称:bokeh,代码行数:43,代码来源:file_server.py
示例18: main
def main(ikey, skey, akey, host, port=8080):
server = HTTPServer(('', port), RequestHandler)
server.ikey = ikey
server.skey = skey
server.akey = akey
server.host = host
print("Visit the root URL with a 'user' argument, e.g.")
print("'http://localhost:%d/?user=myname'." % port)
server.serve_forever()
开发者ID:,项目名称:,代码行数:9,代码来源:
示例19: __init__
def __init__(self, patroni, config):
self._auth_key = base64.b64encode(config['auth'].encode('utf-8')).decode('utf-8') if 'auth' in config else None
host, port = config['listen'].split(':')
HTTPServer.__init__(self, (host, int(port)), RestApiHandler)
Thread.__init__(self, target=self.serve_forever)
self._set_fd_cloexec(self.socket)
protocol = 'http'
# wrap socket with ssl if 'certfile' is defined in a config.yaml
# Sometime it's also needed to pass reference to a 'keyfile'.
options = {option: config[option] for option in ['certfile', 'keyfile'] if option in config}
if options.get('certfile'):
import ssl
self.socket = ssl.wrap_socket(self.socket, server_side=True, **options)
protocol = 'https'
self.connection_string = '{0}://{1}/patroni'.format(protocol, config.get('connect_address', config['listen']))
self.patroni = patroni
self.daemon = True
开发者ID:UKHomeOffice,项目名称:docker-postgres-patroni-not-tracking,代码行数:21,代码来源:api.py
示例20: __init__
def __init__(self, patroni, config):
self._auth_key = base64.b64encode(config["auth"].encode("utf-8")).decode("utf-8") if "auth" in config else None
host, port = config["listen"].split(":")
HTTPServer.__init__(self, (host, int(port)), RestApiHandler)
Thread.__init__(self, target=self.serve_forever)
self._set_fd_cloexec(self.socket)
protocol = "http"
# wrap socket with ssl if 'certfile' is defined in a config.yaml
# Sometime it's also needed to pass reference to a 'keyfile'.
options = {option: config[option] for option in ["certfile", "keyfile"] if option in config}
if options.get("certfile", None):
import ssl
self.socket = ssl.wrap_socket(self.socket, server_side=True, **options)
protocol = "https"
self.connection_string = "{}://{}/patroni".format(protocol, config.get("connect_address", config["listen"]))
self.patroni = patroni
self.daemon = True
开发者ID:alexclear,项目名称:patroni,代码行数:22,代码来源:api.py
注:本文中的six.moves.BaseHTTPServer.HTTPServer类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论