• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python parse.urlencode函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中six.moves.urllib.parse.urlencode函数的典型用法代码示例。如果您正苦于以下问题：Python urlencode函数的具体用法？Python urlencode怎么用？Python urlencode使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了urlencode函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: export

    def export(self, ds, requestor, notify):
        """
        Request an export of a data series as FITS files and return their URLs.

        The notification address is first checked against the
        ``checkAddress.sh`` service at jsoc.stanford.edu. An ``exp_request``
        is then submitted and polled with ``exp_status`` until the export
        is ready or reports a failure.

        Written by Monica Bobra and Art Amezcua
        19 July 2016

        Parameters
        ----------
        ds       : string
            Name of the data series.
        requestor: string
            Username of requestor.
        notify   : string
            E-mail address of requestor.

        Returns
        -------
        supath : list
            List containing paths to all the requested FITS files.

        Raises
        ------
        RuntimeError
            If the e-mail address is not registered, the initial request is
            rejected, no request id is returned, or the export ends with any
            status other than success.
        """
        # URL paths always use '/', so join them with posixpath rather than
        # the platform-dependent os.path.
        import posixpath

        # test to see if the user's e-mail address is registered with jsoc.stanford.edu
        test_email_query = 'http://jsoc.stanford.edu/cgi-bin/ajax/checkAddress.sh?address='+quote_plus(notify)+'&checkonly=1'
        response = urlopen(test_email_query)
        try:
            data = json.loads(response.read())
        finally:
            response.close()
        if data['status'] == 4:
            raise RuntimeError('User e-mail address is not registered with jsoc.stanford.edu')

        query = '?' + urlencode({'op': 'exp_request', 'protocol': 'fits', 'format': 'json', 'method': 'url', 'requestor': requestor, 'notify': notify, 'ds': ds})
        req = self._json_request(self._url_jsoc_fetch + query)
        if int(req.data['status']) not in (1, 2):
            raise RuntimeError('DRMS Query failed, series is not a valid series, status=%s' % req.data['status'])
        if 'requestid' not in req.data:
            raise RuntimeError('DRMS Query failed, there is no requestid, status=%s' % req.data['status'])

        # Failure reasons keyed by the numeric export status.
        failure_reasons = {
            3: 'request size is too large',
            4: 'request not formed correctly',
            5: 'export request expired',
        }

        # waiting for the request to be ready
        query = '?' + urlencode({'op': 'exp_status', 'requestid': req.data['requestid']})
        print('Waiting for the request to be ready. Please allow at least 20 seconds.')
        time.sleep(15)
        while True:
            req = self._json_request(self._url_jsoc_fetch + query)
            # Normalise once: the status may arrive as a string or an int.
            status = int(req.data['status'])
            if status in (1, 2, 6):
                time.sleep(5)
                continue
            if status == 0:
                break
            # Any other status is terminal; raising here (instead of looping
            # again) prevents an endless busy loop on unknown status codes.
            reason = failure_reasons.get(status, 'unexpected export status')
            raise RuntimeError('DRMS Query failed, %s, status=%s' % (reason, req.data['status']))

        export_dir = req.data['dir']
        supath = [
            urljoin(self.baseurl, posixpath.join(export_dir, dataobj['filename']))
            for dataobj in req.data['data']
        ]
        print("All the data are available at:")
        print(str(urljoin(self.baseurl, export_dir)))
        return supath
开发者ID:mbobra,项目名称:drms_json,代码行数:60,代码来源:drms_json.py


示例2: _list_messages_page

 def _list_messages_page(self, marker, retry=False):
     """Fetch one page (limit 1000) of the listing from the files URL.

     :param marker: If truthy, sent as the ``marker`` query parameter.
     :param retry: True when this call is the single retry made after a
                   401 response; a second 401 is then raised as an error.
     :returns: ``(matching, last)`` on a 200 response, where ``matching``
               holds the body lines starting with ``self.prefix`` and
               ``last`` is the final body line; ``([], None)`` on a 204.
     :raises RackspaceError: On any other response status.
     """
     parsed_url = urlsplit(self._get_files_url(), 'http')
     conn = self.get_connection(parsed_url, self.tls)
     headers = [('Host', parsed_url.hostname),
                ('X-Auth-Token', self.auth.token_id)]
     params = [('limit', '1000')]
     if marker:
         params.append(('marker', marker))
     selector = '{0}?{1}'.format(parsed_url.path, urlencode(params))
     with gevent.Timeout(self.timeout):
         log.request(conn, 'GET', selector, headers)
         conn.putrequest('GET', selector)
         for header in headers:
             conn.putheader(*header)
         conn.endheaders()
         res = conn.getresponse()
         status = '{0!s} {1}'.format(res.status, res.reason)
         log.response(conn, status, res.getheaders())
         data = res.read()
     code = res.status
     if code == 401 and not retry:
         # Refresh the auth token and try exactly once more.
         self.auth.create_token()
         return self._list_messages_page(marker, retry=True)
     if code == 204:
         return [], None
     if code != 200:
         raise RackspaceError(res)
     lines = data.splitlines()
     matching = [line for line in lines if line.startswith(self.prefix)]
     return matching, lines[-1]
开发者ID:slimta,项目名称:python-slimta-cloudstorage,代码行数:31,代码来源:rackspace.py


示例3: use_http_uri

    def use_http_uri(message, typ, destination="", relay_state=""):
        """Build the transport info dict for a message, depending on ``typ``.

        :param message: The message text. For ``SAMLRequest`` it is used
            verbatim as the ``ID`` query parameter.
        :param typ: Either ``"SAMLResponse"`` or ``"SAMLRequest"``.
        :param destination: Base URL the query string is appended to
            (``SAMLRequest`` only).
        :param relay_state: Optional value sent as ``RelayState``
            (``SAMLRequest`` only).
        :return: For ``SAMLResponse`` a dict with ``data`` and ``headers``;
            for ``SAMLRequest`` a dict with empty ``data`` and a ``url``.
        :raises NotImplementedError: If ``typ`` is any other value.
        """
        # For a multi-line message only the second line is used as payload.
        if "\n" in message:
            data = message.split("\n")[1]
        else:
            data = message.strip()
        if typ == "SAMLResponse":
            info = {
                "data": data,
                "headers": [
                    ("Content-Type", "application/samlassertion+xml"),
                    ("Cache-Control", "no-cache, no-store"),
                    ("Pragma", "no-cache")
                ]
            }
        elif typ == "SAMLRequest":
            # msg should be an identifier
            if relay_state:
                query = urlencode({"ID": message,
                                   "RelayState": relay_state})
            else:
                query = urlencode({"ID": message})
            info = {
                "data": "",
                "url": "%s?%s" % (destination, query)
            }
        else:
            # ``NotImplemented`` is a comparison sentinel, not an exception;
            # raising it produced a confusing TypeError instead.
            raise NotImplementedError("Unsupported message type: %s" % typ)

        return info
开发者ID:HaToHo,项目名称:pysaml2,代码行数:29,代码来源:httpbase.py


示例4: cas_application

 def cas_application(environ, start_response):
     """WSGI entry point that guards ``application`` behind the authority.

     Requests that already carry ``REMOTE_USER`` are passed straight
     through. A request whose last query argument is ``ticket=...`` is
     treated as a response from the authority and validated against its
     ``validate`` URL. Any other request is answered with a
     ``CASAuthenticate`` pointing at the authority's ``login`` URL.
     """
     if environ.get('REMOTE_USER', ''):
         return application(environ, start_response)

     query_parts = environ.get('QUERY_STRING', '').split("&")
     has_ticket = bool(query_parts) and query_parts[-1].startswith("ticket=")
     if not has_ticket:
         service = construct_url(environ)
         login_args = urlencode({'service': service})
         outcome = CASAuthenticate(authority + "login?" + login_args)
     else:
         # assume a response from the authority; strip the ticket from the
         # query string before rebuilding the service URL
         ticket = query_parts.pop().split("=", 1)[1]
         environ['QUERY_STRING'] = "&".join(query_parts)
         service = construct_url(environ)
         validate_args = urlencode({'service': service, 'ticket': ticket})
         requrl = authority + "validate?" + validate_args
         result = urlopen(requrl).read().split("\n")
         if result[0] == 'yes':
             environ['REMOTE_USER'] = result[1]
             environ['AUTH_TYPE'] = 'cas'
             return application(environ, start_response)
         outcome = CASLoginFailure()
     return outcome.wsgi_application(environ, start_response)
开发者ID:10sr,项目名称:hue,代码行数:25,代码来源:cas.py


示例5: execute

    def execute(cls, uri, http_verb, extra_headers=None, batch=False, _body=None, **kw):
        """
        Run a REST command, or describe it for a batch request.

        With ``batch`` false, send the request built from the given
        parameters and return the decoded response JSON. With ``batch``
        true, return the dictionary that would be used in a batch command
        and send nothing.

        Keyword arguments become the JSON body, or the query string for a
        GET request. ``_body`` overrides the JSON body when it is not None.
        An HTTPError is re-raised as the matching ``core`` exception class
        for codes 400, 401, 403 and 404, and as ``core.ParseError``
        otherwise.
        """
        if batch:
            netloc = urlparse(API_ROOT).netloc
            batch_entry = {"method": http_verb, "path": uri.split(netloc, 1)[1]}
            if kw:
                batch_entry["body"] = kw
            return batch_entry

        if 'app_id' not in ACCESS_KEYS or 'rest_key' not in ACCESS_KEYS:
            raise core.ParseError('Missing connection credentials')

        app_id = ACCESS_KEYS.get('app_id')
        rest_key = ACCESS_KEYS.get('rest_key')
        master_key = ACCESS_KEYS.get('master_key')

        if uri.startswith(API_ROOT):
            url = uri
        else:
            url = cls.ENDPOINT_ROOT + uri

        if _body is not None:
            data = _body
        elif kw:
            data = json.dumps(kw, default=date_handler)
        else:
            data = "{}"

        # A GET with keyword arguments carries them in the query string and
        # sends no body; everything else sends the encoded payload.
        query = urlencode(kw) if (http_verb == 'GET' and data) else ''
        if query:
            url += '?%s' % query
            data = None
        else:
            data = data.encode('utf-8')

        headers = {
            'Content-type': 'application/json',
            'X-Parse-Application-Id': app_id,
            'X-Parse-REST-API-Key': rest_key
        }
        headers.update(extra_headers or {})
        url = url.replace('classes/User', 'users') # Jarrel edit
        request = Request(url, data, headers)

        # A session token takes precedence over the master key.
        session_token = ACCESS_KEYS.get('session_token')
        if session_token:
            request.add_header('X-Parse-Session-Token', session_token)
        elif master_key:
            request.add_header('X-Parse-Master-Key', master_key)

        # Force the requested verb regardless of whether a body is present.
        request.get_method = lambda: http_verb

        try:
            response = urlopen(request, timeout=CONNECTION_TIMEOUT)
        except HTTPError as e:
            error_classes = {
                400: core.ResourceRequestBadRequest,
                401: core.ResourceRequestLoginRequired,
                403: core.ResourceRequestForbidden,
                404: core.ResourceRequestNotFound
            }
            raise error_classes.get(e.code, core.ParseError)(e.read())

        return json.loads(response.read().decode('utf-8'))
开发者ID:jarrelscy,项目名称:ParsePy,代码行数:60,代码来源:connection.py


示例6: test_group_request

def test_group_request(session, users, groups, http_client, base_url):  # noqa: F811
    """A join request starts as pending and becomes actioned once approved."""
    user = users["[email protected]"]
    group = groups["sad-team"]

    # Step 1: the user asks to join the group.
    join_url = url(base_url, "/groups/{}/join".format(group.groupname))
    join_headers = {"X-Grouper-User": user.username}
    join_body = urlencode({"reason": "Test Request", "member": "User: [email protected]"})
    join_resp = yield http_client.fetch(
        join_url, method="POST", headers=join_headers, body=join_body
    )
    assert join_resp.code == 200

    pending = Request.get(session, requester_id=user.id, requesting_id=group.id)
    assert pending.status == "pending"

    # Step 2: another user approves the request.
    approve_url = url(
        base_url, "/groups/{}/requests/{}".format(group.groupname, pending.id)
    )
    approve_headers = {"X-Grouper-User": "[email protected]"}
    approve_body = urlencode({"reason": "Test Request", "status": "actioned"})
    approve_resp = yield http_client.fetch(
        approve_url, method="POST", headers=approve_headers, body=approve_body
    )
    assert approve_resp.code == 200

    actioned = Request.get(session, requester_id=user.id, requesting_id=group.id)
    assert actioned.status == "actioned"
开发者ID:dropbox,项目名称:grouper,代码行数:31,代码来源:handlers_test.py


示例7: __init__

 def __init__(self, base, relative=None):
     """Initialise the request from a base URL or another request.

     :param base: Either a URL string, which is split into its parts, or a
         ``RequestBase`` instance whose URL parts, headers, config and
         token flag are taken over.
     :param relative: Optional relative URL. Its path is joined onto the
         base path with ``_join_plex``. If it has a query string, that is
         appended to the base query; otherwise the base query is reduced
         to its ``X-Plex-Token`` entries.
     :raises TypeError: If ``base`` is neither a string nor a
         ``RequestBase``.
     """
     self._has_token = False
     self._url = None
     self._url_parts = None
     self._loaded = False
     self._xml = None
     self._headers = None
     self._config = None
     if isinstance(base, six.string_types):
         self._url_parts = list(parse.urlsplit(base))
     elif isinstance(base, RequestBase):
         self._has_token = base.has_token
         # Copy the list so edits below do not leak back into ``base``.
         self._url_parts = base._url_parts[:]
         self._headers = base._headers
         self._config = base.config
     else:
         # Fail here with a clear message instead of later with
         # "'NoneType' object is not subscriptable".
         raise TypeError(
             "base must be a string or RequestBase, not %s"
             % type(base).__name__)
     if relative:
         rel = parse.urlsplit(relative)
         if rel.path:
             self._url_parts[2] = _join_plex(self._url_parts[2], rel.path)
         if rel.query:
             data = parse.parse_qsl(self._url_parts[3]) + parse.parse_qsl(rel.query)
             self._url_parts[3] = parse.urlencode(data)
         else:
             # Strip off all non-token parts
             data = parse.parse_qsl(self._url_parts[3])
             self._url_parts[3] = parse.urlencode([(x, y) for x, y in data if x == 'X-Plex-Token'])
     if not self._has_token:
         self._has_token = 'X-Plex-Token' in parse.parse_qs(self._url_parts[3])
     self._url = parse.urlunsplit(self._url_parts)
开发者ID:Xaroth,项目名称:plex-export,代码行数:32,代码来源:base.py


示例8: url_concat

def url_concat(url, args, fragments=None):
    """Concatenate url and argument dictionary regardless of whether
    url has existing query parameters.

    Trailing ``#`` characters are removed from ``url`` before anything is
    appended. ``args`` is encoded as the query string and ``fragments``,
    when given, is encoded after a ``#``. If both are empty the url is
    returned unchanged.

    >>> url_concat("http://example.com/foo?a=b", dict(c="d"))
    'http://example.com/foo?a=b&c=d'
    """

    if not args and not fragments:
        return url

    # Strip off hashes. rstrip is also safe for an empty url or one made
    # up only of '#', where indexing url[-1] would raise IndexError.
    url = url.rstrip('#')

    fragment_tail = ''
    if fragments:
        fragment_tail = '#' + urlencode(fragments)

    args_tail = ''
    if args:
        # Only add a separator when the url does not already end with one.
        if not url.endswith(('?', '&')):
            args_tail += '&' if ('?' in url) else '?'
        args_tail += urlencode(args)

    return url + args_tail + fragment_tail
开发者ID:hadock,项目名称:nylas-python,代码行数:26,代码来源:util.py


示例9: generate_dashboard_url

def generate_dashboard_url(dashboard):
    """Generate a dashboard URL from a given definition.

    The ``dashboard`` section must provide ``title`` and ``foreach``;
    ``baseurl`` is optional and falls back to a default. Every section
    whose name starts with ``section`` contributes one extra query
    argument built from its ``query`` option.

    Raises ValueError when a required option is missing.
    """
    try:
        title = dashboard.get('dashboard', 'title')
    except configparser.NoOptionError:
        raise ValueError("option 'title' in section 'dashboard' not set")

    try:
        foreach = dashboard.get('dashboard', 'foreach')
    except configparser.NoOptionError:
        raise ValueError("option 'foreach' in section 'dashboard' not set")

    try:
        baseurl = dashboard.get('dashboard', 'baseurl')
    except configparser.NoOptionError:
        baseurl = 'https://review.openstack.org/#/dashboard/?'

    # Collect the URL pieces and join them once at the end.
    pieces = [
        baseurl,
        escape(urllib_parse.urlencode({'title': title,
                                       'foreach': foreach})),
    ]
    for section in dashboard.sections():
        if section.startswith('section'):
            try:
                query = dashboard.get(section, 'query')
            except configparser.NoOptionError:
                raise ValueError("option 'query' in '%s' not set" % section)

            # Drop the first nine characters and the last one of the
            # section name; the remainder is used as the argument name.
            section_title = section[9:-1]
            pieces.append(
                "&%s" % escape(urllib_parse.urlencode({section_title: query})))
    return ''.join(pieces)
开发者ID:ankur-gupta91,项目名称:gerrit-dash-creator,代码行数:33,代码来源:creator.py


示例10: test_user_tok_acls

def test_user_tok_acls(
    session, graph, users, user_admin_perm_to_auditors, http_client, base_url  # noqa: F811
):
    """Check who may add tokens for role users and for normal users."""
    role_user = "[email protected]"
    admin = "[email protected]"
    pleb = "[email protected]"

    # admin creating token for role user
    role_tokens_url = url(base_url, "/users/{}/tokens/add".format(role_user))
    created = yield http_client.fetch(
        role_tokens_url,
        method="POST",
        headers={"X-Grouper-User": admin},
        body=urlencode({"name": "foo"}),
    )
    assert created.code == 200

    # non-admin creating token for role user
    with pytest.raises(HTTPError):
        yield http_client.fetch(
            role_tokens_url,
            method="POST",
            headers={"X-Grouper-User": pleb},
            body=urlencode({"name": "foo2"}),
        )

    # admin creating token for normal (non-role) user
    pleb_tokens_url = url(base_url, "/users/{}/tokens/add".format(pleb))
    with pytest.raises(HTTPError):
        yield http_client.fetch(
            pleb_tokens_url,
            method="POST",
            headers={"X-Grouper-User": admin},
            body=urlencode({"name": "foo3"}),
        )
开发者ID:dropbox,项目名称:grouper,代码行数:32,代码来源:users_test.py


示例11: list

    def list(self, response, res):
        """Serialize an artifact listing into the JSON body of ``response``.

        The body holds the serialized artifacts, a ``first`` link built
        from the request parameters without ``marker``, and a ``next``
        link when ``res`` contains ``next_marker``.
        """
        request = response.request
        params = dict(request.params)
        params.pop("marker", None)
        query = urlparse.urlencode(params)

        urlvars = request.urlvars
        type_name = urlvars.get("type_name")
        type_version = urlvars.get("type_version")
        drafts = "/drafts" if urlvars.get("state") == "creating" else ""

        artifacts_list = [
            serialization.serialize_for_client(a, show_level=Showlevel.NONE)
            for a in res["artifacts"]
        ]

        # Assemble the collection path piece by piece.
        segments = ["/v3/artifacts"]
        if type_name:
            segments.append("/" + type_name)
        if type_version:
            segments.append("/v" + type_version)
        segments.append(drafts)
        url = "".join(segments)

        body = {
            "artifacts": artifacts_list,
            "first": url + "?" + query if query else url,
        }
        if "next_marker" in res:
            params["marker"] = res["next_marker"]
            body["next"] = url + "?" + urlparse.urlencode(params)

        response.unicode_body = six.text_type(json.dumps(body, ensure_ascii=False))
        response.content_type = "application/json"
开发者ID:bgxavier,项目名称:glance,代码行数:30,代码来源:artifacts.py


示例12: finalize_request

	def finalize_request(self):
		"""Build the redirect target and status returned after a payment attempt.

		When the status changed to "Completed", the redirect page is
		'payment-success', or ``self.redirect_url`` when that is set (which
		also discards ``redirect_to``). The ``on_payment_authorized`` method
		of the reference document may supply a replacement ``redirect_to``.
		For any other status the redirect page is 'payment-failed'.

		Returns a dict with ``redirect_to`` (the page plus an optional query
		string carrying ``redirect_to`` / ``redirect_message``) and ``status``.
		"""
		redirect_to = self.data.get('redirect_to') or None
		redirect_message = self.data.get('redirect_message') or None
		status = self.integration_request.status

		if self.flags.status_changed_to == "Completed":
			# Default success page. Assigning it up front avoids an unbound
			# ``redirect_url`` when there is neither a reference document
			# nor a custom redirect URL.
			redirect_url = 'payment-success'

			if self.data.reference_doctype and self.data.reference_docname:
				custom_redirect_to = None
				try:
					custom_redirect_to = frappe.get_doc(self.data.reference_doctype,
						self.data.reference_docname).run_method("on_payment_authorized", self.flags.status_changed_to)
				except Exception:
					# Best effort: a failing hook must not break the redirect.
					frappe.log_error(frappe.get_traceback())

				if custom_redirect_to:
					redirect_to = custom_redirect_to

			if self.redirect_url:
				redirect_url = self.redirect_url
				redirect_to = None
		else:
			redirect_url = 'payment-failed'

		# Collect the parameters first so the query string always starts with
		# a proper separator, even when only redirect_message is present.
		query_params = []
		if redirect_to:
			query_params.append(('redirect_to', redirect_to))
		if redirect_message:
			query_params.append(('redirect_message', redirect_message))
		if query_params:
			# A custom redirect URL may already carry a query string.
			separator = '&' if '?' in redirect_url else '?'
			redirect_url += separator + urlencode(query_params)

		return {
			"redirect_to": redirect_url,
			"status": status
		}
开发者ID:ESS-LLP,项目名称:frappe,代码行数:34,代码来源:stripe_settings.py


示例13: list

    def list(self, **kwargs):
        """Retrieve a listing of Image objects

        :param page_size: Number of images to request in each paginated request
        :param filters: Optional dict of query filters. A ``tag`` entry is
                        taken as a list of tags, each sent as its own
                        ``tag`` query parameter. The dict is copied, so the
                        caller's object is left untouched.
        :returns generator over list of Images
        """

        ori_validate_fun = self.model.validate

        def empty_fun(*args, **kwargs):
            return None

        def paginate(url):
            # Follow the 'next' links page by page until a page has none.
            while True:
                resp, body = self.http_client.get(url)
                try:
                    for image in body['images']:
                        # NOTE(bcwaldon): remove 'self' for now until we have
                        # an elegant way to pass it into the model constructor
                        # without conflict.
                        image.pop('self', None)
                        yield self.model(**image)
                        # NOTE(zhiyan): In order to resolve the performance issue
                        # of JSON schema validation for image listing case, we
                        # don't validate each image entry but do it only on first
                        # image entry for each page.
                        self.model.validate = empty_fun
                finally:
                    # NOTE(zhiyan); Reset validation function. Doing it in a
                    # finally block restores it even when the consumer stops
                    # iterating early or building a model raises.
                    self.model.validate = ori_validate_fun

                try:
                    url = body['next']
                except KeyError:
                    return

        # Work on a copy so the caller's filters dict is not modified.
        filters = dict(kwargs.get('filters') or {})
        filters['limit'] = kwargs.get('page_size') or DEFAULT_PAGE_SIZE

        tags = filters.pop('tag', [])
        tags_url_params = [
            {'tag': strutils.safe_encode(tag)}
            for tag in tags
            if isinstance(tag, six.string_types)
        ]

        for param, value in list(six.iteritems(filters)):
            if isinstance(value, six.string_types):
                filters[param] = strutils.safe_encode(value)

        url = '/v2/images?%s' % parse.urlencode(filters)

        for param in tags_url_params:
            url = '%s&%s' % (url, parse.urlencode(param))

        for image in paginate(url):
            yield image
开发者ID:codybum,项目名称:OpenStackInAction,代码行数:60,代码来源:images.py


示例14: list

    def list(self, response, res):
        """Write the artifact listing in ``res`` to ``response`` as JSON.

        The JSON body contains the serialized artifacts, a ``first`` URL
        built from the request parameters minus ``marker``, and a ``next``
        URL when ``res`` has a ``next_marker``.
        """
        urlvars = response.request.urlvars
        params = dict(response.request.params)
        params.pop('marker', None)
        query = urlparse.urlencode(params)

        type_name = urlvars.get('type_name')
        type_version = urlvars.get('type_version')
        is_draft = urlvars.get('state') == 'creating'

        serialized = []
        for artifact in res['artifacts']:
            serialized.append(
                serialization.serialize_for_client(
                    artifact, show_level=Showlevel.NONE))

        # Base collection URL, narrowed by type name and version if given.
        url = "/v0.1/artifacts"
        if type_name:
            url = url + "/" + type_name
        if type_version:
            url = url + "/v" + type_version
        if is_draft:
            url = url + "/drafts"

        body = {"artifacts": serialized}
        body["first"] = (url + "?" + query) if query else url
        if 'next_marker' in res:
            params['marker'] = res['next_marker']
            body['next'] = url + '?' + urlparse.urlencode(params)

        content = json.dumps(body, ensure_ascii=False)
        response.unicode_body = six.text_type(content)
        response.content_type = 'application/json'
开发者ID:froyobin,项目名称:xmonitor,代码行数:35,代码来源:glare.py


示例15: user_data

    def user_data(self, access_token, *args, **kwargs):
        """Loads user data from service

        Returns the decoded user data, or None when the response cannot be
        decoded or the organization membership request raises HTTPError.
        Raises AuthFailed when an organization is configured and the
        membership response code is not 204.
        """
        token_query = urlencode({'access_token': access_token})
        url = GITHUB_USER_DATA_URL + '?' + token_query

        try:
            data = simplejson.load(dsa_urlopen(url))
        except ValueError:
            data = None

        # Nothing more to verify without user data or a configured
        # organization.
        if not data or not self.GITHUB_ORGANIZATION:
            return data

        # if we have a github organization defined, test that the current users
        # is a member of that organization.
        member_url = GITHUB_ORGANIZATION_MEMBER_OF_URL.format(
            org=self.GITHUB_ORGANIZATION,
            username=data.get('login')
        ) + '?' + token_query

        try:
            response = dsa_urlopen(member_url)
        except HTTPError:
            return None

        # if the user is a member of the organization, response code
        # will be 204, see http://bit.ly/ZS6vFl
        if response.code != 204:
            raise AuthFailed('User doesn\'t belong to the '
                             'organization')
        return data
开发者ID:ForkRepo,项目名称:sentry,代码行数:32,代码来源:github.py


示例16: create_charge_on_braintree

	def create_charge_on_braintree(self):
		"""Submit the sale to Braintree and build the redirect response.

		The outcome is recorded on ``self.integration_request``. On success
		the ``on_payment_authorized`` method of the reference document and
		the last ``braintree_success_page`` hook may supply a replacement
		``redirect_to``.

		Returns a dict with ``redirect_to`` (the success or failure page plus
		an optional query string) and ``status`` ('Completed' or 'Error').
		"""
		self.configure_braintree()

		redirect_to = self.data.get('redirect_to') or None
		redirect_message = self.data.get('redirect_message') or None

		result = braintree.Transaction.sale({
			"amount": self.data.amount,
			"payment_method_nonce": self.data.payload_nonce,
			"options": {
				"submit_for_settlement": True
			}
		})

		if result.is_success:
			self.integration_request.db_set('status', 'Completed', update_modified=False)
			self.flags.status_changed_to = "Completed"
			self.integration_request.db_set('output', result.transaction.status, update_modified=False)

		elif result.transaction:
			# The processor answered but declined the transaction.
			self.integration_request.db_set('status', 'Failed', update_modified=False)
			error_log = frappe.log_error("code: " + str(result.transaction.processor_response_code) + " | text: " + str(result.transaction.processor_response_text), "Braintree Payment Error")
			self.integration_request.db_set('error', error_log.error, update_modified=False)
		else:
			# No transaction object: log each reported error.
			self.integration_request.db_set('status', 'Failed', update_modified=False)
			for error in result.errors.deep_errors:
				error_log = frappe.log_error("code: " + str(error.code) + " | message: " + str(error.message), "Braintree Payment Error")
				self.integration_request.db_set('error', error_log.error, update_modified=False)

		if self.flags.status_changed_to == "Completed":
			status = 'Completed'
			if self.data.reference_doctype and self.data.reference_docname:
				custom_redirect_to = None
				try:
					custom_redirect_to = frappe.get_doc(self.data.reference_doctype,
						self.data.reference_docname).run_method("on_payment_authorized", self.flags.status_changed_to)
					braintree_success_page = frappe.get_hooks('braintree_success_page')
					if braintree_success_page:
						custom_redirect_to = frappe.get_attr(braintree_success_page[-1])(self.data)
				except Exception:
					# Best effort: a failing hook must not break the redirect.
					frappe.log_error(frappe.get_traceback())

				if custom_redirect_to:
					redirect_to = custom_redirect_to

			redirect_url = 'payment-success'
		else:
			status = 'Error'
			redirect_url = 'payment-failed'

		# Collect the parameters first so the query string always starts with
		# '?', even when only redirect_message is present.
		query_params = []
		if redirect_to:
			query_params.append(('redirect_to', redirect_to))
		if redirect_message:
			query_params.append(('redirect_message', redirect_message))
		if query_params:
			redirect_url += '?' + urlencode(query_params)

		return {
			"redirect_to": redirect_url,
			"status": status
		}
开发者ID:ESS-LLP,项目名称:frappe,代码行数:59,代码来源:braintree_settings.py


示例17: http_redirect_message

def http_redirect_message(message, location, relay_state="", typ="SAMLRequest",
                          sigalg=None, key=None, **kwargs):
    """The HTTP Redirect binding defines a mechanism by which SAML protocol
    messages can be transmitted within URL parameters.
    Messages are encoded for use with this binding using a URL encoding
    technique, and transmitted using the HTTP GET method.

    The DEFLATE Encoding is used in this function.

    :param message: The message
    :param location: Where the message should be posted to
    :param relay_state: for preserving and conveying state information
    :param typ: What type of message it is SAMLRequest/SAMLResponse/SAMLart
    :param sigalg: The signature algorithm to use.
    :param key: Key to use for signing
    :return: A dict with "headers" (a single Location header holding the
        redirect URL) and "data" (an empty list).
    :raises Unsupported: If ``sigalg`` is not a known signing algorithm.
    :raises Exception: If ``typ`` is not one of the supported types.
    """

    if not isinstance(message, six.string_types):
        message = "%s" % (message,)

    _order = None
    if typ in ["SAMLRequest", "SAMLResponse"]:
        if typ == "SAMLRequest":
            _order = REQ_ORDER
        else:
            _order = RESP_ORDER
        args = {typ: deflate_and_base64_encode(message)}
    elif typ == "SAMLart":
        args = {typ: message}
    else:
        raise Exception("Unknown message type: %s" % typ)

    if relay_state:
        args["RelayState"] = relay_state

    if sigalg:
        # sigalgs, one of the ones defined in xmldsig

        args["SigAlg"] = sigalg

        try:
            signer = SIGNER_ALGS[sigalg]
        except (KeyError, TypeError):
            # Only a failed lookup means "unsupported"; other errors, such
            # as KeyboardInterrupt, are no longer swallowed here.
            raise Unsupported("Signing algorithm")

        # The signature covers the arguments in the fixed order for this
        # message type.
        # NOTE(review): _order stays None for typ == "SAMLart", so signing a
        # SAMLart message fails in the iteration below - confirm whether
        # that combination is meant to be supported.
        string = "&".join([urlencode({k: args[k]})
                           for k in _order if k in args]).encode('ascii')
        args["Signature"] = base64.b64encode(signer.sign(string, key))

    string = urlencode(args)

    # Append to an existing query string with '&', otherwise start one.
    glue_char = "&" if urlparse(location).query else "?"
    login_url = glue_char.join([location, string])
    headers = [('Location', str(login_url))]
    body = []

    return {"headers": headers, "data": body}
开发者ID:Goggin,项目名称:pysaml2,代码行数:59,代码来源:pack.py


示例18: authorize_payment

	def authorize_payment(self):
		"""
		An authorization is performed when user's payment details are successfully authenticated by the bank.
		The money is deducted from the customer's account, but will not be transferred to the merchant's account
		until it is explicitly captured by merchant.

		Returns a dict with ``redirect_to`` (the success or failure page plus
		an optional query string carrying ``redirect_to`` /
		``redirect_message``) and ``status``.
		"""
		data = json.loads(self.integration_request.data)
		settings = self.get_settings(data)

		try:
			resp = make_get_request("https://api.razorpay.com/v1/payments/{0}"
				.format(self.data.razorpay_payment_id), auth=(settings.api_key,
					settings.api_secret))

			if resp.get("status") == "authorized":
				self.integration_request.update_status(data, 'Authorized')
				self.flags.status_changed_to = "Authorized"

			else:
				frappe.log_error(str(resp), 'Razorpay Payment not authorized')

		except Exception:
			# Best effort: log the failure and fall through to the
			# 'payment-failed' redirect below.
			frappe.log_error(frappe.get_traceback())

		status = frappe.flags.integration_request.status_code

		redirect_to = data.get('notes', {}).get('redirect_to') or None
		redirect_message = data.get('notes', {}).get('redirect_message') or None

		if self.flags.status_changed_to == "Authorized":
			if self.data.reference_doctype and self.data.reference_docname:
				custom_redirect_to = None
				try:
					custom_redirect_to = frappe.get_doc(self.data.reference_doctype,
						self.data.reference_docname).run_method("on_payment_authorized", self.flags.status_changed_to)
				except Exception:
					# Best effort: a failing hook must not break the redirect.
					frappe.log_error(frappe.get_traceback())

				if custom_redirect_to:
					redirect_to = custom_redirect_to

			redirect_url = 'payment-success'
		else:
			redirect_url = 'payment-failed'

		# Collect the parameters first so the query string always starts with
		# '?', even when only redirect_message is present.
		query_params = []
		if redirect_to:
			query_params.append(('redirect_to', redirect_to))
		if redirect_message:
			query_params.append(('redirect_message', redirect_message))
		if query_params:
			redirect_url += '?' + urlencode(query_params)

		return {
			"redirect_to": redirect_url,
			"status": status
		}
开发者ID:JiShangShiDai,项目名称:frappe,代码行数:56,代码来源:razorpay_settings.py


示例19: confirm_payment

def confirm_payment(token):
	"""Complete a PayPal express checkout and set up the redirect response.

	Loads the Integration Request stored under ``token``, posts a
	``DoExpressCheckoutPayment`` call and, on success, updates the request
	status and runs ``on_payment_authorized`` on the reference document.
	The redirect location is written to ``frappe.local.response``.

	Any exception is logged with ``frappe.log_error`` and not re-raised.
	"""
	try:
		redirect = True

		doc = frappe.get_doc("PayPal Settings")
		doc.setup_sandbox_env(token)

		integration_request = frappe.get_doc("Integration Request", token)
		data = json.loads(integration_request.data)

		redirect_to = data.get('redirect_to') or None
		redirect_message = data.get('redirect_message') or None

		params, url = doc.get_paypal_params_and_url()
		params.update({
			"METHOD": "DoExpressCheckoutPayment",
			"PAYERID": data.get("payerid"),
			"TOKEN": token,
			"PAYMENTREQUEST_0_PAYMENTACTION": "SALE",
			"PAYMENTREQUEST_0_AMT": data.get("amount"),
			"PAYMENTREQUEST_0_CURRENCYCODE": data.get("currency").upper()
		})

		response = make_post_request(url, data=params)

		# Response values are indexed with [0], i.e. treated as lists.
		if response.get("ACK")[0] == "Success":
			update_integration_request_status(token, {
				"transaction_id": response.get("PAYMENTINFO_0_TRANSACTIONID")[0],
				"correlation_id": response.get("CORRELATIONID")[0]
			}, "Completed")

			if data.get("reference_doctype") and data.get("reference_docname"):
				custom_redirect_to = frappe.get_doc(data.get("reference_doctype"),
					data.get("reference_docname")).run_method("on_payment_authorized", "Completed")
				frappe.db.commit()

				if custom_redirect_to:
					redirect_to = custom_redirect_to

			redirect_url = '/integrations/payment-success'
		else:
			redirect_url = "/integrations/payment-failed"

		# Collect the parameters first so the query string always starts with
		# '?', even when only redirect_message is present.
		query_params = []
		if redirect_to:
			query_params.append(('redirect_to', redirect_to))
		if redirect_message:
			query_params.append(('redirect_message', redirect_message))
		if query_params:
			redirect_url += '?' + urlencode(query_params)

		# this is done so that functions called via hooks can update flags.redirect_to
		if redirect:
			frappe.local.response["type"] = "redirect"
			frappe.local.response["location"] = get_url(redirect_url)

	except Exception:
		frappe.log_error(frappe.get_traceback())
开发者ID:JiShangShiDai,项目名称:frappe,代码行数:56,代码来源:paypal_settings.py


示例20: create_charge_on_stripe

	def create_charge_on_stripe(self):
		"""Post the charge to the Stripe charges endpoint and build the redirect response.

		The amount is sent as ``cint(flt(amount) * 100)``. When the response
		reports the charge as captured, the integration request is marked
		'Completed' and ``on_payment_authorized`` is run on the reference
		document, which may supply a replacement ``redirect_to``.

		Returns a dict with ``redirect_to`` (the success or failure page plus
		an optional query string) and ``status``.
		"""
		headers = {"Authorization":
			"Bearer {0}".format(self.get_password(fieldname="secret_key", raise_exception=False))}

		data = {
			"amount": cint(flt(self.data.amount)*100),
			"currency": self.data.currency,
			"source": self.data.stripe_token_id,
			"description": self.data.description
		}

		redirect_to = self.data.get('redirect_to') or None
		redirect_message = self.data.get('redirect_message') or None

		try:
			resp = make_post_request(url="https://api.stripe.com/v1/charges", headers=headers, data=data)

			if resp.get("captured") == True:
				self.integration_request.db_set('status', 'Completed', update_modified=False)
				self.flags.status_changed_to = "Completed"

			else:
				frappe.log_error(str(resp), 'Stripe Payment not completed')

		except Exception:
			# Best effort: log the failure and fall through to the
			# 'payment-failed' redirect below.
			frappe.log_error(frappe.get_traceback())

		status = frappe.flags.integration_request.status_code

		if self.flags.status_changed_to == "Completed":
			if self.data.reference_doctype and self.data.reference_docname:
				custom_redirect_to = None
				try:
					custom_redirect_to = frappe.get_doc(self.data.reference_doctype,
						self.data.reference_docname).run_method("on_payment_authorized", self.flags.status_changed_to)
				except Exception:
					# Best effort: a failing hook must not break the redirect.
					frappe.log_error(frappe.get_traceback())

				if custom_redirect_to:
					redirect_to = custom_redirect_to

			redirect_url = 'payment-success'
		else:
			redirect_url = 'payment-failed'

		# Collect the parameters first so the query string always starts with
		# '?', even when only redirect_message is present.
		query_params = []
		if redirect_to:
			query_params.append(('redirect_to', redirect_to))
		if redirect_message:
			query_params.append(('redirect_message', redirect_message))
		if query_params:
			redirect_url += '?' + urlencode(query_params)

		return {
			"redirect_to": redirect_url,
			"status": status
		}
开发者ID:britlog,项目名称:frappe,代码行数:56,代码来源:stripe_settings.py



注:本文中的six.moves.urllib.parse.urlencode函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python parse.urljoin函数代码示例发布时间:2022-05-27
下一篇:
Python parse.urldefrag函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap