• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python log.LogFactory类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中modules.util.log.LogFactory的典型用法代码示例。如果您正苦于以下问题:Python LogFactory类的具体用法?Python LogFactory怎么用?Python LogFactory使用的例子?那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了LogFactory类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: run_plugin

 def run_plugin(self, values):
     """Restart Apache through its init script and log the command that was run.

     :param values: plugin payload; not read by this implementation
     """
     logger = LogFactory().get_log(__name__)
     # Alternative kept for reference: "/usr/sbin/apache2ctl -D FOREGROUND"
     restart_cmd = "/etc/init.d/apache2 restart"
     # communicate() blocks until the shell command has exited.
     output, errors = subprocess.Popen(restart_cmd, shell=True).communicate()
     # NOTE(review): no stdout pipe is requested, so `output` is None here.
     logger.debug("Apache server started: [command] %s, [output] %s" % (restart_cmd, output))
开发者ID:AlmavivA,项目名称:stratos,代码行数:7,代码来源:PhpServerStarterPlugin.py


示例2: DefaultHealthStatisticsReader

class DefaultHealthStatisticsReader(IHealthStatReaderPlugin):
    """
    Default health statistics reader: reports memory usage and load average.
    """

    def __init__(self):
        super(DefaultHealthStatisticsReader, self).__init__()
        self.log = LogFactory().get_log(__name__)

    def stat_cartridge_health(self, ca_health_stat):
        """
        Fill ``memory_usage`` and ``load_avg`` on the given object and return it.

        :param ca_health_stat: object whose two attributes are overwritten
        :return: the same ``ca_health_stat`` object
        """
        mem_percent = DefaultHealthStatisticsReader.__read_mem_usage()
        ca_health_stat.memory_usage = mem_percent
        load_percent = DefaultHealthStatisticsReader.__read_load_avg()
        ca_health_stat.load_avg = load_percent

        readings = (ca_health_stat.memory_usage, ca_health_stat.load_avg)
        self.log.debug("Memory read: %r, CPU read: %r" % readings)
        return ca_health_stat

    @staticmethod
    def __read_mem_usage():
        # Percentage figure as reported by psutil's virtual memory snapshot.
        memory = psutil.virtual_memory()
        return memory.percent

    @staticmethod
    def __read_load_avg():
        # Only the first value returned by os.getloadavg() is used; it is
        # divided by the CPU count and scaled by 100.
        one_minute = os.getloadavg()[0]
        cpu_total = multiprocessing.cpu_count()

        return (one_minute / cpu_total) * 100
开发者ID:AlmavivA,项目名称:private-paas,代码行数:26,代码来源:DefaultHealthStatisticsReader.py


示例3: MessageBrokerHeartBeatChecker

class MessageBrokerHeartBeatChecker(AbstractAsyncScheduledTask):
    """
    Scheduled task that probes the message broker with a throwaway MQTT client.
    When the probe fails, the client object handed in at construction time is
    disconnected.
    """

    def __init__(self, connected_client, mb_ip, mb_port, username=None, password=None):
        # Dedicated probe client, separate from the client being watched.
        probe = mqtt.Client()
        self.__mb_client = probe

        if username is not None:
            probe.username_pw_set(username, password)

        self.__mb_ip = mb_ip
        self.__mb_port = mb_port
        self.__connected_client = connected_client
        self.__log = LogFactory().get_log(__name__)

    def execute_task(self):
        """Connect to and disconnect from the broker; on any error drop the watched client."""
        probe = self.__mb_client
        try:
            probe.connect(self.__mb_ip, self.__mb_port, 60)
            probe.disconnect()
        except Exception:
            broker_address = (self.__mb_ip, self.__mb_port)
            self.__log.info(
                "Message broker %s:%s cannot be reached. Disconnecting client..." % broker_address)
            self.__connected_client.disconnect()
开发者ID:gayangunarathne,项目名称:private-paas,代码行数:26,代码来源:subscriber.py


示例4: export_env_var

 def export_env_var(self, variable, value):
     """Set ``variable`` in ``os.environ`` when ``value`` is not None; otherwise log a warning.

     :param variable: name of the environment variable
     :param value: value to export, or None to skip the export
     """
     logger = LogFactory().get_log(__name__)
     if value is None:
         # Nothing to export: report it and leave the environment untouched.
         logger.warn("Could not export environment variable %s " % variable)
         return
     os.environ[variable] = value
     logger.info("Exported environment variable %s: %s" % (variable, value))
开发者ID:lasinducharith,项目名称:product-private-paas,代码行数:7,代码来源:wso2esb-startup-handler.py


示例5: EventSubscriber

class EventSubscriber(threading.Thread):
    """
    Thread that connects to a message broker, subscribes to one topic and
    hands every received message to an EventExecutor through a queue.
    """

    def __init__(self, topic, ip, port):
        threading.Thread.__init__(self)

        # Unbounded queue shared with the executor that drains it.
        pending = Queue(maxsize=0)
        self.__event_queue = pending
        self.__event_executor = EventExecutor(pending)

        self.log = LogFactory().get_log(__name__)

        self.__mb_client = None
        self.__topic = topic
        self.__subscribed = False
        self.__ip = ip
        self.__port = port

    def run(self):
        """Start the executor, connect to the broker and block in the client loop."""
        self.__event_executor.start()

        client = self.__mb_client = mqtt.Client()
        client.on_connect = self.on_connect
        client.on_message = self.on_message

        broker_address = (self.__ip, self.__port)
        self.log.debug("Connecting to the message broker with address %r:%r" % broker_address)
        client.connect(self.__ip, self.__port, 60)
        # The flag is raised right after connect() returns, before the loop starts.
        self.__subscribed = True
        client.loop_forever()

    def register_handler(self, event, handler):
        """
        Map a handler function to an event name on the underlying executor.
        :param str event: Name of the event to attach the provided handler
        :param handler: The handler function
        :return: void
        :rtype: void
        """
        executor = self.__event_executor
        executor.register_event_handler(event, handler)
        self.log.debug("Registered handler for event %r" % event)

    def on_connect(self, client, userdata, flags, rc):
        """Connect callback: subscribe the stored client to the configured topic."""
        self.log.debug("Connected to message broker.")
        topic = self.__topic
        self.__mb_client.subscribe(topic)
        self.log.debug("Subscribed to %r" % topic)

    def on_message(self, client, userdata, msg):
        """Message callback: log the message and enqueue it for the executor."""
        details = (msg.topic, msg.payload)
        self.log.debug("Message received: %s:\n%s" % details)
        self.__event_queue.put(msg)

    def is_subscribed(self):
        """
        Report the value of the internal subscribed flag.
        :return: True if subscribed, False if otherwise
        :rtype: bool
        """
        return self.__subscribed
开发者ID:VIthulan,项目名称:product-la,代码行数:59,代码来源:subscriber.py


示例6: run_plugin

    def run_plugin(self, values):
        """Launch Tomcat's startup script through the shell and wait for it to exit.

        :param values: plugin payload; not read by this implementation
        """
        logger = LogFactory().get_log(__name__)
        # ${CATALINA_HOME} is expanded by the shell, not by Python.
        startup_cmd = "exec ${CATALINA_HOME}/bin/startup.sh"
        logger.info("Starting Tomcat server: [command] %s" % startup_cmd)

        process = subprocess.Popen(startup_cmd, shell=True)
        # NOTE(review): no stdout pipe is requested, so `output` is None here.
        output, errors = process.communicate()
        logger.debug("Tomcat server started: [command] %s, [output] %s" % (process.args, output))
开发者ID:AlmavivA,项目名称:stratos,代码行数:9,代码来源:TomcatServerStarterPlugin.py


示例7: checkout

 def checkout(self, repo_info):
     """Create a git repository object for ``repo_info`` and register it with the agent.

     The repository information passed by the caller is used directly. The
     previous body re-read it from an undefined ``values`` name, so every
     call ended in a NameError that was only logged.

     :param repo_info: repository information handed to
         ``AgentGitHandler.create_git_repo``
     :return: None; failures are logged with their traceback, not raised
     """
     log = LogFactory().get_log(__name__)
     try:
         log.info("Running extension for checkout job")
         git_repo = AgentGitHandler.create_git_repo(repo_info)
         AgentGitHandler.add_repo(git_repo)
     except Exception as e:
         # Best-effort extension: report the failure and keep the agent running.
         log.exception("Error while executing CheckoutJobHandler extension: %s" % e)
开发者ID:AlmavivA,项目名称:private-paas,代码行数:9,代码来源:checkout-job-handler.py


示例8: run_plugin

    def run_plugin(self, values):
        """Wait for SAML_ENDPOINT from the metadata service, then start Tomcat.

        Reads ``APPLICATION_ID``, ``HOST_NAME``, ``PORT_MAPPINGS`` and the
        optional ``LB_CLUSTER_ID`` from ``values``.

        :param values: plugin payload dictionary
        """
        logger = LogFactory().get_log(__name__)

        # Poll every five seconds until a response carrying SAML_ENDPOINT arrives.
        while True:
            logger.debug("Waiting for SAML_ENDPOINT to be available from metadata service for app ID: %s" % values["APPLICATION_ID"])
            time.sleep(5)
            mds_response = mdsclient.get(app=True)
            if mds_response is not None and mds_response.properties.get("SAML_ENDPOINT") is not None:
                break

        saml_endpoint = mds_response.properties["SAML_ENDPOINT"]
        logger.debug("SAML_ENDPOINT value read from Metadata service: %s" % saml_endpoint)

        # start tomcat
        startup_cmd = "exec /opt/tomcat/bin/startup.sh"
        logger.info("Starting Tomcat server: [command] %s, [STRATOS_SAML_ENDPOINT] %s" % (startup_cmd, saml_endpoint))
        child_env = os.environ.copy()
        child_env["STRATOS_SAML_ENDPOINT"] = saml_endpoint

        child_env["STRATOS_HOST_NAME"] = values["HOST_NAME"]
        port_entries = values["PORT_MAPPINGS"].split("|")
        # The third "|"-separated entry is used when LB_CLUSTER_ID is present,
        # the second one otherwise.
        entry_index = 2 if values.get("LB_CLUSTER_ID") is not None else 1
        child_env["STRATOS_HOST_PORT"] = port_entries[entry_index].split(":")[1]

        process = subprocess.Popen(startup_cmd, env=child_env, shell=True)
        output, errors = process.communicate()
        logger.debug("Tomcat server started")
开发者ID:jeradrutnam,项目名称:stratos,代码行数:31,代码来源:TomcatServerStarterPlugin.py


示例9: run_plugin

    def run_plugin(self, values):
        """Run the Storm supervisor command through the shell and wait for it to exit.

        :param values: plugin payload; not read by this implementation
        """
        logger = LogFactory().get_log(__name__)

        logger.info("Starting APACHE STORM SUPERVISOR...")

        # ${CARBON_HOME} is expanded by the shell from the copied environment.
        supervisor_cmd = "${CARBON_HOME}/bin/storm supervisor"
        process = subprocess.Popen(supervisor_cmd, env=os.environ.copy(), shell=True)
        output, errors = process.communicate()
        logger.debug("APACHE STORM SUPERVISOR started successfully")
开发者ID:gayangunarathne,项目名称:wso2privatepaas,代码行数:11,代码来源:apache-storm-startup-handler.py


示例10: run_plugin

 def run_plugin(self, values):
     """Disable git SSL verification and append an S2git entry to /etc/hosts.

     :param values: plugin payload; ``S2GIT_DOMAIN`` and ``S2GIT_IP`` are read
     """
     logger = LogFactory().get_log(__name__)

     os.environ["GIT_SSL_NO_VERIFY"] = "1"

     domain = values.get("S2GIT_DOMAIN")
     ip_address = values.get("S2GIT_IP")
     # NOTE(review): a missing key makes the concatenation below raise
     # TypeError, and both values are placed unescaped into a shell command.
     hosts_line = ip_address + " " + domain
     entry_command = "echo '" + hosts_line + "' >> /etc/hosts"
     process = subprocess.Popen(entry_command, env=os.environ.copy(), shell=True)
     output, errors = process.communicate()
     logger.info("S2git host entry added successfully")
开发者ID:billhu422,项目名称:product-af,代码行数:12,代码来源:TomcatServerStarterPlugin.py


示例11: run_plugin

    def run_plugin(self, values):
        """Log the clustering payload, export the sub domain and configure WKA membership.

        :param values: plugin payload; ``CLUSTER_ID``, ``SERVICE_NAME``,
            ``CLUSTERING_TYPE``, ``WKA_MEMBER``, ``MEMBER_ID`` and
            ``SUB_DOMAIN`` are read
        """
        self.log = LogFactory().get_log(__name__)
        self.log.info("Starting Clustering Configuration")

        cluster_id = values['CLUSTER_ID']
        self.log.info("CLUSTER_ID %s" % cluster_id)

        service = values['SERVICE_NAME']
        self.log.info("SERVICE_NAME %s" % service)

        clustering_type = values['CLUSTERING_TYPE']
        self.log.info("CLUSTERING_TYPE %s" % clustering_type)

        wka_flag = values['WKA_MEMBER']
        self.log.info("WKA_MEMBER %s" % wka_flag)

        self.my_member_id = values['MEMBER_ID']
        self.log.info("MEMBER_ID %s" % self.my_member_id)

        # The sub domain is wrapped in single quotes before it is exported.
        quoted_sub_domain = "'{}'".format(values['SUB_DOMAIN'])
        self.log.info("SUB_DOMAIN %s" % (quoted_sub_domain))

        os.environ['STRATOS_SUB_DOMAIN'] = str(quoted_sub_domain)
        # NOTE(review): this reads SUB_DOMAIN, not the STRATOS_SUB_DOMAIN set
        # on the previous line — confirm which variable is meant.
        self.log.info("env clustering  SUB_DOMAIN=%s" % (os.environ.get('SUB_DOMAIN','worker')))

        if not WkaMemberConfigurator.isTrue(wka_flag):
            self.log.info("This is not a WKA member")
            self.fetch_wka_members()
        else:
            self.log.info("This is a WKA member")
            self.remove_me_from_queue()
            self.publish_wka_members(service, cluster_id)

        self.execute_clustring_configurater()
开发者ID:dakshika,项目名称:product-private-paas,代码行数:35,代码来源:WkaMemberConfigurator.py


示例12: run_plugin

    def run_plugin(self, values):
        """Log the clustering payload and run the WKA or non-WKA member path.

        :param values: plugin payload; ``CLUSTER_ID``, ``SERVICE_NAME``,
            ``CLUSTERING_TYPE``, ``WKA_MEMBER`` and ``MEMBER_ID`` are read
        """
        self.log = LogFactory().get_log(__name__)
        self.log.info("Starting Clustering Configuration")

        cluster_id = values['CLUSTER_ID']
        self.log.info("CLUSTER_ID %s" % cluster_id)

        service = values['SERVICE_NAME']
        self.log.info("SERVICE_NAME %s" % service)

        clustering_type = values['CLUSTERING_TYPE']
        self.log.info("CLUSTERING_TYPE %s" % clustering_type)

        wka_flag = values['WKA_MEMBER']
        self.log.info("WKA_MEMBER %s" % wka_flag)

        self.my_member_id = values['MEMBER_ID']
        self.log.info("MEMBER_ID %s" % self.my_member_id)

        # The payload flag is normalised with isTrue() and then checked by is_wka().
        if not self.is_wka(WkaMemberConfigurator.isTrue(wka_flag)):
            self.log.info("This is not a WKA member")
            self.fetch_wka_members()
        else:
            self.log.info("This is a WKA member")
            self.remove_me_from_queue()
            self.get_all_members(service, cluster_id)
开发者ID:XinV,项目名称:product-private-paas,代码行数:27,代码来源:WkaMemberConfigurator.py


示例13: execute_script

    def execute_script(bash_file, extension_values):
        """ Execute the given bash file from the sibling ``bash/`` folder
        :param bash_file: name of the bash file to execute
        :param extension_values: mapping of extra environment variables for
            the script; it is no longer modified by this call
        :return: tuple of (output, errors)
        """
        log = LogFactory().get_log(__name__)

        working_dir = os.path.abspath(os.path.dirname(__file__))
        # The last two characters of this module's directory are dropped and
        # "bash/" is appended.
        # NOTE(review): presumably the module sits in a two-letter folder next to bash/.
        command = working_dir[:-2] + "bash/" + bash_file

        # Build the child environment in a fresh dict so the caller's mapping
        # is not polluted with the whole process environment. Precedence is
        # unchanged: on a key clash the process environment wins.
        script_env = dict(extension_values)
        script_env.update(os.environ)

        log.debug("Execute bash script :: %s" % command)
        p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=script_env)
        output, errors = p.communicate()

        return output, errors
开发者ID:AlmavivA,项目名称:private-paas,代码行数:17,代码来源:ExtensionExecutor.py


示例14: run_plugin

    def run_plugin(self, values):
        """Run the ``<EVENT>.sh`` extension script and log its output.

        :param values: plugin payload; ``EVENT`` names the script to run
        :raises RuntimeError: when the script cannot be launched (OSError) or
            when it writes anything to its error stream
        """
        logger = LogFactory().get_log(__name__)
        event_name = values["EVENT"]
        logger.debug("Running extension for %s" % event_name)

        # Every payload key is prefixed with STRATOS_.
        # NOTE(review): this mapping is built but not passed to execute_script
        # below — confirm against that function's signature.
        extension_values = {"STRATOS_" + key: values[key] for key in values.keys()}

        script_name = event_name + ".sh"
        try:
            output, errors = ExtensionExecutor.execute_script(script_name)
        except OSError:
            raise RuntimeError("Could not find an extension file for event %s" % event_name)

        if len(errors) > 0:
            raise RuntimeError("Extension execution failed for script %s: %s" % (event_name, errors))

        logger.info("%s Extension executed. [output]: %s" % (event_name, output))
开发者ID:agentmilindu,项目名称:stratos,代码行数:18,代码来源:ExtensionExecutor.py


示例15: __init__

    def __init__(self, connected_client, mb_ip, mb_port, username=None, password=None):
        """Create the probe MQTT client and remember the broker address and watched client.

        :param connected_client: client object stored for later use
        :param mb_ip: message broker address
        :param mb_port: message broker port
        :param username: optional broker username; credentials are applied
            only when this is not None
        :param password: optional broker password
        """
        probe = mqtt.Client()
        self.__mb_client = probe

        if username is not None:
            probe.username_pw_set(username, password)

        self.__mb_ip = mb_ip
        self.__mb_port = mb_port
        self.__connected_client = connected_client
        self.__log = LogFactory().get_log(__name__)
开发者ID:gayangunarathne,项目名称:private-paas,代码行数:10,代码来源:subscriber.py


示例16: EventExecutor

class EventExecutor(threading.Thread):
    """
    Polls the event queue and executes event handlers for each event.

    The event name is the last ``/``-separated segment of a message's topic.
    A ``None`` item on the queue is the shutdown sentinel: ``run()`` returns
    when it is dequeued.
    """

    def __init__(self, event_queue):
        """
        :param event_queue: queue of messages exposing a ``topic`` attribute
        """
        threading.Thread.__init__(self)
        self.__event_queue = event_queue
        # TODO: several handlers for one event
        self.__event_handlers = {}
        self.log = LogFactory().get_log(__name__)

    def run(self):
        """Dispatch queued messages to their handlers until the sentinel arrives."""
        while True:
            event_msg = self.__event_queue.get()
            if event_msg is None:
                # Shutdown sentinel queued by terminate().
                break
            event = event_msg.topic.rpartition('/')[2]
            if event in self.__event_handlers:
                handler = self.__event_handlers[event]
                try:
                    self.log.debug("Executing handler for event %r" % event)
                    handler(event_msg)
                except Exception:
                    # A failing handler must not kill the dispatch loop, but
                    # KeyboardInterrupt/SystemExit are no longer swallowed.
                    self.log.exception("Error processing %r event" % event)
            else:
                self.log.debug("Event handler not found for event : %r" % event)

    def register_event_handler(self, event, handler):
        """Map ``handler`` to ``event``, replacing any handler already registered."""
        self.__event_handlers[event] = handler

    def terminate(self):
        """
        Ask the dispatch loop to stop.

        Messages queued before this call are still processed. The previous
        implementation called itself and always raised RecursionError.
        """
        self.__event_queue.put(None)
开发者ID:VIthulan,项目名称:product-la,代码行数:31,代码来源:subscriber.py


示例17: run_plugin

    def run_plugin(self, values):
        """Run the configurator script, then the Storm UI command, waiting for each to exit.

        :param values: plugin payload; not read by this implementation
        """
        logger = LogFactory().get_log(__name__)

        # Step 1: configure server
        logger.info("Configuring APACHE STORM UI...")
        configurator_cmd = "exec /opt/ppaas-configurator-4.1.0-SNAPSHOT/configurator.py"
        configurator = subprocess.Popen(configurator_cmd, env=os.environ.copy(), shell=True)
        output, errors = configurator.communicate()
        logger.info("APACHE STORM UI configured successfully")

        # Step 2: start server, with a fresh copy of the environment
        logger.info("Starting APACHE STORM UI...")

        ui_cmd = "${CARBON_HOME}/bin/storm ui"
        storm_ui = subprocess.Popen(ui_cmd, env=os.environ.copy(), shell=True)
        output, errors = storm_ui.communicate()
        logger.debug("APACHE STORM UI started successfully")
开发者ID:JaneAarthy,项目名称:product-private-paas,代码行数:18,代码来源:apache-storm-startup-ui-handler.py


示例18: __init__

    def __init__(self, topic, ip, port):
        """Set up the event queue, its executor and the broker connection settings.

        :param topic: topic stored for the later subscription
        :param ip: message broker address
        :param port: message broker port
        """
        threading.Thread.__init__(self)

        # Unbounded queue shared with the executor that drains it.
        pending = Queue(maxsize=0)
        self.__event_queue = pending
        self.__event_executor = EventExecutor(pending)

        self.log = LogFactory().get_log(__name__)

        # No broker client exists yet, and the subscribed flag starts False.
        self.__mb_client = None
        self.__topic = topic
        self.__subscribed = False
        self.__ip = ip
        self.__port = port
开发者ID:VIthulan,项目名称:product-la,代码行数:13,代码来源:subscriber.py


示例19: run_plugin

    def run_plugin(self, values):
        """Export ports parsed from PORT_MAPPINGS, then configure and start WSO2 ESB.

        :param values: plugin payload; ``PORT_MAPPINGS`` is read
        """
        logger = LogFactory().get_log(__name__)

        logger.info("Reading port mappings...")
        raw_mappings = values["PORT_MAPPINGS"]

        mgt_console_https_port = None
        pt_http_port = None
        pt_https_port = None

        # port mappings format: """NAME:mgt-console|PROTOCOL:https|PORT:4500|PROXY_PORT:9443;
        #                          NAME:pt-http|PROTOCOL:http|PORT:4501|PROXY_PORT:7280;
        #                          NAME:pt-https|PROTOCOL:https|PORT:4502|PROXY_PORT:7243"""

        logger.info("Port mappings: %s" % raw_mappings)
        mapping_entries = raw_mappings.split(";") if raw_mappings is not None else []
        for entry in mapping_entries:
            logger.debug("port_mapping: %s" % entry)
            fields = entry.split("|")
            # Each field looks like KEY:value; only the value part is kept.
            name = fields[0].split(":")[1]
            protocol = fields[1].split(":")[1]
            port = fields[2].split(":")[1]

            service = (name, protocol)
            if service == ("mgt-console", "https"):
                mgt_console_https_port = port
            elif service == ("pt-http", "http"):
                pt_http_port = port
            elif service == ("pt-https", "https"):
                pt_https_port = port

        logger.info("Kubernetes service management console https port: %s" % mgt_console_https_port)
        logger.info("Kubernetes service pass-through http port: %s" % pt_http_port)
        logger.info("Kubernetes service pass-through https port: %s" % pt_https_port)

        # export environment variables
        self.export_env_var('CONFIG_PARAM_HTTPS_PROXY_PORT', mgt_console_https_port)
        self.export_env_var('CONFIG_PARAM_PT_HTTP_PROXY_PORT', pt_http_port)
        self.export_env_var('CONFIG_PARAM_PT_HTTPS_PROXY_PORT', pt_https_port)

        # configure server
        logger.info("Configuring WSO2 ESB...")
        configurator_cmd = "python /opt/ppaas-configurator-4.1.0-SNAPSHOT/configurator.py"
        configurator = subprocess.Popen(configurator_cmd, env=os.environ.copy(), shell=True)
        output, errors = configurator.communicate()
        logger.info("WSO2 ESB configured successfully")

        # start server, with a fresh copy of the environment
        logger.info("Starting WSO2 ESB...")

        server_cmd = "exec ${CARBON_HOME}/bin/wso2server.sh start"
        server = subprocess.Popen(server_cmd, env=os.environ.copy(), shell=True)
        output, errors = server.communicate()
        logger.debug("WSO2 ESB started successfully")
开发者ID:lasinducharith,项目名称:product-private-paas,代码行数:58,代码来源:wso2esb-startup-handler.py


示例20: run_plugin

    def run_plugin(self, values):
        log = LogFactory().get_log(__name__)
        # Read Application_Id, MB_IP, CONFIG_PARAM_MANAGER and Topology from values
        app_id = values["APPLICATION_ID"]
        mb_ip = values["MB_IP"]
        is_cep_mgr = values["CONFIG_PARAM_MANAGER"]
        topology_str = values["TOPOLOGY_JSON"]

        # log above information
        log.info("Application ID: %s" % app_id)
        log.info("MB IP: %s" % mb_ip)
        log.info("CEP Manager: %s" % is_cep_mgr)
        log.info("Topology: %s" % topology_str)

        topology_json = json.loads(topology_str)

        if is_cep_mgr == 'true':
            log.info("Configuring CEP Manager Template module ..")
            log.info("Reading the Complete Topology in order to get the dependent ip addresses ...")
            zookeeper_member_default_private_ip = None
            nimbus_member_default_private_ip = None

            if topology_json is not None:
                # add service map
                for service_name in topology_json["serviceMap"]:
                    service_str = topology_json["serviceMap"][service_name]
                    if service_name == "zookeeper":
                        # add cluster map
                        for cluster_id in service_str["clusterIdClusterMap"]:
                            cluster_str = service_str["clusterIdClusterMap"][cluster_id]
                            if cluster_str["appId"] == app_id:
                                # add member map
                                for member_id in cluster_str["memberMap"]:
                                    member_str = cluster_str["memberMap"][member_id]
                                    if zookeeper_member_default_private_ip is None:
                                        zookeeper_member_default_private_ip = member_str["defaultPrivateIP"]

                    if service_name == "nimbus":
                        # add cluster map
                        for cluster_id in service_str["clusterIdClusterMap"]:
                            cluster_str = service_str["clusterIdClusterMap"][cluster_id]
                            if cluster_str["appId"] == app_id:
                                # add member map
                                for member_id in cluster_str["memberMap"]:
                                    member_str = cluster_str["memberMap"][member_id]
                                    if nimbus_member_default_private_ip is None:
                                        nimbus_member_default_private_ip = member_str["defaultPrivateIP"]

            if zookeeper_member_default_private_ip is not None:
                command = "sed -i \"s/^CONFIG_PARAM_ZOOKEEPER_HOST=.*/CONFIG_PARAM_ZOOKEEPER_HOST=%s/g\" %s" % (
                zookeeper_member_default_private_ip, "${CONFIGURATOR_HOME}/template-modules/wso2cep-4.0.0/module.ini")
                p = subprocess.Popen(command, shell=True)
                output, errors = p.communicate()
                log.info(
                    "Successfully updated zookeeper host: %s in WSO2 CEP Manager template module" % zookeeper_member_default_private_ip)

            if nimbus_member_default_private_ip is not None:
                command = "sed -i \"s/^CONFIG_PARAM_NIMBUS_HOST=.*/CONFIG_PARAM_NIMBUS_HOST=%s/g\" %s" % (
                nimbus_member_default_private_ip, "${CONFIGURATOR_HOME}/template-modules/wso2cep-4.0.0/module.ini")
                p = subprocess.Popen(command, shell=True)
                output, errors = p.communicate()
                log.info(
                    "Successfully updated nimbus host: %s in WSO2 CEP Manager template module" % nimbus_member_default_private_ip)

            # set local ip as CONFIG_PARAM_LOCAL_MEMBER_HOST
            get_local_ip_cmd = "awk 'NR==1 {print $1}' /etc/hosts"
            local_ip = subprocess.check_output(get_local_ip_cmd, shell=True)
            log.info("local IP from /etc/hosts : %s " % local_ip)

            if local_ip is not None:
                local_ip = local_ip[0:-1]
                command = "sed -i \"s/^CONFIG_PARAM_LOCAL_MEMBER_HOST=.*/CONFIG_PARAM_LOCAL_MEMBER_HOST=%s/g\" %s" % (
                local_ip, "${CONFIGURATOR_HOME}/template-modules/wso2cep-4.0.0/module.ini")
                p = subprocess.Popen(command, shell=True)
                output, errors = p.communicate()
                log.info("Successfully updated local member ip: %s in WSO2 CEP template module" % local_ip)

            # Set CONFIG_PARAM_MANAGER=true
            command = "sed -i \"s/^CONFIG_PARAM_MANAGER=.*/CONFIG_PARAM_MANAGER=%s/g\" %s" % (
            is_cep_mgr, "${CONFIGURATOR_HOME}/template-modules/wso2cep-4.0.0/module.ini")
            p = subprocess.Popen(command, shell=True)
            output, errors = p.communicate()
            log.info("Successfully updated config parameter manager: %s in WSO2 CEP template module" % is_cep_mgr)

        # Read all CEP Manager private IPs and update CONFIG_PARAM_MANAGER_MEMBERS in module.ini
        cep_mgr_private_ip_list = []
        if topology_json is not None:
            # add service map
            for service_name in topology_json["serviceMap"]:
                service_str = topology_json["serviceMap"][service_name]
                if service_name == "cep-mgr":
                    # add cluster map
                    for cluster_id in service_str["clusterIdClusterMap"]:
                        cluster_str = service_str["clusterIdClusterMap"][cluster_id]
                        if cluster_str["appId"] == app_id:
                            # add member map
                            for member_id in cluster_str["memberMap"]:
                                member_str = cluster_str["memberMap"][member_id]
                                if member_str["defaultPrivateIP"] is not None:
                                    cep_mgr_private_ip_list.append(member_str["defaultPrivateIP"])
#.........这里部分代码省略.........
开发者ID:gayangunarathne,项目名称:wso2privatepaas,代码行数:101,代码来源:wso2cep-topology-handler.py



注:本文中的modules.util.log.LogFactory类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python r_client.get函数代码示例发布时间:2022-05-27
下一篇:
Python unicode.encode函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap