本文整理汇总了Python中six.moves.urllib_parse.urlencode函数的典型用法代码示例。如果您正苦于以下问题:Python urlencode函数的具体用法?Python urlencode怎么用?Python urlencode使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了urlencode函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _service_url
def _service_url(request, redirect_to=None, gateway=False):
"""Generates application service URL for CAS"""
protocol = ('http://', 'https://')[request.is_secure()]
host = request.get_host()
prefix = (('http://', 'https://')[request.is_secure()] + host)
service = protocol + host + request.path
if redirect_to:
if '?' in service:
service += '&'
else:
service += '?'
if gateway:
""" If gateway, capture params and reencode them before returning a url """
gateway_params = [(REDIRECT_FIELD_NAME, redirect_to), ('gatewayed','true')]
query_dict = request.GET.copy()
try:
del query_dict['ticket']
except:
pass
query_list = query_dict.items()
#remove duplicate params
for item in query_list:
for index, item2 in enumerate(gateway_params):
if item[0] == item2[0]:
gateway_params.pop(index)
extra_params = gateway_params + query_list
#Sort params by key name so they are always in the same order.
sorted_params = sorted(extra_params, key=itemgetter(0))
service += urlencode(sorted_params)
else:
service += urlencode({REDIRECT_FIELD_NAME: redirect_to})
return service
开发者ID:unistra,项目名称:django-cas,代码行数:35,代码来源:views.py
示例2: verify_link
def verify_link(user):
"""
Used for verifying an e-mail address when the user clicks the link in the verification mail.
"""
proofing_user = ProofingUser.from_user(user, current_app.private_userdb)
code = request.args.get('code')
email = request.args.get('email')
if code and email:
current_app.logger.debug('Trying to save email address {} as verified for user {}'.format(email, proofing_user))
url = urlappend(current_app.config['DASHBOARD_URL'], 'emails')
scheme, netloc, path, query_string, fragment = urlsplit(url)
try:
state = current_app.proofing_statedb.get_state_by_eppn_and_email(proofing_user.eppn, email)
timeout = current_app.config.get('EMAIL_VERIFICATION_TIMEOUT', 24)
if state.is_expired(timeout):
current_app.logger.info("Verification code is expired. Removing the state")
current_app.logger.debug("Proofing state: {}".format(state))
current_app.proofing_statedb.remove_state(state)
new_query_string = urlencode({'msg': ':ERROR:emails.code_invalid_or_expired'})
url = urlunsplit((scheme, netloc, path, new_query_string, fragment))
return redirect(url)
except DocumentDoesNotExist:
current_app.logger.info('Could not find proofing state for email {}'.format(email))
new_query_string = urlencode({'msg': ':ERROR:emails.unknown_email'})
url = urlunsplit((scheme, netloc, path, new_query_string, fragment))
return redirect(url)
if code == state.verification.verification_code:
try:
verify_mail_address(state, proofing_user)
current_app.logger.info('Email successfully verified')
current_app.logger.debug('Email address: {}'.format(email))
new_query_string = urlencode({'msg': 'emails.verification-success'})
url = urlunsplit((scheme, netloc, path, new_query_string, fragment))
return redirect(url)
except UserOutOfSync:
current_app.logger.info('Could not confirm email, data out of sync')
current_app.logger.debug('Mail address: {}'.format(email))
new_query_string = urlencode({'msg': ':ERROR:user-out-of-sync'})
url = urlunsplit((scheme, netloc, path, new_query_string, fragment))
return redirect(url)
current_app.logger.info("Invalid verification code")
current_app.logger.debug("Email address: {}".format(state.verification.email))
new_query_string = urlencode({'msg': ':ERROR:emails.code_invalid_or_expired'})
url = urlunsplit((scheme, netloc, path, new_query_string, fragment))
return redirect(url)
abort(400)
开发者ID:SUNET,项目名称:eduid-webapp,代码行数:48,代码来源:views.py
示例3: _build_url
def _build_url(self):
url = list(urlsplit(self._path))
qs = parse_qs(url[3])
qs["count"] = self._count
qs["page"] = self._page
url[3] = urlencode(qs, doseq=True)
return urlunsplit(url)
开发者ID:P-EB,项目名称:mailman3-client,代码行数:7,代码来源:_client.py
示例4: _request
def _request(self, query, phases=None, page=0):
phases = ','.join([str(int(x)) for x in phases]) if phases else ''
response, content = self.network.request(
uri=self.endpoint + '?' + urlencode({
'q': json.dumps(query),
'phases': phases,
'page': page,
'pagesize': MPDSDataRetrieval.pagesize
}),
method='GET',
headers={'Key': self.api_key}
)
if response.status != 200:
return {'error': 'HTTP error code %s' % response.status, 'code': response.status}
try:
content = json.loads(content)
except:
return {'error': 'Unreadable data obtained'}
if content.get('error'):
return {'error': content['error']}
if not content['out']:
return {'error': 'No hits', 'code': 1}
return content
开发者ID:matk86,项目名称:MatMiner,代码行数:26,代码来源:retrieve_MPDS.py
示例5: process_view
def process_view(self, request, view_func, view_args, view_kwargs):
"""Forwards unauthenticated requests to the admin page to the CAS
login URL, as well as calls to django.contrib.auth.views.login and
logout.
"""
if view_func in (login, cas_login) and request.POST.get(
'logoutRequest', ''):
if cas_request_logout_allowed(request):
return cas_logout(request, *view_args, **view_kwargs)
return HttpResponseForbidden()
if view_func == login:
return cas_login(request, *view_args, **view_kwargs)
elif view_func == logout:
return cas_logout(request, *view_args, **view_kwargs)
# for all view modules except django admin. by default, we redirect to
# cas for all admin views
# for all other views, we treats the request with respect of views
# configuration
if not (self._is_an_admin_view(view_func) and settings.CAS_ADMIN_AUTH):
return None
if request.user.is_authenticated():
if request.user.is_staff:
return None
else:
error = ('<h1>Forbidden</h1><p>You do not have staff '
'privileges.</p>')
return HttpResponseForbidden(error)
params = urlencode({REDIRECT_FIELD_NAME: request.get_full_path()})
return HttpResponseRedirect(
'{}?{}'.format(reverse('django_cas:login'), params)
)
开发者ID:unistra,项目名称:django-cas,代码行数:35,代码来源:middleware.py
示例6: _get_api_registrations
def _get_api_registrations(self, params):
data = urllib_parse.urlencode(params)
headers = {}
headers['Content-Type'] = 'application/x-www-form-urlencoded'
response = None
attempts = 0
while attempts < self.retries:
try:
response = requests.request(
method='GET',
url=self.url,
headers=headers,
timeout=self.timeout,
params=data)
self.logger.debug('Got repsonse: %s.', response.json())
break
except Exception:
msg = 'Unable to connect to registrations API.'
self.logger.exception(msg)
attempts += 1
eventlet.sleep(self.retry_delay)
if not response:
raise Exception('Failed to connect to TypeForm API.')
if response.status_code != httplib.OK:
failure_reason = ('Failed to retrieve registrations: %s \
(status code: %s)' % (response.text, response.status_code))
self.logger.error(failure_reason)
raise Exception(failure_reason)
return response.json()
开发者ID:AlexeyDeyneko,项目名称:st2contrib,代码行数:33,代码来源:registration_sensor.py
示例7: test_events_async
def test_events_async(self):
# type: () -> None
user_profile = self.example_user('hamlet')
self.login(user_profile.email)
event_queue_id = self.create_queue()
data = {
'queue_id': event_queue_id,
'last_event_id': 0,
}
path = '/json/events?{}'.format(urllib_parse.urlencode(data))
self.client_get_async(path)
def process_events():
# type: () -> None
users = [user_profile.id]
event = dict(
type='test',
data='test data',
)
process_event(event, users)
self.io_loop.call_later(0.1, process_events)
response = self.wait()
data = ujson.loads(response.body)
events = data['events']
events = cast(List[Dict[str, Any]], events)
self.assertEqual(len(events), 1)
self.assertEqual(events[0]['data'], 'test data')
self.assertEqual(data['result'], 'success')
开发者ID:brockwhittaker,项目名称:zulip,代码行数:30,代码来源:test_tornado.py
示例8: parse_grip_uri
def parse_grip_uri(uri):
parsed = urlparse(uri)
# HACK: work around '+' character in base64-encoded values
query = parsed.query.replace('+', '%2B')
params = parse_qs(query)
iss = None
key = None
if 'iss' in params:
iss = params['iss'][0]
del params['iss']
if 'key' in params:
key = params['key'][0]
del params['key']
if key is not None and key.startswith('base64:'):
key = b64decode(key[7:])
qs = urlencode(params, True)
path = parsed.path
if path.endswith('/'):
path = path[:-1]
control_uri = parsed.scheme + '://' + parsed.netloc + path
if qs:
control_uri += '?' + qs
out = {'control_uri': control_uri}
if iss:
out['control_iss'] = iss
if key:
out['key'] = key
return out
开发者ID:fanout,项目名称:pygripcontrol,代码行数:28,代码来源:gripcontrol.py
示例9: make_url
def make_url(*args, **kwargs):
"""Makes a URL from component parts"""
base = '/'.join(args)
if kwargs:
return "%s?%s" % (base, urlencode(kwargs),)
else:
return base
开发者ID:Oli76,项目名称:rwslib,代码行数:7,代码来源:__init__.py
示例10: verify_proxy_ticket
def verify_proxy_ticket(ticket, service):
"""Verifies CAS 2.0+ XML-based proxy ticket.
Returns username on success and None on failure.
"""
try:
from xml.etree import ElementTree
except ImportError:
from elementtree import ElementTree
params = {'ticket': ticket, 'service': service}
url = (urljoin(settings.CAS_SERVER_URL, 'proxyValidate') + '?' +
urlencode(params))
page = urlopen(url)
try:
response = page.read()
tree = ElementTree.fromstring(response)
if tree[0].tag.endswith('authenticationSuccess'):
username = tree[0][0].text
proxies = []
if len(tree[0]) > 1:
for element in tree[0][1]:
proxies.append(element.text)
return {"username": username, "proxies": proxies}, None
else:
return None, None
finally:
page.close()
开发者ID:UGentPortaal,项目名称:django-cas,代码行数:32,代码来源:backends.py
示例11: auth_url
def auth_url(self):
"""Return authorization redirect url."""
key, secret = self.get_key_and_secret()
callback = self.strategy.absolute_uri(self.redirect_uri)
callback = sub(r'^https', 'http', callback)
query = urlencode({'cb': callback})
return 'https://www.twilio.com/authorize/{0}?{1}'.format(key, query)
开发者ID:BeatrizFerreira,项目名称:EP1DAS,代码行数:7,代码来源:twilio.py
示例12: wrapped_f
def wrapped_f(*args, **kwargs):
from django_cas.views import login
request = args[0]
if request.user.is_authenticated():
#Is Authed, fine
pass
else:
path_with_params = request.path + '?' + urlencode({
k: v.encode("utf-8") for (k, v) in request.GET.items()
})
if request.GET.get('ticket'):
#Not Authed, but have a ticket!
#Try to authenticate
return login(request, path_with_params, False, True)
else:
#Not Authed, but no ticket
gatewayed = request.GET.get('gatewayed')
if gatewayed == 'true':
pass
else:
#Not Authed, try to authenticate
return login(request, path_with_params, False, True)
return func(*args, **kwargs)
开发者ID:UGentPortaal,项目名称:django-cas,代码行数:26,代码来源:decorators.py
示例13: process_view
def process_view(self, request, view_func, view_args, view_kwargs):
"""Forwards unauthenticated requests to the admin page to the CAS
login URL, as well as calls to django.contrib.auth.views.login and
logout.
"""
if view_func in (login, cas_login) and request.POST.get(
'logoutRequest', ''):
if cas_request_logout_allowed(request):
return cas_logout(request, *view_args, **view_kwargs)
return HttpResponseForbidden()
if view_func == login:
return cas_login(request, *view_args, **view_kwargs)
elif view_func == logout:
return cas_logout(request, *view_args, **view_kwargs)
if settings.CAS_ADMIN_PREFIX:
if not request.path.startswith(settings.CAS_ADMIN_PREFIX):
return None
elif not view_func.__module__.startswith('django.contrib.admin.'):
return None
if request.user.is_authenticated():
if request.user.is_staff:
return None
else:
error = ('<h1>Forbidden</h1><p>You do not have staff '
'privileges.</p>')
return HttpResponseForbidden(error)
params = urlencode({REDIRECT_FIELD_NAME: request.get_full_path()})
return HttpResponseRedirect(reverse(cas_login) + '?' + params)
开发者ID:UGentPortaal,项目名称:django-cas,代码行数:31,代码来源:middleware.py
示例14: send_email
def send_email(email, template, kwargs):
"""Send an email via the mailgun service."""
maildir = os.path.join(topdir, 'emails')
env = jinja2.Environment(loader=jinja2.FileSystemLoader(maildir))
template = env.get_template(template)
rendered = template.render(**kwargs)
headers, message = parse_email(rendered)
mailargs = {'to': email,
'from': app.config['MAIL_FROM'],
'bcc': app.config.get('MAIL_BCC'),
'text': message}
mailargs.update(headers)
conn = HTTPSConnection('api.mailgun.net', 443)
conn.connect()
auth = b64enc('api:{0[MAILGUN_KEY]}'.format(app.config))
headers = {'Authorization': 'Basic {0}'.format(auth),
'Accept': 'application/json',
'Content-type': 'application/x-www-form-urlencoded'}
url = '/v2/{0[MAILGUN_DOMAIN]}/messages'.format(app.config)
body = urlencode(mailargs)
conn.request('POST', url, body, headers)
resp = conn.getresponse()
if resp.status != 200:
raise RuntimeError('could not send email')
conn.close()
开发者ID:SShiva,项目名称:online-trial,代码行数:25,代码来源:website.py
示例15: get_proxy_ticket_for
def get_proxy_ticket_for(self, service):
"""Verifies CAS 2.0+ XML-based authentication ticket.
Returns username on success and None on failure.
"""
if not settings.CAS_PROXY_CALLBACK:
raise CasConfigException("No proxy callback set in settings")
try:
from xml.etree import ElementTree
except ImportError:
from elementtree import ElementTree
params = {'pgt': self.tgt, 'targetService': service}
url = (urljoin(settings.CAS_SERVER_URL, 'proxy') + '?' +
urlencode(params))
page = urlopen(url)
try:
response = page.read()
tree = ElementTree.fromstring(response)
if tree[0].tag.endswith('proxySuccess'):
return tree[0][0].text
else:
raise CasTicketException("Failed to get proxy ticket")
finally:
page.close()
开发者ID:rlmv,项目名称:django-dartmouth-cas,代码行数:29,代码来源:models.py
示例16: wrapped_f
def wrapped_f(*args):
from django_cas.views import login
request = args[0]
if request.user.is_authenticated():
# Is Authed, fine
pass
else:
path_with_params = request.path + "?" + urlencode(request.GET.copy())
if request.GET.get("ticket"):
# Not Authed, but have a ticket!
# Try to authenticate
return login(request, path_with_params, False, True)
else:
# Not Authed, but no ticket
gatewayed = request.GET.get("gatewayed")
if gatewayed == "true":
pass
else:
# Not Authed, try to authenticate
return login(request, path_with_params, False, True)
return func(*args)
开发者ID:rlmv,项目名称:django-dartmouth-cas,代码行数:25,代码来源:decorators.py
示例17: authorize
def authorize(user):
if user.orcid is None:
proofing_state = current_app.proofing_statedb.get_state_by_eppn(user.eppn, raise_on_missing=False)
if not proofing_state:
current_app.logger.debug('No proofing state found for user {!s}. Initializing new proofing state.'.format(
user))
proofing_state = OrcidProofingState({'eduPersonPrincipalName': user.eppn, 'state': get_unique_hash(),
'nonce': get_unique_hash()})
current_app.proofing_statedb.save(proofing_state)
claims_request = ClaimsRequest(userinfo=Claims(id=None))
oidc_args = {
'client_id': current_app.oidc_client.client_id,
'response_type': 'code',
'scope': 'openid',
'claims': claims_request.to_json(),
'redirect_uri': url_for('orcid.authorization_response', _external=True),
'state': proofing_state.state,
'nonce': proofing_state.nonce,
}
authorization_url = '{}?{}'.format(current_app.oidc_client.authorization_endpoint, urlencode(oidc_args))
current_app.logger.debug('Authorization url: {!s}'.format(authorization_url))
current_app.stats.count(name='authn_request')
return redirect(authorization_url)
# Orcid already connected to user
url = urlappend(current_app.config['DASHBOARD_URL'], 'accountlinking')
scheme, netloc, path, query_string, fragment = urlsplit(url)
new_query_string = urlencode({'msg': ':ERROR:orc.already_connected'})
url = urlunsplit((scheme, netloc, path, new_query_string, fragment))
return redirect(url)
开发者ID:SUNET,项目名称:eduid-webapp,代码行数:30,代码来源:views.py
示例18: test_response
def test_response(self):
lwt = addon_factory(type=amo.ADDON_PERSONA)
lwt.persona.persona_id = 666
lwt.persona.save()
stheme = addon_factory(type=amo.ADDON_STATICTHEME)
stheme.current_version.files.all()[0].update(
filename='foo.xpi', hash='brown')
MigratedLWT.objects.create(
lightweight_theme=lwt, static_theme=stheme, getpersonas_id=666)
update = self.get_update('en-US', lwt.id)
response = json.loads(update.get_json())
url = '{0}{1}/{2}?{3}'.format(
user_media_url('addons'), str(stheme.id), 'foo.xpi',
urlencode({'filehash': 'brown'}))
assert update.data == {
'stheme_id': stheme.id, 'filename': 'foo.xpi', 'hash': 'brown'}
assert response == {
"converted_theme": {
"url": url,
"hash": 'brown'
}
}
update = self.get_update('en-US', 666, 'src=gp')
response = json.loads(update.get_json())
assert update.data == {
'stheme_id': stheme.id, 'filename': 'foo.xpi', 'hash': 'brown'}
assert response == {
"converted_theme": {
"url": url,
"hash": 'brown'
}
}
开发者ID:diox,项目名称:olympia,代码行数:34,代码来源:test_theme_update.py
示例19: access_token_callback
def access_token_callback(request, uri, headers):
assert request.method == 'POST'
parsed_body = request.parse_request_body(request.body)
assert 'client_id' in parsed_body
assert 'client_secret' in parsed_body
assert 'code' in parsed_body
assert 'redirect_uri' in parsed_body
if parsed_body['code'][0] == 'bad_verification_code':
body = dict(
error_uri='http://developer.github.com/v3/oauth/'
'#bad-verification-code',
error_description='The code passed is '
'incorrect or expired.',
error='bad_verification_code',
)
else:
body = dict(
access_token='%s_token' % parsed_body['code'][0],
scope='admin:repo_hook,user:email',
token_type='bearer',
)
headers['content-type'] = 'application/x-www-form-urlencoded'
return (
200,
headers,
urllib_parse.urlencode(body)
)
开发者ID:SDSG-Invenio,项目名称:zenodo,代码行数:30,代码来源:fixtures.py
示例20: login
def login(self, username, password):
"""Log in to Betfair. Sets `session_token` if successful.
:param str username: Username
:param str password: Password
:raises: BetfairLoginError
"""
response = self.session.post(
os.path.join(self.identity_url, 'certlogin'),
cert=self.cert_file,
data=urllib.urlencode({
'username': username,
'password': password,
}),
headers={
'X-Application': self.app_key,
'Content-Type': 'application/x-www-form-urlencoded',
},
timeout=self.timeout,
)
utils.check_status_code(response, [httplib.OK])
data = response.json()
if data.get('loginStatus') != 'SUCCESS':
raise exceptions.LoginError(response, data)
self.session_token = data['sessionToken']
开发者ID:BenderV,项目名称:betfair2.py,代码行数:25,代码来源:betfair.py
注:本文中的six.moves.urllib_parse.urlencode函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论