本文整理汇总了Python中saml2.mdstore.destinations函数的典型用法代码示例。如果您正苦于以下问题:Python destinations函数的具体用法?Python destinations怎么用?Python destinations使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了destinations函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_incommon_1
def test_incommon_1():
    """Import the METADATACONF["2"] source and check entity counts and endpoints."""
    store = MetadataStore(ONTS.values(), ATTRCONV, xmlsec_path,
                          disable_ssl_certificate_validation=True)
    store.imp(METADATACONF["2"])

    print(store.entities())
    assert store.entities() == 169

    idp_map = store.with_descriptor("idpsso")
    print(idp_map.keys())
    assert len(idp_map) == 53  # !!!!???? < 10%

    # This entity is expected to yield an empty list of SSO services.
    assert store.single_sign_on_service('urn:mace:incommon:uiuc.edu') == []

    alaska_sso = store.single_sign_on_service('urn:mace:incommon:alaska.edu')
    assert len(alaska_sso) == 1
    print(alaska_sso)
    expected = ['https://idp.alaska.edu/idp/profile/SAML2/Redirect/SSO']
    assert destinations(alaska_sso) == expected

    # Collect every SP whose descriptor mentions an attribute consuming
    # service; none are expected in this source.
    sp_map = store.with_descriptor("spsso")
    acs_sp = [
        entity_id
        for entity_id, descriptor in sp_map.items()
        if "attribute_consuming_service" in descriptor
    ]
    assert len(acs_sp) == 0

    # Look for attribute authorities
    aa_map = store.with_descriptor("attribute_authority")
    print(aa_map.keys())
    assert len(aa_map) == 53
开发者ID:GSA,项目名称:pysaml2,代码行数:31,代码来源:test_30_mdstore.py
示例2: test_incommon_1
def test_incommon_1():
    """Import the METADATACONF["2"] source and check entity counts and endpoints."""
    store = MetadataStore(
        list(ONTS.values()),
        ATTRCONV,
        sec_config,
        disable_ssl_certificate_validation=True,
    )
    store.imp(METADATACONF["2"])

    print(store.entities())
    assert store.entities() > 1700

    idp_map = store.with_descriptor("idpsso")
    print(list(idp_map.keys()))
    assert len(idp_map) > 300  # ~ 18%

    # The lookup may legitimately fail with UnknownPrincipal; either outcome
    # is accepted here.
    try:
        store.single_sign_on_service("urn:mace:incommon:uiuc.edu")
    except UnknownPrincipal:
        pass

    alaska_sso = store.single_sign_on_service("urn:mace:incommon:alaska.edu")
    assert len(alaska_sso) == 1
    print(alaska_sso)
    expected = ["https://idp.alaska.edu/idp/profile/SAML2/Redirect/SSO"]
    assert destinations(alaska_sso) == expected

    # Collect every SP whose descriptor mentions an attribute consuming
    # service; none are expected in this source.
    sp_map = store.with_descriptor("spsso")
    acs_sp = [
        entity_id
        for entity_id, descriptor in list(sp_map.items())
        if "attribute_consuming_service" in descriptor
    ]
    assert len(acs_sp) == 0

    # Look for attribute authorities
    aa_map = store.with_descriptor("attribute_authority")
    print(list(aa_map.keys()))
    assert len(aa_map) == 180
开发者ID:rohe,项目名称:pysaml2-3,代码行数:34,代码来源:test_30_mdstore.py
示例3: pick_binding
def pick_binding(self, service, bindings=None, descr_type="", request=None,
                 entity_id=""):
    """Pick the first usable binding and destination for a service.

    :param service: Name of the metadata lookup method to use (it is
        fetched from ``self.metadata`` with ``getattr``) and key into
        ``self.config.preferred_binding``.
    :param bindings: Bindings to try, in order. Defaults to the configured
        preferred bindings for ``service``.
    :param descr_type: Descriptor type to look in. When empty, "idpsso" is
        used if this entity is an "sp", otherwise "spsso".
    :param request: Optional request; its issuer text is used as the
        entity id when ``entity_id`` is not given.
    :param entity_id: Entity whose endpoints should be looked up.
    :return: A ``(binding, destination)`` tuple for the first binding that
        yields at least one service.
    :raises SAMLError: If no binding yields a service.
    """
    if request and not entity_id:
        entity_id = request.issuer.text.strip()

    sfunc = getattr(self.metadata, service)

    if bindings is None:
        bindings = self.config.preferred_binding[service]

    if not descr_type:
        descr_type = "idpsso" if self.entity_type == "sp" else "spsso"

    for binding in bindings:
        try:
            srvs = sfunc(entity_id, binding, descr_type)
            if srvs:
                return binding, destinations(srvs)[0]
        except UnsupportedBinding:
            # This binding is not available for the entity; try the next.
            pass

    logger.error("Failed to find consumer URL: %s, %s, %s",
                 entity_id, bindings, descr_type)
    raise SAMLError("Unknown entity or unsupported bindings")
开发者ID:gbel,项目名称:pysaml2,代码行数:31,代码来源:entity.py
示例4: slo
def slo(self, request):
    """
    Start a SAML2 single logout for the current session.

    Marks the session as unauthenticated, removes the stored user
    properties, builds a logout request addressed to the first entity in
    the configured metadata and returns the URL found in the "Location"
    header of the HTTP-Redirect message. Returns None when the request's
    ACTUAL_URL does not start with this plugin's SP URL.
    """
    session = request.SESSION

    # Reset the local session first.
    session.set(self.session_auth_key, False)
    del session[self.session_user_properties]

    config = self._saml2_config()
    client = Saml2Client(config)
    samluid = session.get(self.session_samluid_key, "")
    entityid = config.metadata.keys()[0]

    own_url = self.saml2_sp_url
    current_url = request.get("ACTUAL_URL", "")
    if not current_url.startswith(own_url):
        # the request was made from within a context we cannot handle
        return None

    session.set(self.session_storedurl_key, request.URL1)

    # we cannot simply call global_logout on the client since it doesn't know about our user...
    idp_services = client.metadata.single_logout_service(
        entityid, BINDING_HTTP_REDIRECT, "idpsso")
    destination = destinations(idp_services)[0]

    logout_request = client.create_logout_request(
        destination, entityid, name_id=saml.NameID(text=samluid))
    logout_request.session_index = samlp.SessionIndex(
        session.get(self.session_samlsessionindex_key))

    # An empty to-sign list is handed to the factory.
    logout_request = signed_instance_factory(logout_request, client.sec, [])
    logger.info("SSO logout request: %s" % logout_request.to_string())

    relay_state = client._relay_state(logout_request.id)
    message = http_redirect_message(logout_request, destination, relay_state)
    location = dict(message["headers"])["Location"]
    logger.info("attempting to post: {loc}".format(loc=location))
    return location
开发者ID:Haufe-Lexware,项目名称:hl.pas.samlplugin,代码行数:33,代码来源:plugin.py
示例5: test_swami_1
def test_swami_1():
    """Import the METADATACONF["1"] source and check the UmU IdP and two SPs."""
    umu_idp = 'https://idp.umu.se/saml2/idp/metadata.php'
    store = MetadataStore(ONTS.values(), ATTRCONV, sec_config,
                          disable_ssl_certificate_validation=True)
    store.imp(METADATACONF["1"])
    assert len(store) == 1  # One source

    idp_map = store.with_descriptor("idpsso")
    assert idp_map.keys()

    sso_services = store.single_sign_on_service(umu_idp)
    assert len(sso_services) == 1
    expected = ['https://idp.umu.se/saml2/idp/SSOService.php']
    assert destinations(sso_services) == expected

    display_name = name(store[umu_idp])
    assert display_name == u'Umeå University (SAML2)'

    signing_certs = store.certs(umu_idp, "idpsso", "signing")
    assert len(signing_certs) == 1

    sp_map = store.with_descriptor("spsso")
    assert len(sp_map) == 108

    # First SP: only the optional attributes are checked.
    wants = store.attribute_requirement('https://connect8.sunet.se/shibboleth')
    optional_names = [d_to_local_name(store.attrc, attr)
                      for attr in wants["optional"]]
    assert _eq(optional_names,
               ['eduPersonPrincipalName', 'mail', 'givenName', 'sn',
                'eduPersonScopedAffiliation'])

    # Second SP: nothing required, a slightly longer optional list.
    wants = store.attribute_requirement('https://beta.lobber.se/shibboleth')
    assert wants["required"] == []
    optional_names = [d_to_local_name(store.attrc, attr)
                      for attr in wants["optional"]]
    assert _eq(optional_names,
               ['eduPersonPrincipalName', 'mail', 'givenName', 'sn',
                'eduPersonScopedAffiliation', 'eduPersonEntitlement'])
开发者ID:HaToHo,项目名称:pysaml2,代码行数:32,代码来源:test_30_mdstore.py
示例6: do_authz_decision_query
def do_authz_decision_query(self, entity_id, action, subject_id, nameid_format,
                            evidence=None, resource=None,
                            sp_name_qualifier=None, name_qualifier=None,
                            consent=None, extensions=None, sign=False):
    """Send an authz decision query over SOAP and return the first answer.

    A subject is built from ``subject_id``, ``nameid_format`` and the two
    qualifiers. Each destination of the entity's SOAP authz services is
    tried in turn and the first truthy response is returned; None is
    returned when no destination produced one.

    Note that ``consent``, ``extensions`` and ``sign`` are accepted but are
    not forwarded to ``_use_soap`` by this method.
    """
    name_id = saml.NameID(
        text=subject_id,
        format=nameid_format,
        sp_name_qualifier=sp_name_qualifier,
        name_qualifier=name_qualifier,
    )
    subject = saml.Subject(name_id=name_id)

    soap_services = self.metadata.authz_service(entity_id, BINDING_SOAP)
    for endpoint in destinations(soap_services):
        answer = self._use_soap(
            endpoint,
            "authz_decision_query",
            action=action,
            evidence=evidence,
            resource=resource,
            subject=subject,
        )
        if answer:
            return answer

    return None
开发者ID:peopledoc,项目名称:pysaml2,代码行数:33,代码来源:client.py
示例7: test_switch_1
def test_switch_1():
    """Import the METADATACONF["5"] source and check the SWITCH demo IdP."""
    demo_idp = 'https://aai-demo-idp.switch.ch/idp/shibboleth'
    store = MetadataStore(ONTS.values(), ATTRCONV, sec_config,
                          disable_ssl_certificate_validation=True)
    store.imp(METADATACONF["5"])
    assert len(store.keys()) > 160

    idp_map = store.with_descriptor("idpsso")
    print(idp_map.keys())

    sso_services = store.single_sign_on_service(demo_idp)
    assert len(sso_services) == 1
    print(sso_services)
    expected = ['https://aai-demo-idp.switch.ch/idp/profile/SAML2/Redirect/SSO']
    assert destinations(sso_services) == expected
    assert len(idp_map) > 30

    aa_map = store.with_descriptor("attribute_authority")
    print(aa_map.keys())
    demo_entry = aa_map[demo_idp]
    print(demo_entry.keys())
    assert len(demo_entry["attribute_authority_descriptor"]) == 1
    assert len(demo_entry["idpsso_descriptor"]) == 1

    # No entity is expected to act as both IdP and SP.
    sp_map = store.with_descriptor("spsso")
    dual = [eid for eid, _ in idp_map.items() if eid in sp_map]
    print(len(dual))
    assert len(dual) == 0
开发者ID:HaToHo,项目名称:pysaml2,代码行数:26,代码来源:test_30_mdstore.py
示例8: do_authn_query
def do_authn_query(self, entity_id, consent=None, extensions=None, sign=False):
    """Send an authn query over SOAP and return the first truthy response.

    Every destination of the entity's SOAP authn request services is tried
    in order; None is returned when none of them produced a response.
    """
    soap_services = self.metadata.authn_request_service(entity_id, BINDING_SOAP)
    for endpoint in destinations(soap_services):
        answer = self._use_soap(
            endpoint,
            "authn_query",
            consent=consent,
            extensions=extensions,
            sign=sign,
        )
        if answer:
            return answer

    return None
开发者ID:peopledoc,项目名称:pysaml2,代码行数:10,代码来源:client.py
示例9: pick_binding
def pick_binding(self, service, bindings=None, descr_type="", request=None,
                 entity_id=""):
    """Pick a binding and destination for ``service``.

    Bindings default to the request's protocol binding when it has one,
    otherwise to the configured preferred bindings for the service. When
    the request carries a ``<service>_url`` or ``<service>_index``
    attribute, only an endpoint matching that location or index is
    accepted; otherwise the first destination of the first binding with
    services wins.

    :return: ``(binding, destination)``
    :raises SAMLError: If nothing suitable was found.
    """
    if request and not entity_id:
        entity_id = request.issuer.text.strip()

    lookup = getattr(self.metadata, service)

    if bindings is None:
        if request and request.protocol_binding:
            bindings = [request.protocol_binding]
        else:
            bindings = self.config.preferred_binding[service]

    if not descr_type:
        descr_type = "idpsso" if self.entity_type == "sp" else "spsso"

    wanted_url = wanted_index = None
    if request:
        # A missing attribute simply means "no constraint".
        wanted_url = getattr(request, "%s_url" % service, None)
        wanted_index = getattr(request, "%s_index" % service, None)

    for binding in bindings:
        try:
            candidates = lookup(entity_id, binding, descr_type)
            if not candidates:
                continue
            if wanted_url:
                for candidate in candidates:
                    if candidate["location"] == wanted_url:
                        return binding, wanted_url
            elif wanted_index:
                for candidate in candidates:
                    if candidate["index"] == wanted_index:
                        return binding, candidate["location"]
            else:
                return binding, destinations(candidates)[0]
        except UnsupportedBinding:
            pass

    details = (entity_id, bindings, descr_type)
    logger.error("Failed to find consumer URL: %s, %s, %s" % details)
    #logger.error("Bindings: %s" % bindings)
    #logger.error("Entities: %s" % self.metadata)
    raise SAMLError("Unknown entity or unsupported bindings")
开发者ID:WiserTogether,项目名称:pysaml2,代码行数:54,代码来源:entity.py
示例10: test_metadata
# Test: load a local metadata file into a MetadataStore, try to export it to
# MongoDB, and check IdP/SP lookups and attribute requirements.
# NOTE(review): indentation was lost in this copy, so it is not visible how
# many of the statements after "else:" belong to that branch — confirm against
# the upstream test before re-indenting.
def test_metadata():
conf = config.Config()
conf.load_file("idp_conf_mdb")
umu_idp = 'https://idp.umu.se/saml2/idp/metadata.php'
# Set up a Metadata store
mds = MetadataStore(ATTRCONV, conf,
disable_ssl_certificate_validation=True)
# Import metadata from local file.
mds.imp([{"class": "saml2.mdstore.MetaDataFile",
"metadata": [(full_path("swamid-2.0.xml"), )]}])
assert len(mds) == 1 # One source
# A ConnectionFailure raised by the export is swallowed.
# presumably this lets the test pass when no MongoDB is reachable — confirm
try:
export_mdstore_to_mongo_db(mds, "metadata", "test")
except ConnectionFailure:
pass
else:
mdmdb = MetadataMDB(ATTRCONV, "metadata", "test")
# replace all metadata instances with this one
mds.metadata = {"mongo_db": mdmdb}
# The IdP under test must expose exactly one SSO endpoint.
idps = mds.with_descriptor("idpsso")
assert idps.keys()
idpsso = mds.single_sign_on_service(umu_idp)
assert len(idpsso) == 1
assert destinations(idpsso) == [
'https://idp.umu.se/saml2/idp/SSOService.php']
_name = name(mds[umu_idp])
assert _name == u'Ume\xe5 University'
certs = mds.certs(umu_idp, "idpsso", "signing")
assert len(certs) == 1
sps = mds.with_descriptor("spsso")
assert len(sps) == 417
# First SP: no optional attributes, six required ones.
wants = mds.attribute_requirement('https://connect.sunet.se/shibboleth')
assert wants["optional"] == []
lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["required"]]
assert _eq(lnamn,
['eduPersonPrincipalName', 'mail', 'givenName', 'sn',
'eduPersonScopedAffiliation', 'eduPersonAffiliation'])
# Second SP: both the optional and the required lists are checked.
wants = mds.attribute_requirement(
"https://gidp.geant.net/sp/module.php/saml/sp/metadata.php/default-sp")
# Optional
lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["optional"]]
assert _eq(lnamn, ['displayName', 'commonName', 'schacHomeOrganization',
'eduPersonAffiliation', 'schacHomeOrganizationType'])
# Required
lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["required"]]
assert _eq(lnamn, ['eduPersonTargetedID', 'mail',
'eduPersonScopedAffiliation'])
开发者ID:Amli,项目名称:pysaml2,代码行数:54,代码来源:test_76_metadata_in_mdb.py
示例11: _sso_location
def _sso_location(self, entityid=None, binding=BINDING_HTTP_REDIRECT):
    """Return the single-sign-on destination URL to send a request to.

    :param entityid: Entity id of the IdP to use. When omitted, the
        metadata must contain exactly one entity with an "idpsso"
        descriptor, and that one is used.
    :param binding: Binding the SSO endpoint must support.
    :return: The first destination of the matching SSO services.
    :raises IdpUnspecified: If the given IdP has no matching service, if
        more than one IdP is known and none was specified, or if no IdP or
        no endpoint is available at all.
    """
    if entityid:
        # verify that it's in the metadata
        srvs = self.metadata.single_sign_on_service(entityid, binding)
        if srvs:
            return destinations(srvs)[0]
        logger.info("_sso_location: %s, %s" % (entityid, binding))
        raise IdpUnspecified("No IdP to send to given the premises")

    # get the idp location from the metadata. If there is more than one
    # IdP in the configuration raise exception
    eids = self.metadata.with_descriptor("idpsso")
    if len(eids) > 1:
        raise IdpUnspecified("Too many IdPs to choose from: %s" % eids)

    # next() on an empty iterator raises StopIteration, which the
    # IndexError handler below would not catch, so the "no IdP at all"
    # case is handled explicitly here.
    only_idp = next(iter(eids), None)
    if only_idp is None:
        raise IdpUnspecified("No IdP to send to given the premises")

    try:
        srvs = self.metadata.single_sign_on_service(only_idp, binding)
        return destinations(srvs)[0]
    except IndexError:
        raise IdpUnspecified("No IdP to send to given the premises")
开发者ID:lvanderree,项目名称:pysaml2-3,代码行数:21,代码来源:client_base.py
示例12: do_attribute_query
def do_attribute_query(self, entityid, subject_id,
                       attribute=None, sp_name_qualifier=None,
                       name_qualifier=None, nameid_format=None,
                       real_id=None, consent=None, extensions=None,
                       sign=False, binding=BINDING_SOAP):
    """ Does a attribute request to an attribute authority, this is
    by default done over SOAP.

    :param entityid: To whom the query should be sent
    :param subject_id: The identifier of the subject
    :param attribute: A dictionary of attributes and values that is asked for
    :param sp_name_qualifier: The unique identifier of the
        service provider or affiliation of providers for whom the
        identifier was generated.
    :param name_qualifier: The unique identifier of the identity
        provider that generated the identifier.
    :param nameid_format: The format of the name ID
    :param real_id: The identifier which is the key to this entity in the
        identity database
    :param consent: Passed through to ``use_soap``
    :param extensions: Passed through to ``use_soap``
    :param sign: Passed through to ``use_soap``
    :param binding: Which binding to use
    :return: The attributes returned
    :raises Exception: If the entity offers no attribute service for the
        binding, or if the binding is neither SOAP nor HTTP-POST
    """
    srvs = self.metadata.attribute_service(entityid, binding)
    # Treat every empty result (empty list, None, ...) as "no service"
    # instead of only a literal empty list.
    if not srvs:
        raise Exception("No attribute service support at entity")

    destination = destinations(srvs)[0]
    response_args = {"real_id": real_id} if real_id else {}

    # Both supported bindings are dispatched through use_soap with the
    # same arguments, exactly as the two former duplicate branches did.
    if binding not in (BINDING_SOAP, BINDING_HTTP_POST):
        raise Exception("Unsupported binding")

    return self.use_soap(destination, "attribute_query", consent=consent,
                         extensions=extensions, sign=sign,
                         subject_id=subject_id, attribute=attribute,
                         sp_name_qualifier=sp_name_qualifier,
                         name_qualifier=name_qualifier,
                         nameid_format=nameid_format,
                         response_args=response_args)
开发者ID:GSA,项目名称:pysaml2,代码行数:52,代码来源:client.py
示例13: test_metadata
def test_metadata():
    """Export a local metadata file to MongoDB and query it through the store."""
    conf = config.Config()
    conf.load_file("idp_conf_mdb")
    umu_idp = "https://idp.umu.se/saml2/idp/metadata.php"

    # Set up a Metadata store
    store = MetadataStore(
        list(ONTS.values()),
        ATTRCONV,
        conf,
        disable_ssl_certificate_validation=True,
    )

    # Import metadata from local file.
    store.imp({"local": [full_path("swamid-2.0.xml")]})
    assert len(store) == 1  # One source

    export_mdstore_to_mongo_db(store, "metadata", "test")
    mongo_backend = MetadataMDB(ONTS, ATTRCONV, "metadata", "test")
    # replace all metadata instances with this one
    store.metadata = {"mongo_db": mongo_backend}

    idp_map = store.with_descriptor("idpsso")
    assert list(idp_map.keys())

    sso_services = store.single_sign_on_service(umu_idp)
    assert len(sso_services) == 1
    expected = ["https://idp.umu.se/saml2/idp/SSOService.php"]
    assert destinations(sso_services) == expected

    display_name = name(store[umu_idp])
    assert display_name == "Ume\xe5 University"

    signing_certs = store.certs(umu_idp, "idpsso", "signing")
    assert len(signing_certs) == 1

    sp_map = store.with_descriptor("spsso")
    assert len(sp_map) == 431

    # First SP: no optional attributes, six required ones.
    wants = store.attribute_requirement("https://connect.sunet.se/shibboleth")
    assert wants["optional"] == []
    local_names = [d_to_local_name(store.attrc, attr)
                   for attr in wants["required"]]
    assert _eq(
        local_names,
        ["eduPersonPrincipalName", "mail", "givenName", "sn",
         "eduPersonScopedAffiliation", "eduPersonAffiliation"],
    )

    wants = store.attribute_requirement(
        "https://gidp.geant.net/sp/module.php/saml/sp/metadata.php/default-sp")

    # Optional
    local_names = [d_to_local_name(store.attrc, attr)
                   for attr in wants["optional"]]
    assert _eq(
        local_names,
        ["displayName", "commonName", "schacHomeOrganization",
         "eduPersonAffiliation", "schacHomeOrganizationType"],
    )

    # Required
    local_names = [d_to_local_name(store.attrc, attr)
                   for attr in wants["required"]]
    assert _eq(local_names,
               ["eduPersonTargetedID", "mail", "eduPersonScopedAffiliation"])
开发者ID:rohe,项目名称:pysaml2-3,代码行数:49,代码来源:test_76_metadata_in_mdb.py
示例14: create_logout_response
def create_logout_response(self, request, binding, status=None,
                           sign=False, issuer=None):
    """ Build a LogoutResponse; its content depends on the binding used.

    :param request: The request this is a response to
    :param binding: Which binding the request came in over
    :param status: The return status of the response operation; a success
        status is created when none is given
    :param sign: Whether the response should be signed
    :param issuer: The issuer of the message
    :return: A logout message.
    """
    message_id = sid()
    status = status or success_status_factory()

    # response and packaging differs depending on binding
    response = ""
    if binding in (BINDING_SOAP, BINDING_HTTP_POST):
        response = logoutresponse_factory(sign=sign, id=message_id,
                                          in_response_to=request.id,
                                          status=status)
    elif binding == BINDING_HTTP_REDIRECT:
        sp_entity_id = request.issuer.text.strip()
        sp_services = self.metadata.single_logout_service(sp_entity_id,
                                                          "spsso")
        if not sp_services:
            raise Exception("Nowhere to send the response")

        destination = destinations(sp_services)[0]
        response_issuer = self.issuer(issuer)
        response = logoutresponse_factory(sign=sign, id=message_id,
                                          in_response_to=request.id,
                                          status=status,
                                          issuer=response_issuer,
                                          destination=destination,
                                          sp_entity_id=sp_entity_id,
                                          instant=instant())

    if sign:
        response = signed_instance_factory(
            response, self.sec, [(class_name(response), message_id)])

    logger.info("Response: %s" % (response,))
    return response
开发者ID:GSA,项目名称:pysaml2,代码行数:45,代码来源:server.py
示例15: test_metadata
def test_metadata():
    """Export a local metadata file to MongoDB and query it through the store."""
    conf = config.Config()
    conf.load_file("idp_conf_mdb")
    umu_idp = 'https://idp.umu.se/saml2/idp/metadata.php'

    store = MetadataStore(ONTS.values(), ATTRCONV, conf,
                          disable_ssl_certificate_validation=True)
    store.imp({"local": [full_path("swamid-1.0.xml")]})
    assert len(store) == 1  # One source

    export_mdstore_to_mongo_db(store, "metadata", "test")
    mongo_backend = MetadataMDB(ONTS, ATTRCONV, "metadata", "test")
    # replace all metadata instances with this one
    store.metadata = {"mongo_db": mongo_backend}

    idp_map = store.with_descriptor("idpsso")
    assert idp_map.keys()

    sso_services = store.single_sign_on_service(umu_idp)
    assert len(sso_services) == 1
    expected = ['https://idp.umu.se/saml2/idp/SSOService.php']
    assert destinations(sso_services) == expected

    display_name = name(store[umu_idp])
    assert display_name == u'Umeå University (SAML2)'

    signing_certs = store.certs(umu_idp, "idpsso", "signing")
    assert len(signing_certs) == 1

    sp_map = store.with_descriptor("spsso")
    assert len(sp_map) == 108

    # First SP: only the optional attributes are checked.
    wants = store.attribute_requirement('https://connect8.sunet.se/shibboleth')
    optional_names = [d_to_local_name(store.attrc, attr)
                      for attr in wants["optional"]]
    assert _eq(optional_names,
               ['eduPersonPrincipalName', 'mail', 'givenName', 'sn',
                'eduPersonScopedAffiliation'])

    # Second SP: nothing required, a slightly longer optional list.
    wants = store.attribute_requirement('https://beta.lobber.se/shibboleth')
    assert wants["required"] == []
    optional_names = [d_to_local_name(store.attrc, attr)
                      for attr in wants["optional"]]
    assert _eq(optional_names,
               ['eduPersonPrincipalName', 'mail', 'givenName', 'sn',
                'eduPersonScopedAffiliation', 'eduPersonEntitlement'])
开发者ID:hufman,项目名称:pysaml2,代码行数:41,代码来源:test_76_metadata_in_mdb.py
示例16: do_assertion_id_request
def do_assertion_id_request(self, assertion_ids, entity_id,
                            consent=None, extensions=None, sign=False):
    """Request assertions by id over SOAP and return the first truthy result.

    :param assertion_ids: One assertion id as a string, or a sequence of
        them.
    :param entity_id: Entity whose assertion id request service is used.
    :return: The first truthy ``_use_soap`` result, or None if no
        destination produced one.
    :raises NoServiceDefined: If the entity has no SOAP assertion id
        request service.
    """
    soap_services = self.metadata.assertion_id_request_service(entity_id,
                                                               BINDING_SOAP)
    if not soap_services:
        raise NoServiceDefined(
            "%s: %s" % (entity_id, "assertion_id_request_service"))

    # A lone id is wrapped so the rest of the code only deals with lists.
    if isinstance(assertion_ids, basestring):
        assertion_ids = [assertion_ids]

    id_refs = [AssertionIDRef(one_id) for one_id in assertion_ids]

    for endpoint in destinations(soap_services):
        result = self._use_soap(endpoint, "assertion_id_request",
                                assertion_id_refs=id_refs, consent=consent,
                                extensions=extensions, sign=sign)
        if result:
            return result

    return None
开发者ID:mlepine,项目名称:pysaml2,代码行数:22,代码来源:client.py
示例17: create_logout_response
def create_logout_response(self, idp_entity_id, request_id,
                           status_code,
                           binding=BINDING_HTTP_REDIRECT):
    """ Build a LogoutResponse addressed to an IdP.

    :param idp_entity_id: The entityid of the IdP that want to do the
        logout
    :param request_id: The Id of the request we are replying to
    :param status_code: The status code of the response
    :param binding: The type of binding that will be used for the response
    :return: A ``(destination, message)`` tuple, where the message is
        built by ``self._message`` for a LogoutResponse
    """
    idp_services = self.metadata.single_logout_services(idp_entity_id,
                                                        "idpsso",
                                                        binding=binding)
    destination = destinations(idp_services)[0]

    code = samlp.StatusCode(value=status_code)
    status = samlp.Status(status_code=code)

    message = self._message(LogoutResponse, destination,
                            in_response_to=request_id,
                            status=status)
    return destination, message
开发者ID:GSA,项目名称:pysaml2,代码行数:23,代码来源:client_base.py
示例18: make_logout_response
def make_logout_response(self, idp_entity_id, request_id,
                         status_code, binding=BINDING_HTTP_REDIRECT):
    """
    Build a LogoutResponse for an IdP-initiated logout.

    XXX The StatusCode is given a newline as text because of earlier
    issues with an explicit closing tag; check whether this is still
    needed. XXX

    :param idp_entity_id: The entityid of the IdP that want to do the
        logout
    :param request_id: The Id of the request we are replying to
    :param status_code: The status code of the response
    :param binding: The type of binding that will be used for the response
    :return: A ``(response, destination)`` tuple
    """
    idp_services = self.metadata.single_logout_service(idp_entity_id, binding,
                                                       "idpsso")
    destination = destinations(idp_services)[0]
    logger.info("destination to provider: %s" % destination)

    code = samlp.StatusCode(value=status_code, text='\n')
    message = samlp.StatusMessage(text='logout success')
    status = samlp.Status(status_code=code, status_message=message)

    own_issuer = saml.Issuer(text=self.config.entityid,
                             format=saml.NAMEID_FORMAT_ENTITY)

    response = samlp.LogoutResponse(
        id=sid(),
        version=VERSION,
        issue_instant=instant(),
        destination=destination,
        issuer=own_issuer,
        in_response_to=request_id,
        status=status,
    )
    return response, destination
开发者ID:Hackman238,项目名称:hl.pas.samlplugin,代码行数:36,代码来源:client.py
示例19: do_logout
# NOTE(review): indentation was lost in this copy, so the exact nesting of the
# statements below (in particular "break", "not_done.remove" and the final
# "if not_done") cannot be read off the text — confirm against upstream before
# re-indenting or refactoring.
def do_logout(self, subject_id, entity_ids, reason, expire, sign=None):
"""
Send logout requests for a subject to a list of IdPs.

:param subject_id: Identifier of the Subject
:param entity_ids: List of entity ids for the IdPs that have provided
information concerning the subject
:param reason: The reason for doing the logout
:param expire: Try to logout before this time.
:param sign: Whether to sign the request or not
:return: The tuple ``0, "504 Gateway Timeout", [], []`` when
``not_on_or_after(expire)`` is falsy; otherwise the ``responses``
dictionary, keyed by entity id. Note the two return shapes differ.
:raises LogoutError: When ``not_done`` is non-empty at the final check.
"""
# check time
if not not_on_or_after(expire): # I've run out of time
# Do the local logout anyway
self.local_logout(subject_id)
return 0, "504 Gateway Timeout", [], []
# for all where I can use the SOAP binding, do those first
# Work on a copy so entity_ids itself is not modified by the removals.
not_done = entity_ids[:]
responses = {}
for entity_id in entity_ids:
response = False
# NOTE(review): BINDING_SOAP is commented out of this list, so the
# "binding == BINDING_SOAP" branch further down looks unreachable — confirm.
for binding in [#BINDING_SOAP,
BINDING_HTTP_POST,
BINDING_HTTP_REDIRECT]:
srvs = self.metadata.single_logout_service(entity_id, "idpsso",
binding=binding)
if not srvs:
continue
destination = destinations(srvs)[0]
logger.info("destination to provider: %s" % destination)
request = self.create_logout_request(destination, entity_id,
subject_id, reason=reason,
expire=expire)
to_sign = []
# NOTE(review): "binding" is one of the BINDING_* constants; if those are
# URN strings this "http://" prefix test never matches — confirm intent.
if binding.startswith("http://"):
sign = True
# "sign" is rebound here, so a value chosen on one pass is still set on
# any later pass that reaches this point.
if sign is None:
sign = self.logout_requests_signed_default
if sign:
request.signature = pre_signature_part(request.id,
self.sec.my_cert, 1)
to_sign = [(class_name(request), request.id)]
logger.info("REQUEST: %s" % request)
srequest = signed_instance_factory(request, self.sec, to_sign)
if binding == BINDING_SOAP:
response = self.send_using_soap(srequest, destination)
if response:
logger.info("Verifying response")
response = self.logout_request_response(response)
if response:
not_done.remove(entity_id)
logger.info("OK response from %s" % destination)
responses[entity_id] = logout_response_from_string(response)
else:
logger.info("NOT OK response from %s" % destination)
else:
# Non-SOAP path: store the state under the request id before sending.
# presumably so the later response can be matched to it — confirm
session_id = request.id
rstate = self._relay_state(session_id)
self.state[session_id] = {"entity_id": entity_id,
"operation": "SLO",
"entity_ids": entity_ids,
"subject_id": subject_id,
"reason": reason,
"not_on_of_after": expire,
"sign": sign}
if binding == BINDING_HTTP_POST:
response = self.use_http_form_post(srequest,
destination,
rstate)
else:
response = self.use_http_get(srequest, destination,
rstate)
responses[entity_id] = response
not_done.remove(entity_id)
# only try one binding
break
if not_done:
# upstream should try later
raise LogoutError("%s" % (entity_ids,))
return responses
开发者ID:GSA,项目名称:pysaml2,代码行数:100,代码来源:client.py
示例20: do_attribute_query
def do_attribute_query(self, entityid, subject_id,
                       attribute=None, sp_name_qualifier=None,
                       name_qualifier=None, nameid_format=None,
                       real_id=None, consent=None, extensions=None,
                       sign=False, binding=BINDING_SOAP):
    """ Does a attribute request to an attribute authority, this is
    by default done over SOAP.

    :param entityid: To whom the query should be sent
    :param subject_id: The identifier of the subject
    :param attribute: A dictionary of attributes and values that is
        asked for
    :param sp_name_qualifier: The unique identifier of the
        service provider or affiliation of providers for whom the
        identifier was generated.
    :param name_qualifier: The unique identifier of the identity
        provider that generated the identifier.
    :param nameid_format: The format of the name ID
    :param real_id: The identifier which is the key to this entity in the
        identity database
    :param consent: Passed on when building/sending the query
    :param extensions: Passed on when building/sending the query
    :param sign: Whether the query should be signed
    :param binding: Which binding to use. A falsy value lets
        ``pick_binding`` choose both binding and destination.
    :return: The attributes returned if BINDING_SOAP was used.
        HTTP args if BINDING_HTTP_POST was used.
    :raises Exception: If the entity offers no attribute service for the
        binding, or if the binding is neither SOAP nor HTTP-POST
    """
    response_args = {"real_id": real_id} if real_id else {}

    if not binding:
        binding, destination = self.pick_binding("attribute_service",
                                                 None,
                                                 "attribute_authority",
                                                 entity_id=entityid)
    else:
        srvs = self.metadata.attribute_service(entityid, binding)
        # An identity test against a fresh list literal ("is []") can
        # never be true, so the emptiness check is done by truthiness.
        if not srvs:
            raise Exception("No attribute service support at entity")
        destination = destinations(srvs)[0]

    if binding == BINDING_SOAP:
        return self._use_soap(destination, "attribute_query",
                              consent=consent, extensions=extensions,
                              sign=sign, subject_id=subject_id,
                              attribute=attribute,
                              sp_name_qualifier=sp_name_qualifier,
                              name_qualifier=name_qualifier,
                              nameid_format=nameid_format,
                              response_args=response_args)
    elif binding == BINDING_HTTP_POST:
        mid = sid()
        query = self.create_attribute_query(destination, subject_id,
                                            attribute, mid, consent,
                                            extensions, sign)
        # Store the state under the query id before handing the query to
        # the binding.
        self.state[query.id] = {"entity_id": entityid,
                                "operation": "AttributeQuery",
                                "subject_id": subject_id,
                                "sign": sign}
        relay_state = self._relay_state(query.id)
        return self.apply_binding(binding, "%s" % query, destination,
                                  relay_state)
    else:
        raise Exception("Unsupported binding")
注:本文中的saml2.mdstore.destinations函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论