本文整理汇总了Python中six.moves.urllib.parse.quote_plus函数的典型用法代码示例。如果您正苦于以下问题:Python quote_plus函数的具体用法?Python quote_plus怎么用?Python quote_plus使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了quote_plus函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _setup_firewall
def _setup_firewall(self, ri, fw):
    """Build and submit the vRouter commands that install firewall *fw*.

    All ``SetCmd`` objects are accumulated first and then sent, together
    with the zone commands, in a single ``exec_cmd_batch`` call.

    :param ri: router info object; ``ri.router`` is passed to
        ``_get_vyatta_client`` to obtain the client.
    :param fw: firewall dict; the keys read here are ``description``
        (optional) and ``firewall_rule_list``.
    """
    client = self._get_vyatta_client(ri.router)

    # Create firewall
    fw_name = vyatta_utils.get_firewall_name(ri, fw)
    quoted_name = parse.quote_plus(fw_name)
    fw_cmd_list = [vyatta_client.SetCmd(FW_NAME.format(quoted_name))]

    if fw.get('description'):
        fw_cmd_list.append(vyatta_client.SetCmd(
            FW_DESCRIPTION.format(
                quoted_name,
                parse.quote_plus(fw['description']))))

    # Set firewall state policy
    fw_cmd_list.append(vyatta_client.SetCmd(FW_ESTABLISHED_ACCEPT))
    fw_cmd_list.append(vyatta_client.SetCmd(FW_RELATED_ACCEPT))

    # Create firewall rules. Only enabled IPv4 rules consume a rule number.
    rule_num = 0
    for rule in fw['firewall_rule_list']:
        if not rule['enabled']:
            continue
        if rule['ip_version'] == 4:
            rule_num += 1
            fw_cmd_list += self._set_firewall_rule(fw_name, rule_num, rule)
        else:
            # warning() instead of warn(): the latter is a deprecated alias
            # that newer Python logging releases no longer provide.
            LOG.warning(_LW("IPv6 rules are not supported."))

    # Configure router zones
    zone_cmd_list = vyatta_utils.get_zone_cmds(client, ri, fw_name)
    client.exec_cmd_batch(fw_cmd_list + zone_cmd_list)
开发者ID:armando-migliaccio,项目名称:neutron-fwaas,代码行数:35,代码来源:vyatta_fwaas.py
示例2: _setup_optimizer
def _setup_optimizer(self, ri, opt):
    """Build and submit the vRouter commands that install optimizer *opt*.

    All ``SetCmd`` objects are accumulated first and then sent, together
    with the zone commands, in a single ``exec_cmd_batch`` call.

    :param ri: router info object; ``ri.router`` is passed to
        ``_get_vyatta_client`` to obtain the client.
    :param opt: optimizer dict; the keys read here are ``description``
        (optional) and ``optimizer_rule_list``.
    """
    client = self._get_vyatta_client(ri.router)

    # Create optimizer
    opt_name = vyatta_utils.get_optimizer_name(ri, opt)
    quoted_name = parse.quote_plus(opt_name)
    opt_cmd_list = [vyatta_client.SetCmd(FW_NAME.format(quoted_name))]

    if opt.get('description'):
        opt_cmd_list.append(vyatta_client.SetCmd(
            FW_DESCRIPTION.format(
                quoted_name,
                parse.quote_plus(opt['description']))))

    # Set optimizer state policy
    opt_cmd_list.append(vyatta_client.SetCmd(FW_ESTABLISHED_ACCEPT))
    opt_cmd_list.append(vyatta_client.SetCmd(FW_RELATED_ACCEPT))

    # Create optimizer rules. Only enabled IPv4 rules consume a rule number.
    rule_num = 0
    for rule in opt['optimizer_rule_list']:
        if not rule['enabled']:
            continue
        if rule['ip_version'] == 4:
            rule_num += 1
            opt_cmd_list += self._set_optimizer_rule(opt_name, rule_num, rule)
        else:
            # warning() instead of warn(): the latter is a deprecated alias
            # that newer Python logging releases no longer provide.
            LOG.warning(_LW("IPv6 rules are not supported."))

    # Configure router zones
    zone_cmd_list = vyatta_utils.get_zone_cmds(client, ri, opt_name)
    client.exec_cmd_batch(opt_cmd_list + zone_cmd_list)
开发者ID:carlosv5,项目名称:OaaS-network,代码行数:35,代码来源:vyatta_oaas.py
示例3: get_raw_yaml_from_repo
def get_raw_yaml_from_repo(repo, full_name, commit_sha):
    """Return decoded YAML data structure from
    the given file in *repo* at *commit_sha*.

    The result is looked up in, and stored to, Django's ``default`` cache
    under a key built from the repository control directory, the file name
    and the commit hash. Keys of 240 characters or more bypass the cache
    for both the lookup and the store.

    :arg commit_sha: A byte string containing the commit hash
    """
    from six.moves.urllib.parse import quote_plus
    cache_key = "%RAW%%2".join((
        quote_plus(repo.controldir()), quote_plus(full_name),
        commit_sha.decode()))

    import django.core.cache as cache
    def_cache = cache.caches["default"]

    # Memcache is apparently limited to 250 characters. The same limit has
    # to gate the store below, otherwise an over-long key still reaches
    # the cache backend.
    key_is_cacheable = len(cache_key) < 240

    if key_is_cacheable:
        result = def_cache.get(cache_key)
        if result is not None:
            return result

    result = load_yaml(
        expand_yaml_macros(
            repo, commit_sha,
            get_repo_blob(repo, full_name, commit_sha).data))

    if key_is_cacheable:
        def_cache.add(cache_key, result, None)

    return result
开发者ID:akiyoko,项目名称:relate,代码行数:28,代码来源:content.py
示例4: get_yaml_from_repo
def get_yaml_from_repo(repo, full_name, commit_sha, cached=True):
    """Return decoded, struct-ified YAML data structure from
    the given file in *repo* at *commit_sha*.

    See :class:`relate.utils.Struct` for more on
    struct-ification.

    :arg commit_sha: A byte string containing the commit hash
    :arg cached: When true, consult and populate Django's ``default``
        cache. Keys of 240 characters or more bypass the cache for both
        the lookup and the store.
    """
    use_cache = False

    if cached:
        from six.moves.urllib.parse import quote_plus
        cache_key = "%%%2".join(
            (quote_plus(repo.controldir()), quote_plus(full_name),
             commit_sha.decode()))

        import django.core.cache as cache
        def_cache = cache.caches["default"]

        # Memcache is apparently limited to 250 characters. The same limit
        # has to gate the store below, otherwise an over-long key still
        # reaches the cache backend.
        use_cache = len(cache_key) < 240

        if use_cache:
            result = def_cache.get(cache_key)
            if result is not None:
                return result

    expanded = expand_yaml_macros(
        repo, commit_sha,
        get_repo_blob(repo, full_name, commit_sha).data)

    result = dict_to_struct(load_yaml(expanded))

    if use_cache:
        def_cache.add(cache_key, result, None)

    return result
开发者ID:akiyoko,项目名称:relate,代码行数:33,代码来源:content.py
示例5: tribler_urlencode_single
def tribler_urlencode_single(key, value):
    """Return ``key=value`` with both sides URL-quoted as UTF-8.

    Boolean values are sent as ``1``/``0``. Byte strings are quoted as
    they are; everything else is converted with ``text_type`` and encoded
    as UTF-8 before quoting.
    """
    def quote_utf8(item):
        # Passing bytes through text_type() on Python 3 would yield the
        # "b'...'" repr instead of the payload, so bytes skip that step.
        if not isinstance(item, bytes):
            item = text_type(item).encode('utf-8')
        return quote_plus(item)

    # Convert bool values to ints
    if isinstance(value, bool):
        value = int(value)

    return "%s=%s" % (quote_utf8(key), quote_utf8(value))
开发者ID:Tribler,项目名称:tribler,代码行数:7,代码来源:tribler_request_manager.py
示例6: get_raw_yaml_from_repo
def get_raw_yaml_from_repo(repo, full_name, commit_sha):
    # type: (Repo_ish, Text, bytes) -> Any
    """Return decoded YAML data structure from
    the given file in *repo* at *commit_sha*.

    The result is looked up in, and stored to, Django's ``default`` cache
    under a key built from ``CACHE_KEY_ROOT``, the repository control
    directory, the file name and the commit hash. Keys of 240 characters
    or more bypass the cache for both the lookup and the store.

    :arg commit_sha: A byte string containing the commit hash
    """
    from six.moves.urllib.parse import quote_plus
    cache_key = "%RAW%%2".join((
        CACHE_KEY_ROOT,
        quote_plus(repo.controldir()), quote_plus(full_name),
        commit_sha.decode(),
        ))

    import django.core.cache as cache
    def_cache = cache.caches["default"]

    # Memcache is apparently limited to 250 characters. The same limit has
    # to gate the store below, otherwise an over-long key still reaches
    # the cache backend.
    key_is_cacheable = len(cache_key) < 240

    result = None  # type: Optional[Any]
    if key_is_cacheable:
        result = def_cache.get(cache_key)
        if result is not None:
            return result

    yaml_str = expand_yaml_macros(
        repo, commit_sha,
        get_repo_blob(repo, full_name, commit_sha,
                      allow_tree=False).data)

    result = load_yaml(yaml_str)  # type: ignore

    if key_is_cacheable:
        def_cache.add(cache_key, result, None)

    return result
开发者ID:ishitatsuyuki,项目名称:relate,代码行数:34,代码来源:content.py
示例7: test_get_console_url
def test_get_console_url(self, mock_get):
    """The console URL endpoint returns the federation login URL.

    Checked twice: once without a callback URL (``Issuer`` is expected to
    be empty) and once with ``callbackurl`` supplied as a request
    parameter (``Issuer`` is expected to be the quoted callback URL).
    """
    endpoint = '/account/testaccount/testrole/consoleurl'
    token = "abcdefg123"
    mock_get.return_value = Mock(text=u'{"SigninToken": "%s"}' % token,
                                 status_code=200,
                                 reason="Ok")
    url_template = ("https://signin.aws.amazon.com/federation?"
                    "Action=login&"
                    "Issuer={callbackurl}&"
                    "Destination=https%3A%2F%2Fconsole.aws.amazon.com%2F&"
                    "SigninToken={token}")

    def expected_body(callback):
        # Response bodies are bytes, hence the final encode().
        filled = url_template.format(callbackurl=quote_plus(callback),
                                     token=token)
        return filled.encode()

    # No callback URL supplied
    expected_url = expected_body("")
    result = self.app.get(endpoint)
    self.assertEqual(result.status_int, 200)
    self.assertEqual(result.body, expected_url)

    # Check if the Callback URL is set
    callbackurl = "https://www.foobar.invalid"
    expected_url = expected_body(callbackurl)
    result = self.app.get(endpoint, {'callbackurl': callbackurl})
    self.assertEqual(result.status_int, 200)
    self.assertEqual(result.body, expected_url)
开发者ID:ImmobilienScout24,项目名称:afp-core,代码行数:26,代码来源:api_endpoint_tests.py
示例8: _process_repo
def _process_repo(repo, runtime_storage_inst, record_processor_inst, rcs_inst):
    """Collect blueprints, bugs, commits and reviews of *repo* into storage.

    Blueprints and bugs are processed once per repo; commits and reviews
    are processed once per branch. The branch set is the repo's default
    branch (``master`` when unset) plus every ``branch`` named in its
    releases. The last seen commit/review id per branch is kept in the
    runtime storage under ``vcs:<quoted uri>:<branch>`` and
    ``rcs:<quoted uri>:<branch>``.

    :param repo: repo dict; keys read here are ``uri``, ``module``,
        ``default_branch`` (optional), ``releases`` (optional) and ``ci``
        (optional).
    :param runtime_storage_inst: storage used for records and bookmarks.
    :param record_processor_inst: object whose ``process`` transforms the
        typed record iterators.
    :param rcs_inst: review system client used for the review log.
    """
    uri = repo["uri"]
    LOG.info("Processing repo uri: %s", uri)

    LOG.debug("Processing blueprints for repo uri: %s", uri)
    bp_iterator = lp.log(repo)
    bp_iterator_typed = _record_typer(bp_iterator, "bp")
    processed_bp_iterator = record_processor_inst.process(bp_iterator_typed)
    runtime_storage_inst.set_records(processed_bp_iterator, utils.merge_records)

    LOG.debug("Processing bugs for repo uri: %s", uri)
    # Taken before fetching so bugs modified during the run are seen again.
    current_date = utils.date_to_timestamp("now")
    bug_key = "bug_modified_since-%s" % repo["module"]
    bug_modified_since = runtime_storage_inst.get_by_key(bug_key)
    bug_iterator = bps.log(repo, bug_modified_since)
    bug_iterator_typed = _record_typer(bug_iterator, "bug")
    processed_bug_iterator = record_processor_inst.process(bug_iterator_typed)
    runtime_storage_inst.set_records(processed_bug_iterator, utils.merge_records)
    runtime_storage_inst.set_by_key(bug_key, current_date)

    vcs_inst = vcs.get_vcs(repo, cfg.CONF.sources_root)
    vcs_inst.fetch()

    branches = {repo.get("default_branch", "master")}
    # "releases" may be missing or None; treat that as "no release branches"
    # instead of failing on iteration.
    for release in repo.get("releases") or []:
        if "branch" in release:
            branches.add(release["branch"])

    for branch in branches:
        LOG.debug("Processing commits in repo: %s, branch: %s", uri, branch)
        key_suffix = str(parse.quote_plus(uri) + ":" + branch)

        vcs_key = "vcs:" + key_suffix
        last_id = runtime_storage_inst.get_by_key(vcs_key)
        commit_iterator = vcs_inst.log(branch, last_id)
        commit_iterator_typed = _record_typer(commit_iterator, "commit")
        processed_commit_iterator = record_processor_inst.process(commit_iterator_typed)
        runtime_storage_inst.set_records(processed_commit_iterator, _merge_commits)
        last_id = vcs_inst.get_last_id(branch)
        runtime_storage_inst.set_by_key(vcs_key, last_id)

        LOG.debug("Processing reviews for repo: %s, branch: %s", uri, branch)
        rcs_key = "rcs:" + key_suffix
        last_id = runtime_storage_inst.get_by_key(rcs_key)
        review_iterator = rcs_inst.log(repo, branch, last_id, grab_comments=("ci" in repo))
        review_iterator_typed = _record_typer(review_iterator, "review")
        if "ci" in repo:  # add external CI data
            review_iterator_typed = _process_reviews(
                review_iterator_typed, repo["ci"], repo["module"], branch)
        processed_review_iterator = record_processor_inst.process(review_iterator_typed)
        runtime_storage_inst.set_records(processed_review_iterator, utils.merge_records)
        last_id = rcs_inst.get_last_id(repo, branch)
        runtime_storage_inst.set_by_key(rcs_key, last_id)
开发者ID:karthik-suresh,项目名称:stackalytics,代码行数:59,代码来源:main.py
示例9: encode_uri
def encode_uri(uri):
    """Return *uri* with its host, path and query converted to ASCII.

    The host is IDNA-encoded, the path is percent-quoted as UTF-8 with
    ``quote_plus`` (``/`` is kept), and the query string is parsed and
    re-serialised with ``urlencode``.
    """
    def to_text(value):
        # The quoting helpers return byte strings on Python 2 and text on
        # Python 3; only the former needs decoding.
        if isinstance(value, bytes):
            return value.decode('ascii')
        return value

    split = list(urlsplit(uri))
    split[1] = split[1].encode('idna').decode('ascii')
    split[2] = to_text(quote_plus(split[2].encode('utf-8'), '/'))
    # urlencode() already applies quote_plus() to every key and value, so
    # quoting the values here as well would escape them twice (a space
    # would come out as "%2B" rather than "+").
    query = [(q.encode('utf-8'), v.encode('utf-8'))
             for (q, v) in parse_qsl(split[3])]
    split[3] = to_text(urlencode(query))
    return urlunsplit(split)
开发者ID:AlexEshoo,项目名称:sphinx,代码行数:8,代码来源:__init__.py
示例10: url_quote_plus
def url_quote_plus(v, name='(Unknown name)', md={}):
    """Return *v* URL-quoted with ``quote_plus``, preserving its string type.

    Python 2 ``unicode`` input yields ``unicode`` and Python 3 ``bytes``
    input yields ``bytes``; any other value is converted with ``str``
    first. *name* and *md* are accepted but not used here.
    """
    if six.PY2 and isinstance(v, unicode):
        # quote_plus does not handle unicode on Python 2: go through a
        # UTF-8 byte string and decode the quoted result again.
        utf8_bytes = v.encode('utf-8')
        return quote_plus(utf8_bytes).decode('utf-8')

    if six.PY3 and isinstance(v, bytes):
        # Mirror image of the above: quote as text, hand back bytes.
        as_text = v.decode('utf-8')
        return quote_plus(as_text).encode('utf-8')

    return quote_plus(str(v))
开发者ID:zopefoundation,项目名称:DocumentTemplate,代码行数:8,代码来源:DT_Var.py
示例11: _generate_connection_uri
def _generate_connection_uri(self):
    """Assemble, log (with the password masked) and return the DB URI.

    The URI template is taken from ``CONNECTION_URIS`` for
    ``self.database_type`` and chosen by proxy (TCP or socket) versus
    public (SSL or non-SSL) access.

    :raises AirflowException: if user, public IP, password or database
        is not set.
    """
    # Proxy mode needs a TCP port (when using TCP) and a unique path.
    if self.use_proxy:
        if self.sql_proxy_use_tcp and not self.sql_proxy_tcp_port:
            self.reserve_free_tcp_port()
        if not self.sql_proxy_unique_path:
            self.sql_proxy_unique_path = self._generate_unique_path()

    uri_templates = CONNECTION_URIS[self.database_type]
    ssl_spec = None
    socket_path = None

    if self.use_proxy:
        proxy_templates = uri_templates['proxy']
        if self.sql_proxy_use_tcp:
            format_string = proxy_templates['tcp']
        else:
            format_string = proxy_templates['socket']
            socket_path = \
                "{sql_proxy_socket_path}/{instance_socket_name}".format(
                    sql_proxy_socket_path=self.sql_proxy_unique_path,
                    instance_socket_name=self._get_instance_socket_name()
                )
    else:
        public_templates = uri_templates['public']
        if self.use_ssl:
            format_string = public_templates['ssl']
            ssl_spec = {
                'cert': self.sslcert,
                'key': self.sslkey,
                'ca': self.sslrootcert
            }
        else:
            format_string = public_templates['non-ssl']

    # Mandatory connection fields, checked in this order.
    required_fields = (
        ('user', "The login parameter needs to be set in connection"),
        ('public_ip', "The location parameter needs to be set in connection"),
        ('password', "The password parameter needs to be set in connection"),
        ('database', "The database parameter needs to be set in connection"),
    )
    for attr_name, message in required_fields:
        if not getattr(self, attr_name):
            raise AirflowException(message)

    # NOTE(review): client_key_file and server_ca_file are both keyed on
    # self.sslcert rather than on their own attribute, and server_ca_file
    # quotes the fallback '' too - kept exactly as found; confirm intent.
    uri_fields = dict(
        user=quote_plus(self.user) if self.user else '',
        password=quote_plus(self.password) if self.password else '',
        database=quote_plus(self.database) if self.database else '',
        public_ip=self.public_ip,
        public_port=self.public_port,
        proxy_port=self.sql_proxy_tcp_port,
        socket_path=self._quote(socket_path),
        ssl_spec=self._quote(json.dumps(ssl_spec)) if ssl_spec else '',
        client_cert_file=self._quote(self.sslcert) if self.sslcert else '',
        client_key_file=self._quote(self.sslkey) if self.sslcert else '',
        server_ca_file=self._quote(self.sslrootcert if self.sslcert else ''),
    )
    connection_uri = format_string.format(**uri_fields)

    # Never log the real password: mask its quoted form in the URI.
    secret = quote_plus(self.password) if self.password else 'PASSWORD'
    self.log.info("DB connection URI %s",
                  connection_uri.replace(secret, 'XXXXXXXXXXXX'))
    return connection_uri
开发者ID:alrolorojas,项目名称:airflow,代码行数:58,代码来源:gcp_sql_hook.py
示例12: get_display_url
def get_display_url(self, data, trans):
    """Return the ``display_application`` URL of the ``dataset`` controller
    for *data*.

    The dataset and user ids come from ``encode_dataset_user``; the
    application id and this link's id are URL-quoted. ``app_action`` is
    always ``None`` here.
    """
    dataset_hash, user_hash = encode_dataset_user(trans, data, None)
    route_args = dict(
        controller='dataset',
        action="display_application",
        dataset_id=dataset_hash,
        user_id=user_hash,
        app_name=quote_plus(self.display_application.id),
        link_name=quote_plus(self.id),
        app_action=None,
    )
    return url_for(**route_args)
开发者ID:bwlang,项目名称:galaxy,代码行数:9,代码来源:application.py
示例13: to_query
def to_query(object, key=None):
    """ Dumps a dictionary into a nested query string.

    Mappings are expanded as ``key[subkey]=value`` with their keys in
    sorted order, lists and tuples as ``key[]=value``, and any other value
    as ``key=value`` with both sides passed through ``str`` and
    ``quote_plus``. Subclasses of ``dict``, ``list`` and ``tuple`` are
    expanded the same way as the base types.

    :param object: value to serialise (the name shadows the builtin but is
        kept because it is part of the signature).
    :param key: prefix for the generated parameter names; ``None`` at the
        top level.
    """
    # isinstance() rather than an exact type() match, so that e.g. an
    # OrderedDict is expanded instead of being stringified as a leaf.
    if isinstance(object, dict):
        return '&'.join(
            to_query(object[k], '%s[%s]' % (key, k) if key else k)
            for k in sorted(object))
    if isinstance(object, (list, tuple)):
        return '&'.join(to_query(o, '%s[]' % key) for o in object)
    return '%s=%s' % (quote_plus(str(key)), quote_plus(str(object)))
开发者ID:Asset-Map,项目名称:recurly-client-python,代码行数:9,代码来源:js.py
示例14: get_repo_blob_data_cached
def get_repo_blob_data_cached(repo, full_name, commit_sha):
    # type: (Repo_ish, Text, bytes) -> bytes
    """Return the blob data of *full_name* in *repo* at *commit_sha*,
    going through Django's ``default`` cache where possible.

    The cache is bypassed when *commit_sha* is not a byte string, when
    importing the Django cache raises ``ImproperlyConfigured``, or when
    the cache key is 240 characters or longer. Results larger than
    ``settings.RELATE_CACHE_MAX_BYTES`` (0 when unset) are not stored.

    :arg commit_sha: A byte string containing the commit hash
    """
    if isinstance(commit_sha, six.binary_type):
        from six.moves.urllib.parse import quote_plus
        cache_key = "%s%R%1".join((
            CACHE_KEY_ROOT,
            quote_plus(repo.controldir()),
            quote_plus(full_name),
            commit_sha.decode(),
            ".".join(str(s) for s in sys.version_info[:2]),
            ))  # type: Optional[Text]
    else:
        cache_key = None

    try:
        import django.core.cache as cache
    except ImproperlyConfigured:
        cache_key = None

    # Memcache is apparently limited to 250 characters. An over-long key is
    # dropped here so that neither the lookup nor the store below uses it.
    if cache_key is not None and len(cache_key) >= 240:
        cache_key = None

    result = None  # type: Optional[bytes]
    if cache_key is None:
        result = get_repo_blob(repo, full_name, commit_sha,
                               allow_tree=False).data
        assert isinstance(result, six.binary_type)
        return result

    # Byte string is wrapped in a tuple to force pickling because memcache's
    # python wrapper appears to auto-decode/encode string values, thus trying
    # to decode our byte strings. Grr.

    def_cache = cache.caches["default"]

    cached_result = def_cache.get(cache_key)
    if cached_result is not None:
        (result,) = cached_result
        assert isinstance(result, six.binary_type), cache_key
        return result

    result = get_repo_blob(repo, full_name, commit_sha,
                           allow_tree=False).data
    assert result is not None

    if len(result) <= getattr(settings, "RELATE_CACHE_MAX_BYTES", 0):
        def_cache.add(cache_key, (result,), None)

    assert isinstance(result, six.binary_type)

    return result
开发者ID:ishitatsuyuki,项目名称:relate,代码行数:55,代码来源:content.py
示例15: get_repo_keys
def get_repo_keys(memcached_inst):
    """Yield the ``vcs:`` and ``rcs:`` keys of every branch of every repo.

    Repos are read from the ``repos`` entry of *memcached_inst*. Each
    repo contributes its ``master`` branch plus every ``branch`` named in
    its releases; a missing or ``None`` ``repos`` / ``releases`` entry is
    treated as empty. For each branch the ``vcs:`` key is yielded first,
    then the ``rcs:`` key; branch order is not defined.
    """
    for repo in (memcached_inst.get('repos') or []):
        uri = repo['uri']
        branches = {'master'}
        # Guarded like 'repos' above: a repo without releases still has
        # its master branch.
        for release in (repo.get('releases') or []):
            if 'branch' in release:
                branches.add(release['branch'])

        for branch in branches:
            yield 'vcs:' + str(parse.quote_plus(uri) + ':' + branch)
            yield 'rcs:' + str(parse.quote_plus(uri) + ':' + branch)
开发者ID:Acidburn0zzz,项目名称:stackalytics,代码行数:11,代码来源:dump.py
示例16: test_oauth2_get_authorize_url_native
def test_oauth2_get_authorize_url_native():
    """
    Runs the native-app auth flow twice - first with default settings,
    then with explicit ones - and checks that every expected fragment
    shows up in the generated authorize URL.
    """
    def check_fragments(url, fragments):
        for val in fragments:
            assert val in url

    ac = globus_sdk.AuthClient(client_id=CLIENT_ID)

    # --- flow started with default parameters ---
    flow_manager = globus_sdk.auth.GlobusNativeAppFlowManager(ac)
    ac.current_oauth2_flow_manager = flow_manager

    url_res = ac.oauth2_get_authorize_url()
    check_fragments(url_res, [
        ac.base_url + "v2/oauth2/authorize?",
        "client_id=" + ac.client_id,
        "redirect_uri=" + quote_plus(ac.base_url + "v2/web/auth-code"),
        "scope=" + quote_plus(" ".join(DEFAULT_REQUESTED_SCOPES)),
        "state=" + "_default",
        "response_type=" + "code",
        "code_challenge=" + quote_plus(flow_manager.challenge),
        "code_challenge_method=" + "S256",
        "access_type=" + "online",
    ])

    # --- flow started with explicit parameters ---
    flow_manager = globus_sdk.auth.GlobusNativeAppFlowManager(
        ac,
        requested_scopes="scopes",
        redirect_uri="uri",
        state="state",
        verifier=("a" * 43),
        refresh_tokens=True,
    )
    ac.current_oauth2_flow_manager = flow_manager

    url_res = ac.oauth2_get_authorize_url()
    # Recompute the challenge from the same verifier the flow was given.
    verifier, remade_challenge = make_native_app_challenge("a" * 43)
    check_fragments(url_res, [
        ac.base_url + "v2/oauth2/authorize?",
        "client_id=" + ac.client_id,
        "redirect_uri=" + "uri",
        "scope=" + "scopes",
        "state=" + "state",
        "response_type=" + "code",
        "code_challenge=" + quote_plus(remade_challenge),
        "code_challenge_method=" + "S256",
        "access_type=" + "offline",
    ])
开发者ID:globus,项目名称:globus-sdk-python,代码行数:54,代码来源:test_auth_client_flow.py
示例17: _get_tomahawk_url
def _get_tomahawk_url(metadata):
"""Generates URL for iframe with Tomahawk embedded player.
See http://toma.hk/tools/embeds.php for more info.
"""
if not ('artist' in metadata and 'title' in metadata):
return None
else:
return "http://toma.hk/embed.php?artist={artist}&title={title}".format(
artist=quote_plus(metadata['artist'].encode("UTF-8")),
title=quote_plus(metadata['title'].encode("UTF-8")),
)
开发者ID:hellska,项目名称:acousticbrainz-server,代码行数:12,代码来源:data.py
示例18: process_repo
def process_repo(repo, runtime_storage_inst, record_processor_inst):
    """Collect blueprints, commits and reviews of *repo* into storage.

    Blueprints are processed once per repo; commits and reviews are
    processed once per branch. The branch set is ``master`` plus every
    ``branch`` named in the repo's releases. The last seen commit/review
    id per branch is kept in the runtime storage under
    ``vcs:<quoted uri>:<branch>`` and ``rcs:<quoted uri>:<branch>``.

    :param repo: repo dict; keys read here are ``uri`` and ``releases``
        (optional).
    :param runtime_storage_inst: storage used for records and bookmarks.
    :param record_processor_inst: object whose ``process`` transforms the
        typed record iterators.
    """
    uri = repo['uri']
    # Lazy %-args, like the other LOG.debug calls in this function.
    LOG.debug('Processing repo uri %s', uri)

    bp_iterator = lp.log(repo)
    bp_iterator_typed = _record_typer(bp_iterator, 'bp')
    processed_bp_iterator = record_processor_inst.process(
        bp_iterator_typed)
    runtime_storage_inst.set_records(processed_bp_iterator,
                                     utils.merge_records)

    vcs_inst = vcs.get_vcs(repo, cfg.CONF.sources_root)
    vcs_inst.fetch()

    rcs_inst = rcs.get_rcs(repo, cfg.CONF.review_uri)
    rcs_inst.setup(key_filename=cfg.CONF.ssh_key_filename,
                   username=cfg.CONF.ssh_username)

    branches = {'master'}
    # 'releases' may be missing or None; treat that as "no release
    # branches" instead of failing on iteration.
    for release in repo.get('releases') or []:
        if 'branch' in release:
            branches.add(release['branch'])

    for branch in branches:
        LOG.debug('Processing repo %s, branch %s', uri, branch)
        key_suffix = str(parse.quote_plus(uri) + ':' + branch)

        vcs_key = 'vcs:' + key_suffix
        last_id = runtime_storage_inst.get_by_key(vcs_key)

        commit_iterator = vcs_inst.log(branch, last_id)
        commit_iterator_typed = _record_typer(commit_iterator, 'commit')
        processed_commit_iterator = record_processor_inst.process(
            commit_iterator_typed)
        runtime_storage_inst.set_records(
            processed_commit_iterator, _merge_commits)

        last_id = vcs_inst.get_last_id(branch)
        runtime_storage_inst.set_by_key(vcs_key, last_id)

        LOG.debug('Processing reviews for repo %s, branch %s', uri, branch)

        rcs_key = 'rcs:' + key_suffix
        last_id = runtime_storage_inst.get_by_key(rcs_key)

        review_iterator = rcs_inst.log(branch, last_id)
        review_iterator_typed = _record_typer(review_iterator, 'review')
        processed_review_iterator = record_processor_inst.process(
            review_iterator_typed)
        runtime_storage_inst.set_records(processed_review_iterator,
                                         utils.merge_records)

        last_id = rcs_inst.get_last_id(branch)
        runtime_storage_inst.set_by_key(rcs_key, last_id)
开发者ID:vnaboychenko,项目名称:stackalytics,代码行数:53,代码来源:main.py
示例19: url
def url(self):
    """Return the absolute ``display_application`` URL for this parameter.

    The request's base URL is used as the prefix; when the parameter asks
    for ``strip_https`` and the base starts with ``https`` (any case),
    that prefix is downgraded to ``http``.
    """
    base_url = self.trans.request.base
    starts_with_https = base_url[: 5].lower() == 'https'
    if self.parameter.strip_https and starts_with_https:
        base_url = "http%s" % base_url[5:]

    route_path = url_for(
        controller='dataset',
        action="display_application",
        dataset_id=self._dataset_hash,
        user_id=self._user_hash,
        app_name=quote_plus(self.parameter.link.display_application.id),
        link_name=quote_plus(self.parameter.link.id),
        app_action=self.action_name,
        action_param=self._url)
    return "%s%s" % (base_url, route_path)
开发者ID:ImmPortDB,项目名称:immport-galaxy,代码行数:13,代码来源:parameters.py
示例20: _alternatives
def _alternatives(self, account, container, obj):
# put S3 parts in dedicated container
suffix = ("+segments" if container and container.endswith('+segments')
else "")
if obj is None:
yield account, container, obj
elif self.stop_at_first_match:
# TODO(FVE): when supported, also pass account to con_builder()
yield account, quote_plus(self.con_builder(obj)) + suffix, obj
else:
for alt_container in self.con_builder.alternatives(obj):
yield account, quote_plus(alt_container) + suffix, obj
raise StopIteration
开发者ID:jfsmig,项目名称:oio-swift,代码行数:13,代码来源:autocontainerbase.py
注:本文中的six.moves.urllib.parse.quote_plus函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论