• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python util.get_component_name函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中zato.common.util.get_component_name函数的典型用法代码示例。如果您正苦于以下问题:Python get_component_name函数的具体用法?Python get_component_name怎么用?Python get_component_name使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_component_name函数的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __init__

    def __init__(self, name, config, config_no_sensitive):
        """Build the SQLAlchemy engine for this pool and subscribe to its pool events.

        :param name: stored as ``self.name``.
        :param config: mapping read for ``engine``, ``pool_size`` and the optional
            ``extra`` key; it is also expanded into ``engine_def`` to build the URL.
        :param config_no_sensitive: configuration that is safe to log, any
            sensitive data has already been shadowed by the caller.
        """
        self.logger = getLogger(self.__class__.__name__)

        self.name = name
        self.config = config

        # self.engine.name is 'mysql' while self.engine_name is 'mysql+pymysql'
        self.engine_name = config['engine']

        # Safe for printing out to logs, any sensitive data has been shadowed
        self.config_no_sensitive = config_no_sensitive

        # Keyword arguments for create_engine that depend on the database in use
        if self.engine_name.startswith('mysql'):
            # MySQL-only
            engine_kwargs = {'pool_recycle': 600}
        elif self.engine_name.startswith('postgres'):
            # Postgres-only
            engine_kwargs = {'connect_args': {'application_name': get_component_name()}}
        else:
            engine_kwargs = {}

        # 'extra' is optional, hence .get - whatever it parses to is layered
        # on top of the engine-specific defaults above.
        engine_kwargs.update(parse_extra_into_dict(self.config.get('extra')))

        self.engine = create_engine(
            engine_def.format(**config), pool_size=int(config['pool_size']), **engine_kwargs)

        # Each pool event is handled by the method called on_<event name>
        for event_name in ('checkin', 'checkout', 'connect', 'first_connect'):
            event.listen(self.engine, event_name, getattr(self, 'on_' + event_name))
开发者ID:azazel75,项目名称:zato,代码行数:28,代码来源:sql.py


示例2: test_create_headers_soap12

    def test_create_headers_soap12(self):
        # A SOAP 1.2 connection must produce the Zato metadata headers, SOAPAction
        # and the SOAP 1.2 content type, with user headers passed through as-is.

        cid = rand_string()
        now = datetime.utcnow().isoformat()
        soap_action = rand_string()
        fake_requests = _FakeRequestsModule()

        config = self._get_config()
        config['data_format'] = DATA_FORMAT.XML
        config['transport'] = URL_TYPE.SOAP
        config['soap_action'] = soap_action
        config['soap_version'] = '1.2'

        wrapper = HTTPSOAPWrapper(config, fake_requests)
        user_headers = {rand_string():rand_string(), rand_string():rand_string()}

        headers = wrapper._create_headers(cid, user_headers, now)

        # Headers added by the wrapper, each one is removed once it is verified
        expected_headers = (
            ('X-Zato-CID', cid),
            ('X-Zato-Msg-TS', now),
            ('X-Zato-Component', get_component_name()),
            ('SOAPAction', soap_action),
            ('Content-Type', CONTENT_TYPE.SOAP12),
        )
        for header_name, expected_value in expected_headers:
            eq_(headers.pop(header_name), expected_value)

        # Anything that is left must be user headers
        self.assertDictEqual(headers, user_headers)
开发者ID:giacomocariello,项目名称:zato,代码行数:26,代码来源:test_outgoing.py


示例3: _get_engine

    def _get_engine(self, args):
        """Return an SQLAlchemy engine for the ODB described by ``args``.

        :param args: object exposing ``odb_type``, ``odb_user``, ``odb_password``,
            ``odb_host``, ``odb_port`` and ``odb_db_name``.
        :returns: whatever ``sqlalchemy.create_engine`` returns for the built URL.
        """
        url_template = odb.engine_def
        engine_url = url_template.format(
            engine=args.odb_type,
            username=args.odb_user,
            password=args.odb_password,
            host=args.odb_host,
            port=args.odb_port,
            db_name=args.odb_db_name)

        # Only PostgreSQL connections are given an application name
        if args.odb_type == 'postgresql':
            connect_args = {'application_name':util.get_component_name('enmasse')}
        else:
            connect_args = {}

        return sqlalchemy.create_engine(engine_url, connect_args=connect_args)
开发者ID:chunjieroad,项目名称:zato,代码行数:7,代码来源:__init__.py


示例4: __init__

    def __init__(self, config, requests_module=None):
        """Set up the HTTP session, address data and authentication for a connection.

        :param config: mapping read for ``pool_size``; a deep copy with the
            ``password`` key masked is kept as ``self.config_no_sensitive``.
        :param requests_module: optional stand-in for the ``requests`` module;
            when falsy the real ``requests`` is used.
        """
        self.config = config

        # Copy of the configuration with the password masked out
        masked_config = deepcopy(self.config)
        masked_config['password'] = '***'
        self.config_no_sensitive = masked_config

        self.requests_module = requests_module if requests_module else requests
        self.session = self.requests_module.session(pool_maxsize=self.config['pool_size'])
        self._component_name = get_component_name()

        # Initial values, set before set_address_data and set_auth run
        self.address, self.path_params = None, []

        self.set_address_data()
        self.set_auth()
开发者ID:alex-hutton,项目名称:zato,代码行数:13,代码来源:outgoing.py


示例5: __init__

    def __init__(self, config, requests_module=None):
        self.config = config
        self.config_no_sensitive = deepcopy(self.config)
        self.config_no_sensitive['password'] = '***'
        self.requests_module = requests_module or requests
        self.session = self.requests_module.session(pool_maxsize=self.config['pool_size'])
        
        self._component_name = get_component_name()
        
        self.soap = {}
        self.soap['1.1'] = {}
        self.soap['1.1']['content_type'] = 'text/xml; charset=utf-8'
        self.soap['1.1']['message'] = """<?xml version="1.0" encoding="utf-8"?>
<s11:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/">
  {header}
  <s11:Body>{data}</s11:Body>
</s11:Envelope>"""
        self.soap['1.1']['header_template'] = """<s11:Header xmlns:wsse="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd" >
          <wsse:Security>
            <wsse:UsernameToken>
              <wsse:Username>{Username}</wsse:Username>
              <wsse:Password Type="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordText">{Password}</wsse:Password>
            </wsse:UsernameToken>
          </wsse:Security>
        </s11:Header>
        """

        self.soap['1.2'] = {}
        self.soap['1.2']['content_type'] = 'application/soap+xml; charset=utf-8'
        self.soap['1.2']['message'] = """<?xml version="1.0" encoding="utf-8"?>
<s12:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:s12="http://www.w3.org/2003/05/soap-envelope/">
  {header}
  <s12:Body></s12:Body>
</s12:Envelope>"""
        self.soap['1.2']['header_template'] = """<s12:Header xmlns:wsse="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd" >
          <wsse:Security>
            <wsse:UsernameToken>
              <wsse:Username>{Username}</wsse:Username>
              <wsse:Password Type="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordText">{Password}</wsse:Password>
            </wsse:UsernameToken>
          </wsse:Security>
        </s12:Header>
        """
        
        self.address = None
        self.path_params = []
        
        self.set_address_data()
        self.set_auth()
开发者ID:fatrix,项目名称:zato,代码行数:49,代码来源:outgoing.py


示例6: __init__

 def __init__(self, repo_location=None, def_id=None, out_id=None, init=True):
     """Configure the AMQP publishing connector and, optionally, initialize it.

     :param repo_location: passed on to the parent class' ``__init__``.
     :param def_id: passed on to the parent class' ``__init__``.
     :param out_id: stored as ``self.out_id``.
     :param init: when true, ``self._init()`` is called before the start-up
         message is logged.
     """
     super(OutgoingConnector, self).__init__(repo_location, def_id)
     self.out_id = out_id

     self.broker_client_id = 'amqp-publishing-connector'

     # Both message types are dispatched to the same handler
     handled_types = (
         MESSAGE_TYPE.TO_AMQP_PUBLISHING_CONNECTOR_ALL,
         MESSAGE_TYPE.TO_AMQP_CONNECTOR_ALL,
     )
     self.broker_callbacks = dict(
         (TOPICS[msg_type], self.on_broker_msg) for msg_type in handled_types)
     self.broker_messages = self.broker_callbacks.keys()
     self.component_name = get_component_name('out-amqp')

     if init:
         self._init()

     # Logged regardless of whether _init was called above
     conn_info = self._conn_info()
     logger.info('Started an AMQP publisher for [{}]'.format(conn_info))
开发者ID:lanwan,项目名称:zato,代码行数:16,代码来源:outgoing.py


示例7: __init__

    def __init__(self, config, requests_module=None):
        """Set up the HTTP(S) session, default content type and address data.

        :param config: mapping read for ``timeout`` and ``pool_size``. Its
            ``timeout`` value is converted to ``float`` in place, so the object
            passed in by the caller is modified. A deep copy with the ``password``
            key masked is kept as ``self.config_no_sensitive``.
        :param requests_module: optional stand-in for the ``requests`` module;
            when falsy the real ``requests`` is used.
        """
        self.config = config
        self.config['timeout'] = float(self.config['timeout'])

        # Copy of the configuration with the password masked out, taken
        # after the timeout has been normalized above.
        masked_config = deepcopy(self.config)
        masked_config['password'] = '***'
        self.config_no_sensitive = masked_config

        self.requests_module = requests_module if requests_module else requests
        self.session = self.requests_module.session(pool_maxsize=self.config['pool_size'])

        # All https:// traffic of this session goes through a dedicated adapter
        self.https_adapter = HTTPSAdapter()
        self.session.mount('https://', self.https_adapter)

        self._component_name = get_component_name()
        self.default_content_type = self.get_default_content_type()

        # Initial values, set before set_address_data runs
        self.address, self.path_params = None, []

        self.set_address_data()
开发者ID:grenzi,项目名称:ctakes_exploration,代码行数:16,代码来源:outgoing.py


示例8: test_create_headers_no_data_format

    def test_create_headers_no_data_format(self):
        # With no data_format configured only the Zato metadata headers are
        # added on top of what the user provided.

        cid = rand_string()
        now = datetime.utcnow().isoformat()
        fake_requests = _FakeRequestsModule()

        config = self._get_config()

        wrapper = HTTPSOAPWrapper(config, fake_requests)
        user_headers = {rand_string():rand_string(), rand_string():rand_string()}

        headers = wrapper._create_headers(cid, user_headers, now)

        # Headers added by the wrapper, each one is removed once it is verified
        expected_headers = (
            ('X-Zato-CID', cid),
            ('X-Zato-Msg-TS', now),
            ('X-Zato-Component', get_component_name()),
        )
        for header_name, expected_value in expected_headers:
            eq_(headers.pop(header_name), expected_value)

        # Anything that is left must be user headers
        # (note that there was no Content-Type because there was no data_format)
        self.assertDictEqual(headers, user_headers)
开发者ID:giacomocariello,项目名称:zato,代码行数:20,代码来源:test_outgoing.py


示例9: establish_connection

 def establish_connection(self):
     """Open a connection built from ``self.client`` and return it.

     Connection parameters that are missing or falsy on ``self.client`` are
     first filled in from ``self.default_connection_params``. The returned
     connection has its ``client`` attribute set to ``self.client``.
     """
     params = self.client

     # Fall back to defaults for anything that is unset or falsy
     for attr_name, fallback in self.default_connection_params.items():
         if not getattr(params, attr_name, None):
             setattr(params, attr_name, fallback)

     # 'localhost' is replaced with the IPv4 loopback address
     if params.hostname == 'localhost':
         params.hostname = '127.0.0.1'

     connection_class = self.Connection
     connection_kwargs = dict(
         host=params.host,
         userid=params.userid,
         password=params.password,
         login_method=params.login_method,
         virtual_host=params.virtual_host,
         insist=params.insist,
         ssl=params.ssl,
         connect_timeout=params.connect_timeout,
         heartbeat=params.heartbeat,
         # Sent to the broker as a client property
         client_properties={'zato-component':get_component_name(COMPONENT_PREFIX)},
     )
     connection = connection_class(**connection_kwargs)

     connection.client = self.client
     return connection
开发者ID:lanwan,项目名称:zato,代码行数:21,代码来源:outgoing.py


示例10: __init__

    def __init__(self, name, config, config_no_sensitive):
        """Build the SQLAlchemy engine for this pool and subscribe to its pool events.

        :param name: stored as ``self.name``.
        :param config: mapping read for ``pool_size`` and the optional ``extra``
            key; it is also expanded into ``engine_def`` to build the URL.
        :param config_no_sensitive: configuration that is safe to log, any
            sensitive data has already been shadowed by the caller.
        """
        self.name = name
        self.config = config
        self.logger = getLogger(self.__class__.__name__)

        # Safe for printing out to logs, any sensitive data has been shadowed
        self.config_no_sensitive = config_no_sensitive

        # NOTE(review): application_name is passed for every engine type here -
        # presumably only PostgreSQL drivers accept it, confirm which databases
        # this pool is expected to work with.
        engine_kwargs = {'connect_args': {'application_name': get_component_name()}}

        # 'extra' is optional, hence .get - whatever it parses to is layered
        # on top of the defaults above.
        engine_kwargs.update(parse_extra_into_dict(self.config.get('extra')))

        self.engine = create_engine(
            engine_def.format(**config), pool_size=int(config['pool_size']), **engine_kwargs)

        # Each pool event is handled by the method called on_<event name>
        for event_name in ('checkin', 'checkout', 'connect', 'first_connect'):
            event.listen(self.engine, event_name, getattr(self, 'on_' + event_name))
开发者ID:rafael84,项目名称:zato,代码行数:21,代码来源:sql.py


示例11: test_create_headers_json

    def test_create_headers_json(self):
        # A plain HTTP connection using JSON must produce the Zato metadata
        # headers and the JSON content type, with user headers passed through.

        cid = rand_string()
        now = datetime.utcnow().isoformat()
        fake_requests = _FakeRequestsModule()

        config = self._get_config()
        config['data_format'] = DATA_FORMAT.JSON
        config['transport'] = URL_TYPE.PLAIN_HTTP

        wrapper = HTTPSOAPWrapper(config, fake_requests)
        user_headers = {rand_string():rand_string(), rand_string():rand_string()}

        headers = wrapper._create_headers(cid, user_headers, now)

        # Headers added by the wrapper, each one is removed once it is verified
        expected_headers = (
            ('X-Zato-CID', cid),
            ('X-Zato-Msg-TS', now),
            ('X-Zato-Component', get_component_name()),
            ('Content-Type', CONTENT_TYPE.JSON),
        )
        for header_name, expected_value in expected_headers:
            eq_(headers.pop(header_name), expected_value)

        # Anything that is left must be user headers
        self.assertDictEqual(headers, user_headers)
开发者ID:giacomocariello,项目名称:zato,代码行数:22,代码来源:test_outgoing.py


示例12: __init__

    def __init__(self, name, config, config_no_sensitive):
        """Build the SQLAlchemy engine for this pool and subscribe to its pool events.

        :param name: stored as ``self.name``.
        :param config: mapping read for ``engine``, the optional ``extra`` key and,
            for engines other than ``sqlite``, ``pool_size`` (defaults to 1).
            A ``pool_size`` of 0 disables pooling by selecting ``NullPool``.
        :param config_no_sensitive: configuration that is safe to log, any
            sensitive data has already been shadowed by the caller.
        """
        self.logger = getLogger(self.__class__.__name__)

        self.name = name
        self.config = config
        self.engine_name = config["engine"]  # self.engine.name is 'mysql' while 'self.engine_name' is mysql+pymysql

        # Safe for printing out to logs, any sensitive data has been shadowed
        self.config_no_sensitive = config_no_sensitive

        _extra = {}

        # MySQL only
        if self.engine_name.startswith("mysql"):
            _extra["pool_recycle"] = 600

        # Postgres-only
        elif self.engine_name.startswith("postgres"):
            _extra["connect_args"] = {"application_name": get_component_name()}

        extra = self.config.get("extra")  # Optional, hence .get
        _extra.update(parse_extra_into_dict(extra))

        # SQLite has no pools
        if self.engine_name != "sqlite":
            pool_size = int(config.get("pool_size", 1))
            if pool_size == 0:
                # NullPool takes no pool_size - create_engine raises TypeError
                # for keyword arguments the chosen pool class does not accept,
                # so the size must not be passed on, no matter if it came from
                # the configuration or from 'extra'.
                _extra["poolclass"] = NullPool
                _extra.pop("pool_size", None)
            else:
                _extra["pool_size"] = pool_size

        engine_url = get_engine_url(config)
        self.engine = create_engine(engine_url, **_extra)

        event.listen(self.engine, "checkin", self.on_checkin)
        event.listen(self.engine, "checkout", self.on_checkout)
        event.listen(self.engine, "connect", self.on_connect)
        event.listen(self.engine, "first_connect", self.on_first_connect)
开发者ID:viatoriche,项目名称:zato,代码行数:36,代码来源:sql.py


示例13: _get_engine

 def _get_engine(self, args):
     """Return an SQLAlchemy engine for the ODB described by ``args``.

     :param args: object exposing ``odb_type``; it is also handed to
         ``odb_util.get_engine_url`` to build the connection URL.
     """
     # Only PostgreSQL connections are given an application name
     if args.odb_type == 'postgresql':
         connect_args = {'application_name':util.get_component_name('enmasse')}
     else:
         connect_args = {}

     engine_url = odb_util.get_engine_url(args)
     return sqlalchemy.create_engine(engine_url, connect_args=connect_args)
开发者ID:davinirjr,项目名称:zato,代码行数:3,代码来源:__init__.py


示例14: _get_engine

 def _get_engine(self, args):
     """Return an SQLAlchemy engine for the ODB described by ``args``.

     :param args: object exposing ``odb_type``; it is also handed to
         ``get_engine_url`` to build the connection URL.
     """
     # Only PostgreSQL connections are given an application name
     if args.odb_type == "postgresql":
         connect_args = {"application_name": util.get_component_name("enmasse")}
     else:
         connect_args = {}

     engine_url = get_engine_url(args)
     return sqlalchemy.create_engine(engine_url, connect_args=connect_args)
开发者ID:dev-alex-alex2006hw,项目名称:zato,代码行数:3,代码来源:__init__.py



注:本文中的zato.common.util.get_component_name函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python util.new_cid函数代码示例发布时间:2022-05-26
下一篇:
Python tls.TLSServer类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap