本文整理汇总了Python中requests.compat.urlparse函数的典型用法代码示例。如果您正苦于以下问题:Python urlparse函数的具体用法?Python urlparse怎么用?Python urlparse使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了urlparse函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_connection
def get_connection(self, url, proxies=None, verify=None, cert=None):
"""Returns a urllib3 connection for the given URL. This should not be
called from user code, and is only exposed for use when subclassing the
:class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
:param url: The URL to connect to.
:param proxies: (optional) A Requests-style dictionary of proxies used on this request.
"""
with self._pool_kw_lock:
if url.lower().startswith('https'):
self._update_poolmanager_ssl_kw(verify, cert)
proxies = proxies or {}
proxy = proxies.get(urlparse(url.lower()).scheme)
if proxy:
proxy = prepend_scheme_if_needed(proxy, 'http')
proxy_manager = self.proxy_manager_for(proxy)
conn = proxy_manager.connection_from_url(url)
else:
# Only scheme should be lower case
parsed = urlparse(url)
url = parsed.geturl()
conn = self.poolmanager.connection_from_url(url)
return conn
开发者ID:pcreech,项目名称:pulp,代码行数:26,代码来源:adapters.py
示例2: authenticate_server
def authenticate_server(self, response):
"""
Uses GSSAPI to authenticate the server.
Returns True on success, False on failure.
"""
log.debug("authenticate_server(): Authenticate header: {0}".format(
_negotiate_value(response)))
host_port_thread = "%s_%s_%s" % (urlparse(response.url).hostname,
urlparse(response.url).port,
threading.current_thread().ident)
try:
result = kerberos.authGSSClientStep(self.context[host_port_thread],
_negotiate_value(response))
except kerberos.GSSError:
log.exception("authenticate_server(): authGSSClientStep() failed:")
return False
if result < 1:
log.error("authenticate_server(): authGSSClientStep() failed: "
"{0}".format(result))
return False
log.debug("authenticate_server(): returning {0}".format(response))
return True
开发者ID:cloudera,项目名称:hue,代码行数:28,代码来源:kerberos_.py
示例3: headers
def headers(self):
"""Return Request-Line"""
url = urlparse(self._orig.url)
# Querystring
qs = ''
if url.query or self._orig.params:
qs = '?'
if url.query:
qs += url.query
# Requests doesn't make params part of ``request.url``.
if self._orig.params:
if url.query:
qs += '&'
#noinspection PyUnresolvedReferences
qs += type(self._orig)._encode_params(self._orig.params)
# Request-Line
request_line = '{method} {path}{query} HTTP/1.1'.format(
method=self._orig.method,
path=url.path or '/',
query=qs
)
headers = dict(self._orig.headers)
if 'Host' not in headers:
headers['Host'] = urlparse(self._orig.url).netloc
headers = ['%s: %s' % (name, value)
for name, value in headers.items()]
headers.insert(0, request_line)
return '\r\n'.join(headers).strip()
开发者ID:bkvirendra,项目名称:httpie,代码行数:35,代码来源:models.py
示例4: refresh
def refresh(self):
name = urlparse(self.observable).netloc.replace(
'.', '_').replace(':', '_')
# Add the auth headers to any other headers
auth_headers = self.auth.get_headers()
headers = self.http_args.get('headers', {})
headers.update(auth_headers)
# build new http args with these headers
http_args = self.http_args.copy()
http_args['headers'] = headers
response = requests.get(self.info_url, **http_args)
if response.status_code != 200:
raise Exception('%s: status code %d' % (response.url,
response.status_code))
info = msgpack.unpackb(response.content, encoding='utf-8')
self.metadata = info['metadata']
entries = {s['name']: RemoteCatalogEntry(url=self.source_url,
getenv=self.getenv,
getshell=self.getshell,
auth=self.auth,
http_args=self.http_args, **s)
for s in info['sources']}
return name, {}, entries, []
开发者ID:stonebig,项目名称:intake,代码行数:27,代码来源:base.py
示例5: _get_auth
def _get_auth(self):
parsed_url = urlparse(self.authentication_url)
post_data = {
'name': self.username,
'password': self.password,
'next': parsed_url.path + '?' + parsed_url.query
}
try:
response = self.session.post(self.url, data=post_data,
allow_redirects=False)
response.raise_for_status()
response = self.session.get(response.headers['location'],
allow_redirects=False)
response.raise_for_status()
resulting_uri = '{redirect_uri}#access_token=(.*)'.format(
redirect_uri=re.escape(self.redirect_uri))
self.auth = re.search(resulting_uri, response.headers['location']).group(1)
except Exception as error:
helpers.handle_requests_exception(error)
self.auth = None
return self.auth
开发者ID:KraXed112,项目名称:SickRage,代码行数:27,代码来源:putio_client.py
示例6: get_filename_from_url
def get_filename_from_url(url):
"""Get a filename from a URL.
>>> from planet.api import utils
>>> urls = [
... 'https://planet.com/',
... 'https://planet.com/path/to/',
... 'https://planet.com/path/to/example.tif',
... 'https://planet.com/path/to/example.tif?foo=f6f1&bar=baz',
... 'https://planet.com/path/to/example.tif?foo=f6f1&bar=baz#quux'
... ]
>>> for url in urls:
... print('{} -> {}'.format(url, utils.get_filename_from_url(url)))
...
https://planet.com/ -> None
https://planet.com/path/to/ -> None
https://planet.com/path/to/example.tif -> example.tif
https://planet.com/path/to/example.tif?foo=f6f1&bar=baz -> example.tif
https://planet.com/path/to/example.tif?foo=f6f1&bar=baz#quux -> example.tif
>>>
:returns: a filename (i.e. ``basename``)
:rtype: str or None
"""
path = urlparse(url).path
name = path[path.rfind('/')+1:]
return name or None
开发者ID:planetlabs,项目名称:planet-client-python,代码行数:27,代码来源:utils.py
示例7: request
def request(session, base_path, method, path, **kwargs):
"""Construct a :class:`requests.Request` object and send it.
:param requests.Session session:
:param str base_path:
:param str method: Method for the :class:`requests.Request` object.
:param str path: (optional) The path to join with :attr:`CouchDB.url`.
:param kwargs: (optional) Arguments that :meth:`requests.Session.request` takes.
:rtype: requests.Response
"""
# Prepare the params dictionary
if ('params' in kwargs) and isinstance(kwargs['params'], dict):
params = kwargs['params'].copy()
for key, val in iteritems(params):
# Handle titlecase booleans
if isinstance(val, bool):
params[key] = json.dumps(val)
kwargs['params'] = params
if compat.urlparse(path).scheme:
# Support absolute URLs
url = path
else:
url = urljoin(base_path, path).strip('/')
r = session.request(method, url, **kwargs)
# Raise exception on a bad status code
if not (200 <= r.status_code < 300):
utils.raise_http_exception(r)
return r
开发者ID:rwanyoike,项目名称:time2relax,代码行数:31,代码来源:time2relax.py
示例8: generate_request_header
def generate_request_header(self, response):
"""
Generates the GSSAPI authentication token with kerberos.
If any GSSAPI step fails, return None.
"""
host = urlparse(response.url).hostname
peer_name = "{service}@{host}".format(service=self.service, host=host) # eg. [email protected]
req_flags = (C_MUTUAL_FLAG,) if self.mutual_authentication in (REQUIRED, OPTIONAL) else ()
try:
self.context[host] = InitContext(peer_name=peer_name, req_flags=req_flags, cred=self.cred)
except GSSException:
# TODO is it even possible for InitContext() to raise?
log.exception("generate_request_header(): InitContext() failed:")
return None
try:
gss_response = self.context[host].step(_negotiate_value(response))
except GSSException:
log.exception("generate_request_header(): init_context.step() failed:")
return None
return "Negotiate {0}".format(gss_response)
开发者ID:japsu,项目名称:requests-gssapi,代码行数:26,代码来源:implementation.py
示例9: get_response
def get_response(name, request_kwargs):
host = Host(request_kwargs['headers'].get('Host', None)
or urlparse(request_kwargs['url']).netloc.split('@')[-1])
session = Session(host, name)
session.load()
# Update session headers with the request headers.
session['headers'].update(request_kwargs.get('headers', {}))
# Use the merged headers for the request
request_kwargs['headers'] = session['headers']
auth = request_kwargs.get('auth', None)
if auth:
session.auth = auth
elif session.auth:
request_kwargs['auth'] = session.auth
rsession = RSession(cookies=session.cookies)
try:
response = rsession.request(**request_kwargs)
except Exception:
raise
else:
session.cookies = rsession.cookies
session.save()
return response
开发者ID:simonbuchan,项目名称:httpie,代码行数:28,代码来源:sessions.py
示例10: get_tokens
def get_tokens(cls, url, user_agent=None, **kwargs):
scraper = cls.create_scraper()
if user_agent:
scraper.headers["User-Agent"] = user_agent
try:
resp = scraper.get(url, **kwargs)
resp.raise_for_status()
except Exception:
logging.error("'%s' returned an error. Could not collect tokens." % url)
raise
domain = urlparse(resp.url).netloc
cookie_domain = None
for d in scraper.cookies.list_domains():
if d.startswith(".") and d in ("." + domain):
cookie_domain = d
break
else:
raise ValueError(
'Unable to find Cloudflare cookies. Does the site actually have Cloudflare IUAM ("I\'m Under Attack Mode") enabled?'
)
return (
{
"__cfduid": scraper.cookies.get("__cfduid", "", domain=cookie_domain),
"cf_clearance": scraper.cookies.get(
"cf_clearance", "", domain=cookie_domain
),
},
scraper.headers["User-Agent"],
)
开发者ID:Anorov,项目名称:cloudflare-scrape,代码行数:33,代码来源:__init__.py
示例11: get_response
def get_response(name, request_kwargs, read_only=False):
"""Like `client.get_response`, but applies permanent
aspects of the session to the request.
"""
host = Host(request_kwargs['headers'].get('Host', None)
or urlparse(request_kwargs['url']).netloc.split('@')[-1])
session = Session(host, name)
session.load()
# Update session headers with the request headers.
session['headers'].update(request_kwargs.get('headers', {}))
# Use the merged headers for the request
request_kwargs['headers'] = session['headers']
auth = request_kwargs.get('auth', None)
if auth:
session.auth = auth
elif session.auth:
request_kwargs['auth'] = session.auth
rsession = requests.Session(cookies=session.cookies)
try:
response = rsession.request(**request_kwargs)
except Exception:
raise
else:
# Existing sessions with `read_only=True` don't get updated.
if session.is_new or not read_only:
session.cookies = rsession.cookies
session.save()
return response
开发者ID:bkvirendra,项目名称:httpie,代码行数:33,代码来源:sessions.py
示例12: authenticate_server
def authenticate_server(self, response):
"""
Uses GSSAPI to authenticate the server.
Returns True on success, False on failure.
"""
log.debug("authenticate_server(): Authenticate header: {0}".format(
_negotiate_value(response)))
host = urlparse(response.url).hostname
try:
result = kerberos.authGSSClientStep(self.context[host],
_negotiate_value(response))
except kerberos.GSSError as e:
log.error("authenticate_server(): authGSSClientStep() failed:")
log.exception(e)
return False
if result < 1:
log.error("authenticate_server(): authGSSClientStep() failed: "
"{0}".format(result))
return False
log.debug("authenticate_server(): returning {0}".format(response))
return True
开发者ID:vvanholl,项目名称:cassandra_snap_to_hadoop,代码行数:27,代码来源:kerberos_.py
示例13: authenticate_user
def authenticate_user(self, response, **kwargs):
"""Handles user authentication with gssapi/kerberos"""
host = urlparse(response.url).hostname
try:
auth_header = self.generate_request_header(response, host)
except KerberosExchangeError:
# GSS Failure, return existing response
return response
log.debug("authenticate_user(): Authorization header: {0}".format(
auth_header))
response.request.headers['Authorization'] = auth_header
# Consume the content so we can reuse the connection for the next
# request.
response.content
response.raw.release_conn()
_r = response.connection.send(response.request, **kwargs)
_r.history.append(response)
log.debug("authenticate_user(): returning {0}".format(_r))
return _r
开发者ID:FileTrek,项目名称:phoenix,代码行数:25,代码来源:kerberos_.py
示例14: __init__
def __init__(self, count, url, cls, session, params=None, etag=None,
headers=None):
models.GitHubCore.__init__(self, {}, session)
#: Original number of items requested
self.original = count
#: Number of items left in the iterator
self.count = count
#: URL the class used to make it's first GET
self.url = url
#: Last URL that was requested
self.last_url = None
self._api = self.url
#: Class for constructing an item to return
self.cls = cls
#: Parameters of the query string
self.params = params or {}
self._remove_none(self.params)
# We do not set this from the parameter sent. We want this to
# represent the ETag header returned by GitHub no matter what.
# If this is not None, then it won't be set from the response and
# that's not what we want.
#: The ETag Header value returned by GitHub
self.etag = None
#: Headers generated for the GET request
self.headers = headers or {}
#: The last response seen
self.last_response = None
#: Last status code received
self.last_status = 0
if etag:
self.headers.update({'If-None-Match': etag})
self.path = urlparse(self.url).path
开发者ID:ArRolin,项目名称:gitsome,代码行数:34,代码来源:structs.py
示例15: get_spn
def get_spn(self, r):
if self.spn is None:
domain = urlparse(r.url).hostname
self.spn = spn = "[email protected]%s" % domain
log.debug("calculated SPN as %s" % spn)
return self.spn
开发者ID:RogerWebb,项目名称:olap,代码行数:7,代码来源:requests_kerberosauth.py
示例16: from_request
def from_request(request):
"""Make an `HTTPMessage` from `requests.models.Request`."""
url = urlparse(request.url)
request_headers = dict(request.headers)
if 'Host' not in request_headers:
request_headers['Host'] = url.netloc
try:
body = request.data
except AttributeError:
# requests < 0.12.1
body = request._enc_data
if isinstance(body, dict):
# --form
body = request.__class__._encode_params(body)
return HTTPMessage(
line='{method} {path} HTTP/1.1'.format(
method=request.method,
path=url.path or '/'),
headers='\n'.join(str('%s: %s') % (name, value)
for name, value
in request_headers.items()),
body=body,
content_type=request_headers.get('Content-Type')
)
开发者ID:Bahus,项目名称:httpie,代码行数:27,代码来源:httpmessage.py
示例17: send
def send(self, request, **kwargs):
url = urlparse(request.url)
if url.scheme != 'https':
raise Exception('Only HTTPS is supported!')
ctx = self._make_context()
conn = httpslib.HTTPSConnection(
url.hostname, url.port or 443, ssl_context=ctx)
conn.request(request.method, url.path, request.body, request.headers)
resp = conn.getresponse()
response = Response()
# Fallback to None if there's no status_code, for whatever reason.
response.status_code = getattr(resp, 'status', None)
# Make headers case-insensitive.
response.headers = CaseInsensitiveDict(getattr(resp, 'headers', {}))
# Set encoding.
response.encoding = get_encoding_from_headers(response.headers)
response.raw = resp
response.reason = response.raw.reason
if isinstance(request.url, bytes):
response.url = request.url.decode('utf-8')
else:
response.url = request.url
# Give the Response some context.
response.request = request
response.connection = self
return response
开发者ID:mcrute,项目名称:dev_urandom,代码行数:35,代码来源:pkcs11-adapter.py
示例18: parse_args
def parse_args(self, env, args=None, namespace=None):
self.env = env
args = super(Parser, self).parse_args(args, namespace)
if not args.json and env.config.implicit_content_type == 'form':
args.form = True
if args.debug:
args.traceback = True
if args.output:
env.stdout = args.output
env.stdout_isatty = False
self._process_output_options(args, env)
self._process_pretty_options(args, env)
self._guess_method(args, env)
self._parse_items(args)
if not env.stdin_isatty:
self._body_from_file(args, env.stdin)
if not (args.url.startswith(HTTP) or args.url.startswith(HTTPS)):
scheme = HTTPS if env.progname == 'https' else HTTP
args.url = scheme + args.url
if args.auth and not args.auth.has_password():
# Stdin already read (if not a tty) so it's save to prompt.
args.auth.prompt_password(urlparse(args.url).netloc)
return args
开发者ID:hashin2,项目名称:cs499-django,代码行数:33,代码来源:input.py
示例19: reply
def reply(self, url, special_reply_content=None):
"""
:param special_reply_content: will use this reply instead of self.replies
:return: if success return True, else return False
"""
assert self.logged, 'Did not login successfully!'
assert self._reply_contents or special_reply_content, 'No reply text!'
resp = self.get(url)
followup_value = self.pat_followup_value.search(resp.text).group(1).strip(r'\"')
# 为什么不用parse_qs? 因为cc98 url上的boardid有些大写有些小写,不是统一的,
# cc98 is too SB
# 为什么不用lower降成小写?因为我写完上面那句话才想起来的,为了保留上面那句话
# I is too SB
qs_list = parse_qsl(urlparse(url).query)
boardid = qs_list[0][1]
rootid = qs_list[1][1]
reply_url = self.REPLY_BASE_URL + '?' + urlencode((('method', 'fastreply'), ('BoardID', boardid)))
cookies_password = parse_qs(resp.request.headers['cookie']).get('password')[0]
post_reply = special_reply_content if special_reply_content else random.choice(self._reply_contents)
post_form = {
'followup': followup_value,
'RootID': rootid,
'star': '1',
'UserName': self.username,
'passwd': cookies_password,
'Expression': 'face7.gif',
'Content': post_reply,
'signflag': 'yes',
}
self._reply_resp = self.post(reply_url, data=post_form)
return self._reply_resp.ok
开发者ID:littlezz,项目名称:cc98,代码行数:35,代码来源:cc98.py
示例20: authenticate_server
def authenticate_server(self, response):
"""
Uses GSSAPI to authenticate the server.
Returns True on success, False on failure.
"""
log.debug("authenticate_server(): Authenticate header: {0}".format(
_negotiate_value(response)))
host = urlparse(response.url).hostname
try:
# If this is set pass along the struct to Kerberos
if self.cbt_struct:
result = kerberos.authGSSClientStep(self.context[host],
_negotiate_value(response),
channel_bindings=self.cbt_struct)
else:
result = kerberos.authGSSClientStep(self.context[host],
_negotiate_value(response))
except kerberos.GSSError:
log.exception("authenticate_server(): authGSSClientStep() failed:")
return False
if result < 1:
log.error("authenticate_server(): authGSSClientStep() failed: "
"{0}".format(result))
return False
log.debug("authenticate_server(): returning {0}".format(response))
return True
开发者ID:FileTrek,项目名称:phoenix,代码行数:32,代码来源:kerberos_.py
注:本文中的requests.compat.urlparse函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论