本文整理汇总了Python中six.moves.urllib.parse.parse_qsl函数的典型用法代码示例。如果您正苦于以下问题:Python parse_qsl函数的具体用法?Python parse_qsl怎么用?Python parse_qsl使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了parse_qsl函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, base, relative=None):
self._has_token = False
self._url = None
self._url_parts = None
self._loaded = False
self._xml = None
self._url_parts = None
self._headers = None
self._config = None
if isinstance(base, six.string_types):
base_url = base
self._url_parts = list(parse.urlsplit(base_url))
elif isinstance(base, RequestBase):
base_url = base.url
self._has_token = base.has_token
self._url_parts = base._url_parts[:]
self._headers = base._headers
self._config = base.config
if relative:
scheme, netloc, path, qs, fragment = parse.urlsplit(relative)
if path:
self._url_parts[2] = _join_plex(self._url_parts[2], path)
if qs:
data = parse.parse_qsl(self._url_parts[3]) + parse.parse_qsl(qs)
self._url_parts[3] = parse.urlencode(data)
else:
# Strip of all non-token parts
data = parse.parse_qsl(self._url_parts[3])
self._url_parts[3] = parse.urlencode([(x, y) for x, y in data if x == 'X-Plex-Token'])
if not self._has_token:
self._has_token = 'X-Plex-Token' in parse.parse_qs(self._url_parts[3])
self._url = parse.urlunsplit(self._url_parts)
开发者ID:Xaroth,项目名称:plex-export,代码行数:32,代码来源:base.py
示例2: params
def params(self, collapse=True):
"""Extracts the query parameters from the split urls components.
This method will provide back as a dictionary the query parameter
names and values that were provided in the url.
:param collapse: Boolean, turn on or off collapsing of query values
with the same name. Since a url can contain the same query parameter
name with different values it may or may not be useful for users to
care that this has happened. This parameter when True uses the
last value that was given for a given name, while if False it will
retain all values provided by associating the query parameter name with
a list of values instead of a single (non-list) value.
"""
if self.query:
if collapse:
return dict(parse.parse_qsl(self.query))
else:
params = {}
for (key, value) in parse.parse_qsl(self.query):
if key in params:
if isinstance(params[key], list):
params[key].append(value)
else:
params[key] = [params[key], value]
else:
params[key] = value
return params
else:
return {}
开发者ID:bdrich,项目名称:neutron-lbaas,代码行数:30,代码来源:netutils.py
示例3: oauth_dance
def oauth_dance(server, consumer_key, key_cert_data, print_tokens=False, verify=None):
if verify is None:
verify = server.startswith('https')
# step 1: get request tokens
oauth = OAuth1(
consumer_key, signature_method=SIGNATURE_RSA, rsa_key=key_cert_data)
r = requests.post(
server + '/plugins/servlet/oauth/request-token', verify=verify, auth=oauth)
request = dict(parse_qsl(r.text))
request_token = request['oauth_token']
request_token_secret = request['oauth_token_secret']
if print_tokens:
print("Request tokens received.")
print(" Request token: {}".format(request_token))
print(" Request token secret: {}".format(request_token_secret))
# step 2: prompt user to validate
auth_url = '{}/plugins/servlet/oauth/authorize?oauth_token={}'.format(
server, request_token)
if print_tokens:
print(
"Please visit this URL to authorize the OAuth request:\n\t{}".format(auth_url))
else:
webbrowser.open_new(auth_url)
print(
"Your browser is opening the OAuth authorization for this client session.")
approved = input(
'Have you authorized this program to connect on your behalf to {}? (y/n)'.format(server))
if approved.lower() != 'y':
exit(
'Abandoning OAuth dance. Your partner faceplants. The audience boos. You feel shame.')
# step 3: get access tokens for validated user
oauth = OAuth1(consumer_key,
signature_method=SIGNATURE_RSA,
rsa_key=key_cert_data,
resource_owner_key=request_token,
resource_owner_secret=request_token_secret
)
r = requests.post(
server + '/plugins/servlet/oauth/access-token', verify=verify, auth=oauth)
access = dict(parse_qsl(r.text))
if print_tokens:
print("Access tokens received.")
print(" Access token: {}".format(access['oauth_token']))
print(" Access token secret: {}".format(
access['oauth_token_secret']))
return {
'access_token': access['oauth_token'],
'access_token_secret': access['oauth_token_secret'],
'consumer_key': consumer_key,
'key_cert': key_cert_data,
}
开发者ID:cogell,项目名称:Jira-Alfred-Workflow,代码行数:58,代码来源:jirashell.py
示例4: equals
def equals(self, rhs):
lhsp = urlparse.urlparse(self.lhs)
rhsp = urlparse.urlparse(rhs)
lhs_qs = urlparse.parse_qsl(lhsp.query)
rhs_qs = urlparse.parse_qsl(rhsp.query)
return (lhsp.scheme == rhsp.scheme and
lhsp.netloc == rhsp.netloc and
lhsp.path == rhsp.path and
len(lhs_qs) == len(rhs_qs) and
set(lhs_qs) == set(rhs_qs))
开发者ID:yizhongyin,项目名称:OpenstackLiberty,代码行数:12,代码来源:test_cli20.py
示例5: GET
def GET(self):
if self._GET is None:
params = parse_qsl(self.url_parts.query, keep_blank_values=True)
self._GET = MultiDict()
for key, value in params:
self._GET.add(key, value)
return self._GET
开发者ID:Ms2ger,项目名称:web-platform-tests,代码行数:7,代码来源:request.py
示例6: canonicalize_url
def canonicalize_url(url, keep_blank_values=True, keep_fragments=False,
encoding=None):
"""Canonicalize the given url by applying the following procedures:
- sort query arguments, first by key, then by value
- percent encode paths and query arguments. non-ASCII characters are
percent-encoded using UTF-8 (RFC-3986)
- normalize all spaces (in query arguments) '+' (plus symbol)
- normalize percent encodings case (%2f -> %2F)
- remove query arguments with blank values (unless keep_blank_values is True)
- remove fragments (unless keep_fragments is True)
The url passed can be a str or unicode, while the url returned is always a
str.
For examples see the tests in tests/test_utils_url.py
"""
scheme, netloc, path, params, query, fragment = parse_url(url)
keyvals = parse_qsl(query, keep_blank_values)
keyvals.sort()
query = urlencode(keyvals)
# XXX: copied from w3lib.url.safe_url_string to add encoding argument
# path = to_native_str(path, encoding)
# path = moves.urllib.parse.quote(path, _safe_chars, encoding='latin1') or '/'
path = safe_url_string(_unquotepath(path)) or '/'
fragment = '' if not keep_fragments else fragment
return urlunparse((scheme, netloc.lower(), path, params, query, fragment))
开发者ID:AugustLONG,项目名称:scrapy,代码行数:30,代码来源:url.py
示例7: _cs_request
def _cs_request(self, url, method, **kwargs):
# Check that certain things are called correctly
if method in ['GET', 'DELETE']:
assert 'body' not in kwargs
elif method == 'PUT':
assert 'body' in kwargs
# Call the method
args = urlparse.parse_qsl(urlparse.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit('?', 1)[0]
munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
munged_url = munged_url.replace('-', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
# Note the call
self.callstack.append((method, url, kwargs.get('body', None)))
status, headers, body = getattr(self, callback)(**kwargs)
r = utils.TestResponse({
"status_code": status,
"text": body,
"headers": headers,
})
return r, body
开发者ID:wangxiyuan1,项目名称:python-cinderclient,代码行数:30,代码来源:fakes.py
示例8: parse_request
def parse_request(self, environ):
def rebuild_header_name(environ_key):
"""Construct the HTTP header name from a WSGI environ variable.
"""
header_name = environ_key[5:] # strip 'HTTP_'
name_parts = header_name.split("_")
header_name = "-".join(part.capitalize() for part in name_parts)
return header_name
request = {}
request["host"] = environ.get("HTTP_HOST", None)
request["path"] = environ.get("PATH_INFO", None)
request["query"] = dict(parse_qsl(environ.get("QUERY_STRING", None)))
request["method"] = environ.get("REQUEST_METHOD", None)
request["headers"] = {}
for key in environ:
if key.startswith("HTTP_"):
header_name = rebuild_header_name(key)
request["headers"][header_name] = environ[key]
if "CONTENT_TYPE" in environ:
request["headers"]["Content-Type"] = environ["CONTENT_TYPE"]
if "CONTENT_LENGTH" in environ:
request["headers"]["Content-Length"] = environ["CONTENT_LENGTH"]
if environ["CONTENT_LENGTH"]:
request["body"] = get_post(environ)
return request
开发者ID:htobenothing,项目名称:pyoidc,代码行数:30,代码来源:proof_of_possesion.py
示例9: methodNext
def methodNext(self, previous_request, previous_response):
"""Retrieves the next page of results.
Args:
previous_request: The request for the previous page. (required)
previous_response: The response from the request for the previous page. (required)
Returns:
A request object that you can call 'execute()' on to request the next
page. Returns None if there are no more items in the collection.
"""
# Retrieve nextPageToken from previous_response
# Use as pageToken in previous_request to create new request.
if "nextPageToken" not in previous_response or not previous_response["nextPageToken"]:
return None
request = copy.copy(previous_request)
pageToken = previous_response["nextPageToken"]
parsed = list(urlparse(request.uri))
q = parse_qsl(parsed[4])
# Find and remove old 'pageToken' value from URI
newq = [(key, value) for (key, value) in q if key != "pageToken"]
newq.append(("pageToken", pageToken))
parsed[4] = urlencode(newq)
uri = urlunparse(parsed)
request.uri = uri
logger.info("URL being requested: {0!s} {1!s}".format(methodName, uri))
return request
开发者ID:runt18,项目名称:google-api-python-client,代码行数:34,代码来源:discovery.py
示例10: update_url_query
def update_url_query(*args, **kwargs):
"""
Return a new URL with the query parameters of the URL updated based on the
keyword arguments of the function call. If the argument already exists in the
URL, it will be overwritten with the new value; if not, it will be added.
However, if the new value is None, then any existing query parameters with
that key will be removed without being replaced.
The URL must be passed as the first positional argument of the function;
it cannot be passed as a keyword argument.
"""
if not args:
raise TypeError("URL must be passed as the first positional argument")
url = args[0]
scheme, netloc, path, query, fragment = urlsplit(url)
qlist = parse_qsl(query)
for key, value in kwargs.items():
# remove all key/value pairs from qlist that match this key
qlist = [pair for pair in qlist if not pair[0] == key]
# add this key/value pair to the qlist (unless it's None)
if value is not None:
qlist.append((key, value))
# bring it on back
query = urlencode(qlist)
return urlunsplit((scheme, netloc, path, query, fragment))
开发者ID:daseinphil,项目名称:seamless-karma,代码行数:25,代码来源:utils.py
示例11: test_api_request_w_query_params
def test_api_request_w_query_params(self):
from six.moves.urllib.parse import parse_qsl
from six.moves.urllib.parse import urlsplit
conn = self._makeMockOne()
http = conn._http = _Http(
{'status': '200', 'content-type': 'application/json'},
b'{}',
)
self.assertEqual(conn.api_request('GET', '/', {'foo': 'bar'}), {})
self.assertEqual(http._called_with['method'], 'GET')
uri = http._called_with['uri']
scheme, netloc, path, qs, _ = urlsplit(uri)
self.assertEqual('%s://%s' % (scheme, netloc), conn.API_BASE_URL)
# Intended to emulate self.mock_template
PATH = '/'.join([
'',
'mock',
conn.API_VERSION,
'',
])
self.assertEqual(path, PATH)
parms = dict(parse_qsl(qs))
self.assertEqual(parms['foo'], 'bar')
self.assertIsNone(http._called_with['body'])
expected_headers = {
'Accept-Encoding': 'gzip',
'Content-Length': '0',
'User-Agent': conn.USER_AGENT,
}
self.assertEqual(http._called_with['headers'], expected_headers)
开发者ID:jonparrott,项目名称:gcloud-python,代码行数:30,代码来源:test_connection.py
示例12: dispatch_wsgi
def dispatch_wsgi(self):
""" WSGI handler """
if hasattr(self, 'response'):
return self._write_response()
LOGGER.debug('WSGI mode detected')
if self.environ['REQUEST_METHOD'] == 'POST':
try:
request_body_size = int(self.environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
self.requesttype = 'POST'
self.request = self.environ['wsgi.input'].read(request_body_size)
LOGGER.debug('Request type: POST. Request:\n%s\n', self.request)
else: # it's a GET request
self.requesttype = 'GET'
self.request = wsgiref.util.request_uri(self.environ)
try:
query_part = splitquery(self.request)[-1]
self.kvp = dict(parse_qsl(query_part, keep_blank_values=True))
except AttributeError as err:
LOGGER.exception('Could not parse query string')
self.kvp = {}
LOGGER.debug('Request type: GET. Request:\n%s\n', self.request)
return self.dispatch()
开发者ID:PublicaMundi,项目名称:pycsw,代码行数:29,代码来源:server.py
示例13: _get_credentials
def _get_credentials(self, key):
request_token = {
'oauth_token': sickbeard.TWITTER_USERNAME,
'oauth_token_secret': sickbeard.TWITTER_PASSWORD,
'oauth_callback_confirmed': 'true'
}
token = oauth.Token(request_token['oauth_token'], request_token['oauth_token_secret'])
token.set_verifier(key)
logger.log(u'Generating and signing request for an access token using key ' + key, logger.DEBUG)
signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1() # @UnusedVariable
oauth_consumer = oauth.Consumer(key=self.consumer_key, secret=self.consumer_secret)
logger.log(u'oauth_consumer: ' + str(oauth_consumer), logger.DEBUG)
oauth_client = oauth.Client(oauth_consumer, token)
logger.log(u'oauth_client: ' + str(oauth_client), logger.DEBUG)
resp, content = oauth_client.request(self.ACCESS_TOKEN_URL, method='POST', body='oauth_verifier=%s' % key)
logger.log(u'resp, content: ' + str(resp) + ',' + str(content), logger.DEBUG)
access_token = dict(parse_qsl(content))
logger.log(u'access_token: ' + str(access_token), logger.DEBUG)
logger.log(u'resp[status] = ' + str(resp['status']), logger.DEBUG)
if resp['status'] != '200':
logger.log(u'The request for a token with did not succeed: ' + str(resp['status']), logger.ERROR)
return False
else:
logger.log(u'Your Twitter Access Token key: %s' % access_token['oauth_token'], logger.DEBUG)
logger.log(u'Access Token secret: %s' % access_token['oauth_token_secret'], logger.DEBUG)
sickbeard.TWITTER_USERNAME = access_token['oauth_token']
sickbeard.TWITTER_PASSWORD = access_token['oauth_token_secret']
return True
开发者ID:Eiber,项目名称:SickRage-Medusa,代码行数:33,代码来源:tweet.py
示例14: __init__
def __init__(self, columns):
for key, value in map2x(None, self.COLUMNS, columns):
if key:
setattr(self, key, value)
# special handling for 'args' - parse it into a dict if it is a string
if isinstance(self.args, string_types):
self.args = dict(x for x in parse_qsl(self.args.encode('utf-8')))
开发者ID:jleclanche,项目名称:internetarchive,代码行数:7,代码来源:catalog.py
示例15: prepare_request_mock
def prepare_request_mock(self, data, referer='http://localhost/user_with_workspaces/Public Workspace', user=None, extra_headers={}, GET=''):
request = Mock()
request.method = 'POST'
request.get_host.return_value = 'localhost'
GET_PARAMETERS = parse_qsl(GET)
request.GET = MagicMock()
request.GET.__len__.side_effect = lambda: len(GET_PARAMETERS)
request.GET.__getitem__.side_effect = lambda key: GET_PARAMETERS[key]
request.GET.urlencode.side_effect = lambda: GET
request.COOKIES = {
settings.SESSION_COOKIE_NAME: 'test',
}
request.META = {
'HTTP_ACCEPT': 'application/json',
'SERVER_PROTOCOL': 'http',
'REMOTE_ADDR': '127.0.0.1',
'content_type': 'application/json',
'content_length': len(data),
'HTTP_HOST': 'localhost',
'HTTP_REFERER': referer,
'HTTP_X_FI_WARE_OAUTH_TOKEN': 'true',
}
request.META.update(extra_headers)
request.read.return_value = data
if user is None:
request.user = self.admin_mock
else:
request.user = user
return request
开发者ID:GreenIDer-Donati,项目名称:wirecloud,代码行数:30,代码来源:proxy.py
示例16: test_api_request_w_query_params
def test_api_request_w_query_params(self):
from six.moves.urllib.parse import parse_qsl
from six.moves.urllib.parse import urlsplit
PROJECT = 'project'
conn = self._makeOne(PROJECT)
http = conn._http = Http(
{'status': '200', 'content-type': 'application/json'},
'{}',
)
self.assertEqual(conn.api_request('GET', '/', {'foo': 'bar'}), {})
self.assertEqual(http._called_with['method'], 'GET')
uri = http._called_with['uri']
scheme, netloc, path, qs, _ = urlsplit(uri)
self.assertEqual('%s://%s' % (scheme, netloc), conn.API_BASE_URL)
self.assertEqual(path,
'/'.join(['', 'storage', conn.API_VERSION, '']))
parms = dict(parse_qsl(qs))
self.assertEqual(parms['project'], PROJECT)
self.assertEqual(parms['foo'], 'bar')
self.assertEqual(http._called_with['body'], None)
expected_headers = {
'Accept-Encoding': 'gzip',
'Content-Length': 0,
'User-Agent': conn.USER_AGENT,
}
self.assertEqual(http._called_with['headers'], expected_headers)
开发者ID:Loooffy,项目名称:gcloud-python,代码行数:26,代码来源:test_connection.py
示例17: _load_backend
def _load_backend(self, backend_uri, context):
"""
Return the instantiated backend object identified by the given
`backend_uri`.
The entry point that is used to create the backend object is determined
by the protocol part of the given URI.
"""
parsed = parse.urlparse(backend_uri)
options = dict(parse.parse_qsl(parsed.query))
try:
backend = self._entry_points[self.BACKENDS_ENTRY_POINT][parsed.scheme].load()
except KeyError:
raise BackendNotFoundError(
"The requested backend `%s` could not be found in the "
"registered entry points. Perhaps you forgot to install the "
"corresponding backend package?" % parsed.scheme
)
password = (parse.unquote(parsed.password)
if parsed.password
else parsed.password)
return backend(
username=parsed.username, password=password,
hostname=parsed.hostname, port=parsed.port,
path=parsed.path, options=options, context=context,
)
开发者ID:sephii,项目名称:taxi,代码行数:29,代码来源:plugins.py
示例18: _upload_from_file_simple_test_helper
def _upload_from_file_simple_test_helper(self, properties=None,
content_type_arg=None,
expected_content_type=None):
from six.moves.http_client import OK
from six.moves.urllib.parse import parse_qsl
from six.moves.urllib.parse import urlsplit
from tempfile import NamedTemporaryFile
BLOB_NAME = 'blob-name'
DATA = b'ABCDEF'
response = {'status': OK}
connection = _Connection(
(response, b''),
)
bucket = _Bucket(connection)
blob = self._makeOne(BLOB_NAME, bucket=bucket, properties=properties)
blob.CHUNK_SIZE = 5
with NamedTemporaryFile() as fh:
fh.write(DATA)
fh.flush()
blob.upload_from_file(fh, rewind=True,
content_type=content_type_arg)
rq = connection.http._requested
self.assertEqual(len(rq), 1)
self.assertEqual(rq[0]['method'], 'POST')
uri = rq[0]['uri']
scheme, netloc, path, qs, _ = urlsplit(uri)
self.assertEqual(scheme, 'http')
self.assertEqual(netloc, 'example.com')
self.assertEqual(path, '/b/name/o')
self.assertEqual(dict(parse_qsl(qs)),
{'uploadType': 'media', 'name': BLOB_NAME})
headers = dict(
[(x.title(), str(y)) for x, y in rq[0]['headers'].items()])
self.assertEqual(headers['Content-Length'], '6')
self.assertEqual(headers['Content-Type'], expected_content_type)
开发者ID:Loooffy,项目名称:gcloud-python,代码行数:35,代码来源:test_blob.py
示例19: client_request
def client_request(self, client, method, url, **kwargs):
# Check that certain things are called correctly
if method in ["GET", "DELETE"]:
assert "json" not in kwargs
# Note the call
self.callstack.append((method, url, kwargs.get("headers") or {}, kwargs.get("json") or kwargs.get("data")))
try:
fixture = self.fixtures[url][method]
except KeyError:
pass
else:
return TestResponse({"headers": fixture[0], "text": fixture[1]})
# Call the method
args = parse.parse_qsl(parse.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit("?", 1)[0]
munged_url = munged_url.strip("/").replace("/", "_").replace(".", "_")
munged_url = munged_url.replace("-", "_")
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError(
"Called unknown API method: %s %s, " "expected fakes method name: %s" % (method, url, callback)
)
resp = getattr(self, callback)(**kwargs)
if len(resp) == 3:
status, headers, body = resp
else:
status, body = resp
headers = {}
return TestResponse({"status_code": status, "text": body, "headers": headers})
开发者ID:KevinTsang,项目名称:rally,代码行数:35,代码来源:fake_client.py
示例20: parameters
def parameters(self, request, context):
jobid = int(job_re_path.match(request.path).group(1))
job = self._jobs[jobid]
if request.method == 'GET':
pass
elif request.method == 'POST':
data = dict(parse_qsl(request.body))
if 'QUERY' in data:
assert data['QUERY'] == 'SELECT TOP 42 * FROM ivoa.obsCore'
for param in job.parameters:
if param.id_ == 'query':
param.content = data['QUERY']
if 'UPLOAD' in data:
for param in job.parameters:
if param.id_ == 'upload':
uploads1 = {data[0]: data[1] for data in [
data.split(',') for data
in data['UPLOAD'].split(';')
]}
uploads2 = {data[0]: data[1] for data in [
data.split(',') for data
in param.content.split(';')
]}
uploads1.update(uploads2)
param.content = ';'.join([
'{}={}'.format(key, value) for key, value
in uploads1.items()
])
开发者ID:pyvirtobs,项目名称:pyvo,代码行数:33,代码来源:test_tap.py
注:本文中的six.moves.urllib.parse.parse_qsl函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论