def encrypt_using_xmlsec(xmlsec, data, template, epath=None, key=None,
                         key_file=None, key_file_type="pubkey-pem",
                         session_key=None):
    """Encrypt a value by running the xmlsec1 command line tool.

    :param xmlsec: Path to the xmlsec1 binary
    :param data: A XML document from which the value should be picked.
    :param template: The encryption part template
    :param epath: Which value to encrypt, if not the whole document
        should be encrypted.
    :param key: The key to be used for the encrypting. Only used when no
        key_file is given, in which case it is written to a temporary
        ".pem" file.
    :param key_file: The file where the key can be found
    :param key_file_type: pubkey-pem, pubkey-der, pubkey-cert-pem,
        pubkey-cert-der, privkey-der, privkey-pem, ...
    :param session_key: Key algorithm, passed on as the value of the
        "--session-key" option
    :return: The encrypted statement, as read back from the output file
    :raises Exception: If xmlsec wrote anything to stdout or left the
        output file empty
    """
    # NOTE(review): make_temp presumably returns (file_object, path) and the
    # file disappears once the object is garbage collected - confirm.  Each
    # handle therefore gets its own name; binding both to "_" would drop the
    # key file as soon as the template file is created.
    key_tmp = None
    if not key_file and key:
        key_tmp, key_file = make_temp("%s" % key, ".pem")

    ntf = NamedTemporaryFile()
    try:
        xpath = create_xpath(epath)

        # NOTE(review): key_file_type is passed verbatim, without a leading
        # "--"; confirm that callers supply the dashes or that xmlsec
        # accepts the bare form.
        com_list = [xmlsec, "encrypt",
                    "--output", ntf.name,
                    "--xml-data", data,
                    '--node-xpath', xpath,
                    key_file_type, key_file
                    ]

        if session_key:
            com_list.extend(["--session-key", session_key])

        template_tmp, fil = make_temp("%s" % template, decode=False)
        com_list.append(fil)

        pof = Popen(com_list, stderr=PIPE, stdout=PIPE)
        # communicate() drains both pipes concurrently; reading stdout to
        # EOF before touching stderr can block forever when the child fills
        # the stderr pipe first.
        p_out, p_err = pof.communicate()

        # this doesn't work if --store-signatures are used
        if p_out:
            logger.error(LOG_LINE % (p_out, p_err))
            raise Exception("Encryption failed")

        ntf.seek(0)
        encrypted_statement = ntf.read()
        if not encrypted_statement:
            logger.error(p_err)
            raise Exception("Encryption failed")
        return encrypted_statement
    finally:
        # Release (and thereby remove) the output file on every path.
        ntf.close()
def _encrypt_assertion(self, encrypt_cert, sp_entity_id, response, node_xpath=None):
    """ Encryption of assertions.

    The candidate certificates are ``encrypt_cert`` when it is given,
    otherwise the certificates the metadata lists for ``sp_entity_id``.
    They are tried in order and the first successful encryption wins.

    :param encrypt_cert: Certificate to be used for encryption.
    :param sp_entity_id: Entity ID for the calling service provider.
    :param response: A samlp.Response
    :param node_xpath: Unique path to the element to be encrypted.
    :return: The result of the first successful encryption, or the
        response as it was passed in when there is no candidate
        certificate at all.
    :raises Exception: The error of the last candidate when every
        candidate failed.
    """
    pem_header = "-----BEGIN CERTIFICATE-----\n"
    pem_footer = "\n-----END CERTIFICATE-----\n"

    backend = CryptoBackendXmlSec1(self.config.xmlsec_binary)

    if encrypt_cert:
        candidates = [encrypt_cert]
    elif sp_entity_id is not None:
        candidates = self.metadata.certs(sp_entity_id, "any", "encryption")
    else:
        candidates = []

    last_error = None
    for candidate in candidates:
        try:
            # Wrap bare base64 material in PEM armour where it is missing.
            pem = candidate
            if pem_header not in pem:
                pem = "%s%s" % (pem_header, pem)
            if pem_footer not in pem:
                pem = "%s%s" % (pem, pem_footer)
            _, cert_file = make_temp(pem.encode('ascii'), decode=False)
            return backend.encrypt_assertion(response, cert_file,
                                             pre_encryption_part(),
                                             node_xpath=node_xpath)
        except Exception as err:
            # Remember the failure and move on to the next candidate.
            last_error = err

    if last_error:
        raise last_error
    return response
def _response(self, in_response_to, consumer_url=None, status=None,
              issuer=None, sign=False, to_sign=None,
              encrypt_assertion=False, encrypt_cert=None, **kwargs):
    """ Create a Response.

    :param in_response_to: The session identifier of the request
    :param consumer_url: The URL which should receive the response
    :param status: The status of the response
    :param issuer: The issuer of the response
    :param sign: Whether the response should be signed or not
    :param to_sign: If there are other parts to sign
    :param encrypt_assertion: Whether the response should be run through
        the xmlsec backend's encrypt_assertion
    :param encrypt_cert: Certificate used for the encryption; it is
        written to a temporary file that is handed to the backend
    :param kwargs: Extra key word arguments
    :return: A Response instance, or whatever the signing/encryption
        helpers return when signing or encryption was requested
    """
    if not status:
        status = success_status_factory()

    _issuer = self._issuer(issuer)

    response = response_factory(issuer=_issuer,
                                in_response_to=in_response_to,
                                status=status)

    if consumer_url:
        response.destination = consumer_url

    self._add_info(response, **kwargs)

    if not sign and to_sign and not encrypt_assertion:
        return signed_instance_factory(response, self.sec, to_sign)

    if encrypt_assertion:
        if sign:
            response.signature = pre_signature_part(response.id,
                                                    self.sec.my_cert, 1)
        cbxs = CryptoBackendXmlSec1(self.config.xmlsec_binary)
        _, cert_file = make_temp("%s" % encrypt_cert, decode=False)
        response = cbxs.encrypt_assertion(response, cert_file,
                                          pre_encryption_part())

        if not sign:
            return response

        if to_sign:
            # Return the signed result.  It used to be computed and thrown
            # away, after which control fell through to self.sign() on the
            # already encrypted response.
            return signed_instance_factory(response, self.sec, to_sign)

        # default is to sign the whole response if anything
        sign_class = [(class_name(response), response.id)]
        return signed_instance_factory(response, self.sec, sign_class)

    if sign:
        return self.sign(response, to_sign=to_sign)
    return response
开发者ID:18600597055,项目名称:hue,代码行数:56,代码来源:entity.py
示例11: _parse_response
def _parse_response(self, xmlstr, response_cls, service, binding, outstanding_certs=None, **kwargs):
    """ Deal with a Response

    :param xmlstr: The response as a xml string
    :param response_cls: What type of response it is
    :param service: Service name used to look up the expected return
        address in the configuration
    :param binding: What type of binding this message came through.
    :param outstanding_certs: Optional mapping indexed by the response's
        in_response_to value; the "key" entry of the matching item is used
        to decrypt a response that carries encrypted assertions
    :param kwargs: Extra key word arguments
    :return: None if the reply doesn't contain a valid SAML Response,
        otherwise the response.
    """
    response = None

    if self.config.accepted_time_diff:
        kwargs["timeslack"] = self.config.accepted_time_diff

    if "asynchop" not in kwargs:
        # SOAP and PAOS exchanges are synchronous, everything else is not.
        kwargs["asynchop"] = binding not in [BINDING_SOAP, BINDING_PAOS]

    if xmlstr:
        if "return_addrs" not in kwargs:
            if binding in [BINDING_HTTP_REDIRECT, BINDING_HTTP_POST]:
                try:
                    # expected return address
                    kwargs["return_addrs"] = self.config.endpoint(
                        service, binding=binding)
                except Exception:
                    logger.info("Not supposed to handle this!")
                    return None

        try:
            response = response_cls(self.sec, **kwargs)
        except Exception as exc:
            logger.info("%s", exc)
            raise

        xmlstr = self.unravel(xmlstr, binding, response_cls.msgtype)
        # Keep the text as received; it is handed to loads() next to the
        # (possibly decrypted) text below.
        origxml = xmlstr

        if outstanding_certs is not None:
            parsed = samlp.any_response_from_string(xmlstr)
            if len(parsed.encrypted_assertion) > 0:
                _, cert_file = make_temp(
                    "%s" % outstanding_certs[parsed.in_response_to]["key"],
                    decode=False)
                cbxs = CryptoBackendXmlSec1(self.config.xmlsec_binary)
                xmlstr = cbxs.decrypt(xmlstr, cert_file)

        if not xmlstr:  # Not a valid response
            return None

        try:
            response = response.loads(xmlstr, False, origxml=origxml)
        except SigverError as err:
            logger.error("Signature Error: %s", err)
            raise

    # NOTE(review): the visible code ends here without returning `response`
    # although the docstring promises it; the remainder of this function is
    # not visible, so no return was added - confirm against the full file.
开发者ID:krgo,项目名称:pysaml2,代码行数:55,代码来源:entity.py
示例12: _certs
def _certs(self, key_descriptors, typ):
    """Group certificate hashes by the ``use`` of their key descriptor.

    Every certificate found in a descriptor's key info is hashed with
    ``signature``.  A hash that is not yet present in ``self._keys`` gets
    an entry there: the result of ``make_temp`` for typ "pem" or "der",
    the certificate itself for any other typ.

    :param key_descriptors: Iterable of key descriptors, each providing
        ``use`` and ``key_info``
    :param typ: "pem" or "der"
    :return: Dictionary mapping each use to the list of certificate hashes
    """
    hashes_by_use = {}

    for descriptor in key_descriptors:
        use = descriptor.use
        for raw_cert in cert_from_key_info(descriptor.key_info):
            digest = signature("", [raw_cert])

            try:
                self._keys[digest]
            except KeyError:
                stored = raw_cert
                if typ == "pem":
                    stored = make_temp(pem_format(raw_cert), ".pem", False)
                elif typ == "der":
                    stored = make_temp(raw_cert, suffix=".der")
                self._keys[digest] = stored

            hashes_by_use.setdefault(use, []).append(digest)

    return hashes_by_use
def decrypt_message(enctext, xmlsec_binary, key_file=None,
                    key_file_type="privkey-pem", cafile=None,
                    epath=None, id_attr="",
                    node_name="", node_id=None, debug=False):
    """ Decrypts an encrypted part of a XML document.

    :param enctext: XML document containing an encrypted part
    :param xmlsec_binary: The xmlsec1 binaries to be used
    :param key_file: The key used to decrypt the message
    :param key_file_type: The key file type
    :param cafile: Extra command line argument(s), a string or an iterable
        of strings, appended when key_file_type is one of the private key
        types; ignored when None
    :param epath: Path turned into a "--node-xpath" selector; only looked
        at when no id attribute is in effect
    :param node_name: The SAML class of the root node in the message
    :param node_id: The identifier of the root node if any
    :param id_attr: Should normally be one of "id", "Id" or "ID"
    :param debug: To debug or not
    :return: The decrypted document if all was OK otherwise will raise an
        exception.
    :raises DecryptionError: If the xmlsec output could not be accepted
    """
    if not id_attr:
        id_attr = ID_ATTR

    # NOTE(review): make_temp presumably returns (file_object, path) - the
    # handle is kept in a named local so the file outlives the xmlsec run.
    enc_tmp, fil = make_temp(enctext, decode=False)

    com_list = [xmlsec_binary, "--decrypt",
                "--%s" % key_file_type, key_file]

    # cafile defaults to None; extending the list with None raised TypeError
    # for a call that relied on the defaults.
    if cafile is not None and key_file_type in ["privkey-pem", "privkey-der",
                                                "pkcs8-pem", "pkcs8-der"]:
        if isinstance(cafile, basestring):
            com_list.append(cafile)
        else:
            com_list.extend(cafile)

    if id_attr:
        com_list.extend(["--id-attr:%s" % id_attr, node_name])
    elif epath:
        xpath = create_xpath(epath)
        com_list.extend(['--node-xpath', xpath])

    if node_id:
        com_list.extend(["--node-id", node_id])

    com_list.append(fil)

    if debug:
        try:
            print(" ".join(com_list))
        except TypeError:
            # A non-string argument slipped in; show the likely suspects.
            print("key_file_type %s" % (key_file_type,))
            print("key_file %s" % (key_file,))
            print("node_name %s" % (node_name,))
            print("fil %s" % (fil,))
            raise

        print("%s: %s" % (key_file, os.access(key_file, os.F_OK)))
        print("%s: %s" % (fil, os.access(fil, os.F_OK)))

    pof = Popen(com_list, stderr=PIPE, stdout=PIPE)
    # communicate() drains both pipes concurrently; reading stdout to EOF
    # first can block forever once the child fills the stderr pipe.
    p_out, p_err = pof.communicate()

    if debug:
        print(p_err)

    try:
        parse_xmlsec_output(p_err)
    except XmlsecError as exc:
        # logger is a Logger object, not a callable.
        logger.error(LOG_LINE % (p_out, exc))
        raise DecryptionError("%s" % (exc,))

    # No "--output" option is given, so stdout is the only place the
    # document documented as the return value can come from.
    return p_out
开发者ID:GSA,项目名称:pysaml2,代码行数:70,代码来源:encdec.py
示例17: _response
def _response(self, in_response_to, consumer_url=None, status=None,
issuer=None, sign=False, to_sign=None,
encrypt_assertion=False, encrypt_assertion_self_contained=False, encrypted_advice_attributes=False,
encrypt_cert=None, **kwargs):
""" Create a Response.
Encryption:
encrypt_assertion must be true for encryption to be performed. If
encrypted_advice_attributes also is true, the function tries to encrypt
the assertion in the advice element of the main assertion. Exactly one
assertion element is allowed in the advice element; if the advice element
is missing or holds any other number of assertions, the main assertion is
encrypted instead. If encrypted_advice_attributes is false the main
assertion will be encrypted.
Signing:
Parts named in to_sign are signed before the encryption step. When sign
is true the whole response is signed after the encryption step.
:param in_response_to: The session identifier of the request
:param consumer_url: The URL which should receive the response
:param status: The status of the response
:param issuer: The issuer of the response
:param sign: Whether the response should be signed or not
:param to_sign: If there are other parts to sign
:param encrypt_assertion: Whether an assertion should be encrypted
:param encrypt_assertion_self_contained: Selects the
get_xml_string_with_self_contained_assertion_within_* serialisation of
the response before it is encrypted
:param encrypted_advice_attributes: Whether the assertion inside the
advice element should be encrypted rather than the main assertion
:param encrypt_cert: Certificate used for the encryption; it is written
to a temporary file that is handed to the xmlsec backend
:param kwargs: Extra key word arguments
:return: A Response instance, or whatever the signing/encryption helpers
return when signing or encryption was requested
"""
if not status:
status = success_status_factory()
_issuer = self._issuer(issuer)
response = response_factory(issuer=_issuer,
in_response_to=in_response_to,
status=status)
if consumer_url:
response.destination = consumer_url
self._add_info(response, **kwargs)
if not sign and to_sign and not encrypt_assertion:
return signed_instance_factory(response, self.sec, to_sign)
if encrypt_assertion:
node_xpath = None
if sign:
response.signature = pre_signature_part(response.id,
self.sec.my_cert, 1)
# Captured before `response` is rebound further down.
sign_class = [(class_name(response), response.id)]
cbxs = CryptoBackendXmlSec1(self.config.xmlsec_binary)
if encrypted_advice_attributes and response.assertion.advice is not None \
and len(response.assertion.advice.assertion) == 1:
# Move the single advice assertion into a fresh EncryptedAssertion.
tmp_assertion = response.assertion.advice.assertion[0]
response.assertion.advice.encrypted_assertion = []
response.assertion.advice.encrypted_assertion.append(EncryptedAssertion())
if isinstance(tmp_assertion, list):
response.assertion.advice.encrypted_assertion[0].add_extension_elements(tmp_assertion)
else:
response.assertion.advice.encrypted_assertion[0].add_extension_element(tmp_assertion)
response.assertion.advice.assertion = []
if encrypt_assertion_self_contained:
advice_tag = response.assertion.advice._to_element_tree().tag
assertion_tag = tmp_assertion._to_element_tree().tag
response = response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
assertion_tag, advice_tag)
# XPath of the assertion that was just moved into the advice element.
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
elif encrypt_assertion_self_contained:
assertion_tag = response.assertion._to_element_tree().tag
response = pre_encrypt_assertion(response)
response = response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion(
assertion_tag)
else:
response = pre_encrypt_assertion(response)
if to_sign:
response = signed_instance_factory(response, self.sec, to_sign)
_, cert_file = make_temp("%s" % encrypt_cert, decode=False)
response = cbxs.encrypt_assertion(response, cert_file,
pre_encryption_part(), node_xpath=node_xpath)
# template(response.assertion.id))
if sign:
return signed_instance_factory(response, self.sec, sign_class)
else:
return response
if sign:
return self.sign(response, to_sign=to_sign)
else:
return response
开发者ID:Itxaka,项目名称:pysaml2,代码行数:86,代码来源:entity.py
示例18: test_encrypted_signed_response_4
def test_encrypted_signed_response_4(self):
    """Signed response whose advice assertion is encrypted.

    Creates a signed authn response with encrypted advice attributes,
    checks the response signature, decrypts the message with the generated
    key and then checks the signatures and attributes of the assertions.
    """
    sp_entity_id = "urn:mace:example.com:saml:roland:sp"
    assertion_node = 'urn:oasis:names:tc:SAML:2.0:assertion:Assertion'
    sec = self.server.sec

    name_id = self.server.ident.transient_nameid(sp_entity_id, "id12")

    ava = {"givenName": ["Derek"], "surName": ["Jeter"],
           "mail": ["[email protected]"], "title": "The man"}

    cert_str, cert_key_str = generate_cert()

    signed_resp = self.server.create_authn_response(
        ava,
        "id12",  # in_response_to
        "http://lingon.catalogix.se:8087/",  # consumer_url
        sp_entity_id,  # sp_entity_id
        name_id=name_id,
        sign_response=True,
        sign_assertion=True,
        encrypt_assertion=True,
        encrypted_advice_attributes=True,
        encrypt_cert=cert_str,
    )

    # The response as a whole must carry a valid signature.
    sresponse = response_from_string(signed_resp)
    assert sec.verify_signature(
        signed_resp,
        self.server.config.cert_file,
        node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
        node_id=sresponse.id,
        id_attr="")

    _, key_file = make_temp("%s" % cert_key_str, decode=False)
    decr_text = sec.decrypt(signed_resp, key_file)
    resp = samlp.response_from_string(decr_text)

    # The main assertion must carry a valid signature after decryption.
    main_assertion = resp.assertion[0]
    assert sec.verify_signature(
        decr_text,
        self.server.config.cert_file,
        node_name=assertion_node,
        node_id=main_assertion.id,
        id_attr="")

    advice_extensions = \
        main_assertion.advice.encrypted_assertion[0].extension_elements
    assert advice_extensions

    assertion = extension_elements_to_elements(advice_extensions,
                                               [saml, samlp])
    assert assertion
    assert assertion[0].attribute_statement

    decrypted_ava = get_ava(assertion[0])
    assert decrypted_ava == {
        'mail': ['[email protected]'],
        'givenname': ['Derek'],
        'surname': ['Jeter'],
        'title': ['The man'],
    }

    # Should work, but I suspect that xmlsec manipulates the xml too much
    # while encrypting, so that the signature is no longer working. :(
    assert 'xmlns:encas' not in decr_text

    assert sec.verify_signature(
        decr_text,
        self.server.config.cert_file,
        node_name=assertion_node,
        node_id=assertion[0].id,
        id_attr="")
请发表评论