本文整理汇总了Python中unpaddedbase64.encode_base64函数的典型用法代码示例。如果您正苦于以下问题：Python encode_base64函数的具体用法？Python encode_base64怎么用？Python encode_base64使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了encode_base64函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: response_json_object
def response_json_object(self):
    """Build and sign the JSON document describing this server's keys.

    The document lists the current verify keys, the old verify keys
    together with their ``expired_ts`` values, and the configured TLS
    fingerprints. It is then signed once with every key found in
    ``self.config.signing_key``.

    Returns:
        dict: The signed key document.
    """
    config = self.config

    # Current keys, indexed by "<algorithm>:<version>".
    current_keys = {}
    for signing_key in config.signing_key:
        encoded_key = encode_base64(signing_key.verify_key.encode())
        current_keys["%s:%s" % (signing_key.alg, signing_key.version)] = {
            u"key": encoded_key,
        }

    # Old keys are already indexed by key id in the config.
    retired_keys = {}
    for old_key_id, old_key in config.old_signing_keys.items():
        retired_keys[old_key_id] = {
            u"key": encode_base64(old_key.encode()),
            u"expired_ts": old_key.expired_ts,
        }

    document = {
        u"valid_until_ts": self.valid_until_ts,
        u"server_name": config.server_name,
        u"verify_keys": current_keys,
        u"old_verify_keys": retired_keys,
        u"tls_fingerprints": config.tls_fingerprints,
    }

    for signing_key in config.signing_key:
        document = sign_json(document, config.server_name, signing_key)
    return document
开发者ID:rubo77,项目名称:synapse,代码行数:33,代码来源:local_key_resource.py
示例2: sign_json
def sign_json(json_object, signature_name, signing_key):
    """Add a signature to a JSON object, modifying it in place.

    The "signatures" and "unsigned" members are removed before the
    canonical JSON encoding is signed and are put back afterwards, so
    neither of them is covered by the signature.

    Args:
        json_object (dict): The JSON object to sign.
        signature_name (str): The name of the signing entity.
        signing_key (syutil.crypto.SigningKey): The key to sign the JSON with.

    Returns:
        The same ``json_object``, with the new signature stored under
        ``json_object["signatures"][signature_name]["<alg>:<version>"]``.
    """
    existing_signatures = json_object.pop("signatures", {})
    unsigned_part = json_object.pop("unsigned", None)

    signed_message = signing_key.sign(encode_canonical_json(json_object))
    signature_b64 = encode_base64(signed_message.signature)

    key_id = "%s:%s" % (signing_key.alg, signing_key.version)
    entity_signatures = existing_signatures.setdefault(signature_name, {})
    entity_signatures[key_id] = signature_b64

    json_object["signatures"] = existing_signatures
    # Only restore "unsigned" when the caller supplied one.
    if unsigned_part is not None:
        json_object["unsigned"] = unsigned_part

    return json_object
开发者ID:matrix-org,项目名称:python-signedjson,代码行数:28,代码来源:sign.py
示例3: check_event_content_hash
def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
    """Compare the content hash stored on an event with a freshly computed one.

    Args:
        event: The event to check. It must provide ``get_pdu_json()`` and
            ``get()``.
        hash_algorithm: Hash constructor passed to ``compute_content_hash``.

    Returns:
        bool: True when the stored hash equals the computed hash.

    Raises:
        SynapseError: (400) when "hashes" is not a dict, when it has no entry
            for the algorithm, or when the stored value is not valid base64.
    """
    name, expected_hash = compute_content_hash(
        event.get_pdu_json(), hash_algorithm
    )
    logger.debug("Expecting hash: %s", encode_base64(expected_hash))

    # Malformed events may have no 'hashes' at all, or a value of an
    # unexpected type; reject both before indexing into it.
    hashes = event.get("hashes")
    if not isinstance(hashes, dict):
        raise SynapseError(400, "Malformed 'hashes'", Codes.UNAUTHORIZED)

    if name not in hashes:
        missing_message = "Algorithm %s not in hashes %s" % (
            name, list(hashes),
        )
        raise SynapseError(400, missing_message, Codes.UNAUTHORIZED)

    stored_hash_b64 = hashes[name]
    try:
        stored_hash = decode_base64(stored_hash_b64)
    except Exception:
        raise SynapseError(
            400,
            "Invalid base64: %s" % (stored_hash_b64,),
            Codes.UNAUTHORIZED,
        )

    return stored_hash == expected_hash
开发者ID:matrix-org,项目名称:synapse,代码行数:29,代码来源:event_signing.py
示例4: read_certificate_from_disk
def read_certificate_from_disk(self, require_cert_and_key):
    """Load the TLS certificate (and optionally the key) and refresh fingerprints.

    Args:
        require_cert_and_key (bool): When True, both the private key and the
            certificate are read and any read error propagates. When False,
            the certificate is read only if ``tls_certificate_file`` is set,
            and a failure to read it is logged and ignored.
    """
    if require_cert_and_key:
        self.tls_private_key = self.read_tls_private_key()
        self.tls_certificate = self.read_tls_certificate()
    elif self.tls_certificate_file:
        # The certificate is only needed for tls_fingerprints here, so a
        # failure to reload it is not fatal.
        try:
            self.tls_certificate = self.read_tls_certificate()
        except Exception as e:
            logger.info(
                "Unable to read TLS certificate (%s). Ignoring as no "
                "tls listeners enabled.", e,
            )

    # Start again from a copy of the original list on every call.
    self.tls_fingerprints = list(self._original_tls_fingerprints)

    if not self.tls_certificate:
        return

    # Make sure the fingerprint of our own certificate is in the list.
    der_bytes = crypto.dump_certificate(
        crypto.FILETYPE_ASN1, self.tls_certificate
    )
    own_fingerprint = encode_base64(sha256(der_bytes).digest())
    known_fingerprints = {entry["sha256"] for entry in self.tls_fingerprints}
    if own_fingerprint not in known_fingerprints:
        self.tls_fingerprints.append({u"sha256": own_fingerprint})
开发者ID:matrix-org,项目名称:synapse,代码行数:34,代码来源:tls.py
示例5: select_pdus
def select_pdus(cursor):
    """Recompute and store the hashes of every PDU, in ascending depth order.

    For each PDU this stores the reference hash of every entry in
    ``prev_pdus``, then the PDU's own reference hash and content hashes.
    PDUs are read ordered by depth because the reference hash of each
    previous PDU is looked up in a table filled in as the loop progresses.

    Processing is best-effort: a failure on one PDU is reported with a
    "FAILED_" line and the loop continues with the next PDU.

    Args:
        cursor: Database cursor used for the query and passed to the
            ``store`` helpers.
    """
    cursor.execute(
        "SELECT pdu_id, origin FROM pdus ORDER BY depth ASC"
    )
    ids = cursor.fetchall()

    pdu_tuples = store._get_pdu_tuples(cursor, ids)
    pdus = [Pdu.from_pdu_tuple(p) for p in pdu_tuples]

    # (pdu_id, origin) -> (algorithm, hash bytes) for PDUs handled so far.
    reference_hashes = {}

    for pdu in pdus:
        try:
            if pdu.prev_pdus:
                # Single-argument print(...) emits the same text on
                # Python 2 and Python 3.
                print("PROCESS %s %s %s" % (
                    pdu.pdu_id, pdu.origin, pdu.prev_pdus,
                ))
                for pdu_id, origin, hashes in pdu.prev_pdus:
                    ref_alg, ref_hsh = reference_hashes[(pdu_id, origin)]
                    hashes[ref_alg] = encode_base64(ref_hsh)
                    store._store_prev_pdu_hash_txn(
                        cursor, pdu.pdu_id, pdu.origin,
                        pdu_id, origin, ref_alg, ref_hsh,
                    )
                print("SUCCESS %s %s %s" % (
                    pdu.pdu_id, pdu.origin, pdu.prev_pdus,
                ))

            pdu = add_event_pdu_content_hash(pdu)
            ref_alg, ref_hsh = compute_pdu_event_reference_hash(pdu)
            reference_hashes[(pdu.pdu_id, pdu.origin)] = (ref_alg, ref_hsh)
            store._store_pdu_reference_hash_txn(
                cursor, pdu.pdu_id, pdu.origin, ref_alg, ref_hsh
            )

            for alg, hsh_base64 in pdu.hashes.items():
                print("%s %s" % (alg, hsh_base64))
                store._store_pdu_content_hash_txn(
                    cursor, pdu.pdu_id, pdu.origin, alg,
                    decode_base64(hsh_base64),
                )
        except Exception:
            # Deliberately best-effort, but let KeyboardInterrupt and
            # SystemExit through so the script can still be stopped.
            print("FAILED_ %s %s %s" % (
                pdu.pdu_id, pdu.origin, pdu.prev_pdus,
            ))
开发者ID:0-T-0,项目名称:synapse,代码行数:33,代码来源:hash_history.py
示例6: add_event_hashes
def add_event_hashes(self, event_ids):
    """Fetch the reference hashes of the given events, base64-encoded.

    Written as a generator that yields a deferred and delivers its result
    through ``defer.returnValue`` (presumably wrapped by
    ``defer.inlineCallbacks``; the decorator is not visible here).

    Args:
        event_ids: Event IDs passed on to ``get_event_reference_hashes``.

    Returns:
        list: ``(event_id, hashes)`` tuples, where ``hashes`` maps
        "sha256" to the base64 hash and is empty when the event has no
        sha256 entry.
    """
    raw_hashes = yield self.get_event_reference_hashes(event_ids)

    encoded_hashes = {}
    for event_id, by_algorithm in raw_hashes.items():
        # Only sha256 hashes are kept.
        encoded_hashes[event_id] = {
            algorithm: encode_base64(digest)
            for algorithm, digest in by_algorithm.items()
            if algorithm == "sha256"
        }

    defer.returnValue(list(encoded_hashes.items()))
开发者ID:matrix-org,项目名称:synapse,代码行数:8,代码来源:signatures.py
示例7: encode_signing_key_base64
def encode_signing_key_base64(key):
    """Return the base64 form of a signing key.

    Args:
        key (SigningKey): The signing key to encode.

    Returns:
        The result of ``encode_base64`` applied to ``key.encode()``.
    """
    key_bytes = key.encode()
    return encode_base64(key_bytes)
开发者ID:matrix-org,项目名称:python-signedjson,代码行数:8,代码来源:key.py
示例8: encode_verify_key_base64
def encode_verify_key_base64(key):
    """Return the base64 form of a verify key.

    Args:
        key (VerifyKey): The verify key to encode.

    Returns:
        The result of ``encode_base64`` applied to ``key.encode()``.
    """
    key_bytes = key.encode()
    return encode_base64(key_bytes)
开发者ID:matrix-org,项目名称:python-signedjson,代码行数:8,代码来源:key.py
示例9: get_server_verify_key_v2_direct
def get_server_verify_key_v2_direct(self, server_name, key_ids):
    """Fetch verify keys straight from ``server_name`` and store them.

    Written as a generator that yields deferreds and delivers its result
    through ``defer.returnValue`` (presumably wrapped by
    ``defer.inlineCallbacks``; the decorator is not visible here).

    Each response must be signed by ``server_name`` and must list, in its
    "tls_fingerprints", the sha256 fingerprint of the TLS certificate that
    was presented on the connection.

    Args:
        server_name: The server to query.
        key_ids: The key IDs to request. IDs already obtained from an
            earlier response are skipped.

    Returns:
        dict: The accumulated results of ``process_v2_response``.

    Raises:
        KeyLookupError: when a response is unsigned, has no TLS
            fingerprints, or does not allow the presented certificate.
    """
    keys = {}

    for requested_key_id in key_ids:
        # An earlier response may already have supplied this key.
        if requested_key_id in keys:
            continue

        (response, tls_certificate) = yield fetch_server_key(
            server_name, self.hs.tls_server_context_factory,
            path=(b"/_matrix/key/v2/server/%s" % (
                urllib.quote(requested_key_id),
            )).encode("ascii"),
        )

        is_signed = (
            u"signatures" in response
            and server_name in response[u"signatures"]
        )
        if not is_signed:
            raise KeyLookupError("Key response not signed by remote server")

        if "tls_fingerprints" not in response:
            raise KeyLookupError("Key response missing TLS fingerprints")

        der_bytes = crypto.dump_certificate(
            crypto.FILETYPE_ASN1, tls_certificate
        )
        presented_fingerprint = encode_base64(
            hashlib.sha256(der_bytes).digest()
        )

        # Entries without a "sha256" member are ignored.
        allowed_fingerprints = {
            entry[u"sha256"]
            for entry in response[u"tls_fingerprints"]
            if u"sha256" in entry
        }
        if presented_fingerprint not in allowed_fingerprints:
            raise KeyLookupError("TLS certificate not allowed by fingerprints")

        response_keys = yield self.process_v2_response(
            from_server=server_name,
            requested_ids=[requested_key_id],
            response_json=response,
        )
        keys.update(response_keys)

    # Store the keys for every server name found in the responses.
    store_calls = [
        run_in_background(
            self.store_keys,
            server_name=key_server_name,
            from_server=server_name,
            verify_keys=verify_keys,
        )
        for key_server_name, verify_keys in keys.items()
    ]
    yield logcontext.make_deferred_yieldable(
        defer.gatherResults(
            store_calls, consumeErrors=True
        ).addErrback(unwrapFirstError)
    )

    defer.returnValue(keys)
开发者ID:rubo77,项目名称:synapse,代码行数:57,代码来源:keyring.py
示例10: render_GET
def render_GET(self, request):
    """Report whether the supplied public key equals this server's ed25519 key.

    Args:
        request: The incoming request; must carry a "public_key" argument.

    Returns:
        str: JSON text. This is the error from ``get_args`` when argument
        extraction fails, otherwise ``{"valid": <bool>}``.
    """
    err, args = get_args(request, ("public_key",))
    if err:
        return json.dumps(err)

    own_key = self.sydent.keyring.ed25519.verify_key
    own_key_b64 = encode_base64(own_key.encode())
    is_valid = args["public_key"] == own_key_b64
    return json.dumps({'valid': is_valid})
开发者ID:matrix-org,项目名称:sydent,代码行数:9,代码来源:pubkeyservlets.py
示例11: select_v1_keys
def select_v1_keys(connection):
    """Read every row of ``server_signature_keys`` into a nested dict.

    Args:
        connection: Database connection providing ``cursor()``.

    Returns:
        dict: ``{server_name: {key_id: base64 verify key}}``.
    """
    cursor = connection.cursor()
    try:
        cursor.execute(
            "SELECT server_name, key_id, verify_key FROM server_signature_keys"
        )
        rows = cursor.fetchall()
    finally:
        # Close the cursor even when the query fails, so it is not leaked.
        cursor.close()

    results = {}
    for server_name, key_id, verify_key in rows:
        results.setdefault(server_name, {})[key_id] = encode_base64(verify_key)
    return results
开发者ID:DoubleMalt,项目名称:synapse,代码行数:9,代码来源:convert_server_keys.py
示例12: event_id
def event_id(self):
    """Return this event's ID, computing and caching it when not yet set.

    The ID is "$" followed by the base64 encoding of the second element
    returned by ``compute_event_reference_hash``.
    """
    # Imported inside the function: a module-level import would create an
    # import loop that is hard to break.
    from synapse.crypto.event_signing import compute_event_reference_hash

    if not self._event_id:
        reference_hash = compute_event_reference_hash(self)[1]
        self._event_id = "$" + encode_base64(reference_hash)
    return self._event_id
开发者ID:matrix-org,项目名称:synapse,代码行数:9,代码来源:__init__.py
示例13: test_sign_and_verify
def test_sign_and_verify(self):
    """Check that the signed fixture has Alice's signature and that it verifies."""
    self.assertIn('signatures', self.signed)
    all_signatures = self.signed['signatures']

    self.assertIn('Alice', all_signatures)
    alice_signatures = all_signatures['Alice']

    self.assertIn('mock:test', alice_signatures)
    expected_signature = encode_base64(b'x_______')
    self.assertEqual(alice_signatures['mock:test'], expected_signature)

    # The signing key must have been given the expected JSON bytes.
    self.assertEqual(self.sigkey.signed_bytes, b'{"foo":"bar"}')

    verify_signed_json(self.signed, 'Alice', self.verkey)
开发者ID:matrix-org,项目名称:python-signedjson,代码行数:10,代码来源:test_sign.py
示例14: response_json_object
def response_json_object(self):
    """Build and sign the JSON document describing this server's keys.

    The document lists the current verify keys, the old verify keys with
    their expiry values, and the sha256 fingerprint of the configured TLS
    certificate. It is then signed once with every key found in
    ``self.config.signing_key``.

    Returns:
        dict: The signed key document.
    """
    config = self.config

    # Current keys, indexed by "<algorithm>:<version>".
    current_keys = {}
    for signing_key in config.signing_key:
        encoded_key = encode_base64(signing_key.verify_key.encode())
        current_keys["%s:%s" % (signing_key.alg, signing_key.version)] = {
            u"key": encoded_key,
        }

    retired_keys = {}
    for old_key in config.old_signing_keys:
        old_key_id = "%s:%s" % (old_key.alg, old_key.version)
        retired_keys[old_key_id] = {
            u"key": encode_base64(old_key.encode()),
            u"expired_ts": old_key.expired,
        }

    der_bytes = crypto.dump_certificate(
        crypto.FILETYPE_ASN1, config.tls_certificate
    )
    certificate_fingerprint = sha256(der_bytes).digest()

    document = {
        u"valid_until_ts": self.valid_until_ts,
        u"server_name": config.server_name,
        u"verify_keys": current_keys,
        u"old_verify_keys": retired_keys,
        u"tls_fingerprints": [
            {u"sha256": encode_base64(certificate_fingerprint)},
        ],
    }

    for signing_key in config.signing_key:
        document = sign_json(document, config.server_name, signing_key)
    return document
开发者ID:0-T-0,项目名称:synapse,代码行数:41,代码来源:local_key_resource.py
示例15: add_event_hashes
def add_event_hashes(self, event_ids):
    """Pair each event ID with its base64-encoded sha256 reference hash.

    Written as a generator that yields a deferred and delivers its result
    through ``defer.returnValue`` (presumably wrapped by
    ``defer.inlineCallbacks``; the decorator is not visible here).

    Args:
        event_ids: Event IDs passed on to ``get_event_reference_hashes``.

    Returns:
        list: ``(event_id, hashes)`` tuples, where ``hashes`` maps
        "sha256" to the base64 hash and is empty when there is no sha256
        entry.
    """
    # NOTE(review): the pairing below is positional, so this assumes
    # get_event_reference_hashes returns one entry per event ID in the
    # same order -- confirm against that method.
    hashes = yield self.get_event_reference_hashes(
        event_ids
    )
    hashes = [
        {
            k: encode_base64(v) for k, v in h.items()
            if k == "sha256"
        }
        for h in hashes
    ]
    # zip() is a lazy, one-shot iterator on Python 3; build a list so the
    # caller gets a reusable sequence on both Python 2 and Python 3.
    defer.returnValue(list(zip(event_ids, hashes)))
开发者ID:Vutsuak16,项目名称:synapse,代码行数:13,代码来源:signatures.py
示例16: response_json_object
def response_json_object(server_config):
    """Build and sign the JSON document describing a server's keys.

    Args:
        server_config: Configuration providing ``signing_key``,
            ``tls_certificate`` and ``server_name``.

    Returns:
        dict: The server name, the base64 verify keys indexed by
        "<algorithm>:<version>" and the base64 TLS certificate, signed once
        with every key in ``server_config.signing_key``.
    """
    encoded_keys = {}
    for signing_key in server_config.signing_key:
        encoded_key = encode_base64(signing_key.verify_key.encode())
        encoded_keys["%s:%s" % (signing_key.alg, signing_key.version)] = encoded_key

    der_bytes = crypto.dump_certificate(
        crypto.FILETYPE_ASN1, server_config.tls_certificate
    )

    document = {
        u"server_name": server_config.server_name,
        u"verify_keys": encoded_keys,
        u"tls_certificate": encode_base64(der_bytes),
    }

    for signing_key in server_config.signing_key:
        document = sign_json(document, server_config.server_name, signing_key)
    return document
开发者ID:roblabla,项目名称:synapse,代码行数:24,代码来源:server_key_resource.py
示例17: _get_latest_event_ids_and_hashes_in_room
def _get_latest_event_ids_and_hashes_in_room(self, txn, room_id):
    """List the forward-extremity events of a room with hashes and depth.

    Args:
        txn: Database transaction/cursor used to run the query.
        room_id: The room whose forward extremities are wanted.

    Returns:
        list: ``(event_id, hashes, depth)`` tuples, where ``hashes`` maps
        "sha256" to the base64 reference hash and is empty when the event
        has no sha256 entry.
    """
    txn.execute(
        "SELECT e.event_id, e.depth FROM events as e "
        "INNER JOIN event_forward_extremities as f "
        "ON e.event_id = f.event_id "
        "AND e.room_id = f.room_id "
        "WHERE f.room_id = ?",
        (room_id,),
    )

    latest = []
    for event_id, depth in txn.fetchall():
        reference_hashes = self._get_event_reference_hashes_txn(txn, event_id)
        # Only sha256 hashes are kept.
        encoded = {
            algorithm: encode_base64(digest)
            for algorithm, digest in reference_hashes.items()
            if algorithm == "sha256"
        }
        latest.append((event_id, encoded, depth))
    return latest
开发者ID:matrix-org,项目名称:synapse,代码行数:20,代码来源:event_federation.py
示例18: main
def main():
    """Check an event's content hashes and print its reference hashes.

    The event is read as JSON from the file named on the command line, or
    from standard input when no file is given.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "input_json", nargs="?", type=argparse.FileType('r'), default=sys.stdin
    )
    args = parser.parse_args()
    logging.basicConfig()

    event_json = dictobj(json.load(args.input_json))

    algorithms = {"sha256": hashlib.sha256}

    # One PASS/FAIL line for each hash algorithm named in the event.
    for alg_name in event_json.hashes:
        matches = check_event_content_hash(event_json, algorithms[alg_name])
        if matches:
            report = "PASS content hash %s"
        else:
            report = "FAIL content hash %s"
        print(report % (alg_name,))

    for algorithm in algorithms.values():
        name, h_bytes = compute_event_reference_hash(event_json, algorithm)
        reference_b64 = encode_base64(h_bytes)
        print("Reference hash %s: %s" % (name, reference_b64))
开发者ID:DoubleMalt,项目名称:synapse,代码行数:21,代码来源:check_event_hash.py
示例19: check_event_content_hash
def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
    """Check whether the content hash stored on an event matches its contents.

    Args:
        event: The event to check; must expose a ``hashes`` mapping.
        hash_algorithm: Hash constructor passed to ``compute_content_hash``.

    Returns:
        bool: True when the stored hash equals the computed hash.

    Raises:
        SynapseError: (400) when ``event.hashes`` has no entry for the
            algorithm, or when the stored value is not valid base64.
    """
    name, expected_hash = compute_content_hash(event, hash_algorithm)
    logger.debug("Expecting hash: %s", encode_base64(expected_hash))

    if name not in event.hashes:
        raise SynapseError(
            400,
            "Algorithm %s not in hashes %s" % (
                name, list(event.hashes),
            ),
            Codes.UNAUTHORIZED,
        )

    message_hash_base64 = event.hashes[name]
    try:
        message_hash_bytes = decode_base64(message_hash_base64)
    except Exception:
        # Not a bare "except:": KeyboardInterrupt and SystemExit must not
        # be turned into a 400 response.
        raise SynapseError(
            400,
            "Invalid base64: %s" % (message_hash_base64,),
            Codes.UNAUTHORIZED,
        )

    return message_hash_bytes == expected_hash
开发者ID:roblabla,项目名称:synapse,代码行数:22,代码来源:event_signing.py
示例20: read_config
def read_config(self, config):
    """Load the TLS settings from the parsed configuration.

    Reads the certificate, the private key (unless "no_tls" is set), the DH
    parameters path and the fingerprint list. The sha256 fingerprint of our
    own certificate is appended to the fingerprint list when it is not
    already there.

    Args:
        config (dict): The parsed configuration. "tls_fingerprints" is
            required; the other keys are read with ``get``.
    """
    certificate_path = config.get("tls_certificate_path")
    self.tls_certificate = self.read_tls_certificate(certificate_path)
    self.tls_certificate_file = certificate_path

    self.no_tls = config.get("no_tls", False)
    # No private key is read when TLS is disabled.
    self.tls_private_key = (
        None if self.no_tls
        else self.read_tls_private_key(config.get("tls_private_key_path"))
    )

    self.tls_dh_params_path = self.check_file(
        config.get("tls_dh_params_path"), "tls_dh_params"
    )

    self.tls_fingerprints = config["tls_fingerprints"]

    # Make sure the fingerprint of our own certificate is in the list.
    der_bytes = crypto.dump_certificate(
        crypto.FILETYPE_ASN1, self.tls_certificate
    )
    own_fingerprint = encode_base64(sha256(der_bytes).digest())
    known_fingerprints = {entry["sha256"] for entry in self.tls_fingerprints}
    if own_fingerprint not in known_fingerprints:
        self.tls_fingerprints.append({u"sha256": own_fingerprint})

    # This option applies to non-federation HTTP clients (e.g. for talking
    # to recaptcha, identity servers, and such). It should never be used in
    # production and is intended only for running tests.
    self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
        "use_insecure_ssl_client_just_for_testing_do_not_use"
    )
开发者ID:DoubleMalt,项目名称:synapse,代码行数:39,代码来源:tls.py
注：本文中的unpaddedbase64.encode_base64函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论