本文整理汇总了Python中unpaddedbase64.decode_base64函数的典型用法代码示例。如果您正苦于以下问题:Python decode_base64函数的具体用法?Python decode_base64怎么用?Python decode_base64使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了decode_base64函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: from_token
def from_token(cls, token):
if PY3:
# The argument raw=False is only available on new versions of
# msgpack, and only really needed on Python 3. Gate it behind
# a PY3 check to avoid causing issues on Debian-packaged versions.
decoded = msgpack.loads(decode_base64(token), raw=False)
else:
decoded = msgpack.loads(decode_base64(token))
return RoomListNextBatch(**{
cls.REVERSE_KEY_DICT[key]: val
for key, val in decoded.items()
})
开发者ID:matrix-org,项目名称:synapse,代码行数:12,代码来源:room_list.py
示例2: check_event_content_hash
def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
"""Check whether the hash for this PDU matches the contents"""
name, expected_hash = compute_content_hash(event.get_pdu_json(), hash_algorithm)
logger.debug("Expecting hash: %s", encode_base64(expected_hash))
# some malformed events lack a 'hashes'. Protect against it being missing
# or a weird type by basically treating it the same as an unhashed event.
hashes = event.get("hashes")
if not isinstance(hashes, dict):
raise SynapseError(400, "Malformed 'hashes'", Codes.UNAUTHORIZED)
if name not in hashes:
raise SynapseError(
400,
"Algorithm %s not in hashes %s" % (
name, list(hashes),
),
Codes.UNAUTHORIZED,
)
message_hash_base64 = hashes[name]
try:
message_hash_bytes = decode_base64(message_hash_base64)
except Exception:
raise SynapseError(
400,
"Invalid base64: %s" % (message_hash_base64,),
Codes.UNAUTHORIZED,
)
return message_hash_bytes == expected_hash
开发者ID:matrix-org,项目名称:synapse,代码行数:29,代码来源:event_signing.py
示例3: _verify_third_party_invite
def _verify_third_party_invite(self, event, auth_events):
"""
Validates that the invite event is authorized by a previous third-party invite.
Checks that the public key, and keyserver, match those in the third party invite,
and that the invite event has a signature issued using that public key.
Args:
event: The m.room.member join event being validated.
auth_events: All relevant previous context events which may be used
for authorization decisions.
Return:
True if the event fulfills the expectations of a previous third party
invite event.
"""
if "third_party_invite" not in event.content:
return False
if "signed" not in event.content["third_party_invite"]:
return False
signed = event.content["third_party_invite"]["signed"]
for key in {"mxid", "token"}:
if key not in signed:
return False
token = signed["token"]
invite_event = auth_events.get(
(EventTypes.ThirdPartyInvite, token,)
)
if not invite_event:
return False
if event.user_id != invite_event.user_id:
return False
try:
public_key = invite_event.content["public_key"]
if signed["mxid"] != event.state_key:
return False
if signed["token"] != token:
return False
for server, signature_block in signed["signatures"].items():
for key_name, encoded_signature in signature_block.items():
if not key_name.startswith("ed25519:"):
return False
verify_key = decode_verify_key_bytes(
key_name,
decode_base64(public_key)
)
verify_signed_json(signed, server, verify_key)
# We got the public key from the invite, so we know that the
# correct server signed the signed bundle.
# The caller is responsible for checking that the signing
# server has not revoked that public key.
return True
return False
except (KeyError, SignatureVerifyException,):
return False
开发者ID:Vutsuak16,项目名称:synapse,代码行数:59,代码来源:auth.py
示例4: get_server_keys
def get_server_keys(server_name, target, port):
url = "https://%s:%i/_matrix/key/v1" % (target, port)
keys = json.load(urllib2.urlopen(url))
verify_keys = {}
for key_id, key_base64 in keys["verify_keys"].items():
verify_key = decode_verify_key_bytes(key_id, decode_base64(key_base64))
verify_signed_json(keys, server_name, verify_key)
verify_keys[key_id] = verify_key
return verify_keys
开发者ID:0-T-0,项目名称:synapse,代码行数:9,代码来源:check_signature.py
示例5: read_perspectives
def read_perspectives(self, perspectives_servers):
servers = {}
for server_name, server_config in perspectives_servers.items():
for key_id, key_data in server_config["verify_keys"].items():
if is_signing_algorithm_supported(key_id):
key_base64 = key_data["key"]
key_bytes = decode_base64(key_base64)
verify_key = decode_verify_key_bytes(key_id, key_bytes)
servers.setdefault(server_name, {})[key_id] = verify_key
return servers
开发者ID:matrix-org,项目名称:synapse,代码行数:10,代码来源:key.py
示例6: test_decode
def test_decode(self):
self.assertEqual(decode_base64(u''), b'')
self.assertEqual(decode_base64(u'AA'), b'\x00')
self.assertEqual(decode_base64(u'AAA'), b'\x00\x00')
self.assertEqual(decode_base64(u'AAAA'), b'\x00\x00\x00')
with self.assertRaises(Exception):
decode_base64(u'A')
开发者ID:matrix-org,项目名称:python-unpaddedbase64,代码行数:7,代码来源:test_unpaddedbase64.py
示例7: read_old_signing_keys
def read_old_signing_keys(self, old_signing_keys):
keys = {}
for key_id, key_data in old_signing_keys.items():
if is_signing_algorithm_supported(key_id):
key_base64 = key_data["key"]
key_bytes = decode_base64(key_base64)
verify_key = decode_verify_key_bytes(key_id, key_bytes)
verify_key.expired_ts = key_data["expired_ts"]
keys[key_id] = verify_key
else:
raise ConfigError(
"Unsupported signing algorithm for old key: %r" % (key_id,)
)
return keys
开发者ID:matrix-org,项目名称:synapse,代码行数:14,代码来源:key.py
示例8: read_old_signing_keys
def read_old_signing_keys(stream):
"""Reads a list of old keys from a stream
Args:
stream : A stream to iterate for keys.
Returns:
list of VerifyKey objects.
"""
keys = []
for line in stream:
algorithm, version, expired, key_base64 = line.split()
key_name = "%s:%s" % (algorithm, version,)
key = decode_verify_key_bytes(key_name, decode_base64(key_base64))
key.expired = int(expired)
keys.append(key)
return keys
开发者ID:matrix-org,项目名称:python-signedjson,代码行数:15,代码来源:key.py
示例9: decode_signing_key_base64
def decode_signing_key_base64(algorithm, version, key_base64):
"""Decode a base64 encoded signing key
Args:
algorithm (str): The algorithm the key is for (currently "ed25519").
version (str): Identifies this key out of the keys for this entity.
key_base64 (str): Base64 encoded bytes of the key.
Returns:
A SigningKey object.
"""
if algorithm == NACL_ED25519:
key_bytes = decode_base64(key_base64)
key = nacl.signing.SigningKey(key_bytes)
key.version = version
key.alg = NACL_ED25519
return key
else:
raise ValueError("Unsupported algorithm %s" % (algorithm,))
开发者ID:matrix-org,项目名称:python-signedjson,代码行数:17,代码来源:key.py
示例10: __init__
def __init__(self, sydent, server_name, port, pubkeys):
super(RemotePeer, self).__init__(server_name, pubkeys)
self.sydent = sydent
# look up or build the replication URL
try:
replication_url = sydent.cfg.get(
"peer.%s" % server_name, "base_replication_url",
)
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
if not port:
port = 1001
replication_url = "https://%s:%i" % (server_name, port)
if replication_url[-1:] != '/':
replication_url += "/"
replication_url += "_matrix/identity/replicate/v1/push"
self.replication_url = replication_url
# Get verify key for this peer
# Check if their key is base64 or hex encoded
pubkey = self.pubkeys[SIGNING_KEY_ALGORITHM]
try:
# Check for hex encoding
int(pubkey, 16)
# Decode hex into bytes
pubkey_decoded = binascii.unhexlify(pubkey)
logger.warn("Peer public key of %s is hex encoded. Please update to base64 encoding", server_name)
except ValueError:
# Check for base64 encoding
try:
pubkey_decoded = decode_base64(pubkey)
except Exception as e:
raise ConfigError(
"Unable to decode public key for peer %s: %s" % (server_name, e),
)
self.verify_key = signedjson.key.decode_verify_key_bytes(SIGNING_KEY_ALGORITHM + ":", pubkey_decoded)
# Attach metadata
self.verify_key.alg = SIGNING_KEY_ALGORITHM
self.verify_key.version = 0
开发者ID:matrix-org,项目名称:sydent,代码行数:46,代码来源:peer.py
示例11: verify_any_signature
def verify_any_signature(self, data, server_hostname):
if server_hostname not in data["signatures"]:
raise AuthError(401, "No signature from server %s" % (server_hostname,))
for key_name, signature in data["signatures"][server_hostname].items():
key_data = yield self.hs.get_simple_http_client().get_json(
"%s%s/_matrix/identity/api/v1/pubkey/%s" %
(id_server_scheme, server_hostname, key_name,),
)
if "public_key" not in key_data:
raise AuthError(401, "No public key named %s from %s" %
(key_name, server_hostname,))
verify_signed_json(
data,
server_hostname,
decode_verify_key_bytes(key_name, decode_base64(key_data["public_key"]))
)
return
开发者ID:roblabla,项目名称:synapse,代码行数:17,代码来源:room.py
示例12: verifyServerSignedJson
def verifyServerSignedJson(self, signed_json, acceptable_server_names=None):
"""Given a signed json object, try to verify any one
of the signatures on it
XXX: This contains a fairly noddy version of the home server
SRV lookup and signature verification. It does no caching (just
fetches the signature each time and does not contact any other
servers to do perspective checks).
:param acceptable_server_names: If provided and not None,
only signatures from servers in this list will be accepted.
:type acceptable_server_names: list of strings
:return a tuple of the server name and key name that was
successfully verified. If the json cannot be verified,
raises SignatureVerifyException.
"""
if 'signatures' not in signed_json:
raise SignatureVerifyException("Signature missing")
for server_name, sigs in signed_json['signatures'].items():
if acceptable_server_names is not None:
if server_name not in acceptable_server_names:
continue
server_keys = yield self._getKeysForServer(server_name)
for key_name, sig in sigs.items():
if key_name in server_keys:
if 'key' not in server_keys[key_name]:
logger.warn("Ignoring key %s with no 'key'")
continue
key_bytes = decode_base64(server_keys[key_name]['key'])
verify_key = signedjson.key.decode_verify_key_bytes(key_name, key_bytes)
logger.info("verifying sig from key %r", key_name)
signedjson.sign.verify_signed_json(signed_json, server_name, verify_key)
logger.info("Verified signature with key %s from %s", key_name, server_name)
defer.returnValue((server_name, key_name))
logger.warn(
"No matching key found for signature block %r in server keys %r",
signed_json['signatures'], server_keys,
)
logger.warn(
"Unable to verify any signatures from block %r. Acceptable server names: %r",
signed_json['signatures'], acceptable_server_names,
)
raise SignatureVerifyException("No matching signature found")
开发者ID:matrix-org,项目名称:sydent,代码行数:45,代码来源:verifier.py
示例13: verify_signed_json
def verify_signed_json(json_object, signature_name, verify_key):
"""Check a signature on a signed JSON object.
Args:
json_object (dict): The signed JSON object to check.
signature_name (str): The name of the signature to check.
verify_key (syutil.crypto.VerifyKey): The key to verify the signature.
Raises:
InvalidSignature: If the signature isn't valid
"""
try:
signatures = json_object["signatures"]
except KeyError:
raise SignatureVerifyException("No signatures on this object")
key_id = "%s:%s" % (verify_key.alg, verify_key.version)
try:
signature_b64 = signatures[signature_name][key_id]
except:
raise SignatureVerifyException("Missing signature for %s, %s" % (signature_name, key_id))
try:
signature = decode_base64(signature_b64)
except:
raise SignatureVerifyException("Invalid signature base64 for %s, %s" % (signature_name, key_id))
json_object_copy = dict(json_object)
del json_object_copy["signatures"]
json_object_copy.pop("unsigned", None)
message = encode_canonical_json(json_object_copy)
# logger.debug("VERIFY: %s %s %s", signature_name, key_id, message)
try:
verify_key.verify(message, signature)
except:
logger.exception("Error verifying signature")
raise SignatureVerifyException("Unable to verify signature for %s " % signature_name)
开发者ID:matrix-org,项目名称:python-signedjson,代码行数:42,代码来源:sign.py
示例14: check_event_content_hash
def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
"""Check whether the hash for this PDU matches the contents"""
name, expected_hash = compute_content_hash(event, hash_algorithm)
logger.debug("Expecting hash: %s", encode_base64(expected_hash))
if name not in event.hashes:
raise SynapseError(
400,
"Algorithm %s not in hashes %s" % (
name, list(event.hashes),
),
Codes.UNAUTHORIZED,
)
message_hash_base64 = event.hashes[name]
try:
message_hash_bytes = decode_base64(message_hash_base64)
except:
raise SynapseError(
400,
"Invalid base64: %s" % (message_hash_base64,),
Codes.UNAUTHORIZED,
)
return message_hash_bytes == expected_hash
开发者ID:roblabla,项目名称:synapse,代码行数:22,代码来源:event_signing.py
示例15: from_token
def from_token(cls, token):
return RoomListNextBatch(**{
cls.REVERSE_KEY_DICT[key]: val
for key, val in msgpack.loads(decode_base64(token)).items()
})
开发者ID:mebjas,项目名称:synapse,代码行数:5,代码来源:room_list.py
示例16: select_pdus
def select_pdus(cursor):
cursor.execute(
"SELECT pdu_id, origin FROM pdus ORDER BY depth ASC"
)
ids = cursor.fetchall()
pdu_tuples = store._get_pdu_tuples(cursor, ids)
pdus = [Pdu.from_pdu_tuple(p) for p in pdu_tuples]
reference_hashes = {}
for pdu in pdus:
try:
if pdu.prev_pdus:
print "PROCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus
for pdu_id, origin, hashes in pdu.prev_pdus:
ref_alg, ref_hsh = reference_hashes[(pdu_id, origin)]
hashes[ref_alg] = encode_base64(ref_hsh)
store._store_prev_pdu_hash_txn(cursor, pdu.pdu_id, pdu.origin, pdu_id, origin, ref_alg, ref_hsh)
print "SUCCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus
pdu = add_event_pdu_content_hash(pdu)
ref_alg, ref_hsh = compute_pdu_event_reference_hash(pdu)
reference_hashes[(pdu.pdu_id, pdu.origin)] = (ref_alg, ref_hsh)
store._store_pdu_reference_hash_txn(cursor, pdu.pdu_id, pdu.origin, ref_alg, ref_hsh)
for alg, hsh_base64 in pdu.hashes.items():
print alg, hsh_base64
store._store_pdu_content_hash_txn(cursor, pdu.pdu_id, pdu.origin, alg, decode_base64(hsh_base64))
except:
print "FAILED_", pdu.pdu_id, pdu.origin, pdu.prev_pdus
开发者ID:0-T-0,项目名称:synapse,代码行数:33,代码来源:hash_history.py
示例17: process_v2_response
def process_v2_response(self, from_server, response_json,
requested_ids=[], only_from_server=True):
time_now_ms = self.clock.time_msec()
response_keys = {}
verify_keys = {}
for key_id, key_data in response_json["verify_keys"].items():
if is_signing_algorithm_supported(key_id):
key_base64 = key_data["key"]
key_bytes = decode_base64(key_base64)
verify_key = decode_verify_key_bytes(key_id, key_bytes)
verify_key.time_added = time_now_ms
verify_keys[key_id] = verify_key
old_verify_keys = {}
for key_id, key_data in response_json["old_verify_keys"].items():
if is_signing_algorithm_supported(key_id):
key_base64 = key_data["key"]
key_bytes = decode_base64(key_base64)
verify_key = decode_verify_key_bytes(key_id, key_bytes)
verify_key.expired = key_data["expired_ts"]
verify_key.time_added = time_now_ms
old_verify_keys[key_id] = verify_key
results = {}
server_name = response_json["server_name"]
if only_from_server:
if server_name != from_server:
raise ValueError(
"Expected a response for server %r not %r" % (
from_server, server_name
)
)
for key_id in response_json["signatures"].get(server_name, {}):
if key_id not in response_json["verify_keys"]:
raise ValueError(
"Key response must include verification keys for all"
" signatures"
)
if key_id in verify_keys:
verify_signed_json(
response_json,
server_name,
verify_keys[key_id]
)
signed_key_json = sign_json(
response_json,
self.config.server_name,
self.config.signing_key[0],
)
signed_key_json_bytes = encode_canonical_json(signed_key_json)
ts_valid_until_ms = signed_key_json[u"valid_until_ts"]
updated_key_ids = set(requested_ids)
updated_key_ids.update(verify_keys)
updated_key_ids.update(old_verify_keys)
response_keys.update(verify_keys)
response_keys.update(old_verify_keys)
yield defer.gatherResults(
[
preserve_fn(self.store.store_server_keys_json)(
server_name=server_name,
key_id=key_id,
from_server=server_name,
ts_now_ms=time_now_ms,
ts_expires_ms=ts_valid_until_ms,
key_json_bytes=signed_key_json_bytes,
)
for key_id in updated_key_ids
],
consumeErrors=True,
).addErrback(unwrapFirstError)
results[server_name] = response_keys
defer.returnValue(results)
开发者ID:pombredanne,项目名称:synapse-2,代码行数:79,代码来源:keyring.py
示例18: process_v2_response
def process_v2_response(
self, from_server, response_json, requested_ids=[],
):
"""Parse a 'Server Keys' structure from the result of a /key request
This is used to parse either the entirety of the response from
GET /_matrix/key/v2/server, or a single entry from the list returned by
POST /_matrix/key/v2/query.
Checks that each signature in the response that claims to come from the origin
server is valid. (Does not check that there actually is such a signature, for
some reason.)
Stores the json in server_keys_json so that it can be used for future responses
to /_matrix/key/v2/query.
Args:
from_server (str): the name of the server producing this result: either
the origin server for a /_matrix/key/v2/server request, or the notary
for a /_matrix/key/v2/query.
response_json (dict): the json-decoded Server Keys response object
requested_ids (iterable[str]): a list of the key IDs that were requested.
We will store the json for these key ids as well as any that are
actually in the response
Returns:
Deferred[dict[str, nacl.signing.VerifyKey]]:
map from key_id to key object
"""
time_now_ms = self.clock.time_msec()
response_keys = {}
verify_keys = {}
for key_id, key_data in response_json["verify_keys"].items():
if is_signing_algorithm_supported(key_id):
key_base64 = key_data["key"]
key_bytes = decode_base64(key_base64)
verify_key = decode_verify_key_bytes(key_id, key_bytes)
verify_key.time_added = time_now_ms
verify_keys[key_id] = verify_key
old_verify_keys = {}
for key_id, key_data in response_json["old_verify_keys"].items():
if is_signing_algorithm_supported(key_id):
key_base64 = key_data["key"]
key_bytes = decode_base64(key_base64)
verify_key = decode_verify_key_bytes(key_id, key_bytes)
verify_key.expired = key_data["expired_ts"]
verify_key.time_added = time_now_ms
old_verify_keys[key_id] = verify_key
server_name = response_json["server_name"]
for key_id in response_json["signatures"].get(server_name, {}):
if key_id not in response_json["verify_keys"]:
raise KeyLookupError(
"Key response must include verification keys for all"
" signatures"
)
if key_id in verify_keys:
verify_signed_json(
response_json,
server_name,
verify_keys[key_id]
)
signed_key_json = sign_json(
response_json,
self.config.server_name,
self.config.signing_key[0],
)
signed_key_json_bytes = encode_canonical_json(signed_key_json)
ts_valid_until_ms = signed_key_json[u"valid_until_ts"]
updated_key_ids = set(requested_ids)
updated_key_ids.update(verify_keys)
updated_key_ids.update(old_verify_keys)
response_keys.update(verify_keys)
response_keys.update(old_verify_keys)
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
run_in_background(
self.store.store_server_keys_json,
server_name=server_name,
key_id=key_id,
from_server=from_server,
ts_now_ms=time_now_ms,
ts_expires_ms=ts_valid_until_ms,
key_json_bytes=signed_key_json_bytes,
)
for key_id in updated_key_ids
],
consumeErrors=True,
).addErrback(unwrapFirstError))
defer.returnValue(response_keys)
开发者ID:matrix-org,项目名称:synapse,代码行数:99,代码来源:keyring.py
示例19: get_server_verify_key_v1_direct
def get_server_verify_key_v1_direct(self, server_name, key_ids):
"""Finds a verification key for the server with one of the key ids.
Args:
server_name (str): The name of the server to fetch a key for.
keys_ids (list of str): The key_ids to check for.
"""
# Try to fetch the key from the remote server.
(response, tls_certificate) = yield fetch_server_key(
server_name, self.hs.tls_server_context_factory
)
# Check the response.
x509_certificate_bytes = crypto.dump_certificate(
crypto.FILETYPE_ASN1, tls_certificate
)
if ("signatures" not in response
or server_name not in response["signatures"]):
raise ValueError("Key response not signed by remote server")
if "tls_certificate" not in response:
raise ValueError("Key response missing TLS certificate")
tls_certificate_b64 = response["tls_certificate"]
if encode_base64(x509_certificate_bytes) != tls_certificate_b64:
raise ValueError("TLS certificate doesn't match")
# Cache the result in the datastore.
time_now_ms = self.clock.time_msec()
verify_keys = {}
for key_id, key_base64 in response["verify_keys"].items():
if is_signing_algorithm_supported(key_id):
key_bytes = decode_base64(key_base64)
verify_key = decode_verify_key_bytes(key_id, key_bytes)
verify_key.time_added = time_now_ms
verify_keys[key_id] = verify_key
for key_id in response["signatures"][server_name]:
if key_id not in response["verify_keys"]:
raise ValueError(
"Key response must include verification keys for all"
" signatures"
)
if key_id in verify_keys:
verify_signed_json(
response,
server_name,
verify_keys[key_id]
)
yield self.store.store_server_certificate(
server_name,
server_name,
time_now_ms,
tls_certificate,
)
yield self.store_keys(
server_name=server_name,
from_server=server_name,
verify_keys=verify_keys,
)
defer.returnValue(verify_keys)
开发者ID:pombredanne,项目名称:synapse-2,代码行数:70,代码来源:keyring.py
示例20: test_decode_urlunsafe_chars
def test_decode_urlunsafe_chars(self):
self.assertEqual(decode_base64(u'/+aa'), b'\xff\xe6\x9a')
self.assertEqual(decode_base64(u'_-aa'), b'\xff\xe6\x9a')
开发者ID:matrix-org,项目名称:python-unpaddedbase64,代码行数:3,代码来源:test_unpaddedbase64.py
注:本文中的unpaddedbase64.decode_base64函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论