• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python crypto.CryptoManager类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中zato.common.crypto.CryptoManager的典型用法代码示例。如果您正苦于以下问题:Python CryptoManager类的具体用法?Python CryptoManager怎么用?Python CryptoManager使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了CryptoManager类的17个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _on_server

    def _on_server(self, args):
        
        repo_dir = join(self.config_dir, 'repo')
        server_conf = ConfigObj(join(repo_dir, 'server.conf'))

        cm = CryptoManager(priv_key_location=abspath(join(repo_dir, server_conf['crypto']['priv_key_location'])))
        cm.load_keys()
        
        engine_params = dict(server_conf['odb'].items())
        engine_params['extra'] = {}
        engine_params['pool_size'] = 1
        
        query = ping_queries[engine_params['engine']]
        
        session = create_pool(cm, engine_params)
        session.execute(query)
        session.close()
        
        if self.show_output:
            self.logger.info('SQL ODB connection OK')
        
        kvdb_config = Bunch(dict(server_conf['kvdb'].items()))
        kvdb = KVDB(None, kvdb_config, cm.decrypt)
        kvdb.init()
        
        kvdb.conn.info()
        kvdb.close()
        
        if self.show_output:
            self.logger.info('Redis connection OK')
开发者ID:Adniel,项目名称:zato,代码行数:30,代码来源:check_config.py


示例2: test_enrypt

 def test_enrypt(self):
     """ Encrypt a message using the public key.
     """
     _plain_text = uuid4().hex
     
     with NamedTemporaryFile(prefix='zato-test-') as tf_priv:
         with NamedTemporaryFile(prefix='zato-test-') as tf_pub:
             
             tf_priv.write(priv_key)
             tf_priv.flush()
             
             tf_pub.write(pub_key)
             tf_pub.flush()
             
             cm_priv = CryptoManager()
             cm_priv.priv_key_location = tf_priv.name
             cm_priv.load_keys()
             
             cm_pub = CryptoManager()
             cm_pub.pub_key_location = tf_pub.name
             cm_pub.load_keys()
             
             encrypted = cm_pub.encrypt(_plain_text)
             decrypted = cm_priv.decrypt(encrypted)
             
             eq_(_plain_text, decrypted)
开发者ID:Adniel,项目名称:zato,代码行数:26,代码来源:test_crypto.py


示例3: execute

    def execute(self, args):

        repo_dir = os.path.join(os.path.abspath(os.path.join(self.original_dir, args.path)), 'config', 'repo')
        config = get_config(repo_dir, 'server.conf')
        priv_key_location = os.path.abspath(os.path.join(repo_dir, config.crypto.priv_key_location))
        
        cm = CryptoManager(priv_key_location=priv_key_location)
        cm.load_keys()
        
        engine_args = Bunch()
        engine_args.odb_type = config.odb.engine
        engine_args.odb_user = config.odb.username
        engine_args.odb_password = cm.decrypt(config.odb.password)
        engine_args.odb_host = config.odb.host
        engine_args.odb_db_name = config.odb.db_name
        
        engine = self._get_engine(engine_args)
        session = self._get_session(engine)
        
        auth = None
        headers = {}
        
        with closing(session) as session:
            cluster = session.query(Server).\
                filter(Server.token == config.main.token).\
                one().cluster
            
            channel = session.query(HTTPSOAP).\
                filter(HTTPSOAP.cluster_id == cluster.id).\
                filter(HTTPSOAP.url_path == args.url_path).\
                one()
            
            if channel.security_id:
                security = session.query(HTTPBasicAuth).\
                    filter(HTTPBasicAuth.id == channel.security_id).\
                    first()
                
                if security:
                    auth = (security.username, security.password)
                    
        if args.headers:
            for pair in args.headers.strip().split(';'):
                k, v = pair.strip().split('=', 1)
                headers[k] = v
                    
        client = AnyServiceInvoker('http://{}'.format(config.main.gunicorn_bind), args.url_path, auth,
            max_response_repr=int(args.max_response_repr), max_cid_repr=int(args.max_cid_repr))
        
        func = client.invoke_async if args.async else client.invoke
        response = func(args.name, args.payload, headers, args.channel, args.data_format, args.transport)
        
        if response.ok:
            self.logger.info(response.data or '(None)')
        else:
            self.logger.error(response.details)
            
        if args.verbose:
            self.logger.debug('inner.text:[{}]'.format(response.inner.text))
            self.logger.debug('response:[{}]'.format(response))
开发者ID:barowski,项目名称:zato,代码行数:59,代码来源:service.py


示例4: get_crypto_manager_from_server_config

def get_crypto_manager_from_server_config(config, repo_dir):

    priv_key_location = os.path.abspath(os.path.join(repo_dir, config.crypto.priv_key_location))

    cm = CryptoManager(priv_key_location=priv_key_location)
    cm.load_keys()

    return cm
开发者ID:lanwan,项目名称:zato,代码行数:8,代码来源:util.py


示例5: xtest_decrypt_priv_key_in_string

 def xtest_decrypt_priv_key_in_string(self):
     """ Decrypt a message using a private key from string.
     """
     cm = CryptoManager()
     cm.priv_key = priv_key
     cm.load_keys()
     
     eq_(plain_text, cm.decrypt(secret))
开发者ID:Adniel,项目名称:zato,代码行数:8,代码来源:test_crypto.py


示例6: xtest_reset

 def xtest_reset(self):
     """ Resets all keys to None.
     """
     cm = CryptoManager()
     cm.reset()
     
     eq_(cm.priv_key_location, None)
     eq_(cm.pub_key_location, None)
     eq_(cm.priv_key, None)
     eq_(cm.pub_key, None)
开发者ID:Adniel,项目名称:zato,代码行数:10,代码来源:test_crypto.py


示例7: encrypt

def encrypt(data, priv_key, b64=True):
    """ Encrypt data using a public key derived from the private key.
    data - data to be encrypted
    priv_key - private key to use (as a PEM string)
    b64 - should the encrypted data be BASE64-encoded before being returned, defaults to True
    """

    cm = CryptoManager(priv_key=priv_key)
    cm.load_keys()

    return cm.encrypt(data, b64)
开发者ID:m0rcq,项目名称:zato,代码行数:11,代码来源:util.py


示例8: decrypt

def decrypt(data, priv_key, b64=True):
    """ Decrypts data using the given private key.
    data - data to be encrypted
    priv_key - private key to use (as a PEM string)
    b64 - should the data be BASE64-decoded before being decrypted, defaults to True
    """

    cm = CryptoManager(priv_key=priv_key)
    cm.load_keys()

    return cm.decrypt(data, b64)
开发者ID:m0rcq,项目名称:zato,代码行数:11,代码来源:util.py


示例9: _on_server

    def _on_server(self, args):

        repo_dir = join(self.config_dir, 'repo')
        server_conf = ConfigObj(join(repo_dir, 'server.conf'))

        cm = CryptoManager(priv_key_location=abspath(join(repo_dir, server_conf['crypto']['priv_key_location'])))
        cm.load_keys()

        self.on_server_check_sql_odb(cm, server_conf, repo_dir)
        self.on_server_check_kvdb(cm, server_conf)
        self.on_server_check_stale_unix_socket()
开发者ID:assad2012,项目名称:zato,代码行数:11,代码来源:check_config.py


示例10: get_crypto_manager_conf

    def get_crypto_manager_conf(self, conf_file=None, priv_key_location=None, repo_dir=None):
        repo_dir = repo_dir or join(self.config_dir, 'repo')
        conf = None

        if not priv_key_location:
            conf = ConfigObj(join(repo_dir, conf_file))
            priv_key_location = priv_key_location or abspath(join(repo_dir, conf['crypto']['priv_key_location']))

        cm = CryptoManager(priv_key_location=priv_key_location)
        cm.load_keys()

        return cm, conf
开发者ID:dev-alex-alex2006hw,项目名称:zato,代码行数:12,代码来源:check_config.py


示例11: xtest_decrypt_priv_key_in_file

 def xtest_decrypt_priv_key_in_file(self):
     """ Decrypt a message using a private key from file.
     """
     with NamedTemporaryFile(prefix='zato-test-') as tf:
         tf.write(priv_key)
         tf.flush()
         
         cm = CryptoManager()
         cm.priv_key_location = tf.name
         cm.load_keys()
         
         eq_(plain_text, cm.decrypt(secret))
开发者ID:Adniel,项目名称:zato,代码行数:12,代码来源:test_crypto.py


示例12: encrypt

def encrypt(data, pub_key, padding=RSA.pkcs1_padding, b64=True):
    """ Encrypt data using the given public key.
    data - data to be encrypted
    pub_key - public key to use (as a PEM string)
    padding - padding to use, defaults to PKCS#1
    b64 - should the encrypted data be BASE64-encoded before being returned, defaults to True
    """
    logger.debug('Using pub_key:[{}]'.format(pub_key))
    
    cm = CryptoManager(pub_key=pub_key)
    cm.load_keys()
    
    return cm.encrypt(data, padding, b64)
开发者ID:barowski,项目名称:zato,代码行数:13,代码来源:util.py


示例13: decrypt

def decrypt(data, priv_key, padding=RSA.pkcs1_padding, b64=True):
    """ Decrypts data using the given private key.
    data - data to be encrypted
    priv_key - private key to use (as a PEM string)
    padding - padding to use, defaults to PKCS#1
    b64 - should the data be BASE64-decoded before being decrypted, defaults to True
    """
    logger.debug('Using priv_key:[{}]'.format(priv_key))
    
    cm = CryptoManager(priv_key=priv_key)
    cm.load_keys()
    
    return cm.decrypt(data, padding, b64)
开发者ID:barowski,项目名称:zato,代码行数:13,代码来源:util.py


示例14: _update_crypto

 def _update_crypto(
         self, args, copy_crypto_func, update_secrets=False, load_secrets_func=None,
         store_secrets_func=None, conf_file_name=None, priv_key_name=None, pub_key_name=None, secret_names=[]):
     
     repo_dir = os.path.join(os.path.abspath(os.path.join(self.original_dir, args.path)), 'config', 'repo')
     secrets = {}
     
     if update_secrets:
         priv_key_location = os.path.abspath(os.path.join(repo_dir, priv_key_name))
         pub_key_location = os.path.abspath(os.path.join(repo_dir, pub_key_name))
         conf_location = os.path.join(repo_dir, conf_file_name)
         
         cm = CryptoManager(priv_key_location=priv_key_location)
         cm.load_keys()
         
         secrets, conf = load_secrets_func(secrets, secret_names, cm, conf_location, conf_file_name)
         
     copy_crypto_func(repo_dir, args)
     
     if update_secrets:
         cm.reset()
         cm.pub_key_location = pub_key_location
         cm.load_keys()
         
         store_secrets_func(secrets, secret_names, cm, conf_location, conf)
开发者ID:Aayush-Kasurde,项目名称:zato,代码行数:25,代码来源:crypto.py


示例15: _on_server

    def _on_server(self, args):

        repo_dir = join(self.config_dir, 'repo')
        server_conf = ConfigObj(join(repo_dir, 'server.conf'))

        cm = CryptoManager(priv_key_location=abspath(join(repo_dir, server_conf['crypto']['priv_key_location'])))
        cm.load_keys()

        self.on_server_check_sql_odb(cm, server_conf, repo_dir)
        self.on_server_check_kvdb(cm, server_conf)

        # enmasse actually needs a sockets because it means a server is running
        # so we can't quit if one is available.
        if getattr(args, 'check_stale_server_sockets', True):
            self.on_server_check_stale_unix_socket()
开发者ID:alex-hutton,项目名称:zato,代码行数:15,代码来源:check_config.py


示例16: _on_web_admin

    def _on_web_admin(self, args):
        """Load ODB using component config and update security definition."""
        config_file = join(self.config_dir, 'repo', 'web-admin.conf')

        with open(config_file) as fp:
            c = bunchify(load(fp))

        private_key = join(self.config_dir, 'repo', 'web-admin-priv-key.pem')
        cm = CryptoManager(priv_key_location=private_key)
        cm.load_keys()
        db_password = cm.decrypt(c.DATABASE_PASSWORD)

        engine_url = "%s://%s:%[email protected]%s:%s/%s" % (
            c.db_type,
            c.DATABASE_USER,
            db_password,
            c.DATABASE_HOST,
            c.DATABASE_PORT,
            c.DATABASE_NAME)

        engine = sqlalchemy.create_engine(engine_url)
        session = self._get_session(engine)

        if engine.dialect.has_table(engine.connect(), 'sec_basic_auth'):
            try:
                secdef = session.query(HTTPBasicAuth).filter(
                    HTTPBasicAuth.name == args.secdef).one()
            except sqlalchemy.orm.exc.NoResultFound:
                self.logger.error(
                    "Security definition '%s' not found.", args.secdef)
                return 1

            self.logger.info("Security definition: %s" % secdef.name)
            self.logger.info("Username: %s" % secdef.username)
            self.logger.info("Old password: %s" % secdef.password)
            self.logger.info("Setting new password.")
            secdef.password = args.password
            session.commit()
            self.logger.info('OK')
        else:
            msg = "Zato ODB does not seem to have been set up yet."
            self.logger.error(msg)
            return self.SYS_ERROR.NO_ODB_FOUND
开发者ID:grenzi,项目名称:zato-deploy-cli,代码行数:43,代码来源:updatepassword.py


示例17: execute

 def execute(self, args):
     cm = CryptoManager(priv_key_location=os.path.abspath(args.path))
     cm.load_keys()
     
     self.logger.info('Secret is [{}]'.format(cm.decrypt(args.secret)))
开发者ID:barowski,项目名称:zato,代码行数:5,代码来源:crypto.py



注:本文中的zato.common.crypto.CryptoManager类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python test.rand_bool函数代码示例发布时间:2022-05-26
下一篇:
Python util.rand_string函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap