本文整理汇总了Python中saml2.assertion.Assertion类的典型用法代码示例。如果您正苦于以下问题：Python Assertion类的具体用法？Python Assertion怎么用？Python Assertion使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Assertion类的19个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_authz_decision_query
def test_authz_decision_query(self):
    """Check that an authz decision query can be built from an assertion.

    Loads the SP configuration ``server3_conf``, constructs an assertion
    from a fixed attribute set and verifies destination, resource and
    action of the query returned by
    ``authz_decision_query_using_assertion``.
    """
    conf = config.SPConfig()
    conf.load_file("server3_conf")
    client = Saml2Client(conf)

    AVA = {'mail': u'[email protected]',
           'eduPersonTargetedID': '95e9ae91dbe62d35198fbbd5e1fb0976',
           'displayName': u'Roland Hedberg',
           'uid': 'http://roland.hedberg.myopenid.com/'}

    sp_entity_id = "sp_entity_id"
    in_response_to = "1234"
    consumer_url = "http://example.com/consumer"
    name_id = saml.NameID(saml.NAMEID_FORMAT_TRANSIENT, text="name_id")
    policy = Policy()
    ava = Assertion(AVA)
    assertion = ava.construct(sp_entity_id, in_response_to,
                              consumer_url, name_id,
                              conf.attribute_converters,
                              policy, issuer=client.issuer())

    adq = client.authz_decision_query_using_assertion("entity_id",
                                                      assertion,
                                                      "read",
                                                      "http://example.com/text")

    assert adq
    # Parenthesised form works both as a Python 2 statement and as a
    # Python 3 function call; the bare ``print adq`` is a SyntaxError on 3.
    print(adq)
    assert adq.keyswv() != []
    assert adq.destination == "entity_id"
    assert adq.resource == "http://example.com/text"
    assert adq.action[0].text == "read"
开发者ID:howow,项目名称:pysaml2,代码行数:32,代码来源:test_51_client.py
示例2: test_assertion_with_authn_instant
def test_assertion_with_authn_instant():
    """Check how ``authn_instant`` ends up in the authn statement.

    The instant is passed as the integer 1234567890 and is expected to
    appear as the string "2009-02-13T23:31:30Z" on the first authn
    statement of the constructed assertion.
    """
    ava = {}
    ast = Assertion(ava)
    policy = Policy(
        {
            "default": {
                "lifetime": {"minutes": 240},
                "attribute_restrictions": None,  # means all I have
                "name_form": NAME_FORMAT_URI,
            }
        }
    )
    name_id = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar")
    issuer = Issuer(text="entityid", format=NAMEID_FORMAT_ENTITY)
    msg = ast.construct(
        "sp_entity_id",
        "in_response_to",
        "consumer_url",
        name_id,
        [AttributeConverterNOOP(NAME_FORMAT_URI)],
        policy,
        issuer=issuer,
        authn_decl=ACD,
        authn_auth="authn_authn",
        authn_instant=1234567890,
    )
    # print(...) is valid on both Python 2 and 3; ``print msg`` is not.
    print(msg)
    assert msg.authn_statement[0].authn_instant == "2009-02-13T23:31:30Z"
开发者ID:justquick,项目名称:pysaml2,代码行数:29,代码来源:test_20_assertion.py
示例3: test_assertion_with_noop_attribute_conv
def test_assertion_with_noop_attribute_conv():
    """Check attribute output when the no-op attribute converter is used.

    Every attribute in the first attribute statement must carry the URI
    name format and exactly one value; the two known OIDs must keep the
    values they were given in the input dictionary.
    """
    ava = {"urn:oid:2.5.4.4": "Roland", "urn:oid:2.5.4.42": "Hedberg"}
    ast = Assertion(ava)
    policy = Policy({
        "default": {
            "lifetime": {"minutes": 240},
            "attribute_restrictions": None,  # means all I have
            "name_form": NAME_FORMAT_URI
        },
    })
    name_id = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar")
    issuer = Issuer(text="entityid", format=NAMEID_FORMAT_ENTITY)
    msg = ast.construct("sp_entity_id", "in_response_to", "consumer_url",
                        name_id, [AttributeConverterNOOP(NAME_FORMAT_URI)],
                        policy, issuer=issuer, authn_decl=ACD,
                        authn_auth="authn_authn")
    # print(...) is valid on both Python 2 and 3; ``print msg`` is not.
    print(msg)
    for attr in msg.attribute_statement[0].attribute:
        assert attr.name_format == NAME_FORMAT_URI
        assert len(attr.attribute_value) == 1
        if attr.name == "urn:oid:2.5.4.42":
            assert attr.attribute_value[0].text == "Hedberg"
        elif attr.name == "urn:oid:2.5.4.4":
            assert attr.attribute_value[0].text == "Roland"
开发者ID:jakew1ll,项目名称:pysaml2,代码行数:25,代码来源:test_20_assertion.py
示例4: test_assertion_with_authn_instant
def test_assertion_with_authn_instant():
    """Verify the rendering of ``authn_instant`` when ``farg`` is supplied.

    Subject confirmation details are handed to ``construct`` through an
    ``farg`` dictionary assembled with ``add_path``; the integer instant
    1234567890 must show up as "2009-02-13T23:31:30Z".
    """
    source = Assertion({})
    release_policy = Policy({
        "default": {
            "lifetime": {"minutes": 240},
            "attribute_restrictions": None,  # means all I have
            "name_form": NAME_FORMAT_URI
        },
    })
    subject_id = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar")
    issuing_entity = Issuer(text="entityid", format=NAMEID_FORMAT_ENTITY)

    # Start with the confirmation method, then extend the same nested
    # dictionary with the confirmation data entries.
    farg = add_path(
        {}, ['subject', 'subject_confirmation', 'method', saml.SCM_BEARER])
    for data_path in (
            ['subject_confirmation_data', 'in_response_to', 'in_response_to'],
            ['subject_confirmation_data', 'recipient', 'consumer_url']):
        add_path(farg['subject']['subject_confirmation'], data_path)

    converters = [AttributeConverterNOOP(NAME_FORMAT_URI)]
    msg = source.construct(
        "sp_entity_id", converters, release_policy,
        issuer=issuing_entity, authn_decl=ACD, authn_auth="authn_authn",
        authn_instant=1234567890, name_id=subject_id, farg=farg)

    print(msg)
    first_statement = msg.authn_statement[0]
    assert first_statement.authn_instant == "2009-02-13T23:31:30Z"
开发者ID:SUNET,项目名称:pysaml2,代码行数:30,代码来源:test_20_assertion.py
示例5: create_attribute_response
def create_attribute_response(self, identity, in_response_to, destination,
                              sp_entity_id, userid="", name_id=None,
                              status=None, issuer=None,
                              sign_assertion=False, sign_response=False,
                              attributes=None):
    """ Create an attribute assertion response.

    :param identity: A dictionary with attributes and values that are
        expected to be the bases for the assertion in the response.
    :param in_response_to: The session identifier of the request
    :param destination: The URL which should receive the response
    :param sp_entity_id: The entity identifier of the SP
    :param userid: A identifier of the user
    :param name_id: The identifier of the subject
    :param status: The status of the response
    :param issuer: The issuer of the response
    :param sign_assertion: Whether the assertion should be signed or not
    :param sign_response: Whether the whole response should be signed
    :param attributes: Attribute specification from which a restriction
        is derived (see the NOTE in the body about how it is used)
    :return: A response instance
    """
    if not name_id and userid:
        try:
            name_id = self.ident.construct_nameid(userid,
                                                  self.config.policy,
                                                  sp_entity_id)
            logger.warning("Unspecified NameID format")
        except Exception:
            # Still best effort - name_id keeps its incoming value - but
            # leave a trace instead of discarding the error silently.
            logger.warning("Failed to construct a NameID", exc_info=True)

    to_sign = []
    args = {}
    if identity:
        _issuer = self._issuer(issuer)
        ast = Assertion(identity)
        policy = self.config.getattr("policy", "aa")
        if policy:
            ast.apply_policy(sp_entity_id, policy)
        else:
            policy = Policy()

        if attributes:
            # NOTE(review): restr is computed but never handed to
            # filter_attribute_value_assertions - confirm whether the
            # restriction was meant to be applied here.
            restr = restriction_from_attribute_spec(attributes)
            ast = filter_attribute_value_assertions(ast)

        assertion = ast.construct(sp_entity_id, in_response_to,
                                  destination, name_id,
                                  self.config.attribute_converters,
                                  policy, issuer=_issuer)

        if sign_assertion:
            assertion.signature = pre_signature_part(assertion.id,
                                                     self.sec.my_cert, 1)
            # Just the assertion or the response and the assertion ?
            to_sign = [(class_name(assertion), assertion.id)]

        args["assertion"] = assertion

    return self._response(in_response_to, destination, status, issuer,
                          sign_response, to_sign, **args)
开发者ID:caustin,项目名称:pysaml2,代码行数:60,代码来源:server.py
示例6: post_auth
def post_auth(authData):
    """Build a SAML assertion from LDAP attributes for a RADIUS request.

    :param authData: Iterable of pairs; the entries named
        'Stripped-User-Name' and 'User-Password' are read, each with its
        first and last character dropped (presumably surrounding quotes -
        TODO confirm against the caller).
    :return: ``radiusd.RLM_MODULE_FAIL`` when the credentials are missing
        or ``ldap_attributes`` yields None; otherwise the tuple
        ``(radiusd.RLM_MODULE_UPDATED, result, None)`` where ``result``
        holds ('SAML-AAA-Assertion', part) pairs.
    """
    # Pre-set both so a request lacking one of them is rejected cleanly
    # instead of failing with an unbound local variable.
    userName = None
    userPassword = None
    for t in authData:
        if t[0] == 'Stripped-User-Name':
            userName = t[1][1:-1]
        elif t[0] == 'User-Password':
            userPassword = t[1][1:-1]

    if userName is None or userPassword is None:
        return radiusd.RLM_MODULE_FAIL

    identity = ldap_attributes(userName, userPassword)
    if identity is None:
        return radiusd.RLM_MODULE_FAIL

    # Keep only the attributes listed in ATTRS.
    indentityFiltered = {k: identity[k] for k in set(ATTRS) & set(identity.keys())}
    print(indentityFiltered)

    policy = Policy({
        'default': {
            'lifetime': {'minutes': 60},
            'attribute_restrictions': None,
            'name_form': NAME_FORMAT_URI
        }
    })

    name_id = NameID(format=NAMEID_FORMAT_TRANSIENT, text='urn:mace:' + LDAP_SERVER)
    issuer = Issuer(text='moonshot.' + LDAP_SERVER, format=NAMEID_FORMAT_ENTITY)

    ast = Assertion(indentityFiltered)
    assertion = ast.construct('', '', '',
                              name_id, [AttributeConverterNOOP(NAME_FORMAT_URI)],
                              policy, issuer=issuer)

    # Serialise on a single line before it is split into parts.
    assertion = str(assertion).replace('\n', '')
    attr = 'SAML-AAA-Assertion'
    result = tuple((attr, x) for x in eq_len_parts(assertion))
    return radiusd.RLM_MODULE_UPDATED, result, None
开发者ID:gierschv,项目名称:freeradius_samlldap,代码行数:34,代码来源:freeradius_samlldap.py
示例7: test_assertion_2
def test_assertion_2():
    """Check the attribute names produced by ``from_local``.

    Four local attributes pass through an unrestricted policy and are
    converted with the converters loaded from "attributemaps"; the result
    must contain the four expected OID names.
    """
    source = Assertion({
        'mail': u'[email protected]',
        'eduPersonTargetedID': 'http://lingon.ladok.umu.se:8090/idp!http://lingon.ladok.umu.se:8088/sp!95e9ae91dbe62d35198fbbd5e1fb0976',
        'displayName': u'Roland Hedberg',
        'uid': 'http://roland.hedberg.myopenid.com/',
    })
    release_policy = Policy({
        "default": {
            "lifetime": {"minutes": 240},
            "attribute_restrictions": None,  # means all I have
            "name_form": NAME_FORMAT_URI
        },
    })

    released = source.apply_policy("", release_policy)
    converters = ac_factory("attributemaps")
    attribute = from_local(converters, released,
                           release_policy.get_name_form(""))

    assert len(attribute) == 4
    expected_names = ['urn:oid:0.9.2342.19200300.100.1.3',
                      'urn:oid:1.3.6.1.4.1.5923.1.1.1.10',
                      'urn:oid:2.16.840.1.113730.3.1.241',
                      'urn:oid:0.9.2342.19200300.100.1.1']
    assert _eq([item.name for item in attribute], expected_names)
开发者ID:evansd,项目名称:pysaml2,代码行数:26,代码来源:test_20_assertion.py
示例8: test_assertion_1
def test_assertion_1(AVA):
    """Check that a ``givenName`` restriction filters attributes.

    :param AVA: Indexable collection of attribute dictionaries; entries
        0, 1 and 3 are used.

    With the restriction ``^R.*`` on givenName, entry 0 is expected to end
    up empty while entries 1 and 3 keep only givenName ("Ryan" and
    "Roland" respectively).
    """
    ava = Assertion(AVA[0])
    # print(...) is valid on both Python 2 and 3; the statement form is not.
    print(ava)
    print(ava.__dict__)

    policy = Policy({
        "default": {
            "attribute_restrictions": {
                "givenName": ["^R.*"],
            }
        }
    })

    ava = ava.apply_policy("", policy)

    print(ava)
    assert _eq(ava.keys(), [])

    ava = Assertion(AVA[1].copy())
    ava = ava.apply_policy("", policy)
    assert _eq(ava.keys(), ["givenName"])
    assert ava["givenName"] == ["Ryan"]

    ava = Assertion(AVA[3].copy())
    ava = ava.apply_policy("", policy)
    assert _eq(ava.keys(), ["givenName"])
    assert ava["givenName"] == ["Roland"]
开发者ID:evansd,项目名称:pysaml2,代码行数:28,代码来源:test_20_assertion.py
示例9: test_sign_then_encrypt_assertion2
def test_sign_then_encrypt_assertion2(self):
    """Sign an assertion, encrypt it, and let the client parse the result.

    The IdP side builds and signs an assertion, inserts it into the
    EncryptedAssertion element of a response and encrypts the document;
    the client side must then recover the assertion and its attributes.
    """
    # Begin with the IdPs side
    _sec = self.server.sec

    # NOTE(review): nameid_policy is not used anywhere below - confirm
    # whether it can be dropped.
    nameid_policy = samlp.NameIDPolicy(allow_create="false",
                                       format=saml.NAMEID_FORMAT_PERSISTENT)

    asser = Assertion({"givenName": "Derek", "surName": "Jeter"})
    assertion = asser.construct(
        self.client.config.entityid, "_012345",
        "http://lingon.catalogix.se:8087/",
        factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
        policy=self.server.config.getattr("policy", "idp"),
        issuer=self.server._issuer(),
        attrconvs=self.server.config.attribute_converters,
        authn_class=INTERNETPROTOCOLPASSWORD,
        authn_auth="http://www.example.com/login")

    assertion.signature = sigver.pre_signature_part(
        assertion.id, _sec.my_cert, 1)

    sigass = _sec.sign_statement(assertion, class_name(assertion),
                                 key_file=self.client.sec.key_file,
                                 node_id=assertion.id)

    sigass = rm_xmltag(sigass)

    response = sigver.response_factory(
        in_response_to="_012345",
        destination="https://www.example.com",
        status=s_utils.success_status_factory(),
        issuer=self.server._issuer(),
        encrypted_assertion=EncryptedAssertion()
    )

    xmldoc = "%s" % response
    # strangely enough I get different tags if I run this test separately
    # or as part of a bunch of tests.
    xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass)

    enctext = _sec.crypto.encrypt_assertion(xmldoc, _sec.cert_file,
                                            pre_encryption_part())

    # base64.encodestring was removed in Python 3.9; use encodebytes where
    # it exists and fall back to encodestring on interpreters without it.
    b64encode = getattr(base64, "encodebytes", None) or base64.encodestring
    resp_str = b64encode(enctext)

    # Now over to the client side
    resp = self.client.parse_authn_request_response(
        resp_str, BINDING_HTTP_POST,
        {"_012345": "http://foo.example.com/service"})

    assert resp.assertion
    assert resp.ava == {'givenName': ['Derek'], 'sn': ['Jeter']}
开发者ID:daryllstrauss,项目名称:pysaml2,代码行数:54,代码来源:test_51_client.py
示例10: setup_assertion
def setup_assertion(self, authn, sp_entity_id, in_response_to, consumer_url, name_id, policy, _issuer,
                    authn_statement, identity, best_effort, sign_response, add_subject=True):
    """Prepare an Assertion for ``identity`` and apply the policy to it.

    NOTE(review): the snippet appears to stop after the policy step; as
    shown, nothing is returned unless an error response is produced.

    :param sp_entity_id: Entity identifier passed to ``apply_policy``
    :param in_response_to: Forwarded to the error response
    :param consumer_url: Forwarded to the error response
    :param policy: Policy to apply; ``Policy()`` is used when None
    :param identity: Attributes the Assertion is created from
    :param best_effort: When true, a MissingValue error is ignored
    :param sign_response: Forwarded to the error response
    :return: The result of ``create_error_response`` when the policy
        cannot be met and ``best_effort`` is false
    """
    ast = Assertion(identity)
    ast.acs = self.config.getattr("attribute_converters", "idp")
    if policy is None:
        policy = Policy()
    try:
        ast.apply_policy(sp_entity_id, policy, self.metadata)
    # ``except X as name`` works on Python 2.6+ and 3; the comma form is
    # a SyntaxError on Python 3.
    except MissingValue as exc:
        if not best_effort:
            return self.create_error_response(in_response_to, consumer_url,
                                              exc, sign_response)
开发者ID:Itxaka,项目名称:pysaml2,代码行数:12,代码来源:server.py
示例11: setup_assertion
def setup_assertion(self, authn, sp_entity_id, in_response_to, consumer_url,
                    name_id, policy, _issuer, authn_statement, identity,
                    best_effort, sign_response, farg=None, **kwargs):
    """
    Build the assertion for a subject and hand it back.

    :param authn: Authentication information (a dictionary)
    :param sp_entity_id: Entity identifier of the SP
    :param in_response_to: The ID of the request this is an answer to
    :param consumer_url: The recipient of the assertion
    :param name_id: The NameID of the subject
    :param policy: Assertion policies; ``Policy()`` is used when None
    :param _issuer: Issuer of the statement
    :param authn_statement: An AuthnStatement instance
    :param identity: Identity information about the Subject
    :param best_effort: Even if not the SPs demands can be met send a
        response.
    :param sign_response: Sign the response, only applicable if
        ErrorResponse
    :param farg: Passed through ``update_farg``; its 'assertion' entry is
        given to ``construct``
    :param kwargs: Extra keyword arguments
    :return: An Assertion instance, or an error response when the policy
        cannot be met and ``best_effort`` is false
    """
    ast = Assertion(identity)
    ast.acs = self.config.getattr("attribute_converters", "idp")
    if policy is None:
        policy = Policy()

    try:
        ast.apply_policy(sp_entity_id, policy, self.metadata)
    except MissingValue as exc:
        if not best_effort:
            return self.create_error_response(in_response_to, consumer_url,
                                              exc, sign_response)

    farg = self.update_farg(in_response_to, consumer_url, farg)

    if authn:  # expected to be a dictionary
        # Translate the known authn keys, then let the caller's extra
        # keyword arguments take precedence.
        authn_args = dict(
            (AUTHN_DICT_MAP[key], value) for key, value in authn.items()
            if key in AUTHN_DICT_MAP)
        authn_args.update(kwargs)
        return ast.construct(
            sp_entity_id, self.config.attribute_converters, policy,
            issuer=_issuer, farg=farg['assertion'], name_id=name_id,
            **authn_args)

    if authn_statement:  # Got a complete AuthnStatement
        return ast.construct(
            sp_entity_id, self.config.attribute_converters, policy,
            issuer=_issuer, authn_statem=authn_statement,
            farg=farg['assertion'], name_id=name_id, **kwargs)

    return ast.construct(
        sp_entity_id, self.config.attribute_converters, policy,
        issuer=_issuer, farg=farg['assertion'], name_id=name_id,
        **kwargs)
开发者ID:Amli,项目名称:pysaml2,代码行数:59,代码来源:server.py
示例12: _authn_response
def _authn_response(self, in_response_to, consumer_url,
                    sp_entity_id, identity=None, name_id=None,
                    status=None, authn=None, issuer=None, policy=None,
                    sign_assertion=False, sign_response=False,
                    best_effort=False, encrypt_assertion=False,
                    encrypt_cert=None, authn_statement=None):
    """ Create a response. A layer of indirection.

    NOTE(review): the snippet appears to stop after the policy step; as
    shown, nothing is returned unless an error response is produced.

    :param in_response_to: The session identifier of the request
    :param consumer_url: The URL which should receive the response
    :param sp_entity_id: The entity identifier of the SP
    :param identity: A dictionary with attributes and values that are
        expected to be the bases for the assertion in the response.
    :param name_id: The identifier of the subject
    :param status: The status of the response
    :param authn: A dictionary containing information about the
        authn context.
    :param issuer: The issuer of the response
    :param policy: Policy applied to the assertion; ``Policy()`` is used
        when None
    :param sign_assertion: Whether the assertion should be signed or not
    :param sign_response: Whether the response should be signed or not
    :param best_effort: Even if not the SPs demands can be met send a
        response.
    :param encrypt_assertion: Not used in the part shown here
    :param encrypt_cert: Not used in the part shown here
    :param authn_statement: Not used in the part shown here
    :return: A response instance
    """
    to_sign = []
    args = {}
    #if identity:
    _issuer = self._issuer(issuer)
    ast = Assertion(identity)
    ast.acs = self.config.getattr("attribute_converters", "idp")
    if policy is None:
        policy = Policy()
    try:
        ast.apply_policy(sp_entity_id, policy, self.metadata)
    # ``except X as name`` works on Python 2.6+ and 3; the comma form is
    # a SyntaxError on Python 3.
    except MissingValue as exc:
        if not best_effort:
            return self.create_error_response(in_response_to, consumer_url,
                                              exc, sign_response)
开发者ID:18600597055,项目名称:hue,代码行数:39,代码来源:server.py
示例13: test_assertion_2
def test_assertion_2():
    """Check the attribute names produced by ``from_local``.

    Four local attributes pass through an unrestricted policy and are
    converted with the converters found under ``full_path("attributemaps")``;
    the result must contain the four expected OID names.
    """
    source = Assertion({
        "mail": u"[email protected]",
        "eduPersonTargetedID": "http://lingon.ladok.umu"
        ".se:8090/idp!http://lingon.ladok.umu"
        ".se:8088/sp!95e9ae91dbe62d35198fbbd5e1fb0976",
        "displayName": u"Roland Hedberg",
        "uid": "http://roland.hedberg.myopenid.com/",
    })
    default_rules = {
        "lifetime": {"minutes": 240},
        "attribute_restrictions": None,  # means all I have
        "name_form": NAME_FORMAT_URI,
    }
    release_policy = Policy({"default": default_rules})

    released = source.apply_policy("", release_policy)
    converters = ac_factory(full_path("attributemaps"))
    attribute = from_local(converters, released,
                           release_policy.get_name_form(""))

    assert len(attribute) == 4
    expected_names = [
        "urn:oid:0.9.2342.19200300.100.1.3",
        "urn:oid:1.3.6.1.4.1.5923.1.1.1.10",
        "urn:oid:2.16.840.1.113730.3.1.241",
        "urn:oid:0.9.2342.19200300.100.1.1",
    ]
    assert _eq([item.name for item in attribute], expected_names)
开发者ID:justquick,项目名称:pysaml2,代码行数:37,代码来源:test_20_assertion.py
示例14: test_assertion_with_noop_attribute_conv
def test_assertion_with_noop_attribute_conv():
    """Check attribute output with the no-op converter and an ``farg``.

    Every attribute in the first attribute statement must carry the URI
    name format and exactly one value; the two known OIDs must keep the
    values they were given in the input dictionary.
    """
    source = Assertion(
        {"urn:oid:2.5.4.4": "Roland", "urn:oid:2.5.4.42": "Hedberg"})
    release_policy = Policy({
        "default": {
            "lifetime": {"minutes": 240},
            "attribute_restrictions": None,  # means all I have
            "name_form": NAME_FORMAT_URI
        },
    })
    subject_id = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar")
    issuing_entity = Issuer(text="entityid", format=NAMEID_FORMAT_ENTITY)

    # Start with the confirmation method, then extend the same nested
    # dictionary with the confirmation data entries.
    farg = add_path(
        {}, ['subject', 'subject_confirmation', 'method', saml.SCM_BEARER])
    for data_path in (
            ['subject_confirmation_data', 'in_response_to', 'in_response_to'],
            ['subject_confirmation_data', 'recipient', 'consumer_url']):
        add_path(farg['subject']['subject_confirmation'], data_path)

    converters = [AttributeConverterNOOP(NAME_FORMAT_URI)]
    msg = source.construct(
        "sp_entity_id", converters, release_policy,
        issuer=issuing_entity, farg=farg, authn_decl=ACD,
        name_id=subject_id, authn_auth="authn_authn")

    print(msg)
    for item in msg.attribute_statement[0].attribute:
        assert item.name_format == NAME_FORMAT_URI
        assert len(item.attribute_value) == 1
        if item.name == "urn:oid:2.5.4.42":
            assert item.attribute_value[0].text == "Hedberg"
        elif item.name == "urn:oid:2.5.4.4":
            assert item.attribute_value[0].text == "Roland"
开发者ID:SUNET,项目名称:pysaml2,代码行数:36,代码来源:test_20_assertion.py
示例15: setup_assertion
def setup_assertion(self, authn, sp_entity_id, in_response_to, consumer_url,
                    name_id, policy, _issuer,
                    authn_statement, identity, best_effort, sign_response,
                    add_subject=True):
    """Build the assertion for a subject and hand it back.

    :param authn: Authentication information (a dictionary); takes
        precedence over ``authn_statement`` when truthy
    :param sp_entity_id: Entity identifier of the SP
    :param in_response_to: The ID of the request this is an answer to
    :param consumer_url: The recipient of the assertion
    :param name_id: The NameID of the subject
    :param policy: Assertion policies; ``Policy()`` is used when None
    :param _issuer: Issuer of the statement
    :param authn_statement: A complete AuthnStatement
    :param identity: Attributes the Assertion is created from
    :param best_effort: When true, a MissingValue error is ignored
    :param sign_response: Forwarded to the error response
    :param add_subject: Forwarded to ``construct``
    :return: The constructed assertion, or an error response when the
        policy cannot be met and ``best_effort`` is false
    """
    ast = Assertion(identity)
    ast.acs = self.config.getattr("attribute_converters", "idp")
    if policy is None:
        policy = Policy()

    try:
        ast.apply_policy(sp_entity_id, policy, self.metadata)
    except MissingValue as exc:
        if not best_effort:
            return self.create_error_response(in_response_to, consumer_url,
                                              exc, sign_response)

    # Work out the authentication-related keyword arguments first; the
    # remaining arguments to construct() are the same in every case.
    if authn:  # expected to be a dictionary
        authn_kwargs = dict(
            (AUTHN_DICT_MAP[key], value) for key, value in authn.items()
            if key in AUTHN_DICT_MAP)
    elif authn_statement:  # Got a complete AuthnStatement
        authn_kwargs = {"authn_statem": authn_statement}
    else:
        authn_kwargs = {}

    return ast.construct(sp_entity_id, in_response_to,
                         consumer_url, name_id,
                         self.config.attribute_converters,
                         policy, issuer=_issuer,
                         add_subject=add_subject,
                         **authn_kwargs)
开发者ID:Lefford,项目名称:pysaml2,代码行数:42,代码来源:server.py
示例16: test_sign_then_encrypt_assertion_advice
def test_sign_then_encrypt_assertion_advice(self):
    """Encrypt a signed assertion placed inside another assertion's Advice.

    A second assertion is attached to the EncryptedAssertion element of
    the first assertion's Advice, the response is signed and the nested
    assertion encrypted; the client must then expose the attributes of
    both assertions.
    """
    # Begin with the IdPs side
    _sec = self.server.sec

    # NOTE(review): nameid_policy is not used anywhere below - confirm
    # whether it can be dropped.
    nameid_policy = samlp.NameIDPolicy(allow_create="false",
                                       format=saml.NAMEID_FORMAT_PERSISTENT)

    asser = Assertion({"givenName": "Derek", "surName": "Jeter"})
    assertion = asser.construct(
        self.client.config.entityid, "_012345",
        "http://lingon.catalogix.se:8087/",
        factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
        policy=self.server.config.getattr("policy", "idp"),
        issuer=self.server._issuer(),
        attrconvs=self.server.config.attribute_converters,
        authn_class=INTERNETPROTOCOLPASSWORD,
        authn_auth="http://www.example.com/login")

    a_asser = Assertion({"uid": "test01", "email": "[email protected]"})
    a_assertion = a_asser.construct(
        self.client.config.entityid, "_012345",
        "http://lingon.catalogix.se:8087/",
        factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
        policy=self.server.config.getattr("policy", "idp"),
        issuer=self.server._issuer(),
        attrconvs=self.server.config.attribute_converters,
        authn_class=INTERNETPROTOCOLPASSWORD,
        authn_auth="http://www.example.com/login")

    a_assertion.signature = sigver.pre_signature_part(
        a_assertion.id, _sec.my_cert, 1)

    # Nest the second assertion under Advice/EncryptedAssertion.
    assertion.advice = Advice()
    assertion.advice.encrypted_assertion = []
    assertion.advice.encrypted_assertion.append(EncryptedAssertion())
    assertion.advice.encrypted_assertion[0].add_extension_element(a_assertion)

    response = sigver.response_factory(
        in_response_to="_012345",
        destination="http://lingon.catalogix.se:8087/",
        status=s_utils.success_status_factory(),
        issuer=self.server._issuer()
    )
    response.assertion.append(assertion)

    response = _sec.sign_statement("%s" % response, class_name(a_assertion),
                                   key_file=self.client.sec.key_file,
                                   node_id=a_assertion.id)

    # XPath selecting the nested assertion that is to be encrypted.
    node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
                          ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])

    enctext = _sec.crypto.encrypt_assertion(response, _sec.cert_file,
                                            pre_encryption_part(), node_xpath=node_xpath)

    # base64.encodestring was removed in Python 3.9; use encodebytes where
    # it exists and fall back to encodestring on interpreters without it.
    b64encode = getattr(base64, "encodebytes", None) or base64.encodestring
    resp_str = b64encode(enctext)

    # Now over to the client side
    resp = self.client.parse_authn_request_response(
        resp_str, BINDING_HTTP_POST,
        {"_012345": "http://foo.example.com/service"})

    assert resp.assertion
    assert resp.assertion.advice
    assert resp.assertion.advice.assertion
    assert resp.ava == \
        {'sn': ['Jeter'], 'givenName': ['Derek'], 'uid': ['test01'], 'email': ['[email protected]']}
开发者ID:Itxaka,项目名称:pysaml2,代码行数:77,代码来源:test_51_client.py
示例17: _authn_response
def _authn_response(self, in_response_to, consumer_url,
                    sp_entity_id, identity=None, name_id=None,
                    status=None, authn=None, issuer=None, policy=None,
                    sign_assertion=False, sign_response=False,
                    best_effort=False, encrypt_assertion=False, encrypt_cert=None):
    """ Create a response. A layer of indirection.

    :param in_response_to: The session identifier of the request
    :param consumer_url: The URL which should receive the response
    :param sp_entity_id: The entity identifier of the SP
    :param identity: A dictionary with attributes and values that are
        expected to be the bases for the assertion in the response.
    :param name_id: The identifier of the subject
    :param status: The status of the response
    :param authn: A dictionary containing information about the
        authn context.
    :param issuer: The issuer of the response
    :param policy: Policy applied to the assertion; ``Policy()`` is used
        when None
    :param sign_assertion: Whether the assertion should be signed or not
    :param sign_response: Whether the response should be signed or not
    :param best_effort: Even if not the SPs demands can be met send a
        response.
    :param encrypt_assertion: Forwarded to ``_response``
    :param encrypt_cert: Forwarded to ``_response``
    :return: A response instance
    """
    _issuer = self._issuer(issuer)
    ast = Assertion(identity)
    if policy is None:
        policy = Policy()

    try:
        ast.apply_policy(sp_entity_id, policy, self.metadata)
    except MissingValue as exc:
        if not best_effort:
            return self.create_error_response(in_response_to, consumer_url,
                                              exc, sign_response)

    # Map the known authn keys to construct() keyword arguments; without
    # authn information no extra keywords are passed.
    if authn:  # expected to be a dictionary
        authn_args = dict(
            (AUTHN_DICT_MAP[key], value) for key, value in authn.items()
            if key in AUTHN_DICT_MAP)
    else:
        authn_args = {}

    assertion = ast.construct(sp_entity_id, in_response_to,
                              consumer_url, name_id,
                              self.config.attribute_converters,
                              policy, issuer=_issuer,
                              **authn_args)

    to_sign = []
    if sign_assertion:
        assertion.signature = pre_signature_part(assertion.id,
                                                 self.sec.my_cert, 1)
        # Just the assertion or the response and the assertion ?
        to_sign = [(class_name(assertion), assertion.id)]

    args = {"assertion": assertion}

    # Keep the assertion around when it may be asked for again later.
    if self.support_AssertionIDRequest() or self.support_AuthnQuery():
        self.session_db.store_assertion(assertion, to_sign)

    return self._response(in_response_to, consumer_url, status, issuer,
                          sign_response, to_sign, encrypt_assertion=encrypt_assertion,
                          encrypt_cert=encrypt_cert, **args)
开发者ID:rohe,项目名称:pysaml2-3,代码行数:76,代码来源:server.py
示例18: _response
def _response(
    self,
    in_response_to,
    consumer_url=None,
    sp_entity_id=None,
    identity=None,
    name_id=None,
    status=None,
    sign=False,
    policy=None,
    authn=None,
    authn_decl=None,
    issuer=None,
):
    """ Create a Response that adheres to the ??? profile.

    NOTE(review): the snippet appears to stop after the assertion is
    attached; as shown, nothing is returned on the success path.

    :param in_response_to: The session identifier of the request
    :param consumer_url: The URL which should receive the response
    :param sp_entity_id: The entity identifier of the SP
    :param identity: A dictionary with attributes and values that are
        expected to be the bases for the assertion in the response.
    :param name_id: The identifier of the subject
    :param status: The status of the response
    :param sign: Whether the assertion should be signed or not
    :param policy: The attribute release policy for this instance; a
        fresh ``Policy()`` is created per call when None
    :param authn: A 2-tuple denoting the authn class and the authn
        authority
    :param authn_decl: Passed to ``construct`` when ``authn`` is not given
    :param issuer: The issuer of the response
    :return: A Response instance
    """
    # A ``Policy()`` default in the signature is evaluated once, when the
    # function is defined, and would then be shared by every call.
    if policy is None:
        policy = Policy()

    to_sign = []

    if not status:
        status = success_status_factory()

    _issuer = self.issuer(issuer)

    response = response_factory(issuer=_issuer, in_response_to=in_response_to, status=status)

    if consumer_url:
        response.destination = consumer_url

    if identity:
        ast = Assertion(identity)
        try:
            ast.apply_policy(sp_entity_id, policy, self.metadata)
        # ``except X as name`` works on Python 2.6+ and 3; the comma form
        # is a SyntaxError on Python 3.
        except MissingValue as exc:
            return self.error_response(in_response_to, consumer_url, sp_entity_id, exc, name_id)

        if authn:  # expected to be a 2-tuple class+authority
            (authn_class, authn_authn) = authn
            assertion = ast.construct(
                sp_entity_id,
                in_response_to,
                consumer_url,
                name_id,
                self.conf.attribute_converters,
                policy,
                issuer=_issuer,
                authn_class=authn_class,
                authn_auth=authn_authn,
            )
        elif authn_decl:
            assertion = ast.construct(
                sp_entity_id,
                in_response_to,
                consumer_url,
                name_id,
                self.conf.attribute_converters,
                policy,
                issuer=_issuer,
                authn_decl=authn_decl,
            )
        else:
            assertion = ast.construct(
                sp_entity_id,
                in_response_to,
                consumer_url,
                name_id,
                self.conf.attribute_converters,
                policy,
                issuer=_issuer,
            )

        if sign:
            assertion.signature = pre_signature_part(assertion.id, self.sec.my_cert, 1)
            # Just the assertion or the response and the assertion ?
            to_sign = [(class_name(assertion), assertion.id)]

        response.assertion = assertion
开发者ID:howow,项目名称:pysaml2,代码行数:99,代码来源:server.py
示例19: _authn_response
def _authn_response(self, in_response_to, consumer_url,
                    sp_entity_id, identity=None, name_id=None,
                    status=None, authn=None, issuer=None, policy=None,
                    sign_assertion=False, sign_response=False):
    """ Create a response. A layer of indirection.

    NOTE(review): the snippet appears to stop after the assertion is
    constructed; as shown, nothing is returned on the success path.

    :param in_response_to: The session identifier of the request
    :param consumer_url: The URL which should receive the response
    :param sp_entity_id: The entity identifier of the SP
    :param identity: A dictionary with attributes and values that are
        expected to be the bases for the assertion in the response.
    :param name_id: The identifier of the subject
    :param status: The status of the response
    :param authn: A dictionary containing information about the
        authn context.
    :param issuer: The issuer of the response
    :param policy: Policy applied to the assertion; ``Policy()`` is used
        when None
    :param sign_assertion: Whether the assertion should be signed or not
    :param sign_response: Whether the response should be signed or not
    :return: A response instance
    """
    to_sign = []
    args = {}
    if identity:
        _issuer = self._issuer(issuer)
        ast = Assertion(identity)
        if policy is None:
            policy = Policy()
        try:
            ast.apply_policy(sp_entity_id, policy, self.metadata)
        # ``except X as name`` works on Python 2.6+ and 3; the comma form
        # is a SyntaxError on Python 3.
        except MissingValue as exc:
            return self.create_error_response(in_response_to, consumer_url,
                                              exc, sign_response)

        if authn:  # expected to be a dictionary
            if "decl" in authn:
                # A full authn context declaration was supplied.
                assertion = ast.construct(sp_entity_id, in_response_to,
                                          consumer_url, name_id,
                                          self.config.attribute_converters,
                                          policy, issuer=_issuer,
                                          authn_decl=authn["decl"],
                                          authn_auth=authn["authn_auth"])
            else:
                # Otherwise a class reference is expected.
                assertion = ast.construct(sp_entity_id, in_response_to,
                                          consumer_url, name_id,
                                          self.config.attribute_converters,
                                          policy, issuer=_issuer,
                                          authn_class=authn["class_ref"],
                                          authn_auth=authn["authn_auth"])
        else:
            assertion = ast.construct(sp_entity_id, in_response_to,
                                      consumer_url, name_id,
                                      self.config.attribute_converters,
                                      policy, issuer=_issuer)
|
请发表评论