本文整理汇总了Python中zato.common.crypto.CryptoManager类的典型用法代码示例。如果您正苦于以下问题:Python CryptoManager类的具体用法?Python CryptoManager怎么用?Python CryptoManager使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了CryptoManager类的17个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _on_server
def _on_server(self, args):
    """ Check that a server's SQL ODB and Redis (KVDB) connections can be used.

    Reads server.conf from the component's repo directory, loads the private
    key it points to and then exercises both back-ends. Each connection is
    closed even if the check itself raises, so a failed check does not leak it.

    args - command-line arguments, not read by this method
    """
    repo_dir = join(self.config_dir, 'repo')
    server_conf = ConfigObj(join(repo_dir, 'server.conf'))

    # The key location from server.conf is joined onto repo_dir before use
    cm = CryptoManager(priv_key_location=abspath(join(repo_dir, server_conf['crypto']['priv_key_location'])))
    cm.load_keys()

    engine_params = dict(server_conf['odb'].items())
    engine_params['extra'] = {}
    engine_params['pool_size'] = 1

    # The ping query is selected by the configured engine name
    query = ping_queries[engine_params['engine']]

    session = create_pool(cm, engine_params)
    try:
        session.execute(query)
    finally:
        # Release the connection whether or not the ping succeeded
        session.close()

    if self.show_output:
        self.logger.info('SQL ODB connection OK')

    kvdb_config = Bunch(dict(server_conf['kvdb'].items()))
    kvdb = KVDB(None, kvdb_config, cm.decrypt)
    kvdb.init()
    try:
        kvdb.conn.info()
    finally:
        # Same guarantee as for the SQL session above
        kvdb.close()

    if self.show_output:
        self.logger.info('Redis connection OK')
开发者ID:Adniel,项目名称:zato,代码行数:30,代码来源:check_config.py
示例2: test_enrypt
def test_enrypt(self):
    """ Round trip: a message encrypted with the public key must decrypt
    back to the same text with the private key.
    """
    message = uuid4().hex

    with NamedTemporaryFile(prefix='zato-test-') as tf_priv, \
            NamedTemporaryFile(prefix='zato-test-') as tf_pub:

        # Both keys have to be flushed to disk before load_keys reads them
        for key_file, key_contents in ((tf_priv, priv_key), (tf_pub, pub_key)):
            key_file.write(key_contents)
            key_file.flush()

        # One manager only knows the private key ..
        priv_manager = CryptoManager()
        priv_manager.priv_key_location = tf_priv.name
        priv_manager.load_keys()

        # .. and the other one only the public key.
        pub_manager = CryptoManager()
        pub_manager.pub_key_location = tf_pub.name
        pub_manager.load_keys()

        round_tripped = priv_manager.decrypt(pub_manager.encrypt(message))
        eq_(message, round_tripped)
开发者ID:Adniel,项目名称:zato,代码行数:26,代码来源:test_crypto.py
示例3: execute
def execute(self, args):
    """ Invoke a service over HTTP through the channel matching args.url_path
    and log the outcome.

    Decrypts the ODB password from server.conf, looks up the cluster of the
    server whose token is config.main.token and the HTTPSOAP channel with the
    requested URL path, and uses that channel's HTTP Basic Auth credentials
    if a security definition is attached to it.

    args - parsed command-line arguments; path, url_path, headers,
           max_response_repr, max_cid_repr, async, name, payload, channel,
           data_format, transport and verbose are read
    """
    repo_dir = os.path.join(os.path.abspath(os.path.join(self.original_dir, args.path)), 'config', 'repo')
    config = get_config(repo_dir, 'server.conf')
    priv_key_location = os.path.abspath(os.path.join(repo_dir, config.crypto.priv_key_location))

    cm = CryptoManager(priv_key_location=priv_key_location)
    cm.load_keys()

    engine_args = Bunch()
    engine_args.odb_type = config.odb.engine
    engine_args.odb_user = config.odb.username
    engine_args.odb_password = cm.decrypt(config.odb.password)
    engine_args.odb_host = config.odb.host
    engine_args.odb_db_name = config.odb.db_name

    engine = self._get_engine(engine_args)
    session = self._get_session(engine)

    auth = None
    headers = {}

    with closing(session) as session:
        cluster = (
            session.query(Server)
            .filter(Server.token == config.main.token)
            .one()
            .cluster
        )

        channel = (
            session.query(HTTPSOAP)
            .filter(HTTPSOAP.cluster_id == cluster.id)
            .filter(HTTPSOAP.url_path == args.url_path)
            .one()
        )

        if channel.security_id:
            security = (
                session.query(HTTPBasicAuth)
                .filter(HTTPBasicAuth.id == channel.security_id)
                .first()
            )

            # first() returns None if the definition is not HTTP Basic Auth
            if security:
                auth = (security.username, security.password)

    if args.headers:
        # Headers arrive as 'k1=v1;k2=v2' - only the first '=' splits a pair
        for pair in args.headers.strip().split(';'):
            pair = pair.strip()

            # A trailing or doubled ';' yields an empty pair, nothing to add
            if not pair:
                continue

            k, v = pair.split('=', 1)
            headers[k] = v

    client = AnyServiceInvoker('http://{}'.format(config.main.gunicorn_bind), args.url_path, auth,
        max_response_repr=int(args.max_response_repr), max_cid_repr=int(args.max_cid_repr))

    # 'async' is a reserved word from Python 3.7 on, so 'args.async' no longer
    # parses there - getattr reads the very same attribute on any version.
    func = client.invoke_async if getattr(args, 'async') else client.invoke
    response = func(args.name, args.payload, headers, args.channel, args.data_format, args.transport)

    if response.ok:
        self.logger.info(response.data or '(None)')
    else:
        self.logger.error(response.details)

    if args.verbose:
        self.logger.debug('inner.text:[{}]'.format(response.inner.text))
        self.logger.debug('response:[{}]'.format(response))
开发者ID:barowski,项目名称:zato,代码行数:59,代码来源:service.py
示例4: get_crypto_manager_from_server_config
def get_crypto_manager_from_server_config(config, repo_dir):
    """ Return a CryptoManager whose keys were loaded from the private key
    that the server's config points to.
    config - server config exposing crypto.priv_key_location
    repo_dir - directory the key's location is joined onto
    """
    key_path = os.path.join(repo_dir, config.crypto.priv_key_location)

    manager = CryptoManager(priv_key_location=os.path.abspath(key_path))
    manager.load_keys()

    return manager
开发者ID:lanwan,项目名称:zato,代码行数:8,代码来源:util.py
示例5: xtest_decrypt_priv_key_in_string
def xtest_decrypt_priv_key_in_string(self):
    """ A private key assigned directly as a string is enough to decrypt
    the secret back to the plain text.
    """
    manager = CryptoManager()
    manager.priv_key = priv_key
    manager.load_keys()

    decrypted = manager.decrypt(secret)
    eq_(plain_text, decrypted)
开发者ID:Adniel,项目名称:zato,代码行数:8,代码来源:test_crypto.py
示例6: xtest_reset
def xtest_reset(self):
    """ After reset, both key locations and both keys are None.
    """
    manager = CryptoManager()
    manager.reset()

    # Checked in the same order as before: locations first, then the keys
    for attr_name in ('priv_key_location', 'pub_key_location', 'priv_key', 'pub_key'):
        eq_(getattr(manager, attr_name), None)
开发者ID:Adniel,项目名称:zato,代码行数:10,代码来源:test_crypto.py
示例7: encrypt
def encrypt(data, priv_key, b64=True):
    """ Encrypt data with a CryptoManager that was given only the private key.
    data - what to encrypt
    priv_key - private key, as a PEM string, handed to the CryptoManager
    b64 - passed on to CryptoManager.encrypt, True by default; controls whether
          the result is BASE64-encoded before it is returned
    """
    manager = CryptoManager(priv_key=priv_key)
    manager.load_keys()

    encrypted = manager.encrypt(data, b64)
    return encrypted
开发者ID:m0rcq,项目名称:zato,代码行数:11,代码来源:util.py
示例8: decrypt
def decrypt(data, priv_key, b64=True):
    """ Decrypt data with the given private key.
    data - what to decrypt
    priv_key - private key, as a PEM string, handed to the CryptoManager
    b64 - passed on to CryptoManager.decrypt, True by default; controls whether
          data is BASE64-decoded before being decrypted
    """
    manager = CryptoManager(priv_key=priv_key)
    manager.load_keys()

    decrypted = manager.decrypt(data, b64)
    return decrypted
开发者ID:m0rcq,项目名称:zato,代码行数:11,代码来源:util.py
示例9: _on_server
def _on_server(self, args):
    """ Run the server checks one after another: SQL ODB, KVDB and the
    stale UNIX socket check.
    args - command-line arguments, not read by this method
    """
    repo_dir = join(self.config_dir, 'repo')
    server_conf = ConfigObj(join(repo_dir, 'server.conf'))

    # Key location comes from server.conf and is joined onto repo_dir
    key_location = abspath(join(repo_dir, server_conf['crypto']['priv_key_location']))
    cm = CryptoManager(priv_key_location=key_location)
    cm.load_keys()

    self.on_server_check_sql_odb(cm, server_conf, repo_dir)
    self.on_server_check_kvdb(cm, server_conf)
    self.on_server_check_stale_unix_socket()
开发者ID:assad2012,项目名称:zato,代码行数:11,代码来源:check_config.py
示例10: get_crypto_manager_conf
def get_crypto_manager_conf(self, conf_file=None, priv_key_location=None, repo_dir=None):
    """ Return a (CryptoManager, conf) pair with the manager's keys loaded.

    conf_file - name of the config file in repo_dir, read only if
                priv_key_location was not given
    priv_key_location - private key to load; if not given it is taken from
                        the crypto.priv_key_location entry of conf_file
    repo_dir - defaults to the 'repo' directory under self.config_dir

    conf is None whenever priv_key_location was provided by the caller
    because the config file is not read in that case.
    """
    conf = None

    if not repo_dir:
        repo_dir = join(self.config_dir, 'repo')

    if not priv_key_location:
        conf = ConfigObj(join(repo_dir, conf_file))
        priv_key_location = abspath(join(repo_dir, conf['crypto']['priv_key_location']))

    manager = CryptoManager(priv_key_location=priv_key_location)
    manager.load_keys()

    return manager, conf
开发者ID:dev-alex-alex2006hw,项目名称:zato,代码行数:12,代码来源:check_config.py
示例11: xtest_decrypt_priv_key_in_file
def xtest_decrypt_priv_key_in_file(self):
    """ A private key read from a file on disk decrypts the secret back
    to the plain text.
    """
    with NamedTemporaryFile(prefix='zato-test-') as key_file:

        # The key must be flushed to disk before load_keys reads it by name
        key_file.write(priv_key)
        key_file.flush()

        manager = CryptoManager()
        manager.priv_key_location = key_file.name
        manager.load_keys()

        decrypted = manager.decrypt(secret)
        eq_(plain_text, decrypted)
开发者ID:Adniel,项目名称:zato,代码行数:12,代码来源:test_crypto.py
示例12: encrypt
def encrypt(data, pub_key, padding=RSA.pkcs1_padding, b64=True):
    """ Encrypt data with the given public key.
    data - what to encrypt
    pub_key - public key, as a PEM string, handed to the CryptoManager
    padding - passed on to CryptoManager.encrypt, PKCS#1 by default
    b64 - passed on to CryptoManager.encrypt, True by default; controls whether
          the result is BASE64-encoded before it is returned
    """
    logger.debug('Using pub_key:[{}]'.format(pub_key))

    manager = CryptoManager(pub_key=pub_key)
    manager.load_keys()

    encrypted = manager.encrypt(data, padding, b64)
    return encrypted
开发者ID:barowski,项目名称:zato,代码行数:13,代码来源:util.py
示例13: decrypt
def decrypt(data, priv_key, padding=RSA.pkcs1_padding, b64=True):
    """ Decrypts data using the given private key.
    data - data to be decrypted
    priv_key - private key to use (as a PEM string)
    padding - padding to use, defaults to PKCS#1
    b64 - should the data be BASE64-decoded before being decrypted, defaults to True
    """
    # The key itself must never reach the logs - anyone able to read them
    # could otherwise decrypt every secret protected by this key.
    logger.debug('Using priv_key:[<redacted>]')

    cm = CryptoManager(priv_key=priv_key)
    cm.load_keys()

    return cm.decrypt(data, padding, b64)
开发者ID:barowski,项目名称:zato,代码行数:13,代码来源:util.py
示例14: _update_crypto
def _update_crypto(
        self, args, copy_crypto_func, update_secrets=False, load_secrets_func=None,
        store_secrets_func=None, conf_file_name=None, priv_key_name=None, pub_key_name=None, secret_names=None):
    """ Run copy_crypto_func for a component's repo directory and, optionally,
    carry the component's secrets across that step.

    With update_secrets set, secrets are first loaded by load_secrets_func
    using a CryptoManager built from the current private key. Once
    copy_crypto_func has run, the same manager is reset, pointed at the
    public key, reloaded and handed to store_secrets_func together with the
    secrets read earlier.

    args - command-line arguments, args.path locates the component
    copy_crypto_func - called as copy_crypto_func(repo_dir, args)
    update_secrets - whether secrets should be loaded and stored at all
    load_secrets_func - returns a (secrets, conf) pair
    store_secrets_func - receives the secrets and conf to store
    conf_file_name, priv_key_name, pub_key_name - file names in the repo directory
    secret_names - names of secrets to process, defaults to a new empty list
    """
    # A list literal as the default would be one object shared by all calls,
    # and it is given to callbacks that are free to modify it.
    if secret_names is None:
        secret_names = []

    repo_dir = os.path.join(os.path.abspath(os.path.join(self.original_dir, args.path)), 'config', 'repo')
    secrets = {}

    if update_secrets:
        priv_key_location = os.path.abspath(os.path.join(repo_dir, priv_key_name))
        pub_key_location = os.path.abspath(os.path.join(repo_dir, pub_key_name))
        conf_location = os.path.join(repo_dir, conf_file_name)

        cm = CryptoManager(priv_key_location=priv_key_location)
        cm.load_keys()

        secrets, conf = load_secrets_func(secrets, secret_names, cm, conf_location, conf_file_name)

    copy_crypto_func(repo_dir, args)

    if update_secrets:
        # Forget the key loaded above and load the one at pub_key_location,
        # which is read only now, after copy_crypto_func has run.
        cm.reset()
        cm.pub_key_location = pub_key_location
        cm.load_keys()

        store_secrets_func(secrets, secret_names, cm, conf_location, conf)
开发者ID:Aayush-Kasurde,项目名称:zato,代码行数:25,代码来源:crypto.py
示例15: _on_server
def _on_server(self, args):
    """ Run the SQL ODB and KVDB checks and, unless the caller opted out
    through args.check_stale_server_sockets, the stale UNIX socket check too.
    """
    repo_dir = join(self.config_dir, 'repo')
    server_conf = ConfigObj(join(repo_dir, 'server.conf'))

    key_location = abspath(join(repo_dir, server_conf['crypto']['priv_key_location']))
    cm = CryptoManager(priv_key_location=key_location)
    cm.load_keys()

    self.on_server_check_sql_odb(cm, server_conf, repo_dir)
    self.on_server_check_kvdb(cm, server_conf)

    # enmasse needs the socket to exist because it means a server is running,
    # so it must be able to turn this check off. It stays on by default.
    check_sockets = getattr(args, 'check_stale_server_sockets', True)
    if check_sockets:
        self.on_server_check_stale_unix_socket()
开发者ID:alex-hutton,项目名称:zato,代码行数:15,代码来源:check_config.py
示例16: _on_web_admin
def _on_web_admin(self, args):
    """Load ODB using component config and update security definition.

    Reads web-admin.conf from the component's repo directory, decrypts the
    database password with web-admin-priv-key.pem and sets the password of
    the HTTP Basic Auth definition named args.secdef to args.password.

    Returns None on success, 1 if the definition does not exist and
    self.SYS_ERROR.NO_ODB_FOUND if the sec_basic_auth table is missing.
    """
    config_file = join(self.config_dir, 'repo', 'web-admin.conf')
    with open(config_file) as fp:
        c = bunchify(load(fp))

    private_key = join(self.config_dir, 'repo', 'web-admin-priv-key.pem')
    cm = CryptoManager(priv_key_location=private_key)
    cm.load_keys()
    db_password = cm.decrypt(c.DATABASE_PASSWORD)

    # type://user:password@host:port/name - the '@' separates the
    # credentials from the host.
    engine_url = "%s://%s:%s@%s:%s/%s" % (
        c.db_type,
        c.DATABASE_USER,
        db_password,
        c.DATABASE_HOST,
        c.DATABASE_PORT,
        c.DATABASE_NAME)

    engine = sqlalchemy.create_engine(engine_url)
    session = self._get_session(engine)

    try:
        # The connection is needed for the table lookup only, so it is
        # given back as soon as the answer is known.
        connection = engine.connect()
        try:
            odb_exists = engine.dialect.has_table(connection, 'sec_basic_auth')
        finally:
            connection.close()

        if not odb_exists:
            msg = "Zato ODB does not seem to have been set up yet."
            self.logger.error(msg)
            return self.SYS_ERROR.NO_ODB_FOUND

        try:
            secdef = session.query(HTTPBasicAuth).filter(
                HTTPBasicAuth.name == args.secdef).one()
        except sqlalchemy.orm.exc.NoResultFound:
            self.logger.error(
                "Security definition '%s' not found.", args.secdef)
            return 1

        self.logger.info("Security definition: %s" % secdef.name)
        self.logger.info("Username: %s" % secdef.username)
        # NOTE(review): this puts the previous password in the log in clear
        # text - kept as is because it is existing output, confirm it is wanted.
        self.logger.info("Old password: %s" % secdef.password)
        self.logger.info("Setting new password.")

        secdef.password = args.password
        session.commit()
        self.logger.info('OK')

    finally:
        # Runs on every exit path, including the early returns above
        session.close()
开发者ID:grenzi,项目名称:zato-deploy-cli,代码行数:43,代码来源:updatepassword.py
示例17: execute
def execute(self, args):
    """ Decrypt args.secret with the private key found at args.path
    and log the result.
    """
    key_path = os.path.abspath(args.path)

    manager = CryptoManager(priv_key_location=key_path)
    manager.load_keys()

    decrypted = manager.decrypt(args.secret)
    self.logger.info('Secret is [{}]'.format(decrypted))
开发者ID:barowski,项目名称:zato,代码行数:5,代码来源:crypto.py
注:本文中的zato.common.crypto.CryptoManager类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论