本文整理汇总了Python中saml2.s_utils.success_status_factory函数的典型用法代码示例。如果您正苦于以下问题:Python success_status_factory函数的具体用法?Python success_status_factory怎么用?Python success_status_factory使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了success_status_factory函数的16个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_response
def test_response(self):
response = sigver.response_factory(
in_response_to="_012345",
destination="https:#www.example.com",
status=s_utils.success_status_factory(),
assertion=s_utils.assertion_factory(
subject=factory(saml.Subject, text="_aaa",
name_id=saml.NAMEID_FORMAT_TRANSIENT),
attribute_statement=do_attribute_statement(
{
("", "", "sn"): ("Jeter", ""),
("", "", "givenName"): ("Derek", ""),
}
),
issuer=self.server._issuer(),
),
issuer=self.server._issuer(),
)
print(response.keyswv())
assert _eq(response.keyswv(), ['destination', 'assertion', 'status',
'in_response_to', 'issue_instant',
'version', 'issuer', 'id'])
assert response.version == "2.0"
assert response.issuer.text == "urn:mace:example.com:saml:roland:idp"
assert response.destination == "https:#www.example.com"
assert response.in_response_to == "_012345"
#
status = response.status
print(status)
assert status.status_code.value == samlp.STATUS_SUCCESS
开发者ID:jkakavas,项目名称:pysaml2,代码行数:31,代码来源:test_50_server.py
示例2: _status_response
def _status_response(self, response_class, issuer, status, sign=False,
**kwargs):
""" Create a StatusResponse.
:param response_class: Which subclass of StatusResponse that should be
used
:param issuer: The issuer of the response message
:param status: The return status of the response operation
:param sign: Whether the response should be signed or not
:param kwargs: Extra arguments to the response class
:return: Class instance or string representation of the instance
"""
mid = sid()
for key in ["destination", "binding"]:
try:
del kwargs[key]
except KeyError:
pass
if not status:
status = success_status_factory()
response = response_class(issuer=issuer, id=mid, version=VERSION,
issue_instant=instant(),
status=status, **kwargs)
if sign:
return self.sign(response, mid)
else:
return response
开发者ID:gbel,项目名称:pysaml2,代码行数:32,代码来源:entity.py
示例3: _response
def _response(self, in_response_to, consumer_url=None, status=None,
issuer=None, sign=False, to_sign=None, **kwargs):
""" Create a Response.
:param in_response_to: The session identifier of the request
:param consumer_url: The URL which should receive the response
:param status: The status of the response
:param issuer: The issuer of the response
:param sign: Whether the response should be signed or not
:param to_sign: If there are other parts to sign
:param kwargs: Extra key word arguments
:return: A Response instance
"""
if not status:
status = success_status_factory()
_issuer = self._issuer(issuer)
response = response_factory(issuer=_issuer,
in_response_to=in_response_to,
status=status)
if consumer_url:
response.destination = consumer_url
self._add_info(response, **kwargs)
if sign:
return self.sign(response, to_sign=to_sign)
elif to_sign:
return signed_instance_factory(response, self.sec, to_sign)
else:
return response
开发者ID:gbel,项目名称:pysaml2,代码行数:34,代码来源:entity.py
示例4: _response
def _response(self, in_response_to, consumer_url=None, status=None,
issuer=None, sign=False, to_sign=None,
encrypt_assertion=False, encrypt_cert=None, **kwargs):
""" Create a Response.
:param in_response_to: The session identifier of the request
:param consumer_url: The URL which should receive the response
:param status: The status of the response
:param issuer: The issuer of the response
:param sign: Whether the response should be signed or not
:param to_sign: If there are other parts to sign
:param kwargs: Extra key word arguments
:return: A Response instance
"""
if not status:
status = success_status_factory()
_issuer = self._issuer(issuer)
response = response_factory(issuer=_issuer,
in_response_to=in_response_to,
status=status)
if consumer_url:
response.destination = consumer_url
self._add_info(response, **kwargs)
if not sign and to_sign and not encrypt_assertion:
return signed_instance_factory(response, self.sec, to_sign)
if encrypt_assertion:
if sign:
response.signature = pre_signature_part(response.id,
self.sec.my_cert, 1)
cbxs = CryptoBackendXmlSec1(self.config.xmlsec_binary)
_, cert_file = make_temp("%s" % encrypt_cert, decode=False)
response = cbxs.encrypt_assertion(response, cert_file,
pre_encryption_part())
# template(response.assertion.id))
if sign:
if to_sign:
signed_instance_factory(response, self.sec, to_sign)
else:
# default is to sign the whole response if anything
sign_class = [(class_name(response), response.id)]
return signed_instance_factory(response, self.sec,
sign_class)
else:
return response
if sign:
return self.sign(response, to_sign=to_sign)
else:
return response
开发者ID:18600597055,项目名称:hue,代码行数:56,代码来源:entity.py
示例5: test_sign_then_encrypt_assertion2
def test_sign_then_encrypt_assertion2(self):
# Begin with the IdPs side
_sec = self.server.sec
nameid_policy = samlp.NameIDPolicy(allow_create="false",
format=saml.NAMEID_FORMAT_PERSISTENT)
asser = Assertion({"givenName": "Derek", "surName": "Jeter"})
assertion = asser.construct(
self.client.config.entityid, "_012345",
"http://lingon.catalogix.se:8087/",
factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
policy=self.server.config.getattr("policy", "idp"),
issuer=self.server._issuer(),
attrconvs=self.server.config.attribute_converters,
authn_class=INTERNETPROTOCOLPASSWORD,
authn_auth="http://www.example.com/login")
assertion.signature = sigver.pre_signature_part(
assertion.id, _sec.my_cert, 1)
sigass = _sec.sign_statement(assertion, class_name(assertion),
key_file=self.client.sec.key_file,
node_id=assertion.id)
sigass = rm_xmltag(sigass)
response = sigver.response_factory(
in_response_to="_012345",
destination="https://www.example.com",
status=s_utils.success_status_factory(),
issuer=self.server._issuer(),
encrypted_assertion=EncryptedAssertion()
)
xmldoc = "%s" % response
# strangely enough I get different tags if I run this test separately
# or as part of a bunch of tests.
xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass)
enctext = _sec.crypto.encrypt_assertion(xmldoc, _sec.cert_file,
pre_encryption_part())
#seresp = samlp.response_from_string(enctext)
resp_str = base64.encodestring(enctext)
# Now over to the client side
resp = self.client.parse_authn_request_response(
resp_str, BINDING_HTTP_POST,
{"_012345": "http://foo.example.com/service"})
#assert resp.encrypted_assertion == []
assert resp.assertion
assert resp.ava == {'givenName': ['Derek'], 'sn': ['Jeter']}
开发者ID:daryllstrauss,项目名称:pysaml2,代码行数:54,代码来源:test_51_client.py
示例6: handle_logout_request
def handle_logout_request(self, request, name_id, binding, sign=False,
relay_state=""):
"""
Deal with a LogoutRequest
:param request: The request as text string
:param name_id: The id of the current user
:param binding: Which binding the message came in over
:param sign: Whether the response will be signed or not
:return: Keyword arguments which can be used to send the response
what's returned follow different patterns for different bindings.
If the binding is BINDIND_SOAP, what is returned looks like this::
{
"data": <the SOAP enveloped response>
"url": "",
'headers': [('content-type', 'application/soap+xml')]
'method': "POST
}
"""
logger.info("logout request: %s", request)
_req = self._parse_request(request, LogoutRequest,
"single_logout_service", binding)
if _req.message.name_id == name_id:
try:
if self.local_logout(name_id):
status = success_status_factory()
else:
status = status_message_factory("Server error",
STATUS_REQUEST_DENIED)
except KeyError:
status = status_message_factory("Server error",
STATUS_REQUEST_DENIED)
else:
status = status_message_factory("Wrong user",
STATUS_UNKNOWN_PRINCIPAL)
if binding == BINDING_SOAP:
response_bindings = [BINDING_SOAP]
elif binding == BINDING_HTTP_POST or BINDING_HTTP_REDIRECT:
response_bindings = [BINDING_HTTP_POST, BINDING_HTTP_REDIRECT]
else:
response_bindings = self.config.preferred_binding[
"single_logout_service"]
response = self.create_logout_response(_req.message, response_bindings,
status, sign)
rinfo = self.response_args(_req.message, response_bindings)
return self.apply_binding(rinfo["binding"], response,
rinfo["destination"], relay_state,
response=True)
开发者ID:pombredanne,项目名称:pysaml2,代码行数:54,代码来源:client.py
示例7: create_logout_response
def create_logout_response(self, request, binding, status=None,
sign=False, issuer=None):
""" Create a LogoutResponse. What is returned depends on which binding
is used.
:param request: The request this is a response to
:param binding: Which binding the request came in over
:param status: The return status of the response operation
:param issuer: The issuer of the message
:return: A logout message.
"""
mid = sid()
if not status:
status = success_status_factory()
# response and packaging differs depending on binding
response = ""
if binding in [BINDING_SOAP, BINDING_HTTP_POST]:
response = logoutresponse_factory(sign=sign, id = mid,
in_response_to = request.id,
status = status)
elif binding == BINDING_HTTP_REDIRECT:
sp_entity_id = request.issuer.text.strip()
srvs = self.metadata.single_logout_service(sp_entity_id, "spsso")
if not srvs:
raise Exception("Nowhere to send the response")
destination = destinations(srvs)[0]
_issuer = self.issuer(issuer)
response = logoutresponse_factory(sign=sign, id = mid,
in_response_to = request.id,
status = status,
issuer = _issuer,
destination = destination,
sp_entity_id = sp_entity_id,
instant=instant())
if sign:
to_sign = [(class_name(response), mid)]
response = signed_instance_factory(response, self.sec, to_sign)
logger.info("Response: %s" % (response,))
return response
开发者ID:GSA,项目名称:pysaml2,代码行数:45,代码来源:server.py
示例8: auth_response
def auth_response(identity, in_response_to, sp_conf):
"""Generates a fresh signed authentication response"""
sp_entity_id = sp_conf.entityid
idp_entity_id = sp_conf.idps().keys()[0]
acs = sp_conf.endpoint('assertion_consumer_service')[0]
issuer = saml.Issuer(text=idp_entity_id, format=saml.NAMEID_FORMAT_ENTITY)
response = response_factory(issuer=issuer,
in_response_to=in_response_to,
destination=acs,
status=success_status_factory())
idp_conf = IdPConfig()
name_form = "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
idp_conf.load({
'entityid': idp_entity_id,
'xmlsec_binary': sp_conf.xmlsec_binary,
'attribute_map_dir': os.path.join(BASEDIR, 'attribute-maps'),
'service': {
'idp': {
'endpoints': tuple(),
'policy': {
'default': {
"lifetime": {"minutes": 15},
"attribute_restrictions": None,
"name_form": name_form,
}
}
},
},
'key_file': os.path.join(BASEDIR, 'idpcert.key'),
'cert_file': os.path.join(BASEDIR, 'idpcert.pem'),
'metadata': {
'local': [os.path.join(BASEDIR, 'sp_metadata.xml')],
},
})
server = Server("", idp_conf)
server.ident = Identifier(FakeDb())
userid = 'irrelevant'
response = server.authn_response(identity, in_response_to, acs,
sp_entity_id, None, userid)
return '\n'.join(response)
开发者ID:FluidReview,项目名称:djangosaml2,代码行数:41,代码来源:auth_response.py
示例9: _response
def _response(self, in_response_to, consumer_url=None, status=None,
issuer=None, sign=False, to_sign=None,
**kwargs):
""" Create a Response that adhers to the ??? profile.
:param in_response_to: The session identifier of the request
:param consumer_url: The URL which should receive the response
:param status: The status of the response
:param issuer: The issuer of the response
:param sign: Whether the response should be signed or not
:param to_sign: What other parts to sign
:param kwargs: Extra key word arguments
:return: A Response instance
"""
if not status:
status = success_status_factory()
_issuer = self.issuer(issuer)
response = response_factory(
issuer=_issuer,
in_response_to = in_response_to,
status = status,
)
if consumer_url:
response.destination = consumer_url
for key, val in kwargs.items():
setattr(response, key, val)
if sign:
try:
to_sign.append((class_name(response), response.id))
except AttributeError:
to_sign = [(class_name(response), response.id)]
return signed_instance_factory(response, self.sec, to_sign)
开发者ID:GSA,项目名称:pysaml2,代码行数:40,代码来源:server.py
示例10: test_sign_then_encrypt_assertion
def test_sign_then_encrypt_assertion(self):
# Begin with the IdPs side
_sec = self.server.sec
assertion = s_utils.assertion_factory(
subject=factory(saml.Subject, text="_aaa",
name_id=factory(
saml.NameID,
format=saml.NAMEID_FORMAT_TRANSIENT)),
attribute_statement=do_attribute_statement(
{
("", "", "surName"): ("Jeter", ""),
("", "", "givenName"): ("Derek", ""),
}
),
issuer=self.server._issuer(),
)
assertion.signature = sigver.pre_signature_part(
assertion.id, _sec.my_cert, 1)
sigass = _sec.sign_statement(assertion, class_name(assertion),
key_file="pki/mykey.pem",
node_id=assertion.id)
# Create an Assertion instance from the signed assertion
_ass = saml.assertion_from_string(sigass)
response = sigver.response_factory(
in_response_to="_012345",
destination="https:#www.example.com",
status=s_utils.success_status_factory(),
issuer=self.server._issuer(),
assertion=_ass
)
enctext = _sec.crypto.encrypt_assertion(response, _sec.cert_file,
pre_encryption_part())
seresp = samlp.response_from_string(enctext)
# Now over to the client side
_csec = self.client.sec
if seresp.encrypted_assertion:
decr_text = _csec.decrypt(enctext)
seresp = samlp.response_from_string(decr_text)
resp_ass = []
sign_cert_file = "pki/mycert.pem"
for enc_ass in seresp.encrypted_assertion:
assers = extension_elements_to_elements(
enc_ass.extension_elements, [saml, samlp])
for ass in assers:
if ass.signature:
if not _csec.verify_signature("%s" % ass,
sign_cert_file,
node_name=class_name(ass)):
continue
resp_ass.append(ass)
seresp.assertion = resp_ass
seresp.encrypted_assertion = None
#print _sresp
assert seresp.assertion
开发者ID:daryllstrauss,项目名称:pysaml2,代码行数:64,代码来源:test_51_client.py
示例11: test_status_success
def test_status_success():
status = utils.success_status_factory()
status_text = "%s" % status
assert status_text == SUCCESS_STATUS
assert status.status_code.value == samlp.STATUS_SUCCESS
开发者ID:domoniquecarter35,项目名称:pysaml2,代码行数:5,代码来源:test_12_s_utils.py
示例12: _response
def _response(self, in_response_to, consumer_url=None, status=None,
issuer=None, sign=False, to_sign=None, sp_entity_id=None,
encrypt_assertion=False, encrypt_assertion_self_contained=False, encrypted_advice_attributes=False,
encrypt_cert_advice=None, encrypt_cert_assertion=None,sign_assertion=None, pefim=False, **kwargs):
""" Create a Response.
Encryption:
encrypt_assertion must be true for encryption to be performed. If encrypted_advice_attributes also is
true, then will the function try to encrypt the assertion in the the advice element of the main
assertion. Only one assertion element is allowed in the advice element, if multiple assertions exists
in the advice element the main assertion will be encrypted instead, since it's no point to encrypt
If encrypted_advice_attributes is
false the main assertion will be encrypted. Since the same key
:param in_response_to: The session identifier of the request
:param consumer_url: The URL which should receive the response
:param status: The status of the response
:param issuer: The issuer of the response
:param sign: Whether the response should be signed or not
:param to_sign: If there are other parts to sign
:param sp_entity_id: Entity ID for the calling service provider.
:param encrypt_assertion: True if assertions should be encrypted.
:param encrypt_assertion_self_contained: True if all encrypted assertions should have alla namespaces
selfcontained.
:param encrypted_advice_attributes: True if assertions in the advice element should be encrypted.
:param encrypt_cert_advice: Certificate to be used for encryption of assertions in the advice element.
:param encrypt_cert_assertion: Certificate to be used for encryption of assertions.
:param sign_assertion: True if assertions should be signed.
:param pefim: True if a response according to the PEFIM profile should be created.
:param kwargs: Extra key word arguments
:return: A Response instance
"""
if not status:
status = success_status_factory()
_issuer = self._issuer(issuer)
response = response_factory(issuer=_issuer,
in_response_to=in_response_to,
status=status)
if consumer_url:
response.destination = consumer_url
self._add_info(response, **kwargs)
if not sign and to_sign and not encrypt_assertion:
return signed_instance_factory(response, self.sec, to_sign)
has_encrypt_cert = self.has_encrypt_cert_in_metadata(sp_entity_id)
if not has_encrypt_cert and encrypt_cert_advice is None:
encrypted_advice_attributes = False
if not has_encrypt_cert and encrypt_cert_assertion is None:
encrypt_assertion = False
if encrypt_assertion or (encrypted_advice_attributes and response.assertion.advice is not None and
len(response.assertion.advice.assertion) == 1):
if sign:
response.signature = pre_signature_part(response.id,
self.sec.my_cert, 1)
sign_class = [(class_name(response), response.id)]
cbxs = CryptoBackendXmlSec1(self.config.xmlsec_binary)
encrypt_advice = False
if encrypted_advice_attributes and response.assertion.advice is not None \
and len(response.assertion.advice.assertion) > 0:
_assertions = response.assertion
if not isinstance(_assertions, list):
_assertions = [_assertions]
for _assertion in _assertions:
_assertion.advice.encrypted_assertion = []
_assertion.advice.encrypted_assertion.append(EncryptedAssertion())
_advice_assertions = copy.deepcopy(_assertion.advice.assertion)
_assertion.advice.assertion = []
if not isinstance(_advice_assertions, list):
_advice_assertions = [_advice_assertions]
for tmp_assertion in _advice_assertions:
to_sign_advice = []
if sign_assertion and not pefim:
tmp_assertion.signature = pre_signature_part(tmp_assertion.id, self.sec.my_cert, 1)
to_sign_advice.append((class_name(tmp_assertion), tmp_assertion.id))
#tmp_assertion = response.assertion.advice.assertion[0]
_assertion.advice.encrypted_assertion[0].add_extension_element(tmp_assertion)
if encrypt_assertion_self_contained:
advice_tag = response.assertion.advice._to_element_tree().tag
assertion_tag = tmp_assertion._to_element_tree().tag
response = \
response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
assertion_tag, advice_tag)
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
if to_sign_advice:
response = signed_instance_factory(response, self.sec, to_sign_advice)
response = self._encrypt_assertion(encrypt_cert_advice, sp_entity_id, response, node_xpath=node_xpath)
response = response_from_string(response)
if encrypt_assertion:
to_sign_assertion = []
if sign_assertion is not None and sign_assertion:
#.........这里部分代码省略.........
开发者ID:WiserTogether,项目名称:pysaml2,代码行数:101,代码来源:entity.py
示例13: logout_response
def logout_response(self, request, bindings, status=None, sign=False, issuer=None):
""" Create a LogoutResponse. What is returned depends on which binding
is used.
:param request: The request this is a response to
:param bindings: Which bindings that can be used to send the response
:param status: The return status of the response operation
:param issuer: The issuer of the message
:return: A 3-tuple consisting of HTTP return code, HTTP headers and
possibly a message.
"""
sp_entity_id = request.issuer.text.strip()
binding = None
destinations = []
for binding in bindings:
destinations = self.conf.single_logout_services(sp_entity_id, binding)
if destinations:
break
if not destinations:
if self.log:
self.log.error("Not way to return a response !!!")
return ("412 Precondition Failed", [("Content-type", "text/html")], ["No return way defined"])
# Pick the first
destination = destinations[0]
if self.log:
self.log.info("Logout Destination: %s, binding: %s" % (destination, binding))
if not status:
status = success_status_factory()
mid = sid()
rcode = "200 OK"
# response and packaging differs depending on binding
if binding == BINDING_SOAP:
response = logoutresponse_factory(sign=sign, id=mid, in_response_to=request.id, status=status)
if sign:
to_sign = [(class_name(response), mid)]
response = signed_instance_factory(response, self.sec, to_sign)
(headers, message) = http_soap_message(response)
else:
_issuer = self.issuer(issuer)
response = logoutresponse_factory(
sign=sign,
id=mid,
in_response_to=request.id,
status=status,
issuer=_issuer,
destination=destination,
sp_entity_id=sp_entity_id,
instant=instant(),
)
if sign:
to_sign = [(class_name(response), mid)]
response = signed_instance_factory(response, self.sec, to_sign)
if self.log:
self.log.info("Response: %s" % (response,))
if binding == BINDING_HTTP_REDIRECT:
(headers, message) = http_redirect_message(response, destination, typ="SAMLResponse")
rcode = "302 Found"
else:
(headers, message) = http_post_message(response, destination, typ="SAMLResponse")
return rcode, headers, message
开发者ID:howow,项目名称:pysaml2,代码行数:70,代码来源:server.py
示例14: _response
def _response(
self,
in_response_to,
consumer_url=None,
sp_entity_id=None,
identity=None,
name_id=None,
status=None,
sign=False,
policy=Policy(),
authn=None,
authn_decl=None,
issuer=None,
):
""" Create a Response that adhers to the ??? profile.
:param in_response_to: The session identifier of the request
:param consumer_url: The URL which should receive the response
:param sp_entity_id: The entity identifier of the SP
:param identity: A dictionary with attributes and values that are
expected to be the bases for the assertion in the response.
:param name_id: The identifier of the subject
:param status: The status of the response
:param sign: Whether the assertion should be signed or not
:param policy: The attribute release policy for this instance
:param authn: A 2-tuple denoting the authn class and the authn
authority
:param authn_decl:
:param issuer: The issuer of the response
:return: A Response instance
"""
to_sign = []
if not status:
status = success_status_factory()
_issuer = self.issuer(issuer)
response = response_factory(issuer=_issuer, in_response_to=in_response_to, status=status)
if consumer_url:
response.destination = consumer_url
if identity:
ast = Assertion(identity)
try:
ast.apply_policy(sp_entity_id, policy, self.metadata)
except MissingValue, exc:
return self.error_response(in_response_to, consumer_url, sp_entity_id, exc, name_id)
if authn: # expected to be a 2-tuple class+authority
(authn_class, authn_authn) = authn
assertion = ast.construct(
sp_entity_id,
in_response_to,
consumer_url,
name_id,
self.conf.attribute_converters,
policy,
issuer=_issuer,
authn_class=authn_class,
authn_auth=authn_authn,
)
elif authn_decl:
assertion = ast.construct(
sp_entity_id,
in_response_to,
consumer_url,
name_id,
self.conf.attribute_converters,
policy,
issuer=_issuer,
authn_decl=authn_decl,
)
else:
assertion = ast.construct(
sp_entity_id,
in_response_to,
consumer_url,
name_id,
self.conf.attribute_converters,
policy,
issuer=_issuer,
)
if sign:
assertion.signature = pre_signature_part(assertion.id, self.sec.my_cert, 1)
# Just the assertion or the response and the assertion ?
to_sign = [(class_name(assertion), assertion.id)]
# Store which assertion that has been sent to which SP about which
# subject.
# self.cache.set(assertion.subject.name_id.text,
# sp_entity_id, {"ava": identity, "authn": authn},
# assertion.conditions.not_on_or_after)
response.assertion = assertion
开发者ID:howow,项目名称:pysaml2,代码行数:99,代码来源:server.py
示例15: test_sign_then_encrypt_assertion_advice
def test_sign_then_encrypt_assertion_advice(self):
# Begin with the IdPs side
_sec = self.server.sec
nameid_policy = samlp.NameIDPolicy(allow_create="false",
format=saml.NAMEID_FORMAT_PERSISTENT)
asser = Assertion({"givenName": "Derek", "surName": "Jeter"})
assertion = asser.construct(
self.client.config.entityid, "_012345",
"http://lingon.catalogix.se:8087/",
factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
policy=self.server.config.getattr("policy", "idp"),
issuer=self.server._issuer(),
attrconvs=self.server.config.attribute_converters,
authn_class=INTERNETPROTOCOLPASSWORD,
authn_auth="http://www.example.com/login")
a_asser = Assertion({"uid": "test01", "email": "[email protected]"})
a_assertion = a_asser.construct(
self.client.config.entityid, "_012345",
"http://lingon.catalogix.se:8087/",
factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
policy=self.server.config.getattr("policy", "idp"),
issuer=self.server._issuer(),
attrconvs=self.server.config.attribute_converters,
authn_class=INTERNETPROTOCOLPASSWORD,
authn_auth="http://www.example.com/login")
a_assertion.signature = sigver.pre_signature_part(
a_assertion.id, _sec.my_cert, 1)
assertion.advice = Advice()
assertion.advice.encrypted_assertion = []
assertion.advice.encrypted_assertion.append(EncryptedAssertion())
assertion.advice.encrypted_assertion[0].add_extension_element(a_assertion)
response = sigver.response_factory(
in_response_to="_012345",
destination="http://lingon.catalogix.se:8087/",
status=s_utils.success_status_factory(),
issuer=self.server._issuer()
)
response.assertion.append(assertion)
response = _sec.sign_statement("%s" % response, class_name(a_assertion),
key_file=self.client.sec.key_file,
node_id=a_assertion.id)
#xmldoc = "%s" % response
# strangely enough I get different tags if I run this test separately
# or as part of a bunch of tests.
#xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass)
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
enctext = _sec.crypto.encrypt_assertion(response, _sec.cert_file,
pre_encryption_part(), node_xpath=node_xpath)
#seresp = samlp.response_from_string(enctext)
resp_str = base64.encodestring(enctext)
# Now over to the client side
resp = self.client.parse_authn_request_response(
resp_str, BINDING_HTTP_POST,
{"_012345": "http://foo.example.com/service"})
#assert resp.encrypted_assertion == []
assert resp.assertion
assert resp.assertion.advice
assert resp.assertion.advice.assertion
assert resp.ava == \
{'sn': ['Jeter'], 'givenName': ['Derek'], 'uid': ['test01'], 'email': ['[email protected]']}
开发者ID:Itxaka,项目名称:pysaml2,代码行数:77,代码来源:test_51_client.py
示例16: _response
def _response(self, in_response_to, consumer_url=None, status=None,
issuer=None, sign=False, to_sign=None,
encrypt_assertion=False, encrypt_assertion_self_contained=False, encrypted_advice_attributes=False,
encrypt_cert=None, **kwargs):
""" Create a Response.
Encryption:
encrypt_assertion must be true for encryption to be performed. If encrypted_advice_attributes also is
true, then will the function try to encrypt the assertion in the the advice element of the main
assertion. Only one assertion element is allowed in the advice element, if multiple assertions exists
in the advice element the main assertion will be encrypted instead, since it's no point to encrypt
If encrypted_advice_attributes is
false the main assertion will be encrypted. Since the same key
:param in_response_to: The session identifier of the request
:param consumer_url: The URL which should receive the response
:param status: The status of the response
:param issuer: The issuer of the response
:param sign: Whether the response should be signed or not
:param to_sign: If there are other parts to sign
:param kwargs: Extra key word arguments
:return: A Response instance
"""
if not status:
status = success_status_factory()
_issuer = self._issuer(issuer)
response = response_factory(issuer=_issuer,
in_response_to=in_response_to,
status=status)
if consumer_url:
response.destination = consumer_url
self._add_info(response, **kwargs)
if not sign and to_sign and not encrypt_assertion:
return signed_instance_factory(response, self.sec, to_sign)
if encrypt_assertion:
node_xpath = None
if sign:
response.signature = pre_signature_part(response.id,
self.sec.my_cert, 1)
sign_class = [(class_name(response), response.id)]
cbxs = CryptoBackendXmlSec1(self.config.xmlsec_binary)
if encrypted_advice_attributes and response.assertion.advice is not None \
and len(response.assertion.advice.assertion) == 1:
tmp_assertion = response.assertion.advice.assertion[0]
response.assertion.advice.encrypted_assertion = []
response.assertion.advice.encrypted_assertion.append(EncryptedAssertion())
if isinstance(tmp_assertion, list):
response.assertion.advice.encrypted_assertion[0].add_extension_elements(tmp_assertion)
else:
response.assertion.advice.encrypted_assertion[0].add_extension_element(tmp_assertion)
response.assertion.advice.assertion = []
if encrypt_assertion_self_contained:
advice_tag = response.assertion.advice._to_element_tree().tag
assertion_tag = tmp_assertion._to_element_tree().tag
response = response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
assertion_tag, advice_tag)
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
elif encrypt_assertion_self_contained:
assertion_tag = response.assertion._to_element_tree().tag
response = pre_encrypt_assertion(response)
response = response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion(
assertion_tag)
else:
response = pre_encrypt_assertion(response)
if to_sign:
response = signed_instance_factory(response, self.sec, to_sign)
_, cert_file = make_temp("%s" % encrypt_cert, decode=False)
response = cbxs.encrypt_assertion(response, cert_file,
pre_encryption_part(), node_xpath=node_xpath)
# template(response.assertion.id))
if sign:
return signed_instance_factory(response, self.sec, sign_class)
else:
return response
if sign:
return self.sign(response, to_sign=to_sign)
else:
return response
开发者ID:Itxaka,项目名称:pysaml2,代码行数:86,代码来源:entity.py
注:本文中的saml2.s_utils.success_status_factory函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论